"""
Stripe payment integration for subscriptions and credits.

This module wraps the Stripe SDK for:

* creating Checkout sessions (subscriptions and one-off credit packs),
* reconciling completed sessions with local user state, either through the
  webhook endpoint or through an explicit "sync" call,
* reacting to subscription lifecycle webhooks,
* cancelling subscriptions and opening the customer billing portal.

All public coroutines report failures through an ``{"error": ...}`` dict (or
``None`` for the billing-portal helper) instead of raising.
"""
import os
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# Try to import stripe; the module stays importable without the SDK installed.
try:
    import stripe
    STRIPE_AVAILABLE = True
except ImportError:
    STRIPE_AVAILABLE = False
    stripe = None

from models.subscription import PlanType, PLANS, CREDIT_PACKAGES, SubscriptionStatus
from services.auth_service import get_user_by_id, update_user, add_credits
from services.pricing_config import (
    get_subscription_line_amount_eur,
    stripe_price_ids_for_plan,
)

# Stripe configuration (values captured once, at import time)
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET", "")
STRIPE_PUBLISHABLE_KEY = os.getenv("STRIPE_PUBLISHABLE_KEY", "")


def _get_stripe_secret_key() -> str:
    """Read Stripe secret key at runtime to support hot-reload/admin updates."""
    return os.getenv("STRIPE_SECRET_KEY", "").strip()


def _get_webhook_secret() -> str:
    """Return the webhook signing secret, or ``""`` when none is configured.

    The import-time constant wins when it is set; otherwise the environment is
    re-read so that a secret added after startup is picked up, mirroring how
    the API secret key is handled.
    """
    return (STRIPE_WEBHOOK_SECRET or os.getenv("STRIPE_WEBHOOK_SECRET", "")).strip()


def _ensure_stripe_client_configured() -> bool:
    """Configure Stripe SDK lazily with the latest key from environment.

    Returns:
        ``True`` when the SDK is importable and a non-empty secret key was
        assigned to ``stripe.api_key``; ``False`` otherwise.
    """
    if not STRIPE_AVAILABLE:
        return False
    secret = _get_stripe_secret_key()
    if not secret:
        return False
    stripe.api_key = secret
    return True


def is_stripe_configured() -> bool:
    """Check if Stripe is properly configured (SDK present and key set)."""
    return _ensure_stripe_client_configured()


def _frontend_url() -> str:
    """Return the frontend base URL used to build default redirect URLs."""
    return os.getenv('FRONTEND_URL', 'http://localhost:3000')


def _utc_from_timestamp(timestamp: Any) -> Optional[datetime]:
    """Convert a Unix timestamp to a timezone-aware UTC ``datetime``.

    Returns ``None`` for a missing/zero timestamp. Using one helper keeps every
    stored or e-mailed date in UTC instead of mixing naive local time and UTC.
    """
    if not timestamp:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def _get_or_create_customer_id(user: Any, user_id: str) -> str:
    """Return the user's Stripe customer id, creating the customer if needed.

    A newly created customer id is persisted on the user through
    ``update_user``. Stripe errors propagate to the caller.
    """
    if user.stripe_customer_id:
        return user.stripe_customer_id

    customer = stripe.Customer.create(
        email=user.email,
        name=user.name,
        metadata={"user_id": user_id},
    )
    update_user(user_id, {"stripe_customer_id": customer.id})
    return customer.id


