Files
office_translator/middleware/validation.py
sepehr b9446f166d
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m52s
fix(translate): French error messages and update mock users for quota checks
2026-06-14 19:20:44 +02:00

743 lines
22 KiB
Python

"""
Input Validation Module for SaaS robustness
Validates all user inputs before processing
"""
import re
import magic
import ipaddress
import socket
from pathlib import Path
from urllib.parse import urlparse
from typing import Optional, List, Set, Tuple
from fastapi import UploadFile, HTTPException
import logging
logger = logging.getLogger(__name__)
class ValidationError(Exception):
    """Validation failure carrying a user-friendly message.

    Attributes:
        message: Human-readable text; also used as the exception's string form.
        code: Machine-readable error identifier.
        details: Extra context about the failure (empty dict when not given).
    """

    def __init__(
        self,
        message: str,
        code: str = "validation_error",
        details: Optional[dict] = None,
    ):
        super().__init__(message)
        self.message = message
        self.code = code
        # A missing (or empty) mapping is replaced by a fresh dict so that
        # instances never share state.
        self.details = details if details else {}
class ValidationResult:
    """Outcome of a validation check.

    Attributes:
        is_valid: Whether the validated input passed.
        errors: Error messages collected during validation.
        warnings: Non-blocking messages collected during validation.
        data: Values gathered while validating.
        error_code: Machine-readable code of the failure, if any.
    """

    def __init__(
        self,
        is_valid: bool = True,
        errors: Optional[List[str]] = None,
        warnings: Optional[List[str]] = None,
        data: Optional[dict] = None,
        error_code: Optional[str] = None,
    ):
        self.is_valid = is_valid
        self.error_code = error_code
        # Missing (or empty) collections are replaced by fresh containers so
        # that instances never share mutable state.
        self.errors = errors if errors else []
        self.warnings = warnings if warnings else []
        self.data = data if data else {}
