- Restructured docker-compose for Nginx Proxy Manager (no custom nginx) - Added domain wordly.art configuration - Added Prometheus + Grafana monitoring stack with pre-configured dashboards - Added PostgreSQL backup script to NAS (daily/weekly/monthly rotation) - Added alert rules for backend, system, and Docker metrics - Updated deployment guide for NPM + IONOS DNS homelab setup - Added marketing plan document - PDF translator and watermark support - Enhanced middleware, routes, and translator modules Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1068 lines
43 KiB
Python
1068 lines
43 KiB
Python
"""
|
|
Word Document Translation Module
|
|
Translates Word files while preserving all formatting, styles, tables, and images
|
|
OPTIMIZED: Uses batch translation for 5-10x faster processing
|
|
|
|
Updated to use new TranslationProvider interface with structured error handling.
|
|
"""
|
|
|
|
import time
|
|
import zipfile
|
|
import io
|
|
import concurrent.futures
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple, Optional, Callable, Any
|
|
|
|
from docx import Document
|
|
from docx.text.paragraph import Paragraph
|
|
from docx.text.run import Run
|
|
from docx.table import Table, _Cell
|
|
from docx.oxml.text.paragraph import CT_P
|
|
from docx.oxml.table import CT_Tbl
|
|
from docx.oxml import OxmlElement
|
|
from docx.oxml.ns import qn
|
|
from docx.section import Section
|
|
from lxml import etree
|
|
|
|
from services.providers.base import TranslationProvider
|
|
|
|
# Languages written right-to-left
|
|
RTL_LANGUAGES: frozenset = frozenset(
|
|
{"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"}
|
|
)
|
|
|
|
|
|
from core.logging import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
_HAS_STRUCTLOG = True
|
|
|
|
|
|
def _log_info(event: str, **kwargs):
    """Emit an info-level record for ``event`` with its key/value context.

    When ``_HAS_STRUCTLOG`` is truthy the keyword arguments are forwarded to
    the logger unchanged; otherwise they are flattened into a single
    ``"event k=v k=v"`` message string.
    """
    if not _HAS_STRUCTLOG:
        rendered = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.info(f"{event} " + rendered)
        return
    logger.info(event, **kwargs)
|
|
|
|
|
|
def _log_error(event: str, **kwargs):
    """Emit an error-level record for ``event`` with its key/value context.

    When ``_HAS_STRUCTLOG`` is truthy the keyword arguments are forwarded to
    the logger unchanged; otherwise they are flattened into a single
    ``"event k=v k=v"`` message string.
    """
    if not _HAS_STRUCTLOG:
        rendered = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.error(f"{event} " + rendered)
        return
    logger.error(event, **kwargs)
|
|
|
|
|
|
def _set_paragraph_rtl(paragraph: Paragraph) -> None:
    """Enable right-to-left mode on a paragraph and on each of its direct runs.

    Sets:
        - ``w:pPr/w:bidi``  -> paragraph text direction = RTL
        - ``w:pPr/w:jc``    -> ``w:val="right"``
        - ``w:rPr/w:rtl``   -> run-level RTL marker for every run returned by
          ``paragraph.runs``

    ``w:pPr`` children form an ordered sequence in the WordprocessingML
    schema, so new ``w:bidi`` / ``w:jc`` elements are inserted before their
    schema successors instead of being appended after elements such as
    ``w:rPr`` or ``w:sectPr``. Existing elements are reused, which makes the
    function safe to call repeatedly on the same paragraph.

    Args:
        paragraph: The python-docx paragraph to modify in place.
    """
    # Elements that must come AFTER w:jc inside w:pPr (schema order).
    jc_successors = (
        "w:textDirection", "w:textAlignment", "w:textboxTightWrap",
        "w:outlineLvl", "w:divId", "w:cnfStyle", "w:rPr", "w:sectPr",
        "w:pPrChange",
    )
    # Elements that must come AFTER w:bidi inside w:pPr (schema order).
    bidi_successors = (
        "w:adjustRightInd", "w:snapToGrid", "w:spacing", "w:ind",
        "w:contextualSpacing", "w:mirrorIndents", "w:suppressOverlap",
        "w:jc",
    ) + jc_successors

    pPr = paragraph._p.get_or_add_pPr()

    if pPr.find(qn("w:bidi")) is None:
        # insert_element_before() appends when none of the successors exist.
        pPr.insert_element_before(OxmlElement("w:bidi"), *bidi_successors)

    jc = pPr.find(qn("w:jc"))
    if jc is None:
        jc = OxmlElement("w:jc")
        pPr.insert_element_before(jc, *jc_successors)
    # NOTE(review): confirm in Word how jc="right" renders once w:bidi is set;
    # left/right may be interpreted relative to the paragraph direction.
    jc.set(qn("w:val"), "right")

    for run in paragraph.runs:
        rPr = run._r.get_or_add_rPr()
        if rPr.find(qn("w:rtl")) is None:
            rPr.append(OxmlElement("w:rtl"))
