"""
|
|
Translation Service Abstraction
|
|
Provides a unified interface for different translation providers
|
|
Optimized for high performance with parallel processing
|
|
"""
|
|
from abc import ABC, abstractmethod
|
|
from typing import Optional, List, Dict, Tuple
|
|
import requests
|
|
from deep_translator import GoogleTranslator, DeeplTranslator, LibreTranslator
|
|
from config import config
|
|
import concurrent.futures
|
|
import threading
|
|
import asyncio
|
|
from functools import lru_cache
|
|
import time
|
|
|
|
|
|
# Global thread pool for parallel translations.
# NOTE(review): appears unused within this module — translate_batch_parallel
# creates its own per-call executor. Verify no external module imports
# `_executor` before removing; as written it eagerly spawns up to 8 threads
# at import time.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
|
|
|
|
|
|
class TranslationProvider(ABC):
    """Abstract base class for translation providers."""

    @abstractmethod
    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Translate text from source to target language."""
        pass

    def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
        """Translate multiple texts at once - default sequential implementation."""
        return [self.translate(entry, target_language, source_language) for entry in texts]

    def translate_batch_parallel(self, texts: List[str], target_language: str, source_language: str = 'auto', max_workers: int = 4) -> List[str]:
        """Parallel batch translation using a thread pool.

        Blank/whitespace-only entries are passed through untranslated (None is
        normalized to ''); a per-item failure falls back to the original text.
        """
        if not texts:
            return []

        # Pair each translatable entry with its original position.
        pending = [(pos, entry) for pos, entry in enumerate(texts) if entry and entry.strip()]

        if not pending:
            return [entry if entry else '' for entry in texts]

        def translate_one(item: Tuple[int, str]) -> Tuple[int, str]:
            idx, text = item
            try:
                return (idx, self.translate(text, target_language, source_language))
            except Exception as e:
                print(f"Translation error at index {idx}: {e}")
                return (idx, text)

        output = [''] * len(texts)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            for pos, translated in pool.map(translate_one, pending):
                output[pos] = translated

        # Restore blank entries at their original positions.
        for pos, entry in enumerate(texts):
            if not entry or not entry.strip():
                output[pos] = entry if entry else ''

        return output
|
|
|
|
|
|
class GoogleTranslationProvider(TranslationProvider):
    """Google Translate implementation with batch support.

    Translator instances are cached per thread and per language pair, since
    deep_translator objects are not documented to be thread-safe.
    """

    def __init__(self):
        # Thread-local storage: each worker thread builds its own translator cache.
        self._local = threading.local()

    def _get_translator(self, source_language: str, target_language: str) -> GoogleTranslator:
        """Get or create a translator instance for the current thread."""
        key = f"{source_language}_{target_language}"
        if not hasattr(self._local, 'translators'):
            self._local.translators = {}
        if key not in self._local.translators:
            self._local.translators[key] = GoogleTranslator(source=source_language, target=target_language)
        return self._local.translators[key]

    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Translate one string; blank input or any error returns the input unchanged."""
        if not text or not text.strip():
            return text

        try:
            translator = self._get_translator(source_language, target_language)
            return translator.translate(text)
        except Exception as e:
            print(f"Translation error: {e}")
            return text

    def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto', batch_size: int = 50) -> List[str]:
        """
        Translate multiple texts using batch processing for speed.
        Uses deep_translator's batch capability when possible.

        Args:
            texts: Texts to translate; blank entries keep their position untouched.
            target_language: Target language code.
            source_language: Source language code ('auto' to detect).
            batch_size: Number of texts sent per upstream request.

        Returns:
            Translated texts in the same order/positions as the input.
        """
        if not texts:
            return []

        # Filter and track empty texts
        results = [''] * len(texts)
        non_empty_indices = []
        non_empty_texts = []

        for i, text in enumerate(texts):
            if text and text.strip():
                non_empty_indices.append(i)
                non_empty_texts.append(text)
            else:
                results[i] = text if text else ''

        if not non_empty_texts:
            return results

        try:
            translator = GoogleTranslator(source=source_language, target=target_language)

            # Process in batches
            translated_texts = []
            for i in range(0, len(non_empty_texts), batch_size):
                batch = non_empty_texts[i:i + batch_size]
                try:
                    # Use translate_batch if available
                    if hasattr(translator, 'translate_batch'):
                        batch_result = translator.translate_batch(batch)
                    else:
                        # Fallback: join with separator, translate, split
                        separator = "\n|||SPLIT|||\n"
                        combined = separator.join(batch)
                        translated_combined = translator.translate(combined)
                        if translated_combined:
                            batch_result = translated_combined.split("|||SPLIT|||")
                            # Clean up results
                            batch_result = [t.strip() for t in batch_result]
                            # If split didn't work correctly, fall back to individual
                            if len(batch_result) != len(batch):
                                batch_result = [translator.translate(t) for t in batch]
                        else:
                            batch_result = batch
                    translated_texts.extend(batch_result)
                except Exception as e:
                    print(f"Batch translation error, falling back to individual: {e}")
                    for text in batch:
                        try:
                            translated_texts.append(translator.translate(text))
                        except Exception:
                            # BUGFIX: was a bare `except:`, which also swallowed
                            # KeyboardInterrupt/SystemExit.
                            translated_texts.append(text)

            # Map back to original positions
            for idx, translated in zip(non_empty_indices, translated_texts):
                results[idx] = translated if translated else texts[idx]

            return results

        except Exception as e:
            print(f"Batch translation failed: {e}")
            # Fallback to individual translations
            for idx, text in zip(non_empty_indices, non_empty_texts):
                try:
                    results[idx] = GoogleTranslator(source=source_language, target=target_language).translate(text) or text
                except Exception:
                    # BUGFIX: was a bare `except:` — narrow to Exception.
                    results[idx] = text
            return results
