Major changes across backend, frontend, infrastructure: - Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud) - Admin panel: user management, pricing, settings - Glossary system with CSV import/export - Subscription and tier quota management - Security hardening (rate limiting, API key auth, path traversal fixes) - Docker compose for dev, prod, and IONOS deployment - Alembic migrations for new tables - Frontend: dashboard, pricing page, landing page, i18n (en/fr) - Test suite and verification scripts Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
271 lines
8.6 KiB
Python
271 lines
8.6 KiB
Python
import os
|
|
import sys
|
|
import unittest
|
|
from pathlib import Path
|
|
import shutil
|
|
import time
|
|
from unittest.mock import patch, AsyncMock
|
|
import pytest
|
|
|
|
# Put the parent of this file's directory at the front of sys.path so that
# modules located there can be imported by bare name from the tests below.
# NOTE(review): presumably that directory is the project root holding
# `config` and `middleware/` - confirm against the repository layout.
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
|
|
class TestCleanupConfig(unittest.TestCase):
    """Checks the value ``config`` exposes for ``CLEANUP_INTERVAL_MINUTES``.

    Both tests change the process environment, reload ``config`` so the
    module body runs again, and always restore the environment and reload
    once more afterwards so later tests see the original configuration.
    """

    def test_cleanup_interval_default(self):
        """With the variable unset, the interval is 5 minutes (Story 2.15)."""
        import importlib

        import config

        env_key = "CLEANUP_INTERVAL_MINUTES"
        # Remove the variable (if any) and remember what it held.
        saved = os.environ.pop(env_key, None)

        try:
            importlib.reload(config)
            self.assertEqual(
                config.config.CLEANUP_INTERVAL_MINUTES,
                5,
                "Default CLEANUP_INTERVAL_MINUTES should be 5",
            )
        finally:
            if saved is not None:
                os.environ[env_key] = saved
            # Reload so `config` reflects the restored environment again.
            importlib.reload(config)

    def test_cleanup_interval_env_override(self):
        """Setting the variable to "10" yields an interval of 10 (AC: #2)."""
        import importlib

        import config

        env_key = "CLEANUP_INTERVAL_MINUTES"
        saved = os.environ.get(env_key)
        os.environ[env_key] = "10"

        try:
            importlib.reload(config)
            self.assertEqual(
                config.config.CLEANUP_INTERVAL_MINUTES,
                10,
                "CLEANUP_INTERVAL_MINUTES should be 10 when set via env",
            )
        finally:
            if saved is None:
                # The variable did not exist before this test; drop ours.
                del os.environ[env_key]
            else:
                os.environ[env_key] = saved
            # Reload so `config` reflects the restored environment again.
            importlib.reload(config)
|
|
|
|
|
|
@pytest.fixture
def temp_dirs(tmp_path):
    """Create an isolated uploads/outputs/temp directory tree for one test.

    The tree is built under pytest's per-test ``tmp_path`` instead of a fixed
    ``temp_test_cleanup`` folder relative to the current working directory.
    That keeps the tests independent of where pytest is launched from,
    prevents leftovers of an interrupted run from leaking into the next one,
    and lets tests run in parallel without sharing a directory.

    Yields:
        dict: ``"test_dir"``, ``"uploads"``, ``"outputs"`` and ``"temp"``
        mapped to the corresponding ``Path`` objects.
    """
    test_dir = tmp_path / "temp_test_cleanup"
    uploads = test_dir / "uploads"
    outputs = test_dir / "outputs"
    temp = test_dir / "temp"

    for directory in (uploads, outputs, temp):
        directory.mkdir(parents=True, exist_ok=True)

    try:
        yield {
            "test_dir": test_dir,
            "uploads": uploads,
            "outputs": outputs,
            "temp": temp,
        }
    finally:
        # Remove the tree eagerly rather than waiting for pytest to prune
        # its own temporary directories.
        if test_dir.exists():
            shutil.rmtree(test_dir)
|
|
|
|
|
|
# Cache for the module returned by _get_cleanup_module(); None until the
# first successful load.
_cleanup_module = None


