#!/usr/bin/env python3
"""
Back up duplicated glossaries (same user_id + same name) to a JSON file.

These duplicates were created before the anti-duplicate guard was added to the
backend.

WARNING: this script deletes NOTHING. It only produces a backup file holding
every duplicate (metadata + terms) so that they can be analysed or removed
manually later on.

For each (user_id, name) pair with more than one glossary, the oldest one
(first created) is marked as the "keeper" and the copies are listed under
"duplicates" together with all of their terms.

Multilingual glossaries (« Français → Multilingue ») have a name that differs
from their « Français → Anglais » counterparts: they are never merged.

Usage:
    # Target the production PostgreSQL database (read from DATABASE_URL):
    DATABASE_URL=postgresql://user:pass@host:5432/db python scripts/backup_duplicate_glossaries.py

    # Or point at a specific SQLite database:
    SQLITE_PATH=/path/to/translate.db python scripts/backup_duplicate_glossaries.py

    # Restrict the backup to a single user:
    DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --user USER_ID

    # Choose the output file:
    DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --output backups/dupes.json
"""

import argparse
import json
import logging
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

from sqlalchemy import text

from database.connection import get_sync_session

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("backup_dup_glossaries")

# Timezone-aware placeholder used when a sort key needs *some* datetime value.
_MIN_AWARE_DATETIME = datetime.min.replace(tzinfo=timezone.utc)


def _is_missing_column_error(exc: Exception) -> bool:
    """Return True when `exc` looks like a "column does not exist" DB error.

    SQLite reports "no such column"; PostgreSQL drivers surface the error class
    name `UndefinedColumn` followed by 'column "..." does not exist', so the
    class name (without a space) and the PostgreSQL wording are matched too.
    """
    message = str(exc).lower()
    if any(
        marker in message
        for marker in ("no such column", "undefined column", "undefinedcolumn")
    ):
        return True
    return "template_id" in message and "does not exist" in message


def find_duplicates(
    session,
    user_id: str | None = None,
) -> dict[tuple[str, str], list[dict]]:
    """Group glossaries by (user_id, name) and keep only groups with duplicates.

    Raw SQL is used so the function keeps working when the `template_id`
    column is missing from the schema (e.g. an old development database). In
    that case the query is retried without `template_id` and every returned
    dict carries `template_id = None`.

    Args:
        session: Database session exposing `execute()` and `rollback()`.
        user_id: When truthy, only this user's glossaries are inspected.

    Returns:
        A mapping `(user_id, name) -> list of lightweight dicts` (not ORM
        objects), containing only the groups that hold more than one glossary.

    Raises:
        Exception: Any database error other than a missing `template_id`
            column is re-raised unchanged.
    """
    base_cols = "id, user_id, name, source_language, target_language, created_at, updated_at"

    def run_query(columns: str):
        # Build the SELECT once for both the primary and the fallback attempt.
        sql = f"SELECT {columns} FROM glossaries"
        params: dict = {}
        if user_id:
            sql += " WHERE user_id = :user_id"
            params["user_id"] = user_id
        sql += " ORDER BY user_id, name, created_at"
        return session.execute(text(sql), params).fetchall()

    try:
        rows = run_query(f"{base_cols}, template_id")
        has_template_id = True
    except Exception as e:
        if not _is_missing_column_error(e):
            raise
        logger.warning("⚠️ Colonne `template_id` absente du schéma — fallback sans template_id.")
        # On PostgreSQL a failed statement aborts the current transaction;
        # without a rollback the retry below would fail as well.
        session.rollback()
        rows = run_query(base_cols)
        has_template_id = False

    groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for r in rows:
        groups[(r.user_id, r.name)].append({
            "id": r.id,
            "user_id": r.user_id,
            "name": r.name,
            "source_language": r.source_language,
            "target_language": r.target_language,
            "template_id": r.template_id if has_template_id else None,
            "created_at": r.created_at,
            "updated_at": r.updated_at,
        })

    return {k: v for k, v in groups.items() if len(v) > 1}


