Files
office_translator/scripts/backup_duplicate_glossaries.py
Sepehr 8f55e3d9aa
All checks were successful
Deploy to Production / Build and Deploy (push) Successful in 3m1s
fix(glossaries): dedup by (user_id, name) instead of (user_id, template_id)
Le groupement par template_id etait faux sur la prod :
- Les doublons historiques ont template_id=NULL (crees avant la migration)
- Deux glossaires 'Finance - FR->Anglais' et 'Finance - FR->Multilingue'
  partagent le meme template_id mais DOIVENT etre conserves separement.

Changements :
- Groupement par (user_id, name) -> c'est ce que l'utilisateur voit dans l'UI
  et la definition reelle d'un doublon.
- Les glossaires multilingues ('-> Multilingue') ont un nom distinct des
  versions '-> Anglais' : ils ne sont jamais fusionnes (preserve par design).
- Fallback automatique si la colonne template_id est absente du schema
  (dev DB) : warning + requete sans la colonne, aucun crash.
- Suppression du flag --allow-missing-template-id devenu inutile.
- Nettoyage des imports ORM inutiles (text brut uniquement, plus rapide).
2026-06-03 21:48:53 +02:00

280 lines
9.8 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Sauvegarde en JSON les glossaires dupliqués (même user_id + même nom) créés
avant la mise en place de la garde anti-doublon dans le backend.
⚠️ Ce script ne supprime RIEN — il produit uniquement un fichier de backup
contenant l'intégralité des doublons (métadonnées + termes) en vue d'une
analyse ou d'une suppression manuelle ultérieure.
Pour chaque couple (user_id, name) avec > 1 glossaire, le plus ancien
(premier créé) est placé sous la clé "keep" et les copies sont listées dans
"duplicates" avec tous leurs termes.
Les glossaires multilingues (« Français → Multilingue ») ont un nom distinct
de leurs homologues « Français → Anglais » : ils ne sont jamais fusionnés.
Usage:
# Cible la base de prod PostgreSQL (lu via DATABASE_URL) :
DATABASE_URL=postgresql://user:pass@host:5432/db python scripts/backup_duplicate_glossaries.py
# Ou préciser une base SQLite spécifique :
SQLITE_PATH=/path/to/translate.db python scripts/backup_duplicate_glossaries.py
# Limiter à un seul utilisateur :
DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --user <user_id>
# Choisir le fichier de sortie :
DATABASE_URL=... python scripts/backup_duplicate_glossaries.py --output backups/dupes.json
"""
import argparse
import json
import logging
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
# Directory two levels above this file, i.e. the parent of the folder that
# contains this script.
ROOT = Path(__file__).resolve().parent.parent
# Put that directory first on the import path; presumably this is what lets
# the project-local imports that follow resolve when the script is run
# directly from the command line.
sys.path.insert(0, str(ROOT))
from sqlalchemy import text
from database.connection import get_sync_session
# Configure the root logger once at import time: INFO level and a
# "timestamp [LEVEL] message" line format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Named logger used by the functions of this script.
logger = logging.getLogger("backup_dup_glossaries")
def find_duplicates(
    session, user_id: str | None = None,
) -> dict[tuple[str, str], list[dict]]:
    """Group glossaries by ``(user_id, name)`` and return only duplicated groups.

    Raw SQL is used (no ORM models) so the function keeps working when the
    ``template_id`` column is missing from the schema: the first query selects
    it, and if the database reports the column as missing, the query is
    retried without it and every row gets ``template_id = None``.

    Args:
        session: Database session; must provide ``execute(statement, params)``
            returning a result with ``fetchall()``, and ``rollback()``.
        user_id: When truthy, only this user's glossaries are examined.

    Returns:
        Mapping ``(user_id, name) -> list of glossary dicts`` restricted to
        keys that own more than one glossary. Each list keeps the order
        returned by the query (``ORDER BY user_id, name, created_at``).

    Raises:
        Exception: Any query error that is not a missing-column error is
            re-raised unchanged; errors raised by the fallback query
            propagate as well.
    """
    base_cols = "id, user_id, name, source_language, target_language, created_at, updated_at"
    # Lower-cased fragments that identify a "column does not exist" error:
    #   - SQLite:      "no such column: template_id"
    #   - PostgreSQL:  '(...UndefinedColumn) column "template_id" does not exist'
    #   - MySQL:       "Unknown column 'template_id' in 'field list'"
    # "undefined column" is kept for backward compatibility with the previous
    # detection logic.
    missing_column_markers = (
        "no such column",
        "undefined column",
        "undefinedcolumn",
        'column "template_id" does not exist',
        "unknown column",
    )

    def run_query(columns: str):
        """Run the glossary SELECT with the given column list."""
        sql = f"SELECT {columns} FROM glossaries"
        params: dict = {}
        if user_id:
            sql += " WHERE user_id = :user_id"
            params["user_id"] = user_id
        sql += " ORDER BY user_id, name, created_at"
        return session.execute(text(sql), params).fetchall()

    try:
        rows = run_query(f"{base_cols}, template_id")
        has_template_id = True
    except Exception as e:
        message = str(e).lower()
        if not any(marker in message for marker in missing_column_markers):
            raise
        logger.warning("⚠️  Colonne `template_id` absente du schéma — fallback sans template_id.")
        # PostgreSQL marks the whole transaction as aborted after a failed
        # statement; without a rollback the retry below would be rejected.
        session.rollback()
        rows = run_query(base_cols)
        has_template_id = False

    groups: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for r in rows:
        groups[(r.user_id, r.name)].append({
            "id": r.id,
            "user_id": r.user_id,
            "name": r.name,
            "source_language": r.source_language,
            "target_language": r.target_language,
            "template_id": r.template_id if has_template_id else None,
            "created_at": r.created_at,
            "updated_at": r.updated_at,
        })
    # Only keys with at least two glossaries are duplicates.
    return {k: v for k, v in groups.items() if len(v) > 1}
def _stable_sort(glossaries: list[dict]) -> tuple[list[dict], int]:
    """Sort glossaries by ``(created_at ASC, id ASC)`` with NULL dates first.

    The sort key never compares a placeholder against a real ``created_at``
    value, so it works whatever type the database driver returns for the
    column (aware datetime, naive datetime or ISO string) even when some rows
    have a NULL ``created_at``.

    Args:
        glossaries: Glossary dicts, each with ``"created_at"`` and ``"id"`` keys.

    Returns:
        A tuple ``(sorted_list, none_count)`` where ``none_count`` is the
        number of entries whose ``created_at`` is ``None``.
    """
    none_count = sum(1 for g in glossaries if g["created_at"] is None)
    if none_count:
        logger.warning(
            "⚠️  %d glossaire(s) ont un created_at NULL — tri secondaire par id.",
            none_count,
        )

    def sort_key(g: dict) -> tuple:
        created_at = g["created_at"]
        if created_at is None:
            # False < True puts NULL dates first; the 0 placeholder is only
            # ever compared with another 0, never with a real date value.
            return (False, 0, g["id"])
        return (True, created_at, g["id"])

    return sorted(glossaries, key=sort_key), none_count
def serialize_group(user_id: str, name: str, glossaries: list[dict]) -> dict:
    """Convert one duplicate group into a JSON-serializable dict.

    The glossaries are ordered with ``_stable_sort``; the first one is
    reported under ``"keep"`` (metadata and term count only) and the others
    under ``"duplicates"`` with all of their terms. A database session is
    opened with ``get_sync_session`` for the duration of the term queries.

    Args:
        user_id: Owner of the group, copied as-is into the result.
        name: Glossary name shared by the group, copied as-is into the result.
        glossaries: Non-empty list of glossary dicts for this group.

    Returns:
        A dict with the keys ``user_id``, ``name``, ``keep``,
        ``duplicates_count`` and ``duplicates``.

    Raises:
        IndexError: If ``glossaries`` is empty.
    """
    sorted_glossaries, _ = _stable_sort(glossaries)
    keeper = sorted_glossaries[0]
    duplicates = sorted_glossaries[1:]

    def to_iso(value) -> str | None:
        """Render a timestamp as ISO-8601 text; ``None`` stays ``None``."""
        if value is None:
            return None
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value).isoformat()
            except ValueError:
                # Not parseable as ISO: keep the raw database text.
                return value
        return str(value)

    def count_terms(session, glossary_id: str) -> int:
        """Count the terms of a glossary without loading them."""
        return session.execute(
            text("SELECT COUNT(*) FROM glossary_terms WHERE glossary_id = :id"),
            {"id": glossary_id},
        ).scalar() or 0

    with get_sync_session() as session:
        keeper_terms = count_terms(session, keeper["id"])
        duplicate_payload = []
        for d in duplicates:
            # The terms are fetched in full anyway, so the count is derived
            # from that single result: this avoids a second query per
            # duplicate and keeps "terms_count" consistent with "terms".
            terms = _fetch_terms(session, d["id"])
            duplicate_payload.append({
                **d,
                "created_at": to_iso(d["created_at"]),
                "updated_at": to_iso(d["updated_at"]),
                "terms": terms,
                "terms_count": len(terms),
            })

    return {
        "user_id": user_id,
        "name": name,
        "keep": {
            "id": keeper["id"],
            "name": keeper["name"],
            "source_language": keeper["source_language"],
            "target_language": keeper["target_language"],
            "template_id": keeper["template_id"],
            "created_at": to_iso(keeper["created_at"]),
            "updated_at": to_iso(keeper["updated_at"]),
            "terms_count": keeper_terms,
        },
        "duplicates_count": len(duplicates),
        "duplicates": duplicate_payload,
    }
def _fetch_terms(session, glossary_id: str) -> list[dict]:
    """Load every term of one glossary as plain dicts, ordered by term id.

    Used to capture the full content of duplicate glossaries in the backup.

    Args:
        session: Database session providing ``execute(statement, params)``.
        glossary_id: Identifier of the glossary whose terms are read.

    Returns:
        One dict per term with the keys ``id``, ``source``, ``target`` and
        ``translations``; a falsy ``translations`` value becomes ``{}``.
    """
    statement = text(
        "SELECT id, source, target, translations "
        "FROM glossary_terms WHERE glossary_id = :id ORDER BY id"
    )
    result = session.execute(statement, {"id": glossary_id})

    terms: list[dict] = []
    for row in result.fetchall():
        terms.append({
            "id": row.id,
            "source": row.source,
            "target": row.target,
            "translations": row.translations or {},
        })
    return terms
def write_backup(groups: dict[tuple[str, str], list[dict]], output_path: Path) -> dict:
    """Serialize all duplicate groups to a JSON file and return the payload.

    The parent directory of ``output_path`` is created when missing. Groups
    are written ordered by glossary name, then by user id.

    Args:
        groups: Mapping ``(user_id, name) -> glossary dicts`` of duplicates.
        output_path: Destination of the JSON file (UTF-8, indented).

    Returns:
        The payload dict that was written, including the summary counters.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    generated_at = datetime.now(timezone.utc).isoformat()
    group_total = len(groups)
    duplicate_total = sum(len(members) - 1 for members in groups.values())

    # Stable, readable output: order by name first, then by user id.
    ordered_keys = sorted(groups, key=lambda key: (key[1], key[0]))
    serialized_groups = [
        serialize_group(uid, name, groups[(uid, name)])
        for uid, name in ordered_keys
    ]

    payload = {
        "generated_at": generated_at,
        "schema_version": 2,
        "note": "Aucun glossaire n'a été supprimé. Ce fichier documente les doublons. "
                "Les glossaires multilingues (« → Multilingue ») ont un nom distinct "
                "et ne sont jamais fusionnés avec leurs homologues « → Anglais ».",
        "total_groups": group_total,
        "total_duplicates": duplicate_total,
        "groups": serialized_groups,
    }

    with output_path.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    return payload
