Files
office_translator/tests/test_prompts.py
sepehr 233a054e34
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 2m34s
fix(tests): isolate test DB and sync tier=pro in Pro user fixtures
2026-06-14 19:44:25 +02:00

485 lines
16 KiB
Python

"""
Tests for custom prompt CRUD endpoints.
Story 3.11: Custom Prompts - Endpoint CRUD
"""
import pytest
import jwt
from datetime import datetime, timezone
from pathlib import Path
from fastapi.testclient import TestClient
from database.connection import get_sync_session
from database.models import CustomPrompt
# Endpoint paths exercised by this suite; all live under the v1 API prefix.
_API_V1 = "/api/v1"
REGISTER_URL = _API_V1 + "/auth/register"
LOGIN_URL = _API_V1 + "/auth/login"
PROMPTS_URL = _API_V1 + "/prompts"
@pytest.fixture
def users_file(tmp_path: Path) -> Path:
    """Return the location of a per-test ``users.json`` under ``tmp_path``.

    Only the path is built here; nothing is written to disk.
    """
    return tmp_path.joinpath("users.json")
@pytest.fixture
def client(users_file: Path, monkeypatch):
    """Build a TestClient that uses JSON-file auth with rate limiting disabled.

    The auth service is pointed at the per-test ``users_file`` and its
    database flags are switched off. Both ``RateLimitManager`` checks are
    replaced with stubs that report the request as allowed.
    """
    import services.auth_service as auth_svc
    from middleware.rate_limiting import RateLimitManager

    # Route authentication through the temporary JSON file, not the database.
    auth_overrides = {
        "USERS_FILE": users_file,
        "USE_DATABASE": False,
        "DATABASE_AVAILABLE": False,
    }
    for attr_name, value in auth_overrides.items():
        monkeypatch.setattr(auth_svc, attr_name, value)

    async def _always_allow_request(self, request):
        return True, "ok", "test"

    async def _always_allow_translation(self, request, file_size_mb=0):
        return True, "ok"

    monkeypatch.setattr(RateLimitManager, "check_request", _always_allow_request)
    monkeypatch.setattr(
        RateLimitManager, "check_translation", _always_allow_translation
    )

    # The application is imported only after the patches above are applied.
    from main import app

    test_client = TestClient(app, raise_server_exceptions=True)
    return test_client
@pytest.fixture
def pro_user_token(client, monkeypatch):
    """Register a user, promote it to Pro and return ``(token, user_id)``.

    The user is registered and logged in through the API, the user id is
    read from the ``sub`` claim of the returned JWT, and the matching record
    in the auth service's user store gets ``plan`` and ``tier`` set to
    ``"pro"``.

    The test is skipped when ``auth_svc.JWT_AVAILABLE`` is false.
    """
    import services.auth_service as auth_svc

    if not auth_svc.JWT_AVAILABLE:
        pytest.skip("PyJWT non disponible dans cet environnement")

    email = "pro@test.com"
    password = "Password123!"
    client.post(
        REGISTER_URL, json={"email": email, "password": password, "name": "Pro User"}
    )

    r = client.post(LOGIN_URL, json={"email": email, "password": password})
    assert r.status_code == 200, r.text
    token = r.json()["data"]["access_token"]

    payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
    user_id = payload["sub"]

    users = auth_svc.load_users()
    # Fail here, with a clear message, rather than handing a Free user to
    # tests that expect Pro access (they would fail later with a 403).
    assert user_id in users, (
        f"user {user_id!r} not found in the user store; cannot promote to Pro"
    )
    users[user_id]["plan"] = "pro"
    users[user_id]["tier"] = "pro"
    auth_svc.save_users(users)

    return token, user_id
@pytest.fixture
def free_user_token(client):
    """Register and log in a user without promoting it; return ``(token, user_id)``.

    The user id is taken from the ``sub`` claim of the login JWT. The test
    is skipped when ``auth_svc.JWT_AVAILABLE`` is false.
    """
    import services.auth_service as auth_svc

    if not auth_svc.JWT_AVAILABLE:
        pytest.skip("PyJWT non disponible dans cet environnement")

    credentials = {"email": "free@test.com", "password": "Password123!"}
    client.post(REGISTER_URL, json={**credentials, "name": "Free User"})

    login = client.post(LOGIN_URL, json=credentials)
    assert login.status_code == 200, login.text

    access_token = login.json()["data"]["access_token"]
    claims = jwt.decode(access_token, auth_svc.SECRET_KEY, algorithms=["HS256"])
    return access_token, claims["sub"]
def _auth_header(token: str) -> dict:
    """Return the ``Authorization`` header dict carrying ``token`` as a Bearer token."""
    return {"Authorization": f"Bearer {token}"}
