Some checks failed
Deploy to Production / Build and Deploy (push) Has been cancelled
523 lines
18 KiB
Python
523 lines
18 KiB
Python
"""
|
|
Stripe payment integration for subscriptions and credits
|
|
"""
|
|
import os
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Try to import stripe
|
|
try:
|
|
import stripe
|
|
STRIPE_AVAILABLE = True
|
|
except ImportError:
|
|
STRIPE_AVAILABLE = False
|
|
stripe = None
|
|
|
|
from models.subscription import PlanType, PLANS, CREDIT_PACKAGES, SubscriptionStatus
|
|
from services.auth_service import get_user_by_id, update_user, add_credits
|
|
from services.pricing_config import (
|
|
get_subscription_line_amount_eur,
|
|
stripe_price_ids_for_plan,
|
|
)
|
|
|
|
|
|
# Stripe configuration
# Both values are captured once, when this module is imported; a missing
# environment variable yields an empty string. (The secret API key is handled
# differently: _get_stripe_secret_key() re-reads the environment on each call.)
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET", "")
STRIPE_PUBLISHABLE_KEY = os.getenv("STRIPE_PUBLISHABLE_KEY", "")
|
|
|
|
|
|
def _get_stripe_secret_key() -> str:
    """Return the current ``STRIPE_SECRET_KEY`` environment value, stripped.

    The environment is consulted on every call instead of being cached, so a
    key changed while the process is running is picked up. An unset variable
    yields an empty string.
    """
    raw_key = os.environ.get("STRIPE_SECRET_KEY", "")
    return raw_key.strip()
|
|
|
|
|
|
def _ensure_stripe_client_configured() -> bool:
    """Point the Stripe SDK at the key currently found in the environment.

    Returns:
        ``True`` once ``stripe.api_key`` has been assigned; ``False`` when the
        ``stripe`` package could not be imported or no secret key is set.
    """
    # The key is only looked up when the SDK import succeeded.
    api_key = _get_stripe_secret_key() if STRIPE_AVAILABLE else ""
    if api_key:
        # Re-assigned on every call so a key changed at runtime takes effect.
        stripe.api_key = api_key
        return True
    return False
|
|
|
|
|
|
def is_stripe_configured() -> bool:
    """Report whether Stripe API calls can be made right now.

    Delegates to ``_ensure_stripe_client_configured``; as a side effect a
    successful check also (re)assigns ``stripe.api_key``.
    """
    configured = _ensure_stripe_client_configured()
    return configured
