167 lines
4.7 KiB
Python
167 lines
4.7 KiB
Python
"""
|
|
API Key Service.
|
|
|
|
This module handles API key generation, validation, and management.
|
|
"""
|
|
|
|
import secrets
|
|
import hashlib
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.api_key import ApiKey
|
|
from app.models.user import User
|
|
|
|
|
|
class ApiKeyService:
|
|
"""Service for managing API keys."""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def generate_api_key(self, user_id: int, rate_limit: int = 100) -> ApiKey:
|
|
"""
|
|
Generate a new API key for a user.
|
|
|
|
Args:
|
|
user_id: ID of the user
|
|
rate_limit: Rate limit per minute (default: 100)
|
|
|
|
Returns:
|
|
Created ApiKey object
|
|
|
|
Raises:
|
|
ValueError: If user doesn't exist
|
|
"""
|
|
# Verify user exists
|
|
user = self.db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise ValueError(f"User with id {user_id} not found")
|
|
|
|
# Generate API key (32 bytes = 256 bits)
|
|
api_key = secrets.token_urlsafe(32)
|
|
|
|
# Hash the API key for storage (SHA-256)
|
|
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
|
|
|
|
# Store prefix for identification (first 8 characters)
|
|
key_prefix = api_key[:8]
|
|
|
|
# Create API key record
|
|
api_key_record = ApiKey(
|
|
user_id=user_id,
|
|
key_hash=key_hash,
|
|
key_prefix=key_prefix,
|
|
rate_limit=rate_limit,
|
|
is_active=True,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
|
|
self.db.add(api_key_record)
|
|
self.db.commit()
|
|
self.db.refresh(api_key_record)
|
|
|
|
# Return the plain API key (only time it's shown)
|
|
# We'll add it to the dict temporarily for the response
|
|
api_key_dict = api_key_record.to_dict()
|
|
api_key_dict['api_key'] = api_key
|
|
|
|
return api_key_dict
|
|
|
|
def validate_api_key(self, api_key: str) -> Optional[ApiKey]:
|
|
"""
|
|
Validate an API key.
|
|
|
|
Args:
|
|
api_key: The API key to validate
|
|
|
|
Returns:
|
|
ApiKey object if valid, None otherwise
|
|
"""
|
|
# Hash the provided API key
|
|
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
|
|
|
|
# Query for matching hash
|
|
api_key_record = self.db.query(ApiKey).filter(
|
|
ApiKey.key_hash == key_hash,
|
|
ApiKey.is_active == True
|
|
).first()
|
|
|
|
if api_key_record:
|
|
# Update last used timestamp
|
|
api_key_record.last_used_at = datetime.utcnow()
|
|
self.db.commit()
|
|
self.db.refresh(api_key_record)
|
|
return api_key_record
|
|
|
|
return None
|
|
|
|
def get_user_api_keys(self, user_id: int) -> list[dict]:
|
|
"""
|
|
Get all API keys for a user.
|
|
|
|
Args:
|
|
user_id: ID of the user
|
|
|
|
Returns:
|
|
List of API key dictionaries (without actual keys)
|
|
"""
|
|
api_keys = self.db.query(ApiKey).filter(
|
|
ApiKey.user_id == user_id
|
|
).all()
|
|
|
|
return [api_key.to_dict() for api_key in api_keys]
|
|
|
|
def revoke_api_key(self, api_key_id: int, user_id: int) -> bool:
|
|
"""
|
|
Revoke (deactivate) an API key.
|
|
|
|
Args:
|
|
api_key_id: ID of the API key to revoke
|
|
user_id: ID of the user (for authorization)
|
|
|
|
Returns:
|
|
True if revoked, False otherwise
|
|
"""
|
|
api_key = self.db.query(ApiKey).filter(
|
|
ApiKey.id == api_key_id,
|
|
ApiKey.user_id == user_id
|
|
).first()
|
|
|
|
if api_key:
|
|
api_key.is_active = False
|
|
self.db.commit()
|
|
return True
|
|
|
|
return False
|
|
|
|
def regenerate_api_key(self, api_key_id: int, user_id: int) -> Optional[dict]:
|
|
"""
|
|
Regenerate an API key (create new, deactivate old).
|
|
|
|
Args:
|
|
api_key_id: ID of the API key to regenerate
|
|
user_id: ID of the user (for authorization)
|
|
|
|
Returns:
|
|
New API key dict with plain key, or None if not found
|
|
"""
|
|
old_api_key = self.db.query(ApiKey).filter(
|
|
ApiKey.id == api_key_id,
|
|
ApiKey.user_id == user_id
|
|
).first()
|
|
|
|
if not old_api_key:
|
|
return None
|
|
|
|
# Get rate limit from old key
|
|
rate_limit = old_api_key.rate_limit
|
|
|
|
# Deactivate old key
|
|
old_api_key.is_active = False
|
|
self.db.commit()
|
|
|
|
# Generate new key
|
|
return self.generate_api_key(user_id, rate_limit)
|