""" Leaderboard Service. This module handles the calculation and retrieval of user rankings based on prediction accuracy. """ from typing import Optional from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import func, case, desc from app.models.user import User from app.models.user_prediction import UserPrediction class LeaderboardService: """Service for managing leaderboard calculations and queries.""" def __init__(self, db: Session): """Initialize leaderboard service with database session.""" self.db = db def get_leaderboard(self, limit: int = 100) -> list[dict]: """ Get leaderboard sorted by accuracy (descending) with predictions count as tie-breaker. Args: limit: Maximum number of users to return (default: 100) Returns: List of dictionaries containing user_id, username, accuracy, predictions_count """ # Calculate accuracy for each user who has viewed predictions # Accuracy = (number of correct predictions / total predictions where was_correct is not NULL) * 100 query = ( self.db.query( User.id.label('user_id'), User.name.label('username'), func.count(UserPrediction.id).label('predictions_count'), func.sum( case( (UserPrediction.was_correct == True, 1), else_=0 ) ).label('correct_count'), func.sum( case( (UserPrediction.was_correct.isnot(None), 1), else_=0 ) ).label('completed_predictions_count') ) .join(UserPrediction, User.id == UserPrediction.user_id) .filter( UserPrediction.was_correct.isnot(None) # Only include predictions where match is completed ) .group_by(User.id, User.name) .order_by( desc( # Calculate accuracy: correct / completed * 100 func.coalesce( func.sum( case( (UserPrediction.was_correct == True, 1), else_=0 ) ) * 100.0 / func.sum( case( (UserPrediction.was_correct.isnot(None), 1), else_=0 ) ), 0.0 ) ), desc('predictions_count') # Tie-breaker: more predictions = higher rank ) .limit(limit) ) results = query.all() # Format results leaderboard = [] for row in results: completed_count = row.completed_predictions_count or 0 correct_count = row.correct_count or 0 accuracy = (correct_count / completed_count * 100) if completed_count > 0 else 0.0 leaderboard.append({ 'user_id': row.user_id, 'username': row.username, 'accuracy': round(accuracy, 1), 'predictions_count': row.predictions_count }) return leaderboard def get_personal_rank(self, user_id: int) -> Optional[dict]: """ Get personal rank data for a specific user. Args: user_id: ID of the user Returns: Dictionary containing rank, accuracy, predictions_count, or None if user not found """ # Get user's stats user_stats = ( self.db.query( User.id, func.count(UserPrediction.id).label('predictions_count'), func.sum( case( (UserPrediction.was_correct == True, 1), else_=0 ) ).label('correct_count'), func.sum( case( (UserPrediction.was_correct.isnot(None), 1), else_=0 ) ).label('completed_predictions_count') ) .join(UserPrediction, User.id == UserPrediction.user_id) .filter(User.id == user_id) .filter(UserPrediction.was_correct.isnot(None)) .group_by(User.id) .first() ) if not user_stats: return None completed_count = user_stats.completed_predictions_count or 0 correct_count = user_stats.correct_count or 0 accuracy = (correct_count / completed_count * 100) if completed_count > 0 else 0.0 predictions_count = user_stats.predictions_count or 0 # Get full leaderboard to calculate rank full_leaderboard = self.get_leaderboard(limit=1000) # Get more to find rank accurately # Find user's rank rank = None for idx, entry in enumerate(full_leaderboard, start=1): if entry['user_id'] == user_id: rank = idx break if rank is None: return None return { 'rank': rank, 'accuracy': round(accuracy, 1), 'predictions_count': predictions_count }