Files
office_translator/routes/legacy_routes.py
sepehr ce8e150a61 feat: homelab deployment - NPM + IONOS DNS + monitoring + NAS backup
- Restructured docker-compose for Nginx Proxy Manager (no custom nginx)
- Added domain wordly.art configuration
- Added Prometheus + Grafana monitoring stack with pre-configured dashboards
- Added PostgreSQL backup script to NAS (daily/weekly/monthly rotation)
- Added alert rules for backend, system, and Docker metrics
- Updated deployment guide for NPM + IONOS DNS homelab setup
- Added marketing plan document
- PDF translator and watermark support
- Enhanced middleware, routes, and translator modules

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 11:43:28 +02:00

694 lines
24 KiB
Python

"""
Legacy API v1 Endpoints
Endpoints migrated from main.py that don't fit in other routers
Story 3.5: API Versioning
"""
import logging
import os
from pathlib import Path
from typing import Optional, Any
from fastapi import APIRouter, File, Form, UploadFile, HTTPException, Request, Depends
from fastapi.responses import FileResponse, JSONResponse
from config import config
from utils import file_handler
from middleware.api_key_auth import get_authenticated_user
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router holding the legacy endpoints; every route declared below is served
# under the /api/v1 prefix and tagged "Legacy".
router = APIRouter(prefix="/api/v1", tags=["Legacy"])
def _safe_output_path(filename: str):
    """
    Map a client-supplied filename onto a path inside config.OUTPUT_DIR.

    The name is rejected when it is empty, contains "..", "/" or a backslash,
    reduces to a blank final component, cannot be resolved, or resolves to a
    location outside the output directory.

    Returns:
        (resolved Path, True) when the name is acceptable,
        (None, False) otherwise.
    """
    rejected = (None, False)

    if not filename:
        return rejected
    # Any traversal marker or path separator disqualifies the name outright.
    if any(marker in filename for marker in ("..", "/", "\\")):
        return rejected

    leaf = Path(filename).name
    if leaf.strip() == "":
        return rejected

    output_root = config.OUTPUT_DIR.resolve()
    try:
        candidate = (config.OUTPUT_DIR / leaf).resolve()
        inside_root = candidate.is_relative_to(output_root)
    except (ValueError, OSError):
        return rejected

    # Resolution follows symlinks, so re-check containment on the final path.
    return (candidate, True) if inside_root else rejected
def _resolve_model(
    cfg_model: Optional[str],
    model_env: str,
    default: str,
) -> str:
    """
    Pick the model name to use for a provider.

    Precedence: the value from the settings (``cfg_model``), then the
    environment variable named ``model_env``, then ``default``. Values are
    stripped of surrounding whitespace; blank values count as missing.
    """
    from_settings = (cfg_model or "").strip()
    if from_settings:
        return from_settings

    from_env = os.getenv(model_env, "").strip()
    if from_env:
        return from_env

    return default
