All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2s
1745 lines
66 KiB
Python
1745 lines
66 KiB
Python
"""
Admin API v1 Endpoints

All admin endpoints under /api/v1/admin/

Story 3.5: API Versioning - Migrated from main.py
"""
|
||
|
||
import os
|
||
import secrets
|
||
import time
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
from typing import Optional, Literal
|
||
|
||
from fastapi import APIRouter, Depends, Header, HTTPException, Form, Request, Query
|
||
from fastapi.responses import JSONResponse
|
||
from passlib.context import CryptContext
|
||
from pydantic import BaseModel, Field
|
||
|
||
from config import config
|
||
from models.subscription import PlanType, PLANS
|
||
from services import pricing_config as pricing_cfg
|
||
|
||
# passlib context used to hash and verify passwords with bcrypt.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

logger = logging.getLogger(__name__)

# Every route registered on this router is served under /api/v1/admin.
router = APIRouter(prefix="/api/v1/admin", tags=["Admin"])

# Admin credentials are read from the environment once, at import time.
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME")
ADMIN_PASSWORD_HASH = os.getenv("ADMIN_PASSWORD_HASH")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD")
if not ADMIN_PASSWORD_HASH and not ADMIN_PASSWORD:
    # Neither a hash nor a plaintext password is configured: fall back to
    # ADMIN_DEV_DEFAULT (which may itself be unset, leaving the value None).
    ADMIN_PASSWORD = os.getenv("ADMIN_DEV_DEFAULT")

_admin_token_secret = os.getenv("ADMIN_TOKEN_SECRET")
if not _admin_token_secret:
    # No secret configured: generate a random one for this process only and
    # log loudly so the operator notices.
    _admin_token_secret = secrets.token_hex(32)
    logger.critical(
        "SECURITY: ADMIN_TOKEN_SECRET is not configured! Using an ephemeral random key. "
        "ALL ADMIN SESSIONS WILL BE INVALIDATED ON EVERY RESTART. "
        "Set ADMIN_TOKEN_SECRET in your .env file immediately."
    )
ADMIN_TOKEN_SECRET = _admin_token_secret

REDIS_URL = os.getenv("REDIS_URL", "")
_redis_client = None
# In-process session store: token -> expiry (epoch seconds). Used when Redis
# is unavailable or a Redis write fails.
_memory_sessions: dict = {}

# Brute-force protection: IP → (failed_count, first_fail_ts)
_login_attempts: dict[str, tuple[int, float]] = {}
_MAX_LOGIN_ATTEMPTS = 5
_LOCKOUT_SECONDS = 300  # 5 minutes
|
||
|
||
|
||
def get_redis_client():
    """Return whatever ``core.redis.get_sync_redis()`` provides.

    The import is performed at call time rather than at module import.
    Callers in this module treat a falsy result as "Redis unavailable".
    """
    from core.redis import get_sync_redis as _sync_redis_factory

    client = _sync_redis_factory()
    return client
|
||
|
||
|
||
def hash_password(password: str) -> str:
    """Hash *password* with the module-level passlib context and return it."""
    hashed = pwd_context.hash(password)
    return hashed
|
||
|
||
|
||
def verify_admin_password(password: str) -> bool:
    """Check a submitted admin password against the configured credential.

    ``ADMIN_PASSWORD_HASH`` takes precedence when set and is verified through
    the passlib context. Otherwise the submitted value is compared with the
    plaintext ``ADMIN_PASSWORD``. Surrounding whitespace is stripped from both
    sides before comparing.

    Args:
        password: Password supplied by the client (``None`` is treated as "").

    Returns:
        True when the password matches, False otherwise (including when no
        admin credential is configured or the stored hash cannot be verified).
    """
    if not ADMIN_PASSWORD_HASH and not ADMIN_PASSWORD:
        return False

    candidate = (password or "").strip()

    if ADMIN_PASSWORD_HASH:
        try:
            return pwd_context.verify(candidate, ADMIN_PASSWORD_HASH)
        except Exception as exc:
            # A malformed/unsupported hash must deny access, not crash login.
            logger.warning(
                "Admin password hash verification failed: %s", type(exc).__name__
            )
            return False

    expected = (ADMIN_PASSWORD or "").strip()
    # Constant-time comparison: a plain `==` leaks, through timing, how many
    # leading characters matched. Bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    return secrets.compare_digest(
        candidate.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
|
||
|
||
|
||
def _get_session_key(token: str) -> str:
|
||
return f"admin_session:{token}"
|
||
|
||
|
||
def create_admin_token() -> str:
    """Create a new admin session token valid for 24 hours.

    The token is stored in Redis with a matching TTL when a client is
    available; if there is no client, or the Redis write raises, the token is
    recorded in the in-process ``_memory_sessions`` dict instead.

    Returns:
        The newly generated URL-safe token.
    """
    ttl_seconds = 24 * 60 * 60
    new_token = secrets.token_urlsafe(32)
    expires_at = int(time.time()) + ttl_seconds

    client = get_redis_client()
    saved_in_redis = False
    if client:
        try:
            client.setex(_get_session_key(new_token), ttl_seconds, str(expires_at))
            saved_in_redis = True
        except Exception as e:
            logger.warning(f"Redis session save failed: {e}")

    if not saved_in_redis:
        # No Redis, or the write failed: keep the session in this process.
        _memory_sessions[new_token] = expires_at
    return new_token
|
||
|
||
|
||
def verify_admin_token(token: str) -> bool:
    """Tell whether *token* identifies a live admin session.

    When a Redis client is available its answer is authoritative. The
    in-memory store is consulted only when there is no client or the Redis
    lookup raises. Expired in-memory entries are removed on access.
    """
    client = get_redis_client()
    if client:
        try:
            stored = client.get(_get_session_key(token))
            return bool(stored and int(stored) > time.time())
        except Exception as e:
            # Fall through to the in-memory store below.
            logger.warning(f"Redis session check failed: {e}")

    try:
        expires_at = _memory_sessions[token]
    except KeyError:
        return False

    if expires_at < time.time():
        del _memory_sessions[token]
        return False
    return True
|
||
|
||
|
||
def delete_admin_token(token: str):
    """Invalidate the admin session identified by *token*.

    The session is removed from Redis (when a client is available) and from
    the in-process store. A Redis failure is logged rather than raised so
    logout stays best-effort. It is no longer silently swallowed, because a
    failed delete means the token remains valid in Redis until its TTL runs
    out.
    """
    client = get_redis_client()
    if client:
        try:
            client.delete(_get_session_key(token))
        except Exception as e:
            logger.warning(f"Redis session delete failed: {e}")
    # pop() with a default avoids the separate membership test.
    _memory_sessions.pop(token, None)
|
||
|
||
|
||
async def require_admin(authorization: Optional[str] = Header(None)) -> str:
    """FastAPI dependency that authenticates an admin bearer token.

    Returns:
        The configured ``ADMIN_USERNAME`` when the token is valid.

    Raises:
        HTTPException: 503 when admin credentials are not configured; 401 when
            the header is missing, malformed, or carries an invalid/expired
            token.
    """
    credential_configured = bool(ADMIN_PASSWORD_HASH or ADMIN_PASSWORD)
    if not (ADMIN_USERNAME and credential_configured):
        raise HTTPException(
            status_code=503, detail="Admin authentication not configured"
        )

    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")

    pieces = authorization.split(" ")
    well_formed = len(pieces) == 2 and pieces[0].lower() == "bearer"
    if not well_formed:
        raise HTTPException(
            status_code=401, detail="Invalid authorization format. Use: Bearer <token>"
        )

    if not verify_admin_token(pieces[1]):
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return ADMIN_USERNAME
|
||
|
||
|
||
class AdminLoginRequest(BaseModel):
    """Request body accepted by the admin login endpoint."""

    # Password submitted by the client; checked by verify_admin_password().
    password: str
|
||
|
||
|
||
class AdminUpdateUserTierRequest(BaseModel):
    """Request body for changing a user's plan."""

    # Target plan; restricted to the listed literal values.
    plan: Literal["free", "starter", "pro", "business", "enterprise"]
|
||
|
||
|
||
class AdminResetPasswordRequest(BaseModel):
    """Request body for setting a user's password from the admin API."""

    # New password; required, at least 8 characters.
    new_password: str = Field(..., min_length=8)
|
||
|
||
|
||
def _plan_info_for_admin(plan_raw) -> dict:
    """Resolve the ``PLANS`` entry for a plan given as a string, an object
    exposing ``.value`` (e.g. an enum), or a raw value.

    Anything that cannot be converted to a ``PlanType`` resolves to the FREE
    plan, as does a ``PlanType`` with no entry in ``PLANS``.
    """
    try:
        if isinstance(plan_raw, str):
            candidate = plan_raw.lower()
        elif hasattr(plan_raw, "value"):
            candidate = plan_raw.value
        else:
            candidate = plan_raw
        plan_type = PlanType(candidate)
    except (ValueError, KeyError, TypeError):
        plan_type = PlanType.FREE
    return PLANS.get(plan_type, PLANS[PlanType.FREE])
