Files
office_translator/tests/test_glossaries.py
Sepehr Ramezani 26bd096a06 feat: production deployment - full update with providers, admin, glossaries, pricing, tests
Major changes across backend, frontend, infrastructure:
- Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud)
- Admin panel: user management, pricing, settings
- Glossary system with CSV import/export
- Subscription and tier quota management
- Security hardening (rate limiting, API key auth, path traversal fixes)
- Docker compose for dev, prod, and IONOS deployment
- Alembic migrations for new tables
- Frontend: dashboard, pricing page, landing page, i18n (en/fr)
- Test suite and verification scripts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:47 +02:00

423 lines
14 KiB
Python

"""
Tests for glossary CRUD endpoints.
Story 3.9: Glossaires - Endpoint CRUD
"""
import pytest
import jwt
from datetime import datetime, timezone
from pathlib import Path
from fastapi.testclient import TestClient
from database.connection import get_sync_session
from database.models import Glossary, GlossaryTerm
REGISTER_URL = "/api/v1/auth/register"
LOGIN_URL = "/api/v1/auth/login"
GLOSSARIES_URL = "/api/v1/glossaries"
@pytest.fixture
def users_file(tmp_path: Path) -> Path:
return tmp_path / "users.json"
@pytest.fixture
def client(users_file: Path, monkeypatch):
"""TestClient with JSON auth and rate limiting disabled."""
import services.auth_service as auth_svc
from middleware.rate_limiting import RateLimitManager
monkeypatch.setattr(auth_svc, "USERS_FILE", users_file)
monkeypatch.setattr(auth_svc, "USE_DATABASE", False)
monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False)
async def _check_request_allow(self, request):
return True, "ok", "test"
async def _check_translation_allow(self, request, file_size_mb=0):
return True, "ok"
monkeypatch.setattr(RateLimitManager, "check_request", _check_request_allow)
monkeypatch.setattr(RateLimitManager, "check_translation", _check_translation_allow)
from main import app
return TestClient(app, raise_server_exceptions=True)
@pytest.fixture
def pro_user_token(client, monkeypatch):
"""Create a Pro user and return auth token."""
import services.auth_service as auth_svc
if not auth_svc.JWT_AVAILABLE:
pytest.skip("PyJWT non disponible dans cet environnement")
email = "pro@test.com"
password = "Password123!"
client.post(
REGISTER_URL, json={"email": email, "password": password, "name": "Pro User"}
)
r = client.post(LOGIN_URL, json={"email": email, "password": password})
assert r.status_code == 200, r.text
token = r.json()["data"]["access_token"]
payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
user_id = payload["sub"]
users = auth_svc.load_users()
if user_id in users:
users[user_id]["plan"] = "pro"
auth_svc.save_users(users)
return token, user_id
@pytest.fixture
def free_user_token(client):
"""Create a Free user and return auth token."""
import services.auth_service as auth_svc
if not auth_svc.JWT_AVAILABLE:
pytest.skip("PyJWT non disponible dans cet environnement")
email = "free@test.com"
password = "Password123!"
client.post(
REGISTER_URL, json={"email": email, "password": password, "name": "Free User"}
)
r = client.post(LOGIN_URL, json={"email": email, "password": password})
assert r.status_code == 200, r.text
token = r.json()["data"]["access_token"]
payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
user_id = payload["sub"]
return token, user_id
def _auth_header(token):
return {"Authorization": f"Bearer {token}"}
class TestGlossaryCRUD:
"""Tests for glossary CRUD operations."""
def test_create_glossary_pro_user(self, client, pro_user_token):
"""Pro user can create a glossary."""
token, _ = pro_user_token
response = client.post(
GLOSSARIES_URL,
json={
"name": "Test Glossary",
"terms": [
{"source": "hello", "target": "bonjour"},
{"source": "world", "target": "monde"},
],
},
headers=_auth_header(token),
)
assert response.status_code == 201
data = response.json()["data"]
assert data["name"] == "Test Glossary"
assert len(data["terms"]) == 2
assert data["terms"][0]["source"] == "hello"
assert data["terms"][0]["target"] == "bonjour"
def test_create_glossary_free_user_forbidden(self, client, free_user_token):
"""Free user cannot create glossaries."""
token, _ = free_user_token
response = client.post(
GLOSSARIES_URL,
json={"name": "Test", "terms": []},
headers=_auth_header(token),
)
assert response.status_code == 403
data = response.json()
assert data["error"] == "PRO_FEATURE_REQUIRED"
def test_create_glossary_unauthorized(self, client):
"""Unauthorized user cannot create glossaries."""
response = client.post(
GLOSSARIES_URL,
json={"name": "Test", "terms": []},
)
assert response.status_code == 401
def test_list_glossaries(self, client, pro_user_token):
"""Pro user can list their glossaries."""
token, user_id = pro_user_token
with get_sync_session() as session:
for i in range(3):
glossary = Glossary(
user_id=user_id,
name=f"Glossary {i}",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.commit()
response = client.get(GLOSSARIES_URL, headers=_auth_header(token))
assert response.status_code == 200
data = response.json()
assert len(data["data"]) == 3
assert data["meta"]["total"] == 3
def test_list_glossaries_free_user_forbidden(self, client, free_user_token):
"""Free user cannot list glossaries."""
token, _ = free_user_token
response = client.get(GLOSSARIES_URL, headers=_auth_header(token))
assert response.status_code == 403
def test_get_glossary(self, client, pro_user_token):
"""Pro user can get a specific glossary."""
token, user_id = pro_user_token
with get_sync_session() as session:
glossary = Glossary(
user_id=user_id,
name="Test Glossary",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
term = GlossaryTerm(
glossary=glossary,
source="hello",
target="bonjour",
created_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.add(term)
session.commit()
glossary_id = glossary.id
response = client.get(
f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token)
)
assert response.status_code == 200
data = response.json()["data"]
assert data["name"] == "Test Glossary"
assert len(data["terms"]) == 1
assert data["terms"][0]["source"] == "hello"
def test_get_glossary_not_owner(self, client, monkeypatch):
"""User cannot access another user's glossary."""
import services.auth_service as auth_svc
if not auth_svc.JWT_AVAILABLE:
pytest.skip("PyJWT non disponible")
# Create user 1 (Pro)
email1 = "pro1@test.com"
password1 = "Password123!"
client.post(
REGISTER_URL,
json={"email": email1, "password": password1, "name": "Pro User 1"},
)
r1 = client.post(LOGIN_URL, json={"email": email1, "password": password1})
token1 = r1.json()["data"]["access_token"]
payload1 = jwt.decode(token1, auth_svc.SECRET_KEY, algorithms=["HS256"])
user_id1 = payload1["sub"]
users = auth_svc.load_users()
if user_id1 in users:
users[user_id1]["plan"] = "pro"
auth_svc.save_users(users)
# Create user 2 (Pro)
email2 = "pro2@test.com"
password2 = "Password123!"
client.post(
REGISTER_URL,
json={"email": email2, "password": password2, "name": "Pro User 2"},
)
r2 = client.post(LOGIN_URL, json={"email": email2, "password": password2})
token2 = r2.json()["data"]["access_token"]
payload2 = jwt.decode(token2, auth_svc.SECRET_KEY, algorithms=["HS256"])
user_id2 = payload2["sub"]
users = auth_svc.load_users()
if user_id2 in users:
users[user_id2]["plan"] = "pro"
auth_svc.save_users(users)
# Create glossary as user 1
with get_sync_session() as session:
glossary = Glossary(
user_id=user_id1,
name="User 1 Glossary",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.commit()
glossary_id = glossary.id
# Try to access as user 2
response = client.get(
f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token2)
)
assert response.status_code == 404
assert response.json()["error"] == "GLOSSARY_NOT_FOUND"
def test_update_glossary(self, client, pro_user_token):
"""Pro user can update their glossary."""
token, user_id = pro_user_token
with get_sync_session() as session:
glossary = Glossary(
user_id=user_id,
name="Original Name",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.commit()
glossary_id = glossary.id
response = client.patch(
f"{GLOSSARIES_URL}/{glossary_id}",
json={"name": "Updated Name"},
headers=_auth_header(token),
)
assert response.status_code == 200
assert response.json()["data"]["name"] == "Updated Name"
def test_update_glossary_terms(self, client, pro_user_token):
"""Pro user can update glossary terms."""
token, user_id = pro_user_token
with get_sync_session() as session:
glossary = Glossary(
user_id=user_id,
name="Test Glossary",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
term = GlossaryTerm(
glossary=glossary,
source="old",
target="vieux",
created_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.add(term)
session.commit()
glossary_id = glossary.id
response = client.patch(
f"{GLOSSARIES_URL}/{glossary_id}",
json={
"terms": [
{"source": "new", "target": "nouveau"},
]
},
headers=_auth_header(token),
)
assert response.status_code == 200
data = response.json()["data"]
assert len(data["terms"]) == 1
assert data["terms"][0]["source"] == "new"
def test_delete_glossary(self, client, pro_user_token):
"""Pro user can delete their glossary."""
token, user_id = pro_user_token
with get_sync_session() as session:
glossary = Glossary(
user_id=user_id,
name="To Delete",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.commit()
glossary_id = glossary.id
response = client.delete(
f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token)
)
assert response.status_code == 204
response = client.get(
f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token)
)
assert response.status_code == 404
def test_delete_glossary_cascades_terms(self, client, pro_user_token):
"""Deleting a glossary should delete all its terms."""
token, user_id = pro_user_token
with get_sync_session() as session:
glossary = Glossary(
user_id=user_id,
name="To Delete",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
term = GlossaryTerm(
glossary=glossary,
source="hello",
target="bonjour",
created_at=datetime.now(timezone.utc),
)
session.add(glossary)
session.add(term)
session.commit()
glossary_id = glossary.id
response = client.delete(
f"{GLOSSARIES_URL}/{glossary_id}", headers=_auth_header(token)
)
assert response.status_code == 204
with get_sync_session() as session:
remaining_terms = (
session.query(GlossaryTerm)
.filter(GlossaryTerm.glossary_id == glossary_id)
.count()
)
assert remaining_terms == 0
def test_create_glossary_with_empty_terms(self, client, pro_user_token):
"""Pro user can create a glossary with no terms."""
token, _ = pro_user_token
response = client.post(
GLOSSARIES_URL,
json={"name": "Empty Glossary", "terms": []},
headers=_auth_header(token),
)
assert response.status_code == 201
data = response.json()["data"]
assert data["name"] == "Empty Glossary"
assert len(data["terms"]) == 0
def test_invalid_glossary_id_format(self, client, pro_user_token):
"""Invalid glossary ID format returns 400."""
token, _ = pro_user_token
response = client.get(
f"{GLOSSARIES_URL}/invalid-uuid", headers=_auth_header(token)
)
assert response.status_code == 400
assert response.json()["error"] == "INVALID_GLOSSARY_ID"