class FileValidator:
    """Validate uploaded files for security and compatibility.

    The checks run in this order: filename presence, filename sanitisation,
    extension allow-list, size limits, magic bytes (only when
    ``scan_content`` is true) and detected MIME type.
    """

    # Allowed MIME types mapped to the extension each one belongs to
    ALLOWED_MIME_TYPES = {
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
        "application/pdf": ".pdf",
    }
    # Generic types that say nothing about the real format; Office Open XML
    # files may be detected as plain ZIP archives.
    GENERIC_MIME_TYPES = ("application/zip", "application/octet-stream")
    # Magic bytes for Office Open XML files (ZIP format)
    OFFICE_MAGIC_BYTES = b"PK\x03\x04"
    PDF_MAGIC_BYTES = b"%PDF"
    # Upper bound, in characters, for a sanitised filename
    MAX_FILENAME_LENGTH = 255

    def __init__(
        self,
        max_size_mb: int = 50,
        allowed_extensions: Optional[Set[str]] = None,
        scan_content: bool = True,
    ):
        """Configure the size limit, extension allow-list and content scan.

        Args:
            max_size_mb: Maximum accepted file size in mebibytes.
            allowed_extensions: Lower-case extensions including the leading
                dot; defaults to ``.xlsx``, ``.docx``, ``.pptx`` and ``.pdf``.
            scan_content: Whether to verify the file's magic bytes.
        """
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.max_size_mb = max_size_mb
        self.allowed_extensions = allowed_extensions or {".xlsx", ".docx", ".pptx", ".pdf"}
        self.scan_content = scan_content

    async def validate_async(self, file: UploadFile) -> ValidationResult:
        """Validate an uploaded file without raising.

        Args:
            file: The upload to inspect. Its stream is rewound to offset 0
                after being read so it can be processed afterwards.

        Returns:
            A ``ValidationResult``. MIME problems are reported as warnings
            only; every other failed check makes the result invalid.
        """
        errors: List[str] = []
        warnings: List[str] = []
        data: dict = {}
        try:
            # Validate filename
            if not file.filename:
                errors.append("Le nom de fichier est requis")
                return ValidationResult(
                    is_valid=False, errors=errors, error_code="missing_filename"
                )

            # Sanitize filename, then validate its extension
            try:
                safe_filename = self._sanitize_filename(file.filename)
                data["safe_filename"] = safe_filename
                extension = self._validate_extension(safe_filename)
                data["extension"] = extension
            except ValidationError as e:
                errors.append(str(e.message))
                return ValidationResult(
                    is_valid=False, errors=errors, error_code=e.code
                )

            # Reject obviously oversized uploads before loading them in memory
            declared_size = self._declared_size(file)
            if declared_size is not None and declared_size > self.max_size_bytes:
                data["size_bytes"] = declared_size
                data["size_mb"] = round(declared_size / (1024 * 1024), 2)
                errors.append(self._too_large_error(declared_size).message)
                return ValidationResult(
                    is_valid=False,
                    errors=errors,
                    data=data,
                    error_code="file_too_large",
                )

            # Read file content for validation
            content = await file.read()
            await file.seek(0)  # Reset for later processing

            # Validate file size
            file_size = len(content)
            data["size_bytes"] = file_size
            data["size_mb"] = round(file_size / (1024 * 1024), 2)
            if file_size > self.max_size_bytes:
                errors.append(self._too_large_error(file_size).message)
                return ValidationResult(
                    is_valid=False,
                    errors=errors,
                    data=data,
                    error_code="file_too_large",
                )
            if file_size == 0:
                errors.append("Le fichier est vide")
                return ValidationResult(
                    is_valid=False, errors=errors, data=data, error_code="empty_file"
                )

            # Warn about large files
            if file_size > self.max_size_bytes * 0.8:
                warnings.append(
                    f"Le fichier fait {data['size_mb']}Mo, approchant la limite de {self.max_size_mb}Mo"
                )

            # Validate magic bytes
            if self.scan_content:
                try:
                    self._validate_magic_bytes(content, extension)
                except ValidationError as e:
                    errors.append(str(e.message))
                    return ValidationResult(
                        is_valid=False, errors=errors, data=data, error_code=e.code
                    )

            # Validate MIME type (best effort: problems only produce warnings)
            try:
                mime_type = self._detect_mime_type(content)
                data["mime_type"] = mime_type
                self._validate_mime_type(mime_type, extension)
            except ValidationError as e:
                warnings.append(f"Avertissement MIME: {e.message}")
            except Exception:
                warnings.append("Impossible de verifier le type MIME")

            data["original_filename"] = file.filename
            return ValidationResult(
                is_valid=True, errors=errors, warnings=warnings, data=data
            )
        except Exception as e:
            # Boundary handler: never raise to the caller, but keep the
            # traceback in the logs for diagnosis.
            logger.exception("Validation error: %s", e)
            errors.append(f"Erreur de validation: {str(e)}")
            return ValidationResult(
                is_valid=False,
                errors=errors,
                warnings=warnings,
                data=data,
                error_code="validation_error",
            )

    async def validate(self, file: UploadFile) -> dict:
        """Validate an uploaded file, raising on the first failed check.

        Args:
            file: The upload to inspect. Its stream is rewound to offset 0
                after being read so it can be processed afterwards.

        Returns:
            A dict with ``original_filename``, ``safe_filename``,
            ``extension``, ``size_bytes``, ``size_mb`` and ``mime_type``.

        Raises:
            ValidationError: If any check fails.
        """
        # Validate filename
        if not file.filename:
            raise ValidationError(
                "Le nom de fichier est requis", code="missing_filename"
            )

        # Sanitize filename
        safe_filename = self._sanitize_filename(file.filename)

        # Validate extension
        extension = self._validate_extension(safe_filename)

        # Reject obviously oversized uploads before loading them in memory
        declared_size = self._declared_size(file)
        if declared_size is not None and declared_size > self.max_size_bytes:
            raise self._too_large_error(declared_size)

        # Read file content for validation
        content = await file.read()
        await file.seek(0)  # Reset for later processing

        # Validate file size
        file_size = len(content)
        if file_size > self.max_size_bytes:
            raise self._too_large_error(file_size)
        if file_size == 0:
            raise ValidationError("Le fichier est vide", code="empty_file")

        # Validate magic bytes (file signature)
        if self.scan_content:
            self._validate_magic_bytes(content, extension)

        # Validate MIME type
        mime_type = self._detect_mime_type(content)
        self._validate_mime_type(mime_type, extension)

        return {
            "original_filename": file.filename,
            "safe_filename": safe_filename,
            "extension": extension,
            "size_bytes": file_size,
            "size_mb": round(file_size / (1024 * 1024), 2),
            "mime_type": mime_type,
        }

    @staticmethod
    def _declared_size(file) -> Optional[int]:
        """Return the size reported by the upload object, if it has one."""
        # NOTE(review): assumes the upload object may expose an integer
        # ``size`` attribute; a missing or non-integer value is ignored.
        size = getattr(file, "size", None)
        return size if isinstance(size, int) else None

    def _too_large_error(self, size_bytes: int) -> ValidationError:
        """Build the error describing a file that exceeds the size limit."""
        return ValidationError(
            f"Fichier trop volumineux. La taille maximale est de {self.max_size_mb}Mo, "
            f"vous avez envoye {size_bytes / (1024 * 1024):.1f}Mo",
            code="file_too_large",
            details={
                "max_mb": self.max_size_mb,
                "actual_mb": round(size_bytes / (1024 * 1024), 2),
            },
        )

    def _sanitize_filename(self, filename: str) -> str:
        """Sanitize a filename to prevent path traversal and other attacks.

        Returns:
            The bare file name with control characters removed, dangerous
            characters replaced by ``_``, surrounding whitespace trimmed and
            the length capped at ``MAX_FILENAME_LENGTH`` characters.

        Raises:
            ValidationError: If nothing usable remains (empty or dots only).
        """
        # Remove path components. Backslashes are normalised first because
        # pathlib does not treat them as separators on POSIX systems.
        filename = Path(filename.replace("\\", "/")).name
        # Remove null bytes and control characters
        filename = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", filename)
        # Remove potentially dangerous characters
        filename = re.sub(r'[<>:"/\\|?*]', "_", filename)
        filename = filename.strip()

        # Limit length while keeping the extension whenever it fits
        limit = self.MAX_FILENAME_LENGTH
        if len(filename) > limit:
            if "." in filename:
                stem, ext = filename.rsplit(".", 1)
            else:
                stem, ext = filename, ""
            suffix = "." + ext if ext else ""
            if len(suffix) < limit:
                filename = stem[: limit - len(suffix)] + suffix
            else:
                # The "extension" alone exceeds the limit: hard truncate.
                filename = filename[:limit]

        # Ensure something meaningful is left ("." and ".." are rejected too)
        if not filename.strip(". "):
            raise ValidationError("Nom de fichier invalide", code="invalid_filename")
        return filename

    def _validate_extension(self, filename: str) -> str:
        """Return the lower-cased extension of ``filename`` if it is allowed.

        Raises:
            ValidationError: If there is no extension or it is not allowed.
        """
        # Sorted so that messages and details are stable between runs.
        allowed = sorted(self.allowed_extensions)
        if "." not in filename:
            raise ValidationError(
                f"Le fichier doit avoir une extension. Formats supportes : {', '.join(allowed)}",
                code="missing_extension",
                details={"allowed_extensions": allowed},
            )
        extension = "." + filename.rsplit(".", 1)[1].lower()
        if extension not in self.allowed_extensions:
            raise ValidationError(
                f"Format de fichier '{extension}' non supporte. Formats acceptes : {', '.join(allowed)}",
                code="unsupported_file_type",
                details={
                    "extension": extension,
                    "allowed_extensions": allowed,
                },
            )
        return extension

    def _validate_magic_bytes(self, content: bytes, extension: str):
        """Check that the file signature matches the expected format.

        Raises:
            ValidationError: If ``content`` does not start with the PDF
                signature (for ``.pdf``) or the ZIP signature (otherwise).
        """
        # PDF files start with %PDF
        if extension.lower() == ".pdf":
            if not content.startswith(self.PDF_MAGIC_BYTES):
                raise ValidationError(
                    "Le contenu du fichier ne correspond pas au format PDF attendu. "
                    "Le fichier est peut-être corrompu.",
                    code="invalid_file_content",
                )
            return
        # Office files are ZIP-based
        if not content.startswith(self.OFFICE_MAGIC_BYTES):
            raise ValidationError(
                "Le contenu du fichier ne correspond pas au format Office attendu. "
                "Le fichier est peut-être corrompu ou n'est pas un document Office valide.",
                code="invalid_file_content",
            )

    def _detect_mime_type(self, content: bytes) -> str:
        """Detect the MIME type from the file content.

        Falls back to a signature-based guess when the ``magic`` detection
        raises for any reason.
        """
        try:
            mime = magic.Magic(mime=True)
            return mime.from_buffer(content)
        except Exception:
            # Fallback to basic detection
            if content.startswith(self.OFFICE_MAGIC_BYTES):
                return "application/zip"
            if content.startswith(self.PDF_MAGIC_BYTES):
                return "application/pdf"
            return "application/octet-stream"

    def _validate_mime_type(self, mime_type: str, extension: str):
        """Check that the detected MIME type is allowed and fits the extension.

        Generic types (``GENERIC_MIME_TYPES``) are accepted for any extension
        because they carry no format information.

        Raises:
            ValidationError: If the type is not allowed, or if it is a
                specific allowed type that belongs to another extension.
        """
        if mime_type in self.GENERIC_MIME_TYPES:
            return
        expected_extension = self.ALLOWED_MIME_TYPES.get(mime_type)
        if expected_extension is None:
            raise ValidationError(
                f"Type de fichier invalide detecte. Document Office attendu, recu : {mime_type}",
                code="invalid_mime_type",
                details={"detected_mime": mime_type},
            )
        if expected_extension != extension.lower():
            raise ValidationError(
                f"Le type de fichier detecte ({mime_type}) ne correspond pas a l'extension '{extension}'",
                code="invalid_mime_type",
                details={
                    "detected_mime": mime_type,
                    "extension": extension,
                    "expected_extension": expected_extension,
                },
            )
