154 lines
4.5 KiB
Python
154 lines
4.5 KiB
Python
"""Routes API pour les utilisateurs."""
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from passlib.context import CryptContext
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.schemas.user import UserCreate, UserResponse, UserLoginRequest
|
|
|
|
# Configuration du hashage de mot de passe
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
"""Vérifie un mot de passe en clair contre le hash"""
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
|
|
"""Génère un hash sécurisé pour un mot de passe"""
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
@router.post("/", response_model=UserResponse)
|
|
def create_user(user: UserCreate, db: Session = Depends(get_db)):
|
|
"""
|
|
Créer un nouvel utilisateur (inscription).
|
|
|
|
Args:
|
|
user: Données utilisateur à créer.
|
|
db: Session de base de données.
|
|
|
|
Returns:
|
|
UserResponse: L'utilisateur créé.
|
|
|
|
Raises:
|
|
400: Si validation échoue
|
|
409: Si email déjà utilisé
|
|
"""
|
|
# Vérifier si l'email existe déjà
|
|
existing_user = db.query(User).filter(User.email == user.email).first()
|
|
if existing_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Cet email est déjà utilisé"
|
|
)
|
|
|
|
# Hasher le mot de passe
|
|
password_hash = get_password_hash(user.password)
|
|
|
|
# Générer un code de parrainage unique
|
|
import secrets
|
|
import string
|
|
referral_code = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8))
|
|
while db.query(User).filter(User.referral_code == referral_code).first():
|
|
referral_code = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8))
|
|
|
|
# Créer l'utilisateur avec le mot de passe hashé
|
|
new_user = User(
|
|
email=user.email,
|
|
name=user.name,
|
|
password_hash=password_hash,
|
|
is_premium=False,
|
|
referral_code=referral_code
|
|
)
|
|
|
|
db.add(new_user)
|
|
db.commit()
|
|
db.refresh(new_user)
|
|
|
|
return UserResponse(
|
|
id=new_user.id,
|
|
email=new_user.email,
|
|
name=new_user.name,
|
|
created_at=new_user.created_at,
|
|
updated_at=new_user.updated_at
|
|
)
|
|
|
|
|
|
@router.post("/login", response_model=UserResponse)
|
|
def login_user(user: UserLoginRequest, db: Session = Depends(get_db)):
|
|
"""
|
|
Connecter un utilisateur.
|
|
|
|
Args:
|
|
user: Email et mot de passe de l'utilisateur.
|
|
db: Session de base de données.
|
|
|
|
Returns:
|
|
UserResponse: L'utilisateur connecté.
|
|
|
|
Raises:
|
|
401: Si email ou mot de passe incorrect
|
|
"""
|
|
# Vérifier si l'utilisateur existe
|
|
db_user = db.query(User).filter(User.email == user.email).first()
|
|
if not db_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Email ou mot de passe incorrect"
|
|
)
|
|
|
|
# Vérifier le mot de passe
|
|
if not verify_password(user.password, db_user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Email ou mot de passe incorrect"
|
|
)
|
|
|
|
return UserResponse(
|
|
id=db_user.id,
|
|
email=db_user.email,
|
|
name=db_user.name,
|
|
created_at=db_user.created_at,
|
|
updated_at=db_user.updated_at
|
|
)
|
|
|
|
|
|
@router.get("/", response_model=List[UserResponse])
|
|
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
"""
|
|
Récupérer la liste des utilisateurs.
|
|
|
|
Args:
|
|
skip: Nombre d'éléments à sauter.
|
|
limit: Nombre maximum d'éléments à retourner.
|
|
db: Session de base de données.
|
|
|
|
Returns:
|
|
List[UserResponse]: Liste des utilisateurs.
|
|
"""
|
|
users = db.query(User).offset(skip).limit(limit).all()
|
|
return users
|
|
|
|
|
|
@router.get("/{user_id}", response_model=UserResponse)
|
|
def read_user(user_id: int, db: Session = Depends(get_db)):
|
|
"""
|
|
Récupérer un utilisateur par son ID.
|
|
|
|
Args:
|
|
user_id: ID de l'utilisateur.
|
|
db: Session de base de données.
|
|
|
|
Returns:
|
|
UserResponse: L'utilisateur trouvé.
|
|
"""
|
|
db_user = db.query(User).filter(User.id == user_id).first()
|
|
if db_user is None:
|
|
raise HTTPException(status_code=404, detail="Utilisateur non trouvé")
|
|
return db_user
|