Files
office_translator/middleware/tier_quota.py
Sepehr Ramezani 26bd096a06 feat: production deployment - full update with providers, admin, glossaries, pricing, tests
Major changes across backend, frontend, infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:47 +02:00

166 lines
5.8 KiB
Python

"""
Tier-based daily translation quota (Story 1.6).
Uses a Redis fixed-window daily counter per user (one key per UTC date); falls back to an in-memory counter when Redis is unavailable.
Coexists with IP-based rate limiting in rate_limiting.py.
Source of truth: Redis (key per user per UTC date) is the authority for quota enforcement.
User.daily_translation_count in DB is kept in sync on each successful translation for
reporting/analytics; reset at midnight UTC is automatic in Redis (new key per day). DB
reset can be done by a scheduled job at midnight UTC if needed.
"""
from __future__ import annotations
import os
import logging
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Free tier: 5 translations per day (UTC). Pro (and equivalent) tiers: no daily cap.
FREE_TIER_DAILY_LIMIT = 5
# Prefix of the per-user daily counter keys in Redis; TierQuotaService builds
# the full key as "{KEY_PREFIX}:{user_id}:{YYYY-MM-DD}".
KEY_PREFIX = "rate_limit:daily"
def _utc_date_str(dt: Optional[datetime] = None) -> str:
    """Return the UTC calendar date of *dt* as ``YYYY-MM-DD``.

    Args:
        dt: Moment to format. Defaults to the current time in UTC.
            Timezone-aware values are converted to UTC first, so e.g.
            23:30 at UTC-5 maps to the *next* UTC day. Naive values are
            taken to already be expressed in UTC.

    Returns:
        The date as a zero-padded ``YYYY-MM-DD`` string.
    """
    moment = dt or datetime.now(timezone.utc)
    # Only aware datetimes can be shifted; calling astimezone() on a naive
    # value would interpret it as *local* time, which is not what we want.
    if moment.utcoffset() is not None:
        moment = moment.astimezone(timezone.utc)
    return moment.strftime("%Y-%m-%d")
def _next_midnight_utc(dt: Optional[datetime] = None) -> datetime:
    """Return the first midnight UTC strictly after *dt*.

    Args:
        dt: Reference moment. Defaults to the current time in UTC.
            Timezone-aware values are converted to UTC before the date is
            taken; naive values are taken to already be expressed in UTC.

    Returns:
        A timezone-aware ``datetime`` (tzinfo ``timezone.utc``) at 00:00:00
        of the UTC day following the reference moment.
    """
    now = dt or datetime.now(timezone.utc)
    # Without this conversion, now.date() would be the date in dt's own
    # zone, which can differ from the UTC date by a day.
    if now.utcoffset() is not None:
        now = now.astimezone(timezone.utc)
    tomorrow = now.date() + timedelta(days=1)
    return datetime(tomorrow.year, tomorrow.month, tomorrow.day, tzinfo=timezone.utc)
def _seconds_until_midnight_utc(dt: Optional[datetime] = None) -> int:
    """Return the whole seconds left until the next midnight UTC.

    Args:
        dt: Reference moment. Defaults to the current time in UTC.
            Timezone-aware values are converted to UTC; naive values are
            taken to already be expressed in UTC (previously a naive value
            raised ``TypeError`` from aware/naive subtraction).

    Returns:
        A non-negative integer, truncated toward zero. Exactly at midnight
        the result is 86400 (the *next* midnight is a full day away).
    """
    now = dt or datetime.now(timezone.utc)
    if now.utcoffset() is not None:
        now = now.astimezone(timezone.utc)
    # UTC has no DST, so every UTC day is exactly 24h long and the time left
    # is simply one day minus the time of day already elapsed.
    elapsed_today = timedelta(
        hours=now.hour,
        minutes=now.minute,
        seconds=now.second,
        microseconds=now.microsecond,
    )
    return max(0, int((timedelta(days=1) - elapsed_today).total_seconds()))
@dataclass
class QuotaResult:
    """Result of a quota check.

    Produced by TierQuotaService.check_quota. For unlimited tiers that
    method reports remaining=-1 and limit=0.
    """

    # Whether one more translation may be performed now.
    allowed: bool
    remaining: int  # -1 for pro (unlimited)
    # Timezone-aware moment at which the daily counter rolls over
    # (check_quota passes the next midnight UTC).
    reset_at_utc: datetime
    # Translations already counted for the current UTC day.
    current_usage: int = 0
    # Daily cap that applies to the user; defaults to the free-tier cap.
    limit: int = FREE_TIER_DAILY_LIMIT
# ---------------------------------------------------------------------------
# Redis backend (shared client from core.redis)
# ---------------------------------------------------------------------------
def _get_async_redis():
    """Fetch the shared async Redis client exposed by ``core.redis``.

    The import happens at call time rather than at module import.
    NOTE(review): presumably to avoid an import cycle or a hard dependency
    at import time - confirm against core.redis.

    Returns:
        Whatever ``core.redis.get_async_redis()`` returns; callers in this
        module treat a falsy result as "Redis unavailable".
    """
    from core.redis import get_async_redis as shared_client_factory

    return shared_client_factory()
# ---------------------------------------------------------------------------
# In-memory fallback (per process; not shared across workers). Documented as fallback.
# ---------------------------------------------------------------------------
# (user_id, date_utc_str) -> number of translations counted on that UTC date.
_memory_usage: dict[tuple[str, str], int] = {}
# Latest UTC date string seen by _memory_incr; used to detect day rollover so
# that counters of earlier days can be dropped instead of piling up forever.
_memory_latest_date: str = ""