|
||
|
||
|
||
def _subscription_status_str(raw) -> str:
|
||
if raw is None:
|
||
return "active"
|
||
return raw.value if hasattr(raw, "value") else str(raw)
|
||
|
||
|
||
@router.post("/login")
async def admin_login(request: AdminLoginRequest, req: Request):
    """Admin login endpoint - Returns a bearer token for authenticated admin access

    Failed attempts are counted per client IP in ``_login_attempts``. Once
    ``_MAX_LOGIN_ATTEMPTS`` failures accumulate within ``_LOCKOUT_SECONDS`` of
    the first failure, further attempts are rejected with 429 until that
    window has elapsed.

    Raises:
        HTTPException: 429 while the IP is locked out, 401 on a wrong password.
    """
    client_ip = req.client.host if req.client else "unknown"
    now = time.time()

    # Brute-force protection
    record = _login_attempts.get(client_ip)
    if record is not None:
        attempts, first_fail = record
        elapsed = now - first_fail
        if elapsed >= _LOCKOUT_SECONDS:
            # The window has expired: forget old failures regardless of their
            # count. Previously a stale record was only dropped once it had
            # reached the maximum, so old failures counted toward the limit
            # and the attempt that reached it produced no lockout at all.
            _login_attempts.pop(client_ip, None)
        elif attempts >= _MAX_LOGIN_ATTEMPTS:
            remaining = int(_LOCKOUT_SECONDS - elapsed)
            logger.warning(f"Admin login blocked (brute-force) for IP {client_ip}")
            raise HTTPException(
                status_code=429,
                detail=f"Too many failed attempts. Try again in {remaining}s.",
                headers={"Retry-After": str(remaining)},
            )

    if not verify_admin_password(request.password):
        count, first = _login_attempts.get(client_ip, (0, now))
        # The window starts at the first recorded failure.
        _login_attempts[client_ip] = (count + 1, first if count > 0 else now)
        logger.warning(f"Failed admin login attempt from {client_ip} ({count + 1}/{_MAX_LOGIN_ATTEMPTS})")
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Successful login clears the failure history for this IP.
    _login_attempts.pop(client_ip, None)
    token = create_admin_token()
    logger.info(f"Admin login successful from {client_ip}")
    return {
        "status": "success",
        "access_token": token,
        "token_type": "bearer",
        "expires_in": 86400,
        "message": "Login successful",
    }
|
||
|
||
|
||
@router.post("/logout")
async def admin_logout(authorization: Optional[str] = Header(None)):
    """Invalidate the bearer token carried by the Authorization header, if any.

    The endpoint always answers with a success payload, even when no usable
    token was supplied.
    """
    pieces = authorization.split(" ") if authorization else []
    if len(pieces) == 2 and pieces[0].lower() == "bearer":
        delete_admin_token(pieces[1])
        logger.info("Admin logout successful")
    return {"status": "success", "message": "Logged out"}
|
||
|
||
|
||
@router.get("/verify")
async def verify_admin_session(is_admin: bool = Depends(require_admin)):
    """Confirm that the caller's admin token is accepted.

    All checking happens in the ``require_admin`` dependency; reaching the
    body means the token was valid.
    """
    session_state = {"status": "valid", "authenticated": True}
    return session_state
|
||
|
||
|
||
@router.get("/dashboard")
async def get_admin_dashboard(is_admin: bool = Depends(require_admin)):
    """Assemble the admin dashboard payload.

    Combines cleanup-manager statistics, rate-limit statistics, the Google
    provider health check and a few configuration values. A failing provider
    health check is reported in the payload instead of failing the request.
    """
    from middleware.cleanup import create_cleanup_manager
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig
    from services.translation_service import _translation_cache

    def _env_int(name: str, fallback: str) -> int:
        # Read an integer setting from the environment.
        return int(os.getenv(name, fallback))

    cleanup_manager = create_cleanup_manager(config)
    limiter = RateLimitManager(
        RateLimitConfig(
            requests_per_minute=_env_int("RATE_LIMIT_PER_MINUTE", "30"),
            requests_per_hour=_env_int("RATE_LIMIT_PER_HOUR", "200"),
            translations_per_minute=_env_int("TRANSLATIONS_PER_MINUTE", "10"),
            translations_per_hour=_env_int("TRANSLATIONS_PER_HOUR", "50"),
            max_concurrent_translations=_env_int("MAX_CONCURRENT_TRANSLATIONS", "5"),
        )
    )

    generated_at = datetime.now(timezone.utc).isoformat()
    cleanup_stats = cleanup_manager.get_stats()
    rate_limit_stats = limiter.get_stats()
    tracked_files = cleanup_manager.get_tracked_files()

    try:
        from services.providers.google_provider import get_google_provider

        google_status = get_google_provider().health_check().model_dump()
    except Exception as e:
        google_status = {
            "name": "google",
            "available": False,
            "error": str(e)[:100],
            "last_check": None,
        }

    return {
        "timestamp": generated_at,
        "status": "healthy",
        "system": {"memory": {}, "disk": {}},
        "providers": {"google": google_status},
        "cleanup": {**cleanup_stats, "tracked_files_count": len(tracked_files)},
        "rate_limits": rate_limit_stats,
        "config": {
            "max_file_size_mb": config.MAX_FILE_SIZE_MB,
            "supported_extensions": list(config.SUPPORTED_EXTENSIONS),
            "translation_service": config.TRANSLATION_SERVICE,
        },
    }
|
||
|
||
|
||
@router.get("/users")
async def get_admin_users(is_admin: bool = Depends(require_admin)):
    """List all users: database rows plus accounts that exist only in users.json.

    Database users are emitted first (when the database backend is enabled and
    available). Legacy JSON accounts are then added unless their id or e-mail
    was already seen. The result is sorted by ``created_at``, newest first.

    Returns:
        ``{"total": <int>, "users": [<row>, ...]}``.
    """
    from services.auth_service import USE_DATABASE, DATABASE_AVAILABLE, load_users
    from database.connection import get_sync_session
    from database.models import ApiKey

    users_list = []
    seen_ids: set[str] = set()
    seen_emails: set[str] = set()

    with get_sync_session() as session:

        def _active_keys(owner_id):
            # Active API keys belonging to one user (shared by both loops).
            return (
                session.query(ApiKey)
                .filter(ApiKey.user_id == owner_id, ApiKey.is_active == True)  # noqa: E712
                .all()
            )

        def _limits(plan_info) -> dict:
            return {
                "docs_per_month": plan_info.get("docs_per_month", 0),
                "max_pages_per_doc": plan_info.get("max_pages_per_doc", 0),
            }

        if USE_DATABASE and DATABASE_AVAILABLE:
            from database.models import User as DBUser

            db_users = session.query(DBUser).order_by(DBUser.created_at.desc()).all()
            for db_user in db_users:
                plan_val = db_user.plan
                plan_str = (
                    plan_val.value
                    if hasattr(plan_val, "value")
                    else (str(plan_val) if plan_val else "free")
                )
                active_api_keys = _active_keys(db_user.id)
                row = {
                    "id": str(db_user.id),
                    "email": db_user.email or "",
                    "name": db_user.name or "",
                    "plan": plan_str,
                    "subscription_status": _subscription_status_str(
                        db_user.subscription_status
                    ),
                    "docs_translated_this_month": db_user.docs_translated_this_month or 0,
                    "pages_translated_this_month": db_user.pages_translated_this_month or 0,
                    "extra_credits": db_user.extra_credits or 0,
                    "created_at": db_user.created_at.isoformat() if db_user.created_at else "",
                    "plan_limits": _limits(_plan_info_for_admin(plan_val)),
                    "api_keys_count": len(active_api_keys),
                    "api_key_ids": [key.id for key in active_api_keys],
                    "storage": "database",
                }
                users_list.append(row)
                seen_ids.add(row["id"])
                if row["email"]:
                    seen_emails.add(row["email"].lower())

        # Accounts only present in data/users.json (legacy), absent from the users table
        users_data = load_users()
        for user_id, user_data in users_data.items():
            email = (user_data.get("email") or "").strip().lower()
            if user_id in seen_ids or (email and email in seen_emails):
                continue
            plan_raw = user_data.get("plan", "free")
            plan_str = plan_raw.value if hasattr(plan_raw, "value") else str(plan_raw)
            active_api_keys = _active_keys(user_id)
            users_list.append(
                {
                    "id": user_id,
                    "email": user_data.get("email", ""),
                    "name": user_data.get("name", ""),
                    "plan": plan_str,
                    "subscription_status": str(
                        user_data.get("subscription_status", "active")
                    ),
                    "docs_translated_this_month": user_data.get(
                        "docs_translated_this_month", 0
                    ),
                    "pages_translated_this_month": user_data.get(
                        "pages_translated_this_month", 0
                    ),
                    "extra_credits": user_data.get("extra_credits", 0),
                    "created_at": user_data.get("created_at", ""),
                    "plan_limits": _limits(_plan_info_for_admin(plan_str)),
                    "api_keys_count": len(active_api_keys),
                    "api_key_ids": [key.id for key in active_api_keys],
                    "storage": "json_file",
                }
            )
            seen_ids.add(user_id)
            if email:
                seen_emails.add(email)

    # A legacy record may carry created_at = null (or a non-string value);
    # comparing that with the ISO strings of other rows would raise TypeError
    # and turn the whole listing into a 500. Sort on a string key instead.
    users_list.sort(key=lambda row: str(row.get("created_at") or ""), reverse=True)

    return {"total": len(users_list), "users": users_list}
