"""
Authentication service with JWT tokens and password hashing
"""
import base64
import hashlib
import hmac
import json
import logging
import os
import secrets
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional

# Try to import optional dependencies
try:
    import jwt
    JWT_AVAILABLE = True
except ImportError:
    JWT_AVAILABLE = False

try:
    from passlib.context import CryptContext
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    PASSLIB_AVAILABLE = True
except ImportError:
    PASSLIB_AVAILABLE = False

from models.subscription import User, UserCreate, PlanType, SubscriptionStatus, PLANS

logger = logging.getLogger(__name__)

# Configuration
# NOTE: when JWT_SECRET_KEY is not set, a random key is generated at import
# time, so tokens issued by one process cannot be verified by another process
# or after a restart.
SECRET_KEY = os.getenv("JWT_SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
REFRESH_TOKEN_EXPIRE_DAYS = 30

# Simple file-based storage (replace with database in production)
USERS_FILE = Path("data/users.json")
USERS_FILE.parent.mkdir(exist_ok=True)

# Fallback password hashing (used only when passlib is not installed).
_PBKDF2_SCHEME = "pbkdf2_sha256"
_PBKDF2_ITERATIONS = 600_000
# Single-round salted SHA-256: still verified, never produced for new hashes.
_LEGACY_SHA256_SCHEME = "sha256"


def hash_password(password: str) -> str:
    """Hash a password using bcrypt, or PBKDF2-HMAC-SHA256 without passlib.

    Args:
        password: The plain-text password.

    Returns:
        The passlib bcrypt hash when passlib is installed, otherwise a string
        of the form ``pbkdf2_sha256$<iterations>$<salt>$<hex digest>``.
    """
    if PASSLIB_AVAILABLE:
        return pwd_context.hash(password)

    # A single SHA-256 round is too cheap to brute-force, so the fallback uses
    # a key-stretching function from the standard library instead.
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), _PBKDF2_ITERATIONS
    ).hex()
    return f"{_PBKDF2_SCHEME}${_PBKDF2_ITERATIONS}${salt}${digest}"


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its hash.

    Supports the PBKDF2 fallback format, the legacy ``sha256$<salt>$<digest>``
    fallback format, and (when passlib is installed) any hash handled by
    ``pwd_context``.

    Args:
        plain_password: The password supplied by the user.
        hashed_password: The stored hash.

    Returns:
        True if the password matches; False if it does not match or the
        stored hash is missing or malformed.
    """
    if not hashed_password:
        return False

    scheme, _, remainder = hashed_password.partition("$")

    if scheme == _PBKDF2_SCHEME:
        fields = remainder.split("$")
        if len(fields) != 3:
            return False
        iterations_text, salt, expected_hash = fields
        try:
            iterations = int(iterations_text)
        except ValueError:
            return False
        if iterations <= 0:
            return False
        actual_hash = hashlib.pbkdf2_hmac(
            "sha256", plain_password.encode(), salt.encode(), iterations
        ).hex()
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(actual_hash.encode(), expected_hash.encode())

    if scheme == _LEGACY_SHA256_SCHEME:
        fields = remainder.split("$")
        if len(fields) != 2:
            return False
        salt, expected_hash = fields
        actual_hash = hashlib.sha256(f"{salt}{plain_password}".encode()).hexdigest()
        return hmac.compare_digest(actual_hash.encode(), expected_hash.encode())

    if PASSLIB_AVAILABLE:
        try:
            return pwd_context.verify(plain_password, hashed_password)
        except (TypeError, ValueError):
            # passlib raises for hashes it cannot identify; treat as mismatch.
            return False

    return False


def _sign_fallback_body(body: str) -> str:
    """Return the hex HMAC-SHA256 of *body* keyed with SECRET_KEY."""
    return hmac.new(SECRET_KEY.encode(), body.encode(), hashlib.sha256).hexdigest()


def _encode_fallback_token(user_id: str, expire: datetime, token_type: str) -> str:
    """Build a signed ``<base64 JSON>.<hex HMAC>`` token without PyJWT."""
    token_data = {
        "user_id": user_id,
        "exp": expire.isoformat(),
        "type": token_type,
    }
    body = base64.urlsafe_b64encode(json.dumps(token_data).encode()).decode()
    return f"{body}.{_sign_fallback_body(body)}"


def _decode_fallback_token(token: str) -> Optional[Dict[str, Any]]:
    """Validate a fallback token; return its payload or None if invalid."""
    try:
        body, signature = token.rsplit(".", 1)
        # Check the signature before parsing anything the client controls.
        if not hmac.compare_digest(
            signature.encode(), _sign_fallback_body(body).encode()
        ):
            return None
        data = json.loads(base64.urlsafe_b64decode(body.encode()).decode())
        exp = datetime.fromisoformat(data["exp"])
        if exp < datetime.utcnow():
            return None
        return {"sub": data["user_id"], "type": data.get("type")}
    except (AttributeError, KeyError, TypeError, ValueError):
        # Covers non-string tokens, a missing separator, bad base64/JSON,
        # missing keys and unparsable expiry values.
        return None


def _create_token(user_id: str, lifetime: timedelta, token_type: str) -> str:
    """Create a token of *token_type* for *user_id* expiring after *lifetime*."""
    expire = datetime.utcnow() + lifetime
    if not JWT_AVAILABLE:
        return _encode_fallback_token(user_id, expire, token_type)

    to_encode = {"sub": user_id, "exp": expire, "type": token_type}
    encoded = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    # PyJWT < 2.0 returns bytes; normalise so the declared return type holds.
    return encoded.decode() if isinstance(encoded, bytes) else encoded


def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a JWT access token.

    Args:
        user_id: Stored as the token subject.
        expires_delta: Token lifetime. A missing or zero value falls back to
            ACCESS_TOKEN_EXPIRE_HOURS.

    Returns:
        The encoded token (a signed fallback token when PyJWT is missing).
    """
    lifetime = expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
    return _create_token(user_id, lifetime, "access")


