401 lines
14 KiB
Python

"""
Cleanup and Resource Management for SaaS robustness
Automatic cleanup of temporary files and resources
"""
import os
import time
import asyncio
import threading
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, Set
import logging
logger = logging.getLogger(__name__)
class FileCleanupManager:
    """Manages automatic cleanup of temporary and output files.

    Responsibilities:
      * periodically delete files older than ``max_file_age_hours`` from the
        upload, output and temp directories;
      * enforce a total-size cap (``max_total_size_gb``) by deleting the
        oldest files first;
      * delete individually tracked files once their TTL expires
        (see :meth:`track_file`).

    Files registered via :meth:`protect_file` are never deleted while
    protected.  Shared mutable state (``_stats``, ``_tracked_files``,
    ``_protected_files``) is guarded by ``self._lock`` so the manager can be
    used from worker threads as well as the asyncio event loop.
    """

    def __init__(
        self,
        upload_dir: Path,
        output_dir: Path,
        temp_dir: Path,
        max_file_age_hours: int = 1,
        cleanup_interval_minutes: int = 10,
        max_total_size_gb: float = 10.0
    ):
        self.upload_dir = Path(upload_dir)
        self.output_dir = Path(output_dir)
        self.temp_dir = Path(temp_dir)
        self.max_file_age_seconds = max_file_age_hours * 3600
        self.cleanup_interval = cleanup_interval_minutes * 60
        self.max_total_size_bytes = int(max_total_size_gb * 1024 * 1024 * 1024)
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self._protected_files: Set[str] = set()
        self._tracked_files: dict = {}  # filepath -> {created, ttl_minutes, expires_at}
        self._lock = threading.Lock()
        # Counters below are guarded by self._lock: they are written by the
        # cleanup paths (which may run on another thread/task) and read by
        # get_stats().
        self._stats = {
            "files_cleaned": 0,
            "bytes_freed": 0,
            "cleanup_runs": 0
        }

    async def track_file(self, filepath: Path, ttl_minutes: int = 60):
        """Track a file for automatic cleanup after TTL expires."""
        # Snapshot the clock once so "created" and "expires_at" agree
        # (previously time.time() was read twice).
        now = time.time()
        with self._lock:
            self._tracked_files[str(filepath)] = {
                "created": now,
                "ttl_minutes": ttl_minutes,
                "expires_at": now + (ttl_minutes * 60)
            }

    def get_tracked_files(self) -> list:
        """Get list of currently tracked files with their status."""
        now = time.time()
        result = []
        with self._lock:
            for filepath, info in self._tracked_files.items():
                remaining = info["expires_at"] - now
                result.append({
                    "path": filepath,
                    "exists": Path(filepath).exists(),
                    "expires_in_seconds": max(0, int(remaining)),
                    "ttl_minutes": info["ttl_minutes"]
                })
        return result

    async def cleanup_expired(self) -> int:
        """Delete tracked files whose TTL has expired.

        Protected files are skipped AND kept tracked so they are retried on a
        later run once unprotected (previously the tracking entry was dropped,
        losing the TTL cleanup).  Returns the number of files deleted.
        """
        now = time.time()
        cleaned = 0
        with self._lock:
            to_remove = [
                fp for fp, info in self._tracked_files.items()
                if now > info["expires_at"]
            ]
        for filepath in to_remove:
            path = Path(filepath)
            # Keep protected files tracked; retry after unprotect_file().
            if self.is_protected(path):
                continue
            try:
                if path.exists():
                    size = path.stat().st_size
                    path.unlink()
                    cleaned += 1
                    # Update shared counters under the lock (fix: previously
                    # mutated without it).
                    with self._lock:
                        self._stats["files_cleaned"] += 1
                        self._stats["bytes_freed"] += size
                    logger.info(f"Cleaned expired file: {filepath}")
                with self._lock:
                    self._tracked_files.pop(filepath, None)
            except Exception as e:
                # Best-effort: keep the entry so it is retried next run.
                logger.warning(f"Failed to clean expired file {filepath}: {e}")
        return cleaned

    def get_stats(self) -> dict:
        """Get cleanup statistics (counters snapshotted under the lock)."""
        disk_usage = self.get_disk_usage()
        with self._lock:
            tracked_count = len(self._tracked_files)
            files_cleaned = self._stats["files_cleaned"]
            bytes_freed = self._stats["bytes_freed"]
            cleanup_runs = self._stats["cleanup_runs"]
        return {
            "files_cleaned_total": files_cleaned,
            "bytes_freed_total_mb": round(bytes_freed / (1024 * 1024), 2),
            "cleanup_runs": cleanup_runs,
            "tracked_files": tracked_count,
            "disk_usage": disk_usage,
            "is_running": self._running
        }

    def protect_file(self, filepath: Path):
        """Mark a file as protected (being processed); it will not be deleted."""
        with self._lock:
            self._protected_files.add(str(filepath))

    def unprotect_file(self, filepath: Path):
        """Remove protection from a file."""
        with self._lock:
            self._protected_files.discard(str(filepath))

    def is_protected(self, filepath: Path) -> bool:
        """Check if a file is protected."""
        with self._lock:
            return str(filepath) in self._protected_files

    async def start(self):
        """Start the cleanup background task (idempotent)."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._cleanup_loop())
        logger.info("File cleanup manager started")

    async def stop(self):
        """Stop the cleanup background task and wait for it to finish."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            # Drop the stale task reference so start() after stop() is clean.
            self._task = None
        logger.info("File cleanup manager stopped")

    async def _cleanup_loop(self):
        """Background loop: run both cleanup passes, then sleep."""
        while self._running:
            try:
                await self.cleanup()
                await self.cleanup_expired()
                with self._lock:
                    self._stats["cleanup_runs"] += 1
            except Exception as e:
                # One failed pass must not kill the loop.  CancelledError is
                # a BaseException and still propagates, so stop() works.
                logger.error(f"Cleanup error: {e}")
            await asyncio.sleep(self.cleanup_interval)

    async def cleanup(self) -> dict:
        """Delete unprotected files older than the configured max age.

        Also enforces the total-size cap afterwards.  Returns a dict with
        ``files_deleted``, ``bytes_freed`` and ``errors``.
        """
        stats = {
            "files_deleted": 0,
            "bytes_freed": 0,
            "errors": []
        }
        now = time.time()
        # Cleanup each managed directory
        for directory in [self.upload_dir, self.output_dir, self.temp_dir]:
            if not directory.exists():
                continue
            for filepath in directory.iterdir():
                if not filepath.is_file():
                    continue
                # Skip protected files
                if self.is_protected(filepath):
                    continue
                try:
                    # Single stat() call: avoids a second syscall and a race
                    # between the age check and the size read.
                    st = filepath.stat()
                    if now - st.st_mtime > self.max_file_age_seconds:
                        filepath.unlink()
                        stats["files_deleted"] += 1
                        stats["bytes_freed"] += st.st_size
                        logger.debug(f"Deleted old file: {filepath}")
                except Exception as e:
                    stats["errors"].append(str(e))
                    logger.warning(f"Failed to delete {filepath}: {e}")
        # Force cleanup if total size exceeds limit
        await self._enforce_size_limit(stats)
        if stats["files_deleted"] > 0:
            mb_freed = stats["bytes_freed"] / (1024 * 1024)
            logger.info(f"Cleanup: deleted {stats['files_deleted']} files, freed {mb_freed:.2f}MB")
        return stats

    async def _enforce_size_limit(self, stats: dict):
        """Delete oldest unprotected files until total size is under the cap.

        Mutates ``stats`` in place (same dict produced by :meth:`cleanup`).
        """
        files_with_mtime = []
        total_size = 0
        for directory in [self.upload_dir, self.output_dir, self.temp_dir]:
            if not directory.exists():
                continue
            for filepath in directory.iterdir():
                # Protected files neither count toward deletion candidates
                # nor get deleted.
                if not filepath.is_file() or self.is_protected(filepath):
                    continue
                try:
                    st = filepath.stat()
                    files_with_mtime.append((filepath, st.st_mtime, st.st_size))
                    total_size += st.st_size
                except Exception:
                    # File vanished between iterdir and stat; ignore.
                    pass
        # If under limit, nothing to do
        if total_size <= self.max_total_size_bytes:
            return
        # Sort by modification time (oldest first)
        files_with_mtime.sort(key=lambda x: x[1])
        # Delete oldest files until under limit
        for filepath, _, size in files_with_mtime:
            if total_size <= self.max_total_size_bytes:
                break
            try:
                filepath.unlink()
                total_size -= size
                stats["files_deleted"] += 1
                stats["bytes_freed"] += size
                logger.info(f"Deleted file to free space: {filepath}")
            except Exception as e:
                stats["errors"].append(str(e))

    def get_disk_usage(self) -> dict:
        """Get current disk usage statistics for the managed directories.

        Non-recursive: only counts files directly inside each directory,
        matching what the cleanup passes operate on.
        """
        total_files = 0
        total_size = 0
        for directory in [self.upload_dir, self.output_dir, self.temp_dir]:
            if not directory.exists():
                continue
            for filepath in directory.iterdir():
                if filepath.is_file():
                    total_files += 1
                    try:
                        total_size += filepath.stat().st_size
                    except Exception:
                        pass
        return {
            "total_files": total_files,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "max_size_gb": self.max_total_size_bytes / (1024 * 1024 * 1024),
            "usage_percent": round((total_size / self.max_total_size_bytes) * 100, 1) if self.max_total_size_bytes > 0 else 0,
            "directories": {
                "uploads": str(self.upload_dir),
                "outputs": str(self.output_dir),
                "temp": str(self.temp_dir)
            }
        }