class LanguageValidator:
    """Validates language codes against the supported set."""

    # Mostly ISO 639-1 codes, plus a few longer codes, the regional Chinese
    # variants and the special "auto" value.
    SUPPORTED_LANGUAGES = {
        "af", "sq", "am", "ar", "hy", "az", "eu", "be", "bn", "bs",
        "bg", "ca", "ceb", "zh", "zh-CN", "zh-TW", "co", "hr", "cs", "da",
        "nl", "en", "eo", "et", "fi", "fr", "fy", "gl", "ka", "de",
        "el", "gu", "ht", "ha", "haw", "he", "hi", "hmn", "hu", "is",
        "ig", "id", "ga", "it", "ja", "jv", "kn", "kk", "km", "rw",
        "ko", "ku", "ky", "lo", "la", "lv", "lt", "lb", "mk", "mg",
        "ms", "ml", "mt", "mi", "mr", "mn", "my", "ne", "no", "ny",
        "or", "ps", "fa", "pl", "pt", "pa", "ro", "ru", "sm", "gd",
        "sr", "st", "sn", "sd", "si", "sk", "sl", "so", "es", "su",
        "sw", "sv", "tl", "tg", "ta", "tt", "te", "th", "tr", "tk",
        "uk", "ur", "ug", "uz", "vi", "cy", "xh", "yi", "yo", "zu",
        "auto",
    }

    LANGUAGE_NAMES = {
        "en": "English",
        "es": "Spanish",
        "fr": "French",
        "de": "German",
        "it": "Italian",
        "pt": "Portuguese",
        "ru": "Russian",
        "zh": "Chinese",
        "zh-CN": "Chinese (Simplified)",
        "zh-TW": "Chinese (Traditional)",
        "ja": "Japanese",
        "ko": "Korean",
        "ar": "Arabic",
        "hi": "Hindi",
        "nl": "Dutch",
        "pl": "Polish",
        "tr": "Turkish",
        "sv": "Swedish",
        "da": "Danish",
        "no": "Norwegian",
        "fi": "Finnish",
        "cs": "Czech",
        "el": "Greek",
        "th": "Thai",
        "vi": "Vietnamese",
        "id": "Indonesian",
        "uk": "Ukrainian",
        "ro": "Romanian",
        "hu": "Hungarian",
        "auto": "Auto-detect",
    }

    # Common variations (lower-case) mapped to their canonical code
    _ALIASES = {
        "chinese": "zh-CN",
        "cn": "zh-CN",
        "chinese-traditional": "zh-TW",
        "tw": "zh-TW",
    }

    @classmethod
    def validate(cls, language_code: str, field_name: str = "language") -> str:
        """Validate a language code and return its canonical spelling.

        Matching ignores case and surrounding whitespace, so ``"zh-cn"``,
        ``"ZH-CN"`` and ``" zh-CN "`` all yield ``"zh-CN"``.

        Args:
            language_code: The code (or a known alias) supplied by the user.
            field_name: Name used in the "required" error message.

        Returns:
            The code exactly as it is spelled in ``SUPPORTED_LANGUAGES``.

        Raises:
            ValidationError: If the code is empty or not supported.
        """
        if not language_code:
            raise ValidationError(f"{field_name} est requis", code="missing_language")

        # Normalize
        normalized = language_code.strip().lower()
        # Handle common variations
        normalized = cls._ALIASES.get(normalized, normalized)

        if normalized in cls.SUPPORTED_LANGUAGES:
            return normalized

        # Some supported codes are stored in mixed case (e.g. "zh-CN"), so
        # compare case-insensitively and hand back the canonical spelling.
        wanted = normalized.lower()
        for supported in cls.SUPPORTED_LANGUAGES:
            if supported.lower() == wanted:
                return supported

        raise ValidationError(
            f"Code langue non supporte: '{language_code}'. Consultez /languages pour les codes supportes.",
            code="unsupported_language",
            details={"language": language_code},
        )

    @classmethod
    def get_language_name(cls, code: str) -> str:
        """Return the human-readable name, or the upper-cased code if unknown."""
        return cls.LANGUAGE_NAMES.get(code, code.upper())