def create_refresh_token(user_id: str) -> str:
    """Create a JWT refresh token valid for REFRESH_TOKEN_EXPIRE_DAYS days."""
    return _create_token(user_id, timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS), "refresh")


def verify_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify a JWT token and return payload.

    Returns:
        The decoded payload (with at least ``sub`` and ``type``), or None if
        the token is expired, malformed or has an invalid signature.
    """
    if not JWT_AVAILABLE:
        return _decode_fallback_token(token)

    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError:
        # Base class of ExpiredSignatureError, DecodeError, etc. in PyJWT.
        return None


def load_users() -> Dict[str, Dict]:
    """Load users from file storage.

    Returns:
        The stored mapping of user id to user record, or an empty dict when
        the file is missing, unreadable, or does not contain a JSON object.
    """
    if not USERS_FILE.exists():
        return {}

    try:
        with open(USERS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # Best-effort: keep the service running, but leave a trace.
        logger.warning("Could not read users file %s: %s", USERS_FILE, exc)
        return {}

    return data if isinstance(data, dict) else {}


def save_users(users: Dict[str, Dict]) -> None:
    """Save users to file storage.

    The data is written to a temporary sibling file and then moved over the
    real file, so a failure while serializing cannot truncate existing data.
    """
    tmp_path = USERS_FILE.with_name(USERS_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(users, f, indent=2, default=str)
        os.replace(tmp_path, USERS_FILE)
    except Exception:
        if tmp_path.exists():
            tmp_path.unlink()
        raise


def _find_record_by_email(users: Dict[str, Dict], email: str) -> Optional[Dict]:
    """Return the first stored record whose email matches case-insensitively."""
    wanted = email.lower()
    for user_data in users.values():
        # `or ""` guards against records that store an explicit null email.
        if (user_data.get("email") or "").lower() == wanted:
            return user_data
    return None


def get_user_by_email(email: str) -> Optional[User]:
    """Get a user by email (case-insensitive), or None if not found."""
    record = _find_record_by_email(load_users(), email)
    return User(**record) if record is not None else None


def get_user_by_id(user_id: str) -> Optional[User]:
    """Get a user by ID, or None if not found."""
    record = load_users().get(user_id)
    return User(**record) if record is not None else None


def create_user(user_create: UserCreate) -> User:
    """Create a new user on the free plan and persist it.

    Raises:
        ValueError: If the email is already registered.
    """
    users = load_users()

    # Check against the same snapshot that will be written back.
    if _find_record_by_email(users, user_create.email) is not None:
        raise ValueError("Email already registered")

    user_id = secrets.token_urlsafe(16)
    user = User(
        id=user_id,
        email=user_create.email,
        name=user_create.name,
        password_hash=hash_password(user_create.password),
        plan=PlanType.FREE,
        subscription_status=SubscriptionStatus.ACTIVE,
    )

    users[user_id] = user.model_dump()
    save_users(users)
    return user


def authenticate_user(email: str, password: str) -> Optional[User]:
    """Authenticate a user with email and password.

    Returns:
        The user when the credentials match, otherwise None.
    """
    user = get_user_by_email(email)
    if user is None:
        return None
    if not verify_password(password, user.password_hash):
        return None
    return user


def update_user(user_id: str, updates: Dict[str, Any]) -> Optional[User]:
    """Update a user's data and stamp ``updated_at``.

    Returns:
        The updated user, or None if the user id is unknown.
    """
    users = load_users()
    if user_id not in users:
        return None

    users[user_id].update(updates)
    users[user_id]["updated_at"] = datetime.utcnow().isoformat()
    save_users(users)
    return User(**users[user_id])


def check_usage_limits(user: User) -> Dict[str, Any]:
    """Check if user has exceeded their plan limits.

    Monthly counters are reset, both in storage and on *user*, when the
    stored reset date falls in a different month than the current UTC date.

    Returns:
        A dict describing usage and plan limits. ``docs_remaining`` is -1
        when the plan's ``docs_per_month`` is not positive, which leaves
        ``can_translate`` True regardless of usage.
    """
    plan = PLANS[user.plan]

    # Reset usage if it's a new month
    now = datetime.utcnow()
    reset_date = user.usage_reset_date
    if (reset_date.year, reset_date.month) != (now.year, now.month):
        update_user(user.id, {
            "docs_translated_this_month": 0,
            "pages_translated_this_month": 0,
            "api_calls_this_month": 0,
            "usage_reset_date": now.isoformat(),
        })
        user.docs_translated_this_month = 0
        user.pages_translated_this_month = 0
        user.api_calls_this_month = 0
        # Keep the in-memory object in sync with what was just persisted.
        user.usage_reset_date = now

    docs_limit = plan["docs_per_month"]
    if docs_limit > 0:
        docs_remaining = max(0, docs_limit - user.docs_translated_this_month)
    else:
        docs_remaining = -1

    return {
        "can_translate": docs_remaining != 0 or user.extra_credits > 0,
        "docs_used": user.docs_translated_this_month,
        "docs_limit": docs_limit,
        "docs_remaining": docs_remaining,
        "pages_used": user.pages_translated_this_month,
        "extra_credits": user.extra_credits,
        "max_pages_per_doc": plan["max_pages_per_doc"],
        "max_file_size_mb": plan["max_file_size_mb"],
        "allowed_providers": plan["providers"],
    }


def record_usage(user_id: str, pages_count: int, use_credits: bool = False) -> bool:
    """Record document translation usage.

    Adds one document and *pages_count* pages to the monthly counters. When
    *use_credits* is True, also deducts *pages_count* from the user's extra
    credits (never going below zero).

    Returns:
        True if the user exists and usage was recorded, otherwise False.
    """
    user = get_user_by_id(user_id)
    if user is None:
        return False

    updates = {
        "docs_translated_this_month": user.docs_translated_this_month + 1,
        "pages_translated_this_month": user.pages_translated_this_month + pages_count,
    }
    if use_credits:
        updates["extra_credits"] = max(0, user.extra_credits - pages_count)

    update_user(user_id, updates)
    return True


def add_credits(user_id: str, credits: int) -> bool:
    """Add credits to a user's account.

    Returns:
        True if the user exists and credits were added, otherwise False.
    """
    user = get_user_by_id(user_id)
    if user is None:
        return False

    update_user(user_id, {"extra_credits": user.extra_credits + credits})
    return True