All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m5s
396 lines
14 KiB
Python
396 lines
14 KiB
Python
"""
|
|
Tests for PATCH /admin/users/{user_id} - Admin changement de tier manuel (Story 1.7).
|
|
AC1: 200 + user tier updated in DB; AC2/3: quota effect; AC4: audit log.
|
|
"""
|
|
|
|
import json
|
|
import pytest
|
|
from pathlib import Path
|
|
from fastapi.testclient import TestClient
|
|
|
|
REGISTER_URL = "/api/v1/auth/register"
|
|
LOGIN_URL = "/api/v1/auth/login"
|
|
ADMIN_LOGIN_URL = "/api/v1/admin/login"
|
|
ADMIN_USERS_PATCH = "/api/v1/admin/users" # + /{user_id}
|
|
|
|
|
|
@pytest.fixture
|
|
def users_file(tmp_path: Path) -> Path:
|
|
return tmp_path / "users.json"
|
|
|
|
|
|
@pytest.fixture
|
|
def client(users_file: Path, monkeypatch):
|
|
"""TestClient with JSON auth and rate limiting disabled."""
|
|
import services.auth_service as auth_svc
|
|
from middleware.rate_limiting import RateLimitManager
|
|
|
|
monkeypatch.setattr(auth_svc, "USERS_FILE", users_file)
|
|
monkeypatch.setattr(auth_svc, "USE_DATABASE", False)
|
|
monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False)
|
|
|
|
async def _check_request_allow(self, request):
|
|
return True, "ok", "test"
|
|
|
|
async def _check_translation_allow(self, request, file_size_mb=0):
|
|
return True, "ok"
|
|
|
|
monkeypatch.setattr(RateLimitManager, "check_request", _check_request_allow)
|
|
monkeypatch.setattr(RateLimitManager, "check_translation", _check_translation_allow)
|
|
|
|
from main import app
|
|
|
|
return TestClient(app, raise_server_exceptions=True)
|
|
|
|
|
|
@pytest.fixture
|
|
def admin_password():
|
|
return "admin-secret"
|
|
|
|
|
|
@pytest.fixture
|
|
def client_with_admin(client, admin_password, monkeypatch):
|
|
"""Same as client but with admin credentials patched in admin_routes (read at import time)."""
|
|
import routes.admin_routes as admin_routes_mod
|
|
|
|
monkeypatch.setattr(admin_routes_mod, "ADMIN_USERNAME", "admin")
|
|
monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD", admin_password)
|
|
monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD_HASH", None)
|
|
return client
|
|
|
|
|
|
@pytest.fixture
|
|
def admin_token(client_with_admin, admin_password):
|
|
"""Get admin Bearer token."""
|
|
r = client_with_admin.post(ADMIN_LOGIN_URL, json={"password": admin_password})
|
|
assert r.status_code == 200, r.text
|
|
return r.json()["access_token"]
|
|
|
|
|
|
@pytest.fixture
|
|
def registered_user_id_with_admin(client_with_admin, admin_token, users_file):
|
|
"""Register user then get id via admin GET /admin/users."""
|
|
payload = {
|
|
"email": "patchuser@example.com",
|
|
"password": "Password123!",
|
|
"name": "Patch User",
|
|
}
|
|
client_with_admin.post(REGISTER_URL, json=payload)
|
|
r = client_with_admin.get(
|
|
"/api/v1/admin/users", headers={"Authorization": f"Bearer {admin_token}"}
|
|
)
|
|
assert r.status_code == 200
|
|
users = r.json()["users"]
|
|
assert users
|
|
return users[0]["id"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4.1 Admin PATCH valid tier → 200, user tier updated
|
|
# ---------------------------------------------------------------------------
|
|
def test_admin_patch_valid_tier_returns_200_and_updates_user(
|
|
client_with_admin, admin_token, registered_user_id_with_admin
|
|
):
|
|
user_id = registered_user_id_with_admin
|
|
r = client_with_admin.patch(
|
|
f"{ADMIN_USERS_PATCH}/{user_id}",
|
|
json={"plan": "pro"},
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
assert r.status_code == 200, r.text
|
|
body = r.json()
|
|
assert "data" in body
|
|
assert body["data"]["id"] == user_id
|
|
assert body["data"]["plan"] == "pro"
|
|
assert body["data"]["tier"] == "pro"
|
|
|
|
# Verify persistence: GET /admin/users shows updated plan
|
|
r2 = client_with_admin.get(
|
|
"/api/v1/admin/users", headers={"Authorization": f"Bearer {admin_token}"}
|
|
)
|
|
assert r2.status_code == 200
|
|
users = {u["id"]: u for u in r2.json()["users"]}
|
|
assert users[user_id]["plan"] == "pro"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4.2 Admin PATCH invalid tier → 400
|
|
# ---------------------------------------------------------------------------
|
|
def test_admin_patch_invalid_tier_returns_400(
|
|
client_with_admin, admin_token, registered_user_id_with_admin
|
|
):
|
|
"""Invalid plan value: Pydantic validation returns 422; backend validation returns 400 with INVALID_PLAN."""
|
|
r = client_with_admin.patch(
|
|
f"{ADMIN_USERS_PATCH}/{registered_user_id_with_admin}",
|
|
json={"plan": "invalid"},
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
# Literal schema → 422 for invalid enum value
|
|
assert r.status_code in (400, 422), r.text
|
|
body = r.json()
|
|
if r.status_code == 400:
|
|
assert body.get("error") in (
|
|
"INVALID_PLAN",
|
|
"VALIDATION_ERROR",
|
|
"INVALID_FORMAT",
|
|
)
|
|
assert "Erreur de validation" in (body.get("message") or "")
|
|
else:
|
|
assert "detail" in body
|
|
assert any(
|
|
"plan" in str(d.get("loc", []))
|
|
for d in body.get("detail", [])
|
|
if isinstance(body.get("detail"), list)
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4.3 Admin PATCH unknown user_id → 404
|
|
# ---------------------------------------------------------------------------
|
|
def test_admin_patch_unknown_user_returns_404(client_with_admin, admin_token):
|
|
r = client_with_admin.patch(
|
|
f"{ADMIN_USERS_PATCH}/00000000-0000-0000-0000-000000000000",
|
|
json={"plan": "pro"},
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
assert r.status_code == 404
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4.4 Non-admin PATCH → 401
|
|
# ---------------------------------------------------------------------------
|
|
def test_admin_patch_without_token_returns_401(
|
|
client_with_admin, registered_user_id_with_admin
|
|
):
|
|
r = client_with_admin.patch(
|
|
f"{ADMIN_USERS_PATCH}/{registered_user_id_with_admin}",
|
|
json={"plan": "pro"},
|
|
)
|
|
assert r.status_code == 401
|
|
|
|
|
|
def test_admin_patch_with_invalid_token_returns_401(
|
|
client_with_admin, registered_user_id_with_admin
|
|
):
|
|
r = client_with_admin.patch(
|
|
f"{ADMIN_USERS_PATCH}/{registered_user_id_with_admin}",
|
|
json={"plan": "pro"},
|
|
headers={"Authorization": "Bearer invalid-token"},
|
|
)
|
|
assert r.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4.5 After upgrade to pro, user can translate beyond 5/day; after downgrade to free, quota 5 applies
|
|
# ---------------------------------------------------------------------------
|
|
@pytest.fixture
|
|
def app_client_for_quota(tmp_path, monkeypatch, admin_password):
|
|
"""Client with JSON auth and in-memory tier quota for translate tests."""
|
|
import routes.admin_routes as admin_routes_mod
|
|
|
|
monkeypatch.setattr(admin_routes_mod, "ADMIN_USERNAME", "admin")
|
|
monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD", admin_password)
|
|
monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD_HASH", None)
|
|
|
|
import services.auth_service as auth_svc
|
|
from middleware import tier_quota as tier_quota_mod
|
|
from middleware.tier_quota import _memory_usage
|
|
from middleware.rate_limiting import RateLimitManager
|
|
|
|
monkeypatch.setattr(auth_svc, "USERS_FILE", tmp_path / "users.json")
|
|
monkeypatch.setattr(auth_svc, "USE_DATABASE", False)
|
|
monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False)
|
|
monkeypatch.setattr(tier_quota_mod, "_get_async_redis", lambda: None)
|
|
monkeypatch.setenv("REDIS_URL", "")
|
|
_memory_usage.clear()
|
|
|
|
async def _allow_request(self, request):
|
|
return True, "ok", "test"
|
|
|
|
async def _allow_translation(self, request, file_size_mb=0):
|
|
return True, ""
|
|
|
|
async def _allow_translation_limit(self, client_id, file_size_mb=0):
|
|
return True
|
|
|
|
monkeypatch.setattr(RateLimitManager, "check_request", _allow_request)
|
|
monkeypatch.setattr(RateLimitManager, "check_translation", _allow_translation)
|
|
monkeypatch.setattr(
|
|
RateLimitManager, "check_translation_limit", _allow_translation_limit
|
|
)
|
|
|
|
from main import app
|
|
|
|
return TestClient(app, raise_server_exceptions=True)
|
|
|
|
|
|
def test_after_upgrade_to_pro_user_can_translate_beyond_five(
|
|
app_client_for_quota, minimal_xlsx, admin_password
|
|
):
|
|
"""After admin upgrades user to pro, user can translate more than 5 files (quota unlimited)."""
|
|
client = app_client_for_quota
|
|
client.post(
|
|
REGISTER_URL,
|
|
json={
|
|
"email": "quota@example.com",
|
|
"password": "Password123!",
|
|
"name": "Quota User",
|
|
},
|
|
)
|
|
admin_r = client.post(ADMIN_LOGIN_URL, json={"password": admin_password})
|
|
admin_token = admin_r.json()["access_token"]
|
|
users_r = client.get(
|
|
"/api/v1/admin/users", headers={"Authorization": f"Bearer {admin_token}"}
|
|
)
|
|
user_id = next(
|
|
u["id"] for u in users_r.json()["users"] if u["email"] == "quota@example.com"
|
|
)
|
|
|
|
client.patch(
|
|
f"{ADMIN_USERS_PATCH}/{user_id}",
|
|
json={"plan": "pro"},
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
|
|
login_r = client.post(
|
|
LOGIN_URL, json={"email": "quota@example.com", "password": "Password123!"}
|
|
)
|
|
access_token = login_r.json()["data"]["access_token"]
|
|
|
|
from unittest.mock import patch
|
|
from pathlib import Path
|
|
|
|
def _fake_translate(
|
|
input_path, output_path, target_language, source_language="auto", **kwargs
|
|
):
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
Path(output_path).write_bytes(b"dummy")
|
|
|
|
with patch(
|
|
"routes.translate_routes.ExcelTranslator.translate_file",
|
|
side_effect=_fake_translate,
|
|
), patch(
|
|
"routes.translate_routes.ExcelTranslator.get_translation_stats",
|
|
return_value={"attempted": 1, "changed": 1},
|
|
):
|
|
for _ in range(6):
|
|
with open(minimal_xlsx, "rb") as f:
|
|
r = client.post(
|
|
"/api/v1/translate",
|
|
files={
|
|
"file": (
|
|
"t.xlsx",
|
|
f,
|
|
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
)
|
|
},
|
|
data={"target_lang": "fr", "provider": "google"},
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert r.status_code == 202, (
|
|
f"Pro user should not hit quota at request {_ + 1}: {r.text}"
|
|
)
|
|
|
|
|
|
def test_after_downgrade_to_free_quota_five_applies(
|
|
app_client_for_quota, minimal_xlsx, admin_password
|
|
):
|
|
"""After admin downgrades user to free, quota 5 applies (6th returns 429)."""
|
|
client = app_client_for_quota
|
|
client.post(
|
|
REGISTER_URL,
|
|
json={
|
|
"email": "downgrade@example.com",
|
|
"password": "Password123!",
|
|
"name": "Downgrade User",
|
|
},
|
|
)
|
|
admin_r = client.post(ADMIN_LOGIN_URL, json={"password": admin_password})
|
|
admin_token = admin_r.json()["access_token"]
|
|
users_r = client.get(
|
|
"/api/v1/admin/users", headers={"Authorization": f"Bearer {admin_token}"}
|
|
)
|
|
user_id = next(
|
|
u["id"]
|
|
for u in users_r.json()["users"]
|
|
if u["email"] == "downgrade@example.com"
|
|
)
|
|
|
|
client.patch(
|
|
f"{ADMIN_USERS_PATCH}/{user_id}",
|
|
json={"plan": "pro"},
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
login_r = client.post(
|
|
LOGIN_URL, json={"email": "downgrade@example.com", "password": "Password123!"}
|
|
)
|
|
access_token = login_r.json()["data"]["access_token"]
|
|
|
|
from unittest.mock import patch
|
|
from pathlib import Path
|
|
|
|
def _fake_translate(
|
|
input_path, output_path, target_language, source_language="auto", **kwargs
|
|
):
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
Path(output_path).write_bytes(b"dummy")
|
|
|
|
with patch(
|
|
"routes.translate_routes.ExcelTranslator.translate_file",
|
|
side_effect=_fake_translate,
|
|
), patch(
|
|
"routes.translate_routes.ExcelTranslator.get_translation_stats",
|
|
return_value={"attempted": 1, "changed": 1},
|
|
):
|
|
for _ in range(5):
|
|
with open(minimal_xlsx, "rb") as f:
|
|
client.post(
|
|
"/api/v1/translate",
|
|
files={
|
|
"file": (
|
|
"t.xlsx",
|
|
f,
|
|
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
)
|
|
},
|
|
data={"target_lang": "fr", "provider": "google"},
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
import time
|
|
time.sleep(0.5)
|
|
client.patch(
|
|
f"{ADMIN_USERS_PATCH}/{user_id}",
|
|
json={"plan": "free"},
|
|
headers={"Authorization": f"Bearer {admin_token}"},
|
|
)
|
|
|
|
with open(minimal_xlsx, "rb") as f:
|
|
r = client.post(
|
|
"/api/v1/translate",
|
|
files={
|
|
"file": (
|
|
"t.xlsx",
|
|
f,
|
|
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
)
|
|
},
|
|
data={"target_lang": "fr", "provider": "google"},
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert r.status_code == 429, r.text
|
|
assert "QUOTA_EXCEEDED" in (r.json().get("error") or "")
|
|
|
|
|
|
@pytest.fixture
|
|
def minimal_xlsx(tmp_path):
|
|
try:
|
|
import openpyxl
|
|
|
|
wb = openpyxl.Workbook()
|
|
wb.active["A1"] = "Hello"
|
|
p = tmp_path / "minimal.xlsx"
|
|
wb.save(p)
|
|
return p
|
|
except ImportError:
|
|
pytest.skip("openpyxl required")
|