""" Document Translation API FastAPI application for translating complex documents while preserving formatting """ from fastapi import FastAPI, UploadFile, File, Form, HTTPException from fastapi.responses import FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pathlib import Path from typing import Optional import asyncio import logging from config import config from translators import excel_translator, word_translator, pptx_translator from utils import file_handler, handle_translation_error, DocumentProcessingError # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Ensure necessary directories exist config.ensure_directories() # Create FastAPI app app = FastAPI( title=config.API_TITLE, version=config.API_VERSION, description=config.API_DESCRIPTION ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Mount static files static_dir = Path(__file__).parent / "static" if static_dir.exists(): app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") @app.get("/") async def root(): """Root endpoint with API information""" return { "name": config.API_TITLE, "version": config.API_VERSION, "status": "operational", "supported_formats": list(config.SUPPORTED_EXTENSIONS), "endpoints": { "translate": "/translate", "health": "/health", "supported_languages": "/languages" } } @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "translation_service": config.TRANSLATION_SERVICE } @app.get("/languages") async def get_supported_languages(): """Get list of supported language codes""" return { "supported_languages": { "es": "Spanish", "fr": "French", "de": "German", "it": "Italian", "pt": "Portuguese", "ru": "Russian", "zh": "Chinese (Simplified)", "ja": "Japanese", "ko": "Korean", "ar": "Arabic", "hi": "Hindi", "nl": "Dutch", "pl": "Polish", "tr": "Turkish", "sv": "Swedish", "da": "Danish", "no": "Norwegian", "fi": "Finnish", "cs": "Czech", "el": "Greek", "th": "Thai", "vi": "Vietnamese", "id": "Indonesian", "uk": "Ukrainian", "ro": "Romanian", "hu": "Hungarian" }, "note": "Supported languages may vary depending on the translation service configured" } @app.post("/translate") async def translate_document( file: UploadFile = File(..., description="Document file to translate (.xlsx, .docx, or .pptx)"), target_language: str = Form(..., description="Target language code (e.g., 'es', 'fr', 'de')"), source_language: str = Form(default="auto", description="Source language code (default: auto-detect)"), provider: str = Form(default="google", description="Translation provider (google, ollama, deepl, libre)"), translate_images: bool = Form(default=False, description="Translate images with Ollama vision (only for Ollama provider)"), cleanup: bool = Form(default=True, description="Delete input file after translation") ): """ Translate a document while preserving all formatting, layout, and embedded media **Supported File Types:** - Excel (.xlsx) - Preserves formulas, merged cells, styling, and images - Word (.docx) - Preserves headings, tables, images, headers/footers - PowerPoint (.pptx) - Preserves layouts, animations, and media **Parameters:** - **file**: The document file to translate - **target_language**: Target language code (e.g., 'es' for Spanish, 'fr' for French) - **source_language**: Source language code (optional, default: auto-detect) - **cleanup**: Whether to delete the uploaded file after translation (default: True) **Returns:** - Translated document file with preserved formatting """ input_path = None output_path = None try: # Validate file extension file_extension = file_handler.validate_file_extension(file.filename) logger.info(f"Processing {file_extension} file: {file.filename}") # Validate file size file_handler.validate_file_size(file) # Generate unique filenames input_filename = file_handler.generate_unique_filename(file.filename, "input") output_filename = file_handler.generate_unique_filename(file.filename, "translated") # Save uploaded file input_path = config.UPLOAD_DIR / input_filename output_path = config.OUTPUT_DIR / output_filename await file_handler.save_upload_file(file, input_path) logger.info(f"Saved input file to: {input_path}") # Configure translation provider from services.translation_service import GoogleTranslationProvider, DeepLTranslationProvider, LibreTranslationProvider, OllamaTranslationProvider, WebLLMTranslationProvider, translation_service if provider.lower() == "deepl": if not config.DEEPL_API_KEY: raise HTTPException(status_code=400, detail="DeepL API key not configured") translation_provider = DeepLTranslationProvider(config.DEEPL_API_KEY) elif provider.lower() == "libre": translation_provider = LibreTranslationProvider() elif provider.lower() == "ollama": vision_model = getattr(config, 'OLLAMA_VISION_MODEL', 'llava') translation_provider = OllamaTranslationProvider(config.OLLAMA_BASE_URL, config.OLLAMA_MODEL, vision_model) elif provider.lower() == "webllm": translation_provider = WebLLMTranslationProvider() else: translation_provider = GoogleTranslationProvider() # Update the global translation service translation_service.provider = translation_provider # Store translate_images flag for translators to access translation_service.translate_images = translate_images # Translate based on file type if file_extension == ".xlsx": logger.info("Translating Excel file...") excel_translator.translate_file(input_path, output_path, target_language) elif file_extension == ".docx": logger.info("Translating Word document...") word_translator.translate_file(input_path, output_path, target_language) elif file_extension == ".pptx": logger.info("Translating PowerPoint presentation...") pptx_translator.translate_file(input_path, output_path, target_language) else: raise DocumentProcessingError(f"Unsupported file type: {file_extension}") logger.info(f"Translation completed: {output_path}") # Get file info output_info = file_handler.get_file_info(output_path) # Cleanup input file if requested if cleanup and input_path: file_handler.cleanup_file(input_path) logger.info(f"Cleaned up input file: {input_path}") # Return the translated file return FileResponse( path=output_path, filename=f"translated_{file.filename}", media_type="application/octet-stream", headers={ "X-Original-Filename": file.filename, "X-File-Size-MB": str(output_info.get("size_mb", 0)), "X-Target-Language": target_language } ) except HTTPException: # Re-raise HTTP exceptions raise except Exception as e: logger.error(f"Translation error: {str(e)}", exc_info=True) # Cleanup files on error if input_path: file_handler.cleanup_file(input_path) if output_path: file_handler.cleanup_file(output_path) raise handle_translation_error(e) @app.delete("/cleanup/{filename}") async def cleanup_translated_file(filename: str): """ Cleanup a translated file after download **Parameters:** - **filename**: Name of the file to delete from the outputs directory """ try: file_path = config.OUTPUT_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found") file_handler.cleanup_file(file_path) return {"message": f"File {filename} deleted successfully"} except HTTPException: raise except Exception as e: logger.error(f"Cleanup error: {str(e)}") raise HTTPException(status_code=500, detail="Error cleaning up file") @app.post("/translate-batch") async def translate_batch_documents( files: list[UploadFile] = File(..., description="Multiple document files to translate"), target_language: str = Form(..., description="Target language code"), source_language: str = Form(default="auto", description="Source language code") ): """ Translate multiple documents in batch **Note:** This endpoint processes files sequentially. For large batches, consider calling the single file endpoint multiple times with concurrent requests. """ results = [] for file in files: try: # Process each file using the same logic as single file translation file_extension = file_handler.validate_file_extension(file.filename) file_handler.validate_file_size(file) input_filename = file_handler.generate_unique_filename(file.filename, "input") output_filename = file_handler.generate_unique_filename(file.filename, "translated") input_path = config.UPLOAD_DIR / input_filename output_path = config.OUTPUT_DIR / output_filename await file_handler.save_upload_file(file, input_path) # Translate based on file type if file_extension == ".xlsx": excel_translator.translate_file(input_path, output_path, target_language) elif file_extension == ".docx": word_translator.translate_file(input_path, output_path, target_language) elif file_extension == ".pptx": pptx_translator.translate_file(input_path, output_path, target_language) # Cleanup input file file_handler.cleanup_file(input_path) results.append({ "filename": file.filename, "status": "success", "output_file": output_filename, "download_url": f"/download/{output_filename}" }) except Exception as e: logger.error(f"Error processing {file.filename}: {str(e)}") results.append({ "filename": file.filename, "status": "error", "error": str(e) }) return { "total_files": len(files), "successful": len([r for r in results if r["status"] == "success"]), "failed": len([r for r in results if r["status"] == "error"]), "results": results } @app.get("/download/{filename}") async def download_file(filename: str): """ Download a translated file by filename **Parameters:** - **filename**: Name of the file to download from the outputs directory """ file_path = config.OUTPUT_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail="File not found") return FileResponse( path=file_path, filename=filename, media_type="application/octet-stream" ) @app.get("/ollama/models") async def list_ollama_models(base_url: Optional[str] = None): """ List available Ollama models **Parameters:** - **base_url**: Ollama server URL (default: from config) """ from services.translation_service import OllamaTranslationProvider url = base_url or config.OLLAMA_BASE_URL models = OllamaTranslationProvider.list_models(url) return { "ollama_url": url, "models": models, "count": len(models) } @app.post("/ollama/configure") async def configure_ollama(base_url: str = Form(...), model: str = Form(...)): """ Configure Ollama settings **Parameters:** - **base_url**: Ollama server URL (e.g., http://localhost:11434) - **model**: Model name to use for translation (e.g., llama3, mistral) """ config.OLLAMA_BASE_URL = base_url config.OLLAMA_MODEL = model return { "status": "success", "message": "Ollama configuration updated", "ollama_url": base_url, "model": model } if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)