office_translator/services/translation_service.py

680 lines
27 KiB
Python

"""
Translation Service Abstraction
Provides a unified interface for different translation providers
Optimized for high performance with parallel processing and caching
"""
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Tuple
import requests
from deep_translator import GoogleTranslator, DeeplTranslator, LibreTranslator
from config import config
import concurrent.futures
import threading
import asyncio
from functools import lru_cache
import time
import hashlib
from collections import OrderedDict
# Global thread pool for parallel translations
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
class TranslationCache:
"""Thread-safe LRU cache for translations to avoid redundant API calls"""
def __init__(self, maxsize: int = 5000):
self.cache: OrderedDict = OrderedDict()
self.maxsize = maxsize
self.lock = threading.RLock()
self.hits = 0
self.misses = 0
def _make_key(self, text: str, target_language: str, source_language: str, provider: str) -> str:
"""Create a unique cache key"""
content = f"{provider}:{source_language}:{target_language}:{text}"
return hashlib.md5(content.encode('utf-8')).hexdigest()
def get(self, text: str, target_language: str, source_language: str, provider: str) -> Optional[str]:
"""Get a cached translation if available"""
key = self._make_key(text, target_language, source_language, provider)
with self.lock:
if key in self.cache:
self.hits += 1
# Move to end (most recently used)
self.cache.move_to_end(key)
return self.cache[key]
self.misses += 1
return None
def set(self, text: str, target_language: str, source_language: str, provider: str, translation: str):
"""Cache a translation result"""
key = self._make_key(text, target_language, source_language, provider)
with self.lock:
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = translation
# Remove oldest if exceeding maxsize
while len(self.cache) > self.maxsize:
self.cache.popitem(last=False)
def clear(self):
"""Clear the cache"""
with self.lock:
self.cache.clear()
self.hits = 0
self.misses = 0
def stats(self) -> Dict:
"""Get cache statistics"""
with self.lock:
total = self.hits + self.misses
hit_rate = (self.hits / total * 100) if total > 0 else 0
return {
"size": len(self.cache),
"maxsize": self.maxsize,
"hits": self.hits,
"misses": self.misses,
"hit_rate": f"{hit_rate:.1f}%"
}
# Global translation cache
_translation_cache = TranslationCache(maxsize=5000)
class TranslationProvider(ABC):
"""Abstract base class for translation providers"""
@abstractmethod
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
"""Translate text from source to target language"""
pass
def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
"""Translate multiple texts at once - default implementation"""
return [self.translate(text, target_language, source_language) for text in texts]
def translate_batch_parallel(self, texts: List[str], target_language: str, source_language: str = 'auto', max_workers: int = 4) -> List[str]:
"""Parallel batch translation using thread pool"""
if not texts:
return []
results = [''] * len(texts)
non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()]
if not non_empty:
return [t if t else '' for t in texts]
def translate_one(item: Tuple[int, str]) -> Tuple[int, str]:
idx, text = item
try:
return (idx, self.translate(text, target_language, source_language))
except Exception as e:
print(f"Translation error at index {idx}: {e}")
return (idx, text)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
for idx, translated in executor.map(translate_one, non_empty):
results[idx] = translated
# Fill empty positions
for i, text in enumerate(texts):
if not text or not text.strip():
results[i] = text if text else ''
return results
class GoogleTranslationProvider(TranslationProvider):
"""Google Translate implementation with batch support and caching"""
def __init__(self):
self._local = threading.local()
self.provider_name = "google"
def _get_translator(self, source_language: str, target_language: str) -> GoogleTranslator:
"""Get or create a translator instance for the current thread"""
key = f"{source_language}_{target_language}"
if not hasattr(self._local, 'translators'):
self._local.translators = {}
if key not in self._local.translators:
self._local.translators[key] = GoogleTranslator(source=source_language, target=target_language)
return self._local.translators[key]
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
if not text or not text.strip():
return text
# Check cache first
cached = _translation_cache.get(text, target_language, source_language, self.provider_name)
if cached is not None:
return cached
try:
translator = self._get_translator(source_language, target_language)
result = translator.translate(text)
# Cache the result
_translation_cache.set(text, target_language, source_language, self.provider_name, result)
return result
except Exception as e:
print(f"Translation error: {e}")
return text
def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto', batch_size: int = 50) -> List[str]:
"""
Translate multiple texts using batch processing for speed.
Uses caching to avoid redundant translations.
"""
if not texts:
return []
# Filter and track empty texts
results = [''] * len(texts)
non_empty_indices = []
non_empty_texts = []
texts_to_translate = []
indices_to_translate = []
for i, text in enumerate(texts):
if text and text.strip():
# Check cache first
cached = _translation_cache.get(text, target_language, source_language, self.provider_name)
if cached is not None:
results[i] = cached
else:
non_empty_indices.append(i)
non_empty_texts.append(text)
texts_to_translate.append(text)
indices_to_translate.append(i)
else:
results[i] = text if text else ''
if not texts_to_translate:
return results
try:
translator = GoogleTranslator(source=source_language, target=target_language)
# Process in batches
translated_texts = []
for i in range(0, len(texts_to_translate), batch_size):
batch = texts_to_translate[i:i + batch_size]
try:
# Use translate_batch if available
if hasattr(translator, 'translate_batch'):
batch_result = translator.translate_batch(batch)
else:
# Fallback: join with separator, translate, split
separator = "\n|||SPLIT|||\n"
combined = separator.join(batch)
translated_combined = translator.translate(combined)
if translated_combined:
batch_result = translated_combined.split("|||SPLIT|||")
# Clean up results
batch_result = [t.strip() for t in batch_result]
# If split didn't work correctly, fall back to individual
if len(batch_result) != len(batch):
batch_result = [translator.translate(t) for t in batch]
else:
batch_result = batch
translated_texts.extend(batch_result)
except Exception as e:
print(f"Batch translation error, falling back to individual: {e}")
for text in batch:
try:
translated_texts.append(translator.translate(text))
except:
translated_texts.append(text)
# Map back to original positions and cache results
for idx, (original, translated) in zip(indices_to_translate, zip(texts_to_translate, translated_texts)):
result = translated if translated else texts[idx]
results[idx] = result
# Cache successful translations
_translation_cache.set(texts[idx], target_language, source_language, self.provider_name, result)
return results
except Exception as e:
print(f"Batch translation failed: {e}")
# Fallback to individual translations
for idx, text in zip(indices_to_translate, texts_to_translate):
try:
results[idx] = GoogleTranslator(source=source_language, target=target_language).translate(text) or text
except:
results[idx] = text
return results
class DeepLTranslationProvider(TranslationProvider):
"""DeepL Translate implementation with batch support"""
def __init__(self, api_key: str):
self.api_key = api_key
self._translator_cache = {}
def _get_translator(self, source_language: str, target_language: str) -> DeeplTranslator:
key = f"{source_language}_{target_language}"
if key not in self._translator_cache:
self._translator_cache[key] = DeeplTranslator(api_key=self.api_key, source=source_language, target=target_language)
return self._translator_cache[key]
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
if not text or not text.strip():
return text
try:
translator = self._get_translator(source_language, target_language)
return translator.translate(text)
except Exception as e:
print(f"Translation error: {e}")
return text
def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
"""Batch translate using DeepL"""
if not texts:
return []
results = [''] * len(texts)
non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()]
if not non_empty:
return [t if t else '' for t in texts]
try:
translator = self._get_translator(source_language, target_language)
non_empty_texts = [t for _, t in non_empty]
if hasattr(translator, 'translate_batch'):
translated = translator.translate_batch(non_empty_texts)
else:
translated = [translator.translate(t) for t in non_empty_texts]
for (idx, _), trans in zip(non_empty, translated):
results[idx] = trans if trans else texts[idx]
# Fill empty positions
for i, text in enumerate(texts):
if not text or not text.strip():
results[i] = text if text else ''
return results
except Exception as e:
print(f"DeepL batch error: {e}")
return [self.translate(t, target_language, source_language) for t in texts]
class LibreTranslationProvider(TranslationProvider):
"""LibreTranslate implementation with batch support"""
def __init__(self, custom_url: str = "https://libretranslate.com"):
self.custom_url = custom_url
self._translator_cache = {}
def _get_translator(self, source_language: str, target_language: str) -> LibreTranslator:
key = f"{source_language}_{target_language}"
if key not in self._translator_cache:
self._translator_cache[key] = LibreTranslator(source=source_language, target=target_language, custom_url=self.custom_url)
return self._translator_cache[key]
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
if not text or not text.strip():
return text
try:
translator = self._get_translator(source_language, target_language)
return translator.translate(text)
except Exception as e:
print(f"LibreTranslate error: {e}")
return text
def translate_batch(self, texts: List[str], target_language: str, source_language: str = 'auto') -> List[str]:
"""Batch translate using LibreTranslate"""
if not texts:
return []
results = [''] * len(texts)
non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()]
if not non_empty:
return [t if t else '' for t in texts]
try:
translator = self._get_translator(source_language, target_language)
for idx, text in non_empty:
try:
results[idx] = translator.translate(text) or text
except:
results[idx] = text
for i, text in enumerate(texts):
if not text or not text.strip():
results[i] = text if text else ''
return results
except Exception as e:
print(f"LibreTranslate batch error: {e}")
return texts
class OllamaTranslationProvider(TranslationProvider):
"""Ollama LLM translation implementation"""
def __init__(self, base_url: str = "http://localhost:11434", model: str = "llama3", vision_model: str = "llava", system_prompt: str = ""):
self.base_url = base_url.rstrip('/')
self.model = model.strip() # Remove any leading/trailing whitespace
self.vision_model = vision_model.strip()
self.custom_system_prompt = system_prompt # Custom context, glossary, instructions
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
if not text or not text.strip():
return text
# Skip very short text or numbers only
if len(text.strip()) < 2 or text.strip().isdigit():
return text
try:
# Build system prompt with custom context if provided
base_prompt = f"""You are a professional translator. Your ONLY task is to translate text to {target_language}.
CRITICAL RULES:
1. Output ONLY the translated text - no explanations, no comments, no notes
2. Preserve the exact formatting (line breaks, spacing, punctuation)
3. Do NOT add any prefixes like "Here's the translation:" or "Translation:"
4. Do NOT refuse to translate or ask clarifying questions
5. If the text is already in {target_language}, return it unchanged
6. Translate everything literally and accurately
7. NEVER provide comments, opinions, or explanations - you are JUST a translator
8. If you have any doubt about the translation, return the original text unchanged
9. Do not interpret or analyze the content - simply translate word by word
10. Your response must contain ONLY the translated text, nothing else"""
if self.custom_system_prompt:
system_content = f"""{base_prompt}
ADDITIONAL CONTEXT AND INSTRUCTIONS:
{self.custom_system_prompt}"""
else:
system_content = base_prompt
# Use /api/chat endpoint (more compatible with all models)
response = requests.post(
f"{self.base_url}/api/chat",
json={
"model": self.model,
"messages": [
{
"role": "system",
"content": system_content
},
{
"role": "user",
"content": text
}
],
"stream": False,
"options": {
"temperature": 0.3,
"num_predict": 500
}
},
timeout=120 # 2 minutes timeout
)
response.raise_for_status()
result = response.json()
translated = result.get("message", {}).get("content", "").strip()
return translated if translated else text
except requests.exceptions.ConnectionError:
print(f"Ollama error: Cannot connect to {self.base_url}. Is Ollama running?")
return text
except requests.exceptions.Timeout:
print(f"Ollama error: Request timeout after 120s")
return text
except Exception as e:
print(f"Ollama translation error: {e}")
return text
def translate_image(self, image_path: str, target_language: str) -> str:
"""Translate text within an image using Ollama vision model"""
import base64
try:
# Read and encode image
with open(image_path, 'rb') as img_file:
image_data = base64.b64encode(img_file.read()).decode('utf-8')
# Use /api/chat for vision models too
response = requests.post(
f"{self.base_url}/api/chat",
json={
"model": self.vision_model,
"messages": [
{
"role": "user",
"content": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting.",
"images": [image_data]
}
],
"stream": False
},
timeout=60
)
response.raise_for_status()
result = response.json()
return result.get("message", {}).get("content", "").strip()
except Exception as e:
print(f"Ollama vision translation error: {e}")
return ""
@staticmethod
def list_models(base_url: str = "http://localhost:11434") -> List[str]:
"""List available Ollama models"""
try:
response = requests.get(f"{base_url.rstrip('/')}/api/tags", timeout=5)
response.raise_for_status()
models = response.json().get("models", [])
return [model["name"] for model in models]
except Exception as e:
print(f"Error listing Ollama models: {e}")
return []
class WebLLMTranslationProvider(TranslationProvider):
"""WebLLM browser-based translation (client-side processing)"""
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
# WebLLM translation happens client-side in the browser
# This is just a placeholder - actual translation is done by JavaScript
# For server-side, we'll just pass through for now
return text
class OpenAITranslationProvider(TranslationProvider):
"""OpenAI GPT translation implementation with vision support"""
def __init__(self, api_key: str, model: str = "gpt-4o-mini", system_prompt: str = ""):
self.api_key = api_key
self.model = model
self.custom_system_prompt = system_prompt
def translate(self, text: str, target_language: str, source_language: str = 'auto') -> str:
if not text or not text.strip():
return text
# Skip very short text or numbers only
if len(text.strip()) < 2 or text.strip().isdigit():
return text
try:
import openai
client = openai.OpenAI(api_key=self.api_key)
# Build system prompt with custom context if provided
base_prompt = f"""You are a professional translator. Your ONLY task is to translate text to {target_language}.
CRITICAL RULES:
1. Output ONLY the translated text - no explanations, no comments, no notes
2. Preserve the exact formatting (line breaks, spacing, punctuation)
3. Do NOT add any prefixes like "Here's the translation:" or "Translation:"
4. Do NOT refuse to translate or ask clarifying questions
5. If the text is already in {target_language}, return it unchanged
6. Translate everything literally and accurately
7. NEVER provide comments, opinions, or explanations - you are JUST a translator
8. If you have any doubt about the translation, return the original text unchanged
9. Do not interpret or analyze the content - simply translate word by word
10. Your response must contain ONLY the translated text, nothing else"""
if self.custom_system_prompt:
system_content = f"""{base_prompt}
ADDITIONAL CONTEXT AND INSTRUCTIONS:
{self.custom_system_prompt}"""
else:
system_content = base_prompt
response = client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_content},
{"role": "user", "content": text}
],
temperature=0.3,
max_tokens=500
)
translated = response.choices[0].message.content.strip()
return translated if translated else text
except Exception as e:
print(f"OpenAI translation error: {e}")
return text
def translate_image(self, image_path: str, target_language: str) -> str:
"""Translate text within an image using OpenAI vision model"""
import base64
try:
import openai
client = openai.OpenAI(api_key=self.api_key)
# Read and encode image
with open(image_path, 'rb') as img_file:
image_data = base64.b64encode(img_file.read()).decode('utf-8')
# Determine image type from extension
ext = image_path.lower().split('.')[-1]
media_type = f"image/{ext}" if ext in ['png', 'jpg', 'jpeg', 'gif', 'webp'] else "image/png"
response = client.chat.completions.create(
model=self.model, # gpt-4o and gpt-4o-mini support vision
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting."
},
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{image_data}"
}
}
]
}
],
max_tokens=1000
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"OpenAI vision translation error: {e}")
return ""
class TranslationService:
"""Main translation service that delegates to the configured provider"""
def __init__(self, provider: Optional[TranslationProvider] = None):
if provider:
self.provider = provider
else:
# Auto-select provider based on configuration
self.provider = self._get_default_provider()
self.translate_images = False # Flag to enable image translation
def _get_default_provider(self) -> TranslationProvider:
"""Get the default translation provider from configuration"""
# Always use Google Translate by default to avoid API key issues
# Provider will be overridden per request in the API endpoint
return GoogleTranslationProvider()
def translate_text(self, text: str, target_language: str, source_language: str = 'auto') -> str:
"""
Translate a single text string
Args:
text: Text to translate
target_language: Target language code (e.g., 'es', 'fr', 'de')
source_language: Source language code (default: 'auto' for auto-detection)
Returns:
Translated text
"""
if not text or not text.strip():
return text
return self.provider.translate(text, target_language, source_language)
def translate_image(self, image_path: str, target_language: str) -> str:
"""
Translate text in an image using vision model (Ollama or OpenAI)
Args:
image_path: Path to image file
target_language: Target language code
Returns:
Translated text from image
"""
if not self.translate_images:
return ""
# Ollama and OpenAI support image translation
if isinstance(self.provider, OllamaTranslationProvider):
return self.provider.translate_image(image_path, target_language)
elif isinstance(self.provider, OpenAITranslationProvider):
return self.provider.translate_image(image_path, target_language)
return ""
def translate_batch(self, texts: list[str], target_language: str, source_language: str = 'auto') -> list[str]:
"""
Translate multiple text strings efficiently using batch processing.
Args:
texts: List of texts to translate
target_language: Target language code
source_language: Source language code (default: 'auto')
Returns:
List of translated texts
"""
if not texts:
return []
# Use provider's batch method if available
if hasattr(self.provider, 'translate_batch'):
return self.provider.translate_batch(texts, target_language, source_language)
# Fallback to individual translations
return [self.translate_text(text, target_language, source_language) for text in texts]
# Global translation service instance
translation_service = TranslationService()