def print_report(payload: dict) -> None:
    """Log a human-readable summary of a backup payload at INFO level.

    The lines go through the module logger, so their destination depends on
    the logging configuration rather than being written to stdout directly.

    Args:
        payload: Dict providing ``generated_at``, ``total_groups``,
            ``total_duplicates`` and ``groups``; each group provides ``name``
            and ``duplicates_count``.
    """
    separator = "=" * 78
    logger.info(separator)
    logger.info("Récapitulatif du backup")
    logger.info(separator)
    logger.info("Généré le        : %s", payload["generated_at"])
    logger.info("Groupes concernés: %d", payload["total_groups"])
    logger.info("Doublons totaux  : %d", payload["total_duplicates"])
    logger.info("")
    logger.info("Détail par nom :")
    for g in payload["groups"]:
        # Explicit separator between the quoted name and the count, so the
        # two values are not glued together in the log line.
        logger.info("  '%s' : %d doublon(s) à supprimer", g["name"], g["duplicates_count"])
    logger.info(separator)
def main() -> int:
    """Command-line entry point: find duplicate glossaries and back them up.

    Parses ``--user`` and ``--output``, looks for duplicated
    ``(user_id, name)`` groups, and when some exist writes the JSON backup
    and logs a summary. Nothing is deleted.

    Returns:
        Always ``0``, whether or not duplicates were found.
    """
    cli = argparse.ArgumentParser(
        description="Sauvegarde (sans suppression) les glossaires dupliqués en JSON."
    )
    cli.add_argument(
        "--user",
        metavar="USER_ID",
        help="Limite le backup à un seul utilisateur.",
    )
    cli.add_argument(
        "--output",
        metavar="PATH",
        help="Chemin du fichier JSON de sortie (défaut : backups/glossary_duplicates_<timestamp>.json).",
    )
    args = cli.parse_args()

    scope_suffix = f" pour user_id={args.user}" if args.user else ""
    logger.info("🔍 Recherche de doublons (user_id, name)%s", scope_suffix)

    with get_sync_session() as session:
        groups = find_duplicates(session, user_id=args.user)

    # Nothing to back up: report it and stop before touching the filesystem.
    if not groups:
        if not args.user:
            logger.info("✅ Aucun doublon trouvé.")
        else:
            logger.info("✅ Aucun doublon trouvé pour user_id=%s.", args.user)
        return 0

    output_path = Path(args.output) if args.output else None
    if output_path is None:
        # Default location: <ROOT>/backups/ with a UTC timestamp in the name.
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        output_path = ROOT / "backups" / f"glossary_duplicates_{stamp}.json"

    logger.info("💾 Écriture du backup vers %s", output_path)
    payload = write_backup(groups, output_path)
    print_report(payload)

    written_bytes = output_path.stat().st_size
    logger.info("✅ Backup écrit : %s (%d octets)", output_path, written_bytes)
    logger.info("   Aucune suppression effectuée. Relire le JSON pour décider de l'action manuelle.")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())