"""
Database connection and session management.

Supports both PostgreSQL (production) and SQLite (development/testing).
Async SQLAlchemy 2.0 implementation.
"""

import os
import logging
from typing import AsyncGenerator, Optional
from contextlib import asynccontextmanager
from sqlalchemy import text, create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
    AsyncEngine,
)
from sqlalchemy.pool import QueuePool, StaticPool
from contextlib import contextmanager

from database.utils import convert_to_async_url

logger = logging.getLogger(__name__)

# Connection string read from the environment; empty means "use local SQLite".
DATABASE_URL = os.getenv("DATABASE_URL", "")

# Backend detection is purely prefix-based on DATABASE_URL.
_is_sqlite = DATABASE_URL.startswith("sqlite") if DATABASE_URL else True
_is_postgres = DATABASE_URL.startswith("postgres") if DATABASE_URL else False

engine: AsyncEngine

if DATABASE_URL and _is_postgres:
    async_database_url = convert_to_async_url(DATABASE_URL)
    engine = create_async_engine(
        async_database_url,
        # No explicit poolclass: an asyncio engine rejects the sync QueuePool
        # ("Pool class QueuePool cannot be used with asyncio engine").  The
        # asyncio default (AsyncAdaptedQueuePool) honours the sizing options
        # below.
        pool_size=5,
        max_overflow=10,
        pool_timeout=30,
        pool_recycle=1800,
        pool_pre_ping=True,
        echo=os.getenv("DATABASE_ECHO", "false").lower() == "true",
    )
    logger.info("✅ Database configured with PostgreSQL (async)")
else:
    # Any non-PostgreSQL configuration falls back to a SQLite file whose
    # location comes from SQLITE_PATH (not from DATABASE_URL).
    sqlite_path = os.getenv("SQLITE_PATH", "data/translate.db")
    os.makedirs(os.path.dirname(sqlite_path) or ".", exist_ok=True)

    if os.path.isabs(sqlite_path):
        # Absolute paths must not get the "./" prefix, otherwise the URL would
        # point at a relative path that differs from the directory created
        # above.
        async_database_url = f"sqlite+aiosqlite:///{sqlite_path}"
    else:
        async_database_url = f"sqlite+aiosqlite:///./{sqlite_path}"

    engine = create_async_engine(
        async_database_url,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
        echo=os.getenv("DATABASE_ECHO", "false").lower() == "true",
    )
    if not DATABASE_URL:
        logger.warning("⚠️ DATABASE_URL not set, using SQLite for development (async)")
    else:
        logger.info("✅ Database configured with SQLite: %s (async)", sqlite_path)

# Sync engine and session for repositories (auth, translation log).
# Kept for backward compatibility until all callers use async; see story 1-1.
# Prefer get_db() / AsyncSessionLocal for new code.
if DATABASE_URL and _is_postgres:
    # The sync engine needs a synchronous driver (psycopg2); asyncpg does not
    # support synchronous calls, so strip the async driver suffix.
    sync_url = DATABASE_URL.replace("+asyncpg", "")
    # SQLAlchemy 1.4+ no longer accepts the legacy "postgres://" scheme that
    # the prefix check on DATABASE_URL lets through; normalise it.
    if sync_url.startswith("postgres://"):
        sync_url = "postgresql://" + sync_url[len("postgres://"):]
    sync_engine = create_engine(
        sync_url,
        poolclass=QueuePool,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
        echo=os.getenv("DATABASE_ECHO", "false").lower() == "true",
    )
else:
    _sqlite_path = os.getenv("SQLITE_PATH", "data/translate.db")
    if os.path.isabs(_sqlite_path):
        # Absolute paths use the four-slash form; prefixing "./" would turn
        # them into a bogus relative path.
        _sync_sqlite_url = f"sqlite:///{_sqlite_path}"
    else:
        _sync_sqlite_url = f"sqlite:///./{_sqlite_path}"
    sync_engine = create_engine(
        _sync_sqlite_url,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )

SyncSessionLocal = sessionmaker(
    bind=sync_engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)


@contextmanager
def get_sync_session():
    """Yield a sync session for sync repositories (auth_service, translation log).

    Commits when the ``with`` body finishes normally, rolls back and re-raises
    on any exception, and always closes the session.
    """
    session = SyncSessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Async dependency for FastAPI to get database session.

    The session is not committed here; callers commit explicitly.

    Usage:
        db: AsyncSession = Depends(get_db)
    """
    # The ``async with`` block closes the session on exit, so no extra
    # try/finally around the yield is needed.
    async with AsyncSessionLocal() as session:
        yield session


@asynccontextmanager
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Async context manager for database session.

    Commits when the body finishes normally; rolls back and re-raises on any
    exception. The session is closed when the context exits.

    Usage:
        async with get_db_session() as db:
            ...
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


# Alias of get_db_session under a second name.
get_async_session = get_db_session


async def init_db():
    """
    Initialize database tables asynchronously.

    Call this on application startup.
    """
    # Imported at call time rather than at module import time.
    from database.models import Base

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    logger.info("✅ Database tables initialized (async)")


async def check_db_connection() -> bool:
    """
    Check if database connection is healthy.

    Returns True if a ``SELECT 1`` round-trip works, False otherwise (the
    failure is logged, not raised).
    """
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.error("Database connection check failed: %s", e)
        return False


def get_pool_stats() -> dict:
    """Get database connection pool statistics.

    Returns the size / checked-in / checked-out / overflow counters when the
    engine's pool exposes them, otherwise a placeholder dict.
    """
    pool = engine.pool
    counters = ("size", "checkedin", "checkedout", "overflow")
    # Probe for the counters actually used: every pool class has ``status``,
    # but StaticPool (SQLite mode) lacks these methods and would raise
    # AttributeError.
    if all(callable(getattr(pool, name, None)) for name in counters):
        return {
            "pool_size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkedout(),
            "overflow": pool.overflow(),
        }
    return {"status": "pool stats not available"}


def get_engine() -> AsyncEngine:
    """Get the async engine instance."""
    return engine