|
|
|
|
|
|
def _apply_rtl_to_document(document: Document) -> None:
    """Apply RTL direction to body, header and footer paragraphs and to sections.

    For the document body and for every header/footer that carries its own
    definition, each paragraph (including paragraphs in the container's
    top-level tables) is passed to ``_set_paragraph_rtl``. Every section
    additionally receives a ``w:bidi`` element in its ``w:sectPr``.

    Headers/footers that are linked to the previous section are skipped:
    their content belongs to an earlier section that has already been
    handled, and reading the content of a linked header makes python-docx
    create an empty definition when none exists.

    Args:
        document: The python-docx document to modify in place.
    """
    # Elements that must come AFTER w:bidi inside w:sectPr (schema order).
    sect_bidi_successors = (
        "w:rtlGutter", "w:docGrid", "w:printerSettings", "w:sectPrChange",
    )

    def _mark_container(container) -> None:
        """Mark every paragraph and top-level table cell paragraph as RTL."""
        for para in container.paragraphs:
            _set_paragraph_rtl(para)
        for table in container.tables:
            for row in table.rows:
                for cell in row.cells:
                    for para in cell.paragraphs:
                        _set_paragraph_rtl(para)

    # Body paragraphs and body tables
    _mark_container(document)

    # Headers, footers, and section-level RTL (page layout direction)
    for section in document.sections:
        sectPr = section._sectPr
        if sectPr.find(qn("w:bidi")) is None:
            # Keep w:sectPr children in schema order; appends when no
            # successor element is present.
            sectPr.insert_element_before(
                OxmlElement("w:bidi"), *sect_bidi_successors
            )

        for hf in (section.header, section.footer):
            if hf.is_linked_to_previous:
                continue
            _mark_container(hf)
