fix(glossaries): dedup by (user_id, name) instead of (user_id, template_id)
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 3m1s

Le groupement par template_id etait faux sur la prod :
- Les doublons historiques ont template_id=NULL (crees avant la migration)
- Deux glossaires 'Finance - FR->Anglais' et 'Finance - FR->Multilingue'
  partagent le meme template_id mais DOIVENT etre conserves separement.

Changements :
- Groupement par (user_id, name) -> c'est ce que l'utilisateur voit dans l'UI
  et la definition reelle d'un doublon.
- Les glossaires multilingues ('-> Multilingue') ont un nom distinct des
  versions '-> Anglais' : ils ne sont jamais fusionnes (preserve par design).
- Fallback automatique si la colonne template_id est absente du schema
  (dev DB) : warning + requete sans la colonne, aucun crash.
- Suppression du flag --allow-missing-template-id devenu inutile.
- Nettoyage des imports ORM inutiles (text brut uniquement, plus rapide).
This commit is contained in:
Sepehr
2026-06-03 21:48:53 +02:00
parent cd32a42b1a
commit 8f55e3d9aa
3 changed files with 147 additions and 200 deletions

View File

@@ -59,7 +59,7 @@ jobs:
done done
# Construire les flags. # Construire les flags.
FLAGS="--user \${USER_ID} --allow-missing-template-id" FLAGS="--user \${USER_ID}"
if [ "\${DRY_RUN}" = "true" ]; then if [ "\${DRY_RUN}" = "true" ]; then
FLAGS="\${FLAGS} --dry-run" FLAGS="\${FLAGS} --dry-run"
fi fi

View File

