Files
cs249r_book/interviews/vault-cli/scripts/diagnose_chain_coverage.py
Vijay Janapa Reddi 5225059754 fix(vault-cli): clear ruff violations flagged by --all-files sweep
Auto-fix removed extraneous f-string prefixes, unused imports
(re, sys, textwrap, defaultdict), an unused local (qids), and
converted datetime.now(timezone.utc) to datetime.now(UTC) (UP017).
Manual fixes split colon/semicolon one-liners onto separate lines
(E701/E702), renamed unused loop vars (cid, chain_id) with leading
underscores (B007), replaced bare except with except Exception (E722),
and renamed loop var L to level to satisfy N806.
2026-05-02 09:17:15 -04:00

213 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""Diagnose chain coverage by (track, topic) bucket.
For each (track, topic) bucket of published questions, reports how many
questions live in the bucket and how many chains currently cover any of
them. Surfaces two lists worth a second-pass Gemini sweep:
- ``uncovered_buckets``: ≥3 published questions, 0 chains
- ``under_covered_buckets``: ≥6 published questions, exactly 1 chain
Output:
- JSON sidecar at ``interviews/vault/chain-coverage.json`` (regeneratable;
gitignored) — feeds Phase 1.4 (--buckets-from)
- Human-readable summary on stdout: per-track totals, biggest gaps
Usage:
python3 diagnose_chain_coverage.py
python3 diagnose_chain_coverage.py --output path/to/coverage.json
"""
from __future__ import annotations
import argparse
import json
from collections import defaultdict
from pathlib import Path
import yaml
from vault_cli.policy import is_published, load_policy
REPO_ROOT = Path(__file__).resolve().parents[3]
VAULT_DIR = REPO_ROOT / "interviews" / "vault"
QUESTIONS_DIR = VAULT_DIR / "questions"
CHAINS_PATH = VAULT_DIR / "chains.json"
POLICY_PATH = VAULT_DIR / "release-policy.yaml"
DEFAULT_OUTPUT = VAULT_DIR / "chain-coverage.json"
UNCOVERED_MIN_QUESTIONS = 3
UNDER_COVERED_MIN_QUESTIONS = 6
UNDER_COVERED_MAX_CHAINS = 1
def load_published_corpus() -> dict[str, dict]:
policy = load_policy(POLICY_PATH)
corpus: dict[str, dict] = {}
for path in QUESTIONS_DIR.rglob("*.yaml"):
with path.open(encoding="utf-8") as f:
d = yaml.safe_load(f)
if not isinstance(d, dict) or "id" not in d:
continue
if not is_published(d, policy):
continue
corpus[d["id"]] = d
return corpus
def bucket_corpus(corpus: dict[str, dict]) -> dict[tuple[str, str], list[str]]:
by_bucket: dict[tuple[str, str], list[str]] = defaultdict(list)
for qid, d in corpus.items():
track = d.get("track")
topic = d.get("topic")
if not track or not topic:
continue
by_bucket[(track, topic)].append(qid)
for k in by_bucket:
by_bucket[k].sort()
return dict(by_bucket)
def load_chains() -> list[dict]:
with CHAINS_PATH.open(encoding="utf-8") as f:
return json.load(f)
def chains_per_bucket(chains: list[dict]) -> dict[tuple[str, str], list[str]]:
"""Map (track, topic) -> list of chain_ids that target that bucket."""
by_bucket: dict[tuple[str, str], list[str]] = defaultdict(list)
for c in chains:
track = c.get("track")
topic = c.get("topic")
cid = c.get("chain_id")
if not (track and topic and cid):
continue
by_bucket[(track, topic)].append(cid)
for k in by_bucket:
by_bucket[k].sort()
return dict(by_bucket)
def build_report(
buckets: dict[tuple[str, str], list[str]],
chains_by_bucket: dict[tuple[str, str], list[str]],
) -> dict:
bucket_rows: list[dict] = []
for (track, topic), qids in sorted(buckets.items()):
cids = chains_by_bucket.get((track, topic), [])
bucket_rows.append({
"track": track,
"topic": topic,
"question_count": len(qids),
"chain_count": len(cids),
"qids": qids,
"chain_ids": cids,
})
uncovered = [
b for b in bucket_rows
if b["question_count"] >= UNCOVERED_MIN_QUESTIONS and b["chain_count"] == 0
]
under_covered = [
b for b in bucket_rows
if b["question_count"] >= UNDER_COVERED_MIN_QUESTIONS
and b["chain_count"] <= UNDER_COVERED_MAX_CHAINS
and b["chain_count"] > 0
]
# Stable, useful ordering: most-questions-first within each list.
uncovered.sort(key=lambda b: (-b["question_count"], b["track"], b["topic"]))
under_covered.sort(key=lambda b: (-b["question_count"], b["track"], b["topic"]))
return {
"thresholds": {
"uncovered_min_questions": UNCOVERED_MIN_QUESTIONS,
"under_covered_min_questions": UNDER_COVERED_MIN_QUESTIONS,
"under_covered_max_chains": UNDER_COVERED_MAX_CHAINS,
},
"totals": {
"buckets": len(bucket_rows),
"questions": sum(b["question_count"] for b in bucket_rows),
"chains": sum(b["chain_count"] for b in bucket_rows),
"uncovered_buckets": len(uncovered),
"under_covered_buckets": len(under_covered),
},
"all_buckets": bucket_rows,
"uncovered_buckets": uncovered,
"under_covered_buckets": under_covered,
}
def print_summary(report: dict) -> None:
totals = report["totals"]
print(f"Buckets: {totals['buckets']}")
print(f"Published questions: {totals['questions']}")
print(f"Total chains: {totals['chains']}")
print(f"Uncovered buckets: {totals['uncovered_buckets']} "
f"(≥{UNCOVERED_MIN_QUESTIONS} questions, 0 chains)")
print(f"Under-covered: {totals['under_covered_buckets']} "
f"(≥{UNDER_COVERED_MIN_QUESTIONS} questions, ≤{UNDER_COVERED_MAX_CHAINS} chain)")
# Per-track breakdown
per_track: dict[str, dict[str, int]] = defaultdict(lambda: {
"buckets": 0, "questions": 0, "chains": 0,
"uncovered": 0, "under_covered": 0,
})
for b in report["all_buckets"]:
t = per_track[b["track"]]
t["buckets"] += 1
t["questions"] += b["question_count"]
t["chains"] += b["chain_count"]
for b in report["uncovered_buckets"]:
per_track[b["track"]]["uncovered"] += 1
for b in report["under_covered_buckets"]:
per_track[b["track"]]["under_covered"] += 1
print()
print(f"{'track':<10} {'buckets':>8} {'questions':>10} {'chains':>7} "
f"{'chains/topic':>13} {'uncov':>6} {'undercov':>9}")
for track in sorted(per_track):
t = per_track[track]
density = t["chains"] / t["buckets"] if t["buckets"] else 0.0
print(f"{track:<10} {t['buckets']:>8} {t['questions']:>10} {t['chains']:>7} "
f"{density:>13.2f} {t['uncovered']:>6} {t['under_covered']:>9}")
print()
print("Top 10 uncovered buckets by question count:")
for b in report["uncovered_buckets"][:10]:
print(f" {b['track']:<8} {b['topic']:<40} q={b['question_count']}")
if report["under_covered_buckets"]:
print()
print("Top 10 under-covered buckets:")
for b in report["under_covered_buckets"][:10]:
print(f" {b['track']:<8} {b['topic']:<40} "
f"q={b['question_count']} chains={b['chain_count']}")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT,
help=f"output JSON path (default: {DEFAULT_OUTPUT})")
args = parser.parse_args()
corpus = load_published_corpus()
buckets = bucket_corpus(corpus)
chains = load_chains()
cbb = chains_per_bucket(chains)
report = build_report(buckets, cbb)
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", encoding="utf-8") as f:
json.dump(report, f, indent=2, sort_keys=False)
f.write("\n")
print_summary(report)
print()
print(f"wrote {args.output.relative_to(REPO_ROOT)}")
if __name__ == "__main__":
main()