All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m34s
426 lines
14 KiB
Python
426 lines
14 KiB
Python
"""
|
|
Tests for glossary CRUD endpoints.
|
|
Story 3.9: Glossaires - Endpoint CRUD
|
|
"""
|
|
|
|
import pytest
|
|
import jwt
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
from database.connection import get_sync_session
|
|
from database.models import Glossary, GlossaryTerm
|
|
|
|
# Auth routes the fixtures call to register and log in test users.
REGISTER_URL = "/api/v1/auth/register"
LOGIN_URL = "/api/v1/auth/login"

# Base route of the glossary endpoints exercised by the tests.
GLOSSARIES_URL = "/api/v1/glossaries"
|
|
|
|
|
|
@pytest.fixture
def users_file(tmp_path: Path) -> Path:
    """Return the location of a per-test ``users.json`` under ``tmp_path``.

    Only the path is computed; the file itself is not created here.
    """
    return tmp_path.joinpath("users.json")
|
|
|
|
|
|
@pytest.fixture
def client(users_file: Path, monkeypatch):
    """TestClient with JSON auth and rate limiting disabled.

    Patches ``services.auth_service`` so ``USERS_FILE`` points at the
    temporary ``users_file`` and both database flags are ``False``, then
    replaces the two ``RateLimitManager`` checks with stubs that always
    return an allowing result.

    Returns:
        A ``TestClient`` for ``main.app`` built with
        ``raise_server_exceptions=True``.
    """
    import services.auth_service as auth_svc
    from middleware.rate_limiting import RateLimitManager

    # Applied in this order: users file first, then the two database flags.
    auth_overrides = {
        "USERS_FILE": users_file,
        "USE_DATABASE": False,
        "DATABASE_AVAILABLE": False,
    }
    for attribute, value in auth_overrides.items():
        monkeypatch.setattr(auth_svc, attribute, value)

    # NOTE(review): the tuple shapes mirror what the real checks presumably
    # return (allowed flag first) - confirm against RateLimitManager.
    async def _always_allow_request(self, request):
        return True, "ok", "test"

    async def _always_allow_translation(self, request, file_size_mb=0):
        return True, "ok"

    monkeypatch.setattr(RateLimitManager, "check_request", _always_allow_request)
    monkeypatch.setattr(
        RateLimitManager, "check_translation", _always_allow_translation
    )

    # The application is imported only once the patches above are in place.
    from main import app

    return TestClient(app, raise_server_exceptions=True)
|
|
|
|
|
|
@pytest.fixture
def pro_user_token(client, monkeypatch):
    """Create a Pro user and return auth token.

    Registers and logs in ``pro@test.com``, reads the user id from the
    token's ``sub`` claim, then sets ``plan`` and ``tier`` to ``"pro"`` on
    the stored user record when that id is present in the users store.
    Skips the test when the auth service reports PyJWT as unavailable.

    Returns:
        A ``(token, user_id)`` tuple.
    """
    import services.auth_service as auth_svc

    if not auth_svc.JWT_AVAILABLE:
        pytest.skip("PyJWT non disponible dans cet environnement")

    credentials = {"email": "pro@test.com", "password": "Password123!"}

    # The registration response is intentionally not inspected; the login
    # assertion below is what guards the fixture.
    client.post(REGISTER_URL, json={**credentials, "name": "Pro User"})

    login_response = client.post(LOGIN_URL, json=credentials)
    assert login_response.status_code == 200, login_response.text

    token = login_response.json()["data"]["access_token"]
    claims = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
    user_id = claims["sub"]

    # Upgrade the stored record; nothing is written when the id is unknown.
    users = auth_svc.load_users()
    if user_id in users:
        for field in ("plan", "tier"):
            users[user_id][field] = "pro"
        auth_svc.save_users(users)

    return token, user_id
|
|
|
|
|
|
@pytest.fixture
def free_user_token(client):
    """Create a Free user and return auth token.

    Registers and logs in ``free@test.com`` and leaves the account exactly
    as registered (no plan change is applied here). Skips the test when the
    auth service reports PyJWT as unavailable.

    Returns:
        A ``(token, user_id)`` tuple, ``user_id`` being the token's ``sub``
        claim.
    """
    import services.auth_service as auth_svc

    if not auth_svc.JWT_AVAILABLE:
        pytest.skip("PyJWT non disponible dans cet environnement")

    credentials = {"email": "free@test.com", "password": "Password123!"}

    client.post(REGISTER_URL, json={**credentials, "name": "Free User"})

    login_response = client.post(LOGIN_URL, json=credentials)
    assert login_response.status_code == 200, login_response.text

    token = login_response.json()["data"]["access_token"]
    claims = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
    return token, claims["sub"]
