"""
Stripe payment integration for subscriptions and credits
"""
import os
import logging
from typing import Optional, Dict, Any
from datetime import datetime

logger = logging.getLogger(__name__)

# Try to import stripe; the module stays importable without the SDK and every
# entry point then reports "Stripe not configured".
try:
    import stripe
    STRIPE_AVAILABLE = True
except ImportError:
    STRIPE_AVAILABLE = False
    stripe = None

from models.subscription import PlanType, PLANS, CREDIT_PACKAGES, SubscriptionStatus
from services.auth_service import get_user_by_id, update_user, add_credits
from services.pricing_config import (
    get_subscription_line_amount_eur,
    stripe_price_ids_for_plan,
)

# Stripe configuration (read once at import time).
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET", "")
STRIPE_PUBLISHABLE_KEY = os.getenv("STRIPE_PUBLISHABLE_KEY", "")

# Billing periods accepted by create_checkout_session.
_VALID_BILLING_PERIODS = ("monthly", "yearly")

# Subscription statuses for which a checkout session may still be synced.
_SYNCABLE_SUBSCRIPTION_STATUSES = ("active", "trialing")

# NOTE(review): the Stripe SDK calls below are made synchronously inside
# ``async def`` functions (nothing is awaited), so each call occupies the
# event loop for the duration of the HTTP request.


def _get_stripe_secret_key() -> str:
    """Read Stripe secret key at runtime to support hot-reload/admin updates."""
    return os.getenv("STRIPE_SECRET_KEY", "").strip()


def _ensure_stripe_client_configured() -> bool:
    """Configure Stripe SDK lazily with the latest key from environment.

    Returns:
        True when the SDK is importable and a non-empty secret key was
        applied to ``stripe.api_key``; False otherwise.
    """
    if not STRIPE_AVAILABLE:
        return False
    secret = _get_stripe_secret_key()
    if not secret:
        return False
    stripe.api_key = secret
    return True


def is_stripe_configured() -> bool:
    """Check if Stripe is properly configured"""
    return _ensure_stripe_client_configured()


def _frontend_url() -> str:
    """Return the frontend base URL used to build redirect URLs."""
    return os.getenv("FRONTEND_URL", "http://localhost:3000")


def _timestamp_to_iso(timestamp: Any) -> Optional[str]:
    """Convert a Unix timestamp to an ISO-8601 string, or None if falsy.

    The conversion uses ``datetime.fromtimestamp`` without a timezone, so the
    result is a naive timestamp in the server's local time (unchanged from
    the previous inline conversions).
    """
    if not timestamp:
        return None
    return datetime.fromtimestamp(timestamp).isoformat()


def _subscription_period_end(subscription: Any) -> Any:
    """Return the subscription's current period end timestamp, if available.

    Reads the top-level ``current_period_end`` field first and falls back to
    the first subscription item that carries one.
    NOTE(review): the item-level fallback targets newer Stripe API versions
    that expose the period on subscription items -- confirm against the API
    version pinned for this account.
    """
    period_end = subscription.get("current_period_end")
    if period_end:
        return period_end
    items = subscription.get("items") or {}
    for item in items.get("data") or []:
        item_period_end = item.get("current_period_end")
        if item_period_end:
            return item_period_end
    return None


def _get_or_create_customer_id(user: Any, user_id: str) -> str:
    """Return the user's Stripe customer id, creating the customer if needed.

    A newly created customer id is persisted through ``update_user`` so later
    checkouts reuse it. Stripe errors propagate to the caller.
    """
    if user.stripe_customer_id:
        return user.stripe_customer_id
    customer = stripe.Customer.create(
        email=user.email,
        name=user.name,
        metadata={"user_id": user_id},
    )
    update_user(user_id, {"stripe_customer_id": customer.id})
    return customer.id


