feat: revue de code, doc CODE_REVIEW, forfaits 2026, traduction LLM, providers avec modèle

Made-with: Cursor
This commit is contained in:
Sepehr Ramezani
2026-03-07 11:42:58 +01:00
parent 3d37ce4582
commit 473b3e26c7
181 changed files with 30617 additions and 7170 deletions

View File

@@ -1,10 +1,17 @@
"""Translators package initialization"""
from .excel_translator import ExcelTranslator, excel_translator
from .word_translator import WordTranslator, word_translator
from .pptx_translator import PowerPointTranslator, pptx_translator
from .excel_translator import ExcelTranslator, excel_translator, ExcelProcessorError
from .word_translator import WordTranslator, word_translator, WordProcessorError
from .pptx_translator import PowerPointTranslator, pptx_translator, PptxProcessorError
__all__ = [
'ExcelTranslator', 'excel_translator',
'WordTranslator', 'word_translator',
'PowerPointTranslator', 'pptx_translator'
"ExcelTranslator",
"excel_translator",
"ExcelProcessorError",
"WordTranslator",
"word_translator",
"WordProcessorError",
"PowerPointTranslator",
"pptx_translator",
"PptxProcessorError",
]

View File

@@ -2,159 +2,553 @@
Excel Translation Module
Translates Excel files while preserving all formatting, formulas, images, and layout
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import re
import tempfile
import os
import time
import concurrent.futures
from pathlib import Path
from typing import Dict, Set, List, Tuple
from typing import Dict, Set, List, Tuple, Optional, Callable, Any
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell.cell import Cell
from openpyxl.utils import get_column_letter
from services.translation_service import translation_service
from services.providers.base import TranslationProvider
# Prefer structlog when it is installed; otherwise fall back to the standard
# library logger. _HAS_STRUCTLOG records which backend is active so the
# _log_* helpers know whether keyword arguments can be passed to the logger.
try:
import structlog
logger = structlog.get_logger(__name__)
_HAS_STRUCTLOG = True
except ImportError:
import logging
logger = logging.getLogger(__name__)
_HAS_STRUCTLOG = False
def _log_info(event: str, **kwargs):
    """Emit an info-level event through whichever logging backend is active.

    With structlog the keyword arguments are forwarded as structured fields.
    With the standard library logger they are rendered as ``key=value`` pairs
    appended to the event name, separated by single spaces.
    """
    if not _HAS_STRUCTLOG:
        rendered_fields = " ".join(
            f"{key}={value}" for key, value in kwargs.items()
        )
        logger.info(f"{event} " + rendered_fields)
        return
    logger.info(event, **kwargs)
def _log_error(event: str, **kwargs):
    """Emit an error-level event through whichever logging backend is active.

    With structlog the keyword arguments are forwarded as structured fields.
    With the standard library logger they are rendered as ``key=value`` pairs
    appended to the event name, separated by single spaces.
    """
    if not _HAS_STRUCTLOG:
        rendered_fields = " ".join(
            f"{key}={value}" for key, value in kwargs.items()
        )
        logger.error(f"{event} " + rendered_fields)
        return
    logger.error(event, **kwargs)
class ExcelProcessorError(Exception):
    """Excel processing failure carrying a machine-readable error code.

    Attributes:
        code: One of the code constants below (or any caller-supplied string).
        message: Human-readable message; defaults to the entry for ``code``
            in ``ERROR_MESSAGES``, or "Erreur inconnue" for unknown codes.
        details: Optional extra context; always a dict (empty when omitted).
    """

    # Error codes.
    INVALID_FORMAT = "INVALID_FORMAT"
    EXCEL_CORRUPTED = "EXCEL_CORRUPTED"
    EXCEL_READ_ERROR = "EXCEL_READ_ERROR"
    EXCEL_WRITE_ERROR = "EXCEL_WRITE_ERROR"
    EXCEL_TOO_LARGE = "EXCEL_TOO_LARGE"

    # Default user-facing message for each code.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .xlsx.",
        EXCEL_CORRUPTED: "Le fichier Excel est corrompu ou illisible.",
        EXCEL_READ_ERROR: "Erreur lors de la lecture du fichier Excel.",
        EXCEL_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.",
        EXCEL_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        # A missing or empty message falls back to the default for the code.
        if message:
            resolved_message = message
        else:
            resolved_message = self.ERROR_MESSAGES.get(code, "Erreur inconnue")
        super().__init__(resolved_message)
        self.code = code
        self.message = resolved_message
        self.details = details if details else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict suitable for an API response.

        The ``details`` key is only present when details are non-empty.
        """
        payload = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
class ExcelTranslator:
"""Handles translation of Excel files with strict formatting preservation"""
def __init__(self):
self.translation_service = translation_service
self.formula_pattern = re.compile(r'=.*')
def translate_file(self, input_path: Path, output_path: Path, target_language: str) -> Path:
"""
Handles translation of Excel files with strict formatting preservation.
Uses the new TranslationProvider interface for improved error handling
and fallback chain support.
"""
MAX_FILE_SIZE_MB = 50
XLSX_MAGIC_BYTES = b"PK" # .xlsx files are ZIP archives
def __init__(self, provider: Optional[TranslationProvider] = None):
"""
Initialize ExcelTranslator.
Args:
provider: TranslationProvider instance for translations.
If None, will use fallback to legacy translation_service.
"""
self._provider = provider
# Matches "=" followed by any characters.
# NOTE(review): not referenced by any method visible in this chunk - confirm it is still needed.
self.formula_pattern = re.compile(r"=.*")
# Set via set_custom_prompt().
# NOTE(review): no visible code reads this value - confirm where it is consumed.
self._custom_prompt: Optional[str] = None
def set_provider(self, provider: TranslationProvider) -> None:
"""Set the translation provider.

While the stored provider is not None, _batch_translate() sends texts to
it instead of the legacy translation_service.
"""
self._provider = provider
def set_custom_prompt(self, prompt: Optional[str]) -> None:
"""Set custom system prompt for LLM providers.

Args:
prompt: Prompt text to store, or None to clear it.