|
|
|
|
|
|
def _auth_header(token):
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
class TestGlossaryCRUD:
|
|
"""Tests for glossary CRUD operations."""
|
|
|
|
def test_create_glossary_pro_user(self, client, pro_user_token):
    """Pro user can create a glossary.

    Posts a glossary with two terms and expects 201 with the name, the
    term count and the first term echoed back under ``data``.
    """
    token, _ = pro_user_token
    payload = {
        "name": "Test Glossary",
        "terms": [
            {"source": "hello", "target": "bonjour"},
            {"source": "world", "target": "monde"},
        ],
    }

    response = client.post(
        GLOSSARIES_URL, json=payload, headers=_auth_header(token)
    )

    assert response.status_code == 201
    created = response.json()["data"]
    assert created["name"] == "Test Glossary"
    assert len(created["terms"]) == 2
    first_term = created["terms"][0]
    assert first_term["source"] == "hello"
    assert first_term["target"] == "bonjour"
|
|
|
|
def test_create_glossary_free_user_forbidden(self, client, free_user_token):
    """Free user cannot create glossaries.

    Expects 403 with ``error`` set to ``PRO_FEATURE_REQUIRED``.
    """
    token, _ = free_user_token
    payload = {"name": "Test", "terms": []}

    response = client.post(
        GLOSSARIES_URL, json=payload, headers=_auth_header(token)
    )

    assert response.status_code == 403
    body = response.json()
    assert body["error"] == "PRO_FEATURE_REQUIRED"
|
|
|
|
def test_create_glossary_unauthorized(self, client):
    """Unauthorized user cannot create glossaries.

    The request is sent without an ``Authorization`` header and must be
    answered with 401.
    """
    payload = {"name": "Test", "terms": []}

    response = client.post(GLOSSARIES_URL, json=payload)

    assert response.status_code == 401