def _memory_get(user_id: str, date_str: str) -> int:
    """Return the in-memory count for *user_id* on *date_str* (0 if absent)."""
    return _memory_usage.get((user_id, date_str), 0)


def _memory_incr(user_id: str, date_str: str) -> int:
    """Increment and return the in-memory count for *user_id* on *date_str*.

    When *date_str* is later than any date seen so far, counters belonging
    to earlier dates are removed first; without this the dict would keep one
    entry per user per day for the lifetime of the process.

    Date strings are compared lexicographically, which matches chronological
    order for the zero-padded ``YYYY-MM-DD`` format used by this module.

    Args:
        user_id: Identifier of the user being counted.
        date_str: UTC date of the translation as ``YYYY-MM-DD``.

    Returns:
        The count after the increment.
    """
    global _memory_latest_date
    if date_str > _memory_latest_date:
        # Snapshot the stale keys first: a dict must not change size while
        # it is being iterated.
        for stale_key in [k for k in _memory_usage if k[1] < date_str]:
            del _memory_usage[stale_key]
        _memory_latest_date = date_str
    key = (user_id, date_str)
    new_count = _memory_usage.get(key, 0) + 1
    _memory_usage[key] = new_count
    return new_count
# ---------------------------------------------------------------------------
# TierQuotaService
# ---------------------------------------------------------------------------
class TierQuotaService:
    """
    Daily translation quota per user by tier.

    Redis key pattern: rate_limit:daily:{user_id}:{YYYY-MM-DD}, TTL 25h.
    If Redis is unavailable, uses in-memory dict (documented fallback).

    Usage: call ``check_quota`` before a translation and
    ``increment_on_success`` after it succeeds. ``check_quota`` only reads
    the counter; it does not reserve quota.
    """

    # Tiers that have no daily cap; any other tier value is treated as free.
    _UNLIMITED_TIERS = ("pro", "business", "enterprise", "starter")

    def __init__(self):
        self._redis = None  # Lazy init on first use

    def _redis_client(self):
        """Return the shared async Redis client, resolving it on first use.

        A ``None`` result is not cached, so the lookup is retried on the
        next call.
        """
        if self._redis is None:
            self._redis = _get_async_redis()
        return self._redis

    def _date_str(self, dt: Optional[datetime] = None) -> str:
        """Return the UTC date of *dt* (default: now) as ``YYYY-MM-DD``."""
        return _utc_date_str(dt)

    @staticmethod
    def _counter_key(user_id: str, date_str: str) -> str:
        """Build the Redis key of a user's counter for one UTC date."""
        return f"{KEY_PREFIX}:{user_id}:{date_str}"

    async def check_quota(self, user_id: str, tier: str) -> QuotaResult:
        """
        Check if user has quota for one more translation today (UTC).

        tier "free" -> limit 5/day; "pro" (or equivalent) -> unlimited.

        Args:
            user_id: Identifier of the user whose counter is read.
            tier: Tier name, matched case-insensitively. ``None`` or an
                empty string is treated as ``"free"``, as is any tier not
                in the unlimited list.

        Returns:
            A ``QuotaResult``. Unlimited tiers get ``allowed=True``,
            ``remaining=-1`` and ``limit=0`` without touching any counter.
            Otherwise the current count decides ``allowed`` and
            ``remaining``.
        """
        # Take a single clock reading so that the reset time and the counter
        # key always refer to the same UTC day, even right around midnight.
        now = datetime.now(timezone.utc)
        reset_at = _next_midnight_utc(now)
        tier_lower = (tier or "free").lower()
        if tier_lower in self._UNLIMITED_TIERS:
            return QuotaResult(
                allowed=True,
                remaining=-1,
                reset_at_utc=reset_at,
                current_usage=0,
                limit=0,
            )
        # Free tier
        date_str = self._date_str(now)
        redis_client = self._redis_client()
        if redis_client:
            try:
                raw_count = await redis_client.get(self._counter_key(user_id, date_str))
                # A missing key comes back as None -> treated as zero usage.
                count = int(raw_count or 0)
            except Exception as e:
                # Best effort: a Redis failure must not block translations.
                logger.warning("Tier quota Redis get failed: %s, using in-memory", e)
                count = _memory_get(user_id, date_str)
        else:
            count = _memory_get(user_id, date_str)
        remaining = max(0, FREE_TIER_DAILY_LIMIT - count)
        return QuotaResult(
            allowed=count < FREE_TIER_DAILY_LIMIT,
            remaining=remaining,
            reset_at_utc=reset_at,
            current_usage=count,
            limit=FREE_TIER_DAILY_LIMIT,
        )

    async def increment_on_success(self, user_id: str) -> None:
        """Increment daily translation count for user (call after successful translation).

        The counter is incremented in Redis when a client is available; if
        there is no client or the Redis call fails, the in-memory counter is
        incremented instead.
        """
        date_str = self._date_str()
        redis_client = self._redis_client()
        if redis_client:
            try:
                key = self._counter_key(user_id, date_str)
                pipe = redis_client.pipeline()
                pipe.incr(key)
                # 25h so key expires after midnight UTC. The TTL is re-set on
                # every increment; the key name is per-date, so that is fine.
                pipe.expire(key, 25 * 3600)
                await pipe.execute()
                return
            except Exception as e:
                logger.warning("Tier quota Redis increment failed: %s, using in-memory", e)
        _memory_incr(user_id, date_str)

    def seconds_until_reset(self) -> int:
        """Seconds until next midnight UTC (for Retry-After header)."""
        return _seconds_until_midnight_utc()
# Singleton for app use. Construction does no I/O: the Redis client is only
# looked up on the first quota call.
tier_quota_service = TierQuotaService()