Files
office_translator/translators/excel_translator.py
sepehr c0f93501cc
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2s
fix: use Google Cloud API key for classic mode + translation verification
Two critical fixes:

1. Provider "google" (default classic mode) now checks for a Google Cloud
   API key (GOOGLE_CLOUD_API_KEY in env or admin settings). If present,
   uses GoogleCloudTranslationProvider (official API). Previously it
   always fell through to deep_translator (free scraper) which gets
   blocked in production, silently returning untranslated text.

2. Added translation verification: each translator now tracks how many
   texts were attempted vs actually changed. If 0 texts were translated,
   the job is marked as FAILED with a clear error message instead of
   returning the original file as "completed".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 12:09:26 +02:00

716 lines
29 KiB
Python

"""
Excel Translation Module
Translates Excel files while preserving all formatting, formulas, images, and layout
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import re
import tempfile
import os
import time
import zipfile
import io
import concurrent.futures
from pathlib import Path
from typing import Dict, Set, List, Tuple, Optional, Callable, Any
from lxml import etree
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell.cell import Cell
from openpyxl.utils import get_column_letter
from services.providers.base import TranslationProvider
from core.logging import get_logger
logger = get_logger(__name__)
_HAS_STRUCTLOG = True
def _log_info(event: str, **kwargs):
"""Log info with structlog or standard logging compatibility."""
if _HAS_STRUCTLOG:
logger.info(event, **kwargs)
else:
msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.info(msg)
def _log_error(event: str, **kwargs):
"""Log error with structlog or standard logging compatibility."""
if _HAS_STRUCTLOG:
logger.error(event, **kwargs)
else:
msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.error(msg)
class ExcelProcessorError(Exception):
"""Exception for Excel processing errors with structured error codes."""
INVALID_FORMAT = "INVALID_FORMAT"
EXCEL_CORRUPTED = "EXCEL_CORRUPTED"
EXCEL_READ_ERROR = "EXCEL_READ_ERROR"
EXCEL_WRITE_ERROR = "EXCEL_WRITE_ERROR"
EXCEL_TOO_LARGE = "EXCEL_TOO_LARGE"
ERROR_MESSAGES = {
INVALID_FORMAT: "Format de fichier non supporte. Utilisez .xlsx.",
EXCEL_CORRUPTED: "Le fichier Excel est corrompu ou illisible.",
EXCEL_READ_ERROR: "Erreur lors de la lecture du fichier Excel.",
EXCEL_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.",
EXCEL_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
}
def __init__(
self,
code: str,
message: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
):
self.code = code
self.message = message or self.ERROR_MESSAGES.get(code, "Erreur inconnue")
self.details = details or {}
super().__init__(self.message)
def to_dict(self) -> Dict[str, Any]:
"""Convert error to dictionary format for API responses."""
result = {"error": self.code, "message": self.message}
if self.details:
result["details"] = self.details
return result
class ExcelTranslator:
"""
Handles translation of Excel files with strict formatting preservation.
Uses the new TranslationProvider interface for improved error handling
and fallback chain support.
"""
MAX_FILE_SIZE_MB = 50
XLSX_MAGIC_BYTES = b"PK" # .xlsx files are ZIP archives
def __init__(self, provider: Optional[TranslationProvider] = None):
"""
Initialize ExcelTranslator.
Args:
provider: TranslationProvider instance for translations.
If None, will use fallback to legacy translation_service.
"""
self._provider = provider
self.formula_pattern = re.compile(r"=.*")
self._custom_prompt: Optional[str] = None
self._translation_stats = {"attempted": 0, "changed": 0}
def set_provider(self, provider: TranslationProvider) -> None:
"""Set the translation provider."""
self._provider = provider
def set_custom_prompt(self, prompt: Optional[str]) -> None:
"""Set custom system prompt for LLM providers."""
self._custom_prompt = prompt
def translate_file(
self,
input_path: Path,
output_path: Path,
target_language: str,
source_language: str = "auto",
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Path:
"""
Translate an Excel file while preserving all formatting and structure.
Uses batch translation for improved performance.
Args:
input_path: Path to input Excel file
output_path: Path for translated output file
target_language: Target language code (e.g., 'fr', 'en')
source_language: Source language code (default: auto-detect)
progress_callback: Optional callback for progress updates
Receives dict with: sheet, total_sheets, cells_translated
Returns:
Path to translated file
Raises:
ExcelProcessorError: If file is invalid, corrupted, or processing fails
"""
start_time = time.time()
input_path = Path(input_path)
output_path = Path(output_path)
self._validate_file(input_path)
try:
workbook = load_workbook(input_path, data_only=False)
except Exception as e:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_CORRUPTED,
details={"file_name": input_path.name, "error": str(e)},
)
try:
cells_translated = 0
total_sheets = len(workbook.sheetnames)
# Emit initial progress
if progress_callback:
progress_callback(
{
"current": 0,
"total": total_sheets,
"sheet": 0,
"total_sheets": total_sheets,
"cells_translated": 0,
}
)
text_elements: List[Tuple[str, Callable[[str], None]]] = []
sheet_names_to_translate = []
chart_translations: List[Dict[str, Any]] = []
for sheet_idx, sheet_name in enumerate(workbook.sheetnames):
worksheet = workbook[sheet_name]
self._collect_from_worksheet(worksheet, text_elements)
# Collect header/footer text
self._collect_from_header_footer(worksheet, text_elements)
sheet_names_to_translate.append(sheet_name)
# Emit progress after each sheet collection (ensures < 500ms latency)
if progress_callback:
progress_callback(
{
"current": sheet_idx + 1,
"total": total_sheets,
"sheet": sheet_idx + 1,
"total_sheets": total_sheets,
"cells_translated": cells_translated,
}
)
for sheet_name in sheet_names_to_translate:
text_elements.append((sheet_name, None))
# Collect chart text from ZIP
self._collect_charts_from_zip(input_path, text_elements, chart_translations)
if text_elements:
texts = [elem[0] for elem in text_elements]
total_texts = len(texts)
sheet_name_offset = total_texts - len(sheet_names_to_translate)
_log_info(
"excel_batch_translation_start",
file_name=input_path.name,
text_count=total_texts,
target_lang=target_language,
)
# Translate all text elements in parallel chunks, reporting real-time
# progress after each chunk completes.
CHUNK_SIZE = 15
MAX_WORKERS = 6
chunks = [
(i, texts[i : i + CHUNK_SIZE])
for i in range(0, total_texts, CHUNK_SIZE)
]
translated_texts: List[str] = [""] * total_texts
completed_items = [0]
def _translate_chunk(
chunk_idx: int, chunk: List[str]
) -> Tuple[int, List[str]]:
return chunk_idx, self._batch_translate(
chunk, target_language, source_language
)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
future_map = {
pool.submit(_translate_chunk, idx, chunk): (idx, chunk)
for idx, chunk in chunks
}
for future in concurrent.futures.as_completed(future_map):
chunk_idx, translated_chunk = future.result()
for j, t in enumerate(translated_chunk):
translated_texts[chunk_idx + j] = t
completed_items[0] += len(translated_chunk)
if progress_callback:
done = min(completed_items[0], total_texts)
progress_callback(
{
"current": done,
"total": total_texts,
"sheet": done,
"total_sheets": total_texts,
"cells_translated": cells_translated,
}
)
# Apply cell translations
for i, ((original_text, setter), translated) in enumerate(
zip(
text_elements[:sheet_name_offset],
translated_texts[:sheet_name_offset],
)
):
if translated is not None and setter is not None:
try:
setter(translated)
cells_translated += 1
except Exception as e:
_log_error(
"excel_setter_error",
error=str(e),
index=i,
)
# Apply sheet name translations
sheet_name_mapping = {}
for i, (sheet_name, translated) in enumerate(
zip(sheet_names_to_translate, translated_texts[sheet_name_offset:])
):
if translated and translated != sheet_name:
new_name = self._sanitize_sheet_name(translated)
counter = 1
base_name = new_name[:28] if len(new_name) > 28 else new_name
while (
new_name in sheet_name_mapping.values()
or new_name in workbook.sheetnames
):
new_name = f"{base_name}_{counter}"
counter += 1
sheet_name_mapping[sheet_name] = new_name
for original_name, new_name in sheet_name_mapping.items():
try:
workbook[original_name].title = new_name
except ValueError:
_log_error(
"excel_sheet_rename_failed",
original_name=original_name,
new_name=new_name,
)
try:
workbook.save(output_path)
except Exception as e:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_WRITE_ERROR,
details={"file_name": output_path.name, "error": str(e)},
)
# Re-inject chart translations into the .xlsx ZIP
if chart_translations:
self._apply_chart_translations(output_path, chart_translations)
workbook.close()
processing_time_ms = round((time.time() - start_time) * 1000, 2)
_log_info(
"excel_translation_success",
file_name=input_path.name,
sheets_processed=total_sheets,
cells_translated=cells_translated,
source_lang=source_language,
target_lang=target_language,
processing_time_ms=processing_time_ms,
)
return output_path
except ExcelProcessorError:
raise
except Exception as e:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_READ_ERROR,
details={"file_name": input_path.name, "error": str(e)},
)
def _validate_file(self, file_path: Path) -> None:
"""Validate file format and size."""
if not file_path.exists():
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_READ_ERROR,
message=f"Fichier introuvable: {file_path.name}",
details={"file_name": file_path.name},
)
if file_path.suffix.lower() != ".xlsx":
raise ExcelProcessorError(
code=ExcelProcessorError.INVALID_FORMAT,
details={
"file_name": file_path.name,
"extension": file_path.suffix,
"expected": ".xlsx",
},
)
with open(file_path, "rb") as f:
header = f.read(4)
if header[:2] != self.XLSX_MAGIC_BYTES:
raise ExcelProcessorError(
code=ExcelProcessorError.INVALID_FORMAT,
details={"file_name": file_path.name, "reason": "Invalid file header"},
)
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > self.MAX_FILE_SIZE_MB:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_TOO_LARGE,
details={
"file_name": file_path.name,
"size_mb": round(file_size_mb, 2),
"max_mb": self.MAX_FILE_SIZE_MB,
},
)
def _sanitize_sheet_name(self, name: str) -> str:
"""
Sanitize a sheet name to be valid for Excel.
Excel forbids: : \\ / ? * [ ]
Max length: 31 characters
"""
invalid_chars = ":\\/?*[]"
sanitized = "".join(c if c not in invalid_chars else "_" for c in name)
return sanitized[:31]
def _batch_translate(
self, texts: List[str], target_language: str, source_language: str = "auto"
) -> List[str]:
if not texts:
return []
non_empty = [t for t in texts if t and t.strip()]
self._translation_stats["attempted"] += len(non_empty)
if self._provider is not None:
translated = self._translate_with_provider(
texts, target_language, source_language
)
else:
translated = self._translate_with_legacy(texts, target_language, source_language)
changed = sum(1 for orig, trans in zip(texts, translated) if orig != trans and trans.strip())
self._translation_stats["changed"] += changed
return translated
def get_translation_stats(self) -> dict:
return dict(self._translation_stats)
def _translate_with_provider(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Translate using the TranslationProvider.translate_batch() interface."""
translated = self._provider.translate_batch(texts, target_language, source_language)
return [
t if (t and t.strip()) else orig
for t, orig in zip(translated, texts)
]
def _translate_with_legacy(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Fallback to legacy translation_service for backward compatibility."""
from services.translation_service import translation_service
_log_info(
"excel_using_legacy_service",
text_count=len(texts),
target_lang=target_language,
)
return translation_service.translate_batch(
texts, target_language, source_language
)
def _collect_from_worksheet(
self,
worksheet: Worksheet,
text_elements: List[Tuple[str, Callable[[str], None]]],
) -> None:
"""Collect all translatable text from worksheet cells."""
for row in worksheet.iter_rows():
for cell in row:
if cell.value is not None:
self._collect_from_cell(cell, text_elements)
def _collect_from_cell(
self, cell: Cell, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from a cell."""
original_value = cell.value
if original_value is None:
return
if isinstance(original_value, str) and original_value.startswith("="):
# Handle both double quotes and single quotes in formulas
# Also handles escaped quotes: "He said ""hello""" -> He said "hello"
string_pattern = re.compile(r'"((?:[^"\\]|\\.)*)"')
single_quote_pattern = re.compile(r"'((?:[^'\\]|\\.)*)'")
strings = string_pattern.findall(original_value)
strings.extend(single_quote_pattern.findall(original_value))
for s in strings:
if s.strip():
def make_formula_setter(c, orig_formula, orig_string):
def setter(translated):
# Escape quotes in translated text to preserve formula validity
escaped_translated = translated.replace('"', '""')
c.value = orig_formula.replace(
f'"{orig_string}"', f'"{escaped_translated}"'
)
return setter
text_elements.append(
(s, make_formula_setter(cell, original_value, s))
)
elif isinstance(original_value, str) and original_value.strip():
def make_setter(c):
def setter(text):
c.value = text
return setter
text_elements.append((original_value, make_setter(cell)))
def _collect_from_header_footer(
self, worksheet: Worksheet, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from worksheet headers and footers.
Headers/footers can contain text like "Page &P of &N" or "Confidential - &D".
We translate the static text portions, preserving the &X codes.
"""
for section in worksheet.oddHeader, worksheet.oddFooter, worksheet.evenHeader, worksheet.evenFooter, worksheet.firstHeader, worksheet.firstFooter:
if section is None:
continue
# openpyxl Header/Footer sections have .left, .center, .right attributes
for attr in ('left', 'center', 'right'):
text = getattr(section, attr, None)
if text and isinstance(text, str) and text.strip():
# Extract translatable text (remove &X codes for translation, keep structure)
import re as _re
# Split on &X codes (like &P, &N, &D, &F, &A, etc.)
parts = _re.split(r'(&[A-Za-z])', text)
for i, part in enumerate(parts):
if part and not part.startswith('&') and part.strip():
original = part.strip()
def make_hf_setter(sec, attribute, idx):
def setter(translated):
current = getattr(sec, attribute, '') or ''
parts_local = _re.split(r'(&[A-Za-z])', current)
if idx < len(parts_local):
parts_local[idx] = translated
setattr(sec, attribute, ''.join(parts_local))
return setter
text_elements.append((original, make_hf_setter(section, attr, i)))
def _collect_charts_from_zip(
self, input_path: Path, text_elements: List[Tuple[str, Callable[[str], None]]],
chart_translations: List[Dict[str, Any]]
) -> None:
"""Parse chart XML from the .xlsx ZIP and collect translatable text."""
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
try:
with zipfile.ZipFile(input_path, 'r') as zf:
chart_files = [name for name in zf.namelist() if name.startswith('xl/charts/') and name.endswith('.xml')]
for chart_file in chart_files:
try:
chart_xml = etree.fromstring(zf.read(chart_file))
seen_texts: set = set()
# Collect from <a:t> elements (titles, axis labels, legend text)
for t_elem in chart_xml.iter(f'{{{_NS_A}}}t'):
if t_elem.text and t_elem.text.strip() and t_elem.text.strip() not in seen_texts:
seen_texts.add(t_elem.text.strip())
entry = {
'chart_file': chart_file,
'original': t_elem.text.strip(),
'translated': None,
}
chart_translations.append(entry)
def make_chart_setter(entries, idx):
def setter(text):
entries[idx]['translated'] = text.strip()
return setter
text_elements.append(
(t_elem.text.strip(), make_chart_setter(chart_translations, len(chart_translations) - 1))
)
# Collect from <c:v> elements (category names, series names)
for v_elem in chart_xml.iter(f'{{{_NS_C}}}v'):
text = v_elem.text
if text and text.strip() and not text.strip().replace('.', '').replace('-', '').replace(',', '').isdigit():
if text.strip() not in seen_texts:
seen_texts.add(text.strip())
entry = {
'chart_file': chart_file,
'original': text.strip(),
'translated': None,
}
chart_translations.append(entry)
def make_chart_v_setter(entries, idx):
def setter(text):
entries[idx]['translated'] = text.strip()
return setter
text_elements.append(
(text.strip(), make_chart_v_setter(chart_translations, len(chart_translations) - 1))
)
except Exception as e:
_log_error("excel_chart_parse_error", chart_file=chart_file, error=str(e))
except Exception as e:
_log_error("excel_charts_zip_error", error=str(e))
def _apply_chart_translations(self, output_path: Path, chart_translations: List[Dict[str, Any]]) -> None:
"""Re-inject chart translations into the .xlsx ZIP."""
if not chart_translations:
return
translated_entries = [e for e in chart_translations if 'translated' in e and e['translated']]
if not translated_entries:
return
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
chart_files_to_update: Dict[str, List[Dict]] = {}
for entry in translated_entries:
cf = entry['chart_file']
if cf not in chart_files_to_update:
chart_files_to_update[cf] = []
chart_files_to_update[cf].append(entry)
try:
with zipfile.ZipFile(output_path, 'r') as zf_in:
existing_entries = zf_in.namelist()
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out:
for item in existing_entries:
data = zf_in.read(item)
if item in chart_files_to_update:
try:
chart_xml = etree.fromstring(data)
for entry in chart_files_to_update[item]:
for t_elem in chart_xml.iter(f'{{{_NS_A}}}t'):
if t_elem.text and t_elem.text.strip() == entry['original']:
t_elem.text = entry['translated']
break
else:
for v_elem in chart_xml.iter(f'{{{_NS_C}}}v'):
if v_elem.text and v_elem.text.strip() == entry['original']:
v_elem.text = entry['translated']
break
data = etree.tostring(chart_xml, xml_declaration=True, encoding='UTF-8', standalone=True)
except Exception as e:
_log_error("excel_chart_update_error", chart_file=item, error=str(e))
zf_out.writestr(item, data)
with open(output_path, 'wb') as f:
f.write(buf.getvalue())
_log_info("excel_charts_translated", chart_files=len(chart_files_to_update), translations=len(translated_entries))
except Exception as e:
_log_error("excel_chart_zip_rewrite_error", error=str(e))
def _translate_images(self, worksheet: Worksheet, target_language: str) -> None:
"""
Translate text in images using vision model.
NOTE: This method is currently NOT CALLED in translate_file() as image translation
is not part of the current story scope (Story 2.7). It is intentionally preserved
for future implementation when vision model support is prioritized.
TODO: Call this method during translate_file() when implementing image translation feature.
"""
try:
images = getattr(worksheet, "_images", [])
for idx, image in enumerate(images):
try:
image_data = image._data()
ext = image.format or "png"
with tempfile.NamedTemporaryFile(
suffix=f".{ext}", delete=False
) as tmp:
tmp.write(image_data)
tmp_path = tmp.name
translated_text = self._translate_image_with_legacy(
tmp_path, target_language
)
os.unlink(tmp_path)
if translated_text and translated_text.strip():
anchor = image.anchor
if hasattr(anchor, "_from"):
cell_ref = f"{get_column_letter(anchor._from.col + 1)}{anchor._from.row + 1}"
cell = worksheet[cell_ref]
from openpyxl.comments import Comment
cell.comment = Comment(
f"Image translation: {translated_text}", "Translator"
)
_log_info(
"excel_image_translation_added",
cell_ref=cell_ref,
)
except Exception as e:
_log_error(
"excel_image_translation_error",
image_index=idx,
error=str(e),
)
except Exception as e:
_log_error(
"excel_image_processing_error",
error=str(e),
)
def _translate_image_with_legacy(
self, image_path: str, target_language: str
) -> str:
"""Translate image using legacy service."""
from services.translation_service import translation_service
if hasattr(translation_service, "translate_image"):
return translation_service.translate_image(image_path, target_language)
return ""
excel_translator = ExcelTranslator()