""" Admin API v1 Endpoints All admin endpoints under /api/v1/admin/ Story 3.5: API Versioning - Migrated from main.py """ import os import secrets import time import logging from datetime import datetime, timezone from typing import Optional, Literal from fastapi import APIRouter, Depends, Header, HTTPException, Form, Request, Query from fastapi.responses import JSONResponse from passlib.context import CryptContext from pydantic import BaseModel, Field from config import config from models.subscription import PlanType, PLANS from services import pricing_config as pricing_cfg pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/admin", tags=["Admin"]) ADMIN_USERNAME = os.getenv("ADMIN_USERNAME") ADMIN_PASSWORD_HASH = os.getenv("ADMIN_PASSWORD_HASH") ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD") if not ADMIN_PASSWORD_HASH and not ADMIN_PASSWORD: ADMIN_PASSWORD = os.getenv("ADMIN_DEV_DEFAULT") _admin_token_secret = os.getenv("ADMIN_TOKEN_SECRET") if not _admin_token_secret: _admin_token_secret = secrets.token_hex(32) logger.critical( "SECURITY: ADMIN_TOKEN_SECRET is not configured! Using an ephemeral random key. " "ALL ADMIN SESSIONS WILL BE INVALIDATED ON EVERY RESTART. " "Set ADMIN_TOKEN_SECRET in your .env file immediately." 
) ADMIN_TOKEN_SECRET = _admin_token_secret REDIS_URL = os.getenv("REDIS_URL", "") _redis_client = None _memory_sessions: dict = {} # Brute-force protection: IP → (failed_count, first_fail_ts) _login_attempts: dict[str, tuple[int, float]] = {} _MAX_LOGIN_ATTEMPTS = 5 _LOCKOUT_SECONDS = 300 # 5 minutes def get_redis_client(): """Return shared sync Redis client from core.redis, or None.""" from core.redis import get_sync_redis return get_sync_redis() def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_admin_password(password: str) -> bool: if not ADMIN_PASSWORD_HASH and not ADMIN_PASSWORD: return False p = (password or "").strip() if ADMIN_PASSWORD_HASH: try: return pwd_context.verify(p, ADMIN_PASSWORD_HASH) except Exception: return False return p == (ADMIN_PASSWORD or "").strip() def _get_session_key(token: str) -> str: return f"admin_session:{token}" def create_admin_token() -> str: token = secrets.token_urlsafe(32) expiry = int(time.time()) + (24 * 60 * 60) redis_client = get_redis_client() if redis_client: try: redis_client.setex(_get_session_key(token), 24 * 60 * 60, str(expiry)) except Exception as e: logger.warning(f"Redis session save failed: {e}") _memory_sessions[token] = expiry else: _memory_sessions[token] = expiry return token def verify_admin_token(token: str) -> bool: redis_client = get_redis_client() if redis_client: try: expiry = redis_client.get(_get_session_key(token)) if expiry and int(expiry) > time.time(): return True return False except Exception as e: logger.warning(f"Redis session check failed: {e}") if token not in _memory_sessions: return False if time.time() > _memory_sessions[token]: del _memory_sessions[token] return False return True def delete_admin_token(token: str): redis_client = get_redis_client() if redis_client: try: redis_client.delete(_get_session_key(token)) except Exception: pass if token in _memory_sessions: del _memory_sessions[token] async def require_admin(authorization: Optional[str] = 
Header(None)) -> str: if not ADMIN_USERNAME or (not ADMIN_PASSWORD_HASH and not ADMIN_PASSWORD): raise HTTPException( status_code=503, detail="Admin authentication not configured" ) if not authorization: raise HTTPException(status_code=401, detail="Authorization header required") parts = authorization.split(" ") if len(parts) != 2 or parts[0].lower() != "bearer": raise HTTPException( status_code=401, detail="Invalid authorization format. Use: Bearer " ) token = parts[1] if not verify_admin_token(token): raise HTTPException(status_code=401, detail="Token invalide ou expiré") return ADMIN_USERNAME class AdminLoginRequest(BaseModel): password: str class AdminUpdateUserTierRequest(BaseModel): plan: Literal["free", "starter", "pro", "business", "enterprise"] class AdminResetPasswordRequest(BaseModel): new_password: str = Field(..., min_length=8) def _plan_info_for_admin(plan_raw) -> dict: """Résout PLANS[...] à partir d'une valeur SQLAlchemy, string ou enum.""" try: if isinstance(plan_raw, str): pt = PlanType(plan_raw.lower()) elif hasattr(plan_raw, "value"): pt = PlanType(plan_raw.value) else: pt = PlanType(plan_raw) except (ValueError, KeyError, TypeError): pt = PlanType.FREE return PLANS.get(pt, PLANS[PlanType.FREE]) def _subscription_status_str(raw) -> str: if raw is None: return "active" return raw.value if hasattr(raw, "value") else str(raw) def _get_client_ip(request: Request) -> str: """Get real client IP from headers or connection""" forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() real_ip = request.headers.get("X-Real-IP") if real_ip: return real_ip if request.client: return request.client.host return "unknown" @router.post("/login") async def admin_login(request: AdminLoginRequest, req: Request): """Admin login endpoint - Returns a bearer token for authenticated admin access""" client_ip = _get_client_ip(req) # Brute-force protection now = time.time() attempts, first_fail = _login_attempts.get(client_ip, 
(0, now)) if attempts >= _MAX_LOGIN_ATTEMPTS: elapsed = now - first_fail if elapsed < _LOCKOUT_SECONDS: remaining = int(_LOCKOUT_SECONDS - elapsed) logger.warning(f"Admin login blocked (brute-force) for IP {client_ip}") raise HTTPException( status_code=429, detail=f"Too many failed attempts. Try again in {remaining}s.", headers={"Retry-After": str(remaining)}, ) else: _login_attempts.pop(client_ip, None) if not verify_admin_password(request.password): count, first = _login_attempts.get(client_ip, (0, now)) _login_attempts[client_ip] = (count + 1, first if count > 0 else now) logger.warning(f"Failed admin login attempt from {client_ip} ({count + 1}/{_MAX_LOGIN_ATTEMPTS})") raise HTTPException(status_code=401, detail="Invalid credentials") _login_attempts.pop(client_ip, None) token = create_admin_token() logger.info(f"Admin login successful from {client_ip}") return { "status": "success", "access_token": token, "token_type": "bearer", "expires_in": 86400, "message": "Login successful", } @router.post("/logout") async def admin_logout(authorization: Optional[str] = Header(None)): """Logout and invalidate admin token""" if authorization: parts = authorization.split(" ") if len(parts) == 2 and parts[0].lower() == "bearer": token = parts[1] delete_admin_token(token) logger.info("Admin logout successful") return {"status": "success", "message": "Logged out"} @router.get("/verify") async def verify_admin_session(admin_id: str = Depends(require_admin)): """Verify admin token is still valid""" return {"status": "valid", "authenticated": True} @router.get("/dashboard") async def get_admin_dashboard(admin_id: str = Depends(require_admin)): """Get comprehensive admin dashboard data""" from middleware.cleanup import create_cleanup_manager from middleware.rate_limiting import RateLimitManager, RateLimitConfig from services.translation_service import _translation_cache cleanup_manager = create_cleanup_manager(config) rate_limit_config = RateLimitConfig( 
requests_per_minute=int(os.getenv("RATE_LIMIT_PER_MINUTE", "30")), requests_per_hour=int(os.getenv("RATE_LIMIT_PER_HOUR", "200")), translations_per_minute=int(os.getenv("TRANSLATIONS_PER_MINUTE", "10")), translations_per_hour=int(os.getenv("TRANSLATIONS_PER_HOUR", "50")), max_concurrent_translations=int(os.getenv("MAX_CONCURRENT_TRANSLATIONS", "5")), ) rate_limit_manager = RateLimitManager(rate_limit_config) health_status = { "status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat(), } cleanup_stats = cleanup_manager.get_stats() rate_limit_stats = rate_limit_manager.get_stats() tracked_files = cleanup_manager.get_tracked_files() providers_status = {} try: from services.providers.google_provider import get_google_provider google_health = get_google_provider().health_check() providers_status["google"] = google_health.model_dump() except Exception as e: providers_status["google"] = { "name": "google", "available": False, "error": str(e)[:100], "last_check": None, } return { "timestamp": health_status.get("timestamp"), "status": health_status.get("status"), "system": {"memory": {}, "disk": {}}, "providers": providers_status, "cleanup": {**cleanup_stats, "tracked_files_count": len(tracked_files)}, "rate_limits": rate_limit_stats, "config": { "max_file_size_mb": config.MAX_FILE_SIZE_MB, "supported_extensions": list(config.SUPPORTED_EXTENSIONS), "translation_service": config.TRANSLATION_SERVICE, }, } @router.get("/users") async def get_admin_users(admin_id: str = Depends(require_admin)): """Liste tous les utilisateurs (base SQLite/PostgreSQL + comptes uniquement dans users.json).""" from services.auth_service import USE_DATABASE, DATABASE_AVAILABLE, load_users from database.connection import get_sync_session from database.models import ApiKey users_list = [] seen_ids: set[str] = set() seen_emails: set[str] = set() with get_sync_session() as session: if USE_DATABASE and DATABASE_AVAILABLE: from database.models import User as DBUser db_users = 
session.query(DBUser).order_by(DBUser.created_at.desc()).all() for db_user in db_users: plan_val = db_user.plan plan_str = ( plan_val.value if hasattr(plan_val, "value") else (str(plan_val) if plan_val else "free") ) plan_info = _plan_info_for_admin(plan_val) active_api_keys = ( session.query(ApiKey) .filter(ApiKey.user_id == db_user.id, ApiKey.is_active == True) .all() ) row = { "id": str(db_user.id), "email": db_user.email or "", "name": db_user.name or "", "plan": plan_str, "subscription_status": _subscription_status_str( db_user.subscription_status ), "docs_translated_this_month": db_user.docs_translated_this_month or 0, "pages_translated_this_month": db_user.pages_translated_this_month or 0, "extra_credits": db_user.extra_credits or 0, "created_at": db_user.created_at.isoformat() if db_user.created_at else "", "plan_limits": { "docs_per_month": plan_info.get("docs_per_month", 0), "max_pages_per_doc": plan_info.get("max_pages_per_doc", 0), }, "api_keys_count": len(active_api_keys), "api_key_ids": [key.id for key in active_api_keys], "storage": "database", } users_list.append(row) seen_ids.add(row["id"]) if row["email"]: seen_emails.add(row["email"].lower()) # Comptes uniquement dans data/users.json (legacy) — absents de la table users users_data = load_users() for user_id, user_data in users_data.items(): email = (user_data.get("email") or "").strip().lower() if user_id in seen_ids or (email and email in seen_emails): continue plan_raw = user_data.get("plan", "free") plan_str = ( plan_raw.value if hasattr(plan_raw, "value") else str(plan_raw) ) plan_info = _plan_info_for_admin(plan_str) active_api_keys = ( session.query(ApiKey) .filter(ApiKey.user_id == user_id, ApiKey.is_active == True) .all() ) users_list.append( { "id": user_id, "email": user_data.get("email", ""), "name": user_data.get("name", ""), "plan": plan_str, "subscription_status": str( user_data.get("subscription_status", "active") ), "docs_translated_this_month": user_data.get( 
"docs_translated_this_month", 0 ), "pages_translated_this_month": user_data.get( "pages_translated_this_month", 0 ), "extra_credits": user_data.get("extra_credits", 0), "created_at": user_data.get("created_at", ""), "plan_limits": { "docs_per_month": plan_info.get("docs_per_month", 0), "max_pages_per_doc": plan_info.get("max_pages_per_doc", 0), }, "api_keys_count": len(active_api_keys), "api_key_ids": [key.id for key in active_api_keys], "storage": "json_file", } ) seen_ids.add(user_id) if email: seen_emails.add(email) users_list.sort(key=lambda x: x.get("created_at", ""), reverse=True) return {"total": len(users_list), "users": users_list} @router.patch("/users/{user_id}") async def patch_admin_user_tier( user_id: str, body: AdminUpdateUserTierRequest, admin_id: str = Depends(require_admin), ): """Update a user's plan/tier - Admin only""" from services.auth_service import get_user_by_id, update_user_plan user = get_user_by_id(user_id) if not user: return JSONResponse( status_code=404, content={"error": "NOT_FOUND", "message": "User not found"}, ) updated = update_user_plan(user_id, body.plan) if not updated: return JSONResponse( status_code=400, content={ "error": "INVALID_PLAN", "message": "Invalid plan. 
Allowed: free, starter, pro, business, enterprise", "details": { "allowed": ["free", "starter", "pro", "business", "enterprise"] }, }, ) plan_value = ( updated.plan.value if hasattr(updated.plan, "value") else str(updated.plan) ) new_tier = ( "pro" if updated.plan in (PlanType.PRO, PlanType.BUSINESS, PlanType.ENTERPRISE) else "free" ) logger.info( "admin_tier_change", extra={ "event": "admin_tier_change", "target_user_id": user_id, "new_tier": new_tier, "new_plan": plan_value, "admin_id": "admin_session", "timestamp": datetime.now(timezone.utc).isoformat(), }, ) return { "data": { "id": updated.id, "email": updated.email, "name": getattr(updated, "name", ""), "plan": plan_value, "tier": new_tier, }, "meta": {}, } @router.post("/users/{user_id}/password") async def admin_reset_user_password( user_id: str, body: AdminResetPasswordRequest, admin_id: str = Depends(require_admin), ): """Définit un nouveau mot de passe pour un utilisateur (sans email de réinitialisation).""" from services.auth_service import admin_set_user_password try: user = admin_set_user_password(user_id, body.new_password) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e if not user: return JSONResponse( status_code=404, content={ "error": "NOT_FOUND", "message": "Utilisateur introuvable (id inconnu en base et dans users.json).", }, ) logger.info("admin_password_reset for user_id=%s", user_id) return { "data": {"id": user.id, "email": user.email}, "meta": {}, } @router.get("/stats") async def get_admin_stats(admin_id: str = Depends(require_admin)): """Get comprehensive admin statistics""" from services.auth_service import USE_DATABASE, DATABASE_AVAILABLE, load_users from services.translation_service import _translation_cache users_data = load_users() plan_distribution: dict = {} total_docs_translated = 0 total_pages_translated = 0 active_users = 0 if USE_DATABASE and DATABASE_AVAILABLE: from database.connection import get_sync_session from database.models import 
User as DBUser with get_sync_session() as session: db_rows = session.query(DBUser).all() db_ids = {str(u.id) for u in db_rows} db_emails = {(u.email or "").strip().lower() for u in db_rows} for u in db_rows: pv = u.plan.value if hasattr(u.plan, "value") else str(u.plan) plan_distribution[pv] = plan_distribution.get(pv, 0) + 1 docs = u.docs_translated_this_month or 0 pages = u.pages_translated_this_month or 0 total_docs_translated += docs total_pages_translated += pages if docs > 0: active_users += 1 json_only = 0 for uid, ud in users_data.items(): if uid in db_ids: continue em = (ud.get("email") or "").strip().lower() if em and em in db_emails: continue json_only += 1 pr = ud.get("plan", "free") pv = pr.value if hasattr(pr, "value") else str(pr) plan_distribution[pv] = plan_distribution.get(pv, 0) + 1 docs = int(ud.get("docs_translated_this_month", 0) or 0) pages = int(ud.get("pages_translated_this_month", 0) or 0) total_docs_translated += docs total_pages_translated += pages if docs > 0: active_users += 1 total_users = len(db_ids) + json_only else: total_users = len(users_data) for user_data in users_data.values(): plan = user_data.get("plan", "free") pv = plan.value if hasattr(plan, "value") else str(plan) plan_distribution[pv] = plan_distribution.get(pv, 0) + 1 docs = user_data.get("docs_translated_this_month", 0) pages = user_data.get("pages_translated_this_month", 0) total_docs_translated += docs total_pages_translated += pages if docs > 0: active_users += 1 cache_stats = _translation_cache.get_stats() return { "users": { "total": total_users, "active_this_month": active_users, "by_plan": plan_distribution, }, "translations": { "docs_this_month": total_docs_translated, "pages_this_month": total_pages_translated, }, "cache": cache_stats, "config": { "translation_service": config.TRANSLATION_SERVICE, "max_file_size_mb": config.MAX_FILE_SIZE_MB, "supported_extensions": list(config.SUPPORTED_EXTENSIONS), }, } @router.post("/cleanup/trigger") async def 
trigger_cleanup(admin_id: str = Depends(require_admin)): """Trigger manual cleanup of expired files""" from middleware.cleanup import create_cleanup_manager cleanup_manager = create_cleanup_manager(config) try: cleaned = await cleanup_manager.cleanup_expired() return { "status": "success", "files_cleaned": cleaned, "message": f"Cleaned up {cleaned} expired files", } except Exception as e: logger.error(f"Manual cleanup failed: {str(e)}") raise HTTPException(status_code=500, detail=f"Cleanup failed: {str(e)}") @router.get("/files/tracked") async def get_tracked_files(admin_id: str = Depends(require_admin)): """Get list of currently tracked files""" from middleware.cleanup import create_cleanup_manager cleanup_manager = create_cleanup_manager(config) tracked = cleanup_manager.get_tracked_files() return {"count": len(tracked), "files": tracked} @router.post("/quota/reset") async def reset_translation_quotas(admin_id: str = Depends(require_admin)): """Reset monthly translation quotas for all free-tier users. 
Clears Redis keys matching quota:monthly:* """ try: from core.redis import get_async_redis redis_client = get_async_redis() if not redis_client: return {"status": "skipped", "message": "Redis not available, quotas are in-memory only"} # Find all monthly quota keys keys = [] async for key in redis_client.scan_iter(match="quota:monthly:*"): keys.append(key) if keys: deleted = await redis_client.delete(*keys) logger.info("admin_quota_reset", keys_deleted=deleted) return {"status": "success", "keys_deleted": deleted, "message": f"Reset {deleted} quota counters"} else: return {"status": "success", "keys_deleted": 0, "message": "No active quotas to reset"} except Exception as e: logger.error(f"Admin quota reset failed: {str(e)}") raise HTTPException(status_code=500, detail=f"Quota reset failed: {str(e)}") @router.post("/config/provider") async def update_default_provider( provider: str = Form(...), admin_id: str = Depends(require_admin), ): """Update the default translation provider""" valid_providers = [ "google", "deepl", "openai", "openrouter", "zai", "classic", "llm", ] if provider not in valid_providers: raise HTTPException( status_code=400, detail=f"Invalid provider. 
Must be one of: {valid_providers}", ) config.TRANSLATION_SERVICE = provider return { "status": "success", "message": f"Default provider updated to {provider}", "provider": provider, } class AdminRevokeApiKeyRequest(BaseModel): reason: Optional[str] = None @router.delete("/api-keys/{key_id}") async def admin_revoke_api_key( key_id: str, body: Optional[AdminRevokeApiKeyRequest] = None, admin_id: str = Depends(require_admin), ): """Revoke any user's API key - Admin only""" from database.connection import get_sync_session from database.models import ApiKey revoke_reason = body.reason if body else None with get_sync_session() as session: api_key = ( session.query(ApiKey) .filter(ApiKey.id == key_id, ApiKey.is_active == True) .first() ) if not api_key: return JSONResponse( status_code=404, content={ "error": "API_KEY_NOT_FOUND", "message": "Clé API non trouvée ou déjà révoquée", }, ) owner_user_id = api_key.user_id api_key.is_active = False api_key.revoked_at = datetime.now(timezone.utc) session.commit() logger.info( "admin_api_key_revoked", extra={ "admin_id": admin_id, "key_id": key_id, "owner_user_id": owner_user_id, "reason": revoke_reason, "timestamp": datetime.now(timezone.utc).isoformat(), }, ) return JSONResponse( status_code=200, content={ "data": { "id": api_key.id, "revoked": True, "revoked_at": datetime.now(timezone.utc).isoformat(), "owner_user_id": owner_user_id, "reason": revoke_reason, }, "meta": {}, }, ) def _extract_error_code(error_message: Optional[str]) -> Optional[str]: """Extract a short error code from error message for log display (NFR: no document content).""" if not error_message or not error_message.strip(): return None import re m = re.search(r"\b([A-Z][A-Z0-9_]{2,})\b", error_message) if m: return m.group(1) first = error_message.strip().split()[0] if error_message.strip() else "" if first: return first.upper()[:20] return None @router.get("/logs") def get_admin_logs( admin_id: str = Depends(require_admin), level: str = Query(default="all", 
pattern="^(all|error|warning|info)$"), search: str = Query(default="", max_length=200), page: int = Query(default=1, ge=1), per_page: int = Query(default=50, ge=1, le=200), ): """Get admin error logs from failed translations. No document content or original_filename exposed (NFR11, NFR16). Search matches user_id and error_message (error codes typically appear in error_message).""" from database.connection import get_sync_session from database.models import Translation from sqlalchemy import or_, desc if level == "warning" or level == "info": return { "data": { "logs": [], "total": 0, "page": page, "per_page": per_page, }, "meta": {"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")}, } with get_sync_session() as session: base = session.query(Translation).filter(Translation.status == "failed") if search and search.strip(): term = f"%{search.strip()}%" base = base.filter( or_( Translation.user_id.ilike(term), Translation.error_message.ilike(term), ) ) total = base.count() rows = ( base.order_by(desc(Translation.created_at)) .offset((page - 1) * per_page) .limit(per_page) .all() ) def _ts(created_at): if not created_at: return "" s = created_at.isoformat() return s.replace("+00:00", "Z") if "+00:00" in s else s + "Z" logs = [ { "timestamp": _ts(t.created_at), "level": "error", "message": (t.error_message or "Translation failed").strip()[:500], "user_id": t.user_id, "error_code": _extract_error_code(t.error_message), "provider": t.provider, "file_type": t.file_type, } for t in rows ] return { "data": { "logs": logs, "total": total, "page": page, "per_page": per_page, }, "meta": {"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")}, } SETTINGS_FILE = "data/provider_settings.json" class ProviderSettings(BaseModel): enabled: bool = False api_key: Optional[str] = None base_url: Optional[str] = None model: Optional[str] = None timeout: int = 30 max_retries: int = 3 class SmtpSettings(BaseModel): enabled: bool = False host: 
Optional[str] = None # SMTP_HOST port: int = 587 # SMTP_PORT username: Optional[str] = None # SMTP_USERNAME password: Optional[str] = None # SMTP_PASSWORD from_email: Optional[str] = None # SMTP_FROM_EMAIL use_tls: bool = True class SettingsConfig(BaseModel): google: ProviderSettings = ProviderSettings(enabled=True) google_cloud: ProviderSettings = ProviderSettings() # Cloud Translation API v2 (clé API) deepl: ProviderSettings = ProviderSettings() openai: ProviderSettings = ProviderSettings() openrouter: ProviderSettings = ProviderSettings() # "Traduction IA Essentielle" openrouter_premium: ProviderSettings = ProviderSettings() # "Traduction IA Premium" deepseek: ProviderSettings = ProviderSettings() minimax: ProviderSettings = ProviderSettings() zai: ProviderSettings = ProviderSettings() smtp: SmtpSettings = SmtpSettings() fallback_chain: str = "google,google_cloud,deepl,openrouter,openrouter_premium,openai,deepseek,zai" fallback_chain_classic: str = "google,google_cloud,deepl" fallback_chain_llm: str = "openrouter,openrouter_premium,openai,deepseek,zai" def load_settings() -> SettingsConfig: try: import json from pathlib import Path settings_path = Path(SETTINGS_FILE) if settings_path.exists(): with open(settings_path) as f: data = json.load(f) return SettingsConfig(**data) except Exception as e: logger.warning(f"Failed to load settings: {e}") return SettingsConfig() def save_settings(settings: SettingsConfig): import json from pathlib import Path settings_path = Path(SETTINGS_FILE) settings_path.parent.mkdir(exist_ok=True) with open(settings_path, "w") as f: json.dump(settings.model_dump(), f, indent=2) @router.get("/settings") async def get_settings(admin_id: str = Depends(require_admin)): settings = load_settings() # Merge env-var values into provider configs when JSON has no value. # Env vars fill models/URLs; API keys are never exposed (only hinted via env_info). # If an admin explicitly saves a value in the UI, JSON takes priority. 
def _merge_env( provider_settings: ProviderSettings, key_env: str = "", model_env: str = "", url_env: str = "", default_model: str = "", default_url: str = "", ) -> dict: d = provider_settings.model_dump() # Model: env var > JSON null > code default if model_env and not d.get("model"): d["model"] = os.getenv(model_env, "").strip() or default_model or None elif not d.get("model") and default_model: d["model"] = default_model # Base URL: env var > JSON null > code default if url_env and not d.get("base_url"): d["base_url"] = os.getenv(url_env, "").strip() or default_url or None elif not d.get("base_url") and default_url: d["base_url"] = default_url # API key: never expose; leave empty (UI shows "clé dans .env" badge via env_info) return d payload = settings.model_dump() # Essentielle : Gemini 3.5 Flash / DeepSeek Chat — meilleur rapport qualité/prix (juin 2026) payload["openrouter"] = _merge_env(settings.openrouter, key_env="OPENROUTER_API_KEY", model_env="OPENROUTER_MODEL", default_model="google/gemini-3.5-flash") # Premium : Claude Sonnet 4.6 — précision maximale sur documents complexes payload["openrouter_premium"] = _merge_env(settings.openrouter_premium, key_env="OPENROUTER_API_KEY", model_env="OPENROUTER_PREMIUM_MODEL", default_model="anthropic/claude-sonnet-4.6") payload["openai"] = _merge_env(settings.openai, key_env="OPENAI_API_KEY", model_env="OPENAI_MODEL", default_model="gpt-4o-mini") payload["deepl"] = _merge_env(settings.deepl, key_env="DEEPL_API_KEY") payload["deepseek"] = _merge_env(settings.deepseek, key_env="DEEPSEEK_API_KEY", model_env="DEEPSEEK_MODEL", default_model="deepseek-chat") payload["minimax"] = _merge_env(settings.minimax, key_env="MINIMAX_API_KEY", model_env="MINIMAX_MODEL", default_model="abab6.5s-chat") payload["zai"] = _merge_env(settings.zai, key_env="ZAI_API_KEY", model_env="ZAI_MODEL", url_env="ZAI_BASE_URL", default_model="grok-2-1212", default_url="https://api.x.ai/v1") payload["google_cloud"] = _merge_env(settings.google_cloud, 
key_env="GOOGLE_CLOUD_API_KEY") # SMTP: merge from env vars, but never expose password smtp_data = settings.smtp.model_dump() if not smtp_data["host"]: smtp_data["host"] = os.getenv("SMTP_HOST", "").strip() or None if not smtp_data["username"]: smtp_data["username"] = os.getenv("SMTP_USERNAME", "").strip() or None smtp_data["password"] = None # never expose if not smtp_data["from_email"]: smtp_data["from_email"] = os.getenv("SMTP_FROM_EMAIL", "").strip() or None smtp_env_port = os.getenv("SMTP_PORT", "").strip() if smtp_env_port and smtp_data["port"] == 587: try: smtp_data["port"] = int(smtp_env_port) except ValueError: pass smtp_env_tls = os.getenv("SMTP_USE_TLS", "").strip().lower() if smtp_env_tls in ("false", "0", "no"): smtp_data["use_tls"] = False payload["smtp"] = smtp_data # Inform the frontend which providers have API keys configured via env vars # (boolean only — never expose actual values) has_openrouter = bool(os.getenv("OPENROUTER_API_KEY", "").strip()) env_info = { "deepl": bool(os.getenv("DEEPL_API_KEY", "").strip()), "openai": bool(os.getenv("OPENAI_API_KEY", "").strip()), "openrouter": has_openrouter, "openrouter_premium": has_openrouter, # same key, different model "deepseek": bool(os.getenv("DEEPSEEK_API_KEY", "").strip()), "minimax": bool(os.getenv("MINIMAX_API_KEY", "").strip()), "zai": bool(os.getenv("ZAI_API_KEY", "").strip()), "google_cloud": bool(os.getenv("GOOGLE_CLOUD_API_KEY", "").strip()), "smtp": bool(os.getenv("SMTP_HOST", "").strip()), } return JSONResponse( status_code=200, content={"data": payload, "env_info": env_info, "meta": {}}, ) @router.put("/settings") async def update_settings( settings: SettingsConfig, admin_id: str = Depends(require_admin) ): # Preserve SMTP password: frontend always sends null (never exposed via GET) existing = load_settings() if settings.smtp.password is None and existing.smtp.password: settings.smtp.password = existing.smtp.password # If frontend sends a new non-empty password, keep it; if empty 
class SmtpTestRequest(BaseModel):
    """Optional request body for the SMTP test endpoints.

    Every field is optional. A field left as ``None`` (or blank) makes the
    endpoint fall back to the saved settings or environment variables, which
    lets the admin UI test form values that have not been saved yet.
    """

    host: Optional[str] = None
    port: Optional[int] = None
    username: Optional[str] = None
    password: Optional[str] = None
    from_email: Optional[str] = None
    use_tls: Optional[bool] = None


