#!/usr/bin/env python3 """ Supprime les glossaires dupliqués en se basant sur un fichier JSON de backup produit par `scripts/backup_duplicate_glossaries.py`. ⚠️ DESTRUCTIF. Par défaut, demande confirmation interactive avant la suppression. Utiliser --yes pour les exécutions automatisées. Pour chaque groupe (user_id, name) dans le JSON, le glossaire listé dans "keep" est conservé, ceux listés dans "duplicates" sont supprimés (ainsi que leurs termes via cascade). Les glossaires multilingues (« → Multilingue ») ont un nom distinct et ne peuvent pas être inclus dans un groupe de doublons : ils sont préservés. Usage: # Dry-run (relecture) : python scripts/delete_duplicate_glossaries.py backups/glossary_duplicates_xxx.json --dry-run # Confirmation interactive : python scripts/delete_duplicate_glossaries.py backups/glossary_duplicates_xxx.json # Sans confirmation (CI / cron) : python scripts/delete_duplicate_glossaries.py backups/glossary_duplicates_xxx.json --yes # Re-génère le backup à la volée (si BACKUP_JSON absent) : DATABASE_URL=... python scripts/delete_duplicate_glossaries.py --user --dry-run # Backup auto + suppression : DATABASE_URL=... python scripts/delete_duplicate_glossaries.py --user --yes """ import argparse import json import logging import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from sqlalchemy import text from database.connection import get_sync_session logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger("delete_dup_glossaries") def load_backup(path: Path) -> dict: """Load and validate a backup JSON file.""" if not path.exists(): logger.error("❌ Fichier de backup introuvable : %s", path) sys.exit(2) with open(path, "r", encoding="utf-8") as f: data = json.load(f) if "groups" not in data: logger.error("❌ Le fichier %s n'a pas de clé 'groups' — format invalide.", path) sys.exit(2) return data def validate_ids(session, backup: dict) -> tuple[list[dict], list[str], list[str]]: """Check that every duplicate id exists in the DB and matches the expected user. Returns (valid_groups, errors, warnings). - errors: bloquants (mismatch owner / id introuvable) - warnings: informatifs (user parent absent — données orphelines) """ errors: list[str] = [] warnings: list[str] = [] valid: list[dict] = [] for group in backup["groups"]: user_id = group["user_id"] user_exists = session.execute( text("SELECT id FROM users WHERE id = :uid"), {"uid": user_id} ).first() if not user_exists: warnings.append( f"User parent introuvable (données orphelines) : user_id={user_id} — " f"la suppression des glossaires associés sera tentée." ) for dup in group["duplicates"]: dup_id = dup["id"] row = session.execute( text("SELECT id FROM glossaries WHERE id = :gid AND user_id = :uid"), {"gid": dup_id, "uid": user_id}, ).first() if not row: errors.append( f"Glossary introuvable ou owner mismatch : id={dup_id} user={user_id}" ) valid.append(group) return valid, errors, warnings def print_preview(valid_groups: list[dict]) -> tuple[int, int]: """Print what would be deleted. Returns (total_dupes, total_terms).""" total_dupes = 0 total_terms = 0 logger.info("=" * 78) logger.info("Aperçu de la suppression") logger.info("=" * 78) for g in valid_groups: total_dupes += g["duplicates_count"] for d in g["duplicates"]: total_terms += d.get("terms_count", len(d.get("terms", []))) logger.info("Groupes à traiter : %d", len(valid_groups)) logger.info("Glossaires à supprimer : %d", total_dupes) logger.info("Termes concernés : %d", total_terms) logger.info("") logger.info("Détail par nom :") for g in valid_groups: logger.info(" '%s' → %d doublon(s)", g["name"], g["duplicates_count"]) logger.info("=" * 78) return total_dupes, total_terms def delete_group(session, group: dict) -> tuple[int, int]: """Delete the duplicates of a single group. Returns (glossaries_deleted, terms_deleted).""" deleted = 0 terms_deleted = 0 for dup in group["duplicates"]: term_count = session.execute( text("SELECT COUNT(*) FROM glossary_terms WHERE glossary_id = :gid"), {"gid": dup["id"]}, ).scalar() or 0 try: session.execute( text("DELETE FROM glossary_terms WHERE glossary_id = :gid"), {"gid": dup["id"]}, ) session.execute( text("DELETE FROM glossaries WHERE id = :gid"), {"gid": dup["id"]}, ) session.flush() except Exception as e: session.rollback() raise RuntimeError(f"Échec suppression glossary {dup['id']}: {e}") from e deleted += 1 terms_deleted += term_count logger.info(" 🗑️ Supprimé id=%s (%d termes)", dup["id"], term_count) return deleted, terms_deleted def perform_deletion(backup: dict, dry_run: bool) -> int: """Run the full deletion flow. Returns process exit code.""" with get_sync_session() as session: valid_groups, errors, warnings = validate_ids(session, backup) if errors: logger.error("❌ %d erreur(s) de validation :", len(errors)) for e in errors: logger.error(" - %s", e) logger.error("Annulation. Corrigez le backup ou la DB puis ré-essayez.") return 3 for w in warnings: logger.warning("⚠️ %s", w) if not valid_groups: logger.info("✅ Aucun groupe à supprimer.") return 0 total_dupes, total_terms = print_preview(valid_groups) if dry_run: logger.info("⚠️ Mode --dry-run : aucune suppression effectuée.") return 0 if total_dupes == 0: return 0 # Commit par groupe (user + name) pour limiter l'impact d'une erreur partielle. grand_deleted = 0 grand_terms = 0 for group in valid_groups: user_id = group["user_id"] name = group["name"] logger.info("👤 user=%s name=%s — suppression…", user_id, name) try: deleted, terms = delete_group(session, group) session.commit() grand_deleted += deleted grand_terms += terms except Exception as e: session.rollback() logger.error("❌ Échec pour user=%s name=%s : %s", user_id, name, e) logger.error(" Transaction annulée pour ce groupe, on continue.") logger.info("=" * 78) logger.info( "✅ Terminé : %d glossaire(s) supprimé(s), %d termes supprimé(s).", grand_deleted, grand_terms, ) logger.info("=" * 78) return 0 def confirm(prompt: str) -> bool: """Ask the user for confirmation. Returns True if user accepts.""" try: answer = input(f"{prompt} [oui/non] : ").strip().lower() except EOFError: return False return answer in ("oui", "o", "yes", "y") def regenerate_backup(user_id: str | None) -> Path: """Run the backup script as a subprocess to get a fresh JSON.""" timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") out_path = ROOT / "backups" / f"glossary_duplicates_{timestamp}.json" cmd = [ sys.executable, str(ROOT / "scripts" / "backup_duplicate_glossaries.py"), ] if user_id: cmd += ["--user", user_id] cmd += ["--output", str(out_path)] logger.info("🔄 Génération d'un backup frais : %s", " ".join(cmd)) res = subprocess.run(cmd, env=os.environ.copy()) if res.returncode != 0: logger.error("❌ Échec de la génération du backup (code=%d).", res.returncode) sys.exit(2) return out_path def main() -> int: parser = argparse.ArgumentParser( description="Supprime les doublons de glossaires en se basant sur un backup JSON." ) parser.add_argument( "input", nargs="?", metavar="BACKUP_JSON", help="Fichier JSON de backup. Si absent, --user doit être fourni pour en générer un.", ) parser.add_argument( "--user", metavar="USER_ID", help="Génère un backup frais limité à cet utilisateur (utilisé si BACKUP_JSON absent).", ) parser.add_argument( "--dry-run", action="store_true", help="Affiche ce qui serait supprimé sans rien modifier.", ) parser.add_argument( "--yes", action="store_true", help="Ne demande pas de confirmation interactive.", ) args = parser.parse_args() # Résoudre le fichier d'entrée if args.input: backup_path = Path(args.input) elif args.user: backup_path = regenerate_backup(args.user) else: parser.error("Fournissez un BACKUP_JSON ou bien --user USER_ID.") backup = load_backup(backup_path) logger.info("📄 Backup chargé : %s (généré le %s)", backup_path, backup.get("generated_at")) if not args.dry_run and not args.yes: total = backup.get("total_duplicates", 0) if total == 0: logger.info("✅ Aucun doublon à supprimer dans ce backup.") return 0 if not confirm(f"Supprimer {total} glossaire(s) listé(s) dans le backup ?"): logger.info("Annulé par l'utilisateur.") return 1 return perform_deletion(backup, dry_run=args.dry_run) if __name__ == "__main__": sys.exit(main())