|
||
|
||
|
||
@router.patch("/users/{user_id}")
async def patch_admin_user_tier(
    user_id: str,
    body: AdminUpdateUserTierRequest,
    is_admin: bool = Depends(require_admin),
):
    """Change a user's plan (admin only).

    Responds 404 when the user is unknown and 400 when the plan update is
    refused. On success the change is logged and the updated user summary is
    returned together with a coarse "pro"/"free" tier.
    """
    from services.auth_service import get_user_by_id, update_user_plan

    if not get_user_by_id(user_id):
        return JSONResponse(
            status_code=404,
            content={"error": "NOT_FOUND", "message": "User not found"},
        )

    updated = update_user_plan(user_id, body.plan)
    if not updated:
        rejection = {
            "error": "INVALID_PLAN",
            "message": "Invalid plan. Allowed: free, starter, pro, business, enterprise",
            "details": {
                "allowed": ["free", "starter", "pro", "business", "enterprise"]
            },
        }
        return JSONResponse(status_code=400, content=rejection)

    if hasattr(updated.plan, "value"):
        plan_value = updated.plan.value
    else:
        plan_value = str(updated.plan)

    # PRO, BUSINESS and ENTERPRISE are reported as the "pro" tier; every other
    # plan is reported as "free".
    paid_plans = (PlanType.PRO, PlanType.BUSINESS, PlanType.ENTERPRISE)
    new_tier = "pro" if updated.plan in paid_plans else "free"

    audit_fields = {
        "event": "admin_tier_change",
        "target_user_id": user_id,
        "new_tier": new_tier,
        "new_plan": plan_value,
        "admin_id": "admin_session",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    logger.info("admin_tier_change", extra=audit_fields)

    return {
        "data": {
            "id": updated.id,
            "email": updated.email,
            "name": getattr(updated, "name", ""),
            "plan": plan_value,
            "tier": new_tier,
        },
        "meta": {},
    }
|
||
|
||
|
||
@router.post("/users/{user_id}/password")
async def admin_reset_user_password(
    user_id: str,
    body: AdminResetPasswordRequest,
    is_admin: bool = Depends(require_admin),
):
    """Set a new password for a user (no password-reset e-mail involved).

    A ``ValueError`` from the auth service becomes a 400; a falsy result
    becomes a 404 response.
    """
    from services.auth_service import admin_set_user_password

    try:
        user = admin_set_user_password(user_id, body.new_password)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    if user:
        logger.info("admin_password_reset for user_id=%s", user_id)
        return {
            "data": {"id": user.id, "email": user.email},
            "meta": {},
        }

    not_found = {
        "error": "NOT_FOUND",
        "message": "Utilisateur introuvable (id inconnu en base et dans users.json).",
    }
    return JSONResponse(status_code=404, content=not_found)
|
||
|
||
|
||
@router.get("/stats")
async def get_admin_stats(is_admin: bool = Depends(require_admin)):
    """Get comprehensive admin statistics.

    Aggregates plan distribution, monthly document/page counters and the
    number of users with at least one translated document. With the database
    backend enabled, database users are counted first and users.json accounts
    are added only when neither their id nor their e-mail exists in the
    database. Otherwise users.json alone is used.
    """
    from services.auth_service import USE_DATABASE, DATABASE_AVAILABLE, load_users
    from services.translation_service import _translation_cache

    users_data = load_users()
    plan_distribution: dict = {}
    totals = {"docs": 0, "pages": 0, "active": 0}

    def _tally(plan_raw, docs_raw, pages_raw) -> None:
        # Fold one user into the aggregates. Counters are coerced with
        # int(x or 0) in every branch: the JSON-only path used to add raw
        # values and raised TypeError on a null counter.
        plan_key = plan_raw.value if hasattr(plan_raw, "value") else str(plan_raw)
        plan_distribution[plan_key] = plan_distribution.get(plan_key, 0) + 1
        docs = int(docs_raw or 0)
        totals["docs"] += docs
        totals["pages"] += int(pages_raw or 0)
        if docs > 0:
            totals["active"] += 1

    if USE_DATABASE and DATABASE_AVAILABLE:
        from database.connection import get_sync_session
        from database.models import User as DBUser

        with get_sync_session() as session:
            db_rows = session.query(DBUser).all()
            db_ids = {str(u.id) for u in db_rows}
            db_emails = {(u.email or "").strip().lower() for u in db_rows}
            for u in db_rows:
                _tally(
                    u.plan,
                    u.docs_translated_this_month,
                    u.pages_translated_this_month,
                )

        json_only = 0
        for uid, ud in users_data.items():
            if uid in db_ids:
                continue
            em = (ud.get("email") or "").strip().lower()
            if em and em in db_emails:
                continue
            json_only += 1
            _tally(
                ud.get("plan", "free"),
                ud.get("docs_translated_this_month", 0),
                ud.get("pages_translated_this_month", 0),
            )
        total_users = len(db_ids) + json_only
    else:
        total_users = len(users_data)
        for user_data in users_data.values():
            _tally(
                user_data.get("plan", "free"),
                user_data.get("docs_translated_this_month", 0),
                user_data.get("pages_translated_this_month", 0),
            )

    cache_stats = _translation_cache.get_stats()

    return {
        "users": {
            "total": total_users,
            "active_this_month": totals["active"],
            "by_plan": plan_distribution,
        },
        "translations": {
            "docs_this_month": totals["docs"],
            "pages_this_month": totals["pages"],
        },
        "cache": cache_stats,
        "config": {
            "translation_service": config.TRANSLATION_SERVICE,
            "max_file_size_mb": config.MAX_FILE_SIZE_MB,
            "supported_extensions": list(config.SUPPORTED_EXTENSIONS),
        },
    }
|
||
|
||
|
||
@router.post("/cleanup/trigger")
async def trigger_cleanup(is_admin: bool = Depends(require_admin)):
    """Run the cleanup manager's expired-file cleanup on demand.

    Raises:
        HTTPException: 500 when the cleanup raises.
    """
    from middleware.cleanup import create_cleanup_manager

    manager = create_cleanup_manager(config)
    try:
        cleaned = await manager.cleanup_expired()
        outcome = {
            "status": "success",
            "files_cleaned": cleaned,
            "message": f"Cleaned up {cleaned} expired files",
        }
        return outcome
    except Exception as e:
        logger.error(f"Manual cleanup failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Cleanup failed: {str(e)}")
|
||
|
||
|
||
@router.get("/files/tracked")
async def get_tracked_files(is_admin: bool = Depends(require_admin)):
    """Return the files currently tracked by the cleanup manager, with a count."""
    from middleware.cleanup import create_cleanup_manager

    tracked = create_cleanup_manager(config).get_tracked_files()
    return {"count": len(tracked), "files": tracked}
|
||
|
||
|
||
@router.post("/quota/reset")
async def reset_translation_quotas(is_admin: bool = Depends(require_admin)):
    """Reset monthly translation quotas for all free-tier users.

    Clears Redis keys matching quota:monthly:*

    Returns:
        A status payload: ``"skipped"`` when no Redis client is available,
        otherwise ``"success"`` with the number of deleted keys.

    Raises:
        HTTPException: 500 when the Redis scan or delete fails.
    """
    try:
        from core.redis import get_async_redis

        redis_client = get_async_redis()
        if not redis_client:
            return {"status": "skipped", "message": "Redis not available, quotas are in-memory only"}

        # Find all monthly quota keys
        keys = [key async for key in redis_client.scan_iter(match="quota:monthly:*")]
        if not keys:
            return {"status": "success", "keys_deleted": 0, "message": "No active quotas to reset"}

        deleted = await redis_client.delete(*keys)
        # stdlib Logger methods reject arbitrary keyword arguments; the old
        # `keys_deleted=deleted` raised TypeError *after* the keys were gone,
        # so every successful reset was reported as a 500. Structured fields
        # go through `extra`.
        logger.info("admin_quota_reset", extra={"keys_deleted": deleted})
        return {"status": "success", "keys_deleted": deleted, "message": f"Reset {deleted} quota counters"}

    except Exception as e:
        logger.error(f"Admin quota reset failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Quota reset failed: {str(e)}")
|
||
|
||
|
||
@router.post("/config/provider")
async def update_default_provider(
    provider: str = Form(...),
    is_admin: bool = Depends(require_admin),
):
    """Switch the default translation provider stored on the config object.

    Raises:
        HTTPException: 400 when *provider* is not one of the accepted names.
    """
    valid_providers = [
        "google", "deepl", "openai", "ollama", "openrouter",
        "zai", "libre", "classic", "llm",
    ]

    if provider in valid_providers:
        config.TRANSLATION_SERVICE = provider
        return {
            "status": "success",
            "message": f"Default provider updated to {provider}",
            "provider": provider,
        }

    raise HTTPException(
        status_code=400,
        detail=f"Invalid provider. Must be one of: {valid_providers}",
    )
|
||
|
||
|
||
class AdminRevokeApiKeyRequest(BaseModel):
    """Optional request body for the admin API-key revocation endpoint."""

    # Free-text reason; echoed in the audit log entry and in the response.
    reason: Optional[str] = None
|
||
|
||
|
||
@router.delete("/api-keys/{key_id}")
async def admin_revoke_api_key(
    key_id: str,
    body: Optional[AdminRevokeApiKeyRequest] = None,
    admin_id: str = Depends(require_admin),
):
    """Revoke any user's API key - Admin only

    Marks the active key identified by *key_id* as inactive and stamps its
    ``revoked_at``. Responds 404 when no active key has that id.
    """
    from database.connection import get_sync_session
    from database.models import ApiKey

    revoke_reason = body.reason if body else None
    # One timestamp for the database row, the audit log and the response, so
    # the API never reports a revocation time different from the stored one.
    revoked_at = datetime.now(timezone.utc)

    with get_sync_session() as session:
        api_key = (
            session.query(ApiKey)
            .filter(ApiKey.id == key_id, ApiKey.is_active == True)  # noqa: E712
            .first()
        )

        if not api_key:
            return JSONResponse(
                status_code=404,
                content={
                    "error": "API_KEY_NOT_FOUND",
                    "message": "Clé API non trouvée ou déjà révoquée",
                },
            )

        # Read everything needed from the ORM object *before* committing, so
        # the response never has to reload expired attributes (which fails
        # once the session is closed).
        owner_user_id = api_key.user_id
        revoked_key_id = api_key.id

        api_key.is_active = False
        api_key.revoked_at = revoked_at
        session.commit()

    logger.info(
        "admin_api_key_revoked",
        extra={
            "admin_id": admin_id,
            "key_id": key_id,
            "owner_user_id": owner_user_id,
            "reason": revoke_reason,
            "timestamp": revoked_at.isoformat(),
        },
    )

    return JSONResponse(
        status_code=200,
        content={
            "data": {
                "id": revoked_key_id,
                "revoked": True,
                "revoked_at": revoked_at.isoformat(),
                "owner_user_id": owner_user_id,
                "reason": revoke_reason,
            },
            "meta": {},
        },
    )
|
||
|
||
|
||
def _extract_error_code(error_message: Optional[str]) -> Optional[str]:
|
||
"""Extract a short error code from error message for log display (NFR: no document content)."""
|
||
if not error_message or not error_message.strip():
|
||
return None
|
||
import re
|
||
m = re.search(r"\b([A-Z][A-Z0-9_]{2,})\b", error_message)
|
||
if m:
|
||
return m.group(1)
|
||
first = error_message.strip().split()[0] if error_message.strip() else ""
|
||
if first:
|
||
return first.upper()[:20]
|
||
return None
|
||
|
||
|
||
@router.get("/logs")
def get_admin_logs(
    is_admin: str = Depends(require_admin),
    level: str = Query(default="all", pattern="^(all|error|warning|info)$"),
    search: str = Query(default="", max_length=200),
    page: int = Query(default=1, ge=1),
    per_page: int = Query(default=50, ge=1, le=200),
):
    """Return a paginated list of failed translations as error log entries.

    No document content or original filename is exposed (NFR11, NFR16).
    ``search`` is matched case-insensitively against ``user_id`` and
    ``error_message``. Only error entries exist, so the ``warning`` and
    ``info`` levels always return an empty page.
    """
    from database.connection import get_sync_session
    from database.models import Translation
    from sqlalchemy import or_, desc

    def _envelope(entries, total_count):
        # Common response shape for both the empty and the populated case.
        return {
            "data": {
                "logs": entries,
                "total": total_count,
                "page": page,
                "per_page": per_page,
            },
            "meta": {"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")},
        }

    def _ts(created_at):
        # ISO timestamp with a "Z" suffix; empty string when missing.
        if not created_at:
            return ""
        iso = created_at.isoformat()
        if "+00:00" in iso:
            return iso.replace("+00:00", "Z")
        return iso + "Z"

    if level in ("warning", "info"):
        return _envelope([], 0)

    with get_sync_session() as session:
        failed = session.query(Translation).filter(Translation.status == "failed")

        needle = search.strip() if search else ""
        if needle:
            term = f"%{needle}%"
            failed = failed.filter(
                or_(
                    Translation.user_id.ilike(term),
                    Translation.error_message.ilike(term),
                )
            )

        total = failed.count()
        first_row = (page - 1) * per_page
        rows = (
            failed.order_by(desc(Translation.created_at))
            .offset(first_row)
            .limit(per_page)
            .all()
        )

        logs = []
        for t in rows:
            logs.append(
                {
                    "timestamp": _ts(t.created_at),
                    "level": "error",
                    "message": (t.error_message or "Translation failed").strip()[:500],
                    "user_id": t.user_id,
                    "error_code": _extract_error_code(t.error_message),
                    "provider": t.provider,
                    "file_type": t.file_type,
                }
            )

    return _envelope(logs, total)