class ProviderValidator:
    """Validates translation provider configuration."""

    SUPPORTED_PROVIDERS = {
        "google",
        "google_cloud",
        "deepl",
        "openai",
        "openrouter",
        "openrouter_premium",
        "deepseek",
        "minimax",
        "zai",
        "classic",
        "llm",
    }

    @classmethod
    def validate(cls, provider: str, **kwargs) -> dict:
        """Validate a provider name and its required configuration.

        Args:
            provider: Provider name; case and surrounding whitespace are
                ignored.
            **kwargs: Provider settings. ``deepl_api_key`` is required for
                ``deepl`` and ``openai_api_key`` for ``openai``.

        Returns:
            ``{"provider": <normalized name>, "validated": True}``.

        Raises:
            ValidationError: If the provider is missing or unsupported, or
                if a required API key is absent.
        """
        if not provider:
            raise ValidationError(
                "Le fournisseur de traduction est requis", code="missing_provider"
            )

        normalized = provider.strip().lower()
        if normalized not in cls.SUPPORTED_PROVIDERS:
            # Sets have no defined order; sort so the message and the details
            # are identical from one run to the next.
            supported = sorted(cls.SUPPORTED_PROVIDERS)
            raise ValidationError(
                f"Fournisseur non supporte: '{provider}'. Supportes: {', '.join(supported)}",
                code="unsupported_provider",
                details={
                    "provider": provider,
                    "supported": supported,
                },
            )

        # Provider-specific validation
        if normalized == "deepl":
            if not kwargs.get("deepl_api_key"):
                raise ValidationError(
                    "La cle API DeepL est requise pour utiliser le fournisseur DeepL",
                    code="missing_deepl_key",
                )
        elif normalized == "openai":
            if not kwargs.get("openai_api_key"):
                raise ValidationError(
                    "La cle API OpenAI est requise pour utiliser le fournisseur OpenAI",
                    code="missing_openai_key",
                )

        return {"provider": normalized, "validated": True}
