Files
office_translator/translators/word_translator.py
sepehr c0f93501cc
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2s
fix: use Google Cloud API key for classic mode + translation verification
Two critical fixes:

1. Provider "google" (default classic mode) now checks for a Google Cloud
   API key (GOOGLE_CLOUD_API_KEY in env or admin settings). If present,
   uses GoogleCloudTranslationProvider (official API). Previously it
   always fell through to deep_translator (free scraper) which gets
   blocked in production, silently returning untranslated text.

2. Added translation verification: each translator now tracks how many
   texts were attempted vs actually changed. If 0 texts were translated,
   the job is marked as FAILED with a clear error message instead of
   returning the original file as "completed".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 12:09:26 +02:00

1080 lines
43 KiB
Python

"""
Word Document Translation Module
Translates Word files while preserving all formatting, styles, tables, and images
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import time
import zipfile
import io
import concurrent.futures
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Callable, Any
from docx import Document
from docx.text.paragraph import Paragraph
from docx.text.run import Run
from docx.table import Table, _Cell
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.section import Section
from lxml import etree
from services.providers.base import TranslationProvider
# Language codes of languages written right-to-left. translate_file() checks
# the lower-cased target language against this set to decide whether the
# output document needs RTL layout.
RTL_LANGUAGES: frozenset = frozenset(
    {"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"}
)
from core.logging import get_logger
# Module-level logger used by the _log_info / _log_error helpers.
logger = get_logger(__name__)
# Hard-coded to True in this module, so the logging helpers always pass their
# fields to the logger as keyword arguments.
_HAS_STRUCTLOG = True
def _log_info(event: str, **kwargs):
    """Emit an info-level record for ``event``.

    When ``_HAS_STRUCTLOG`` is truthy the keyword arguments are handed to the
    logger unchanged; otherwise they are flattened into a single
    ``"event k=v k=v"`` message string.
    """
    if not _HAS_STRUCTLOG:
        fields = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.info(f"{event} " + fields)
        return
    logger.info(event, **kwargs)
def _log_error(event: str, **kwargs):
    """Emit an error-level record for ``event``.

    When ``_HAS_STRUCTLOG`` is truthy the keyword arguments are handed to the
    logger unchanged; otherwise they are flattened into a single
    ``"event k=v k=v"`` message string.
    """
    if not _HAS_STRUCTLOG:
        fields = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.error(f"{event} " + fields)
        return
    logger.error(event, **kwargs)
def _set_paragraph_rtl(paragraph: Paragraph) -> None:
    """
    Mark a paragraph, and every run directly inside it, as right-to-left.

    Elements written:
    - ``w:pPr/w:bidi`` (added if missing)
    - ``w:pPr/w:jc`` with ``w:val="right"`` (added if missing, value always set)
    - ``w:rPr/w:rtl`` on each run returned by ``paragraph.runs`` (added if missing)
    """
    paragraph_props = paragraph._p.get_or_add_pPr()

    # Paragraph direction marker; only one is ever added.
    if paragraph_props.find(qn("w:bidi")) is None:
        paragraph_props.append(OxmlElement("w:bidi"))

    # Reuse an existing justification element, otherwise create one, then
    # force right alignment either way.
    # NOTE(review): new children are appended at the end of pPr/rPr rather
    # than inserted at a schema-defined position — confirm Word accepts this.
    justification = paragraph_props.find(qn("w:jc"))
    if justification is None:
        justification = OxmlElement("w:jc")
        paragraph_props.append(justification)
    justification.set(qn("w:val"), "right")

    for run in paragraph.runs:
        run_props = run._r.get_or_add_rPr()
        if run_props.find(qn("w:rtl")) is None:
            run_props.append(OxmlElement("w:rtl"))
def _apply_rtl_to_document(document: Document) -> None:
    """Apply RTL direction to the document body, its sections and their
    primary header/footer.

    For the body and for each processed header/footer this marks every
    paragraph and every paragraph in first-level table cells via
    ``_set_paragraph_rtl``. Each section also gets a ``w:bidi`` child on its
    ``sectPr`` (added only if missing) so the page layout is right-to-left.

    Headers/footers that are linked to the previous section are skipped:
    reading their content makes python-docx resolve (or create) a definition,
    which would either re-process content already handled for an earlier
    section or add an empty header/footer to the output document.
    """

    def _mark_container(container) -> None:
        # Paragraphs first, then first-level table cells.
        for para in container.paragraphs:
            _set_paragraph_rtl(para)
        for table in container.tables:
            for row in table.rows:
                for cell in row.cells:
                    for para in cell.paragraphs:
                        _set_paragraph_rtl(para)

    # Body paragraphs and body tables
    _mark_container(document)

    for section in document.sections:
        # Section (page) direction: margins, columns and page numbering.
        sectPr = section._sectPr
        if sectPr.find(qn("w:bidi")) is None:
            sectPr.append(OxmlElement("w:bidi"))
        for hf in (section.header, section.footer):
            if hf.is_linked_to_previous:
                # Inherited definition: nothing of its own to mark.
                continue
            _mark_container(hf)
class WordProcessorError(Exception):
    """Word-processing failure carrying a machine-readable error code.

    Attributes:
        code: One of the string constants below (or any caller-supplied code).
        message: Human-readable message; defaults to the entry for ``code`` in
            ``ERROR_MESSAGES`` or ``"Erreur inconnue"`` for unknown codes.
        details: Optional extra context; always a dict (empty when omitted).
    """

    INVALID_FORMAT = "INVALID_FORMAT"
    DOCX_CORRUPTED = "DOCX_CORRUPTED"
    DOCX_READ_ERROR = "DOCX_READ_ERROR"
    DOCX_WRITE_ERROR = "DOCX_WRITE_ERROR"
    DOCX_TOO_LARGE = "DOCX_TOO_LARGE"

    # Default user-facing message for each known code.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .docx.",
        DOCX_CORRUPTED: "Le document Word est corrompu ou illisible.",
        DOCX_READ_ERROR: "Erreur lors de la lecture du document Word.",
        DOCX_WRITE_ERROR: "Erreur lors de la creation du document traduit.",
        DOCX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        # An empty or missing message falls back to the catalogue entry.
        if not message:
            message = self.ERROR_MESSAGES.get(code, "Erreur inconnue")
        self.code = code
        self.message = message
        self.details = details if details else {}
        super().__init__(self.message)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict for API responses.

        The ``"details"`` key is present only when details are non-empty.
        """
        payload = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
