mirror of
https://github.com/harvard-edge/cs249r_book.git
synced 2026-05-06 17:49:07 -05:00
Auto-fix removed extraneous f-string prefixes, unused imports (re, sys, textwrap, defaultdict), an unused local (qids), and converted datetime.now(timezone.utc) to datetime.now(UTC) (UP017). Manual fixes split colon/semicolon one-liners onto separate lines (E701/E702), renamed unused loop vars (cid, chain_id) with leading underscores (B007), replaced bare except with except Exception (E722), and renamed loop var L to level to satisfy N806.
490 lines
18 KiB
Python
Executable File
490 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Validate Gemini-authored draft questions (Phase 3.b).
|
|
|
|
For each ``*.yaml.draft`` under interviews/vault/questions/, run a
|
|
multi-gate scorecard:
|
|
|
|
1. schema — Pydantic Question model (same gate as published)
|
|
2. originality — cosine vs nearest neighbour in the same (track, topic);
|
|
reject if any neighbour exceeds the threshold (default 0.92)
|
|
3. level_fit — Gemini-judge: "does this question's cognitive load match
|
|
level=<L>?", calibrated against ≤5 existing L-level
|
|
questions in the same topic.
|
|
4. coherence — Gemini-judge: "are scenario / question /
|
|
realistic_solution mutually consistent?"
|
|
5. bridge — Gemini-judge: "does this question pedagogically chain
|
|
between <between[0]> and <between[1]> from the gap?"
|
|
|
|
A draft passes when **all** gates return "yes" (or skipped). Output:
|
|
|
|
- per-draft scorecard rows in interviews/vault/draft-validation-scorecard.json
|
|
- stdout summary: pass/fail counts + per-gate failure reasons
|
|
|
|
Use case: pilot run lands ~30 drafts in the tree; this script tells the
|
|
human reviewer which to look at first (passes) vs which to discard
|
|
(failed bridge / failed coherence).
|
|
|
|
The originality gate needs an embedding model. By default it loads
|
|
BAAI/bge-small-en-v1.5 (the same model used for the corpus's
|
|
embeddings.npz) so cosine values are directly comparable. Pass
|
|
``--no-originality`` to skip if the model load is undesirable.
|
|
|
|
The LLM-judge gates need ``gemini`` on PATH (gemini-3.1-pro-preview).
|
|
Pass ``--no-llm-judge`` to skip those gates and only run schema +
|
|
originality.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import subprocess
|
|
import time
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
# Repository layout: this script lives three directory levels below the repo root.
REPO_ROOT = Path(__file__).resolve().parents[3]
VAULT_DIR = REPO_ROOT / "interviews" / "vault"
QUESTIONS_DIR = VAULT_DIR / "questions"  # tree holding *.yaml corpus + *.yaml.draft candidates
EMBEDDINGS_PATH = VAULT_DIR / "embeddings.npz"  # corpus vectors consumed by the originality gate
DEFAULT_OUTPUT = VAULT_DIR / "draft-validation-scorecard.json"

# Model name passed to the `gemini` CLI for the judge gates (3-5).
GEMINI_MODEL = "gemini-3.1-pro-preview"
ORIGINALITY_THRESHOLD = 0.92  # cosine; >= this is "too duplicative"
LEVEL_FIT_EXEMPLAR_LIMIT = 5  # max same-level exemplars shown to the level_fit judge
|
|
# The schema gate reuses the vault CLI's Pydantic Question model. Degrade to
# None when the package isn't installed; gate_schema reports this explicitly.
try:
    from vault_cli.models import Question
except ImportError:
    Question = None  # type: ignore
|
|
|
|
|
|
# ─── corpus / drafts ──────────────────────────────────────────────────────
|
|
|
|
|
|
def load_yaml(path: Path) -> dict | None:
    """Parse *path* as YAML and return the top-level mapping.

    Returns None when the file is unreadable, not valid YAML, or its root
    is not a mapping — callers treat all of those the same way.
    """
    try:
        with path.open(encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle)
    except Exception:
        return None
    if isinstance(parsed, dict):
        return parsed
    return None
|
|
|
|
|
|
def load_corpus_index() -> dict[str, dict]:
    """Index every loadable question YAML under QUESTIONS_DIR by its ``id``.

    Files that fail to parse or lack an ``id`` are silently skipped.
    """
    index: dict[str, dict] = {}
    for yaml_path in QUESTIONS_DIR.rglob("*.yaml"):
        record = load_yaml(yaml_path)
        if record and record.get("id"):
            index[record["id"]] = record
    return index
