- Restructured docker-compose for Nginx Proxy Manager (no custom nginx) - Added domain wordly.art configuration - Added Prometheus + Grafana monitoring stack with pre-configured dashboards - Added PostgreSQL backup script to NAS (daily/weekly/monthly rotation) - Added alert rules for backend, system, and Docker metrics - Updated deployment guide for NPM + IONOS DNS homelab setup - Added marketing plan document - PDF translator and watermark support - Enhanced middleware, routes, and translator modules Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
694 lines
24 KiB
Python
694 lines
24 KiB
Python
"""
|
|
Legacy API v1 Endpoints
|
|
Endpoints migrated from main.py that don't fit in other routers
|
|
Story 3.5: API Versioning
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional, Any
|
|
|
|
from fastapi import APIRouter, File, Form, UploadFile, HTTPException, Request, Depends
|
|
from fastapi.responses import FileResponse, JSONResponse
|
|
|
|
from config import config
|
|
from utils import file_handler
|
|
from middleware.api_key_auth import get_authenticated_user
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api/v1", tags=["Legacy"])
|
|
|
|
|
|
def _safe_output_path(filename: str):
|
|
"""
|
|
Resolve filename to a path under config.OUTPUT_DIR. Prevents path traversal.
|
|
Returns (Path, True) if valid, (None, False) if invalid.
|
|
"""
|
|
if not filename or ".." in filename or "/" in filename or "\\" in filename:
|
|
return None, False
|
|
safe_name = Path(filename).name
|
|
if not safe_name.strip():
|
|
return None, False
|
|
base = config.OUTPUT_DIR.resolve()
|
|
try:
|
|
resolved = (config.OUTPUT_DIR / safe_name).resolve()
|
|
if not resolved.is_relative_to(base):
|
|
return None, False
|
|
return resolved, True
|
|
except (ValueError, OSError):
|
|
return None, False
|
|
|
|
|
|
def _resolve_model(
|
|
cfg_model: Optional[str],
|
|
model_env: str,
|
|
default: str,
|
|
) -> str:
|
|
"""Resolve effective model: JSON config > env var > default."""
|
|
v = (cfg_model or "").strip() or os.getenv(model_env, "").strip()
|
|
return v or default
|
|
|
|
|
|
@router.get("/providers/available")
async def get_available_providers():
    """
    List the translation providers that are currently usable.

    A provider counts as enabled when its entry in the admin settings has a
    truthy ``enabled`` flag, or when its enabling environment variable is set
    to a non-blank value.

    Rules:
    - Google Translate is always listed.
    - Ollama is listed only when APP_ENV=development or SHOW_OLLAMA=true.
    - LLM providers also report the model resolved by ``_resolve_model``.
    """
    from routes.admin_routes import load_settings

    settings = load_settings()

    app_env = os.getenv("APP_ENV", "production").lower()
    show_ollama = os.getenv("SHOW_OLLAMA", "false").lower()
    is_dev = app_env == "development" or show_ollama == "true"

    def _env_set(var_name: str) -> bool:
        # True when the variable exists and is not blank.
        return bool(os.getenv(var_name, "").strip())

    def _provider_on(name: str, env_vars) -> bool:
        # Settings flag first, then any enabling environment variable.
        cfg = getattr(settings, name, None)
        if cfg and cfg.enabled:
            return True
        return any(_env_set(var) for var in env_vars)

    # Each row: (public entry, enabling env vars, (model env var, default
    # model) or None for non-LLM providers, dev-only flag). Row order is the
    # order in which providers appear in the response.
    catalogue = (
        (
            {
                "id": "deepl",
                "label": "DeepL",
                "description": "Traduction professionnelle haute qualité (langues européennes)",
                "mode": "classic",
                "tier": "pro",
            },
            ("DEEPL_API_KEY",),
            None,
            False,
        ),
        (
            {
                "id": "openrouter",
                "label": "Traduction IA Essentielle",
                "description": "IA rapide et économique — idéale pour documents techniques",
                "mode": "llm",
                "tier": "pro",
            },
            ("OPENROUTER_API_KEY",),
            ("OPENROUTER_MODEL", "deepseek/deepseek-v3.2"),
            False,
        ),
        (
            {
                "id": "openrouter_premium",
                "label": "Traduction IA Premium",
                "description": "IA haute précision (GPT-4, Claude) — meilleure qualité littéraire",
                "mode": "llm",
                "tier": "business",
            },
            ("OPENROUTER_API_KEY",),
            ("OPENROUTER_PREMIUM_MODEL", "anthropic/claude-3.5-haiku"),
            False,
        ),
        (
            {
                "id": "openai",
                "label": "OpenAI GPT",
                "description": "Traduction IA via OpenAI directement",
                "mode": "llm",
                "tier": "business",
            },
            ("OPENAI_API_KEY",),
            ("OPENAI_MODEL", "gpt-4o-mini"),
            False,
        ),
        (
            {
                "id": "zai",
                "label": "Grok (xAI)",
                "description": "IA Grok par xAI — traduction avancée",
                "mode": "llm",
                "tier": "business",
            },
            ("ZAI_API_KEY",),
            ("ZAI_MODEL", "grok-2-1212"),
            False,
        ),
        (
            {
                "id": "ollama",
                "label": "Ollama (Local)",
                "description": "Modèle LLM local — développement uniquement",
                "mode": "llm",
                "tier": "dev",
            },
            ("OLLAMA_BASE_URL",),
            ("OLLAMA_MODEL", "llama3"),
            True,
        ),
    )

    # Google Translate needs no configuration, so it is always first.
    available = [
        {
            "id": "google",
            "label": "Google Traduction",
            "description": "Traduction rapide, 130+ langues, fiable",
            "mode": "classic",
            "tier": "free",
        }
    ]

    for entry, enabling_vars, model_spec, dev_only in catalogue:
        if dev_only and not is_dev:
            continue
        provider_name = entry["id"]
        if not _provider_on(provider_name, enabling_vars):
            continue

        item = dict(entry)
        if model_spec is not None:
            model_env, fallback_model = model_spec
            provider_cfg = getattr(settings, provider_name, None)
            item["model"] = _resolve_model(
                provider_cfg.model if provider_cfg else None,
                model_env,
                fallback_model,
            )
        available.append(item)

    return {"providers": available}
|
|
|
|
|
|
@router.get("/languages")
async def get_supported_languages():
    """Return the supported language codes and names, ordered by internet popularity."""
    # Order is significant: the response mapping keeps this ranking.
    ranked_languages = (
        # Top 5 — dominant on the internet
        ("en", "English"),
        ("es", "Spanish"),
        ("de", "German"),
        ("fr", "French"),
        ("ja", "Japanese"),
        # Top 6-15
        ("pt", "Portuguese"),
        ("ru", "Russian"),
        ("it", "Italian"),
        ("zh-CN", "Chinese (Simplified)"),
        ("zh-TW", "Chinese (Traditional)"),
        ("pl", "Polish"),
        ("nl", "Dutch"),
        ("tr", "Turkish"),
        ("ko", "Korean"),
        ("ar", "Arabic"),
        # Top 16-25
        ("fa", "Persian (Farsi)"),
        ("vi", "Vietnamese"),
        ("id", "Indonesian"),
        ("uk", "Ukrainian"),
        ("sv", "Swedish"),
        ("cs", "Czech"),
        ("el", "Greek"),
        ("he", "Hebrew"),
        ("hi", "Hindi"),
        ("ro", "Romanian"),
        # Others
        ("da", "Danish"),
        ("fi", "Finnish"),
        ("no", "Norwegian"),
        ("hu", "Hungarian"),
        ("th", "Thai"),
        ("sk", "Slovak"),
        ("bg", "Bulgarian"),
        ("hr", "Croatian"),
        ("ca", "Catalan"),
        ("ms", "Malay"),
    )

    payload = {"supported_languages": dict(ranked_languages)}
    payload["note"] = (
        "Supported languages may vary depending on the translation service configured"
    )
    return payload
|
|
|
|
|
|
@router.post("/translate-batch")
async def translate_batch_documents(
    files: list[UploadFile] = File(
        ..., description="Multiple document files to translate"
    ),
    target_language: str = Form(..., description="Target language code"),
    source_language: str = Form(default="auto", description="Source language code"),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Translate several uploaded documents in one request.

    Each file is validated, saved to ``config.UPLOAD_DIR``, translated into
    ``config.OUTPUT_DIR`` and reported individually; a failure on one file
    never aborts the others.

    Returns a summary dict with ``total_files``, ``successful``, ``failed``
    and a per-file ``results`` list. Successful entries carry ``output_file``
    and ``download_url``; failed entries carry a generic error payload (the
    real cause is only written to the log).
    """
    from translators import excel_translator, word_translator, pptx_translator

    translators_by_extension = {
        ".xlsx": excel_translator,
        ".docx": word_translator,
        ".pptx": pptx_translator,
    }

    results = []

    for file in files:
        try:
            file_extension = file_handler.validate_file_extension(file.filename)
            file_handler.validate_file_size(file)

            # An extension accepted by the validator but unknown here used to
            # fall through and be reported as a success without any output
            # file being written. Treat it as a per-file failure instead.
            translator = translators_by_extension.get(file_extension)
            if translator is None:
                raise ValueError(
                    f"No batch translator available for extension {file_extension!r}"
                )

            input_filename = file_handler.generate_unique_filename(
                file.filename, "input"
            )
            output_filename = file_handler.generate_unique_filename(
                file.filename, "translated"
            )

            input_path = config.UPLOAD_DIR / input_filename
            output_path = config.OUTPUT_DIR / output_filename

            try:
                file_handler.save_upload_file(file, input_path)
                # NOTE(review): translate_file is called without await inside
                # this coroutine; if it performs blocking work, consider
                # moving it to a worker thread.
                translator.translate_file(
                    input_path, output_path, target_language, source_language
                )
            finally:
                # Remove the uploaded copy on failure as well as on success so
                # failed translations do not accumulate in the upload dir.
                if input_path.exists():
                    file_handler.cleanup_file(input_path)

            results.append(
                {
                    "filename": file.filename,
                    "status": "success",
                    "output_file": output_filename,
                    "download_url": f"/api/v1/download/{output_filename}",
                }
            )

        except Exception:
            # Deliberately broad: one bad file must not fail the whole batch.
            logger.exception("Error processing %s", file.filename)
            results.append(
                {
                    "filename": file.filename,
                    "status": "error",
                    "error": "INTERNAL_ERROR",
                    "message": "Erreur lors du traitement du fichier.",
                    "details": {},
                }
            )

    successful = sum(1 for r in results if r["status"] == "success")
    failed = sum(1 for r in results if r["status"] == "error")

    return {
        "total_files": len(files),
        "successful": successful,
        "failed": failed,
        "results": results,
    }
