"""
PowerPoint Translation Module

Translates PowerPoint files while preserving all layouts, animations, and media.
OPTIMIZED: Uses batch translation for 5-10x faster processing.

Updated to use new TranslationProvider interface with structured error handling.
"""
|
|
|
|
import time
|
|
import concurrent.futures
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple, Optional, Callable, Any
|
|
|
|
from lxml import etree
|
|
from pptx import Presentation
|
|
from pptx.shapes.base import BaseShape
|
|
from pptx.shapes.group import GroupShape
|
|
from pptx.enum.shapes import MSO_SHAPE_TYPE
|
|
|
|
from services.providers.base import TranslationProvider
|
|
|
|
# Namespace URI of DrawingML, used to build qualified tag names for pptx XML
_NS_A = "http://schemas.openxmlformats.org/drawingml/2006/main"

# Language codes treated as right-to-left when laying out translated text
RTL_LANGUAGES: frozenset = frozenset(
    (
        "ar",
        "he",
        "fa",
        "ur",
        "ku",
        "ps",
        "ug",
        "sd",
        "yi",
        "dv",
        "ckb",
    )
)
|
|
|
|
|
|
# Prefer structlog when it is installed; otherwise fall back to the standard
# logging module. _HAS_STRUCTLOG records which kind of logger was created so
# the logging helpers know whether keyword fields can be passed through.
try:
    import structlog
except ImportError:
    import logging

    logger = logging.getLogger(__name__)
    _HAS_STRUCTLOG = False
else:
    logger = structlog.get_logger(__name__)
    _HAS_STRUCTLOG = True
|
|
|
|
|
|
def _log_info(event: str, **kwargs):
    """
    Emit an info-level log entry through whichever logger is active.

    With structlog the keyword fields are passed through unchanged. With the
    standard library logger they are flattened into ``key=value`` pairs and
    appended to the event name, separated by single spaces.
    """
    if not _HAS_STRUCTLOG:
        fields = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.info(f"{event} " + fields)
        return

    logger.info(event, **kwargs)
|
|
|
|
|
|
def _log_error(event: str, **kwargs):
    """
    Emit an error-level log entry through whichever logger is active.

    With structlog the keyword fields are passed through unchanged. With the
    standard library logger they are flattened into ``key=value`` pairs and
    appended to the event name, separated by single spaces.
    """
    if not _HAS_STRUCTLOG:
        fields = " ".join(f"{key}={value}" for key, value in kwargs.items())
        logger.error(f"{event} " + fields)
        return

    logger.error(event, **kwargs)
|
|
|
|
|
|
def _set_pptx_paragraph_rtl(paragraph) -> None:
    """
    Mark a PowerPoint paragraph as right-to-left and right-aligned.

    Looks up the paragraph's <a:pPr> child, creating it as the first child
    when absent, and sets rtl="1" and algn="r" on it.
    """
    properties_tag = "{%s}pPr" % _NS_A
    xml_paragraph = paragraph._p

    properties = xml_paragraph.find(properties_tag)
    if properties is None:
        # No paragraph properties yet: add the element at position 0.
        properties = etree.Element(properties_tag)
        xml_paragraph.insert(0, properties)

    for attribute, value in (("rtl", "1"), ("algn", "r")):
        properties.set(attribute, value)
|
|
|
|
|
|
def _apply_rtl_to_presentation(presentation: Presentation) -> None:
    """Switch every shape on every slide of the presentation to RTL."""
    top_level_shapes = (
        shape for slide in presentation.slides for shape in slide.shapes
    )
    for shape in top_level_shapes:
        _apply_rtl_to_shape(shape)
|
|
|
|
|
|
def _apply_rtl_to_shape(shape) -> None:
    """
    Apply RTL settings to all paragraphs reachable from a shape.

    Covers the shape's own text frame, every cell of a table shape, and,
    recursively, every member of a group shape.
    """
    if shape.has_text_frame:
        for paragraph in shape.text_frame.paragraphs:
            _set_pptx_paragraph_rtl(paragraph)

    kind = shape.shape_type

    if kind == MSO_SHAPE_TYPE.TABLE:
        cells = (cell for row in shape.table.rows for cell in row.cells)
        for cell in cells:
            for paragraph in cell.text_frame.paragraphs:
                _set_pptx_paragraph_rtl(paragraph)

    if kind == MSO_SHAPE_TYPE.GROUP:
        for member in shape.shapes:
            _apply_rtl_to_shape(member)