|
|
|
|
|
|
class WordProcessorError(Exception):
    """Word-processing failure carrying a machine-readable error code.

    Attributes:
        code: One of the class-level code constants (or any caller string).
        message: Explicit message, else the default for ``code``, else
            ``"Erreur inconnue"``.
        details: Optional extra context; always a dict (possibly empty).
    """

    INVALID_FORMAT = "INVALID_FORMAT"
    DOCX_CORRUPTED = "DOCX_CORRUPTED"
    DOCX_READ_ERROR = "DOCX_READ_ERROR"
    DOCX_WRITE_ERROR = "DOCX_WRITE_ERROR"
    DOCX_TOO_LARGE = "DOCX_TOO_LARGE"

    # Default user-facing message for each known code.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .docx.",
        DOCX_CORRUPTED: "Le document Word est corrompu ou illisible.",
        DOCX_READ_ERROR: "Erreur lors de la lecture du document Word.",
        DOCX_WRITE_ERROR: "Erreur lors de la creation du document traduit.",
        DOCX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        if message:
            resolved = message
        else:
            resolved = self.ERROR_MESSAGES.get(code, "Erreur inconnue")
        self.code = code
        self.message = resolved
        self.details = details if details else {}
        super().__init__(resolved)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a dict suitable for API responses.

        The ``details`` key is present only when details are non-empty.
        """
        payload = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
|
|
|
|
|
|
class WordTranslator:
|
|
"""
|
|
Handles translation of Word documents with strict formatting preservation.
|
|
|
|
Uses the new TranslationProvider interface for improved error handling
|
|
and fallback chain support.
|
|
"""
|
|
|
|
MAX_FILE_SIZE_MB = 50
|
|
DOCX_MAGIC_BYTES = b"PK" # .docx files are ZIP archives
|
|
|
|
# Namespace URIs not registered in python-docx's nsmap
|
|
_NS_MC = "http://schemas.openxmlformats.org/markup-compatibility/2006"
|
|
_TAG_ALT_CONTENT = f"{{{_NS_MC}}}AlternateContent"
|
|
|
|
def __init__(self, provider: Optional[TranslationProvider] = None):
    """Create a translator, optionally bound to a translation provider.

    Args:
        provider: TranslationProvider used for batch translation. When it
            is None, ``_batch_translate`` falls back to the legacy
            ``translation_service``.
    """
    # No custom system prompt until set_custom_prompt() is called.
    self._custom_prompt: Optional[str] = None
    self._provider = provider
|
|
|
|
def set_provider(self, provider: TranslationProvider) -> None:
    """Replace the provider used for subsequent translations."""
    self._provider = provider
|
|
|
|
def set_custom_prompt(self, prompt: Optional[str]) -> None:
|
|
"""Set custom system prompt for LLM providers."""
|
|
self._custom_prompt = prompt
|
|
|
|
def translate_file(
    self,
    input_path: Path,
    output_path: Path,
    target_language: str,
    source_language: str = "auto",
    progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Path:
    """Translate a Word document while preserving formatting and structure.

    Text is collected as ``(text, setter)`` pairs from the body, chart and
    diagram parts of the ZIP, and every section. It is then translated in
    fixed-size chunks on a thread pool and written back through the setters.
    Chart and diagram translations are re-injected into the saved file.

    Args:
        input_path: Path to the input ``.docx`` file.
        output_path: Path for the translated output file.
        target_language: Target language code (e.g. ``'fr'``, ``'en'``).
        source_language: Source language code (default: auto-detect).
        progress_callback: Optional callable receiving a dict with keys
            ``current``, ``total``, ``paragraph``, ``total_paragraphs``,
            ``runs_translated`` and ``phase`` (``"collecting"``,
            ``"translating"`` or ``"complete"``).

    Returns:
        ``output_path`` as a ``Path``.

    Raises:
        WordProcessorError: If validation fails, the document cannot be
            opened or saved, or any other step raises.
    """
    start_time = time.time()

    input_path = Path(input_path)
    output_path = Path(output_path)

    self._validate_file(input_path)

    try:
        document = Document(input_path)
    except Exception as e:
        raise WordProcessorError(
            code=WordProcessorError.DOCX_CORRUPTED,
            details={"file_name": input_path.name, "error": str(e)},
        ) from e

    try:
        runs_translated = 0

        text_elements: List[Tuple[str, Callable[[str], None]]] = []
        chart_translations: List[Dict[str, Any]] = []
        diagram_translations: List[Dict[str, Any]] = []

        self._collect_from_body(document, text_elements)

        # Collect chart text from ZIP (chart titles, axis labels, series names)
        self._collect_charts_from_zip(input_path, text_elements, chart_translations)

        # Collect SmartArt/diagram text from ZIP
        self._collect_diagrams_from_zip(input_path, text_elements, diagram_translations)

        total_sections = len(document.sections)
        total_elements = 0
        for section_idx, section in enumerate(document.sections):
            self._collect_from_section(section, text_elements)
            total_elements = len(text_elements)

            if progress_callback:
                progress_callback(
                    {
                        "current": section_idx + 1,
                        "total": total_sections,
                        "paragraph": section_idx + 1,
                        "total_paragraphs": total_sections,
                        "runs_translated": runs_translated,
                        "phase": "collecting",
                    }
                )

        if text_elements:
            texts = [elem[0] for elem in text_elements]
            total_elements = len(texts)
            _log_info(
                "word_batch_translation_start",
                file_name=input_path.name,
                text_count=len(texts),
                target_lang=target_language,
            )

            # Translate fixed-size chunks concurrently. Results are written
            # into their slots as each chunk completes, in any order.
            CHUNK_SIZE = 15
            MAX_WORKERS = 6

            # Start from the source strings: a slot whose translation is
            # missing, None or blank keeps its original text instead of
            # being overwritten with an empty string.
            translated_texts: List[str] = list(texts)
            completed_items = 0

            with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
                # future -> (start index, number of texts in that chunk)
                future_map = {
                    pool.submit(
                        self._batch_translate,
                        texts[start : start + CHUNK_SIZE],
                        target_language,
                        source_language,
                    ): (start, min(CHUNK_SIZE, total_elements - start))
                    for start in range(0, total_elements, CHUNK_SIZE)
                }
                for future in concurrent.futures.as_completed(future_map):
                    start, size = future_map[future]
                    translated_chunk = future.result() or []
                    # zip() against range(size) ignores surplus results so
                    # one chunk can never spill into a neighbour's slots.
                    for offset, value in zip(range(size), translated_chunk):
                        if isinstance(value, str) and value.strip():
                            translated_texts[start + offset] = value
                    completed_items += size
                    if progress_callback:
                        progress_callback(
                            {
                                "current": completed_items,
                                "total": total_elements,
                                "paragraph": completed_items,
                                "total_paragraphs": total_elements,
                                "runs_translated": runs_translated,
                                "phase": "translating",
                            }
                        )

            # Apply translations (fast — just text assignment)
            for i, ((_original, setter), translated) in enumerate(
                zip(text_elements, translated_texts)
            ):
                if setter is None:
                    continue
                try:
                    setter(translated)
                    runs_translated += 1
                except Exception as e:
                    # One failing setter must not abort the whole document.
                    _log_error(
                        "word_setter_error",
                        error=str(e),
                        index=i,
                    )

        # Apply RTL layout when the target language is written right-to-left.
        # The primary subtag is also checked so codes like "ar-SA" or
        # "fa_IR" are recognised.
        lang_code = target_language.lower()
        lang_root = lang_code.replace("_", "-").split("-", 1)[0]
        if lang_code in RTL_LANGUAGES or lang_root in RTL_LANGUAGES:
            _apply_rtl_to_document(document)

        if progress_callback:
            progress_callback(
                {
                    "current": total_elements if text_elements else total_sections,
                    "total": total_elements if text_elements else total_sections,
                    "paragraph": total_sections,
                    "total_paragraphs": total_sections,
                    "runs_translated": runs_translated,
                    "phase": "complete",
                }
            )

        try:
            document.save(output_path)
        except Exception as e:
            raise WordProcessorError(
                code=WordProcessorError.DOCX_WRITE_ERROR,
                details={"file_name": output_path.name, "error": str(e)},
            ) from e

        # Re-inject chart translations into the saved .docx ZIP
        if chart_translations:
            self._apply_chart_translations(input_path, output_path, chart_translations)

        # Re-inject SmartArt/diagram translations into the saved .docx ZIP
        if diagram_translations:
            self._apply_diagram_translations(output_path, diagram_translations)

        processing_time_ms = round((time.time() - start_time) * 1000, 2)

        _log_info(
            "word_translation_success",
            file_name=input_path.name,
            runs_translated=runs_translated,
            source_lang=source_language,
            target_lang=target_language,
            processing_time_ms=processing_time_ms,
        )

        return output_path

    except WordProcessorError:
        raise
    except Exception as e:
        import traceback

        _log_error(
            "word_translation_unexpected_error",
            file_name=input_path.name,
            error=str(e),
            traceback=traceback.format_exc(),
        )
        raise WordProcessorError(
            code=WordProcessorError.DOCX_READ_ERROR,
            details={"file_name": input_path.name, "error": str(e)},
        ) from e
|
|
|
|
def _validate_file(self, file_path: Path) -> None:
|
|
"""Validate file format and size."""
|
|
if not file_path.exists():
|
|
raise WordProcessorError(
|
|
code=WordProcessorError.DOCX_READ_ERROR,
|
|
message=f"Fichier introuvable: {file_path.name}",
|
|
details={"file_name": file_path.name},
|
|
)
|
|
|
|
if file_path.suffix.lower() != ".docx":
|
|
raise WordProcessorError(
|
|
code=WordProcessorError.INVALID_FORMAT,
|
|
details={
|
|
"file_name": file_path.name,
|
|
"extension": file_path.suffix,
|
|
"expected": ".docx",
|
|
},
|
|
)
|
|
|
|
with open(file_path, "rb") as f:
|
|
header = f.read(4)
|
|
if header[:2] != self.DOCX_MAGIC_BYTES:
|
|
raise WordProcessorError(
|
|
code=WordProcessorError.INVALID_FORMAT,
|
|
details={"file_name": file_path.name, "reason": "Invalid file header"},
|
|
)
|
|
|
|
file_size_mb = file_path.stat().st_size / (1024 * 1024)
|
|
if file_size_mb > self.MAX_FILE_SIZE_MB:
|
|
raise WordProcessorError(
|
|
code=WordProcessorError.DOCX_TOO_LARGE,
|
|
details={
|
|
"file_name": file_path.name,
|
|
"size_mb": round(file_size_mb, 2),
|
|
"max_mb": self.MAX_FILE_SIZE_MB,
|
|
},
|
|
)
|
|
|
|
def _batch_translate(
|
|
self, texts: List[str], target_language: str, source_language: str = "auto"
|
|
) -> List[str]:
|
|
"""
|
|
Batch translate using new provider interface.
|
|
|
|
Args:
|
|
texts: List of texts to translate
|
|
target_language: Target language code
|
|
source_language: Source language code
|
|
|
|
Returns:
|
|
List of translated texts (same order as input)
|
|
"""
|
|
if not texts:
|
|
return []
|
|
|
|
if self._provider is not None:
|
|
return self._translate_with_provider(
|
|
texts, target_language, source_language
|
|
)
|
|
|
|
return self._translate_with_legacy(texts, target_language, source_language)
|
|
|
|
def _translate_with_provider(
|
|
self, texts: List[str], target_language: str, source_language: str
|
|
) -> List[str]:
|
|
"""Translate using the TranslationProvider.translate_batch() interface."""
|
|
translated = self._provider.translate_batch(texts, target_language, source_language)
|
|
# Fallback: keep original text for any empty/failed result
|
|
return [
|
|
t if (t and t.strip()) else orig
|
|
for t, orig in zip(translated, texts)
|
|
]
|
|
|
|
def _translate_with_legacy(
    self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
    """Translate through the legacy ``translation_service`` singleton.

    Used when no provider is configured. The service's result is returned
    as-is; no per-item fallback is applied here.
    """
    # Imported lazily, only when this legacy path is actually taken.
    from services.translation_service import translation_service as legacy_service

    _log_info(
        "word_using_legacy_service",
        text_count=len(texts),
        target_lang=target_language,
    )

    outcome = legacy_service.translate_batch(texts, target_language, source_language)
    return outcome
|
|
|
|
def _collect_from_body(
    self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect translatable text from the body, text boxes and notes.

    Runs three passes that each append to ``text_elements``, then logs how
    many entries the body pass and the text-box pass contributed, plus the
    overall total added by this call.
    """
    body = document.element.body
    baseline = len(text_elements)

    # Pass 1: every direct child of the body (paragraphs, tables, SDTs, ...).
    for child in body:
        self._collect_from_element(child, document, text_elements)
    after_body = len(text_elements)

    # Pass 2: every <w:txbxContent> anywhere under the body. Shapes and
    # text boxes keep their paragraphs there, nested inside drawing/pict
    # markup that pass 1 does not descend into.
    self._collect_from_textboxes(body, document, text_elements)
    after_textboxes = len(text_elements)

    # Pass 3: footnotes and endnotes.
    self._collect_from_footnotes(document, text_elements)
    self._collect_from_endnotes(document, text_elements)

    _log_info(
        "word_collection_summary",
        body_runs=after_body - baseline,
        textbox_runs=after_textboxes - after_body,
        total_collected=len(text_elements) - baseline,
    )