async def create_checkout_session(
    user_id: str,
    plan: PlanType,
    billing_period: str = "monthly",  # monthly or yearly
    success_url: str = "",
    cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
    """Create a Stripe checkout session for subscription

    Args:
        user_id: Local user identifier; stored in the session metadata.
        plan: Plan being purchased.
        billing_period: ``"monthly"`` or ``"yearly"``.
        success_url: Redirect after payment; defaults to the dashboard with
            the ``{CHECKOUT_SESSION_ID}`` placeholder appended.
        cancel_url: Redirect on cancel; defaults to the pricing page.

    Returns:
        ``{"session_id", "url"}`` on success, otherwise ``{"error": ...}``
        (with ``"demo_mode": True`` when Stripe is not configured).
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured", "demo_mode": True}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    # Reject unknown periods up front: previously any value other than
    # "monthly" selected the yearly price id while the inline-price fallback
    # billed it with a monthly interval.
    if billing_period not in _VALID_BILLING_PERIODS:
        return {"error": "Invalid billing period"}

    try:
        plan_config = PLANS[plan]
    except KeyError:
        return {"error": "Invalid plan"}

    plan_id = plan.value
    monthly_price_id, yearly_price_id = stripe_price_ids_for_plan(plan_id)
    price_id = monthly_price_id if billing_period == "monthly" else yearly_price_id

    # If no Stripe price id is configured, fall back to inline price_data.
    # This makes test-mode checkout work with only STRIPE_SECRET_KEY configured.
    line_item: Dict[str, Any]
    if price_id and price_id not in ("price_xxx", ""):
        line_item = {"price": price_id, "quantity": 1}
    else:
        amount = get_subscription_line_amount_eur(plan, billing_period)
        try:
            amount_value = 0.0 if amount in (None, "", -1) else float(amount)
        except (TypeError, ValueError):
            # A non-numeric configured price is treated like a missing one.
            amount_value = 0.0
        if amount_value <= 0:
            return {
                "error": "Impossible de créer le checkout: tarif invalide pour ce forfait."
            }
        line_item = {
            "price_data": {
                "currency": "eur",
                "unit_amount": int(round(amount_value * 100)),
                "recurring": {
                    "interval": "year" if billing_period == "yearly" else "month"
                },
                "product_data": {
                    "name": f"Office Translator - {plan_config.get('name', plan_id)}"
                },
            },
            "quantity": 1,
        }

    try:
        customer_id = _get_or_create_customer_id(user, user_id)
        session = stripe.checkout.Session.create(
            customer=customer_id,
            mode="subscription",
            payment_method_types=["card"],
            line_items=[line_item],
            success_url=success_url
            or f"{_frontend_url()}/dashboard?session_id={{CHECKOUT_SESSION_ID}}",
            cancel_url=cancel_url or f"{_frontend_url()}/pricing",
            metadata={"user_id": user_id, "plan": plan.value},
            subscription_data={
                "metadata": {"user_id": user_id, "plan": plan.value}
            },
        )
        return {"session_id": session.id, "url": session.url}
    except Exception as e:
        logger.exception("Subscription checkout creation failed for user %s", user_id)
        return {"error": str(e)}


async def sync_checkout_session(
    user_id: str,
    session_id: str,
) -> Dict[str, Any]:
    """
    Sync a completed Stripe Checkout session to local user state.
    Useful in local/dev environments where webhooks may not reach the backend.

    Only subscription sessions are synced. This endpoint is callable
    repeatedly by the client, so it must be replay-safe: credit purchases are
    refused (each replay would grant the credits again), a session whose
    subscription is no longer active is refused, and a subscription already
    linked to the user is reported as synced without re-applying it (which
    would reset the monthly usage counters).

    Returns:
        ``{"status": "synced", "plan", "session_id"}`` or ``{"error": ...}``.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    if not session_id or not session_id.startswith("cs_"):
        return {"error": "Invalid checkout session id"}

    try:
        session = stripe.checkout.Session.retrieve(session_id, expand=["subscription"])
    except Exception as e:
        return {"error": f"Unable to retrieve checkout session: {str(e)}"}

    metadata = session.get("metadata", {}) or {}
    session_user_id = metadata.get("user_id")
    session_customer_id = session.get("customer")

    # Security check: session must belong to current authenticated user.
    # Sessions created by this module always carry user_id metadata, so a
    # session without it is not accepted either.
    if not session_user_id or session_user_id != user_id:
        return {"error": "Checkout session does not belong to current user"}
    if (
        user.stripe_customer_id
        and session_customer_id
        and session_customer_id != user.stripe_customer_id
    ):
        return {"error": "Checkout customer mismatch"}

    if session.get("status") != "complete":
        return {"error": "Checkout session is not completed yet"}

    if metadata.get("type") == "credits" or session.get("mode") != "subscription":
        return {"error": "Only subscription checkout sessions can be synced"}

    subscription = session.get("subscription")
    if not subscription:
        return {"error": "Checkout session has no subscription"}

    if isinstance(subscription, str):
        subscription_id = subscription
    else:
        subscription_id = subscription.get("id")
        # An old session stays "complete" forever; only a live subscription
        # may (re)activate the plan.
        if subscription.get("status") not in _SYNCABLE_SUBSCRIPTION_STATUSES:
            return {"error": "Subscription is no longer active"}

    already_linked = (
        subscription_id is not None
        and getattr(user, "stripe_subscription_id", None) == subscription_id
    )
    if not already_linked:
        await handle_checkout_completed(session)

    return {
        "status": "synced",
        "plan": metadata.get("plan"),
        "session_id": session_id,
    }


async def create_credits_checkout(
    user_id: str,
    package_index: int,
    success_url: str = "",
    cancel_url: str = "",
) -> Optional[Dict[str, Any]]:
    """Create a Stripe checkout session for credit purchase

    Args:
        user_id: Local user identifier; stored in the session metadata.
        package_index: Index into ``CREDIT_PACKAGES``.
        success_url: Redirect after payment; defaults to the dashboard.
        cancel_url: Redirect on cancel; defaults to the pricing page.

    Returns:
        ``{"session_id", "url"}`` on success, otherwise ``{"error": ...}``
        (with ``"demo_mode": True`` when Stripe is not configured).
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured", "demo_mode": True}

    if package_index < 0 or package_index >= len(CREDIT_PACKAGES):
        return {"error": "Invalid package"}

    user = get_user_by_id(user_id)
    if not user:
        return {"error": "User not found"}

    package = CREDIT_PACKAGES[package_index]

    try:
        customer_id = _get_or_create_customer_id(user, user_id)
        session = stripe.checkout.Session.create(
            customer=customer_id,
            mode="payment",
            payment_method_types=["card"],
            line_items=[{"price": package["stripe_price_id"], "quantity": 1}],
            success_url=success_url or f"{_frontend_url()}/dashboard?credits=purchased",
            cancel_url=cancel_url or f"{_frontend_url()}/pricing",
            metadata={
                "user_id": user_id,
                "credits": package["credits"],
                "type": "credits",
            },
        )
        return {"session_id": session.id, "url": session.url}
    except Exception as e:
        logger.exception("Credits checkout creation failed for user %s", user_id)
        return {"error": str(e)}


async def handle_webhook(payload: bytes, sig_header: str) -> Dict[str, Any]:
    """Handle Stripe webhook events

    Verifies the signature of ``payload`` against ``STRIPE_WEBHOOK_SECRET``
    and dispatches the event types this module handles; other event types
    are acknowledged without action.

    Returns:
        ``{"status": "success"}`` or ``{"error": ...}``.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    # Never verify against an empty secret: a signature computed with an
    # empty key could be produced by anyone.
    if not STRIPE_WEBHOOK_SECRET:
        logger.error("Webhook rejected: STRIPE_WEBHOOK_SECRET is not configured")
        return {"error": "Webhook secret not configured"}

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, STRIPE_WEBHOOK_SECRET
        )
    except ValueError:
        return {"error": "Invalid payload"}
    except stripe.error.SignatureVerificationError:
        return {"error": "Invalid signature"}

    handlers = {
        "checkout.session.completed": handle_checkout_completed,
        "customer.subscription.updated": handle_subscription_updated,
        "customer.subscription.deleted": handle_subscription_deleted,
        "invoice.payment_failed": handle_payment_failed,
    }
    handler = handlers.get(event["type"])
    if handler is not None:
        await handler(event["data"]["object"])

    return {"status": "success"}


