Files
office_translator/services/payment_service.py
sepehr 45e44dd7b2
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 3m16s
fix(billing): unify quota counters, fix Stripe webhooks, tier/plan sync
2026-06-14 17:39:34 +02:00

823 lines
31 KiB
Python

"""
Stripe payment integration for subscriptions and credits
"""
import os
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# Try to import stripe
try:
import stripe
STRIPE_AVAILABLE = True
except ImportError:
STRIPE_AVAILABLE = False
stripe = None
from models.subscription import PlanType, PLANS, CREDIT_PACKAGES, SubscriptionStatus
from services.auth_service import get_user_by_id, update_user, add_credits
from services.pricing_config import (
get_subscription_line_amount_eur,
stripe_price_ids_for_plan,
)
# Stripe configuration
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET", "")
STRIPE_PUBLISHABLE_KEY = os.getenv("STRIPE_PUBLISHABLE_KEY", "")
def _get_stripe_secret_key() -> str:
"""Read Stripe secret key at runtime to support hot-reload/admin updates."""
return os.getenv("STRIPE_SECRET_KEY", "").strip()
def _ensure_stripe_client_configured() -> bool:
"""Configure Stripe SDK lazily with the latest key from environment."""
if not STRIPE_AVAILABLE:
return False
secret = _get_stripe_secret_key()
if not secret:
return False
stripe.api_key = secret
return True
def is_stripe_configured() -> bool:
"""Check if Stripe is properly configured"""
return _ensure_stripe_client_configured()
async def create_checkout_session(
user_id: str,
plan: PlanType,
billing_period: str = "monthly", # monthly or yearly
success_url: str = "",
cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
"""Create a Stripe checkout session for subscription"""
if not is_stripe_configured():
return {"error": "Stripe not configured", "demo_mode": True}
user = get_user_by_id(user_id)
if not user:
return {"error": "User not found"}
plan_config = PLANS[plan]
plan_id = plan.value
mid, yid = stripe_price_ids_for_plan(plan_id)
price_id = mid if billing_period == "monthly" else yid
# If no Stripe price id is configured, fall back to inline price_data.
# This makes test-mode checkout work with only STRIPE_SECRET_KEY configured.
line_item: Dict[str, Any]
if price_id and price_id not in ("price_xxx", ""):
line_item = {"price": price_id, "quantity": 1}
else:
amount = get_subscription_line_amount_eur(plan, billing_period)
if amount in (None, "", -1) or float(amount) <= 0:
return {
"error": "Impossible de créer le checkout: tarif invalide pour ce forfait."
}
amount_cents = int(round(float(amount) * 100))
interval = "year" if billing_period == "yearly" else "month"
line_item = {
"price_data": {
"currency": "eur",
"unit_amount": amount_cents,
"recurring": {"interval": interval},
"product_data": {
"name": f"Office Translator - {plan_config.get('name', plan_id)}"
},
},
"quantity": 1,
}
try:
# Create or get Stripe customer
if user.stripe_customer_id:
customer_id = user.stripe_customer_id
else:
customer = stripe.Customer.create(
email=user.email,
name=user.name,
metadata={"user_id": user_id}
)
customer_id = customer.id
update_user(user_id, {"stripe_customer_id": customer_id})
# Create checkout session
session = stripe.checkout.Session.create(
customer=customer_id,
mode="subscription",
payment_method_types=["card"],
line_items=[line_item],
success_url=success_url or f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/checkout/success?session_id={{CHECKOUT_SESSION_ID}}",
cancel_url=cancel_url or f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/pricing",
metadata={"user_id": user_id, "plan": plan.value},
subscription_data={
"metadata": {"user_id": user_id, "plan": plan.value}
}
)
return {
"session_id": session.id,
"url": session.url
}
except Exception as e:
return {"error": str(e)}
async def sync_checkout_session(
user_id: str,
session_id: str,
) -> Dict[str, Any]:
"""
Sync a completed Stripe Checkout session to local user state.
Useful in local/dev environments where webhooks may not reach the backend.
"""
if not is_stripe_configured():
return {"error": "Stripe not configured"}
user = get_user_by_id(user_id)
if not user:
return {"error": "User not found"}
if not session_id or not session_id.startswith("cs_"):
return {"error": "Invalid checkout session id"}
try:
session = stripe.checkout.Session.retrieve(session_id, expand=["subscription"])
except Exception as e:
return {"error": f"Unable to retrieve checkout session: {str(e)}"}
metadata = session.get("metadata", {}) or {}
session_user_id = metadata.get("user_id")
session_customer_id = session.get("customer")
# Security check: session must belong to current authenticated user.
if session_user_id and session_user_id != user_id:
return {"error": "Checkout session does not belong to current user"}
if user.stripe_customer_id and session_customer_id and session_customer_id != user.stripe_customer_id:
return {"error": "Checkout customer mismatch"}
if session.get("status") != "complete":
return {"error": "Checkout session is not completed yet"}
await handle_checkout_completed(session)
return {
"status": "synced",
"plan": metadata.get("plan"),
"session_id": session_id,
}
async def create_credits_checkout(
user_id: str,
package_index: int,
success_url: str = "",
cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
"""Create a Stripe checkout session for credit purchase"""
if not is_stripe_configured():
return {"error": "Stripe not configured", "demo_mode": True}
if package_index < 0 or package_index >= len(CREDIT_PACKAGES):
return {"error": "Invalid package"}
user = get_user_by_id(user_id)
if not user:
return {"error": "User not found"}
package = CREDIT_PACKAGES[package_index]
try:
# Create or get Stripe customer
if user.stripe_customer_id:
customer_id = user.stripe_customer_id
else:
customer = stripe.Customer.create(
email=user.email,
name=user.name,
metadata={"user_id": user_id}
)
customer_id = customer.id
update_user(user_id, {"stripe_customer_id": customer_id})
# Create checkout session
session = stripe.checkout.Session.create(
customer=customer_id,
mode="payment",
payment_method_types=["card"],
line_items=[{"price": package["stripe_price_id"], "quantity": 1}],
success_url=success_url or f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/dashboard?credits=purchased",
cancel_url=cancel_url or f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/pricing",
metadata={
"user_id": user_id,
"credits": package["credits"],
"type": "credits"
}
)
return {
"session_id": session.id,
"url": session.url
}
except Exception as e:
return {"error": str(e)}
async def handle_webhook(payload: bytes, sig_header: str) -> Dict[str, Any]:
"""Handle Stripe webhook events"""
if not is_stripe_configured():
return {"error": "Stripe not configured"}
try:
event = stripe.Webhook.construct_event(
payload, sig_header, STRIPE_WEBHOOK_SECRET
)
except ValueError:
return {"error": "Invalid payload"}
except stripe.error.SignatureVerificationError:
return {"error": "Invalid signature"}
# Handle the event
if event["type"] == "checkout.session.completed":
session = event["data"]["object"]
await handle_checkout_completed(session)
elif event["type"] == "customer.subscription.updated":
subscription = event["data"]["object"]
await handle_subscription_updated(subscription)
elif event["type"] == "customer.subscription.deleted":
subscription = event["data"]["object"]
await handle_subscription_deleted(subscription)
elif event["type"] == "invoice.payment_failed":
invoice = event["data"]["object"]
await handle_payment_failed(invoice)
elif event["type"] == "invoice.paid":
invoice = event["data"]["object"]
await handle_invoice_paid(invoice)
return {"status": "success"}
async def handle_checkout_completed(session: Dict):
"""Handle successful checkout"""
metadata = session.get("metadata", {}) or {}
user_id = metadata.get("user_id")
if not user_id:
return
session_id = session.get("id")
# Check for duplicate session processing using PaymentHistory.
# Use the Stripe payment_intent (one-time) or subscription id (recurring) as the
# idempotency key, NOT the checkout session id — Stripe can redeliver the event.
db_available = False
try:
from database.connection import get_sync_session
from database.models import PaymentHistory as DBPaymentHistory
db_available = True
except ImportError:
pass
payment_intent_id = session.get("payment_intent")
subscription_id = session.get("subscription")
if db_available and session_id:
try:
with get_sync_session() as db_session:
from sqlalchemy import or_
filters = [DBPaymentHistory.stripe_payment_intent_id == session_id]
if payment_intent_id:
filters.append(DBPaymentHistory.stripe_payment_intent_id == payment_intent_id)
if subscription_id:
filters.append(DBPaymentHistory.stripe_invoice_id == subscription_id)
existing = db_session.query(DBPaymentHistory).filter(or_(*filters)).first()
if existing:
logger.info("Checkout session %s already processed (pi=%s sub=%s). Skipping.",
session_id, payment_intent_id, subscription_id)
return
except Exception as e:
logger.error("Error checking PaymentHistory duplication: %s", e)
user = get_user_by_id(user_id)
if not user:
return
# Check if it's a credit purchase
if metadata.get("type") == "credits":
credits = int(metadata.get("credits", 0))
add_credits(user_id, credits)
# Log to PaymentHistory
if db_available and session_id:
try:
with get_sync_session() as db_session:
payment = DBPaymentHistory(
user_id=user_id,
stripe_payment_intent_id=payment_intent_id or session_id,
stripe_invoice_id=session.get("invoice") or session.get("subscription"),
amount_cents=session.get("amount_total") or 0,
currency=session.get("currency") or "usd",
payment_type="credits",
status="succeeded",
description=f"Achat de {credits} crédits",
)
db_session.add(payment)
db_session.commit()
except Exception as e:
logger.error("Failed to write credits payment to history: %s", e)
# Send Email
try:
from services.email_service import send_subscription_email_async
await send_subscription_email_async(
to_email=user.email,
user_name=user.name,
event_type="credits_purchased",
details={"credits": credits}
)
except Exception as e:
logger.error("Failed to send credit purchase email: %s", e)
# Send Telegram notification
try:
from utils.telegram import send_telegram_notification
import asyncio
msg = (
f"💰 *Achat de crédits !*\n\n"
f"• *Utilisateur* : `{user.name or 'Non renseigné'}` (`{user.email}`)\n"
f"• *Crédits achetés* : `{credits}`\n"
f"• *Montant* : `{session.get('amount_total', 0) / 100:.2f} {session.get('currency', 'eur').upper()}`\n"
f"• *ID Transaction* : `{session_id}`"
)
asyncio.create_task(send_telegram_notification(msg))
except Exception as tel_err:
logger.error(f"Failed to send telegram notification for credits checkout: {tel_err}")
return
# It's a subscription
plan = metadata.get("plan")
if plan:
subscription_raw = session.get("subscription")
subscription_id = None
subscription_ends_at = None
if isinstance(subscription_raw, str):
# Not expanded — fetch subscription to get period end
try:
sub = stripe.Subscription.retrieve(subscription_raw)
subscription_id = sub["id"]
if sub.get("current_period_end"):
from datetime import timezone
subscription_ends_at = datetime.fromtimestamp(
sub["current_period_end"], tz=timezone.utc
)
except Exception:
subscription_id = subscription_raw
elif subscription_raw:
# Expanded subscription object (from sync path)
subscription_id = subscription_raw.get("id")
period_end = subscription_raw.get("current_period_end")
if period_end:
subscription_ends_at = datetime.fromtimestamp(period_end, tz=timezone.utc)
# Derive tier from plan (DB constraint: only 'free' or 'pro')
tier = "pro" if plan in ("pro", "business", "enterprise") else "free"
is_new_sub = (
user.stripe_subscription_id != subscription_id
or user.subscription_status != SubscriptionStatus.ACTIVE.value
or user.plan != plan
)
update_user(user_id, {
"plan": plan,
"tier": tier,
"subscription_status": SubscriptionStatus.ACTIVE.value,
"stripe_subscription_id": subscription_id,
"stripe_customer_id": session.get("customer") or None,
"subscription_ends_at": subscription_ends_at,
"cancel_at_period_end": False,
"docs_translated_this_month": 0,
"pages_translated_this_month": 0,
})
logger.info("Checkout synced: user %s → plan=%s tier=%s sub=%s", user_id, plan, tier, subscription_id)
# Log to PaymentHistory
if db_available and session_id:
try:
with get_sync_session() as db_session:
payment = DBPaymentHistory(
user_id=user_id,
stripe_payment_intent_id=payment_intent_id or session_id,
stripe_invoice_id=subscription_id or session.get("invoice"),
amount_cents=session.get("amount_total") or 0,
currency=session.get("currency") or "usd",
payment_type="subscription",
status="succeeded",
description=f"Abonnement au forfait {plan}",
)
db_session.add(payment)
db_session.commit()
except Exception as e:
logger.error("Failed to write subscription payment to history: %s", e)
# Send activation email
if is_new_sub:
try:
from services.email_service import send_subscription_email_async
ends_str = subscription_ends_at.strftime("%d/%m/%Y") if subscription_ends_at else ""
await send_subscription_email_async(
to_email=user.email,
user_name=user.name,
event_type="activated",
details={"plan_name": plan, "ends_at": ends_str}
)
except Exception as e:
logger.error("Failed to send activation email: %s", e)
# Send Telegram notification
try:
from utils.telegram import send_telegram_notification
import asyncio
ends_str = subscription_ends_at.strftime("%d/%m/%Y") if subscription_ends_at else "Non spécifiée"
msg = (
f"⭐ *Nouvel Abonnement Activé !*\n\n"
f"• *Utilisateur* : `{user.name or 'Non renseigné'}` (`{user.email}`)\n"
f"• *Forfait* : `{plan.upper()}`\n"
f"• *Montant* : `{session.get('amount_total', 0) / 100:.2f} {session.get('currency', 'eur').upper()}`\n"
f"• *Date de fin* : {ends_str}"
)
asyncio.create_task(send_telegram_notification(msg))
except Exception as tel_err:
logger.error(f"Failed to send telegram notification for subscription activation: {tel_err}")
async def handle_subscription_updated(subscription: Dict):
"""Handle subscription updates"""
metadata = subscription.get("metadata", {}) or {}
user_id = metadata.get("user_id")
if not user_id:
return
user = get_user_by_id(user_id)
if not user:
return
status_map = {
"active": SubscriptionStatus.ACTIVE,
"past_due": SubscriptionStatus.PAST_DUE,
"canceled": SubscriptionStatus.CANCELED,
"trialing": SubscriptionStatus.TRIALING,
"paused": SubscriptionStatus.PAUSED,
}
stripe_status = subscription.get("status", "active")
status = status_map.get(stripe_status, SubscriptionStatus.ACTIVE)
stripe_cancel_at_period_end = subscription.get("cancel_at_period_end", False)
is_newly_cancelling = stripe_cancel_at_period_end and not user.cancel_at_period_end
period_end = subscription.get("current_period_end")
ends_str = ""
if period_end:
ends_str = datetime.fromtimestamp(period_end, tz=timezone.utc).strftime("%d/%m/%Y")
period_end = subscription.get("current_period_end")
update_user(user_id, {
"subscription_status": status.value,
"cancel_at_period_end": stripe_cancel_at_period_end,
"subscription_ends_at": datetime.fromtimestamp(period_end, tz=timezone.utc) if period_end else None,
})
# Send cancellation email if they just selected to cancel
if is_newly_cancelling:
try:
from services.email_service import send_subscription_email_async
await send_subscription_email_async(
to_email=user.email,
user_name=user.name,
event_type="cancelled",
details={"ends_at": ends_str}
)
except Exception as e:
logger.error("Failed to send cancellation email in handle_subscription_updated: %s", e)
# Send Telegram notification for cancellation request
try:
from utils.telegram import send_telegram_notification
import asyncio
msg = (
f"🔕 *Désinscription programmée (Stripe)*\n\n"
f"• *Utilisateur* : `{user.name or 'Non renseigné'}` (`{user.email}`)\n"
f"• *Date de fin d'accès* : {ends_str}"
)
asyncio.create_task(send_telegram_notification(msg))
except Exception as tel_err:
logger.error(f"Failed to send telegram notification for subscription cancellation request: {tel_err}")
async def handle_subscription_deleted(subscription: Dict):
"""Handle subscription cancellation (actually ended)"""
metadata = subscription.get("metadata", {}) or {}
user_id = metadata.get("user_id")
if not user_id:
return
user = get_user_by_id(user_id)
if not user:
return
had_active_sub = user.plan != PlanType.FREE or user.tier != "free"
update_user(user_id, {
"plan": PlanType.FREE.value,
"tier": "free",
"subscription_status": SubscriptionStatus.CANCELED.value,
"stripe_subscription_id": None,
})
# Send ended email
if had_active_sub:
try:
from services.email_service import send_subscription_email_async
await send_subscription_email_async(
to_email=user.email,
user_name=user.name,
event_type="ended",
details={}
)
except Exception as e:
logger.error("Failed to send subscription ended email: %s", e)
# Send Telegram notification for ended subscription
try:
from utils.telegram import send_telegram_notification
import asyncio
msg = (
f"❌ *Abonnement Terminé / Expiré (Stripe)*\n\n"
f"• *Utilisateur* : `{user.name or 'Non renseigné'}` (`{user.email}`)\n"
f"• *Statut* : Retour au forfait `FREE`"
)
asyncio.create_task(send_telegram_notification(msg))
except Exception as tel_err:
logger.error(f"Failed to send telegram notification for subscription ended: {tel_err}")
async def handle_payment_failed(invoice: Dict):
"""Handle failed payment — set subscription status to PAST_DUE and send email"""
customer_id = invoice.get("customer")
if not customer_id:
return
# Find user by stripe_customer_id
try:
from database.connection import get_sync_session
from database.models import User as DBUser
with get_sync_session() as session:
db_user = (
session.query(DBUser)
.filter(DBUser.stripe_customer_id == customer_id)
.first()
)
if db_user:
user_id = str(db_user.id)
db_user.subscription_status = SubscriptionStatus.PAST_DUE
session.commit()
logger.warning(
"Payment failed for customer %s (user %s) — status set to past_due",
customer_id, user_id,
)
# Send email
try:
from services.email_service import send_subscription_email_async
await send_subscription_email_async(
to_email=db_user.email,
user_name=db_user.name,
event_type="payment_failed",
details={}
)
except Exception as e:
logger.error("Failed to send payment failed email: %s", e)
# Send Telegram notification
try:
from utils.telegram import send_telegram_notification
import asyncio
msg = (
f"⚠️ *Échec de paiement (Stripe)*\n\n"
f"• *Utilisateur* : `{db_user.name or 'Non renseigné'}` (`{db_user.email}`)\n"
f"• *Statut* : Facture impayée, forfait passé en `PAST_DUE`"
)
asyncio.create_task(send_telegram_notification(msg))
except Exception as tel_err:
logger.error(f"Failed to send telegram notification for payment failed: {tel_err}")
except Exception as exc:
logger.error("handle_payment_failed DB error: %s", exc)
async def handle_invoice_paid(invoice: Dict):
"""Extend subscription_ends_at when a recurring invoice is paid."""
customer_id = invoice.get("customer")
if not customer_id:
return
subscription_id = invoice.get("subscription")
period_end = invoice.get("period_end") or invoice.get("lines", {}).get("data", [{}])[0].get("period", {}).get("end")
try:
from database.connection import get_sync_session
from database.models import User as DBUser
with get_sync_session() as session:
db_user = (
session.query(DBUser)
.filter(DBUser.stripe_customer_id == customer_id)
.first()
)
if not db_user:
return
if subscription_id and db_user.stripe_subscription_id != subscription_id:
# The paid invoice belongs to a different subscription; do not update.
logger.warning(
"Invoice paid for customer %s but subscription id mismatch (expected %s, got %s)",
customer_id, db_user.stripe_subscription_id, subscription_id,
)
return
if period_end:
new_end = datetime.fromtimestamp(period_end, tz=timezone.utc)
if db_user.subscription_ends_at is None or new_end > db_user.subscription_ends_at:
db_user.subscription_ends_at = new_end
db_user.updated_at = datetime.now(timezone.utc)
session.commit()
logger.info(
"Extended subscription_ends_at for user %s to %s",
db_user.id, new_end.isoformat(),
)
except Exception as exc:
logger.error("handle_invoice_paid error: %s", exc)
async def cancel_subscription(user_id: str) -> Dict[str, Any]:
"""Cancel a user's subscription at period end."""
if not is_stripe_configured():
return {"error": "Stripe not configured"}
user = get_user_by_id(user_id)
if not user or not user.stripe_subscription_id:
return {"error": "No active subscription found"}
try:
subscription = stripe.Subscription.retrieve(user.stripe_subscription_id)
if subscription.customer != user.stripe_customer_id:
return {"error": "Subscription does not belong to current user"}
subscription = stripe.Subscription.modify(
user.stripe_subscription_id,
cancel_at_period_end=True,
)
cancel_at = None
if subscription.cancel_at:
cancel_at = datetime.fromtimestamp(subscription.cancel_at, tz=timezone.utc)
subscription_ends_at = None
ends_str = ""
if subscription.current_period_end:
subscription_ends_at = datetime.fromtimestamp(subscription.current_period_end, tz=timezone.utc)
ends_str = datetime.fromtimestamp(subscription.current_period_end, tz=timezone.utc).strftime("%d/%m/%Y")
is_new_cancel = not user.cancel_at_period_end
update_user(user_id, {
"cancel_at_period_end": True,
"subscription_ends_at": subscription_ends_at,
})
# Send cancellation confirmation email
if is_new_cancel:
try:
from services.email_service import send_subscription_email_async
await send_subscription_email_async(
to_email=user.email,
user_name=user.name,
event_type="cancelled",
details={"ends_at": ends_str}
)
except Exception as e:
logger.error("Failed to send cancellation email in cancel_subscription: %s", e)
return {
"status": "canceling",
"cancel_at": cancel_at,
"subscription_ends_at": subscription_ends_at,
}
except Exception as e:
return {"error": str(e)}
# ─── cached portal config id (created once per process) ───────────────────────
_PORTAL_CONFIG_ID: Optional[str] = None
def _get_or_create_portal_config() -> Optional[str]:
"""
Return a Stripe Customer Portal configuration ID.
Creates one programmatically if none exists yet, so no Stripe Dashboard
manual setup is required.
"""
global _PORTAL_CONFIG_ID
if _PORTAL_CONFIG_ID:
return _PORTAL_CONFIG_ID
try:
# Re-use the first existing configuration if there is one
existing = stripe.billing_portal.Configuration.list(active=True, limit=1)
if existing.data:
_PORTAL_CONFIG_ID = existing.data[0].id
return _PORTAL_CONFIG_ID
# Create a minimal configuration programmatically
config = stripe.billing_portal.Configuration.create(
business_profile={
"headline": "Wordly.art — Gérer mon abonnement",
},
features={
"invoice_history": {"enabled": True},
"payment_method_update": {"enabled": True},
"subscription_cancel": {
"enabled": True,
"mode": "at_period_end", # keeps access until end of paid period
"proration_behavior": "none",
"cancellation_reason": {
"enabled": True,
"options": [
"too_expensive",
"missing_features",
"switched_service",
"unused",
"other",
],
},
},
"customer_update": {
"enabled": True,
"allowed_updates": ["email", "name", "address"],
},
},
)
_PORTAL_CONFIG_ID = config.id
logger.info("Stripe portal configuration created: %s", _PORTAL_CONFIG_ID)
return _PORTAL_CONFIG_ID
except Exception as e:
logger.error("Failed to get/create Stripe portal config: %s", e)
return None
async def get_billing_portal_url(user_id: str) -> Optional[str]:
"""Get Stripe billing portal URL for customer"""
if not is_stripe_configured():
return None
user = get_user_by_id(user_id)
if not user or not user.stripe_customer_id:
return None
try:
config_id = _get_or_create_portal_config()
kwargs: Dict[str, Any] = {
"customer": user.stripe_customer_id,
"return_url": (
f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}"
"/dashboard/profile?tab=subscription"
),
}
if config_id:
kwargs["configuration"] = config_id
session = stripe.billing_portal.Session.create(**kwargs)
return session.url
except Exception as e:
logger.error("get_billing_portal_url error: %s", e)
return None