Files
cs249r_book/interviews/vault-cli/scripts/diagnose_chain_coverage.py
Vijay Janapa Reddi a74c98576e Merge origin/dev into yaml-audit
Sync the yaml-audit branch with the latest dev work since the previous
sync (5c5af75ed). Brings in 73 commits including:

  - CI security fixes: postcss XSS bump, uuid bounds bump, codeql
    paths-ignore for vendored bundles, read-only token on
    staffml-validate-vault workflow
  - kits/ dark mode polish: code-block readability, dropdown contrast
  - vault-cli/: pre-commit ruff hook + 20 ruff fixes, all-contributors
    auto-credit workflow change to pull_request_target
  - dev's earlier merge of yaml-audit (836d481b5) carrying the
    pre-trailer-strip Phase 1/2/3 history; this merge harmonises that
    with the current trailer-clean yaml-audit tip
  - misc bug fixes (tinytorch perceptron seed, infra workflows,
    socratiq vite dev injector)

Conflicts resolved (if any) preserve the yaml-audit-side authoritative
state for vault/* files (we own those) and the dev-side authoritative
state for .github/workflows/* and other shared infrastructure.

# Conflicts:
#	.github/workflows/all-contributors-auto-credit.yml
#	.github/workflows/staffml-preview-dev.yml
#	interviews/staffml/src/data/corpus-summary.json
#	interviews/staffml/src/data/vault-manifest.json
#	interviews/staffml/tests/chain-and-vault-smoke.mjs
#	interviews/vault-cli/README.md
#	interviews/vault-cli/docs/CHAIN_ROADMAP.md
#	interviews/vault-cli/scripts/build_chains_with_gemini.py
#	interviews/vault-cli/scripts/generate_question_for_gap.py
#	interviews/vault-cli/scripts/merge_chain_passes.py
#	interviews/vault-cli/scripts/validate_drafts.py
#	interviews/vault-cli/src/vault_cli/legacy_export.py
#	interviews/vault-cli/tests/test_chain_validation.py
#	interviews/vault/.gitignore
#	interviews/vault/ARCHITECTURE.md
#	interviews/vault/chains.json
#	interviews/vault/id-registry.yaml
#	interviews/vault/questions/edge/optimization/edge-2536.yaml
#	interviews/vault/questions/mobile/deployment/mobile-2147.yaml
#	tinytorch/src/03_layers/03_layers.py
2026-05-02 11:06:43 -04:00

213 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""Diagnose chain coverage by (track, topic) bucket.
For each (track, topic) bucket of published questions, reports how many
questions live in the bucket and how many chains currently cover any of
them. Surfaces two lists worth a second-pass Gemini sweep:
- ``uncovered_buckets``: ≥3 published questions, 0 chains
- ``under_covered_buckets``: ≥6 published questions, exactly 1 chain
Output:
- JSON sidecar at ``interviews/vault/chain-coverage.json`` (regeneratable;
gitignored) — feeds Phase 1.4 (--buckets-from)
- Human-readable summary on stdout: per-track totals, biggest gaps
Usage:
python3 diagnose_chain_coverage.py
python3 diagnose_chain_coverage.py --output path/to/coverage.json
"""
from __future__ import annotations
import argparse
import json
from collections import defaultdict
from pathlib import Path
import yaml
from vault_cli.policy import is_published, load_policy
# Repo layout: this script lives under .../vault-cli/scripts/, so three
# parents above the resolved file is the repository root — TODO confirm if
# the script is ever relocated.
REPO_ROOT = Path(__file__).resolve().parents[3]
VAULT_DIR = REPO_ROOT / "interviews" / "vault"
QUESTIONS_DIR = VAULT_DIR / "questions"  # question YAMLs, searched recursively
CHAINS_PATH = VAULT_DIR / "chains.json"  # existing chain definitions
POLICY_PATH = VAULT_DIR / "release-policy.yaml"  # consumed by load_policy()/is_published()
DEFAULT_OUTPUT = VAULT_DIR / "chain-coverage.json"  # regeneratable JSON sidecar (see docstring)
# Gap thresholds (see module docstring): a bucket with >= 3 published
# questions and zero chains is "uncovered"; one with >= 6 questions and
# exactly 1 chain is "under-covered".
UNCOVERED_MIN_QUESTIONS = 3
UNDER_COVERED_MIN_QUESTIONS = 6
UNDER_COVERED_MAX_CHAINS = 1
def load_published_corpus() -> dict[str, dict]:
    """Load every published question into an id-keyed mapping.

    Walks QUESTIONS_DIR recursively; a file contributes an entry only when
    it parses to a mapping with an ``id`` field and the release policy
    marks it as published.
    """
    policy = load_policy(POLICY_PATH)
    published: dict[str, dict] = {}
    for yaml_path in QUESTIONS_DIR.rglob("*.yaml"):
        with yaml_path.open(encoding="utf-8") as fh:
            record = yaml.safe_load(fh)
        # Skip non-mapping documents, id-less records, and unpublished ones.
        if isinstance(record, dict) and "id" in record and is_published(record, policy):
            published[record["id"]] = record
    return published
def bucket_corpus(corpus: dict[str, dict]) -> dict[tuple[str, str], list[str]]:
    """Group question ids by (track, topic); ids are sorted within each bucket.

    Questions missing either field (or with a falsy value) are ignored.
    """
    grouped: dict[tuple[str, str], list[str]] = {}
    for question_id, record in corpus.items():
        track = record.get("track")
        topic = record.get("topic")
        if not (track and topic):
            continue
        grouped.setdefault((track, topic), []).append(question_id)
    # Sorted copies; bucket key order follows first-seen insertion order.
    return {bucket: sorted(qids) for bucket, qids in grouped.items()}
def load_chains() -> list[dict]:
    """Parse chains.json and return the list of chain dicts."""
    return json.loads(CHAINS_PATH.read_text(encoding="utf-8"))
def chains_per_bucket(chains: list[dict]) -> dict[tuple[str, str], list[str]]:
    """Map (track, topic) -> sorted list of chain_ids that target that bucket.

    Chains missing track, topic, or chain_id (or with falsy values) are
    skipped entirely.
    """
    grouped: dict[tuple[str, str], list[str]] = {}
    for chain in chains:
        track = chain.get("track")
        topic = chain.get("topic")
        chain_id = chain.get("chain_id")
        if track and topic and chain_id:
            grouped.setdefault((track, topic), []).append(chain_id)
    return {bucket: sorted(cids) for bucket, cids in grouped.items()}
def build_report(
    buckets: dict[tuple[str, str], list[str]],
    chains_by_bucket: dict[tuple[str, str], list[str]],
) -> dict:
    """Assemble the coverage report: per-bucket rows, gap lists, and totals.

    Rows are ordered by (track, topic); the gap lists are re-sorted with the
    largest question counts first.
    """
    rows: list[dict] = []
    for (track, topic), qids in sorted(buckets.items()):
        chain_ids = chains_by_bucket.get((track, topic), [])
        rows.append({
            "track": track,
            "topic": topic,
            "question_count": len(qids),
            "chain_count": len(chain_ids),
            "qids": qids,
            "chain_ids": chain_ids,
        })

    def gap_order(row: dict) -> tuple:
        # Most-questions-first, then alphabetical for a stable, useful order.
        return (-row["question_count"], row["track"], row["topic"])

    uncovered = sorted(
        (r for r in rows
         if r["question_count"] >= UNCOVERED_MIN_QUESTIONS
         and r["chain_count"] == 0),
        key=gap_order,
    )
    under_covered = sorted(
        (r for r in rows
         if r["question_count"] >= UNDER_COVERED_MIN_QUESTIONS
         and 0 < r["chain_count"] <= UNDER_COVERED_MAX_CHAINS),
        key=gap_order,
    )
    return {
        "thresholds": {
            "uncovered_min_questions": UNCOVERED_MIN_QUESTIONS,
            "under_covered_min_questions": UNDER_COVERED_MIN_QUESTIONS,
            "under_covered_max_chains": UNDER_COVERED_MAX_CHAINS,
        },
        "totals": {
            "buckets": len(rows),
            "questions": sum(r["question_count"] for r in rows),
            "chains": sum(r["chain_count"] for r in rows),
            "uncovered_buckets": len(uncovered),
            "under_covered_buckets": len(under_covered),
        },
        "all_buckets": rows,
        "uncovered_buckets": uncovered,
        "under_covered_buckets": under_covered,
    }
def print_summary(report: dict) -> None:
    """Print the human-readable coverage summary to stdout.

    Emits overall totals, a per-track table, and the top-10 entries from
    each gap list in the report produced by build_report().
    """
    totals = report["totals"]
    print(f"Buckets: {totals['buckets']}")
    print(f"Published questions: {totals['questions']}")
    print(f"Total chains: {totals['chains']}")
    print(f"Uncovered buckets: {totals['uncovered_buckets']} "
          f"(≥{UNCOVERED_MIN_QUESTIONS} questions, 0 chains)")
    print(f"Under-covered: {totals['under_covered_buckets']} "
          f"(≥{UNDER_COVERED_MIN_QUESTIONS} questions, ≤{UNDER_COVERED_MAX_CHAINS} chain)")

    # Per-track breakdown, accumulated in a plain dict of counter rows.
    per_track: dict[str, dict[str, int]] = {}

    def _row(track_name: str) -> dict[str, int]:
        # Fetch (or create zeroed) counters for one track.
        return per_track.setdefault(track_name, {
            "buckets": 0, "questions": 0, "chains": 0,
            "uncovered": 0, "under_covered": 0,
        })

    for b in report["all_buckets"]:
        t = _row(b["track"])
        t["buckets"] += 1
        t["questions"] += b["question_count"]
        t["chains"] += b["chain_count"]
    for key in ("uncovered", "under_covered"):
        for b in report[f"{key}_buckets"]:
            _row(b["track"])[key] += 1

    print()
    print(f"{'track':<10} {'buckets':>8} {'questions':>10} {'chains':>7} "
          f"{'chains/topic':>13} {'uncov':>6} {'undercov':>9}")
    for track in sorted(per_track):
        t = per_track[track]
        # Guard against division by zero for a track with no buckets.
        density = t["chains"] / t["buckets"] if t["buckets"] else 0.0
        print(f"{track:<10} {t['buckets']:>8} {t['questions']:>10} {t['chains']:>7} "
              f"{density:>13.2f} {t['uncovered']:>6} {t['under_covered']:>9}")

    print()
    print("Top 10 uncovered buckets by question count:")
    for b in report["uncovered_buckets"][:10]:
        print(f" {b['track']:<8} {b['topic']:<40} q={b['question_count']}")
    if report["under_covered_buckets"]:
        print()
        print("Top 10 under-covered buckets:")
        for b in report["under_covered_buckets"][:10]:
            print(f" {b['track']:<8} {b['topic']:<40} "
                  f"q={b['question_count']} chains={b['chain_count']}")
def main() -> None:
    """CLI entry point: build the coverage report, write JSON, print summary."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--output", type=Path, default=DEFAULT_OUTPUT,
        help=f"output JSON path (default: {DEFAULT_OUTPUT})",
    )
    args = parser.parse_args()

    report = build_report(
        bucket_corpus(load_published_corpus()),
        chains_per_bucket(load_chains()),
    )

    # Write the JSON sidecar (trailing newline keeps the file POSIX-friendly).
    args.output.parent.mkdir(parents=True, exist_ok=True)
    with args.output.open("w", encoding="utf-8") as sink:
        json.dump(report, sink, indent=2, sort_keys=False)
        sink.write("\n")

    print_summary(report)
    print()
    print(f"wrote {args.output.relative_to(REPO_ROOT)}")


if __name__ == "__main__":
    main()