class MemoryMonitor:
    """Watches process and system memory usage.

    Callers may register callbacks to run in high-memory situations; the
    monitor itself only stores them (invocation is up to the owner).
    """

    def __init__(self, max_memory_percent: float = 80.0):
        self.max_memory_percent = max_memory_percent
        self._high_memory_callbacks = []

    def get_memory_usage(self) -> dict:
        """Return a snapshot of process and system memory usage.

        Never raises: when psutil is missing or the probe fails, an
        ``{"error": ...}`` dict is returned instead.
        """
        try:
            import psutil
            proc = psutil.Process().memory_info()
            vm = psutil.virtual_memory()
            mb = 1024 * 1024
            gb = mb * 1024
            return {
                "process_rss_mb": round(proc.rss / mb, 2),
                "process_vms_mb": round(proc.vms / mb, 2),
                "system_total_gb": round(vm.total / gb, 2),
                "system_available_gb": round(vm.available / gb, 2),
                "system_percent": vm.percent,
            }
        except ImportError:
            return {"error": "psutil not installed"}
        except Exception as e:
            return {"error": str(e)}

    def check_memory(self) -> bool:
        """Return True when system memory usage is below the configured limit
        (or cannot be measured, in which case we assume it is fine)."""
        usage = self.get_memory_usage()
        measurable = "error" not in usage
        return (not measurable) or usage.get("system_percent", 0) < self.max_memory_percent

    def on_high_memory(self, callback):
        """Register a callback to be invoked in high-memory situations."""
        self._high_memory_callbacks.append(callback)
