Files
Entropyk/.opencode/skills/bmad-distillator/scripts/analyze_sources.py
Sepehr ab5dc7e568 chore: remove BMAD framework files and IDE configuration artifacts
Clean up unused BMAD workflow, agent, and command files across all IDE
configurations (.agent, .clinerules, .cursor, .gemini, .github, .kilocode,
.opencode) and internal module files (_bmad/bmb, _bmad/bmm).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-04-25 15:01:09 +02:00

301 lines
9.7 KiB
Python

# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
"""Analyze source documents for the distillation generator.
Enumerates files from paths/folders/globs, computes sizes and token estimates,
detects document types from naming conventions, and suggests groupings for
related documents (e.g., a brief paired with its discovery notes).
Accepts: file paths, folder paths (scans recursively for .md/.txt/.yaml/.yml/.json),
or glob patterns. Folder scans skip node_modules, .git, __pycache__, .venv, venv,
.claude, _bmad-output, .cursor, .vscode.
Output JSON structure:
status: "ok" | "error"
files[]: path, filename, size_bytes, estimated_tokens, doc_type
summary: total_files, total_size_bytes, total_estimated_tokens
groups[]: group_key, files[] with role (primary/companion/standalone)
- Groups related docs by naming convention (e.g., brief + discovery-notes)
routing: recommendation ("single" | "fan-out"), reason
- single: ≤3 files AND ≤15K estimated tokens
- fan-out: >3 files OR >15K estimated tokens
split_prediction: prediction ("likely" | "unlikely"), reason, estimated_distillate_tokens
- Estimates distillate at ~1/3 source size; splits if >5K tokens
"""
from __future__ import annotations
import argparse
import glob
import json
import os
import re
import sys
from pathlib import Path
# File suffixes (lower-case, with leading dot) picked up by folder scans and
# glob matches.
INCLUDE_EXTENSIONS = {
    ".md",
    ".txt",
    ".yaml",
    ".yml",
    ".json",
}

# Directory names pruned from recursive folder scans.
SKIP_DIRS = {
    "node_modules",
    ".git",
    "__pycache__",
    ".venv",
    "venv",
    ".claude",
    "_bmad-output",
    ".cursor",
    ".vscode",
}

# Divisor used to turn a size into a rough token estimate.
CHARS_PER_TOKEN = 4

# Token budgets: the first caps what a single compressor is given, the second
# caps a distillate before a split is predicted.
SINGLE_COMPRESSOR_MAX_TOKENS = 15_000
SINGLE_DISTILLATE_MAX_TOKENS = 5_000

# (regex, doc_type) pairs searched against the lower-cased filename.
# Order matters: the first regex that matches decides the type.
DOC_TYPE_PATTERNS = [
    (r"discovery[_-]notes", "discovery-notes"),
    (r"product[_-]brief", "product-brief"),
    (r"research[_-]report", "research-report"),
    (r"architecture", "architecture-doc"),
    (r"prd", "prd"),
    (r"distillate", "distillate"),
    (r"changelog", "changelog"),
    (r"readme", "readme"),
    (r"spec", "specification"),
    (r"requirements", "requirements"),
    (r"design[_-]doc", "design-doc"),
    (r"meeting[_-]notes", "meeting-notes"),
    (r"brainstorm", "brainstorming"),
    (r"interview", "interview-notes"),
]

