All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m36s
875 lines
31 KiB
Python
875 lines
31 KiB
Python
"""
|
||
PDF Document Translation Module — Layout-Preserving
|
||
|
||
Primary strategy (layout mode):
|
||
Use PyMuPDF (fitz) for direct, in-place text replacement on each page.
|
||
|
||
For each page:
|
||
1. Extract text blocks with positions, fonts, sizes, colors
|
||
2. Translate each block as a unit (preserving context within block)
|
||
3. Redact original text area
|
||
4. Write translated text at the same position, auto-adjusting font size
|
||
|
||
This preserves:
|
||
- Page structure, images, vector graphics, backgrounds
|
||
- Text positions within original bounding boxes
|
||
- Approximate font styling (size, color)
|
||
|
||
Fallback:
|
||
If PyMuPDF direct editing fails, falls back to the pdf2docx pipeline
|
||
(PDF → DOCX → WordTranslator → PDF via LibreOffice).
|
||
|
||
Text-only mode:
|
||
Extract text, translate, generate a clean formatted PDF via reportlab.
|
||
"""
|
||
|
||
import time
|
||
import shutil
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import Dict, Any, Optional, Callable, List
|
||
|
||
from core.logging import get_logger
|
||
|
||
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)

# Minimum readable font size (points). Used as the absolute floor when the
# writer shrinks translated text to make it fit its box.
MIN_FONT_SIZE = 4.5

# Font size reduction factor when text overflows its bounding box; the
# current size is multiplied by this value on every shrink attempt.
FONT_SHRINK_FACTOR = 0.87