@router.post("/providers/{provider}/test")
async def test_provider(
    provider: str,
    admin_id: str = Depends(require_admin),
    smtp_body: Optional[SmtpTestRequest] = None,
):
    """Test a provider connection.

    Works even when the provider is disabled. API keys are taken from the
    saved JSON settings first and fall back to the matching environment
    variable when the saved value is empty. For SMTP, an optional JSON body
    carries the current form values.

    Args:
        provider: Attribute name of the provider on the loaded settings.
        admin_id: Injected by the ``require_admin`` dependency.
        smtp_body: Optional SMTP overrides; only read for ``provider == "smtp"``.

    Returns:
        A ``JSONResponse``. Successful tests carry ``"available": True``;
        failures carry ``"available": False`` and an ``"error"`` message.
        An unknown settings attribute yields 404 ``PROVIDER_NOT_FOUND``; a
        known attribute without a dedicated test branch yields 404
        ``Provider inconnu``.
    """
    settings = load_settings()
    provider_config = getattr(settings, provider, None)
    if not provider_config:
        return JSONResponse(
            status_code=404,
            content={
                "error": "PROVIDER_NOT_FOUND",
                "message": f"Provider {provider} not found",
            },
        )

    def _key(json_val: Optional[str], env_var: str) -> str:
        """Resolve an API key: saved JSON value first, then the env var."""
        return (json_val or "").strip() or os.getenv(env_var, "").strip()

    def _unavailable(status_code: int, error: str) -> JSONResponse:
        """Build the standard failure payload."""
        return JSONResponse(
            status_code=status_code,
            content={"available": False, "error": error},
        )

    # NOTE: a block of `current.<provider> = _update_provider(...)` branches
    # (openrouter_premium / deepseek / minimax / zai) used to sit in this
    # chain. It referenced names that do not exist in this function, raised
    # NameError for those providers, and shadowed the real openrouter_premium
    # and zai tests below. It has been removed.
    try:
        if provider == "google":
            from deep_translator import GoogleTranslator

            result = GoogleTranslator(source="auto", target="en").translate("bonjour")
            return JSONResponse(
                status_code=200, content={"available": True, "test_result": result}
            )

        if provider == "google_cloud":
            api_key = _key(provider_config.api_key, "GOOGLE_CLOUD_API_KEY")
            if not api_key:
                return _unavailable(
                    400,
                    "Aucune clé API Google Cloud trouvée (JSON ou .env GOOGLE_CLOUD_API_KEY).",
                )
            import requests as _requests

            resp = _requests.post(
                "https://translation.googleapis.com/language/translate/v2",
                params={"key": api_key},
                json={"q": "bonjour", "target": "en", "format": "text"},
                timeout=10,
            )
            if resp.ok:
                # `or [{}]` also covers an explicitly empty translations list.
                translations = resp.json().get("data", {}).get("translations") or [{}]
                translated = translations[0].get("translatedText", "")
                return JSONResponse(
                    status_code=200,
                    content={"available": True, "test_result": translated},
                )
            if resp.status_code in (401, 403):
                return _unavailable(
                    resp.status_code,
                    f"Clé API invalide ou API non activée dans Google Cloud Console (HTTP {resp.status_code}).",
                )
            return _unavailable(
                500,
                f"Erreur Google Cloud Translation HTTP {resp.status_code}: {resp.text[:200]}",
            )

        if provider == "deepl":
            api_key = _key(provider_config.api_key, "DEEPL_API_KEY")
            if not api_key:
                return _unavailable(400, "Aucune clé API DeepL trouvée (JSON ou .env)")
            import deepl

            translator = deepl.Translator(api_key)
            usage = translator.get_usage()
            return JSONResponse(
                status_code=200, content={"available": True, "usage": str(usage)}
            )

        if provider == "openai":
            api_key = _key(provider_config.api_key, "OPENAI_API_KEY")
            if not api_key:
                return _unavailable(400, "Aucune clé API OpenAI trouvée (JSON ou .env)")
            import openai as _openai

            client = _openai.OpenAI(api_key=api_key)
            models = list(client.models.list())
            return JSONResponse(
                status_code=200,
                content={"available": True, "models_count": len(models)},
            )

        if provider in ("openrouter", "openrouter_premium"):
            # openrouter_premium shares the same key as openrouter; _key()
            # already falls back to OPENROUTER_API_KEY, so no second lookup.
            api_key = _key(provider_config.api_key, "OPENROUTER_API_KEY")
            if not api_key:
                return _unavailable(400, "Aucune clé API OpenRouter trouvée (JSON ou .env)")
            import requests as _requests

            resp = _requests.get(
                "https://openrouter.ai/api/v1/auth/key",
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=10,
            )
            if resp.ok:
                data = resp.json()
                return JSONResponse(
                    status_code=200,
                    content={
                        "available": True,
                        "label": data.get("data", {}).get("label", "OK"),
                    },
                )
            return _unavailable(500, f"HTTP {resp.status_code}: {resp.text[:200]}")

        if provider == "zai":
            api_key = _key(provider_config.api_key, "ZAI_API_KEY")
            if not api_key:
                return _unavailable(400, "Aucune clé API xAI trouvée (JSON ou .env)")
            import openai as _openai

            base_url = (provider_config.base_url or "").strip() or os.getenv(
                "ZAI_BASE_URL", "https://api.x.ai/v1"
            )
            client = _openai.OpenAI(api_key=api_key, base_url=base_url)
            try:
                models = list(client.models.list())
            except _openai.AuthenticationError:
                return _unavailable(401, "Clé xAI invalide")
            return JSONResponse(
                status_code=200,
                content={
                    "available": True,
                    "models_count": len(models),
                    "sample_models": [m.id for m in models[:5]],
                },
            )

        if provider == "smtp":
            import smtplib as _smtplib

            # Priority: request body (form values) > saved settings > env vars
            host = (
                ((smtp_body and smtp_body.host) or "").strip()
                or (provider_config.host or "").strip()
                or os.getenv("SMTP_HOST", "").strip()
            )
            if not host:
                return _unavailable(
                    400, "Aucun hôte SMTP configuré (JSON ou .env SMTP_HOST)"
                )

            # A port sent with the request must win, otherwise an unsaved form
            # value could never be tested while SMTP_PORT is set. Without a
            # body port the previous resolution is kept: SMTP_PORT overrides
            # the saved port, and an unparsable value falls back to 587.
            port = smtp_body.port if (smtp_body and smtp_body.port) else None
            if port is None:
                saved_port = provider_config.port or 587
                try:
                    port = int(os.getenv("SMTP_PORT", "").strip() or saved_port)
                except ValueError:
                    port = 587

            username = (
                ((smtp_body and smtp_body.username) or "").strip()
                or (provider_config.username or "").strip()
                or os.getenv("SMTP_USERNAME", "").strip()
            )
            password = (
                ((smtp_body and smtp_body.password) or "").strip()
                or (provider_config.password or "").strip()
                or os.getenv("SMTP_PASSWORD", "").strip()
            )
            if smtp_body and smtp_body.use_tls is not None:
                use_tls = smtp_body.use_tls
            elif provider_config.use_tls is not None:
                use_tls = provider_config.use_tls
            else:
                use_tls = True

            try:
                server = _smtplib.SMTP(host, port, timeout=10)
                try:
                    server.ehlo()
                    if use_tls:
                        server.starttls()
                        server.ehlo()
                    if username and password:
                        server.login(username, password)
                    server.quit()
                finally:
                    # Release the socket even when the handshake or login
                    # fails; close() is harmless after a successful quit().
                    server.close()
                return JSONResponse(
                    status_code=200,
                    content={
                        "available": True,
                        "test_result": f"Connexion SMTP OK ({host}:{port})",
                    },
                )
            except _smtplib.SMTPAuthenticationError as e:
                return _unavailable(401, f"Authentification SMTP échouée: {e}")
            except _smtplib.SMTPConnectError as e:
                return _unavailable(502, f"Connexion SMTP échouée: {e}")
            except Exception as e:
                return _unavailable(500, f"Erreur SMTP: {str(e)[:200]}")

        return _unavailable(404, "Provider inconnu")

    except Exception as e:
        # Boundary handler: any provider SDK/network failure becomes a 500
        # payload instead of an unhandled server error.
        logger.error(f"Provider test failed: {e}")
        return JSONResponse(
            status_code=500, content={"available": False, "error": str(e)}
        )
