Files
office_translator/middleware/cleanup.py
Sepehr Ramezani 26bd096a06 feat: production deployment - full update with providers, admin, glossaries, pricing, tests
Major changes across backend, frontend, infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:47 +02:00

478 lines
16 KiB
Python

"""
Cleanup and Resource Management for SaaS robustness
Automatic cleanup of temporary files and resources
"""
import os
import time
import asyncio
import threading
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, Set
import logging
import json
from services.storage_tracker import _get_async_redis, KEY_PREFIX
from core.logging import get_logger
# Module-wide logger, obtained from the project's logging helper.
logger = get_logger(__name__)
# Checked by FileCleanupManager.cleanup() to decide whether the completion
# summary is logged with keyword fields or as one formatted message.
# It is hard-coded to True, so the keyword-field form is always used.
# NOTE(review): this presumes get_logger() returns a logger whose .info()
# accepts arbitrary keyword arguments - confirm against core.logging.
_HAS_STRUCTLOG = True
class FileCleanupManager:
    """Manage automatic cleanup of upload, output and temporary files.

    A file is removed when one of the following holds:

    * its per-file TTL (registered through ``track_file``) has expired,
    * it is older than ``max_file_age_minutes`` (judged by mtime),
    * it is an "orphan": Redis is reachable and no storage-tracker record
      references its absolute path,
    * the managed directories together exceed ``max_total_size_gb``
      (oldest files are removed first).

    Files marked with ``protect_file`` are skipped by every cleanup pass.
    Only regular files directly inside the three directories are examined;
    sub-directories are not descended into.
    """

    def __init__(
        self,
        upload_dir: Path,
        output_dir: Path,
        temp_dir: Path,
        max_file_age_minutes: int = 60,
        cleanup_interval_minutes: int = 5,
        max_total_size_gb: float = 10.0,
    ):
        """Store configuration; no background work starts until ``start``.

        Args:
            upload_dir: Directory holding uploaded files.
            output_dir: Directory holding generated output files.
            temp_dir: Directory holding scratch files.
            max_file_age_minutes: Age (by mtime) after which a file is
                considered expired by ``cleanup``.
            cleanup_interval_minutes: Pause between background passes.
            max_total_size_gb: Combined size budget for all three
                directories.
        """
        self.upload_dir = Path(upload_dir)
        self.output_dir = Path(output_dir)
        self.temp_dir = Path(temp_dir)
        self.max_file_age_seconds = max_file_age_minutes * 60
        self.cleanup_interval = cleanup_interval_minutes * 60
        self.max_total_size_bytes = int(max_total_size_gb * 1024 * 1024 * 1024)
        self._running = False
        self._task: Optional[asyncio.Task] = None
        # Normalised absolute paths of files that must not be deleted.
        self._protected_files: Set[str] = set()
        self._tracked_files: dict = {}  # filepath -> {created, ttl_minutes, expires_at}
        # Guards _protected_files, _tracked_files and _stats. It is a plain
        # (non-reentrant) Lock, so it is never held while calling a method
        # that acquires it again.
        self._lock = threading.Lock()
        self._stats = {"files_cleaned": 0, "bytes_freed": 0, "cleanup_runs": 0}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _protection_key(filepath) -> str:
        """Return the canonical key for ``filepath`` in the protected set.

        Protection used to be keyed on ``str(filepath)``, so the same file
        referred to by a relative path and by an absolute path produced two
        different keys and the protection was silently ignored.
        """
        return os.path.abspath(str(filepath))

    def _managed_directories(self) -> list:
        """Return the three directories this manager is responsible for."""
        return [self.upload_dir, self.output_dir, self.temp_dir]

    def _record_freed(self, files: int, freed_bytes: int) -> None:
        """Add one pass's results to the lifetime counters."""
        if not files and not freed_bytes:
            return
        with self._lock:
            self._stats["files_cleaned"] += files
            self._stats["bytes_freed"] += freed_bytes

    async def _fetch_redis_tracked_paths(self) -> tuple:
        """Collect absolute file paths referenced by storage-tracker records.

        Returns:
            ``(paths, available)``. ``available`` is False when no Redis
            client could be obtained or the lookup failed; callers must
            then skip orphan detection.
        """
        tracked_paths = set()
        redis_client = _get_async_redis()
        if not redis_client:
            logger.warning(
                "Redis unavailable - orphan detection disabled, using age-based cleanup only"
            )
            return tracked_paths, False
        try:
            # NOTE(review): KEYS walks the whole keyspace in one call;
            # consider an incremental scan if the key count grows large.
            keys = await redis_client.keys(f"{KEY_PREFIX}:*")
            for key in keys:
                data = await redis_client.get(key)
                if not data:
                    continue
                metadata = json.loads(data)
                if "file_path" in metadata:
                    # Normalize path to absolute string for comparison
                    tracked_paths.add(str(Path(metadata["file_path"]).absolute()))
        except Exception as e:
            logger.warning(f"Failed to fetch tracked paths from Redis: {e}")
            return tracked_paths, False
        return tracked_paths, True

    # ------------------------------------------------------------------
    # TTL tracking
    # ------------------------------------------------------------------
    async def track_file(self, filepath: Path, ttl_minutes: int = 60):
        """Register ``filepath`` for deletion once ``ttl_minutes`` elapse.

        Tracking the same path again replaces the previous entry.
        """
        # One timestamp so "created" and "expires_at" are exactly
        # ttl_minutes apart.
        now = time.time()
        with self._lock:
            self._tracked_files[str(filepath)] = {
                "created": now,
                "ttl_minutes": ttl_minutes,
                "expires_at": now + (ttl_minutes * 60),
            }

    def get_tracked_files(self) -> list:
        """Return one status dict per tracked file.

        Each dict carries ``path``, ``exists``, ``expires_in_seconds``
        (never negative) and ``ttl_minutes``.
        """
        now = time.time()
        with self._lock:
            snapshot = list(self._tracked_files.items())
        # Filesystem checks run on the snapshot so the lock is not held
        # during I/O.
        return [
            {
                "path": filepath,
                "exists": Path(filepath).exists(),
                "expires_in_seconds": max(0, int(info["expires_at"] - now)),
                "ttl_minutes": info["ttl_minutes"],
            }
            for filepath, info in snapshot
        ]

    async def cleanup_expired(self) -> int:
        """Delete tracked files whose TTL has elapsed.

        Protected files are left on disk and stay tracked, so they are
        retried on a later pass once protection is lifted. Entries whose
        file was deleted, or no longer exists, are dropped from tracking.

        Returns:
            Number of files deleted in this pass.
        """
        now = time.time()
        with self._lock:
            expired = [
                filepath
                for filepath, info in self._tracked_files.items()
                if now > info["expires_at"]
            ]
        cleaned = 0
        for filepath in expired:
            path = Path(filepath)
            if self.is_protected(path):
                # Still being processed: keep the entry, try again later.
                continue
            try:
                if path.exists():
                    size = path.stat().st_size
                    path.unlink()
                    cleaned += 1
                    self._record_freed(1, size)
                    logger.info(f"Cleaned expired file: {filepath}")
                with self._lock:
                    self._tracked_files.pop(filepath, None)
            except Exception as e:
                logger.warning(f"Failed to clean expired file {filepath}: {e}")
        return cleaned

    def get_stats(self) -> dict:
        """Return lifetime cleanup counters plus current disk usage."""
        # Computed before taking the lock: it touches the filesystem.
        disk_usage = self.get_disk_usage()
        with self._lock:
            tracked_count = len(self._tracked_files)
            files_cleaned = self._stats["files_cleaned"]
            bytes_freed = self._stats["bytes_freed"]
            cleanup_runs = self._stats["cleanup_runs"]
        return {
            "files_cleaned_total": files_cleaned,
            "bytes_freed_total_mb": round(bytes_freed / (1024 * 1024), 2),
            "cleanup_runs": cleanup_runs,
            "tracked_files": tracked_count,
            "disk_usage": disk_usage,
            "is_running": self._running,
        }

    # ------------------------------------------------------------------
    # Protection
    # ------------------------------------------------------------------
    def protect_file(self, filepath: Path):
        """Mark a file as protected (being processed)."""
        key = self._protection_key(filepath)
        with self._lock:
            self._protected_files.add(key)

    def unprotect_file(self, filepath: Path):
        """Remove protection from a file; unknown paths are ignored."""
        key = self._protection_key(filepath)
        with self._lock:
            self._protected_files.discard(key)

    def is_protected(self, filepath: Path) -> bool:
        """Return True if the file is currently protected."""
        key = self._protection_key(filepath)
        with self._lock:
            return key in self._protected_files

    # ------------------------------------------------------------------
    # Background task lifecycle
    # ------------------------------------------------------------------
    async def start(self):
        """Start the background cleanup task; a no-op if already running."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._cleanup_loop())
        logger.info("File cleanup manager started")

    async def stop(self):
        """Cancel the background cleanup task and wait for it to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            # Drop the finished task so a stale handle is never re-awaited.
            self._task = None
        logger.info("File cleanup manager stopped")

    async def _cleanup_loop(self):
        """Run ``cleanup`` and ``cleanup_expired`` until stopped."""
        while self._running:
            try:
                await self.cleanup()
                await self.cleanup_expired()
                self._stats["cleanup_runs"] += 1
            except Exception as e:
                # A failed pass must not kill the loop; try again next time.
                logger.error(f"Cleanup error: {e}")
            await asyncio.sleep(self.cleanup_interval)

    # ------------------------------------------------------------------
    # Directory sweeps
    # ------------------------------------------------------------------
    async def cleanup(self) -> dict:
        """Delete orphaned and over-age files, then enforce the size budget.

        Returns:
            Per-run dict with ``files_deleted``, ``bytes_freed``,
            ``orphaned_deleted`` and ``errors`` (list of messages).
        """
        stats = {
            "files_deleted": 0,
            "bytes_freed": 0,
            "orphaned_deleted": 0,
            "errors": [],
        }
        now = time.time()
        # Get tracked paths from Redis to identify orphans
        tracked_paths, redis_available = await self._fetch_redis_tracked_paths()

        # Cleanup each directory (collect files first to avoid race condition)
        for directory in self._managed_directories():
            if not directory.exists():
                continue
            try:
                files_to_check = list(directory.iterdir())
            except OSError as e:
                logger.warning(f"Failed to list directory {directory}: {e}")
                continue
            for filepath in files_to_check:
                if not filepath.is_file():
                    continue
                # Skip protected files
                if self.is_protected(filepath):
                    continue
                try:
                    # Check if it's an orphan (only if Redis is available)
                    # NOTE(review): an orphan is deleted regardless of its
                    # age, so a file written before its Redis record exists
                    # can be removed - confirm callers protect or register
                    # files first.
                    abs_path = str(filepath.absolute())
                    is_orphan = redis_available and abs_path not in tracked_paths
                    # Check file age
                    file_age = now - filepath.stat().st_mtime
                    if is_orphan:
                        reason = "orphan"
                    elif file_age > self.max_file_age_seconds:
                        reason = "expired"
                    else:
                        continue
                    file_size = filepath.stat().st_size
                    filepath.unlink()
                    stats["files_deleted"] += 1
                    stats["bytes_freed"] += file_size
                    if reason == "orphan":
                        stats["orphaned_deleted"] += 1
                    logger.info(f"Deleted {reason} file: {filepath}")
                except Exception as e:
                    stats["errors"].append(str(e))
                    logger.warning(f"Failed to delete {filepath}: {e}")

        # Force cleanup if total size exceeds limit
        await self._enforce_size_limit(stats)

        # Fold this run into the lifetime counters so get_stats() reflects
        # every deletion, not only TTL-tracked ones. Done before logging so
        # a logging failure cannot lose the numbers.
        self._record_freed(stats["files_deleted"], stats["bytes_freed"])

        mb_freed = stats["bytes_freed"] / (1024 * 1024)
        cleanup_timestamp = datetime.now().isoformat()
        # Structured logging (AC: #5)
        log_data = {
            "files_deleted": stats["files_deleted"],
            "bytes_freed_mb": round(mb_freed, 2),
            "orphaned_deleted": stats["orphaned_deleted"],
            "cleanup_run_timestamp": cleanup_timestamp,
        }
        if _HAS_STRUCTLOG:
            logger.info("cleanup_completed", **log_data)
        else:
            logger.info(f"Cleanup completed: {log_data}")
        return stats

    async def _enforce_size_limit(self, stats: dict):
        """Delete oldest unprotected files until the size budget is met.

        Args:
            stats: The per-run dict from ``cleanup``; updated in place.
        """
        files_with_mtime = []
        total_size = 0
        for directory in self._managed_directories():
            if not directory.exists():
                continue
            try:
                entries = list(directory.iterdir())
            except OSError as e:
                logger.warning(f"Failed to list directory {directory}: {e}")
                continue
            for filepath in entries:
                if not filepath.is_file() or self.is_protected(filepath):
                    continue
                try:
                    stat = filepath.stat()
                except OSError:
                    # File vanished between listing and stat; nothing to count.
                    continue
                files_with_mtime.append((filepath, stat.st_mtime, stat.st_size))
                total_size += stat.st_size
        # If under limit, nothing to do
        if total_size <= self.max_total_size_bytes:
            return
        # Sort by modification time (oldest first)
        files_with_mtime.sort(key=lambda x: x[1])
        # Delete oldest files until under limit
        for filepath, _, size in files_with_mtime:
            if total_size <= self.max_total_size_bytes:
                break
            try:
                filepath.unlink()
                total_size -= size
                stats["files_deleted"] += 1
                stats["bytes_freed"] += size
                logger.info(f"Deleted file to free space: {filepath}")
            except Exception as e:
                stats["errors"].append(str(e))
                logger.warning(f"Failed to delete {filepath}: {e}")

    def get_disk_usage(self) -> dict:
        """Return file count, total size and budget usage for the directories.

        Directories that do not exist or cannot be listed are skipped, so a
        directory problem cannot make this report raise.
        """
        total_files = 0
        total_size = 0
        for directory in self._managed_directories():
            if not directory.exists():
                continue
            try:
                entries = list(directory.iterdir())
            except OSError as e:
                logger.warning(f"Failed to list directory {directory}: {e}")
                continue
            for filepath in entries:
                if not filepath.is_file():
                    continue
                total_files += 1
                try:
                    total_size += filepath.stat().st_size
                except OSError:
                    # Counted as a file, but its size is unknown (it may
                    # have been removed concurrently).
                    pass
        if self.max_total_size_bytes > 0:
            usage_percent = round((total_size / self.max_total_size_bytes) * 100, 1)
        else:
            usage_percent = 0
        return {
            "total_files": total_files,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "max_size_gb": self.max_total_size_bytes / (1024 * 1024 * 1024),
            "usage_percent": usage_percent,
            "directories": {
                "uploads": str(self.upload_dir),
                "outputs": str(self.output_dir),
                "temp": str(self.temp_dir),
            },
        }
