334 lines
13 KiB
Python
334 lines
13 KiB
Python
from openpyxl import load_workbook, Workbook
|
||
import asyncio
|
||
from googletrans import Translator
|
||
import os
|
||
from tqdm import tqdm
|
||
import copy
|
||
import re
|
||
import shutil
|
||
import logging
|
||
import xml.etree.ElementTree as ET
|
||
import zipfile
|
||
import tempfile
|
||
from openpyxl.utils import get_column_letter
|
||
from openpyxl.styles import PatternFill, Border, Side, Alignment, Protection, Font
|
||
from pydantic_ai import Agent
|
||
from pydantic_ai.models.openai import OpenAIModel
|
||
from pydantic_ai.providers.openai import OpenAIProvider
|
||
from pydantic import BaseModel, Field
|
||
from typing import Optional
|
||
# Configure logging for the whole script (timestamp - level - message).
# NOTE(review): with level=None, basicConfig does not set a level on the root
# logger, so it stays at the default WARNING threshold and the logging.info(...)
# progress messages in this module are not emitted. Pass logging.INFO here if
# those messages are meant to be visible.
logging.basicConfig(level=None, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
|
||
|
||
# Structured result model for the LLM call: translate_with_ollama passes this
# class as the agent's result_type and reads `translated_text` from the result.
# (Documented with comments rather than a docstring so the model's generated
# schema is left exactly as it was.)
class TranslationOutput(BaseModel):
    # The translated string returned by the model.
    translated_text: str
|
||
async def translate_with_ollama(
    text: str,
    target_language: str,
    model_name: str = "llama3.1:latest",
    *,
    base_url: str = "http://localhost:11434/v1",
) -> str:
    """Translate text with an Ollama-served model through PydanticAI.

    Args:
        text: Text to translate.
        target_language: Target language code (e.g. 'fr', 'en').
        model_name: Ollama model name (default: llama3.1:latest).
        base_url: OpenAI-compatible endpoint of the Ollama server
            (default: the local instance on port 11434).

    Returns:
        The translated text. The input is returned unchanged when it is
        empty or whitespace-only, or when any exception is raised while
        building the agent or running the request (the error is logged).
    """
    # Nothing to translate: skip the model round-trip entirely.
    if not text or not text.strip():
        return text

    try:
        # Ollama exposes an OpenAI-compatible API, hence the OpenAI model/provider.
        ollama_model = OpenAIModel(
            model_name=model_name,
            provider=OpenAIProvider(base_url=base_url),
        )
        agent = Agent(ollama_model, result_type=TranslationOutput, retries=2)

        # NOTE(review): the prompt wording is kept verbatim (typos included) so
        # that model behaviour is not changed by this edit.
        result = await agent.run(
            f"Translate this text to this language: {text} to {target_language}. Only return the translation, nothing else. NO ADDITIONAL TEXT.AND AVIOID Text to be translated,ADDITIONAL REMARKS,TRY TO understand the contexte for the translation, and avoid literal translation. "
        )
        return result.data.translated_text

    except Exception as e:
        # Best effort: a failed translation must not abort the whole workbook.
        logging.error(f"Translation error with Ollama: {e}")
        return text  # Return original text on error
|
||
|
||
async def translate_text(translator, text, target_language):
    """Return ``text`` translated to ``target_language`` via ``translator``.

    ``translator.translate(text, dest=...)`` is awaited and the ``text``
    attribute of its result is returned. Any exception is logged and the
    untranslated input is handed back instead.
    """
    try:
        # Attribute access stays inside the try so a malformed result is
        # handled the same way as a failed request.
        return (await translator.translate(text, dest=target_language)).text
    except Exception as e:
        logging.error(f"Translation error: {e}")
    # Reached only after a failure: fall back to the original text.
    return text
|
||
|
||
def is_formula(text):
    """Return True when ``text`` is a string that begins with '='."""
    # Non-string values (numbers, dates, None) can never be formulas.
    return isinstance(text, str) and text.startswith('=')
|
||
|
||
def should_translate(cell):
    """Tell whether ``cell`` holds text that should be sent for translation.

    Only non-formula string values qualify; empty cells, formulas and any
    non-string value are rejected.
    """
    value = cell.value

    # Empty cell: nothing to do.
    if value is None:
        return False

    # Must be text, and must not be a formula.
    return isinstance(value, str) and not is_formula(value)
|
||
|
||
def copy_cell_formatting(source_cell, target_cell):
    """Copy the formatting of ``source_cell`` onto ``target_cell``.

    Copies font, fill, border, alignment, protection, number format and
    hyperlink. Each style object is duplicated as a whole with ``copy.copy``
    instead of being rebuilt attribute by attribute, so no attribute is
    silently dropped and fill types other than ``PatternFill`` (which do not
    expose ``start_color``/``end_color``) no longer raise.

    Args:
        source_cell: Cell to read formatting from.
        target_cell: Cell to apply the formatting to.

    Returns:
        None. Does nothing when either cell is falsy.
    """
    if not source_cell or not target_cell:
        return

    # Style objects: take an independent copy of each one that is set.
    for attr in ("font", "fill", "border", "alignment", "protection"):
        style = getattr(source_cell, attr)
        if style:
            setattr(target_cell, attr, copy.copy(style))

    # Number format is a plain string, so direct assignment is enough.
    if source_cell.number_format:
        target_cell.number_format = source_cell.number_format

    # Hyperlink
    if source_cell.hyperlink:
        target_cell.hyperlink = copy.copy(source_cell.hyperlink)
|
||
|
||
def copy_sheet_formatting(source_sheet, target_sheet):
    """Copy column widths/visibility and row heights between two sheets.

    Every entry present in ``source_sheet.column_dimensions`` and
    ``source_sheet.row_dimensions`` is copied, rather than only the indices
    up to ``max_column`` / ``max_row``, so dimensions defined outside the
    used cell range are kept as well.

    A column dimension may describe a range of adjacent columns (stored under
    the first column's letter with ``min``/``max`` bounds); those bounds are
    copied too so the whole range keeps its width instead of just the first
    column.

    Args:
        source_sheet: Worksheet to read dimensions from.
        target_sheet: Worksheet to apply the dimensions to.
    """
    # Snapshot the items so the loop is safe even if both arguments are the
    # same sheet and the lookup below creates entries.
    for col_key, source_dim in list(source_sheet.column_dimensions.items()):
        target_dim = target_sheet.column_dimensions[col_key]
        target_dim.width = source_dim.width
        target_dim.hidden = source_dim.hidden
        # Preserve the span of grouped column definitions.
        target_dim.min = getattr(source_dim, "min", None)
        target_dim.max = getattr(source_dim, "max", None)

    for row_idx, source_row in list(source_sheet.row_dimensions.items()):
        target_sheet.row_dimensions[row_idx].height = source_row.height
|
||
|
||
async def process_table_xml_safely(zip_path, target_language, translator, translated_cache):
    """Translate table column headers stored in the workbook's table XML parts.

    The .xlsx archive at ``zip_path`` is unpacked into a temporary directory,
    every ``xl/tables/*.xml`` part has its ``tableColumn`` names translated,
    and the archive is rebuilt and moved over the original file.

    Table ``name`` / ``displayName`` attributes are deliberately left
    untouched: they are identifiers (referenced by formulas and not allowed
    to contain spaces), so machine-translating them can make the workbook
    invalid.

    Args:
        zip_path: Path of the .xlsx file to update in place.
        target_language: Target language code passed to the translator.
        translator: Unused; kept so existing callers keep working.
        translated_cache: Dict mapping original text to its translation. It is
            consulted first and updated with every new translation.

    Returns:
        None. The file is left unmodified when it contains no table parts.
    """
    new_zip_path = zip_path + '.new'

    # The context manager removes the extraction directory on every exit path.
    with tempfile.TemporaryDirectory() as temp_dir:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        table_files = [
            os.path.join(folder, file_name)
            for folder, _subfolders, file_names in os.walk(os.path.join(temp_dir, 'xl', 'tables'))
            for file_name in file_names
            if file_name.endswith('.xml')
        ]

        if not table_files:
            logging.info("No table XML files found in the Excel file")
            return

        logging.info(f"Found {len(table_files)} table XML files to process")

        for table_file in table_files:
            logging.info(f"Processing table XML: {os.path.basename(table_file)}")
            await _translate_table_part(table_file, target_language, translated_cache)

        try:
            # Compress entries; the zipfile default (ZIP_STORED) would write
            # the whole workbook uncompressed.
            with zipfile.ZipFile(new_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
                for folder, _subfolders, file_names in os.walk(temp_dir):
                    for file_name in file_names:
                        absolute_path = os.path.join(folder, file_name)
                        new_zip.write(absolute_path, os.path.relpath(absolute_path, temp_dir))

            # Replace the old archive with the rebuilt one.
            shutil.move(new_zip_path, zip_path)
        finally:
            # After a successful move this path no longer exists; after a
            # failure, do not leave a half-written archive behind.
            if os.path.exists(new_zip_path):
                os.remove(new_zip_path)


def _register_declared_namespaces(xml_path):
    """Register the namespace prefixes declared in ``xml_path`` with ElementTree.

    Without this, ElementTree serialises with generated ``ns0``, ``ns1``...
    prefixes instead of the ones the document was written with.
    """
    for _event, (prefix, uri) in ET.iterparse(xml_path, events=('start-ns',)):
        try:
            ET.register_namespace(prefix, uri)
        except ValueError:
            # Prefixes of the form "ns<digits>" are reserved by ElementTree.
            continue


async def _translate_table_part(table_file, target_language, translated_cache):
    """Rewrite one table XML part in place with translated column names."""
    _register_declared_namespaces(table_file)

    tree = ET.parse(table_file)
    root = tree.getroot()

    # ElementTree tags look like "{namespace-uri}local"; reuse the "{...}" part.
    ns_prefix = root.tag[:root.tag.index('}') + 1] if '}' in root.tag else ''

    columns_element = root.find(f".//{ns_prefix}tableColumns")
    if columns_element is not None:
        for column in columns_element.findall(f".//{ns_prefix}tableColumn"):
            header_text = column.attrib.get('name')
            if header_text is None:
                continue
            # Reuse the cell translation so the table header stays identical
            # to the translated header cell in the sheet.
            if header_text not in translated_cache:
                translated_cache[header_text] = await translate_with_ollama(header_text, target_language)
            column.attrib['name'] = translated_cache[header_text]

    tree.write(table_file, encoding='UTF-8', xml_declaration=True)
|
||
|
||
async def translate_excel(file_path: str, target_language: str):
    """Produce a translated copy of an Excel workbook and return its path.

    The source file is copied to ``<name>_translated_<lang>.xlsx``; the copy
    is opened, each translatable cell is replaced by its translation, the
    workbook is saved, and the table XML parts are then post-processed so
    table headers match the translated cells.

    Args:
        file_path: Path of the workbook to translate.
        target_language: Target language code (e.g. 'fr').

    Returns:
        Path of the translated workbook.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    # Guard: refuse to continue without an input file.
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        raise FileNotFoundError(f"The file {file_path} does not exist.")

    # Work on a copy so the original workbook is never modified.
    stem, _extension = os.path.splitext(file_path)
    translated_file_path = f"{stem}_translated_{target_language}.xlsx"

    logging.info("Creating a copy of the original file...")
    shutil.copy2(file_path, translated_file_path)

    workbook = load_workbook(translated_file_path)
    translator = Translator()

    # original text -> translation, so repeated values are translated once
    translated_cache = {}

    # Progress-bar total: every coordinate of every sheet's used range.
    total_cells = 0
    for sheet in workbook.worksheets:
        total_cells += sheet.max_row * sheet.max_column

    with tqdm(total=total_cells, desc=f"Translating to {target_language}") as progress_bar:
        for sheet in workbook.worksheets:
            logging.info(f"Processing sheet: {sheet.title} ({sheet.max_row} rows × {sheet.max_column} columns)")

            for row_idx in range(1, sheet.max_row + 1):
                for col_idx in range(1, sheet.max_column + 1):
                    cell = sheet.cell(row=row_idx, column=col_idx)
                    progress_bar.update(1)

                    if not should_translate(cell):
                        continue

                    source_text = str(cell.value)

                    # Translate only on a cache miss, then always read back
                    # from the cache.
                    if source_text not in translated_cache:
                        translated_cache[source_text] = await translate_with_ollama(source_text, target_language)
                    cell.value = translated_cache[source_text]

    workbook.save(translated_file_path)
    logging.info("Basic cell translation complete")

    # Table headers live in separate XML parts and are fixed up afterwards.
    logging.info("Processing table structures...")
    await process_table_xml_safely(translated_file_path, target_language, translator, translated_cache)

    logging.info(f"Translation complete! File saved as: {translated_file_path}")
    return translated_file_path
|
||
|
||
async def main(
    input_file: str = r"F:\Dev\excel-translator\data\sample\test_sample.xlsx",
    language: str = "fr",
):
    """Translate one workbook; the defaults reproduce the original sample run.

    Args:
        input_file: Path of the workbook to translate (default: the sample
            file this script was written against).
        language: Target language code (default: "fr", French).
    """
    await translate_excel(input_file, language)
|
||
|
||
# Script entry point: when executed directly (not imported), run the async
# main() coroutine to completion with asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())