Major changes across backend, frontend, and infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
import pytest
|
|
import json
|
|
from unittest.mock import AsyncMock, patch, MagicMock
|
|
from services.storage_tracker import StorageTracker
|
|
|
|
@pytest.mark.asyncio
async def test_track_file_redis_success():
    """track_file reports success and writes JSON metadata via Redis ``set``.

    The Redis client is replaced by an ``AsyncMock`` so the test can inspect
    the key, the serialized payload, and the ``ex`` keyword passed to ``set``.
    """
    storage = StorageTracker()
    redis_stub = AsyncMock()
    storage._redis = redis_stub

    job_id = "tr_123456789012"
    file_info = {
        "original_filename": "test.docx",
        "file_size": 1024,
        "file_hash": "abcde12345",
        "user_id": "user_1",
    }

    outcome = await storage.track_file(job_id, file_info)
    assert outcome is True

    # Inspect the most recent ``set`` call: positional (key, payload) plus
    # keyword arguments.
    positional, keyword = redis_stub.set.call_args

    expected_key = f"translation:file:{job_id}"
    assert positional[0] == expected_key

    # The second positional argument must be a JSON document.
    payload = json.loads(positional[1])
    assert payload["original_filename"] == "test.docx"
    assert "timestamp" in payload

    assert keyword["ex"] == 3600
|
|
|
|
@pytest.mark.asyncio
async def test_get_file_metadata_success():
    """get_file_metadata decodes the JSON that the mocked Redis ``get`` returns."""
    storage = StorageTracker()
    redis_stub = AsyncMock()
    storage._redis = redis_stub

    job_id = "tr_123456789012"
    # Redis hands back a JSON string; the tracker is expected to decode it.
    redis_stub.get.return_value = json.dumps({"original_filename": "found.xlsx"})

    result = await storage.get_file_metadata(job_id)

    assert result is not None
    assert result["original_filename"] == "found.xlsx"

    # The lookup must use the per-job key.
    expected_key = f"translation:file:{job_id}"
    redis_stub.get.assert_called_with(expected_key)
|
|
|
|
@pytest.mark.asyncio
async def test_track_file_logging():
    """track_file logs a ``file_uploaded`` event carrying the file's metadata.

    ``services.storage_tracker._log_info`` is patched so that its first call
    can be inspected for the event name and keyword fields.
    """
    storage = StorageTracker()
    redis_stub = AsyncMock()
    storage._redis = redis_stub

    job_id = "tr_log_test"
    file_info = {
        "original_filename": "log.docx",
        "file_size": 2048,
        "file_hash": "hash_val",
        "user_id": "user_2",
    }

    # Swap out the module-level logging helper for the duration of the call.
    with patch("services.storage_tracker._log_info") as log_spy:
        await storage.track_file(job_id, file_info)

    # The mock keeps its recorded calls after the patch is undone.
    assert log_spy.call_count >= 1

    # Only the first logged event is examined.
    positional, keyword = log_spy.call_args_list[0]
    assert positional[0] == "file_uploaded"

    expected_fields = {
        "job_id": job_id,
        "original_filename": "log.docx",
        "file_hash": "hash_val",
        "user_id": "user_2",
    }
    for field, expected in expected_fields.items():
        assert keyword[field] == expected

    assert "timestamp" in keyword
|