|
||
|
||
|
||
# Relative path of the JSON file read by load_settings() and written by
# save_settings().
SETTINGS_FILE = "data/provider_settings.json"
|
||
|
||
|
||
class ProviderSettings(BaseModel):
    """Per-provider configuration stored in the settings file."""

    enabled: bool = False
    api_key: Optional[str] = None
    base_url: Optional[str] = None
    model: Optional[str] = None
    timeout: int = 30  # presumably seconds; TODO confirm against the consumers
    max_retries: int = 3
|
||
|
||
|
||
class SmtpSettings(BaseModel):
    """SMTP configuration stored in the settings file.

    The trailing comments name the environment variable associated with each
    field.
    """

    enabled: bool = False
    host: Optional[str] = None  # SMTP_HOST
    port: int = 587  # SMTP_PORT
    username: Optional[str] = None  # SMTP_USERNAME
    password: Optional[str] = None  # SMTP_PASSWORD
    from_email: Optional[str] = None  # SMTP_FROM_EMAIL
    use_tls: bool = True
|
||
|
||
|
||
class SettingsConfig(BaseModel):
    """Root settings document: one section per provider, SMTP settings and
    the comma-separated fallback chains."""

    google: ProviderSettings = ProviderSettings(enabled=True)
    google_cloud: ProviderSettings = ProviderSettings()  # Cloud Translation API v2 (API key)
    deepl: ProviderSettings = ProviderSettings()
    openai: ProviderSettings = ProviderSettings()
    ollama: ProviderSettings = ProviderSettings()  # dev-only in UI
    openrouter: ProviderSettings = ProviderSettings()  # "Traduction IA Essentielle" (Essential AI translation)
    openrouter_premium: ProviderSettings = ProviderSettings()  # "Traduction IA Premium" (Premium AI translation)
    deepseek: ProviderSettings = ProviderSettings()
    minimax: ProviderSettings = ProviderSettings()
    zai: ProviderSettings = ProviderSettings()
    smtp: SmtpSettings = SmtpSettings()
    # Comma-separated provider names, in the order they should be tried.
    fallback_chain: str = "google,google_cloud,deepl,openrouter,openrouter_premium,openai,deepseek,zai"
    fallback_chain_classic: str = "google,google_cloud,deepl"
    fallback_chain_llm: str = "openrouter,openrouter_premium,openai,deepseek,zai"
|
||
|
||
|
||
def load_settings() -> SettingsConfig:
    """Load settings from ``SETTINGS_FILE``.

    Returns:
        The parsed settings, or a default ``SettingsConfig`` when the file
        does not exist or cannot be read/validated (the failure is logged).
    """
    try:
        import json
        from pathlib import Path

        settings_path = Path(SETTINGS_FILE)
        if settings_path.exists():
            # JSON is UTF-8 by specification; without an explicit encoding the
            # platform locale decides and non-ASCII values may fail to decode.
            with open(settings_path, encoding="utf-8") as f:
                data = json.load(f)
            return SettingsConfig(**data)
    except Exception as e:
        # Deliberately best-effort: a broken file must not take the admin API
        # down, so fall back to defaults.
        logger.warning(f"Failed to load settings: {e}")
    return SettingsConfig()
|
||
|
||
|
||
def save_settings(settings: SettingsConfig):
    """Write *settings* to ``SETTINGS_FILE`` as indented JSON.

    The parent directory is created when missing, including intermediate
    directories.
    """
    import json
    from pathlib import Path

    settings_path = Path(SETTINGS_FILE)
    # parents=True: without it mkdir raises FileNotFoundError as soon as the
    # configured path is nested more than one level deep.
    settings_path.parent.mkdir(parents=True, exist_ok=True)
    with open(settings_path, "w", encoding="utf-8") as f:
        json.dump(settings.model_dump(), f, indent=2)