def _get_cleanup_module():
    """Load ``middleware/cleanup.py`` straight from its file path, once.

    The file is loaded through an explicit import spec (under the name
    ``cleanup_module_direct``) instead of ``import middleware.cleanup`` so
    that ``middleware/__init__.py`` is not triggered. The executed module is
    cached in the module-level ``_cleanup_module`` and returned on later
    calls.

    Returns:
        The executed cleanup module.

    Raises:
        ImportError: If no spec or loader can be created for the path.
        Exception: Whatever executing the module raises (for example
            ``FileNotFoundError`` when the file is missing). Nothing is
            cached in that case, so the next call retries the load.
    """
    global _cleanup_module
    if _cleanup_module is not None:
        return _cleanup_module

    import importlib.util

    cleanup_path = Path(__file__).parent.parent / "middleware" / "cleanup.py"
    spec = importlib.util.spec_from_file_location("cleanup_module_direct", cleanup_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load spec from {cleanup_path}")

    module = importlib.util.module_from_spec(spec)
    # Execute first, cache second: if execution fails, the half-initialised
    # module must not be handed out by later calls, which would replace the
    # real import error with confusing AttributeErrors.
    spec.loader.exec_module(module)
    _cleanup_module = module
    return module
|
|
|
|
|
|
def _get_redis_patcher(mock_redis):
    """Build a patcher that makes the cleanup module's Redis getter return *mock_redis*.

    The returned object comes from ``patch.object`` targeting
    ``_get_async_redis`` on the module returned by ``_get_cleanup_module()``;
    the patch is only active while the patcher is entered.
    """
    return patch.object(
        _get_cleanup_module(),
        "_get_async_redis",
        return_value=mock_redis,
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_orphan_deletion(temp_dirs):
    """Test that orphaned files are deleted as per Story 2.15 (AC: #4)

    Two files are written to the uploads directory. The mocked Redis client
    reports a single key whose payload references only ``tracked.txt``, so
    after ``cleanup()`` the tracked file must remain, ``orphan.txt`` must be
    gone, and the returned stats must count at least one orphan deletion.
    """
    import json

    cleanup_mod = _get_cleanup_module()
    FileCleanupManager = cleanup_mod.FileCleanupManager

    uploads = temp_dirs["uploads"]
    outputs = temp_dirs["outputs"]
    temp = temp_dirs["temp"]

    tracked_file = uploads / "tracked.txt"
    tracked_file.write_text("I am tracked")

    orphan_file = uploads / "orphan.txt"
    orphan_file.write_text("I am an orphan")

    manager = FileCleanupManager(uploads, outputs, temp, cleanup_interval_minutes=5)

    mock_redis = AsyncMock()
    mock_redis.keys.return_value = ["translation:file:job1"]
    # Serialise with json.dumps instead of concatenating strings: a path
    # containing backslashes or quotes (e.g. on Windows) would otherwise
    # yield an invalid JSON document.
    # NOTE(review): presumably the cleanup module parses this payload as
    # JSON - confirm against middleware/cleanup.py.
    mock_redis.get.return_value = json.dumps(
        {"file_path": str(tracked_file.absolute())}
    )

    with _get_redis_patcher(mock_redis):
        stats = await manager.cleanup()

    assert not orphan_file.exists(), "Orphan file should be deleted"
    assert tracked_file.exists(), "Tracked file should be preserved"
    assert "orphaned_deleted" in stats, "Stats should contain orphaned_deleted count"
    assert stats["orphaned_deleted"] >= 1, "Should have deleted at least 1 orphan"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ttl_deletion(temp_dirs):
    """Test that files older than TTL are deleted (AC: #3)

    ``old.txt`` has its access/modification times moved two hours into the
    past while ``new.txt`` keeps its fresh timestamps. Both files are
    referenced by the mocked Redis payloads. With ``max_file_age_minutes=60``
    only the old file is expected to be removed by ``cleanup()``.
    """
    import json

    cleanup_mod = _get_cleanup_module()
    FileCleanupManager = cleanup_mod.FileCleanupManager

    uploads = temp_dirs["uploads"]
    outputs = temp_dirs["outputs"]
    temp = temp_dirs["temp"]

    old_file = uploads / "old.txt"
    old_file.write_text("I am old")

    # Back-date the file by two hours so it exceeds the 60-minute limit.
    past_time = time.time() - (2 * 3600)
    os.utime(old_file, (past_time, past_time))

    new_file = uploads / "new.txt"
    new_file.write_text("I am new")

    manager = FileCleanupManager(uploads, outputs, temp, max_file_age_minutes=60)

    mock_redis = AsyncMock()
    mock_redis.keys.return_value = ["job1", "job2"]
    # Serialise with json.dumps instead of concatenating strings: a path
    # containing backslashes or quotes (e.g. on Windows) would otherwise
    # yield an invalid JSON document.
    # NOTE(review): presumably the cleanup module parses these payloads as
    # JSON - confirm against middleware/cleanup.py.
    mock_redis.get.side_effect = [
        json.dumps({"file_path": str(old_file.absolute())}),
        json.dumps({"file_path": str(new_file.absolute())}),
    ]

    with _get_redis_patcher(mock_redis):
        await manager.cleanup()

    assert not old_file.exists(), "Old file (2h old) should be deleted"
    assert new_file.exists(), "New file should be preserved"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_cleanup_resilience(temp_dirs):
    """A failure on one file must not stop the rest of the cleanup (AC: #6).

    Two files are created and the mocked Redis client reports no keys.
    ``Path.unlink`` is replaced so that its first invocation raises
    ``PermissionError`` and every later one delegates to the real method.
    The run must record at least one error and still reach a second
    ``unlink`` call.
    """
    manager_cls = _get_cleanup_module().FileCleanupManager

    uploads, outputs, temp = (temp_dirs[key] for key in ("uploads", "outputs", "temp"))

    for name in ("file1", "file2"):
        (uploads / f"{name}.txt").write_text(name)

    manager = manager_cls(uploads, outputs, temp, max_file_age_minutes=1)

    redis_stub = AsyncMock()
    redis_stub.keys.return_value = []

    real_unlink = Path.unlink
    attempts = 0

    def failing_unlink(self, *args, **kwargs):
        # Fail exactly once, then behave like the genuine Path.unlink.
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise PermissionError("Cannot delete file")
        return real_unlink(self, *args, **kwargs)

    with _get_redis_patcher(redis_stub), patch.object(Path, "unlink", failing_unlink):
        stats = await manager.cleanup()

    assert len(stats["errors"]) >= 1, "Should have recorded at least one error"
    assert attempts >= 2, "Should have attempted to delete both files (resilience)"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_logging_format(temp_dirs):
    """The cleanup run must emit a structured completion log entry (AC: #5).

    The cleanup module's ``logger`` is replaced by a mock. After
    ``cleanup()`` the first ``info`` call whose first positional argument
    mentions ``cleanup_completed`` must carry ``files_deleted`` and
    ``bytes_freed_mb``, either as keyword arguments or inside a positional
    argument.
    """
    cleanup_mod = _get_cleanup_module()

    manager = cleanup_mod.FileCleanupManager(
        temp_dirs["uploads"], temp_dirs["outputs"], temp_dirs["temp"]
    )

    redis_stub = AsyncMock()
    redis_stub.keys.return_value = []

    with _get_redis_patcher(redis_stub), patch.object(cleanup_mod, "logger") as mock_log:
        await manager.cleanup()

    assert mock_log.info.called, "Logger should be called"

    # First info() call that reports the cleanup_completed event, if any.
    completed = next(
        (
            entry
            for entry in mock_log.info.call_args_list
            if entry.args and "cleanup_completed" in str(entry.args[0])
        ),
        None,
    )

    if completed is not None:
        positional, keywords = completed.args, completed.kwargs

        has_files_deleted = "files_deleted" in keywords or any(
            "files_deleted" in str(item) for item in positional
        )
        assert has_files_deleted, "Log should contain files_deleted"

        has_bytes_freed = "bytes_freed_mb" in keywords or any(
            "bytes_freed_mb" in str(item) for item in positional
        )
        assert has_bytes_freed, "Log should contain bytes_freed_mb"

    assert completed is not None, "Should have logged cleanup_completed event"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_redis_unavailable_graceful(temp_dirs):
    """Cleanup must still work when the Redis getter returns ``None``.

    A single file is back-dated by two hours and the cleanup module's
    ``_get_async_redis`` is patched to return ``None``. With
    ``max_file_age_minutes=60`` the file must be removed and the stats must
    report at least one deleted file.
    """
    manager_cls = _get_cleanup_module().FileCleanupManager

    uploads, outputs, temp = (temp_dirs[key] for key in ("uploads", "outputs", "temp"))

    stale = uploads / "old.txt"
    stale.write_text("I am old")
    two_hours_ago = time.time() - (2 * 3600)
    os.utime(stale, (two_hours_ago, two_hours_ago))

    manager = manager_cls(uploads, outputs, temp, max_file_age_minutes=60)

    # Passing None makes the patched getter hand back "no Redis client".
    with _get_redis_patcher(None):
        stats = await manager.cleanup()

    assert not stale.exists(), (
        "Old file should still be deleted (age-based) even without Redis"
    )
    assert stats["files_deleted"] >= 1, "Should have deleted old file"
|
|
|
|
|
|
if __name__ == "__main__":
    # unittest.main() would only run TestCleanupConfig: the module-level
    # pytest tests (async, fixture-based) are invisible to unittest and would
    # be skipped without any warning. pytest collects both styles, so
    # delegate to it and propagate its exit code to the shell.
    sys.exit(pytest.main([__file__]))
|