""" Tests for custom prompt CRUD endpoints. Story 3.11: Custom Prompts - Endpoint CRUD """ import pytest import jwt from datetime import datetime, timezone from pathlib import Path from fastapi.testclient import TestClient from database.connection import get_sync_session from database.models import CustomPrompt REGISTER_URL = "/api/v1/auth/register" LOGIN_URL = "/api/v1/auth/login" PROMPTS_URL = "/api/v1/prompts" @pytest.fixture def users_file(tmp_path: Path) -> Path: return tmp_path / "users.json" @pytest.fixture def client(users_file: Path, monkeypatch): """TestClient with JSON auth and rate limiting disabled.""" import services.auth_service as auth_svc from middleware.rate_limiting import RateLimitManager monkeypatch.setattr(auth_svc, "USERS_FILE", users_file) monkeypatch.setattr(auth_svc, "USE_DATABASE", False) monkeypatch.setattr(auth_svc, "DATABASE_AVAILABLE", False) async def _check_request_allow(self, request): return True, "ok", "test" async def _check_translation_allow(self, request, file_size_mb=0): return True, "ok" monkeypatch.setattr(RateLimitManager, "check_request", _check_request_allow) monkeypatch.setattr(RateLimitManager, "check_translation", _check_translation_allow) from main import app return TestClient(app, raise_server_exceptions=True) @pytest.fixture def pro_user_token(client, monkeypatch): """Create a Pro user and return auth token.""" import services.auth_service as auth_svc if not auth_svc.JWT_AVAILABLE: pytest.skip("PyJWT non disponible dans cet environnement") email = "pro@test.com" password = "Password123!" client.post( REGISTER_URL, json={"email": email, "password": password, "name": "Pro User"} ) r = client.post(LOGIN_URL, json={"email": email, "password": password}) assert r.status_code == 200, r.text token = r.json()["data"]["access_token"] payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id = payload["sub"] users = auth_svc.load_users() if user_id in users: users[user_id]["plan"] = "pro" auth_svc.save_users(users) return token, user_id @pytest.fixture def free_user_token(client): """Create a Free user and return auth token.""" import services.auth_service as auth_svc if not auth_svc.JWT_AVAILABLE: pytest.skip("PyJWT non disponible dans cet environnement") email = "free@test.com" password = "Password123!" client.post( REGISTER_URL, json={"email": email, "password": password, "name": "Free User"} ) r = client.post(LOGIN_URL, json={"email": email, "password": password}) assert r.status_code == 200, r.text token = r.json()["data"]["access_token"] payload = jwt.decode(token, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id = payload["sub"] return token, user_id def _auth_header(token): return {"Authorization": f"Bearer {token}"} class TestPromptCRUD: """Tests for prompt CRUD operations.""" def test_create_prompt_pro_user(self, client, pro_user_token): """Pro user can create a prompt.""" token, _ = pro_user_token response = client.post( PROMPTS_URL, json={ "name": "Technical Translation", "content": "You are an expert technical translator. Preserve terminology.", }, headers=_auth_header(token), ) assert response.status_code == 201 data = response.json()["data"] assert data["name"] == "Technical Translation" assert ( data["content"] == "You are an expert technical translator. Preserve terminology." ) assert "id" in data assert "created_at" in data def test_create_prompt_free_user_forbidden(self, client, free_user_token): """Free user cannot create prompts.""" token, _ = free_user_token response = client.post( PROMPTS_URL, json={"name": "Test", "content": "Test content"}, headers=_auth_header(token), ) assert response.status_code == 403 data = response.json() assert data["error"] == "PRO_FEATURE_REQUIRED" def test_create_prompt_unauthorized(self, client): """Unauthorized user cannot create prompts.""" response = client.post( PROMPTS_URL, json={"name": "Test", "content": "Test content"}, ) assert response.status_code == 401 def test_list_prompts(self, client, pro_user_token): """Pro user can list their prompts.""" token, user_id = pro_user_token with get_sync_session() as session: for i in range(3): prompt = CustomPrompt( user_id=user_id, name=f"Prompt {i}", content=f"Content {i}" * 10, created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() response = client.get(PROMPTS_URL, headers=_auth_header(token)) assert response.status_code == 200 data = response.json() assert len(data["data"]) == 3 assert data["meta"]["total"] == 3 assert "content_preview" in data["data"][0] assert len(data["data"][0]["content_preview"]) <= 100 def test_list_prompts_pagination(self, client, pro_user_token): """List prompts with pagination.""" token, user_id = pro_user_token with get_sync_session() as session: for i in range(10): prompt = CustomPrompt( user_id=user_id, name=f"Prompt {i}", content=f"Content {i}", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() response = client.get( f"{PROMPTS_URL}?page=1&per_page=5", headers=_auth_header(token) ) assert response.status_code == 200 data = response.json() assert len(data["data"]) == 5 assert data["meta"]["total"] == 10 assert data["meta"]["page"] == 1 assert data["meta"]["per_page"] == 5 assert data["meta"]["total_pages"] == 2 def test_list_prompts_free_user_forbidden(self, client, free_user_token): """Free user cannot list prompts.""" token, _ = free_user_token response = client.get(PROMPTS_URL, headers=_auth_header(token)) assert response.status_code == 403 def test_get_prompt(self, client, pro_user_token): """Pro user can get a specific prompt.""" token, user_id = pro_user_token with get_sync_session() as session: prompt = CustomPrompt( user_id=user_id, name="Test Prompt", content="Full content of the prompt here", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() prompt_id = prompt.id response = client.get(f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token)) assert response.status_code == 200 data = response.json()["data"] assert data["name"] == "Test Prompt" assert data["content"] == "Full content of the prompt here" def test_get_prompt_not_found(self, client, pro_user_token): """Non-existent prompt returns 404.""" token, _ = pro_user_token import uuid fake_id = str(uuid.uuid4()) response = client.get(f"{PROMPTS_URL}/{fake_id}", headers=_auth_header(token)) assert response.status_code == 404 assert response.json()["error"] == "PROMPT_NOT_FOUND" def test_get_prompt_not_owner(self, client, monkeypatch): """User cannot access another user's prompt.""" import services.auth_service as auth_svc if not auth_svc.JWT_AVAILABLE: pytest.skip("PyJWT non disponible") email1 = "pro1@test.com" password1 = "Password123!" client.post( REGISTER_URL, json={"email": email1, "password": password1, "name": "Pro User 1"}, ) r1 = client.post(LOGIN_URL, json={"email": email1, "password": password1}) token1 = r1.json()["data"]["access_token"] payload1 = jwt.decode(token1, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id1 = payload1["sub"] users = auth_svc.load_users() if user_id1 in users: users[user_id1]["plan"] = "pro" auth_svc.save_users(users) email2 = "pro2@test.com" password2 = "Password123!" client.post( REGISTER_URL, json={"email": email2, "password": password2, "name": "Pro User 2"}, ) r2 = client.post(LOGIN_URL, json={"email": email2, "password": password2}) token2 = r2.json()["data"]["access_token"] payload2 = jwt.decode(token2, auth_svc.SECRET_KEY, algorithms=["HS256"]) user_id2 = payload2["sub"] users = auth_svc.load_users() if user_id2 in users: users[user_id2]["plan"] = "pro" auth_svc.save_users(users) with get_sync_session() as session: prompt = CustomPrompt( user_id=user_id1, name="User 1 Prompt", content="Secret content", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() prompt_id = prompt.id response = client.get( f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token2) ) assert response.status_code == 404 assert response.json()["error"] == "PROMPT_NOT_FOUND" def test_update_prompt(self, client, pro_user_token): """Pro user can update their prompt.""" token, user_id = pro_user_token with get_sync_session() as session: prompt = CustomPrompt( user_id=user_id, name="Original Name", content="Original content", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() prompt_id = prompt.id response = client.patch( f"{PROMPTS_URL}/{prompt_id}", json={"name": "Updated Name"}, headers=_auth_header(token), ) assert response.status_code == 200 assert response.json()["data"]["name"] == "Updated Name" assert response.json()["data"]["content"] == "Original content" def test_update_prompt_content(self, client, pro_user_token): """Pro user can update prompt content.""" token, user_id = pro_user_token with get_sync_session() as session: prompt = CustomPrompt( user_id=user_id, name="Test Prompt", content="Old content", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() prompt_id = prompt.id response = client.patch( f"{PROMPTS_URL}/{prompt_id}", json={"content": "New content"}, headers=_auth_header(token), ) assert response.status_code == 200 assert response.json()["data"]["content"] == "New content" def test_update_prompt_empty_body(self, client, pro_user_token): """Empty PATCH body returns 400.""" token, user_id = pro_user_token with get_sync_session() as session: prompt = CustomPrompt( user_id=user_id, name="Test Prompt", content="Test content", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() prompt_id = prompt.id response = client.patch( f"{PROMPTS_URL}/{prompt_id}", json={}, headers=_auth_header(token), ) assert response.status_code == 400 assert response.json()["error"] == "NO_UPDATE_FIELDS" def test_delete_prompt(self, client, pro_user_token): """Pro user can delete their prompt.""" token, user_id = pro_user_token with get_sync_session() as session: prompt = CustomPrompt( user_id=user_id, name="To Delete", content="Delete me", created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(prompt) session.commit() prompt_id = prompt.id response = client.delete( f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token) ) assert response.status_code == 204 response = client.get(f"{PROMPTS_URL}/{prompt_id}", headers=_auth_header(token)) assert response.status_code == 404 def test_invalid_prompt_id_format(self, client, pro_user_token): """Invalid prompt ID format returns 400.""" token, _ = pro_user_token response = client.get( f"{PROMPTS_URL}/invalid-uuid", headers=_auth_header(token) ) assert response.status_code == 400 assert response.json()["error"] == "INVALID_PROMPT_ID" def test_content_max_length(self, client, pro_user_token): """Content > 10000 chars returns 422 or 400.""" token, _ = pro_user_token response = client.post( PROMPTS_URL, json={"name": "Test", "content": "x" * 10001}, headers=_auth_header(token), ) assert response.status_code in (400, 422) def test_name_max_length(self, client, pro_user_token): """Name > 255 chars returns 422 or 400.""" token, _ = pro_user_token response = client.post( PROMPTS_URL, json={"name": "x" * 256, "content": "Test content"}, headers=_auth_header(token), ) assert response.status_code in (400, 422) def test_empty_name(self, client, pro_user_token): """Empty name returns 422 or 400.""" token, _ = pro_user_token response = client.post( PROMPTS_URL, json={"name": "", "content": "Test content"}, headers=_auth_header(token), ) assert response.status_code in (400, 422) def test_empty_content(self, client, pro_user_token): """Empty content returns 422 or 400.""" token, _ = pro_user_token response = client.post( PROMPTS_URL, json={"name": "Test", "content": ""}, headers=_auth_header(token), ) assert response.status_code in (400, 422) def test_whitespace_stripped(self, client, pro_user_token): """Whitespace is stripped from name and content.""" token, _ = pro_user_token response = client.post( PROMPTS_URL, json={"name": " Test Name ", "content": " Test Content "}, headers=_auth_header(token), ) assert response.status_code == 201 data = response.json()["data"] assert data["name"] == "Test Name" assert data["content"] == "Test Content"