async def handle_checkout_completed(session: Dict):
    """Handle successful checkout

    Credit purchases (metadata ``type == "credits"``) add the purchased
    credits to the user. Otherwise, when the metadata names a plan, the user
    is switched to that plan, marked active and the monthly usage counters
    are reset. Sessions without ``user_id`` metadata are ignored.

    NOTE(review): nothing here de-duplicates events, so a webhook delivered
    twice grants credits twice -- consider recording processed session ids.
    """
    metadata = session.get("metadata", {}) or {}
    user_id = metadata.get("user_id")

    if not user_id:
        return

    # Check if it's a credit purchase
    if metadata.get("type") == "credits":
        try:
            credits = int(metadata.get("credits", 0))
        except (TypeError, ValueError):
            logger.error(
                "Ignoring credits checkout with invalid credits value %r (user %s)",
                metadata.get("credits"),
                user_id,
            )
            return
        if credits > 0:
            add_credits(user_id, credits)
        return

    # It's a subscription
    plan = metadata.get("plan")
    if not plan:
        return

    subscription_raw = session.get("subscription")
    subscription_id = None
    subscription_ends_at = None
    if isinstance(subscription_raw, str):
        # Webhook path: only the subscription id is present.
        subscription_id = subscription_raw
    elif subscription_raw:
        # Expanded subscription object (from sync path with expand=["subscription"])
        subscription_id = subscription_raw.get("id")
        subscription_ends_at = _timestamp_to_iso(
            _subscription_period_end(subscription_raw)
        )

    update_user(user_id, {
        "plan": plan,
        "subscription_status": SubscriptionStatus.ACTIVE.value,
        "stripe_subscription_id": subscription_id,
        "subscription_ends_at": subscription_ends_at,
        "docs_translated_this_month": 0,
        "pages_translated_this_month": 0,
    })