class MemoryMonitor:
    """Report memory figures and compare system usage against a ceiling.

    Callbacks registered with ``on_high_memory`` are only stored; nothing
    in this class invokes them.
    """

    def __init__(self, max_memory_percent: float = 80.0):
        """Remember the system-memory percentage treated as the limit."""
        self._high_memory_callbacks = []
        self.max_memory_percent = max_memory_percent

    def get_memory_usage(self) -> dict:
        """Return process and system memory figures.

        On failure the result is ``{"error": <message>}`` instead, e.g.
        when ``psutil`` cannot be imported.
        """
        bytes_per_mb = 1024 * 1024
        bytes_per_gb = 1024 * 1024 * 1024
        try:
            # Imported lazily so the module still loads without psutil.
            import psutil

            proc_mem = psutil.Process().memory_info()
            sys_mem = psutil.virtual_memory()
            report = {}
            report["process_rss_mb"] = round(proc_mem.rss / bytes_per_mb, 2)
            report["process_vms_mb"] = round(proc_mem.vms / bytes_per_mb, 2)
            report["system_total_gb"] = round(sys_mem.total / bytes_per_gb, 2)
            report["system_available_gb"] = round(sys_mem.available / bytes_per_gb, 2)
            report["system_percent"] = sys_mem.percent
            return report
        except ImportError:
            return {"error": "psutil not installed"}
        except Exception as exc:
            return {"error": str(exc)}

    def check_memory(self) -> bool:
        """Return True while system memory usage is below the limit.

        When usage cannot be determined the check passes.
        """
        snapshot = self.get_memory_usage()
        if "error" not in snapshot:
            return snapshot.get("system_percent", 0) < self.max_memory_percent
        # Can't check, assume OK
        return True

    def on_high_memory(self, callback):
        """Store ``callback`` in the list of high-memory handlers."""
        self._high_memory_callbacks.append(callback)