@router.post("/providers/smtp/test-send")
async def test_send_email(
    smtp_body: Optional[SmtpTestRequest] = None,
    admin_id: str = Depends(require_admin),
):
    """Send a test email to the resolved ``from_email`` address.

    Accepts an optional JSON body carrying the form values currently being
    edited. Each value is resolved as: request body, then saved SMTP
    settings, then environment variable. The message is sent from and to the
    same address (``from_email``, falling back to the SMTP username).

    Returns:
        A ``JSONResponse``: 200 on success, 400 when no host or sender
        address can be resolved, 401 on SMTP authentication failure, 500 on
        any other failure.
    """
    import smtplib as _smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    def _unavailable(status_code: int, error: str) -> JSONResponse:
        """Build the standard failure payload."""
        return JSONResponse(
            status_code=status_code,
            content={"available": False, "error": error},
        )

    settings = load_settings()
    smtp = settings.smtp

    # Priority: request body (form values) > saved settings > env vars
    host = (
        ((smtp_body and smtp_body.host) or "").strip()
        or (smtp.host or "").strip()
        or os.getenv("SMTP_HOST", "").strip()
    )
    if not host:
        return _unavailable(400, "Aucun hôte SMTP configuré")

    # A port sent with the request must win, otherwise an unsaved form value
    # could never be tested while SMTP_PORT is set. Without a body port the
    # previous resolution is kept: SMTP_PORT overrides the saved port, and an
    # unparsable value falls back to 587.
    port = smtp_body.port if (smtp_body and smtp_body.port) else None
    if port is None:
        saved_port = smtp.port or 587
        try:
            port = int(os.getenv("SMTP_PORT", "").strip() or saved_port)
        except ValueError:
            port = 587

    username = (
        ((smtp_body and smtp_body.username) or "").strip()
        or (smtp.username or "").strip()
        or os.getenv("SMTP_USERNAME", "").strip()
    )
    password = (
        ((smtp_body and smtp_body.password) or "").strip()
        or (smtp.password or "").strip()
        or os.getenv("SMTP_PASSWORD", "").strip()
    )
    from_email = (
        ((smtp_body and smtp_body.from_email) or "").strip()
        or (smtp.from_email or "").strip()
        or os.getenv("SMTP_FROM_EMAIL", "").strip()
        or username
    )
    if from_email:
        # Drop CR/LF so the address cannot inject extra message headers.
        from_email = from_email.replace("\r", "").replace("\n", "")

    if smtp_body and smtp_body.use_tls is not None:
        use_tls = smtp_body.use_tls
    elif smtp.use_tls is not None:
        use_tls = smtp.use_tls
    else:
        use_tls = True

    if not from_email:
        return _unavailable(
            400,
            "Aucune adresse d'expédition configurée (from_email ou SMTP_FROM_EMAIL)",
        )

    msg = MIMEMultipart()
    msg["From"] = from_email
    msg["To"] = from_email
    msg["Subject"] = "Test Office Translator — Email SMTP"
    msg.attach(MIMEText(
        "Ceci est un email de test envoyé depuis la page d'administration Office Translator.\n\n"
        "Si vous recevez cet email, la configuration SMTP est correcte.",
        "plain", "utf-8"
    ))

    try:
        server = _smtplib.SMTP(host, port, timeout=15)
        try:
            server.ehlo()
            if use_tls:
                server.starttls()
                server.ehlo()
            if username and password:
                server.login(username, password)
            server.sendmail(from_email, from_email, msg.as_string())
            server.quit()
        finally:
            # Release the socket even when the handshake, login or send
            # fails; close() is harmless after a successful quit().
            server.close()
        logger.info(f"admin_smtp_test_email sent to {from_email}")
        return JSONResponse(
            status_code=200,
            content={
                "available": True,
                "test_result": f"Email de test envoyé à {from_email}",
            },
        )
    except _smtplib.SMTPAuthenticationError as e:
        return _unavailable(401, f"Authentification SMTP échouée: {e}")
    except Exception as e:
        logger.error(f"SMTP test-send failed: {e}")
        return _unavailable(500, f"Erreur envoi email: {str(e)[:200]}")


