diff --git a/routes/legacy_routes.py b/routes/legacy_routes.py
index 8a371dd..2e6829b 100644
--- a/routes/legacy_routes.py
+++ b/routes/legacy_routes.py
@@ -20,6 +20,26 @@ logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/api/v1", tags=["Legacy"])
 
 
+def _safe_output_path(filename: str):
+    """
+    Resolve filename to a path under config.OUTPUT_DIR. Prevents path traversal.
+    Returns (Path, True) if valid, (None, False) if invalid.
+    """
+    if not filename or ".." in filename or "/" in filename or "\\" in filename:
+        return None, False
+    safe_name = Path(filename).name
+    if not safe_name.strip():
+        return None, False
+    base = config.OUTPUT_DIR.resolve()
+    try:
+        resolved = (config.OUTPUT_DIR / safe_name).resolve()
+        if not resolved.is_relative_to(base):
+            return None, False
+        return resolved, True
+    except (ValueError, OSError):
+        return None, False
+
+
 def _resolve_model(
     cfg_model: Optional[str],
     model_env: str,
@@ -298,32 +318,30 @@ async def translate_batch_documents(
 
 @router.get("/download/{filename}")
 async def download_file(filename: str):
-    """Download a translated file by filename"""
-    file_path = config.OUTPUT_DIR / filename
-
+    """Download a translated file by filename. Filename is sanitized to prevent path traversal."""
+    file_path, ok = _safe_output_path(filename)
+    if not ok or file_path is None:
+        raise HTTPException(status_code=400, detail="Invalid filename")
     if not file_path.exists():
         raise HTTPException(status_code=404, detail="File not found")
-
     return FileResponse(
         path=file_path,
-        filename=filename,
+        filename=file_path.name,
         media_type="application/octet-stream",
     )
 
 
 @router.delete("/cleanup/{filename}")
 async def cleanup_translated_file(filename: str):
-    """Cleanup a translated file after download"""
+    """Cleanup a translated file after download. Filename is sanitized to prevent path traversal."""
+    file_path, ok = _safe_output_path(filename)
+    if not ok or file_path is None:
+        raise HTTPException(status_code=400, detail="Invalid filename")
     try:
-        file_path = config.OUTPUT_DIR / filename
-
         if not file_path.exists():
             raise HTTPException(status_code=404, detail="File not found")
-
         file_handler.cleanup_file(file_path)
-
-        return {"message": f"File {filename} deleted successfully"}
-
+        return {"message": f"File {file_path.name} deleted successfully"}
     except HTTPException:
         raise
     except Exception as e:
@@ -454,8 +472,14 @@ async def reconstruct_document(
     ),
     target_language: str = Form(..., description="Target language code"),
 ):
-    """Reconstruct a document with translated texts"""
+    """Reconstruct a document with translated texts. session_id must be a valid UUID."""
     import json
+    import uuid
+
+    try:
+        uuid.UUID(session_id)
+    except (ValueError, TypeError):
+        raise HTTPException(status_code=400, detail="Invalid session ID")
 
     try:
         session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
@@ -465,7 +489,10 @@ async def reconstruct_document(
         with open(session_file, "r", encoding="utf-8") as f:
             session_data = json.load(f)
 
-        input_path = Path(session_data["input_path"])
+        input_path = Path(session_data["input_path"]).resolve()
+        upload_dir_resolved = config.UPLOAD_DIR.resolve()
+        if not input_path.is_relative_to(upload_dir_resolved):
+            raise HTTPException(status_code=400, detail="Invalid session data")
         file_extension = session_data["file_extension"]
         original_filename = session_data["original_filename"]
 