2026-02-01 09:31:38 +01:00

262 lines
7.7 KiB
Python

"""
Authentication API Endpoints.
This module provides endpoints for user authentication including
login, registration, and logout.
"""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from passlib.context import CryptContext
from app.database import get_db
from app.models.user import User
from app.models.badge import Badge, UserBadge
from app.schemas.auth import (
LoginRequest,
RegisterRequest,
AuthResponse,
ErrorResponse
)
# #region agent log
def write_debug_log(
    hypothesisId: str,
    location: str,
    message: str,
    data: Optional[dict] = None,
    log_path: Optional[str] = None,
) -> None:
    """Append one NDJSON debug record to the agent debug log (best effort).

    The record carries a fixed session/run id, the given hypothesis id,
    location and message, the optional ``data`` payload and a millisecond
    timestamp taken from the local clock.

    This is debug instrumentation only: if the record cannot be serialized
    or the log file cannot be opened/written, the failure is swallowed so
    that logging can never break the caller (it is invoked at import time
    and from the login endpoint).

    Args:
        hypothesisId: Identifier of the debugging hypothesis being traced.
        location: Free-form "file:line" style marker of the call site.
        message: Human-readable description of the event.
        data: Optional JSON-serializable payload; ``None`` or an empty
            value is stored as ``{}``.
        log_path: Destination file. Defaults to the original hard-coded
            debug log location when omitted.
    """
    import json

    if log_path is None:
        # Original hard-coded destination, kept as the default for
        # backward compatibility.
        log_path = r"d:\\dev_new_pc\\chartbastan\\.cursor\\debug.log"

    log_entry = {
        "sessionId": "debug-session",
        "runId": "run1",
        "hypothesisId": hypothesisId,
        "location": location,
        "message": message,
        "data": data or {},
        "timestamp": datetime.now().timestamp() * 1000,
    }
    try:
        # Serialize first so a bad payload never leaves a partial line.
        line = json.dumps(log_entry)
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except (OSError, TypeError, ValueError):
        # Missing directory, permission problem or non-serializable data:
        # drop the record rather than fail the request/import.
        return
# #endregion
# Password-hashing configuration: a passlib CryptContext restricted to the
# bcrypt scheme, used by verify_password() and get_password_hash() below.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Router on which this module's endpoints are registered. No prefix is set
# here. NOTE(review): the debug record below reports "/api/v1/auth" --
# presumably the prefix is applied where the router is included; confirm.
router = APIRouter(tags=["authentication"])
# #region agent log
# NOTE(review): this call runs at import time, so merely importing the module
# appends a record to the debug log file.
write_debug_log("C", "auth.py:29", "Auth router initialized", {"prefix": "/api/v1/auth", "tags": ["authentication"]})
# #endregion
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.

    Args:
        plain_password: Password as typed by the user.
        hashed_password: Previously stored password hash.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plaintext password for storage.

    Delegates to the module-level ``pwd_context``.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def generate_referral_code(length: int = 8) -> str:
    """Return a random referral code of uppercase ASCII letters and digits.

    Characters are drawn with ``secrets.choice``. The result is random, NOT
    guaranteed unique: callers that need uniqueness must check the code
    against already-stored values and retry on collision.

    Args:
        length: Number of characters in the code (default 8).

    Returns:
        A string of ``length`` characters from ``A-Z0-9``.

    Raises:
        ValueError: If ``length`` is smaller than 1.
    """
    import secrets
    import string

    if length < 1:
        # An empty referral code would be useless and could collide with
        # "no code" checks in callers.
        raise ValueError("length must be a positive integer")

    alphabet = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
@router.post("/login", response_model=AuthResponse, responses={401: {"model": ErrorResponse}})
def login_user(
    request: LoginRequest,
    db: Session = Depends(get_db)
):
    """
    Log a user in with an email address and password.

    All failure modes (unknown email, account without a stored password
    hash, wrong password) produce the same 401 response so the reply does
    not reveal which part was wrong. On the two paths where no real hash is
    checked, a dummy verification is performed so those paths cost roughly
    the same time as a real password check.

    Args:
        request: Login payload; ``email`` and ``password`` are read from it.
        db: Database session injected by FastAPI.

    Returns:
        A dict with ``data`` (id, email, name, is_premium, referral_code,
        created_at of the user) and ``meta`` (timestamp, version). No token
        is issued by this endpoint.

    Raises:
        HTTPException: 401 if the credentials are not valid.
    """
    # Single rejection shared by every failure path, so the status code and
    # message cannot drift apart between branches.
    invalid_credentials = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Email ou mot de passe incorrect"
    )

    # #region agent log
    write_debug_log("D", "auth.py:48", "Login endpoint called", {"email": request.email})
    # #endregion
    user = db.query(User).filter(User.email == request.email).first()
    # #region agent log
    write_debug_log("D", "auth.py:67", "User lookup", {"email": request.email, "user_found": user is not None})
    # #endregion

    # Unknown email.
    if user is None:
        # #region agent log
        write_debug_log("D", "auth.py:70", "User not found", {"email": request.email})
        # #endregion
        # Spend the time of a real hash check to avoid a timing side channel
        # that would reveal whether the email is registered.
        pwd_context.dummy_verify()
        raise invalid_credentials

    # Account exists but has no password hash stored.
    if not user.password_hash:
        # #region agent log
        write_debug_log("D", "auth.py:81", "No password hash", {"email": request.email, "user_id": user.id})
        # #endregion
        pwd_context.dummy_verify()
        raise invalid_credentials

    password_valid = verify_password(request.password, user.password_hash)
    # #region agent log
    write_debug_log("D", "auth.py:91", "Password verification", {"email": request.email, "user_id": user.id, "password_valid": password_valid})
    # #endregion
    if not password_valid:
        # #region agent log
        write_debug_log("D", "auth.py:96", "Invalid password", {"email": request.email, "user_id": user.id})
        # #endregion
        raise invalid_credentials

    # Success: return the user's public fields (the password hash is never
    # included in the response).
    # #region agent log
    write_debug_log("D", "auth.py:107", "Login successful", {"email": request.email, "user_id": user.id})
    # #endregion
    return {
        "data": {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "is_premium": user.is_premium,
            "referral_code": user.referral_code,
            "created_at": user.created_at,
        },
        "meta": {
            "timestamp": datetime.utcnow().isoformat(),
            "version": "v1"
        }
    }
