
"""
Document Translation API
FastAPI application for translating complex documents while preserving formatting
"""
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from typing import Optional
import logging

from config import config
from translators import excel_translator, word_translator, pptx_translator
from utils import file_handler, handle_translation_error, DocumentProcessingError

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def build_full_prompt(system_prompt: str, glossary: str) -> str:
    """Combine system prompt and glossary into a single prompt for LLM translation."""
    parts = []

    # Add system prompt if provided
    if system_prompt and system_prompt.strip():
        parts.append(system_prompt.strip())

    # Add glossary if provided
    if glossary and glossary.strip():
        glossary_section = """
TECHNICAL GLOSSARY - Use these exact translations for the following terms:

{}

Always use the translations from this glossary when you encounter these terms.""".format(glossary.strip())
        parts.append(glossary_section)

    return "\n\n".join(parts) if parts else ""
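# A minimal sketch of the expected output, with hypothetical inputs (the glossary
# format follows the "source=target, one per line" convention used by the
# /translate form below):
#
#   build_full_prompt("You translate engineering manuals.", "torque=par de apriete")
#
# returns the system prompt and the glossary section joined by a blank line, roughly:
#
#   You translate engineering manuals.
#
#   TECHNICAL GLOSSARY - Use these exact translations for the following terms:
#   torque=par de apriete
#   Always use the translations from this glossary when you encounter these terms.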
# Ensure necessary directories exist
config.ensure_directories()

# Create FastAPI app
app = FastAPI(
    title=config.API_TITLE,
    version=config.API_VERSION,
    description=config.API_DESCRIPTION,
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files if the directory exists
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
    app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
@app.get("/")
async def root():
    """Root endpoint with API information"""
    return {
        "name": config.API_TITLE,
        "version": config.API_VERSION,
        "status": "operational",
        "supported_formats": list(config.SUPPORTED_EXTENSIONS),
        "endpoints": {
            "translate": "/translate",
            "health": "/health",
            "supported_languages": "/languages"
        }
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "translation_service": config.TRANSLATION_SERVICE
    }
@app.get("/languages")
async def get_supported_languages():
    """Get list of supported language codes"""
    return {
        "supported_languages": {
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "it": "Italian",
            "pt": "Portuguese",
            "ru": "Russian",
            "zh": "Chinese (Simplified)",
            "ja": "Japanese",
            "ko": "Korean",
            "ar": "Arabic",
            "hi": "Hindi",
            "nl": "Dutch",
            "pl": "Polish",
            "tr": "Turkish",
            "sv": "Swedish",
            "da": "Danish",
            "no": "Norwegian",
            "fi": "Finnish",
            "cs": "Czech",
            "el": "Greek",
            "th": "Thai",
            "vi": "Vietnamese",
            "id": "Indonesian",
            "uk": "Ukrainian",
            "ro": "Romanian",
            "hu": "Hungarian"
        },
        "note": "Supported languages may vary depending on the translation service configured"
    }
@app.post("/translate")
async def translate_document(
    file: UploadFile = File(..., description="Document file to translate (.xlsx, .docx, or .pptx)"),
    target_language: str = Form(..., description="Target language code (e.g., 'es', 'fr', 'de')"),
    source_language: str = Form(default="auto", description="Source language code (default: auto-detect)"),
    provider: str = Form(default="google", description="Translation provider (google, ollama, deepl, libre, openai)"),
    translate_images: bool = Form(default=False, description="Translate images with a multimodal Ollama/OpenAI model"),
    ollama_model: str = Form(default="", description="Ollama model to use (also used for vision if multimodal)"),
    system_prompt: str = Form(default="", description="Custom system prompt with context or instructions for LLM translation"),
    glossary: str = Form(default="", description="Technical glossary (format: source=target, one per line)"),
    libre_url: str = Form(default="https://libretranslate.com", description="LibreTranslate server URL"),
    openai_api_key: str = Form(default="", description="OpenAI API key"),
    openai_model: str = Form(default="gpt-4o-mini", description="OpenAI model to use (gpt-4o-mini is the cheapest with vision support)"),
    cleanup: bool = Form(default=True, description="Delete the input file after translation")
):
    """
    Translate a document while preserving all formatting, layout, and embedded media.

    **Supported File Types:**
    - Excel (.xlsx) - Preserves formulas, merged cells, styling, and images
    - Word (.docx) - Preserves headings, tables, images, headers/footers
    - PowerPoint (.pptx) - Preserves layouts, animations, and media

    **Parameters:**
    - **file**: The document file to translate
    - **target_language**: Target language code (e.g., 'es' for Spanish, 'fr' for French)
    - **source_language**: Source language code (optional, default: auto-detect)
    - **cleanup**: Whether to delete the uploaded file after translation (default: True)

    **Returns:**
    - Translated document file with preserved formatting
    """
    input_path = None
    output_path = None

    try:
        # Validate file extension
        file_extension = file_handler.validate_file_extension(file.filename)
        logger.info(f"Processing {file_extension} file: {file.filename}")

        # Validate file size
        file_handler.validate_file_size(file)

        # Generate unique filenames
        input_filename = file_handler.generate_unique_filename(file.filename, "input")
        output_filename = file_handler.generate_unique_filename(file.filename, "translated")

        # Save uploaded file
        input_path = config.UPLOAD_DIR / input_filename
        output_path = config.OUTPUT_DIR / output_filename
        await file_handler.save_upload_file(file, input_path)
        logger.info(f"Saved input file to: {input_path}")

        # Configure translation provider
        from services.translation_service import (
            GoogleTranslationProvider,
            DeepLTranslationProvider,
            LibreTranslationProvider,
            OllamaTranslationProvider,
            OpenAITranslationProvider,
            translation_service,
        )

        if provider.lower() == "deepl":
            if not config.DEEPL_API_KEY:
                raise HTTPException(status_code=400, detail="DeepL API key not configured")
            translation_provider = DeepLTranslationProvider(config.DEEPL_API_KEY)
        elif provider.lower() == "libre":
            libre_server = libre_url.strip() if libre_url else "https://libretranslate.com"
            logger.info(f"Using LibreTranslate server: {libre_server}")
            translation_provider = LibreTranslationProvider(libre_server)
        elif provider.lower() == "openai":
            api_key = openai_api_key.strip() if openai_api_key else ""
            if not api_key:
                raise HTTPException(status_code=400, detail="OpenAI API key not provided")
            model_to_use = openai_model.strip() if openai_model else "gpt-4o-mini"
            # Combine system prompt and glossary
            custom_prompt = build_full_prompt(system_prompt, glossary)
            logger.info(f"Using OpenAI model: {model_to_use}")
            if custom_prompt:
                logger.info(f"Custom system prompt provided ({len(custom_prompt)} chars)")
            translation_provider = OpenAITranslationProvider(api_key, model_to_use, custom_prompt)
        elif provider.lower() == "ollama":
            # Use the same model for text and vision (multimodal models like gemma3, qwen3-vl)
            model_to_use = ollama_model.strip() if ollama_model else config.OLLAMA_MODEL
            # Combine system prompt and glossary
            custom_prompt = build_full_prompt(system_prompt, glossary)
            logger.info(f"Using Ollama model: {model_to_use} (text + vision)")
            if custom_prompt:
                logger.info(f"Custom system prompt provided ({len(custom_prompt)} chars)")
            translation_provider = OllamaTranslationProvider(
                config.OLLAMA_BASE_URL, model_to_use, model_to_use, custom_prompt
            )
        else:
            translation_provider = GoogleTranslationProvider()

        # Update the global translation service
        translation_service.provider = translation_provider
        # Store the translate_images flag for translators to access
        translation_service.translate_images = translate_images

        # Translate based on file type
        if file_extension == ".xlsx":
            logger.info("Translating Excel file...")
            excel_translator.translate_file(input_path, output_path, target_language)
        elif file_extension == ".docx":
            logger.info("Translating Word document...")
            word_translator.translate_file(input_path, output_path, target_language)
        elif file_extension == ".pptx":
            logger.info("Translating PowerPoint presentation...")
            pptx_translator.translate_file(input_path, output_path, target_language)
        else:
            raise DocumentProcessingError(f"Unsupported file type: {file_extension}")

        logger.info(f"Translation completed: {output_path}")

        # Get file info
        output_info = file_handler.get_file_info(output_path)

        # Cleanup input file if requested
        if cleanup and input_path:
            file_handler.cleanup_file(input_path)
            logger.info(f"Cleaned up input file: {input_path}")

        # Return the translated file
        return FileResponse(
            path=output_path,
            filename=f"translated_{file.filename}",
            media_type="application/octet-stream",
            headers={
                "X-Original-Filename": file.filename,
                "X-File-Size-MB": str(output_info.get("size_mb", 0)),
                "X-Target-Language": target_language,
            },
        )
    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        logger.error(f"Translation error: {str(e)}", exc_info=True)
        # Cleanup files on error
        if input_path:
            file_handler.cleanup_file(input_path)
        if output_path:
            file_handler.cleanup_file(output_path)
        raise handle_translation_error(e)
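# A minimal sketch of calling /translate with curl; the host and port match the
# uvicorn defaults at the bottom of this file, the file name is hypothetical:
#
#   curl -X POST http://localhost:8000/translate \
#     -F "file=@report.docx" \
#     -F "target_language=es" \
#     -F "provider=google" \
#     -o translated_report.docx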
@app.delete("/cleanup/{filename}")
async def cleanup_translated_file(filename: str):
    """
    Cleanup a translated file after download.

    **Parameters:**
    - **filename**: Name of the file to delete from the outputs directory
    """
    try:
        # Guard against path traversal: keep only the final path component
        file_path = config.OUTPUT_DIR / Path(filename).name
        if not file_path.exists():
            raise HTTPException(status_code=404, detail="File not found")
        file_handler.cleanup_file(file_path)
        return {"message": f"File {filename} deleted successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Cleanup error: {str(e)}")
        raise HTTPException(status_code=500, detail="Error cleaning up file")
@app.post("/translate-batch")
async def translate_batch_documents(
    files: list[UploadFile] = File(..., description="Multiple document files to translate"),
    target_language: str = Form(..., description="Target language code"),
    source_language: str = Form(default="auto", description="Source language code")
):
    """
    Translate multiple documents in batch.

    **Note:** This endpoint processes files sequentially. For large batches, consider
    calling the single file endpoint multiple times with concurrent requests.
    """
    results = []
    for file in files:
        try:
            # Process each file using the same logic as single file translation
            file_extension = file_handler.validate_file_extension(file.filename)
            file_handler.validate_file_size(file)
            input_filename = file_handler.generate_unique_filename(file.filename, "input")
            output_filename = file_handler.generate_unique_filename(file.filename, "translated")
            input_path = config.UPLOAD_DIR / input_filename
            output_path = config.OUTPUT_DIR / output_filename
            await file_handler.save_upload_file(file, input_path)

            # Translate based on file type
            if file_extension == ".xlsx":
                excel_translator.translate_file(input_path, output_path, target_language)
            elif file_extension == ".docx":
                word_translator.translate_file(input_path, output_path, target_language)
            elif file_extension == ".pptx":
                pptx_translator.translate_file(input_path, output_path, target_language)

            # Cleanup input file
            file_handler.cleanup_file(input_path)

            results.append({
                "filename": file.filename,
                "status": "success",
                "output_file": output_filename,
                "download_url": f"/download/{output_filename}"
            })
        except Exception as e:
            logger.error(f"Error processing {file.filename}: {str(e)}")
            results.append({
                "filename": file.filename,
                "status": "error",
                "error": str(e)
            })

    return {
        "total_files": len(files),
        "successful": len([r for r in results if r["status"] == "success"]),
        "failed": len([r for r in results if r["status"] == "error"]),
        "results": results
    }
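# Sketch of the batch response shape (illustrative values only; output filenames
# come from file_handler.generate_unique_filename and are elided here):
#
#   {
#     "total_files": 2,
#     "successful": 1,
#     "failed": 1,
#     "results": [
#       {"filename": "a.xlsx", "status": "success",
#        "output_file": "...", "download_url": "/download/..."},
#       {"filename": "b.docx", "status": "error", "error": "..."}
#     ]
#   }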
@app.get("/download/{filename}")
async def download_file(filename: str):
    """
    Download a translated file by filename.

    **Parameters:**
    - **filename**: Name of the file to download from the outputs directory
    """
    # Guard against path traversal: keep only the final path component
    file_path = config.OUTPUT_DIR / Path(filename).name
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type="application/octet-stream"
    )
@app.get("/ollama/models")
async def list_ollama_models(base_url: Optional[str] = None):
    """
    List available Ollama models.

    **Parameters:**
    - **base_url**: Ollama server URL (default: from config)
    """
    from services.translation_service import OllamaTranslationProvider

    url = base_url or config.OLLAMA_BASE_URL
    models = OllamaTranslationProvider.list_models(url)
    return {
        "ollama_url": url,
        "models": models,
        "count": len(models)
    }
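# Example query; base_url is passed as a query parameter, and the Ollama address
# assumes a local instance on its default port:
#
#   curl "http://localhost:8000/ollama/models?base_url=http://localhost:11434"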
@app.post("/ollama/configure")
async def configure_ollama(base_url: str = Form(...), model: str = Form(...)):
    """
    Configure Ollama settings.

    **Parameters:**
    - **base_url**: Ollama server URL (e.g., http://localhost:11434)
    - **model**: Model name to use for translation (e.g., llama3, mistral)
    """
    config.OLLAMA_BASE_URL = base_url
    config.OLLAMA_MODEL = model
    return {
        "status": "success",
        "message": "Ollama configuration updated",
        "ollama_url": base_url,
        "model": model
    }
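# Example call (model name is illustrative, taken from the docstring above):
#
#   curl -X POST http://localhost:8000/ollama/configure \
#     -F "base_url=http://localhost:11434" \
#     -F "model=llama3"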
@app.post("/extract-texts")
async def extract_texts_from_document(
    file: UploadFile = File(..., description="Document file to extract texts from"),
):
    """
    Extract all translatable texts from a document for client-side translation (WebLLM).
    Returns a list of texts and a session ID to use for reconstruction.

    **Parameters:**
    - **file**: The document file to extract texts from

    **Returns:**
    - session_id: Unique ID to reference this extraction
    - texts: Array of texts to translate
    - file_type: Type of the document
    """
    import uuid
    import json

    try:
        # Validate file extension
        file_extension = file_handler.validate_file_extension(file.filename)
        logger.info(f"Extracting texts from {file_extension} file: {file.filename}")

        # Validate file size
        file_handler.validate_file_size(file)

        # Generate session ID
        session_id = str(uuid.uuid4())

        # Save uploaded file
        input_filename = f"session_{session_id}{file_extension}"
        input_path = config.UPLOAD_DIR / input_filename
        await file_handler.save_upload_file(file, input_path)

        # Extract texts based on file type
        texts = []
        if file_extension == ".xlsx":
            from openpyxl import load_workbook
            wb = load_workbook(input_path)
            for sheet in wb.worksheets:
                for row in sheet.iter_rows():
                    for cell in row:
                        if cell.value and isinstance(cell.value, str) and cell.value.strip():
                            texts.append({
                                "id": f"{sheet.title}!{cell.coordinate}",
                                "text": cell.value
                            })
            wb.close()
        elif file_extension == ".docx":
            from docx import Document
            doc = Document(input_path)
            # Index every paragraph (including empty ones) so the IDs stay in sync
            # with the reconstruction pass, which walks the same sequence
            for para_idx, para in enumerate(doc.paragraphs):
                if para.text.strip():
                    texts.append({
                        "id": f"para_{para_idx}",
                        "text": para.text
                    })
            # Also extract from tables
            for table_idx, table in enumerate(doc.tables):
                for row_idx, row in enumerate(table.rows):
                    for cell_idx, cell in enumerate(row.cells):
                        if cell.text.strip():
                            texts.append({
                                "id": f"table_{table_idx}_r{row_idx}_c{cell_idx}",
                                "text": cell.text
                            })
        elif file_extension == ".pptx":
            from pptx import Presentation
            prs = Presentation(input_path)
            for slide_idx, slide in enumerate(prs.slides):
                for shape_idx, shape in enumerate(slide.shapes):
                    if shape.has_text_frame:
                        for para_idx, para in enumerate(shape.text_frame.paragraphs):
                            for run_idx, run in enumerate(para.runs):
                                if run.text.strip():
                                    texts.append({
                                        "id": f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}",
                                        "text": run.text
                                    })

        # Save session metadata
        session_data = {
            "original_filename": file.filename,
            "file_extension": file_extension,
            "input_path": str(input_path),
            "text_count": len(texts)
        }
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        with open(session_file, "w", encoding="utf-8") as f:
            json.dump(session_data, f)

        logger.info(f"Extracted {len(texts)} texts from {file.filename}, session: {session_id}")
        return {
            "session_id": session_id,
            "texts": texts,
            "file_type": file_extension,
            "text_count": len(texts)
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to extract texts: {str(e)}")
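# Sketch of the response for a small .docx (illustrative values, not real output):
#
#   {
#     "session_id": "7f3c9b2e-...",
#     "texts": [
#       {"id": "para_0", "text": "Quarterly report"},
#       {"id": "table_0_r0_c0", "text": "Revenue"}
#     ],
#     "file_type": ".docx",
#     "text_count": 2
#   }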
@app.post("/reconstruct-document")
async def reconstruct_document(
    session_id: str = Form(..., description="Session ID from extract-texts"),
    translations: str = Form(..., description="JSON array of {id, translated_text} objects"),
    target_language: str = Form(..., description="Target language code"),
):
    """
    Reconstruct a document with translated texts.

    **Parameters:**
    - **session_id**: The session ID from extract-texts
    - **translations**: JSON array of translations with matching IDs
    - **target_language**: Target language for filename

    **Returns:**
    - Translated document file
    """
    import json

    try:
        # Load session data
        session_file = config.UPLOAD_DIR / f"session_{session_id}.json"
        if not session_file.exists():
            raise HTTPException(status_code=404, detail="Session not found or expired")

        with open(session_file, "r", encoding="utf-8") as f:
            session_data = json.load(f)

        input_path = Path(session_data["input_path"])
        file_extension = session_data["file_extension"]
        original_filename = session_data["original_filename"]

        if not input_path.exists():
            raise HTTPException(status_code=404, detail="Source file not found or expired")

        # Parse translations
        translation_list = json.loads(translations)
        translation_map = {t["id"]: t["translated_text"] for t in translation_list}

        # Generate output path
        output_filename = file_handler.generate_unique_filename(original_filename, "translated")
        output_path = config.OUTPUT_DIR / output_filename

        # Reconstruct based on file type
        if file_extension == ".xlsx":
            from openpyxl import load_workbook
            import shutil
            shutil.copy(input_path, output_path)
            wb = load_workbook(output_path)
            for sheet in wb.worksheets:
                for row in sheet.iter_rows():
                    for cell in row:
                        cell_id = f"{sheet.title}!{cell.coordinate}"
                        if cell_id in translation_map:
                            cell.value = translation_map[cell_id]
            wb.save(output_path)
            wb.close()
        elif file_extension == ".docx":
            from docx import Document
            import shutil
            shutil.copy(input_path, output_path)
            doc = Document(output_path)
            # Walk paragraphs with the same indexing used during extraction
            for para_idx, para in enumerate(doc.paragraphs):
                para_id = f"para_{para_idx}"
                if para_id in translation_map and para.text.strip():
                    # Replace text while keeping formatting of the first run
                    for run in para.runs:
                        run.text = ""
                    if para.runs:
                        para.runs[0].text = translation_map[para_id]
                    else:
                        para.text = translation_map[para_id]
            # Also handle tables
            for table_idx, table in enumerate(doc.tables):
                for row_idx, row in enumerate(table.rows):
                    for cell_idx, cell in enumerate(row.cells):
                        cell_id = f"table_{table_idx}_r{row_idx}_c{cell_idx}"
                        if cell_id in translation_map:
                            # Clear existing runs, then set the new text
                            for para in cell.paragraphs:
                                for run in para.runs:
                                    run.text = ""
                            if cell.paragraphs and cell.paragraphs[0].runs:
                                cell.paragraphs[0].runs[0].text = translation_map[cell_id]
                            elif cell.paragraphs:
                                cell.paragraphs[0].text = translation_map[cell_id]
            doc.save(output_path)
        elif file_extension == ".pptx":
            from pptx import Presentation
            import shutil
            shutil.copy(input_path, output_path)
            prs = Presentation(output_path)
            for slide_idx, slide in enumerate(prs.slides):
                for shape_idx, shape in enumerate(slide.shapes):
                    if shape.has_text_frame:
                        for para_idx, para in enumerate(shape.text_frame.paragraphs):
                            for run_idx, run in enumerate(para.runs):
                                run_id = f"slide_{slide_idx}_shape_{shape_idx}_para_{para_idx}_run_{run_idx}"
                                if run_id in translation_map:
                                    run.text = translation_map[run_id]
            prs.save(output_path)

        # Cleanup session files
        file_handler.cleanup_file(input_path)
        file_handler.cleanup_file(session_file)

        logger.info(f"Reconstructed document: {output_path}")
        return FileResponse(
            path=output_path,
            filename=f"translated_{original_filename}",
            media_type="application/octet-stream"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Reconstruction error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to reconstruct document: {str(e)}")
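# Sketch of the client-side round trip (hypothetical values): POST the file to
# /extract-texts, translate the returned texts in the browser (e.g., WebLLM),
# then post the translations back:
#
#   curl -X POST http://localhost:8000/reconstruct-document \
#     -F "session_id=7f3c9b2e-..." \
#     -F 'translations=[{"id": "para_0", "translated_text": "Informe trimestral"}]' \
#     -F "target_language=es" \
#     -o translated.docx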
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)