class HealthChecker:
    """Comprehensive health checking for the application.

    Aggregates memory, disk and cleanup-service statistics into one report
    and keeps thread-safe translation success/error counters.
    """

    def __init__(self, cleanup_manager: "FileCleanupManager", memory_monitor: "MemoryMonitor"):
        self.cleanup_manager = cleanup_manager
        self.memory_monitor = memory_monitor
        self.start_time = datetime.now()
        self._translation_count = 0
        self._error_count = 0
        # Guards the two counters above: written by record_translation()
        # (request handlers) and read by get_health().
        self._lock = threading.Lock()

    def record_translation(self, success: bool = True):
        """Record a translation attempt; ``success=False`` also counts an error."""
        with self._lock:
            self._translation_count += 1
            if not success:
                self._error_count += 1

    async def check_health(self) -> dict:
        """Get comprehensive health status (async wrapper for async endpoints)."""
        return self.get_health()

    def get_health(self) -> dict:
        """Build the full health report.

        Status is "healthy" unless memory or disk usage exceeds 90%
        ("degraded"); usage between 80 and 90% only adds an advisory issue.
        """
        memory = self.memory_monitor.get_memory_usage()
        disk = self.cleanup_manager.get_disk_usage()
        # Determine overall status
        status = "healthy"
        issues = []
        if "error" not in memory:
            if memory.get("system_percent", 0) > 90:
                status = "degraded"
                issues.append("High memory usage")
            elif memory.get("system_percent", 0) > 80:
                issues.append("Memory usage elevated")
        if disk.get("usage_percent", 0) > 90:
            status = "degraded"
            issues.append("High disk usage")
        elif disk.get("usage_percent", 0) > 80:
            issues.append("Disk usage elevated")
        # Snapshot counters under the lock so total/errors are mutually
        # consistent (fix: previously read without the lock).
        with self._lock:
            total = self._translation_count
            errors = self._error_count
        uptime = datetime.now() - self.start_time
        return {
            "status": status,
            "issues": issues,
            "uptime_seconds": int(uptime.total_seconds()),
            "uptime_human": str(uptime).split('.')[0],
            "translations": {
                "total": total,
                "errors": errors,
                "success_rate": round(
                    ((total - errors) / total * 100)
                    if total > 0 else 100, 1
                )
            },
            "memory": memory,
            "disk": disk,
            "cleanup_service": self.cleanup_manager.get_stats(),
            "timestamp": datetime.now().isoformat()
        }
# Create default instances
def create_cleanup_manager(config) -> FileCleanupManager:
    """Build a FileCleanupManager from an application config object.

    Directory attributes (UPLOAD_DIR, OUTPUT_DIR, TEMP_DIR) are required;
    the tuning knobs fall back to defaults when absent from the config.
    """
    options = {
        "upload_dir": config.UPLOAD_DIR,
        "output_dir": config.OUTPUT_DIR,
        "temp_dir": config.TEMP_DIR,
        "max_file_age_hours": getattr(config, 'MAX_FILE_AGE_HOURS', 1),
        "cleanup_interval_minutes": getattr(config, 'CLEANUP_INTERVAL_MINUTES', 10),
        "max_total_size_gb": getattr(config, 'MAX_TOTAL_SIZE_GB', 10.0),
    }
    return FileCleanupManager(**options)