Files
office_translator/routes/legacy_routes.py
sepehr fa637abff0
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m49s
perf+security: fix build, secure downloads, dedupe translations, refactor i18n
Frontend:
- Fix Framer Motion / motion-dom build error by pinning framer-motion to
  11.18.2 (compatible with React 19 and Next.js 16).
- Add cross-env and build:local script to bypass standalone symlink errors
  on Windows without Developer Mode.
- Allow NEXT_OUTPUT=default to disable standalone output for local builds.
- Refactor i18n: split 14,177-line src/lib/i18n.tsx into per-locale,
  per-namespace JSON files under src/lib/i18n/messages/.
- Load English synchronously; other locales loaded on demand via dynamic
  imports (reduces initial bundle, improves maintainability).
- Remove unused next-intl message files src/messages/en.json and fr.json.

Backend:
- Remove insecure legacy /api/v1/download/{filename} and /api/v1/cleanup/{filename}
  endpoints. The job-based /api/v1/download/{job_id} already enforces ownership.
- Deduplicate texts in TranslationService.translate_batch before sending them
  to the provider, reducing API calls for repeated strings.
- Pin httpx to <0.28 to fix TestClient incompatibility with starlette 0.35.1.
- Add pytest-cov and ruff dev dependencies/config.

DevOps:
- Remove hardcoded Grafana password from docker-compose.yml and
  docker-compose.monitoring.yml; use GRAFANA_PASSWORD env var.
- Change default TRANSLATION_SERVICE from ollama to google in
  docker-compose.yml (Ollama is an optional profile).
- Add GRAFANA_PASSWORD to .env.example.
- Add .coverage and frontend/pnpm-workspace.yaml to .gitignore.

Tests:
- Update API versioning tests for removed legacy endpoints.
- Add tests/test_translation_service.py for deduplication behavior.

