""" Tests for glossary CRUD endpoints. Story 3.9: Glossaires - Endpoint CRUD """ import pytest import jwt from datetime import datetime, timezone from pathlib import Path from fastapi.testclient import TestClient from database.connection import get_sync_session from database.models import Glossary, GlossaryTerm REGISTER_URL = "/api/v1/auth/register" LOGIN_URL = "/api/v1/auth/login" GLOSSARIES_URL = "/api/v1/glossaries" @pytest.fixture def users_file(tmp_path: Path) -> Path: return tmp_path / "users.json" @pytest.fixture def client(users_file: Path, monkeypatch): """TestClient with JSON auth and rate limiting disabled.""" import services.auth_service as auth_svc from middleware.rate_limiting import RateLimitManager monkeypatch.setattr(auth_svc, "USERS_FILE", users_file) monkeypatch.setattr(auth_svc, "USE_DATABASE", False) monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False) async def _check_request_allow(self, request): return True, "ok", "test" async def _check_translation_allow(self, request, file_size_mb=0): return True, "ok" monkeypatch.setattr(RateLimitManager, "check_request", _check_request_allow) monkeypatch.setattr(RateLimitManager, "check_translation", _check_translation_allow) from main import app return TestClient(app, raise_server_exceptions=True) @pytest.fixture def pro_user_token(client, monkeypatch): """Create a Pro user and return auth token.""" import services.auth_service as auth_svc if not auth_svc.JWT_AVAILABLE: pytest.skip("PyJWT non disponible dans cet environnement") email = "pro@test.com" password = "Password123!" client.post( REGISTER_URL, json={"email": email, "password": password, "name": "Pro User"} ) r = client.post(LOGIN_URL, json={"email": email, "password": password}) assert r.status_code == 200, r.text token = r.json()["data"]["access_token"] payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id = payload["sub"] users = auth_svc.load_users() if user_id in users: users[user_id]["plan"] = "pro" auth_svc.save_users(users) return token, user_id @pytest.fixture def free_user_token(client): """Create a Free user and return auth token.""" import services.auth_service as auth_svc if not auth_svc.JWT_AVAILABLE: pytest.skip("PyJWT non disponible dans cet environnement") email = "free@test.com" password = "Password123!" client.post( REGISTER_URL, json={"email": email, "password": password, "name": "Free User"} ) r = client.post(LOGIN_URL, json={"email": email, "password": password}) assert r.status_code == 200, r.text token = r.json()["data"]["access_token"] payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id = payload["sub"] return token, user_id def _auth_header(token): return {"Authorization": f"Bearer {token}"} class TestGlossaryCRUD: """Tests for glossary CRUD operations.""" def test_create_glossary_pro_user(self, client, pro_user_token): """Pro user can create a glossary.""" token, _ = pro_user_token response = client.post( GLOSSARIES_URL, json={ "name": "Test Glossary", "terms": [ {"source": "hello", "target": "bonjour"}, {"source": "world", "target": "monde"}, ], }, headers=_auth_header(token), ) assert response.status_code == 201 data = response.json()["data"] assert data["name"] == "Test Glossary" assert len(data["terms"]) == 2 assert data["terms"][0]["source"] == "hello" assert data["terms"][0]["target"] == "bonjour" def test_create_glossary_free_user_forbidden(self, client, free_user_token): """Free user cannot create glossaries.""" token, _ = free_user_token response = client.post( GLOSSARIES_URL, json={"name": "Test", "terms": []}, headers=_auth_header(token), ) assert response.status_code == 403 data = response.json() assert data["error"] == "PRO_FEATURE_REQUIRED" def test_create_glossary_unauthorized(self, client): """Unauthorized user cannot create glossaries.""" response = client.post( GLOSSARIES_URL, json={"name": "Test", "terms": []}, ) assert response.status_code == 401 def test_list_glossaries(self, client, pro_user_token): """Pro user can list their glossaries.""" token, user_id = pro_user_token with get_sync_session() as session: for i in range(3): glossary = Glossary( user_id=user_id, name=f"Glossary {i}", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(glossary) session.commit() response = client.get(GLOSSARIES_URL, headers=_auth_header(token)) assert response.status_code == 200 data = response.json() assert len(data["data"]) == 3 assert data["meta"]["total"] == 3 def test_list_glossaries_free_user_forbidden(self, client, free_user_token): """Free user cannot list glossaries.""" token, _ = free_user_token response = client.get(GLOSSARIES_URL, headers=_auth_header(token)) assert response.status_code == 403 def test_get_glossary(self, client, pro_user_token): """Pro user can get a specific glossary.""" token, user_id = pro_user_token with get_sync_session() as session: glossary = Glossary( user_id=user_id, name="Test Glossary", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) term = GlossaryTerm( glossary=glossary, source="hello", target="bonjour", created_at=datetime.now(timezone.utc), ) session.add(glossary) session.add(term) session.commit() glossary_id = glossary.id response = client.get( f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token) ) assert response.status_code == 200 data = response.json()["data"] assert data["name"] == "Test Glossary" assert len(data["terms"]) == 1 assert data["terms"][0]["source"] == "hello" def test_get_glossary_not_owner(self, client, monkeypatch): """User cannot access another user's glossary.""" import services.auth_service as auth_svc if not auth_svc.JWT_AVAILABLE: pytest.skip("PyJWT non disponible") # Create user 1 (Pro) email1 = "pro1@test.com" password1 = "Password123!" client.post( REGISTER_URL, json={"email": email1, "password": password1, "name": "Pro User 1"}, ) r1 = client.post(LOGIN_URL, json={"email": email1, "password": password1}) token1 = r1.json()["data"]["access_token"] payload1 = jwt.decode(token1, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id1 = payload1["sub"] users = auth_svc.load_users() if user_id1 in users: users[user_id1]["plan"] = "pro" auth_svc.save_users(users) # Create user 2 (Pro) email2 = "pro2@test.com" password2 = "Password123!" client.post( REGISTER_URL, json={"email": email2, "password": password2, "name": "Pro User 2"}, ) r2 = client.post(LOGIN_URL, json={"email": email2, "password": password2}) token2 = r2.json()["data"]["access_token"] payload2 = jwt.decode(token2, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id2 = payload2["sub"] users = auth_svc.load_users() if user_id2 in users: users[user_id2]["plan"] = "pro" auth_svc.save_users(users) # Create glossary as user 1 with get_sync_session() as session: glossary = Glossary( user_id=user_id1, name="User 1 Glossary", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(glossary) session.commit() glossary_id = glossary.id # Try to access as user 2 response = client.get( f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token2) ) assert response.status_code == 404 assert response.json()["error"] == "GLOSSARY_NOT_FOUND" def test_update_glossary(self, client, pro_user_token): """Pro user can update their glossary.""" token, user_id = pro_user_token with get_sync_session() as session: glossary = Glossary( user_id=user_id, name="Original Name", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(glossary) session.commit() glossary_id = glossary.id response = client.patch( f"{GLOSSARIES_URL}/{glossary_id}", json={"name": "Updated Name"}, headers=_auth_header(token), ) assert response.status_code == 200 assert response.json()["data"]["name"] == "Updated Name" def test_update_glossary_terms(self, client, pro_user_token): """Pro user can update glossary terms.""" token, user_id = pro_user_token with get_sync_session() as session: glossary = Glossary( user_id=user_id, name="Test Glossary", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) term = GlossaryTerm( glossary=glossary, source="old", target="vieux", created_at=datetime.now(timezone.utc), ) session.add(glossary) session.add(term) session.commit() glossary_id = glossary.id response = client.patch( f"{GLOSSARIES_URL}/{glossary_id}", json={ "terms": [ {"source": "new", "target": "nouveau"}, ] }, headers=_auth_header(token), ) assert response.status_code == 200 data = response.json()["data"] assert len(data["terms"]) == 1 assert data["terms"][0]["source"] == "new" def test_delete_glossary(self, client, pro_user_token): """Pro user can delete their glossary.""" token, user_id = pro_user_token with get_sync_session() as session: glossary = Glossary( user_id=user_id, name="To Delete", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(glossary) session.commit() glossary_id = glossary.id response = client.delete( f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token) ) assert response.status_code == 204 response = client.get( f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token) ) assert response.status_code == 404 def test_delete_glossary_cascades_terms(self, client, pro_user_token): """Deleting a glossary should delete all its terms.""" token, user_id = pro_user_token with get_sync_session() as session: glossary = Glossary( user_id=user_id, name="To Delete", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) term = GlossaryTerm( glossary=glossary, source="hello", target="bonjour", created_at=datetime.now(timezone.utc), ) session.add(glossary) session.add(term) session.commit() glossary_id = glossary.id response = client.delete( f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token) ) assert response.status_code == 204 with get_sync_session() as session: remaining_terms = ( session.query(GlossaryTerm) .filter(GlossaryTerm.glossary_id == glossary_id) .count() ) assert remaining_terms == 0 def test_create_glossary_with_empty_terms(self, client, pro_user_token): """Pro user can create a glossary with no terms.""" token, _ = pro_user_token response = client.post( GLOSSARIES_URL, json={"name": "Empty Glossary", "terms": []}, headers=_auth_header(token), ) assert response.status_code == 201 data = response.json()["data"] assert data["name"] == "Empty Glossary" assert len(data["terms"]) == 0 def test_invalid_glossary_id_format(self, client, pro_user_token): """Invalid glossary ID format returns 400.""" token, _ = pro_user_token response = client.get( f"{GLOSSARIES_URL}/invalid-uuid", headers=_auth_header(token) ) assert response.status_code == 400 assert response.json()["error"] == "INVALID_GLOSSARY_ID"