async def create_checkout_session(
    user_id: str,
    plan: PlanType,
    billing_period: str = "monthly",  # monthly or yearly
    success_url: str = "",
    cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
    """Create a Stripe checkout session for subscription.

    Args:
        user_id: Local id of the purchasing user.
        plan: Plan being subscribed to; must be a key of ``PLANS``.
        billing_period: ``"monthly"`` or ``"yearly"``.
        success_url: Redirect after payment; defaults to the frontend
            ``/checkout/success`` page with the session id appended.
        cancel_url: Redirect on cancel; defaults to the frontend ``/pricing``.

    Returns:
        ``{"session_id", "url"}`` on success, otherwise ``{"error": ...}``
        (with ``"demo_mode": True`` when Stripe is not configured).
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured", "demo_mode": True}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    plan_config = PLANS[plan]
    plan_id = plan.value
    mid, yid = stripe_price_ids_for_plan(plan_id)
    # NOTE(review): any value other than "monthly" selects the yearly price id
    # here, while the inline interval below treats anything other than
    # "yearly" as monthly -- confirm callers only pass the two documented values.
    price_id = mid if billing_period == "monthly" else yid

    # If no Stripe price id is configured, fall back to inline price_data.
    # This makes test-mode checkout work with only STRIPE_SECRET_KEY configured.
    line_item: Dict[str, Any]
    if price_id and price_id not in ("price_xxx", ""):
        line_item = {"price": price_id, "quantity": 1}
    else:
        amount = get_subscription_line_amount_eur(plan, billing_period)
        # None, "", non-numeric, NaN, zero and negative amounts are all invalid;
        # converting defensively keeps a bad config from raising out of here.
        try:
            amount_value = float(amount)
        except (TypeError, ValueError):
            amount_value = 0.0
        if not (amount_value > 0):
            return {
                "error": "Impossible de créer le checkout: tarif invalide pour ce forfait."
            }
        amount_cents = int(round(amount_value * 100))
        interval = "year" if billing_period == "yearly" else "month"
        line_item = {
            "price_data": {
                "currency": "eur",
                "unit_amount": amount_cents,
                "recurring": {"interval": interval},
                "product_data": {
                    "name": f"Office Translator - {plan_config.get('name', plan_id)}"
                },
            },
            "quantity": 1,
        }

    try:
        # Create or get Stripe customer
        customer_id = _get_or_create_customer_id(user, user_id)

        # Create checkout session. The user id and plan are duplicated into the
        # subscription metadata because subscription webhooks only carry the
        # subscription object, not the checkout session.
        session = stripe.checkout.Session.create(
            customer=customer_id,
            mode="subscription",
            payment_method_types=["card"],
            line_items=[line_item],
            success_url=success_url
            or f"{_frontend_url()}/checkout/success?session_id={{CHECKOUT_SESSION_ID}}",
            cancel_url=cancel_url or f"{_frontend_url()}/pricing",
            metadata={"user_id": user_id, "plan": plan.value},
            subscription_data={
                "metadata": {"user_id": user_id, "plan": plan.value}
            },
        )

        return {
            "session_id": session.id,
            "url": session.url,
        }
    except Exception as e:
        logger.exception("Failed to create subscription checkout for user %s", user_id)
        return {"error": str(e)}


async def sync_checkout_session(
    user_id: str,
    session_id: str,
) -> Dict[str, Any]:
    """
    Sync a completed Stripe Checkout session to local user state.

    Useful in local/dev environments where webhooks may not reach the backend.

    Args:
        user_id: Id of the currently authenticated user.
        session_id: Checkout session id; must start with ``cs_``.

    Returns:
        ``{"status": "synced", "plan", "session_id"}`` once the session has
        been handed to ``handle_checkout_completed``, else ``{"error": ...}``.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    if not session_id or not session_id.startswith("cs_"):
        return {"error": "Invalid checkout session id"}

    try:
        session = stripe.checkout.Session.retrieve(session_id, expand=["subscription"])
    except Exception as e:
        return {"error": f"Unable to retrieve checkout session: {str(e)}"}

    metadata = session.get("metadata", {}) or {}
    session_user_id = metadata.get("user_id")
    session_customer_id = session.get("customer")

    # Security check: session must belong to current authenticated user.
    if session_user_id and session_user_id != user_id:
        return {"error": "Checkout session does not belong to current user"}
    if (
        user.stripe_customer_id
        and session_customer_id
        and session_customer_id != user.stripe_customer_id
    ):
        return {"error": "Checkout customer mismatch"}

    if session.get("status") != "complete":
        return {"error": "Checkout session is not completed yet"}

    await handle_checkout_completed(session)
    return {
        "status": "synced",
        "plan": metadata.get("plan"),
        "session_id": session_id,
    }


async def create_credits_checkout(
    user_id: str,
    package_index: int,
    success_url: str = "",
    cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
    """Create a Stripe checkout session for credit purchase.

    Args:
        user_id: Local id of the purchasing user.
        package_index: Index into ``CREDIT_PACKAGES``.
        success_url: Redirect after payment; defaults to the frontend dashboard.
        cancel_url: Redirect on cancel; defaults to the frontend ``/pricing``.

    Returns:
        ``{"session_id", "url"}`` on success, otherwise ``{"error": ...}``
        (with ``"demo_mode": True`` when Stripe is not configured).
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured", "demo_mode": True}

    if package_index < 0 or package_index >= len(CREDIT_PACKAGES):
        return {"error": "Invalid package"}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    package = CREDIT_PACKAGES[package_index]

    try:
        # Create or get Stripe customer
        customer_id = _get_or_create_customer_id(user, user_id)

        # Create checkout session; "type": "credits" is what lets
        # handle_checkout_completed tell this apart from a subscription.
        session = stripe.checkout.Session.create(
            customer=customer_id,
            mode="payment",
            payment_method_types=["card"],
            line_items=[{"price": package["stripe_price_id"], "quantity": 1}],
            success_url=success_url or f"{_frontend_url()}/dashboard?credits=purchased",
            cancel_url=cancel_url or f"{_frontend_url()}/pricing",
            metadata={
                "user_id": user_id,
                "credits": package["credits"],
                "type": "credits",
            },
        )

        return {
            "session_id": session.id,
            "url": session.url,
        }
    except Exception as e:
        logger.exception("Failed to create credits checkout for user %s", user_id)
        return {"error": str(e)}


async def handle_webhook(payload: bytes, sig_header: str) -> Dict[str, Any]:
    """Handle Stripe webhook events.

    Verifies the signature, then dispatches on the event type. Event types
    without a handler are acknowledged without any action.

    Args:
        payload: Raw request body, exactly as received.
        sig_header: Value of the ``Stripe-Signature`` header.

    Returns:
        ``{"status": "success"}`` for a verified event, else ``{"error": ...}``.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    # An empty signing secret would make the HMAC check forgeable by anyone,
    # so refuse to process webhooks until a secret is configured.
    webhook_secret = _get_webhook_secret()
    if not webhook_secret:
        logger.error("Stripe webhook received but STRIPE_WEBHOOK_SECRET is not set")
        return {"error": "Webhook secret not configured"}

    try:
        event = stripe.Webhook.construct_event(payload, sig_header, webhook_secret)
    except ValueError:
        return {"error": "Invalid payload"}
    except stripe.error.SignatureVerificationError:
        return {"error": "Invalid signature"}

    # Handle the event
    event_type = event["type"]
    event_object = event["data"]["object"]

    if event_type == "checkout.session.completed":
        await handle_checkout_completed(event_object)
    elif event_type == "customer.subscription.updated":
        await handle_subscription_updated(event_object)
    elif event_type == "customer.subscription.deleted":
        await handle_subscription_deleted(event_object)
    elif event_type == "invoice.payment_failed":
        await handle_payment_failed(event_object)

    return {"status": "success"}


def _load_payment_history_backend() -> Optional[tuple]:
    """Return ``(get_sync_session, PaymentHistory)`` or ``None`` if unavailable.

    The database layer is imported lazily so this module keeps working when
    the ``database`` package cannot be imported.
    """
    try:
        from database.connection import get_sync_session
        from database.models import PaymentHistory as DBPaymentHistory
    except ImportError:
        return None
    return get_sync_session, DBPaymentHistory


def _checkout_already_recorded(backend: tuple, session_id: str) -> bool:
    """Tell whether a PaymentHistory row already exists for ``session_id``.

    Lookup errors are logged and treated as "not recorded" so processing
    continues (best effort).
    """
    get_sync_session, payment_model = backend
    try:
        with get_sync_session() as db_session:
            existing = db_session.query(payment_model).filter(
                payment_model.stripe_payment_intent_id == session_id
            ).first()
            return bool(existing)
    except Exception as e:
        logger.error("Error checking PaymentHistory duplication: %s", e)
        return False


def _record_payment(backend: tuple, failure_message: str, **fields: Any) -> None:
    """Insert one PaymentHistory row; log ``failure_message`` on any error."""
    get_sync_session, payment_model = backend
    try:
        with get_sync_session() as db_session:
            db_session.add(payment_model(**fields))
            db_session.commit()
    except Exception as e:
        logger.error(failure_message, e)


def _resolve_subscription(subscription_raw: Any) -> tuple:
    """Return ``(subscription_id, period_end_utc)`` for a session's subscription.

    ``subscription_raw`` is either a subscription id (webhook path), an
    expanded subscription object (sync path), or empty.
    """
    if isinstance(subscription_raw, str):
        # Not expanded -- fetch subscription to get period end
        try:
            sub = stripe.Subscription.retrieve(subscription_raw)
            return sub["id"], _utc_from_timestamp(sub.get("current_period_end"))
        except Exception as e:
            # Keep the id we already have; only the period end is lost.
            logger.warning(
                "Could not retrieve subscription %s: %s", subscription_raw, e
            )
            return subscription_raw, None

    if subscription_raw:
        # Expanded subscription object (from sync path)
        return (
            subscription_raw.get("id"),
            _utc_from_timestamp(subscription_raw.get("current_period_end")),
        )

    return None, None


async def handle_checkout_completed(session: Dict):
    """Handle successful checkout.

    Applies a completed Checkout session to the user named in the session
    metadata: adds credits for a credit purchase, or activates the plan for a
    subscription. A PaymentHistory row keyed by the session id is written and
    used to skip sessions that were already processed (the same session can
    arrive through both the webhook and ``sync_checkout_session``).

    Args:
        session: Checkout session object (webhook payload or retrieved object).
    """
    metadata = session.get("metadata", {}) or {}
    user_id = metadata.get("user_id")
    if not user_id:
        return

    session_id = session.get("id")

    # Check for duplicate session processing using PaymentHistory
    backend = _load_payment_history_backend()
    history_enabled = backend is not None and bool(session_id)

    if history_enabled and _checkout_already_recorded(backend, session_id):
        logger.info("Checkout session %s already processed. Skipping.", session_id)
        return

    user = get_user_by_id(user_id)
    if not user:
        return

    # Check if it's a credit purchase
    if metadata.get("type") == "credits":
        credit_count = int(metadata.get("credits", 0))
        add_credits(user_id, credit_count)

        # Log to PaymentHistory
        if history_enabled:
            _record_payment(
                backend,
                "Failed to write credits payment to history: %s",
                user_id=user_id,
                stripe_payment_intent_id=session_id,
                stripe_invoice_id=session.get("invoice") or session.get("subscription"),
                amount_cents=session.get("amount_total") or 0,
                currency=session.get("currency") or "usd",
                payment_type="credits",
                status="succeeded",
                description=f"Achat de {credit_count} crédits",
            )

        # Send Email (best effort: a mail failure must not undo the purchase)
        try:
            from services.email_service import send_subscription_email_async
            await send_subscription_email_async(
                to_email=user.email,
                user_name=user.name,
                event_type="credits_purchased",
                details={"credits": credit_count},
            )
        except Exception as e:
            logger.error("Failed to send credit purchase email: %s", e)
        return

    # It's a subscription
    plan = metadata.get("plan")
    if not plan:
        return

    subscription_id, subscription_ends_at = _resolve_subscription(
        session.get("subscription")
    )

    # Derive tier from plan (DB constraint: only 'free' or 'pro')
    tier = "pro" if plan in ("pro", "business", "enterprise") else "free"

    # Evaluated before update_user so it reflects the pre-checkout state; it
    # keeps a replayed session from re-sending the activation e-mail.
    is_new_sub = (
        user.stripe_subscription_id != subscription_id
        or user.subscription_status != SubscriptionStatus.ACTIVE.value
        or user.plan != plan
    )

    update_user(user_id, {
        "plan": plan,
        "tier": tier,
        "subscription_status": SubscriptionStatus.ACTIVE.value,
        "stripe_subscription_id": subscription_id,
        # Fall back to the stored id so a session without a customer cannot
        # erase an existing Stripe customer link.
        "stripe_customer_id": session.get("customer") or user.stripe_customer_id or None,
        "subscription_ends_at": subscription_ends_at,
        "cancel_at_period_end": False,
        "docs_translated_this_month": 0,
        "pages_translated_this_month": 0,
    })
    logger.info("Checkout synced: user %s → plan=%s tier=%s sub=%s",
                user_id, plan, tier, subscription_id)

    # Log to PaymentHistory
    if history_enabled:
        _record_payment(
            backend,
            "Failed to write subscription payment to history: %s",
            user_id=user_id,
            stripe_payment_intent_id=session_id,
            stripe_invoice_id=subscription_id or session.get("invoice"),
            amount_cents=session.get("amount_total") or 0,
            currency=session.get("currency") or "usd",
            payment_type="subscription",
            status="succeeded",
            description=f"Abonnement au forfait {plan}",
        )

    # Send activation email
    if is_new_sub:
        try:
            from services.email_service import send_subscription_email_async
            ends_str = subscription_ends_at.strftime("%d/%m/%Y") if subscription_ends_at else ""
            await send_subscription_email_async(
                to_email=user.email,
                user_name=user.name,
                event_type="activated",
                details={"plan_name": plan, "ends_at": ends_str},
            )
        except Exception as e:
            logger.error("Failed to send activation email: %s", e)


async def handle_subscription_updated(subscription: Dict):
    """Handle subscription updates.

    Mirrors the Stripe subscription's status, ``cancel_at_period_end`` flag and
    period end onto the user named in the subscription metadata, and sends a
    cancellation e-mail the first time ``cancel_at_period_end`` turns on.

    Args:
        subscription: Subscription object from the webhook event.
    """
    metadata = subscription.get("metadata", {}) or {}
    user_id = metadata.get("user_id")
    if not user_id:
        return

    user = get_user_by_id(user_id)
    if not user:
        return

    status_map = {
        "active": SubscriptionStatus.ACTIVE,
        "past_due": SubscriptionStatus.PAST_DUE,
        "canceled": SubscriptionStatus.CANCELED,
        "trialing": SubscriptionStatus.TRIALING,
        "paused": SubscriptionStatus.PAUSED,
    }

    stripe_status = subscription.get("status", "active")
    status = status_map.get(stripe_status)

    stripe_cancel_at_period_end = subscription.get("cancel_at_period_end", False)
    is_newly_cancelling = stripe_cancel_at_period_end and not user.cancel_at_period_end

    period_end = _utc_from_timestamp(subscription.get("current_period_end"))
    ends_str = period_end.strftime("%d/%m/%Y") if period_end else ""

    updates: Dict[str, Any] = {
        "cancel_at_period_end": stripe_cancel_at_period_end,
        "subscription_ends_at": period_end.isoformat() if period_end else None,
    }
    if status is not None:
        updates["subscription_status"] = status.value
    else:
        # Statuses without a local equivalent (e.g. "unpaid", "incomplete")
        # must not be promoted to ACTIVE; leave the stored status untouched.
        logger.warning(
            "Unmapped Stripe subscription status %r for user %s; status left unchanged",
            stripe_status, user_id,
        )
    update_user(user_id, updates)

    # Send cancellation email if they just selected to cancel
    if is_newly_cancelling:
        try:
            from services.email_service import send_subscription_email_async
            await send_subscription_email_async(
                to_email=user.email,
                user_name=user.name,
                event_type="cancelled",
                details={"ends_at": ends_str},
            )
        except Exception as e:
            logger.error("Failed to send cancellation email in handle_subscription_updated: %s", e)


async def handle_subscription_deleted(subscription: Dict):
    """Handle subscription cancellation (actually ended).

    Resets the user named in the subscription metadata to the free plan and
    sends an "ended" e-mail if they were on a paid plan/tier beforehand.

    Args:
        subscription: Subscription object from the webhook event.
    """
    metadata = subscription.get("metadata", {}) or {}
    user_id = metadata.get("user_id")
    if not user_id:
        return

    user = get_user_by_id(user_id)
    if not user:
        return

    # Captured before the downgrade so a repeated event does not re-send mail.
    had_active_sub = user.plan != PlanType.FREE.value or user.tier != "free"

    update_user(user_id, {
        "plan": PlanType.FREE.value,
        "tier": "free",
        "subscription_status": SubscriptionStatus.CANCELED.value,
        "stripe_subscription_id": None,
    })

    # Send ended email
    if had_active_sub:
        try:
            from services.email_service import send_subscription_email_async
            await send_subscription_email_async(
                to_email=user.email,
                user_name=user.name,
                event_type="ended",
                details={},
            )
        except Exception as e:
            logger.error("Failed to send subscription ended email: %s", e)


async def handle_payment_failed(invoice: Dict):
    """Handle failed payment — set subscription status to PAST_DUE and send email.

    The user is looked up by the invoice's Stripe customer id. Nothing happens
    when no user matches or when the database update fails.

    Args:
        invoice: Invoice object from the webhook event.
    """
    customer_id = invoice.get("customer")
    if not customer_id:
        return

    recipient = None  # (email, name) once the status update has been committed

    # Find user by stripe_customer_id
    try:
        from database.connection import get_sync_session
        from database.models import User as DBUser
        with get_sync_session() as session:
            db_user = (
                session.query(DBUser)
                .filter(DBUser.stripe_customer_id == customer_id)
                .first()
            )
            if db_user:
                # Read everything needed later before commit(): the instance
                # may be expired/detached once the session is done with it.
                user_id = str(db_user.id)
                email, name = db_user.email, db_user.name
                # NOTE(review): the enum member is assigned here while the rest
                # of the module stores ``.value`` via update_user -- confirm the
                # column type accepts the enum.
                db_user.subscription_status = SubscriptionStatus.PAST_DUE
                session.commit()
                logger.warning(
                    "Payment failed for customer %s (user %s) — status set to past_due",
                    customer_id, user_id,
                )
                recipient = (email, name)
    except Exception as exc:
        logger.error("handle_payment_failed DB error: %s", exc)

    if recipient is None:
        return

    # Send email (after the DB session is closed, so no connection is held
    # while awaiting the mail service)
    try:
        from services.email_service import send_subscription_email_async
        await send_subscription_email_async(
            to_email=recipient[0],
            user_name=recipient[1],
            event_type="payment_failed",
            details={},
        )
    except Exception as e:
        logger.error("Failed to send payment failed email: %s", e)


async def cancel_subscription(user_id: str) -> Dict[str, Any]:
    """Cancel a user's subscription at period end.

    Args:
        user_id: Local id of the user whose subscription is cancelled.

    Returns:
        ``{"status": "canceling", "cancel_at", "subscription_ends_at"}`` where
        both dates are UTC ISO-8601 strings or ``None``; ``{"error": ...}`` on
        failure.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    user = get_user_by_id(user_id)
    if not user or not user.stripe_subscription_id:
        return {"error": "No active subscription found"}

    try:
        subscription = stripe.Subscription.modify(
            user.stripe_subscription_id,
            cancel_at_period_end=True,
        )

        cancel_at_dt = _utc_from_timestamp(subscription.cancel_at)
        cancel_at = cancel_at_dt.isoformat() if cancel_at_dt else None

        period_end = _utc_from_timestamp(subscription.current_period_end)
        subscription_ends_at = period_end.isoformat() if period_end else None
        ends_str = period_end.strftime("%d/%m/%Y") if period_end else ""

        # Evaluated before update_user so repeated cancel requests do not
        # re-send the confirmation e-mail.
        is_new_cancel = not user.cancel_at_period_end

        update_user(user_id, {
            "cancel_at_period_end": True,
            "subscription_ends_at": subscription_ends_at,
        })

        # Send cancellation confirmation email
        if is_new_cancel:
            try:
                from services.email_service import send_subscription_email_async
                await send_subscription_email_async(
                    to_email=user.email,
                    user_name=user.name,
                    event_type="cancelled",
                    details={"ends_at": ends_str},
                )
            except Exception as e:
                logger.error("Failed to send cancellation email in cancel_subscription: %s", e)

        return {
            "status": "canceling",
            "cancel_at": cancel_at,
            "subscription_ends_at": subscription_ends_at,
        }
    except Exception as e:
        logger.exception("Failed to cancel subscription for user %s", user_id)
        return {"error": str(e)}


# ─── cached portal config id (created once per process) ───────────────────────
_PORTAL_CONFIG_ID: Optional[str] = None


def _get_or_create_portal_config() -> Optional[str]:
    """
    Return a Stripe Customer Portal configuration ID.

    Creates one programmatically if none exists yet, so no Stripe Dashboard
    manual setup is required. The id is cached in ``_PORTAL_CONFIG_ID`` for the
    lifetime of the process; failures are logged and yield ``None`` (nothing is
    cached, so the next call retries).
    """
    global _PORTAL_CONFIG_ID
    if _PORTAL_CONFIG_ID:
        return _PORTAL_CONFIG_ID

    try:
        # Re-use the first existing configuration if there is one
        existing = stripe.billing_portal.Configuration.list(active=True, limit=1)
        if existing.data:
            _PORTAL_CONFIG_ID = existing.data[0].id
            return _PORTAL_CONFIG_ID

        # Create a minimal configuration programmatically
        config = stripe.billing_portal.Configuration.create(
            business_profile={
                "headline": "Wordly.art — Gérer mon abonnement",
            },
            features={
                "invoice_history": {"enabled": True},
                "payment_method_update": {"enabled": True},
                "subscription_cancel": {
                    "enabled": True,
                    "mode": "at_period_end",  # keeps access until end of paid period
                    "proration_behavior": "none",
                    "cancellation_reason": {
                        "enabled": True,
                        "options": [
                            "too_expensive",
                            "missing_features",
                            "switched_service",
                            "unused",
                            "other",
                        ],
                    },
                },
                "customer_update": {
                    "enabled": True,
                    "allowed_updates": ["email", "name", "address"],
                },
            },
        )
        _PORTAL_CONFIG_ID = config.id
        logger.info("Stripe portal configuration created: %s", _PORTAL_CONFIG_ID)
        return _PORTAL_CONFIG_ID

    except Exception as e:
        logger.error("Failed to get/create Stripe portal config: %s", e)
        return None


async def get_billing_portal_url(user_id: str) -> Optional[str]:
    """Get Stripe billing portal URL for customer.

    Args:
        user_id: Local id of the user opening the portal.

    Returns:
        The portal session URL, or ``None`` when Stripe is not configured, the
        user has no Stripe customer id, or the Stripe call fails.
    """
    if not is_stripe_configured():
        return None

    user = get_user_by_id(user_id)
    if not user or not user.stripe_customer_id:
        return None

    try:
        config_id = _get_or_create_portal_config()

        kwargs: Dict[str, Any] = {
            "customer": user.stripe_customer_id,
            "return_url": f"{_frontend_url()}/dashboard/profile?tab=subscription",
        }
        # Without a configuration id the session is created with no explicit
        # "configuration" argument.
        if config_id:
            kwargs["configuration"] = config_id

        session = stripe.billing_portal.Session.create(**kwargs)
        return session.url
    except Exception as e:
        logger.error("get_billing_portal_url error: %s", e)
        return None