|
|
|
|
|
|
async def create_checkout_session(
    user_id: str,
    plan: PlanType,
    billing_period: str = "monthly",  # monthly or yearly
    success_url: str = "",
    cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
    """Create a Stripe Checkout session that starts a subscription.

    Args:
        user_id: Local id of the purchasing user.
        plan: Plan to subscribe to; must be a key of ``PLANS``.
        billing_period: ``"monthly"`` or ``"yearly"``. Any other value is
            rejected.
        success_url: Redirect target after payment. When empty, the frontend
            dashboard URL with a ``session_id`` placeholder is used.
        cancel_url: Redirect target when the user aborts. When empty, the
            frontend pricing page is used.

    Returns:
        ``{"session_id": ..., "url": ...}`` on success, otherwise a dict with
        an ``"error"`` message (plus ``"demo_mode": True`` when Stripe is not
        configured).
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured", "demo_mode": True}

    # The price id and the recurring interval were previously derived from two
    # different comparisons ("== monthly" vs "== yearly"), so an unexpected
    # value could pair a yearly amount with a monthly interval. Reject it.
    if billing_period not in ("monthly", "yearly"):
        return {"error": "Invalid billing period: expected 'monthly' or 'yearly'"}
    is_yearly = billing_period == "yearly"

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    try:
        plan_config = PLANS[plan]
    except KeyError:
        return {"error": "Unknown plan"}
    plan_id = plan.value

    monthly_price_id, yearly_price_id = stripe_price_ids_for_plan(plan_id)
    price_id = yearly_price_id if is_yearly else monthly_price_id

    # If no Stripe price id is configured, fall back to inline price_data.
    # This makes test-mode checkout work with only STRIPE_SECRET_KEY configured.
    line_item: Dict[str, Any]
    if price_id and price_id not in ("price_xxx", ""):
        line_item = {"price": price_id, "quantity": 1}
    else:
        amount = get_subscription_line_amount_eur(plan, billing_period)
        try:
            amount_value = float(amount)
        except (TypeError, ValueError):
            # None, "" or any other non-numeric value means "no valid tariff".
            amount_value = 0.0

        # Also rejects NaN and infinity, which cannot be converted to cents.
        if not (0 < amount_value < float("inf")):
            return {
                "error": "Impossible de créer le checkout: tarif invalide pour ce forfait."
            }

        line_item = {
            "price_data": {
                "currency": "eur",
                "unit_amount": int(round(amount_value * 100)),
                "recurring": {"interval": "year" if is_yearly else "month"},
                "product_data": {
                    "name": f"Office Translator - {plan_config.get('name', plan_id)}"
                },
            },
            "quantity": 1,
        }

    frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:3000')

    try:
        # Create or get Stripe customer
        if user.stripe_customer_id:
            customer_id = user.stripe_customer_id
        else:
            customer = stripe.Customer.create(
                email=user.email,
                name=user.name,
                metadata={"user_id": user_id},
            )
            customer_id = customer.id
            update_user(user_id, {"stripe_customer_id": customer_id})

        # Create checkout session. The same metadata is attached to the
        # subscription so later subscription events can be mapped to the user.
        session = stripe.checkout.Session.create(
            customer=customer_id,
            mode="subscription",
            payment_method_types=["card"],
            line_items=[line_item],
            success_url=success_url or f"{frontend_url}/dashboard?session_id={{CHECKOUT_SESSION_ID}}",
            cancel_url=cancel_url or f"{frontend_url}/pricing",
            metadata={"user_id": user_id, "plan": plan.value},
            subscription_data={
                "metadata": {"user_id": user_id, "plan": plan.value}
            },
        )

        return {
            "session_id": session.id,
            "url": session.url,
        }
    except Exception as e:
        # Keep the best-effort contract (error dict), but leave a trace.
        logger.exception("create_checkout_session failed for user %s", user_id)
        return {"error": str(e)}
|
|
|
|
|
|
async def sync_checkout_session(
    user_id: str,
    session_id: str,
) -> Dict[str, Any]:
    """
    Sync a completed Stripe Checkout session to local user state.
    Useful in local/dev environments where webhooks may not reach the backend.

    Only subscription sessions are synced here. Credit purchases are refused
    because granting credits is not idempotent: every call (and the webhook)
    would add the credits again for the same paid session.

    Args:
        user_id: Id of the authenticated user requesting the sync.
        session_id: Stripe Checkout session id (``cs_...``).

    Returns:
        ``{"status": "synced", "plan": ..., "session_id": ...}`` on success,
        otherwise a dict with an ``"error"`` message.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    if not session_id or not session_id.startswith("cs_"):
        return {"error": "Invalid checkout session id"}

    try:
        session = stripe.checkout.Session.retrieve(session_id, expand=["subscription"])
    except Exception as e:
        return {"error": f"Unable to retrieve checkout session: {str(e)}"}

    metadata = session.get("metadata", {}) or {}
    session_user_id = metadata.get("user_id")
    session_customer_id = session.get("customer")

    # Security check: session must belong to current authenticated user.
    # Sessions without a user_id in their metadata are rejected as well; the
    # completion handler ignores them anyway, so reporting "synced" would be
    # misleading.
    if session_user_id != user_id:
        return {"error": "Checkout session does not belong to current user"}
    if user.stripe_customer_id and session_customer_id and session_customer_id != user.stripe_customer_id:
        return {"error": "Checkout customer mismatch"}

    if metadata.get("type") == "credits":
        # Replaying a credit session would grant the credits again.
        return {"error": "Credit purchases cannot be synced manually"}

    if session.get("status") != "complete":
        return {"error": "Checkout session is not completed yet"}

    await handle_checkout_completed(session)
    return {
        "status": "synced",
        "plan": metadata.get("plan"),
        "session_id": session_id,
    }
|
|
|
|
|
|
async def create_credits_checkout(
    user_id: str,
    package_index: int,
    success_url: str = "",
    cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
    """Create a Stripe Checkout session for a one-off credit purchase.

    Args:
        user_id: Local id of the purchasing user.
        package_index: Index into ``CREDIT_PACKAGES``.
        success_url: Redirect target after payment. When empty, the frontend
            dashboard with ``?credits=purchased`` is used.
        cancel_url: Redirect target when the user aborts. When empty, the
            frontend pricing page is used.

    Returns:
        ``{"session_id": ..., "url": ...}`` on success, otherwise a dict with
        an ``"error"`` message (plus ``"demo_mode": True`` when Stripe is not
        configured).
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured", "demo_mode": True}

    if package_index < 0 or package_index >= len(CREDIT_PACKAGES):
        return {"error": "Invalid package"}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    package = CREDIT_PACKAGES[package_index]
    frontend_url = os.getenv('FRONTEND_URL', 'http://localhost:3000')

    try:
        # Create or get Stripe customer
        if user.stripe_customer_id:
            customer_id = user.stripe_customer_id
        else:
            customer = stripe.Customer.create(
                email=user.email,
                name=user.name,
                metadata={"user_id": user_id},
            )
            customer_id = customer.id
            update_user(user_id, {"stripe_customer_id": customer_id})

        # Create checkout session. The metadata carries everything the
        # completion handler needs to grant the credits.
        session = stripe.checkout.Session.create(
            customer=customer_id,
            mode="payment",
            payment_method_types=["card"],
            line_items=[{"price": package["stripe_price_id"], "quantity": 1}],
            success_url=success_url or f"{frontend_url}/dashboard?credits=purchased",
            cancel_url=cancel_url or f"{frontend_url}/pricing",
            metadata={
                "user_id": user_id,
                "credits": package["credits"],
                "type": "credits",
            },
        )

        return {
            "session_id": session.id,
            "url": session.url,
        }
    except Exception as e:
        # Keep the best-effort contract (error dict), but leave a trace.
        logger.exception(
            "create_credits_checkout failed for user %s (package %s)",
            user_id, package_index,
        )
        return {"error": str(e)}
|
|
|
|
|
|
async def handle_webhook(payload: bytes, sig_header: str) -> Dict[str, Any]:
    """Verify a Stripe webhook request and dispatch it to its handler.

    Args:
        payload: Raw request body, exactly as received.
        sig_header: Value of the ``Stripe-Signature`` request header.

    Returns:
        ``{"status": "success"}`` when the event was verified (including event
        types that have no handler), otherwise a dict with an ``"error"``
        message. Exceptions raised by a handler are not caught here.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    # Prefer the live environment value so a rotated secret is picked up,
    # falling back to the value captured at import time.
    webhook_secret = (os.getenv("STRIPE_WEBHOOK_SECRET") or STRIPE_WEBHOOK_SECRET or "").strip()
    if not webhook_secret:
        # Verifying against an empty secret would accept signatures that
        # anyone can compute, so refuse outright.
        logger.error("Stripe webhook received but STRIPE_WEBHOOK_SECRET is not set")
        return {"error": "Stripe webhook secret not configured"}

    if not sig_header:
        return {"error": "Invalid signature"}

    try:
        event = stripe.Webhook.construct_event(payload, sig_header, webhook_secret)
    except ValueError:
        return {"error": "Invalid payload"}
    except stripe.error.SignatureVerificationError:
        return {"error": "Invalid signature"}

    # Handle the event; every handler receives event["data"]["object"].
    handlers = {
        "checkout.session.completed": handle_checkout_completed,
        "customer.subscription.updated": handle_subscription_updated,
        "customer.subscription.deleted": handle_subscription_deleted,
        "invoice.payment_failed": handle_payment_failed,
    }
    handler = handlers.get(event["type"])
    if handler is not None:
        await handler(event["data"]["object"])

    return {"status": "success"}
|
|
|
|
|
|
async def handle_checkout_completed(session: Dict):
    """Apply a completed Checkout session to the local user record.

    Credit sessions (``metadata["type"] == "credits"``) add credits to the
    user. Subscription sessions store the plan, tier, Stripe ids and period
    end on the user.

    The handler can run more than once for the same session (webhook delivery
    plus the manual sync path), so the subscription branch is replay-safe:

    * a subscription that Stripe reports as ended is not re-activated;
    * usage counters are reset only the first time a subscription id is
      recorded for the user.

    Args:
        session: Checkout session object; ``subscription`` may be an id string
            or an expanded subscription object.
    """
    metadata = session.get("metadata", {}) or {}
    user_id = metadata.get("user_id")
    if not user_id:
        return

    # Check if it's a credit purchase
    if metadata.get("type") == "credits":
        try:
            credits = int(metadata.get("credits", 0))
        except (TypeError, ValueError):
            logger.error(
                "Checkout for user %s carries invalid credits metadata: %r",
                user_id, metadata.get("credits"),
            )
            return
        if credits > 0:
            add_credits(user_id, credits)
        return

    # It's a subscription
    plan = metadata.get("plan")
    if not plan:
        return

    subscription_raw = session.get("subscription")
    subscription_id = None
    subscription = None

    if isinstance(subscription_raw, str):
        # Not expanded — fetch subscription to get period end and status
        subscription_id = subscription_raw
        try:
            subscription = stripe.Subscription.retrieve(subscription_raw)
        except Exception:
            # Best effort: continue with the bare id, as before, but log it.
            logger.warning(
                "Could not retrieve Stripe subscription %s for user %s",
                subscription_raw, user_id, exc_info=True,
            )
    elif subscription_raw:
        # Expanded subscription object (from sync path)
        subscription = subscription_raw

    subscription_ends_at = None
    cancel_at_period_end = False
    if subscription is not None:
        subscription_id = subscription.get("id") or subscription_id
        subscription_ends_at = _period_end_to_datetime(
            subscription.get("current_period_end")
        )
        cancel_at_period_end = bool(subscription.get("cancel_at_period_end", False))

        # A replayed session whose subscription has already ended must not
        # hand the paid plan back to the user.
        if subscription.get("status") in ("canceled", "incomplete_expired"):
            logger.warning(
                "Ignoring checkout for user %s: subscription %s is %s",
                user_id, subscription_id, subscription.get("status"),
            )
            return

    # Derive tier from plan (DB constraint: only 'free' or 'pro')
    tier = "pro" if plan in ("pro", "business", "enterprise") else "free"

    updates: Dict[str, Any] = {
        "plan": plan,
        "tier": tier,
        "subscription_status": SubscriptionStatus.ACTIVE.value,
        "stripe_subscription_id": subscription_id,
        "subscription_ends_at": subscription_ends_at,
        "cancel_at_period_end": cancel_at_period_end,
    }

    # Never erase a stored customer id when the session does not carry one.
    customer_id = session.get("customer")
    if customer_id:
        updates["stripe_customer_id"] = customer_id

    # Reset usage only when this subscription is new to the user; otherwise
    # re-processing the same session would refill the monthly quota.
    existing_user = get_user_by_id(user_id)
    already_applied = bool(subscription_id) and (
        getattr(existing_user, "stripe_subscription_id", None) == subscription_id
    )
    if not already_applied:
        updates["docs_translated_this_month"] = 0
        updates["pages_translated_this_month"] = 0

    update_user(user_id, updates)
    logger.info("Checkout synced: user %s → plan=%s tier=%s sub=%s", user_id, plan, tier, subscription_id)


def _period_end_to_datetime(period_end: Any) -> Optional[datetime]:
    """Convert a Unix timestamp to an aware UTC datetime; None if absent or invalid."""
    if not period_end:
        return None
    from datetime import timezone

    try:
        return datetime.fromtimestamp(period_end, tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        return None
|
|
|
|
|
|
async def handle_subscription_updated(subscription: Dict):
    """Mirror a Stripe subscription update onto the local user record.

    Updates ``subscription_status``, ``cancel_at_period_end`` and
    ``subscription_ends_at`` for the user named in the subscription metadata.
    Does nothing when the metadata has no ``user_id``.

    Args:
        subscription: Stripe subscription object from the webhook event.
    """
    metadata = subscription.get("metadata", {}) or {}
    user_id = metadata.get("user_id")
    if not user_id:
        return

    status_map = {
        "active": SubscriptionStatus.ACTIVE,
        "past_due": SubscriptionStatus.PAST_DUE,
        "canceled": SubscriptionStatus.CANCELED,
        "trialing": SubscriptionStatus.TRIALING,
        "paused": SubscriptionStatus.PAUSED,
        # Non-paying states used to fall through to ACTIVE; map them to the
        # closest local status instead.
        "unpaid": SubscriptionStatus.PAST_DUE,
        "incomplete": SubscriptionStatus.PAST_DUE,
        "incomplete_expired": SubscriptionStatus.CANCELED,
    }

    stripe_status = subscription.get("status", "active")
    status = status_map.get(stripe_status)
    if status is None:
        logger.warning(
            "Unknown Stripe subscription status %r for user %s; treating as active",
            stripe_status, user_id,
        )
        status = SubscriptionStatus.ACTIVE

    # Use an explicit UTC timestamp: a naive fromtimestamp() depends on the
    # server's local timezone, and handle_checkout_completed already stores
    # UTC-aware values in this field.
    from datetime import timezone

    period_end = subscription.get("current_period_end")
    subscription_ends_at = (
        datetime.fromtimestamp(period_end, tz=timezone.utc).isoformat()
        if period_end
        else None
    )

    update_user(user_id, {
        "subscription_status": status.value,
        "cancel_at_period_end": subscription.get("cancel_at_period_end", False),
        "subscription_ends_at": subscription_ends_at,
    })
|
|
|
|
|
|
async def handle_subscription_deleted(subscription: Dict):
    """Downgrade the user to the free plan after their subscription ends.

    Does nothing when the subscription metadata has no ``user_id``, or when
    the user's stored subscription id differs from the deleted one (a deletion
    event for an older subscription must not cancel a newer one).

    Args:
        subscription: Stripe subscription object from the webhook event.
    """
    metadata = subscription.get("metadata", {}) or {}
    user_id = metadata.get("user_id")
    if not user_id:
        return

    deleted_id = subscription.get("id")
    current_id = getattr(get_user_by_id(user_id), "stripe_subscription_id", None)
    if deleted_id and current_id and current_id != deleted_id:
        logger.info(
            "Ignoring deletion of subscription %s for user %s (current: %s)",
            deleted_id, user_id, current_id,
        )
        return

    update_user(user_id, {
        "plan": PlanType.FREE.value,
        # handle_checkout_completed sets tier alongside plan; reset both so
        # the user does not keep the "pro" tier after the downgrade.
        "tier": "free",
        "subscription_status": SubscriptionStatus.CANCELED.value,
        "stripe_subscription_id": None,
        "cancel_at_period_end": False,
    })
|
|
|
|
|
|
async def handle_payment_failed(invoice: Dict):
    """Handle failed payment — set subscription status to PAST_DUE.

    The user is looked up by the invoice's Stripe customer id. Database errors
    are logged and swallowed, so this handler never raises.

    Args:
        invoice: Stripe invoice object from the webhook event.
    """
    customer_id = invoice.get("customer")
    if not customer_id:
        return

    try:
        # Imported lazily, as in the original code.
        from database.connection import get_sync_session
        from database.models import User as DBUser

        with get_sync_session() as session:
            # Find user by stripe_customer_id
            db_user = (
                session.query(DBUser)
                .filter(DBUser.stripe_customer_id == customer_id)
                .first()
            )
            if not db_user:
                logger.warning(
                    "Payment failed for unknown Stripe customer %s", customer_id
                )
                return

            user_id = str(db_user.id)
            # Store the plain value, matching every other status write in
            # this module (they all pass ``SubscriptionStatus.X.value``).
            db_user.subscription_status = SubscriptionStatus.PAST_DUE.value
            session.commit()
            logger.warning(
                "Payment failed for customer %s (user %s) — status set to past_due",
                customer_id, user_id,
            )
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("handle_payment_failed DB error: %s", exc)
|
|
|
|
|
|
async def cancel_subscription(user_id: str) -> Dict[str, Any]:
    """Cancel a user's subscription at period end.

    The subscription stays active in Stripe until the current period is over;
    locally ``cancel_at_period_end`` and ``subscription_ends_at`` are updated.

    Args:
        user_id: Local id of the user whose subscription is cancelled.

    Returns:
        ``{"status": "canceling", "cancel_at": ..., "subscription_ends_at": ...}``
        with ISO-8601 UTC timestamps (or ``None``), otherwise a dict with an
        ``"error"`` message.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    user = get_user_by_id(user_id)
    if not user or not user.stripe_subscription_id:
        return {"error": "No active subscription found"}

    from datetime import timezone

    def _to_utc_iso(timestamp: Any) -> Optional[str]:
        # Explicit UTC: a naive fromtimestamp() yields server-local time with
        # no offset, which clients cannot interpret reliably.
        if not timestamp:
            return None
        return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

    try:
        subscription = stripe.Subscription.modify(
            user.stripe_subscription_id,
            cancel_at_period_end=True,
        )

        cancel_at = _to_utc_iso(subscription.cancel_at)
        subscription_ends_at = _to_utc_iso(subscription.current_period_end)

        update_user(user_id, {
            "cancel_at_period_end": True,
            "subscription_ends_at": subscription_ends_at,
        })

        return {
            "status": "canceling",
            "cancel_at": cancel_at,
            "subscription_ends_at": subscription_ends_at,
        }
    except Exception as e:
        # Keep the best-effort contract (error dict), but leave a trace.
        logger.exception("cancel_subscription failed for user %s", user_id)
        return {"error": str(e)}
|
|
|
|
|
|
# ─── cached portal config id (created once per process) ───────────────────────
# Written by _get_or_create_portal_config(); stays None until a configuration
# id has been resolved.
_PORTAL_CONFIG_ID: Optional[str] = None
|
|
|
|
|
|
def _get_or_create_portal_config() -> Optional[str]:
    """
    Resolve the id of a Stripe Customer Portal configuration.

    Lookup order: the module-level cache, then the first active configuration
    Stripe lists, and finally a configuration created here, so no manual
    Stripe Dashboard setup is needed. The resolved id is cached in
    ``_PORTAL_CONFIG_ID``.

    Returns:
        The configuration id, or ``None`` when listing or creating failed
        (the error is logged).
    """
    global _PORTAL_CONFIG_ID

    if _PORTAL_CONFIG_ID:
        return _PORTAL_CONFIG_ID

    try:
        # Prefer a configuration that already exists on the account.
        active_configs = stripe.billing_portal.Configuration.list(active=True, limit=1)
        if active_configs.data:
            _PORTAL_CONFIG_ID = active_configs.data[0].id
            return _PORTAL_CONFIG_ID

        # Nothing to reuse: build a minimal configuration.
        cancellation_settings = {
            "enabled": True,
            "mode": "at_period_end",  # keeps access until end of paid period
            "proration_behavior": "none",
            "cancellation_reason": {
                "enabled": True,
                "options": [
                    "too_expensive",
                    "missing_features",
                    "switched_service",
                    "unused",
                    "other",
                ],
            },
        }
        portal_features = {
            "invoice_history": {"enabled": True},
            "payment_method_update": {"enabled": True},
            "subscription_cancel": cancellation_settings,
            "customer_update": {
                "enabled": True,
                "allowed_updates": ["email", "name", "address"],
            },
        }
        created = stripe.billing_portal.Configuration.create(
            business_profile={"headline": "Wordly.art — Gérer mon abonnement"},
            features=portal_features,
        )
        _PORTAL_CONFIG_ID = created.id
        logger.info("Stripe portal configuration created: %s", _PORTAL_CONFIG_ID)
        return _PORTAL_CONFIG_ID

    except Exception as e:
        logger.error("Failed to get/create Stripe portal config: %s", e)
        return None
|
|
|
|
|
|
async def get_billing_portal_url(user_id: str) -> Optional[str]:
    """Open a Stripe billing portal session for the user and return its URL.

    Returns:
        The portal URL, or ``None`` when Stripe is not configured, the user is
        unknown or has no Stripe customer id, or the Stripe call fails (the
        error is logged).
    """
    if not is_stripe_configured():
        return None

    user = get_user_by_id(user_id)
    customer_id = user.stripe_customer_id if user else None
    if not customer_id:
        return None

    try:
        config_id = _get_or_create_portal_config()

        frontend_base = os.getenv('FRONTEND_URL', 'http://localhost:3000')
        portal_args: Dict[str, Any] = {
            "customer": customer_id,
            "return_url": f"{frontend_base}/dashboard/profile?tab=subscription",
        }
        # Without an explicit configuration, none is passed to Stripe.
        if config_id:
            portal_args["configuration"] = config_id

        portal_session = stripe.billing_portal.Session.create(**portal_args)
        return portal_session.url
    except Exception as e:
        logger.error("get_billing_portal_url error: %s", e)
        return None
|
|
|