""" Translation Service Abstraction Provides a unified interface for different translation providers Optimized for high performance with parallel processing and caching """ from abc import ABC, abstractmethod from typing import Optional, List, Dict, Tuple import requests from deep_translator import GoogleTranslator, DeeplTranslator, LibreTranslator from config import config import concurrent.futures import threading import asyncio from functools import lru_cache, wraps import time import hashlib import random import logging from collections import OrderedDict from core.logging import get_logger logger = get_logger(__name__) # Map language codes to full names for LLM prompts (models understand "French" better than "fr") _LLM_LANG_NAMES = { "en": "English", "es": "Spanish", "de": "German", "fr": "French", "ja": "Japanese", "pt": "Portuguese", "ru": "Russian", "it": "Italian", "zh": "Chinese", "zh-CN": "Chinese (Simplified)", "zh-TW": "Chinese (Traditional)", "pl": "Polish", "nl": "Dutch", "tr": "Turkish", "ko": "Korean", "ar": "Arabic", "fa": "Persian", "vi": "Vietnamese", "id": "Indonesian", "uk": "Ukrainian", "sv": "Swedish", "cs": "Czech", "el": "Greek", "he": "Hebrew", "hi": "Hindi", "ro": "Romanian", "da": "Danish", "fi": "Finnish", "no": "Norwegian", "hu": "Hungarian", "th": "Thai", "sk": "Slovak", "bg": "Bulgarian", "hr": "Croatian", "ca": "Catalan", "ms": "Malay", } def _lang_name(code: str) -> str: """Return full language name for LLM prompts; fallback to code if unknown.""" if not code or code == "auto": return "" return _LLM_LANG_NAMES.get(code, _LLM_LANG_NAMES.get(code.split("-")[0], code)) # Global thread pool for parallel translations _executor = concurrent.futures.ThreadPoolExecutor(max_workers=8) def retry_with_backoff( max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0 ): """ Decorator for retry logic with exponential backoff and jitter. Used for API calls that may fail due to rate limiting or transient errors. """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_retries - 1: # Exponential backoff with jitter delay = min( base_delay * (2**attempt) + random.uniform(0, 1), max_delay ) logger.warning( f"Retry {attempt + 1}/{max_retries} for {func.__name__} after {delay:.2f}s: {e}" ) time.sleep(delay) # All retries exhausted logger.error( f"All {max_retries} retries failed for {func.__name__}: {last_exception}" ) raise last_exception return wrapper return decorator class TranslationCache: """Thread-safe LRU cache for translations to avoid redundant API calls""" def __init__(self, maxsize: int = 5000): self.cache: OrderedDict = OrderedDict() self.maxsize = maxsize self.lock = threading.RLock() self.hits = 0 self.misses = 0 def _make_key( self, text: str, target_language: str, source_language: str, provider: str ) -> str: """Create a unique cache key""" content = f"{provider}:{source_language}:{target_language}:{text}" return hashlib.md5(content.encode("utf-8")).hexdigest() def get( self, text: str, target_language: str, source_language: str, provider: str ) -> Optional[str]: """Get a cached translation if available""" key = self._make_key(text, target_language, source_language, provider) with self.lock: if key in self.cache: self.hits += 1 # Move to end (most recently used) self.cache.move_to_end(key) return self.cache[key] self.misses += 1 return None def set( self, text: str, target_language: str, source_language: str, provider: str, translation: str, ): """Cache a translation result""" key = self._make_key(text, target_language, source_language, provider) with self.lock: if key in self.cache: self.cache.move_to_end(key) self.cache[key] = translation # Remove oldest if exceeding maxsize while len(self.cache) > self.maxsize: self.cache.popitem(last=False) def clear(self): """Clear the cache""" with self.lock: self.cache.clear() self.hits = 0 self.misses = 0 def stats(self) -> Dict: """Get cache statistics""" with self.lock: total = self.hits + self.misses hit_rate = (self.hits / total * 100) if total > 0 else 0 return { "size": len(self.cache), "maxsize": self.maxsize, "hits": self.hits, "misses": self.misses, "hit_rate": f"{hit_rate:.1f}%", } # Global translation cache _translation_cache = TranslationCache(maxsize=5000) class TranslationProvider(ABC): """Abstract base class for translation providers""" @abstractmethod def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: """Translate text from source to target language""" pass def translate_batch( self, texts: List[str], target_language: str, source_language: str = "auto" ) -> List[str]: """Translate multiple texts at once - default implementation""" return [ self.translate(text, target_language, source_language) for text in texts ] def translate_batch_parallel( self, texts: List[str], target_language: str, source_language: str = "auto", max_workers: int = 4, ) -> List[str]: """Parallel batch translation using thread pool""" if not texts: return [] results = [""] * len(texts) non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()] if not non_empty: return [t if t else "" for t in texts] def translate_one(item: Tuple[int, str]) -> Tuple[int, str]: idx, text = item try: return (idx, self.translate(text, target_language, source_language)) except Exception as e: logger.warning( "translation_error_at_index", index=idx, error_type=type(e).__name__, ) return (idx, text) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for idx, translated in executor.map(translate_one, non_empty): results[idx] = translated # Fill empty positions for i, text in enumerate(texts): if not text or not text.strip(): results[i] = text if text else "" return results class GoogleTranslationProvider(TranslationProvider): """Google Translate implementation with batch support and caching""" # deep_translator requires specific codes that differ from BCP-47 / ISO 639-1. # Map common codes sent by the frontend to the codes deep_translator expects. _LANG_MAP: dict[str, str] = { "zh": "zh-CN", # Chinese (Simplified) — deep_translator only accepts zh-CN "zh-cn": "zh-CN", "zh-tw": "zh-TW", # Chinese (Traditional) "iw": "he", # Hebrew: old ISO code → Google uses 'iw' internally "he": "iw", # deep_translator maps Hebrew as 'iw' "jv": "jw", # Javanese "nb": "no", # Norwegian Bokmål } def __init__(self): self._local = threading.local() self.provider_name = "google" def _normalize_lang(self, code: str) -> str: """Normalise a language code to what deep_translator's GoogleTranslator accepts.""" if not code or code == "auto": return "auto" return self._LANG_MAP.get(code, self._LANG_MAP.get(code.lower(), code)) def _get_translator( self, source_language: str, target_language: str ) -> GoogleTranslator: """Get or create a translator instance for the current thread""" src = self._normalize_lang(source_language) tgt = self._normalize_lang(target_language) key = f"{src}_{tgt}" if not hasattr(self._local, "translators"): self._local.translators = {} if key not in self._local.translators: self._local.translators[key] = GoogleTranslator( source=src, target=tgt ) return self._local.translators[key] @retry_with_backoff(max_retries=3, base_delay=1.0) def _do_translate(self, translator: GoogleTranslator, text: str) -> str: """Perform translation with retry logic""" return translator.translate(text) def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: if not text or not text.strip(): return text # Check cache first cached = _translation_cache.get( text, target_language, source_language, self.provider_name ) if cached is not None: return cached try: translator = self._get_translator(source_language, target_language) result = self._do_translate(translator, text) # Cache the result _translation_cache.set( text, target_language, source_language, self.provider_name, result ) return result except Exception as e: logger.error(f"Translation error: {e}") return text def translate_batch( self, texts: List[str], target_language: str, source_language: str = "auto", batch_size: int = 50, ) -> List[str]: """ Translate multiple texts using batch processing for speed. Uses caching to avoid redundant translations. """ if not texts: return [] # Filter and track empty texts results = [""] * len(texts) non_empty_indices = [] non_empty_texts = [] texts_to_translate = [] indices_to_translate = [] for i, text in enumerate(texts): if text and text.strip(): # Check cache first cached = _translation_cache.get( text, target_language, source_language, self.provider_name ) if cached is not None: results[i] = cached else: non_empty_indices.append(i) non_empty_texts.append(text) texts_to_translate.append(text) indices_to_translate.append(i) else: results[i] = text if text else "" if not texts_to_translate: return results src_norm = self._normalize_lang(source_language) tgt_norm = self._normalize_lang(target_language) try: translator = GoogleTranslator(source=src_norm, target=tgt_norm) # Process in batches translated_texts = [] for i in range(0, len(texts_to_translate), batch_size): batch = texts_to_translate[i : i + batch_size] try: # Use translate_batch if available if hasattr(translator, "translate_batch"): batch_result = translator.translate_batch(batch) else: # Fallback: join with separator, translate, split separator = "\n|||SPLIT|||\n" combined = separator.join(batch) translated_combined = translator.translate(combined) if translated_combined: batch_result = translated_combined.split("|||SPLIT|||") # Clean up results batch_result = [t.strip() for t in batch_result] # If split didn't work correctly, fall back to individual if len(batch_result) != len(batch): batch_result = [translator.translate(t) for t in batch] else: batch_result = batch translated_texts.extend(batch_result) except Exception as e: logger.warning( "batch_translation_fallback", error_type=type(e).__name__, ) for text in batch: try: translated_texts.append(translator.translate(text)) except: translated_texts.append(text) # Map back to original positions and cache results for idx, (original, translated) in zip( indices_to_translate, zip(texts_to_translate, translated_texts) ): result = translated if translated else texts[idx] results[idx] = result # Cache successful translations _translation_cache.set( texts[idx], target_language, source_language, self.provider_name, result, ) return results except Exception as e: logger.warning("batch_translation_failed", error_type=type(e).__name__) # Fallback to individual translations for idx, text in zip(indices_to_translate, texts_to_translate): try: results[idx] = ( GoogleTranslator( source=src_norm, target=tgt_norm ).translate(text) or text ) except Exception: results[idx] = text return results class DeepLTranslationProvider(TranslationProvider): """DeepL Translate implementation with batch support""" def __init__(self, api_key: str): self.api_key = api_key self._translator_cache = {} def _get_translator( self, source_language: str, target_language: str ) -> DeeplTranslator: key = f"{source_language}_{target_language}" if key not in self._translator_cache: self._translator_cache[key] = DeeplTranslator( api_key=self.api_key, source=source_language, target=target_language ) return self._translator_cache[key] def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: if not text or not text.strip(): return text try: translator = self._get_translator(source_language, target_language) return translator.translate(text) except Exception as e: logger.warning("translation_error", error_type=type(e).__name__) return text def translate_batch( self, texts: List[str], target_language: str, source_language: str = "auto" ) -> List[str]: """Batch translate using DeepL""" if not texts: return [] results = [""] * len(texts) non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()] if not non_empty: return [t if t else "" for t in texts] try: translator = self._get_translator(source_language, target_language) non_empty_texts = [t for _, t in non_empty] if hasattr(translator, "translate_batch"): translated = translator.translate_batch(non_empty_texts) else: translated = [translator.translate(t) for t in non_empty_texts] for (idx, _), trans in zip(non_empty, translated): results[idx] = trans if trans else texts[idx] # Fill empty positions for i, text in enumerate(texts): if not text or not text.strip(): results[i] = text if text else "" return results except Exception as e: logger.warning("deepl_batch_error", error_type=type(e).__name__) return [self.translate(t, target_language, source_language) for t in texts] class LibreTranslationProvider(TranslationProvider): """LibreTranslate implementation with batch support""" def __init__(self, custom_url: str = "https://libretranslate.com"): self.custom_url = custom_url self._translator_cache = {} def _get_translator( self, source_language: str, target_language: str ) -> LibreTranslator: key = f"{source_language}_{target_language}" if key not in self._translator_cache: self._translator_cache[key] = LibreTranslator( source=source_language, target=target_language, custom_url=self.custom_url, ) return self._translator_cache[key] def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: if not text or not text.strip(): return text try: translator = self._get_translator(source_language, target_language) return translator.translate(text) except Exception as e: logger.warning("libretranslate_error", error_type=type(e).__name__) return text def translate_batch( self, texts: List[str], target_language: str, source_language: str = "auto" ) -> List[str]: """Batch translate using LibreTranslate""" if not texts: return [] results = [""] * len(texts) non_empty = [(i, t) for i, t in enumerate(texts) if t and t.strip()] if not non_empty: return [t if t else "" for t in texts] try: translator = self._get_translator(source_language, target_language) for idx, text in non_empty: try: results[idx] = translator.translate(text) or text except: results[idx] = text for i, text in enumerate(texts): if not text or not text.strip(): results[i] = text if text else "" return results except Exception as e: logger.warning("libretranslate_batch_error", error_type=type(e).__name__) return texts class OllamaTranslationProvider(TranslationProvider): """Ollama LLM translation implementation""" def __init__( self, base_url: str = "http://localhost:11434", model: str = "llama3", vision_model: str = "llava", system_prompt: str = "", ): self.base_url = base_url.rstrip("/") self.model = model.strip() # Remove any leading/trailing whitespace self.vision_model = vision_model.strip() self.custom_system_prompt = ( system_prompt # Custom context, glossary, instructions ) def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: if not text or not text.strip(): return text # Skip very short text or numbers only if len(text.strip()) < 2 or text.strip().isdigit(): return text try: target_name = _lang_name(target_language) or target_language source_name = _lang_name(source_language) if source_language and source_language != "auto" else None if source_name: base_prompt = f"""You are a translator. Translate the following text FROM {source_name} TO {target_name}. Output ONLY the translated text. No explanations, no quotes. Preserve formatting. If already in {target_name}, return unchanged.""" else: base_prompt = f"""You are a translator. Translate the following text TO {target_name}. Output ONLY the translated text. No explanations, no quotes. Preserve formatting. Detect source language if needed. If already in {target_name}, return unchanged.""" if self.custom_system_prompt: system_content = f"""{base_prompt} ADDITIONAL CONTEXT: {self.custom_system_prompt}""" else: system_content = base_prompt # Use /api/chat endpoint (more compatible with all models) response = requests.post( f"{self.base_url}/api/chat", json={ "model": self.model, "messages": [ {"role": "system", "content": system_content}, {"role": "user", "content": text}, ], "stream": False, "options": {"temperature": 0.3, "num_predict": 500}, }, timeout=120, # 2 minutes timeout ) response.raise_for_status() result = response.json() translated = result.get("message", {}).get("content", "").strip() return translated if translated else text except requests.exceptions.ConnectionError: logger.warning( "ollama_connection_error", base_url=self.base_url, ) return text except requests.exceptions.Timeout: logger.warning("ollama_timeout", timeout_s=120) return text except Exception as e: logger.warning("ollama_translation_error", error_type=type(e).__name__) return text def translate_batch( self, texts: List[str], target_language: str, source_language: str = "auto", max_workers: int = 4, ) -> List[str]: """ Batch translate using parallel requests to Ollama. Uses ThreadPoolExecutor for concurrent translations. """ import concurrent.futures if not texts: return [] results = [""] * len(texts) texts_to_translate = [] indices_to_translate = [] for i, text in enumerate(texts): if not text or not text.strip(): results[i] = text if text else "" elif len(text.strip()) < 2 or text.strip().isdigit(): results[i] = text else: texts_to_translate.append(text) indices_to_translate.append(i) if not texts_to_translate: return results def translate_one(text: str) -> str: return self.translate(text, target_language, source_language) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: translated = list(executor.map(translate_one, texts_to_translate)) for idx, trans in zip(indices_to_translate, translated): results[idx] = trans return results def list_models(self) -> List[dict]: """List available models from Ollama server""" try: response = requests.get(f"{self.base_url}/api/tags", timeout=5) if response.ok: data = response.json() return data.get("models", []) return [] except Exception as e: logger.warning("ollama_list_models_error", error_type=type(e).__name__) return [] def translate_image(self, image_path: str, target_language: str) -> str: """Translate text within an image using Ollama vision model""" import base64 try: # Read and encode image with open(image_path, "rb") as img_file: image_data = base64.b64encode(img_file.read()).decode("utf-8") # Use /api/chat for vision models too response = requests.post( f"{self.base_url}/api/chat", json={ "model": self.vision_model, "messages": [ { "role": "user", "content": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting.", "images": [image_data], } ], "stream": False, }, timeout=60, ) response.raise_for_status() result = response.json() return result.get("message", {}).get("content", "").strip() except Exception as e: logger.warning("ollama_vision_error", error_type=type(e).__name__) return "" @staticmethod def list_models(base_url: str = "http://localhost:11434") -> List[str]: """List available Ollama models""" try: response = requests.get(f"{base_url.rstrip('/')}/api/tags", timeout=5) response.raise_for_status() models = response.json().get("models", []) return [model["name"] for model in models] except Exception as e: logger.warning("ollama_list_models_error", error_type=type(e).__name__) return [] class OpenRouterTranslationProvider(TranslationProvider): """ OpenRouter API translation - Access to many cheap & high-quality models Recommended models for translation (by cost/quality): - deepseek/deepseek-chat: $0.14/M tokens - Excellent quality, very cheap - mistralai/mistral-7b-instruct: $0.06/M tokens - Fast and cheap - meta-llama/llama-3.1-8b-instruct: $0.06/M tokens - Good quality - google/gemma-2-9b-it: $0.08/M tokens - Good for European languages """ def __init__( self, api_key: str, model: str = "deepseek/deepseek-chat", system_prompt: str = "", ): self.api_key = api_key self.model = model self.custom_system_prompt = system_prompt self.base_url = "https://openrouter.ai/api/v1" self.provider_name = "openrouter" self._session = None def _get_session(self): """Get or create a requests session for connection pooling""" if self._session is None: import requests self._session = requests.Session() self._session.headers.update( { "Authorization": f"Bearer {self.api_key}", "HTTP-Referer": "https://translate-app.local", "X-Title": "Document Translator", "Content-Type": "application/json", } ) return self._session def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: if not text or not text.strip(): return text # Skip very short text or numbers only if len(text.strip()) < 2 or text.strip().isdigit(): return text # Check cache first cached = _translation_cache.get( text, target_language, source_language, self.provider_name ) if cached is not None: return cached session = self._get_session() target_name = _lang_name(target_language) or target_language source_name = _lang_name(source_language) if source_language and source_language != "auto" else None if source_name: system_prompt = f"""You are a translator. Translate the following text FROM {source_name} TO {target_name}. RULES: - Output ONLY the translated text. No explanations, no quotes, no "Translation:" prefix. - Preserve formatting (line breaks, spacing). - If the text is already in {target_name}, return it unchanged. - Never add comments or notes.""" else: system_prompt = f"""You are a translator. Translate the following text TO {target_name}. RULES: - Output ONLY the translated text. No explanations, no quotes, no "Translation:" prefix. - Preserve formatting (line breaks, spacing). - Detect the source language automatically. - If the text is already in {target_name}, return it unchanged. - Never add comments or notes.""" if self.custom_system_prompt: system_prompt = ( f"{system_prompt}\n\nAdditional context: {self.custom_system_prompt}" ) payload = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": text}, ], "temperature": 0.2, "max_tokens": 1000, } last_error = None for attempt in range(3): try: response = session.post( f"{self.base_url}/chat/completions", json=payload, timeout=30, ) if response.status_code == 429: wait = (attempt + 1) * 5 logger.warning(f"OpenRouter rate limit (429), retry in {wait}s (attempt {attempt + 1}/3)") time.sleep(wait) continue response.raise_for_status() result = response.json() translated = ( result.get("choices", [{}])[0] .get("message", {}) .get("content", "") .strip() ) if translated: _translation_cache.set( text, target_language, source_language, self.provider_name, translated ) return translated raise ValueError("OpenRouter returned empty translation") except Exception as e: last_error = e if attempt < 2 and "429" in str(e): time.sleep((attempt + 1) * 5) continue break err_msg = str(last_error) if last_error else "Unknown error" logger.error(f"OpenRouter translation failed: {err_msg}") raise RuntimeError( f"Traduction IA échouée: {err_msg}. " "Si vous utilisez un modèle gratuit (ex: gemma:free), il est souvent limité. " "Passez à deepseek/deepseek-v3.2 dans les paramètres admin." ) def translate_batch( self, texts: List[str], target_language: str, source_language: str = "auto" ) -> List[str]: """ Batch translate using OpenRouter with parallel requests. Uses caching to avoid redundant translations. """ if not texts: return [] results = [""] * len(texts) texts_to_translate = [] indices_to_translate = [] # Check cache first for i, text in enumerate(texts): if not text or not text.strip(): results[i] = text if text else "" else: cached = _translation_cache.get( text, target_language, source_language, self.provider_name ) if cached is not None: results[i] = cached else: texts_to_translate.append(text) indices_to_translate.append(i) if not texts_to_translate: return results # Translate in parallel batches import concurrent.futures def translate_one(text: str) -> str: return self.translate(text, target_language, source_language) # Use thread pool for parallel requests with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: translated = list(executor.map(translate_one, texts_to_translate)) # Map back results for idx, trans in zip(indices_to_translate, translated): results[idx] = trans return results def translate_image(self, image_path: str, target_language: str) -> str: """Translate text within an image using OpenRouter vision model""" import base64 try: # Read and encode image with open(image_path, "rb") as img_file: image_data = base64.b64encode(img_file.read()).decode("utf-8") # Determine image type from extension ext = image_path.lower().split(".")[-1] media_type = ( f"image/{ext}" if ext in ["png", "jpg", "jpeg", "gif", "webp"] else "image/png" ) vision_model = self.model vision_supported_keywords = ["gpt-4", "claude", "gemini", "fable", "pixtral", "llama-3.2", "grok-2"] has_vision = any(kw in vision_model.lower() for kw in vision_supported_keywords) if not has_vision: vision_model = "google/gemini-3.5-flash" session = self._get_session() payload = { "model": vision_model, "messages": [ { "role": "user", "content": [ { "type": "text", "text": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting.", }, { "type": "image_url", "image_url": { "url": f"data:{media_type};base64,{image_data}" }, }, ], } ], "max_tokens": 1000, } response = session.post( f"{self.base_url}/chat/completions", json=payload, timeout=60, ) response.raise_for_status() result = response.json() translated = ( result.get("choices", [{}])[0] .get("message", {}) .get("content", "") .strip() ) return translated except Exception as e: logger.warning("openrouter_vision_error", error_type=type(e).__name__, error=str(e)) return "" @staticmethod def list_recommended_models() -> List[dict]: """List recommended models for translation with pricing""" return [ { "id": "deepseek/deepseek-chat", "name": "DeepSeek Chat", "price": "$0.14/M tokens", "quality": "Excellent", "speed": "Fast", }, { "id": "mistralai/mistral-7b-instruct", "name": "Mistral 7B", "price": "$0.06/M tokens", "quality": "Good", "speed": "Very Fast", }, { "id": "meta-llama/llama-3.1-8b-instruct", "name": "Llama 3.1 8B", "price": "$0.06/M tokens", "quality": "Good", "speed": "Fast", }, { "id": "google/gemma-2-9b-it", "name": "Gemma 2 9B", "price": "$0.08/M tokens", "quality": "Good", "speed": "Fast", }, { "id": "anthropic/claude-3-haiku", "name": "Claude 3 Haiku", "price": "$0.25/M tokens", "quality": "Excellent", "speed": "Fast", }, { "id": "openai/gpt-4o-mini", "name": "GPT-4o Mini", "price": "$0.15/M tokens", "quality": "Excellent", "speed": "Fast", }, ] class WebLLMTranslationProvider(TranslationProvider): """WebLLM browser-based translation (client-side processing)""" def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: # WebLLM translation happens client-side in the browser # This is just a placeholder - actual translation is done by JavaScript # For server-side, we'll just pass through for now return text class OpenAITranslationProvider(TranslationProvider): """OpenAI-compatible LLM translation provider (OpenAI, xAI/Grok, Azure, etc.)""" def __init__( self, api_key: str, model: str = "gpt-4o-mini", system_prompt: str = "", base_url: Optional[str] = None, ): self.api_key = api_key self.model = model self.custom_system_prompt = system_prompt self.base_url = base_url # None → uses default OpenAI endpoint def translate( self, text: str, target_language: str, source_language: str = "auto" ) -> str: if not text or not text.strip(): return text # Skip very short text or numbers only if len(text.strip()) < 2 or text.strip().isdigit(): return text try: import openai client_kwargs = {"api_key": self.api_key} if self.base_url: client_kwargs["base_url"] = self.base_url client = openai.OpenAI(**client_kwargs) target_name = _lang_name(target_language) or target_language source_name = _lang_name(source_language) if source_language and source_language != "auto" else None if source_name: base_prompt = f"""You are a translator. Translate the following text FROM {source_name} TO {target_name}. Output ONLY the translated text. No explanations, no quotes. Preserve formatting. If already in {target_name}, return unchanged.""" else: base_prompt = f"""You are a translator. Translate the following text TO {target_name}. Output ONLY the translated text. No explanations, no quotes. Preserve formatting. Detect source language if needed. If already in {target_name}, return unchanged.""" if self.custom_system_prompt: system_content = f"""{base_prompt} ADDITIONAL CONTEXT AND INSTRUCTIONS: {self.custom_system_prompt}""" else: system_content = base_prompt response = client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_content}, {"role": "user", "content": text}, ], temperature=0.3, max_tokens=500, ) translated = response.choices[0].message.content.strip() return translated if translated else text except Exception as e: logger.warning("openai_translation_error", error_type=type(e).__name__) return text def translate_image(self, image_path: str, target_language: str) -> str: """Translate text within an image using OpenAI vision model""" import base64 try: import openai client_kwargs = {"api_key": self.api_key} if self.base_url: client_kwargs["base_url"] = self.base_url client = openai.OpenAI(**client_kwargs) # Read and encode image with open(image_path, "rb") as img_file: image_data = base64.b64encode(img_file.read()).decode("utf-8") # Determine image type from extension ext = image_path.lower().split(".")[-1] media_type = ( f"image/{ext}" if ext in ["png", "jpg", "jpeg", "gif", "webp"] else "image/png" ) # Determine a vision model. If the current model doesn't support vision, # use a fast vision fallback model vision_model = self.model vision_supported_keywords = ["gpt-4", "claude", "gemini", "fable", "pixtral", "llama-3.2", "grok-2"] has_vision = any(kw in vision_model.lower() for kw in vision_supported_keywords) if not has_vision: if self.base_url and "openrouter.ai" in self.base_url: vision_model = "google/gemini-3.5-flash" else: vision_model = "gpt-4o-mini" response = client.chat.completions.create( model=vision_model, messages=[ { "role": "user", "content": [ { "type": "text", "text": f"Extract all text from this image and translate it to {target_language}. Return ONLY the translated text, preserving the structure and formatting.", }, { "type": "image_url", "image_url": { "url": f"data:{media_type};base64,{image_data}" }, }, ], } ], max_tokens=1000, ) return response.choices[0].message.content.strip() except Exception as e: logger.warning("openai_vision_error", error_type=type(e).__name__, error=str(e)) return "" class TranslationService: """Main translation service that delegates to the configured provider""" def __init__(self, provider: Optional[TranslationProvider] = None): if provider: self.provider = provider else: # Auto-select provider based on configuration self.provider = self._get_default_provider() self.translate_images = False # Flag to enable image translation def _get_default_provider(self) -> TranslationProvider: """Get the default translation provider from configuration""" # Always use Google Translate by default to avoid API key issues # Provider will be overridden per request in the API endpoint return GoogleTranslationProvider() def translate_text( self, text: str, target_language: str, source_language: str = "auto" ) -> str: """ Translate a single text string Args: text: Text to translate target_language: Target language code (e.g., 'es', 'fr', 'de') source_language: Source language code (default: 'auto' for auto-detection) Returns: Translated text """ if not text or not text.strip(): return text return self.provider.translate(text, target_language, source_language) def translate_image(self, image_path: str, target_language: str) -> str: """ Translate text in an image using vision model (Ollama or OpenAI) Args: image_path: Path to image file target_language: Target language code Returns: Translated text from image """ if not self.translate_images: return "" # Ollama, OpenAI, and OpenRouter support image translation if isinstance(self.provider, OllamaTranslationProvider): return self.provider.translate_image(image_path, target_language) elif isinstance(self.provider, OpenAITranslationProvider): return self.provider.translate_image(image_path, target_language) elif isinstance(self.provider, OpenRouterTranslationProvider): return self.provider.translate_image(image_path, target_language) return "" def translate_batch( self, texts: list[str], target_language: str, source_language: str = "auto" ) -> list[str]: """ Translate multiple text strings efficiently using batch processing. Args: texts: List of texts to translate target_language: Target language code source_language: Source language code (default: 'auto') Returns: List of translated texts """ if not texts: return [] # Use provider's batch method if available if hasattr(self.provider, "translate_batch"): return self.provider.translate_batch( texts, target_language, source_language ) # Fallback to individual translations return [ self.translate_text(text, target_language, source_language) for text in texts ] # Global translation service instance translation_service = TranslationService()