@router.post("/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED)
def register_user(
    request: RegisterRequest,
    db: Session = Depends(get_db)
):
    """
    Register a new user.

    The user row and the optional "first_login" welcome badge are written in
    a single transaction, so a failure while awarding the badge does not
    leave a half-registered account behind.

    Args:
        request: Registration payload; ``email``, ``password``, ``name`` and
            ``referral_code`` are read from it.
        db: Database session injected by FastAPI.

    Returns:
        A dict with ``data`` (id, email, name, is_premium, referral_code,
        created_at of the new user) and ``meta`` (timestamp, version).

    Raises:
        HTTPException: 409 if the email is already registered, 400 if a
            referral code was supplied but matches no user.
    """
    # Function-scope import: only this endpoint needs these names.
    from sqlalchemy.exc import IntegrityError, SQLAlchemyError

    email_conflict = HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Cet email est déjà utilisé"
    )

    # Fast path: reject an email that is already registered.
    existing_user = db.query(User).filter(User.email == request.email).first()
    if existing_user:
        raise email_conflict

    # Validate the referral code when one is supplied.
    # NOTE(review): the referrer is only validated here; nothing in this
    # function records the referral relationship -- confirm whether that is
    # handled elsewhere.
    referrer: Optional[User] = None
    if request.referral_code:
        referrer = db.query(User).filter(
            User.referral_code == request.referral_code
        ).first()
        if not referrer:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Code de parrainage invalide"
            )

    # Draw referral codes until one is not already taken.
    referral_code = generate_referral_code()
    while db.query(User).filter(User.referral_code == referral_code).first():
        referral_code = generate_referral_code()

    password_hash = get_password_hash(request.password)

    # Single timestamp so created_at and updated_at are exactly equal.
    now = datetime.utcnow()
    new_user = User(
        email=request.email,
        password_hash=password_hash,
        name=request.name,
        is_premium=False,
        referral_code=referral_code,
        created_at=now,
        updated_at=now
    )
    db.add(new_user)
    try:
        # Flush (not commit) so the INSERT is sent and new_user.id is
        # available while the transaction stays open for the badge.
        db.flush()
    except IntegrityError:
        # A concurrent request may have inserted the same email between the
        # check above and this insert (assumes a unique constraint on the
        # email column -- TODO confirm). Report it as a normal 409.
        db.rollback()
        if db.query(User).filter(User.email == request.email).first():
            raise email_conflict from None
        # Some other constraint failed: do not mask it.
        raise

    # Award the welcome badge, if that badge is defined in the database.
    welcome_badge = db.query(Badge).filter(
        Badge.badge_id == "first_login"
    ).first()
    if welcome_badge:
        db.add(UserBadge(
            user_id=new_user.id,
            badge_id=welcome_badge.id,
            unlocked_at=datetime.utcnow()
        ))

    try:
        db.commit()
    except SQLAlchemyError:
        # Leave the session usable and undo the partial registration.
        db.rollback()
        raise
    db.refresh(new_user)

    return {
        "data": {
            "id": new_user.id,
            "email": new_user.email,
            "name": new_user.name,
            "is_premium": new_user.is_premium,
            "referral_code": new_user.referral_code,
            "created_at": new_user.created_at,
        },
        "meta": {
            "timestamp": datetime.utcnow().isoformat(),
            "version": "v1"
        }
    }
@router.post("/logout")
def logout_user():
    """
    Acknowledge a logout request.

    The handler touches no server-side state: it returns a fixed
    confirmation message together with the current UTC timestamp and the
    API version.

    Note (carried over from the original author): the backend relies on
    client-side cookies, and this endpoint exists for future compatibility
    with JWT.

    Returns:
        A dict with ``data`` (the confirmation message) and ``meta``
        (timestamp, version).
    """
    confirmation = {"message": "Déconnexion réussie"}
    response_meta = {
        "timestamp": datetime.utcnow().isoformat(),
        "version": "v1",
    }
    return {"data": confirmation, "meta": response_meta}