office_translator/services/auth_service.py
Sepehr fcabe882cd feat: Add complete monetization system
Backend:
- User authentication with JWT tokens (auth_service.py)
- Subscription plans: Free, Starter (), Pro (), Business (), Enterprise
- Stripe integration for payments (payment_service.py)
- Usage tracking and quotas
- Credit packages for pay-per-use
- Plan-based provider restrictions

Frontend:
- Landing page with hero, features, pricing preview (landing-sections.tsx)
- Pricing page with all plans and credit packages (/pricing)
- User dashboard with usage stats (/dashboard)
- Login/Register pages with validation (/auth/login, /auth/register)
- Ollama self-hosting setup guide (/ollama-setup)
- Updated sidebar with user section and plan badge

Monetization strategy:
- Freemium: 3 docs/day, Ollama only
- Starter: 50 docs/month, Google Translate
- Pro: 200 docs/month, all providers, API access
- Business: 1000 docs/month, team management
- Enterprise: Custom pricing, SLA

Self-hosted option:
- Free unlimited usage with own Ollama server
- Complete privacy (data never leaves machine)
- Step-by-step setup guide included
2025-11-30 21:11:51 +01:00

263 lines
8.0 KiB
Python

"""
Authentication service with JWT tokens and password hashing
"""
import os
import secrets
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import json
from pathlib import Path
# Try to import optional dependencies
try:
import jwt
JWT_AVAILABLE = True
except ImportError:
JWT_AVAILABLE = False
try:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
PASSLIB_AVAILABLE = True
except ImportError:
PASSLIB_AVAILABLE = False
from models.subscription import User, UserCreate, PlanType, SubscriptionStatus, PLANS
# Configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
REFRESH_TOKEN_EXPIRE_DAYS = 30
# Simple file-based storage (replace with database in production)
USERS_FILE = Path("data/users.json")
USERS_FILE.parent.mkdir(exist_ok=True)
def hash_password(password: str) -> str:
"""Hash a password using bcrypt or fallback to SHA256"""
if PASSLIB_AVAILABLE:
return pwd_context.hash(password)
else:
# Fallback to SHA256 with salt
salt = secrets.token_hex(16)
hashed = hashlib.sha256(f"{salt}{password}".encode()).hexdigest()
return f"sha256${salt}${hashed}"
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash"""
if PASSLIB_AVAILABLE and not hashed_password.startswith("sha256$"):
return pwd_context.verify(plain_password, hashed_password)
else:
# Fallback SHA256 verification
parts = hashed_password.split("$")
if len(parts) == 3 and parts[0] == "sha256":
salt = parts[1]
expected_hash = parts[2]
actual_hash = hashlib.sha256(f"{salt}{plain_password}".encode()).hexdigest()
return secrets.compare_digest(actual_hash, expected_hash)
return False
def create_access_token(user_id: str, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token"""
if not JWT_AVAILABLE:
# Fallback to simple token
token_data = {
"user_id": user_id,
"exp": (datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS))).isoformat()
}
import base64
return base64.urlsafe_b64encode(json.dumps(token_data).encode()).decode()
expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS))
to_encode = {"sub": user_id, "exp": expire, "type": "access"}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(user_id: str) -> str:
"""Create a JWT refresh token"""
if not JWT_AVAILABLE:
token_data = {
"user_id": user_id,
"exp": (datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)).isoformat()
}
import base64
return base64.urlsafe_b64encode(json.dumps(token_data).encode()).decode()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode = {"sub": user_id, "exp": expire, "type": "refresh"}
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify a JWT token and return payload"""
if not JWT_AVAILABLE:
try:
import base64
data = json.loads(base64.urlsafe_b64decode(token.encode()).decode())
exp = datetime.fromisoformat(data["exp"])
if exp < datetime.utcnow():
return None
return {"sub": data["user_id"]}
except:
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.JWTError:
return None
def load_users() -> Dict[str, Dict]:
"""Load users from file storage"""
if USERS_FILE.exists():
try:
with open(USERS_FILE, 'r') as f:
return json.load(f)
except:
return {}
return {}
def save_users(users: Dict[str, Dict]):
"""Save users to file storage"""
with open(USERS_FILE, 'w') as f:
json.dump(users, f, indent=2, default=str)
def get_user_by_email(email: str) -> Optional[User]:
"""Get a user by email"""
users = load_users()
for user_data in users.values():
if user_data.get("email", "").lower() == email.lower():
return User(**user_data)
return None
def get_user_by_id(user_id: str) -> Optional[User]:
"""Get a user by ID"""
users = load_users()
if user_id in users:
return User(**users[user_id])
return None
def create_user(user_create: UserCreate) -> User:
"""Create a new user"""
users = load_users()
# Check if email exists
if get_user_by_email(user_create.email):
raise ValueError("Email already registered")
# Generate user ID
user_id = secrets.token_urlsafe(16)
# Create user
user = User(
id=user_id,
email=user_create.email,
name=user_create.name,
password_hash=hash_password(user_create.password),
plan=PlanType.FREE,
subscription_status=SubscriptionStatus.ACTIVE,
)
# Save to storage
users[user_id] = user.model_dump()
save_users(users)
return user
def authenticate_user(email: str, password: str) -> Optional[User]:
"""Authenticate a user with email and password"""
user = get_user_by_email(email)
if not user:
return None
if not verify_password(password, user.password_hash):
return None
return user
def update_user(user_id: str, updates: Dict[str, Any]) -> Optional[User]:
"""Update a user's data"""
users = load_users()
if user_id not in users:
return None
users[user_id].update(updates)
users[user_id]["updated_at"] = datetime.utcnow().isoformat()
save_users(users)
return User(**users[user_id])
def check_usage_limits(user: User) -> Dict[str, Any]:
"""Check if user has exceeded their plan limits"""
plan = PLANS[user.plan]
# Reset usage if it's a new month
now = datetime.utcnow()
if user.usage_reset_date.month != now.month or user.usage_reset_date.year != now.year:
update_user(user.id, {
"docs_translated_this_month": 0,
"pages_translated_this_month": 0,
"api_calls_this_month": 0,
"usage_reset_date": now.isoformat()
})
user.docs_translated_this_month = 0
user.pages_translated_this_month = 0
user.api_calls_this_month = 0
docs_limit = plan["docs_per_month"]
docs_remaining = max(0, docs_limit - user.docs_translated_this_month) if docs_limit > 0 else -1
return {
"can_translate": docs_remaining != 0 or user.extra_credits > 0,
"docs_used": user.docs_translated_this_month,
"docs_limit": docs_limit,
"docs_remaining": docs_remaining,
"pages_used": user.pages_translated_this_month,
"extra_credits": user.extra_credits,
"max_pages_per_doc": plan["max_pages_per_doc"],
"max_file_size_mb": plan["max_file_size_mb"],
"allowed_providers": plan["providers"],
}
def record_usage(user_id: str, pages_count: int, use_credits: bool = False) -> bool:
"""Record document translation usage"""
user = get_user_by_id(user_id)
if not user:
return False
updates = {
"docs_translated_this_month": user.docs_translated_this_month + 1,
"pages_translated_this_month": user.pages_translated_this_month + pages_count,
}
if use_credits:
updates["extra_credits"] = max(0, user.extra_credits - pages_count)
update_user(user_id, updates)
return True
def add_credits(user_id: str, credits: int) -> bool:
"""Add credits to a user's account"""
user = get_user_by_id(user_id)
if not user:
return False
update_user(user_id, {"extra_credits": user.extra_credits + credits})
return True