|
|
|
|
|
|
def find_drafts(scope: Path | None = None) -> list[Path]:
|
|
root = scope or QUESTIONS_DIR
|
|
return sorted(root.rglob("*.yaml.draft"))
|
|
|
|
|
|
def question_payload(q: dict[str, Any]) -> dict[str, Any]:
    """Project a question dict down to the fields the LLM judges see.

    Missing fields come through as None; ``realistic_solution`` is pulled
    out of the nested ``details`` mapping (tolerating details=None).
    """
    details = q.get("details") or {}
    payload = {key: q.get(key) for key in ("id", "level", "title", "scenario", "question")}
    payload["realistic_solution"] = details.get("realistic_solution")
    return payload
|
|
|
|
|
|
# ─── Gate 1: schema ───────────────────────────────────────────────────────
|
|
|
|
|
|
def gate_schema(draft: dict[str, Any]) -> tuple[bool, str]:
    """Gate 1: validate the draft against the published Question model.

    Returns (ok, reason). Underscore-prefixed keys (authoring metadata) and
    None-valued detail fields are stripped before validation, mirroring how
    published questions are stored.
    """
    if Question is None:
        return False, "vault_cli not importable; pip install -e interviews/vault-cli/"
    body = {k: v for k, v in draft.items() if not k.startswith("_")}
    details = body.get("details")
    if isinstance(details, dict):
        body["details"] = {k: v for k, v in details.items() if v is not None}
    try:
        Question.model_validate(body)
    except Exception as e:
        # Truncate Pydantic's (often huge) error report for the scorecard.
        return False, str(e)[:300]
    return True, ""
|
|
|
|
|
|
# ─── Gate 2: originality (cosine vs neighbours) ───────────────────────────
|
|
|
|
|
|
# Process-wide cache filled by _load_embedding_model_and_corpus (model,
# corpus vectors, qid lookup) so the model is only loaded once per run.
_embed_state: dict[str, Any] = {}
|
|
|
|
|
|
def _load_embedding_model_and_corpus():
    """Lazy: load BAAI/bge-small-en-v1.5 + corpus vectors once per run."""
    if "model" in _embed_state:
        return _embed_state
    import numpy as np
    from sentence_transformers import SentenceTransformer

    if not EMBEDDINGS_PATH.exists():
        raise FileNotFoundError(f"missing {EMBEDDINGS_PATH} — needed for originality gate")
    archive = np.load(EMBEDDINGS_PATH, allow_pickle=True)
    # Use the same model the corpus was embedded with so cosines are comparable.
    name = str(archive["model_name"])
    corpus_qids = [str(x) for x in archive["qids"]]
    _embed_state["model"] = SentenceTransformer(name)
    _embed_state["model_name"] = name
    _embed_state["vectors"] = archive["vectors"]  # (N, dim) L2-normalised
    _embed_state["qids"] = corpus_qids
    _embed_state["qid_to_row"] = {qid: row for row, qid in enumerate(corpus_qids)}
    return _embed_state
|
|
|
|
|
|
def gate_originality(
    draft: dict[str, Any],
    corpus: dict[str, dict],
    threshold: float = ORIGINALITY_THRESHOLD,
) -> tuple[bool, str, dict[str, Any]]:
    """Return (ok, reason, detail).

    detail carries the top-1 neighbour qid + cosine, useful for the human
    reviewer to spot-check against.
    """
    import numpy as np
    state = _load_embedding_model_and_corpus()
    qid_to_row = state["qid_to_row"]

    # Embed the draft (concat title + scenario + question — what the v1
    # corpus embedding script also used for its rows).
    parts = [draft.get(field, "") or "" for field in ("title", "scenario", "question")]
    draft_vec = state["model"].encode(["\n".join(parts)], normalize_embeddings=True)[0]

    # Restrict comparisons to the same (track, topic) bucket — that's
    # where duplicates would actually matter.
    track = draft.get("track")
    topic = draft.get("topic")
    bucket_qids = []
    for qid, q in corpus.items():
        if q.get("track") == track and q.get("topic") == topic and qid in qid_to_row:
            bucket_qids.append(qid)
    if not bucket_qids:
        return True, "", {"note": "no in-bucket corpus neighbours; skipping"}

    row_idx = np.array([qid_to_row[q] for q in bucket_qids], dtype=np.int64)
    # cosine = dot product since both sides are L2-normalised
    sims = state["vectors"][row_idx] @ draft_vec
    best = int(np.argmax(sims))
    top_qid = bucket_qids[best]
    top_cos = float(sims[best])

    detail = {
        "top_neighbour": top_qid,
        "cosine": round(top_cos, 4),
        "threshold": threshold,
        "bucket_size": len(bucket_qids),
    }
    if top_cos >= threshold:
        return False, f"too similar to {top_qid} (cosine={top_cos:.3f} >= {threshold})", detail
    return True, "", detail