|
|
|
|
|
|
class PptxProcessorError(Exception):
    """
    Error raised while processing a PowerPoint file.

    Carries a machine-readable ``code``, a human-readable ``message`` and an
    optional ``details`` mapping. The code constants and their default
    messages are defined on the class.
    """

    INVALID_FORMAT = "INVALID_FORMAT"
    PPTX_CORRUPTED = "PPTX_CORRUPTED"
    PPTX_READ_ERROR = "PPTX_READ_ERROR"
    PPTX_WRITE_ERROR = "PPTX_WRITE_ERROR"
    PPTX_TOO_LARGE = "PPTX_TOO_LARGE"

    # Default message for each code, used when no explicit message is given.
    ERROR_MESSAGES = {
        INVALID_FORMAT: "Format de fichier non supporte. Utilisez .pptx.",
        PPTX_CORRUPTED: "Le fichier PowerPoint est corrompu ou illisible.",
        PPTX_READ_ERROR: "Erreur lors de la lecture du fichier PowerPoint.",
        PPTX_WRITE_ERROR: "Erreur lors de la creation du fichier traduit.",
        PPTX_TOO_LARGE: "Le fichier est trop volumineux (max 50 Mo).",
    }

    def __init__(
        self,
        code: str,
        message: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        """
        Build the error.

        Args:
            code: Error code, normally one of the class constants.
            message: Explicit message. When empty or None, the default
                message for ``code`` is used, or "Erreur inconnue" when the
                code has no default.
            details: Extra context; stored as an empty dict when omitted.
        """
        if not message:
            message = self.ERROR_MESSAGES.get(code, "Erreur inconnue")

        self.code = code
        self.message = message
        self.details = details if details else {}
        super().__init__(self.message)

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the error for API responses.

        Returns:
            A dict with "error" (the code) and "message"; a "details" entry
            is added only when details are non-empty.
        """
        payload = {"error": self.code, "message": self.message}
        if not self.details:
            return payload
        payload["details"] = self.details
        return payload
|
|
|
|
|
|
class PowerPointTranslator:
    """
    Translate PowerPoint presentations run by run, keeping their formatting.

    Text is gathered from slide notes, shape text frames, table cells and
    grouped shapes, translated in parallel chunks, then written back into the
    very same runs. Translation goes through the configured
    TranslationProvider; when no provider is set, the legacy
    ``services.translation_service`` is used instead.
    """

    MAX_FILE_SIZE_MB = 50
    PPTX_MAGIC_BYTES = b"PK"  # .pptx files are ZIP archives
    CHUNK_SIZE = 15  # number of texts sent per translate_batch() call
    MAX_WORKERS = 6  # number of chunks translated concurrently

    def __init__(self, provider: Optional["TranslationProvider"] = None):
        """
        Initialize PowerPointTranslator.

        Args:
            provider: TranslationProvider instance for translations.
                If None, the legacy translation_service is used.
        """
        self._provider = provider
        self._custom_prompt: Optional[str] = None

    def set_provider(self, provider: "TranslationProvider") -> None:
        """Set the translation provider."""
        self._provider = provider

    def set_custom_prompt(self, prompt: Optional[str]) -> None:
        """
        Set custom system prompt for LLM providers.

        The value is only stored on the instance; no method of this class
        reads it.
        """
        self._custom_prompt = prompt

    def translate_file(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str = "auto",
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    ) -> Path:
        """
        Translate a PowerPoint presentation while preserving all formatting.

        Args:
            input_path: Path to input PowerPoint file
            output_path: Path for translated output file
            target_language: Target language code (e.g., 'fr', 'en')
            source_language: Source language code (default: auto-detect)
            progress_callback: Optional callback for progress updates.
                Receives a dict with: current, total, slide, total_slides,
                runs_translated. While slides are scanned the counters are
                slide numbers; while chunks are translated they count text
                runs ("slide"/"total_slides" mirror "current"/"total").
                "runs_translated" is always 0 in these updates because
                translations are written back after the last update.

        Returns:
            Path to translated file

        Raises:
            PptxProcessorError: If file is invalid, corrupted, or processing
                fails. Any other exception raised while collecting,
                translating or applying text is wrapped as PPTX_READ_ERROR.
        """
        start_time = time.time()

        input_path = Path(input_path)
        output_path = Path(output_path)

        self._validate_file(input_path)

        try:
            presentation = Presentation(input_path)
        except Exception as e:
            raise PptxProcessorError(
                code=PptxProcessorError.PPTX_CORRUPTED,
                details={"file_name": input_path.name, "error": str(e)},
            ) from e

        try:
            total_slides = len(presentation.slides)
            self._report_progress(progress_callback, 0, total_slides)

            text_elements = self._collect_text_elements(
                presentation, total_slides, progress_callback
            )

            runs_translated = 0
            if text_elements:
                texts = [text for text, _ in text_elements]
                _log_info(
                    "pptx_batch_translation_start",
                    file_name=input_path.name,
                    text_count=len(texts),
                    target_lang=target_language,
                )
                translated_texts = self._translate_in_chunks(
                    texts, target_language, source_language, progress_callback
                )
                runs_translated = self._apply_translations(
                    text_elements, translated_texts
                )

            # Apply RTL layout when the target language is written right-to-left.
            if self._is_rtl_language(target_language):
                _apply_rtl_to_presentation(presentation)

            try:
                presentation.save(output_path)
            except Exception as e:
                raise PptxProcessorError(
                    code=PptxProcessorError.PPTX_WRITE_ERROR,
                    details={"file_name": output_path.name, "error": str(e)},
                ) from e

            processing_time_ms = round((time.time() - start_time) * 1000, 2)

            _log_info(
                "pptx_translation_success",
                file_name=input_path.name,
                slides_count=total_slides,
                runs_translated=runs_translated,
                source_lang=source_language,
                target_lang=target_language,
                processing_time_ms=processing_time_ms,
            )

            return output_path

        except PptxProcessorError:
            raise
        except Exception as e:
            raise PptxProcessorError(
                code=PptxProcessorError.PPTX_READ_ERROR,
                details={"file_name": input_path.name, "error": str(e)},
            ) from e

    @staticmethod
    def _report_progress(
        progress_callback: Optional[Callable[[Dict[str, Any]], None]],
        current: int,
        total: int,
        runs_translated: int = 0,
    ) -> None:
        """Send one progress update, if a callback was supplied."""
        if not progress_callback:
            return
        progress_callback(
            {
                "current": current,
                "total": total,
                "slide": current,
                "total_slides": total,
                "runs_translated": runs_translated,
            }
        )

    @staticmethod
    def _is_rtl_language(language: str) -> bool:
        """
        Tell whether a language code designates a right-to-left language.

        Only the primary subtag is compared, so region-qualified codes such
        as "ar-SA" or "he_IL" are recognized as well as bare "ar" / "he".
        """
        primary = language.lower().replace("_", "-").split("-", 1)[0]
        return primary in RTL_LANGUAGES

    def _collect_text_elements(
        self,
        presentation,
        total_slides: int,
        progress_callback: Optional[Callable[[Dict[str, Any]], None]],
    ) -> List[Tuple[str, Callable[[str], None]]]:
        """
        Gather (text, setter) pairs for every translatable run.

        For each slide the notes are visited first, then the shapes. A
        progress update is sent after each slide.
        """
        text_elements: List[Tuple[str, Callable[[str], None]]] = []

        for slide_number, slide in enumerate(presentation.slides, start=1):
            if slide.has_notes_slide and slide.notes_slide.notes_text_frame:
                self._collect_from_text_frame(
                    slide.notes_slide.notes_text_frame, text_elements
                )

            for shape in slide.shapes:
                self._collect_from_shape(shape, text_elements)

            self._report_progress(progress_callback, slide_number, total_slides)

        return text_elements

    def _translate_in_chunks(
        self,
        texts: List[str],
        target_language: str,
        source_language: str,
        progress_callback: Optional[Callable[[Dict[str, Any]], None]],
    ) -> List[str]:
        """
        Translate texts in CHUNK_SIZE slices on up to MAX_WORKERS threads.

        Returns:
            A list of the same length and order as ``texts``. Any position
            for which a chunk delivers no usable translation keeps its
            original text, so a short or partly empty provider answer can
            never blank out a run.
        """
        total = len(texts)
        # Start from the originals: positions never filled stay untouched.
        results = list(texts)
        done = 0

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.MAX_WORKERS
        ) as pool:
            start_of = {
                pool.submit(
                    self._batch_translate,
                    texts[start : start + self.CHUNK_SIZE],
                    target_language,
                    source_language,
                ): start
                for start in range(0, total, self.CHUNK_SIZE)
            }

            for future in concurrent.futures.as_completed(start_of):
                start = start_of[future]
                chunk_len = min(self.CHUNK_SIZE, total - start)
                # Cap at chunk_len so an over-long answer cannot spill into
                # the slots that belong to the next chunk.
                translated_chunk = list(future.result() or [])[:chunk_len]

                for offset, translated in enumerate(translated_chunk):
                    if isinstance(translated, str) and translated.strip():
                        results[start + offset] = translated

                done += chunk_len
                self._report_progress(progress_callback, done, total)

        return results

    @staticmethod
    def _apply_translations(
        text_elements: List[Tuple[str, Callable[[str], None]]],
        translated_texts: List[str],
    ) -> int:
        """
        Write each translation back through its setter.

        A failing setter is logged and skipped so one bad run does not abort
        the whole file.

        Returns:
            Number of setters that completed without raising.
        """
        applied = 0
        pairs = zip(text_elements, translated_texts)
        for index, ((_, setter), translated) in enumerate(pairs):
            if setter is None or translated is None:
                continue
            try:
                setter(translated)
                applied += 1
            except Exception as e:
                _log_error("pptx_setter_error", error=str(e), index=index)
        return applied

    def _validate_file(self, file_path: Path) -> None:
        """
        Validate file format and size.

        Checks, in order: the path exists, the extension is .pptx, the file
        starts with the ZIP magic bytes, and the size does not exceed
        MAX_FILE_SIZE_MB.

        Raises:
            PptxProcessorError: PPTX_READ_ERROR when the file is missing or
                cannot be read, INVALID_FORMAT for a wrong extension or
                header, PPTX_TOO_LARGE when the size limit is exceeded.
        """
        if not file_path.exists():
            raise PptxProcessorError(
                code=PptxProcessorError.PPTX_READ_ERROR,
                message=f"Fichier introuvable: {file_path.name}",
                details={"file_name": file_path.name},
            )

        if file_path.suffix.lower() != ".pptx":
            raise PptxProcessorError(
                code=PptxProcessorError.INVALID_FORMAT,
                details={
                    "file_name": file_path.name,
                    "extension": file_path.suffix,
                    "expected": ".pptx",
                },
            )

        # Unreadable paths (directory, missing permission, ...) are reported
        # as the documented error type instead of a raw OSError.
        try:
            with open(file_path, "rb") as f:
                header = f.read(len(self.PPTX_MAGIC_BYTES))
            size_bytes = file_path.stat().st_size
        except OSError as e:
            raise PptxProcessorError(
                code=PptxProcessorError.PPTX_READ_ERROR,
                details={"file_name": file_path.name, "error": str(e)},
            ) from e

        if header != self.PPTX_MAGIC_BYTES:
            raise PptxProcessorError(
                code=PptxProcessorError.INVALID_FORMAT,
                details={"file_name": file_path.name, "reason": "Invalid file header"},
            )

        file_size_mb = size_bytes / (1024 * 1024)
        if file_size_mb > self.MAX_FILE_SIZE_MB:
            raise PptxProcessorError(
                code=PptxProcessorError.PPTX_TOO_LARGE,
                details={
                    "file_name": file_path.name,
                    "size_mb": round(file_size_mb, 2),
                    "max_mb": self.MAX_FILE_SIZE_MB,
                },
            )

    def _batch_translate(
        self, texts: List[str], target_language: str, source_language: str = "auto"
    ) -> List[str]:
        """
        Batch translate using the provider, or the legacy service without one.

        Args:
            texts: List of texts to translate
            target_language: Target language code
            source_language: Source language code

        Returns:
            List of translated texts, same length and order as the input.
            Entries with no usable translation hold the original text.
        """
        if not texts:
            return []

        if self._provider is not None:
            return self._translate_with_provider(
                texts, target_language, source_language
            )

        return self._translate_with_legacy(texts, target_language, source_language)

    @staticmethod
    def _with_original_fallback(translated, originals: List[str]) -> List[str]:
        """
        Align a translation result with its inputs.

        Returns one entry per original: the translation at that index when it
        is a non-blank string, otherwise the original text. Results that are
        too short, too long, or None are therefore normalized to the input
        length.
        """
        candidates = list(translated or [])
        merged: List[str] = []
        for index, original in enumerate(originals):
            candidate = candidates[index] if index < len(candidates) else None
            if isinstance(candidate, str) and candidate.strip():
                merged.append(candidate)
            else:
                merged.append(original)
        return merged

    def _translate_with_provider(
        self, texts: List[str], target_language: str, source_language: str
    ) -> List[str]:
        """Translate using the TranslationProvider.translate_batch() interface."""
        translated = self._provider.translate_batch(
            texts, target_language, source_language
        )
        return self._with_original_fallback(translated, texts)

    def _translate_with_legacy(
        self, texts: List[str], target_language: str, source_language: str
    ) -> List[str]:
        """Fallback to legacy translation_service for backward compatibility."""
        # Imported lazily so the legacy module is only loaded when needed.
        from services.translation_service import translation_service

        _log_info(
            "pptx_using_legacy_service",
            text_count=len(texts),
            target_lang=target_language,
        )

        translated = translation_service.translate_batch(
            texts, target_language, source_language
        )
        return self._with_original_fallback(translated, texts)

    def _collect_from_shape(
        self, shape: "BaseShape", text_elements: List[Tuple[str, Callable[[str], None]]]
    ) -> None:
        """
        Collect text from a shape and its children.

        Visits the shape's own text frame, every cell of a table, and the
        members of a group. Each child is visited exactly once.
        """
        if shape.has_text_frame:
            self._collect_from_text_frame(shape.text_frame, text_elements)

        shape_type = shape.shape_type

        if shape_type == MSO_SHAPE_TYPE.TABLE:
            for row in shape.table.rows:
                for cell in row.cells:
                    self._collect_from_text_frame(cell.text_frame, text_elements)

        if shape_type == MSO_SHAPE_TYPE.GROUP:
            for sub_shape in shape.shapes:
                self._collect_from_shape(sub_shape, text_elements)
        elif hasattr(shape, "shapes"):
            # Best effort for other containers exposing ``shapes``. This is
            # an ``elif`` so real groups are not walked a second time, which
            # would register (and translate) every grouped run twice.
            try:
                for sub_shape in shape.shapes:
                    self._collect_from_shape(sub_shape, text_elements)
            except Exception as e:
                _log_error("pptx_child_shape_error", error=str(e))

    @staticmethod
    def _make_run_setter(run, leading: str, trailing: str) -> Callable[[str], None]:
        """Build a setter that writes text into ``run``, restoring its padding."""

        def setter(text: str) -> None:
            run.text = leading + text.strip() + trailing

        return setter

    def _collect_from_text_frame(
        self, text_frame, text_elements: List[Tuple[str, Callable[[str], None]]]
    ) -> None:
        """
        Collect text from a text frame, preserving leading/trailing whitespace.

        For every run with non-blank text, appends a pair made of the
        stripped text and a setter that writes a translation back into that
        run, surrounded by the run's original leading and trailing
        whitespace. Blank frames, paragraphs and runs are skipped.
        """
        if not text_frame.text.strip():
            return

        for paragraph in text_frame.paragraphs:
            if not paragraph.text.strip():
                continue

            for run in paragraph.runs:
                original = run.text
                if not (original and original.strip()):
                    continue

                leading = original[: len(original) - len(original.lstrip())]
                trailing = original[len(original.rstrip()) :]
                text_elements.append(
                    (
                        original.strip(),
                        self._make_run_setter(run, leading, trailing),
                    )
                )
|
|
|
|
|
|
# Shared module-level translator. It is built without a provider, so until
# set_provider() is called on it, translations go through the legacy
# services.translation_service fallback.
pptx_translator = PowerPointTranslator()
|