|
|
|
|
|
|
class DeepLTranslationProvider(TranslationProvider):
    """DeepL Translate implementation with batch support."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self._translator_cache = {}

    def _get_translator(self, source_language: str, target_language: str) -> DeeplTranslator:
        """Return a cached DeeplTranslator for the language pair, creating it on first use."""
        cache_key = f"{source_language}_{target_language}"
        translator = self._translator_cache.get(cache_key)
        if translator is None:
            translator = DeeplTranslator(api_key=self.api_key, source=source_language, target=target_language)
            self._translator_cache[cache_key] = translator
        return translator

    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Translate a single string; blank input or any error yields the original text."""
        if not text or not text.strip():
            return text

        try:
            return self._get_translator(source_language, target_language).translate(text)
        except Exception as e:
            print(f"Translation error: {e}")
            return text

    def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
        """Batch translate using DeepL"""
        if not texts:
            return []

        # Pair translatable entries with their positions; blanks pass through.
        translatable = [(pos, entry) for pos, entry in enumerate(texts) if entry and entry.strip()]

        if not translatable:
            return [entry if entry else '' for entry in texts]

        output = [''] * len(texts)
        try:
            translator = self._get_translator(source_language, target_language)
            payload = [entry for _, entry in translatable]

            if hasattr(translator, 'translate_batch'):
                translated = translator.translate_batch(payload)
            else:
                translated = [translator.translate(entry) for entry in payload]

            for (pos, _), value in zip(translatable, translated):
                output[pos] = value if value else texts[pos]

            # Restore blank entries (None normalized to '').
            for pos, entry in enumerate(texts):
                if not entry or not entry.strip():
                    output[pos] = entry if entry else ''

            return output
        except Exception as e:
            print(f"DeepL batch error: {e}")
            return [self.translate(entry, target_language, source_language) for entry in texts]
