Files
office_translator/translators/excel_translator.py
Sepehr Ramezani 26bd096a06 feat: production deployment - full update with providers, admin, glossaries, pricing, tests
Major changes across backend, frontend, infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:47 +02:00

549 lines
20 KiB
Python

"""
Excel Translation Module
Translates Excel files while preserving all formatting, formulas, images, and layout
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import re
import tempfile
import os
import time
import concurrent.futures
from pathlib import Path
from typing import Dict, Set, List, Tuple, Optional, Callable, Any
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell.cell import Cell
from openpyxl.utils import get_column_letter
from services.providers.base import TranslationProvider
from core.logging import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# When True, the _log_* helpers forward keyword arguments to the logger as-is;
# when False, they flatten them into a single "event key=value ..." message.
_HAS_STRUCTLOG = True
def _log_info(event: str, **kwargs):
    """Emit an info-level event in whichever form the logger expects.

    With ``_HAS_STRUCTLOG`` set, the keyword arguments are forwarded to the
    logger unchanged; otherwise they are rendered as ``key=value`` pairs and
    appended to the event name.
    """
    if not _HAS_STRUCTLOG:
        rendered_fields = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.info(f"{event} " + rendered_fields)
        return
    logger.info(event, **kwargs)
def _log_error(event: str, **kwargs):
    """Emit an error-level event in whichever form the logger expects.

    With ``_HAS_STRUCTLOG`` set, the keyword arguments are forwarded to the
    logger unchanged; otherwise they are rendered as ``key=value`` pairs and
    appended to the event name.
    """
    if not _HAS_STRUCTLOG:
        rendered_fields = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.error(f"{event} " + rendered_fields)
        return
    logger.error(event, **kwargs)
class ExcelProcessorError(Exception):
    """Structured failure raised while validating, reading or writing a workbook.

    Every instance carries a machine-readable ``code``, a human-readable
    ``message`` (taken from ``ERROR_MESSAGES`` when none is supplied) and a
    ``details`` mapping with extra context.
    """

    # Error codes understood by API consumers.
    INVALID_FORMAT = "INVALID_FORMAT"
    EXCEL_CORRUPTED = "EXCEL_CORRUPTED"
    EXCEL_READ_ERROR = "EXCEL_READ_ERROR"
    EXCEL_WRITE_ERROR = "EXCEL_WRITE_ERROR"
    EXCEL_TOO_LARGE = "EXCEL_TOO_LARGE"

    # Default user-facing message for each code.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .xlsx.",
        EXCEL_CORRUPTED: "Le fichier Excel est corrompu ou illisible.",
        EXCEL_READ_ERROR: "Erreur lors de la lecture du fichier Excel.",
        EXCEL_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.",
        EXCEL_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Store the code, resolve the message and normalise the details.

        Args:
            code: One of the error-code constants defined on this class.
            message: Explicit message; when empty, the default for ``code``
                is used (or a generic fallback for unknown codes).
            details: Optional extra context; stored as an empty dict when
                not provided.
        """
        if not message:
            message = self.ERROR_MESSAGES.get(code, "Erreur inconnue")
        if not details:
            details = {}
        self.code = code
        self.message = message
        self.details = details
        super().__init__(message)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict suitable for an API response.

        The ``details`` key is included only when details are non-empty.
        """
        payload: Dict[str, Any] = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
class ExcelTranslator:
    """
    Handles translation of Excel files with strict formatting preservation.

    Uses the new TranslationProvider interface for improved error handling
    and fallback chain support.
    """

    # Largest accepted input file, in mebibytes (enforced by _validate_file).
    MAX_FILE_SIZE_MB = 50
    # Bytes expected at the very start of a valid input file.
    XLSX_MAGIC_BYTES = b"PK"  # .xlsx files are ZIP archives
def __init__(self, provider: Optional[TranslationProvider] = None):
    """
    Create a translator, optionally bound to a translation provider.

    Args:
        provider: TranslationProvider instance used for translations.
            When omitted, translation falls back to the legacy
            translation_service.
    """
    # No custom LLM system prompt until set_custom_prompt() is called.
    self._custom_prompt: Optional[str] = None
    # Matches any text that begins with "=".
    self.formula_pattern = re.compile(r"=.*")
    self._provider = provider
def set_provider(self, provider: TranslationProvider) -> None:
    """Set the translation provider.

    Args:
        provider: Provider stored on the instance. While a provider is set,
            batch translation is routed through it rather than through the
            legacy translation service.
    """
    self._provider = provider
def set_custom_prompt(self, prompt: Optional[str]) -> None:
    """Set custom system prompt for LLM providers.

    Args:
        prompt: Prompt text to remember, or None to clear it.
    """
    # Only stored here; no other code visible in this file reads it.
    self._custom_prompt = prompt
def translate_file(
    self,
    input_path: Path,
    output_path: Path,
    target_language: str,
    source_language: str = "auto",
    progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Path:
    """
    Translate an Excel file while preserving all formatting and structure.

    All translatable cell texts (and every sheet name) are collected first,
    translated in parallel chunks, written back, and the workbook is then
    saved to ``output_path``. A text whose translation comes back empty or
    missing keeps its original value instead of being blanked.

    Args:
        input_path: Path to input Excel file
        output_path: Path for translated output file
        target_language: Target language code (e.g., 'fr', 'en')
        source_language: Source language code (default: auto-detect)
        progress_callback: Optional callback for progress updates.
            Receives a dict with the keys current, total, sheet,
            total_sheets and cells_translated. While sheets are being
            scanned the counters are sheets; while texts are being
            translated they are text items.

    Returns:
        Path to translated file

    Raises:
        ExcelProcessorError: If file is invalid, corrupted, or processing fails
    """
    start_time = time.time()
    input_path = Path(input_path)
    output_path = Path(output_path)
    self._validate_file(input_path)

    try:
        workbook = load_workbook(input_path, data_only=False)
    except Exception as e:
        raise ExcelProcessorError(
            code=ExcelProcessorError.EXCEL_CORRUPTED,
            details={"file_name": input_path.name, "error": str(e)},
        ) from e

    def report(current: int, total: int) -> None:
        """Send one progress update (no-op without a callback)."""
        if progress_callback:
            progress_callback(
                {
                    "current": current,
                    "total": total,
                    # Kept as mirrors of current/total for existing consumers.
                    "sheet": current,
                    "total_sheets": total,
                    # Write-back happens after the last update, so this is 0.
                    "cells_translated": 0,
                }
            )

    try:
        cells_translated = 0
        sheet_names = list(workbook.sheetnames)
        total_sheets = len(sheet_names)
        report(0, total_sheets)

        text_elements: List[Tuple[str, Callable[[str], None]]] = []
        for sheet_idx, sheet_name in enumerate(sheet_names, start=1):
            self._collect_from_worksheet(workbook[sheet_name], text_elements)
            report(sheet_idx, total_sheets)

        # Sheet names ride in the same batch, placed after the cell texts.
        cell_count = len(text_elements)
        texts = [text for text, _ in text_elements] + sheet_names

        if texts:
            _log_info(
                "excel_batch_translation_start",
                file_name=input_path.name,
                text_count=len(texts),
                target_lang=target_language,
            )
            translated_texts = self._translate_in_chunks(
                texts, target_language, source_language, report
            )

            # Apply cell translations; a failing setter is logged, not fatal.
            for i, ((_, setter), translated) in enumerate(
                zip(text_elements, translated_texts)
            ):
                try:
                    setter(translated)
                    cells_translated += 1
                except Exception as e:
                    _log_error("excel_setter_error", error=str(e), index=i)

            # Apply sheet name translations.
            # NOTE(review): formulas that reference a sheet by its old name
            # are not rewritten here — confirm whether renaming can break
            # cross-sheet references in translated workbooks.
            renames = self._plan_sheet_renames(
                sheet_names, translated_texts[cell_count:]
            )
            for original_name, new_name in renames.items():
                try:
                    workbook[original_name].title = new_name
                except ValueError:
                    _log_error(
                        "excel_sheet_rename_failed",
                        original_name=original_name,
                        new_name=new_name,
                    )

        try:
            workbook.save(output_path)
        except Exception as e:
            raise ExcelProcessorError(
                code=ExcelProcessorError.EXCEL_WRITE_ERROR,
                details={"file_name": output_path.name, "error": str(e)},
            ) from e

        processing_time_ms = round((time.time() - start_time) * 1000, 2)
        _log_info(
            "excel_translation_success",
            file_name=input_path.name,
            sheets_processed=total_sheets,
            cells_translated=cells_translated,
            source_lang=source_language,
            target_lang=target_language,
            processing_time_ms=processing_time_ms,
        )
        return output_path
    except ExcelProcessorError:
        raise
    except Exception as e:
        # Any other failure (including translation errors) is reported with
        # the read-error code, as callers already expect.
        raise ExcelProcessorError(
            code=ExcelProcessorError.EXCEL_READ_ERROR,
            details={"file_name": input_path.name, "error": str(e)},
        ) from e
    finally:
        # Release the workbook on every exit path, not only after a save.
        workbook.close()


def _translate_in_chunks(
    self,
    texts: List[str],
    target_language: str,
    source_language: str,
    report: Callable[[int, int], None],
) -> List[str]:
    """Translate ``texts`` in parallel chunks; return one result per input.

    The result list starts as a copy of ``texts`` so that any entry the
    translation backend omits, or returns empty, keeps its source text.
    ``report(done, total)`` is called after each chunk completes.
    """
    chunk_size = 15
    max_workers = 6
    total = len(texts)
    results = list(texts)
    completed = 0

    def translate_chunk(start: int) -> Tuple[int, int, List[Any]]:
        chunk = texts[start : start + chunk_size]
        translated = self._batch_translate(chunk, target_language, source_language)
        return start, len(chunk), list(translated or [])

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(translate_chunk, start)
            for start in range(0, total, chunk_size)
        ]
        for future in concurrent.futures.as_completed(futures):
            start, size, translated_chunk = future.result()
            # Ignore surplus items so one chunk can never overwrite another.
            for offset, translated in enumerate(translated_chunk[:size]):
                if isinstance(translated, str) and translated.strip():
                    results[start + offset] = translated
            completed += size
            report(completed, total)
    return results


def _plan_sheet_renames(
    self, sheet_names: List[str], translated_names: List[str]
) -> Dict[str, str]:
    """Map each sheet to a new, unique, sanitized title (unchanged sheets omitted).

    Collisions with existing or already-planned titles are resolved with a
    ``_N`` suffix, trimming the base so the result never exceeds 31 characters.
    """
    max_length = 31
    taken = set(sheet_names)
    plan: Dict[str, str] = {}
    for sheet_name, translated in zip(sheet_names, translated_names):
        if not translated or translated == sheet_name:
            continue
        base_name = self._sanitize_sheet_name(translated)
        # Nothing to do when sanitizing leaves a blank or the current title.
        if not base_name.strip() or base_name == sheet_name:
            continue
        candidate = base_name
        counter = 1
        while candidate in taken:
            suffix = f"_{counter}"
            keep = min(28, max_length - len(suffix))
            candidate = f"{base_name[:keep]}{suffix}"
            counter += 1
        taken.add(candidate)
        plan[sheet_name] = candidate
    return plan
def _validate_file(self, file_path: Path) -> None:
"""Validate file format and size."""
if not file_path.exists():
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_READ_ERROR,
message=f"Fichier introuvable: {file_path.name}",
details={"file_name": file_path.name},
)
if file_path.suffix.lower() != ".xlsx":
raise ExcelProcessorError(
code=ExcelProcessorError.INVALID_FORMAT,
details={
"file_name": file_path.name,
"extension": file_path.suffix,
"expected": ".xlsx",
},
)
with open(file_path, "rb") as f:
header = f.read(4)
if header[:2] != self.XLSX_MAGIC_BYTES:
raise ExcelProcessorError(
code=ExcelProcessorError.INVALID_FORMAT,
details={"file_name": file_path.name, "reason": "Invalid file header"},
)
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > self.MAX_FILE_SIZE_MB:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_TOO_LARGE,
details={
"file_name": file_path.name,
"size_mb": round(file_size_mb, 2),
"max_mb": self.MAX_FILE_SIZE_MB,
},
)
def _sanitize_sheet_name(self, name: str) -> str:
"""
Sanitize a sheet name to be valid for Excel.
Excel forbids: : \\ / ? * [ ]
Max length: 31 characters
"""
invalid_chars = ":\\/?*[]"
sanitized = "".join(c if c not in invalid_chars else "_" for c in name)
return sanitized[:31]
def _batch_translate(
self, texts: List[str], target_language: str, source_language: str = "auto"
) -> List[str]:
"""
Batch translate using new provider interface.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code
Returns:
List of translated texts (same order as input)
"""
if not texts:
return []
if self._provider is not None:
return self._translate_with_provider(
texts, target_language, source_language
)
return self._translate_with_legacy(texts, target_language, source_language)
def _translate_with_provider(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Translate using the TranslationProvider.translate_batch() interface."""
translated = self._provider.translate_batch(texts, target_language, source_language)
return [
t if (t and t.strip()) else orig
for t, orig in zip(translated, texts)
]
def _translate_with_legacy(
    self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
    """Fallback to legacy translation_service for backward compatibility.

    Applies the same safety net as the provider path: the result always has
    ``len(texts)`` items, and an entry keeps its source text when the
    service's result for it is missing, not a string, or blank.
    """
    # Imported lazily so the legacy service is only loaded when actually used.
    from services.translation_service import translation_service

    _log_info(
        "excel_using_legacy_service",
        text_count=len(texts),
        target_lang=target_language,
    )
    translated = translation_service.translate_batch(
        texts, target_language, source_language
    )
    results = list(texts)
    for index, candidate in enumerate(list(translated or [])[: len(results)]):
        if isinstance(candidate, str) and candidate.strip():
            results[index] = candidate
    return results
def _collect_from_worksheet(
self,
worksheet: Worksheet,
text_elements: List[Tuple[str, Callable[[str], None]]],
) -> None:
"""Collect all translatable text from worksheet cells."""
for row in worksheet.iter_rows():
for cell in row:
if cell.value is not None:
self._collect_from_cell(cell, text_elements)
def _collect_from_cell(
self, cell: Cell, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from a cell."""
original_value = cell.value
if original_value is None:
return
if isinstance(original_value, str) and original_value.startswith("="):
# Handle both double quotes and single quotes in formulas
# Also handles escaped quotes: "He said ""hello""" -> He said "hello"
string_pattern = re.compile(r'"((?:[^"\\]|\\.)*)"')
single_quote_pattern = re.compile(r"'((?:[^'\\]|\\.)*)'")
strings = string_pattern.findall(original_value)
strings.extend(single_quote_pattern.findall(original_value))
for s in strings:
if s.strip():
def make_formula_setter(c, orig_formula, orig_string):
def setter(translated):
# Escape quotes in translated text to preserve formula validity
escaped_translated = translated.replace('"', '""')
c.value = orig_formula.replace(
f'"{orig_string}"', f'"{escaped_translated}"'
)
return setter
text_elements.append(
(s, make_formula_setter(cell, original_value, s))
)
elif isinstance(original_value, str) and original_value.strip():
def make_setter(c):
def setter(text):
c.value = text
return setter
text_elements.append((original_value, make_setter(cell)))
def _translate_images(self, worksheet: Worksheet, target_language: str) -> None:
    """
    Translate text in images using vision model.

    Each image is written to a temporary file, sent to the legacy image
    translation, and any non-blank result is attached as a comment on the
    cell the image is anchored to. Failures are logged per image and never
    raised. The temporary file is removed on every path.

    NOTE: This method is currently NOT CALLED in translate_file() as image translation
    is not part of the current story scope (Story 2.7). It is intentionally preserved
    for future implementation when vision model support is prioritized.

    TODO: Call this method during translate_file() when implementing image translation feature.
    """
    try:
        from openpyxl.comments import Comment

        images = getattr(worksheet, "_images", [])
        for idx, image in enumerate(images):
            try:
                image_data = image._data()
                ext = image.format or "png"
                tmp = tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False)
                tmp_path = tmp.name
                try:
                    # Close the file before handing its path to the translator.
                    with tmp:
                        tmp.write(image_data)
                    translated_text = self._translate_image_with_legacy(
                        tmp_path, target_language
                    )
                finally:
                    # delete=False means cleanup is ours, even when the
                    # write or the translation raises.
                    os.unlink(tmp_path)

                if translated_text and translated_text.strip():
                    anchor = image.anchor
                    if hasattr(anchor, "_from"):
                        # Anchor indices are 0-based; cell references are 1-based.
                        cell_ref = f"{get_column_letter(anchor._from.col + 1)}{anchor._from.row + 1}"
                        cell = worksheet[cell_ref]
                        cell.comment = Comment(
                            f"Image translation: {translated_text}", "Translator"
                        )
                        _log_info(
                            "excel_image_translation_added",
                            cell_ref=cell_ref,
                        )
            except Exception as e:
                _log_error(
                    "excel_image_translation_error",
                    image_index=idx,
                    error=str(e),
                )
    except Exception as e:
        _log_error(
            "excel_image_processing_error",
            error=str(e),
        )
def _translate_image_with_legacy(
    self, image_path: str, target_language: str
) -> str:
    """Run the legacy service's image translation on ``image_path``.

    Returns an empty string when the legacy service has no
    ``translate_image`` attribute.
    """
    from services.translation_service import translation_service

    if not hasattr(translation_service, "translate_image"):
        return ""
    return translation_service.translate_image(image_path, target_language)
# Shared module-level instance. It is created without a provider, so it
# translates through the legacy translation_service until set_provider() is called.
excel_translator = ExcelTranslator()