Files
cs249r_book/interviews/vault/scripts/validate_questions.py
Vijay Janapa Reddi eb71638630 feat(vault): release-grade Phase G — full audit + cleanup + 0.1.3 release
Final brute-force release-readiness pass: every gate green, 0.1.3
released and verified, every observable failure mode closed at source.

═══ AUDITS (G.A–G.D) ═══

G.A — gemini-3.1-pro-preview default everywhere. Active CLI scripts
    already used it; bulk-patched 6 legacy scripts (`generate_batch.py`,
    `validate_questions.py`, `generate_gaps.py`, `run_reviews.sh`,
    `generate.py`, `review_math.sh`) + WORKFLOW.md off `gemini-2.5-flash`
    or `gemini-2.5-pro` to `gemini-3.1-pro-preview`. Only `archive/`
    references remain (intentionally legacy).

G.B — Cloudflare workflow audit. `vault verify 0.1.1` correctly
    failed (YAMLs evolved since 0.1.1 cut). Confirmed `vault publish`,
    `vault deploy`, `vault ship`, `vault rollback`, `vault verify`,
    `vault snapshot`, `vault tag` all wired. Released 0.1.2 then 0.1.3
    to lock final state.

G.C — Visual asset integrity audit. 236/236 YAML visual references
    resolve, 0 orphan SVGs, 0 missing files, 0 unrendered sources.
    Clean.

G.D — Unit tests for new validators added at `tests/test_models.py`:
    15 tests covering Visual.kind enum, Visual.path regex, Visual.alt
    + caption min lengths + required, Question._zone_bloom_compatible
    (recall+remember accepted, recall+evaluate rejected, mastery+
    remember rejected, evaluation+evaluate accepted, design+create
    accepted), Question._visual_path_resolves. **15/15 pass.**
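
For reference, a minimal sketch of the zone/Bloom compatibility rule those
tests exercise. This is hypothetical: only the five pairings listed above come
from this commit; the `ZONE_BLOOM` table and any extra pairings are guesses,
and the real `Question._zone_bloom_compatible` validator in the vault models
module is the source of truth.

```python
# Hypothetical sketch of the zone/Bloom rule exercised by tests/test_models.py.
# Only the five pairings named in the commit are confirmed; other entries are guesses.
ZONE_BLOOM = {
    "recall": {"remember", "understand"},   # "understand" is a guess
    "mastery": {"apply", "analyze"},        # both guesses
    "evaluation": {"evaluate"},
    "design": {"create"},
}


def zone_bloom_compatible(zone: str, bloom: str) -> bool:
    """True when the Bloom level is allowed for the question's zone."""
    return bloom in ZONE_BLOOM.get(zone, set())


def test_zone_bloom_pairs():
    assert zone_bloom_compatible("recall", "remember")
    assert not zone_bloom_compatible("recall", "evaluate")
    assert not zone_bloom_compatible("mastery", "remember")
    assert zone_bloom_compatible("evaluation", "evaluate")
    assert zone_bloom_compatible("design", "create")
```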

═══ CONTENT CLEANUP (G.E–G.L) ═══

G.E — Sample re-judge of 100 random cloud parallelism items via
    Gemini 3.1 Pro Preview (4 API calls): 53% PASS / 23% NEEDS_FIX /
    24% DROP. Surfaced legacy quality drift — items generated under
    pre-Phase-D laxer prompts were not meeting the new strict bar
    (math errors with bidirectional vs unidirectional NVLink,
    "Based on the diagram..." references with no diagram, deprecated
    practices like SSP for modern LLM training, wrong-track scenarios
    like Cortex-M4 in cloud track).

G.H — General-purpose cleanup agent on 47 flagged items:
    **31 rewritten** with PARALLELISM_RULES bar applied (concrete
    unidirectional NVLink 450 GB/s, IB NDR 25 GB/s, RoCE v2 22 GB/s,
    PCIe Gen3 12 GB/s; multi-step ring AllReduce arguments with the
    2(N-1)/N factor; non-obvious failure modes); **16 archived** with
    documented `deletion_reason` (mathematically broken premises,
    physics errors, topic-irreconcilable, direct duplicates).
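
For concreteness, the ring-AllReduce napkin math behind those rewrites, using
the interconnect numbers above. A sketch only: the 14 GB gradient size (~7B
FP16 parameters) and the 8-GPU ring are illustrative assumptions, not corpus
values.

```python
# Bandwidth-bound ring AllReduce: each GPU sends/receives 2*(N-1)/N of the message.
# Link speeds are the unidirectional numbers from the PARALLELISM_RULES bar above;
# the 14 GB gradient size and 8-GPU ring are illustrative assumptions.
LINKS_GBPS = {"nvlink": 450, "ib_ndr": 25, "roce_v2": 22, "pcie_gen3": 12}


def ring_allreduce_seconds(grad_gb: float, n_gpus: int, link: str) -> float:
    """Time to AllReduce grad_gb gigabytes across n_gpus over the given link."""
    traffic_gb = 2 * (n_gpus - 1) / n_gpus * grad_gb
    return traffic_gb / LINKS_GBPS[link]


if __name__ == "__main__":
    for link in LINKS_GBPS:
        t = ring_allreduce_seconds(14.0, 8, link)  # 7B FP16 gradients, 8 GPUs
        print(f"{link:10s}: {t * 1e3:7.1f} ms")
    # nvlink ~54 ms, ib_ndr ~980 ms, roce_v2 ~1,114 ms, pcie_gen3 ~2,042 ms
```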

G.L — Re-judge of the 31 G.H rewrites: **23 PASS / 3 NEEDS_FIX / 5 DROP
    = 74.2% pass rate**. The 8 still-failing items were archived; even
    after the cleanup pass they could not meet the strict bar. Contract:
    items get THREE chances (original generation, fix-agent, retry-fix);
    if they still fail, they are archived, not promoted. Honest.


═══ STUBBORN-FAIL ARCHIVES (Phase F residuals) ═══

After three independent fix-agent passes (Phase C, F.2, F.4), 4 items
remained NEEDS_FIX or DROP: edge-2390, edge-2401, mobile-1948,
tinyml-1681. All four archived with `deletion_reason` documenting the
3-attempt failure history. The cell may be structurally awkward; the
items are preserved for audit but removed from the bundle.

═══ ORPHAN CHAIN FIX ═══

After archives, `cloud-chain-359` had only 1 published member
(`cloud-1840`); its sibling `cloud-1845` got archived. Dropped the
chain ref from cloud-1840 + ran `repair_chains.py` to clean residual
references in archived YAMLs. `vault check --strict` now passes with 0
chain warnings.

═══ E.2 / E.3 SHIPPED EARLIER IN PRIOR COMMIT ═══

(Documented in commit `20ea20005` for completeness):
- `vault build --legacy-json` auto-emits `vault-manifest.json`.
- `analyze_coverage_gaps.py --include-areas <areas>` flag.

═══ 0.1.3 FINAL RELEASE ═══

`vault publish 0.1.3` snapshot at `releases/0.1.3/`. Migrations:
+0 ~27 -28 (zero net new questions, 27 modified during cleanup, 28
archived/promoted). `vault verify 0.1.3` ✓ — release_hash
`793c06f414f2bf8391a8a5c56ec0ff8d76bfce4ab7c64ad12ecb83f6d932280e`
reconstructs from YAML. Latest symlink → 0.1.3.
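
The verify step is, in essence, a deterministic re-hash of the published YAML.
A minimal sketch of the idea only; the actual canonicalization and file
selection belong to `vault verify` and will differ in detail.

```python
# Hypothetical sketch of release-hash reconstruction: hash the canonical bytes
# of every published YAML in sorted path order. The real `vault verify`
# command owns the actual canonicalization; this only shows the shape of the check.
import hashlib
from pathlib import Path


def reconstruct_release_hash(release_dir: Path) -> str:
    h = hashlib.sha256()
    for path in sorted(release_dir.rglob("*.yaml")):
        h.update(path.relative_to(release_dir).as_posix().encode())
        h.update(path.read_bytes())
    return h.hexdigest()
```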

═══ FINAL ALL-9-GATES SWEEP — ALL GREEN ═══

[1] vault check --strict          ✓ 10,701 / 0 errors / 0 invariants
[2] vault lint                    ✓ 0 errors / 0 warnings / 9,757 info
[3] vault doctor                  ✓ 0 fails (registry-history info OK)
[4] vault codegen --check         ✓ artifacts in sync
[5] vault verify 0.1.3            ✓ hash reconstructs from YAML
[6] staffml validate-vault        ✓ 0 errors / 0 warnings, deployment-ready
[7] render_visuals                ✓ 236 visuals, 0 errors
[8] tsc                           ✓ TypeScript clean
[9] Playwright                    ✓ 9/9 pass

═══ FINAL CORPUS STATE ═══

Bundle: 9,757 published (was 9,224 at branch cut, **+533 net** across
the full multi-session push, after all archives).

Total commits on branch since cut: 10.
Release tag latest: 0.1.3 (verified-clean).
Status: StaffML-day-ready. Ship it.
2026-04-25 19:45:32 -04:00

#!/usr/bin/env python3
"""Parallel Gemini validation of corpus questions.

Validates math correctness, factual accuracy, and question quality
using gemini-3.1-pro-preview across parallel batches.

Usage:
    python3 validate_questions.py                  # Validate all 4,779 questions
    python3 validate_questions.py --new-only       # Only validate the 285 newly generated
    python3 validate_questions.py --ka F1          # Only validate one knowledge area
    python3 validate_questions.py --sample 200     # Random sample of 200
    python3 validate_questions.py --batch-size 25  # Customize batch size
    python3 validate_questions.py --workers 12     # Customize parallelism
"""
import argparse
import json
import os
import random
import re
import sys
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
import subprocess
BASE = Path(__file__).parent.parent # vault root (scripts/ is one level down)
CORPUS_PATH = BASE / "corpus.json"
RESULTS_DIR = BASE / "scripts" / "_validation_results"
MODEL = "gemini-3.1-pro-preview"
# ─── Gemini Client ────────────────────────────────────────────
# Try API first (fast), fall back to CLI (uses cached OAuth)
# Use --cli flag to force CLI mode
_use_api = False
_client = None
_force_cli = "--cli" in sys.argv
def init_gemini():
    """Initialize Gemini client. Call after argparse."""
    global _use_api, _client
    if _force_cli:
        print(f" Using Gemini CLI (forced via --cli)")
        return
    try:
        from google import genai
        api_key = os.environ.get("GEMINI_API_KEY", "")
        if api_key and "expired" not in api_key.lower():
            _client = genai.Client(api_key=api_key)
            _test = _client.models.generate_content(model=MODEL, contents="Say OK")
            if _test.text:
                _use_api = True
                print(f" Using Gemini API (fast mode)")
                return
    except Exception:
        pass
    print(f" Using Gemini CLI (cached credentials)")
def call_gemini(prompt: str, retries: int = 2) -> str | None:
    """Call Gemini — API if available, CLI fallback."""
    for attempt in range(retries + 1):
        try:
            if _use_api:
                response = _client.models.generate_content(
                    model=MODEL, contents=prompt,
                    config={"temperature": 0.1, "max_output_tokens": 65000},
                )
                text = response.text.strip()
            else:
                # Pipe prompt via stdin to avoid ARG_MAX limits on large batches
                result = subprocess.run(
                    ["gemini", "-m", MODEL, "-o", "text"],
                    input=prompt, capture_output=True, text=True, timeout=300,
                )
                if result.returncode != 0:
                    if attempt < retries:
                        time.sleep(2 ** attempt)
                        continue
                    return None
                text = result.stdout.strip()
            # Strip markdown fences
            if text.startswith("```"):
                text = re.sub(r"^```\w*\n?", "", text)
                text = re.sub(r"\n?```$", "", text)
            return text.strip()
        except subprocess.TimeoutExpired:
            if attempt < retries:
                time.sleep(2 ** attempt)
            else:
                return None
        except Exception as e:
            if attempt < retries:
                time.sleep(2 ** attempt)
            else:
                print(f" Gemini error: {e}")
                return None
# ─── Validation Prompt ────────────────────────────────────────
VALIDATION_PROMPT = """You are a rigorous technical reviewer for Staff-level ML Systems interview questions. Review each question for:
1. **Math correctness**: Are all calculations, napkin math, and numerical claims correct? Check arithmetic, unit conversions, hardware specs (e.g., A100 = 2 TB/s HBM BW, 312 TFLOPS FP16; H100 = 3.35 TB/s, 989 TFLOPS FP16).
2. **Factual accuracy**: Are hardware specs, algorithm descriptions, and systems claims correct? Flag outdated or wrong numbers.
3. **Question quality**: Is the scenario clear? Is there exactly one correct answer? Is the common_mistake plausible? Is the realistic_solution actually correct?
4. **Classification sanity**: Does the reasoning_competency match what the question tests? Does the reasoning_mode match the question format?
For each question, output ONE JSON object:
```json
{"id": "<question-id>", "status": "OK|WARN|ERROR", "issues": ["issue1", "issue2"], "fixes": ["fix1", "fix2"]}
```
Rules:
- "OK" = no issues found
- "WARN" = minor issues (slightly imprecise numbers, could be clearer)
- "ERROR" = math wrong, factually incorrect, or fundamentally broken question
- Keep issues and fixes concise (one sentence each)
- For OK questions, issues and fixes should be empty arrays
Return a JSON array of review objects, one per question. Return ONLY the JSON array, no markdown fences.
QUESTIONS TO REVIEW:
"""
def build_batch_prompt(questions: list[dict]) -> str:
    """Build a validation prompt for a batch of questions."""
    q_text = ""
    for q in questions:
        details = q.get("details", {})
        q_text += f"""
---
ID: {q['id']}
Title: {q['title']}
Level: {q['level']} | Track: {q['track']} | RC: {q.get('reasoning_competency')} | KA: {q.get('knowledge_area')} | Mode: {q.get('reasoning_mode')}
Scenario: {q['scenario'][:500]}
Common Mistake: {details.get('common_mistake', '')[:300]}
Realistic Solution: {details.get('realistic_solution', '')[:500]}
Napkin Math: {details.get('napkin_math', '')[:500]}
"""
    return VALIDATION_PROMPT + q_text
def parse_review_response(text: str) -> list[dict] | None:
    """Parse JSON array from Gemini response."""
    if not text:
        return None
    # Strip markdown fences
    text = re.sub(r"^```\w*\n?", "", text.strip())
    text = re.sub(r"\n?```$", "", text.strip())
    try:
        data = json.loads(text)
        if isinstance(data, list):
            return data
        return None
    except json.JSONDecodeError:
        # Try to find JSON array in the response
        match = re.search(r'\[.*\]', text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group())
            except json.JSONDecodeError:
                return None
        return None
# ─── Main Pipeline ────────────────────────────────────────────
def validate_batch(batch_idx: int, questions: list[dict]) -> list[dict]:
    """Validate a batch of questions via Gemini."""
    prompt = build_batch_prompt(questions)
    text = call_gemini(prompt)
    reviews = parse_review_response(text)
    if reviews is None:
        print(f" Batch {batch_idx}: PARSE FAILED (will retry)")
        # Retry once
        text = call_gemini(prompt)
        reviews = parse_review_response(text)
    if reviews is None:
        # Return error for each question
        return [{"id": q["id"], "status": "PARSE_ERROR", "issues": ["Gemini response unparsable"], "fixes": []} for q in questions]
    return reviews
def main():
    parser = argparse.ArgumentParser(description="Validate corpus questions via Gemini")
    parser.add_argument("--new-only", action="store_true", help="Only validate newly generated questions")
    parser.add_argument("--ka", type=str, help="Only validate one knowledge area (e.g., F1)")
    parser.add_argument("--sample", type=int, help="Random sample of N questions")
    parser.add_argument("--batch-size", type=int, default=200, help="Questions per Gemini call (default: 200)")
    parser.add_argument("--workers", type=int, default=8, help="Parallel workers (default: 8)")
    parser.add_argument("--cli", action="store_true", help="Force Gemini CLI mode (OAuth, no API key)")
    args = parser.parse_args()

    # Initialize Gemini client
    init_gemini()

    # Load corpus
    corpus = json.load(open(CORPUS_PATH))
    print(f"Corpus: {len(corpus)} questions")

    # Filter
    if args.new_only:
        # New questions don't have certain legacy fields
        questions = [q for q in corpus if q.get("status") is None and q.get("version") is None]
        if not questions:
            # Fallback: questions without 'tags' field (old questions have it)
            questions = [q for q in corpus if "tags" not in q]
        if not questions:
            # Last resort: questions with IDs matching gen pattern
            gen_prefixes = tuple(f"{t}-{ka.lower()}-" for t in ["cloud", "global", "edge", "mobile"]
                                 for ka in ["f1", "a1", "a2", "a3", "a4", "a6", "b4", "b6", "b7", "b8", "c4", "c7", "c8", "c9", "d1", "e3"])
            questions = [q for q in corpus if q["id"].startswith(gen_prefixes)]
        print(f" Filtered to {len(questions)} new questions")
    elif args.ka:
        questions = [q for q in corpus if q.get("knowledge_area") == args.ka]
        print(f" Filtered to {len(questions)} questions in {args.ka}")
    else:
        questions = corpus
    if args.sample and args.sample < len(questions):
        random.seed(42)
        questions = random.sample(questions, args.sample)
        print(f" Sampled {len(questions)} questions")

    # Batch
    batch_size = args.batch_size
    batches = [questions[i:i + batch_size] for i in range(0, len(questions), batch_size)]
    print(f" {len(batches)} batches × {batch_size} questions = {len(questions)} total")
    print(f" {args.workers} parallel workers")
    print(f" Model: {MODEL}")
    print()
    # Run parallel validation
    all_reviews = []
    errors_count = 0
    warns_count = 0
    ok_count = 0
    parse_errors = 0
    start = time.time()
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {executor.submit(validate_batch, i, batch): i for i, batch in enumerate(batches)}
        for future in as_completed(futures):
            batch_idx = futures[future]
            try:
                reviews = future.result()
                all_reviews.extend(reviews)
                for r in reviews:
                    status = r.get("status", "?")
                    if status == "ERROR":
                        errors_count += 1
                    elif status == "WARN":
                        warns_count += 1
                    elif status == "OK":
                        ok_count += 1
                    elif status == "PARSE_ERROR":
                        parse_errors += 1
                done = ok_count + warns_count + errors_count + parse_errors
                elapsed = time.time() - start
                rate = done / elapsed if elapsed > 0 else 0
                print(f" Batch {batch_idx:>3}/{len(batches)}: "
                      f"OK={ok_count} WARN={warns_count} ERR={errors_count} "
                      f"PARSE_ERR={parse_errors} [{done}/{len(questions)} @ {rate:.1f} Q/s]")
            except Exception as e:
                print(f" Batch {batch_idx}: EXCEPTION: {e}")
    elapsed = time.time() - start
    # ─── Report ───────────────────────────────────────────────
    print(f"\n{'='*60}")
    print(f" VALIDATION COMPLETE")
    print(f"{'='*60}")
    print(f" Total: {len(all_reviews)} reviewed in {elapsed:.0f}s")
    print(f" OK: {ok_count} ({ok_count/max(len(all_reviews),1)*100:.1f}%)")
    print(f" WARN: {warns_count} ({warns_count/max(len(all_reviews),1)*100:.1f}%)")
    print(f" ERROR: {errors_count} ({errors_count/max(len(all_reviews),1)*100:.1f}%)")
    print(f" PARSE_ERR: {parse_errors}")

    # Collect errors and warnings
    issues_by_status = defaultdict(list)
    for r in all_reviews:
        if r.get("status") in ("ERROR", "WARN"):
            issues_by_status[r["status"]].append(r)
    if issues_by_status.get("ERROR"):
        print(f"\n ── ERRORS ({len(issues_by_status['ERROR'])}) ──")
        for r in issues_by_status["ERROR"][:30]:
            print(f" [{r['id']}]")
            for issue in r.get("issues", []):
                print(f"{issue}")
            for fix in r.get("fixes", []):
                print(f"{fix}")
    if issues_by_status.get("WARN"):
        print(f"\n ── WARNINGS ({len(issues_by_status['WARN'])}) ──")
        for r in issues_by_status["WARN"][:20]:
            print(f" [{r['id']}]")
            for issue in r.get("issues", []):
                print(f"{issue}")

    # Save results
    RESULTS_DIR.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    today = datetime.now().strftime("%Y-%m-%d")
    results_path = RESULTS_DIR / f"validation-{timestamp}.json"
    with open(results_path, "w") as f:
        json.dump({
            "timestamp": timestamp,
            "model": MODEL,
            "total_reviewed": len(all_reviews),
            "ok": ok_count,
            "warn": warns_count,
            "error": errors_count,
            "parse_errors": parse_errors,
            "elapsed_seconds": round(elapsed, 1),
            "reviews": all_reviews,
        }, f, indent=2)
    print(f"\n Results saved to {results_path}")
    # ─── Stamp validation into corpus.json ────────────────────
    review_map = {r["id"]: r for r in all_reviews if r.get("id")}
    stamped = 0
    for q in corpus:
        review = review_map.get(q["id"])
        if review:
            status = review.get("status", "PARSE_ERROR")
            q["validated"] = status == "OK"
            q["validation_status"] = status
            q["validation_issues"] = review.get("issues", [])
            q["validation_model"] = MODEL
            q["validation_date"] = today
            stamped += 1
    if stamped > 0:
        with open(CORPUS_PATH, "w") as f:
            json.dump(corpus, f, indent=2, ensure_ascii=False)
            f.write("\n")
        print(f" Stamped {stamped} questions in corpus.json")
        print(f" validated=true: {sum(1 for q in corpus if q.get('validated') is True)}")
        print(f" validated=false: {sum(1 for q in corpus if q.get('validated') is False)}")
        print(f" not yet checked: {sum(1 for q in corpus if q.get('validated') is None)}")


if __name__ == "__main__":
    main()