Major changes across backend, frontend, infrastructure: - Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud) - Admin panel: user management, pricing, settings - Glossary system with CSV import/export - Subscription and tier quota management - Security hardening (rate limiting, API key auth, path traversal fixes) - Docker compose for dev, prod, and IONOS deployment - Alembic migrations for new tables - Frontend: dashboard, pricing page, landing page, i18n (en/fr) - Test suite and verification scripts Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
238 lines
7.7 KiB
Python
238 lines
7.7 KiB
Python
"""
|
|
API Key Authentication Middleware
|
|
|
|
Provides reusable dependencies for API key authentication across all endpoints.
|
|
Story 3.4: Authentification API via X-API-Key
|
|
Story 6.4: Bind user_id to structlog context when user is resolved (logs include user_id).
|
|
"""
|
|
|
|
from typing import Optional, Any, Union
|
|
from fastapi import Header, Depends
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
class APIKeyError(Exception):
|
|
"""Exception for API key authentication errors with structured error codes."""
|
|
|
|
INVALID_API_KEY = "INVALID_API_KEY"
|
|
API_KEY_REVOKED = "API_KEY_REVOKED"
|
|
API_KEY_EXPIRED = "API_KEY_EXPIRED"
|
|
MISSING_API_KEY = "MISSING_API_KEY"
|
|
UNAUTHORIZED = "UNAUTHORIZED"
|
|
|
|
ERROR_MESSAGES = {
|
|
INVALID_API_KEY: "Clé API invalide ou non reconnue.",
|
|
API_KEY_REVOKED: "Cette clé API a été révoquée.",
|
|
API_KEY_EXPIRED: "Cette clé API a expiré.",
|
|
MISSING_API_KEY: "Clé API requise pour cet endpoint.",
|
|
UNAUTHORIZED: "Authentification requise. Utilisez X-API-Key ou Authorization: Bearer.",
|
|
}
|
|
|
|
def __init__(self, code: str, message: Optional[str] = None):
|
|
self.code = code
|
|
self.message = message or self.ERROR_MESSAGES.get(code, "Erreur d'authentification")
|
|
super().__init__(self.message)
|
|
|
|
def to_response(self, status_code: int = 401) -> JSONResponse:
|
|
"""Convert to JSONResponse for FastAPI."""
|
|
return JSONResponse(
|
|
status_code=status_code,
|
|
content={
|
|
"error": self.code,
|
|
"message": self.message,
|
|
},
|
|
)
|
|
|
|
|
|
def _raise_api_key_error(code: str, message: Optional[str] = None) -> None:
|
|
"""Raise an APIKeyError and convert it to JSONResponse for FastAPI."""
|
|
raise APIKeyError(code, message)
|
|
|
|
|
|
async def get_user_from_api_key(
|
|
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
|
) -> Optional[Any]:
|
|
"""
|
|
Get user from X-API-Key header if provided.
|
|
|
|
Returns:
|
|
User object if valid API key provided
|
|
None if no API key provided (caller should try other auth methods)
|
|
|
|
Raises:
|
|
APIKeyError: With structured error code if API key is invalid/revoked/expired
|
|
"""
|
|
if not x_api_key:
|
|
return None
|
|
|
|
try:
|
|
from services.auth_service import get_user_by_api_key
|
|
from core.logging import bind_request_context
|
|
|
|
user = get_user_by_api_key(x_api_key)
|
|
if user is not None:
|
|
bind_request_context(user_id=str(getattr(user, "id", user)))
|
|
return user
|
|
|
|
except ValueError as e:
|
|
# Handle revoked/expired API keys with specific error codes
|
|
error_code = str(e)
|
|
|
|
if error_code == "API_KEY_REVOKED":
|
|
raise APIKeyError("API_KEY_REVOKED", "Cette clé API a été révoquée.")
|
|
elif error_code == "API_KEY_EXPIRED":
|
|
raise APIKeyError("API_KEY_EXPIRED", "Cette clé API a expiré.")
|
|
else:
|
|
# Unknown error - treat as invalid
|
|
raise APIKeyError("INVALID_API_KEY", "Clé API invalide ou non reconnue.")
|
|
|
|
except Exception:
|
|
# Unexpected error - treat as invalid
|
|
raise APIKeyError("INVALID_API_KEY", "Clé API invalide ou non reconnue.")
|
|
|
|
|
|
async def get_current_user_optional(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
) -> Optional[Any]:
|
|
"""Get current user if authenticated via JWT, None otherwise."""
|
|
if not credentials:
|
|
return None
|
|
try:
|
|
from routes.auth_routes import get_current_user
|
|
|
|
user = await get_current_user(credentials)
|
|
return user
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
async def get_authenticated_user_optional(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
|
) -> Optional[Any]:
|
|
"""
|
|
Get authenticated user from API key or JWT (optional - returns None if not authenticated).
|
|
|
|
Priority:
|
|
1. X-API-Key header (automation users)
|
|
2. JWT Bearer token (web users)
|
|
3. None (unauthenticated)
|
|
|
|
Returns:
|
|
User object if authenticated, None otherwise (never raises for auth failures)
|
|
"""
|
|
# Try API key first (priority for automation)
|
|
if x_api_key:
|
|
try:
|
|
user = await get_user_from_api_key(x_api_key)
|
|
if user:
|
|
from core.logging import bind_request_context
|
|
bind_request_context(user_id=str(getattr(user, "id", user)))
|
|
return user
|
|
except APIKeyError:
|
|
# Invalid API key, fall through to JWT
|
|
pass
|
|
|
|
# Fall back to JWT
|
|
if credentials:
|
|
try:
|
|
from routes.auth_routes import get_current_user
|
|
|
|
user = await get_current_user(credentials)
|
|
if user:
|
|
from core.logging import bind_request_context
|
|
bind_request_context(user_id=str(getattr(user, "id", user)))
|
|
return user
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
async def get_authenticated_user(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
|
) -> Optional[Any]:
|
|
"""
|
|
Get authenticated user from API key or JWT.
|
|
|
|
Priority:
|
|
1. X-API-Key header (automation users)
|
|
2. JWT Bearer token (web users)
|
|
3. None (unauthenticated)
|
|
|
|
Returns:
|
|
User object if authenticated
|
|
None if not authenticated
|
|
|
|
Raises:
|
|
APIKeyError: If API key is provided but invalid/revoked/expired
|
|
"""
|
|
# Try API key first (priority for automation)
|
|
if x_api_key:
|
|
# get_user_from_api_key will raise APIKeyError for invalid keys
|
|
user = await get_user_from_api_key(x_api_key)
|
|
if user:
|
|
from core.logging import bind_request_context
|
|
bind_request_context(user_id=str(getattr(user, "id", user)))
|
|
return user
|
|
# Should not reach here - get_user_from_api_key returns None only if no key provided
|
|
raise APIKeyError("INVALID_API_KEY", "Clé API invalide ou non reconnue.")
|
|
|
|
# Fall back to JWT
|
|
if credentials:
|
|
try:
|
|
from routes.auth_routes import get_current_user
|
|
|
|
user = await get_current_user(credentials)
|
|
if user:
|
|
from core.logging import bind_request_context
|
|
bind_request_context(user_id=str(getattr(user, "id", user)))
|
|
return user
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
async def require_authenticated_user(
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
|
|
x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
|
|
) -> Any:
|
|
"""
|
|
Require authentication (API key or JWT).
|
|
|
|
Raises:
|
|
APIKeyError: 401 if not authenticated
|
|
|
|
Returns:
|
|
User object (guaranteed to be authenticated)
|
|
"""
|
|
user = await get_authenticated_user(credentials, x_api_key)
|
|
|
|
if not user:
|
|
raise APIKeyError("MISSING_API_KEY", "Authentification requise. Utilisez X-API-Key ou Authorization: Bearer.")
|
|
|
|
from core.logging import bind_request_context
|
|
bind_request_context(user_id=str(getattr(user, "id", user)))
|
|
return user
|
|
|
|
|
|
async def require_api_key(
|
|
x_api_key: str = Header(..., alias="X-API-Key"),
|
|
) -> Any:
|
|
"""
|
|
Require API key authentication (no JWT fallback).
|
|
|
|
Use this for endpoints that MUST use API key (e.g., certain automation endpoints).
|
|
|
|
Raises:
|
|
APIKeyError: 401 if API key is missing, invalid, revoked, or expired
|
|
|
|
Returns:
|
|
User object (guaranteed to be authenticated via API key)
|
|
"""
|
|
return await get_user_from_api_key(x_api_key) |