Files
office_translator/translators/pptx_translator.py
sepehr c1ea65f10f
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m12s
feat(translate): refonte du design de la page de traduction et du sélecteur de moteurs (Etapes 1-3)
2026-05-31 10:14:23 +02:00

717 lines
28 KiB
Python

"""
PowerPoint Translation Module
Translates PowerPoint files while preserving all layouts, animations, and media
OPTIMIZED: Uses batch translation for 5-10x faster processing
Updated to use new TranslationProvider interface with structured error handling.
"""
import time
import zipfile
import io
import concurrent.futures
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Callable, Any
from lxml import etree
from pptx import Presentation
from pptx.shapes.base import BaseShape
from pptx.shapes.group import GroupShape
from pptx.enum.shapes import MSO_SHAPE_TYPE
from services.providers.base import TranslationProvider
# DrawingML namespace used by pptx XML
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
# Languages written right-to-left
RTL_LANGUAGES: frozenset = frozenset(
{"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"}
)
from core.logging import get_logger
logger = get_logger(__name__)
_HAS_STRUCTLOG = True
def _log_info(event: str, **kwargs):
"""Log info with structlog or standard logging compatibility."""
if _HAS_STRUCTLOG:
logger.info(event, **kwargs)
else:
msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.info(msg)
def _log_error(event: str, **kwargs):
"""Log error with structlog or standard logging compatibility."""
if _HAS_STRUCTLOG:
logger.error(event, **kwargs)
else:
msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.error(msg)
def _set_pptx_paragraph_rtl(paragraph) -> None:
"""
Enable RTL mode on a PowerPoint paragraph.
Sets rtl="1" and algn="r" on the <a:pPr> element, which controls
both text direction and horizontal alignment in DrawingML.
"""
p_elem = paragraph._p
tag_pPr = f"{{{_NS_A}}}pPr"
pPr = p_elem.find(tag_pPr)
if pPr is None:
pPr = etree.Element(tag_pPr)
p_elem.insert(0, pPr)
pPr.set("rtl", "1")
pPr.set("algn", "r")
def _apply_rtl_to_presentation(presentation: Presentation) -> None:
"""Apply RTL direction to every paragraph in all slides."""
for slide in presentation.slides:
for shape in slide.shapes:
_apply_rtl_to_shape(shape)
def _apply_rtl_to_shape(shape) -> None:
"""Recursively apply RTL to a shape (handles groups and tables)."""
if shape.has_text_frame:
for paragraph in shape.text_frame.paragraphs:
_set_pptx_paragraph_rtl(paragraph)
if shape.shape_type == MSO_SHAPE_TYPE.TABLE:
for row in shape.table.rows:
for cell in row.cells:
for paragraph in cell.text_frame.paragraphs:
_set_pptx_paragraph_rtl(paragraph)
if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
for sub_shape in shape.shapes:
_apply_rtl_to_shape(sub_shape)
class PptxProcessorError(Exception):
"""Exception for PowerPoint processing errors with structured error codes."""
INVALID_FORMAT = "INVALID_FORMAT"
PPTX_CORRUPTED = "PPTX_CORRUPTED"
PPTX_READ_ERROR = "PPTX_READ_ERROR"
PPTX_WRITE_ERROR = "PPTX_WRITE_ERROR"
PPTX_TOO_LARGE = "PPTX_TOO_LARGE"
ERROR_MESSAGES = {
INVALID_FORMAT: "Format de fichier non supporte. Utilisez .pptx.",
PPTX_CORRUPTED: "Le fichier PowerPoint est corrompu ou illisible.",
PPTX_READ_ERROR: "Erreur lors de la lecture du fichier PowerPoint.",
PPTX_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.",
PPTX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
}
def __init__(
self,
code: str,
message: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
):
self.code = code
self.message = message or self.ERROR_MESSAGES.get(code, "Erreur inconnue")
self.details = details or {}
super().__init__(self.message)
def to_dict(self) -> Dict[str, Any]:
"""Convert error to dictionary format for API responses."""
result = {"error": self.code, "message": self.message}
if self.details:
result["details"] = self.details
return result
class PowerPointTranslator:
"""
Handles translation of PowerPoint presentations with strict formatting preservation.
Uses the new TranslationProvider interface for improved error handling
and fallback chain support.
"""
MAX_FILE_SIZE_MB = 50
PPTX_MAGIC_BYTES = b"PK" # .pptx files are ZIP archives
def __init__(self, provider: Optional[TranslationProvider] = None):
"""
Initialize PowerPointTranslator.
Args:
provider: TranslationProvider instance for translations.
If None, will use fallback to legacy translation_service.
"""
self._provider = provider
self._custom_prompt: Optional[str] = None
self._translation_stats = {"attempted": 0, "changed": 0}
def set_provider(self, provider: TranslationProvider) -> None:
"""Set the translation provider."""
self._provider = provider
def set_custom_prompt(self, prompt: Optional[str]) -> None:
"""Set custom system prompt for LLM providers."""
self._custom_prompt = prompt
def translate_file(
self,
input_path: Path,
output_path: Path,
target_language: str,
source_language: str = "auto",
progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
translate_images: bool = False,
) -> Path:
"""
Translate a PowerPoint presentation while preserving all formatting.
Uses batch translation for improved performance.
Args:
input_path: Path to input PowerPoint file
output_path: Path for translated output file
target_language: Target language code (e.g., 'fr', 'en')
source_language: Source language code (default: auto-detect)
progress_callback: Optional callback for progress updates
Receives dict with: slide, total_slides, runs_translated
Returns:
Path to translated file
Raises:
PptxProcessorError: If file is invalid, corrupted, or processing fails
"""
start_time = time.time()
input_path = Path(input_path)
output_path = Path(output_path)
self._validate_file(input_path)
try:
presentation = Presentation(input_path)
except Exception as e:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_CORRUPTED,
details={"file_name": input_path.name, "error": str(e)},
)
try:
runs_translated = 0
total_slides = len(presentation.slides)
if progress_callback:
progress_callback(
{
"current": 0,
"total": total_slides,
"slide": 0,
"total_slides": total_slides,
"runs_translated": 0,
}
)
text_elements: List[Tuple[str, Callable[[str], None]]] = []
for slide_idx, slide in enumerate(presentation.slides):
if slide.has_notes_slide and slide.notes_slide.notes_text_frame:
self._collect_from_text_frame(
slide.notes_slide.notes_text_frame, text_elements
)
for shape in slide.shapes:
self._collect_from_shape(shape, text_elements)
if progress_callback:
progress_callback(
{
"current": slide_idx + 1,
"total": total_slides,
"slide": slide_idx + 1,
"total_slides": total_slides,
"runs_translated": runs_translated,
}
)
if text_elements:
texts = [elem[0] for elem in text_elements]
total_elements = len(texts)
_log_info(
"pptx_batch_translation_start",
file_name=input_path.name,
text_count=total_elements,
target_lang=target_language,
)
# Parallel chunk translation with real-time progress.
CHUNK_SIZE = 15
MAX_WORKERS = 6
chunks = [
(i, texts[i : i + CHUNK_SIZE])
for i in range(0, total_elements, CHUNK_SIZE)
]
translated_texts: List[str] = [""] * total_elements
completed_items = [0]
def _translate_chunk(
chunk_idx: int, chunk: List[str]
) -> Tuple[int, List[str]]:
return chunk_idx, self._batch_translate(
chunk, target_language, source_language
)
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
future_map = {
pool.submit(_translate_chunk, idx, chunk): (idx, chunk)
for idx, chunk in chunks
}
for future in concurrent.futures.as_completed(future_map):
chunk_idx, translated_chunk = future.result()
for j, t in enumerate(translated_chunk):
translated_texts[chunk_idx + j] = t
completed_items[0] += len(translated_chunk)
if progress_callback:
done = min(completed_items[0], total_elements)
progress_callback(
{
"current": done,
"total": total_elements,
"slide": done,
"total_slides": total_elements,
"runs_translated": runs_translated,
}
)
# Apply translations
for i, ((original_text, setter), translated) in enumerate(
zip(text_elements, translated_texts)
):
if translated is not None and setter is not None:
try:
setter(translated)
runs_translated += 1
except Exception as e:
_log_error(
"pptx_setter_error",
error=str(e),
index=i,
)
# Apply RTL layout when the target language is written right-to-left.
if target_language.lower() in RTL_LANGUAGES:
_apply_rtl_to_presentation(presentation)
if translate_images:
try:
self._translate_images(presentation, target_language)
except Exception as e:
_log_error("pptx_document_images_failed", error=str(e))
try:
presentation.save(output_path)
except Exception as e:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_WRITE_ERROR,
details={"file_name": output_path.name, "error": str(e)},
)
# Re-inject chart translations into chart XML parts
self._apply_chart_translations(output_path)
processing_time_ms = round((time.time() - start_time) * 1000, 2)
_log_info(
"pptx_translation_success",
file_name=input_path.name,
slides_count=total_slides,
runs_translated=runs_translated,
source_lang=source_language,
target_lang=target_language,
processing_time_ms=processing_time_ms,
)
return output_path
except PptxProcessorError:
raise
except Exception as e:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_READ_ERROR,
details={"file_name": input_path.name, "error": str(e)},
)
def _validate_file(self, file_path: Path) -> None:
"""Validate file format and size."""
if not file_path.exists():
raise PptxProcessorError(
code=PptxProcessorError.PPTX_READ_ERROR,
message=f"Fichier introuvable: {file_path.name}",
details={"file_name": file_path.name},
)
if file_path.suffix.lower() != ".pptx":
raise PptxProcessorError(
code=PptxProcessorError.INVALID_FORMAT,
details={
"file_name": file_path.name,
"extension": file_path.suffix,
"expected": ".pptx",
},
)
with open(file_path, "rb") as f:
header = f.read(4)
if header[:2] != self.PPTX_MAGIC_BYTES:
raise PptxProcessorError(
code=PptxProcessorError.INVALID_FORMAT,
details={"file_name": file_path.name, "reason": "Invalid file header"},
)
file_size_mb = file_path.stat().st_size / (1024 * 1024)
if file_size_mb > self.MAX_FILE_SIZE_MB:
raise PptxProcessorError(
code=PptxProcessorError.PPTX_TOO_LARGE,
details={
"file_name": file_path.name,
"size_mb": round(file_size_mb, 2),
"max_mb": self.MAX_FILE_SIZE_MB,
},
)
def _batch_translate(
self, texts: List[str], target_language: str, source_language: str = "auto"
) -> List[str]:
if not texts:
return []
non_empty = [t for t in texts if t and t.strip()]
self._translation_stats["attempted"] += len(non_empty)
if self._provider is not None:
translated = self._translate_with_provider(
texts, target_language, source_language
)
else:
translated = self._translate_with_legacy(texts, target_language, source_language)
changed = sum(1 for orig, trans in zip(texts, translated) if orig != trans and trans.strip())
self._translation_stats["changed"] += changed
return translated
def get_translation_stats(self) -> dict:
return dict(self._translation_stats)
def _translate_with_provider(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Translate using the TranslationProvider.translate_batch() interface."""
from services.providers.base import TranslationProvider as NewTranslationProvider
is_new_style = False
if isinstance(self._provider, NewTranslationProvider):
is_new_style = True
elif hasattr(self._provider, "__class__") and self._provider.__class__.__name__ in (
"MockTranslationProvider",
"Mock",
"MagicMock",
):
is_new_style = True
if is_new_style:
from services.providers.schemas import TranslationRequest
custom_prompt = getattr(self, "_custom_prompt", None)
metadata = {"custom_prompt": custom_prompt} if custom_prompt else None
requests = [
TranslationRequest(
text=t,
target_language=target_language,
source_language=source_language,
metadata=metadata,
)
for t in texts
]
responses = self._provider.translate_batch(requests)
translated = [resp.translated_text for resp in responses]
else:
translated = self._provider.translate_batch(texts, target_language, source_language)
return [
t if (t and t.strip()) else orig
for t, orig in zip(translated, texts)
]
def _translate_with_legacy(
self, texts: List[str], target_language: str, source_language: str
) -> List[str]:
"""Fallback to legacy translation_service for backward compatibility."""
from services.translation_service import translation_service
_log_info(
"pptx_using_legacy_service",
text_count=len(texts),
target_lang=target_language,
)
return translation_service.translate_batch(
texts, target_language, source_language
)
def _collect_from_shape(
self, shape: BaseShape, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from a shape and its children."""
if shape.has_text_frame:
self._collect_from_text_frame(shape.text_frame, text_elements)
if shape.shape_type == MSO_SHAPE_TYPE.TABLE:
for row in shape.table.rows:
for cell in row.cells:
self._collect_from_text_frame(cell.text_frame, text_elements)
if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
for sub_shape in shape.shapes:
self._collect_from_shape(sub_shape, text_elements)
# Chart shapes — text is stored in separate chart XML parts
if shape.shape_type == MSO_SHAPE_TYPE.CHART:
self._collect_from_chart_shape(shape, text_elements)
if hasattr(shape, "shapes"):
try:
for sub_shape in shape.shapes:
self._collect_from_shape(sub_shape, text_elements)
except Exception:
pass
def _collect_from_chart_shape(
self, shape: BaseShape, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect translatable text from a chart shape.
Chart text (title, axis titles, series names, data labels) is stored
in a separate chart XML part, not in shape.text_frame.
"""
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
try:
chart_data = shape.chart
# Access the chart XML part through the chart's part
chart_part = chart_data.part
chart_xml = etree.fromstring(chart_part.blob)
# Collect text from <a:t> elements in chart XML
# These include: chart title, axis titles, legend entries, data labels
seen_texts: set = set()
chart_text_entries: List[Dict[str, Any]] = []
for t_elem in chart_xml.iter(f'{{{_NS_A}}}t'):
text = t_elem.text
if text and text.strip() and text.strip() not in seen_texts:
seen_texts.add(text.strip())
entry = {
'element': t_elem,
'original': text.strip(),
'translated': None,
}
chart_text_entries.append(entry)
def make_chart_setter(entries, idx):
def setter(translated_text):
entries[idx]['translated'] = translated_text.strip()
return setter
text_elements.append(
(text.strip(), make_chart_setter(chart_text_entries, len(chart_text_entries) - 1))
)
# Also collect from <c:v> (cell values used as category names)
for v_elem in chart_xml.iter(f'{{{_NS_C}}}v'):
text = v_elem.text
if text and text.strip() and not text.strip().replace('.', '').replace('-', '').replace(',', '').isdigit():
if text.strip() not in seen_texts:
seen_texts.add(text.strip())
entry = {
'element': v_elem,
'original': text.strip(),
'translated': None,
}
chart_text_entries.append(entry)
def make_chart_v_setter(entries, idx):
def setter(translated_text):
entries[idx]['translated'] = translated_text.strip()
return setter
text_elements.append(
(text.strip(), make_chart_v_setter(chart_text_entries, len(chart_text_entries) - 1))
)
# Store chart_part reference and entries for later re-injection
if chart_text_entries:
if not hasattr(self, '_chart_entries'):
self._chart_entries = []
self._chart_entries.append({
'chart_part': chart_part,
'entries': chart_text_entries,
})
except Exception as e:
_log_error("pptx_chart_collect_error", error=str(e))
def _collect_from_text_frame(
self, text_frame, text_elements: List[Tuple[str, Callable[[str], None]]]
) -> None:
"""Collect text from a text frame, preserving leading/trailing whitespace."""
if not text_frame.text.strip():
return
for paragraph in text_frame.paragraphs:
if not paragraph.text.strip():
continue
for run in paragraph.runs:
if run.text and run.text.strip():
original = run.text
leading = original[: len(original) - len(original.lstrip())]
trailing = original[len(original.rstrip()) :]
stripped = original.strip()
def make_setter(r, lead: str, trail: str):
def setter(text: str) -> None:
r.text = lead + text.strip() + trail
return setter
text_elements.append((stripped, make_setter(run, leading, trailing)))
def _apply_chart_translations(self, output_path: Path) -> None:
"""Re-inject chart text translations by modifying chart XML parts in the .pptx ZIP."""
if not hasattr(self, '_chart_entries') or not self._chart_entries:
return
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"
_NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart"
total_translated = 0
for chart_data in self._chart_entries:
entries = chart_data['entries']
chart_part = chart_data['chart_part']
translated_entries = [e for e in entries if e.get('translated')]
if not translated_entries:
continue
try:
chart_xml = etree.fromstring(chart_part.blob)
for entry in translated_entries:
# Try to find and update <a:t> elements
for t_elem in chart_xml.iter(f'{{{_NS_A}}}t'):
if t_elem.text and t_elem.text.strip() == entry['original']:
t_elem.text = entry['translated']
total_translated += 1
break
else:
# Try <c:v> elements
for v_elem in chart_xml.iter(f'{{{_NS_C}}}v'):
if v_elem.text and v_elem.text.strip() == entry['original']:
v_elem.text = entry['translated']
total_translated += 1
break
# Update the chart part blob
chart_part._blob = etree.tostring(chart_xml, xml_declaration=True, encoding='UTF-8', standalone=True)
except Exception as e:
_log_error("pptx_chart_update_error", error=str(e))
# Clean up
self._chart_entries = []
if total_translated > 0:
_log_info("pptx_charts_translated", total=total_translated)
def _translate_images(self, presentation, target_language: str) -> None:
"""Extract and translate text from images in PowerPoint.
Appends the translated text to the slide notes."""
try:
from pptx.enum.shapes import MSO_SHAPE_TYPE
_log_info("pptx_image_translation_start", slides=len(presentation.slides))
for slide_idx, slide in enumerate(presentation.slides):
for shape_idx, shape in enumerate(slide.shapes):
if shape.shape_type != MSO_SHAPE_TYPE.PICTURE:
continue
try:
image = getattr(shape, "image", None)
if not image:
continue
image_data = image.blob
ext = getattr(image, "ext", "png") or "png"
import tempfile
import os
with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as tmp:
tmp.write(image_data)
tmp_path = tmp.name
translated_text = self._translate_image_text(tmp_path, target_language)
try:
os.unlink(tmp_path)
except:
pass
if translated_text and translated_text.strip():
notes_slide = slide.notes_slide
notes_text_frame = notes_slide.notes_text_frame
notes_text = notes_text_frame.text or ""
separator = "\n" if notes_text else ""
notes_text_frame.text = f"{notes_text}{separator}[Image translation: {translated_text.strip()}]"
_log_info("pptx_image_translation_added", slide=slide_idx, shape=shape_idx)
except Exception as shape_err:
_log_error("pptx_image_shape_translation_error", slide=slide_idx, error=str(shape_err))
except Exception as e:
_log_error("pptx_image_processing_error", error=str(e))
def _translate_image_text(
self, image_path: str, target_language: str
) -> str:
"""Translate image using active provider or legacy service."""
if self._provider and hasattr(self._provider, "translate_image"):
try:
return self._provider.translate_image(image_path, target_language)
except Exception as e:
_log_error("pptx_image_translation_provider_error", error=str(e))
from services.translation_service import translation_service
# Temporarily enable translate_images flag on translation_service to bypass the hardcoded check
old_val = getattr(translation_service, "translate_images", False)
try:
translation_service.translate_images = True
if hasattr(translation_service, "translate_image"):
return translation_service.translate_image(image_path, target_language)
except Exception as e:
_log_error("pptx_image_translation_legacy_error", error=str(e))
finally:
translation_service.translate_images = old_val
return ""
pptx_translator = PowerPointTranslator()