|
||
|
||
|
||
@router.get("/settings")
async def get_settings(admin_id: str = Depends(require_admin)):
    """Return the stored settings, completed with environment defaults.

    For each provider, a model/base URL saved in the JSON file wins; otherwise
    the environment variable is used, then the code default. API keys from
    the environment are never returned — ``env_info`` only says whether one is
    set. The SMTP password is always returned as null.
    """
    settings = load_settings()

    def _merge_env(
        provider_settings: ProviderSettings,
        key_env: str = "",
        model_env: str = "",
        url_env: str = "",
        default_model: str = "",
        default_url: str = "",
    ) -> dict:
        # Precedence for model and base_url: JSON value > env var > code default.
        merged = provider_settings.model_dump()
        if not merged.get("model"):
            if model_env:
                merged["model"] = os.getenv(model_env, "").strip() or default_model or None
            elif default_model:
                merged["model"] = default_model
        if not merged.get("base_url"):
            if url_env:
                merged["base_url"] = os.getenv(url_env, "").strip() or default_url or None
            elif default_url:
                merged["base_url"] = default_url
        # key_env is accepted but not read: env API keys are never merged in.
        return merged

    def _env_set(name: str) -> bool:
        return bool(os.getenv(name, "").strip())

    # Provider name -> keyword arguments for _merge_env, in the order the
    # sections are processed.
    provider_env = (
        ("openrouter", dict(key_env="OPENROUTER_API_KEY", model_env="OPENROUTER_MODEL", default_model="deepseek/deepseek-v3.2")),
        ("openrouter_premium", dict(key_env="OPENROUTER_API_KEY", model_env="OPENROUTER_PREMIUM_MODEL", default_model="anthropic/claude-3.5-haiku")),
        ("openai", dict(key_env="OPENAI_API_KEY", model_env="OPENAI_MODEL", default_model="gpt-4o-mini")),
        ("deepl", dict(key_env="DEEPL_API_KEY")),
        ("deepseek", dict(key_env="DEEPSEEK_API_KEY", model_env="DEEPSEEK_MODEL", default_model="deepseek-chat")),
        ("minimax", dict(key_env="MINIMAX_API_KEY", model_env="MINIMAX_MODEL", default_model="abab6.5s-chat")),
        ("zai", dict(key_env="ZAI_API_KEY", model_env="ZAI_MODEL", url_env="ZAI_BASE_URL", default_model="grok-2-1212", default_url="https://api.x.ai/v1")),
        ("ollama", dict(url_env="OLLAMA_BASE_URL", model_env="OLLAMA_MODEL", default_url="http://localhost:11434", default_model="llama3")),
        ("google_cloud", dict(key_env="GOOGLE_CLOUD_API_KEY")),
    )

    payload = settings.model_dump()
    for provider_name, merge_kwargs in provider_env:
        payload[provider_name] = _merge_env(getattr(settings, provider_name), **merge_kwargs)

    # SMTP: fill blanks from env vars, but never expose the password.
    smtp_data = settings.smtp.model_dump()
    for field_name, env_name in (
        ("host", "SMTP_HOST"),
        ("username", "SMTP_USERNAME"),
        ("from_email", "SMTP_FROM_EMAIL"),
    ):
        if not smtp_data[field_name]:
            smtp_data[field_name] = os.getenv(env_name, "").strip() or None
    smtp_data["password"] = None

    # The env port is applied only while the stored port is still 587.
    smtp_env_port = os.getenv("SMTP_PORT", "").strip()
    if smtp_env_port and smtp_data["port"] == 587:
        try:
            smtp_data["port"] = int(smtp_env_port)
        except ValueError:
            pass
    if os.getenv("SMTP_USE_TLS", "").strip().lower() in ("false", "0", "no"):
        smtp_data["use_tls"] = False
    payload["smtp"] = smtp_data

    # Tell the frontend which providers have credentials configured through
    # the environment (booleans only, never the values).
    has_openrouter = _env_set("OPENROUTER_API_KEY")
    env_info = {
        "deepl": _env_set("DEEPL_API_KEY"),
        "openai": _env_set("OPENAI_API_KEY"),
        "openrouter": has_openrouter,
        "openrouter_premium": has_openrouter,  # same key, different model
        "deepseek": _env_set("DEEPSEEK_API_KEY"),
        "minimax": _env_set("MINIMAX_API_KEY"),
        "zai": _env_set("ZAI_API_KEY"),
        "ollama": _env_set("OLLAMA_BASE_URL"),
        "google_cloud": _env_set("GOOGLE_CLOUD_API_KEY"),
        "smtp": _env_set("SMTP_HOST"),
    }
    return JSONResponse(
        status_code=200,
        content={"data": payload, "env_info": env_info, "meta": {}},
    )
|
||
|
||
|
||
@router.put("/settings")
async def update_settings(
    settings: SettingsConfig, admin_id: str = Depends(require_admin)
):
    """Persist the settings document submitted by the admin frontend.

    The frontend sends ``null`` for the SMTP password because the read
    endpoint blanks it, so a ``None`` password means "keep the stored one".
    A non-empty string replaces the stored password and a blank string
    clears it.

    Args:
        settings: Full settings document to save.
        admin_id: Value supplied by the ``require_admin`` dependency; only
            used for the audit log line.

    Returns:
        JSONResponse (200) with the saved settings under ``data``. The SMTP
        password is blanked in the response so the secret never leaves the
        server.
    """
    existing = load_settings()
    # Preserve the stored SMTP password when the frontend sends null.
    if settings.smtp.password is None and existing.smtp.password:
        settings.smtp.password = existing.smtp.password
    # A new non-empty password is kept; an empty/blank string clears it.
    if settings.smtp.password is not None:
        settings.smtp.password = settings.smtp.password.strip() or None

    save_settings(settings)
    logger.info(f"admin_settings_updated by {admin_id}")

    # Never echo the SMTP password back to the client.
    response_data = settings.model_dump()
    smtp_section = response_data.get("smtp")
    if isinstance(smtp_section, dict):
        smtp_section["password"] = None

    return JSONResponse(
        status_code=200, content={"data": response_data, "meta": {}}
    )
|
||
|
||
|
||
class SmtpTestRequest(BaseModel):
    """Optional request body for the SMTP test endpoints.

    Every field may be omitted; it carries the values currently typed in the
    admin form so they can be tested before being saved.
    """

    # Connection target.
    host: Optional[str] = Field(default=None)
    port: Optional[int] = Field(default=None)
    # Credentials used for the SMTP login step.
    username: Optional[str] = Field(default=None)
    password: Optional[str] = Field(default=None)
    # Sender address (the test email is also addressed to it).
    from_email: Optional[str] = Field(default=None)
    # Whether STARTTLS is issued after connecting.
    use_tls: Optional[bool] = Field(default=None)