|
|
|
|
def _collect_from_element(
    self, element, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Dispatch one XML element to the matching collector.

    Paragraphs and tables are collected directly, ``w:sdt`` goes to
    ``_collect_from_sdt``, and ``mc:AlternateContent`` children are
    dispatched recursively. For any other element only its direct
    paragraph/table children are collected.
    """
    if isinstance(element, CT_P):
        self._collect_from_paragraph(Paragraph(element, document), text_elements)
        return

    if isinstance(element, CT_Tbl):
        self._collect_from_table(Table(element, document), text_elements)
        return

    tag = element.tag
    if tag == qn("w:sdt"):
        self._collect_from_sdt(element, document, text_elements)
        return

    if tag == self._TAG_ALT_CONTENT:
        # <mc:AlternateContent> wraps drawing/shape content
        for branch in element:
            self._collect_from_element(branch, document, text_elements)
        return

    # Unknown wrapper: look one level down for paragraphs and tables.
    for nested in element:
        if isinstance(nested, CT_P):
            self._collect_from_paragraph(Paragraph(nested, document), text_elements)
        elif isinstance(nested, CT_Tbl):
            self._collect_from_table(Table(nested, document), text_elements)
|
|
|
|
def _collect_from_textboxes(
    self, root, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect text from every ``<w:txbxContent>`` element under ``root``.

    ``<w:txbxContent>`` holds ordinary ``<w:p>`` paragraphs and ``<w:tbl>``
    tables, so its direct children are handed to the regular paragraph and
    table collectors. The search covers all descendants of ``root``.
    """
    textbox_tag = qn("w:txbxContent")
    for container in root.iter(textbox_tag):
        for node in container:
            if isinstance(node, CT_Tbl):
                self._collect_from_table(Table(node, document), text_elements)
            elif isinstance(node, CT_P):
                self._collect_from_paragraph(Paragraph(node, document), text_elements)
|
|
|
|
def _collect_from_sdt(
    self, sdt_element, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect text from a structured document tag (``<w:sdt>``).

    Only the direct paragraph and table children of ``<w:sdtContent>`` are
    collected. An SDT without ``<w:sdtContent>`` contributes nothing.

    Expected shape::

        <w:sdt>
          <w:sdtPr>...</w:sdtPr>
          <w:sdtContent>
            <w:p>...</w:p>
            <w:tbl>...</w:tbl>
          </w:sdtContent>
        </w:sdt>
    """
    content = sdt_element.find(qn("w:sdtContent"))
    if content is None:
        return

    for node in content:
        if isinstance(node, CT_Tbl):
            self._collect_from_table(Table(node, document), text_elements)
        elif isinstance(node, CT_P):
            self._collect_from_paragraph(Paragraph(node, document), text_elements)
|
|
|
|
def _collect_from_footnotes(
|
|
self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
|
|
) -> None:
|
|
"""Collect text from footnotes."""
|
|
try:
|
|
footnotes_part = document.part.package.part_related_by(
|
|
"http://schemas.openxmlformats.org/officeDocument/2006/relationships/footnotes"
|
|
) if hasattr(document.part, 'package') else None
|
|
except Exception:
|
|
footnotes_part = None
|
|
|
|
if footnotes_part is None:
|
|
# Fallback: try direct XML access
|
|
try:
|
|
footnotes_element = document.element.find(qn("w:footnotes"))
|
|
if footnotes_element is not None:
|
|
for child in footnotes_element:
|
|
if isinstance(child, CT_P):
|
|
paragraph = Paragraph(child, document)
|
|
self._collect_from_paragraph(paragraph, text_elements)
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
try:
|
|
footnotes_xml = etree.fromstring(footnotes_part.blob)
|
|
for child in footnotes_xml:
|
|
if child.tag == qn("w:footnote"):
|
|
for para_elem in child.findall(qn("w:p")):
|
|
paragraph = Paragraph(para_elem, document)
|
|
self._collect_from_paragraph(paragraph, text_elements)
|
|
except Exception as e:
|
|
_log_error("word_footnotes_parse_error", error=str(e))
|
|
|
|
def _collect_from_endnotes(
|
|
self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
|
|
) -> None:
|
|
"""Collect text from endnotes."""
|
|
try:
|
|
endnotes_part = document.part.package.part_related_by(
|
|
"http://schemas.openxmlformats.org/officeDocument/2006/relationships/endnotes"
|
|
) if hasattr(document.part, 'package') else None
|
|
except Exception:
|
|
endnotes_part = None
|
|
|
|
if endnotes_part is None:
|
|
try:
|
|
endnotes_element = document.element.find(qn("w:endnotes"))
|
|
if endnotes_element is not None:
|
|
for child in endnotes_element:
|
|
if isinstance(child, CT_P):
|
|
paragraph = Paragraph(child, document)
|
|
self._collect_from_paragraph(paragraph, text_elements)
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
try:
|
|
endnotes_xml = etree.fromstring(endnotes_part.blob)
|
|
for child in endnotes_xml:
|
|
if child.tag == qn("w:endnote"):
|
|
for para_elem in child.findall(qn("w:p")):
|
|
paragraph = Paragraph(para_elem, document)
|
|
self._collect_from_paragraph(paragraph, text_elements)
|
|
except Exception as e:
|
|
_log_error("word_endnotes_parse_error", error=str(e))
|
|
|
|
def _collect_from_charts(
|
|
self, document: Document, text_elements: List[Tuple[str, Callable[[str], None]]]
|
|
) -> None:
|
|
"""Collect text from embedded charts (chart titles, axis labels, series names).
|
|
|
|
Charts are stored as separate XML parts in the .docx ZIP archive.
|
|
The chart XML uses DrawingML namespaces for text content.
|
|
"""
|
|
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
|
|
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
|
|
|
|
try:
|
|
# Access the raw ZIP to find chart parts
|
|
docx_path = document.part.package.main_document_part.partname
|
|
package = document.part.package
|
|
|
|
# Find all chart relationship targets
|
|
for rel_type, rels in (package.rels or {}).items():
|
|
pass # python-docx doesn't expose this cleanly
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
# More reliable: open the .docx as a ZIP and parse chart XML directly
|
|
try:
|
|
# Get the original file path from the document
|
|
input_file = None
|
|
# Try to recover the file path — document object doesn't store it directly
|
|
# We'll handle charts in translate_file() instead where we have the path
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
def _collect_charts_from_zip(
    self, input_path: Path, text_elements: List[Tuple[str, Callable[[str], None]]],
    chart_translations: List[Dict[str, Any]]
) -> None:
    """Parse chart XML from the .docx ZIP and collect translatable text.

    For every ``word/charts/*.xml`` entry this collects:
        - ``<a:t>`` text found under ``<c:title>``, ``<c:cat>`` or
          ``<c:val>`` elements, and
        - ``<c:v>`` values under ``<c:ser>`` that are not purely numeric.

    Each hit appends a metadata dict (``chart_file``, ``element_path``,
    ``original``) to ``chart_translations`` and a ``(text, setter)`` pair to
    ``text_elements``. The setter stores the stripped translation under the
    ``translated`` key of that dict for later re-injection.

    Errors are logged and never raised: a broken chart part is skipped, and
    an unreadable ZIP ends collection.

    Args:
        input_path: Path to the .docx file.
        text_elements: List to append ``(text, setter)`` tuples to.
        chart_translations: List receiving chart translation metadata.
    """
    ns_chart = "http://schemas.openxmlformats.org/drawingml/2006/chart"
    ns_main = "http://schemas.openxmlformats.org/drawingml/2006/main"

    tag_text = f"{{{ns_main}}}t"
    tag_value = f"{{{ns_chart}}}v"
    tag_series = f"{{{ns_chart}}}ser"
    # Clark notation takes the LOCAL name after the namespace. A prefixed
    # name such as "{ns}c:title" never matches any element.
    container_tags = tuple(
        f"{{{ns_chart}}}{local}" for local in ("title", "cat", "val")
    )

    def _register(chart_file: str, node, seen: set) -> None:
        """Record one text node once and queue it for translation."""
        path = self._get_element_path(node)
        if path in seen:
            return
        seen.add(path)

        original = node.text.strip()
        chart_translations.append(
            {
                'chart_file': chart_file,
                'element_path': path,
                'original': original,
            }
        )
        index = len(chart_translations) - 1

        def setter(text):
            chart_translations[index]['translated'] = text.strip()

        text_elements.append((original, setter))

    def _is_numeric(value: str) -> bool:
        """True for digit-only strings once '.' and '-' are removed."""
        return value.replace('.', '').replace('-', '').isdigit()

    try:
        with zipfile.ZipFile(input_path, 'r') as zf:
            chart_files = [
                name for name in zf.namelist()
                if name.startswith('word/charts/') and name.endswith('.xml')
            ]

            for chart_file in chart_files:
                try:
                    chart_xml = etree.fromstring(zf.read(chart_file))
                    seen_paths: set = set()

                    # Rich text: <a:rich><a:p><a:r><a:t> under title/cat/val
                    for container_tag in container_tags:
                        for parent_elem in chart_xml.iter(container_tag):
                            for t_elem in parent_elem.iter(tag_text):
                                if t_elem.text and t_elem.text.strip():
                                    _register(chart_file, t_elem, seen_paths)

                    # Cached string values under <c:ser>; numbers are skipped.
                    for ser_elem in chart_xml.iter(tag_series):
                        for v_elem in ser_elem.iter(tag_value):
                            if not (v_elem.text and v_elem.text.strip()):
                                continue
                            if _is_numeric(v_elem.text.strip()):
                                continue
                            _register(chart_file, v_elem, seen_paths)

                except Exception as e:
                    _log_error("word_chart_parse_error", chart_file=chart_file, error=str(e))

    except Exception as e:
        _log_error("word_charts_zip_error", error=str(e))
|
|
|
|
def _get_element_path(self, element) -> str:
|
|
"""Get a unique XPath-like path for an element within its document."""
|
|
path_parts = []
|
|
current = element
|
|
while current is not None:
|
|
parent = current.getparent()
|
|
if parent is None:
|
|
break
|
|
idx = list(parent).index(current)
|
|
tag = current.tag.split('}')[-1] if '}' in current.tag else current.tag
|
|
path_parts.append(f"{tag}[{idx}]")
|
|
current = parent
|
|
return '/'.join(reversed(path_parts))
|
|
|
|
def _apply_chart_translations(self, input_path: Path, output_path: Path, chart_translations: List[Dict[str, Any]]) -> None:
|
|
"""Re-inject chart translations into the .docx ZIP.
|
|
|
|
Modifies chart XML files in-place and rewrites the ZIP.
|
|
"""
|
|
if not chart_translations:
|
|
return
|
|
|
|
# Only proceed if at least one translation exists
|
|
translated_entries = [e for e in chart_translations if 'translated' in e and e['translated']]
|
|
if not translated_entries:
|
|
return
|
|
|
|
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
|
|
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
|
|
|
|
# Group by chart file
|
|
chart_files_to_update: Dict[str, List[Dict]] = {}
|
|
for entry in translated_entries:
|
|
cf = entry['chart_file']
|
|
if cf not in chart_files_to_update:
|
|
chart_files_to_update[cf] = []
|
|
chart_files_to_update[cf].append(entry)
|
|
|
|
try:
|
|
# Read all ZIP entries
|
|
with zipfile.ZipFile(output_path, 'r') as zf_in:
|
|
existing_entries = zf_in.namelist()
|
|
|
|
# Create new ZIP in memory
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out:
|
|
for item in existing_entries:
|
|
data = zf_in.read(item)
|
|
|
|
if item in chart_files_to_update:
|
|
# Parse, update, re-serialize this chart XML
|
|
try:
|
|
chart_xml = etree.fromstring(data)
|
|
|
|
for entry in chart_files_to_update[item]:
|
|
# Find all <a:t> or <c:v> elements and match by original text
|
|
tag_to_find = f'{{{_NS_A}}}t'
|
|
# Try both a:t and c:v
|
|
for t_elem in chart_xml.iter(tag_to_find):
|
|
if t_elem.text and t_elem.text.strip() == entry['original']:
|
|
t_elem.text = entry['translated']
|
|
break
|
|
else:
|
|
for t_elem in chart_xml.iter(f'{{{_NS_C}}}v'):
|
|
if t_elem.text and t_elem.text.strip() == entry['original']:
|
|
t_elem.text = entry['translated']
|
|
break
|
|
|
|
data = etree.tostring(chart_xml, xml_declaration=True, encoding='UTF-8', standalone=True)
|
|
except Exception as e:
|
|
_log_error("word_chart_update_error", chart_file=item, error=str(e))
|
|
|
|
zf_out.writestr(item, data)
|
|
|
|
# Replace the output file with the updated ZIP
|
|
with open(output_path, 'wb') as f:
|
|
f.write(buf.getvalue())
|
|
|
|
_log_info("word_charts_translated", chart_files=len(chart_files_to_update), translations=len(translated_entries))
|
|
|
|
except Exception as e:
|
|
_log_error("word_chart_zip_rewrite_error", error=str(e))
|
|
|
|
# ------------------------------------------------------------------
|
|
# SmartArt / Diagram support
|
|
# ------------------------------------------------------------------
|
|
_NS_DGM = "http://schemas.openxmlformats.org/drawingml/2006/diagram"
|
|
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
|
|
|
|
def _collect_diagrams_from_zip(
    self,
    input_path: Path,
    text_elements: List[Tuple[str, Callable[[str], None]]],
    diagram_translations: List[Dict[str, Any]],
) -> None:
    """Collect translatable SmartArt text from the .docx ZIP.

    Scans every ``word/diagrams/data*.xml`` entry for ``<a:t>`` nodes.
    Blank text, text that is digit-only once ``.``, ``-`` and ``,`` are
    removed, and text of one character or less are skipped. Each remaining
    node appends a metadata dict (``diag_file``, ``element_path``,
    ``original``) to ``diagram_translations`` and a ``(text, setter)`` pair
    to ``text_elements``; the setter stores the stripped translation under
    ``translated``.

    Errors are logged and never raised.
    """
    text_tag = f"{{{self._NS_A}}}t"

    def _setter_for(entries: List[Dict[str, Any]], idx: int):
        """Return a setter that stores a translation on ``entries[idx]``."""
        def setter(text: str) -> None:
            entries[idx]['translated'] = text.strip()
        return setter

    try:
        with zipfile.ZipFile(input_path, 'r') as zf:
            diag_files = []
            for name in zf.namelist():
                if name.startswith('word/diagrams/data') and name.endswith('.xml'):
                    diag_files.append(name)

            for diag_file in diag_files:
                try:
                    diag_xml = etree.fromstring(zf.read(diag_file))

                    for t_elem in diag_xml.iter(text_tag):
                        if not (t_elem.text and t_elem.text.strip()):
                            continue
                        original = t_elem.text.strip()

                        # Skip numeric-only or very short tokens
                        digits_only = original.replace('.', '').replace('-', '').replace(',', '')
                        if digits_only.isdigit() or len(original) <= 1:
                            continue

                        record: Dict[str, Any] = {
                            'diag_file': diag_file,
                            'element_path': self._get_element_path(t_elem),
                            'original': original,
                        }
                        diagram_translations.append(record)
                        text_elements.append(
                            (original, _setter_for(diagram_translations, len(diagram_translations) - 1))
                        )

                except Exception as e:
                    _log_error("word_diagram_parse_error", diag_file=diag_file, error=str(e))

            if diagram_translations:
                _log_info(
                    "word_diagram_collection",
                    diagram_files=len(diag_files),
                    text_count=len(diagram_translations),
                )

    except Exception as e:
        _log_error("word_diagrams_zip_error", error=str(e))
|
|
|
|
def _apply_diagram_translations(
|
|
self,
|
|
output_path: Path,
|
|
diagram_translations: List[Dict[str, Any]],
|
|
) -> None:
|
|
"""Re-inject SmartArt/diagram translations into the .docx ZIP.
|
|
|
|
Modifies diagram data XML files in-place and rewrites the ZIP.
|
|
"""
|
|
if not diagram_translations:
|
|
return
|
|
|
|
translated_entries = [e for e in diagram_translations if 'translated' in e and e['translated']]
|
|
if not translated_entries:
|
|
return
|
|
|
|
_TAG_A_T = f"{{{self._NS_A}}}t"
|
|
|
|
# Group by diagram file
|
|
diag_files_to_update: Dict[str, List[Dict]] = {}
|
|
for entry in translated_entries:
|
|
df = entry['diag_file']
|
|
if df not in diag_files_to_update:
|
|
diag_files_to_update[df] = []
|
|
diag_files_to_update[df].append(entry)
|
|
|
|
try:
|
|
with zipfile.ZipFile(output_path, 'r') as zf_in:
|
|
existing_entries = zf_in.namelist()
|
|
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf_out:
|
|
for item in existing_entries:
|
|
data = zf_in.read(item)
|
|
|
|
if item in diag_files_to_update:
|
|
try:
|
|
diag_xml = etree.fromstring(data)
|
|
|
|
for entry in diag_files_to_update[item]:
|
|
for t_elem in diag_xml.iter(_TAG_A_T):
|
|
if t_elem.text and t_elem.text.strip() == entry['original']:
|
|
t_elem.text = entry['translated']
|
|
break
|
|
|
|
data = etree.tostring(diag_xml, xml_declaration=True, encoding='UTF-8', standalone=True)
|
|
except Exception as e:
|
|
_log_error("word_diagram_update_error", diag_file=item, error=str(e))
|
|
|
|
zf_out.writestr(item, data)
|
|
|
|
with open(output_path, 'wb') as f:
|
|
f.write(buf.getvalue())
|
|
|
|
_log_info(
|
|
"word_diagrams_translated",
|
|
diagram_files=len(diag_files_to_update),
|
|
translations=len(translated_entries),
|
|
)
|
|
|
|
except Exception as e:
|
|
_log_error("word_diagram_zip_rewrite_error", error=str(e))
|
|
|
|
def _collect_from_paragraph(
    self,
    paragraph: Paragraph,
    text_elements: List[Tuple[str, Callable[[str], None]]],
) -> None:
    """Queue every text-bearing run of ``paragraph`` for translation.

    Runs are handed to ``_append_run_translation`` one by one, which keeps
    the whitespace at formatting boundaries (e.g. bold/normal) outside the
    translated text so neighbouring words are not glued together.

    Two sources of runs are visited, in this order:

    1. the runs exposed by ``paragraph.runs``;
    2. ``<w:r>`` children of any ``<w:hyperlink>`` found below the
       paragraph element (TOC entries, cross-references, bookmark links).

    Paragraphs whose combined ``<w:t>`` text is empty or whitespace-only
    are skipped entirely.
    """
    # Quick exit: look at all text nodes, nested content included.
    text_nodes = paragraph._p.iter(qn('w:t'))
    combined = ''.join(node.text or '' for node in text_nodes)
    if not combined.strip():
        return

    def collect_if_textual(candidate) -> None:
        # Only runs with non-whitespace content are worth translating.
        content = candidate.text
        if content and content.strip():
            self._append_run_translation(candidate, text_elements)

    # Direct child runs first.
    for direct_run in paragraph.runs:
        collect_if_textual(direct_run)

    # Then runs wrapped in <w:hyperlink> elements
    # (TOC entries, cross-references — python-docx's paragraph.runs skips these).
    hyperlink_tag = qn('w:hyperlink')
    run_tag = qn('w:r')
    for hyperlink in paragraph._p.iter(hyperlink_tag):
        for run_element in hyperlink.findall(run_tag):
            collect_if_textual(Run(run_element, paragraph))
|
|
|
|
def _append_run_translation(
|
|
self,
|
|
run,
|
|
text_elements: List[Tuple[str, Callable[[str], None]]],
|
|
) -> None:
|
|
"""Extract translatable text from a Run and append a (text, setter) tuple."""
|
|
original = run.text
|
|
# Capture leading/trailing whitespace that must survive translation.
|
|
leading = original[: len(original) - len(original.lstrip())]
|
|
trailing = original[len(original.rstrip()) :]
|
|
stripped = original.strip()
|
|
|
|
def make_setter(r, lead: str, trail: str):
|
|
def setter(text: str) -> None:
|
|
# Strip any whitespace the translator may have added/removed
|
|
# and reapply the original boundary whitespace.
|
|
r.text = lead + text.strip() + trail
|
|
|
|
return setter
|
|
|
|
text_elements.append((stripped, make_setter(run, leading, trailing)))
|
|
|
|
def _collect_from_table(
    self, table: Table, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect text from table cells, visiting each physical cell once.

    python-docx reports a merged cell once for every grid position it
    spans, so iterating ``row.cells`` naively queues the same paragraphs
    (and nested tables) several times.  Cells are therefore de-duplicated
    on their underlying ``<w:tc>`` element across the whole table, which
    covers both horizontal and vertical merges.

    Args:
        table: Table whose cell paragraphs and nested tables are walked.
        text_elements: List that receives the ``(text, setter)`` tuples.
    """
    # Holding the elements themselves (not their ids) keeps them alive, so
    # identity-based membership stays reliable for the whole traversal.
    seen_cells = set()

    for row in table.rows:
        for cell in row.cells:
            tc = cell._tc
            if tc in seen_cells:
                continue
            seen_cells.add(tc)

            for paragraph in cell.paragraphs:
                self._collect_from_paragraph(paragraph, text_elements)
            for nested_table in cell.tables:
                self._collect_from_table(nested_table, text_elements)
|
|
|
|
def _collect_from_section(
    self, section: Section, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
    """Collect text from the headers and footers this section defines.

    All six header/footer variants (default, first page, even page) are
    inspected.  A variant that is linked to the previous section is
    skipped: in python-docx, reading its content resolves to the prior
    section's definition (so the same paragraphs would be queued again),
    and on the first section it creates a new empty definition, which
    would modify the document as a side effect of merely reading it.

    NOTE(review): skipping linked parts assumes the caller invokes this
    method for every section of the document, so inherited content is
    still collected from the section that defines it — confirm at the
    call site.

    Args:
        section: Section whose headers and footers are walked.
        text_elements: List that receives the ``(text, setter)`` tuples.
    """
    headers_footers = (
        section.header,
        section.footer,
        section.first_page_header,
        section.first_page_footer,
        section.even_page_header,
        section.even_page_footer,
    )

    for hf in headers_footers:
        if not hf:
            continue
        if hf.is_linked_to_previous:
            # No definition of its own; nothing new to collect here.
            continue

        for paragraph in hf.paragraphs:
            self._collect_from_paragraph(paragraph, text_elements)
        for table in hf.tables:
            self._collect_from_table(table, text_elements)
|
|
|
|
|
|
# Module-level WordTranslator instance, created when this module is imported.
word_translator = WordTranslator()
|