|
|
|
|
|
|
@router.get("/download/{filename}")
async def download_file(filename: str):
    """
    Serve a translated file from the output directory.

    The name goes through ``_safe_output_path`` first, so path-traversal
    attempts are answered with 400. A valid name that does not exist on disk
    yields 404.
    """
    target, is_valid = _safe_output_path(filename)
    if target is None or not is_valid:
        raise HTTPException(status_code=400, detail="Invalid filename")

    if target.exists():
        return FileResponse(
            path=target,
            filename=target.name,
            media_type="application/octet-stream",
        )

    raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
|
|
@router.delete("/cleanup/{filename}")
async def cleanup_translated_file(filename: str):
    """
    Delete a translated file from the output directory.

    The name goes through ``_safe_output_path`` first (400 on rejection). A
    missing file yields 404; any unexpected failure while deleting is logged
    and reported as a generic 500.
    """
    target, is_valid = _safe_output_path(filename)
    if target is None or not is_valid:
        raise HTTPException(status_code=400, detail="Invalid filename")

    try:
        if target.exists():
            file_handler.cleanup_file(target)
            return {"message": f"File {target.name} deleted successfully"}
        raise HTTPException(status_code=404, detail="File not found")
    except HTTPException:
        # Let deliberate HTTP errors (such as the 404 above) through untouched.
        raise
    except Exception:
        logger.exception("Cleanup error")
        raise HTTPException(
            status_code=500, detail="Erreur lors de la suppression du fichier."
        )