|
||
|
||
|
||
@router.post("/providers/{provider}/test")
async def test_provider(
    provider: str,
    admin_id: str = Depends(require_admin),
    smtp_body: Optional[SmtpTestRequest] = None,
):
    """Test connectivity for one provider, even when it is disabled.

    API keys are resolved from the saved settings first and fall back to the
    matching environment variable when the saved value is empty. For SMTP an
    optional JSON body carries the current (possibly unsaved) form values.

    Args:
        provider: Settings attribute name of the provider to test.
        admin_id: Value supplied by the ``require_admin`` dependency.
        smtp_body: Optional form values, only read when ``provider`` is
            ``"smtp"``.

    Returns:
        JSONResponse whose body always contains ``available`` (except for the
        404 "provider not found" case) plus provider-specific details or an
        ``error`` message. Unexpected failures are reported as HTTP 500.
    """
    settings = load_settings()

    provider_config = getattr(settings, provider, None)
    if not provider_config:
        return JSONResponse(
            status_code=404,
            content={
                "error": "PROVIDER_NOT_FOUND",
                "message": f"Provider {provider} not found",
            },
        )

    def _key(json_val: Optional[str], env_var: str) -> str:
        """Return the saved API key, or the env var value when it is empty."""
        return (json_val or "").strip() or os.getenv(env_var, "").strip()

    try:
        if provider == "google":
            from deep_translator import GoogleTranslator

            result = GoogleTranslator(source="auto", target="en").translate("bonjour")
            return JSONResponse(
                status_code=200, content={"available": True, "test_result": result}
            )

        elif provider == "google_cloud":
            api_key = _key(provider_config.api_key, "GOOGLE_CLOUD_API_KEY")
            if not api_key:
                return JSONResponse(
                    status_code=400,
                    content={
                        "available": False,
                        "error": "Aucune clé API Google Cloud trouvée (JSON ou .env GOOGLE_CLOUD_API_KEY).",
                    },
                )
            import requests as _requests

            resp = _requests.post(
                "https://translation.googleapis.com/language/translate/v2",
                params={"key": api_key},
                json={"q": "bonjour", "target": "en", "format": "text"},
                timeout=10,
            )
            if resp.ok:
                # Tolerate a missing or empty "translations" list instead of
                # failing with an IndexError.
                translations = resp.json().get("data", {}).get("translations") or [{}]
                translated = translations[0].get("translatedText", "")
                return JSONResponse(
                    status_code=200,
                    content={"available": True, "test_result": translated},
                )
            elif resp.status_code in (401, 403):
                return JSONResponse(
                    status_code=resp.status_code,
                    content={
                        "available": False,
                        "error": f"Clé API invalide ou API non activée dans Google Cloud Console (HTTP {resp.status_code}).",
                    },
                )
            else:
                return JSONResponse(
                    status_code=500,
                    content={
                        "available": False,
                        "error": f"Erreur Google Cloud Translation HTTP {resp.status_code}: {resp.text[:200]}",
                    },
                )

        elif provider == "deepl":
            api_key = _key(provider_config.api_key, "DEEPL_API_KEY")
            if not api_key:
                return JSONResponse(
                    status_code=400,
                    content={"available": False, "error": "Aucune clé API DeepL trouvée (JSON ou .env)"},
                )
            import deepl

            translator = deepl.Translator(api_key)
            usage = translator.get_usage()
            return JSONResponse(
                status_code=200, content={"available": True, "usage": str(usage)}
            )

        # NOTE: branches that assigned to undefined `current` / `update_data`
        # used to sit here; they raised NameError and shadowed the real
        # "openrouter_premium" and "zai" tests below, so they were removed.

        elif provider == "openai":
            api_key = _key(provider_config.api_key, "OPENAI_API_KEY")
            if not api_key:
                return JSONResponse(
                    status_code=400,
                    content={"available": False, "error": "Aucune clé API OpenAI trouvée (JSON ou .env)"},
                )
            import openai as _openai

            client = _openai.OpenAI(api_key=api_key)
            models = list(client.models.list())
            return JSONResponse(
                status_code=200,
                content={"available": True, "models_count": len(models)},
            )

        elif provider == "ollama":
            import requests as _requests

            base_url = (provider_config.base_url or "").strip() or os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
            resp = _requests.get(f"{base_url}/api/tags", timeout=5)
            if resp.ok:
                return JSONResponse(
                    status_code=200,
                    content={
                        "available": True,
                        "models": resp.json().get("models", []),
                    },
                )
            return JSONResponse(
                status_code=500, content={"available": False, "error": str(resp.text)}
            )

        elif provider in ("openrouter", "openrouter_premium"):
            # Both variants fall back to the same OPENROUTER_API_KEY env var
            # (already handled by _key, so no second lookup is needed).
            api_key = _key(provider_config.api_key, "OPENROUTER_API_KEY")
            if not api_key:
                return JSONResponse(
                    status_code=400,
                    content={"available": False, "error": "Aucune clé API OpenRouter trouvée (JSON ou .env)"},
                )
            import requests as _requests

            resp = _requests.get(
                "https://openrouter.ai/api/v1/auth/key",
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=10,
            )
            if resp.ok:
                data = resp.json()
                return JSONResponse(
                    status_code=200,
                    content={"available": True, "label": data.get("data", {}).get("label", "OK")},
                )
            return JSONResponse(
                status_code=500, content={"available": False, "error": f"HTTP {resp.status_code}: {resp.text[:200]}"}
            )

        elif provider == "zai":
            api_key = _key(provider_config.api_key, "ZAI_API_KEY")
            if not api_key:
                return JSONResponse(
                    status_code=400,
                    content={"available": False, "error": "Aucune clé API xAI trouvée (JSON ou .env)"},
                )
            import openai as _openai

            base_url = (provider_config.base_url or "").strip() or os.getenv("ZAI_BASE_URL", "https://api.x.ai/v1")
            client = _openai.OpenAI(api_key=api_key, base_url=base_url)
            try:
                models = list(client.models.list())
                return JSONResponse(
                    status_code=200,
                    content={
                        "available": True,
                        "models_count": len(models),
                        "sample_models": [m.id for m in models[:5]],
                    },
                )
            except _openai.AuthenticationError:
                return JSONResponse(
                    status_code=401,
                    content={"available": False, "error": "Clé xAI invalide"},
                )

        elif provider == "smtp":
            import smtplib as _smtplib

            # Priority: request body (form values) > saved settings > env vars
            host = ((smtp_body and smtp_body.host) or "").strip() or (provider_config.host or "").strip() or os.getenv("SMTP_HOST", "").strip()
            if not host:
                return JSONResponse(
                    status_code=400,
                    content={"available": False, "error": "Aucun hôte SMTP configuré (JSON ou .env SMTP_HOST)"},
                )

            if smtp_body and smtp_body.port:
                port = smtp_body.port
            else:
                port = provider_config.port or 587
                # SMTP_PORT only applies while the saved port is unset or
                # still 587, the same rule the settings read path uses.
                env_port = os.getenv("SMTP_PORT", "").strip()
                if env_port and port == 587:
                    try:
                        port = int(env_port)
                    except ValueError:
                        pass  # malformed SMTP_PORT: keep the saved/default port

            username = ((smtp_body and smtp_body.username) or "").strip() or (provider_config.username or "").strip() or os.getenv("SMTP_USERNAME", "").strip()
            password = ((smtp_body and smtp_body.password) or "").strip() or (provider_config.password or "").strip() or os.getenv("SMTP_PASSWORD", "").strip()
            if smtp_body and smtp_body.use_tls is not None:
                use_tls = smtp_body.use_tls
            elif provider_config.use_tls is not None:
                use_tls = provider_config.use_tls
            else:
                use_tls = True

            try:
                # The context manager closes the connection even when
                # STARTTLS or login fails.
                with _smtplib.SMTP(host, port, timeout=10) as server:
                    server.ehlo()
                    if use_tls:
                        server.starttls()
                        server.ehlo()
                    if username and password:
                        server.login(username, password)
                return JSONResponse(
                    status_code=200,
                    content={"available": True, "test_result": f"Connexion SMTP OK ({host}:{port})"},
                )
            except _smtplib.SMTPAuthenticationError as e:
                return JSONResponse(
                    status_code=401,
                    content={"available": False, "error": f"Authentification SMTP échouée: {e}"},
                )
            except _smtplib.SMTPConnectError as e:
                return JSONResponse(
                    status_code=502,
                    content={"available": False, "error": f"Connexion SMTP échouée: {e}"},
                )
            except Exception as e:
                return JSONResponse(
                    status_code=500,
                    content={"available": False, "error": f"Erreur SMTP: {str(e)[:200]}"},
                )

        else:
            return JSONResponse(
                status_code=404,
                content={"available": False, "error": "Provider inconnu"},
            )

    except Exception as e:
        # Top-level boundary: any provider SDK/network failure becomes a 500
        # response rather than an unhandled error.
        logger.error(f"Provider test failed: {e}")
        return JSONResponse(
            status_code=500, content={"available": False, "error": str(e)}
        )
|
||
|
||
|
||
@router.post("/providers/smtp/test-send")
async def test_send_email(
    smtp_body: Optional[SmtpTestRequest] = None,
    admin_id: str = Depends(require_admin),
):
    """Send a test email to the resolved ``from_email`` address.

    Each SMTP value is resolved with the priority: request body (current form
    values) > saved settings > environment variables. The message is sent
    from and to the same address.

    Args:
        smtp_body: Optional JSON body with the form values being edited.
        admin_id: Value supplied by the ``require_admin`` dependency.

    Returns:
        JSONResponse: 200 on success, 400 when no host or sender address can
        be resolved, 401 on authentication failure, 500 on any other error.
    """
    import smtplib as _smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart

    settings = load_settings()
    smtp = settings.smtp

    host = ((smtp_body and smtp_body.host) or "").strip() or (smtp.host or "").strip() or os.getenv("SMTP_HOST", "").strip()
    if not host:
        return JSONResponse(
            status_code=400,
            content={"available": False, "error": "Aucun hôte SMTP configuré"},
        )

    if smtp_body and smtp_body.port:
        port = smtp_body.port
    else:
        port = smtp.port or 587
        # SMTP_PORT only applies while the saved port is unset or still 587,
        # the same rule the settings read path uses.
        env_port = os.getenv("SMTP_PORT", "").strip()
        if env_port and port == 587:
            try:
                port = int(env_port)
            except ValueError:
                pass  # malformed SMTP_PORT: keep the saved/default port

    username = ((smtp_body and smtp_body.username) or "").strip() or (smtp.username or "").strip() or os.getenv("SMTP_USERNAME", "").strip()
    password = ((smtp_body and smtp_body.password) or "").strip() or (smtp.password or "").strip() or os.getenv("SMTP_PASSWORD", "").strip()
    # The username is the last-resort sender address.
    from_email = ((smtp_body and smtp_body.from_email) or "").strip() or (smtp.from_email or "").strip() or os.getenv("SMTP_FROM_EMAIL", "").strip() or username
    if smtp_body and smtp_body.use_tls is not None:
        use_tls = smtp_body.use_tls
    elif smtp.use_tls is not None:
        use_tls = smtp.use_tls
    else:
        use_tls = True

    if not from_email:
        return JSONResponse(
            status_code=400,
            content={"available": False, "error": "Aucune adresse d'expédition configurée (from_email ou SMTP_FROM_EMAIL)"},
        )

    msg = MIMEMultipart()
    msg["From"] = from_email
    msg["To"] = from_email
    msg["Subject"] = "Test Office Translator — Email SMTP"
    msg.attach(MIMEText(
        "Ceci est un email de test envoyé depuis la page d'administration Office Translator.\n\n"
        "Si vous recevez cet email, la configuration SMTP est correcte.",
        "plain", "utf-8"
    ))

    try:
        # The context manager closes the connection even when STARTTLS,
        # login or sendmail fails.
        with _smtplib.SMTP(host, port, timeout=15) as server:
            server.ehlo()
            if use_tls:
                server.starttls()
                server.ehlo()
            if username and password:
                server.login(username, password)
            server.sendmail(from_email, from_email, msg.as_string())
        logger.info(f"admin_smtp_test_email sent to {from_email}")
        return JSONResponse(
            status_code=200,
            content={"available": True, "test_result": f"Email de test envoyé à {from_email}"},
        )
    except _smtplib.SMTPAuthenticationError as e:
        return JSONResponse(
            status_code=401,
            content={"available": False, "error": f"Authentification SMTP échouée: {e}"},
        )
    except Exception as e:
        logger.error(f"SMTP test-send failed: {e}")
        return JSONResponse(
            status_code=500,
            content={"available": False, "error": f"Erreur envoi email: {str(e)[:200]}"},
        )
|
||
|
||
|
||
@router.get("/providers/ollama/models")
async def list_ollama_models(
    base_url: Optional[str] = Query(None),
    admin_id: str = Depends(require_admin),
):
    """Return the models reported by an Ollama server's ``/api/tags`` endpoint.

    The server URL is the first non-empty value among: the ``base_url`` query
    parameter (lets the frontend probe a URL that is not saved yet), the saved
    settings, the application config, and ``http://localhost:11434``.

    Returns:
        JSONResponse: 200 with ``name`` / ``size`` / ``modified_at`` per
        model, 500 when Ollama answers with an error or anything unexpected
        happens, 503 when the connection cannot be established.
    """
    import requests
    from config import config as app_config

    saved = load_settings()
    target = (
        base_url
        or saved.ollama.base_url
        or app_config.OLLAMA_BASE_URL
        or "http://localhost:11434"
    )

    try:
        reply = requests.get(f"{target}/api/tags", timeout=5)
        if not reply.ok:
            return JSONResponse(
                status_code=500,
                content={
                    "error": "OLLAMA_UNAVAILABLE",
                    "message": f"Ollama returned: {reply.text}",
                },
            )

        listing = reply.json()
        # Keep only the fields the admin UI needs, with safe defaults.
        catalogue = [
            {
                "name": entry.get("name", ""),
                "size": entry.get("size", 0),
                "modified_at": entry.get("modified_at", ""),
            }
            for entry in listing.get("models", [])
        ]
        return JSONResponse(status_code=200, content={"data": catalogue, "meta": {}})
    except requests.exceptions.ConnectionError:
        return JSONResponse(
            status_code=503,
            content={
                "error": "OLLAMA_CONNECTION_ERROR",
                "message": f"Cannot connect to Ollama at {target}",
            },
        )
    except Exception as exc:
        logger.error(f"List Ollama models failed: {exc}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(exc)},
        )