class HealthChecker:
    """Aggregate memory, disk, cleanup and translation figures into one report."""

    def __init__(
        self, cleanup_manager: "FileCleanupManager", memory_monitor: "MemoryMonitor"
    ):
        """Keep references to the collaborators and start the uptime clock.

        Args:
            cleanup_manager: Provides ``get_disk_usage()`` and ``get_stats()``.
            memory_monitor: Provides ``get_memory_usage()``.
        """
        self.cleanup_manager = cleanup_manager
        self.memory_monitor = memory_monitor
        self.start_time = datetime.now()
        self._translation_count = 0
        self._error_count = 0
        # Guards the two counters above.
        self._lock = threading.Lock()

    def record_translation(self, success: bool = True):
        """Count one translation attempt, and one error if it failed."""
        with self._lock:
            self._translation_count += 1
            if not success:
                self._error_count += 1

    async def check_health(self) -> dict:
        """Return the same report as ``get_health`` (awaitable form)."""
        return self.get_health()

    def get_health(self) -> dict:
        """Build the health report.

        ``status`` is ``"degraded"`` when system memory or disk-budget
        usage is above 90 percent and ``"healthy"`` otherwise; usage above
        80 percent only adds an entry to ``issues``. Memory is ignored for
        the status when the monitor reports an error.
        """
        memory = self.memory_monitor.get_memory_usage()
        disk = self.cleanup_manager.get_disk_usage()

        # Read both counters under the lock that guards their writes, so
        # total, errors and success rate always describe the same moment.
        with self._lock:
            total = self._translation_count
            errors = self._error_count

        # Determine overall status
        status = "healthy"
        issues = []
        if "error" not in memory:
            memory_percent = memory.get("system_percent", 0)
            if memory_percent > 90:
                status = "degraded"
                issues.append("High memory usage")
            elif memory_percent > 80:
                issues.append("Memory usage elevated")
        disk_percent = disk.get("usage_percent", 0)
        if disk_percent > 90:
            status = "degraded"
            issues.append("High disk usage")
        elif disk_percent > 80:
            issues.append("Disk usage elevated")

        uptime = datetime.now() - self.start_time
        # With no attempts recorded yet the rate is reported as 100.
        success_rate = round(
            ((total - errors) / total * 100) if total > 0 else 100,
            1,
        )
        return {
            "status": status,
            "issues": issues,
            "uptime_seconds": int(uptime.total_seconds()),
            # str(timedelta) may end in ".microseconds"; keep H:MM:SS only.
            "uptime_human": str(uptime).split(".")[0],
            "translations": {
                "total": total,
                "errors": errors,
                "success_rate": success_rate,
            },
            "memory": memory,
            "disk": disk,
            "cleanup_service": self.cleanup_manager.get_stats(),
            "timestamp": datetime.now().isoformat(),
        }
# Create default instances
def create_cleanup_manager(config) -> FileCleanupManager:
    """Build a FileCleanupManager from a settings object.

    ``config`` must expose ``UPLOAD_DIR``, ``OUTPUT_DIR`` and ``TEMP_DIR``.
    ``FILE_TTL_MINUTES``, ``CLEANUP_INTERVAL_MINUTES`` and
    ``MAX_TOTAL_SIZE_GB`` are optional and fall back to 60, 5 and 10.0.
    """
    # Required settings: a missing attribute raises AttributeError here.
    upload_dir = config.UPLOAD_DIR
    output_dir = config.OUTPUT_DIR
    temp_dir = config.TEMP_DIR
    # Optional tuning knobs with their defaults.
    file_ttl = getattr(config, "FILE_TTL_MINUTES", 60)
    interval = getattr(config, "CLEANUP_INTERVAL_MINUTES", 5)
    size_budget = getattr(config, "MAX_TOTAL_SIZE_GB", 10.0)
    manager = FileCleanupManager(
        upload_dir,
        output_dir,
        temp_dir,
        file_ttl,
        interval,
        size_budget,
    )
    return manager