|
|
|
|
|
|
def _extract_xlsx_texts(input_path) -> list:
    """Collect non-blank, non-formula string cells, keyed by ``<sheet title>!<coordinate>``."""
    from openpyxl import load_workbook

    texts = []
    wb = load_workbook(input_path)
    try:
        for sheet in wb.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    value = cell.value
                    if not isinstance(value, str) or not value.strip():
                        continue
                    # Formula cells are loaded as strings beginning with "=";
                    # openpyxl marks them with data_type "f". Sending them out
                    # for translation would corrupt the formula on the way back.
                    if cell.data_type == "f":
                        continue
                    texts.append(
                        {"id": f"{sheet.title}!{cell.coordinate}", "text": value}
                    )
    finally:
        # Release the workbook even when iteration fails part-way.
        wb.close()
    return texts


def _extract_docx_texts(input_path) -> list:
    """Collect non-blank body paragraphs (``para_<n>``) and table cells (``table_<t>_r<r>_c<c>``)."""
    from docx import Document

    doc = Document(input_path)
    texts = []

    # NOTE(review): the counter advances only for non-blank paragraphs, so
    # ``para_<n>`` is the n-th non-blank paragraph. The reconstruction
    # endpoint must use the same numbering — confirm both sides agree.
    para_idx = 0
    for para in doc.paragraphs:
        if para.text.strip():
            texts.append({"id": f"para_{para_idx}", "text": para.text})
            para_idx += 1

    for table_idx, table in enumerate(doc.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                if cell.text.strip():
                    texts.append(
                        {
                            "id": f"table_{table_idx}_r{row_idx}_c{cell_idx}",
                            "text": cell.text,
                        }
                    )
    return texts


def _extract_pptx_texts(input_path) -> list:
    """Collect non-blank text runs, keyed by slide / shape / paragraph / run index."""
    from pptx import Presentation

    prs = Presentation(input_path)
    texts = []
    for slide_idx, slide in enumerate(prs.slides):
        for shape_idx, shape in enumerate(slide.shapes):
            if not shape.has_text_frame:
                continue
            for para_idx, para in enumerate(shape.text_frame.paragraphs):
                for run_idx, run in enumerate(para.runs):
                    if run.text.strip():
                        texts.append(
                            {
                                "id": f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}",
                                "text": run.text,
                            }
                        )
    return texts