|
||
|
||
|
||
@router.get("/providers/openai/models")
async def list_openai_models(admin_id: str = Depends(require_admin)):
    """Return the models visible to the configured OpenAI API key.

    The key comes from the saved settings, falling back to the
    ``OPENAI_API_KEY`` environment variable when the saved value is empty.

    Returns:
        JSONResponse: 200 with ``id`` / ``owned_by`` / ``created`` per model,
        400 when no key is available, 500 when the listing fails.
    """
    saved_key = (load_settings().openai.api_key or "").strip()
    api_key = saved_key or os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        return JSONResponse(
            status_code=400,
            content={"error": "NO_API_KEY", "message": "Aucune clé API OpenAI trouvée (JSON ou .env)"},
        )

    try:
        import openai as _openai

        sdk = _openai.OpenAI(api_key=api_key)
        catalogue = [
            {"id": entry.id, "owned_by": entry.owned_by, "created": entry.created}
            for entry in sdk.models.list()
        ]
        body = {"data": catalogue, "meta": {"total": len(catalogue)}}
        return JSONResponse(status_code=200, content=body)
    except Exception as exc:
        logger.error(f"List OpenAI models failed: {exc}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(exc)},
        )
|
||
|
||
|
||
@router.get("/providers/openrouter/models")
async def list_openrouter_models(admin_id: str = Depends(require_admin)):
    """Return the OpenRouter model catalogue (fetched without an API key).

    Returns:
        JSONResponse: 200 with ``id`` / ``name`` / ``context_length`` /
        ``pricing`` per model, 502 when OpenRouter answers with an error
        status, 500 on any other failure.
    """
    try:
        import requests as _requests

        reply = _requests.get("https://openrouter.ai/api/v1/models", timeout=15)
        if not reply.ok:
            return JSONResponse(
                status_code=502,
                content={"error": "OPENROUTER_ERROR", "message": f"OpenRouter returned HTTP {reply.status_code}"},
            )

        catalogue = []
        for entry in reply.json().get("data", []):
            catalogue.append(
                {
                    "id": entry.get("id", ""),
                    "name": entry.get("name", ""),
                    "context_length": entry.get("context_length"),
                    "pricing": entry.get("pricing", {}),
                }
            )
        body = {"data": catalogue, "meta": {"total": len(catalogue)}}
        return JSONResponse(status_code=200, content=body)
    except Exception as exc:
        logger.error(f"List OpenRouter models failed: {exc}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(exc)},
        )
|
||
|
||
|
||
@router.get("/providers/zai/models")
async def list_zai_models(admin_id: str = Depends(require_admin)):
    """Return the models exposed by the xAI / zAI OpenAI-compatible API.

    The API key and base URL come from the saved settings, falling back to the
    ``ZAI_API_KEY`` / ``ZAI_BASE_URL`` environment variables (the base URL
    finally defaults to ``https://api.x.ai/v1``).

    Returns:
        JSONResponse: 200 with ``id`` / ``owned_by`` / ``created`` per model,
        400 when no key is available, 500 when the listing fails.
    """
    zai_settings = load_settings().zai
    api_key = (zai_settings.api_key or "").strip() or os.getenv("ZAI_API_KEY", "").strip()
    if not api_key:
        return JSONResponse(
            status_code=400,
            content={"error": "NO_API_KEY", "message": "Aucune clé API xAI trouvée (JSON ou .env)"},
        )

    try:
        import openai as _openai

        endpoint = (zai_settings.base_url or "").strip() or os.getenv("ZAI_BASE_URL", "https://api.x.ai/v1")
        sdk = _openai.OpenAI(api_key=api_key, base_url=endpoint)
        catalogue = [
            {"id": entry.id, "owned_by": entry.owned_by, "created": entry.created}
            for entry in sdk.models.list()
        ]
        body = {"data": catalogue, "meta": {"total": len(catalogue)}}
        return JSONResponse(status_code=200, content=body)
    except Exception as exc:
        logger.error(f"List xAI models failed: {exc}")
        return JSONResponse(
            status_code=500,
            content={"error": "INTERNAL_ERROR", "message": str(exc)},
        )
|
||
|
||
|
||
# ============================================================
|
||
# PRICING MANAGEMENT (Admin only)
|
||
# ============================================================
|
||
|
||
|
||
class PlanPricingUpdate(BaseModel):
    """Partial pricing update for a single plan; omitted fields are left as is."""

    price_monthly: Optional[float] = Field(default=None)
    # Ignored: computed server-side (keeps the -20 % annual discount consistent).
    price_yearly: Optional[float] = Field(default=None)
    stripe_price_id_monthly: Optional[str] = Field(default=None)
    stripe_price_id_yearly: Optional[str] = Field(default=None)
|
||
|
||
|
||
class PricingConfig(BaseModel):
    """Request body of the pricing update endpoint.

    Holds one optional update per plan plus optional Stripe credentials;
    anything left unset is not modified.
    """

    # Per-plan updates (each defaults to an empty update).
    starter: PlanPricingUpdate = Field(default=PlanPricingUpdate())
    pro: PlanPricingUpdate = Field(default=PlanPricingUpdate())
    business: PlanPricingUpdate = Field(default=PlanPricingUpdate())
    # Stripe credentials.
    stripe_secret_key: Optional[str] = Field(default=None)
    stripe_publishable_key: Optional[str] = Field(default=None)
    stripe_webhook_secret: Optional[str] = Field(default=None)
|
||
|
||
|
||
def _update_env_file(updates: dict):
    """Persist ``KEY=value`` pairs into the project's ``.env`` file in place.

    Existing assignments for the given keys are rewritten where they stand
    (whitespace around the key is tolerated, e.g. ``KEY = value``); comments
    and unrelated lines are kept; keys not yet present are appended. The file
    is read and written as UTF-8 and always ends with a newline.

    Args:
        updates: Mapping of variable name to the value to store.

    Raises:
        ValueError: If a key or value contains a line break, which would
            inject additional lines into the file.
    """
    if not updates:
        return

    # Values originate from admin input; refuse anything that could smuggle
    # extra assignments into the file.
    for key, val in updates.items():
        if any(ch in f"{key}{val}" for ch in "\r\n"):
            raise ValueError(f"Invalid line break in .env entry for {key!r}")

    env_path = config.BASE_DIR / ".env"
    if not env_path.exists():
        # Nothing to rewrite; leave a trace instead of failing silently.
        logger.warning(
            ".env file not found at %s; keys not persisted: %s",
            env_path,
            list(updates),
        )
        return

    updated_keys = set()
    new_lines = []
    for line in env_path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if "=" in stripped and not stripped.startswith("#"):
            key = stripped.split("=", 1)[0].strip()
            if key in updates:
                new_lines.append(f"{key}={updates[key]}")
                updated_keys.add(key)
                continue
        new_lines.append(line)

    # Append any keys not already in .env
    new_lines.extend(
        f"{key}={val}" for key, val in updates.items() if key not in updated_keys
    )
    env_path.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
|
||
|
||
|
||
@router.get("/pricing")
async def get_pricing_config(admin_id: str = Depends(require_admin)):
    """Return the effective pricing per plan together with Stripe status flags.

    Plans that cannot be resolved through ``PlanType`` / ``PLANS`` are left
    out. Stripe secrets are reported as booleans only; the publishable key is
    returned as stored in the environment.
    """
    discount_percent = pricing_cfg.ANNUAL_DISCOUNT_PERCENT

    plans_payload = {}
    for plan_id in ("starter", "pro", "business"):
        try:
            plan = PLANS[PlanType(plan_id)]
        except Exception:
            continue
        monthly, yearly = pricing_cfg.get_effective_monthly_yearly(plan_id)
        monthly_id, yearly_id = pricing_cfg.stripe_price_ids_for_plan(plan_id)
        plans_payload[plan_id] = {
            "name": plan["name"],
            "price_monthly": monthly,
            "price_yearly": yearly,
            "annual_discount_percent": discount_percent,
            "stripe_price_id_monthly": monthly_id,
            "stripe_price_id_yearly": yearly_id,
        }

    secret_key = os.getenv("STRIPE_SECRET_KEY", "").strip()
    publishable_key = os.getenv("STRIPE_PUBLISHABLE_KEY", "")
    webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET", "").strip()

    stripe_status = {
        # "configured" requires a secret key with the expected "sk_" prefix.
        "configured": secret_key.startswith("sk_"),
        "has_secret_key": bool(secret_key),
        "has_publishable_key": bool(publishable_key.strip()),
        "has_webhook_secret": bool(webhook_secret),
        "publishable_key": publishable_key,
    }
    meta = {
        "annual_discount_percent": discount_percent,
        "yearly_discount_factor": pricing_cfg.YEARLY_DISCOUNT_FACTOR,
    }
    return JSONResponse(
        status_code=200,
        content={"data": plans_payload, "stripe": stripe_status, "meta": meta},
    )
