# Listing metadata captured with this file: 191 lines, 6.7 KiB, Python.
# CI status at capture time: "Deploy to Production / Build and Deploy (push)"
# successful in 2m5s; all checks were successful.
"""
Tests for GET /api/v1/admin/logs - Admin Error Logs Viewer (Story 5.7).

AC: auth required, pagination, filters, no original_filename or document content.
"""
|
|
|
|
import pytest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
from datetime import datetime, timezone
|
|
|
|
# Endpoints exercised by this module: the login route issues the bearer token
# that the tests then present to the logs route.
ADMIN_LOGIN_URL = "/api/v1/admin/login"
ADMIN_LOGS_URL = "/api/v1/admin/logs"
|
|
|
|
|
|
@pytest.fixture
def users_file(tmp_path: Path) -> Path:
    """Return the path of a per-test ``users.json`` under pytest's ``tmp_path``.

    Only the path is built here; the file itself is not created.
    """
    return tmp_path / "users.json"
|
|
|
|
|
|
@pytest.fixture
def client(users_file: Path, monkeypatch):
    """Build a ``TestClient`` for the app with file-backed auth and no rate limiting.

    The auth service is pointed at the temporary ``users_file`` and its
    ``USE_DATABASE`` / ``DATABASE_AVAILABLE`` flags are forced to ``False``.
    ``RateLimitManager.check_request`` and ``check_translation`` are replaced
    with async stubs that always return an "allowed" result.

    Returns:
        A ``TestClient`` wrapping ``main.app`` that re-raises server exceptions.
    """
    import services.auth_service as auth_svc
    from middleware.rate_limiting import RateLimitManager

    monkeypatch.setattr(auth_svc, "USERS_FILE", users_file)
    monkeypatch.setattr(auth_svc, "USE_DATABASE", False)
    monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False)

    # NOTE(review): the tuple shapes below presumably mirror what the real
    # RateLimitManager methods return; confirm against middleware.rate_limiting.
    async def _check_request_allow(self, request):
        return True, "ok", "test"

    async def _check_translation_allow(self, request, file_size_mb=0):
        return True, "ok"

    monkeypatch.setattr(RateLimitManager, "check_request", _check_request_allow)
    monkeypatch.setattr(RateLimitManager, "check_translation", _check_translation_allow)

    # The app is imported only after the patches above are in place.
    from main import app
    from fastapi.testclient import TestClient

    return TestClient(app, raise_server_exceptions=True)
|
|
|
|
|
|
@pytest.fixture
def admin_password():
    """Return the plaintext admin password used by the admin fixtures in this module."""
    return "admin-secret"
|
|
|
|
|
|
@pytest.fixture
def client_with_admin(client, admin_password, monkeypatch):
    """Return ``client`` after patching known admin credentials into ``routes.admin_routes``.

    Sets ``ADMIN_USERNAME`` to ``"admin"``, ``ADMIN_PASSWORD`` to the
    ``admin_password`` fixture value, and ``ADMIN_PASSWORD_HASH`` to ``None``.
    """
    import routes.admin_routes as admin_routes_mod

    monkeypatch.setattr(admin_routes_mod, "ADMIN_USERNAME", "admin")
    monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD", admin_password)
    # NOTE(review): clearing the hash presumably makes the route compare
    # against the plaintext ADMIN_PASSWORD; confirm in routes.admin_routes.
    monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD_HASH", None)
    return client
|
|
|
|
|
|
@pytest.fixture
def admin_token(client_with_admin, admin_password):
    """Log in through ``ADMIN_LOGIN_URL`` and return the issued ``access_token``.

    Fails the requesting test, with the response body as the message, if the
    login does not return HTTP 200.
    """
    r = client_with_admin.post(ADMIN_LOGIN_URL, json={"password": admin_password})
    assert r.status_code == 200, r.text
    return r.json()["access_token"]
|
|
|
|
|
|
def _make_mock_translation(
|
|
user_id="usr_abc",
|
|
error_message="Translation failed: PROVIDER_UNAVAILABLE",
|
|
created_at=None,
|
|
provider="google",
|
|
file_type="xlsx",
|
|
original_filename="secret.docx",
|
|
):
|
|
"""Build a mock Translation row. original_filename must never appear in API response."""
|
|
m = MagicMock()
|
|
m.user_id = user_id
|
|
m.error_message = error_message
|
|
m.created_at = created_at or datetime.now(timezone.utc)
|
|
m.provider = provider
|
|
m.file_type = file_type
|
|
m.original_filename = original_filename # must NOT be in response
|
|
return m
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Auth: 401 without token / invalid token
# ---------------------------------------------------------------------------
|
|
def test_admin_logs_without_token_returns_401(client_with_admin):
    """A request to the logs endpoint with no Authorization header gets 401."""
    r = client_with_admin.get(ADMIN_LOGS_URL)
    assert r.status_code == 401
