""" Unit tests for progress tracking functionality (Story 2.11). Tests cover: - ProgressTracker class - Job data model extensions - Status endpoint with progress fields - Status transitions (queued -> processing -> completed/failed) """ import pytest from datetime import datetime, timezone from unittest.mock import Mock, patch, MagicMock import asyncio import sys class TestProgressTracker: """Tests for ProgressTracker class.""" def test_progress_tracker_update_sets_percent_and_step(self): """Test that update() sets progress_percent and current_step correctly.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_123", storage) storage["job_123"] = {"id": "job_123"} tracker.update(50, "Translating sheet 2/4") assert storage["job_123"]["progress_percent"] == 50 assert storage["job_123"]["current_step"] == "Translating sheet 2/4" def test_progress_tracker_update_clamps_percent_to_0_100(self): """Test that update() clamps progress_percent between 0 and 100.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_123", storage) storage["job_123"] = {"id": "job_123"} tracker.update(-10, "Testing negative") assert storage["job_123"]["progress_percent"] == 0 tracker.update(150, "Testing overflow") assert storage["job_123"]["progress_percent"] == 100 def test_progress_tracker_update_item_calculates_percent(self): """Test that update_item() calculates percent from current/total.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_123", storage) storage["job_123"] = {"id": "job_123"} tracker.update_item(3, 10, "Translating slide") assert storage["job_123"]["progress_percent"] == 30 assert storage["job_123"]["current_step"] == "Translating slide 3/10" def test_progress_tracker_update_item_handles_zero_total(self): """Test that update_item() handles zero total gracefully.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_123", storage) storage["job_123"] = {"id": "job_123"} tracker.update_item(0, 0, "Processing") assert storage["job_123"]["progress_percent"] == 0 assert storage["job_123"]["current_step"] == "Processing 0/0" def test_progress_tracker_sets_total_and_processed_items(self): """Test that update_item() sets total_items and processed_items.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_123", storage) storage["job_123"] = {"id": "job_123"} tracker.update_item(5, 20, "Processing") assert storage["job_123"]["processed_items"] == 5 assert storage["job_123"]["total_items"] == 20 def test_progress_tracker_no_op_for_missing_job(self): """Test that update() does nothing if job doesn't exist.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_nonexistent", storage) tracker.update(50, "Should not crash") assert "job_nonexistent" not in storage class TestJobDataModelExtensions: """Tests for extended job data model fields - using mocked storage.""" def test_job_creation_includes_progress_fields(self): """Test that new jobs include all progress-related fields.""" storage = { "job_id": { "id": "tr_test_123", "status": "queued", "progress_percent": 0, "current_step": "Initializing", "total_items": 0, "processed_items": 0, "error_message": None, } } job = storage["job_id"] assert job["progress_percent"] == 0 assert job["current_step"] == "Initializing" assert job["total_items"] == 0 assert job["processed_items"] == 0 assert job["error_message"] is None def test_job_status_transitions_from_queued_to_processing(self): """Test job status transitions from queued to processing.""" storage = { "job_id": { "id": "tr_test_456", "status": "queued", "progress_percent": 0, } } storage["job_id"]["status"] = "processing" storage["job_id"]["progress_percent"] = 10 assert storage["job_id"]["status"] == "processing" assert storage["job_id"]["progress_percent"] == 10 def test_job_failed_status_includes_error_message(self): """Test that failed jobs include error_message.""" storage = { "job_id": { "id": "tr_test_789", "status": "failed", "progress_percent": 30, "current_step": "Error during translation", "error_message": "Provider unavailable: timeout after 30s", } } job = storage["job_id"] assert job["status"] == "failed" assert job["error_message"] == "Provider unavailable: timeout after 30s" class TestTranslationStatusEndpoint: """Tests for GET /api/v1/translations/{job_id} endpoint - using mocks.""" @pytest.mark.asyncio async def test_status_endpoint_returns_progress_fields(self): """Test that status endpoint returns all progress fields.""" mock_jobs = { "tr_status_test_1": { "id": "tr_status_test_1", "status": "processing", "progress_percent": 45, "current_step": "Translating sheet 2/5", "file_name": "report.xlsx", "source_lang": "en", "target_lang": "fr", "created_at": "2024-01-15T10:30:00Z", "total_items": 5, "processed_items": 2, } } job = mock_jobs["tr_status_test_1"] response_data = { "id": job["id"], "status": job["status"], "progress_percent": job.get("progress_percent", 0), "current_step": job.get("current_step", "Unknown"), } assert response_data["id"] == "tr_status_test_1" assert response_data["status"] == "processing" assert response_data["progress_percent"] == 45 assert response_data["current_step"] == "Translating sheet 2/5" @pytest.mark.asyncio async def test_status_endpoint_returns_404_for_nonexistent_job(self): """Test that status endpoint returns 404 for non-existent job.""" mock_jobs = {} job = mock_jobs.get("tr_nonexistent") assert job is None @pytest.mark.asyncio async def test_status_endpoint_includes_error_message_for_failed(self): """Test that failed status includes error_message field.""" mock_jobs = { "tr_failed_test_1": { "id": "tr_failed_test_1", "status": "failed", "progress_percent": 30, "current_step": "Error during translation", "error_message": "Provider unavailable: timeout", "file_name": "report.xlsx", "source_lang": "en", "target_lang": "fr", "created_at": "2024-01-15T10:30:00Z", "failed_at": "2024-01-15T10:30:15Z", } } job = mock_jobs["tr_failed_test_1"] response_data = { "status": job["status"], "error_message": job.get("error_message"), } assert response_data["status"] == "failed" assert response_data["error_message"] == "Provider unavailable: timeout" @pytest.mark.asyncio async def test_status_endpoint_returns_completed_at_for_completed_jobs(self): """Test that completed jobs include completed_at timestamp.""" mock_jobs = { "tr_completed_test_1": { "id": "tr_completed_test_1", "status": "completed", "progress_percent": 100, "current_step": "Translation complete", "file_name": "report.xlsx", "source_lang": "en", "target_lang": "fr", "created_at": "2024-01-15T10:30:00Z", "completed_at": "2024-01-15T10:30:45Z", } } job = mock_jobs["tr_completed_test_1"] assert job["status"] == "completed" assert "completed_at" in job class TestLLMProgressDetails: """Tests for LLM mode progress details.""" def test_progress_shows_slide_format(self): """Test that progress shows 'Translating slide X/Y' format for PPTX.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_pptx", storage) storage["job_pptx"] = {"id": "job_pptx"} tracker.update_item(3, 10, "Translating slide") assert "Translating slide 3/10" == storage["job_pptx"]["current_step"] def test_progress_shows_sheet_format(self): """Test that progress shows 'Translating sheet X/Y' format for Excel.""" from services.progress_tracker import ProgressTracker storage = {} tracker = ProgressTracker("job_excel", storage) storage["job_excel"] = {"id": "job_excel"} tracker.update_item(2, 5, "Translating sheet") assert "Translating sheet 2/5" == storage["job_excel"]["current_step"] class TestStatusTransitions: """Tests for job status transitions.""" def test_status_queued_to_processing(self): """Test transition from queued to processing.""" from services.progress_tracker import ProgressTracker storage = {"job_1": {"id": "job_1", "status": "queued", "progress_percent": 0}} tracker = ProgressTracker("job_1", storage) storage["job_1"]["status"] = "processing" tracker.update(10, "Starting translation") assert storage["job_1"]["status"] == "processing" assert storage["job_1"]["progress_percent"] == 10 def test_status_processing_to_completed(self): """Test transition from processing to completed.""" from services.progress_tracker import ProgressTracker storage = { "job_2": {"id": "job_2", "status": "processing", "progress_percent": 90} } tracker = ProgressTracker("job_2", storage) storage["job_2"]["status"] = "completed" tracker.update(100, "Translation complete") assert storage["job_2"]["status"] == "completed" assert storage["job_2"]["progress_percent"] == 100 def test_status_processing_to_failed(self): """Test transition from processing to failed with error message.""" storage = { "job_3": { "id": "job_3", "status": "processing", "progress_percent": 30, } } storage["job_3"]["status"] = "failed" storage["job_3"]["error_message"] = "Provider timeout" assert storage["job_3"]["status"] == "failed" assert storage["job_3"]["error_message"] == "Provider timeout" def test_progress_tracker_throttle_is_thread_safe(self): """Test that throttling check happens inside the lock to prevent race conditions.""" from services.progress_tracker import ProgressTracker import threading storage = { "job_race": {"id": "job_race", "progress_percent": 0, "current_step": ""} } tracker = ProgressTracker("job_race", storage) results = [] def update_many(): for i in range(10): tracker.update(i * 10, f"Step {i}") results.append(storage["job_race"]["progress_percent"]) threads = [threading.Thread(target=update_many) for _ in range(3)] for t in threads: t.start() for t in threads: t.join() assert storage["job_race"]["progress_percent"] >= 0 assert storage["job_race"]["progress_percent"] <= 100 class TestEstimatedRemainingSeconds: """Tests for estimated_remaining_seconds calculation.""" def test_estimated_remaining_calculated_for_processing_jobs(self): """Test that estimated_remaining_seconds is calculated for processing jobs.""" from datetime import datetime, timezone, timedelta created_at = (datetime.now(timezone.utc) - timedelta(seconds=10)).isoformat() mock_jobs = { "tr_est_test": { "id": "tr_est_test", "status": "processing", "progress_percent": 50, "current_step": "Translating", "created_at": created_at, } } job = mock_jobs["tr_est_test"] progress_percent = job.get("progress_percent", 0) estimated_remaining = None if progress_percent > 0: created_at_dt = datetime.fromisoformat(created_at.replace("Z", "+00:00")) elapsed = (datetime.now(timezone.utc) - created_at_dt).total_seconds() total_estimated = elapsed / (progress_percent / 100) estimated_remaining = max(1, int(total_estimated - elapsed)) assert estimated_remaining is not None assert estimated_remaining >= 1 def test_estimated_remaining_none_for_completed_jobs(self): """Test that estimated_remaining_seconds is None for completed jobs.""" mock_jobs = { "tr_completed": { "id": "tr_completed", "status": "completed", "progress_percent": 100, } } job = mock_jobs["tr_completed"] estimated_remaining = None if job["status"] == "processing" and job.get("progress_percent", 0) > 0: estimated_remaining = 0 assert estimated_remaining is None class TestJobCleanup: """Tests for job cleanup mechanism.""" def test_cleanup_removes_old_completed_jobs(self): """Test that completed jobs older than TTL are cleaned up.""" from datetime import datetime, timezone, timedelta import time as time_module old_completed_at = (datetime.now(timezone.utc) - timedelta(hours=2)).isoformat() mock_jobs = { "tr_old_completed": { "id": "tr_old_completed", "status": "completed", "completed_at": old_completed_at, }, "tr_recent_completed": { "id": "tr_recent_completed", "status": "completed", "completed_at": datetime.now(timezone.utc).isoformat(), }, } ttl_seconds = 3600 expired = [] current_time = time_module.time() for job_id, job in mock_jobs.items(): if job.get("status") in ("completed", "failed"): completed_at = job.get("completed_at") or job.get("failed_at") if completed_at: try: completed_ts = datetime.fromisoformat( completed_at.replace("Z", "+00:00") ).timestamp() if current_time - completed_ts > ttl_seconds: expired.append(job_id) except Exception: pass assert "tr_old_completed" in expired assert "tr_recent_completed" not in expired