392 lines
16 KiB
Python
392 lines
16 KiB
Python
from openpyxl import load_workbook, Workbook
|
||
import asyncio
|
||
from googletrans import Translator
|
||
import os
|
||
from tqdm import tqdm
|
||
import copy
|
||
import re
|
||
import shutil
|
||
import logging
|
||
import xml.etree.ElementTree as ET
|
||
import zipfile
|
||
import tempfile
|
||
from openpyxl.utils import get_column_letter
|
||
from openpyxl.styles import PatternFill, Border, Side, Alignment, Protection, Font
|
||
from pydantic_ai import Agent
|
||
from pydantic_ai.models.openai import OpenAIModel
|
||
from pydantic_ai.providers.openai import OpenAIProvider
|
||
from pydantic import BaseModel, Field
|
||
from typing import Optional, Literal
|
||
# Configure logging
|
||
# NOTE(review): level=None appears to leave the root logger at its default
# threshold, in which case the logging.info() progress messages emitted in this
# file would be filtered out — confirm whether that is intended or pass an
# explicit level such as logging.INFO.
logging.basicConfig(level=None, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
|
||
class TranslationOutput(BaseModel):
    """Structured result type requested from the translation agent."""

    # The translated text; read back by the caller as `result.data.translated_text`.
    translated_text: str
|
||
|
||
async def translate_with_ollama(
    text: str,
    target_language: str,
    model_name: str = "llama3.1:8b",
    base_url: str = "http://172.30.124.125:11434/v1",
) -> str:
    """Translate text with an Ollama-served model through PydanticAI.

    Args:
        text: Text to translate.
        target_language: Target language code (e.g. 'fr', 'en').
        model_name: Ollama model name (default: llama3.1:8b).
        base_url: OpenAI-compatible endpoint of the Ollama server. Defaults to
            the address that used to be hard-coded here, so existing callers
            keep their behavior.

    Returns:
        The translated text. The input is returned unchanged when it is empty
        or whitespace-only, or when the translation attempt raises.
    """
    # Nothing to translate: hand empty / whitespace-only input straight back.
    if not text or not text.strip():
        return text

    try:
        # Ollama is addressed through the OpenAI-compatible provider.
        ollama_model = OpenAIModel(
            model_name=model_name,
            provider=OpenAIProvider(base_url=base_url),
        )
        agent = Agent(ollama_model, result_type=TranslationOutput, retries=2)

        # The prompt text is kept exactly as it was: changing it would change
        # what the model returns.
        result = await agent.run(
            f"Translate this text to this language: {text} to {target_language}. Only return the translation, nothing else. NO ADDITIONAL TEXT.AND AVIOID Text to be translated,ADDITIONAL REMARKS,TRY TO understand the contexte for the translation, and avoid literal translation. DON'T TrANSLATE NAMES AND SURNAMES and copy just the name for output, "
        )
        return result.data.translated_text

    except Exception as e:
        # Deliberate best effort: a failed translation must not abort the whole
        # run, so the error is logged and the original text is kept.
        logging.error(f"Translation error with Ollama: {e}")
        return text
|
||
|
||
async def translate_text_google(translator, text, target_language):
    """Translate *text* via the given Google translator, keeping the input on failure."""
    try:
        response = await translator.translate(text, dest=target_language)
        outcome = response.text
    except Exception as e:
        logging.error(f"Translation error with Google Translate: {e}")
        # Best effort: fall back to the untranslated text.
        outcome = text
    return outcome
|
||
|
||
async def translate_text(text, target_language, translation_method, translator=None, model_name="llama3.1:8b"):
    """Translate *text* with the selected backend.

    Args:
        text: Text to translate.
        target_language: Target language code.
        translation_method: 'google' or 'llm'.
        translator: Google translator instance (required when the method is 'google').
        model_name: LLM model name (used when the method is 'llm').

    Returns:
        The translated text. Falsy or whitespace-only input, and input given
        with an unknown method, is returned unchanged.

    Raises:
        ValueError: If the method is 'google' and no translator was supplied.
    """
    nothing_to_translate = not text or (isinstance(text, str) and not text.strip())
    if nothing_to_translate:
        return text

    if translation_method == "llm":
        return await translate_with_ollama(text, target_language, model_name)

    if translation_method != "google":
        # Unknown backend: report it and leave the text as it is.
        logging.error(f"Unknown translation method: {translation_method}")
        return text

    if translator is None:
        raise ValueError("Translator instance is required when using Google Translate")
    return await translate_text_google(translator, text, target_language)
|
||
|
||
def is_formula(text):
    """Return True when *text* is a string beginning with '=' (a formula)."""
    return isinstance(text, str) and text.startswith('=')
|
||
|
||
def should_translate(cell):
    """Return True only for cells holding a non-formula string value."""
    value = cell.value
    # Empty cells and formulas are never translated.
    if value is None or is_formula(value):
        return False
    # Anything that is not a string (numbers, dates, booleans, ...) is left alone.
    return isinstance(value, str)
|
||
|
||
def copy_cell_formatting(source_cell, target_cell):
    """Copy the formatting of *source_cell* onto *target_cell*.

    Font, fill, border, alignment, protection and hyperlink are duplicated with
    ``copy.copy`` and the number format is assigned directly. Copying each style
    object as a whole keeps every attribute it carries and does not assume a
    particular fill class, unlike rebuilding the objects from a fixed list of
    fields.

    Args:
        source_cell: Cell whose formatting is read.
        target_cell: Cell that receives the formatting.

    Returns:
        None. Nothing happens when either cell is missing (falsy).
    """
    if not source_cell or not target_cell:
        return

    # Each attribute is copied only when the source actually has a value for it,
    # and always as a separate object so the two cells never share one.
    for attribute in ("font", "fill", "border", "alignment", "protection", "hyperlink"):
        style = getattr(source_cell, attribute)
        if style:
            setattr(target_cell, attribute, copy.copy(style))

    # The number format is a plain value, so no copy is needed.
    if source_cell.number_format:
        target_cell.number_format = source_cell.number_format
|
||
|
||
def copy_sheet_formatting(source_sheet, target_sheet):
    """Mirror column widths/visibility and row heights from one sheet to another."""
    # Columns: width and hidden flag, for every column that has a dimension entry.
    source_columns = source_sheet.column_dimensions
    for column_number in range(1, source_sheet.max_column + 1):
        letter = get_column_letter(column_number)
        if letter not in source_columns:
            continue
        column_dimension = source_columns[letter]
        target_sheet.column_dimensions[letter].width = column_dimension.width
        target_sheet.column_dimensions[letter].hidden = column_dimension.hidden

    # Rows: height only, for every row that has a dimension entry.
    source_rows = source_sheet.row_dimensions
    for row_number in range(1, source_sheet.max_row + 1):
        if row_number not in source_rows:
            continue
        target_sheet.row_dimensions[row_number].height = source_rows[row_number].height
|
||
|
||
async def _translate_cached(text, target_language, translator, translated_cache,
                            translation_method, model_name):
    """Return the translation of *text*, reading from and filling *translated_cache*."""
    if text not in translated_cache:
        translated_cache[text] = await translate_text(
            text, target_language, translation_method, translator, model_name
        )
    return translated_cache[text]


async def process_table_xml_safely(zip_path, target_language, translator, translated_cache, translation_method, model_name):
    """Translate table names and column headers stored in the workbook's table XML.

    The workbook archive is unpacked into a temporary directory, every
    ``xl/tables/*.xml`` part is rewritten with translated ``displayName`` /
    ``name`` attributes and ``tableColumn`` names, and the archive is rebuilt
    and moved over *zip_path*. If the archive has no table parts it is left
    untouched.

    Args:
        zip_path: Path of the .xlsx archive to rewrite in place.
        target_language: Target language code.
        translator: Google translator instance, or None for the LLM backend.
        translated_cache: Dict mapping original text to its translation; it is
            consulted first and updated with every new translation.
        translation_method: 'google' or 'llm'.
        model_name: LLM model name (used when the method is 'llm').

    Returns:
        None.
    """
    temp_dir = tempfile.mkdtemp()

    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        # Collect every table part; os.walk yields nothing if the folder is absent.
        table_files = [
            os.path.join(folder, file_name)
            for folder, _subfolders, file_names in os.walk(os.path.join(temp_dir, 'xl', 'tables'))
            for file_name in file_names
            if file_name.endswith('.xml')
        ]

        if not table_files:
            logging.info("No table XML files found in the Excel file")
            return

        logging.info(f"Found {len(table_files)} table XML files to process")

        for table_file in table_files:
            logging.info(f"Processing table XML: {os.path.basename(table_file)}")

            # Register the prefixes the document declares so that ElementTree
            # writes them back (including the default namespace) instead of
            # inventing ns0/ns1 prefixes on serialization.
            for _event, (prefix, uri) in ET.iterparse(table_file, events=("start-ns",)):
                try:
                    ET.register_namespace(prefix, uri)
                except ValueError:
                    # Prefixes of the form "ns<number>" are reserved by
                    # ElementTree; those simply keep a generated prefix.
                    pass

            tree = ET.parse(table_file)
            root = tree.getroot()

            # Namespace of the root element, if it is namespace-qualified.
            ns = root.tag.split('}')[0].strip('{') if '}' in root.tag else ''
            ns_prefix = '{' + ns + '}' if ns else ''

            # NOTE(review): translated table names are written as returned by
            # the translator; a translation containing spaces or other
            # characters may not be a valid table identifier — confirm whether
            # these two attributes should be translated at all.
            for attr in ('displayName', 'name'):
                if attr in root.attrib:
                    root.attrib[attr] = await _translate_cached(
                        root.attrib[attr], target_language, translator,
                        translated_cache, translation_method, model_name
                    )

            # Column headers reuse the shared cache, so a header gets the same
            # translation as any other occurrence of that text already cached.
            columns_element = root.find(f".//{ns_prefix}tableColumns")
            if columns_element is not None:
                for column in columns_element.findall(f".//{ns_prefix}tableColumn"):
                    if 'name' in column.attrib:
                        column.attrib['name'] = await _translate_cached(
                            column.attrib['name'], target_language, translator,
                            translated_cache, translation_method, model_name
                        )

            tree.write(table_file, encoding='UTF-8', xml_declaration=True)

        # Rebuild the archive. ZipFile defaults to ZIP_STORED, which would write
        # the workbook uncompressed, so deflate is requested explicitly.
        new_zip_path = zip_path + '.new'
        with zipfile.ZipFile(new_zip_path, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
            for folder_path, _subfolders, file_names in os.walk(temp_dir):
                for file_name in file_names:
                    absolute_path = os.path.join(folder_path, file_name)
                    relative_path = os.path.relpath(absolute_path, temp_dir)
                    new_zip.write(absolute_path, relative_path)

        # Replace the old archive with the rebuilt one.
        shutil.move(new_zip_path, zip_path)

    finally:
        # The scratch directory is removed on success, early return and failure.
        shutil.rmtree(temp_dir)
|
||
|
||
async def translate_excel(file_path: str, target_language: str, translation_method: str = "google",
                          llm_model: str = "llama3.1:8b"):
    """Translate the string cells of a workbook into a translated copy of the file.

    The source file is copied next to the original, every non-formula string
    cell of the copy is translated in place, the copy is saved, and the table
    XML parts are then processed in a separate pass.

    Args:
        file_path: Path to the Excel file.
        target_language: Target language code (e.g. 'fr', 'en').
        translation_method: 'google' or 'llm'.
        llm_model: LLM model name (used when the method is 'llm').

    Returns:
        Path of the translated copy.

    Raises:
        FileNotFoundError: If *file_path* does not exist.
        ValueError: If *translation_method* is neither 'google' nor 'llm'.
    """
    # Fail fast on bad input before anything is written to disk.
    if not os.path.exists(file_path):
        logging.error(f"File not found: {file_path}")
        raise FileNotFoundError(f"The file {file_path} does not exist.")

    if translation_method not in ("google", "llm"):
        logging.error(f"Invalid translation method: {translation_method}")
        raise ValueError("Translation method must be 'google' or 'llm'")

    # Output name: <original stem>_translated_<language>_<gt|llm>.xlsx
    stem, _extension = os.path.splitext(file_path)
    use_google = translation_method == "google"
    method_suffix = "gt" if use_google else "llm"
    translated_file_path = f"{stem}_translated_{target_language}_{method_suffix}.xlsx"

    logging.info("Creating a copy of the original file...")
    shutil.copy2(file_path, translated_file_path)

    # All edits are made on the copy.
    workbook = load_workbook(translated_file_path)

    # Only the Google backend needs a translator object.
    translator = Translator() if use_google else None

    # Original text -> translation, so each distinct string is translated once.
    translated_cache = {}

    # The progress bar counts every cell of every sheet's used range.
    total_cells = 0
    for worksheet in workbook.worksheets:
        total_cells += worksheet.max_row * worksheet.max_column

    bar_label = f"Translating to {target_language} using {translation_method}"
    with tqdm(total=total_cells, desc=bar_label) as progress_bar:
        for sheet in workbook.worksheets:
            logging.info(f"Processing sheet: {sheet.title} ({sheet.max_row} rows × {sheet.max_column} columns)")

            for row in range(1, sheet.max_row + 1):
                for col in range(1, sheet.max_column + 1):
                    cell = sheet.cell(row=row, column=col)
                    progress_bar.update(1)

                    if not should_translate(cell):
                        continue

                    source_text = str(cell.value)
                    if source_text not in translated_cache:
                        translated_cache[source_text] = await translate_text(
                            source_text,
                            target_language,
                            translation_method,
                            translator,
                            llm_model,
                        )
                    cell.value = translated_cache[source_text]

    workbook.save(translated_file_path)
    logging.info("Basic cell translation complete")

    # Table definitions are stored in separate XML parts and are handled on the
    # saved file, reusing the same cache.
    logging.info("Processing table structures...")
    await process_table_xml_safely(
        translated_file_path,
        target_language,
        translator,
        translated_cache,
        translation_method,
        llm_model,
    )

    logging.info(f"Translation complete! File saved as: {translated_file_path}")
    return translated_file_path
|
||
|
||
async def main():
    """Run one translation of the hard-coded sample workbook."""
    # Workbook to translate.
    input_file = r"C:\Users\serameza\host-data\Excels\BOLT eCAT System Modeling Q&A.xlsx"

    # Target language code ("en" is English).
    language = "en"

    # Translation backend: "google" or "llm".
    translation_method = "llm"

    # Model served by Ollama; only used when translation_method is "llm".
    # An alternative that was used previously: "llama3.1:8b".
    llm_model = "qwen2.5:14b"

    await translate_excel(
        file_path=input_file,
        target_language=language,
        translation_method=translation_method,
        llm_model=llm_model,
    )
|
||
|
||
# Script entry point: run the async main() only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())