class InputSanitizer:
    """Stateless helpers that clean up raw user-supplied values."""

    @staticmethod
    def sanitize_text(text: str, max_length: int = 10000) -> str:
        """Drop null bytes, cap the text at ``max_length``, then trim it.

        Returns an empty string for empty or missing input.
        """
        if not text:
            return ""
        without_nulls = text.replace("\x00", "")
        # Slicing leaves the text untouched when it is already short enough.
        return without_nulls[:max_length].strip()

    @staticmethod
    def sanitize_language_code(code: str) -> str:
        """Reduce a language code to at most 10 lower-case safe characters.

        Only ASCII letters, digits and hyphens are kept. ``"auto"`` is
        returned when the input is empty or nothing survives the cleanup.
        """
        if not code:
            return "auto"
        cleaned = re.sub(r"[^a-zA-Z0-9\-]", "", code.strip())[:10]
        if not cleaned:
            return "auto"
        return cleaned.lower()

    @staticmethod
    def sanitize_url(url: str) -> str:
        """Trim a URL and strip its trailing slashes.

        Returns an empty string for empty or missing input.

        Raises:
            ValidationError: If the URL does not start with ``http://`` or
                ``https://`` (case-insensitive).
        """
        if not url:
            return ""
        trimmed = url.strip()
        if re.match(r"^https?://", trimmed, re.IGNORECASE) is None:
            raise ValidationError(
                "Format d'URL invalide. Doit commencer par http:// ou https://",
                code="invalid_url",
            )
        return trimmed.rstrip("/")

    @staticmethod
    def sanitize_api_key(key: str) -> str:
        """Trim surrounding whitespace from an API key (nothing is logged)."""
        return key.strip() if key else ""