def _created_at_sort_key(value) -> tuple[int, datetime, str]:
    """Normalise a `created_at` value into a key that is always comparable.

    NULL values sort first, then real timestamps (naive ones are treated as
    UTC, ISO strings are parsed), then anything that cannot be interpreted as
    a timestamp (ordered by its string form).
    """
    if value is None:
        return (0, _MIN_AWARE_DATETIME, "")
    if isinstance(value, str):
        try:
            value = datetime.fromisoformat(value)
        except ValueError:
            return (2, _MIN_AWARE_DATETIME, value)
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # Naive and aware datetimes cannot be compared with each other.
            value = value.replace(tzinfo=timezone.utc)
        return (1, value, "")
    return (2, _MIN_AWARE_DATETIME, str(value))


def _stable_sort(glossaries: list[dict]) -> tuple[list[dict], int]:
    """Sort glossaries by (created_at ASC, id ASC) for deterministic ordering.

    Entries whose `created_at` is None sort before all others. Values are
    normalised first so that a mix of None, naive/aware datetimes and ISO
    strings (as returned by SQLite through raw SQL) never raises `TypeError`.

    Returns:
        The sorted list and the number of entries with a None `created_at`.
    """
    none_count = sum(1 for g in glossaries if g["created_at"] is None)
    if none_count:
        logger.warning(
            "⚠️ %d glossaire(s) ont un created_at NULL — tri secondaire par id.",
            none_count,
        )
    ordered = sorted(
        glossaries,
        key=lambda g: (_created_at_sort_key(g["created_at"]), g["id"]),
    )
    return ordered, none_count


