"""Routes API pour les utilisateurs.""" from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List from passlib.context import CryptContext from app.database import get_db from app.models.user import User from app.schemas.user import UserCreate, UserResponse, UserLoginRequest # Configuration du hashage de mot de passe pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") router = APIRouter() def verify_password(plain_password: str, hashed_password: str) -> bool: """Vérifie un mot de passe en clair contre le hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Génère un hash sécurisé pour un mot de passe""" return pwd_context.hash(password) @router.post("/", response_model=UserResponse) def create_user(user: UserCreate, db: Session = Depends(get_db)): """ Créer un nouvel utilisateur (inscription). Args: user: Données utilisateur à créer. db: Session de base de données. Returns: UserResponse: L'utilisateur créé. Raises: 400: Si validation échoue 409: Si email déjà utilisé """ # Vérifier si l'email existe déjà existing_user = db.query(User).filter(User.email == user.email).first() if existing_user: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Cet email est déjà utilisé" ) # Hasher le mot de passe password_hash = get_password_hash(user.password) # Générer un code de parrainage unique import secrets import string referral_code = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8)) while db.query(User).filter(User.referral_code == referral_code).first(): referral_code = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8)) # Créer l'utilisateur avec le mot de passe hashé new_user = User( email=user.email, name=user.name, password_hash=password_hash, is_premium=False, referral_code=referral_code ) db.add(new_user) db.commit() db.refresh(new_user) return UserResponse( id=new_user.id, email=new_user.email, name=new_user.name, created_at=new_user.created_at, updated_at=new_user.updated_at ) @router.post("/login", response_model=UserResponse) def login_user(user: UserLoginRequest, db: Session = Depends(get_db)): """ Connecter un utilisateur. Args: user: Email et mot de passe de l'utilisateur. db: Session de base de données. Returns: UserResponse: L'utilisateur connecté. Raises: 401: Si email ou mot de passe incorrect """ # Vérifier si l'utilisateur existe db_user = db.query(User).filter(User.email == user.email).first() if not db_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Email ou mot de passe incorrect" ) # Vérifier le mot de passe if not verify_password(user.password, db_user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Email ou mot de passe incorrect" ) return UserResponse( id=db_user.id, email=db_user.email, name=db_user.name, created_at=db_user.created_at, updated_at=db_user.updated_at ) @router.get("/", response_model=List[UserResponse]) def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): """ Récupérer la liste des utilisateurs. Args: skip: Nombre d'éléments à sauter. limit: Nombre maximum d'éléments à retourner. db: Session de base de données. Returns: List[UserResponse]: Liste des utilisateurs. """ users = db.query(User).offset(skip).limit(limit).all() return users @router.get("/{user_id}", response_model=UserResponse) def read_user(user_id: int, db: Session = Depends(get_db)): """ Récupérer un utilisateur par son ID. Args: user_id: ID de l'utilisateur. db: Session de base de données. Returns: UserResponse: L'utilisateur trouvé. """ db_user = db.query(User).filter(User.id == user_id).first() if db_user is None: raise HTTPException(status_code=404, detail="Utilisateur non trouvé") return db_user