class WebhookURLValidator:
"""
Validator for webhook URLs with security checks.
Prevents SSRF attacks by blocking private IPs and localhost.
Story 3.7: Webhook - Spécification URL
"""
# Allowed URL schemes
ALLOWED_SCHEMES = ("http", "https")
# Blocked hostnames
BLOCKED_HOSTNAMES = {"localhost", "127.0.0.1", "::1", "0.0.0.0"}
def __init__(
self,
allowed_schemes: Tuple[str, ...] = ALLOWED_SCHEMES,
block_private_ips: bool = True
):
self.allowed_schemes = allowed_schemes
self.block_private_ips = block_private_ips
def validate(self, url: Optional[str]) -> Tuple[bool, Optional[str], Optional[dict]]:
"""
Validate webhook URL format and security.
Args:
url: The webhook URL to validate (can be None or empty for optional parameter)
Returns:
Tuple of (is_valid, error_message, details)
"""
# Empty or None URLs are valid (optional parameter)
if not url:
return True, None, None
try:
parsed = urlparse(url)
# Check scheme
if parsed.scheme.lower() not in self.allowed_schemes:
return False, (
f"L'URL doit utiliser {' ou '.join(self.allowed_schemes)}"
), {
"field": "webhook_url",
"allowed_schemes": list(self.allowed_schemes),
"detected_scheme": parsed.scheme or "none"
}
# Check for credentials in URL
if parsed.username or parsed.password:
return False, (
"L'URL ne doit pas contenir d'identifiants (credentials)"
), {"field": "webhook_url", "reason": "credentials_in_url"}
# Check hostname
hostname = parsed.hostname
if not hostname:
return False, (
"URL invalide: nom d'hôte manquant"
), {"field": "webhook_url", "reason": "missing_hostname"}
# Block localhost and common local addresses
if hostname.lower() in self.BLOCKED_HOSTNAMES:
return False, (
"Les URLs localhost ne sont pas autorisées"
), {"field": "webhook_url", "reason": "localhost_blocked"}
# Check for private IPs (SSRF protection)
if self.block_private_ips:
try:
# Try to parse as IP directly
try:
ip = ipaddress.ip_address(hostname)
if self._is_blocked_ip(ip):
return False, (
"Les adresses IP privées ne sont pas autorisées"
), {"field": "webhook_url", "reason": "private_ip_blocked"}
except ValueError:
# Not an IP, try DNS resolution
ip_str = socket.gethostbyname(hostname)
ip = ipaddress.ip_address(ip_str)
if self._is_blocked_ip(ip):
return False, (
"Les adresses IP privées ne sont pas autorisées"
), {"field": "webhook_url", "reason": "private_ip_blocked"}
except socket.gaierror:
# DNS resolution failed - let it through
# Will fail at webhook send time
pass
except Exception:
pass
return True, None, None
except Exception as e:
return False, (
f"Format d'URL invalide: {str(e)}"
), {"field": "webhook_url", "error": str(e)}
def _is_blocked_ip(self, ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
"""Check if IP is private, loopback, or link-local."""
return (
ip.is_private or
ip.is_loopback or
ip.is_link_local or
ip.is_reserved or
ip.is_multicast
)
# Default validators: module-level instances created with each class's
# default constructor arguments.
file_validator = FileValidator()
webhook_validator = WebhookURLValidator()