Files
office_translator/tests/test_admin_logs.py
sepehr 6da8a85b1d
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m5s
fix(admin): secure routes, add real IP detection, SMTP header validation, and fix Next.js layout hydration mismatch
2026-06-01 23:16:03 +02:00

191 lines
6.7 KiB
Python

"""
Tests for GET /api/v1/admin/logs - Admin Error Logs Viewer (Story 5.7).
AC: auth required, pagination, filters, no original_filename or document content.
"""
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from datetime import datetime, timezone
ADMIN_LOGIN_URL = "/api/v1/admin/login"
ADMIN_LOGS_URL = "/api/v1/admin/logs"
@pytest.fixture
def users_file(tmp_path: Path) -> Path:
return tmp_path / "users.json"
@pytest.fixture
def client(users_file: Path, monkeypatch):
"""TestClient with admin and rate limiting disabled."""
import services.auth_service as auth_svc
from middleware.rate_limiting import RateLimitManager
monkeypatch.setattr(auth_svc, "USERS_FILE", users_file)
monkeypatch.setattr(auth_svc, "USE_DATABASE", False)
monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False)
async def _check_request_allow(self, request):
return True, "ok", "test"
async def _check_translation_allow(self, request, file_size_mb=0):
return True, "ok"
monkeypatch.setattr(RateLimitManager, "check_request", _check_request_allow)
monkeypatch.setattr(RateLimitManager, "check_translation", _check_translation_allow)
from main import app
from fastapi.testclient import TestClient
return TestClient(app, raise_server_exceptions=True)
@pytest.fixture
def admin_password():
return "admin-secret"
@pytest.fixture
def client_with_admin(client, admin_password, monkeypatch):
import routes.admin_routes as admin_routes_mod
monkeypatch.setattr(admin_routes_mod, "ADMIN_USERNAME", "admin")
monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD", admin_password)
monkeypatch.setattr(admin_routes_mod, "ADMIN_PASSWORD_HASH", None)
return client
@pytest.fixture
def admin_token(client_with_admin, admin_password):
r = client_with_admin.post(ADMIN_LOGIN_URL, json={"password": admin_password})
assert r.status_code == 200, r.text
return r.json()["access_token"]
def _make_mock_translation(
user_id="usr_abc",
error_message="Translation failed: PROVIDER_UNAVAILABLE",
created_at=None,
provider="google",
file_type="xlsx",
original_filename="secret.docx",
):
"""Build a mock Translation row. original_filename must never appear in API response."""
m = MagicMock()
m.user_id = user_id
m.error_message = error_message
m.created_at = created_at or datetime.now(timezone.utc)
m.provider = provider
m.file_type = file_type
m.original_filename = original_filename # must NOT be in response
return m
# ---------------------------------------------------------------------------
# Auth: 401 without token / invalid token
# ---------------------------------------------------------------------------
def test_admin_logs_without_token_returns_401(client_with_admin):
r = client_with_admin.get(ADMIN_LOGS_URL)
assert r.status_code == 401
def test_admin_logs_with_invalid_token_returns_401(client_with_admin):
r = client_with_admin.get(
ADMIN_LOGS_URL,
headers={"Authorization": "Bearer invalid-token"},
)
assert r.status_code == 401
# ---------------------------------------------------------------------------
# 200 + response shape; no sensitive data (NFR11, NFR16)
# ---------------------------------------------------------------------------
def test_admin_logs_returns_200_and_shape(client_with_admin, admin_token):
"""With empty DB or mocked empty list, response has data.logs, data.total, data.page, data.per_page, meta.generated_at."""
r = client_with_admin.get(
ADMIN_LOGS_URL,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert r.status_code == 200, r.text
body = r.json()
assert "data" in body
assert "logs" in body["data"]
assert "total" in body["data"]
assert "page" in body["data"]
assert "per_page" in body["data"]
assert "meta" in body
assert "generated_at" in body["meta"]
assert isinstance(body["data"]["logs"], list)
def test_admin_logs_no_original_filename_in_response(client_with_admin, admin_token):
"""NFR11/NFR16: response must never contain original_filename or document content."""
row = _make_mock_translation(original_filename="sensitive.docx")
with patch("database.connection.get_sync_session") as mock_get_session:
session_mock = MagicMock()
mock_get_session.return_value.__enter__.return_value = session_mock
mock_get_session.return_value.__exit__.return_value = None
q = MagicMock()
q.filter.return_value = q
q.count.return_value = 1
q.order_by.return_value.offset.return_value.limit.return_value.all.return_value = [row]
session_mock.query.return_value = q
r = client_with_admin.get(
ADMIN_LOGS_URL,
headers={"Authorization": f"Bearer {admin_token}"},
)
assert r.status_code == 200
data_str = str(r.json())
assert "original_filename" not in data_str
assert "sensitive.docx" not in data_str
assert "data" in r.json()
logs = r.json()["data"]["logs"]
assert len(logs) == 1
entry = logs[0]
assert "timestamp" in entry
assert "level" in entry
assert entry["level"] == "error"
assert "message" in entry
assert "user_id" in entry
assert "error_code" in entry
# ---------------------------------------------------------------------------
# level=warning and level=info return empty logs
# ---------------------------------------------------------------------------
def test_admin_logs_level_warning_returns_empty(client_with_admin, admin_token):
r = client_with_admin.get(
f"{ADMIN_LOGS_URL}?level=warning",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert r.status_code == 200
assert r.json()["data"]["logs"] == []
assert r.json()["data"]["total"] == 0
def test_admin_logs_level_info_returns_empty(client_with_admin, admin_token):
r = client_with_admin.get(
f"{ADMIN_LOGS_URL}?level=info",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert r.status_code == 200
assert r.json()["data"]["logs"] == []
assert r.json()["data"]["total"] == 0
# ---------------------------------------------------------------------------
# Query params: page, per_page
# ---------------------------------------------------------------------------
def test_admin_logs_accepts_page_and_per_page(client_with_admin, admin_token):
r = client_with_admin.get(
f"{ADMIN_LOGS_URL}?page=2&per_page=25",
headers={"Authorization": f"Bearer {admin_token}"},
)
assert r.status_code == 200
assert r.json()["data"]["page"] == 2
assert r.json()["data"]["per_page"] == 25