|
|
|
|
def test_list_glossaries(self, client, pro_user_token):
    """Pro user can list their glossaries.

    Seeds three glossaries for the user directly through the database
    session, then expects the listing to return three items and
    ``meta.total`` equal to 3.
    """
    token, user_id = pro_user_token

    with get_sync_session() as session:
        for index in range(3):
            session.add(
                Glossary(
                    user_id=user_id,
                    name=f"Glossary {index}",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
            )
        session.commit()

    response = client.get(GLOSSARIES_URL, headers=_auth_header(token))

    assert response.status_code == 200
    listing = response.json()
    assert len(listing["data"]) == 3
    assert listing["meta"]["total"] == 3
|
|
|
|
def test_list_glossaries_free_user_forbidden(self, client, free_user_token):
    """Free user cannot list glossaries (expects 403)."""
    token, _ = free_user_token
    headers = _auth_header(token)

    response = client.get(GLOSSARIES_URL, headers=headers)

    assert response.status_code == 403
|
|
|
|
def test_get_glossary(self, client, pro_user_token):
    """Pro user can get a specific glossary.

    Seeds one glossary with a single term through the database session and
    expects the detail endpoint to return its name and that term.
    """
    token, user_id = pro_user_token

    with get_sync_session() as session:
        seeded = Glossary(
            user_id=user_id,
            name="Test Glossary",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        seeded_term = GlossaryTerm(
            glossary=seeded,
            source="hello",
            target="bonjour",
            created_at=datetime.now(timezone.utc),
        )
        session.add(seeded)
        session.add(seeded_term)
        session.commit()
        # Read the id right after the commit, before the session block ends.
        glossary_id = seeded.id

    detail_url = f"{GLOSSARIES_URL}/{glossary_id}"
    response = client.get(detail_url, headers=_auth_header(token))

    assert response.status_code == 200
    fetched = response.json()["data"]
    assert fetched["name"] == "Test Glossary"
    assert len(fetched["terms"]) == 1
    assert fetched["terms"][0]["source"] == "hello"
|
|
|
|
def test_get_glossary_not_owner(self, client, monkeypatch):
    """User cannot access another user's glossary.

    Creates two Pro users, seeds a glossary owned by the first one through
    the database session, then fetches it with the second user's token.
    The endpoint must answer 404 with ``error`` set to
    ``GLOSSARY_NOT_FOUND``.

    ``monkeypatch`` is not used by the body; it is kept so the test's
    fixture signature stays unchanged.
    """
    import services.auth_service as auth_svc

    if not auth_svc.JWT_AVAILABLE:
        pytest.skip("PyJWT non disponible")

    def _make_pro_user(email, name):
        """Register, log in and upgrade one user; return ``(token, user_id)``."""
        password = "Password123!"
        client.post(
            REGISTER_URL,
            json={"email": email, "password": password, "name": name},
        )
        login = client.post(LOGIN_URL, json={"email": email, "password": password})
        # Same guard as the token fixtures: surface the response body on a
        # rejected login instead of a KeyError on "data" further down.
        assert login.status_code == 200, login.text
        token = login.json()["data"]["access_token"]
        claims = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
        user_id = claims["sub"]

        # Upgrade the stored record; nothing is written when the id is unknown.
        users = auth_svc.load_users()
        if user_id in users:
            users[user_id]["plan"] = "pro"
            users[user_id]["tier"] = "pro"
            auth_svc.save_users(users)
        return token, user_id

    # User 1 owns the glossary; only user 2's token is used for the request.
    _owner_token, owner_id = _make_pro_user("pro1@test.com", "Pro User 1")
    other_token, _other_id = _make_pro_user("pro2@test.com", "Pro User 2")

    # Create glossary as user 1
    with get_sync_session() as session:
        glossary = Glossary(
            user_id=owner_id,
            name="User 1 Glossary",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(glossary)
        session.commit()
        glossary_id = glossary.id

    # Try to access as user 2
    response = client.get(
        f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(other_token)
    )

    assert response.status_code == 404
    assert response.json()["error"] == "GLOSSARY_NOT_FOUND"
|
|
|
|
def test_update_glossary(self, client, pro_user_token):
    """Pro user can update their glossary.

    Seeds a glossary named "Original Name", patches only its name and
    expects 200 with the new name under ``data``.
    """
    token, user_id = pro_user_token

    with get_sync_session() as session:
        seeded = Glossary(
            user_id=user_id,
            name="Original Name",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(seeded)
        session.commit()
        glossary_id = seeded.id

    target_url = f"{GLOSSARIES_URL}/{glossary_id}"
    patch_body = {"name": "Updated Name"}
    response = client.patch(
        target_url, json=patch_body, headers=_auth_header(token)
    )

    assert response.status_code == 200
    assert response.json()["data"]["name"] == "Updated Name"
|
|
|
|
def test_update_glossary_terms(self, client, pro_user_token):
    """Pro user can update glossary terms.

    Seeds a glossary holding one term ("old"), patches it with a term list
    containing only "new", and expects the response to list exactly that
    one term.
    """
    token, user_id = pro_user_token

    with get_sync_session() as session:
        seeded = Glossary(
            user_id=user_id,
            name="Test Glossary",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        previous_term = GlossaryTerm(
            glossary=seeded,
            source="old",
            target="vieux",
            created_at=datetime.now(timezone.utc),
        )
        session.add(seeded)
        session.add(previous_term)
        session.commit()
        glossary_id = seeded.id

    patch_body = {"terms": [{"source": "new", "target": "nouveau"}]}
    response = client.patch(
        f"{GLOSSARIES_URL}/{glossary_id}",
        json=patch_body,
        headers=_auth_header(token),
    )

    assert response.status_code == 200
    updated_terms = response.json()["data"]["terms"]
    assert len(updated_terms) == 1
    assert updated_terms[0]["source"] == "new"
|
|
|
|
def test_delete_glossary(self, client, pro_user_token):
    """Pro user can delete their glossary.

    Expects 204 from the delete call and 404 when the same glossary is
    requested afterwards.
    """
    token, user_id = pro_user_token

    with get_sync_session() as session:
        doomed = Glossary(
            user_id=user_id,
            name="To Delete",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(doomed)
        session.commit()
        glossary_id = doomed.id

    target_url = f"{GLOSSARIES_URL}/{glossary_id}"

    delete_response = client.delete(target_url, headers=_auth_header(token))
    assert delete_response.status_code == 204

    # A follow-up read of the same id must no longer find the glossary.
    lookup_response = client.get(target_url, headers=_auth_header(token))
    assert lookup_response.status_code == 404
|
|
|
|
def test_delete_glossary_cascades_terms(self, client, pro_user_token):
    """Deleting a glossary should delete all its terms.

    Seeds a glossary with one term, deletes the glossary through the API,
    then counts the ``GlossaryTerm`` rows still pointing at its id and
    expects zero.
    """
    token, user_id = pro_user_token

    with get_sync_session() as session:
        doomed = Glossary(
            user_id=user_id,
            name="To Delete",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        attached_term = GlossaryTerm(
            glossary=doomed,
            source="hello",
            target="bonjour",
            created_at=datetime.now(timezone.utc),
        )
        session.add(doomed)
        session.add(attached_term)
        session.commit()
        glossary_id = doomed.id

    delete_response = client.delete(
        f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token)
    )
    assert delete_response.status_code == 204

    # Verify directly in the database, using a fresh session.
    with get_sync_session() as session:
        terms_for_glossary = session.query(GlossaryTerm).filter(
            GlossaryTerm.glossary_id == glossary_id
        )
        remaining_terms = terms_for_glossary.count()
        assert remaining_terms == 0
|
|
|
|
def test_create_glossary_with_empty_terms(self, client, pro_user_token):
    """Pro user can create a glossary with no terms.

    Expects 201 and an empty ``terms`` list in the returned ``data``.
    """
    token, _ = pro_user_token
    payload = {"name": "Empty Glossary", "terms": []}

    response = client.post(
        GLOSSARIES_URL, json=payload, headers=_auth_header(token)
    )

    assert response.status_code == 201
    created = response.json()["data"]
    assert created["name"] == "Empty Glossary"
    assert len(created["terms"]) == 0
|
|
|
|
def test_invalid_glossary_id_format(self, client, pro_user_token):
    """Invalid glossary ID format returns 400.

    Requests the literal id ``invalid-uuid`` and expects ``error`` to be
    ``INVALID_GLOSSARY_ID``.
    """
    token, _ = pro_user_token
    malformed_url = f"{GLOSSARIES_URL}/invalid-uuid"

    response = client.get(malformed_url, headers=_auth_header(token))

    assert response.status_code == 400
    assert response.json()["error"] == "INVALID_GLOSSARY_ID"
|