@router.get("/providers/openai/models")
async def list_openai_models(admin_id: str = Depends(require_admin)):
    """List the models available from the OpenAI API.

    The API key comes from the saved settings, falling back to the
    ``OPENAI_API_KEY`` environment variable.

    Returns:
        A ``JSONResponse``: 200 with ``data`` (id / owned_by / created per
        model) and ``meta.total``; 400 ``NO_API_KEY`` when no key is found;
        500 ``INTERNAL_ERROR`` when the listing fails.
    """
    settings = load_settings()
    api_key = (settings.openai.api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        return JSONResponse(
            status_code=400,
            content={
                "error": "NO_API_KEY",
                "message": "Aucune clé API OpenAI trouvée (JSON ou .env)",
            },
        )
    try:
        import openai as _openai

        client = _openai.OpenAI(api_key=api_key)
        raw_models = list(client.models.list())
        models = [
            {"id": m.id, "owned_by": m.owned_by, "created": m.created}
            for m in raw_models
        ]
        return JSONResponse(
            status_code=200,
            content={"data": models, "meta": {"total": len(models)}},
        )
    except Exception as e:
        logger.error(f"List OpenAI models failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(e)},
        )
@router.get("/providers/openrouter/models")
async def list_openrouter_models(admin_id: str = Depends(require_admin)):
    """Return the OpenRouter model catalogue.

    Calls the OpenRouter models endpoint without sending an API key.
    Responds 200 with ``data`` / ``meta.total``, 502 ``OPENROUTER_ERROR``
    when OpenRouter answers with a non-OK status, and 500 ``INTERNAL_ERROR``
    on any other failure.
    """
    try:
        import requests as _requests

        response = _requests.get("https://openrouter.ai/api/v1/models", timeout=15)
        if not response.ok:
            return JSONResponse(
                status_code=502,
                content={
                    "error": "OPENROUTER_ERROR",
                    "message": f"OpenRouter returned HTTP {response.status_code}",
                },
            )

        # Keep only the fields the admin UI needs from each catalogue entry.
        catalogue = []
        for entry in response.json().get("data", []):
            catalogue.append(
                {
                    "id": entry.get("id", ""),
                    "name": entry.get("name", ""),
                    "context_length": entry.get("context_length"),
                    "pricing": entry.get("pricing", {}),
                }
            )
        payload = {"data": catalogue, "meta": {"total": len(catalogue)}}
        return JSONResponse(status_code=200, content=payload)
    except Exception as e:
        logger.error(f"List OpenRouter models failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(e)},
        )


@router.get("/providers/zai/models")
async def list_zai_models(admin_id: str = Depends(require_admin)):
    """Return the models exposed by the xAI / zAI OpenAI-compatible API.

    Key and base URL come from the saved settings first, then from
    ``ZAI_API_KEY`` / ``ZAI_BASE_URL`` (default ``https://api.x.ai/v1``).
    Responds 200 with ``data`` / ``meta.total``, 400 ``NO_API_KEY`` when no
    key is found, and 500 ``INTERNAL_ERROR`` when the listing fails.
    """
    zai_settings = load_settings().zai
    api_key = (zai_settings.api_key or "").strip()
    if not api_key:
        api_key = os.getenv("ZAI_API_KEY", "").strip()
    if not api_key:
        return JSONResponse(
            status_code=400,
            content={
                "error": "NO_API_KEY",
                "message": "Aucune clé API xAI trouvée (JSON ou .env)",
            },
        )

    try:
        import openai as _openai

        base_url = (zai_settings.base_url or "").strip()
        if not base_url:
            base_url = os.getenv("ZAI_BASE_URL", "https://api.x.ai/v1")
        client = _openai.OpenAI(api_key=api_key, base_url=base_url)

        listing = []
        for model in list(client.models.list()):
            listing.append(
                {"id": model.id, "owned_by": model.owned_by, "created": model.created}
            )
        payload = {"data": listing, "meta": {"total": len(listing)}}
        return JSONResponse(status_code=200, content=payload)
    except Exception as e:
        logger.error(f"List xAI models failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(e)},
        )


# ============================================================
# PRICING MANAGEMENT (Admin only)
# ============================================================


class PlanPricingUpdate(BaseModel):
    """Per-plan pricing fields an admin may submit; ``None`` means unchanged."""

    price_monthly: Optional[float] = None
    # Ignored: computed server-side (keeps the −20 % annual discount consistent).
    price_yearly: Optional[float] = None
    stripe_price_id_monthly: Optional[str] = None
    stripe_price_id_yearly: Optional[str] = None
class PricingConfig(BaseModel):
    """Request body for the pricing update: one block per plan plus Stripe keys."""

    starter: PlanPricingUpdate = PlanPricingUpdate()
    pro: PlanPricingUpdate = PlanPricingUpdate()
    business: PlanPricingUpdate = PlanPricingUpdate()
    stripe_secret_key: Optional[str] = None
    stripe_publishable_key: Optional[str] = None
    stripe_webhook_secret: Optional[str] = None


def _update_env_file(updates: dict):
    """Persist ``key=value`` pairs into the project's ``.env`` file, in place.

    Existing assignments for a key are rewritten where they stand; keys not
    yet present are appended. Comment lines and unrelated lines are kept
    untouched.

    Args:
        updates: Mapping of environment variable name to its new value.

    Returns:
        None. When the ``.env`` file does not exist nothing is written and a
        warning naming the affected keys (never the values) is logged.
    """
    env_path = config.BASE_DIR / ".env"
    if not env_path.exists():
        logger.warning(
            "env_file_missing: %s not found, keys not persisted: %s",
            env_path,
            sorted(str(k) for k in updates),
        )
        return

    # A line break inside a value would create additional assignments in the
    # file, so CR/LF are removed before writing.
    pending = {
        str(key): str(val).replace("\r", "").replace("\n", "")
        for key, val in updates.items()
    }

    new_lines = []
    updated_keys = set()
    for line in env_path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if "=" in stripped and not stripped.startswith("#"):
            # Strip the key so "KEY = value" is recognised rather than
            # duplicated by the append step below.
            key = stripped.split("=", 1)[0].strip()
            if key in pending:
                new_lines.append(f"{key}={pending[key]}")
                updated_keys.add(key)
                continue
        new_lines.append(line)

    # Append any keys not already in .env
    new_lines.extend(
        f"{key}={val}" for key, val in pending.items() if key not in updated_keys
    )

    # End the file with a newline so later appends start on a fresh line.
    env_path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")


@router.get("/pricing")
async def get_pricing_config(admin_id: str = Depends(require_admin)):
    """Return the current pricing config (plans + Stripe price IDs).

    Returns:
        A 200 ``JSONResponse`` with ``data`` (one entry per known plan),
        ``stripe`` (presence flags for the Stripe env vars plus the
        publishable key) and ``meta`` (annual discount constants). Plans
        missing from ``PlanType`` / ``PLANS`` are left out.
    """
    result = {}
    for plan_id in ("starter", "pro", "business"):
        try:
            plan = PLANS[PlanType(plan_id)]
        except (ValueError, KeyError):
            # Unknown plan id or plan not registered: skip it.
            continue
        monthly, yearly = pricing_cfg.get_effective_monthly_yearly(plan_id)
        sm, sy = pricing_cfg.stripe_price_ids_for_plan(plan_id)
        result[plan_id] = {
            "name": plan["name"],
            "price_monthly": monthly,
            "price_yearly": yearly,
            "annual_discount_percent": pricing_cfg.ANNUAL_DISCOUNT_PERCENT,
            "stripe_price_id_monthly": sm,
            "stripe_price_id_yearly": sy,
        }

    secret_key = os.getenv("STRIPE_SECRET_KEY", "").strip()
    return JSONResponse(status_code=200, content={
        "data": result,
        "stripe": {
            # "configured" additionally requires the sk_ prefix.
            "configured": secret_key.startswith("sk_"),
            "has_secret_key": bool(secret_key),
            "has_publishable_key": bool(os.getenv("STRIPE_PUBLISHABLE_KEY", "").strip()),
            "has_webhook_secret": bool(os.getenv("STRIPE_WEBHOOK_SECRET", "").strip()),
            "publishable_key": os.getenv("STRIPE_PUBLISHABLE_KEY", ""),
        },
        "meta": {
            "annual_discount_percent": pricing_cfg.ANNUAL_DISCOUNT_PERCENT,
            "yearly_discount_factor": pricing_cfg.YEARLY_DISCOUNT_FACTOR,
        },
    })
@router.put("/pricing")
async def update_pricing_config(
    config_update: PricingConfig,
    admin_id: str = Depends(require_admin),
):
    """Apply an admin pricing update (monthly prices, Stripe price IDs, Stripe keys).

    Plan overrides are saved through ``pricing_cfg.save_pricing_overrides``;
    Stripe price IDs and keys are also written to ``.env``. A submitted
    ``price_yearly`` is never read here.

    Raises:
        HTTPException: 400 when a ``pricing_cfg`` validator raises
            ``ValueError``; 500 when saving the overrides raises ``OSError``.
    """
    plan_ids = ("starter", "pro", "business")
    try:
        overrides = pricing_cfg.load_pricing_overrides()
        env_updates = {}

        for plan_id in plan_ids:
            requested = getattr(config_update, plan_id)
            block = dict(overrides.get(plan_id, {}))

            if requested.price_monthly is not None:
                pricing_cfg.validate_monthly_price_eur(requested.price_monthly)
                block["price_monthly"] = float(requested.price_monthly)
                block = pricing_cfg.normalize_plan_override_block(plan_id, block)

            # Monthly first, then yearly: same validation order as before.
            for period in ("monthly", "yearly"):
                field_name = f"stripe_price_id_{period}"
                submitted = getattr(requested, field_name)
                if submitted is None:
                    continue
                price_id = pricing_cfg.validate_stripe_price_id(
                    submitted, f"{plan_id} {field_name}"
                )
                block[field_name] = price_id
                if price_id:
                    env_key = f"STRIPE_PRICE_{plan_id.upper()}_{period.upper()}"
                    env_updates[env_key] = price_id

            overrides[plan_id] = block

        secret_key = config_update.stripe_secret_key
        if secret_key:
            pricing_cfg.validate_stripe_secret_key(secret_key)
            env_updates["STRIPE_SECRET_KEY"] = secret_key.strip()
            # Best effort: hand the new key to the stripe module if it can be
            # imported; failures here are deliberately ignored.
            try:
                import stripe as _stripe

                _stripe.api_key = env_updates["STRIPE_SECRET_KEY"]
            except Exception:
                pass

        publishable_key = config_update.stripe_publishable_key
        if publishable_key:
            pricing_cfg.validate_stripe_publishable_key(publishable_key)
            env_updates["STRIPE_PUBLISHABLE_KEY"] = publishable_key.strip()

        webhook_secret = config_update.stripe_webhook_secret
        if webhook_secret:
            pricing_cfg.validate_stripe_webhook_secret(webhook_secret)
            env_updates["STRIPE_WEBHOOK_SECRET"] = webhook_secret.strip()

        # Final normalisation pass over every plan before saving.
        for plan_id in plan_ids:
            current_block = overrides.get(plan_id, {})
            overrides[plan_id] = pricing_cfg.normalize_plan_override_block(
                plan_id, current_block
            )

        try:
            pricing_cfg.save_pricing_overrides(overrides)
        except OSError as e:
            logger.exception("admin_pricing_save_failed: %s", e)
            detail = (
                "Impossible d'écrire data/pricing_overrides.json (droits disque ou chemin). "
                f"Détail : {e}"
            )
            raise HTTPException(status_code=500, detail=detail) from e

        if env_updates:
            _update_env_file(env_updates)
        pricing_cfg.apply_runtime_config_after_admin_write()

        logger.info(
            "admin_pricing_updated by %s keys=%s", admin_id, list(env_updates.keys())
        )
        response_meta = {"annual_discount_percent": pricing_cfg.ANNUAL_DISCOUNT_PERCENT}
        return JSONResponse(
            status_code=200,
            content={"data": overrides, "meta": response_meta},
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.post("/pricing/setup-stripe")
async def setup_stripe_products(admin_id: str = Depends(require_admin)):
    """Find or create the Stripe product and prices for each plan, then save the IDs.

    For every plan the product is looked up by name and created when absent.
    An active EUR monthly and yearly price matching the effective amounts is
    reused or created. Resulting price IDs are written to ``.env`` and to the
    pricing overrides. A failure on one plan is collected in ``errors`` and
    does not stop the others.

    Returns:
        A ``JSONResponse``: 400 ``STRIPE_NOT_CONFIGURED`` when
        ``STRIPE_SECRET_KEY`` is missing or lacks the ``sk_`` prefix, 500
        ``STRIPE_NOT_INSTALLED`` when the stripe module cannot be imported,
        otherwise 200 with ``data``, ``errors`` and ``env_updated``.
    """
    secret_key = os.getenv("STRIPE_SECRET_KEY", "").strip()
    if not (secret_key and secret_key.startswith("sk_")):
        return JSONResponse(status_code=400, content={
            "error": "STRIPE_NOT_CONFIGURED",
            "message": "STRIPE_SECRET_KEY manquante ou invalide. Configurez-la d'abord.",
        })

    try:
        import stripe as _stripe

        _stripe.api_key = secret_key
    except ImportError:
        return JSONResponse(status_code=500, content={
            "error": "STRIPE_NOT_INSTALLED",
            "message": "Le module stripe n'est pas installé sur le serveur.",
        })

    def _find_price(price_list, interval, cents):
        """Return the first listed EUR recurring price matching interval and amount."""
        for candidate in price_list.data:
            if (
                candidate.recurring
                and candidate.recurring.interval == interval
                and candidate.unit_amount == cents
                and candidate.currency == "eur"
            ):
                return candidate
        return None

    def _create_price(product_id, interval, cents, plan):
        """Create a recurring EUR price on the given product."""
        return _stripe.Price.create(
            product=product_id,
            unit_amount=cents,
            currency="eur",
            recurring={"interval": interval},
            metadata={"plan": plan},
        )

    plans_meta = [
        ("starter", "Office Translator Starter"),
        ("pro", "Office Translator Pro"),
        ("business", "Office Translator Business"),
    ]
    results = {}
    env_updates = {}
    errors = []

    for plan_id, product_name in plans_meta:
        monthly, yearly = pricing_cfg.get_effective_monthly_yearly(plan_id)
        monthly_cents = int(round(monthly * 100))
        yearly_cents = int(round(yearly * 100))
        try:
            # Find or create product
            found = _stripe.Product.search(query=f'name:"{product_name}"')
            if found.data:
                product = found.data[0]
            else:
                product = _stripe.Product.create(
                    name=product_name,
                    description=f"Abonnement Office Translator — Forfait {plan_id.capitalize()}",
                    metadata={"plan": plan_id},
                )

            # Look both prices up first, then create whichever is missing.
            active_prices = _stripe.Price.list(product=product.id, active=True, limit=20)
            monthly_price = _find_price(active_prices, "month", monthly_cents)
            yearly_price = _find_price(active_prices, "year", yearly_cents)
            if not monthly_price:
                monthly_price = _create_price(product.id, "month", monthly_cents, plan_id)
            if not yearly_price:
                yearly_price = _create_price(product.id, "year", yearly_cents, plan_id)

            results[plan_id] = {
                "product_id": product.id,
                "monthly_price_id": monthly_price.id,
                "yearly_price_id": yearly_price.id,
                "created": True,
            }
            plan_key = plan_id.upper()
            env_updates[f"STRIPE_PRICE_{plan_key}_MONTHLY"] = monthly_price.id
            env_updates[f"STRIPE_PRICE_{plan_key}_YEARLY"] = yearly_price.id
        except Exception as e:
            errors.append({"plan": plan_id, "error": str(e)})

    if env_updates:
        _update_env_file(env_updates)

    # Mirror the price IDs into the pricing overrides.
    overrides = pricing_cfg.load_pricing_overrides()
    for plan_id, ids in results.items():
        plan_override = overrides.get(plan_id, {})
        plan_override["stripe_price_id_monthly"] = ids["monthly_price_id"]
        plan_override["stripe_price_id_yearly"] = ids["yearly_price_id"]
        overrides[plan_id] = plan_override
    pricing_cfg.save_pricing_overrides(overrides)
    pricing_cfg.apply_runtime_config_after_admin_write()

    logger.info(f"admin_stripe_setup by {admin_id}: {list(results.keys())}")
    return JSONResponse(status_code=200, content={
        "data": results,
        "errors": errors,
        "env_updated": list(env_updates.keys()),
        "meta": {},
    })