378 lines
13 KiB
Python
378 lines
13 KiB
Python
"""
Document Translation API

FastAPI application for translating complex documents while preserving formatting
"""
|
|
import asyncio
import logging
from pathlib import Path
from typing import Optional
from urllib.parse import quote

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

from config import config
from translators import excel_translator, word_translator, pptx_translator
from utils import file_handler, handle_translation_error, DocumentProcessingError
|
|
|
|
# Logging: INFO-level basic configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Make sure the directories the application needs are in place before any
# request handler runs.
config.ensure_directories()

# The FastAPI application; title, version and description come from config.
app = FastAPI(
    title=config.API_TITLE,
    version=config.API_VERSION,
    description=config.API_DESCRIPTION,
)

# CORS: every origin, method and header is accepted, with credentials enabled.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive -- restrict allow_origins to known front-ends before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve the "static" folder that sits next to this module under /static, but
# only when that folder actually exists.
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
    app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
|
|
|
|
|
|
@app.get("/")
async def root():
    """Return basic API information: name, version, status, formats and routes."""
    endpoint_map = {
        "translate": "/translate",
        "health": "/health",
        "supported_languages": "/languages",
    }
    api_info = {
        "name": config.API_TITLE,
        "version": config.API_VERSION,
        "status": "operational",
        # Materialise the configured extensions as a list for the JSON body.
        "supported_formats": list(config.SUPPORTED_EXTENSIONS),
        "endpoints": endpoint_map,
    }
    return api_info
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Report a static "healthy" status plus the configured translation service."""
    service_name = config.TRANSLATION_SERVICE
    return {"status": "healthy", "translation_service": service_name}
|
|
|
|
|
|
@app.get("/languages")
async def get_supported_languages():
    """Return the static table of language codes and their English names."""
    # Hard-coded code -> name table; it is not queried from the active provider,
    # which is why the response carries the caveat in "note".
    language_names = {
        "es": "Spanish",
        "fr": "French",
        "de": "German",
        "it": "Italian",
        "pt": "Portuguese",
        "ru": "Russian",
        "zh": "Chinese (Simplified)",
        "ja": "Japanese",
        "ko": "Korean",
        "ar": "Arabic",
        "hi": "Hindi",
        "nl": "Dutch",
        "pl": "Polish",
        "tr": "Turkish",
        "sv": "Swedish",
        "da": "Danish",
        "no": "Norwegian",
        "fi": "Finnish",
        "cs": "Czech",
        "el": "Greek",
        "th": "Thai",
        "vi": "Vietnamese",
        "id": "Indonesian",
        "uk": "Ukrainian",
        "ro": "Romanian",
        "hu": "Hungarian",
    }
    caveat = "Supported languages may vary depending on the translation service configured"
    return {"supported_languages": language_names, "note": caveat}
|
|
|
|
|
|
def _header_safe(value):
    """Return *value* as text that is safe to send as an HTTP header value.

    Response header values are encoded as Latin-1. A value that cannot be
    encoded that way, or that contains control characters, is percent-encoded;
    any other value is returned unchanged.
    """
    text = "" if value is None else str(value)
    has_control_chars = any(ord(ch) < 32 or ord(ch) == 127 for ch in text)
    try:
        text.encode("latin-1")
    except UnicodeEncodeError:
        return quote(text)
    return quote(text) if has_control_chars else text


def _discard_files(*paths):
    """Pass every non-empty path to file_handler.cleanup_file."""
    for path in paths:
        if path:
            file_handler.cleanup_file(path)


@app.post("/translate")
async def translate_document(
    file: UploadFile = File(..., description="Document file to translate (.xlsx, .docx, or .pptx)"),
    target_language: str = Form(..., description="Target language code (e.g., 'es', 'fr', 'de')"),
    source_language: str = Form(default="auto", description="Source language code (default: auto-detect)"),
    provider: str = Form(default="google", description="Translation provider (google, ollama, deepl, libre)"),
    translate_images: bool = Form(default=False, description="Translate images with Ollama vision (only for Ollama provider)"),
    cleanup: bool = Form(default=True, description="Delete input file after translation")
):
    """
    Translate a document while preserving all formatting, layout, and embedded media

    **Supported File Types:**
    - Excel (.xlsx) - Preserves formulas, merged cells, styling, and images
    - Word (.docx) - Preserves headings, tables, images, headers/footers
    - PowerPoint (.pptx) - Preserves layouts, animations, and media

    **Parameters:**
    - **file**: The document file to translate
    - **target_language**: Target language code (e.g., 'es' for Spanish, 'fr' for French)
    - **source_language**: Source language code (optional, default: auto-detect)
    - **provider**: Translation provider: google, ollama, deepl or libre (default: google)
    - **translate_images**: Translate images with Ollama vision (default: False)
    - **cleanup**: Whether to delete the uploaded file after translation (default: True)

    **Returns:**
    - Translated document file with preserved formatting
    """
    # NOTE(review): source_language is accepted but never passed on to the
    # translators below -- confirm whether it should be.
    input_path = None
    output_path = None

    try:
        # Validate file extension
        file_extension = file_handler.validate_file_extension(file.filename)
        logger.info(f"Processing {file_extension} file: {file.filename}")

        # Validate file size
        file_handler.validate_file_size(file)

        # Generate unique filenames
        input_filename = file_handler.generate_unique_filename(file.filename, "input")
        output_filename = file_handler.generate_unique_filename(file.filename, "translated")

        # Save uploaded file
        input_path = config.UPLOAD_DIR / input_filename
        output_path = config.OUTPUT_DIR / output_filename

        await file_handler.save_upload_file(file, input_path)
        logger.info(f"Saved input file to: {input_path}")

        # Configure translation provider (imported here, as in the original,
        # rather than at module import time).
        from services.translation_service import (
            GoogleTranslationProvider,
            DeepLTranslationProvider,
            LibreTranslationProvider,
            OllamaTranslationProvider,
            translation_service,
        )

        provider_key = provider.lower()
        if provider_key == "deepl":
            if not config.DEEPL_API_KEY:
                raise HTTPException(status_code=400, detail="DeepL API key not configured")
            translation_provider = DeepLTranslationProvider(config.DEEPL_API_KEY)
        elif provider_key == "libre":
            translation_provider = LibreTranslationProvider()
        elif provider_key == "ollama":
            vision_model = getattr(config, 'OLLAMA_VISION_MODEL', 'llava')
            translation_provider = OllamaTranslationProvider(config.OLLAMA_BASE_URL, config.OLLAMA_MODEL, vision_model)
        else:
            # Anything unrecognised still falls back to Google, but a typo in
            # the provider name is now visible in the logs.
            if provider_key != "google":
                logger.warning("Unknown translation provider %r; falling back to Google", provider)
            translation_provider = GoogleTranslationProvider()

        # Update the global translation service.
        # NOTE(review): this object is shared module state, so the provider and
        # the flag below are not scoped to this request -- confirm this is safe
        # under concurrent requests.
        translation_service.provider = translation_provider

        # Store translate_images flag for translators to access
        translation_service.translate_images = translate_images

        # Translate based on file type
        if file_extension == ".xlsx":
            logger.info("Translating Excel file...")
            excel_translator.translate_file(input_path, output_path, target_language)
        elif file_extension == ".docx":
            logger.info("Translating Word document...")
            word_translator.translate_file(input_path, output_path, target_language)
        elif file_extension == ".pptx":
            logger.info("Translating PowerPoint presentation...")
            pptx_translator.translate_file(input_path, output_path, target_language)
        else:
            raise DocumentProcessingError(f"Unsupported file type: {file_extension}")

        logger.info(f"Translation completed: {output_path}")

        # Get file info
        output_info = file_handler.get_file_info(output_path)

        # Cleanup input file if requested
        if cleanup and input_path:
            file_handler.cleanup_file(input_path)
            logger.info(f"Cleaned up input file: {input_path}")

        # Return the translated file. User-supplied text placed in custom
        # headers goes through _header_safe so a non-Latin-1 filename cannot
        # break the response after a successful translation.
        return FileResponse(
            path=output_path,
            filename=f"translated_{file.filename}",
            media_type="application/octet-stream",
            headers={
                "X-Original-Filename": _header_safe(file.filename),
                "X-File-Size-MB": str(output_info.get("size_mb", 0)),
                "X-Target-Language": _header_safe(target_language),
            },
        )

    except HTTPException:
        # HTTP errors pass through unchanged, but files already written for
        # this request must not be left behind (e.g. DeepL key missing after
        # the upload was saved).
        _discard_files(input_path, output_path)
        raise
    except Exception as e:
        logger.error(f"Translation error: {str(e)}", exc_info=True)

        # Cleanup files on error
        _discard_files(input_path, output_path)

        raise handle_translation_error(e) from e
|
|
|
|
|
|
@app.delete("/cleanup/{filename}")
async def cleanup_translated_file(filename: str):
    """
    Cleanup a translated file after download

    **Parameters:**
    - **filename**: Name of the file to delete from the outputs directory

    **Errors:**
    - 400 if the name is not a plain file name
    - 404 if no such file exists in the outputs directory
    - 500 if deleting the file fails
    """
    try:
        # The name comes straight from the URL. Only accept a bare file name so
        # that "..", path separators or drive prefixes cannot point outside the
        # outputs directory.
        if filename in {"", ".", ".."} or Path(filename).name != filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        file_path = config.OUTPUT_DIR / filename

        # is_file() rather than exists(): directories must never be handed to
        # the cleanup helper.
        if not file_path.is_file():
            raise HTTPException(status_code=404, detail="File not found")

        file_handler.cleanup_file(file_path)

        return {"message": f"File {filename} deleted successfully"}

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the log; the client only sees a generic 500.
        logger.error(f"Cleanup error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Error cleaning up file") from e
|
|
|
|
|
|
@app.post("/translate-batch")
async def translate_batch_documents(
    files: list[UploadFile] = File(..., description="Multiple document files to translate"),
    target_language: str = Form(..., description="Target language code"),
    source_language: str = Form(default="auto", description="Source language code")
):
    """
    Translate multiple documents in batch

    **Note:** This endpoint processes files sequentially. For large batches, consider
    calling the single file endpoint multiple times with concurrent requests.

    **Returns:**
    - A summary with the total, successful and failed counts plus one result
      entry per file (a download URL on success, an error message on failure)
    """
    # NOTE(review): source_language is accepted but not used, and no provider is
    # selected here -- the translators run with whatever state they already
    # hold. Confirm this is intended.
    translators_by_extension = {
        ".xlsx": excel_translator,
        ".docx": word_translator,
        ".pptx": pptx_translator,
    }
    results = []

    for file in files:
        # Reset per file so the error handler never touches a previous file's paths.
        input_path = None
        output_path = None

        try:
            # Process each file using the same logic as single file translation
            file_extension = file_handler.validate_file_extension(file.filename)
            file_handler.validate_file_size(file)

            # An extension with no translator must be an error, not a "success"
            # entry pointing at a file that was never written.
            translator = translators_by_extension.get(file_extension)
            if translator is None:
                raise DocumentProcessingError(f"Unsupported file type: {file_extension}")

            input_filename = file_handler.generate_unique_filename(file.filename, "input")
            output_filename = file_handler.generate_unique_filename(file.filename, "translated")

            input_path = config.UPLOAD_DIR / input_filename
            output_path = config.OUTPUT_DIR / output_filename

            await file_handler.save_upload_file(file, input_path)

            # Translate based on file type
            translator.translate_file(input_path, output_path, target_language)

            # Cleanup input file
            file_handler.cleanup_file(input_path)

            results.append({
                "filename": file.filename,
                "status": "success",
                "output_file": output_filename,
                "download_url": f"/download/{output_filename}"
            })

        except Exception as e:
            logger.error(f"Error processing {file.filename}: {str(e)}", exc_info=True)

            # Remove whatever this file left on disk. This is best-effort so a
            # cleanup problem cannot abort the rest of the batch.
            for leftover in (input_path, output_path):
                if not leftover:
                    continue
                try:
                    file_handler.cleanup_file(leftover)
                except Exception:
                    logger.warning("Could not clean up %s", leftover, exc_info=True)

            results.append({
                "filename": file.filename,
                "status": "error",
                "error": str(e)
            })

    successful = sum(1 for entry in results if entry["status"] == "success")
    failed = sum(1 for entry in results if entry["status"] == "error")

    return {
        "total_files": len(files),
        "successful": successful,
        "failed": failed,
        "results": results
    }
|
|
|
|
|
|
@app.get("/download/{filename}")
async def download_file(filename: str):
    """
    Download a translated file by filename

    **Parameters:**
    - **filename**: Name of the file to download from the outputs directory

    **Errors:**
    - 400 if the name is not a plain file name
    - 404 if no such file exists in the outputs directory
    """
    # The name comes straight from the URL. Only accept a bare file name so
    # that "..", path separators or drive prefixes cannot be used to read files
    # outside the outputs directory.
    if filename in {"", ".", ".."} or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")

    file_path = config.OUTPUT_DIR / filename

    # is_file() rather than exists(): a directory cannot be served as a file.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/octet-stream"
    )
|
|
|
|
|
|
@app.get("/ollama/models")
async def list_ollama_models(base_url: Optional[str] = None):
    """
    List available Ollama models

    **Parameters:**
    - **base_url**: Ollama server URL (default: from config)
    """
    from services.translation_service import OllamaTranslationProvider

    # An omitted or empty base_url means "use the configured server".
    # NOTE(review): base_url is caller-supplied and handed to list_models as-is;
    # confirm whether it should be restricted to trusted hosts.
    url = base_url if base_url else config.OLLAMA_BASE_URL
    available_models = OllamaTranslationProvider.list_models(url)

    return {
        "ollama_url": url,
        "models": available_models,
        "count": len(available_models),
    }
|
|
|
|
|
|
@app.post("/ollama/configure")
async def configure_ollama(base_url: str = Form(...), model: str = Form(...)):
    """
    Configure Ollama settings

    **Parameters:**
    - **base_url**: Ollama server URL (e.g., http://localhost:11434)
    - **model**: Model name to use for translation (e.g., llama3, mistral)
    """
    # Both values are written onto the shared config object without validation,
    # so they stay in effect for later requests handled by this process.
    config.OLLAMA_BASE_URL = base_url
    config.OLLAMA_MODEL = model

    confirmation = {
        "status": "success",
        "message": "Ollama configuration updated",
        "ollama_url": base_url,
        "model": model,
    }
    return confirmation
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: start uvicorn on all interfaces, port 8000, with
    # auto-reload enabled. The "main:app" import string refers to the `app`
    # object of a module named `main`.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
|
|
|