175 lines
6.1 KiB
Python
175 lines
6.1 KiB
Python
"""
|
|
Progress Tracker Service (Story 2.11)
|
|
|
|
Provides real-time progress tracking for translation jobs.
|
|
Designed for O(1) updates and < 500ms latency (NFR3).
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional, Callable
|
|
import threading
|
|
import time
|
|
|
|
|
|
class ProgressTracker:
    """
    Track translation progress with callback support.

    Writes the progress fields of a single job into a caller-supplied
    storage dict. Uses in-memory storage for MVP (consistent with the
    Story 2.10 pattern).

    Writes are serialized through a per-instance re-entrant lock and
    throttled: at most one non-final update is written per
    ``_min_update_interval`` seconds. The stored percentage never decreases.
    The lock belongs to this tracker instance only; it does not coordinate
    with other objects that touch the same storage dict.

    Usage:
        storage = {}  # Reference to _translation_jobs dict
        tracker = ProgressTracker("job_123", storage)
        tracker.update(50, "Translating sheet 2/4")

        # Or use item-based progress
        tracker.update_item(3, 10, "Translating slide")
    """

    def __init__(self, job_id: str, storage: Dict[str, Any]):
        """
        Initialize progress tracker.

        Args:
            job_id: The translation job ID (key into ``storage``).
            storage: Reference to the job storage dict (e.g., _translation_jobs).
                The tracker mutates ``storage[job_id]`` in place.
        """
        self.job_id = job_id
        self.storage = storage
        self._lock = threading.RLock()
        # Monotonic timestamp of the last accepted update. Starts at -inf so
        # the very first update is never throttled, whatever the clock's
        # (undefined) reference point is.
        self._last_update_time = float("-inf")
        self._min_update_interval = 0.05  # 50ms minimum between updates (throttling)

    def _write_progress(
        self,
        percent: int,
        step: str,
        *,
        force: bool,
        processed_items: Optional[int] = None,
        total_items: Optional[int] = None,
    ) -> None:
        """Apply one throttled, monotonic progress write to the job record.

        Args:
            percent: Requested percentage; clamped to 0-100 before storing.
            step: Text stored as ``current_step``.
            force: When true the throttle is bypassed (used for final updates).
            processed_items: Value for ``processed_items``; ``None`` keeps the
                stored value (or initializes it to 0 when absent).
            total_items: Value for ``total_items``; ``None`` keeps the stored
                value (or initializes it to 0 when absent).
        """
        with self._lock:
            # Monotonic clock: wall-clock adjustments must not be able to
            # stall (or burst) the throttle.
            now = time.monotonic()
            if not force and now - self._last_update_time < self._min_update_interval:
                return

            job = self.storage.get(self.job_id)
            # Compare against None: an existing-but-empty job dict is still a
            # valid record and must receive the update.
            if job is not None:
                # Never decrease progress — only move forward.
                clamped = min(100, max(0, percent))
                job["progress_percent"] = max(job.get("progress_percent", 0), clamped)
                job["current_step"] = step
                if processed_items is None:
                    job.setdefault("processed_items", 0)
                else:
                    job["processed_items"] = processed_items
                if total_items is None:
                    job.setdefault("total_items", 0)
                else:
                    job["total_items"] = total_items
            self._last_update_time = now

    def update(self, percent: int, step: str) -> None:
        """
        Update progress percentage and current step.

        Thread-safe (per tracker instance) and throttled to prevent excessive
        updates. An update with ``percent >= 100`` always bypasses the
        throttle. Does nothing to storage when the job ID is not present.

        Args:
            percent: Progress percentage (0-100), will be clamped
            step: Human-readable description of current operation
        """
        self._write_progress(percent, step, force=percent >= 100)

    def update_item(
        self, current: int, total: int, item_name: str, max_percent: int = 100
    ) -> None:
        """
        Update progress based on item count (e.g., slides, sheets).

        Calculates percentage from current/total and formats the step message
        as ``"<item_name> <current>/<total>"``. The update for the last item
        (uncapped percentage >= 100) always bypasses the throttle, even when
        ``max_percent`` caps the stored value below 100, so the final item
        counts are not dropped.

        Args:
            current: Current item number (1-based)
            total: Total number of items; a non-positive total yields 0%
            item_name: Name of item type (e.g., "Translating slide", "Processing sheet")
            max_percent: Upper bound for the computed percentage (default 100).
                Use 95 to reserve the last 5% for file-save + set_completed().
        """
        raw_percent = int((current / total) * 100) if total > 0 else 0
        self._write_progress(
            min(raw_percent, max_percent),
            f"{item_name} {current}/{total}",
            # Decide on the uncapped value: capping must not make the final
            # item look like an ordinary, throttle-able update.
            force=raw_percent >= 100,
            processed_items=current,
            total_items=total,
        )

    def set_error(
        self, error_message: str, step: str = "Error during translation"
    ) -> None:
        """
        Mark job as failed with error message.

        Sets ``status`` to "failed" and records the message, step and a UTC
        ``failed_at`` timestamp. Not throttled. Does nothing when the job ID
        is not present in storage.

        Args:
            error_message: Description of the error
            step: Current step description (default: "Error during translation")
        """
        with self._lock:
            job = self.storage.get(self.job_id)
            if job is not None:
                job["status"] = "failed"
                job["error_message"] = error_message
                job["current_step"] = step
                job["failed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    def set_completed(self, output_path: Optional[str] = None) -> None:
        """
        Mark job as completed.

        Sets ``status`` to "completed", forces progress to 100 and records a
        UTC ``completed_at`` timestamp. Not throttled. Does nothing when the
        job ID is not present in storage.

        Args:
            output_path: Optional path to the output file; stored as a string
                under ``output_path`` when truthy.
        """
        with self._lock:
            job = self.storage.get(self.job_id)
            if job is not None:
                job["status"] = "completed"
                job["progress_percent"] = 100
                job["current_step"] = "Translation complete"
                job["completed_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
                if output_path:
                    job["output_path"] = str(output_path)
|
|
|
|
|
|
def create_progress_callback(
    tracker: ProgressTracker, item_name: str, total_items: int
) -> Callable[[Dict[str, Any]], None]:
    """
    Build a translator-compatible progress callback bound to a tracker.

    The returned function accepts a progress-info dict, picks the current
    item number and the total out of it, and forwards them to
    ``tracker.update_item``.

    Args:
        tracker: ProgressTracker instance that receives the updates
        item_name: Name of item being processed (e.g., "Translating slide")
        total_items: Fallback total used when the dict carries no total key

    Returns:
        Callback function compatible with translator progress_callback parameter
    """
    # Different translators report under different keys; the first key that
    # is present in the dict wins, in this order.
    position_keys = ("slide", "sheet", "paragraph", "element")
    count_keys = ("total_slides", "total", "total_paragraphs")

    def _first_present(info: Dict[str, Any], keys, fallback):
        """Return the value of the first key found in *info*, else *fallback*."""
        for key in keys:
            if key in info:
                return info[key]
        return fallback

    def callback(progress_info: Dict[str, Any]) -> None:
        """Translate a progress-info dict into a tracker item update."""
        current = _first_present(progress_info, position_keys, 1)
        total = _first_present(progress_info, count_keys, total_items)
        tracker.update_item(current, total, item_name)

    return callback
|