|
|
|
|
|
|
def test_admin_logs_with_invalid_token_returns_401(client_with_admin):
    """A request carrying a bearer token that was never issued gets 401."""
    r = client_with_admin.get(
        ADMIN_LOGS_URL,
        headers={"Authorization": "Bearer invalid-token"},
    )
    assert r.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# 200 + response shape; no sensitive data (NFR11, NFR16)
# ---------------------------------------------------------------------------
|
|
def test_admin_logs_returns_200_and_shape(client_with_admin, admin_token):
    """With empty DB or mocked empty list, response has data.logs, data.total, data.page, data.per_page, meta.generated_at."""
    r = client_with_admin.get(
        ADMIN_LOGS_URL,
        headers={"Authorization": f"Bearer {admin_token}"},
    )
    assert r.status_code == 200, r.text
    body = r.json()

    # Envelope: a "data" payload with the pagination fields, plus "meta".
    assert "data" in body
    for key in ("logs", "total", "page", "per_page"):
        assert key in body["data"]
    assert "meta" in body
    assert "generated_at" in body["meta"]
    assert isinstance(body["data"]["logs"], list)
|
|
|
|
|
|
def test_admin_logs_no_original_filename_in_response(client_with_admin, admin_token):
    """NFR11/NFR16: response must never contain original_filename or document content.

    The sync DB session is replaced with a mock whose query chain yields one
    row carrying a recognisable ``original_filename``. The test then checks
    that neither the field name nor its value appears in the response, and
    that the single log entry exposes the expected keys.
    """
    row = _make_mock_translation(original_filename="sensitive.docx")

    # The request must run inside the patch context, otherwise the real
    # session factory would be used.
    with patch("database.connection.get_sync_session") as mock_get_session:
        session_mock = MagicMock()
        mock_get_session.return_value.__enter__.return_value = session_mock
        mock_get_session.return_value.__exit__.return_value = None

        # filter() returns the same query object, so any number of chained
        # filters still ends at the count()/order_by() configured here.
        q = MagicMock()
        q.filter.return_value = q
        q.count.return_value = 1
        q.order_by.return_value.offset.return_value.limit.return_value.all.return_value = [row]
        session_mock.query.return_value = q

        r = client_with_admin.get(
            ADMIN_LOGS_URL,
            headers={"Authorization": f"Bearer {admin_token}"},
        )

    assert r.status_code == 200, r.text
    body = r.json()

    # Search the whole payload so a leak under any key is caught.
    data_str = str(body)
    assert "original_filename" not in data_str
    assert "sensitive.docx" not in data_str

    assert "data" in body
    logs = body["data"]["logs"]
    assert len(logs) == 1

    entry = logs[0]
    assert "timestamp" in entry
    assert "level" in entry
    assert entry["level"] == "error"
    assert "message" in entry
    assert "user_id" in entry
    assert "error_code" in entry
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# level=warning and level=info return empty logs
# ---------------------------------------------------------------------------
|
|
def test_admin_logs_level_warning_returns_empty(client_with_admin, admin_token):
    """Filtering by ``level=warning`` returns 200 with no logs and a zero total."""
    r = client_with_admin.get(
        f"{ADMIN_LOGS_URL}?level=warning",
        headers={"Authorization": f"Bearer {admin_token}"},
    )
    assert r.status_code == 200
    data = r.json()["data"]
    assert data["logs"] == []
    assert data["total"] == 0
|
|
|
|
|
|
def test_admin_logs_level_info_returns_empty(client_with_admin, admin_token):
    """Filtering by ``level=info`` returns 200 with no logs and a zero total."""
    r = client_with_admin.get(
        f"{ADMIN_LOGS_URL}?level=info",
        headers={"Authorization": f"Bearer {admin_token}"},
    )
    assert r.status_code == 200
    data = r.json()["data"]
    assert data["logs"] == []
    assert data["total"] == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Query params: page, per_page
# ---------------------------------------------------------------------------
|
|
def test_admin_logs_accepts_page_and_per_page(client_with_admin, admin_token):
    """The ``page`` and ``per_page`` query parameters are echoed back in ``data``."""
    r = client_with_admin.get(
        f"{ADMIN_LOGS_URL}?page=2&per_page=25",
        headers={"Authorization": f"Bearer {admin_token}"},
    )
    assert r.status_code == 200
    data = r.json()["data"]
    assert data["page"] == 2
    assert data["per_page"] == 25
|