"""PDF Document Translation Module — Layout-Preserving.

Primary strategy (layout mode):
    Use PyMuPDF (fitz) for direct, in-place text replacement on each page.
    For each page:
      1. Extract text blocks with positions, fonts, sizes, colors
      2. Translate each block as a unit (preserving context within block)
      3. Redact original text area
      4. Write translated text at the same position, auto-adjusting font size

    This preserves:
      - Page structure, images, vector graphics, backgrounds
      - Text positions within original bounding boxes
      - Approximate font styling (size, color)

Fallback:
    If PyMuPDF direct editing fails, falls back to the pdf2docx pipeline
    (PDF → DOCX → WordTranslator → PDF via LibreOffice).

Text-only mode:
    Extract text, translate, generate a clean formatted PDF via reportlab.
"""

import time
import shutil
import subprocess
from pathlib import Path
from typing import Dict, Any, Optional, Callable, List
from xml.sax.saxutils import escape

from core.logging import get_logger

logger = get_logger(__name__)

# Minimum readable font size (points)
MIN_FONT_SIZE = 4.5

# Font size reduction factor when text overflows its bounding box
FONT_SHRINK_FACTOR = 0.87

# RTL language codes
RTL_LANGUAGES = frozenset(
    {"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"}
)


class PDFTranslator:
    """Translates PDF files with layout preservation using PyMuPDF."""

    # Candidate Unicode-capable fonts, probed in order by _get_font_path().
    _FONT_SEARCH_PATHS = [
        "/usr/share/fonts/opentype/noto/NotoSans-Regular.ttf",
        "/usr/share/fonts/opentype/noto/NotoSans.ttc",
        "/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf",
        "/usr/share/fonts/truetype/noto/NotoSans[Noto].ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        "/usr/share/fonts/truetype/freefont/FreeSans.ttf",
        "/app/fonts/NotoSans-Regular.ttf",
        "C:/Windows/Fonts/arial.ttf",
        "C:/Windows/Fonts/msyh.ttc",
        "/System/Library/Fonts/Helvetica.ttc",
    ]

    # Reference name used when embedding a font file. It must not collide
    # with PyMuPDF's reserved (Base-14 / CJK) names such as "helv".
    _CUSTOM_FONT_NAME = "F0"

    # Margin (points) kept free at the page edges when growing text boxes.
    _PAGE_MARGIN = 18

    def __init__(self, provider=None):
        """Create a translator.

        Args:
            provider: Optional translation provider exposing ``translate``
                and ``translate_batch``. When ``None`` (or when it fails),
                the legacy ``translation_service`` is used instead.
        """
        self._provider = provider
        self._font_path: Optional[str] = None

    def _get_font_path(self) -> Optional[str]:
        """Resolve a Unicode-capable TTF/OTF font file.

        Returns:
            The first existing path from ``_FONT_SEARCH_PATHS`` (cached on
            the instance once found), or ``None`` if none exists.
        """
        if self._font_path is not None:
            return self._font_path
        for p in self._FONT_SEARCH_PATHS:
            if Path(p).exists():
                self._font_path = p
                return p
        logger.warning("no_unicode_font_found")
        return None

    @staticmethod
    def _is_rtl(language: str) -> bool:
        """Return True if *language* is written right-to-left.

        Accepts bare codes ("ar") as well as region/script-tagged ones
        ("ar-SA", "fa_IR") by also checking the primary subtag.
        """
        code = language.lower()
        primary = code.replace("_", "-").split("-", 1)[0]
        return code in RTL_LANGUAGES or primary in RTL_LANGUAGES

    def translate_file(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str = "auto",
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
        pdf_mode: str = "layout",
    ) -> Path:
        """Translate a PDF file and return the path of the produced file.

        Args:
            input_path: Source PDF.
            output_path: Desired output location.
            target_language: Target language code.
            source_language: Source language code, ``"auto"`` by default.
            progress_callback: Optional callable receiving progress dicts.
            pdf_mode: ``"text_only"`` for the clean-text pipeline; any other
                value selects the layout-preserving pipeline.

        Raises:
            FileNotFoundError: If *input_path* does not exist.
            ValueError: If the file is not a ``.pdf`` or lacks a PDF header.
        """
        input_path = Path(input_path)
        output_path = Path(output_path)
        self._validate_file(input_path)

        if pdf_mode == "text_only":
            return self._translate_text_only(
                input_path, output_path, target_language, source_language,
                progress_callback,
            )
        return self._translate_preserve_layout(
            input_path, output_path, target_language, source_language,
            progress_callback,
        )

    # ------------------------------------------------------------------ #
    # LAYOUT MODE — PyMuPDF in-place text replacement
    # ------------------------------------------------------------------ #

    def _translate_preserve_layout(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str,
        progress_callback,
    ) -> Path:
        """Translate PDF preserving layout via PyMuPDF direct text replacement.

        Falls back to the pdf2docx pipeline when PyMuPDF is not importable
        or when in-place processing raises.

        Raises:
            RuntimeError: If the PDF has no pages.
        """
        start_time = time.time()

        try:
            import fitz
        except ImportError:
            logger.warning("pymupdf_missing_fallback_docx")
            return self._translate_preserve_layout_fallback(
                input_path, output_path, target_language, source_language,
                progress_callback,
            )

        doc = fitz.open(str(input_path))
        total_pages = len(doc)
        if total_pages == 0:
            doc.close()
            raise RuntimeError("PDF has no pages.")

        font_path = self._get_font_path()
        logger.info(
            "pdf_layout_start",
            pages=total_pages,
            file=input_path.name,
            font=font_path or "built-in",
        )

        if progress_callback:
            progress_callback({
                "current": 1,
                "total": total_pages,
                "phase": "extracting",
                "paragraph": 1,
                "total_paragraphs": total_pages,
            })

        try:
            result_path = self._process_pages_inplace(
                doc, total_pages, output_path, target_language,
                source_language, font_path, progress_callback,
            )
        except Exception as e:
            # The document may or may not have been closed already; never
            # let the cleanup itself abort the fallback.
            self._close_quietly(doc)
            logger.warning("inplace_failed_fallback", error=str(e))
            return self._translate_preserve_layout_fallback(
                input_path, output_path, target_language, source_language,
                progress_callback,
            )

        processing_time_ms = round((time.time() - start_time) * 1000, 2)
        logger.info(
            "pdf_layout_success",
            pages=total_pages,
            processing_time_ms=processing_time_ms,
            output=str(result_path),
        )
        return result_path

    @staticmethod
    def _close_quietly(doc) -> None:
        """Close a PyMuPDF document, tolerating an already-closed handle."""
        try:
            if not getattr(doc, "is_closed", False):
                doc.close()
        except Exception as e:
            logger.warning("pdf_close_failed", error=str(e))

    @staticmethod
    def _report_page_progress(
        progress_callback, page_num: int, total_pages: int, phase: str
    ) -> None:
        """Emit a per-page progress event (30–95% range) if a callback is set."""
        if not progress_callback:
            return
        progress_callback({
            "current": page_num + 1,
            "total": total_pages,
            "phase": phase,
            "paragraph": page_num + 1,
            "total_paragraphs": total_pages,
            "progress_override": int(30 + 65 * (page_num + 1) / total_pages),
        })

    def _process_pages_inplace(
        self,
        doc,
        total_pages: int,
        output_path: Path,
        target_language: str,
        source_language: str,
        font_path: Optional[str],
        progress_callback,
    ) -> Path:
        """Core PyMuPDF in-place processing — one page at a time.

        For every page: extract and merge text blocks, translate them,
        redact the originals, then write the translations. Finally saves
        the document to *output_path* and closes it.

        Returns:
            *output_path*.
        """
        import fitz

        is_rtl = self._is_rtl(target_language)
        total_blocks = 0
        translated_blocks = 0

        for page_num in range(total_pages):
            page = doc[page_num]
            raw_blocks = self._extract_text_blocks(page)

            if not raw_blocks:
                self._report_page_progress(
                    progress_callback, page_num, total_pages,
                    f"Page {page_num + 1}/{total_pages} (no text)",
                )
                continue

            # Merge adjacent blocks that form a single paragraph
            blocks = self._merge_adjacent_blocks(raw_blocks, page.rect)
            total_blocks += len(blocks)

            # Phase 1: translate all blocks on this page
            for block in blocks:
                original = block["text"]
                if not original.strip():
                    continue
                try:
                    translated = self._translate_single(
                        original, target_language, source_language
                    )
                except Exception as e:
                    logger.warning(
                        "block_translation_failed",
                        page=page_num + 1,
                        error=str(e),
                    )
                    continue
                if not translated or not translated.strip():
                    logger.warning(
                        "block_translation_empty",
                        page=page_num + 1,
                        text_preview=original[:60],
                    )
                    continue
                if translated.strip() == original.strip():
                    # Nothing changed (e.g. numbers, or _translate_single
                    # echoing the input after a failure): keep the original
                    # rendering instead of redacting and re-typesetting it.
                    continue
                block["translated"] = translated
                translated_blocks += 1

            to_rewrite = [b for b in blocks if b.get("translated")]

            # Phase 2: redact original text areas
            for block in to_rewrite:
                for sub_rect in block["sub_bboxes"]:
                    page.add_redact_annot(fitz.Rect(sub_rect), fill=(1, 1, 1))
            if to_rewrite:
                page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE)

            # Phase 3: write translated text
            for block in to_rewrite:
                self._write_translated_block(page, block, font_path, is_rtl)

            self._report_page_progress(
                progress_callback, page_num, total_pages,
                f"Translating page {page_num + 1}/{total_pages}",
            )

        logger.info(
            "pdf_blocks_processed",
            total_blocks=total_blocks,
            translated_blocks=translated_blocks,
        )

        output_path.parent.mkdir(parents=True, exist_ok=True)
        doc.save(str(output_path), garbage=4, deflate=True)
        doc.close()
        return output_path

    def _extract_text_blocks(self, page) -> List[Dict]:
        """Extract text blocks with position, font, and color information.

        Returns:
            One dict per non-empty text block with keys ``bbox``, ``text``
            (lines joined by ``"\\n"``), ``font_size`` (mean span size),
            ``color`` (first span), ``is_bold``, ``is_italic``,
            ``line_count``, ``translated`` (initially ``None``) and
            ``sub_bboxes`` (the rectangles to redact).
        """
        import fitz

        blocks = []
        data = page.get_text("dict", flags=fitz.TEXT_PRESERVE_WHITESPACE)

        for block in data.get("blocks", []):
            if block.get("type") != 0:  # only text blocks
                continue
            lines = block.get("lines", [])
            if not lines:
                continue

            line_parts = []
            spans_info = []
            for line in lines:
                span_parts = []
                for span in line.get("spans", []):
                    text = span.get("text", "")
                    if text:
                        span_parts.append(text)
                        spans_info.append({
                            "size": span.get("size", 12),
                            "font": span.get("font", "Helvetica"),
                            "color": span.get("color", 0),
                            "flags": span.get("flags", 0),
                            "origin": span.get("origin", (0, 0)),
                        })
                if span_parts:
                    line_parts.append("".join(span_parts))

            full_text = "\n".join(line_parts).strip()
            if not full_text:
                continue

            avg_size = (
                sum(s["size"] for s in spans_info) / len(spans_info)
                if spans_info
                else 12.0
            )
            first_color = spans_info[0]["color"] if spans_info else 0
            is_bold = any(s["flags"] & 16 for s in spans_info)
            is_italic = any(s["flags"] & 2 for s in spans_info)

            blocks.append({
                "bbox": tuple(block["bbox"]),
                "text": full_text,
                "font_size": round(avg_size, 1),
                "color": first_color,
                "is_bold": is_bold,
                "is_italic": is_italic,
                "line_count": len(line_parts),
                "translated": None,
                "sub_bboxes": [tuple(block["bbox"])],
            })

        return blocks

    def _merge_adjacent_blocks(
        self, blocks: List[Dict], page_rect
    ) -> List[Dict]:
        """Merge consecutive text blocks that form a single paragraph.

        Blocks are merged when ``_should_merge_blocks`` accepts them, i.e.
        they have a similar font size, are vertically adjacent, start at a
        similar x position and have comparable widths.

        This produces larger bounding boxes for better translation context
        and prevents excessive font-size reduction for multi-line paragraphs.
        The input blocks are not mutated. *page_rect* is currently unused.
        """
        if len(blocks) <= 1:
            return blocks

        def clone(block: Dict) -> Dict:
            # Copy the nested list too, so extending the merged block's
            # sub_bboxes never leaks into the caller's block dicts.
            return {**block, "sub_bboxes": list(block["sub_bboxes"])}

        merged = []
        current = clone(blocks[0])

        for next_block in blocks[1:]:
            if self._should_merge_blocks(current, next_block):
                # Merge: combine text and expand bounding box
                current["text"] += "\n" + next_block["text"]
                current["line_count"] += next_block["line_count"]
                cur_box, nxt_box = current["bbox"], next_block["bbox"]
                current["bbox"] = (
                    min(cur_box[0], nxt_box[0]),
                    min(cur_box[1], nxt_box[1]),
                    max(cur_box[2], nxt_box[2]),
                    max(cur_box[3], nxt_box[3]),
                )
                current["sub_bboxes"].extend(next_block["sub_bboxes"])
            else:
                merged.append(current)
                current = clone(next_block)

        merged.append(current)
        return merged

    def _should_merge_blocks(self, a: Dict, b: Dict) -> bool:
        """Check if two blocks should be merged into one paragraph."""
        a_bbox = a["bbox"]
        b_bbox = b["bbox"]

        # Must have similar font size (within 20%)
        larger_size = max(a["font_size"], b["font_size"])
        if abs(a["font_size"] - b["font_size"]) > larger_size * 0.2:
            return False

        # Block b must start soon after block a ends vertically
        vertical_gap = b_bbox[1] - a_bbox[3]
        line_height = a["font_size"] * 1.4
        if vertical_gap < 0 or vertical_gap > line_height * 1.5:
            return False

        # Similar horizontal position (within 15pt)
        if abs(a_bbox[0] - b_bbox[0]) > 15:
            return False

        # Don't merge if widths are very different (likely different columns)
        a_width = a_bbox[2] - a_bbox[0]
        b_width = b_bbox[2] - b_bbox[0]
        if a_width > 0 and abs(b_width - a_width) / a_width > 0.5:
            return False

        return True

    def _write_translated_block(
        self,
        page,
        block: Dict,
        font_path: Optional[str],
        is_rtl: bool,
    ) -> None:
        """Write translated text into the block's bounding box.

        Priority: respect original font size as much as possible.

        Strategy:
          1. Try original rect at original font size.
          2. Widen the rect to the page margin in the reading direction
             (same font size); the anchor edge stays where it was.
          3. Expand the rect vertically downward (same font size).
          4. Only THEN shrink the font, with a floor of 70% of the original
             size (and never below MIN_FONT_SIZE unless the original was
             already smaller).
          5. Last resort: let the text flow down to the bottom margin at
             the minimum size; log a warning if even that fails.
        """
        import fitz

        original_rect = fitz.Rect(block["bbox"])
        translated = block["translated"]
        target_size = block["font_size"]
        color = self._int_to_rgb(block["color"])
        align = fitz.TEXT_ALIGN_RIGHT if is_rtl else fitz.TEXT_ALIGN_LEFT

        # A font file must be referenced under a non-reserved name; without
        # one, use the built-in Helvetica.
        fontfile = font_path
        fontname = self._CUSTOM_FONT_NAME if fontfile else "helv"

        def fits(rect, fontsize) -> bool:
            rc = self._try_insert(
                page, rect, translated, fontsize, fontname, fontfile,
                color, align,
            )
            # A negative return value is treated as "text did not fit".
            return rc is not None and rc >= 0

        # Step 1: original rect, original size
        if fits(original_rect, target_size):
            return

        # Step 2: expand to the page margin (horizontal)
        page_rect = page.rect
        margin = self._PAGE_MARGIN
        if is_rtl:
            expanded_h = fitz.Rect(
                min(original_rect.x0, page_rect.x0 + margin),
                original_rect.y0,
                original_rect.x1,
                original_rect.y1,
            )
        else:
            expanded_h = fitz.Rect(
                original_rect.x0,
                original_rect.y0,
                max(original_rect.x1, page_rect.x1 - margin),
                original_rect.y1,
            )
        if expanded_h.width > original_rect.width:
            if fits(expanded_h, target_size):
                return

        # Step 3: expand vertically (allow text to flow down). Clamp at 0 so
        # a block already below the bottom margin is never made smaller.
        max_expand_y = max(
            0.0,
            min(
                page_rect.y1 - margin - original_rect.y1,
                original_rect.height * 1.5,
            ),
        )
        expanded = fitz.Rect(
            expanded_h.x0,
            expanded_h.y0,
            expanded_h.x1,
            expanded_h.y1 + max_expand_y,
        )
        if expanded.height > expanded_h.height:
            if fits(expanded, target_size):
                return

        # Step 4: shrink font — but never below 70% of original
        min_size = min(target_size, max(target_size * 0.70, MIN_FONT_SIZE))
        size = target_size
        while size > min_size:
            size = max(size * FONT_SHRINK_FACTOR, min_size)
            if fits(expanded, size):
                return

        # Step 5: last resort — use all remaining vertical space
        bottom = page_rect.y1 - margin
        if bottom > expanded.y1:
            overflow_rect = fitz.Rect(
                expanded.x0, expanded.y0, expanded.x1, bottom
            )
            if fits(overflow_rect, min_size):
                return

        logger.warning(
            "textbox_final_failed",
            error="translated text does not fit",
            text_preview=translated[:60],
        )

    def _try_insert(
        self, page, rect, text, fontsize, fontname, fontfile, color, align
    ):
        """Attempt insert_textbox, returns rc or None on error."""
        try:
            return page.insert_textbox(
                rect,
                text,
                fontsize=fontsize,
                fontname=fontname,
                fontfile=fontfile,
                color=color,
                align=align,
                overlay=True,
            )
        except Exception as e:
            logger.warning("textbox_insert_error", error=str(e))
            return None

    @staticmethod
    def _int_to_rgb(color_int: int) -> tuple:
        """Convert integer color (0xRRGGBB) to (r, g, b) float tuple."""
        r = ((color_int >> 16) & 0xFF) / 255.0
        g = ((color_int >> 8) & 0xFF) / 255.0
        b = (color_int & 0xFF) / 255.0
        return (r, g, b)

    # ------------------------------------------------------------------ #
    # FALLBACK — pdf2docx → WordTranslator → LibreOffice
    # ------------------------------------------------------------------ #

    def _translate_preserve_layout_fallback(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str,
        progress_callback,
    ) -> Path:
        """Fallback: PDF → DOCX (pdf2docx) → WordTranslator → PDF (LibreOffice).

        Returns:
            The final file path: a PDF, or a DOCX when the DOCX → PDF
            conversion did not produce output.

        Raises:
            Exception: Any error from the pipeline is logged and re-raised.
        """
        start_time = time.time()

        try:
            if progress_callback:
                progress_callback({
                    "current": 1,
                    "total": 3,
                    "phase": "converting",
                    "paragraph": 1,
                    "total_paragraphs": 3,
                })

            docx_path = self._convert_pdf_to_docx(input_path)

            if progress_callback:
                progress_callback({
                    "current": 2,
                    "total": 3,
                    "phase": "translating",
                    "paragraph": 2,
                    "total_paragraphs": 3,
                })

            from translators.word_translator import WordTranslator

            translated_docx = output_path.with_suffix(".docx")
            wt = WordTranslator(provider=self._provider)
            wt.translate_file(
                docx_path,
                translated_docx,
                target_language,
                source_language,
                progress_callback=None,
            )

            if progress_callback:
                progress_callback({
                    "current": 3,
                    "total": 3,
                    "phase": "converting_back",
                    "paragraph": 3,
                    "total_paragraphs": 3,
                })

            final_path = self._convert_docx_to_pdf(translated_docx, output_path)

            # Best-effort removal of intermediates; never delete the result.
            for tmp in (docx_path, translated_docx):
                if tmp.exists() and tmp != final_path:
                    try:
                        tmp.unlink()
                    except OSError as e:
                        logger.warning(
                            "temp_cleanup_failed", file=str(tmp), error=str(e)
                        )

            processing_time_ms = round((time.time() - start_time) * 1000, 2)
            logger.info(
                "pdf_layout_fallback_success",
                file_name=input_path.name,
                processing_time_ms=processing_time_ms,
                output=str(final_path),
            )
            return final_path

        except Exception as e:
            logger.error(
                "pdf_layout_fallback_error", file=str(input_path), error=str(e)
            )
            raise

    def _convert_pdf_to_docx(self, pdf_path: Path) -> Path:
        """Convert PDF to DOCX using pdf2docx.

        The DOCX is written next to *pdf_path* with a ``.docx`` suffix.

        Raises:
            RuntimeError: If pdf2docx is missing, the output is empty, or
                the converted document contains no paragraph text.
        """
        try:
            from pdf2docx import Converter
        except ImportError as err:
            raise RuntimeError("pdf2docx is not installed") from err

        docx_path = pdf_path.with_suffix(".docx")
        cv = Converter(str(pdf_path))
        try:
            cv.convert(str(docx_path))
        finally:
            cv.close()

        if not docx_path.exists() or docx_path.stat().st_size == 0:
            raise RuntimeError("PDF conversion produced empty output")

        from docx import Document

        doc = Document(str(docx_path))
        total_text = "".join(p.text for p in doc.paragraphs).strip()
        if not total_text:
            raise RuntimeError(
                "PDF appears to be scanned or contains only images"
            )

        # NOTE: the "pages" field actually carries the paragraph count.
        logger.info("pdf_converted_to_docx", pages=len(doc.paragraphs))
        return docx_path

    def _convert_docx_to_pdf(self, docx_path: Path, target_pdf: Path) -> Path:
        """Convert DOCX → PDF using LibreOffice headless.

        Returns:
            *target_pdf* on success. If LibreOffice is missing, times out or
            produces no output, the DOCX is moved to *target_pdf* with a
            ``.docx`` suffix and that path is returned instead.
        """
        # Some installations only expose the "soffice" launcher.
        binary = (
            shutil.which("libreoffice")
            or shutil.which("soffice")
            or "libreoffice"
        )
        try:
            result = subprocess.run(
                [
                    binary,
                    "--headless",
                    "--convert-to",
                    "pdf",
                    "--outdir",
                    str(target_pdf.parent),
                    str(docx_path),
                ],
                capture_output=True,
                text=True,
                timeout=120,
            )
            # LibreOffice writes "<input stem>.pdf" into the --outdir
            # directory, which is not necessarily the DOCX's own directory.
            expected_pdf = target_pdf.parent / f"{docx_path.stem}.pdf"
            if expected_pdf.exists() and expected_pdf.stat().st_size > 0:
                if expected_pdf != target_pdf:
                    shutil.move(str(expected_pdf), str(target_pdf))
                logger.info("docx_to_pdf_success")
                return target_pdf
            logger.warning("docx_to_pdf_no_output", stderr=result.stderr)
        except FileNotFoundError:
            logger.warning("libreoffice_not_found")
        except subprocess.TimeoutExpired:
            logger.warning("libreoffice_timeout")
        except Exception as e:
            logger.warning("docx_to_pdf_failed", error=str(e))

        docx_output = target_pdf.with_suffix(".docx")
        if docx_path != docx_output and docx_path.exists():
            shutil.move(str(docx_path), str(docx_output))
        return docx_output

    # ------------------------------------------------------------------ #
    # MODE: text_only — extract text, translate, clean PDF output
    # ------------------------------------------------------------------ #

    def _translate_text_only(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str,
        progress_callback,
    ) -> Path:
        """Extract text from PDF, translate, output as a clean formatted PDF.

        Pages whose translation fails or comes back empty keep their
        original text.

        Returns:
            *output_path* with its suffix forced to ``.pdf``.

        Raises:
            RuntimeError: If the PDF has no pages, or (from
                ``_generate_clean_pdf``) if there is no text at all.
        """
        import fitz

        start_time = time.time()

        doc = fitz.open(str(input_path))
        try:
            total_pages = len(doc)
            if total_pages == 0:
                raise RuntimeError("PDF has no pages.")

            logger.info(
                "pdf_text_only_start", pages=total_pages, file=input_path.name
            )
            pages_text = [
                doc[page_num].get_text("text").strip()
                for page_num in range(total_pages)
            ]
        finally:
            # Close the document even if text extraction raises.
            doc.close()

        non_empty_indices = [i for i, t in enumerate(pages_text) if t]

        if progress_callback:
            progress_callback({
                "current": 1,
                "total": 3,
                "phase": "translating",
                "paragraph": 1,
                "total_paragraphs": 3,
            })

        translated_pages = list(pages_text)
        for seq, page_idx in enumerate(non_empty_indices):
            text = pages_text[page_idx]
            try:
                translated = self._translate_single(
                    text, target_language, source_language
                )
                if translated and translated.strip():
                    translated_pages[page_idx] = translated
                else:
                    logger.warning("page_translation_empty", page=page_idx + 1)
            except Exception as e:
                logger.warning(
                    "page_translation_failed", page=page_idx + 1, error=str(e)
                )

            if progress_callback:
                pct = int(30 + 60 * (seq + 1) / len(non_empty_indices))
                progress_callback({
                    "current": seq + 1,
                    "total": len(non_empty_indices),
                    "phase": f"Translating page {page_idx + 1}/{total_pages}",
                    "paragraph": seq + 1,
                    "total_paragraphs": len(non_empty_indices),
                    "progress_override": pct,
                })

        final_path = output_path.with_suffix(".pdf")
        self._generate_clean_pdf(translated_pages, final_path, target_language)

        processing_time_ms = round((time.time() - start_time) * 1000, 2)
        logger.info(
            "pdf_text_only_success",
            file_name=input_path.name,
            pages=total_pages,
            processing_time_ms=processing_time_ms,
        )
        return final_path

    def _generate_clean_pdf(
        self, pages_text: List[str], output_path: Path, target_language: str = "en"
    ) -> None:
        """Generate a clean, well-formatted PDF from translated page texts.

        Each non-empty page becomes a section (with a "— Page N —" header
        when there are several pages); each line becomes a paragraph.

        Raises:
            RuntimeError: If every page is empty.
        """
        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import ParagraphStyle
        from reportlab.lib.units import mm
        from reportlab.lib.enums import TA_LEFT, TA_JUSTIFY, TA_RIGHT
        from reportlab.lib import colors
        from reportlab.platypus import (
            SimpleDocTemplate, Paragraph, Spacer, PageBreak,
        )
        from reportlab.lib.styles import getSampleStyleSheet

        is_rtl = self._is_rtl(target_language)
        alignment = TA_RIGHT if is_rtl else TA_JUSTIFY

        styles = getSampleStyleSheet()
        pdf_doc = SimpleDocTemplate(
            str(output_path),
            pagesize=A4,
            leftMargin=25 * mm,
            rightMargin=25 * mm,
            topMargin=25 * mm,
            bottomMargin=25 * mm,
        )

        # NOTE(review): both styles inherit the sample stylesheet's default
        # font — confirm it covers the glyphs of non-Latin target languages.
        body_style = ParagraphStyle(
            "BodyText_Custom",
            parent=styles["Normal"],
            fontSize=11,
            leading=16,
            spaceAfter=6,
            alignment=alignment,
            textColor=colors.HexColor("#1a1a1a"),
        )
        page_number_style = ParagraphStyle(
            "PageNumber",
            parent=styles["Normal"],
            fontSize=9,
            textColor=colors.HexColor("#999999"),
            alignment=TA_LEFT,
        )

        elements = []
        for i, page_text in enumerate(pages_text):
            if not page_text.strip():
                continue

            if len(pages_text) > 1:
                elements.append(
                    Paragraph(f"— Page {i + 1} —", page_number_style)
                )
                elements.append(Spacer(1, 8))

            for para_text in page_text.split("\n"):
                para_text = para_text.strip()
                if not para_text:
                    elements.append(Spacer(1, 4))
                    continue

                # Paragraph parses XML-like markup, so "&", "<" and ">" in
                # the translated text must be escaped to render literally.
                safe = escape(para_text)
                try:
                    elements.append(Paragraph(safe, body_style))
                except Exception:
                    ascii_text = para_text.encode("ascii", "replace").decode()
                    elements.append(Paragraph(escape(ascii_text), body_style))

            if i < len(pages_text) - 1:
                elements.append(PageBreak())

        if not elements:
            raise RuntimeError("No text content to generate PDF")

        pdf_doc.build(elements)

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #

    def _translate_single(
        self, text: str, target_language: str, source_language: str
    ) -> str:
        """Translate a single text string.

        Tries the injected provider first, then the legacy
        ``translation_service``. Returns *text* unchanged if both fail.
        """
        if self._provider is not None:
            try:
                result = self._provider.translate(
                    text, target_language, source_language
                )
                if result and result.strip():
                    return result
            except Exception as e:
                logger.warning("provider_single_failed", error=str(e))

        from services.translation_service import translation_service

        try:
            return translation_service.translate_text(
                text, target_language, source_language
            )
        except Exception as e:
            logger.warning("legacy_single_failed", error=str(e))
            return text

    def _translate_batch(
        self, texts: List[str], target_language: str, source_language: str
    ) -> List[str]:
        """Translate a batch of texts.

        Tries the injected provider first, then the legacy
        ``translation_service``. Returns *texts* unchanged if both fail.
        """
        if self._provider is not None:
            try:
                return self._provider.translate_batch(
                    texts, target_language, source_language
                )
            except Exception as e:
                logger.warning("provider_translate_failed", error=str(e))

        from services.translation_service import translation_service

        try:
            return translation_service.translate_batch(
                texts, target_language, source_language
            )
        except Exception as e:
            logger.warning("legacy_translate_failed", error=str(e))
            return texts

    def _validate_file(self, file_path: Path) -> None:
        """Check that *file_path* exists, ends in ``.pdf`` and starts with ``%PDF``.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the suffix is not ``.pdf`` or the header is wrong.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path.name}")
        if file_path.suffix.lower() != ".pdf":
            raise ValueError(f"Expected .pdf file, got {file_path.suffix}")
        with open(file_path, "rb") as f:
            header = f.read(5)
        if header[:4] != b"%PDF":
            raise ValueError("File does not appear to be a valid PDF.")


pdf_translator = PDFTranslator()