"""
Centralized Redis client for the application.

Single shared async and sync clients; REDIS_URL from environment.
Used by: rate limiting, tier quota, storage tracker, auth blocklist,
admin sessions, health check.
"""
from __future__ import annotations

import json
import logging
import os

logger = logging.getLogger(__name__)

_async_client = None
_sync_client = None
# Init-attempt flags: None = not yet tried; True = already tried.
# After an attempt the matching client global is either the live client or
# None (REDIS_URL missing or initialisation failed). The outcome is cached,
# so a failed attempt is not retried by later calls.
_async_attempted = None
_sync_attempted = None


def get_redis_url() -> str:
    """Return REDIS_URL from environment (empty if not set)."""
    return (os.getenv("REDIS_URL") or "").strip()


def get_async_redis():
    """
    Return the shared async Redis client, or None.

    Uses REDIS_URL from the environment. The client is built lazily on the
    first call and the outcome (client or None) is cached for later calls.
    Returns None when REDIS_URL is empty or client construction raised
    (graceful degradation).
    """
    global _async_client, _async_attempted

    if _async_attempted is not None:
        # Already tried once: hand back the cached result (may be None).
        return _async_client

    _async_attempted = True
    url = get_redis_url()
    if not url:
        return None

    try:
        import redis.asyncio as redis

        # NOTE: unlike the sync path, no ping is issued here, so the log line
        # below does not prove the server is reachable.
        _async_client = redis.Redis.from_url(url, decode_responses=True)
        logger.info("Redis async client connected (shared)")
    except Exception as e:
        # Covers a missing redis package as well as a malformed URL.
        logger.warning("Redis async connection failed: %s", e)
        _async_client = None
    return _async_client


def get_sync_redis():
    """
    Return the shared sync Redis client, or None.

    Uses REDIS_URL from the environment. On the first call the client is
    created and verified with a ping; the outcome is cached. Returns None when
    REDIS_URL is empty or the import/connect/ping raised.
    Used by health check, admin sessions, auth blocklist.
    """
    global _sync_client, _sync_attempted

    if _sync_attempted is not None:
        # Already tried once: hand back the cached result (may be None).
        return _sync_client

    _sync_attempted = True
    url = get_redis_url()
    if not url:
        return None

    try:
        import redis

        candidate = redis.Redis.from_url(
            url, decode_responses=True, socket_connect_timeout=5
        )
        # Only publish the client once the server has answered.
        candidate.ping()
        _sync_client = candidate
        logger.info("Redis sync client connected (shared)")
    except Exception as e:
        logger.warning("Redis sync connection failed: %s", e)
        _sync_client = None
    return _sync_client


def ping_sync() -> tuple[bool, str]:
    """
    Ping Redis (sync). Returns (success, error_message).

    Use for health/readiness checks. On failure the message is:
      - "not_configured" when REDIS_URL is empty,
      - "unavailable" when REDIS_URL is set but the shared client could not
        be initialised,
      - the exception text when the ping itself raised.
    """
    client = get_sync_redis()
    if client is None:
        # Distinguish "Redis is not used" from "Redis is configured but down"
        # so a health check does not misreport an outage as missing config.
        if not get_redis_url():
            return False, "not_configured"
        return False, "unavailable"
    try:
        client.ping()
    except Exception as e:
        return False, str(e)
    return True, ""


# ---------------------------------------------------------------------------
# Translation job status cache (for GET /api/v1/translations/{id})
# Key: translation:job:{job_id}, value: JSON, TTL e.g. 2h
# ---------------------------------------------------------------------------
JOB_STATUS_KEY_PREFIX = "translation:job"
JOB_STATUS_TTL = 2 * 3600  # 2 hours


def _job_status_key(job_id: str) -> str:
    """Build the Redis key under which a job's status JSON is stored."""
    return f"{JOB_STATUS_KEY_PREFIX}:{job_id}"


async def set_job_status_async(job_id: str, data: dict, ttl: int = JOB_STATUS_TTL) -> bool:
    """
    Store job status in Redis for multi-instance progress polling.

    `data` is serialised with json.dumps and written with an expiry of `ttl`
    seconds. Returns True if stored; False when Redis is unavailable or the
    serialisation/write raised (the error is logged, not propagated).
    """
    client = get_async_redis()
    if client is None:
        return False
    try:
        await client.set(_job_status_key(job_id), json.dumps(data), ex=ttl)
    except Exception as e:
        logger.warning("Redis set_job_status_async failed for %s: %s", job_id, e)
        return False
    return True


async def get_job_status_async(job_id: str) -> dict | None:
    """
    Retrieve job status from Redis.

    Returns the decoded JSON value, or None if the key is missing/empty, Redis
    is unavailable, or the read/decode raised (the error is logged).
    """
    client = get_async_redis()
    if client is None:
        return None
    try:
        raw = await client.get(_job_status_key(job_id))
        return json.loads(raw) if raw else None
    except Exception as e:
        logger.warning("Redis get_job_status_async failed for %s: %s", job_id, e)
        return None