@@ -1,16 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Sauvegarde en JSON les glossaires dupliqués (même user_id + template_id) créés Sauvegarde en JSON les glossaires dupliqués (même user_id + même nom) créés
avant la mise en place de la garde anti-doublon dans le backend. avant la mise en place de la garde anti-doublon dans le backend.
⚠️ Ce script ne supprime RIEN — il produit uniquement un fichier de backup ⚠️ Ce script ne supprime RIEN — il produit uniquement un fichier de backup
contenant l'intégralité des doublons (métadonnées + termes) en vue d'une contenant l'intégralité des doublons (métadonnées + termes) en vue d'une
analyse ou d'une suppression manuelle ultérieure. analyse ou d'une suppression manuelle ultérieure.
Pour chaque couple (user_id, template_id) avec > 1 glossaire, le plus ancien Pour chaque couple (user_id, name) avec > 1 glossaire, le plus ancien
(premier créé) est marqué "keeper" et les copies sont listées dans "duplicates" (premier créé) est marqué "keeper" et les copies sont listées dans "duplicates"
avec tous leurs termes. avec tous leurs termes.
Les glossaires multilingues (« Français → Multilingue ») ont un nom distinct
de leurs homologues « Français → Anglais » : ils ne sont jamais fusionnés.
Usage: Usage:
# Cible la base de prod PostgreSQL (lu via DATABASE_URL) : # Cible la base de prod PostgreSQL (lu via DATABASE_URL) :
DATABASE_URL=postgresql://user:pass@host:5432/db python scripts/backup_duplicate_glossaries.py DATABASE_URL=postgresql://user:pass@host:5432/db python scripts/backup_duplicate_glossaries.py
@@ -23,10 +26,6 @@ Usage:
# Choisir le fichier de sortie : # Choisir le fichier de sortie :
DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --output backups/dupes.json DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --output backups/dupes.json
# Forcer l'exécution même si la colonne template_id est absente du schéma
# (utile pour un dump partiel des glossaires sans template_id) :
DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --allow-missing-template-id
""" """
import argparse import argparse
@@ -40,10 +39,9 @@ from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT)) sys.path.insert(0, str(ROOT))
from sqlalchemy import inspect, text from sqlalchemy import text
from database.connection import sync_engine from database.connection import get_sync_session
from database.models import Glossary
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -52,80 +50,71 @@ logging.basicConfig(
logger = logging.getLogger("backup_dup_glossaries") logger = logging.getLogger("backup_dup_glossaries")
def _has_template_id_column() -> bool:
"""Vérifie que la colonne `template_id` existe sur la table `glossaries`."""
try:
inspector = inspect(sync_engine)
cols = {c["name"] for c in inspector.get_columns("glossaries")}
return "template_id" in cols
except Exception as e:
logger.error("Impossible d'inspecter le schéma : %s", e)
return False
def find_duplicates( def find_duplicates(
session, user_id: str | None = None, include_no_template: bool = False, session, user_id: str | None = None,
use_raw_query: bool = False, ) -> dict[tuple[str, str], list[dict]]:
) -> dict[tuple[str, str | None], list[Glossary]]: """Group glossaries by (user_id, name). Returns lightweight dicts (not ORM).
"""Group glossaries by (user_id, template_id). By default, only template-linked ones.
Si `include_no_template=True`, les glossaires sans template_id sont groupés sous Utilise du SQL brut pour rester robuste si la colonne `template_id` est
la clé (user_id, None). absente du schéma (ex. ancienne DB de dev). En cas d'absence, retombe
automatiquement sur une requête sans `template_id`.
Si `use_raw_query=True`, on bypass l'ORM (utile quand la colonne `template_id`
n'existe pas dans le schéma).
""" """
if use_raw_query: base_cols = "id, user_id, name, source_language, target_language, created_at, updated_at"
# Bypass ORM : SELECT * ne dépend pas du mapping modèle. try:
sql = "SELECT id, user_id, name, source_language, target_language, " \ sql = f"SELECT {base_cols}, template_id FROM glossaries"
"created_at, updated_at FROM glossaries"
params: dict = {} params: dict = {}
clauses: list[str] = []
if user_id: if user_id:
clauses.append("user_id = :user_id") sql += " WHERE user_id = :user_id"
params["user_id"] = user_id params["user_id"] = user_id
if clauses: sql += " ORDER BY user_id, name, created_at"
sql += " WHERE " + " AND ".join(clauses)
rows = session.execute(text(sql), params).fetchall() rows = session.execute(text(sql), params).fetchall()
has_template_id = True
except Exception as e:
if "no such column" not in str(e).lower() and "undefined column" not in str(e).lower():
raise
logger.warning("⚠️ Colonne `template_id` absente du schéma — fallback sans template_id.")
sql = f"SELECT {base_cols} FROM glossaries"
params = {}
if user_id:
sql += " WHERE user_id = :user_id"
params["user_id"] = user_id
sql += " ORDER BY user_id, name, created_at"
rows = session.execute(text(sql), params).fetchall()
has_template_id = False
groups: dict[tuple[str, str | None], list[Glossary]] = defaultdict(list) groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
for r in rows: for r in rows:
# Sans colonne template_id, on groupe par (user_id, None). groups[(r.user_id, r.name)].append({
g = Glossary( "id": r.id,
id=r.id, user_id=r.user_id, name=r.name, "user_id": r.user_id,
source_language=r.source_language, target_language=r.target_language, "name": r.name,
created_at=r.created_at, updated_at=r.updated_at, "source_language": r.source_language,
) "target_language": r.target_language,
groups[(g.user_id, None)].append(g) "template_id": r.template_id if has_template_id else None,
return {k: v for k, v in groups.items() if len(v) > 1} "created_at": r.created_at,
"updated_at": r.updated_at,
q = session.query(Glossary) })
if not include_no_template:
q = q.filter(Glossary.template_id.isnot(None))
if user_id:
q = q.filter(Glossary.user_id == user_id)
groups = defaultdict(list)
for g in q.all():
groups[(g.user_id, g.template_id)].append(g)
return {k: v for k, v in groups.items() if len(v) > 1} return {k: v for k, v in groups.items() if len(v) > 1}
def _stable_sort(glossaries: list[Glossary]) -> tuple[list[Glossary], int]: def _stable_sort(glossaries: list[dict]) -> tuple[list[dict], int]:
"""Sort glossaries by (created_at ASC, id ASC) for deterministic ordering. """Sort glossaries by (created_at ASC, id ASC) for deterministic ordering.
Returns the sorted list and the number of entries with None created_at. Returns the sorted list and the number of entries with None created_at.
""" """
none_count = sum(1 for g in glossaries if g.created_at is None) none_count = sum(1 for g in glossaries if g["created_at"] is None)
if none_count: if none_count:
logger.warning( logger.warning(
"⚠️ %d glossaire(s) ont un created_at NULL — tri secondaire par id.", "⚠️ %d glossaire(s) ont un created_at NULL — tri secondaire par id.",
none_count, none_count,
) )
return sorted(glossaries, key=lambda g: (g.created_at or datetime.min.replace(tzinfo=timezone.utc), g.id)), none_count return sorted(
glossaries,
key=lambda g: (g["created_at"] or datetime.min.replace(tzinfo=timezone.utc), g["id"]),
), none_count
def serialize_group(user_id: str, template_id: str | None, glossaries: list[Glossary]) -> dict: def serialize_group(user_id: str, name: str, glossaries: list[dict]) -> dict:
"""Convert a duplicate group to a JSON-serializable dict.""" """Convert a duplicate group to a JSON-serializable dict."""
sorted_glossaries, _ = _stable_sort(glossaries) sorted_glossaries, _ = _stable_sort(glossaries)
keeper = sorted_glossaries[0] keeper = sorted_glossaries[0]
@@ -136,7 +125,6 @@ def serialize_group(user_id: str, template_id: str | None, glossaries: list[Glos
return None return None
if isinstance(value, datetime): if isinstance(value, datetime):
return value.isoformat() return value.isoformat()
# SQLite renvoie parfois des strings ; on tente de normaliser.
if isinstance(value, str): if isinstance(value, str):
try: try:
return datetime.fromisoformat(value).isoformat() return datetime.fromisoformat(value).isoformat()
@@ -144,51 +132,79 @@ def serialize_group(user_id: str, template_id: str | None, glossaries: list[Glos
return value return value
return str(value) return str(value)
def serialize_glossary(g: Glossary, include_terms: bool) -> dict: def count_terms(session, glossary_id: str) -> int:
data = { return session.execute(
"id": g.id, text("SELECT COUNT(*) FROM glossary_terms WHERE glossary_id = :id"),
"name": g.name, {"id": glossary_id},
"source_language": g.source_language, ).scalar() or 0
"target_language": g.target_language,
"template_id": getattr(g, "template_id", None), with get_sync_session() as session:
"created_at": to_iso(g.created_at), keeper_terms = count_terms(session, keeper["id"])
"updated_at": to_iso(g.updated_at), duplicate_payload = []
} for d in duplicates:
if include_terms: tcount = count_terms(session, d["id"])
data["terms"] = [ duplicate_payload.append({
{ **d,
"id": t.id, "created_at": to_iso(d["created_at"]),
"source": t.source, "updated_at": to_iso(d["updated_at"]),
"target": t.target, "terms": _fetch_terms(session, d["id"]),
"translations": t.translations or {}, "terms_count": tcount,
} })
for t in g.terms
] if g.terms else []
else:
data["terms_count"] = len(g.terms) if g.terms else 0
return data
return { return {
"user_id": user_id, "user_id": user_id,
"template_id": template_id, "name": name,
"keep": serialize_glossary(keeper, include_terms=False), "keep": {
"id": keeper["id"],
"name": keeper["name"],
"source_language": keeper["source_language"],
"target_language": keeper["target_language"],
"template_id": keeper["template_id"],
"created_at": to_iso(keeper["created_at"]),
"updated_at": to_iso(keeper["updated_at"]),
"terms_count": keeper_terms,
},
"duplicates_count": len(duplicates), "duplicates_count": len(duplicates),
"duplicates": [serialize_glossary(d, include_terms=True) for d in duplicates], "duplicates": duplicate_payload,
} }
def write_backup(groups: dict[tuple[str, str | None], list[Glossary]], output_path: Path) -> dict: def _fetch_terms(session, glossary_id: str) -> list[dict]:
"""Fetch all terms for a glossary (used to back up duplicates before deletion)."""
rows = session.execute(
text(
"SELECT id, source, target, translations "
"FROM glossary_terms WHERE glossary_id = :id ORDER BY id"
),
{"id": glossary_id},
).fetchall()
return [
{
"id": r.id,
"source": r.source,
"target": r.target,
"translations": r.translations or {},
}
for r in rows
]
def write_backup(groups: dict[tuple[str, str], list[dict]], output_path: Path) -> dict:
"""Write the full backup to `output_path` and return a stats dict.""" """Write the full backup to `output_path` and return a stats dict."""
output_path.parent.mkdir(parents=True, exist_ok=True) output_path.parent.mkdir(parents=True, exist_ok=True)
payload = { payload = {
"generated_at": datetime.now(timezone.utc).isoformat(), "generated_at": datetime.now(timezone.utc).isoformat(),
"schema_version": 1, "schema_version": 2,
"note": "Aucun glossaire n'a été supprimé. Ce fichier documente les doublons.", "note": "Aucun glossaire n'a été supprimé. Ce fichier documente les doublons. "
"Les glossaires multilingues (« → Multilingue ») ont un nom distinct "
"et ne sont jamais fusionnés avec leurs homologues « → Anglais ».",
"total_groups": len(groups), "total_groups": len(groups),
"total_duplicates": sum(len(v) - 1 for v in groups.values()), "total_duplicates": sum(len(v) - 1 for v in groups.values()),
"groups": [serialize_group(uid, tid, gs) for (uid, tid), gs in "groups": [
sorted(groups.items(), key=lambda x: (x[0][1] or "", x[0][0]))], serialize_group(uid, name, gs)
for (uid, name), gs in sorted(groups.items(), key=lambda x: (x[0][1], x[0][0]))
],
} }
with open(output_path, "w", encoding="utf-8") as f: with open(output_path, "w", encoding="utf-8") as f:
@@ -205,17 +221,10 @@ def print_report(payload: dict) -> None:
logger.info("Généré le : %s", payload["generated_at"]) logger.info("Généré le : %s", payload["generated_at"])
logger.info("Groupes concernés: %d", payload["total_groups"]) logger.info("Groupes concernés: %d", payload["total_groups"])
logger.info("Doublons totaux : %d", payload["total_duplicates"]) logger.info("Doublons totaux : %d", payload["total_duplicates"])
logger.info("")
by_template: dict[str, int] = defaultdict(int) logger.info("Détail par nom :")
for g in payload["groups"]: for g in payload["groups"]:
tid = g["template_id"] or "(no template)" logger.info(" '%s'%d doublon(s) à supprimer", g["name"], g["duplicates_count"])
by_template[tid] += g["duplicates_count"]
if by_template:
logger.info("")
logger.info("Par template :")
for tid, count in sorted(by_template.items()):
logger.info(" %-14s %d doublon(s)", tid, count)
logger.info("=" * 78) logger.info("=" * 78)
@@ -233,50 +242,15 @@ def main() -> int:
metavar="PATH", metavar="PATH",
help="Chemin du fichier JSON de sortie (défaut : backups/glossary_duplicates_<timestamp>.json).", help="Chemin du fichier JSON de sortie (défaut : backups/glossary_duplicates_<timestamp>.json).",
) )
parser.add_argument(
"--include-no-template",
action="store_true",
help="Inclut aussi les glossaires sans template_id dans la recherche de doublons.",
)
parser.add_argument(
"--allow-missing-template-id",
action="store_true",
help="Continue sans erreur si la colonne `template_id` est absente du schéma "
"(équivaut à --include-no-template, mais le script ne plantera pas).",
)
args = parser.parse_args() args = parser.parse_args()
if not _has_template_id_column():
if args.allow_missing_template_id or args.include_no_template:
logger.warning("⚠️ Colonne `template_id` absente — bascule en mode sans-template.")
args.include_no_template = True
else:
logger.error(
"❌ La colonne `glossaries.template_id` est absente du schéma actuel. "
"Appliquez d'abord la migration Alembic (alembic upgrade head) ou relancez "
"avec --allow-missing-template-id pour ne sauvegarder que les glossaires sans template_id."
)
return 2
from database.connection import get_sync_session
use_raw = args.allow_missing_template_id or args.include_no_template
if args.allow_missing_template_id and not args.include_no_template:
args.include_no_template = True
logger.info( logger.info(
"🔍 Recherche de doublons%s%s", "🔍 Recherche de doublons (user_id, name)%s",
f" pour user_id={args.user}" if args.user else "", f" pour user_id={args.user}" if args.user else "",
" (incl. sans template)" if args.include_no_template else "",
) )
with get_sync_session() as session: with get_sync_session() as session:
groups = find_duplicates( groups = find_duplicates(session, user_id=args.user)
session,
user_id=args.user,
include_no_template=args.include_no_template,
use_raw_query=use_raw,
)
if not groups: if not groups:
if args.user: if args.user:

View File

@@ -3,11 +3,15 @@
Supprime les glossaires dupliqués en se basant sur un fichier JSON de backup Supprime les glossaires dupliqués en se basant sur un fichier JSON de backup
produit par `scripts/backup_duplicate_glossaries.py`. produit par `scripts/backup_duplicate_glossaries.py`.
⚠️ DESTRUCTIF. Par défaut, demande confirmation interactive avant chaque ⚠️ DESTRUCTIF. Par défaut, demande confirmation interactive avant la
suppression. Utiliser --yes pour les exécutions automatisées. suppression. Utiliser --yes pour les exécutions automatisées.
Pour chaque groupe dans le JSON, le glossaire listé dans "keep" est conservé, Pour chaque groupe (user_id, name) dans le JSON, le glossaire listé dans "keep"
ceux listés dans "duplicates" sont supprimés (ainsi que leurs termes via cascade). est conservé, ceux listés dans "duplicates" sont supprimés (ainsi que leurs
termes via cascade).
Les glossaires multilingues (« → Multilingue ») ont un nom distinct et ne
peuvent pas être inclus dans un groupe de doublons : ils sont préservés.
Usage: Usage:
# Dry-run (relecture) : # Dry-run (relecture) :
@@ -19,16 +23,18 @@ Usage:
# Sans confirmation (CI / cron) : # Sans confirmation (CI / cron) :
python scripts/delete_duplicate_glossaries.py backups/glossary_duplicates_xxx.json --yes python scripts/delete_duplicate_glossaries.py backups/glossary_duplicates_xxx.json --yes
# Re-génère le backup à la volée (si --input absent) : # Re-génère le backup à la volée (si BACKUP_JSON absent) :
DATABASE_URL=... python scripts/delete_duplicate_glossaries.py --user <USER_ID> --dry-run DATABASE_URL=... python scripts/delete_duplicate_glossaries.py --user <USER_ID> --dry-run
# Back-up automatique avant suppression (recommandé) : # Backup auto + suppression :
python scripts/delete_duplicate_glossaries.py --user <USER_ID> --yes DATABASE_URL=... python scripts/delete_duplicate_glossaries.py --user <USER_ID> --yes
""" """
import argparse import argparse
import json import json
import logging import logging
import os
import subprocess
import sys import sys
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -36,10 +42,9 @@ from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT)) sys.path.insert(0, str(ROOT))
from sqlalchemy import inspect, text from sqlalchemy import text
from database.connection import get_sync_session, sync_engine from database.connection import get_sync_session
from database.models import Glossary, User
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -67,9 +72,6 @@ def validate_ids(session, backup: dict) -> tuple[list[dict], list[str], list[str
Returns (valid_groups, errors, warnings). Returns (valid_groups, errors, warnings).
- errors: bloquants (mismatch owner / id introuvable) - errors: bloquants (mismatch owner / id introuvable)
- warnings: informatifs (user parent absent — données orphelines) - warnings: informatifs (user parent absent — données orphelines)
Utilise du SQL brut pour ne pas dépendre du mapping ORM (qui planterait
si la colonne `template_id` n'existe pas dans le schéma).
""" """
errors: list[str] = [] errors: list[str] = []
warnings: list[str] = [] warnings: list[str] = []
@@ -104,7 +106,7 @@ def validate_ids(session, backup: dict) -> tuple[list[dict], list[str], list[str
def print_preview(valid_groups: list[dict]) -> tuple[int, int]: def print_preview(valid_groups: list[dict]) -> tuple[int, int]:
"""Print what would be deleted. Returns (total_dupes, total_user_groups).""" """Print what would be deleted. Returns (total_dupes, total_terms)."""
total_dupes = 0 total_dupes = 0
total_terms = 0 total_terms = 0
@@ -112,50 +114,33 @@ def print_preview(valid_groups: list[dict]) -> tuple[int, int]:
logger.info("Aperçu de la suppression") logger.info("Aperçu de la suppression")
logger.info("=" * 78) logger.info("=" * 78)
by_template: dict[str, int] = {}
for g in valid_groups: for g in valid_groups:
tid = g.get("template_id") or "(no template)"
by_template[tid] = by_template.get(tid, 0) + g["duplicates_count"]
total_dupes += g["duplicates_count"] total_dupes += g["duplicates_count"]
total_terms += sum(len(d.get("terms", [])) for d in g["duplicates"]) for d in g["duplicates"]:
total_terms += d.get("terms_count", len(d.get("terms", [])))
logger.info("Groupes à traiter : %d", len(valid_groups)) logger.info("Groupes à traiter : %d", len(valid_groups))
logger.info("Glossaires à supprimer : %d", total_dupes) logger.info("Glossaires à supprimer : %d", total_dupes)
logger.info("Termes concernés (estim.) : %d", total_terms) logger.info("Termes concernés : %d", total_terms)
logger.info("") logger.info("")
logger.info("Détail par template :") logger.info("Détail par nom :")
for tid, count in sorted(by_template.items()): for g in valid_groups:
logger.info(" %-14s %d doublon(s) à supprimer", tid, count) logger.info(" '%s' %d doublon(s)", g["name"], g["duplicates_count"])
logger.info("=" * 78) logger.info("=" * 78)
return total_dupes, total_terms return total_dupes, total_terms
def delete_group(session, group: dict) -> tuple[int, int]: def delete_group(session, group: dict) -> tuple[int, int]:
"""Delete the duplicates of a single group. Returns (glossaries_deleted, terms_deleted). """Delete the duplicates of a single group. Returns (glossaries_deleted, terms_deleted)."""
Utilise SQL brut pour ne pas dépendre du mapping ORM (qui planterait
si la colonne `template_id` n'existe pas dans le schéma).
"""
deleted = 0 deleted = 0
terms_deleted = 0 terms_deleted = 0
for dup in group["duplicates"]: for dup in group["duplicates"]:
# 1. Compter les termes (avant suppression, pour les logs).
term_count = session.execute( term_count = session.execute(
text("SELECT COUNT(*) FROM glossary_terms WHERE glossary_id = :gid"), text("SELECT COUNT(*) FROM glossary_terms WHERE glossary_id = :gid"),
{"gid": dup["id"]}, {"gid": dup["id"]},
).scalar() or 0 ).scalar() or 0
# 2. Récupérer le template_id pour le log (best effort).
try:
tpl = session.execute(
text("SELECT template_id FROM glossaries WHERE id = :gid"),
{"gid": dup["id"]},
).scalar()
except Exception:
tpl = None
# 3. Supprimer d'abord les termes (FK), puis le glossaire.
try: try:
session.execute( session.execute(
text("DELETE FROM glossary_terms WHERE glossary_id = :gid"), text("DELETE FROM glossary_terms WHERE glossary_id = :gid"),
@@ -172,10 +157,7 @@ def delete_group(session, group: dict) -> tuple[int, int]:
deleted += 1 deleted += 1
terms_deleted += term_count terms_deleted += term_count
logger.info( logger.info(" 🗑️ Supprimé id=%s (%d termes)", dup["id"], term_count)
" 🗑️ Supprimé id=%s (template=%s, %d termes)",
dup["id"], tpl, term_count,
)
return deleted, terms_deleted return deleted, terms_deleted
@@ -206,13 +188,13 @@ def perform_deletion(backup: dict, dry_run: bool) -> int:
if total_dupes == 0: if total_dupes == 0:
return 0 return 0
# Commit par user pour limiter l'impact d'une erreur partielle (F6). # Commit par groupe (user + name) pour limiter l'impact d'une erreur partielle.
grand_deleted = 0 grand_deleted = 0
grand_terms = 0 grand_terms = 0
for group in valid_groups: for group in valid_groups:
user_id = group["user_id"] user_id = group["user_id"]
tid = group.get("template_id") name = group["name"]
logger.info("👤 user=%s template=%s — suppression…", user_id, tid) logger.info("👤 user=%s name=%s — suppression…", user_id, name)
try: try:
deleted, terms = delete_group(session, group) deleted, terms = delete_group(session, group)
session.commit() session.commit()
@@ -220,7 +202,7 @@ def perform_deletion(backup: dict, dry_run: bool) -> int:
grand_terms += terms grand_terms += terms
except Exception as e: except Exception as e:
session.rollback() session.rollback()
logger.error("❌ Échec pour user=%s template=%s : %s", user_id, tid, e) logger.error("❌ Échec pour user=%s name=%s : %s", user_id, name, e)
logger.error(" Transaction annulée pour ce groupe, on continue.") logger.error(" Transaction annulée pour ce groupe, on continue.")
logger.info("=" * 78) logger.info("=" * 78)
@@ -241,10 +223,8 @@ def confirm(prompt: str) -> bool:
return answer in ("oui", "o", "yes", "y") return answer in ("oui", "o", "yes", "y")
def regenerate_backup(user_id: str | None, allow_missing_template_id: bool = False) -> Path: def regenerate_backup(user_id: str | None) -> Path:
"""Run the backup script as a subprocess to get a fresh JSON.""" """Run the backup script as a subprocess to get a fresh JSON."""
import subprocess
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
out_path = ROOT / "backups" / f"glossary_duplicates_{timestamp}.json" out_path = ROOT / "backups" / f"glossary_duplicates_{timestamp}.json"
cmd = [ cmd = [
@@ -253,11 +233,9 @@ def regenerate_backup(user_id: str | None, allow_missing_template_id: bool = Fal
] ]
if user_id: if user_id:
cmd += ["--user", user_id] cmd += ["--user", user_id]
if allow_missing_template_id:
cmd += ["--allow-missing-template-id"]
cmd += ["--output", str(out_path)] cmd += ["--output", str(out_path)]
logger.info("🔄 Génération d'un backup frais : %s", " ".join(cmd)) logger.info("🔄 Génération d'un backup frais : %s", " ".join(cmd))
res = subprocess.run(cmd, env=__import__("os").environ.copy()) res = subprocess.run(cmd, env=os.environ.copy())
if res.returncode != 0: if res.returncode != 0:
logger.error("❌ Échec de la génération du backup (code=%d).", res.returncode) logger.error("❌ Échec de la génération du backup (code=%d).", res.returncode)
sys.exit(2) sys.exit(2)
@@ -279,11 +257,6 @@ def main() -> int:
metavar="USER_ID", metavar="USER_ID",
help="Génère un backup frais limité à cet utilisateur (utilisé si BACKUP_JSON absent).", help="Génère un backup frais limité à cet utilisateur (utilisé si BACKUP_JSON absent).",
) )
parser.add_argument(
"--allow-missing-template-id",
action="store_true",
help="Transmis au script de backup si le schéma DB n'a pas la colonne template_id.",
)
parser.add_argument( parser.add_argument(
"--dry-run", "--dry-run",
action="store_true", action="store_true",
@@ -300,7 +273,7 @@ def main() -> int:
if args.input: if args.input:
backup_path = Path(args.input) backup_path = Path(args.input)
elif args.user: elif args.user:
backup_path = regenerate_backup(args.user, args.allow_missing_template_id) backup_path = regenerate_backup(args.user)
else: else:
parser.error("Fournissez un BACKUP_JSON ou bien --user USER_ID.") parser.error("Fournissez un BACKUP_JSON ou bien --user USER_ID.")