mirror of
https://github.com/harvard-edge/cs249r_book.git
synced 2026-05-07 02:03:55 -05:00
Sync the yaml-audit branch with the latest dev work since the previous sync (5c5af75ed). Brings in 73 commits including: - CI security fixes: postcss XSS bump, uuid bounds bump, codeql paths-ignore for vendored bundles, read-only token on staffml-validate-vault workflow - kits/ dark mode polish: code-block readability, dropdown contrast - vault-cli/: pre-commit ruff hook + 20 ruff fixes, all-contributors auto-credit workflow change to pull_request_target - dev's earlier merge of yaml-audit (836d481b5) carrying the pre-trailer-strip Phase 1/2/3 history; this merge harmonises that with the current trailer-clean yaml-audit tip - misc bug fixes (tinytorch perceptron seed, infra workflows, socratiq vite dev injector) Conflicts resolved (if any) preserve the yaml-audit-side authoritative state for vault/* files (we own those) and the dev-side authoritative state for .github/workflows/* and other shared infrastructure. # Conflicts: # .github/workflows/all-contributors-auto-credit.yml # .github/workflows/staffml-preview-dev.yml # interviews/staffml/src/data/corpus-summary.json # interviews/staffml/src/data/vault-manifest.json # interviews/staffml/tests/chain-and-vault-smoke.mjs # interviews/vault-cli/README.md # interviews/vault-cli/docs/CHAIN_ROADMAP.md # interviews/vault-cli/scripts/build_chains_with_gemini.py # interviews/vault-cli/scripts/generate_question_for_gap.py # interviews/vault-cli/scripts/merge_chain_passes.py # interviews/vault-cli/scripts/validate_drafts.py # interviews/vault-cli/src/vault_cli/legacy_export.py # interviews/vault-cli/tests/test_chain_validation.py # interviews/vault/.gitignore # interviews/vault/ARCHITECTURE.md # interviews/vault/chains.json # interviews/vault/id-registry.yaml # interviews/vault/questions/edge/optimization/edge-2536.yaml # interviews/vault/questions/mobile/deployment/mobile-2147.yaml # tinytorch/src/03_layers/03_layers.py
213 lines
7.2 KiB
Python
Executable File
213 lines
7.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Diagnose chain coverage by (track, topic) bucket.
|
|
|
|
For each (track, topic) bucket of published questions, reports how many
|
|
questions live in the bucket and how many chains currently cover any of
|
|
them. Surfaces two lists worth a second-pass Gemini sweep:
|
|
|
|
- ``uncovered_buckets``: ≥3 published questions, 0 chains
|
|
- ``under_covered_buckets``: ≥6 published questions, exactly 1 chain
|
|
|
|
Output:
|
|
- JSON sidecar at ``interviews/vault/chain-coverage.json`` (regeneratable;
|
|
gitignored) — feeds Phase 1.4 (--buckets-from)
|
|
- Human-readable summary on stdout: per-track totals, biggest gaps
|
|
|
|
Usage:
|
|
python3 diagnose_chain_coverage.py
|
|
python3 diagnose_chain_coverage.py --output path/to/coverage.json
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
from vault_cli.policy import is_published, load_policy
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[3]
|
|
VAULT_DIR = REPO_ROOT / "interviews" / "vault"
|
|
QUESTIONS_DIR = VAULT_DIR / "questions"
|
|
CHAINS_PATH = VAULT_DIR / "chains.json"
|
|
POLICY_PATH = VAULT_DIR / "release-policy.yaml"
|
|
DEFAULT_OUTPUT = VAULT_DIR / "chain-coverage.json"
|
|
|
|
UNCOVERED_MIN_QUESTIONS = 3
|
|
UNDER_COVERED_MIN_QUESTIONS = 6
|
|
UNDER_COVERED_MAX_CHAINS = 1
|
|
|
|
|
|
def load_published_corpus() -> dict[str, dict]:
|
|
policy = load_policy(POLICY_PATH)
|
|
corpus: dict[str, dict] = {}
|
|
for path in QUESTIONS_DIR.rglob("*.yaml"):
|
|
with path.open(encoding="utf-8") as f:
|
|
d = yaml.safe_load(f)
|
|
if not isinstance(d, dict) or "id" not in d:
|
|
continue
|
|
if not is_published(d, policy):
|
|
continue
|
|
corpus[d["id"]] = d
|
|
return corpus
|
|
|
|
|
|
def bucket_corpus(corpus: dict[str, dict]) -> dict[tuple[str, str], list[str]]:
|
|
by_bucket: dict[tuple[str, str], list[str]] = defaultdict(list)
|
|
for qid, d in corpus.items():
|
|
track = d.get("track")
|
|
topic = d.get("topic")
|
|
if not track or not topic:
|
|
continue
|
|
by_bucket[(track, topic)].append(qid)
|
|
for k in by_bucket:
|
|
by_bucket[k].sort()
|
|
return dict(by_bucket)
|
|
|
|
|
|
def load_chains() -> list[dict]:
|
|
with CHAINS_PATH.open(encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def chains_per_bucket(chains: list[dict]) -> dict[tuple[str, str], list[str]]:
|
|
"""Map (track, topic) -> list of chain_ids that target that bucket."""
|
|
by_bucket: dict[tuple[str, str], list[str]] = defaultdict(list)
|
|
for c in chains:
|
|
track = c.get("track")
|
|
topic = c.get("topic")
|
|
cid = c.get("chain_id")
|
|
if not (track and topic and cid):
|
|
continue
|
|
by_bucket[(track, topic)].append(cid)
|
|
for k in by_bucket:
|
|
by_bucket[k].sort()
|
|
return dict(by_bucket)
|
|
|
|
|
|
def build_report(
|
|
buckets: dict[tuple[str, str], list[str]],
|
|
chains_by_bucket: dict[tuple[str, str], list[str]],
|
|
) -> dict:
|
|
bucket_rows: list[dict] = []
|
|
for (track, topic), qids in sorted(buckets.items()):
|
|
cids = chains_by_bucket.get((track, topic), [])
|
|
bucket_rows.append({
|
|
"track": track,
|
|
"topic": topic,
|
|
"question_count": len(qids),
|
|
"chain_count": len(cids),
|
|
"qids": qids,
|
|
"chain_ids": cids,
|
|
})
|
|
|
|
uncovered = [
|
|
b for b in bucket_rows
|
|
if b["question_count"] >= UNCOVERED_MIN_QUESTIONS and b["chain_count"] == 0
|
|
]
|
|
under_covered = [
|
|
b for b in bucket_rows
|
|
if b["question_count"] >= UNDER_COVERED_MIN_QUESTIONS
|
|
and b["chain_count"] <= UNDER_COVERED_MAX_CHAINS
|
|
and b["chain_count"] > 0
|
|
]
|
|
|
|
# Stable, useful ordering: most-questions-first within each list.
|
|
uncovered.sort(key=lambda b: (-b["question_count"], b["track"], b["topic"]))
|
|
under_covered.sort(key=lambda b: (-b["question_count"], b["track"], b["topic"]))
|
|
|
|
return {
|
|
"thresholds": {
|
|
"uncovered_min_questions": UNCOVERED_MIN_QUESTIONS,
|
|
"under_covered_min_questions": UNDER_COVERED_MIN_QUESTIONS,
|
|
"under_covered_max_chains": UNDER_COVERED_MAX_CHAINS,
|
|
},
|
|
"totals": {
|
|
"buckets": len(bucket_rows),
|
|
"questions": sum(b["question_count"] for b in bucket_rows),
|
|
"chains": sum(b["chain_count"] for b in bucket_rows),
|
|
"uncovered_buckets": len(uncovered),
|
|
"under_covered_buckets": len(under_covered),
|
|
},
|
|
"all_buckets": bucket_rows,
|
|
"uncovered_buckets": uncovered,
|
|
"under_covered_buckets": under_covered,
|
|
}
|
|
|
|
|
|
def print_summary(report: dict) -> None:
|
|
totals = report["totals"]
|
|
print(f"Buckets: {totals['buckets']}")
|
|
print(f"Published questions: {totals['questions']}")
|
|
print(f"Total chains: {totals['chains']}")
|
|
print(f"Uncovered buckets: {totals['uncovered_buckets']} "
|
|
f"(≥{UNCOVERED_MIN_QUESTIONS} questions, 0 chains)")
|
|
print(f"Under-covered: {totals['under_covered_buckets']} "
|
|
f"(≥{UNDER_COVERED_MIN_QUESTIONS} questions, ≤{UNDER_COVERED_MAX_CHAINS} chain)")
|
|
|
|
# Per-track breakdown
|
|
per_track: dict[str, dict[str, int]] = defaultdict(lambda: {
|
|
"buckets": 0, "questions": 0, "chains": 0,
|
|
"uncovered": 0, "under_covered": 0,
|
|
})
|
|
for b in report["all_buckets"]:
|
|
t = per_track[b["track"]]
|
|
t["buckets"] += 1
|
|
t["questions"] += b["question_count"]
|
|
t["chains"] += b["chain_count"]
|
|
for b in report["uncovered_buckets"]:
|
|
per_track[b["track"]]["uncovered"] += 1
|
|
for b in report["under_covered_buckets"]:
|
|
per_track[b["track"]]["under_covered"] += 1
|
|
|
|
print()
|
|
print(f"{'track':<10} {'buckets':>8} {'questions':>10} {'chains':>7} "
|
|
f"{'chains/topic':>13} {'uncov':>6} {'undercov':>9}")
|
|
for track in sorted(per_track):
|
|
t = per_track[track]
|
|
density = t["chains"] / t["buckets"] if t["buckets"] else 0.0
|
|
print(f"{track:<10} {t['buckets']:>8} {t['questions']:>10} {t['chains']:>7} "
|
|
f"{density:>13.2f} {t['uncovered']:>6} {t['under_covered']:>9}")
|
|
|
|
print()
|
|
print("Top 10 uncovered buckets by question count:")
|
|
for b in report["uncovered_buckets"][:10]:
|
|
print(f" {b['track']:<8} {b['topic']:<40} q={b['question_count']}")
|
|
|
|
if report["under_covered_buckets"]:
|
|
print()
|
|
print("Top 10 under-covered buckets:")
|
|
for b in report["under_covered_buckets"][:10]:
|
|
print(f" {b['track']:<8} {b['topic']:<40} "
|
|
f"q={b['question_count']} chains={b['chain_count']}")
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT,
|
|
help=f"output JSON path (default: {DEFAULT_OUTPUT})")
|
|
args = parser.parse_args()
|
|
|
|
corpus = load_published_corpus()
|
|
buckets = bucket_corpus(corpus)
|
|
chains = load_chains()
|
|
cbb = chains_per_bucket(chains)
|
|
|
|
report = build_report(buckets, cbb)
|
|
|
|
args.output.parent.mkdir(parents=True, exist_ok=True)
|
|
with args.output.open("w", encoding="utf-8") as f:
|
|
json.dump(report, f, indent=2, sort_keys=False)
|
|
f.write("\n")
|
|
|
|
print_summary(report)
|
|
print()
|
|
print(f"wrote {args.output.relative_to(REPO_ROOT)}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|