class WordTranslator:
    """
    Handles translation of Word documents with strict formatting preservation.
    Uses the new TranslationProvider interface for improved error handling
    and fallback chain support.
    """
    # Largest accepted input file, in megabytes (checked in _validate_file).
    MAX_FILE_SIZE_MB = 50
    # Leading bytes every accepted file must start with.
    DOCX_MAGIC_BYTES = b"PK"  # .docx files are ZIP archives
    # Namespace URIs not registered in python-docx's nsmap
    _NS_MC = "http://schemas.openxmlformats.org/markup-compatibility/2006"
    # Clark-notation tag of <mc:AlternateContent>, matched in _collect_from_element.
    _TAG_ALT_CONTENT = f"{{{_NS_MC}}}AlternateContent"
def __init__(self, provider: Optional[TranslationProvider] = None):
"""
Initialize WordTranslator.
Args:
provider: TranslationProvider instance for translations.
If None, will use fallback to legacy translation_service.
"""
self._provider = provider
self._custom_prompt: Optional[str] = None
self._translation_stats = {"attempted": 0, "changed": 0}
def set_provider(self, provider: TranslationProvider) -> None:
"""Set the translation provider."""
self._provider = provider
def set_custom_prompt(self, prompt: Optional[str]) -> None:
"""Set custom system prompt for LLM providers."""
self._custom_prompt = prompt
def translate_file(
    self,
    input_path: Path,
    output_path: Path,
    target_language: str,
    source_language: str = "auto",
    progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Path:
    """
    Translate a Word document while preserving all formatting and structure.

    Text is collected from the body, charts, diagrams and section
    headers/footers as ``(text, setter)`` pairs, translated in parallel
    chunks, written back through the setters, and the document is saved.
    Chart and diagram translations are re-injected into the saved ZIP
    afterwards.

    The translation statistics are reset at the start of every call so that
    ``get_translation_stats()`` describes this file only, even when the same
    instance is reused.

    Args:
        input_path: Path to input Word file
        output_path: Path for translated output file
        target_language: Target language code (e.g., 'fr', 'en')
        source_language: Source language code (default: auto-detect)
        progress_callback: Optional callback for progress updates.
            Receives a dict with: current, total, paragraph,
            total_paragraphs, runs_translated, phase
            ("collecting" / "translating" / "complete").

    Returns:
        Path to translated file

    Raises:
        WordProcessorError: If file is invalid, corrupted, or processing fails
    """
    start_time = time.time()
    input_path = Path(input_path)
    output_path = Path(output_path)
    # Per-job statistics: counts from a previous file must not leak into
    # the verification of this one.
    self._translation_stats = {"attempted": 0, "changed": 0}
    self._validate_file(input_path)
    try:
        document = Document(input_path)
    except Exception as e:
        raise WordProcessorError(
            code=WordProcessorError.DOCX_CORRUPTED,
            details={"file_name": input_path.name, "error": str(e)},
        ) from e
    try:
        runs_translated = 0
        text_elements: List[Tuple[str, Callable[[str], None]]] = []
        chart_translations: List[Dict[str, Any]] = []
        diagram_translations: List[Dict[str, Any]] = []
        self._collect_from_body(document, text_elements)
        # Collect chart text from ZIP (chart titles, axis labels, series names)
        self._collect_charts_from_zip(input_path, text_elements, chart_translations)
        # Collect SmartArt/diagram text from ZIP
        self._collect_diagrams_from_zip(input_path, text_elements, diagram_translations)
        total_sections = len(document.sections)
        total_elements = 0
        for section_idx, section in enumerate(document.sections):
            self._collect_from_section(section, text_elements)
            total_elements = len(text_elements)
            if progress_callback:
                progress_callback(
                    {
                        "current": section_idx + 1,
                        "total": total_sections,
                        "paragraph": section_idx + 1,
                        "total_paragraphs": total_sections,
                        "runs_translated": runs_translated,
                        "phase": "collecting",
                    }
                )
        if text_elements:
            texts = [elem[0] for elem in text_elements]
            total_elements = len(text_elements)
            _log_info(
                "word_batch_translation_start",
                file_name=input_path.name,
                text_count=len(texts),
                target_lang=target_language,
            )
            # Split into chunks and translate them in parallel with a thread
            # pool. Progress is reported as chunks complete; completion
            # order does not matter because each chunk owns a fixed slice.
            CHUNK_SIZE = 15
            MAX_WORKERS = 6
            chunks = [
                (i, texts[i : i + CHUNK_SIZE])
                for i in range(0, total_elements, CHUNK_SIZE)
            ]
            # Start from the originals: a slot the backend fails to fill
            # keeps its source text instead of being blanked out.
            translated_texts: List[str] = list(texts)
            completed = 0  # only touched by this (the collecting) thread
            with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
                future_map = {
                    pool.submit(
                        self._batch_translate, chunk, target_language, source_language
                    ): (start, chunk)
                    for start, chunk in chunks
                }
                for future in concurrent.futures.as_completed(future_map):
                    start, chunk = future_map[future]
                    translated_chunk = future.result() or []
                    # Never write past this chunk's own slice, even if the
                    # backend returned more items than it was given.
                    for offset, text in enumerate(translated_chunk[: len(chunk)]):
                        if isinstance(text, str) and text.strip():
                            translated_texts[start + offset] = text
                    completed += len(chunk)
                    if progress_callback:
                        done = min(completed, total_elements)
                        progress_callback(
                            {
                                "current": done,
                                "total": total_elements,
                                "paragraph": done,
                                "total_paragraphs": total_elements,
                                "runs_translated": runs_translated,
                                "phase": "translating",
                            }
                        )
            # Apply translations (fast — just text assignment)
            for i, ((_original_text, setter), translated) in enumerate(
                zip(text_elements, translated_texts)
            ):
                if setter is None:
                    continue
                try:
                    setter(translated)
                    runs_translated += 1
                except Exception as e:
                    # One failing element must not abort the whole document.
                    _log_error(
                        "word_setter_error",
                        error=str(e),
                        index=i,
                    )
        # Apply RTL layout when the target language is written right-to-left.
        if target_language.lower() in RTL_LANGUAGES:
            _apply_rtl_to_document(document)
        if progress_callback:
            progress_callback(
                {
                    "current": total_elements if text_elements else total_sections,
                    "total": total_elements if text_elements else total_sections,
                    "paragraph": total_sections,
                    "total_paragraphs": total_sections,
                    "runs_translated": runs_translated,
                    "phase": "complete",
                }
            )
        try:
            document.save(output_path)
        except Exception as e:
            raise WordProcessorError(
                code=WordProcessorError.DOCX_WRITE_ERROR,
                details={"file_name": output_path.name, "error": str(e)},
            ) from e
        # Re-inject chart translations into the saved .docx ZIP
        if chart_translations:
            self._apply_chart_translations(input_path, output_path, chart_translations)
        # Re-inject SmartArt/diagram translations into the saved .docx ZIP
        if diagram_translations:
            self._apply_diagram_translations(output_path, diagram_translations)
        processing_time_ms = round((time.time() - start_time) * 1000, 2)
        _log_info(
            "word_translation_success",
            file_name=input_path.name,
            runs_translated=runs_translated,
            source_lang=source_language,
            target_lang=target_language,
            processing_time_ms=processing_time_ms,
        )
        return output_path
    except WordProcessorError:
        raise
    except Exception as e:
        import traceback
        _log_error(
            "word_translation_unexpected_error",
            file_name=input_path.name,
            error=str(e),
            traceback=traceback.format_exc(),
        )
        raise WordProcessorError(
            code=WordProcessorError.DOCX_READ_ERROR,
            details={"file_name": input_path.name, "error": str(e)},
        ) from e
def _validate_file(self, file_path: Path) -> None:
"""Validate file format and size."""
if not file_path.exists():
raise WordProcessorError(
code=WordProcessorError.DOCX_READ_ERROR,
message=f"Fichier introuvable: {file_path.name}",
details={"file_name": file_path.name},
)
if file_path.suffix.lower() != ".docx":
raise WordProcessorError(
code=WordProcessorError.INVALID_FORMAT,
details={
"file_name": file_path.name,
"extension": file_path.suffix,
"expected": ".docx",
},
)
with open(file_path, "rb") as f:
header = f.read(4)
if header[:2] != self.DOCX_MAGIC_BYTES:
raise WordProcessorError(
code=WordProcessorError.INVALID_FORMAT,
details={"file_name": file_path.name, "reason": "Invalid file header"},
)
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > self.MAX_FILE_SIZE_MB:
raise WordProcessorError(
code=WordProcessorError.DOCX_TOO_LARGE,
details={
"file_name": file_path.name,
"size_mb": round(file_size_mb, 2),
"max_mb": self.MAX_FILE_SIZE_MB,
},
)
def _batch_translate(
self, texts: List[str], target_language: str, source_language: str = "auto"
) -> List[str]:
"""
Batch translate using new provider interface.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code
Returns:
List of translated texts (same order as input)
"""
if not texts:
return []
non_empty = [t for t in texts if t and t.strip()]
self._translation_stats["attempted"] += len(non_empty)
if self._provider is not None:
translated = self._translate_with_provider(
texts, target_language, source_language
)
else:
translated = self._translate_with_legacy(texts, target_language, source_language)
changed = sum(1 for orig, trans in zip(texts, translated) if orig != trans and trans.strip())
self._translation_stats["changed"] += changed
return translated
def get_translation_stats(self) -> dict:
return dict(self._translation_stats)
def _translate_with_provider(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Translate using the TranslationProvider.translate_batch() interface."""
translated = self._provider.translate_batch(texts, target_language, source_language)
# Fallback: keep original text for any empty/failed result
return [
t if (t and t.strip()) else orig
for t, orig in zip(translated, texts)
]
def _translate_with_legacy(
    self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
    """Translate through the legacy ``translation_service`` singleton.

    Used when no provider is configured. The service is imported lazily,
    inside the call, and its ``translate_batch`` result is returned as-is.
    """
    from services.translation_service import translation_service

    batch_size = len(texts)
    _log_info(
        "word_using_legacy_service",
        text_count=batch_size,
        target_lang=target_language,
    )
    legacy_result = translation_service.translate_batch(
        texts, target_language, source_language
    )
    return legacy_result
def _collect_from_body(
    self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Gather every translatable text element reachable from the body.

    Three passes append ``(text, setter)`` pairs to ``text_elements``:
    1. direct children of the body (paragraphs, tables, SDT blocks,
       AlternateContent wrappers);
    2. every ``<w:txbxContent>`` anywhere below the body (text boxes and
       shapes, which sit deep inside drawing / pict markup);
    3. footnotes and endnotes.
    A summary with per-pass counts is logged at the end.
    """
    baseline = len(text_elements)
    body = document.element.body

    # Pass 1: direct body children
    for element in body:
        self._collect_from_element(element, document, text_elements)
    after_body = len(text_elements)

    # Pass 2: text boxes / shapes anywhere in the body tree
    self._collect_from_textboxes(body, document, text_elements)
    after_textboxes = len(text_elements)

    # Pass 3: footnotes and endnotes
    self._collect_from_footnotes(document, text_elements)
    self._collect_from_endnotes(document, text_elements)

    _log_info(
        "word_collection_summary",
        body_runs=after_body - baseline,
        textbox_runs=after_textboxes - after_body,
        total_collected=len(text_elements) - baseline,
    )
def _collect_from_element(
    self, element, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Dispatch collection for one XML element according to its type.

    Paragraphs and tables are collected directly, ``<w:sdt>`` goes through
    ``_collect_from_sdt``, ``<mc:AlternateContent>`` is descended into
    recursively, and for any other element only its immediate paragraph and
    table children are collected.
    """
    if isinstance(element, CT_P):
        self._collect_from_paragraph(Paragraph(element, document), text_elements)
        return
    if isinstance(element, CT_Tbl):
        self._collect_from_table(Table(element, document), text_elements)
        return

    tag = element.tag
    if tag == qn("w:sdt"):
        self._collect_from_sdt(element, document, text_elements)
        return
    if tag == self._TAG_ALT_CONTENT:
        # <mc:AlternateContent> wraps drawing/shape content
        for part in element:
            self._collect_from_element(part, document, text_elements)
        return

    # Unknown wrapper: look one level down for paragraphs and tables.
    for child in element:
        if isinstance(child, CT_P):
            self._collect_from_paragraph(Paragraph(child, document), text_elements)
        elif isinstance(child, CT_Tbl):
            self._collect_from_table(Table(child, document), text_elements)
def _collect_from_textboxes(
    self, root, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect text from every ``<w:txbxContent>`` element below ``root``.

    ``<w:txbxContent>`` holds the text of text boxes and shapes (including
    those nested in ``<mc:AlternateContent>``) as ordinary ``<w:p>``
    paragraphs and ``<w:tbl>`` tables, so its immediate children are handled
    exactly like body content.
    """
    textbox_tag = qn("w:txbxContent")
    for textbox in root.iter(textbox_tag):
        for node in textbox:
            if isinstance(node, CT_P):
                self._collect_from_paragraph(Paragraph(node, document), text_elements)
                continue
            if isinstance(node, CT_Tbl):
                self._collect_from_table(Table(node, document), text_elements)
def _collect_from_sdt(
    self, sdt_element, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect text from a Structured Document Tag (TOC, index, content control).

    Expected layout:
        <w:sdt>
          <w:sdtPr>...</w:sdtPr>
          <w:sdtContent>
            <w:p>...</w:p>      paragraphs
            <w:tbl>...</w:tbl>  tables
          </w:sdtContent>
        </w:sdt>

    Only the immediate paragraph/table children of ``<w:sdtContent>`` are
    collected; an SDT without that child is ignored.
    """
    content = sdt_element.find(qn("w:sdtContent"))
    if content is not None:
        for node in content:
            if isinstance(node, CT_P):
                self._collect_from_paragraph(Paragraph(node, document), text_elements)
            elif isinstance(node, CT_Tbl):
                self._collect_from_table(Table(node, document), text_elements)
def _collect_from_footnotes(
self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from footnotes."""
try:
footnotes_part = document.part.package.part_related_by(
"http://schemas.openxmlformats.org/officeDocument/2006/relationships/footnotes"
) if hasattr(document.part, 'package') else None
except Exception:
footnotes_part = None
if footnotes_part is None:
# Fallback: try direct XML access
try:
footnotes_element = document.element.find(qn("w:footnotes"))
if footnotes_element is not None:
for child in footnotes_element:
if isinstance(child, CT_P):
paragraph = Paragraph(child, document)
self._collect_from_paragraph(paragraph, text_elements)
except Exception:
pass
return
try:
footnotes_xml = etree.fromstring(footnotes_part.blob)
for child in footnotes_xml:
if child.tag == qn("w:footnote"):
for para_elem in child.findall(qn("w:p")):
paragraph = Paragraph(para_elem, document)
self._collect_from_paragraph(paragraph, text_elements)
except Exception as e:
_log_error("word_footnotes_parse_error", error=str(e))
def _collect_from_endnotes(
self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from endnotes."""
try:
endnotes_part = document.part.package.part_related_by(
"http://schemas.openxmlformats.org/officeDocument/2006/relationships/endnotes"
) if hasattr(document.part, 'package') else None
except Exception:
endnotes_part = None
if endnotes_part is None:
try:
endnotes_element = document.element.find(qn("w:endnotes"))
if endnotes_element is not None:
for child in endnotes_element:
if isinstance(child, CT_P):
paragraph = Paragraph(child, document)
self._collect_from_paragraph(paragraph, text_elements)
except Exception:
pass
return
try:
endnotes_xml = etree.fromstring(endnotes_part.blob)
for child in endnotes_xml:
if child.tag == qn("w:endnote"):
for para_elem in child.findall(qn("w:p")):
paragraph = Paragraph(para_elem, document)
self._collect_from_paragraph(paragraph, text_elements)
except Exception as e:
_log_error("word_endnotes_parse_error", error=str(e))
def _collect_from_charts(
self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from embedded charts (chart titles, axis labels, series names).
Charts are stored as separate XML parts in the .docx ZIP archive.
The chart XML uses DrawingML namespaces for text content.
"""
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
try:
# Access the raw ZIP to find chart parts
docx_path = document.part.package.main_document_part.partname
package = document.part.package
# Find all chart relationship targets
for rel_type, rels in (package.rels or {}).items():
pass # python-docx doesn't expose this cleanly
except Exception:
pass
# More reliable: open the .docx as a ZIP and parse chart XML directly
try:
# Get the original file path from the document
input_file = None
# Try to recover the file path — document object doesn't store it directly
# We'll handle charts in translate_file() instead where we have the path
pass
except Exception:
pass
def _collect_charts_from_zip(
    self, input_path: Path, text_elements: List[Tuple[str, Callable[[str], None]]],
    chart_translations: List[Dict[str, Any]]
) -> None:
    """Parse chart XML from the .docx ZIP and collect translatable text.

    For every ``word/charts/*.xml`` entry this collects:
    - each non-blank ``<a:t>`` found under ``<c:title>``, ``<c:cat>`` or
      ``<c:val>`` elements;
    - each non-blank, non-numeric ``<c:v>`` found under ``<c:ser>``.

    Every hit appends a metadata dict (``chart_file``, ``element_path``,
    ``original``) to ``chart_translations`` and a ``(text, setter)`` pair to
    ``text_elements``; the setter stores the stripped translation under the
    ``'translated'`` key of that dict for later re-injection.

    Errors are logged and never raised (per chart file, and for the ZIP as
    a whole).

    Args:
        input_path: Path to the .docx file
        text_elements: List to append (text, setter) tuples
        chart_translations: List to store chart translation metadata for later re-injection
    """
    _NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
    _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
    tag_text = f"{{{_NS_A}}}t"
    tag_value = f"{{{_NS_C}}}v"
    tag_series = f"{{{_NS_C}}}ser"
    # Clark notation takes the LOCAL name after the namespace; including
    # the "c:" prefix would yield a tag that matches nothing.
    container_tags = [f"{{{_NS_C}}}{local}" for local in ("title", "cat", "val")]

    def _looks_numeric(value: str) -> bool:
        # Digits with optional dots/dashes are treated as data, not text.
        return value.replace('.', '').replace('-', '').isdigit()

    def _register(chart_file: str, elem) -> None:
        # Record the element and expose a setter bound to its own entry.
        original = elem.text.strip()
        chart_translations.append(
            {
                'chart_file': chart_file,
                'element_path': self._get_element_path(elem),
                'original': original,
            }
        )
        index = len(chart_translations) - 1

        def setter(text):
            chart_translations[index]['translated'] = text.strip()

        text_elements.append((original, setter))

    try:
        with zipfile.ZipFile(input_path, 'r') as zf:
            chart_files = [
                name for name in zf.namelist()
                if name.startswith('word/charts/') and name.endswith('.xml')
            ]
            for chart_file in chart_files:
                try:
                    chart_xml = etree.fromstring(zf.read(chart_file))
                    # Rich text: <a:rich><a:p><a:r><a:t>
                    for container_tag in container_tags:
                        for parent_elem in chart_xml.iter(container_tag):
                            for t_elem in parent_elem.iter(tag_text):
                                if t_elem.text and t_elem.text.strip():
                                    _register(chart_file, t_elem)
                    # Cached string values inside series (<c:ser> ... <c:v>)
                    for ser_elem in chart_xml.iter(tag_series):
                        for v_elem in ser_elem.iter(tag_value):
                            value = (v_elem.text or '').strip()
                            if value and not _looks_numeric(value):
                                _register(chart_file, v_elem)
                except Exception as e:
                    _log_error("word_chart_parse_error", chart_file=chart_file, error=str(e))
    except Exception as e:
        _log_error("word_charts_zip_error", error=str(e))
def _get_element_path(self, element) -> str:
"""Get a unique XPath-like path for an element within its document."""
path_parts = []
current = element
while current is not None:
parent = current.getparent()
if parent is None:
break
idx = list(parent).index(current)
tag = current.tag.split('}')[-1] if '}' in current.tag else current.tag
path_parts.append(f"{tag}[{idx}]")
current = parent
return '/'.join(reversed(path_parts))
def _apply_chart_translations(self, input_path: Path, output_path: Path, chart_translations: List[Dict[str, Any]]) -> None:
"""Re-inject chart translations into the .docx ZIP.
Modifies chart XML files in-place and rewrites the ZIP.
"""
if not chart_translations:
return
# Only proceed if at least one translation exists
translated_entries = [e for e in chart_translations if 'translated' in e and e['translated']]
if not translated_entries:
return
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
# Group by chart file
chart_files_to_update: Dict[str, List[Dict]] = {}
for entry in translated_entries:
cf = entry['chart_file']
if cf not in chart_files_to_update:
chart_files_to_update[cf] = []
chart_files_to_update[cf].append(entry)
try:
# Read all ZIP entries
with zipfile.ZipFile(output_path, 'r') as zf_in:
existing_entries = zf_in.namelist()
# Create new ZIP in memory
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out:
for item in existing_entries:
data = zf_in.read(item)
if item in chart_files_to_update:
# Parse, update, re-serialize this chart XML
try:
chart_xml = etree.fromstring(data)
for entry in chart_files_to_update[item]:
# Find all <a:t> or <c:v> elements and match by original text
tag_to_find = f'{{{_NS_A}}}t'
# Try both a:t and c:v
for t_elem in chart_xml.iter(tag_to_find):
if t_elem.text and t_elem.text.strip() == entry['original']:
t_elem.text = entry['translated']
break
else:
for t_elem in chart_xml.iter(f'{{{_NS_C}}}v'):
if t_elem.text and t_elem.text.strip() == entry['original']:
t_elem.text = entry['translated']
break
data = etree.tostring(chart_xml, xml_declaration=True, encoding='UTF-8', standalone=True)
except Exception as e:
_log_error("word_chart_update_error", chart_file=item, error=str(e))
zf_out.writestr(item, data)
# Replace the output file with the updated ZIP
with open(output_path, 'wb') as f:
f.write(buf.getvalue())
_log_info("word_charts_translated", chart_files=len(chart_files_to_update), translations=len(translated_entries))
except Exception as e:
_log_error("word_chart_zip_rewrite_error", error=str(e))
# ------------------------------------------------------------------
# SmartArt / Diagram support
# ------------------------------------------------------------------
# DrawingML diagram namespace (the namespace of the <dgm:pt> elements
# mentioned in _collect_diagrams_from_zip).
_NS_DGM = "http://schemas.openxmlformats.org/drawingml/2006/diagram"
# DrawingML main namespace; the diagram methods build the <a:t> tag from it.
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
def _collect_diagrams_from_zip(
    self,
    input_path: Path,
    text_elements: List[Tuple[str, Callable[[str], None]]],
    diagram_translations: List[Dict[str, Any]],
) -> None:
    """Collect translatable SmartArt text from the .docx ZIP.

    Scans every ``word/diagrams/data*.xml`` entry for ``<a:t>`` nodes.
    Blank text, numeric-only text (digits with optional ``.``, ``-``, ``,``)
    and single-character text are skipped. Each remaining node appends a
    metadata dict (``diag_file``, ``element_path``, ``original``) to
    ``diagram_translations`` and a ``(text, setter)`` pair to
    ``text_elements``; the setter stores the stripped translation under the
    dict's ``'translated'`` key.

    Errors are logged (per file, and for the ZIP as a whole), never raised.
    """
    text_tag = f"{{{self._NS_A}}}t"

    def _bind(slot: int):
        # Setter tied to one entry of diagram_translations.
        def setter(text: str) -> None:
            diagram_translations[slot]['translated'] = text.strip()
        return setter

    try:
        with zipfile.ZipFile(input_path, 'r') as archive:
            diag_files = [
                name for name in archive.namelist()
                if name.startswith('word/diagrams/data') and name.endswith('.xml')
            ]
            for diag_file in diag_files:
                try:
                    tree = etree.fromstring(archive.read(diag_file))
                    for node in tree.iter(text_tag):
                        original = (node.text or '').strip()
                        if not original:
                            continue
                        # Skip numeric-only or very short tokens
                        if original.replace('.', '').replace('-', '').replace(',', '').isdigit():
                            continue
                        if len(original) <= 1:
                            continue
                        record: Dict[str, Any] = {
                            'diag_file': diag_file,
                            'element_path': self._get_element_path(node),
                            'original': original,
                        }
                        diagram_translations.append(record)
                        text_elements.append(
                            (original, _bind(len(diagram_translations) - 1))
                        )
                except Exception as e:
                    _log_error("word_diagram_parse_error", diag_file=diag_file, error=str(e))
            if diagram_translations:
                _log_info(
                    "word_diagram_collection",
                    diagram_files=len(diag_files),
                    text_count=len(diagram_translations),
                )
    except Exception as e:
        _log_error("word_diagrams_zip_error", error=str(e))
def _apply_diagram_translations(
self,
output_path: Path,
diagram_translations: List[Dict[str, Any]],
) -> None:
"""Re-inject SmartArt/diagram translations into the .docx ZIP.
Modifies diagram data XML files in-place and rewrites the ZIP.
"""
if not diagram_translations:
return
translated_entries = [e for e in diagram_translations if 'translated' in e and e['translated']]
if not translated_entries:
return
_TAG_A_T = f"{{{self._NS_A}}}t"
# Group by diagram file
diag_files_to_update: Dict[str, List[Dict]] = {}
for entry in translated_entries:
df = entry['diag_file']
if df not in diag_files_to_update:
diag_files_to_update[df] = []
diag_files_to_update[df].append(entry)
try:
with zipfile.ZipFile(output_path, 'r') as zf_in:
existing_entries = zf_in.namelist()
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out:
for item in existing_entries:
data = zf_in.read(item)
if item in diag_files_to_update:
try:
diag_xml = etree.fromstring(data)
for entry in diag_files_to_update[item]:
for t_elem in diag_xml.iter(_TAG_A_T):
if t_elem.text and t_elem.text.strip() == entry['original']:
t_elem.text = entry['translated']
break
data = etree.tostring(diag_xml, xml_declaration=True, encoding='UTF-8', standalone=True)
except Exception as e:
_log_error("word_diagram_update_error", diag_file=item, error=str(e))
zf_out.writestr(item, data)
with open(output_path, 'wb') as f:
f.write(buf.getvalue())
_log_info(
"word_diagrams_translated",
diagram_files=len(diag_files_to_update),
translations=len(translated_entries),
)
except Exception as e:
_log_error("word_diagram_zip_rewrite_error", error=str(e))
def _collect_from_paragraph(
    self,
    paragraph: Paragraph,
    text_elements: List[Tuple[str, Callable[[str], None]]],
) -> None:
    """Collect the translatable runs of one paragraph.

    A paragraph whose ``<w:t>`` nodes (at any depth) hold only whitespace is
    skipped entirely. Otherwise every run with non-blank text is registered
    through ``_append_run_translation``, which keeps the run's boundary
    whitespace out of the translated text and restores it afterwards.

    Two sources of runs are visited, in this order:
    1. ``paragraph.runs``;
    2. ``<w:r>`` children of every ``<w:hyperlink>`` below the paragraph
       (TOC entries, cross-references, bookmark links), which the original
       author notes ``paragraph.runs`` does not return.
    """
    # Check full paragraph text including nested content (hyperlinks, etc.)
    has_text = any(
        (node.text or '').strip() for node in paragraph._p.iter(qn('w:t'))
    )
    if not has_text:
        return

    candidates = list(paragraph.runs)
    for hyperlink in paragraph._p.iter(qn('w:hyperlink')):
        candidates.extend(
            Run(r_elem, paragraph) for r_elem in hyperlink.findall(qn('w:r'))
        )

    for run in candidates:
        if run.text and run.text.strip():
            self._append_run_translation(run, text_elements)
def _append_run_translation(
self,
run,
text_elements: List[Tuple[str, Callable[[str], None]]],
) -> None:
"""Extract translatable text from a Run and append a (text, setter) tuple."""
original = run.text
# Capture leading/trailing whitespace that must survive translation.
leading = original[: len(original) - len(original.lstrip())]
trailing = original[len(original.rstrip()) :]
stripped = original.strip()
def make_setter(r, lead: str, trail: str):
def setter(text: str) -> None:
# Strip any whitespace the translator may have added/removed
# and reapply the original boundary whitespace.
r.text = lead + text.strip() + trail
return setter
text_elements.append((stripped, make_setter(run, leading, trailing)))
def _collect_from_table(
self, table: Table, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from table cells."""
for row in table.rows:
for cell in row.cells:
for paragraph in cell.paragraphs:
self._collect_from_paragraph(paragraph, text_elements)
for nested_table in cell.tables:
self._collect_from_table(nested_table, text_elements)
def _collect_from_section(
self, section: Section, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from headers and footers."""
headers_footers = [
section.header,
section.footer,
section.first_page_header,
section.first_page_footer,
section.even_page_header,
section.even_page_footer,
]
for hf in headers_footers:
if hf:
for paragraph in hf.paragraphs:
self._collect_from_paragraph(paragraph, text_elements)
for table in hf.tables:
self._collect_from_table(table, text_elements)
# Shared module-level instance. It is created without a provider, so batches
# go through the legacy translation service until set_provider() is called.
word_translator = WordTranslator()