Files
office_translator/middleware/api_key_auth.py
Sepehr Ramezani 26bd096a06 feat: production deployment - full update with providers, admin, glossaries, pricing, tests
Major changes across backend, frontend, infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:47 +02:00

238 lines
7.7 KiB
Python

"""
API Key Authentication Middleware
Provides reusable dependencies for API key authentication across all endpoints.
Story 3.4: Authentification API via X-API-Key
Story 6.4: Bind user_id to structlog context when user is resolved (logs include user_id).
"""
from typing import Optional, Any, Union
from fastapi import Header, Depends
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
security = HTTPBearer(auto_error=False)
class APIKeyError(Exception):
"""Exception for API key authentication errors with structured error codes."""
INVALID_API_KEY = "INVALID_API_KEY"
API_KEY_REVOKED = "API_KEY_REVOKED"
API_KEY_EXPIRED = "API_KEY_EXPIRED"
MISSING_API_KEY = "MISSING_API_KEY"
UNAUTHORIZED = "UNAUTHORIZED"
ERROR_MESSAGES = {
INVALID_API_KEY: "Clé API invalide ou non reconnue.",
API_KEY_REVOKED: "Cette clé API a été révoquée.",
API_KEY_EXPIRED: "Cette clé API a expiré.",
MISSING_API_KEY: "Clé API requise pour cet endpoint.",
UNAUTHORIZED: "Authentification requise. Utilisez X-API-Key ou Authorization: Bearer.",
}
def __init__(self, code: str, message: Optional[str] = None):
self.code = code
self.message = message or self.ERROR_MESSAGES.get(code, "Erreur d'authentification")
super().__init__(self.message)
def to_response(self, status_code: int = 401) -> JSONResponse:
"""Convert to JSONResponse for FastAPI."""
return JSONResponse(
status_code=status_code,
content={
"error": self.code,
"message": self.message,
},
)
def _raise_api_key_error(code: str, message: Optional[str] = None) -> None:
"""Raise an APIKeyError and convert it to JSONResponse for FastAPI."""
raise APIKeyError(code, message)
async def get_user_from_api_key(
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> Optional[Any]:
"""
Get user from X-API-Key header if provided.
Returns:
User object if valid API key provided
None if no API key provided (caller should try other auth methods)
Raises:
APIKeyError: With structured error code if API key is invalid/revoked/expired
"""
if not x_api_key:
return None
try:
from services.auth_service import get_user_by_api_key
from core.logging import bind_request_context
user = get_user_by_api_key(x_api_key)
if user is not None:
bind_request_context(user_id=str(getattr(user, "id", user)))
return user
except ValueError as e:
# Handle revoked/expired API keys with specific error codes
error_code = str(e)
if error_code == "API_KEY_REVOKED":
raise APIKeyError("API_KEY_REVOKED", "Cette clé API a été révoquée.")
elif error_code == "API_KEY_EXPIRED":
raise APIKeyError("API_KEY_EXPIRED", "Cette clé API a expiré.")
else:
# Unknown error - treat as invalid
raise APIKeyError("INVALID_API_KEY", "Clé API invalide ou non reconnue.")
except Exception:
# Unexpected error - treat as invalid
raise APIKeyError("INVALID_API_KEY", "Clé API invalide ou non reconnue.")
async def get_current_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> Optional[Any]:
"""Get current user if authenticated via JWT, None otherwise."""
if not credentials:
return None
try:
from routes.auth_routes import get_current_user
user = await get_current_user(credentials)
return user
except Exception:
return None
async def get_authenticated_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> Optional[Any]:
"""
Get authenticated user from API key or JWT (optional - returns None if not authenticated).
Priority:
1. X-API-Key header (automation users)
2. JWT Bearer token (web users)
3. None (unauthenticated)
Returns:
User object if authenticated, None otherwise (never raises for auth failures)
"""
# Try API key first (priority for automation)
if x_api_key:
try:
user = await get_user_from_api_key(x_api_key)
if user:
from core.logging import bind_request_context
bind_request_context(user_id=str(getattr(user, "id", user)))
return user
except APIKeyError:
# Invalid API key, fall through to JWT
pass
# Fall back to JWT
if credentials:
try:
from routes.auth_routes import get_current_user
user = await get_current_user(credentials)
if user:
from core.logging import bind_request_context
bind_request_context(user_id=str(getattr(user, "id", user)))
return user
except Exception:
pass
return None
async def get_authenticated_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> Optional[Any]:
"""
Get authenticated user from API key or JWT.
Priority:
1. X-API-Key header (automation users)
2. JWT Bearer token (web users)
3. None (unauthenticated)
Returns:
User object if authenticated
None if not authenticated
Raises:
APIKeyError: If API key is provided but invalid/revoked/expired
"""
# Try API key first (priority for automation)
if x_api_key:
# get_user_from_api_key will raise APIKeyError for invalid keys
user = await get_user_from_api_key(x_api_key)
if user:
from core.logging import bind_request_context
bind_request_context(user_id=str(getattr(user, "id", user)))
return user
# Should not reach here - get_user_from_api_key returns None only if no key provided
raise APIKeyError("INVALID_API_KEY", "Clé API invalide ou non reconnue.")
# Fall back to JWT
if credentials:
try:
from routes.auth_routes import get_current_user
user = await get_current_user(credentials)
if user:
from core.logging import bind_request_context
bind_request_context(user_id=str(getattr(user, "id", user)))
return user
except Exception:
pass
return None
async def require_authenticated_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> Any:
"""
Require authentication (API key or JWT).
Raises:
APIKeyError: 401 if not authenticated
Returns:
User object (guaranteed to be authenticated)
"""
user = await get_authenticated_user(credentials, x_api_key)
if not user:
raise APIKeyError("MISSING_API_KEY", "Authentification requise. Utilisez X-API-Key ou Authorization: Bearer.")
from core.logging import bind_request_context
bind_request_context(user_id=str(getattr(user, "id", user)))
return user
async def require_api_key(
x_api_key: str = Header(..., alias="X-API-Key"),
) -> Any:
"""
Require API key authentication (no JWT fallback).
Use this for endpoints that MUST use API key (e.g., certain automation endpoints).
Raises:
APIKeyError: 401 if API key is missing, invalid, revoked, or expired
Returns:
User object (guaranteed to be authenticated via API key)
"""
return await get_user_from_api_key(x_api_key)