async def handle_subscription_updated(subscription: Dict):
    """Handle subscription updates

    Mirrors the Stripe subscription status, ``cancel_at_period_end`` flag and
    period end onto the user named in the subscription metadata. A Stripe
    status with no local equivalent leaves the stored status untouched
    instead of being treated as active.
    """
    metadata = subscription.get("metadata", {}) or {}
    user_id = metadata.get("user_id")

    if not user_id:
        return

    status_map = {
        "active": SubscriptionStatus.ACTIVE,
        "trialing": SubscriptionStatus.TRIALING,
        "past_due": SubscriptionStatus.PAST_DUE,
        # Retries exhausted without payment: still owing, not active.
        "unpaid": SubscriptionStatus.PAST_DUE,
        "canceled": SubscriptionStatus.CANCELED,
        # Initial payment never completed; Stripe will not revive it.
        "incomplete_expired": SubscriptionStatus.CANCELED,
        "paused": SubscriptionStatus.PAUSED,
    }

    updates: Dict[str, Any] = {
        "cancel_at_period_end": subscription.get("cancel_at_period_end", False),
        "subscription_ends_at": _timestamp_to_iso(
            _subscription_period_end(subscription)
        ),
    }

    stripe_status = subscription.get("status")
    status = status_map.get(stripe_status)
    if status is None:
        logger.warning(
            "Unmapped Stripe subscription status %r for user %s; status left unchanged",
            stripe_status,
            user_id,
        )
    else:
        updates["subscription_status"] = status.value

    update_user(user_id, updates)


