380 lines
13 KiB
Python

"""
Document Translation API
FastAPI application for translating complex documents while preserving formatting
"""
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from typing import Optional
import asyncio
import logging
from config import config
from translators import excel_translator, word_translator, pptx_translator
from utils import file_handler, handle_translation_error, DocumentProcessingError
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Ensure necessary directories exist
config.ensure_directories()
# Create FastAPI app
app = FastAPI(
title=config.API_TITLE,
version=config.API_VERSION,
description=config.API_DESCRIPTION
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount static files
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"name": config.API_TITLE,
"version": config.API_VERSION,
"status": "operational",
"supported_formats": list(config.SUPPORTED_EXTENSIONS),
"endpoints": {
"translate": "/translate",
"health": "/health",
"supported_languages": "/languages"
}
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"translation_service": config.TRANSLATION_SERVICE
}
@app.get("/languages")
async def get_supported_languages():
"""Get list of supported language codes"""
return {
"supported_languages": {
"es": "Spanish",
"fr": "French",
"de": "German",
"it": "Italian",
"pt": "Portuguese",
"ru": "Russian",
"zh": "Chinese (Simplified)",
"ja": "Japanese",
"ko": "Korean",
"ar": "Arabic",
"hi": "Hindi",
"nl": "Dutch",
"pl": "Polish",
"tr": "Turkish",
"sv": "Swedish",
"da": "Danish",
"no": "Norwegian",
"fi": "Finnish",
"cs": "Czech",
"el": "Greek",
"th": "Thai",
"vi": "Vietnamese",
"id": "Indonesian",
"uk": "Ukrainian",
"ro": "Romanian",
"hu": "Hungarian"
},
"note": "Supported languages may vary depending on the translation service configured"
}
@app.post("/translate")
async def translate_document(
file: UploadFile = File(..., description="Document file to translate (.xlsx, .docx, or .pptx)"),
target_language: str = Form(..., description="Target language code (e.g., 'es', 'fr', 'de')"),
source_language: str = Form(default="auto", description="Source language code (default: auto-detect)"),
provider: str = Form(default="google", description="Translation provider (google, ollama, deepl, libre)"),
translate_images: bool = Form(default=False, description="Translate images with Ollama vision (only for Ollama provider)"),
cleanup: bool = Form(default=True, description="Delete input file after translation")
):
"""
Translate a document while preserving all formatting, layout, and embedded media
**Supported File Types:**
- Excel (.xlsx) - Preserves formulas, merged cells, styling, and images
- Word (.docx) - Preserves headings, tables, images, headers/footers
- PowerPoint (.pptx) - Preserves layouts, animations, and media
**Parameters:**
- **file**: The document file to translate
- **target_language**: Target language code (e.g., 'es' for Spanish, 'fr' for French)
- **source_language**: Source language code (optional, default: auto-detect)
- **cleanup**: Whether to delete the uploaded file after translation (default: True)
**Returns:**
- Translated document file with preserved formatting
"""
input_path = None
output_path = None
try:
# Validate file extension
file_extension = file_handler.validate_file_extension(file.filename)
logger.info(f"Processing {file_extension} file: {file.filename}")
# Validate file size
file_handler.validate_file_size(file)
# Generate unique filenames
input_filename = file_handler.generate_unique_filename(file.filename, "input")
output_filename = file_handler.generate_unique_filename(file.filename, "translated")
# Save uploaded file
input_path = config.UPLOAD_DIR / input_filename
output_path = config.OUTPUT_DIR / output_filename
await file_handler.save_upload_file(file, input_path)
logger.info(f"Saved input file to: {input_path}")
# Configure translation provider
from services.translation_service import GoogleTranslationProvider, DeepLTranslationProvider, LibreTranslationProvider, OllamaTranslationProvider, WebLLMTranslationProvider, translation_service
if provider.lower() == "deepl":
if not config.DEEPL_API_KEY:
raise HTTPException(status_code=400, detail="DeepL API key not configured")
translation_provider = DeepLTranslationProvider(config.DEEPL_API_KEY)
elif provider.lower() == "libre":
translation_provider = LibreTranslationProvider()
elif provider.lower() == "ollama":
vision_model = getattr(config, 'OLLAMA_VISION_MODEL', 'llava')
translation_provider = OllamaTranslationProvider(config.OLLAMA_BASE_URL, config.OLLAMA_MODEL, vision_model)
elif provider.lower() == "webllm":
translation_provider = WebLLMTranslationProvider()
else:
translation_provider = GoogleTranslationProvider()
# Update the global translation service
translation_service.provider = translation_provider
# Store translate_images flag for translators to access
translation_service.translate_images = translate_images
# Translate based on file type
if file_extension == ".xlsx":
logger.info("Translating Excel file...")
excel_translator.translate_file(input_path, output_path, target_language)
elif file_extension == ".docx":
logger.info("Translating Word document...")
word_translator.translate_file(input_path, output_path, target_language)
elif file_extension == ".pptx":
logger.info("Translating PowerPoint presentation...")
pptx_translator.translate_file(input_path, output_path, target_language)
else:
raise DocumentProcessingError(f"Unsupported file type: {file_extension}")
logger.info(f"Translation completed: {output_path}")
# Get file info
output_info = file_handler.get_file_info(output_path)
# Cleanup input file if requested
if cleanup and input_path:
file_handler.cleanup_file(input_path)
logger.info(f"Cleaned up input file: {input_path}")
# Return the translated file
return FileResponse(
path=output_path,
filename=f"translated_{file.filename}",
media_type="application/octet-stream",
headers={
"X-Original-Filename": file.filename,
"X-File-Size-MB": str(output_info.get("size_mb", 0)),
"X-Target-Language": target_language
}
)
except HTTPException:
# Re-raise HTTP exceptions
raise
except Exception as e:
logger.error(f"Translation error: {str(e)}", exc_info=True)
# Cleanup files on error
if input_path:
file_handler.cleanup_file(input_path)
if output_path:
file_handler.cleanup_file(output_path)
raise handle_translation_error(e)
@app.delete("/cleanup/{filename}")
async def cleanup_translated_file(filename: str):
"""
Cleanup a translated file after download
**Parameters:**
- **filename**: Name of the file to delete from the outputs directory
"""
try:
file_path = config.OUTPUT_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
file_handler.cleanup_file(file_path)
return {"message": f"File {filename} deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Cleanup error: {str(e)}")
raise HTTPException(status_code=500, detail="Error cleaning up file")
@app.post("/translate-batch")
async def translate_batch_documents(
files: list[UploadFile] = File(..., description="Multiple document files to translate"),
target_language: str = Form(..., description="Target language code"),
source_language: str = Form(default="auto", description="Source language code")
):
"""
Translate multiple documents in batch
**Note:** This endpoint processes files sequentially. For large batches, consider
calling the single file endpoint multiple times with concurrent requests.
"""
results = []
for file in files:
try:
# Process each file using the same logic as single file translation
file_extension = file_handler.validate_file_extension(file.filename)
file_handler.validate_file_size(file)
input_filename = file_handler.generate_unique_filename(file.filename, "input")
output_filename = file_handler.generate_unique_filename(file.filename, "translated")
input_path = config.UPLOAD_DIR / input_filename
output_path = config.OUTPUT_DIR / output_filename
await file_handler.save_upload_file(file, input_path)
# Translate based on file type
if file_extension == ".xlsx":
excel_translator.translate_file(input_path, output_path, target_language)
elif file_extension == ".docx":
word_translator.translate_file(input_path, output_path, target_language)
elif file_extension == ".pptx":
pptx_translator.translate_file(input_path, output_path, target_language)
# Cleanup input file
file_handler.cleanup_file(input_path)
results.append({
"filename": file.filename,
"status": "success",
"output_file": output_filename,
"download_url": f"/download/{output_filename}"
})
except Exception as e:
logger.error(f"Error processing {file.filename}: {str(e)}")
results.append({
"filename": file.filename,
"status": "error",
"error": str(e)
})
return {
"total_files": len(files),
"successful": len([r for r in results if r["status"] == "success"]),
"failed": len([r for r in results if r["status"] == "error"]),
"results": results
}
@app.get("/download/{filename}")
async def download_file(filename: str):
"""
Download a translated file by filename
**Parameters:**
- **filename**: Name of the file to download from the outputs directory
"""
file_path = config.OUTPUT_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(
path=file_path,
filename=filename,
media_type="application/octet-stream"
)
@app.get("/ollama/models")
async def list_ollama_models(base_url: Optional[str] = None):
"""
List available Ollama models
**Parameters:**
- **base_url**: Ollama server URL (default: from config)
"""
from services.translation_service import OllamaTranslationProvider
url = base_url or config.OLLAMA_BASE_URL
models = OllamaTranslationProvider.list_models(url)
return {
"ollama_url": url,
"models": models,
"count": len(models)
}
@app.post("/ollama/configure")
async def configure_ollama(base_url: str = Form(...), model: str = Form(...)):
"""
Configure Ollama settings
**Parameters:**
- **base_url**: Ollama server URL (e.g., http://localhost:11434)
- **model**: Model name to use for translation (e.g., llama3, mistral)
"""
config.OLLAMA_BASE_URL = base_url
config.OLLAMA_MODEL = model
return {
"status": "success",
"message": "Ollama configuration updated",
"ollama_url": base_url,
"model": model
}
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)