Major changes across backend, frontend, infrastructure: - Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud) - Admin panel: user management, pricing, settings - Glossary system with CSV import/export - Subscription and tier quota management - Security hardening (rate limiting, API key auth, path traversal fixes) - Docker compose for dev, prod, and IONOS deployment - Alembic migrations for new tables - Frontend: dashboard, pricing page, landing page, i18n (en/fr) - Test suite and verification scripts Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
143 lines
5.0 KiB
Python
143 lines
5.0 KiB
Python
"""
|
|
Tests for Alembic async support and env - AC3, AC5, and migration integration (M3).
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
import pytest
|
|
|
|
|
|
# Project root (parent of tests/)
|
|
PROJECT_ROOT = Path(__file__).resolve().parent.parent
|
|
|
|
|
|
class TestAlembicAsyncConfig:
|
|
"""AC3: Alembic configured for async migrations."""
|
|
|
|
def test_env_has_async_migrations_runner(self):
|
|
"""run_async_migrations and run_migrations_online exist and are callable."""
|
|
# Load project's alembic/env.py (not the alembic package) and inspect
|
|
env_py = PROJECT_ROOT / "alembic" / "env.py"
|
|
assert env_py.exists(), "alembic/env.py not found"
|
|
code = """
|
|
import asyncio
|
|
import sys
|
|
import importlib.util
|
|
sys.path.insert(0, %r)
|
|
spec = importlib.util.spec_from_file_location("env", %r)
|
|
e = importlib.util.module_from_spec(spec)
|
|
# Avoid running migrations on load: spec.loader.exec_module(e) would run env
|
|
# So we only check that the file defines the symbols (compile + ast or exec with mock)
|
|
with open(%r) as f:
|
|
src = f.read()
|
|
assert "run_async_migrations" in src and "async def run_async_migrations" in src
|
|
assert "run_migrations_online" in src and "asyncio.run" in src
|
|
assert "create_async_engine" in src
|
|
print("ok")
|
|
""" % (str(PROJECT_ROOT), str(env_py), str(env_py))
|
|
result = subprocess.run(
|
|
[sys.executable, "-c", code],
|
|
cwd=PROJECT_ROOT,
|
|
env=os.environ.copy(),
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=15,
|
|
)
|
|
assert result.returncode == 0, (result.stderr or result.stdout or "subprocess failed")
|
|
assert "ok" in (result.stdout or "")
|
|
|
|
def test_env_uses_convert_to_async_url(self):
|
|
"""Alembic env uses shared convert_to_async_url for async URL."""
|
|
from database.utils import convert_to_async_url
|
|
|
|
# Just ensure the helper is used by env (env imports it)
|
|
assert callable(convert_to_async_url)
|
|
assert "asyncpg" in convert_to_async_url("postgresql://localhost/db")
|
|
assert "aiosqlite" in convert_to_async_url("sqlite:///./foo.db")
|
|
|
|
|
|
class TestSecretsFromEnvironment:
|
|
"""AC5: All secrets (e.g. DATABASE_URL) loaded from environment."""
|
|
|
|
def test_alembic_env_reads_database_url_from_env(self):
|
|
"""Alembic env.py reads DATABASE_URL from os.getenv."""
|
|
env_py = PROJECT_ROOT / "alembic" / "env.py"
|
|
code = """
|
|
with open(%r) as f:
|
|
src = f.read()
|
|
assert "os.getenv" in src and "DATABASE_URL" in src
|
|
assert "SQLITE_PATH" in src or "sqlite" in src.lower()
|
|
print("ok")
|
|
""" % str(env_py)
|
|
result = subprocess.run(
|
|
[sys.executable, "-c", code],
|
|
cwd=PROJECT_ROOT,
|
|
env=os.environ.copy(),
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=15,
|
|
)
|
|
assert result.returncode == 0, (result.stderr or result.stdout or "subprocess failed")
|
|
|
|
def test_connection_module_uses_env_for_url(self):
|
|
"""database.connection uses os.getenv for DATABASE_URL."""
|
|
import database.connection as conn_module
|
|
|
|
# The module reads DATABASE_URL at import time
|
|
assert hasattr(conn_module, "DATABASE_URL")
|
|
assert isinstance(conn_module.DATABASE_URL, str) or conn_module.DATABASE_URL == ""
|
|
|
|
|
|
class TestAlembicMigrationIntegration:
|
|
"""M3: Integration test - alembic upgrade head and downgrade -1."""
|
|
|
|
@pytest.fixture
|
|
def temp_db_env(self, tmp_path):
|
|
"""Set env to use a temp SQLite file for migrations (absolute path)."""
|
|
db_file = tmp_path / "test_migration.db"
|
|
# SQLite needs absolute path when cwd differs; use 3 slashes + abs path
|
|
url = "sqlite:///" + str(db_file.resolve()).replace("\\", "/")
|
|
env = os.environ.copy()
|
|
env["DATABASE_URL"] = url
|
|
return env
|
|
|
|
def test_alembic_upgrade_head_succeeds(self, temp_db_env):
|
|
"""Task 6.1: alembic upgrade head runs successfully."""
|
|
result = subprocess.run(
|
|
[sys.executable, "-m", "alembic", "upgrade", "head"],
|
|
cwd=PROJECT_ROOT,
|
|
env=temp_db_env,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
)
|
|
assert result.returncode == 0, (
|
|
f"alembic upgrade head failed: {result.stderr or result.stdout}"
|
|
)
|
|
|
|
def test_alembic_downgrade_one_succeeds(self, temp_db_env):
|
|
"""Task 6.2: alembic downgrade -1 runs after upgrade."""
|
|
# First upgrade to head
|
|
subprocess.run(
|
|
[sys.executable, "-m", "alembic", "upgrade", "head"],
|
|
cwd=PROJECT_ROOT,
|
|
env=temp_db_env,
|
|
capture_output=True,
|
|
timeout=30,
|
|
)
|
|
# Then downgrade one revision
|
|
result = subprocess.run(
|
|
[sys.executable, "-m", "alembic", "downgrade", "-1"],
|
|
cwd=PROJECT_ROOT,
|
|
env=temp_db_env,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
)
|
|
assert result.returncode == 0, (
|
|
f"alembic downgrade -1 failed: {result.stderr or result.stdout}"
|
|
)
|