# RTL language codes. A target language found in this set (after
# lower-casing) is written right-aligned instead of left-aligned/justified.
RTL_LANGUAGES = frozenset({"ar", "he", "fa", "ur", "ku", "ps", "ug", "sd", "yi", "dv", "ckb"})
|
||
|
||
|
||
class PDFTranslator:
    """Translates PDF files with layout preservation using PyMuPDF.

    Two modes are available through :meth:`translate_file`:

    * ``"layout"`` (default) rewrites each page in place with PyMuPDF and
      falls back to a pdf2docx -> WordTranslator -> LibreOffice pipeline when
      PyMuPDF is missing or in-place processing raises.
    * ``"text_only"`` extracts plain text, translates it page by page and
      renders a fresh PDF with reportlab.

    Single strings are translated with ``provider`` first (when one was
    given) and with the project's ``translation_service`` otherwise or when
    the provider fails / returns nothing.
    """

    # Candidate font files, probed in order; the first existing path wins.
    _FONT_SEARCH_PATHS = [
        "/usr/share/fonts/opentype/noto/NotoSans-Regular.ttf",
        "/usr/share/fonts/opentype/noto/NotoSans.ttc",
        "/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf",
        "/usr/share/fonts/truetype/noto/NotoSans[Noto].ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        "/usr/share/fonts/truetype/freefont/FreeSans.ttf",
        "/app/fonts/NotoSans-Regular.ttf",
        "C:/Windows/Fonts/arial.ttf",
        "C:/Windows/Fonts/msyh.ttc",
        "/System/Library/Fonts/Helvetica.ttc",
    ]

    # Resource name used when embedding the font file found on disk. It is
    # deliberately not one of PyMuPDF's reserved (Base-14 / CJK) names, which
    # must not be combined with ``fontfile``.
    _EMBEDDED_FONT_NAME = "xlatuni"

    # Built-in font used when no font file could be located.
    _BUILTIN_FONT_NAME = "helv"

    def __init__(self, provider=None):
        """Create a translator.

        Args:
            provider: Optional translation backend. The code in this class
                calls ``provider.translate(text, target, source)`` and
                ``provider.translate_batch(texts, target, source)`` on it.
        """
        self._provider = provider
        self._font_path: Optional[str] = None
        self._translation_stats = {"attempted": 0, "changed": 0}

    def _get_font_path(self) -> Optional[str]:
        """Resolve a Unicode-capable TTF/OTF font file.

        Returns:
            The first existing entry of ``_FONT_SEARCH_PATHS`` (cached on the
            instance once found), or ``None`` when none exists. A miss is not
            cached, so the search is repeated on the next call.
        """
        if self._font_path is not None:
            return self._font_path
        for candidate in self._FONT_SEARCH_PATHS:
            if Path(candidate).exists():
                self._font_path = candidate
                return candidate
        logger.warning("no_unicode_font_found")
        return None

    def translate_file(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str = "auto",
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
        pdf_mode: str = "layout",
        translate_images: bool = False,
        **kwargs,
    ) -> Path:
        """Translate ``input_path`` and write the result near ``output_path``.

        Args:
            input_path: Existing ``.pdf`` file to translate.
            output_path: Requested destination. The returned path may carry a
                different suffix (see Returns).
            target_language: Language code to translate into.
            source_language: Source language code; defaults to ``"auto"``.
            progress_callback: Optional callable receiving progress dicts.
            pdf_mode: ``"text_only"`` selects the reportlab pipeline; any
                other value selects the layout-preserving pipeline.
            translate_images: Forwarded to the DOCX fallback pipeline only.
            **kwargs: Accepted for call compatibility and ignored.

        Returns:
            Path of the file actually written. In text-only mode the suffix is
            forced to ``.pdf``; in the fallback pipeline a ``.docx`` is
            returned when the LibreOffice conversion yields no PDF.

        Raises:
            FileNotFoundError: ``input_path`` does not exist.
            ValueError: ``input_path`` is not a ``.pdf`` or lacks a PDF header.
        """
        input_path = Path(input_path)
        output_path = Path(output_path)
        self._validate_file(input_path)

        if pdf_mode == "text_only":
            return self._translate_text_only(
                input_path, output_path, target_language, source_language, progress_callback
            )
        return self._translate_preserve_layout(
            input_path, output_path, target_language, source_language, progress_callback,
            translate_images=translate_images,
        )

    # ------------------------------------------------------------------ #
    #  LAYOUT MODE — PyMuPDF in-place text replacement
    # ------------------------------------------------------------------ #

    def _translate_preserve_layout(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str,
        progress_callback,
        translate_images: bool = False,
    ) -> Path:
        """Translate PDF preserving layout via PyMuPDF direct text replacement.

        Falls back to :meth:`_translate_preserve_layout_fallback` when PyMuPDF
        cannot be imported or when in-place processing raises.

        Raises:
            RuntimeError: The PDF has no pages.
        """
        start_time = time.time()

        try:
            import fitz
        except ImportError:
            logger.warning("pymupdf_missing_fallback_docx")
            return self._translate_preserve_layout_fallback(
                input_path, output_path, target_language, source_language, progress_callback,
                translate_images=translate_images,
            )

        doc = fitz.open(str(input_path))
        total_pages = len(doc)

        if total_pages == 0:
            doc.close()
            raise RuntimeError("PDF has no pages.")

        font_path = self._get_font_path()
        logger.info(
            "pdf_layout_start",
            pages=total_pages,
            file=input_path.name,
            font=font_path or "built-in",
        )

        if progress_callback:
            progress_callback({
                "current": 1, "total": total_pages,
                "phase": "extracting",
                "paragraph": 1, "total_paragraphs": total_pages,
            })

        try:
            # On success the helper saves and closes ``doc`` itself.
            result_path = self._process_pages_inplace(
                doc, total_pages, output_path,
                target_language, source_language,
                font_path, progress_callback,
            )

            processing_time_ms = round((time.time() - start_time) * 1000, 2)
            logger.info(
                "pdf_layout_success",
                pages=total_pages,
                processing_time_ms=processing_time_ms,
                output=str(result_path),
            )
            return result_path

        except Exception as e:
            # The helper only closes the document after a successful save, so
            # on failure it is still open here.
            doc.close()
            logger.warning("inplace_failed_fallback", error=str(e))
            return self._translate_preserve_layout_fallback(
                input_path, output_path, target_language, source_language, progress_callback,
                translate_images=translate_images,
            )

    @staticmethod
    def _report_page_progress(progress_callback, page_num: int, total_pages: int, phase: str) -> None:
        """Send the per-page progress payload used by the in-place pipeline.

        ``page_num`` is zero-based. ``progress_override`` maps the finished
        page onto the 30..95 range. Does nothing without a callback.
        """
        if not progress_callback:
            return
        pct = int(30 + 65 * (page_num + 1) / total_pages)
        progress_callback({
            "current": page_num + 1, "total": total_pages,
            "phase": phase,
            "paragraph": page_num + 1,
            "total_paragraphs": total_pages,
            "progress_override": pct,
        })

    def _process_pages_inplace(
        self,
        doc,
        total_pages: int,
        output_path: Path,
        target_language: str,
        source_language: str,
        font_path: Optional[str],
        progress_callback,
    ) -> Path:
        """Core PyMuPDF in-place processing — one page at a time.

        For every page: extract and merge text blocks, translate each block,
        redact the areas of translated blocks, then write the translations.
        Finally saves ``doc`` to ``output_path`` and closes it.

        Returns:
            ``output_path``.
        """
        import fitz

        is_rtl = target_language.lower() in RTL_LANGUAGES
        total_blocks = 0
        translated_blocks = 0

        for page_num in range(total_pages):
            page = doc[page_num]
            raw_blocks = self._extract_text_blocks(page)

            if not raw_blocks:
                self._report_page_progress(
                    progress_callback, page_num, total_pages,
                    f"Page {page_num + 1}/{total_pages} (no text)",
                )
                continue

            # Merge adjacent blocks that form a single paragraph
            blocks = self._merge_adjacent_blocks(raw_blocks, page.rect)
            total_blocks += len(blocks)

            # Phase 1: translate all blocks on this page
            for block in blocks:
                original = block["text"]
                if not original.strip():
                    continue

                try:
                    translated = self._translate_single(
                        original, target_language, source_language
                    )
                    if translated and translated.strip():
                        block["translated"] = translated
                        translated_blocks += 1
                    else:
                        logger.warning(
                            "block_translation_empty",
                            page=page_num + 1,
                            text_preview=original[:60],
                        )
                except Exception as e:
                    # A failed block keeps its original text on the page.
                    logger.warning(
                        "block_translation_failed",
                        page=page_num + 1,
                        error=str(e),
                    )

            # Phase 2: redact original text areas (only for translated blocks)
            for block in blocks:
                if block.get("translated"):
                    for sub_rect in block["sub_bboxes"]:
                        page.add_redact_annot(fitz.Rect(sub_rect), fill=(1, 1, 1))

            page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE)

            # Phase 3: write translated text
            for block in blocks:
                if block.get("translated"):
                    self._write_translated_block(
                        page, block, font_path, is_rtl
                    )

            self._report_page_progress(
                progress_callback, page_num, total_pages,
                f"Translating page {page_num + 1}/{total_pages}",
            )

        logger.info(
            "pdf_blocks_processed",
            total_blocks=total_blocks,
            translated_blocks=translated_blocks,
        )

        output_path.parent.mkdir(parents=True, exist_ok=True)
        doc.save(str(output_path), garbage=4, deflate=True)
        doc.close()
        return output_path

    def _extract_text_blocks(self, page) -> List[Dict]:
        """Extract text blocks with position, font, and color information.

        Reads ``page.get_text("dict")`` and keeps only blocks of type 0 that
        contain non-blank text. Spans of a line are concatenated; lines are
        joined with ``"\\n"``.

        Returns:
            One dict per block with keys ``bbox``, ``text``, ``font_size``
            (mean span size, one decimal), ``color`` (first span), ``is_bold``,
            ``is_italic``, ``line_count``, ``translated`` (``None``) and
            ``sub_bboxes`` (a one-element list holding ``bbox``).
        """
        import fitz

        blocks = []
        data = page.get_text("dict", flags=fitz.TEXT_PRESERVE_WHITESPACE)

        for block in data.get("blocks", []):
            if block.get("type") != 0:
                continue

            lines = block.get("lines", [])
            if not lines:
                continue

            line_parts = []
            spans_info = []

            for line in lines:
                span_parts = []
                for span in line.get("spans", []):
                    text = span.get("text", "")
                    if text:
                        span_parts.append(text)
                        spans_info.append({
                            "size": span.get("size", 12),
                            "font": span.get("font", "Helvetica"),
                            "color": span.get("color", 0),
                            "flags": span.get("flags", 0),
                            "origin": span.get("origin", (0, 0)),
                        })
                if span_parts:
                    line_parts.append("".join(span_parts))

            full_text = "\n".join(line_parts).strip()
            if not full_text:
                continue

            avg_size = (
                sum(s["size"] for s in spans_info) / len(spans_info)
                if spans_info
                else 12.0
            )
            first_color = spans_info[0]["color"] if spans_info else 0
            # Span flag bits tested here: 16 -> bold, 2 -> italic.
            is_bold = any(s["flags"] & 16 for s in spans_info)
            is_italic = any(s["flags"] & 2 for s in spans_info)

            blocks.append({
                "bbox": tuple(block["bbox"]),
                "text": full_text,
                "font_size": round(avg_size, 1),
                "color": first_color,
                "is_bold": is_bold,
                "is_italic": is_italic,
                "line_count": len(line_parts),
                "translated": None,
                "sub_bboxes": [tuple(block["bbox"])],
            })

        return blocks

    def _merge_adjacent_blocks(
        self, blocks: List[Dict], page_rect
    ) -> List[Dict]:
        """Merge consecutive text blocks that form a single paragraph.

        Consecutive blocks are merged when :meth:`_should_merge_blocks`
        accepts them. A merged block joins the texts with ``"\\n"``, sums the
        line counts, takes the union bounding box and collects every
        original rectangle in ``sub_bboxes``.

        The input dicts are never modified: each output block is a copy with
        its own ``sub_bboxes`` list.

        Args:
            blocks: Blocks in reading order, as produced by
                :meth:`_extract_text_blocks`.
            page_rect: Unused; kept for interface compatibility.

        Returns:
            The merged blocks. A list of zero or one block is returned as is.
        """
        if len(blocks) <= 1:
            return blocks

        def _fresh(block: Dict) -> Dict:
            # dict() alone is shallow: without copying the list, extending
            # "sub_bboxes" below would also grow the caller's block.
            clone = dict(block)
            clone["sub_bboxes"] = list(block["sub_bboxes"])
            return clone

        merged = []
        current = _fresh(blocks[0])

        for next_block in blocks[1:]:
            if self._should_merge_blocks(current, next_block):
                # Merge: combine text and expand bounding box
                current["text"] += "\n" + next_block["text"]
                current["line_count"] += next_block["line_count"]
                cur_box, nxt_box = current["bbox"], next_block["bbox"]
                current["bbox"] = (
                    min(cur_box[0], nxt_box[0]),
                    min(cur_box[1], nxt_box[1]),
                    max(cur_box[2], nxt_box[2]),
                    max(cur_box[3], nxt_box[3]),
                )
                current["sub_bboxes"].extend(next_block["sub_bboxes"])
            else:
                merged.append(current)
                current = _fresh(next_block)

        merged.append(current)
        return merged

    def _should_merge_blocks(self, a: Dict, b: Dict) -> bool:
        """Check if two blocks should be merged into one paragraph.

        ``b`` is merged into ``a`` only when all of these hold:

        * font sizes differ by at most 20% of the larger one;
        * ``b`` starts below ``a`` with a gap of at most 1.5 line heights
          (line height taken as 1.4 x ``a``'s font size);
        * left edges are within 15 points of each other;
        * widths differ by at most 50% of ``a``'s width.
        """
        a_bbox = a["bbox"]
        b_bbox = b["bbox"]

        # Must have similar font size (within 20%)
        if abs(a["font_size"] - b["font_size"]) > max(a["font_size"], b["font_size"]) * 0.2:
            return False

        # Block b must start soon after block a ends vertically
        vertical_gap = b_bbox[1] - a_bbox[3]
        line_height = a["font_size"] * 1.4
        if vertical_gap < 0 or vertical_gap > line_height * 1.5:
            return False

        # Similar horizontal position (within 15pt)
        if abs(a_bbox[0] - b_bbox[0]) > 15:
            return False

        # Don't merge if widths are very different (likely different columns)
        a_width = a_bbox[2] - a_bbox[0]
        b_width = b_bbox[2] - b_bbox[0]
        if a_width > 0 and abs(b_width - a_width) / a_width > 0.5:
            return False

        return True

    def _write_translated_block(
        self,
        page,
        block: Dict,
        font_path: Optional[str],
        is_rtl: bool,
    ) -> None:
        """Write translated text into the block's bounding box.

        Priority: respect original font size as much as possible.
        Strategy:
          1. Try original rect at original font size.
          2. Widen the rect up to the page margin in the reading direction
             (right edge for LTR, left edge for RTL), same font size.
          3. Additionally grow the rect downward by at most 1.5x its height,
             never past the bottom margin, same font size.
          4. Only THEN shrink the font, never below 70% of the original size
             (nor below ``MIN_FONT_SIZE``).

        An attempt counts as successful when ``insert_textbox`` returns a
        non-negative value. If even the smallest size does not fit, a warning
        is logged.

        Args:
            page: PyMuPDF page to draw on.
            block: Block dict with ``bbox``, ``translated``, ``font_size`` and
                ``color``.
            font_path: Font file to embed, or ``None`` for the built-in font.
            is_rtl: Right-align the text and widen the box to the left.
        """
        import fitz

        original_rect = fitz.Rect(block["bbox"])
        translated = block["translated"]
        target_size = block["font_size"]

        color = self._int_to_rgb(block["color"])
        align = fitz.TEXT_ALIGN_RIGHT if is_rtl else fitz.TEXT_ALIGN_LEFT

        # A font file must be registered under a non-reserved name; without a
        # file the built-in Helvetica is referenced by its reserved name.
        fontfile = font_path
        fontname = self._EMBEDDED_FONT_NAME if fontfile else self._BUILTIN_FONT_NAME

        def _fits(rect, fontsize) -> bool:
            rc = self._try_insert(
                page, rect, translated, fontsize, fontname, fontfile, color, align
            )
            return rc is not None and rc >= 0

        # Step 1: original rect, original size
        size = target_size
        if _fits(original_rect, size):
            return

        # Step 2: expand to the page margin (horizontal). The edge the text is
        # anchored to stays put so the text keeps its starting position.
        page_rect = page.rect
        margin = 18
        if is_rtl:
            new_x0 = min(original_rect.x0, page_rect.x0 + margin)
            new_x1 = original_rect.x1
        else:
            new_x0 = original_rect.x0
            new_x1 = max(original_rect.x1, page_rect.x1 - margin)
        expanded_h = fitz.Rect(new_x0, original_rect.y0, new_x1, original_rect.y1)
        if expanded_h.width > original_rect.width:
            if _fits(expanded_h, size):
                return

        # Step 3: expand vertically (allow text to flow down). Clamp at zero so
        # a block already below the bottom margin never makes the rect smaller.
        max_expand_y = max(
            0.0,
            min(page_rect.y1 - margin - original_rect.y1, original_rect.height * 1.5),
        )
        expanded = fitz.Rect(
            expanded_h.x0,
            expanded_h.y0,
            expanded_h.x1,
            expanded_h.y1 + max_expand_y,
        )
        if expanded.height > expanded_h.height:
            if _fits(expanded, size):
                return

        # Step 4: shrink font — but never below 70% of original
        min_size = max(target_size * 0.70, MIN_FONT_SIZE)
        rect = expanded
        for _ in range(8):
            size = max(size * FONT_SHRINK_FACTOR, min_size)
            if _fits(rect, size):
                return
            if size <= min_size:
                break

        # Last resort: repeat the smallest-size attempt outside _try_insert so
        # that an exception, if that is what keeps failing, gets logged.
        try:
            rc = page.insert_textbox(
                rect,
                translated,
                fontsize=min_size,
                fontname=fontname,
                fontfile=fontfile,
                color=color,
                align=align,
                overlay=True,
            )
        except Exception as e:
            logger.warning("textbox_final_failed", error=str(e))
            return
        if rc is not None and rc < 0:
            logger.warning(
                "textbox_final_overflow",
                font_size=min_size,
                text_preview=translated[:60],
            )

    def _try_insert(
        self, page, rect, text, fontsize, fontname, fontfile, color, align
    ):
        """Attempt insert_textbox, returns rc or None on error.

        The return value of ``page.insert_textbox`` is passed through
        unchanged; callers treat a negative value as "did not fit". Any
        exception is swallowed and reported as ``None`` so the caller can try
        the next strategy.
        """
        try:
            return page.insert_textbox(
                rect,
                text,
                fontsize=fontsize,
                fontname=fontname,
                fontfile=fontfile,
                color=color,
                align=align,
                overlay=True,
            )
        except Exception:
            return None

    @staticmethod
    def _int_to_rgb(color_int: int) -> tuple:
        """Convert integer color (0xRRGGBB) to (r, g, b) float tuple.

        Each component is scaled to the 0.0-1.0 range.
        """
        r = ((color_int >> 16) & 0xFF) / 255.0
        g = ((color_int >> 8) & 0xFF) / 255.0
        b = (color_int & 0xFF) / 255.0
        return (r, g, b)

    # ------------------------------------------------------------------ #
    #  FALLBACK — pdf2docx → WordTranslator → LibreOffice
    # ------------------------------------------------------------------ #

    def _translate_preserve_layout_fallback(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str,
        progress_callback,
        translate_images: bool = False,
    ) -> Path:
        """Fallback: PDF → DOCX (pdf2docx) → WordTranslator → PDF (LibreOffice).

        Progress is reported as three coarse steps. Intermediate DOCX files
        are removed afterwards unless one of them is the returned result.

        Returns:
            The translated PDF, or the translated DOCX when the LibreOffice
            conversion produced no PDF.

        Raises:
            Exception: Any error from the pipeline is logged and re-raised.
        """
        start_time = time.time()

        try:
            if progress_callback:
                progress_callback({
                    "current": 1, "total": 3,
                    "phase": "converting",
                    "paragraph": 1, "total_paragraphs": 3,
                })

            docx_path = self._convert_pdf_to_docx(input_path)

            if progress_callback:
                progress_callback({
                    "current": 2, "total": 3,
                    "phase": "translating",
                    "paragraph": 2, "total_paragraphs": 3,
                })

            from translators.word_translator import WordTranslator

            translated_docx = output_path.with_suffix(".docx")
            wt = WordTranslator(provider=self._provider)
            wt.translate_file(
                docx_path, translated_docx,
                target_language, source_language,
                progress_callback=None,
                translate_images=translate_images,
            )

            if progress_callback:
                progress_callback({
                    "current": 3, "total": 3,
                    "phase": "converting_back",
                    "paragraph": 3, "total_paragraphs": 3,
                })

            final_path = self._convert_docx_to_pdf(translated_docx, output_path)

            for tmp in [docx_path, translated_docx]:
                if tmp.exists() and tmp != final_path:
                    try:
                        tmp.unlink()
                    except OSError as e:
                        # Best effort: a leftover temp file must not fail the job.
                        logger.warning("temp_cleanup_failed", file=str(tmp), error=str(e))

            processing_time_ms = round((time.time() - start_time) * 1000, 2)
            logger.info(
                "pdf_layout_fallback_success",
                file_name=input_path.name,
                processing_time_ms=processing_time_ms,
                output=str(final_path),
            )
            return final_path

        except Exception as e:
            logger.error("pdf_layout_fallback_error", file=str(input_path), error=str(e))
            raise

    def _convert_pdf_to_docx(self, pdf_path: Path) -> Path:
        """Convert PDF to DOCX using pdf2docx.

        The DOCX is written next to ``pdf_path`` with the suffix replaced.

        Returns:
            Path of the generated DOCX.

        Raises:
            RuntimeError: pdf2docx is missing, the output is absent or empty,
                or the document's paragraphs contain no text at all.
        """
        try:
            from pdf2docx import Converter
        except ImportError as exc:
            raise RuntimeError("pdf2docx is not installed") from exc

        docx_path = pdf_path.with_suffix(".docx")
        cv = Converter(str(pdf_path))
        try:
            cv.convert(str(docx_path))
        finally:
            cv.close()

        if not docx_path.exists() or docx_path.stat().st_size == 0:
            raise RuntimeError("PDF conversion produced empty output")

        from docx import Document
        doc = Document(str(docx_path))
        total_text = "".join(p.text for p in doc.paragraphs).strip()
        if not total_text:
            raise RuntimeError("PDF appears to be scanned or contains only images")

        # NOTE: the "pages" field carries the paragraph count of the DOCX.
        logger.info("pdf_converted_to_docx", pages=len(doc.paragraphs))
        return docx_path

    def _convert_docx_to_pdf(self, docx_path: Path, target_pdf: Path) -> Path:
        """Convert DOCX → PDF using LibreOffice headless.

        LibreOffice is asked to write into ``target_pdf.parent``; the file it
        produces there (named after the DOCX stem) is moved to ``target_pdf``.

        Returns:
            ``target_pdf`` on success. When no PDF was produced (LibreOffice
            missing, timeout after 120 s, or any other failure) the DOCX is
            moved to ``target_pdf`` with a ``.docx`` suffix and that path is
            returned instead.
        """
        try:
            target_pdf.parent.mkdir(parents=True, exist_ok=True)
            result = subprocess.run(
                [
                    "libreoffice", "--headless", "--convert-to", "pdf",
                    "--outdir", str(target_pdf.parent),
                    str(docx_path),
                ],
                capture_output=True,
                text=True,
                timeout=120,
            )
            # The output lands in --outdir, not beside the DOCX.
            expected_pdf = target_pdf.parent / f"{docx_path.stem}.pdf"
            if expected_pdf.exists() and expected_pdf.stat().st_size > 0:
                if expected_pdf != target_pdf:
                    shutil.move(str(expected_pdf), str(target_pdf))
                logger.info("docx_to_pdf_success")
                return target_pdf
            logger.warning("docx_to_pdf_no_output", stderr=result.stderr)
        except FileNotFoundError:
            logger.warning("libreoffice_not_found")
        except subprocess.TimeoutExpired:
            logger.warning("libreoffice_timeout")
        except Exception as e:
            logger.warning("docx_to_pdf_failed", error=str(e))

        docx_output = target_pdf.with_suffix(".docx")
        if docx_path != docx_output and docx_path.exists():
            shutil.move(str(docx_path), str(docx_output))
        return docx_output

    # ------------------------------------------------------------------ #
    #  MODE: text_only — extract text, translate, clean PDF output
    # ------------------------------------------------------------------ #

    def _translate_text_only(
        self,
        input_path: Path,
        output_path: Path,
        target_language: str,
        source_language: str,
        progress_callback,
    ) -> Path:
        """Extract text from PDF, translate, output as a clean formatted PDF.

        Each non-empty page is translated as one string. A page whose
        translation fails or comes back empty keeps its original text.

        Returns:
            ``output_path`` with its suffix forced to ``.pdf``.

        Raises:
            RuntimeError: The PDF has no pages, or no page contains text.
        """
        import fitz

        start_time = time.time()
        doc = fitz.open(str(input_path))
        try:
            total_pages = len(doc)

            if total_pages == 0:
                raise RuntimeError("PDF has no pages.")

            logger.info("pdf_text_only_start", pages=total_pages, file=input_path.name)

            pages_text = [
                doc[page_num].get_text("text").strip()
                for page_num in range(total_pages)
            ]
        finally:
            # Close the document even when extraction raises.
            doc.close()

        non_empty_indices = [i for i, t in enumerate(pages_text) if t]

        if progress_callback:
            progress_callback({
                "current": 1, "total": 3,
                "phase": "translating",
                "paragraph": 1, "total_paragraphs": 3,
            })

        translated_pages = list(pages_text)

        for seq, page_idx in enumerate(non_empty_indices):
            text = pages_text[page_idx]
            try:
                translated = self._translate_single(text, target_language, source_language)
                if translated and translated.strip():
                    translated_pages[page_idx] = translated
                else:
                    logger.warning("page_translation_empty", page=page_idx + 1)
            except Exception as e:
                logger.warning("page_translation_failed", page=page_idx + 1, error=str(e))

            if progress_callback:
                pct = int(30 + 60 * (seq + 1) / len(non_empty_indices))
                progress_callback({
                    "current": seq + 1,
                    "total": len(non_empty_indices),
                    "phase": f"Translating page {page_idx + 1}/{total_pages}",
                    "paragraph": seq + 1,
                    "total_paragraphs": len(non_empty_indices),
                    "progress_override": pct,
                })

        final_path = output_path.with_suffix(".pdf")
        # Same guarantee as the layout pipeline: the target directory exists.
        final_path.parent.mkdir(parents=True, exist_ok=True)
        self._generate_clean_pdf(translated_pages, final_path, target_language)

        processing_time_ms = round((time.time() - start_time) * 1000, 2)
        logger.info(
            "pdf_text_only_success",
            file_name=input_path.name,
            pages=total_pages,
            processing_time_ms=processing_time_ms,
        )

        return final_path

    def _generate_clean_pdf(
        self, pages_text: List[str], output_path: Path, target_language: str = "en"
    ) -> None:
        """Generate a clean, well-formatted PDF from translated page texts.

        Every non-blank input page becomes a run of paragraphs (one per text
        line) on A4 with 25 mm margins, preceded by a "— Page N —" marker when
        there is more than one input page. Text is right-aligned for RTL
        target languages and justified otherwise.

        Raises:
            RuntimeError: No page contains any text.
        """
        from xml.sax.saxutils import escape

        from reportlab.lib.pagesizes import A4
        from reportlab.lib.styles import ParagraphStyle
        from reportlab.lib.units import mm
        from reportlab.lib.enums import TA_LEFT, TA_JUSTIFY, TA_RIGHT
        from reportlab.lib import colors
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
        from reportlab.lib.styles import getSampleStyleSheet

        is_rtl = target_language.lower() in RTL_LANGUAGES
        alignment = TA_RIGHT if is_rtl else TA_JUSTIFY

        styles = getSampleStyleSheet()

        pdf_doc = SimpleDocTemplate(
            str(output_path),
            pagesize=A4,
            leftMargin=25 * mm,
            rightMargin=25 * mm,
            topMargin=25 * mm,
            bottomMargin=25 * mm,
        )

        body_style = ParagraphStyle(
            "BodyText_Custom",
            parent=styles["Normal"],
            fontSize=11,
            leading=16,
            spaceAfter=6,
            alignment=alignment,
            textColor=colors.HexColor("#1a1a1a"),
        )

        page_number_style = ParagraphStyle(
            "PageNumber",
            parent=styles["Normal"],
            fontSize=9,
            textColor=colors.HexColor("#999999"),
            alignment=TA_LEFT,
        )

        elements = []
        for i, page_text in enumerate(pages_text):
            if not page_text.strip():
                continue

            if len(pages_text) > 1:
                elements.append(Paragraph(f"— Page {i + 1} —", page_number_style))
                elements.append(Spacer(1, 8))

            for para_text in page_text.split("\n"):
                para_text = para_text.strip()
                if not para_text:
                    elements.append(Spacer(1, 4))
                    continue

                # Paragraph parses its text as markup, so "&", "<" and ">"
                # coming from the document must be turned into entities.
                safe = escape(para_text)

                try:
                    elements.append(Paragraph(safe, body_style))
                except Exception:
                    # Retry with non-ASCII characters replaced, still escaped.
                    elements.append(
                        Paragraph(
                            safe.encode("ascii", "replace").decode(),
                            body_style,
                        )
                    )

            if i < len(pages_text) - 1:
                elements.append(PageBreak())

        if not elements:
            raise RuntimeError("No text content to generate PDF")

        pdf_doc.build(elements)

    # ------------------------------------------------------------------ #
    #  Shared helpers
    # ------------------------------------------------------------------ #

    def _translate_single(
        self, text: str, target_language: str, source_language: str
    ) -> str:
        """Translate a single text string.

        Tries the provider first (if any); when it raises or returns a blank
        result, ``translation_service.translate_text`` is used. If that fails
        as well, the untranslated ``text`` is returned.
        """
        if self._provider is not None:
            try:
                result = self._provider.translate(text, target_language, source_language)
                if result and result.strip():
                    return result
            except Exception as e:
                logger.warning("provider_single_failed", error=str(e))

        from services.translation_service import translation_service
        try:
            return translation_service.translate_text(text, target_language, source_language)
        except Exception as e:
            logger.warning("legacy_single_failed", error=str(e))
            return text

    def _translate_batch(
        self, texts: List[str], target_language: str, source_language: str
    ) -> List[str]:
        """Translate a batch of texts.

        Uses the provider's ``translate_batch`` when available, then
        ``translation_service.translate_batch``; if both fail the input list
        is returned unchanged. Updates the ``attempted`` / ``changed``
        counters exposed by :meth:`get_translation_stats`.
        """
        non_empty = [t for t in texts if t and t.strip()]
        self._translation_stats["attempted"] += len(non_empty)

        translated = None
        if self._provider is not None:
            try:
                translated = self._provider.translate_batch(texts, target_language, source_language)
            except Exception as e:
                logger.warning("provider_translate_failed", error=str(e))

        if translated is None:
            from services.translation_service import translation_service
            try:
                translated = translation_service.translate_batch(texts, target_language, source_language)
            except Exception as e:
                logger.warning("legacy_translate_failed", error=str(e))
                translated = texts

        # "trans and" guards against a backend returning None for an item.
        changed = sum(
            1
            for orig, trans in zip(texts, translated)
            if trans and orig != trans and trans.strip()
        )
        self._translation_stats["changed"] += changed

        return translated

    def get_translation_stats(self) -> dict:
        """Return a copy of the ``attempted`` / ``changed`` counters."""
        return dict(self._translation_stats)

    def _validate_file(self, file_path: Path) -> None:
        """Check that ``file_path`` exists, ends in ``.pdf`` and starts with ``%PDF``.

        Raises:
            FileNotFoundError: The file does not exist.
            ValueError: Wrong suffix, or the first four bytes are not ``%PDF``.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path.name}")
        if file_path.suffix.lower() != ".pdf":
            raise ValueError(f"Expected .pdf file, got {file_path.suffix}")
        with open(file_path, "rb") as f:
            header = f.read(5)
            if header[:4] != b"%PDF":
                raise ValueError("File does not appear to be a valid PDF.")
|
||
|
||
|
||
# Module-level translator instance created without a provider, so its
# translation calls go to the project's translation_service.
pdf_translator = PDFTranslator()
|