Verified:
- pnpm run build:local passes.
- uv run pytest tests/test_providers/* tests/test_translation_service.py
  tests/test_story_3_5_api_versioning.py tests/test_download_endpoint.py
  tests/test_translators/test_excel_translator.py: provider/translator tests
  pass; one pre-existing French error-message test still fails (message is
  returned in English, unrelated to this change).
2026-06-14 16:44:18 +02:00

639 lines
22 KiB
Python

"""
Legacy API v1 Endpoints
Endpoints migrated from main.py that don't fit in other routers
Story 3.5: API Versioning
"""
import logging
import os
from pathlib import Path
from typing import Optional, Any
from fastapi import APIRouter, File, Form, UploadFile, HTTPException, Request, Depends
from fastapi.responses import FileResponse, JSONResponse
from config import config
from utils import file_handler
from middleware.api_key_auth import get_authenticated_user
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1", tags=["Legacy"])
def _resolve_model(
    cfg_model: Optional[str],
    model_env: str,
    default: str,
) -> str:
    """Pick the model name for a provider.

    Precedence: the value from the JSON settings, then the environment
    variable named by ``model_env``, then ``default``. Empty or
    whitespace-only values are treated as unset at every level.
    """
    from_settings = cfg_model.strip() if cfg_model else ""
    if from_settings:
        return from_settings

    from_env = os.getenv(model_env, "").strip()
    if from_env:
        return from_env

    return default
@router.get("/providers/available")
async def get_available_providers():
    """
    List the translation providers that are currently usable.

    A provider counts as enabled when its entry in the admin settings JSON has
    ``enabled`` set, or when its API-key environment variable is non-empty
    (the environment acts as a fallback).

    Rules:
    - ``google`` is always listed.
    - ``deepl``, ``openai``, ``minimax`` and ``zai`` are listed when enabled.
    - ``openrouter`` ("Traduction IA Éco") is listed when OpenRouter is enabled.
    - ``deepseek`` ("Traduction IA Standard") is listed when either OpenRouter
      or DeepSeek is enabled.
    - ``openrouter_premium`` ("Traduction IA Premium") is listed when either
      the premium or the regular OpenRouter provider is enabled.

    No Ollama entry is produced by this endpoint.

    Returns:
        A 200 ``JSONResponse`` whose body is ``{"providers": [...]}`` and whose
        headers disable caching. LLM entries also carry the resolved ``model``.
    """
    # Imported at call time, as in the rest of this module's route handlers.
    from routes.admin_routes import load_settings

    settings = load_settings()

    def _env_ready(var_name: str) -> bool:
        """True when the environment variable holds a non-blank value."""
        return bool(os.getenv(var_name, "").strip())

    def _is_enabled(name: str, key_var: str = "", url_var: str = "") -> bool:
        """Enabled in settings, or configured through the environment."""
        cfg = getattr(settings, name, None)
        if cfg and cfg.enabled:
            return True
        return any(var and _env_ready(var) for var in (key_var, url_var))

    def _model_for(name: str, model_env: str, default: str) -> str:
        """Resolve the model for ``name``: settings > env var > default."""
        cfg = getattr(settings, name, None)
        return _resolve_model(cfg.model if cfg else None, model_env, default)

    # OpenRouter backs three separate entries below, so evaluate it once.
    openrouter_on = _is_enabled("openrouter", key_var="OPENROUTER_API_KEY")

    # Google Translate — always available
    available = [
        {
            "id": "google",
            "label": "Google Traduction",
            "description": "Traduction rapide, 130+ langues, fiable",
            "mode": "classic",
            "tier": "free",
        }
    ]

    # DeepL — if configured
    if _is_enabled("deepl", key_var="DEEPL_API_KEY"):
        available.append({
            "id": "deepl",
            "label": "DeepL",
            "description": "Traduction professionnelle haute qualité (langues européennes)",
            "mode": "classic",
            "tier": "pro",
        })

    # AI Eco (OpenRouter — cheap model)
    if openrouter_on:
        available.append({
            "id": "openrouter",
            "label": "Traduction IA Éco",
            "description": "IA rapide, économique et complète — supporte les images",
            "mode": "llm",
            "tier": "pro",
            "model": _model_for(
                "openrouter", "OPENROUTER_MODEL", "google/gemini-3.5-flash"
            ),
        })

    # AI Standard (DeepSeek via OpenRouter or direct DeepSeek API)
    if openrouter_on or _is_enabled("deepseek", key_var="DEEPSEEK_API_KEY"):
        available.append({
            "id": "deepseek",
            "label": "Traduction IA Standard",
            "description": "IA ultra-précise pour le texte (ne traduit pas les images)",
            "mode": "llm",
            "tier": "pro",
            "model": _model_for(
                "deepseek", "DEEPSEEK_MODEL", "deepseek/deepseek-chat"
            ),
        })

    # AI Premium (OpenRouter — premium model)
    if _is_enabled("openrouter_premium", key_var="OPENROUTER_API_KEY") or openrouter_on:
        available.append({
            "id": "openrouter_premium",
            "label": "Traduction IA Premium",
            "description": "IA haut de gamme — excellente qualité littéraire et multimodal",
            "mode": "llm",
            "tier": "business",
            "model": _model_for(
                "openrouter_premium",
                "OPENROUTER_PREMIUM_MODEL",
                "anthropic/claude-sonnet-4.6",
            ),
        })

    # OpenAI direct — if configured with direct API key
    if _is_enabled("openai", key_var="OPENAI_API_KEY"):
        available.append({
            "id": "openai",
            "label": "OpenAI GPT",
            "description": "Traduction IA via OpenAI directement",
            "mode": "llm",
            "tier": "business",
            "model": _model_for("openai", "OPENAI_MODEL", "gpt-4o-mini"),
        })

    # MiniMax direct — if configured with direct API key
    if _is_enabled("minimax", key_var="MINIMAX_API_KEY"):
        available.append({
            "id": "minimax",
            "label": "Traduction IA Avancée",
            "description": "Traduction IA haute performance",
            "mode": "llm",
            "tier": "pro",
            "model": _model_for("minimax", "MINIMAX_MODEL", "abab6.5s-chat"),
        })

    # z.AI / xAI Grok — if configured with direct API key
    if _is_enabled("zai", key_var="ZAI_API_KEY"):
        available.append({
            "id": "zai",
            "label": "Grok (xAI)",
            "description": "IA Grok par xAI — traduction avancée",
            "mode": "llm",
            "tier": "business",
            "model": _model_for("zai", "ZAI_MODEL", "grok-2-1212"),
        })

    return JSONResponse(
        status_code=200,
        headers={"Cache-Control": "no-cache, no-store, must-revalidate"},
        content={"providers": available},
    )
@router.get("/languages")
async def get_supported_languages():
    """Return the supported language codes and their English names.

    Codes are grouped into popularity tiers and emitted in that order.
    """
    popularity_tiers = (
        # Top 5 — dominant on the internet
        {
            "en": "English",
            "es": "Spanish",
            "de": "German",
            "fr": "French",
            "ja": "Japanese",
        },
        # Top 6-15
        {
            "pt": "Portuguese",
            "ru": "Russian",
            "it": "Italian",
            "zh-CN": "Chinese (Simplified)",
            "zh-TW": "Chinese (Traditional)",
            "pl": "Polish",
            "nl": "Dutch",
            "tr": "Turkish",
            "ko": "Korean",
            "ar": "Arabic",
        },
        # Top 16-25
        {
            "fa": "Persian (Farsi)",
            "vi": "Vietnamese",
            "id": "Indonesian",
            "uk": "Ukrainian",
            "sv": "Swedish",
            "cs": "Czech",
            "el": "Greek",
            "he": "Hebrew",
            "hi": "Hindi",
            "ro": "Romanian",
        },
        # Others
        {
            "da": "Danish",
            "fi": "Finnish",
            "no": "Norwegian",
            "hu": "Hungarian",
            "th": "Thai",
            "sk": "Slovak",
            "bg": "Bulgarian",
            "hr": "Croatian",
            "ca": "Catalan",
            "ms": "Malay",
        },
    )

    # Merge tier by tier so the resulting mapping keeps the ranking order.
    languages_by_code = {}
    for tier in popularity_tiers:
        languages_by_code.update(tier)

    return {
        "supported_languages": languages_by_code,
        "note": "Supported languages may vary depending on the translation service configured",
    }
@router.post("/translate-batch")
async def translate_batch_documents(
    files: list[UploadFile] = File(
        ..., description="Multiple document files to translate"
    ),
    target_language: str = Form(..., description="Target language code"),
    source_language: str = Form(default="auto", description="Source language code"),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Translate several uploaded documents, one after another.

    Each file is validated, saved to the upload directory, translated by the
    translator matching its extension (.xlsx, .docx, .pptx) and reported
    individually. A failure on one file never aborts the rest of the batch.

    Args:
        files: Uploaded documents.
        target_language: Language code to translate into.
        source_language: Language code of the source text ("auto" by default).
        current_user: Value resolved by the ``get_authenticated_user``
            dependency; not otherwise used in this handler.

    Returns:
        A dict with ``total_files``, ``successful``, ``failed`` and a
        ``results`` list holding one success or error entry per file.
    """
    from translators import excel_translator, word_translator, pptx_translator

    translators_by_extension = {
        ".xlsx": excel_translator,
        ".docx": word_translator,
        ".pptx": pptx_translator,
    }

    results = []
    for file in files:
        input_path = None
        try:
            file_extension = file_handler.validate_file_extension(file.filename)
            file_handler.validate_file_size(file)

            # An extension without a translator must be reported as an error
            # rather than as a success that has no output file behind it.
            translator = translators_by_extension.get(file_extension)
            if translator is None:
                raise ValueError(f"No translator available for {file_extension}")

            input_filename = file_handler.generate_unique_filename(
                file.filename, "input"
            )
            output_filename = file_handler.generate_unique_filename(
                file.filename, "translated"
            )
            input_path = config.UPLOAD_DIR / input_filename
            output_path = config.OUTPUT_DIR / output_filename
            file_handler.save_upload_file(file, input_path)

            translator.translate_file(
                input_path, output_path, target_language, source_language
            )

            results.append(
                {
                    "filename": file.filename,
                    "status": "success",
                    "output_file": output_filename,
                    # NOTE(review): this is a filename-based download path;
                    # confirm a route still serves it, or switch to the
                    # job-based download flow.
                    "download_url": f"/api/v1/download/{output_filename}",
                }
            )
        except Exception:
            # Deliberately broad: any problem with one file becomes an error
            # entry for that file only. Details stay in the server log.
            logger.exception(f"Error processing {file.filename}")
            results.append(
                {
                    "filename": file.filename,
                    "status": "error",
                    "error": "INTERNAL_ERROR",
                    "message": "Erreur lors du traitement du fichier.",
                    "details": {},
                }
            )
        finally:
            # The upload is only needed while translating; remove it on every
            # path so failed files do not pile up in the upload directory.
            if input_path is not None and input_path.exists():
                try:
                    file_handler.cleanup_file(input_path)
                except Exception:
                    logger.exception(f"Could not remove upload {input_path}")

    return {
        "total_files": len(files),
        "successful": sum(1 for r in results if r["status"] == "success"),
        "failed": sum(1 for r in results if r["status"] == "error"),
        "results": results,
    }
def _extract_xlsx_texts(input_path: Path) -> list[dict]:
    """Collect non-blank string cells, identified as ``<sheet title>!<coordinate>``."""
    from openpyxl import load_workbook

    workbook = load_workbook(input_path)
    texts = []
    try:
        for sheet in workbook.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    value = cell.value
                    if value and isinstance(value, str) and value.strip():
                        texts.append(
                            {
                                "id": f"{sheet.title}!{cell.coordinate}",
                                "text": value,
                            }
                        )
    finally:
        # Close the workbook even when iteration fails part-way through.
        workbook.close()
    return texts


def _extract_docx_texts(input_path: Path) -> list[dict]:
    """Collect non-blank body paragraphs and table cells from a Word document."""
    from docx import Document

    doc = Document(input_path)
    texts = []
    # The index counts every paragraph, blank or not, which is the numbering
    # the reconstruct endpoint uses when it looks the ids up again.
    for para_idx, para in enumerate(doc.paragraphs):
        if para.text.strip():
            texts.append({"id": f"para_{para_idx}", "text": para.text})
    for table_idx, table in enumerate(doc.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                if cell.text.strip():
                    texts.append(
                        {
                            "id": f"table_{table_idx}_r{row_idx}_c{cell_idx}",
                            "text": cell.text,
                        }
                    )
    return texts


def _extract_pptx_texts(input_path: Path) -> list[dict]:
    """Collect non-blank text runs from every text-bearing shape of a deck."""
    from pptx import Presentation

    prs = Presentation(input_path)
    texts = []
    for slide_idx, slide in enumerate(prs.slides):
        for shape_idx, shape in enumerate(slide.shapes):
            if not shape.has_text_frame:
                continue
            for para_idx, para in enumerate(shape.text_frame.paragraphs):
                for run_idx, run in enumerate(para.runs):
                    if run.text.strip():
                        texts.append(
                            {
                                "id": f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}",
                                "text": run.text,
                            }
                        )
    return texts


@router.post("/extract-texts")
async def extract_texts_from_document(
    file: UploadFile = File(..., description="Document file to extract texts from"),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Extract all translatable texts from a document for client-side translation.

    The upload is stored as ``session_<uuid><ext>`` in the upload directory
    together with a ``session_<uuid>.json`` descriptor, so that the
    reconstruct endpoint can later rebuild the document from the session id.

    Args:
        file: Uploaded document (.xlsx, .docx or .pptx are extracted; any
            other accepted extension yields an empty text list).
        current_user: Value resolved by the ``get_authenticated_user``
            dependency; not otherwise used in this handler.

    Returns:
        A dict with ``session_id``, ``texts`` (``{"id", "text"}`` items),
        ``file_type`` and ``text_count``; or a 500 ``JSONResponse`` with an
        ``INTERNAL_ERROR`` payload when extraction fails unexpectedly.

    Raises:
        HTTPException: Propagated unchanged from the validation helpers.
    """
    import uuid
    import json

    extractors = {
        ".xlsx": _extract_xlsx_texts,
        ".docx": _extract_docx_texts,
        ".pptx": _extract_pptx_texts,
    }

    input_path = None
    session_ready = False
    try:
        file_extension = file_handler.validate_file_extension(file.filename)
        logger.info(f"Extracting texts from {file_extension} file: {file.filename}")
        file_handler.validate_file_size(file)

        session_id = str(uuid.uuid4())
        input_path = config.UPLOAD_DIR / f"session_{session_id}{file_extension}"
        file_handler.save_upload_file(file, input_path)

        extractor = extractors.get(file_extension)
        texts = extractor(input_path) if extractor is not None else []

        session_data = {
            "original_filename": file.filename,
            "file_extension": file_extension,
            "input_path": str(input_path),
            "text_count": len(texts),
        }
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        with open(session_file, "w", encoding="utf-8") as f:
            json.dump(session_data, f)
        session_ready = True

        logger.info(
            f"Extracted {len(texts)} texts from {file.filename}, session: {session_id}"
        )
        return {
            "session_id": session_id,
            "texts": texts,
            "file_type": file_extension,
            "text_count": len(texts),
        }
    except HTTPException:
        raise
    except Exception:
        logger.exception("Text extraction error")
        return JSONResponse(
            status_code=500,
            content={
                "error": "INTERNAL_ERROR",
                "message": "Erreur lors de l'extraction des textes. Veuillez reessayer.",
            },
        )
    finally:
        # Without a session id the client can never refer to this upload
        # again, so remove it instead of leaving an orphan on disk.
        if not session_ready and input_path is not None and input_path.exists():
            try:
                file_handler.cleanup_file(input_path)
            except Exception:
                logger.exception(f"Could not remove upload {input_path}")
def _apply_xlsx_translations(output_path: Path, translation_map: dict) -> None:
    """Overwrite cells whose ``<sheet title>!<coordinate>`` id has a translation."""
    from openpyxl import load_workbook

    workbook = load_workbook(output_path)
    try:
        for sheet in workbook.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    cell_id = f"{sheet.title}!{cell.coordinate}"
                    if cell_id in translation_map:
                        cell.value = translation_map[cell_id]
        workbook.save(output_path)
    finally:
        # Close the workbook even when writing fails part-way through.
        workbook.close()


def _apply_docx_translations(output_path: Path, translation_map: dict) -> None:
    """Write translations into the body paragraphs and table cells of a Word file."""
    from docx import Document

    doc = Document(output_path)

    # Paragraph ids count every paragraph, blank or not (same numbering as
    # the extract endpoint).
    for para_idx, para in enumerate(doc.paragraphs):
        para_id = f"para_{para_idx}"
        if para_id in translation_map and para.text.strip():
            # Blank every run, then put the whole translation in the first
            # one so that run's formatting is kept.
            for run in para.runs:
                run.text = ""
            if para.runs:
                para.runs[0].text = translation_map[para_id]
            else:
                para.text = translation_map[para_id]

    for table_idx, table in enumerate(doc.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                cell_id = f"table_{table_idx}_r{row_idx}_c{cell_idx}"
                if cell_id not in translation_map:
                    continue
                for para in cell.paragraphs:
                    for run in para.runs:
                        run.text = ""
                if cell.paragraphs and cell.paragraphs[0].runs:
                    cell.paragraphs[0].runs[0].text = translation_map[cell_id]
                elif cell.paragraphs:
                    cell.paragraphs[0].text = translation_map[cell_id]

    doc.save(output_path)


def _apply_pptx_translations(output_path: Path, translation_map: dict) -> None:
    """Replace the text of every run whose positional id has a translation."""
    from pptx import Presentation

    prs = Presentation(output_path)
    for slide_idx, slide in enumerate(prs.slides):
        for shape_idx, shape in enumerate(slide.shapes):
            if not shape.has_text_frame:
                continue
            for para_idx, para in enumerate(shape.text_frame.paragraphs):
                for run_idx, run in enumerate(para.runs):
                    run_id = f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}"
                    if run_id in translation_map:
                        run.text = translation_map[run_id]
    prs.save(output_path)


@router.post("/reconstruct-document")
async def reconstruct_document(
    session_id: str = Form(..., description="Session ID from extract-texts"),
    translations: str = Form(
        ..., description="JSON array of {id, translated_text} objects"
    ),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Reconstruct a document with translated texts.

    Loads the session descriptor written by the extract endpoint, copies the
    stored source file to the output directory, writes the translations into
    the copy and returns it. The stored source file and the session
    descriptor are removed once reconstruction succeeds.

    Args:
        session_id: Session id returned by extract-texts; must be a valid UUID.
        translations: JSON array of ``{"id", "translated_text"}`` objects.
        current_user: Value resolved by the ``get_authenticated_user``
            dependency; not otherwise used in this handler.

    Returns:
        A ``FileResponse`` for the translated document, or a 500
        ``JSONResponse`` with an ``INTERNAL_ERROR`` payload on unexpected
        failure.

    Raises:
        HTTPException: 400 for an invalid session id, invalid session data, a
            malformed ``translations`` payload or an unsupported file type;
            404 when the session or its source file is missing.
    """
    import json
    import shutil
    import uuid

    try:
        uuid.UUID(session_id)
    except (ValueError, TypeError):
        raise HTTPException(status_code=400, detail="Invalid session ID") from None

    writers = {
        ".xlsx": _apply_xlsx_translations,
        ".docx": _apply_docx_translations,
        ".pptx": _apply_pptx_translations,
    }

    try:
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        if not session_file.exists():
            raise HTTPException(status_code=404, detail="Session not found or expired")
        with open(session_file, "r", encoding="utf-8") as f:
            session_data = json.load(f)

        # Refuse any stored path that resolves outside the upload directory.
        input_path = Path(session_data["input_path"]).resolve()
        if not input_path.is_relative_to(config.UPLOAD_DIR.resolve()):
            raise HTTPException(status_code=400, detail="Invalid session data")

        file_extension = session_data["file_extension"]
        original_filename = session_data["original_filename"]
        if not input_path.exists():
            raise HTTPException(
                status_code=404, detail="Source file not found or expired"
            )

        # A malformed payload is the caller's mistake: answer 400, not 500.
        try:
            translation_map = {
                item["id"]: item["translated_text"]
                for item in json.loads(translations)
            }
        except (ValueError, KeyError, TypeError) as err:
            raise HTTPException(
                status_code=400, detail="Invalid translations payload"
            ) from err

        # Without a writer no output file would be produced, so stop here
        # instead of returning a response for a file that does not exist.
        apply_translations = writers.get(file_extension)
        if apply_translations is None:
            raise HTTPException(status_code=400, detail="Unsupported file type")

        output_filename = file_handler.generate_unique_filename(
            original_filename, "translated"
        )
        output_path = config.OUTPUT_DIR / output_filename

        # Work on a copy so the stored source stays intact until success.
        shutil.copy(input_path, output_path)
        apply_translations(output_path, translation_map)

        file_handler.cleanup_file(input_path)
        file_handler.cleanup_file(session_file)
        logger.info(f"Reconstructed document: {output_path}")
        return FileResponse(
            path=output_path,
            filename=f"translated_{original_filename}",
            media_type="application/octet-stream",
        )
    except HTTPException:
        raise
    except Exception:
        logger.exception("Reconstruction error")
        return JSONResponse(
            status_code=500,
            content={
                "error": "INTERNAL_ERROR",
                "message": "Erreur lors de la reconstruction du document. Veuillez reessayer.",
            },
        )
@router.get("/metrics")
async def get_metrics(
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """Report cleanup, rate-limit and configuration figures for monitoring.

    The ``system`` section is a fixed "healthy" status with empty ``memory``
    and ``disk`` placeholders.
    """
    from middleware.cleanup import create_cleanup_manager
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig

    # NOTE(review): both managers are created fresh for each request —
    # confirm that their stats reflect application-wide state.
    janitor = create_cleanup_manager(config)
    limiter = RateLimitManager(
        RateLimitConfig(
            requests_per_minute=config.RATE_LIMIT_PER_MINUTE,
            requests_per_hour=config.RATE_LIMIT_PER_HOUR,
            translations_per_minute=config.TRANSLATIONS_PER_MINUTE,
            translations_per_hour=config.TRANSLATIONS_PER_HOUR,
            max_concurrent_translations=config.MAX_CONCURRENT_TRANSLATIONS,
        )
    )

    report = {
        "system": {
            "memory": {},
            "disk": {},
            "status": "healthy",
        },
        "cleanup": janitor.get_stats(),
        "rate_limits": limiter.get_stats(),
    }
    report["config"] = {
        "max_file_size_mb": config.MAX_FILE_SIZE_MB,
        "supported_extensions": list(config.SUPPORTED_EXTENSIONS),
        "translation_service": config.TRANSLATION_SERVICE,
    }
    return report
@router.get("/rate-limit/status")
async def get_rate_limit_status(request: Request):
    """Report the configured limits and the current usage of the caller.

    The caller is identified by ``request.client.host``, or "unknown" when
    the request carries no client information.
    """
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig

    limits = RateLimitConfig(
        requests_per_minute=config.RATE_LIMIT_PER_MINUTE,
        requests_per_hour=config.RATE_LIMIT_PER_HOUR,
        translations_per_minute=config.TRANSLATIONS_PER_MINUTE,
        translations_per_hour=config.TRANSLATIONS_PER_HOUR,
        max_concurrent_translations=config.MAX_CONCURRENT_TRANSLATIONS,
    )
    manager = RateLimitManager(limits)

    if request.client:
        client_ip = request.client.host
    else:
        client_ip = "unknown"

    usage = await manager.get_client_status(client_ip)

    # Only these four settings are exposed; the concurrency cap is not.
    exposed_limits = (
        "requests_per_minute",
        "requests_per_hour",
        "translations_per_minute",
        "translations_per_hour",
    )
    return {
        "client_ip": client_ip,
        "limits": {name: getattr(limits, name) for name in exposed_limits},
        "current_usage": usage,
    }