async def handle_subscription_deleted(subscription: Dict):
    """Handle subscription cancellation

    Moves the user named in the subscription metadata back to the free plan,
    marks the subscription canceled and clears the stored subscription id.
    """
    metadata = subscription.get("metadata", {}) or {}
    user_id = metadata.get("user_id")

    if not user_id:
        return

    update_user(user_id, {
        "plan": PlanType.FREE.value,
        "subscription_status": SubscriptionStatus.CANCELED.value,
        "stripe_subscription_id": None,
    })


async def handle_payment_failed(invoice: Dict):
    """Handle failed payment — set subscription status to PAST_DUE

    Looks the user up by the invoice's Stripe customer id directly in the
    database. Database errors are logged and swallowed so the webhook is
    still acknowledged.
    """
    customer_id = invoice.get("customer")
    if not customer_id:
        return

    # Find user by stripe_customer_id
    try:
        from database.connection import get_sync_session
        from database.models import User as DBUser

        with get_sync_session() as session:
            db_user = (
                session.query(DBUser)
                .filter(DBUser.stripe_customer_id == customer_id)
                .first()
            )
            if db_user:
                user_id = str(db_user.id)
                # NOTE(review): the enum member is assigned here while the
                # update_user paths store ``.value`` -- confirm which form
                # the column expects.
                db_user.subscription_status = SubscriptionStatus.PAST_DUE
                session.commit()
                logger.warning(
                    "Payment failed for customer %s (user %s) — status set to past_due",
                    customer_id,
                    user_id,
                )
    except Exception as exc:
        logger.exception("handle_payment_failed DB error: %s", exc)


async def cancel_subscription(user_id: str) -> Dict[str, Any]:
    """Cancel a user's subscription at period end.

    Returns:
        ``{"status": "canceling", "cancel_at", "subscription_ends_at"}`` on
        success, otherwise ``{"error": ...}``.
    """
    if not is_stripe_configured():
        return {"error": "Stripe not configured"}

    user = get_user_by_id(user_id)
    if not user or not user.stripe_subscription_id:
        return {"error": "No active subscription found"}

    try:
        subscription = stripe.Subscription.modify(
            user.stripe_subscription_id,
            cancel_at_period_end=True,
        )
        # Read optional fields with .get(): a missing attribute must not turn
        # a cancellation Stripe already accepted into a reported failure.
        cancel_at = _timestamp_to_iso(subscription.get("cancel_at"))
        subscription_ends_at = _timestamp_to_iso(
            _subscription_period_end(subscription)
        )

        update_user(user_id, {
            "cancel_at_period_end": True,
            "subscription_ends_at": subscription_ends_at,
        })
        return {
            "status": "canceling",
            "cancel_at": cancel_at,
            "subscription_ends_at": subscription_ends_at,
        }
    except Exception as e:
        logger.exception("Subscription cancellation failed for user %s", user_id)
        return {"error": str(e)}


async def get_billing_portal_url(user_id: str) -> Optional[str]:
    """Get Stripe billing portal URL for customer

    Returns:
        The portal URL, or None when Stripe is not configured, the user has
        no Stripe customer, or the portal session cannot be created.
    """
    if not is_stripe_configured():
        return None

    user = get_user_by_id(user_id)
    if not user or not user.stripe_customer_id:
        return None

    try:
        session = stripe.billing_portal.Session.create(
            customer=user.stripe_customer_id,
            return_url=f"{_frontend_url()}/dashboard",
        )
        return session.url
    except Exception:
        # Still best-effort (returns None), but no longer swallows
        # KeyboardInterrupt/SystemExit or hides the cause.
        logger.exception("Billing portal session creation failed for user %s", user_id)
        return None