|
|
|
|
|
|
# ─── Gate 3-5: Gemini judges ──────────────────────────────────────────────
|
|
|
|
|
|
def call_gemini_judge(prompt: str, timeout: int = 240) -> dict | None:
    """Single judge call; expects strict-JSON {"verdict": "yes|no", "rationale": "..."}.

    Returns the parsed dict, or None when the call times out, the ``gemini``
    binary cannot be launched, or the output contains no parseable JSON
    object. Callers uniformly treat None as "no judge response".
    """
    try:
        result = subprocess.run(
            ["gemini", "-m", GEMINI_MODEL, "-p", prompt, "--yolo"],
            capture_output=True, text=True, timeout=timeout,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError when `gemini` is not on PATH;
        # previously that exception escaped instead of yielding None.
        return None
    out = (result.stdout or "").strip()
    # Tolerate a markdown fence around the JSON payload (```json ... ```).
    if out.startswith("```"):
        out = out.strip("`")
        if out.startswith("json"):
            out = out[4:].lstrip()
    # Extract the outermost {...} span — judges occasionally add stray prose.
    i = out.find("{")
    j = out.rfind("}")
    if i == -1 or j == -1:
        return None
    try:
        return json.loads(out[i:j+1])
    except json.JSONDecodeError:
        return None
|
|
|
|
|
|
def _judge_block(draft: dict[str, Any]) -> str:
    """Render the draft's judge-visible fields as pretty-printed JSON."""
    payload = question_payload(draft)
    return json.dumps(payload, indent=2)
|
|
|
|
|
|
def gate_level_fit(draft: dict, corpus: dict[str, dict]) -> tuple[bool, str, dict]:
    """Gate 3: judge whether the draft's cognitive load matches its level.

    Calibrates against up to LEVEL_FIT_EXEMPLAR_LIMIT published questions at
    the same (track, topic, level); skips (passes) when none exist.
    """
    target_level = draft.get("level")
    track = draft.get("track")
    topic = draft.get("topic")

    def is_exemplar(q: dict) -> bool:
        # Same bucket, same level, and actually published.
        return (q.get("track") == track and q.get("topic") == topic
                and q.get("level") == target_level
                and q.get("status") == "published")

    ranked = sorted(filter(is_exemplar, corpus.values()), key=lambda q: q.get("id", ""))
    exemplars = ranked[:LEVEL_FIT_EXEMPLAR_LIMIT]
    if not exemplars:
        return True, "", {"note": f"no published L={target_level} exemplars in bucket; skipping"}

    prompt = f"""You are calibrating cognitive load. Given an EXAMPLE PAIR of
existing published interview questions at level={target_level} for
track={track}, topic={topic}, judge whether the CANDIDATE question
matches that level's typical cognitive demand.

Bloom mapping: L1=remember, L2=understand, L3=apply, L4=analyze,
L5=evaluate, L6+=create.

EXEMPLARS at level={target_level}:
{json.dumps([question_payload(q) for q in exemplars], indent=2)}

CANDIDATE:
{_judge_block(draft)}

Return STRICT JSON with no prose or fences:
{{"verdict": "yes" | "no", "rationale": "<one sentence>"}}
"""
    resp = call_gemini_judge(prompt)
    if resp is None:
        return False, "no judge response", {}
    if (resp.get("verdict") or "").strip().lower() == "yes":
        return True, "", {"rationale": resp.get("rationale", "")}
    return False, f"level_fit=no: {resp.get('rationale', '')}", {"rationale": resp.get("rationale")}
|
|
|
|
|
|
def gate_coherence(draft: dict) -> tuple[bool, str, dict]:
    """Gate 4: judge scenario / question / realistic_solution consistency."""
    prompt = f"""Judge whether the scenario, question, and realistic_solution
are MUTUALLY CONSISTENT. Specifically:
- Does the question logically follow from the scenario?
- Does the realistic_solution actually answer the question (not adjacent)?
- Are the numbers / system parameters internally consistent across all
  three fields (no contradictions)?

CANDIDATE:
{_judge_block(draft)}

Return STRICT JSON with no prose or fences:
{{"verdict": "yes" | "no", "rationale": "<one sentence>"}}
"""
    resp = call_gemini_judge(prompt)
    if resp is None:
        return False, "no judge response", {}
    if (resp.get("verdict") or "").strip().lower() == "yes":
        return True, "", {"rationale": resp.get("rationale", "")}
    return False, f"coherence=no: {resp.get('rationale', '')}", {"rationale": resp.get("rationale")}
|
|
|
|
|
|
def gate_bridge(draft: dict, corpus: dict[str, dict]) -> tuple[bool, str, dict]:
    """Gate 5: judge whether the draft bridges its gap's two between-questions.

    The between-question ids come from the draft's ``_authoring.gap.between``
    metadata; the gate skips (passes) unless both resolve in the corpus.
    """
    authoring = draft.get("_authoring") or {}
    gap = authoring.get("gap") or {}
    between_ids = gap.get("between") or []
    between = [corpus[qid] for qid in between_ids if corpus.get(qid)]
    if len(between) < 2:
        # Without two between-questions we can't judge a bridge meaningfully.
        return True, "", {"note": "fewer than 2 between-questions in corpus; skipping"}

    prompt = f"""Judge whether the CANDIDATE question pedagogically chains
between the two BETWEEN-questions. Specifically:
- Is the candidate's cognitive load above between[0]'s level and at or
  below between[1]'s level (Bloom progression direction)?
- Does the candidate share scenario/concept thread with the between-
  questions (not introducing a new system)?
- Would inserting the candidate between the two existing questions
  produce a coherent +1 (or +2 last-resort) progression chain?

BETWEEN[0] (lower):
{json.dumps(question_payload(between[0]), indent=2)}

BETWEEN[1] (higher):
{json.dumps(question_payload(between[1]), indent=2)}

CANDIDATE:
{_judge_block(draft)}

Return STRICT JSON with no prose or fences:
{{"verdict": "yes" | "no", "rationale": "<one sentence>"}}
"""
    resp = call_gemini_judge(prompt)
    if resp is None:
        return False, "no judge response", {}
    if (resp.get("verdict") or "").strip().lower() == "yes":
        return True, "", {"rationale": resp.get("rationale", "")}
    return False, f"bridge=no: {resp.get('rationale', '')}", {"rationale": resp.get("rationale")}
|
|
|
|
|
|
# ─── runner ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def evaluate_draft(
    draft_path: Path,
    corpus: dict[str, dict],
    args: argparse.Namespace,
) -> dict[str, Any]:
    """Run every gate over one draft and return its scorecard row.

    The row always carries path/id/track/topic/level; each gate contributes
    a "pass"/"fail"/"skipped"/"error" entry plus optional detail/reason keys,
    and ``verdict`` summarises: fail beats error beats pass.
    """
    draft = load_yaml(draft_path)
    if not draft:
        return {"path": str(draft_path), "verdict": "fail",
                "errors": ["could not load YAML"]}

    try:
        rel_path = str(draft_path.relative_to(REPO_ROOT))
    except ValueError:
        rel_path = str(draft_path)
    rec: dict[str, Any] = {
        "path": rel_path,
        "draft_id": draft.get("id"),
        "track": draft.get("track"),
        "topic": draft.get("topic"),
        "level": draft.get("level"),
    }

    # Gate 1 — schema (mandatory).
    schema_ok, schema_why = gate_schema(draft)
    rec["schema_ok"] = schema_ok
    if not schema_ok:
        rec["schema_error"] = schema_why
        rec["verdict"] = "fail"
        return rec  # downstream gates assume a structurally valid YAML

    # Gate 2 — originality.
    if args.no_originality:
        rec["originality"] = "skipped"
    else:
        try:
            ok, why, detail = gate_originality(draft, corpus, threshold=args.threshold)
        except Exception as e:
            rec["originality"] = "error"
            rec["originality_reason"] = str(e)[:200]
        else:
            rec["originality"] = "pass" if ok else "fail"
            rec["originality_detail"] = detail
            if not ok:
                rec["originality_reason"] = why

    # Gates 3-5 — Gemini judges.
    if args.no_llm_judge:
        for name in ("level_fit", "coherence", "bridge"):
            rec[name] = "skipped"
    else:
        judges = [
            ("level_fit", lambda: gate_level_fit(draft, corpus)),
            ("coherence", lambda: gate_coherence(draft)),
            ("bridge", lambda: gate_bridge(draft, corpus)),
        ]
        for name, run in judges:
            try:
                ok, why, detail = run()
            except Exception as e:
                rec[name] = "error"
                rec[f"{name}_reason"] = str(e)[:200]
                continue
            rec[name] = "pass" if ok else "fail"
            rec[f"{name}_detail"] = detail
            if not ok:
                rec[f"{name}_reason"] = why
            time.sleep(args.judge_delay)  # be polite between calls

    # Final verdict: pass iff every non-skipped gate is pass.
    gate_results = [rec.get(g) for g in ("originality", "level_fit", "coherence", "bridge")]
    if any(r == "fail" for r in gate_results):
        rec["verdict"] = "fail"
    elif any(r == "error" for r in gate_results):
        rec["verdict"] = "error"
    else:
        rec["verdict"] = "pass"
    return rec
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: scan for drafts, run the gates, write the scorecard.

    Always returns 0; per-draft failures are reported via the scorecard JSON
    and the stdout summary rather than the process exit code.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--scope", type=Path, default=None,
                    help=f"directory tree to scan for *.yaml.draft "
                    f"(default {QUESTIONS_DIR})")
    ap.add_argument("--output", type=Path, default=DEFAULT_OUTPUT,
                    help=f"scorecard JSON (default {DEFAULT_OUTPUT})")
    ap.add_argument("--no-originality", action="store_true",
                    help="skip the embedding-based originality gate")
    ap.add_argument("--no-llm-judge", action="store_true",
                    help="skip the Gemini-judge gates (level_fit, coherence, bridge)")
    ap.add_argument("--threshold", type=float, default=ORIGINALITY_THRESHOLD,
                    help=f"originality cosine cutoff (default {ORIGINALITY_THRESHOLD})")
    ap.add_argument("--judge-delay", type=float, default=4.0,
                    help="seconds between Gemini judge calls (default 4.0)")
    ap.add_argument("--limit", type=int, default=None,
                    help="evaluate only the first N drafts")
    args = ap.parse_args()

    # Collect and optionally truncate the draft list before doing any work.
    drafts = find_drafts(args.scope)
    if args.limit:
        drafts = drafts[: args.limit]
    if not drafts:
        print(f"no *.yaml.draft files found under {args.scope or QUESTIONS_DIR}")
        return 0

    corpus = load_corpus_index()
    print(f"corpus: {len(corpus)} published+draft questions; "
          f"drafts to evaluate: {len(drafts)}")

    rows: list[dict[str, Any]] = []
    for i, p in enumerate(drafts, start=1):
        # Show repo-relative paths when possible; absolute otherwise.
        try:
            display = p.relative_to(REPO_ROOT)
        except ValueError:
            display = p
        print(f"\n[{i}/{len(drafts)}] {display}")
        rec = evaluate_draft(p, corpus, args)
        gate_summary = ", ".join(
            f"{g}={rec.get(g, '-')}"
            for g in ("originality", "level_fit", "coherence", "bridge")
        )
        print(f" verdict={rec.get('verdict'):4s} {gate_summary}")
        # On failure, surface every recorded reason so the reviewer can triage
        # from the console without opening the scorecard JSON.
        if rec.get("verdict") == "fail":
            for k in ("schema_error", "originality_reason",
                      "level_fit_reason", "coherence_reason", "bridge_reason"):
                if k in rec:
                    print(f" {k}: {str(rec[k])[:200]}")
        rows.append(rec)

    try:
        out_display = args.output.relative_to(REPO_ROOT)
    except ValueError:
        out_display = args.output
    # Persist the scorecard with aggregate counts alongside the per-draft rows.
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(json.dumps({
        "generated_at": datetime.now(UTC).isoformat(timespec="seconds"),
        "originality_threshold": args.threshold,
        "drafts_evaluated": len(rows),
        "passes": sum(1 for r in rows if r.get("verdict") == "pass"),
        "fails": sum(1 for r in rows if r.get("verdict") == "fail"),
        "errors": sum(1 for r in rows if r.get("verdict") == "error"),
        "rows": rows,
    }, indent=2) + "\n")
    print(f"\nwrote {out_display}")
    n_pass = sum(1 for r in rows if r.get("verdict") == "pass")
    n_fail = sum(1 for r in rows if r.get("verdict") == "fail")
    n_err = sum(1 for r in rows if r.get("verdict") == "error")
    print(f"summary: pass={n_pass} fail={n_fail} error={n_err}")
    return 0
|
|
|
|
|
|
# Propagate main()'s return value as the process exit code when run directly.
if __name__ == "__main__":
    raise SystemExit(main())
|