def _discard_session_files(*paths) -> None:
    """Best-effort removal of files left behind by a failed extraction."""
    for path in paths:
        try:
            if path is not None and path.exists():
                file_handler.cleanup_file(path)
        except Exception:
            # Never let cleanup mask the error that triggered it.
            logger.warning("Could not remove %s after failed extraction", path, exc_info=True)


@router.post("/extract-texts")
async def extract_texts_from_document(
    file: UploadFile = File(..., description="Document file to extract texts from"),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Extract all translatable texts from a document for client-side translation.

    The upload is stored as ``session_<uuid><ext>`` in ``config.UPLOAD_DIR``
    next to a ``session_<uuid>.json`` file describing it, so that the
    document can be rebuilt later from the returned ``session_id``.

    Returns a dict with ``session_id``, ``texts`` (list of ``{id, text}``),
    ``file_type`` and ``text_count``. ``HTTPException`` raised by validation
    is propagated; any other failure is logged and answered with a generic
    500 JSON payload. On any failure the files written for the session are
    removed again.
    """
    import json
    import uuid

    extractors = {
        ".xlsx": _extract_xlsx_texts,
        ".docx": _extract_docx_texts,
        ".pptx": _extract_pptx_texts,
    }

    input_path = None
    session_file = None
    succeeded = False

    try:
        file_extension = file_handler.validate_file_extension(file.filename)
        logger.info("Extracting texts from %s file: %s", file_extension, file.filename)

        file_handler.validate_file_size(file)

        session_id = str(uuid.uuid4())

        input_path = config.UPLOAD_DIR / f"session_{session_id}{file_extension}"
        file_handler.save_upload_file(file, input_path)

        # Extensions without an extractor keep the previous behaviour: an
        # empty text list and a valid session.
        extractor = extractors.get(file_extension)
        texts = extractor(input_path) if extractor is not None else []

        session_data = {
            "original_filename": file.filename,
            "file_extension": file_extension,
            "input_path": str(input_path),
            "text_count": len(texts),
        }
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        with open(session_file, "w", encoding="utf-8") as f:
            json.dump(session_data, f)

        logger.info(
            "Extracted %d texts from %s, session: %s",
            len(texts),
            file.filename,
            session_id,
        )

        succeeded = True
        return {
            "session_id": session_id,
            "texts": texts,
            "file_type": file_extension,
            "text_count": len(texts),
        }

    except HTTPException:
        raise
    except Exception:
        logger.exception("Text extraction error")
        return JSONResponse(
            status_code=500,
            content={
                "error": "INTERNAL_ERROR",
                "message": "Erreur lors de l'extraction des textes. Veuillez reessayer.",
            },
        )
    finally:
        # A failed request returns no session_id, so nothing could ever
        # reference these files again; remove them instead of leaking them.
        if not succeeded:
            _discard_session_files(input_path, session_file)
|
|
|
|
|
|
def _replace_paragraph_text(para, new_text) -> None:
    """Blank every run of ``para`` and put ``new_text`` in the first run (or on the paragraph if it has no runs)."""
    runs = para.runs
    for run in runs:
        run.text = ""
    if runs:
        runs[0].text = new_text
    else:
        para.text = new_text


def _apply_xlsx_translations(path, translation_map) -> None:
    """Overwrite cells whose ``<sheet title>!<coordinate>`` id is in ``translation_map`` and save in place."""
    from openpyxl import load_workbook

    wb = load_workbook(path)
    try:
        for sheet in wb.worksheets:
            for row in sheet.iter_rows():
                for cell in row:
                    cell_id = f"{sheet.title}!{cell.coordinate}"
                    if cell_id in translation_map:
                        cell.value = translation_map[cell_id]
        wb.save(path)
    finally:
        # Release the workbook even if writing fails.
        wb.close()


def _apply_docx_translations(path, translation_map) -> None:
    """Apply paragraph (``para_<n>``) and table-cell (``table_<t>_r<r>_c<c>``) translations and save in place."""
    from docx import Document

    doc = Document(path)

    # NOTE(review): ``para_<n>`` is taken to be the n-th non-blank paragraph;
    # this must match the numbering used when the texts were extracted —
    # confirm both sides agree. The counter advances for every non-blank
    # paragraph, so one missing translation no longer stops all the
    # following paragraphs from being matched.
    para_idx = 0
    for para in doc.paragraphs:
        if not para.text.strip():
            continue
        para_id = f"para_{para_idx}"
        para_idx += 1
        if para_id in translation_map:
            _replace_paragraph_text(para, translation_map[para_id])

    for table_idx, table in enumerate(doc.tables):
        for row_idx, row in enumerate(table.rows):
            for cell_idx, cell in enumerate(row.cells):
                cell_id = f"table_{table_idx}_r{row_idx}_c{cell_idx}"
                if cell_id not in translation_map:
                    continue
                paragraphs = cell.paragraphs
                # Blank the whole cell, then write the translation into its
                # first paragraph.
                for para in paragraphs:
                    for run in para.runs:
                        run.text = ""
                if not paragraphs:
                    continue
                first = paragraphs[0]
                if first.runs:
                    first.runs[0].text = translation_map[cell_id]
                else:
                    first.text = translation_map[cell_id]

    doc.save(path)


def _apply_pptx_translations(path, translation_map) -> None:
    """Overwrite text runs whose slide/shape/paragraph/run id is in ``translation_map`` and save in place."""
    from pptx import Presentation

    prs = Presentation(path)
    for slide_idx, slide in enumerate(prs.slides):
        for shape_idx, shape in enumerate(slide.shapes):
            if not shape.has_text_frame:
                continue
            for para_idx, para in enumerate(shape.text_frame.paragraphs):
                for run_idx, run in enumerate(para.runs):
                    run_id = f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}"
                    if run_id in translation_map:
                        run.text = translation_map[run_id]
    prs.save(path)


@router.post("/reconstruct-document")
async def reconstruct_document(
    session_id: str = Form(..., description="Session ID from extract-texts"),
    translations: str = Form(
        ..., description="JSON array of {id, translated_text} objects"
    ),
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Rebuild a document from a stored session and client-supplied translations.

    ``session_id`` must parse as a UUID and identify a ``session_<id>.json``
    file in ``config.UPLOAD_DIR``. ``translations`` is a JSON array of
    objects with ``id`` and ``translated_text`` keys.

    Returns the translated document as a file download. On success the
    stored source file and session file are removed.

    Raises (as HTTP errors):
    - 400 for an invalid session ID, session data pointing outside the upload
      directory, a malformed ``translations`` payload, or a file type that
      cannot be reconstructed.
    - 404 when the session or its source file no longer exists.
    Any other failure is logged and answered with a generic 500 JSON payload.
    """
    import json
    import shutil
    import uuid

    try:
        uuid.UUID(session_id)
    except (ValueError, TypeError):
        raise HTTPException(status_code=400, detail="Invalid session ID")

    appliers = {
        ".xlsx": _apply_xlsx_translations,
        ".docx": _apply_docx_translations,
        ".pptx": _apply_pptx_translations,
    }

    try:
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        if not session_file.exists():
            raise HTTPException(status_code=404, detail="Session not found or expired")

        with open(session_file, "r", encoding="utf-8") as f:
            session_data = json.load(f)

        # The stored path is re-validated so a tampered session file cannot
        # point the reconstruction at a file outside the upload directory.
        input_path = Path(session_data["input_path"]).resolve()
        upload_dir_resolved = config.UPLOAD_DIR.resolve()
        if not input_path.is_relative_to(upload_dir_resolved):
            raise HTTPException(status_code=400, detail="Invalid session data")
        file_extension = session_data["file_extension"]
        original_filename = session_data["original_filename"]

        if not input_path.exists():
            raise HTTPException(
                status_code=404, detail="Source file not found or expired"
            )

        # The payload comes from the client: bad JSON or a wrong shape is a
        # request error (400), not a server error.
        try:
            translation_list = json.loads(translations)
            translation_map = {
                t["id"]: t["translated_text"] for t in translation_list
            }
        except (json.JSONDecodeError, KeyError, TypeError) as exc:
            raise HTTPException(
                status_code=400, detail="Invalid translations payload"
            ) from exc

        # Without a matching writer no output file would be produced and the
        # response below would reference a file that does not exist.
        apply_translations = appliers.get(file_extension)
        if apply_translations is None:
            raise HTTPException(status_code=400, detail="Unsupported file type")

        output_filename = file_handler.generate_unique_filename(
            original_filename, "translated"
        )
        output_path = config.OUTPUT_DIR / output_filename

        # Work on a copy so the stored source stays intact until success.
        shutil.copy(input_path, output_path)
        apply_translations(output_path, translation_map)

        file_handler.cleanup_file(input_path)
        file_handler.cleanup_file(session_file)

        logger.info("Reconstructed document: %s", output_path)

        return FileResponse(
            path=output_path,
            filename=f"translated_{original_filename}",
            media_type="application/octet-stream",
        )

    except HTTPException:
        raise
    except Exception:
        logger.exception("Reconstruction error")
        return JSONResponse(
            status_code=500,
            content={
                "error": "INTERNAL_ERROR",
                "message": "Erreur lors de la reconstruction du document. Veuillez reessayer.",
            },
        )
|
|
|
|
|
|
@router.get("/ollama/models")
async def list_ollama_models(base_url: Optional[str] = None):
    """
    List the models reported by an Ollama server.

    ``base_url`` optionally overrides ``config.OLLAMA_BASE_URL``. A supplied
    override must be a well-formed ``http``/``https`` URL with a host,
    otherwise the request is rejected with HTTP 400.

    Returns a dict with ``ollama_url``, ``models`` and ``count``.
    """
    from urllib.parse import urlparse

    from services.translation_service import OllamaTranslationProvider

    if base_url:
        # The override is caller-controlled and handed to the provider, so
        # reject anything that is not a plain http(s) URL.
        # NOTE(review): this does not restrict which hosts may be targeted;
        # consider an allow-list or an auth dependency on this route.
        try:
            parsed = urlparse(base_url)
        except ValueError:
            parsed = None
        if (
            parsed is None
            or parsed.scheme not in ("http", "https")
            or not parsed.netloc
        ):
            raise HTTPException(status_code=400, detail="Invalid Ollama base URL")

    url = base_url or config.OLLAMA_BASE_URL
    models = OllamaTranslationProvider.list_models(url)

    return {"ollama_url": url, "models": models, "count": len(models)}
|
|
|
|
|
|
@router.post("/ollama/configure")
async def configure_ollama(base_url: str = Form(...), model: str = Form(...)):
    """
    Update the Ollama base URL and model on the shared ``config`` object.

    Both values are stripped of surrounding whitespace. ``base_url`` must be
    a well-formed ``http``/``https`` URL with a host and ``model`` must not
    be blank; otherwise the request is rejected with HTTP 400 and the
    configuration is left untouched.

    Returns a confirmation dict echoing the stored ``ollama_url`` and ``model``.
    """
    from urllib.parse import urlparse

    # NOTE(review): unlike the other POST routes in this module, this one
    # declares no get_authenticated_user dependency although it mutates
    # process-wide configuration — confirm whether that is intended.
    base_url = base_url.strip()
    model = model.strip()

    try:
        parsed = urlparse(base_url)
    except ValueError:
        parsed = None
    if parsed is None or parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise HTTPException(status_code=400, detail="Invalid Ollama base URL")
    if not model:
        raise HTTPException(status_code=400, detail="Model name must not be empty")

    # Validate first, assign second: a rejected request never leaves the
    # configuration half-updated.
    config.OLLAMA_BASE_URL = base_url
    config.OLLAMA_MODEL = model

    return {
        "status": "success",
        "message": "Ollama configuration updated",
        "ollama_url": base_url,
        "model": model,
    }
|
|
|
|
|
|
@router.get("/metrics")
async def get_metrics(
    current_user: Optional[Any] = Depends(get_authenticated_user),
):
    """
    Report cleanup statistics, rate-limit statistics and selected config values.

    The ``system`` section currently carries empty ``memory`` and ``disk``
    placeholders and a fixed ``healthy`` status.
    """
    from middleware.cleanup import create_cleanup_manager
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig

    # NOTE(review): both managers are created fresh for this request; whether
    # their stats reflect the running application depends on how these
    # classes share state — confirm.
    cleanup_manager = create_cleanup_manager(config)
    rate_limit_manager = RateLimitManager(
        RateLimitConfig(
            requests_per_minute=config.RATE_LIMIT_PER_MINUTE,
            requests_per_hour=config.RATE_LIMIT_PER_HOUR,
            translations_per_minute=config.TRANSLATIONS_PER_MINUTE,
            translations_per_hour=config.TRANSLATIONS_PER_HOUR,
            max_concurrent_translations=config.MAX_CONCURRENT_TRANSLATIONS,
        )
    )

    report = {
        "system": {"memory": {}, "disk": {}, "status": "healthy"},
    }
    report["cleanup"] = cleanup_manager.get_stats()
    report["rate_limits"] = rate_limit_manager.get_stats()
    report["config"] = {
        "max_file_size_mb": config.MAX_FILE_SIZE_MB,
        "supported_extensions": list(config.SUPPORTED_EXTENSIONS),
        "translation_service": config.TRANSLATION_SERVICE,
    }
    return report
|
|
|
|
|
|
@router.get("/rate-limit/status")
async def get_rate_limit_status(request: Request):
    """
    Report the configured rate limits and the current usage for the caller.

    The caller is identified by ``request.client.host``, or ``"unknown"``
    when the request carries no client information.
    """
    from middleware.rate_limiting import RateLimitManager, RateLimitConfig

    limits_cfg = RateLimitConfig(
        requests_per_minute=config.RATE_LIMIT_PER_MINUTE,
        requests_per_hour=config.RATE_LIMIT_PER_HOUR,
        translations_per_minute=config.TRANSLATIONS_PER_MINUTE,
        translations_per_hour=config.TRANSLATIONS_PER_HOUR,
        max_concurrent_translations=config.MAX_CONCURRENT_TRANSLATIONS,
    )
    manager = RateLimitManager(limits_cfg)

    # NOTE(review): behind a reverse proxy this is presumably the proxy's
    # address rather than the end user's — confirm how client IPs are derived.
    if request.client:
        client_ip = request.client.host
    else:
        client_ip = "unknown"

    usage = await manager.get_client_status(client_ip)

    reported_limits = (
        "requests_per_minute",
        "requests_per_hour",
        "translations_per_minute",
        "translations_per_hour",
    )
    return {
        "client_ip": client_ip,
        "limits": {name: getattr(limits_cfg, name) for name in reported_limits},
        "current_usage": usage,
    }
|