""" PowerPoint Translation Module Translates PowerPoint files while preserving all layouts, animations, and media OPTIMIZED: Uses batch translation for 5-10x faster processing Updated to use new TranslationProvider interface with structured error handling. """ import time import zipfile import io import concurrent.futures from pathlib import Path from typing import Dict, List, Tuple, Optional, Callable, Any from lxml import etree from pptx import Presentation from pptx.shapes.base import BaseShape from pptx.shapes.group import GroupShape from pptx.enum.shapes import MSO_SHAPE_TYPE from services.providers.base import TranslationProvider # DrawingML namespace used by pptx XML _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" # Languages written right-to-left RTL_LANGUAGES: frozenset = frozenset( {"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"} ) from core.logging import get_logger logger = get_logger(__name__) _HAS_STRUCTLOG = True def _log_info(event: str, **kwargs): """Log info with structlog or standard logging compatibility.""" if _HAS_STRUCTLOG: logger.info(event, **kwargs) else: msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items()) logger.info(msg) def _log_error(event: str, **kwargs): """Log error with structlog or standard logging compatibility.""" if _HAS_STRUCTLOG: logger.error(event, **kwargs) else: msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items()) logger.error(msg) def _set_pptx_paragraph_rtl(paragraph) -> None: """ Enable RTL mode on a PowerPoint paragraph. Sets rtl="1" and algn="r" on the element, which controls both text direction and horizontal alignment in DrawingML. """ p_elem = paragraph._p tag_pPr = f"{{{_NS_A}}}pPr" pPr = p_elem.find(tag_pPr) if pPr is None: pPr = etree.Element(tag_pPr) p_elem.insert(0, pPr) pPr.set("rtl", "1") pPr.set("algn", "r") def _apply_rtl_to_presentation(presentation: Presentation) -> None: """Apply RTL direction to every paragraph in all slides.""" for slide in presentation.slides: for shape in slide.shapes: _apply_rtl_to_shape(shape) def _apply_rtl_to_shape(shape) -> None: """Recursively apply RTL to a shape (handles groups and tables).""" if shape.has_text_frame: for paragraph in shape.text_frame.paragraphs: _set_pptx_paragraph_rtl(paragraph) if shape.shape_type == MSO_SHAPE_TYPE.TABLE: for row in shape.table.rows: for cell in row.cells: for paragraph in cell.text_frame.paragraphs: _set_pptx_paragraph_rtl(paragraph) if shape.shape_type == MSO_SHAPE_TYPE.GROUP: for sub_shape in shape.shapes: _apply_rtl_to_shape(sub_shape) class PptxProcessorError(Exception): """Exception for PowerPoint processing errors with structured error codes.""" INVALID_FORMAT = "INVALID_FORMAT" PPTX_CORRUPTED = "PPTX_CORRUPTED" PPTX_READ_ERROR = "PPTX_READ_ERROR" PPTX_WRITE_ERROR = "PPTX_WRITE_ERROR" PPTX_TOO_LARGE = "PPTX_TOO_LARGE" ERROR_MESSAGES = { INVALID_FORMAT: "Format de fichier non supporte. Utilisez .pptx.", PPTX_CORRUPTED: "Le fichier PowerPoint est corrompu ou illisible.", PPTX_READ_ERROR: "Erreur lors de la lecture du fichier PowerPoint.", PPTX_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.", PPTX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).", } def __init__( self, code: str, message: Optional[str] = None, details: Optional[Dict[str, Any]] = None, ): self.code = code self.message = message or self.ERROR_MESSAGES.get(code, "Erreur inconnue") self.details = details or {} super().__init__(self.message) def to_dict(self) -> Dict[str, Any]: """Convert error to dictionary format for API responses.""" result = {"error": self.code, "message": self.message} if self.details: result["details"] = self.details return result class PowerPointTranslator: """ Handles translation of PowerPoint presentations with strict formatting preservation. Uses the new TranslationProvider interface for improved error handling and fallback chain support. """ MAX_FILE_SIZE_MB = 50 PPTX_MAGIC_BYTES = b"PK" # .pptx files are ZIP archives def __init__(self, provider: Optional[TranslationProvider] = None): """ Initialize PowerPointTranslator. Args: provider: TranslationProvider instance for translations. If None, will use fallback to legacy translation_service. """ self._provider = provider self._custom_prompt: Optional[str] = None self._translation_stats = {"attempted": 0, "changed": 0} def set_provider(self, provider: TranslationProvider) -> None: """Set the translation provider.""" self._provider = provider def set_custom_prompt(self, prompt: Optional[str]) -> None: """Set custom system prompt for LLM providers.""" self._custom_prompt = prompt def translate_file( self, input_path: Path, output_path: Path, target_language: str, source_language: str = "auto", progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None, translate_images: bool = False, ) -> Path: """ Translate a PowerPoint presentation while preserving all formatting. Uses batch translation for improved performance. Args: input_path: Path to input PowerPoint file output_path: Path for translated output file target_language: Target language code (e.g., 'fr', 'en') source_language: Source language code (default: auto-detect) progress_callback: Optional callback for progress updates Receives dict with: slide, total_slides, runs_translated Returns: Path to translated file Raises: PptxProcessorError: If file is invalid, corrupted, or processing fails """ start_time = time.time() input_path = Path(input_path) output_path = Path(output_path) self._validate_file(input_path) try: presentation = Presentation(input_path) except Exception as e: raise PptxProcessorError( code=PptxProcessorError.PPTX_CORRUPTED, details={"file_name": input_path.name, "error": str(e)}, ) try: runs_translated = 0 total_slides = len(presentation.slides) if progress_callback: progress_callback( { "current": 0, "total": total_slides, "slide": 0, "total_slides": total_slides, "runs_translated": 0, } ) text_elements: List[Tuple[str, Callable[[str], None]]] = [] for slide_idx, slide in enumerate(presentation.slides): if slide.has_notes_slide and slide.notes_slide.notes_text_frame: self._collect_from_text_frame( slide.notes_slide.notes_text_frame, text_elements ) for shape in slide.shapes: self._collect_from_shape(shape, text_elements) if progress_callback: progress_callback( { "current": slide_idx + 1, "total": total_slides, "slide": slide_idx + 1, "total_slides": total_slides, "runs_translated": runs_translated, } ) if text_elements: texts = [elem[0] for elem in text_elements] total_elements = len(texts) _log_info( "pptx_batch_translation_start", file_name=input_path.name, text_count=total_elements, target_lang=target_language, ) # Parallel chunk translation with real-time progress. CHUNK_SIZE = 15 MAX_WORKERS = 6 chunks = [ (i, texts[i : i + CHUNK_SIZE]) for i in range(0, total_elements, CHUNK_SIZE) ] translated_texts: List[str] = [""] * total_elements completed_items = [0] def _translate_chunk( chunk_idx: int, chunk: List[str] ) -> Tuple[int, List[str]]: return chunk_idx, self._batch_translate( chunk, target_language, source_language ) with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: future_map = { pool.submit(_translate_chunk, idx, chunk): (idx, chunk) for idx, chunk in chunks } for future in concurrent.futures.as_completed(future_map): chunk_idx, translated_chunk = future.result() for j, t in enumerate(translated_chunk): translated_texts[chunk_idx + j] = t completed_items[0] += len(translated_chunk) if progress_callback: done = min(completed_items[0], total_elements) progress_callback( { "current": done, "total": total_elements, "slide": done, "total_slides": total_elements, "runs_translated": runs_translated, } ) # Apply translations for i, ((original_text, setter), translated) in enumerate( zip(text_elements, translated_texts) ): if translated is not None and setter is not None: try: setter(translated) runs_translated += 1 except Exception as e: _log_error( "pptx_setter_error", error=str(e), index=i, ) # Apply RTL layout when the target language is written right-to-left. if target_language.lower() in RTL_LANGUAGES: _apply_rtl_to_presentation(presentation) if translate_images: try: self._translate_images(presentation, target_language) except Exception as e: _log_error("pptx_document_images_failed", error=str(e)) try: presentation.save(output_path) except Exception as e: raise PptxProcessorError( code=PptxProcessorError.PPTX_WRITE_ERROR, details={"file_name": output_path.name, "error": str(e)}, ) # Re-inject chart translations into chart XML parts self._apply_chart_translations(output_path) processing_time_ms = round((time.time() - start_time) * 1000, 2) _log_info( "pptx_translation_success", file_name=input_path.name, slides_count=total_slides, runs_translated=runs_translated, source_lang=source_language, target_lang=target_language, processing_time_ms=processing_time_ms, ) return output_path except PptxProcessorError: raise except Exception as e: raise PptxProcessorError( code=PptxProcessorError.PPTX_READ_ERROR, details={"file_name": input_path.name, "error": str(e)}, ) def _validate_file(self, file_path: Path) -> None: """Validate file format and size.""" if not file_path.exists(): raise PptxProcessorError( code=PptxProcessorError.PPTX_READ_ERROR, message=f"Fichier introuvable: {file_path.name}", details={"file_name": file_path.name}, ) if file_path.suffix.lower() != ".pptx": raise PptxProcessorError( code=PptxProcessorError.INVALID_FORMAT, details={ "file_name": file_path.name, "extension": file_path.suffix, "expected": ".pptx", }, ) with open(file_path, "rb") as f: header = f.read(4) if header[:2] != self.PPTX_MAGIC_BYTES: raise PptxProcessorError( code=PptxProcessorError.INVALID_FORMAT, details={"file_name": file_path.name, "reason": "Invalid file header"}, ) file_size_mb = file_path.stat().st_size / (1024 * 1024) if file_size_mb > self.MAX_FILE_SIZE_MB: raise PptxProcessorError( code=PptxProcessorError.PPTX_TOO_LARGE, details={ "file_name": file_path.name, "size_mb": round(file_size_mb, 2), "max_mb": self.MAX_FILE_SIZE_MB, }, ) def _batch_translate( self, texts: List[str], target_language: str, source_language: str = "auto" ) -> List[str]: if not texts: return [] non_empty = [t for t in texts if t and t.strip()] self._translation_stats["attempted"] += len(non_empty) if self._provider is not None: translated = self._translate_with_provider( texts, target_language, source_language ) else: translated = self._translate_with_legacy(texts, target_language, source_language) changed = sum(1 for orig, trans in zip(texts, translated) if orig != trans and trans.strip()) self._translation_stats["changed"] += changed return translated def get_translation_stats(self) -> dict: return dict(self._translation_stats) def _translate_with_provider( self, texts: List[str], target_language: str, source_language: str ) -> List[str]: """Translate using the TranslationProvider.translate_batch() interface.""" from services.providers.base import TranslationProvider as NewTranslationProvider is_new_style = False if isinstance(self._provider, NewTranslationProvider): is_new_style = True elif hasattr(self._provider, "__class__") and self._provider.__class__.__name__ in ( "MockTranslationProvider", "Mock", "MagicMock", ): is_new_style = True if is_new_style: from services.providers.schemas import TranslationRequest custom_prompt = getattr(self, "_custom_prompt", None) metadata = {"custom_prompt": custom_prompt} if custom_prompt else None requests = [ TranslationRequest( text=t, target_language=target_language, source_language=source_language, metadata=metadata, ) for t in texts ] responses = self._provider.translate_batch(requests) translated = [resp.translated_text for resp in responses] else: translated = self._provider.translate_batch(texts, target_language, source_language) return [ t if (t and t.strip()) else orig for t, orig in zip(translated, texts) ] def _translate_with_legacy( self, texts: List[str], target_language: str, source_language: str ) -> List[str]: """Fallback to legacy translation_service for backward compatibility.""" from services.translation_service import translation_service _log_info( "pptx_using_legacy_service", text_count=len(texts), target_lang=target_language, ) return translation_service.translate_batch( texts, target_language, source_language ) def _collect_from_shape( self, shape: BaseShape, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from a shape and its children.""" if shape.has_text_frame: self._collect_from_text_frame(shape.text_frame, text_elements) if shape.shape_type == MSO_SHAPE_TYPE.TABLE: for row in shape.table.rows: for cell in row.cells: self._collect_from_text_frame(cell.text_frame, text_elements) if shape.shape_type == MSO_SHAPE_TYPE.GROUP: for sub_shape in shape.shapes: self._collect_from_shape(sub_shape, text_elements) # Chart shapes — text is stored in separate chart XML parts if shape.shape_type == MSO_SHAPE_TYPE.CHART: self._collect_from_chart_shape(shape, text_elements) if hasattr(shape, "shapes"): try: for sub_shape in shape.shapes: self._collect_from_shape(sub_shape, text_elements) except Exception: pass def _collect_from_chart_shape( self, shape: BaseShape, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect translatable text from a chart shape. Chart text (title, axis titles, series names, data labels) is stored in a separate chart XML part, not in shape.text_frame. """ _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" _NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart" try: chart_data = shape.chart # Access the chart XML part through the chart's part chart_part = chart_data.part chart_xml = etree.fromstring(chart_part.blob) # Collect text from elements in chart XML # These include: chart title, axis titles, legend entries, data labels seen_texts: set = set() chart_text_entries: List[Dict[str, Any]] = [] for t_elem in chart_xml.iter(f'{{{_NS_A}}}t'): text = t_elem.text if text and text.strip() and text.strip() not in seen_texts: seen_texts.add(text.strip()) entry = { 'element': t_elem, 'original': text.strip(), 'translated': None, } chart_text_entries.append(entry) def make_chart_setter(entries, idx): def setter(translated_text): entries[idx]['translated'] = translated_text.strip() return setter text_elements.append( (text.strip(), make_chart_setter(chart_text_entries, len(chart_text_entries) - 1)) ) # Also collect from (cell values used as category names) for v_elem in chart_xml.iter(f'{{{_NS_C}}}v'): text = v_elem.text if text and text.strip() and not text.strip().replace('.', '').replace('-', '').replace(',', '').isdigit(): if text.strip() not in seen_texts: seen_texts.add(text.strip()) entry = { 'element': v_elem, 'original': text.strip(), 'translated': None, } chart_text_entries.append(entry) def make_chart_v_setter(entries, idx): def setter(translated_text): entries[idx]['translated'] = translated_text.strip() return setter text_elements.append( (text.strip(), make_chart_v_setter(chart_text_entries, len(chart_text_entries) - 1)) ) # Store chart_part reference and entries for later re-injection if chart_text_entries: if not hasattr(self, '_chart_entries'): self._chart_entries = [] self._chart_entries.append({ 'chart_part': chart_part, 'entries': chart_text_entries, }) except Exception as e: _log_error("pptx_chart_collect_error", error=str(e)) def _collect_from_text_frame( self, text_frame, text_elements: List[Tuple[str, Callable[[str], None]]] ) -> None: """Collect text from a text frame, preserving leading/trailing whitespace.""" if not text_frame.text.strip(): return for paragraph in text_frame.paragraphs: if not paragraph.text.strip(): continue for run in paragraph.runs: if run.text and run.text.strip(): original = run.text leading = original[: len(original) - len(original.lstrip())] trailing = original[len(original.rstrip()) :] stripped = original.strip() def make_setter(r, lead: str, trail: str): def setter(text: str) -> None: r.text = lead + text.strip() + trail return setter text_elements.append((stripped, make_setter(run, leading, trailing))) def _apply_chart_translations(self, output_path: Path) -> None: """Re-inject chart text translations by modifying chart XML parts in the .pptx ZIP.""" if not hasattr(self, '_chart_entries') or not self._chart_entries: return _NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main" _NS_C = "http://schemas.openxmlformats.org/drawingml/2006/chart" total_translated = 0 for chart_data in self._chart_entries: entries = chart_data['entries'] chart_part = chart_data['chart_part'] translated_entries = [e for e in entries if e.get('translated')] if not translated_entries: continue try: chart_xml = etree.fromstring(chart_part.blob) for entry in translated_entries: # Try to find and update elements for t_elem in chart_xml.iter(f'{{{_NS_A}}}t'): if t_elem.text and t_elem.text.strip() == entry['original']: t_elem.text = entry['translated'] total_translated += 1 break else: # Try elements for v_elem in chart_xml.iter(f'{{{_NS_C}}}v'): if v_elem.text and v_elem.text.strip() == entry['original']: v_elem.text = entry['translated'] total_translated += 1 break # Update the chart part blob chart_part._blob = etree.tostring(chart_xml, xml_declaration=True, encoding='UTF-8', standalone=True) except Exception as e: _log_error("pptx_chart_update_error", error=str(e)) # Clean up self._chart_entries = [] if total_translated > 0: _log_info("pptx_charts_translated", total=total_translated) def _translate_images(self, presentation, target_language: str) -> None: """Extract and translate text from images in PowerPoint. Appends the translated text to the slide notes.""" try: from pptx.enum.shapes import MSO_SHAPE_TYPE _log_info("pptx_image_translation_start", slides=len(presentation.slides)) for slide_idx, slide in enumerate(presentation.slides): for shape_idx, shape in enumerate(slide.shapes): if shape.shape_type != MSO_SHAPE_TYPE.PICTURE: continue try: image = getattr(shape, "image", None) if not image: continue image_data = image.blob ext = getattr(image, "ext", "png") or "png" import tempfile import os with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as tmp: tmp.write(image_data) tmp_path = tmp.name translated_text = self._translate_image_text(tmp_path, target_language) try: os.unlink(tmp_path) except: pass if translated_text and translated_text.strip(): notes_slide = slide.notes_slide notes_text_frame = notes_slide.notes_text_frame notes_text = notes_text_frame.text or "" separator = "\n" if notes_text else "" notes_text_frame.text = f"{notes_text}{separator}[Image translation: {translated_text.strip()}]" _log_info("pptx_image_translation_added", slide=slide_idx, shape=shape_idx) except Exception as shape_err: _log_error("pptx_image_shape_translation_error", slide=slide_idx, error=str(shape_err)) except Exception as e: _log_error("pptx_image_processing_error", error=str(e)) def _translate_image_text( self, image_path: str, target_language: str ) -> str: """Translate image using active provider or legacy service.""" if self._provider and hasattr(self._provider, "translate_image"): try: return self._provider.translate_image(image_path, target_language) except Exception as e: _log_error("pptx_image_translation_provider_error", error=str(e)) from services.translation_service import translation_service # Temporarily enable translate_images flag on translation_service to bypass the hardcoded check old_val = getattr(translation_service, "translate_images", False) try: translation_service.translate_images = True if hasattr(translation_service, "translate_image"): return translation_service.translate_image(image_path, target_language) except Exception as e: _log_error("pptx_image_translation_legacy_error", error=str(e)) finally: translation_service.translate_images = old_val return "" pptx_translator = PowerPointTranslator()