Files
office_translator/services/storage_tracker.py
Sepehr Ramezani 26bd096a06 feat: production deployment - full update with providers, admin, glossaries, pricing, tests
Major changes across backend, frontend, infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:47 +02:00

124 lines
3.5 KiB
Python

import os
import json
import logging
from datetime import datetime, timezone
from typing import Optional, Any, Dict
from config import config
from core.logging import get_logger
logger = get_logger(__name__)
_HAS_STRUCTLOG = True
def _log_info(event: str, **kwargs):
"""Log info with structlog or standard logging compatibility."""
if _HAS_STRUCTLOG:
logger.info(event, **kwargs)
else:
msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.info(msg)
def _log_error(event: str, **kwargs):
"""Log error with structlog or standard logging compatibility."""
if _HAS_STRUCTLOG:
logger.error(event, **kwargs)
else:
msg = f"{event} " + " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.error(msg)
# Key pattern: translation:file:{job_id}
KEY_PREFIX = "translation:file"
def _get_default_ttl() -> int:
"""Get TTL from config or default to 60 minutes."""
try:
return config.FILE_TTL_MINUTES * 60
except Exception:
return 3600 # 60 minutes default
def _get_async_redis():
"""Return shared async Redis client from core.redis."""
from core.redis import get_async_redis
return get_async_redis()
class StorageTracker:
"""
Tracks file locations and metadata in Redis.
Pattern: translation:file:{job_id} -> JSON metadata
"""
def __init__(self):
self._redis = None
def _redis_client(self):
if self._redis is None:
self._redis = _get_async_redis()
return self._redis
async def track_file(
self, job_id: str, metadata: Dict[str, Any], ttl: Optional[int] = None
) -> bool:
"""
Store file metadata in Redis with TTL and log the upload.
"""
if ttl is None:
ttl = _get_default_ttl()
# Ensure timestamp is present
if "timestamp" not in metadata:
metadata["timestamp"] = datetime.now(timezone.utc).isoformat()
# Log metadata (no content)
_log_info(
"file_uploaded",
job_id=job_id,
original_filename=metadata.get("original_filename"),
file_size=metadata.get("file_size"),
file_hash=metadata.get("file_hash"),
user_id=metadata.get("user_id"),
timestamp=metadata.get("timestamp"),
)
redis_client = self._redis_client()
if not redis_client:
_log_error(
"redis_not_available", job_id=job_id, hint="File tracked in logs only"
)
return False
try:
key = f"{KEY_PREFIX}:{job_id}"
await redis_client.set(key, json.dumps(metadata), ex=ttl)
_log_info("file_tracked_in_redis", job_id=job_id, ttl_seconds=ttl)
return True
except Exception as e:
_log_error("redis_track_failed", job_id=job_id, error=str(e))
return False
async def get_file_metadata(self, job_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve file metadata from Redis.
"""
redis_client = self._redis_client()
if not redis_client:
return None
try:
key = f"{KEY_PREFIX}:{job_id}"
data = await redis_client.get(key)
return json.loads(data) if data else None
except Exception as e:
_log_error("redis_get_failed", job_id=job_id, error=str(e))
return None
# Singleton for app use
storage_tracker = StorageTracker()