@router.get("/providers/available")
async def get_available_providers():
    """
    List the translation providers that can currently be offered.

    A provider counts as enabled when its entry in the admin settings has
    ``enabled`` set, or when its environment variable (API key, or base URL
    for Ollama) is non-blank.

    Rules:
    - Google Translate is always listed.
    - Ollama is only considered in dev mode (APP_ENV=development or
      SHOW_OLLAMA=true).
    - "openrouter" is presented as "Traduction IA Essentielle" and
      "openrouter_premium" as "Traduction IA Premium"; both are gated by the
      same OPENROUTER_API_KEY variable.
    - LLM entries also report the effective model (settings > env > default).
    """
    from routes.admin_routes import load_settings

    settings = load_settings()

    app_env = os.getenv("APP_ENV", "production").lower()
    show_ollama = os.getenv("SHOW_OLLAMA", "false").lower()
    dev_mode = app_env == "development" or show_ollama == "true"

    def _env_present(var_name: str) -> bool:
        # A variable counts only when it holds something besides whitespace.
        return bool(os.getenv(var_name, "").strip())

    def _switched_on(provider: str, env_var: str) -> bool:
        # The settings entry wins; the environment variable is the fallback.
        entry = getattr(settings, provider, None)
        if entry and entry.enabled:
            return True
        return _env_present(env_var)

    # Google Translate needs no configuration, so it is always first.
    providers = [
        {
            "id": "google",
            "label": "Google Traduction",
            "description": "Traduction rapide, 130+ langues, fiable",
            "mode": "classic",
            "tier": "free",
        }
    ]

    # DeepL is a classic (non-LLM) provider, so it carries no model field.
    if _switched_on("deepl", "DEEPL_API_KEY"):
        providers.append(
            {
                "id": "deepl",
                "label": "DeepL",
                "description": "Traduction professionnelle haute qualité (langues européennes)",
                "mode": "classic",
                "tier": "pro",
            }
        )

    # LLM-backed providers, in the order they are presented to the client.
    llm_specs = (
        {
            "id": "openrouter",
            "env": "OPENROUTER_API_KEY",
            "model_env": "OPENROUTER_MODEL",
            "default_model": "deepseek/deepseek-v3.2",
            "label": "Traduction IA Essentielle",
            "description": "IA rapide et économique — idéale pour documents techniques",
            "tier": "pro",
            "dev_only": False,
        },
        {
            "id": "openrouter_premium",
            "env": "OPENROUTER_API_KEY",
            "model_env": "OPENROUTER_PREMIUM_MODEL",
            "default_model": "anthropic/claude-3.5-haiku",
            "label": "Traduction IA Premium",
            "description": "IA haute précision (GPT-4, Claude) — meilleure qualité littéraire",
            "tier": "business",
            "dev_only": False,
        },
        {
            "id": "openai",
            "env": "OPENAI_API_KEY",
            "model_env": "OPENAI_MODEL",
            "default_model": "gpt-4o-mini",
            "label": "OpenAI GPT",
            "description": "Traduction IA via OpenAI directement",
            "tier": "business",
            "dev_only": False,
        },
        {
            "id": "zai",
            "env": "ZAI_API_KEY",
            "model_env": "ZAI_MODEL",
            "default_model": "grok-2-1212",
            "label": "Grok (xAI)",
            "description": "IA Grok par xAI — traduction avancée",
            "tier": "business",
            "dev_only": False,
        },
        {
            "id": "ollama",
            "env": "OLLAMA_BASE_URL",
            "model_env": "OLLAMA_MODEL",
            "default_model": "llama3",
            "label": "Ollama (Local)",
            "description": "Modèle LLM local — développement uniquement",
            "tier": "dev",
            "dev_only": True,
        },
    )

    for spec in llm_specs:
        if spec["dev_only"] and not dev_mode:
            continue
        if not _switched_on(spec["id"], spec["env"]):
            continue
        entry = getattr(settings, spec["id"], None)
        providers.append(
            {
                "id": spec["id"],
                "label": spec["label"],
                "description": spec["description"],
                "mode": "llm",
                "tier": spec["tier"],
                "model": _resolve_model(
                    entry.model if entry else None,
                    spec["model_env"],
                    spec["default_model"],
                ),
            }
        )

    return {"providers": providers}
@router.get("/languages")
async def get_supported_languages():
    """Return the supported language codes and names, most popular first."""
    # Insertion order is the presentation order, so keep the pairs ranked.
    ranked_languages = (
        # Top 5 — dominant on the internet
        ("en", "English"),
        ("es", "Spanish"),
        ("de", "German"),
        ("fr", "French"),
        ("ja", "Japanese"),
        # Top 6-15
        ("pt", "Portuguese"),
        ("ru", "Russian"),
        ("it", "Italian"),
        ("zh-CN", "Chinese (Simplified)"),
        ("zh-TW", "Chinese (Traditional)"),
        ("pl", "Polish"),
        ("nl", "Dutch"),
        ("tr", "Turkish"),
        ("ko", "Korean"),
        ("ar", "Arabic"),
        # Top 16-25
        ("fa", "Persian (Farsi)"),
        ("vi", "Vietnamese"),
        ("id", "Indonesian"),
        ("uk", "Ukrainian"),
        ("sv", "Swedish"),
        ("cs", "Czech"),
        ("el", "Greek"),
        ("he", "Hebrew"),
        ("hi", "Hindi"),
        ("ro", "Romanian"),
        # Others
        ("da", "Danish"),
        ("fi", "Finnish"),
        ("no", "Norwegian"),
        ("hu", "Hungarian"),
        ("th", "Thai"),
        ("sk", "Slovak"),
        ("bg", "Bulgarian"),
        ("hr", "Croatian"),
        ("ca", "Catalan"),
        ("ms", "Malay"),
    )
    return {
        "supported_languages": dict(ranked_languages),
        "note": "Supported languages may vary depending on the translation service configured",
    }
