""" Utility functions for file handling and validation """ import os import uuid import hashlib from pathlib import Path from typing import Optional from fastapi import UploadFile, HTTPException from config import config class FileHandler: """Handles file operations for the translation API""" @staticmethod def calculate_sha256(file_path: Path) -> Optional[str]: """ Calculate the SHA256 hash of a file Args: file_path: Path to the file Returns: SHA256 hash string or None if error """ try: if not file_path.exists(): return None sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() except Exception as e: import logging logging.getLogger(__name__).error( f"SHA256 calculation failed for {file_path}: {e}" ) return None @staticmethod def validate_file_extension(filename: str) -> str: """ Validate that the file extension is supported Args: filename: Name of the file Returns: File extension (lowercase, with dot) Raises: HTTPException: If file extension is not supported """ file_extension = Path(filename).suffix.lower() if file_extension not in config.SUPPORTED_EXTENSIONS: raise HTTPException( status_code=400, detail=f"Unsupported file type. Supported types: {', '.join(config.SUPPORTED_EXTENSIONS)}", ) return file_extension @staticmethod def validate_file_size(file: UploadFile) -> None: """ Validate that the file size is within limits Args: file: Uploaded file Raises: HTTPException: If file is too large """ # Get file size file.file.seek(0, 2) # Move to end of file file_size = file.file.tell() # Get position (file size) file.file.seek(0) # Reset to beginning if file_size > config.MAX_FILE_SIZE_BYTES: raise HTTPException( status_code=400, detail=f"File too large. Maximum size: {config.MAX_FILE_SIZE_MB}MB", ) @staticmethod async def save_upload_file(file: UploadFile, destination: Path, chunk_size: int = 65536) -> Path: """ Save an uploaded file to disk using chunked streaming to avoid loading the entire file into memory at once. Args: file: Uploaded file destination: Path to save the file chunk_size: Read/write chunk size in bytes (default 64KB) Returns: Path to the saved file """ destination.parent.mkdir(parents=True, exist_ok=True) with open(destination, "wb") as buffer: while True: chunk = await file.read(chunk_size) if not chunk: break buffer.write(chunk) return destination @staticmethod def generate_unique_filename(original_filename: str, prefix: str = "") -> str: """ Generate a unique filename to avoid collisions Args: original_filename: Original filename prefix: Optional prefix for the filename Returns: Unique filename """ file_path = Path(original_filename) unique_id = str(uuid.uuid4())[:8] if prefix: return f"{prefix}_{unique_id}_{file_path.stem}{file_path.suffix}" else: return f"{unique_id}_{file_path.stem}{file_path.suffix}" @staticmethod def cleanup_file(file_path: Path) -> None: """ Delete a file if it exists Args: file_path: Path to the file to delete """ import logging _logger = logging.getLogger(__name__) try: if file_path.exists(): file_path.unlink() _logger.debug(f"Deleted file: {file_path}") except Exception as e: _logger.warning(f"Error deleting file {file_path}: {e}") @staticmethod def get_file_info(file_path: Path) -> dict: """ Get information about a file Args: file_path: Path to the file Returns: Dictionary with file information """ if not file_path.exists(): return {} stat = file_path.stat() return { "filename": file_path.name, "size_bytes": stat.st_size, "size_mb": round(stat.st_size / (1024 * 1024), 2), "sha256": FileHandler.calculate_sha256(file_path), "extension": file_path.suffix, "created": stat.st_ctime, "modified": stat.st_mtime, } # Global file handler instance file_handler = FileHandler()