# (companion regex, replacement) pairs. A filename matching the regex is a
# companion document; substituting the replacement yields the filename of the
# base document it belongs to.
GROUP_PATTERNS = [
    # <base>-discovery-notes.<ext> / <base>-discovery_notes.<ext>
    (r"^(.+?)(?:-discovery-notes|-discovery_notes)\.(\w+)$", r"\1.\2"),
    # <base>-appendix[-<tag>].<ext> / <base>-addendum[-<tag>].<ext>
    (r"^(.+?)(?:-appendix|-addendum)(?:-\w+)?\.(\w+)$", r"\1.\2"),
    # <base>-review.<ext> / <base>-feedback.<ext>
    (r"^(.+?)(?:-review|-feedback)\.(\w+)$", r"\1.\2"),
]
def resolve_inputs(
    inputs: list[str],
    *,
    extensions: set[str] | None = None,
    skip_dirs: set[str] | None = None,
) -> list[Path]:
    """Resolve input arguments to a flat, de-duplicated list of file paths.

    Each input is tried, in order, as:

    1. an existing file -- accepted as-is, whatever its extension;
    2. an existing directory -- walked recursively, keeping files whose
       lower-cased suffix is in ``extensions`` and never descending into
       directories named in ``skip_dirs``;
    3. a glob pattern (``**`` is recursive) -- matches are kept when they are
       files with an allowed suffix. An input that matches nothing
       contributes nothing.

    Args:
        inputs: File paths, folder paths, or glob patterns.
        extensions: Suffixes to keep for folder/glob results. Defaults to
            ``INCLUDE_EXTENSIONS``.
        skip_dirs: Directory names to prune from folder scans. Defaults to
            ``SKIP_DIRS``.

    Returns:
        Resolved paths in discovery order, first occurrence only.
    """
    allowed = INCLUDE_EXTENSIONS if extensions is None else extensions
    pruned = SKIP_DIRS if skip_dirs is None else skip_dirs

    found: list[Path] = []
    for raw in inputs:
        path = Path(raw)
        if path.is_file():
            found.append(path.resolve())
        elif path.is_dir():
            for root, dirs, filenames in os.walk(path):
                # Assigning to dirs[:] edits the list os.walk itself uses, so
                # pruned directories are never entered. Sorting it makes the
                # traversal order (and thus the output order) deterministic
                # instead of depending on the filesystem's listing order.
                dirs[:] = sorted(d for d in dirs if d not in pruned)
                found.extend(
                    (Path(root) / fn).resolve()
                    for fn in sorted(filenames)
                    if Path(fn).suffix.lower() in allowed
                )
        else:
            # Neither a file nor a directory: treat the input as a glob.
            for match in sorted(glob.glob(raw, recursive=True)):
                candidate = Path(match)
                if candidate.is_file() and candidate.suffix.lower() in allowed:
                    found.append(candidate.resolve())

    # dict keys keep insertion order, so this drops repeats while preserving
    # the position of each path's first occurrence.
    return list(dict.fromkeys(found))
def detect_doc_type(
    filename: str,
    *,
    patterns: list[tuple[str, str]] | None = None,
) -> str:
    """Detect a document type from a filename.

    The filename is lower-cased and each regex is searched anywhere in it
    (extension included); the first pattern that matches wins, so more
    specific patterns must come earlier in the table.

    Args:
        filename: Bare filename to classify.
        patterns: ``(regex, doc_type)`` pairs to try in order. Defaults to
            ``DOC_TYPE_PATTERNS``.

    Returns:
        The doc type of the first matching pattern, or ``"unknown"``.
    """
    table = DOC_TYPE_PATTERNS if patterns is None else patterns
    lowered = filename.lower()
    return next(
        (doc_type for regex, doc_type in table if re.search(regex, lowered)),
        "unknown",
    )
def suggest_groups(
    files: list[Path],
    *,
    patterns: list[tuple[str, str]] | None = None,
) -> list[dict]:
    """Suggest document groupings based on naming conventions.

    A file whose name matches a companion pattern (case-insensitively) is
    attached to the group of its base document. The base is looked up in the
    companion's own directory first; if it is not there, a file with the base
    name located elsewhere is used only when exactly one such file exists.
    Files are tracked by path rather than by bare filename, so same-named
    files in different directories are never confused or dropped.

    Args:
        files: Paths to group. Only path components are inspected; the
            filesystem is not touched.
        patterns: ``(companion_regex, replacement)`` pairs; substituting the
            replacement into a matching filename yields the base filename.
            Defaults to ``GROUP_PATTERNS``.

    Returns:
        A list of ``{"group_key", "files"}`` dicts: companion groups first (in
        the order their first companion was seen, primary listed before
        companions), then one single-member ``standalone`` group per
        remaining file. ``group_key`` is the base filename, so groups from
        different directories may share a key. A group has no ``primary``
        member when the base document is not among ``files``.
    """
    rules = GROUP_PATTERNS if patterns is None else patterns

    # Map each path to the caller's own object so emitted strings come from
    # the inputs, and index distinct paths by filename for the fallback lookup.
    known = {f: f for f in files}
    by_name: dict[str, list[Path]] = {}
    for f in known:
        by_name.setdefault(f.name, []).append(f)

    groups: dict[Path, list[dict]] = {}
    assigned: set[Path] = set()

    for f in files:
        if f in assigned:
            continue
        base_name = next(
            (
                re.sub(pattern, replacement, f.name, flags=re.IGNORECASE)
                for pattern, replacement in rules
                if re.match(pattern, f.name, re.IGNORECASE)
            ),
            None,
        )
        # Not a companion (or a rule that maps the name onto itself): leave it
        # for the standalone pass unless another file claims it as a base.
        if base_name is None or base_name == f.name:
            continue

        base = f.with_name(base_name)
        if base not in known:
            # No sibling base: accept a base elsewhere only if unambiguous.
            same_name = by_name.get(base_name, [])
            if len(same_name) == 1:
                base = same_name[0]

        members = groups.setdefault(base, [])
        if base in known and base not in assigned:
            # First companion of this group: list the base document first.
            members.append({
                "path": str(known[base]),
                "filename": base_name,
                "role": "primary",
            })
            assigned.add(base)
        members.append({
            "path": str(f),
            "filename": f.name,
            "role": "companion",
        })
        assigned.add(f)

    result = [
        {"group_key": base.name, "files": members}
        for base, members in groups.items()
    ]
    result.extend(
        {
            "group_key": f.name,
            "files": [{"path": str(f), "filename": f.name, "role": "standalone"}],
        }
        for f in files
        if f not in assigned
    )
    return result