|
||
|
||
|
||
@router.put("/pricing")
async def update_pricing_config(
    config_update: PricingConfig,
    admin_id: str = Depends(require_admin),
):
    """Update plan prices, Stripe price IDs and Stripe credentials.

    Price overrides are saved through ``pricing_cfg.save_pricing_overrides``;
    Stripe IDs and credentials are additionally written to ``.env``. Every
    submitted value is validated before anything is persisted, and the
    in-process Stripe API key is only switched once persistence succeeded, so
    a rejected request leaves the running configuration untouched.

    Args:
        config_update: Partial update; fields left to ``None`` are ignored.
        admin_id: Value supplied by the ``require_admin`` dependency.

    Returns:
        JSONResponse (200) with the resulting overrides under ``data``.

    Raises:
        HTTPException: 400 when a value fails validation (``ValueError``),
            500 when the overrides file cannot be written.
    """
    try:
        overrides = pricing_cfg.load_pricing_overrides()
        env_updates = {}

        for plan_id in ("starter", "pro", "business"):
            plan_update = getattr(config_update, plan_id)
            ov = dict(overrides.get(plan_id, {}))

            if plan_update.price_monthly is not None:
                pricing_cfg.validate_monthly_price_eur(plan_update.price_monthly)
                ov["price_monthly"] = float(plan_update.price_monthly)
                ov = pricing_cfg.normalize_plan_override_block(plan_id, ov)

            if plan_update.stripe_price_id_monthly is not None:
                mid = pricing_cfg.validate_stripe_price_id(
                    plan_update.stripe_price_id_monthly,
                    f"{plan_id} stripe_price_id_monthly",
                )
                ov["stripe_price_id_monthly"] = mid
                # Only non-empty IDs are mirrored into .env.
                if mid:
                    env_updates[f"STRIPE_PRICE_{plan_id.upper()}_MONTHLY"] = mid

            if plan_update.stripe_price_id_yearly is not None:
                yid = pricing_cfg.validate_stripe_price_id(
                    plan_update.stripe_price_id_yearly,
                    f"{plan_id} stripe_price_id_yearly",
                )
                ov["stripe_price_id_yearly"] = yid
                if yid:
                    env_updates[f"STRIPE_PRICE_{plan_id.upper()}_YEARLY"] = yid

            overrides[plan_id] = ov

        if config_update.stripe_secret_key:
            pricing_cfg.validate_stripe_secret_key(config_update.stripe_secret_key)
            env_updates["STRIPE_SECRET_KEY"] = config_update.stripe_secret_key.strip()
        if config_update.stripe_publishable_key:
            pricing_cfg.validate_stripe_publishable_key(
                config_update.stripe_publishable_key
            )
            env_updates["STRIPE_PUBLISHABLE_KEY"] = (
                config_update.stripe_publishable_key.strip()
            )
        if config_update.stripe_webhook_secret:
            pricing_cfg.validate_stripe_webhook_secret(
                config_update.stripe_webhook_secret
            )
            env_updates["STRIPE_WEBHOOK_SECRET"] = (
                config_update.stripe_webhook_secret.strip()
            )

        for plan_id in ("starter", "pro", "business"):
            overrides[plan_id] = pricing_cfg.normalize_plan_override_block(
                plan_id, overrides.get(plan_id, {})
            )

        try:
            pricing_cfg.save_pricing_overrides(overrides)
        except OSError as e:
            logger.exception("admin_pricing_save_failed: %s", e)
            raise HTTPException(
                status_code=500,
                detail=(
                    "Impossible d'écrire data/pricing_overrides.json (droits disque ou chemin). "
                    f"Détail : {e}"
                ),
            ) from e

        if env_updates:
            _update_env_file(env_updates)

        # Switch the live Stripe key only now that every value was validated
        # and persisted; doing it earlier left a half-applied key behind when
        # a later step rejected the request.
        new_secret_key = env_updates.get("STRIPE_SECRET_KEY")
        if new_secret_key:
            try:
                import stripe as _stripe
            except ImportError:
                logger.warning(
                    "stripe module not installed; runtime API key not updated"
                )
            else:
                _stripe.api_key = new_secret_key

        # NOTE(review): presumably refreshes in-memory pricing/Stripe config;
        # called after every write so price-only edits are covered too —
        # confirm against pricing_config.
        pricing_cfg.apply_runtime_config_after_admin_write()

        # Only key names are logged, never the secret values.
        logger.info("admin_pricing_updated by %s keys=%s", admin_id, list(env_updates.keys()))
        return JSONResponse(
            status_code=200,
            content={
                "data": overrides,
                "meta": {
                    "annual_discount_percent": pricing_cfg.ANNUAL_DISCOUNT_PERCENT,
                },
            },
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
|
||
|
||
|
||
@router.post("/pricing/setup-stripe")
async def setup_stripe_products(admin_id: str = Depends(require_admin)):
    """Find or create the Stripe product and prices of each plan, then save the IDs.

    For every plan the product is looked up by name (created when absent) and
    a monthly and a yearly EUR recurring price matching the effective amounts
    are reused or created. The resulting price IDs are written to ``.env`` and
    to the pricing overrides. A failure on one plan is collected under
    ``errors`` and does not stop the other plans.

    Returns:
        JSONResponse: 200 with per-plan IDs, the collected errors and the
        updated env keys; 400 when ``STRIPE_SECRET_KEY`` is missing or lacks
        the ``sk_`` prefix; 500 when the ``stripe`` module is not installed.
    """
    secret_key = os.getenv("STRIPE_SECRET_KEY", "").strip()
    if not (secret_key and secret_key.startswith("sk_")):
        return JSONResponse(status_code=400, content={
            "error": "STRIPE_NOT_CONFIGURED",
            "message": "STRIPE_SECRET_KEY manquante ou invalide. Configurez-la d'abord.",
        })

    try:
        import stripe as _stripe
        _stripe.api_key = secret_key
    except ImportError:
        return JSONResponse(status_code=500, content={
            "error": "STRIPE_NOT_INSTALLED",
            "message": "Le module stripe n'est pas installé sur le serveur.",
        })

    def _matching_price(candidates, interval, cents):
        """Return the first EUR recurring price with this interval and amount."""
        for candidate in candidates:
            if (
                candidate.recurring
                and candidate.recurring.interval == interval
                and candidate.unit_amount == cents
                and candidate.currency == "eur"
            ):
                return candidate
        return None

    product_names = {
        "starter": "Office Translator Starter",
        "pro": "Office Translator Pro",
        "business": "Office Translator Business",
    }

    results = {}
    env_updates = {}
    errors = []

    for plan_id, product_name in product_names.items():
        monthly, yearly = pricing_cfg.get_effective_monthly_yearly(plan_id)
        # Stripe amounts are expressed in cents.
        monthly_cents = int(round(monthly * 100))
        yearly_cents = int(round(yearly * 100))
        try:
            # Reuse the product when one with this name already exists.
            found = _stripe.Product.search(query=f"name:\"{product_name}\"")
            if found.data:
                product = found.data[0]
            else:
                product = _stripe.Product.create(
                    name=product_name,
                    description=f"Abonnement Office Translator — Forfait {plan_id.capitalize()}",
                    metadata={"plan": plan_id},
                )

            # Look both prices up first, then create whichever is missing.
            active_prices = _stripe.Price.list(product=product.id, active=True, limit=20)
            monthly_price = _matching_price(active_prices.data, "month", monthly_cents)
            yearly_price = _matching_price(active_prices.data, "year", yearly_cents)

            if not monthly_price:
                monthly_price = _stripe.Price.create(
                    product=product.id, unit_amount=monthly_cents, currency="eur",
                    recurring={"interval": "month"}, metadata={"plan": plan_id},
                )
            if not yearly_price:
                yearly_price = _stripe.Price.create(
                    product=product.id, unit_amount=yearly_cents, currency="eur",
                    recurring={"interval": "year"}, metadata={"plan": plan_id},
                )

            results[plan_id] = {
                "product_id": product.id,
                "monthly_price_id": monthly_price.id,
                "yearly_price_id": yearly_price.id,
                "created": True,
            }
            env_prefix = f"STRIPE_PRICE_{plan_id.upper()}"
            env_updates[f"{env_prefix}_MONTHLY"] = monthly_price.id
            env_updates[f"{env_prefix}_YEARLY"] = yearly_price.id

        except Exception as exc:
            errors.append({"plan": plan_id, "error": str(exc)})

    if env_updates:
        _update_env_file(env_updates)
        overrides = pricing_cfg.load_pricing_overrides()
        for plan_id, ids in results.items():
            entry = overrides.get(plan_id, {})
            entry["stripe_price_id_monthly"] = ids["monthly_price_id"]
            entry["stripe_price_id_yearly"] = ids["yearly_price_id"]
            overrides[plan_id] = entry
        pricing_cfg.save_pricing_overrides(overrides)
        pricing_cfg.apply_runtime_config_after_admin_write()

    logger.info(f"admin_stripe_setup by {admin_id}: {list(results)}")
    return JSONResponse(status_code=200, content={
        "data": results,
        "errors": errors,
        "env_updated": list(env_updates),
        "meta": {},
    })
|