262 lines
7.7 KiB
Python
262 lines
7.7 KiB
Python
"""
|
|
Authentication API Endpoints.
|
|
|
|
This module provides endpoints for user authentication including
|
|
login, registration, and logout.
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from passlib.context import CryptContext
|
|
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.models.badge import Badge, UserBadge
|
|
from app.schemas.auth import (
|
|
LoginRequest,
|
|
RegisterRequest,
|
|
AuthResponse,
|
|
ErrorResponse
|
|
)
|
|
|
|
# #region agent log
|
|
def write_debug_log(hypothesisId: str, location: str, message: str, data: dict = None):
|
|
"""Écrit un log NDJSON pour le debug."""
|
|
import json
|
|
log_entry = {
|
|
"sessionId": "debug-session",
|
|
"runId": "run1",
|
|
"hypothesisId": hypothesisId,
|
|
"location": location,
|
|
"message": message,
|
|
"data": data or {},
|
|
"timestamp": datetime.now().timestamp() * 1000
|
|
}
|
|
with open(r"d:\\dev_new_pc\\chartbastan\\.cursor\\debug.log", "a") as f:
|
|
f.write(json.dumps(log_entry) + "\n")
|
|
# #endregion
|
|
|
|
# Configuration du hashage de mot de passe
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
router = APIRouter(tags=["authentication"])
|
|
|
|
# #region agent log
|
|
write_debug_log("C", "auth.py:29", "Auth router initialized", {"prefix": "/api/v1/auth", "tags": ["authentication"]})
|
|
# #endregion
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
"""Vérifie un mot de passe en clair contre le hash"""
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
|
|
"""Génère un hash sécurisé pour un mot de passe"""
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
def generate_referral_code() -> str:
|
|
"""Génère un code de parrainage unique de 8 caractères"""
|
|
import secrets
|
|
import string
|
|
alphabet = string.ascii_uppercase + string.digits
|
|
return ''.join(secrets.choice(alphabet) for _ in range(8))
|
|
|
|
|
|
@router.post("/login", response_model=AuthResponse, responses={401: {"model": ErrorResponse}})
|
|
def login_user(
|
|
request: LoginRequest,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Connecter un utilisateur avec email et mot de passe.
|
|
|
|
Args:
|
|
request: Email et mot de passe de l'utilisateur
|
|
db: Session de base de données
|
|
|
|
Returns:
|
|
AuthResponse avec token et données utilisateur
|
|
|
|
Raises:
|
|
401: Si email ou mot de passe incorrect
|
|
500: Si erreur serveur
|
|
"""
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:48", "Login endpoint called", {"email": request.email})
|
|
# #endregion
|
|
|
|
user = db.query(User).filter(User.email == request.email).first()
|
|
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:67", "User lookup", {"email": request.email, "user_found": user is not None})
|
|
# #endregion
|
|
|
|
# Vérifier si l'utilisateur existe
|
|
if not user:
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:70", "User not found", {"email": request.email})
|
|
# #endregion
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Email ou mot de passe incorrect"
|
|
)
|
|
|
|
# Vérifier le mot de passe
|
|
if not user.password_hash:
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:81", "No password hash", {"email": request.email, "user_id": user.id})
|
|
# #endregion
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Email ou mot de passe incorrect"
|
|
)
|
|
|
|
password_valid = verify_password(request.password, user.password_hash)
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:91", "Password verification", {"email": request.email, "user_id": user.id, "password_valid": password_valid})
|
|
# #endregion
|
|
|
|
if not password_valid:
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:96", "Invalid password", {"email": request.email, "user_id": user.id})
|
|
# #endregion
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Email ou mot de passe incorrect"
|
|
)
|
|
|
|
# Retourner les données utilisateur (sans le hash de mot de passe)
|
|
# #region agent log
|
|
write_debug_log("D", "auth.py:107", "Login successful", {"email": request.email, "user_id": user.id})
|
|
# #endregion
|
|
return {
|
|
"data": {
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"is_premium": user.is_premium,
|
|
"referral_code": user.referral_code,
|
|
"created_at": user.created_at,
|
|
},
|
|
"meta": {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"version": "v1"
|
|
}
|
|
}
|
|
|
|
|
|
@router.post("/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED)
|
|
def register_user(
|
|
request: RegisterRequest,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Inscrire un nouvel utilisateur.
|
|
|
|
Args:
|
|
request: Email, mot de passe, nom optionnel et code de parrainage
|
|
db: Session de base de données
|
|
|
|
Returns:
|
|
AuthResponse avec données utilisateur
|
|
|
|
Raises:
|
|
400: Si validation échoue
|
|
409: Si email déjà utilisé
|
|
500: Si erreur serveur
|
|
"""
|
|
# Vérifier si l'email existe déjà
|
|
existing_user = db.query(User).filter(User.email == request.email).first()
|
|
if existing_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="Cet email est déjà utilisé"
|
|
)
|
|
|
|
# Valider le code de parrainage si fourni
|
|
referrer: Optional[User] = None
|
|
if request.referral_code:
|
|
referrer = db.query(User).filter(
|
|
User.referral_code == request.referral_code
|
|
).first()
|
|
if not referrer:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Code de parrainage invalide"
|
|
)
|
|
|
|
# Générer un code de parrainage unique
|
|
referral_code = generate_referral_code()
|
|
while db.query(User).filter(User.referral_code == referral_code).first():
|
|
referral_code = generate_referral_code()
|
|
|
|
# Hasher le mot de passe
|
|
password_hash = get_password_hash(request.password)
|
|
|
|
# Créer l'utilisateur
|
|
new_user = User(
|
|
email=request.email,
|
|
password_hash=password_hash,
|
|
name=request.name,
|
|
is_premium=False,
|
|
referral_code=referral_code,
|
|
created_at=datetime.utcnow(),
|
|
updated_at=datetime.utcnow()
|
|
)
|
|
|
|
db.add(new_user)
|
|
db.commit()
|
|
db.refresh(new_user)
|
|
|
|
# Attribuer le badge de bienvenue (première connexion)
|
|
welcome_badge = db.query(Badge).filter(
|
|
Badge.badge_id == "first_login"
|
|
).first()
|
|
|
|
if welcome_badge:
|
|
user_badge = UserBadge(
|
|
user_id=new_user.id,
|
|
badge_id=welcome_badge.id,
|
|
unlocked_at=datetime.utcnow()
|
|
)
|
|
db.add(user_badge)
|
|
db.commit()
|
|
|
|
return {
|
|
"data": {
|
|
"id": new_user.id,
|
|
"email": new_user.email,
|
|
"name": new_user.name,
|
|
"is_premium": new_user.is_premium,
|
|
"referral_code": new_user.referral_code,
|
|
"created_at": new_user.created_at,
|
|
},
|
|
"meta": {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"version": "v1"
|
|
}
|
|
}
|
|
|
|
|
|
@router.post("/logout")
|
|
def logout_user():
|
|
"""
|
|
Déconnecter l'utilisateur.
|
|
|
|
Note: Le backend utilise des cookies côté client.
|
|
Cette endpoint est disponible pour compatibilité future avec JWT.
|
|
"""
|
|
return {
|
|
"data": {
|
|
"message": "Déconnexion réussie"
|
|
},
|
|
"meta": {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"version": "v1"
|
|
}
|
|
}
|