def _to_iso(value) -> str | None:
    """Render a timestamp-like value as an ISO-8601 string (None stays None).

    Strings that parse as ISO timestamps are re-emitted in canonical form;
    other strings are returned untouched and any other type goes through
    `str()`.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value).isoformat()
        except ValueError:
            return value
    return str(value)


def _count_terms(session, glossary_id: str) -> int:
    """Return the number of terms attached to `glossary_id` (0 when none)."""
    return session.execute(
        text("SELECT COUNT(*) FROM glossary_terms WHERE glossary_id = :id"),
        {"id": glossary_id},
    ).scalar() or 0


def serialize_group(user_id: str, name: str, glossaries: list[dict], session=None) -> dict:
    """Convert a duplicate group to a JSON-serializable dict.

    The oldest glossary (see `_stable_sort`) becomes the "keep" entry, for
    which only the term count is recorded. Every other glossary is listed under
    "duplicates" with its full list of terms.

    Args:
        user_id: Owner shared by every glossary of the group.
        name: Glossary name shared by every glossary of the group.
        glossaries: Lightweight glossary dicts as built by `find_duplicates`.
        session: Optional database session to reuse. When omitted, a dedicated
            session is opened through `get_sync_session()` for this call.

    Returns:
        A dict with the keys `user_id`, `name`, `keep`, `duplicates_count` and
        `duplicates`.
    """
    if session is None:
        with get_sync_session() as own_session:
            return serialize_group(user_id, name, glossaries, session=own_session)

    sorted_glossaries, _ = _stable_sort(glossaries)
    keeper = sorted_glossaries[0]
    duplicates = sorted_glossaries[1:]

    keeper_terms = _count_terms(session, keeper["id"])

    duplicate_payload = []
    for d in duplicates:
        terms = _fetch_terms(session, d["id"])
        duplicate_payload.append({
            **d,
            "created_at": _to_iso(d["created_at"]),
            "updated_at": _to_iso(d["updated_at"]),
            "terms": terms,
            # Derived from the fetched rows so the count always matches `terms`.
            "terms_count": len(terms),
        })

    return {
        "user_id": user_id,
        "name": name,
        "keep": {
            "id": keeper["id"],
            "name": keeper["name"],
            "source_language": keeper["source_language"],
            "target_language": keeper["target_language"],
            "template_id": keeper["template_id"],
            "created_at": _to_iso(keeper["created_at"]),
            "updated_at": _to_iso(keeper["updated_at"]),
            "terms_count": keeper_terms,
        },
        "duplicates_count": len(duplicates),
        "duplicates": duplicate_payload,
    }


def _fetch_terms(session, glossary_id: str) -> list[dict]:
    """Fetch all terms of a glossary, ordered by id.

    Used to back up the terms of duplicate glossaries. A NULL/empty
    `translations` value is replaced by an empty dict.
    """
    rows = session.execute(
        text(
            "SELECT id, source, target, translations "
            "FROM glossary_terms WHERE glossary_id = :id ORDER BY id"
        ),
        {"id": glossary_id},
    ).fetchall()
    return [
        {
            "id": r.id,
            "source": r.source,
            "target": r.target,
            "translations": r.translations or {},
        }
        for r in rows
    ]


def write_backup(groups: dict[tuple[str, str], list[dict]], output_path: Path) -> dict:
    """Write the full backup to `output_path` and return the written payload.

    Groups are emitted sorted by (name, user_id). A single database session is
    shared by all groups, and the payload is serialised in memory before the
    file is opened so that a serialisation error cannot leave a truncated
    backup behind.

    Args:
        groups: Duplicate groups as returned by `find_duplicates`.
        output_path: Destination file; missing parent directories are created.

    Returns:
        The payload dict that was written (totals + serialised groups).
    """
    ordered_groups = sorted(groups.items(), key=lambda x: (x[0][1], x[0][0]))

    with get_sync_session() as session:
        serialized = [
            serialize_group(uid, name, gs, session=session)
            for (uid, name), gs in ordered_groups
        ]

    payload = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "schema_version": 2,
        "note": "Aucun glossaire n'a été supprimé. Ce fichier documente les doublons. "
                "Les glossaires multilingues (« → Multilingue ») ont un nom distinct "
                "et ne sont jamais fusionnés avec leurs homologues « → Anglais ».",
        "total_groups": len(groups),
        "total_duplicates": sum(len(v) - 1 for v in groups.values()),
        "groups": serialized,
    }

    document = json.dumps(payload, ensure_ascii=False, indent=2)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(document)

    return payload


def print_report(payload: dict) -> None:
    """Log a human-readable summary of the backup payload."""
    logger.info("=" * 78)
    logger.info("Récapitulatif du backup")
    logger.info("=" * 78)
    logger.info("Généré le : %s", payload["generated_at"])
    logger.info("Groupes concernés: %d", payload["total_groups"])
    logger.info("Doublons totaux : %d", payload["total_duplicates"])
    logger.info("")
    logger.info("Détail par nom :")
    for g in payload["groups"]:
        logger.info(" '%s' → %d doublon(s) à supprimer", g["name"], g["duplicates_count"])
    logger.info("=" * 78)


def main() -> int:
    """Command-line entry point: find duplicates, write the backup, report.

    Returns:
        The process exit code (0 on success, including when no duplicate is
        found and no file is written).
    """
    parser = argparse.ArgumentParser(
        description="Sauvegarde (sans suppression) les glossaires dupliqués en JSON."
    )
    parser.add_argument(
        "--user",
        metavar="USER_ID",
        help="Limite le backup à un seul utilisateur.",
    )
    parser.add_argument(
        "--output",
        metavar="PATH",
        help="Chemin du fichier JSON de sortie "
             "(défaut : backups/glossary_duplicates_{horodatage_utc}.json).",
    )
    args = parser.parse_args()

    logger.info(
        "🔍 Recherche de doublons (user_id, name)%s…",
        f" pour user_id={args.user}" if args.user else "",
    )

    with get_sync_session() as session:
        groups = find_duplicates(session, user_id=args.user)

    if not groups:
        if args.user:
            logger.info("✅ Aucun doublon trouvé pour user_id=%s.", args.user)
        else:
            logger.info("✅ Aucun doublon trouvé.")
        return 0

    if args.output:
        output_path = Path(args.output)
    else:
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        output_path = ROOT / "backups" / f"glossary_duplicates_{ts}.json"

    logger.info("💾 Écriture du backup vers %s …", output_path)
    payload = write_backup(groups, output_path)

    print_report(payload)
    logger.info("✅ Backup écrit : %s (%d octets)", output_path, output_path.stat().st_size)
    logger.info("ℹ️ Aucune suppression effectuée. Relire le JSON pour décider de l'action manuelle.")
    return 0


if __name__ == "__main__":
    sys.exit(main())