NOTE(review): the value is stored on the instance only; no code visible
here forwards it to the provider - confirm where it is consumed.
"""
self._custom_prompt = prompt
def translate_file(
self,
input_path: Path,
output_path: Path,
target_language: str,
source_language: str = "auto",
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Path:
"""
Translate an Excel file while preserving all formatting and structure.
Uses batch translation for improved performance.
Args:
input_path: Path to input Excel file
output_path: Path for translated output file
target_language: Target language code (e.g., 'fr', 'en')
source_language: Source language code (default: auto-detect)
progress_callback: Optional callback for progress updates
Receives dict with: sheet, total_sheets, cells_translated
Returns:
Path to translated file
Raises:
ExcelProcessorError: If file is invalid, corrupted, or processing fails
"""
workbook = load_workbook(input_path, data_only=False)
# Collect all translatable text elements
text_elements = [] # List of (text, setter_function)
sheet_names_to_translate = []
for sheet_name in workbook.sheetnames:
worksheet = workbook[sheet_name]
self._collect_from_worksheet(worksheet, text_elements)
sheet_names_to_translate.append(sheet_name)
# Add sheet names to translate
sheet_name_setters = []
for sheet_name in sheet_names_to_translate:
text_elements.append((sheet_name, None)) # None setter - handled separately
sheet_name_setters.append(sheet_name)
# Batch translate all texts at once
if text_elements:
texts = [elem[0] for elem in text_elements]
print(f"Batch translating {len(texts)} text segments...")
translated_texts = self.translation_service.translate_batch(texts, target_language)
# Apply translations to cells
sheet_name_offset = len(text_elements) - len(sheet_name_setters)
for i, ((original_text, setter), translated) in enumerate(zip(text_elements[:sheet_name_offset], translated_texts[:sheet_name_offset])):
if translated is not None and setter is not None:
start_time = time.time()
input_path = Path(input_path)
output_path = Path(output_path)
self._validate_file(input_path)
try:
workbook = load_workbook(input_path, data_only=False)
except Exception as e:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_CORRUPTED,
details={"file_name": input_path.name, "error": str(e)},
)
try:
cells_translated = 0
total_sheets = len(workbook.sheetnames)
# Emit initial progress
if progress_callback:
progress_callback(
{
"current": 0,
"total": total_sheets,
"sheet": 0,
"total_sheets": total_sheets,
"cells_translated": 0,
}
)
text_elements: List[Tuple[str, Callable[[str], None]]] = []
sheet_names_to_translate = []
for sheet_idx, sheet_name in enumerate(workbook.sheetnames):
worksheet = workbook[sheet_name]
self._collect_from_worksheet(worksheet, text_elements)
sheet_names_to_translate.append(sheet_name)
# Emit progress after each sheet collection (ensures < 500ms latency)
if progress_callback:
progress_callback(
{
"current": sheet_idx + 1,
"total": total_sheets,
"sheet": sheet_idx + 1,
"total_sheets": total_sheets,
"cells_translated": cells_translated,
}
)
for sheet_name in sheet_names_to_translate:
text_elements.append((sheet_name, None))
if text_elements:
texts = [elem[0] for elem in text_elements]
total_texts = len(texts)
sheet_name_offset = total_texts - len(sheet_names_to_translate)
_log_info(
"excel_batch_translation_start",
file_name=input_path.name,
text_count=total_texts,
target_lang=target_language,
)
# Translate all text elements in parallel chunks, reporting real-time
# progress after each chunk completes.
CHUNK_SIZE = 15
MAX_WORKERS = 6
chunks = [
(i, texts[i : i + CHUNK_SIZE])
for i in range(0, total_texts, CHUNK_SIZE)
]
translated_texts: List[str] = [""] * total_texts
completed_items = [0]
def _translate_chunk(
chunk_idx: int, chunk: List[str]
) -> Tuple[int, List[str]]:
return chunk_idx, self._batch_translate(
chunk, target_language, source_language
)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
future_map = {
pool.submit(_translate_chunk, idx, chunk): (idx, chunk)
for idx, chunk in chunks
}
for future in concurrent.futures.as_completed(future_map):
chunk_idx, translated_chunk = future.result()
for j, t in enumerate(translated_chunk):
translated_texts[chunk_idx + j] = t
completed_items[0] += len(translated_chunk)
if progress_callback:
done = min(completed_items[0], total_texts)
progress_callback(
{
"current": done,
"total": total_texts,
"sheet": done,
"total_sheets": total_texts,
"cells_translated": cells_translated,
}
)
# Apply cell translations
for i, ((original_text, setter), translated) in enumerate(
zip(
text_elements[:sheet_name_offset],
translated_texts[:sheet_name_offset],
)
):
if translated is not None and setter is not None:
try:
setter(translated)
cells_translated += 1
except Exception as e:
_log_error(
"excel_setter_error",
error=str(e),
index=i,
)
# Apply sheet name translations
sheet_name_mapping = {}
for i, (sheet_name, translated) in enumerate(
zip(sheet_names_to_translate, translated_texts[sheet_name_offset:])
):
if translated and translated != sheet_name:
new_name = self._sanitize_sheet_name(translated)
counter = 1
base_name = new_name[:28] if len(new_name) > 28 else new_name
while (
new_name in sheet_name_mapping.values()
or new_name in workbook.sheetnames
):
new_name = f"{base_name}_{counter}"
counter += 1
sheet_name_mapping[sheet_name] = new_name
for original_name, new_name in sheet_name_mapping.items():
try:
setter(translated)
except Exception as e:
print(f"Error applying translation: {e}")
# Apply sheet name translations
sheet_name_mapping = {}
for i, (sheet_name, translated) in enumerate(zip(sheet_name_setters, translated_texts[sheet_name_offset:])):
if translated and translated != sheet_name:
new_name = translated[:31]
counter = 1
base_name = new_name[:28] if len(new_name) > 28 else new_name
while new_name in sheet_name_mapping.values() or new_name in workbook.sheetnames:
new_name = f"{base_name}_{counter}"
counter += 1
sheet_name_mapping[sheet_name] = new_name
# Rename sheets
for original_name, new_name in sheet_name_mapping.items():
workbook[original_name].title = new_name
# Translate images if enabled (separate process)
if getattr(self.translation_service, 'translate_images', False):
for sheet_name in workbook.sheetnames:
self._translate_images(workbook[sheet_name], target_language)
workbook.save(output_path)
workbook.close()
return output_path
def _collect_from_worksheet(self, worksheet: Worksheet, text_elements: List[Tuple[str, callable]]):
"""Collect all translatable text from worksheet cells"""
workbook[original_name].title = new_name
except ValueError:
_log_error(
"excel_sheet_rename_failed",
original_name=original_name,
new_name=new_name,
)
try:
workbook.save(output_path)
except Exception as e:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_WRITE_ERROR,
details={"file_name": output_path.name, "error": str(e)},
)
workbook.close()
processing_time_ms = round((time.time() - start_time) * 1000, 2)
_log_info(
"excel_translation_success",
file_name=input_path.name,
sheets_processed=total_sheets,
cells_translated=cells_translated,
source_lang=source_language,
target_lang=target_language,
processing_time_ms=processing_time_ms,
)
return output_path
except ExcelProcessorError:
raise
except Exception as e:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_READ_ERROR,
details={"file_name": input_path.name, "error": str(e)},
)
def _validate_file(self, file_path: Path) -> None:
"""Validate file format and size."""
if not file_path.exists():
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_READ_ERROR,
message=f"Fichier introuvable: {file_path.name}",
details={"file_name": file_path.name},
)
if file_path.suffix.lower() != ".xlsx":
raise ExcelProcessorError(
code=ExcelProcessorError.INVALID_FORMAT,
details={
"file_name": file_path.name,
"extension": file_path.suffix,
"expected": ".xlsx",
},
)
with open(file_path, "rb") as f:
header = f.read(4)
if header[:2] != self.XLSX_MAGIC_BYTES:
raise ExcelProcessorError(
code=ExcelProcessorError.INVALID_FORMAT,
details={"file_name": file_path.name, "reason": "Invalid file header"},
)
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > self.MAX_FILE_SIZE_MB:
raise ExcelProcessorError(
code=ExcelProcessorError.EXCEL_TOO_LARGE,
details={
"file_name": file_path.name,
"size_mb": round(file_size_mb, 2),
"max_mb": self.MAX_FILE_SIZE_MB,
},
)
def _sanitize_sheet_name(self, name: str) -> str:
"""
Sanitize a sheet name to be valid for Excel.
Excel forbids: : \\ / ? * [ ]
Max length: 31 characters
"""
invalid_chars = ":\\/?*[]"
sanitized = "".join(c if c not in invalid_chars else "_" for c in name)
return sanitized[:31]
def _batch_translate(
self, texts: List[str], target_language: str, source_language: str = "auto"
) -> List[str]:
"""
Batch translate using new provider interface.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code
Returns:
List of translated texts (same order as input)
"""
if not texts:
return []
if self._provider is not None:
return self._translate_with_provider(
texts, target_language, source_language
)
return self._translate_with_legacy(texts, target_language, source_language)
def _translate_with_provider(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Translate using the TranslationProvider.translate_batch() interface."""
translated = self._provider.translate_batch(texts, target_language, source_language)
return [
t if (t and t.strip()) else orig
for t, orig in zip(translated, texts)
]
def _translate_with_legacy(
    self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
    """Translate ``texts`` with the legacy ``translation_service`` singleton.

    The service is imported lazily, the call is logged, and the service's
    ``translate_batch`` result is returned unchanged.
    """
    from services.translation_service import translation_service as legacy_service

    _log_info(
        "excel_using_legacy_service",
        text_count=len(texts),
        target_lang=target_language,
    )
    # NOTE(review): confirm the legacy translate_batch accepts source_language
    # as a third positional argument.
    outcome = legacy_service.translate_batch(
        texts, target_language, source_language
    )
    return outcome
def _collect_from_worksheet(
    self,
    worksheet: Worksheet,
    text_elements: List[Tuple[str, Callable[[str], None]]],
) -> None:
    """Gather translatable entries from every non-empty cell of a worksheet.

    Cells are visited in ``worksheet.iter_rows()`` order; each cell whose
    value is not None is handed to ``_collect_from_cell``, which appends to
    ``text_elements`` in place.
    """
    populated_cells = (
        cell
        for row in worksheet.iter_rows()
        for cell in row
        if cell.value is not None
    )
    for cell in populated_cells:
        self._collect_from_cell(cell, text_elements)
def _collect_from_cell(self, cell: Cell, text_elements: List[Tuple[str, callable]]):
"""Collect text from a cell"""
def _collect_from_cell(
self, cell: Cell, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from a cell."""
original_value = cell.value
if original_value is None:
return
# Handle formulas - collect text inside quotes
if isinstance(original_value, str) and original_value.startswith('='):
string_pattern = re.compile(r'"([^"]*)"')
if isinstance(original_value, str) and original_value.startswith("="):
# Handle both double quotes and single quotes in formulas
# Also handles escaped quotes: "He said ""hello""" -> He said "hello"
string_pattern = re.compile(r'"((?:[^"\\]|\\.)*)"')
single_quote_pattern = re.compile(r"'((?:[^'\\]|\\.)*)'")
strings = string_pattern.findall(original_value)
strings.extend(single_quote_pattern.findall(original_value))
for s in strings:
if s.strip():
def make_formula_setter(c, orig_formula, orig_string):
def setter(translated):
c.value = orig_formula.replace(f'"{orig_string}"', f'"{translated}"')
# Escape quotes in translated text to preserve formula validity
escaped_translated = translated.replace('"', '""')
c.value = orig_formula.replace(
f'"{orig_string}"', f'"{escaped_translated}"'
)
return setter
text_elements.append((s, make_formula_setter(cell, original_value, s)))
# Handle regular text
text_elements.append(
(s, make_formula_setter(cell, original_value, s))
)
elif isinstance(original_value, str) and original_value.strip():
def make_setter(c):
def setter(text):
c.value = text
return setter
text_elements.append((original_value, make_setter(cell)))
def _translate_images(self, worksheet: Worksheet, target_language: str):
"""Translate text in images using vision model"""
from services.translation_service import OllamaTranslationProvider
if not isinstance(self.translation_service.provider, OllamaTranslationProvider):
return
def _translate_images(self, worksheet: Worksheet, target_language: str) -> None:
"""
Translate text in images using vision model.
NOTE: This method is currently NOT CALLED in translate_file() as image translation
is not part of the current story scope (Story 2.7). It is intentionally preserved
for future implementation when vision model support is prioritized.
TODO: Call this method during translate_file() when implementing image translation feature.
"""
try:
images = getattr(worksheet, '_images', [])
images = getattr(worksheet, "_images", [])
for idx, image in enumerate(images):
try:
image_data = image._data()
ext = image.format or 'png'
with tempfile.NamedTemporaryFile(suffix=f'.{ext}', delete=False) as tmp:
ext = image.format or "png"
with tempfile.NamedTemporaryFile(
suffix=f".{ext}", delete=False
) as tmp:
tmp.write(image_data)
tmp_path = tmp.name
translated_text = self.translation_service.provider.translate_image(tmp_path, target_language)
translated_text = self._translate_image_with_legacy(
tmp_path, target_language
)
os.unlink(tmp_path)
if translated_text and translated_text.strip():
anchor = image.anchor
if hasattr(anchor, '_from'):
if hasattr(anchor, "_from"):
cell_ref = f"{get_column_letter(anchor._from.col + 1)}{anchor._from.row + 1}"
cell = worksheet[cell_ref]
from openpyxl.comments import Comment
cell.comment = Comment(f"Image translation: {translated_text}", "Translator")
print(f"Added Excel image translation at {cell_ref}")
cell.comment = Comment(
f"Image translation: {translated_text}", "Translator"
)
_log_info(
"excel_image_translation_added",
cell_ref=cell_ref,
)
except Exception as e:
print(f"Error translating Excel image {idx}: {e}")
_log_error(
"excel_image_translation_error",
image_index=idx,
error=str(e),
)
except Exception as e:
print(f"Error processing Excel images: {e}")
_log_error(
"excel_image_processing_error",
error=str(e),
)
def _translate_image_with_legacy(
    self, image_path: str, target_language: str
) -> str:
    """Translate the text of an image file via the legacy service.

    Returns the legacy service's ``translate_image`` result, or an empty
    string when the service does not expose ``translate_image``.
    """
    from services.translation_service import translation_service as legacy_service

    if not hasattr(legacy_service, "translate_image"):
        return ""
    return legacy_service.translate_image(image_path, target_language)
# Global translator instance, created without a provider: translations go
# through the legacy translation_service until set_provider() is called.
excel_translator = ExcelTranslator()

View File

@@ -2,150 +2,481 @@
PowerPoint Translation Module
Translates PowerPoint files while preserving all layouts, animations, and media
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import time
import concurrent.futures
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Callable, Any
from lxml import etree
from pptx import Presentation
from pptx.shapes.base import BaseShape
from pptx.shapes.group import GroupShape
from pptx.util import Inches, Pt
from pptx.enum.shapes import MSO_SHAPE_TYPE
from services.translation_service import translation_service
from typing import List, Tuple
import tempfile
import os
from services.providers.base import TranslationProvider
# DrawingML namespace used by pptx XML (the "a:" prefix, e.g. <a:pPr>).
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
# Language codes written right-to-left. Membership is tested with the
# lower-cased target language code as-is.
# NOTE(review): region-qualified codes such as "ar-SA" would not match - confirm callers pass bare codes.
RTL_LANGUAGES: frozenset = frozenset(
{"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"}
)
# Prefer structlog when it is installed; otherwise fall back to the standard
# library logger. _HAS_STRUCTLOG records which backend is active so the
# _log_* helpers know whether keyword arguments can be passed to the logger.
try:
import structlog
logger = structlog.get_logger(__name__)
_HAS_STRUCTLOG = True
except ImportError:
import logging
logger = logging.getLogger(__name__)
_HAS_STRUCTLOG = False
def _log_info(event: str, **kwargs):
    """Emit an info-level event through whichever logging backend is active.

    With structlog the keyword arguments are forwarded as structured fields.
    With the standard library logger they are rendered as ``key=value`` pairs
    appended to the event name, separated by single spaces.
    """
    if not _HAS_STRUCTLOG:
        rendered_fields = " ".join(
            f"{key}={value}" for key, value in kwargs.items()
        )
        logger.info(f"{event} " + rendered_fields)
        return
    logger.info(event, **kwargs)
def _log_error(event: str, **kwargs):
    """Emit an error-level event through whichever logging backend is active.

    With structlog the keyword arguments are forwarded as structured fields.
    With the standard library logger they are rendered as ``key=value`` pairs
    appended to the event name, separated by single spaces.
    """
    if not _HAS_STRUCTLOG:
        rendered_fields = " ".join(
            f"{key}={value}" for key, value in kwargs.items()
        )
        logger.error(f"{event} " + rendered_fields)
        return
    logger.error(event, **kwargs)
def _set_pptx_paragraph_rtl(paragraph) -> None:
    """Switch a PowerPoint paragraph to right-to-left layout.

    Ensures the paragraph XML has an ``<a:pPr>`` child (creating it as the
    first child when absent) and sets ``rtl="1"`` and ``algn="r"`` on it,
    i.e. right-to-left direction with right alignment.
    """
    para_xml = paragraph._p
    ppr_tag = "{%s}pPr" % _NS_A
    properties = para_xml.find(ppr_tag)
    if properties is None:
        properties = etree.Element(ppr_tag)
        para_xml.insert(0, properties)
    for attr_name, attr_value in (("rtl", "1"), ("algn", "r")):
        properties.set(attr_name, attr_value)
def _apply_rtl_to_presentation(presentation: Presentation) -> None:
    """Apply RTL direction to every paragraph of every slide.

    Covers the shapes on each slide (recursively, via ``_apply_rtl_to_shape``)
    and, when the slide already has a notes page, the paragraphs of its
    notes text frame, so that speaker notes follow the same direction as the
    slide body.

    Args:
        presentation: The presentation to modify in place.
    """
    for slide in presentation.slides:
        for shape in slide.shapes:
            _apply_rtl_to_shape(shape)
        # Guard with has_notes_slide: reading notes_slide on a slide without
        # notes would create an empty notes page as a side effect.
        if slide.has_notes_slide:
            notes_frame = slide.notes_slide.notes_text_frame
            if notes_frame is not None:
                for paragraph in notes_frame.paragraphs:
                    _set_pptx_paragraph_rtl(paragraph)
def _apply_rtl_to_shape(shape) -> None:
    """Apply RTL to one shape, descending into tables and groups.

    The three cases are independent: a shape's own text frame, the cells of
    a table shape, and the children of a group shape (handled recursively).
    """
    if shape.has_text_frame:
        for para in shape.text_frame.paragraphs:
            _set_pptx_paragraph_rtl(para)

    kind = shape.shape_type

    if kind == MSO_SHAPE_TYPE.TABLE:
        table_cells = (
            cell for row in shape.table.rows for cell in row.cells
        )
        for cell in table_cells:
            for para in cell.text_frame.paragraphs:
                _set_pptx_paragraph_rtl(para)

    if kind == MSO_SHAPE_TYPE.GROUP:
        for child in shape.shapes:
            _apply_rtl_to_shape(child)
class PptxProcessorError(Exception):
    """PowerPoint processing failure carrying a machine-readable error code.

    Attributes:
        code: One of the code constants below (or any caller-supplied string).
        message: Human-readable message; defaults to the entry for ``code``
            in ``ERROR_MESSAGES``, or "Erreur inconnue" for unknown codes.
        details: Optional extra context; always a dict (empty when omitted).
    """

    # Error codes.
    INVALID_FORMAT = "INVALID_FORMAT"
    PPTX_CORRUPTED = "PPTX_CORRUPTED"
    PPTX_READ_ERROR = "PPTX_READ_ERROR"
    PPTX_WRITE_ERROR = "PPTX_WRITE_ERROR"
    PPTX_TOO_LARGE = "PPTX_TOO_LARGE"

    # Default user-facing message for each code.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .pptx.",
        PPTX_CORRUPTED: "Le fichier PowerPoint est corrompu ou illisible.",
        PPTX_READ_ERROR: "Erreur lors de la lecture du fichier PowerPoint.",
        PPTX_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.",
        PPTX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        # A missing or empty message falls back to the default for the code.
        if message:
            resolved_message = message
        else:
            resolved_message = self.ERROR_MESSAGES.get(code, "Erreur inconnue")
        super().__init__(resolved_message)
        self.code = code
        self.message = resolved_message
        self.details = details if details else {}

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict suitable for an API response.

        The ``details`` key is only present when details are non-empty.
        """
        payload = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
class PowerPointTranslator:
"""Handles translation of PowerPoint presentations with strict formatting preservation"""
def __init__(self):
self.translation_service = translation_service
def translate_file(self, input_path: Path, output_path: Path, target_language: str) -> Path:
"""
Handles translation of PowerPoint presentations with strict formatting preservation.
Uses the new TranslationProvider interface for improved error handling
and fallback chain support.
"""
MAX_FILE_SIZE_MB = 50
PPTX_MAGIC_BYTES = b"PK" # .pptx files are ZIP archives
def __init__(self, provider: Optional[TranslationProvider] = None):
"""
Initialize PowerPointTranslator.
Args:
provider: TranslationProvider instance for translations.
If None, will use fallback to legacy translation_service.
"""
self._provider = provider
# Set via set_custom_prompt().
# NOTE(review): no visible code reads this value - confirm where it is consumed.
self._custom_prompt: Optional[str] = None
def set_provider(self, provider: TranslationProvider) -> None:
"""Set the translation provider.

While the stored provider is not None, _batch_translate() sends texts to
it instead of the legacy translation_service.
"""
self._provider = provider
def set_custom_prompt(self, prompt: Optional[str]) -> None:
"""Set custom system prompt for LLM providers.

Args:
prompt: Prompt text to store, or None to clear it.

NOTE(review): the value is stored on the instance only; no code visible
here forwards it to the provider - confirm where it is consumed.
"""
self._custom_prompt = prompt
def translate_file(
self,
input_path: Path,
output_path: Path,
target_language: str,
source_language: str = "auto",
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Path:
"""
Translate a PowerPoint presentation while preserving all formatting.
Uses batch translation for improved performance.
Args:
input_path: Path to input PowerPoint file
output_path: Path for translated output file
target_language: Target language code (e.g., 'fr', 'en')
source_language: Source language code (default: auto-detect)
progress_callback: Optional callback for progress updates
Receives dict with: slide, total_slides, runs_translated
Returns:
Path to translated file
Raises:
PptxProcessorError: If file is invalid, corrupted, or processing fails
"""
presentation = Presentation(input_path)
# Collect all translatable text elements
text_elements = [] # List of (text, setter_function)
image_shapes = [] # Collect images for separate processing
for slide_idx, slide in enumerate(presentation.slides):
# Collect from notes
if slide.has_notes_slide and slide.notes_slide.notes_text_frame:
self._collect_from_text_frame(slide.notes_slide.notes_text_frame, text_elements)
# Collect from shapes
for shape in slide.shapes:
self._collect_from_shape(shape, text_elements, slide, image_shapes)
# Batch translate all texts at once
if text_elements:
texts = [elem[0] for elem in text_elements]
print(f"Batch translating {len(texts)} text segments...")
translated_texts = self.translation_service.translate_batch(texts, target_language)
# Apply translations
for (original_text, setter), translated in zip(text_elements, translated_texts):
if translated is not None and setter is not None:
try:
setter(translated)
except Exception as e:
print(f"Error applying translation: {e}")
# Translate images if enabled (separate process, can't batch)
if getattr(self.translation_service, 'translate_images', False):
for shape, slide in image_shapes:
self._translate_image_shape(shape, target_language, slide)
presentation.save(output_path)
return output_path
def _collect_from_shape(self, shape: BaseShape, text_elements: List[Tuple[str, callable]], slide=None, image_shapes=None):
"""Collect text from a shape and its children"""
# Handle text-containing shapes
start_time = time.time()
input_path = Path(input_path)
output_path = Path(output_path)
self._validate_file(input_path)
try:
presentation = Presentation(input_path)
except Exception as e:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_CORRUPTED,
details={"file_name": input_path.name, "error": str(e)},
)
try:
runs_translated = 0
total_slides = len(presentation.slides)
if progress_callback:
progress_callback(
{
"current": 0,
"total": total_slides,
"slide": 0,
"total_slides": total_slides,
"runs_translated": 0,
}
)
text_elements: List[Tuple[str, Callable[[str], None]]] = []
for slide_idx, slide in enumerate(presentation.slides):
if slide.has_notes_slide and slide.notes_slide.notes_text_frame:
self._collect_from_text_frame(
slide.notes_slide.notes_text_frame, text_elements
)
for shape in slide.shapes:
self._collect_from_shape(shape, text_elements)
if progress_callback:
progress_callback(
{
"current": slide_idx + 1,
"total": total_slides,
"slide": slide_idx + 1,
"total_slides": total_slides,
"runs_translated": runs_translated,
}
)
if text_elements:
texts = [elem[0] for elem in text_elements]
total_elements = len(texts)
_log_info(
"pptx_batch_translation_start",
file_name=input_path.name,
text_count=total_elements,
target_lang=target_language,
)
# Parallel chunk translation with real-time progress.
CHUNK_SIZE = 15
MAX_WORKERS = 6
chunks = [
(i, texts[i : i + CHUNK_SIZE])
for i in range(0, total_elements, CHUNK_SIZE)
]
translated_texts: List[str] = [""] * total_elements
completed_items = [0]
def _translate_chunk(
chunk_idx: int, chunk: List[str]
) -> Tuple[int, List[str]]:
return chunk_idx, self._batch_translate(
chunk, target_language, source_language
)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
future_map = {
pool.submit(_translate_chunk, idx, chunk): (idx, chunk)
for idx, chunk in chunks
}
for future in concurrent.futures.as_completed(future_map):
chunk_idx, translated_chunk = future.result()
for j, t in enumerate(translated_chunk):
translated_texts[chunk_idx + j] = t
completed_items[0] += len(translated_chunk)
if progress_callback:
done = min(completed_items[0], total_elements)
progress_callback(
{
"current": done,
"total": total_elements,
"slide": done,
"total_slides": total_elements,
"runs_translated": runs_translated,
}
)
# Apply translations
for i, ((original_text, setter), translated) in enumerate(
zip(text_elements, translated_texts)
):
if translated is not None and setter is not None:
try:
setter(translated)
runs_translated += 1
except Exception as e:
_log_error(
"pptx_setter_error",
error=str(e),
index=i,
)
# Apply RTL layout when the target language is written right-to-left.
if target_language.lower() in RTL_LANGUAGES:
_apply_rtl_to_presentation(presentation)
try:
presentation.save(output_path)
except Exception as e:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_WRITE_ERROR,
details={"file_name": output_path.name, "error": str(e)},
)
processing_time_ms = round((time.time() - start_time) * 1000, 2)
_log_info(
"pptx_translation_success",
file_name=input_path.name,
slides_count=total_slides,
runs_translated=runs_translated,
source_lang=source_language,
target_lang=target_language,
processing_time_ms=processing_time_ms,
)
return output_path
except PptxProcessorError:
raise
except Exception as e:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_READ_ERROR,
details={"file_name": input_path.name, "error": str(e)},
)
def _validate_file(self, file_path: Path) -> None:
"""Validate file format and size."""
if not file_path.exists():
raise PptxProcessorError(
code=PptxProcessorError.PPTX_READ_ERROR,
message=f"Fichier introuvable: {file_path.name}",
details={"file_name": file_path.name},
)
if file_path.suffix.lower() != ".pptx":
raise PptxProcessorError(
code=PptxProcessorError.INVALID_FORMAT,
details={
"file_name": file_path.name,
"extension": file_path.suffix,
"expected": ".pptx",
},
)
with open(file_path, "rb") as f:
header = f.read(4)
if header[:2] != self.PPTX_MAGIC_BYTES:
raise PptxProcessorError(
code=PptxProcessorError.INVALID_FORMAT,
details={"file_name": file_path.name, "reason": "Invalid file header"},
)
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > self.MAX_FILE_SIZE_MB:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_TOO_LARGE,
details={
"file_name": file_path.name,
"size_mb": round(file_size_mb, 2),
"max_mb": self.MAX_FILE_SIZE_MB,
},
)
def _batch_translate(
self, texts: List[str], target_language: str, source_language: str = "auto"
) -> List[str]:
"""
Batch translate using new provider interface.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code
Returns:
List of translated texts (same order as input)
"""
if not texts:
return []
if self._provider is not None:
return self._translate_with_provider(
texts, target_language, source_language
)
return self._translate_with_legacy(texts, target_language, source_language)
def _translate_with_provider(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Translate using the TranslationProvider.translate_batch() interface."""
translated = self._provider.translate_batch(texts, target_language, source_language)
return [
t if (t and t.strip()) else orig
for t, orig in zip(translated, texts)
]
def _translate_with_legacy(
    self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
    """Translate ``texts`` with the legacy ``translation_service`` singleton.

    The service is imported lazily, the call is logged, and the service's
    ``translate_batch`` result is returned unchanged.
    """
    from services.translation_service import translation_service as legacy_service

    _log_info(
        "pptx_using_legacy_service",
        text_count=len(texts),
        target_lang=target_language,
    )
    # NOTE(review): confirm the legacy translate_batch accepts source_language
    # as a third positional argument.
    outcome = legacy_service.translate_batch(
        texts, target_language, source_language
    )
    return outcome
def _collect_from_shape(
    self, shape: BaseShape, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect translatable text from a shape and, recursively, its children.

    Args:
        shape: Shape to inspect.
        text_elements: Output list; ``(text, setter)`` pairs are appended to
            it by ``_collect_from_text_frame``.
    """
    if shape.has_text_frame:
        self._collect_from_text_frame(shape.text_frame, text_elements)

    # Tables: every cell carries its own text frame.
    if shape.shape_type == MSO_SHAPE_TYPE.TABLE:
        for row in shape.table.rows:
            for cell in row.cells:
                self._collect_from_text_frame(cell.text_frame, text_elements)

    # Children are walked exactly once. A group shape also exposes ``shapes``,
    # so testing both conditions independently would collect (and translate)
    # every grouped run twice.
    if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
        for sub_shape in shape.shapes:
            self._collect_from_shape(sub_shape, text_elements)
    elif hasattr(shape, "shapes"):
        # Other containers exposing ``shapes`` are handled best-effort: a
        # failure here must not abort collection for the rest of the slide.
        try:
            for sub_shape in shape.shapes:
                self._collect_from_shape(sub_shape, text_elements)
        except Exception:
            pass
def _collect_from_text_frame(self, text_frame, text_elements: List[Tuple[str, callable]]):
"""Collect text from a text frame"""
def _collect_from_text_frame(
self, text_frame, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from a text frame, preserving leading/trailing whitespace."""
if not text_frame.text.strip():
return
for paragraph in text_frame.paragraphs:
if not paragraph.text.strip():
continue
for run in paragraph.runs:
if run.text and run.text.strip():
def make_setter(r):
def setter(text):
r.text = text
original = run.text
leading = original[: len(original) - len(original.lstrip())]
trailing = original[len(original.rstrip()) :]
stripped = original.strip()
def make_setter(r, lead: str, trail: str):
def setter(text: str) -> None:
r.text = lead + text.strip() + trail
return setter
text_elements.append((run.text, make_setter(run)))
def _translate_image_shape(self, shape, target_language: str, slide):
"""Translate text in an image using vision model"""
from services.translation_service import OllamaTranslationProvider
if not isinstance(self.translation_service.provider, OllamaTranslationProvider):
return
try:
image_blob = shape.image.blob
ext = shape.image.ext
with tempfile.NamedTemporaryFile(suffix=f'.{ext}', delete=False) as tmp:
tmp.write(image_blob)
tmp_path = tmp.name
translated_text = self.translation_service.provider.translate_image(tmp_path, target_language)
os.unlink(tmp_path)
if translated_text and translated_text.strip():
left = shape.left
top = shape.top + shape.height + Inches(0.1)
width = shape.width
height = Inches(0.5)
textbox = slide.shapes.add_textbox(left, top, width, height)
tf = textbox.text_frame
p = tf.paragraphs[0]
p.text = f"[{translated_text}]"
p.font.size = Pt(10)
p.font.italic = True
print(f"Added image translation: {translated_text[:50]}...")
except Exception as e:
print(f"Error translating image: {e}")
text_elements.append((stripped, make_setter(run, leading, trailing)))
# Module-level translator instance, constructed with no arguments.
pptx_translator = PowerPointTranslator()

View File

@@ -2,70 +2,456 @@
Word Document Translation Module
Translates Word files while preserving all formatting, styles, tables, and images
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import time
import concurrent.futures
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Callable, Any
from docx import Document
from docx.text.paragraph import Paragraph
from docx.table import Table, _Cell
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
from docx.section import Section
from docx.shared import Inches, Pt
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from services.translation_service import translation_service
from typing import List, Tuple, Any
import tempfile
import os
from docx.section import Section
from services.providers.base import TranslationProvider
# Language codes treated as right-to-left when laying out a translated document.
RTL_LANGUAGES: frozenset = frozenset(
    (
        "ar",
        "he",
        "fa",
        "ur",
        "ku",
        "ps",
        "ug",
        "sd",
        "yi",
        "dv",
        "ckb",
    )
)

# Use structlog when it is installed; otherwise fall back to the standard
# logging module. _HAS_STRUCTLOG records which of the two is in use.
try:
    import structlog

    logger = structlog.get_logger(__name__)
    _HAS_STRUCTLOG = True
except ImportError:
    import logging

    logger = logging.getLogger(__name__)
    _HAS_STRUCTLOG = False
def _log_info(event: str, **kwargs):
    """Emit an info record via structlog, or via the standard logger as a fallback."""
    if not _HAS_STRUCTLOG:
        # Without structlog the fields are flattened into the message text
        # as space-separated "key=value" pairs.
        fields = " ".join(f"{k}={v}" for k, v in kwargs.items())
        logger.info(f"{event} " + fields)
        return
    logger.info(event, **kwargs)
def _log_error(event: str, **kwargs):
    """Emit an error record via structlog, or via the standard logger as a fallback."""
    if not _HAS_STRUCTLOG:
        # Without structlog the fields are flattened into the message text
        # as space-separated "key=value" pairs.
        fields = " ".join(f"{k}={v}" for k, v in kwargs.items())
        logger.error(f"{event} " + fields)
        return
    logger.error(event, **kwargs)
def _set_paragraph_rtl(paragraph: Paragraph) -> None:
    """
    Mark a paragraph and each of its runs as right-to-left.

    Elements written (each created only when not already present):
    - w:pPr/w:bidi
    - w:pPr/w:jc, whose w:val is always set to "right"
    - w:rPr/w:rtl on every run of the paragraph
    """

    def ensure_child(parent, tag: str):
        # Return the existing child with this tag, or append a new one.
        # NOTE(review): new children are appended after all existing siblings;
        # confirm Word accepts them regardless of their position in the parent.
        node = parent.find(qn(tag))
        if node is None:
            node = OxmlElement(tag)
            parent.append(node)
        return node

    paragraph_props = paragraph._p.get_or_add_pPr()
    ensure_child(paragraph_props, "w:bidi")
    # NOTE(review): verify in Word that "right" renders as intended once
    # w:bidi is set on the paragraph.
    ensure_child(paragraph_props, "w:jc").set(qn("w:val"), "right")

    for run in paragraph.runs:
        ensure_child(run._r.get_or_add_rPr(), "w:rtl")
def _apply_rtl_to_table(table: Table) -> None:
    """Apply RTL direction to every paragraph of a table, nested tables included."""
    for row in table.rows:
        for cell in row.cells:
            for para in cell.paragraphs:
                _set_paragraph_rtl(para)
            for nested_table in cell.tables:
                _apply_rtl_to_table(nested_table)


def _apply_rtl_to_document(document: Document) -> None:
    """Apply RTL direction to the paragraphs and sections of a document.

    Covers the body paragraphs, the body tables (including tables nested in
    cells), a ``w:bidi`` marker on every section, and the paragraphs and
    tables of every header/footer variant of every section: default,
    first-page and even-page.
    """
    # Body paragraphs
    for para in document.paragraphs:
        _set_paragraph_rtl(para)

    # Body tables
    for table in document.tables:
        _apply_rtl_to_table(table)

    for section in document.sections:
        # Section-level marker, added only when absent.
        sectPr = section._sectPr
        if sectPr.find(qn("w:bidi")) is None:
            sectPr.append(OxmlElement("w:bidi"))

        # Same six header/footer variants that the translator collects text
        # from, so every translated header/footer also gets the RTL markers.
        headers_footers = (
            section.header,
            section.footer,
            section.first_page_header,
            section.first_page_footer,
            section.even_page_header,
            section.even_page_footer,
        )
        for hf in headers_footers:
            for para in hf.paragraphs:
                _set_paragraph_rtl(para)
            for table in hf.tables:
                _apply_rtl_to_table(table)
class WordProcessorError(Exception):
    """Word-processing failure carrying an error code, a message and optional details."""

    INVALID_FORMAT = "INVALID_FORMAT"
    DOCX_CORRUPTED = "DOCX_CORRUPTED"
    DOCX_READ_ERROR = "DOCX_READ_ERROR"
    DOCX_WRITE_ERROR = "DOCX_WRITE_ERROR"
    DOCX_TOO_LARGE = "DOCX_TOO_LARGE"

    # Default message for each known code.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .docx.",
        DOCX_CORRUPTED: "Le document Word est corrompu ou illisible.",
        DOCX_READ_ERROR: "Erreur lors de la lecture du document Word.",
        DOCX_WRITE_ERROR: "Erreur lors de la creation du document traduit.",
        DOCX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        """
        Args:
            code: Error code, normally one of the class constants.
            message: Message override; when empty or None, the default
                message for ``code`` is used ("Erreur inconnue" for an
                unknown code).
            details: Extra context; an empty dict is stored when omitted.
        """
        self.code = code
        if message:
            self.message = message
        else:
            self.message = self.ERROR_MESSAGES.get(code, "Erreur inconnue")
        self.details = details if details else {}
        super().__init__(self.message)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict; "details" is included only when non-empty."""
        payload = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
class WordTranslator:
    """
    Handles translation of Word documents with strict formatting preservation.

    Text is translated run by run so that run-level formatting is kept.
    Translation goes through a ``TranslationProvider`` when one is configured
    and through the legacy ``translation_service`` otherwise.
    """

    MAX_FILE_SIZE_MB = 50
    DOCX_MAGIC_BYTES = b"PK"  # .docx files are ZIP archives
    CHUNK_SIZE = 15  # number of texts sent per translate_batch() call
    MAX_WORKERS = 6  # number of chunks translated concurrently

    def __init__(self, provider: Optional["TranslationProvider"] = None):
        """
        Initialize WordTranslator.

        Args:
            provider: TranslationProvider instance for translations.
                If None, the legacy translation_service is used instead.
        """
        self._provider = provider
        # Stored by set_custom_prompt(); not read anywhere else in this class.
        self._custom_prompt: Optional[str] = None

    def set_provider(self, provider: "TranslationProvider") -> None:
        """Set the translation provider."""
        self._provider = provider

    def set_custom_prompt(self, prompt: Optional[str]) -> None:
        """Set custom system prompt for LLM providers."""
        self._custom_prompt = prompt

    def translate_file(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str = "auto",
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    ) -> Path:
        """
        Translate a Word document while preserving all formatting and structure.

        Args:
            input_path: Path to input Word file
            output_path: Path for translated output file
            target_language: Target language code (e.g., 'fr', 'en')
            source_language: Source language code (default: auto-detect)
            progress_callback: Optional callback for progress updates.
                Receives a dict with the keys: current, total, paragraph,
                total_paragraphs, runs_translated, phase
                ("collecting", "translating" or "complete").

        Returns:
            Path to translated file

        Raises:
            WordProcessorError: If file is invalid, corrupted, or processing fails
        """
        start_time = time.time()
        input_path = Path(input_path)
        output_path = Path(output_path)

        self._validate_file(input_path)

        try:
            document = Document(input_path)
        except Exception as e:
            raise WordProcessorError(
                code=WordProcessorError.DOCX_CORRUPTED,
                details={"file_name": input_path.name, "error": str(e)},
            ) from e

        try:
            runs_translated = 0
            text_elements: List[Tuple[str, Callable[[str], None]]] = []

            self._collect_from_body(document, text_elements)

            total_sections = len(document.sections)
            for section_idx, section in enumerate(document.sections):
                self._collect_from_section(section, text_elements)
                if progress_callback:
                    progress_callback(
                        {
                            "current": section_idx + 1,
                            "total": total_sections,
                            "paragraph": section_idx + 1,
                            "total_paragraphs": total_sections,
                            "runs_translated": runs_translated,
                            "phase": "collecting",
                        }
                    )
            total_elements = len(text_elements)

            if text_elements:
                texts = [elem[0] for elem in text_elements]
                _log_info(
                    "word_batch_translation_start",
                    file_name=input_path.name,
                    text_count=len(texts),
                    target_lang=target_language,
                )
                translated_texts = self._translate_in_chunks(
                    texts, target_language, source_language, progress_callback
                )

                # Apply translations (fast - just text assignment)
                for i, ((_, setter), translated) in enumerate(
                    zip(text_elements, translated_texts)
                ):
                    if translated is None or setter is None:
                        continue
                    try:
                        setter(translated)
                        runs_translated += 1
                    except Exception as e:
                        _log_error("word_setter_error", error=str(e), index=i)

            # Apply RTL layout when the target language is written right-to-left.
            if target_language.lower() in RTL_LANGUAGES:
                _apply_rtl_to_document(document)

            if progress_callback:
                final_total = total_elements if text_elements else total_sections
                progress_callback(
                    {
                        "current": final_total,
                        "total": final_total,
                        "paragraph": total_sections,
                        "total_paragraphs": total_sections,
                        "runs_translated": runs_translated,
                        "phase": "complete",
                    }
                )

            try:
                document.save(output_path)
            except Exception as e:
                raise WordProcessorError(
                    code=WordProcessorError.DOCX_WRITE_ERROR,
                    details={"file_name": output_path.name, "error": str(e)},
                ) from e

            processing_time_ms = round((time.time() - start_time) * 1000, 2)
            _log_info(
                "word_translation_success",
                file_name=input_path.name,
                runs_translated=runs_translated,
                source_lang=source_language,
                target_lang=target_language,
                processing_time_ms=processing_time_ms,
            )
            return output_path

        except WordProcessorError:
            raise
        except Exception as e:
            raise WordProcessorError(
                code=WordProcessorError.DOCX_READ_ERROR,
                details={"file_name": input_path.name, "error": str(e)},
            ) from e

    def _translate_in_chunks(
        self,
        texts: List[str],
        target_language: str,
        source_language: str,
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    ) -> List[str]:
        """Translate ``texts`` in chunks of CHUNK_SIZE using a thread pool.

        Returns a list of the same length and order as ``texts``. Slots start
        out as the original text, so a chunk that comes back short (or with
        ``None`` entries) leaves those runs unchanged instead of blanking them,
        and an over-long chunk cannot spill into its neighbour's slots.
        """
        total = len(texts)
        results: List[str] = list(texts)
        completed = 0

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.MAX_WORKERS
        ) as pool:
            # Map each future to the index of the first text in its chunk.
            future_to_start = {
                pool.submit(
                    self._batch_translate,
                    texts[start : start + self.CHUNK_SIZE],
                    target_language,
                    source_language,
                ): start
                for start in range(0, total, self.CHUNK_SIZE)
            }
            # Chunks may finish out of order; each one writes only its own slots.
            for future in concurrent.futures.as_completed(future_to_start):
                start = future_to_start[future]
                size = min(self.CHUNK_SIZE, total - start)
                chunk_result = list(future.result() or [])[:size]
                for offset, translated in enumerate(chunk_result):
                    if translated is not None:
                        results[start + offset] = translated
                completed += size
                if progress_callback:
                    done = min(completed, total)
                    progress_callback(
                        {
                            "current": done,
                            "total": total,
                            "paragraph": done,
                            "total_paragraphs": total,
                            # No run has been written back yet in this phase.
                            "runs_translated": 0,
                            "phase": "translating",
                        }
                    )
        return results

    def _validate_file(self, file_path: Path) -> None:
        """Validate file existence, extension, ZIP header and size.

        Raises:
            WordProcessorError: DOCX_READ_ERROR when the file is missing or
                cannot be read, INVALID_FORMAT for a wrong extension or
                header, DOCX_TOO_LARGE above MAX_FILE_SIZE_MB.
        """
        if not file_path.exists():
            raise WordProcessorError(
                code=WordProcessorError.DOCX_READ_ERROR,
                message=f"Fichier introuvable: {file_path.name}",
                details={"file_name": file_path.name},
            )

        if file_path.suffix.lower() != ".docx":
            raise WordProcessorError(
                code=WordProcessorError.INVALID_FORMAT,
                details={
                    "file_name": file_path.name,
                    "extension": file_path.suffix,
                    "expected": ".docx",
                },
            )

        try:
            with open(file_path, "rb") as f:
                header = f.read(len(self.DOCX_MAGIC_BYTES))
            file_size_mb = file_path.stat().st_size / (1024 * 1024)
        except OSError as e:
            # Report unreadable paths (permissions, directories, ...) through
            # the structured error type instead of leaking a raw OSError.
            raise WordProcessorError(
                code=WordProcessorError.DOCX_READ_ERROR,
                details={"file_name": file_path.name, "error": str(e)},
            ) from e

        if header != self.DOCX_MAGIC_BYTES:
            raise WordProcessorError(
                code=WordProcessorError.INVALID_FORMAT,
                details={"file_name": file_path.name, "reason": "Invalid file header"},
            )

        if file_size_mb > self.MAX_FILE_SIZE_MB:
            raise WordProcessorError(
                code=WordProcessorError.DOCX_TOO_LARGE,
                details={
                    "file_name": file_path.name,
                    "size_mb": round(file_size_mb, 2),
                    "max_mb": self.MAX_FILE_SIZE_MB,
                },
            )

    def _batch_translate(
        self, texts: List[str], target_language: str, source_language: str = "auto"
    ) -> List[str]:
        """
        Batch translate using the provider when set, the legacy service otherwise.

        Args:
            texts: List of texts to translate
            target_language: Target language code
            source_language: Source language code

        Returns:
            List of translated texts (same order as input); an empty input
            returns an empty list without calling any backend.
        """
        if not texts:
            return []

        if self._provider is not None:
            return self._translate_with_provider(
                texts, target_language, source_language
            )

        return self._translate_with_legacy(texts, target_language, source_language)

    def _translate_with_provider(
        self, texts: List[str], target_language: str, source_language: str
    ) -> List[str]:
        """Translate using the TranslationProvider.translate_batch() interface.

        Always returns one entry per input: the translation when it is a
        non-blank string, the original text otherwise.
        """
        translated = self._provider.translate_batch(
            texts, target_language, source_language
        )
        results = list(translated) if translated is not None else []
        # Pad short results so zip() cannot silently drop trailing inputs.
        results.extend([None] * (len(texts) - len(results)))
        return [
            t if isinstance(t, str) and t.strip() else orig
            for t, orig in zip(results, texts)
        ]

    def _translate_with_legacy(
        self, texts: List[str], target_language: str, source_language: str
    ) -> List[str]:
        """Fallback to legacy translation_service for backward compatibility."""
        from services.translation_service import translation_service

        _log_info(
            "word_using_legacy_service",
            text_count=len(texts),
            target_lang=target_language,
        )
        return translation_service.translate_batch(
            texts, target_language, source_language
        )

    def _collect_from_body(
        self, document: "Document", text_elements: List[Tuple[str, Callable[[str], None]]]
    ) -> None:
        """Collect all text elements from the document body, in document order."""
        for element in document.element.body:
            if isinstance(element, CT_P):
                paragraph = Paragraph(element, document)
                self._collect_from_paragraph(paragraph, text_elements)
            elif isinstance(element, CT_Tbl):
                table = Table(element, document)
                self._collect_from_table(table, text_elements)

    @staticmethod
    def _make_run_setter(run, lead: str, trail: str) -> Callable[[str], None]:
        """Build a setter writing a translation into ``run`` between ``lead`` and ``trail``."""

        def setter(text: str) -> None:
            # Strip any whitespace the translator may have added/removed
            # and reapply the original boundary whitespace.
            run.text = lead + text.strip() + trail

        return setter

    def _collect_from_paragraph(
        self,
        paragraph: "Paragraph",
        text_elements: List[Tuple[str, Callable[[str], None]]],
    ) -> None:
        """Collect text from paragraph runs, preserving inter-run whitespace.

        Each run is sent for translation WITHOUT its surrounding whitespace.
        The whitespace is captured and reapplied after translation so that words
        at formatting boundaries (e.g. bold/normal) do not get concatenated.
        """
        if not paragraph.text.strip():
            return

        for run in paragraph.runs:
            original = run.text
            if not original or not original.strip():
                continue
            leading = original[: len(original) - len(original.lstrip())]
            trailing = original[len(original.rstrip()) :]
            text_elements.append(
                (original.strip(), self._make_run_setter(run, leading, trailing))
            )

    def _collect_from_table(
        self, table: "Table", text_elements: List[Tuple[str, Callable[[str], None]]]
    ) -> None:
        """Collect text from table cells, recursing into nested tables."""
        for row in table.rows:
            for cell in row.cells:
                for paragraph in cell.paragraphs:
                    self._collect_from_paragraph(paragraph, text_elements)
                for nested_table in cell.tables:
                    self._collect_from_table(nested_table, text_elements)

    def _collect_from_section(
        self, section: "Section", text_elements: List[Tuple[str, Callable[[str], None]]]
    ) -> None:
        """Collect text from the headers and footers of a section."""
        headers_footers = [
            section.header,
            section.footer,
            section.first_page_header,
            section.first_page_footer,
            section.even_page_header,
            section.even_page_footer,
        ]

        for hf in headers_footers:
            if not hf:
                continue
            # python-docx exposes ``is_linked_to_previous`` for a header/footer
            # that has no content of its own and reuses the previous section's.
            # Skipping those avoids collecting (and translating) the same runs
            # once per section. Objects without the attribute are processed.
            if getattr(hf, "is_linked_to_previous", False):
                continue
            for paragraph in hf.paragraphs:
                self._collect_from_paragraph(paragraph, text_elements)
            for table in hf.tables:
                self._collect_from_table(table, text_elements)

    def _translate_images(
        self, document: "Document", target_language: str, input_path: Path
    ):
        """Extract text from embedded images and append translations as paragraphs.

        Legacy path: it needs a ``translation_service`` attribute whose
        ``provider`` is an ``OllamaTranslationProvider``. ``__init__`` does not
        set that attribute and ``translate_file`` does not call this method,
        so it is a no-op unless a caller attaches the service explicitly.
        Errors are printed, not raised.
        """
        service = getattr(self, "translation_service", None)
        if service is None:
            return

        import os
        import tempfile
        import zipfile

        from services.translation_service import OllamaTranslationProvider

        provider = getattr(service, "provider", None)
        if not isinstance(provider, OllamaTranslationProvider):
            return

        try:
            with zipfile.ZipFile(input_path, 'r') as zip_ref:
                image_files = [
                    f for f in zip_ref.namelist() if f.startswith('word/media/')
                ]

                for idx, image_file in enumerate(image_files):
                    try:
                        image_data = zip_ref.read(image_file)
                        ext = os.path.splitext(image_file)[1]
                        with tempfile.NamedTemporaryFile(
                            suffix=ext, delete=False
                        ) as tmp:
                            tmp.write(image_data)
                            tmp_path = tmp.name
                        try:
                            translated_text = provider.translate_image(
                                tmp_path, target_language
                            )
                        finally:
                            # Remove the temporary file even when the call fails.
                            os.unlink(tmp_path)

                        if translated_text and translated_text.strip():
                            p = document.add_paragraph()
                            p.add_run(f"[Image {idx + 1} translation: ").bold = True
                            p.add_run(translated_text)
                            p.add_run("]").bold = True
                            print(f"Translated image {idx + 1}: {translated_text[:50]}...")
                    except Exception as e:
                        print(f"Error translating image {image_file}: {e}")
        except Exception as e:
            print(f"Error processing images: {e}")
# Module-level translator instance, constructed with no arguments.
word_translator = WordTranslator()