@router.post("/translate-batch")
async def translate_batch_documents(
    files: list[UploadFile] = File(
        ..., description="Multiple document files to translate"
    ),
    target_language: str = Form(..., description="Target language code"),
    source_language: str = Form(default="auto", description="Source language code"),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Translate several uploaded documents in one request.

    Each file is validated, saved to config.UPLOAD_DIR, translated into
    config.OUTPUT_DIR and reported individually, so one failing file never
    aborts the rest of the batch.

    Args:
        files: Uploaded documents (.xlsx, .docx and .pptx have a translator).
        target_language: Language code to translate into.
        source_language: Language code of the source text, "auto" by default.
        current_user: Result of the authentication dependency; not used in
            the body.

    Returns:
        A dict with ``total_files``, ``successful``, ``failed`` and a
        ``results`` list. Successful entries carry ``output_file`` and
        ``download_url``; failed entries carry a generic error payload (the
        underlying exception is only written to the log).
    """
    from translators import excel_translator, word_translator, pptx_translator

    translators_by_extension = {
        ".xlsx": excel_translator,
        ".docx": word_translator,
        ".pptx": pptx_translator,
    }

    results = []
    for file in files:
        input_path = None
        try:
            file_extension = file_handler.validate_file_extension(file.filename)
            file_handler.validate_file_size(file)

            # An extension that passes validation but has no translator here
            # must be reported as a failure, not as a success pointing at an
            # output file that was never written.
            translator = translators_by_extension.get(file_extension)
            if translator is None:
                raise ValueError(
                    f"No batch translator available for extension {file_extension!r}"
                )

            input_filename = file_handler.generate_unique_filename(
                file.filename, "input"
            )
            output_filename = file_handler.generate_unique_filename(
                file.filename, "translated"
            )
            input_path = config.UPLOAD_DIR / input_filename
            output_path = config.OUTPUT_DIR / output_filename

            file_handler.save_upload_file(file, input_path)
            translator.translate_file(
                input_path, output_path, target_language, source_language
            )

            results.append(
                {
                    "filename": file.filename,
                    "status": "success",
                    "output_file": output_filename,
                    "download_url": f"/api/v1/download/{output_filename}",
                }
            )
        except Exception:
            # Per-file boundary: log the details, return a generic payload.
            logger.exception("Error processing %s", file.filename)
            results.append(
                {
                    "filename": file.filename,
                    "status": "error",
                    "error": "INTERNAL_ERROR",
                    "message": "Erreur lors du traitement du fichier.",
                    "details": {},
                }
            )
        finally:
            # Remove the temporary upload on success AND on failure so failed
            # translations do not pile up in the upload directory.
            if input_path is not None and input_path.exists():
                try:
                    file_handler.cleanup_file(input_path)
                except Exception:
                    logger.warning(
                        "Could not remove temporary upload %s",
                        input_path,
                        exc_info=True,
                    )

    successful = sum(1 for entry in results if entry["status"] == "success")
    failed = sum(1 for entry in results if entry["status"] == "error")
    return {
        "total_files": len(files),
        "successful": successful,
        "failed": failed,
        "results": results,
    }
@router.get("/download/{filename}")
async def download_file(filename: str):
    """
    Serve a translated file from the output directory.

    Args:
        filename: Bare file name; it is validated by ``_safe_output_path`` so
            it cannot point outside config.OUTPUT_DIR.

    Returns:
        A FileResponse streaming the file as ``application/octet-stream``.

    Raises:
        HTTPException: 400 when the name is rejected, 404 when no regular
            file with that name exists.
    """
    file_path, ok = _safe_output_path(filename)
    if not ok or file_path is None:
        raise HTTPException(status_code=400, detail="Invalid filename")
    # is_file() rather than exists(): a directory inside the output folder
    # would pass exists() and then make FileResponse fail while sending,
    # turning what should be a 404 into a server error.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type="application/octet-stream",
    )
@router.delete("/cleanup/{filename}")
async def cleanup_translated_file(filename: str):
    """
    Delete a translated file from the output directory.

    The name goes through ``_safe_output_path`` first, so only entries inside
    config.OUTPUT_DIR can be targeted. Responds 400 for a rejected name, 404
    when the path does not exist, and 500 when the deletion itself fails.
    """
    target, valid = _safe_output_path(filename)
    if target is None or not valid:
        raise HTTPException(status_code=400, detail="Invalid filename")

    try:
        if target.exists():
            file_handler.cleanup_file(target)
            return {"message": f"File {target.name} deleted successfully"}
        raise HTTPException(status_code=404, detail="File not found")
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception:
        logger.exception("Cleanup error")
        raise HTTPException(
            status_code=500, detail="Erreur lors de la suppression du fichier."
        )
def _remove_quietly(*paths) -> None:
    """Best-effort removal of temporary files; failures are logged, not raised."""
    for path in paths:
        if path is None:
            continue
        try:
            Path(path).unlink(missing_ok=True)
        except OSError:
            logger.warning("Could not remove temporary file %s", path, exc_info=True)


def _extract_xlsx_texts(input_path) -> list:
    """Collect every non-blank string cell of a workbook as {id, text} items."""
    from openpyxl import load_workbook

    texts = []
    workbook = load_workbook(input_path)
    try:
        for sheet in workbook.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    value = cell.value
                    if value and isinstance(value, str) and value.strip():
                        texts.append(
                            {
                                "id": f"{sheet.title}!{cell.coordinate}",
                                "text": value,
                            }
                        )
    finally:
        # Close the workbook even when iteration fails part-way.
        workbook.close()
    return texts


def _extract_docx_texts(input_path) -> list:
    """Collect non-blank body paragraphs and table cells as {id, text} items."""
    from docx import Document

    doc = Document(input_path)
    texts = []
    # Ids use the paragraph's position among ALL body paragraphs, so they stay
    # aligned with _apply_docx_translations even when some are blank.
    for para_idx, para in enumerate(doc.paragraphs):
        if para.text.strip():
            texts.append({"id": f"para_{para_idx}", "text": para.text})
    for table_idx, table in enumerate(doc.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                if cell.text.strip():
                    texts.append(
                        {
                            "id": f"table_{table_idx}_r{row_idx}_c{cell_idx}",
                            "text": cell.text,
                        }
                    )
    return texts


def _extract_pptx_texts(input_path) -> list:
    """Collect every non-blank text run of a presentation as {id, text} items."""
    from pptx import Presentation

    prs = Presentation(input_path)
    texts = []
    for slide_idx, slide in enumerate(prs.slides):
        for shape_idx, shape in enumerate(slide.shapes):
            if not shape.has_text_frame:
                continue
            for para_idx, para in enumerate(shape.text_frame.paragraphs):
                for run_idx, run in enumerate(para.runs):
                    if run.text.strip():
                        texts.append(
                            {
                                "id": f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}",
                                "text": run.text,
                            }
                        )
    return texts


def _apply_xlsx_translations(input_path, output_path, translation_map) -> None:
    """Copy the workbook to output_path and overwrite the translated cells."""
    import shutil
    from openpyxl import load_workbook

    shutil.copy(input_path, output_path)
    workbook = load_workbook(output_path)
    try:
        for sheet in workbook.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    cell_id = f"{sheet.title}!{cell.coordinate}"
                    if cell_id in translation_map:
                        cell.value = translation_map[cell_id]
        workbook.save(output_path)
    finally:
        workbook.close()


def _apply_docx_translations(input_path, output_path, translation_map) -> None:
    """Copy the document to output_path and replace translated paragraphs/cells."""
    import shutil
    from docx import Document

    shutil.copy(input_path, output_path)
    doc = Document(output_path)

    # Same numbering as _extract_docx_texts: position among all paragraphs.
    for para_idx, para in enumerate(doc.paragraphs):
        para_id = f"para_{para_idx}"
        if para_id in translation_map and para.text.strip():
            # Blank every run, then put the translation in the first run so
            # the paragraph keeps that run's formatting.
            for run in para.runs:
                run.text = ""
            if para.runs:
                para.runs[0].text = translation_map[para_id]
            else:
                para.text = translation_map[para_id]

    for table_idx, table in enumerate(doc.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                cell_id = f"table_{table_idx}_r{row_idx}_c{cell_idx}"
                if cell_id not in translation_map:
                    continue
                for para in cell.paragraphs:
                    for run in para.runs:
                        run.text = ""
                if cell.paragraphs and cell.paragraphs[0].runs:
                    cell.paragraphs[0].runs[0].text = translation_map[cell_id]
                elif cell.paragraphs:
                    cell.paragraphs[0].text = translation_map[cell_id]

    doc.save(output_path)


def _apply_pptx_translations(input_path, output_path, translation_map) -> None:
    """Copy the presentation to output_path and replace the translated runs."""
    import shutil
    from pptx import Presentation

    shutil.copy(input_path, output_path)
    prs = Presentation(output_path)
    for slide_idx, slide in enumerate(prs.slides):
        for shape_idx, shape in enumerate(slide.shapes):
            if not shape.has_text_frame:
                continue
            for para_idx, para in enumerate(shape.text_frame.paragraphs):
                for run_idx, run in enumerate(para.runs):
                    run_id = f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}"
                    if run_id in translation_map:
                        run.text = translation_map[run_id]
    prs.save(output_path)


@router.post("/extract-texts")
async def extract_texts_from_document(
    file: UploadFile = File(..., description="Document file to extract texts from"),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Extract all translatable texts from a document for client-side translation.

    The upload is stored in config.UPLOAD_DIR under a fresh session id,
    together with a small JSON session file that ``reconstruct_document``
    reads later. Texts are returned as ``{"id", "text"}`` items whose ids
    identify the cell, paragraph, table cell or run they came from.

    Args:
        file: The uploaded document (.xlsx, .docx and .pptx are parsed; any
            other accepted extension yields an empty text list).
        current_user: Result of the authentication dependency; not used in
            the body.

    Returns:
        A dict with ``session_id``, ``texts``, ``file_type`` and
        ``text_count``; or a JSON 500 response with a generic error payload
        when extraction fails unexpectedly.

    Raises:
        HTTPException: Re-raised unchanged when a validation helper raises one.
    """
    import json
    import uuid

    extractors = {
        ".xlsx": _extract_xlsx_texts,
        ".docx": _extract_docx_texts,
        ".pptx": _extract_pptx_texts,
    }

    input_path = None
    session_file = None
    completed = False
    try:
        file_extension = file_handler.validate_file_extension(file.filename)
        logger.info(
            "Extracting texts from %s file: %s", file_extension, file.filename
        )
        file_handler.validate_file_size(file)

        session_id = str(uuid.uuid4())
        input_path = config.UPLOAD_DIR / f"session_{session_id}{file_extension}"
        file_handler.save_upload_file(file, input_path)

        extractor = extractors.get(file_extension)
        texts = extractor(input_path) if extractor is not None else []

        session_data = {
            "original_filename": file.filename,
            "file_extension": file_extension,
            "input_path": str(input_path),
            "text_count": len(texts),
        }
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        with open(session_file, "w", encoding="utf-8") as f:
            json.dump(session_data, f)
        completed = True

        logger.info(
            "Extracted %d texts from %s, session: %s",
            len(texts),
            file.filename,
            session_id,
        )
        return {
            "session_id": session_id,
            "texts": texts,
            "file_type": file_extension,
            "text_count": len(texts),
        }
    except HTTPException:
        raise
    except Exception:
        logger.exception("Text extraction error")
        return JSONResponse(
            status_code=500,
            content={
                "error": "INTERNAL_ERROR",
                "message": "Erreur lors de l'extraction des textes. Veuillez reessayer.",
            },
        )
    finally:
        # A session that was never fully written can never be reconstructed,
        # so do not leave its upload (or a partial session file) behind.
        if not completed:
            _remove_quietly(input_path, session_file)


@router.post("/reconstruct-document")
async def reconstruct_document(
    session_id: str = Form(..., description="Session ID from extract-texts"),
    translations: str = Form(
        ..., description="JSON array of {id, translated_text} objects"
    ),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Rebuild a document from an extraction session and client translations.

    Args:
        session_id: Session id returned by the extract-texts endpoint; must
            parse as a UUID.
        translations: JSON array of ``{"id", "translated_text"}`` objects
            using the ids handed out at extraction time.
        current_user: Result of the authentication dependency; not used in
            the body.

    Returns:
        A FileResponse with the translated document, named
        ``translated_<original filename>``; or a JSON 500 response with a
        generic error payload on unexpected failures.

    Raises:
        HTTPException: 400 for an invalid session id, session data pointing
            outside the upload directory, a malformed translations payload or
            an extension with no reconstruction support; 404 when the session
            or its source file is missing.
    """
    import json
    import uuid

    try:
        uuid.UUID(session_id)
    except (ValueError, TypeError):
        raise HTTPException(status_code=400, detail="Invalid session ID") from None

    appliers = {
        ".xlsx": _apply_xlsx_translations,
        ".docx": _apply_docx_translations,
        ".pptx": _apply_pptx_translations,
    }

    output_path = None
    completed = False
    try:
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        if not session_file.exists():
            raise HTTPException(status_code=404, detail="Session not found or expired")
        with open(session_file, "r", encoding="utf-8") as f:
            session_data = json.load(f)

        # The stored path is re-validated so a tampered session file cannot
        # make this endpoint read from outside the upload directory.
        input_path = Path(session_data["input_path"]).resolve()
        upload_dir_resolved = config.UPLOAD_DIR.resolve()
        if not input_path.is_relative_to(upload_dir_resolved):
            raise HTTPException(status_code=400, detail="Invalid session data")
        file_extension = session_data["file_extension"]
        original_filename = session_data["original_filename"]
        if not input_path.exists():
            raise HTTPException(
                status_code=404, detail="Source file not found or expired"
            )

        # The payload comes from the client: malformed JSON or items missing
        # the expected keys are a bad request, not a server error.
        try:
            translation_map = {
                item["id"]: item["translated_text"]
                for item in json.loads(translations)
            }
        except (ValueError, KeyError, TypeError) as exc:
            raise HTTPException(
                status_code=400, detail="Invalid translations payload"
            ) from exc

        apply_translations = appliers.get(file_extension)
        if apply_translations is None:
            # Without this guard nothing would be written and the response
            # would point at a file that does not exist.
            raise HTTPException(status_code=400, detail="Unsupported file type")

        output_filename = file_handler.generate_unique_filename(
            original_filename, "translated"
        )
        output_path = config.OUTPUT_DIR / output_filename
        apply_translations(input_path, output_path, translation_map)

        file_handler.cleanup_file(input_path)
        file_handler.cleanup_file(session_file)
        completed = True

        logger.info("Reconstructed document: %s", output_path)
        return FileResponse(
            path=output_path,
            filename=f"translated_{original_filename}",
            media_type="application/octet-stream",
        )
    except HTTPException:
        raise
    except Exception:
        logger.exception("Reconstruction error")
        return JSONResponse(
            status_code=500,
            content={
                "error": "INTERNAL_ERROR",
                "message": "Erreur lors de la reconstruction du document. Veuillez reessayer.",
            },
        )
    finally:
        # Drop a partially written output when reconstruction did not finish.
        if not completed:
            _remove_quietly(output_path)
@router.get("/ollama/models")
async def list_ollama_models(base_url: Optional[str] = None):
    """
    List the models reported by an Ollama server.

    Uses ``base_url`` when given, otherwise config.OLLAMA_BASE_URL, and
    returns the URL used, the model list and its length.
    """
    from services.translation_service import OllamaTranslationProvider

    # NOTE(review): base_url is caller-supplied and forwarded as-is;
    # presumably list_models contacts that URL — confirm, and consider
    # restricting it to trusted hosts.
    ollama_url = base_url if base_url else config.OLLAMA_BASE_URL
    discovered = OllamaTranslationProvider.list_models(ollama_url)
    return {
        "ollama_url": ollama_url,
        "models": discovered,
        "count": len(discovered),
    }
@router.post("/ollama/configure")
async def configure_ollama(base_url: str = Form(...), model: str = Form(...)):
    """
    Store a new Ollama base URL and model on the shared ``config`` object.

    The values are written to config.OLLAMA_BASE_URL and config.OLLAMA_MODEL
    and echoed back in the response.
    """
    # NOTE(review): this handler declares no authentication dependency while
    # mutating process-wide configuration — confirm that is intended.
    applied = {"ollama_url": base_url, "model": model}
    config.OLLAMA_BASE_URL = applied["ollama_url"]
    config.OLLAMA_MODEL = applied["model"]
    return {
        "status": "success",
        "message": "Ollama configuration updated",
        **applied,
    }
@router.get("/metrics")
async def get_metrics(
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Report cleanup statistics, rate-limit statistics and key config values.

    The "system" section is a fixed placeholder: empty memory/disk sections
    and a constant "healthy" status.
    """
    from middleware.cleanup import create_cleanup_manager
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig

    # NOTE(review): both managers are built fresh on every request, so these
    # stats presumably do not reflect the instances used by the running
    # middleware — confirm.
    cleaner = create_cleanup_manager(config)
    limiter = RateLimitManager(
        RateLimitConfig(
            requests_per_minute=config.RATE_LIMIT_PER_MINUTE,
            requests_per_hour=config.RATE_LIMIT_PER_HOUR,
            translations_per_minute=config.TRANSLATIONS_PER_MINUTE,
            translations_per_hour=config.TRANSLATIONS_PER_HOUR,
            max_concurrent_translations=config.MAX_CONCURRENT_TRANSLATIONS,
        )
    )

    report = {
        "system": {
            "memory": {},
            "disk": {},
            "status": "healthy",
        }
    }
    report["cleanup"] = cleaner.get_stats()
    report["rate_limits"] = limiter.get_stats()
    report["config"] = {
        "max_file_size_mb": config.MAX_FILE_SIZE_MB,
        "supported_extensions": list(config.SUPPORTED_EXTENSIONS),
        "translation_service": config.TRANSLATION_SERVICE,
    }
    return report
@router.get("/rate-limit/status")
async def get_rate_limit_status(request: Request):
    """
    Report the configured rate limits and the caller's current usage.

    The client is identified by ``request.client.host``, or "unknown" when
    the request carries no client information.
    """
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig

    limits = RateLimitConfig(
        requests_per_minute=config.RATE_LIMIT_PER_MINUTE,
        requests_per_hour=config.RATE_LIMIT_PER_HOUR,
        translations_per_minute=config.TRANSLATIONS_PER_MINUTE,
        translations_per_hour=config.TRANSLATIONS_PER_HOUR,
        max_concurrent_translations=config.MAX_CONCURRENT_TRANSLATIONS,
    )
    manager = RateLimitManager(limits)

    client_ip = "unknown" if not request.client else request.client.host
    usage = await manager.get_client_status(client_ip)

    # Only these four limits are exposed; the concurrency cap is left out.
    exposed_limits = (
        "requests_per_minute",
        "requests_per_hour",
        "translations_per_minute",
        "translations_per_hour",
    )
    return {
        "client_ip": client_ip,
        "limits": {name: getattr(limits, name) for name in exposed_limits},
        "current_usage": usage,
    }