401 lines
14 KiB
Python
401 lines
14 KiB
Python
"""
|
|
Cleanup and Resource Management for SaaS robustness
|
|
Automatic cleanup of temporary files and resources
|
|
"""
|
|
import os
|
|
import time
|
|
import asyncio
|
|
import threading
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Set
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)


class FileCleanupManager:
    """Manages automatic cleanup of temporary and output files.

    Responsibilities:
      * TTL-based expiry of explicitly tracked files (``track_file``).
      * Periodic age-based sweep of the upload/output/temp directories.
      * Enforcing a combined size cap across the three directories.
      * Protecting in-use files from deletion (``protect_file``).

    A background asyncio task (started via ``start()``) runs the sweep
    every ``cleanup_interval`` seconds.
    """

    def __init__(
        self,
        upload_dir: Path,
        output_dir: Path,
        temp_dir: Path,
        max_file_age_hours: int = 1,
        cleanup_interval_minutes: int = 10,
        max_total_size_gb: float = 10.0
    ):
        """
        Args:
            upload_dir: directory holding uploaded files.
            output_dir: directory holding generated output files.
            temp_dir: directory holding scratch files.
            max_file_age_hours: files older than this (by mtime) are deleted.
            cleanup_interval_minutes: delay between background sweeps.
            max_total_size_gb: combined size cap across all three directories.
        """
        self.upload_dir = Path(upload_dir)
        self.output_dir = Path(output_dir)
        self.temp_dir = Path(temp_dir)
        self.max_file_age_seconds = max_file_age_hours * 3600
        self.cleanup_interval = cleanup_interval_minutes * 60
        self.max_total_size_bytes = int(max_total_size_gb * 1024 * 1024 * 1024)

        self._running = False
        self._task: Optional[asyncio.Task] = None
        # Files currently being processed; never deleted while protected.
        self._protected_files: Set[str] = set()
        # filepath -> {"created", "ttl_minutes", "expires_at"}
        self._tracked_files: dict = {}
        # Guards _protected_files, _tracked_files and _stats.
        self._lock = threading.Lock()
        self._stats = {
            "files_cleaned": 0,
            "bytes_freed": 0,
            "cleanup_runs": 0
        }

    async def track_file(self, filepath: Path, ttl_minutes: int = 60):
        """Track a file for automatic cleanup after TTL expires."""
        with self._lock:
            self._tracked_files[str(filepath)] = {
                "created": time.time(),
                "ttl_minutes": ttl_minutes,
                "expires_at": time.time() + (ttl_minutes * 60)
            }

    def get_tracked_files(self) -> list:
        """Get list of currently tracked files with their status."""
        now = time.time()
        result = []

        with self._lock:
            for filepath, info in self._tracked_files.items():
                remaining = info["expires_at"] - now
                result.append({
                    "path": filepath,
                    "exists": Path(filepath).exists(),
                    "expires_in_seconds": max(0, int(remaining)),
                    "ttl_minutes": info["ttl_minutes"]
                })

        return result

    async def cleanup_expired(self) -> int:
        """Delete tracked files whose TTL has expired.

        Returns:
            Number of files actually deleted.

        Protected files are kept in the tracking table so they are
        retried on a later run (previously they were dropped from
        tracking without being deleted, orphaning the file on disk).
        """
        now = time.time()
        cleaned = 0
        to_remove = []

        with self._lock:
            for filepath, info in list(self._tracked_files.items()):
                if now > info["expires_at"]:
                    to_remove.append(filepath)

        for filepath in to_remove:
            try:
                path = Path(filepath)
                if path.exists() and self.is_protected(path):
                    # Still in use: keep it tracked and retry next run.
                    continue
                if path.exists():
                    size = path.stat().st_size
                    path.unlink()
                    cleaned += 1
                    # _stats is shared state: mutate under the lock
                    # (previously updated unlocked here, unlike every
                    # other mutation site in this class).
                    with self._lock:
                        self._stats["files_cleaned"] += 1
                        self._stats["bytes_freed"] += size
                    logger.info(f"Cleaned expired file: {filepath}")

                with self._lock:
                    self._tracked_files.pop(filepath, None)

            except Exception as e:
                logger.warning(f"Failed to clean expired file {filepath}: {e}")

        return cleaned

    def get_stats(self) -> dict:
        """Get cleanup statistics."""
        disk_usage = self.get_disk_usage()

        # Snapshot shared state under the lock (previously _stats was
        # read unlocked while cleanup tasks may be mutating it).
        with self._lock:
            tracked_count = len(self._tracked_files)
            files_cleaned = self._stats["files_cleaned"]
            bytes_freed = self._stats["bytes_freed"]
            cleanup_runs = self._stats["cleanup_runs"]

        return {
            "files_cleaned_total": files_cleaned,
            "bytes_freed_total_mb": round(bytes_freed / (1024 * 1024), 2),
            "cleanup_runs": cleanup_runs,
            "tracked_files": tracked_count,
            "disk_usage": disk_usage,
            "is_running": self._running
        }

    def protect_file(self, filepath: Path):
        """Mark a file as protected (being processed)."""
        with self._lock:
            self._protected_files.add(str(filepath))

    def unprotect_file(self, filepath: Path):
        """Remove protection from a file."""
        with self._lock:
            self._protected_files.discard(str(filepath))

    def is_protected(self, filepath: Path) -> bool:
        """Check if a file is protected."""
        with self._lock:
            return str(filepath) in self._protected_files

    async def start(self):
        """Start the cleanup background task (idempotent)."""
        if self._running:
            return

        self._running = True
        self._task = asyncio.create_task(self._cleanup_loop())
        logger.info("File cleanup manager started")

    async def stop(self):
        """Stop the cleanup background task."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            # Drop the finished task so start()/stop() can cycle cleanly.
            self._task = None
        logger.info("File cleanup manager stopped")

    async def _cleanup_loop(self):
        """Background loop for periodic cleanup."""
        while self._running:
            try:
                await self.cleanup()
                await self.cleanup_expired()
                with self._lock:
                    self._stats["cleanup_runs"] += 1
            except Exception as e:
                logger.error(f"Cleanup error: {e}")

            await asyncio.sleep(self.cleanup_interval)

    async def cleanup(self) -> dict:
        """Perform cleanup of old files.

        Deletes unprotected regular files older than
        ``max_file_age_seconds`` (by mtime) in all managed directories,
        then enforces the total-size cap.

        Returns:
            dict with "files_deleted", "bytes_freed" and "errors" for
            this run only (module-level totals live in ``_stats``).
        """
        stats = {
            "files_deleted": 0,
            "bytes_freed": 0,
            "errors": []
        }

        now = time.time()

        # Cleanup each directory
        for directory in [self.upload_dir, self.output_dir, self.temp_dir]:
            if not directory.exists():
                continue

            for filepath in directory.iterdir():
                if not filepath.is_file():
                    continue

                # Skip protected files
                if self.is_protected(filepath):
                    continue

                try:
                    # Check file age (mtime-based)
                    file_age = now - filepath.stat().st_mtime

                    if file_age > self.max_file_age_seconds:
                        file_size = filepath.stat().st_size
                        filepath.unlink()
                        stats["files_deleted"] += 1
                        stats["bytes_freed"] += file_size
                        logger.debug(f"Deleted old file: {filepath}")

                except Exception as e:
                    stats["errors"].append(str(e))
                    logger.warning(f"Failed to delete {filepath}: {e}")

        # Force cleanup if total size exceeds limit
        await self._enforce_size_limit(stats)

        if stats["files_deleted"] > 0:
            mb_freed = stats["bytes_freed"] / (1024 * 1024)
            logger.info(f"Cleanup: deleted {stats['files_deleted']} files, freed {mb_freed:.2f}MB")

        return stats

    async def _enforce_size_limit(self, stats: dict):
        """Delete oldest files if total size exceeds limit.

        Mutates *stats* in place (same dict as produced by ``cleanup``).
        """
        files_with_mtime = []
        total_size = 0

        for directory in [self.upload_dir, self.output_dir, self.temp_dir]:
            if not directory.exists():
                continue

            for filepath in directory.iterdir():
                if not filepath.is_file() or self.is_protected(filepath):
                    continue

                try:
                    stat = filepath.stat()
                    files_with_mtime.append((filepath, stat.st_mtime, stat.st_size))
                    total_size += stat.st_size
                except Exception:
                    # File may have vanished between iterdir and stat.
                    pass

        # If under limit, nothing to do
        if total_size <= self.max_total_size_bytes:
            return

        # Sort by modification time (oldest first)
        files_with_mtime.sort(key=lambda x: x[1])

        # Delete oldest files until under limit
        for filepath, _, size in files_with_mtime:
            if total_size <= self.max_total_size_bytes:
                break

            try:
                filepath.unlink()
                total_size -= size
                stats["files_deleted"] += 1
                stats["bytes_freed"] += size
                logger.info(f"Deleted file to free space: {filepath}")
            except Exception as e:
                stats["errors"].append(str(e))

    def get_disk_usage(self) -> dict:
        """Get current disk usage statistics across the managed directories."""
        total_files = 0
        total_size = 0

        for directory in [self.upload_dir, self.output_dir, self.temp_dir]:
            if not directory.exists():
                continue

            for filepath in directory.iterdir():
                if filepath.is_file():
                    total_files += 1
                    try:
                        total_size += filepath.stat().st_size
                    except Exception:
                        # File may have vanished; count it but skip its size.
                        pass

        return {
            "total_files": total_files,
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "max_size_gb": self.max_total_size_bytes / (1024 * 1024 * 1024),
            "usage_percent": round((total_size / self.max_total_size_bytes) * 100, 1) if self.max_total_size_bytes > 0 else 0,
            "directories": {
                "uploads": str(self.upload_dir),
                "outputs": str(self.output_dir),
                "temp": str(self.temp_dir)
            }
        }
|
|
|
|
|
|
class MemoryMonitor:
    """Monitors memory usage and triggers cleanup if needed."""

    def __init__(self, max_memory_percent: float = 80.0):
        # System memory percentage above which check_memory() reports False.
        self.max_memory_percent = max_memory_percent
        # Callbacks registered for high-memory situations.
        self._high_memory_callbacks = []

    def get_memory_usage(self) -> dict:
        """Get current memory usage.

        Returns a dict with process RSS/VMS (MB) and system totals
        (GB / percent), or a dict with a single "error" key when psutil
        is unavailable or the query fails.
        """
        try:
            import psutil

            proc_mem = psutil.Process().memory_info()
            sys_mem = psutil.virtual_memory()

            mib = 1024 * 1024
            gib = 1024 * mib
            return {
                "process_rss_mb": round(proc_mem.rss / mib, 2),
                "process_vms_mb": round(proc_mem.vms / mib, 2),
                "system_total_gb": round(sys_mem.total / gib, 2),
                "system_available_gb": round(sys_mem.available / gib, 2),
                "system_percent": sys_mem.percent
            }
        except ImportError:
            return {"error": "psutil not installed"}
        except Exception as e:
            return {"error": str(e)}

    def check_memory(self) -> bool:
        """Check if memory usage is within limits.

        Returns True when usage is below the threshold, and also when
        usage cannot be determined (fail open).
        """
        usage = self.get_memory_usage()
        if "error" in usage:
            return True  # Can't check, assume OK
        return usage.get("system_percent", 0) < self.max_memory_percent

    def on_high_memory(self, callback):
        """Register callback for high memory situations."""
        self._high_memory_callbacks.append(callback)
|
|
|
|
|
|
class HealthChecker:
    """Comprehensive health checking for the application."""

    def __init__(self, cleanup_manager: "FileCleanupManager", memory_monitor: "MemoryMonitor"):
        """
        Args:
            cleanup_manager: provides get_disk_usage() and get_stats().
            memory_monitor: provides get_memory_usage().

        Annotations are string forward refs so this class does not
        depend on definition order within the module.
        """
        self.cleanup_manager = cleanup_manager
        self.memory_monitor = memory_monitor
        self.start_time = datetime.now()
        self._translation_count = 0
        self._error_count = 0
        # Guards the two counters above.
        self._lock = threading.Lock()

    def record_translation(self, success: bool = True):
        """Record a translation attempt."""
        with self._lock:
            self._translation_count += 1
            if not success:
                self._error_count += 1

    async def check_health(self) -> dict:
        """Get comprehensive health status (async version)"""
        return self.get_health()

    def get_health(self) -> dict:
        """Get comprehensive health status.

        Returns overall status ("healthy"/"degraded"), a list of issues,
        uptime, translation counters, and memory/disk/cleanup snapshots.
        """
        memory = self.memory_monitor.get_memory_usage()
        disk = self.cleanup_manager.get_disk_usage()

        # Determine overall status
        status = "healthy"
        issues = []

        if "error" not in memory:
            if memory.get("system_percent", 0) > 90:
                status = "degraded"
                issues.append("High memory usage")
            elif memory.get("system_percent", 0) > 80:
                issues.append("Memory usage elevated")

        if disk.get("usage_percent", 0) > 90:
            status = "degraded"
            issues.append("High disk usage")
        elif disk.get("usage_percent", 0) > 80:
            issues.append("Disk usage elevated")

        uptime = datetime.now() - self.start_time

        # Snapshot counters under the lock: record_translation writes
        # them under the same lock (previously they were read unlocked).
        with self._lock:
            total = self._translation_count
            errors = self._error_count

        success_rate = round(
            ((total - errors) / total * 100) if total > 0 else 100, 1
        )

        return {
            "status": status,
            "issues": issues,
            "uptime_seconds": int(uptime.total_seconds()),
            "uptime_human": str(uptime).split('.')[0],
            "translations": {
                "total": total,
                "errors": errors,
                "success_rate": success_rate
            },
            "memory": memory,
            "disk": disk,
            "cleanup_service": self.cleanup_manager.get_stats(),
            "timestamp": datetime.now().isoformat()
        }
|
|
|
|
|
|
# Create default instances
def create_cleanup_manager(config) -> FileCleanupManager:
    """Build a FileCleanupManager from an application config object.

    Required config attributes: UPLOAD_DIR, OUTPUT_DIR, TEMP_DIR.
    Optional (with defaults): MAX_FILE_AGE_HOURS (1),
    CLEANUP_INTERVAL_MINUTES (10), MAX_TOTAL_SIZE_GB (10.0).
    """
    max_age_hours = getattr(config, 'MAX_FILE_AGE_HOURS', 1)
    interval_minutes = getattr(config, 'CLEANUP_INTERVAL_MINUTES', 10)
    size_cap_gb = getattr(config, 'MAX_TOTAL_SIZE_GB', 10.0)

    return FileCleanupManager(
        upload_dir=config.UPLOAD_DIR,
        output_dir=config.OUTPUT_DIR,
        temp_dir=config.TEMP_DIR,
        max_file_age_hours=max_age_hours,
        cleanup_interval_minutes=interval_minutes,
        max_total_size_gb=size_cap_gb
    )
|