""" Word Document Translation Module Translates Word files while preserving all formatting, styles, tables, and images OPTIMIZED: Uses batch translation for 5-10x faster processing Updated to use new TranslationProvider interface with structured error handling. """ import time import zipfile import io import concurrent.futures from pathlib import Path from typing import Dict, List, Tuple, Optional, Callable, Any from docx import Document from docx.text.paragraph import Paragraph from docx.text.run import Run from docx.table import Table, _Cell from docx.oxml.text.paragraph import CT_P from docx.oxml.table import CT_Tbl from docx.oxml import OxmlElement from docx.oxml.ns import qn from docx.section import Section from lxml import etree from services.providers.base import TranslationProvider # Languages written right-to-left RTL_LANGUAGES: frozenset = frozenset( {"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"} ) from core.logging import get_logger logger = get_logger(__name__) _HAS_STRUCTLOG = True def _log_info(event: str, **kwargs): """Log info with structlog or standard logging compatibility.""" if _HAS_STRUCTLOG: logger.info(event, **kwargs) else: msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items()) logger.info(msg) def _log_error(event: str, **kwargs): """Log error with structlog or standard logging compatibility.""" if _HAS_STRUCTLOG: logger.error(event, **kwargs) else: msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items()) logger.error(msg) def _set_paragraph_rtl(paragraph: Paragraph) -> None: """ Enable RTL mode on a paragraph and all its runs. Sets: - w:pPr/w:bidi → paragraph text direction = RTL - w:pPr/w:jc → alignment = right - w:rPr/w:rtl → run-level RTL marker for each run """ pPr = paragraph._p.get_or_add_pPr() if pPr.find(qn("w:bidi")) is None: pPr.append(OxmlElement("w:bidi")) jc = pPr.find(qn("w:jc")) if jc is None: jc = OxmlElement("w:jc") pPr.append(jc) jc.set(qn("w:val"), "right") for run in paragraph.runs: rPr = run._r.get_or_add_rPr() if rPr.find(qn("w:rtl")) is None: rPr.append(OxmlElement("w:rtl")) def _apply_rtl_to_document(document: Document) -> None: """Apply RTL direction to every paragraph and section in the document.""" # Body paragraphs for para in document.paragraphs: _set_paragraph_rtl(para) # Body tables for table in document.tables: for row in table.rows: for cell in row.cells: for para in cell.paragraphs: _set_paragraph_rtl(para) # Headers, footers, and section-level RTL (page layout direction) for section in document.sections: # Set the section (page) direction to RTL so Word renders margins, # columns and page numbering from right to left. sectPr = section._sectPr if sectPr.find(qn("w:bidi")) is None: sectPr.append(OxmlElement("w:bidi")) for hf in (section.header, section.footer): for para in hf.paragraphs: _set_paragraph_rtl(para) for table in hf.tables: for row in table.rows: for cell in row.cells: for para in cell.paragraphs: _set_paragraph_rtl(para) class WordProcessorError(Exception): """Exception for Word processing errors with structured error codes.""" INVALID_FORMAT = "INVALID_FORMAT" DOCX_CORRUPTED = "DOCX_CORRUPTED" DOCX_READ_ERROR = "DOCX_READ_ERROR" DOCX_WRITE_ERROR = "DOCX_WRITE_ERROR" DOCX_TOO_LARGE = "DOCX_TOO_LARGE" ERROR_MESSAGES = { INVALID_FORMAT: "Format de fichier non supporte. Utilisez .docx.", DOCX_CORRUPTED: "Le document Word est corrompu ou illisible.", DOCX_READ_ERROR: "Erreur lors de la lecture du document Word.", DOCX_WRITE_ERROR: "Erreur lors de la creation du document traduit.", DOCX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).", } def __init__( self, code: str, message: Optional[str] = None, details: Optional[Dict[str, Any]] = None, ): self.code = code self.message = message or self.ERROR_MESSAGES.get(code, "Erreur inconnue") self.details = details or {} super().__init__(self.message) def to_dict(self) -> Dict[str, Any]: """Convert error to dictionary format for API responses.""" result = {"error": self.code, "message": self.message} if self.details: result["details"] = self.details return result class WordTranslator: """ Handles translation of Word documents with strict formatting preservation. Uses the new TranslationProvider interface for improved error handling and fallback chain support. """ MAX_FILE_SIZE_MB = 50 DOCX_MAGIC_BYTES = b"PK" # .docx files are ZIP archives # Namespace URIs not registered in python-docx's nsmap _NS_MC = "http://schemas.openxmlformats.org/markup-compatibility/2006" _TAG_ALT_CONTENT = f"{{{_NS_MC}}}AlternateContent" def __init__(self, provider: Optional[TranslationProvider] = None): """ Initialize WordTranslator. Args: provider: TranslationProvider instance for translations. If None, will use fallback to legacy translation_service. """ self._provider = provider self._custom_prompt: Optional[str] = None self._translation_stats = {"attempted": 0, "changed": 0} def set_provider(self, provider: TranslationProvider) -> None: """Set the translation provider.""" self._provider = provider def set_custom_prompt(self, prompt: Optional[str]) -> None: """Set custom system prompt for LLM providers.""" self._custom_prompt = prompt def translate_file( self, input_path: Path, output_path: Path, target_language: str, source_language: str = "auto", progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None, translate_images: bool = False, ) -> Path: """ Translate a Word document while preserving all formatting and structure. Uses batch translation for improved performance. Args: input_path: Path to input Word file output_path: Path for translated output file target_language: Target language code (e.g., 'fr', 'en') source_language: Source language code (default: auto-detect) progress_callback: Optional callback for progress updates Receives dict with: element, total_elements, runs_translated Returns: Path to translated file Raises: WordProcessorError: If file is invalid, corrupted, or processing fails """ start_time = time.time() input_path = Path(input_path) output_path = Path(output_path) self._validate_file(input_path) try: document = Document(input_path) except Exception as e: raise WordProcessorError( code=WordProcessorError.DOCX_CORRUPTED, details={"file_name": input_path.name, "error": str(e)}, ) try: runs_translated = 0 text_elements: List[Tuple[str, Callable[[str], None]]] = [] chart_translations: List[Dict[str, Any]] = [] diagram_translations: List[Dict[str, Any]] = [] self._collect_from_body(document, text_elements) # Collect chart text from ZIP (chart titles, axis labels, series names) self._collect_charts_from_zip(input_path, text_elements, chart_translations) # Collect SmartArt/diagram text from ZIP self._collect_diagrams_from_zip(input_path, text_elements, diagram_translations) total_sections = len(document.sections) total_elements = 0 for section_idx, section in enumerate(document.sections): self._collect_from_section(section, text_elements) total_elements = len(text_elements) if progress_callback: progress_callback( { "current": section_idx + 1, "total": total_sections, "paragraph": section_idx + 1, "total_paragraphs": total_sections, "runs_translated": runs_translated, "phase": "collecting", } ) if text_elements: texts = [elem[0] for elem in text_elements] total_elements = len(text_elements) _log_info( "word_batch_translation_start", file_name=input_path.name, text_count=len(texts), target_lang=target_language, ) # Split into chunks and translate them IN PARALLEL using a thread # pool. Each worker handles one chunk independently, making # full use of available CPU/network concurrency. Progress is # reported as chunks complete (out-of-order completions are # fine — the tracker only moves forward). CHUNK_SIZE = 15 MAX_WORKERS = 6 chunks = [ (i, texts[i : i + CHUNK_SIZE]) for i in range(0, total_elements, CHUNK_SIZE) ] translated_texts: List[str] = [""] * total_elements completed_items = [0] # mutable counter shared across threads def _translate_chunk( chunk_idx: int, chunk: List[str] ) -> Tuple[int, List[str]]: result = self._batch_translate(chunk, target_language, source_language) return chunk_idx, result with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: future_map = { pool.submit(_translate_chunk, idx, chunk): (idx, chunk) for idx, chunk in chunks } for future in concurrent.futures.as_completed(future_map): chunk_idx, translated_chunk = future.result() start = chunk_idx for j, t in enumerate(translated_chunk): translated_texts[start + j] = t completed_items[0] += len(translated_chunk) if progress_callback: done = min(completed_items[0], total_elements) progress_callback( { "current": done, "total": total_elements, "paragraph": done, "total_paragraphs": total_elements, "runs_translated": runs_translated, "phase": "translating", } ) # Apply translations (fast — just text assignment) for i, ((original_text, setter), translated) in enumerate( zip(text_elements, translated_texts) ): if translated is not None and setter is not None: try: setter(translated) runs_translated += 1 except Exception as e: _log_error( "word_setter_error", error=str(e), index=i, ) # Apply RTL layout when the target language is written right-to-left. if target_language.lower() in RTL_LANGUAGES: _apply_rtl_to_document(document) if progress_callback: progress_callback( { "current": total_elements if text_elements else total_sections, "total": total_elements if text_elements else total_sections, "paragraph": total_sections, "total_paragraphs": total_sections, "runs_translated": runs_translated, "phase": "complete", } ) if translate_images: try: self._translate_images(document, target_language) except Exception as e: _log_error("word_document_images_failed", error=str(e)) try: document.save(output_path) except Exception as e: raise WordProcessorError( code=WordProcessorError.DOCX_WRITE_ERROR, details={"file_name": output_path.name, "error": str(e)}, ) # Re-inject chart translations into the saved .docx ZIP if chart_translations: self._apply_chart_translations(input_path, output_path, chart_translations) # Re-inject SmartArt/diagram translations into the saved .docx ZIP if diagram_translations: self._apply_diagram_translations(output_path, diagram_translations) processing_time_ms = round((time.time() - start_time) * 1000, 2) _log_info( "word_translation_success", file_name=input_path.name, runs_translated=runs_translated, source_lang=source_language, target_lang=target_language, processing_time_ms=processing_time_ms, ) return output_path except WordProcessorError: raise except Exception as e: import traceback _log_error( "word_translation_unexpected_error", file_name=input_path.name, error=str(e), traceback=traceback.format_exc(), ) raise WordProcessorError( code=WordProcessorError.DOCX_READ_ERROR, details={"file_name": input_path.name, "error": str(e)}, ) def _validate_file(self, file_path: Path) -> None: """Validate file format and size.""" if not file_path.exists(): raise WordProcessorError( code=WordProcessorError.DOCX_READ_ERROR, message=f"Fichier introuvable: {file_path.name}", details={"file_name": file_path.name}, ) if file_path.suffix.lower() != ".docx": raise WordProcessorError( code=WordProcessorError.INVALID_FORMAT, details={ "file_name": file_path.name, "extension": file_path.suffix, "expected": ".docx", }, ) with open(file_path, "rb") as f: header = f.read(4) if header[:2] != self.DOCX_MAGIC_BYTES: raise WordProcessorError( code=WordProcessorError.INVALID_FORMAT, details={"file_name": file_path.name, "reason": "Invalid file header"}, ) file_size_mb = file_path.stat().st_size / (1024 * 1024) if file_size_mb > self.MAX_FILE_SIZE_MB: raise WordProcessorError( code=WordProcessorError.DOCX_TOO_LARGE, details={ "file_name": file_path.name, "size_mb": round(file_size_mb, 2), "max_mb": self.MAX_FILE_SIZE_MB, }, ) def _batch_translate( self, texts: List[str], target_language: str, source_language: str = "auto" ) -> List[str]: """ Batch translate using new provider interface. Args: texts: List of texts to translate target_language: Target language code source_language: Source language code Returns: List of translated texts (same order as input) """ if not texts: return [] non_empty = [t for t in texts if t and t.strip()] self._translation_stats["attempted"] += len(non_empty) if self._provider is not None: translated = self._translate_with_provider( texts, target_language, source_language ) else: translated = self._translate_with_legacy(texts, target_language, source_language) changed = sum(1 for orig, trans in zip(texts, translated) if orig != trans and trans.strip()) self._translation_stats["changed"] += changed return translated def get_translation_stats(self) -> dict: return dict(self._translation_stats) def _translate_with_provider( self, texts: List[str], target_language: str, source_language: str ) -> List[str]: """Translate using the TranslationProvider.translate_batch() interface.""" from services.providers.base import TranslationProvider as NewTranslationProvider is_new_style = False if isinstance(self._provider, NewTranslationProvider): is_new_style = True elif hasattr(self._provider, "__class__") and self._provider.__class__.__name__ in ( "MockTranslationProvider", "Mock", "MagicMock", ): is_new_style = True if is_new_style: from services.providers.schemas import TranslationRequest custom_prompt = getattr(self, "_custom_prompt", None) metadata = {"custom_prompt": custom_prompt} if custom_prompt else None requests = [ TranslationRequest( text=t, target_language=target_language, source_language=source_language, metadata=metadata, ) for t in texts ] responses = self._provider.translate_batch(requests) translated = [resp.translated_text for resp in responses] else: translated = self._provider.translate_batch(texts, target_language, source_language) # Fallback: keep original text for any empty/failed result return [ t if (t and t.strip()) else orig for t, orig in zip(translated, texts) ] def _translate_with_legacy( self, texts: List[str], target_language: str, source_language: str ) -> List[str]: """Fallback to legacy translation_service for backward compatibility.""" from services.translation_service import translation_service _log_info( "word_using_legacy_service", text_count=len(texts), target_lang=target_language, ) return translation_service.translate_batch( texts, target_language, source_language ) def _collect_from_body( self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect all text elements from document body. Handles: paragraphs, tables, SDT (TOC/index), text boxes, shapes, AlternateContent blocks, and any nested drawing elements. """ count_before = len(text_elements) # Pass 1: walk direct body children for element in document.element.body: self._collect_from_element(element, document, text_elements) pass1_count = len(text_elements) - count_before # Pass 2: find ALL in the entire body XML tree. # Text boxes / rectangles / shapes store their text here, nested deep # inside or # inside . self._collect_from_textboxes(document.element.body, document, text_elements) pass2_count = len(text_elements) - count_before - pass1_count # Pass 3: footnotes and endnotes self._collect_from_footnotes(document, text_elements) self._collect_from_endnotes(document, text_elements) total = len(text_elements) - count_before _log_info( "word_collection_summary", body_runs=pass1_count, textbox_runs=pass2_count, total_collected=total, ) def _collect_from_element( self, element, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Recursively collect from any element type.""" if isinstance(element, CT_P): paragraph = Paragraph(element, document) self._collect_from_paragraph(paragraph, text_elements) elif isinstance(element, CT_Tbl): table = Table(element, document) self._collect_from_table(table, text_elements) elif element.tag == qn("w:sdt"): self._collect_from_sdt(element, document, text_elements) elif element.tag == self._TAG_ALT_CONTENT: # wraps drawing/shape content for part in element: self._collect_from_element(part, document, text_elements) else: # For any other container element, recurse into children # to catch paragraphs nested in unexpected wrappers for child in element: if isinstance(child, CT_P): paragraph = Paragraph(child, document) self._collect_from_paragraph(paragraph, text_elements) elif isinstance(child, CT_Tbl): table = Table(child, document) self._collect_from_table(table, text_elements) def _collect_from_textboxes( self, root, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Find and collect text from ALL elements in the XML tree. This catches text in: - Rectangles / rounded rectangles / any shape with text - Text boxes - Callouts - WordArt (if it has text content) - Shapes nested in blocks The element contains regular paragraphs with runs, just like normal body text. """ # Find all w:txbxContent elements anywhere in the tree for txbx in root.iter(qn("w:txbxContent")): for child in txbx: if isinstance(child, CT_P): paragraph = Paragraph(child, document) self._collect_from_paragraph(paragraph, text_elements) elif isinstance(child, CT_Tbl): table = Table(child, document) self._collect_from_table(table, text_elements) def _collect_from_sdt( self, sdt_element, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from Structured Document Tags (TOC, index, content controls). SDT XML structure: ... ... ... """ sdt_content = sdt_element.find(qn("w:sdtContent")) if sdt_content is None: return for child in sdt_content: if isinstance(child, CT_P): paragraph = Paragraph(child, document) self._collect_from_paragraph(paragraph, text_elements) elif isinstance(child, CT_Tbl): table = Table(child, document) self._collect_from_table(table, text_elements) def _collect_from_footnotes( self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from footnotes.""" try: footnotes_part = document.part.package.part_related_by( "http://schemas.openxmlformats.org/officeDocument/2006/relationships/footnotes" ) if hasattr(document.part, 'package') else None except Exception: footnotes_part = None if footnotes_part is None: # Fallback: try direct XML access try: footnotes_element = document.element.find(qn("w:footnotes")) if footnotes_element is not None: for child in footnotes_element: if isinstance(child, CT_P): paragraph = Paragraph(child, document) self._collect_from_paragraph(paragraph, text_elements) except Exception: pass return try: footnotes_xml = etree.fromstring(footnotes_part.blob) for child in footnotes_xml: if child.tag == qn("w:footnote"): for para_elem in child.findall(qn("w:p")): paragraph = Paragraph(para_elem, document) self._collect_from_paragraph(paragraph, text_elements) except Exception as e: _log_error("word_footnotes_parse_error", error=str(e)) def _collect_from_endnotes( self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from endnotes.""" try: endnotes_part = document.part.package.part_related_by( "http://schemas.openxmlformats.org/officeDocument/2006/relationships/endnotes" ) if hasattr(document.part, 'package') else None except Exception: endnotes_part = None if endnotes_part is None: try: endnotes_element = document.element.find(qn("w:endnotes")) if endnotes_element is not None: for child in endnotes_element: if isinstance(child, CT_P): paragraph = Paragraph(child, document) self._collect_from_paragraph(paragraph, text_elements) except Exception: pass return try: endnotes_xml = etree.fromstring(endnotes_part.blob) for child in endnotes_xml: if child.tag == qn("w:endnote"): for para_elem in child.findall(qn("w:p")): paragraph = Paragraph(para_elem, document) self._collect_from_paragraph(paragraph, text_elements) except Exception as e: _log_error("word_endnotes_parse_error", error=str(e)) def _collect_from_charts( self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from embedded charts (chart titles, axis labels, series names). Charts are stored as separate XML parts in the .docx ZIP archive. The chart XML uses DrawingML namespaces for text content. """ _NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart" _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" try: # Access the raw ZIP to find chart parts docx_path = document.part.package.main_document_part.partname package = document.part.package # Find all chart relationship targets for rel_type, rels in (package.rels or {}).items(): pass # python-docx doesn't expose this cleanly except Exception: pass # More reliable: open the .docx as a ZIP and parse chart XML directly try: # Get the original file path from the document input_file = None # Try to recover the file path — document object doesn't store it directly # We'll handle charts in translate_file() instead where we have the path pass except Exception: pass def _collect_charts_from_zip( self, input_path: Path, text_elements: List[Tuple[str, Callable[[str], None]]], chart_translations: List[Dict[str, Any]] ) -> None: """Parse chart XML from the .docx ZIP and collect translatable text. Args: input_path: Path to the .docx file text_elements: List to append (text, setter) tuples chart_translations: List to store chart translation metadata for later re-injection """ _NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart" _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" try: with zipfile.ZipFile(input_path, 'r') as zf: chart_files = [name for name in zf.namelist() if name.startswith('word/charts/') and name.endswith('.xml')] for chart_file in chart_files: try: chart_xml = etree.fromstring(zf.read(chart_file)) # Collect from or for tag in ['c:title', 'c:cat', 'c:val']: for parent_elem in chart_xml.iter(f'{{{ _NS_C }}}{tag}' if not tag.startswith('{') else tag): # Direct rich text: for t_elem in parent_elem.iter(f'{{{_NS_A}}}t'): if t_elem.text and t_elem.text.strip(): # Store reference for setter entry = { 'chart_file': chart_file, 'element_path': self._get_element_path(t_elem), 'original': t_elem.text.strip(), } chart_translations.append(entry) def make_chart_setter(entries, idx): def setter(text): entries[idx]['translated'] = text.strip() return setter text_elements.append( (t_elem.text.strip(), make_chart_setter(chart_translations, len(chart_translations) - 1)) ) # Series names in or for ser_elem in chart_xml.iter(f'{{{_NS_C}}}ser'): for v_elem in ser_elem.iter(f'{{{_NS_C}}}v'): if v_elem.text and v_elem.text.strip() and not v_elem.text.strip().replace('.', '').replace('-', '').isdigit(): entry = { 'chart_file': chart_file, 'element_path': self._get_element_path(v_elem), 'original': v_elem.text.strip(), } chart_translations.append(entry) def make_chart_val_setter(entries, idx): def setter(text): entries[idx]['translated'] = text.strip() return setter text_elements.append( (v_elem.text.strip(), make_chart_val_setter(chart_translations, len(chart_translations) - 1)) ) except Exception as e: _log_error("word_chart_parse_error", chart_file=chart_file, error=str(e)) except Exception as e: _log_error("word_charts_zip_error", error=str(e)) def _get_element_path(self, element) -> str: """Get a unique XPath-like path for an element within its document.""" path_parts = [] current = element while current is not None: parent = current.getparent() if parent is None: break idx = list(parent).index(current) tag = current.tag.split('}')[-1] if '}' in current.tag else current.tag path_parts.append(f"{tag}[{idx}]") current = parent return '/'.join(reversed(path_parts)) def _apply_chart_translations(self, input_path: Path, output_path: Path, chart_translations: List[Dict[str, Any]]) -> None: """Re-inject chart translations into the .docx ZIP. Modifies chart XML files in-place and rewrites the ZIP. """ if not chart_translations: return # Only proceed if at least one translation exists translated_entries = [e for e in chart_translations if 'translated' in e and e['translated']] if not translated_entries: return _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" _NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart" # Group by chart file chart_files_to_update: Dict[str, List[Dict]] = {} for entry in translated_entries: cf = entry['chart_file'] if cf not in chart_files_to_update: chart_files_to_update[cf] = [] chart_files_to_update[cf].append(entry) try: # Read all ZIP entries with zipfile.ZipFile(output_path, 'r') as zf_in: existing_entries = zf_in.namelist() # Create new ZIP in memory buf = io.BytesIO() with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out: for item in existing_entries: data = zf_in.read(item) if item in chart_files_to_update: # Parse, update, re-serialize this chart XML try: chart_xml = etree.fromstring(data) for entry in chart_files_to_update[item]: # Find all or elements and match by original text tag_to_find = f'{{{_NS_A}}}t' # Try both a:t and c:v for t_elem in chart_xml.iter(tag_to_find): if t_elem.text and t_elem.text.strip() == entry['original']: t_elem.text = entry['translated'] break else: for t_elem in chart_xml.iter(f'{{{_NS_C}}}v'): if t_elem.text and t_elem.text.strip() == entry['original']: t_elem.text = entry['translated'] break data = etree.tostring(chart_xml, xml_declaration=True, encoding='UTF-8', standalone=True) except Exception as e: _log_error("word_chart_update_error", chart_file=item, error=str(e)) zf_out.writestr(item, data) # Replace the output file with the updated ZIP with open(output_path, 'wb') as f: f.write(buf.getvalue()) _log_info("word_charts_translated", chart_files=len(chart_files_to_update), translations=len(translated_entries)) except Exception as e: _log_error("word_chart_zip_rewrite_error", error=str(e)) # ------------------------------------------------------------------ # SmartArt / Diagram support # ------------------------------------------------------------------ _NS_DGM = "http://schemas.openxmlformats.org/drawingml/2006/diagram" _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" def _collect_diagrams_from_zip( self, input_path: Path, text_elements: List[Tuple[str, Callable[[str], None]]], diagram_translations: List[Dict[str, Any]], ) -> None: """Parse SmartArt diagram XML from the .docx ZIP and collect translatable text. SmartArt text lives in ``word/diagrams/data*.xml`` inside the ZIP. Each diagram data file contains ```` elements with ```` text nodes. """ _TAG_A_T = f"{{{self._NS_A}}}t" try: with zipfile.ZipFile(input_path, 'r') as zf: diag_files = [ n for n in zf.namelist() if n.startswith('word/diagrams/data') and n.endswith('.xml') ] for diag_file in diag_files: try: diag_xml = etree.fromstring(zf.read(diag_file)) for t_elem in diag_xml.iter(_TAG_A_T): if t_elem.text and t_elem.text.strip(): original = t_elem.text.strip() # Skip numeric-only or very short tokens if original.replace('.', '').replace('-', '').replace(',', '').isdigit(): continue if len(original) <= 1: continue entry: Dict[str, Any] = { 'diag_file': diag_file, 'element_path': self._get_element_path(t_elem), 'original': original, } diagram_translations.append(entry) def _make_diag_setter( entries: List[Dict[str, Any]], idx: int ): def setter(text: str) -> None: entries[idx]['translated'] = text.strip() return setter text_elements.append( (original, _make_diag_setter(diagram_translations, len(diagram_translations) - 1)) ) except Exception as e: _log_error("word_diagram_parse_error", diag_file=diag_file, error=str(e)) if diagram_translations: _log_info( "word_diagram_collection", diagram_files=len(diag_files), text_count=len(diagram_translations), ) except Exception as e: _log_error("word_diagrams_zip_error", error=str(e)) def _apply_diagram_translations( self, output_path: Path, diagram_translations: List[Dict[str, Any]], ) -> None: """Re-inject SmartArt/diagram translations into the .docx ZIP. Modifies diagram data XML files in-place and rewrites the ZIP. """ if not diagram_translations: return translated_entries = [e for e in diagram_translations if 'translated' in e and e['translated']] if not translated_entries: return _TAG_A_T = f"{{{self._NS_A}}}t" # Group by diagram file diag_files_to_update: Dict[str, List[Dict]] = {} for entry in translated_entries: df = entry['diag_file'] if df not in diag_files_to_update: diag_files_to_update[df] = [] diag_files_to_update[df].append(entry) try: with zipfile.ZipFile(output_path, 'r') as zf_in: existing_entries = zf_in.namelist() buf = io.BytesIO() with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out: for item in existing_entries: data = zf_in.read(item) if item in diag_files_to_update: try: diag_xml = etree.fromstring(data) for entry in diag_files_to_update[item]: for t_elem in diag_xml.iter(_TAG_A_T): if t_elem.text and t_elem.text.strip() == entry['original']: t_elem.text = entry['translated'] break data = etree.tostring(diag_xml, xml_declaration=True, encoding='UTF-8', standalone=True) except Exception as e: _log_error("word_diagram_update_error", diag_file=item, error=str(e)) zf_out.writestr(item, data) with open(output_path, 'wb') as f: f.write(buf.getvalue()) _log_info( "word_diagrams_translated", diagram_files=len(diag_files_to_update), translations=len(translated_entries), ) except Exception as e: _log_error("word_diagram_zip_rewrite_error", error=str(e)) def _collect_from_paragraph( self, paragraph: Paragraph, text_elements: List[Tuple[str, Callable[[str], None]]], ) -> None: """Collect text from paragraph runs, preserving inter-run whitespace. Each run is sent for translation WITHOUT its surrounding whitespace. The whitespace is captured and reapplied after translation so that words at formatting boundaries (e.g. bold/normal) do not get concatenated. Handles runs both as direct children of AND inside elements (used for TOC entries, cross-references, and bookmarks links). """ # Check full paragraph text including nested content (hyperlinks, etc.) full_text = ''.join( t.text or '' for t in paragraph._p.iter(qn('w:t')) ).strip() if not full_text: return # Collect from direct child runs for run in paragraph.runs: if run.text and run.text.strip(): self._append_run_translation(run, text_elements) # Collect from runs inside elements # (TOC entries, cross-references — python-docx's paragraph.runs skips these) for hl in paragraph._p.iter(qn('w:hyperlink')): for r_elem in hl.findall(qn('w:r')): run = Run(r_elem, paragraph) if run.text and run.text.strip(): self._append_run_translation(run, text_elements) def _append_run_translation( self, run, text_elements: List[Tuple[str, Callable[[str], None]]], ) -> None: """Extract translatable text from a Run and append a (text, setter) tuple.""" original = run.text # Capture leading/trailing whitespace that must survive translation. leading = original[: len(original) - len(original.lstrip())] trailing = original[len(original.rstrip()) :] stripped = original.strip() def make_setter(r, lead: str, trail: str): def setter(text: str) -> None: # Strip any whitespace the translator may have added/removed # and reapply the original boundary whitespace. r.text = lead + text.strip() + trail return setter text_elements.append((stripped, make_setter(run, leading, trailing))) def _collect_from_table( self, table: Table, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from table cells.""" for row in table.rows: for cell in row.cells: for paragraph in cell.paragraphs: self._collect_from_paragraph(paragraph, text_elements) for nested_table in cell.tables: self._collect_from_table(nested_table, text_elements) def _collect_from_section( self, section: Section, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from headers and footers.""" headers_footers = [ section.header, section.footer, section.first_page_header, section.first_page_footer, section.even_page_header, section.even_page_footer, ] for hf in headers_footers: if hf: for paragraph in hf.paragraphs: self._collect_from_paragraph(paragraph, text_elements) for table in hf.tables: self._collect_from_table(table, text_elements) def _translate_images(self, document: Document, target_language: str) -> None: """Extract and translate text from images in Word document. Inserts the translated text as a caption paragraph under each image.""" try: inline_shapes = getattr(document, "inline_shapes", []) _log_info("word_image_translation_start", count=len(inline_shapes)) for idx, shape in enumerate(inline_shapes): # Type 3 is picture, type 12 is linked picture if not (hasattr(shape, "type") and shape.type in (3, 12)): continue try: image = getattr(shape, "image", None) if not image: continue image_data = image.blob ext = getattr(image, "ext", "png") or "png" import tempfile import os with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as tmp: tmp.write(image_data) tmp_path = tmp.name translated_text = self._translate_image_text(tmp_path, target_language) try: os.unlink(tmp_path) except: pass if translated_text and translated_text.strip(): parent = shape._inline.getparent() while parent is not None and parent.tag != qn("w:p"): parent = parent.getparent() if parent is not None: p_elem = parent new_p_elem = OxmlElement("w:p") p_elem.addnext(new_p_elem) from docx.text.paragraph import Paragraph new_p = Paragraph(new_p_elem, document) from docx.shared import Pt, RGBColor run = new_p.add_run(f" [Image translation: {translated_text.strip()}] ") run.font.italic = True run.font.size = Pt(9) run.font.color.rgb = RGBColor(128, 128, 128) _log_info("word_image_translation_added", index=idx) except Exception as shape_err: _log_error("word_image_shape_translation_error", index=idx, error=str(shape_err)) except Exception as e: _log_error("word_image_processing_error", error=str(e)) def _translate_image_text( self, image_path: str, target_language: str ) -> str: """Translate image using active provider or legacy service.""" if self._provider and hasattr(self._provider, "translate_image"): try: return self._provider.translate_image(image_path, target_language) except Exception as e: _log_error("word_image_translation_provider_error", error=str(e)) from services.translation_service import translation_service # Temporarily enable translate_images flag on translation_service to bypass the hardcoded check old_val = getattr(translation_service, "translate_images", False) try: translation_service.translate_images = True if hasattr(translation_service, "translate_image"): return translation_service.translate_image(image_path, target_language) except Exception as e: _log_error("word_image_translation_legacy_error", error=str(e)) finally: translation_service.translate_images = old_val return "" word_translator = WordTranslator()