""" Tests pour POST /api/v1/auth/refresh Couvre les AC 1–4 de la story 1.5 : Refresh Token """ import pytest from pathlib import Path from fastapi.testclient import TestClient REFRESH_URL = "/api/v1/auth/refresh" LOGIN_URL = "/api/v1/auth/login" REGISTER_URL = "/api/v1/auth/register" VALID_USER = { "email": "refresh@example.com", "password": "Password123!", "name": "Refresh User", } VALID_CREDENTIALS = { "email": "refresh@example.com", "password": "Password123!", } @pytest.fixture() def client(tmp_path: Path, monkeypatch): """TestClient avec stockage JSON isolé, blocklist réinitialisée et rate limiting désactivé.""" import services.auth_service as auth_svc monkeypatch.setattr(auth_svc, "USERS_FILE", tmp_path / "users.json") monkeypatch.setattr(auth_svc, "USE_DATABASE", False) monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False) monkeypatch.setattr(auth_svc, "_revoked_jtis", {}) from middleware.rate_limiting import RateLimitManager async def _allow(self, request): return True, "ok", "test" async def _allow_translation(self, request, file_size_mb=0): return True, "ok" monkeypatch.setattr(RateLimitManager, "check_request", _allow) monkeypatch.setattr(RateLimitManager, "check_translation", _allow_translation) from main import app return TestClient(app, raise_server_exceptions=True) @pytest.fixture() def tokens(client): """Enregistre un utilisateur, login v1, retourne access_token et refresh_token.""" client.post(REGISTER_URL, json=VALID_USER) resp = client.post(LOGIN_URL, json=VALID_CREDENTIALS) assert resp.status_code == 200 data = resp.json()["data"] return data["access_token"], data["refresh_token"] # --------------------------------------------------------------------------- # AC1 & AC2 : Endpoint existe, refresh valide → 200 + nouvel access_token et refresh_token # --------------------------------------------------------------------------- class TestRefreshSuccess: """AC1 & AC2 : POST /refresh avec token valide → 200 + data.access_token, data.refresh_token, token_type bearer""" def test_returns_200_with_valid_refresh_token(self, client, tokens): _, refresh_token = tokens response = client.post(REFRESH_URL, json={"refresh_token": refresh_token}) assert response.status_code == 200 def test_response_structure(self, client, tokens): _, refresh_token = tokens response = client.post(REFRESH_URL, json={"refresh_token": refresh_token}) body = response.json() assert "data" in body assert "access_token" in body["data"] assert "refresh_token" in body["data"] assert body["data"]["token_type"] == "bearer" assert "meta" in body assert body["meta"] == {} def test_new_tokens_are_different(self, client, tokens): _, refresh_token = tokens r1 = client.post(REFRESH_URL, json={"refresh_token": refresh_token}) assert r1.status_code == 200 access_1 = r1.json()["data"]["access_token"] refresh_1 = r1.json()["data"]["refresh_token"] assert access_1 != refresh_token assert refresh_1 != refresh_token def test_new_access_token_has_15min_expiry(self, client, tokens): """AC2 / 2.6 : Vérification optionnelle du payload JWT (exp 15 min).""" import jwt as pyjwt import services.auth_service as auth_svc from datetime import datetime, timedelta, timezone _, refresh_token = tokens response = client.post(REFRESH_URL, json={"refresh_token": refresh_token}) assert response.status_code == 200 new_access = response.json()["data"]["access_token"] payload = pyjwt.decode( new_access, auth_svc.SECRET_KEY, algorithms=[auth_svc.ALGORITHM] ) assert payload.get("type") == "access" exp = payload.get("exp") assert exp is not None # Expiry should be ~15 min from now (allow 14–16 min tolerance) now = datetime.now(timezone.utc) exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc) delta = (exp_dt - now).total_seconds() assert 14 * 60 <= delta <= 16 * 60 # --------------------------------------------------------------------------- # AC3 : Refresh token expiré → 401 TOKEN_EXPIRED # --------------------------------------------------------------------------- class TestRefreshTokenExpired: """AC3 : refresh token expiré → 401 TOKEN_EXPIRED""" def test_expired_refresh_token_returns_401(self, client, monkeypatch): import jwt as pyjwt import services.auth_service as auth_svc from datetime import datetime, timedelta, timezone expired_payload = { "sub": "some-user-id", "type": "refresh", "exp": datetime.now(timezone.utc) - timedelta(minutes=5), "jti": "expired-refresh-jti", } expired_token = pyjwt.encode( expired_payload, auth_svc.SECRET_KEY, algorithm=auth_svc.ALGORITHM ) response = client.post( REFRESH_URL, json={"refresh_token": expired_token}, ) assert response.status_code == 401 def test_expired_refresh_token_error_code(self, client, monkeypatch): import jwt as pyjwt import services.auth_service as auth_svc from datetime import datetime, timedelta, timezone expired_payload = { "sub": "some-user-id", "type": "refresh", "exp": datetime.now(timezone.utc) - timedelta(minutes=5), "jti": "expired-refresh-jti", } expired_token = pyjwt.encode( expired_payload, auth_svc.SECRET_KEY, algorithm=auth_svc.ALGORITHM ) response = client.post( REFRESH_URL, json={"refresh_token": expired_token}, ) body = response.json() assert body["error"] == "TOKEN_EXPIRED" assert ( "Token invalide ou expiré" in body["message"] or "invalide" in body["message"].lower() ) # --------------------------------------------------------------------------- # AC3 : Refresh token révoqué (après logout) → 401 # --------------------------------------------------------------------------- class TestRefreshTokenRevoked: """AC3 / 2.4 : refresh token révoqué après logout → 401""" def test_refresh_after_logout_returns_401(self, client, tokens): access_token, refresh_token = tokens logout_resp = client.post( "/api/v1/auth/logout", headers={"Authorization": f"Bearer {access_token}"}, json={"refresh_token": refresh_token}, ) assert logout_resp.status_code == 200 refresh_resp = client.post( REFRESH_URL, json={"refresh_token": refresh_token}, ) assert refresh_resp.status_code == 401 assert refresh_resp.json().get("error") == "TOKEN_EXPIRED" # --------------------------------------------------------------------------- # AC4 : Corps manquant ou sans refresh_token → 400 INVALID_REQUEST # --------------------------------------------------------------------------- class TestRefreshInvalidRequest: """AC4 : requête sans corps ou sans champ refresh_token valide → 400 INVALID_REQUEST""" def test_no_body_returns_400(self, client): response = client.post(REFRESH_URL) assert response.status_code == 400 def test_no_body_error_code(self, client): response = client.post(REFRESH_URL) body = response.json() assert body["error"] == "INVALID_REQUEST" assert "message" in body def test_empty_json_returns_400(self, client): response = client.post(REFRESH_URL, json={}) assert response.status_code == 400 assert response.json()["error"] == "INVALID_REQUEST" def test_missing_refresh_token_field_returns_400(self, client): response = client.post(REFRESH_URL, json={"other": "value"}) assert response.status_code == 400 assert response.json()["error"] == "INVALID_REQUEST" assert "Refresh token requis" in response.json()["message"] def test_empty_refresh_token_string_returns_400(self, client): response = client.post(REFRESH_URL, json={"refresh_token": ""}) assert response.status_code == 400 assert response.json()["error"] == "INVALID_REQUEST" def test_refresh_token_not_string_returns_400(self, client): response = client.post(REFRESH_URL, json={"refresh_token": 123}) assert response.status_code == 400 assert response.json()["error"] == "INVALID_REQUEST" def test_non_object_json_body_returns_400(self, client): """Body JSON valide mais non-objet (ex. tableau) → 400, pas 500.""" response = client.post( REFRESH_URL, data="[]", headers={"Content-Type": "application/json"}, ) assert response.status_code == 400 assert response.json()["error"] == "INVALID_REQUEST"