Major changes across backend, frontend, infrastructure: - Provider system with model selection (Google, DeepL, OpenAI, Ollama, Google Cloud) - Admin panel: user management, pricing, settings - Glossary system with CSV import/export - Subscription and tier quota management - Security hardening (rate limiting, API key auth, path traversal fixes) - Docker compose for dev, prod, and IONOS deployment - Alembic migrations for new tables - Frontend: dashboard, pricing page, landing page, i18n (en/fr) - Test suite and verification scripts Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
248 lines
9.0 KiB
Python
248 lines
9.0 KiB
Python
"""
|
||
Tests pour POST /api/v1/auth/refresh
|
||
Couvre les AC 1–4 de la story 1.5 : Refresh Token
|
||
"""
|
||
|
||
import pytest
|
||
from pathlib import Path
|
||
from fastapi.testclient import TestClient
|
||
|
||
|
||
REFRESH_URL = "/api/v1/auth/refresh"
|
||
LOGIN_URL = "/api/v1/auth/login"
|
||
REGISTER_URL = "/api/v1/auth/register"
|
||
|
||
VALID_USER = {
|
||
"email": "refresh@example.com",
|
||
"password": "Password123!",
|
||
"name": "Refresh User",
|
||
}
|
||
|
||
VALID_CREDENTIALS = {
|
||
"email": "refresh@example.com",
|
||
"password": "Password123!",
|
||
}
|
||
|
||
|
||
@pytest.fixture()
|
||
def client(tmp_path: Path, monkeypatch):
|
||
"""TestClient avec stockage JSON isolé, blocklist réinitialisée et rate limiting désactivé."""
|
||
import services.auth_service as auth_svc
|
||
|
||
monkeypatch.setattr(auth_svc, "USERS_FILE", tmp_path / "users.json")
|
||
monkeypatch.setattr(auth_svc, "USE_DATABASE", False)
|
||
monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False)
|
||
monkeypatch.setattr(auth_svc, "_revoked_jtis", {})
|
||
|
||
from middleware.rate_limiting import RateLimitManager
|
||
|
||
async def _allow(self, request):
|
||
return True, "ok", "test"
|
||
|
||
async def _allow_translation(self, request, file_size_mb=0):
|
||
return True, "ok"
|
||
|
||
monkeypatch.setattr(RateLimitManager, "check_request", _allow)
|
||
monkeypatch.setattr(RateLimitManager, "check_translation", _allow_translation)
|
||
|
||
from main import app
|
||
|
||
return TestClient(app, raise_server_exceptions=True)
|
||
|
||
|
||
@pytest.fixture()
|
||
def tokens(client):
|
||
"""Enregistre un utilisateur, login v1, retourne access_token et refresh_token."""
|
||
client.post(REGISTER_URL, json=VALID_USER)
|
||
resp = client.post(LOGIN_URL, json=VALID_CREDENTIALS)
|
||
assert resp.status_code == 200
|
||
data = resp.json()["data"]
|
||
return data["access_token"], data["refresh_token"]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AC1 & AC2 : Endpoint existe, refresh valide → 200 + nouvel access_token et refresh_token
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestRefreshSuccess:
|
||
"""AC1 & AC2 : POST /refresh avec token valide → 200 + data.access_token, data.refresh_token, token_type bearer"""
|
||
|
||
def test_returns_200_with_valid_refresh_token(self, client, tokens):
|
||
_, refresh_token = tokens
|
||
response = client.post(REFRESH_URL, json={"refresh_token": refresh_token})
|
||
assert response.status_code == 200
|
||
|
||
def test_response_structure(self, client, tokens):
|
||
_, refresh_token = tokens
|
||
response = client.post(REFRESH_URL, json={"refresh_token": refresh_token})
|
||
body = response.json()
|
||
assert "data" in body
|
||
assert "access_token" in body["data"]
|
||
assert "refresh_token" in body["data"]
|
||
assert body["data"]["token_type"] == "bearer"
|
||
assert "meta" in body
|
||
assert body["meta"] == {}
|
||
|
||
def test_new_tokens_are_different(self, client, tokens):
|
||
_, refresh_token = tokens
|
||
r1 = client.post(REFRESH_URL, json={"refresh_token": refresh_token})
|
||
assert r1.status_code == 200
|
||
access_1 = r1.json()["data"]["access_token"]
|
||
refresh_1 = r1.json()["data"]["refresh_token"]
|
||
assert access_1 != refresh_token
|
||
assert refresh_1 != refresh_token
|
||
|
||
def test_new_access_token_has_15min_expiry(self, client, tokens):
|
||
"""AC2 / 2.6 : Vérification optionnelle du payload JWT (exp 15 min)."""
|
||
import jwt as pyjwt
|
||
import services.auth_service as auth_svc
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
_, refresh_token = tokens
|
||
response = client.post(REFRESH_URL, json={"refresh_token": refresh_token})
|
||
assert response.status_code == 200
|
||
new_access = response.json()["data"]["access_token"]
|
||
payload = pyjwt.decode(
|
||
new_access, auth_svc.SECRET_KEY, algorithms=[auth_svc.ALGORITHM]
|
||
)
|
||
assert payload.get("type") == "access"
|
||
exp = payload.get("exp")
|
||
assert exp is not None
|
||
# Expiry should be ~15 min from now (allow 14–16 min tolerance)
|
||
now = datetime.now(timezone.utc)
|
||
exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc)
|
||
delta = (exp_dt - now).total_seconds()
|
||
assert 14 * 60 <= delta <= 16 * 60
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AC3 : Refresh token expiré → 401 TOKEN_EXPIRED
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestRefreshTokenExpired:
|
||
"""AC3 : refresh token expiré → 401 TOKEN_EXPIRED"""
|
||
|
||
def test_expired_refresh_token_returns_401(self, client, monkeypatch):
|
||
import jwt as pyjwt
|
||
import services.auth_service as auth_svc
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
expired_payload = {
|
||
"sub": "some-user-id",
|
||
"type": "refresh",
|
||
"exp": datetime.now(timezone.utc) - timedelta(minutes=5),
|
||
"jti": "expired-refresh-jti",
|
||
}
|
||
expired_token = pyjwt.encode(
|
||
expired_payload, auth_svc.SECRET_KEY, algorithm=auth_svc.ALGORITHM
|
||
)
|
||
|
||
response = client.post(
|
||
REFRESH_URL,
|
||
json={"refresh_token": expired_token},
|
||
)
|
||
assert response.status_code == 401
|
||
|
||
def test_expired_refresh_token_error_code(self, client, monkeypatch):
|
||
import jwt as pyjwt
|
||
import services.auth_service as auth_svc
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
expired_payload = {
|
||
"sub": "some-user-id",
|
||
"type": "refresh",
|
||
"exp": datetime.now(timezone.utc) - timedelta(minutes=5),
|
||
"jti": "expired-refresh-jti",
|
||
}
|
||
expired_token = pyjwt.encode(
|
||
expired_payload, auth_svc.SECRET_KEY, algorithm=auth_svc.ALGORITHM
|
||
)
|
||
|
||
response = client.post(
|
||
REFRESH_URL,
|
||
json={"refresh_token": expired_token},
|
||
)
|
||
body = response.json()
|
||
assert body["error"] == "TOKEN_EXPIRED"
|
||
assert (
|
||
"Token invalide ou expiré" in body["message"]
|
||
or "invalide" in body["message"].lower()
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AC3 : Refresh token révoqué (après logout) → 401
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestRefreshTokenRevoked:
|
||
"""AC3 / 2.4 : refresh token révoqué après logout → 401"""
|
||
|
||
def test_refresh_after_logout_returns_401(self, client, tokens):
|
||
access_token, refresh_token = tokens
|
||
logout_resp = client.post(
|
||
"/api/v1/auth/logout",
|
||
headers={"Authorization": f"Bearer {access_token}"},
|
||
json={"refresh_token": refresh_token},
|
||
)
|
||
assert logout_resp.status_code == 200
|
||
|
||
refresh_resp = client.post(
|
||
REFRESH_URL,
|
||
json={"refresh_token": refresh_token},
|
||
)
|
||
assert refresh_resp.status_code == 401
|
||
assert refresh_resp.json().get("error") == "TOKEN_EXPIRED"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AC4 : Corps manquant ou sans refresh_token → 400 INVALID_REQUEST
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestRefreshInvalidRequest:
|
||
"""AC4 : requête sans corps ou sans champ refresh_token valide → 400 INVALID_REQUEST"""
|
||
|
||
def test_no_body_returns_400(self, client):
|
||
response = client.post(REFRESH_URL)
|
||
assert response.status_code == 400
|
||
|
||
def test_no_body_error_code(self, client):
|
||
response = client.post(REFRESH_URL)
|
||
body = response.json()
|
||
assert body["error"] == "INVALID_REQUEST"
|
||
assert "message" in body
|
||
|
||
def test_empty_json_returns_400(self, client):
|
||
response = client.post(REFRESH_URL, json={})
|
||
assert response.status_code == 400
|
||
assert response.json()["error"] == "INVALID_REQUEST"
|
||
|
||
def test_missing_refresh_token_field_returns_400(self, client):
|
||
response = client.post(REFRESH_URL, json={"other": "value"})
|
||
assert response.status_code == 400
|
||
assert response.json()["error"] == "INVALID_REQUEST"
|
||
assert "Refresh token requis" in response.json()["message"]
|
||
|
||
def test_empty_refresh_token_string_returns_400(self, client):
|
||
response = client.post(REFRESH_URL, json={"refresh_token": ""})
|
||
assert response.status_code == 400
|
||
assert response.json()["error"] == "INVALID_REQUEST"
|
||
|
||
def test_refresh_token_not_string_returns_400(self, client):
|
||
response = client.post(REFRESH_URL, json={"refresh_token": 123})
|
||
assert response.status_code == 400
|
||
assert response.json()["error"] == "INVALID_REQUEST"
|
||
|
||
def test_non_object_json_body_returns_400(self, client):
|
||
"""Body JSON valide mais non-objet (ex. tableau) → 400, pas 500."""
|
||
response = client.post(
|
||
REFRESH_URL,
|
||
data="[]",
|
||
headers={"Content-Type": "application/json"},
|
||
)
|
||
assert response.status_code == 400
|
||
assert response.json()["error"] == "INVALID_REQUEST"
|