class TestPromptCRUD:
    """Tests for prompt CRUD operations.

    Prompts that must exist before a request is made are inserted directly
    through ``get_sync_session``; everything else goes through the HTTP API
    via the ``client`` fixture.
    """

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _seed_prompt(user_id, name, content):
        """Insert one ``CustomPrompt`` row owned by ``user_id`` and return its id."""
        now = datetime.now(timezone.utc)
        with get_sync_session() as session:
            prompt = CustomPrompt(
                user_id=user_id,
                name=name,
                content=content,
                created_at=now,
                updated_at=now,
            )
            session.add(prompt)
            session.commit()
            # Read the id while the session is still open so the attribute
            # can be loaded if the commit expired it.
            return prompt.id

    @staticmethod
    def _register_pro_user(client, email, name):
        """Register and log in a user, promote it to Pro, return ``(token, user_id)``."""
        import services.auth_service as auth_svc

        password = "Password123!"
        client.post(
            REGISTER_URL,
            json={"email": email, "password": password, "name": name},
        )
        login = client.post(LOGIN_URL, json={"email": email, "password": password})
        assert login.status_code == 200, login.text
        token = login.json()["data"]["access_token"]
        payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"])
        user_id = payload["sub"]

        users = auth_svc.load_users()
        # Surface a missing record immediately instead of silently leaving
        # the user on the Free tier.
        assert user_id in users, (
            f"user {user_id!r} not found in the user store; cannot promote to Pro"
        )
        users[user_id]["plan"] = "pro"
        users[user_id]["tier"] = "pro"
        auth_svc.save_users(users)
        return token, user_id

    # ------------------------------------------------------------------- create

    def test_create_prompt_pro_user(self, client, pro_user_token):
        """Pro user can create a prompt."""
        token, _ = pro_user_token
        response = client.post(
            PROMPTS_URL,
            json={
                "name": "Technical Translation",
                "content": "You are an expert technical translator. Preserve terminology.",
            },
            headers=_auth_header(token),
        )
        assert response.status_code == 201
        data = response.json()["data"]
        assert data["name"] == "Technical Translation"
        assert (
            data["content"]
            == "You are an expert technical translator. Preserve terminology."
        )
        assert "id" in data
        assert "created_at" in data

    def test_create_prompt_free_user_forbidden(self, client, free_user_token):
        """Free user cannot create prompts."""
        token, _ = free_user_token
        response = client.post(
            PROMPTS_URL,
            json={"name": "Test", "content": "Test content"},
            headers=_auth_header(token),
        )
        assert response.status_code == 403
        data = response.json()
        assert data["error"] == "PRO_FEATURE_REQUIRED"

    def test_create_prompt_unauthorized(self, client):
        """Unauthorized user cannot create prompts."""
        response = client.post(
            PROMPTS_URL,
            json={"name": "Test", "content": "Test content"},
        )
        assert response.status_code == 401

    # --------------------------------------------------------------------- list

    def test_list_prompts(self, client, pro_user_token):
        """Pro user can list their prompts."""
        token, user_id = pro_user_token
        for i in range(3):
            # NOTE(review): this content is 90 characters long, so the
            # ``<= 100`` preview check below never exercises truncation.
            self._seed_prompt(user_id, f"Prompt {i}", f"Content {i}" * 10)

        response = client.get(PROMPTS_URL, headers=_auth_header(token))
        assert response.status_code == 200
        data = response.json()
        assert len(data["data"]) == 3
        assert data["meta"]["total"] == 3
        assert "content_preview" in data["data"][0]
        assert len(data["data"][0]["content_preview"]) <= 100

    def test_list_prompts_pagination(self, client, pro_user_token):
        """List prompts with pagination."""
        token, user_id = pro_user_token
        for i in range(10):
            self._seed_prompt(user_id, f"Prompt {i}", f"Content {i}")

        response = client.get(
            f"{PROMPTS_URL}?page=1&per_page=5", headers=_auth_header(token)
        )
        assert response.status_code == 200
        data = response.json()
        assert len(data["data"]) == 5
        assert data["meta"]["total"] == 10
        assert data["meta"]["page"] == 1
        assert data["meta"]["per_page"] == 5
        assert data["meta"]["total_pages"] == 2

    def test_list_prompts_free_user_forbidden(self, client, free_user_token):
        """Free user cannot list prompts."""
        token, _ = free_user_token
        response = client.get(PROMPTS_URL, headers=_auth_header(token))
        assert response.status_code == 403

    # ---------------------------------------------------------------------- get

    def test_get_prompt(self, client, pro_user_token):
        """Pro user can get a specific prompt."""
        token, user_id = pro_user_token
        prompt_id = self._seed_prompt(
            user_id, "Test Prompt", "Full content of the prompt here"
        )

        response = client.get(f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token))
        assert response.status_code == 200
        data = response.json()["data"]
        assert data["name"] == "Test Prompt"
        assert data["content"] == "Full content of the prompt here"

    def test_get_prompt_not_found(self, client, pro_user_token):
        """Non-existent prompt returns 404."""
        token, _ = pro_user_token
        import uuid

        fake_id = str(uuid.uuid4())
        response = client.get(f"{PROMPTS_URL}/{fake_id}", headers=_auth_header(token))
        assert response.status_code == 404
        assert response.json()["error"] == "PROMPT_NOT_FOUND"

    def test_get_prompt_not_owner(self, client, monkeypatch):
        """User cannot access another user's prompt."""
        import services.auth_service as auth_svc

        if not auth_svc.JWT_AVAILABLE:
            pytest.skip("PyJWT non disponible")

        # Two independent Pro users: the first owns the prompt, the second
        # tries to read it.
        _, user_id1 = self._register_pro_user(client, "pro1@test.com", "Pro User 1")
        token2, _ = self._register_pro_user(client, "pro2@test.com", "Pro User 2")

        prompt_id = self._seed_prompt(user_id1, "User 1 Prompt", "Secret content")

        response = client.get(
            f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token2)
        )
        # The API answers 404 rather than 403 for a prompt owned by someone else.
        assert response.status_code == 404
        assert response.json()["error"] == "PROMPT_NOT_FOUND"

    # ------------------------------------------------------------------- update

    def test_update_prompt(self, client, pro_user_token):
        """Pro user can update their prompt."""
        token, user_id = pro_user_token
        prompt_id = self._seed_prompt(user_id, "Original Name", "Original content")

        response = client.patch(
            f"{PROMPTS_URL}/{prompt_id}",
            json={"name": "Updated Name"},
            headers=_auth_header(token),
        )
        assert response.status_code == 200
        assert response.json()["data"]["name"] == "Updated Name"
        # Fields absent from the PATCH body must be left untouched.
        assert response.json()["data"]["content"] == "Original content"

    def test_update_prompt_content(self, client, pro_user_token):
        """Pro user can update prompt content."""
        token, user_id = pro_user_token
        prompt_id = self._seed_prompt(user_id, "Test Prompt", "Old content")

        response = client.patch(
            f"{PROMPTS_URL}/{prompt_id}",
            json={"content": "New content"},
            headers=_auth_header(token),
        )
        assert response.status_code == 200
        assert response.json()["data"]["content"] == "New content"

    def test_update_prompt_empty_body(self, client, pro_user_token):
        """Empty PATCH body returns 400."""
        token, user_id = pro_user_token
        prompt_id = self._seed_prompt(user_id, "Test Prompt", "Test content")

        response = client.patch(
            f"{PROMPTS_URL}/{prompt_id}",
            json={},
            headers=_auth_header(token),
        )
        assert response.status_code == 400
        assert response.json()["error"] == "NO_UPDATE_FIELDS"

    # ------------------------------------------------------------------- delete

    def test_delete_prompt(self, client, pro_user_token):
        """Pro user can delete their prompt."""
        token, user_id = pro_user_token
        prompt_id = self._seed_prompt(user_id, "To Delete", "Delete me")

        response = client.delete(
            f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token)
        )
        assert response.status_code == 204

        # A follow-up GET confirms the prompt is gone.
        response = client.get(f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token))
        assert response.status_code == 404

    # --------------------------------------------------------------- validation

    def test_invalid_prompt_id_format(self, client, pro_user_token):
        """Invalid prompt ID format returns 400."""
        token, _ = pro_user_token
        response = client.get(
            f"{PROMPTS_URL}/invalid-uuid", headers=_auth_header(token)
        )
        assert response.status_code == 400
        assert response.json()["error"] == "INVALID_PROMPT_ID"

    def test_content_max_length(self, client, pro_user_token):
        """Content > 10000 chars returns 422 or 400."""
        token, _ = pro_user_token
        response = client.post(
            PROMPTS_URL,
            json={"name": "Test", "content": "x" * 10001},
            headers=_auth_header(token),
        )
        assert response.status_code in (400, 422)

    def test_name_max_length(self, client, pro_user_token):
        """Name > 255 chars returns 422 or 400."""
        token, _ = pro_user_token
        response = client.post(
            PROMPTS_URL,
            json={"name": "x" * 256, "content": "Test content"},
            headers=_auth_header(token),
        )
        assert response.status_code in (400, 422)

    def test_empty_name(self, client, pro_user_token):
        """Empty name returns 422 or 400."""
        token, _ = pro_user_token
        response = client.post(
            PROMPTS_URL,
            json={"name": "", "content": "Test content"},
            headers=_auth_header(token),
        )
        assert response.status_code in (400, 422)

    def test_empty_content(self, client, pro_user_token):
        """Empty content returns 422 or 400."""
        token, _ = pro_user_token
        response = client.post(
            PROMPTS_URL,
            json={"name": "Test", "content": ""},
            headers=_auth_header(token),
        )
        assert response.status_code in (400, 422)

    def test_whitespace_stripped(self, client, pro_user_token):
        """Whitespace is stripped from name and content."""
        token, _ = pro_user_token
        response = client.post(
            PROMPTS_URL,
            json={"name": " Test Name ", "content": " Test Content "},
            headers=_auth_header(token),
        )
        assert response.status_code == 201
        data = response.json()["data"]
        assert data["name"] == "Test Name"
        assert data["content"] == "Test Content"