def analyze(inputs: list[str], output_path: str | None = None) -> None:
    """Analyze the given sources and emit the report as JSON.

    Resolves ``inputs`` to files, records each file's on-disk size and a
    size-based token estimate, then derives grouping suggestions, a routing
    recommendation and a split prediction. The report goes through
    ``output_json``; when no files resolve, an ``"error"`` report is emitted
    instead.

    Args:
        inputs: File paths, folder paths, or glob patterns.
        output_path: Destination file for the JSON, or ``None`` for stdout.
    """
    sources = resolve_inputs(inputs)
    if not sources:
        output_json(
            {
                "status": "error",
                "error": "No readable files found from provided inputs",
                "inputs": inputs,
            },
            output_path,
        )
        return

    # Per-file figures. The size in bytes is what gets divided by
    # CHARS_PER_TOKEN, so the token counts are estimates only.
    sizes = [src.stat().st_size for src in sources]
    per_file = [
        {
            "path": str(src),
            "filename": src.name,
            "size_bytes": nbytes,
            "estimated_tokens": nbytes // CHARS_PER_TOKEN,
            "doc_type": detect_doc_type(src.name),
        }
        for src, nbytes in zip(sources, sizes)
    ]
    total_bytes = sum(sizes)
    token_total = total_bytes // CHARS_PER_TOKEN
    grouping = suggest_groups(sources)

    # Routing: fan out as soon as either the file count or the token budget
    # is exceeded; the stated cause favours the token budget when both are.
    file_count = len(sources)
    over_token_budget = token_total > SINGLE_COMPRESSOR_MAX_TOKENS
    headline = f"{file_count} file(s), ~{token_total:,} estimated tokens — "
    if file_count > 3 or over_token_budget:
        recommendation = "fan-out"
        if over_token_budget:
            cause = ">" + str(SINGLE_COMPRESSOR_MAX_TOKENS) + " tokens"
        else:
            cause = "> 3 files"
        routing_why = headline + "exceeds single compressor threshold " + f"({cause})"
    else:
        recommendation = "single"
        routing_why = headline + "within single compressor threshold"

    # Split prediction: the distillate is assumed to be about a third of the
    # source token count.
    distillate_tokens = token_total // 3
    needs_split = distillate_tokens > SINGLE_DISTILLATE_MAX_TOKENS
    relation = "exceeds" if needs_split else "within"
    split_why = (
        f"Estimated distillate ~{distillate_tokens:,} tokens "
        f"{relation} {SINGLE_DISTILLATE_MAX_TOKENS:,} threshold"
    )

    report = {
        "status": "ok",
        "files": per_file,
        "summary": {
            "total_files": file_count,
            "total_size_bytes": total_bytes,
            "total_estimated_tokens": token_total,
        },
        "groups": grouping,
        "routing": {
            "recommendation": recommendation,
            "reason": routing_why,
        },
        "split_prediction": {
            "prediction": "likely" if needs_split else "unlikely",
            "reason": split_why,
            "estimated_distillate_tokens": distillate_tokens,
        },
    }
    output_json(report, output_path)
def output_json(data: dict, output_path: str | None) -> None:
    """Write ``data`` as 2-space-indented JSON to a file or to stdout.

    Args:
        data: JSON-serializable mapping to emit.
        output_path: Destination file. Missing parent directories are
            created, the file is written with a trailing newline, and a
            confirmation is printed to stderr. When falsy, the JSON is
            printed to stdout instead.
    """
    json_str = json.dumps(data, indent=2)
    if not output_path:
        print(json_str)
        return

    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the file does not depend on the locale's default.
    target.write_text(json_str + "\n", encoding="utf-8")
    print(f"Results written to {output_path}", file=sys.stderr)
def main() -> None:
    """Command-line entry point: parse arguments, run the analysis, exit 0.

    The module docstring doubles as the ``--help`` description and is shown
    verbatim (raw formatter), so its layout is preserved.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    cli.add_argument(
        "inputs",
        help="File paths, folder paths, or glob patterns to analyze",
        nargs="+",
    )
    cli.add_argument(
        "-o",
        "--output",
        help="Output JSON to file instead of stdout",
    )
    options = cli.parse_args()

    analyze(options.inputs, options.output)
    sys.exit(0)


if __name__ == "__main__":
    main()