|
|
|
|
|
|
class LibreTranslationProvider(TranslationProvider):
    """LibreTranslate implementation with batch support."""

    def __init__(self, custom_url: str = "https://libretranslate.com"):
        # Base URL of the LibreTranslate server (public instance or self-hosted).
        self.custom_url = custom_url
        # Cache of LibreTranslator instances keyed by "source_target".
        self._translator_cache = {}

    def _get_translator(self, source_language: str, target_language: str) -> LibreTranslator:
        """Get or create a cached translator for the given language pair."""
        key = f"{source_language}_{target_language}"
        if key not in self._translator_cache:
            self._translator_cache[key] = LibreTranslator(source=source_language, target=target_language, custom_url=self.custom_url)
        return self._translator_cache[key]

    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Translate one string; blank input or any error returns the input unchanged."""
        if not text or not text.strip():
            return text

        try:
            translator = self._get_translator(source_language, target_language)
            return translator.translate(text)
        except Exception as e:
            print(f"LibreTranslate error: {e}")
            return text

    def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
        """Batch translate using LibreTranslate.

        Texts are translated sequentially; blank entries and per-item failures
        fall back to the original text. On a setup failure (e.g. translator
        construction) the input list is returned unchanged.
        """
        if not texts:
            return []

        results = [''] * len(texts)
        non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()]

        if not non_empty:
            return [t if t else '' for t in texts]

        try:
            translator = self._get_translator(source_language, target_language)

            for idx, text in non_empty:
                try:
                    results[idx] = translator.translate(text) or text
                except Exception:
                    # BUGFIX: was a bare `except:`, which also swallowed
                    # KeyboardInterrupt/SystemExit.
                    results[idx] = text

            # Restore blank entries at their original positions.
            for i, text in enumerate(texts):
                if not text or not text.strip():
                    results[i] = text if text else ''

            return results
        except Exception as e:
            print(f"LibreTranslate batch error: {e}")
            return texts
|
|
|
|
|
|
class OllamaTranslationProvider(TranslationProvider):
    """Ollama LLM translation implementation.

    Talks to an Ollama server over its HTTP API: text translation uses
    `self.model` via /api/chat, image translation uses `self.vision_model`.
    All failures degrade gracefully (original text back, or "" for images).
    """

    def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3", vision_model: str = "llava", system_prompt: str = ""):
        # Strip trailing slash so f-strings can append "/api/..." safely.
        self.base_url = base_url.rstrip('/')
        self.model = model.strip()  # Remove any leading/trailing whitespace
        self.vision_model = vision_model.strip()
        self.custom_system_prompt = system_prompt  # Custom context, glossary, instructions

    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Translate `text` to `target_language` via the Ollama /api/chat endpoint.

        Returns the input unchanged for blank/trivial input or on any error.
        Note: `source_language` is accepted for interface compatibility but is
        not sent to the model — the LLM infers the source language itself.
        """
        if not text or not text.strip():
            return text

        # Skip very short text or numbers only
        if len(text.strip()) < 2 or text.strip().isdigit():
            return text

        try:
            # Build system prompt with custom context if provided
            base_prompt = f"""You are a professional translator. Your ONLY task is to translate text to {target_language}.

CRITICAL RULES:
1. Output ONLY the translated text - no explanations, no comments, no notes
2. Preserve the exact formatting (line breaks, spacing, punctuation)
3. Do NOT add any prefixes like "Here's the translation:" or "Translation:"
4. Do NOT refuse to translate or ask clarifying questions
5. If the text is already in {target_language}, return it unchanged
6. Translate everything literally and accurately
7. NEVER provide comments, opinions, or explanations - you are JUST a translator
8. If you have any doubt about the translation, return the original text unchanged
9. Do not interpret or analyze the content - simply translate word by word
10. Your response must contain ONLY the translated text, nothing else"""

            if self.custom_system_prompt:
                system_content = f"""{base_prompt}

ADDITIONAL CONTEXT AND INSTRUCTIONS:
{self.custom_system_prompt}"""
            else:
                system_content = base_prompt

            # Use /api/chat endpoint (more compatible with all models)
            response = requests.post(
                f"{self.base_url}/api/chat",
                json={
                    "model": self.model,
                    "messages": [
                        {
                            "role": "system",
                            "content": system_content
                        },
                        {
                            "role": "user",
                            "content": text
                        }
                    ],
                    "stream": False,
                    "options": {
                        # Low temperature keeps the model close to a literal translation;
                        # num_predict caps the response length.
                        "temperature": 0.3,
                        "num_predict": 500
                    }
                },
                timeout=120  # 2 minutes timeout
            )
            response.raise_for_status()
            result = response.json()
            translated = result.get("message", {}).get("content", "").strip()
            # An empty model response falls back to the original text.
            return translated if translated else text
        except requests.exceptions.ConnectionError:
            print(f"Ollama error: Cannot connect to {self.base_url}. Is Ollama running?")
            return text
        except requests.exceptions.Timeout:
            print(f"Ollama error: Request timeout after 120s")
            return text
        except Exception as e:
            print(f"Ollama translation error: {e}")
            return text

    def translate_image(self, image_path: str, target_language: str) -> str:
        """Translate text within an image using Ollama vision model.

        Args:
            image_path: Path to the image file on disk.
            target_language: Target language code.

        Returns:
            The translated text extracted from the image, or "" on any failure
            (missing file, unreachable server, model error).
        """
        import base64

        try:
            # Read and encode image
            with open(image_path, 'rb') as img_file:
                image_data = base64.b64encode(img_file.read()).decode('utf-8')

            # Use /api/chat for vision models too
            response = requests.post(
                f"{self.base_url}/api/chat",
                json={
                    "model": self.vision_model,
                    "messages": [
                        {
                            "role": "user",
                            "content": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting.",
                            "images": [image_data]
                        }
                    ],
                    "stream": False
                },
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            return result.get("message", {}).get("content", "").strip()
        except Exception as e:
            print(f"Ollama vision translation error: {e}")
            return ""

    @staticmethod
    def list_models(base_url: str = "http://localhost:11434") -> List[str]:
        """List available Ollama models.

        Queries the server's /api/tags endpoint; returns [] if the server is
        unreachable or responds with an error.
        """
        try:
            response = requests.get(f"{base_url.rstrip('/')}/api/tags", timeout=5)
            response.raise_for_status()
            models = response.json().get("models", [])
            return [model["name"] for model in models]
        except Exception as e:
            print(f"Error listing Ollama models: {e}")
            return []
|
|
|
|
|
|
class WebLLMTranslationProvider(TranslationProvider):
    """WebLLM browser-based translation (client-side processing)."""

    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Server-side pass-through: WebLLM translation runs client-side.

        The actual translation is performed by JavaScript in the browser;
        this placeholder returns the text unchanged.
        """
        return text
|
|
|
|
|
|
class OpenAITranslationProvider(TranslationProvider):
    """OpenAI GPT translation implementation with vision support.

    The `openai` package is imported lazily inside each method so the module
    can load even when the dependency is absent; errors degrade gracefully
    (original text back, or "" for images).
    """

    def __init__(self, api_key: str, model: str = "gpt-4o-mini", system_prompt: str = ""):
        # OpenAI API key used to construct a client per call.
        self.api_key = api_key
        # Chat model name; also used for vision requests in translate_image.
        self.model = model
        # Optional extra context/glossary appended to the system prompt.
        self.custom_system_prompt = system_prompt

    def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """Translate `text` to `target_language` via the OpenAI chat API.

        Returns the input unchanged for blank/trivial input or on any error.
        Note: `source_language` is accepted for interface compatibility but is
        not sent to the model — the LLM infers the source language itself.
        """
        if not text or not text.strip():
            return text

        # Skip very short text or numbers only
        if len(text.strip()) < 2 or text.strip().isdigit():
            return text

        try:
            import openai
            client = openai.OpenAI(api_key=self.api_key)

            # Build system prompt with custom context if provided
            base_prompt = f"""You are a professional translator. Your ONLY task is to translate text to {target_language}.

CRITICAL RULES:
1. Output ONLY the translated text - no explanations, no comments, no notes
2. Preserve the exact formatting (line breaks, spacing, punctuation)
3. Do NOT add any prefixes like "Here's the translation:" or "Translation:"
4. Do NOT refuse to translate or ask clarifying questions
5. If the text is already in {target_language}, return it unchanged
6. Translate everything literally and accurately
7. NEVER provide comments, opinions, or explanations - you are JUST a translator
8. If you have any doubt about the translation, return the original text unchanged
9. Do not interpret or analyze the content - simply translate word by word
10. Your response must contain ONLY the translated text, nothing else"""

            if self.custom_system_prompt:
                system_content = f"""{base_prompt}

ADDITIONAL CONTEXT AND INSTRUCTIONS:
{self.custom_system_prompt}"""
            else:
                system_content = base_prompt

            response = client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_content},
                    {"role": "user", "content": text}
                ],
                # Low temperature keeps the model close to a literal translation.
                temperature=0.3,
                max_tokens=500
            )

            translated = response.choices[0].message.content.strip()
            # An empty model response falls back to the original text.
            return translated if translated else text
        except Exception as e:
            print(f"OpenAI translation error: {e}")
            return text

    def translate_image(self, image_path: str, target_language: str) -> str:
        """Translate text within an image using OpenAI vision model.

        Args:
            image_path: Path to the image file on disk.
            target_language: Target language code.

        Returns:
            The translated text extracted from the image, or "" on any failure.
        """
        import base64

        try:
            import openai
            client = openai.OpenAI(api_key=self.api_key)

            # Read and encode image
            with open(image_path, 'rb') as img_file:
                image_data = base64.b64encode(img_file.read()).decode('utf-8')

            # Determine image type from extension
            ext = image_path.lower().split('.')[-1]
            media_type = f"image/{ext}" if ext in ['png', 'jpg', 'jpeg', 'gif', 'webp'] else "image/png"

            response = client.chat.completions.create(
                model=self.model,  # gpt-4o and gpt-4o-mini support vision
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting."
                            },
                            {
                                "type": "image_url",
                                "image_url": {
                                    # Image is sent inline as a base64 data URL.
                                    "url": f"data:{media_type};base64,{image_data}"
                                }
                            }
                        ]
                    }
                ],
                max_tokens=1000
            )

            return response.choices[0].message.content.strip()
        except Exception as e:
            print(f"OpenAI vision translation error: {e}")
            return ""
|
|
|
|
|
|
class TranslationService:
    """Main translation service that delegates to the configured provider"""

    def __init__(self, provider: Optional[TranslationProvider] = None):
        """
        Args:
            provider: Explicit provider to delegate to; when omitted, a
                default provider is selected automatically.
        """
        if provider:
            self.provider = provider
        else:
            # Auto-select provider based on configuration
            self.provider = self._get_default_provider()
        self.translate_images = False  # Flag to enable image translation

    def _get_default_provider(self) -> TranslationProvider:
        """Get the default translation provider from configuration"""
        # Always use Google Translate by default to avoid API key issues
        # Provider will be overridden per request in the API endpoint
        return GoogleTranslationProvider()

    def translate_text(self, text: str, target_language: str, source_language: str = 'auto') -> str:
        """
        Translate a single text string

        Args:
            text: Text to translate
            target_language: Target language code (e.g., 'es', 'fr', 'de')
            source_language: Source language code (default: 'auto' for auto-detection)

        Returns:
            Translated text
        """
        if not text or not text.strip():
            return text

        return self.provider.translate(text, target_language, source_language)

    def translate_image(self, image_path: str, target_language: str) -> str:
        """
        Translate text in an image using vision model (Ollama or OpenAI)

        Args:
            image_path: Path to image file
            target_language: Target language code

        Returns:
            Translated text from image, or "" when image translation is
            disabled or the provider has no vision support
        """
        if not self.translate_images:
            return ""

        # Ollama and OpenAI are the only providers with vision support.
        # (Previously two identical isinstance branches; merged into one check.)
        if isinstance(self.provider, (OllamaTranslationProvider, OpenAITranslationProvider)):
            return self.provider.translate_image(image_path, target_language)

        return ""

    def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
        """
        Translate multiple text strings efficiently using batch processing.

        Args:
            texts: List of texts to translate
            target_language: Target language code
            source_language: Source language code (default: 'auto')

        Returns:
            List of translated texts
        """
        if not texts:
            return []

        # Use provider's batch method if available (defensive: a custom
        # provider object may not subclass TranslationProvider).
        if hasattr(self.provider, 'translate_batch'):
            return self.provider.translate_batch(texts, target_language, source_language)

        # Fallback to individual translations
        return [self.translate_text(text, target_language, source_language) for text in texts]
|
|
|
|
|
|
# Global translation service instance.
# Module-level singleton shared by importers; defaults to the Google provider
# (see TranslationService._get_default_provider).
translation_service = TranslationService()
|