Files
cs249r_book/interviews/vault/scripts/vault_loop.py
Vijay Janapa Reddi 26e0ab3856 restructure interviews/ with vault separation and per-directory licenses
- Move corpus, taxonomy, chains, scripts into interviews/vault/
- Rename interviews/staffml/ (was interviews/staffml/) as the branded app
- Add CC BY-NC-SA 4.0 LICENSE to: book, kits, labs, slides, instructors, interviews
- Add AGPL-3.0 LICENSE to interviews/staffml/ (the app)
- Add vault LICENSE for pipeline scripts
- Update all GitHub Actions workflows for new paths
- Update README links and vault.yaml export paths
- Fix regex patterns in site/book deploy workflows

License structure:
  interviews/LICENSE      — CC BY-NC-SA 4.0 (corpus + data)
  interviews/staffml/LICENSE — AGPL-3.0 (app code)
  interviews/vault/LICENSE   — pipeline copyright
  book|kits|labs|slides|instructors/LICENSE — CC BY-NC-SA 4.0
  tinytorch/LICENSE       — Apache 2.0 (unchanged)
2026-03-25 15:18:14 -04:00

465 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Vault Loop — Overnight autonomous generation for balanced question coverage.

Unlike vault_fill.py (which fills deficit cells once), this script:
  1. Fills deficit cells (coverage gaps)
  2. Balances over-represented cells by boosting under-represented ones
  3. Loops until the cube is both FULL and BALANCED
  4. Logs every round to vault_loop.log

Balance metric: coefficient of variation (CV) per track.
Target: CV < 0.30 across all competency areas within each track,
AND total deficit ≤ 10.

Usage:
    # Run overnight (stops when saturated)
    python3 vault_loop.py

    # Dry run — show balance report only
    python3 vault_loop.py --dry-run

    # Set max rounds (default: 20)
    python3 vault_loop.py --max-rounds 50

    # Set parallelism (default: 8)
    python3 vault_loop.py --parallel 10
"""
import json
import logging
import math
import subprocess
import sys
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.rule import Rule
from rich import box
from engine.taxonomy import (
normalize_tag, get_area_for_tag, ALL_TAGS, TAXONOMY,
get_cell_target, LEVELS_LIST,
)
# Shared Rich console used for all human-facing terminal output in this script.
console = Console()

# ── Logging ──────────────────────────────────────────────────────────────────
# Log records are appended to a file next to this script and echoed to stdout.
LOG_FILE = Path(__file__).parent / "vault_loop.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        # Explicit UTF-8: logged messages contain non-ASCII characters
        # (e.g. "≤"), which may not be encodable with the platform's
        # default encoding.
        logging.FileHandler(LOG_FILE, mode="a", encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ],
)
log = logging.getLogger("vault_loop")

# ── Cube Analysis ────────────────────────────────────────────────────────────
# Track and area axes iterated by the analysis functions below; the level axis
# comes from engine.taxonomy.LEVELS_LIST.
TRACKS = ["cloud", "edge", "mobile", "tinyml"]
AREAS = list(TAXONOMY.keys()) + ["cross-cutting"]
def load_cube():
    """Load the corpus and build the 3D coverage cube.

    Reads ``corpus.json`` from the directory containing this script and counts
    entries per (track, area, level).

    Returns:
        A ``(cube, total)`` tuple. ``cube[track][area][level]`` is the number
        of corpus entries in that cell (nested ``defaultdict``, so unseen
        cells read as 0); ``total`` is ``len(corpus)``.
    """
    corpus_path = Path(__file__).parent / "corpus.json"
    # Read as UTF-8 explicitly instead of relying on the platform's default
    # text encoding.
    with open(corpus_path, encoding="utf-8") as f:
        corpus = json.load(f)

    cube = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    for question in corpus:
        track = question.get("track", "?")
        level = question.get("level", "?")
        # Fold the bare and URL-encoded spellings into the canonical "L6+".
        if level in ("L6", "L6%2B"):
            level = "L6+"
        tag = normalize_tag(question.get("topic", ""))
        # Tags outside ALL_TAGS are counted under the "unmapped" area.
        area = get_area_for_tag(tag) if tag in ALL_TAGS else "unmapped"
        cube[track][area][level] += 1
    return cube, len(corpus)
def compute_balance(cube):
    """
    Measure how evenly each track's areas are filled relative to their targets.

    For every track, each area's count and target are summed over all levels.
    The fill ratio (count / target) of every area with a positive target feeds
    a coefficient of variation (population std-dev / mean) for the track.
    Tracks with no positive-target area are left out of the report.

    Returns:
        balance_report: per-track dict with "mean_count", "mean_target",
            "mean_ratio", "cv" and the per-area "area_totals"
        imbalanced_cells: list of cell dicts (track, area, level, current,
            target, deficit, reason) that should be boosted for balance
    """
    report = {}
    boost_cells = []

    for track in TRACKS:
        # Per-area totals across every level.
        area_totals = {}
        for area in AREAS:
            area_totals[area] = {
                "count": sum(cube[track][area].get(lv, 0) for lv in LEVELS_LIST),
                "target": sum(get_cell_target(track, area, lv) for lv in LEVELS_LIST),
            }

        # Statistics only consider areas that actually have a target.
        active = [totals for totals in area_totals.values() if totals["target"] > 0]
        counts = [totals["count"] for totals in active]
        targets = [totals["target"] for totals in active]
        if not counts:
            continue

        # Fill ratio (actual / target) per area, in AREAS order.
        ratios = [
            area_totals[area]["count"] / area_totals[area]["target"]
            for area in AREAS
            if area_totals[area]["target"] > 0
        ]
        if ratios:
            mean_ratio = sum(ratios) / len(ratios)
            variance = sum((r - mean_ratio) ** 2 for r in ratios) / len(ratios)
        else:
            mean_ratio = 1.0
            variance = 0
        cv = math.sqrt(variance) / mean_ratio if mean_ratio > 0 else 0

        report[track] = {
            "mean_count": sum(counts) / len(counts),
            "mean_target": sum(targets) / len(targets),
            "mean_ratio": mean_ratio,
            "cv": cv,
            "area_totals": area_totals,
        }

        # An area lags when its fill ratio is below 70% of the track mean
        # (and below 1.5x outright); its not-yet-comfortable cells get boosted.
        for area in AREAS:
            totals = area_totals[area]
            if totals["target"] == 0:
                continue
            area_ratio = totals["count"] / totals["target"]
            lagging = area_ratio < mean_ratio * 0.70 and area_ratio < 1.5
            if not lagging:
                continue
            for level in LEVELS_LIST:
                cell_target = get_cell_target(track, area, level)
                cell_count = cube[track][area].get(level, 0)
                # Only cells with a target and less than 120% of it qualify.
                if not (cell_target > 0 and cell_count < cell_target * 1.2):
                    continue
                boost_cells.append({
                    "track": track,
                    "area": area,
                    "level": level,
                    "current": cell_count,
                    "target": cell_target,
                    # Always request at least one question per boosted cell.
                    "deficit": max(1, cell_target - cell_count),
                    "reason": f"balance (ratio {area_ratio:.1f} vs track mean {mean_ratio:.1f})",
                })

    return report, boost_cells
def find_deficit_cells(cube):
    """Return a cell dict for every (track, area, level) below its target.

    Each dict carries the cell coordinates, the current count, the target, the
    shortfall ("deficit"), the area's entry in ``CONCEPT_MAP`` (falling back to
    the area name) and the reason "coverage".
    """
    from vault_fill import CONCEPT_MAP

    gaps = []
    for track in TRACKS:
        for area in AREAS:
            for level in LEVELS_LIST:
                wanted = get_cell_target(track, area, level)
                have = cube[track][area].get(level, 0)
                shortfall = max(0, wanted - have)
                # Cells at or above target are not coverage gaps.
                if not shortfall > 0:
                    continue
                gaps.append({
                    "track": track,
                    "area": area,
                    "level": level,
                    "current": have,
                    "target": wanted,
                    "deficit": shortfall,
                    "concept": CONCEPT_MAP.get(area, area),
                    "reason": "coverage",
                })
    return gaps
def print_balance_report(balance_report, round_num=None):
    """Render the per-track balance summary as a single table.

    One row per track present in ``balance_report``; ``round_num``, when
    given, is appended to the table title.
    """
    heading = "Balance Report"
    if round_num is not None:
        heading = f"{heading} (Round {round_num})"

    table = Table(box=box.ROUNDED, title=heading)
    table.add_column("Track", width=8)
    for label, width in (
        ("Avg Count", 10),
        ("Avg Target", 10),
        ("Fill Ratio", 10),
        ("CV", 8),
    ):
        table.add_column(label, justify="center", width=width)
    table.add_column("Status", width=10)

    for track in TRACKS:
        if track in balance_report:
            stats = balance_report[track]
            cv = stats["cv"]
            # Status bands: < 0.30 balanced, < 0.50 uneven, otherwise imbalanced.
            if cv < 0.30:
                status = "[green]BALANCED[/]"
            elif cv < 0.50:
                status = "[yellow]UNEVEN[/]"
            else:
                status = "[red]IMBALANCED[/]"
            row = (
                track,
                f"{stats['mean_count']:.1f}",
                f"{stats['mean_target']:.1f}",
                f"{stats['mean_ratio']:.2f}x",
                f"{cv:.2f}",
                status,
            )
            table.add_row(*row)

    console.print(table)
def print_area_breakdown(balance_report):
    """Show per-area fill ratios within each track.

    Prints one table per track present in ``balance_report``, with areas
    ordered from the lowest to the highest fill ratio. Areas whose target is 0
    are omitted.
    """
    bar_width = 20  # matches the width of the "Bar" column
    for track in TRACKS:
        if track not in balance_report:
            continue
        info = balance_report[track]
        table = Table(box=box.SIMPLE, title=f"{track.upper()} — Area Breakdown")
        table.add_column("Area", width=14)
        table.add_column("Count", justify="center", width=7)
        table.add_column("Target", justify="center", width=7)
        table.add_column("Ratio", justify="center", width=7)
        table.add_column("Bar", width=bar_width)
        sorted_areas = sorted(
            info["area_totals"].items(),
            # max(..., 1) keeps the sort key defined for zero-target areas,
            # which are skipped when rendering below.
            key=lambda x: x[1]["count"] / max(x[1]["target"], 1),
        )
        for area, data in sorted_areas:
            if data["target"] == 0:
                continue
            ratio = data["count"] / data["target"]
            # 10 segments per 1.0x of fill, capped at the full bar (2.0x).
            bar_len = min(bar_width, int(ratio * 10))
            # Filled and empty segments need visible glyphs; with empty string
            # literals the bar column always rendered blank.
            bar = "█" * bar_len + "░" * (bar_width - bar_len)
            color = "green" if ratio >= 1.0 else "yellow" if ratio >= 0.7 else "red"
            table.add_row(
                area,
                str(data["count"]),
                str(data["target"]),
                f"[{color}]{ratio:.1f}x[/{color}]",
                f"[{color}]{bar}[/{color}]",
            )
        console.print(table)
# ── Main Loop ────────────────────────────────────────────────────────────────
def is_saturated(deficit_cells, balance_report):
    """Tell whether generation can stop.

    True when the summed "deficit" over ``deficit_cells`` is at most 10 and
    every entry in ``balance_report`` has a "cv" below 0.30.
    """
    remaining = 0
    for cell in deficit_cells:
        remaining += cell["deficit"]

    # Look for the first track that misses the balance threshold.
    has_unbalanced_track = any(
        not (stats["cv"] < 0.30) for stats in balance_report.values()
    )
    return remaining <= 10 and not has_unbalanced_track
def run_round(cells_to_fill, parallel: int = 8):
    """Run one generation round: spawn workers, count their output, merge.

    Removes every ``*.json`` file from ``VAULT_DIR``, launches one
    ``engine/vault_worker.py`` process per cell (at most ``parallel`` at a
    time), then calls ``vault_fill.cmd_merge()``.

    Args:
        cells_to_fill: cell dicts with "track", "area" and "level" keys, and
            optionally "concept" and "reason".
        parallel: maximum number of worker processes per batch; values below 1
            are treated as 1.

    Returns:
        The total number of items found in the workers' output files
        (0 when ``cells_to_fill`` is empty).
    """
    if not cells_to_fill:
        return 0

    from vault_fill import VAULT_DIR, CONCEPT_MAP

    VAULT_DIR.mkdir(exist_ok=True)
    # Clean old vault files
    for stale in VAULT_DIR.glob("*.json"):
        stale.unlink()

    script_dir = Path(__file__).parent
    worker = script_dir / "engine" / "vault_worker.py"
    commands = []
    for i, c in enumerate(cells_to_fill):
        out_file = VAULT_DIR / f"cell_{i:03d}_{c['track']}_{c['area']}_{c['level']}.json"
        concept = c.get("concept", CONCEPT_MAP.get(c["area"], c["area"]))
        cmd = [
            "python3", str(worker),
            c["track"], concept, c["level"],
            "1",  # Generate 1 per cell for even distribution
            str(out_file),
        ]
        commands.append((cmd, out_file, c))

    # Run in parallel batches. A non-positive batch size would make range()
    # raise (step 0) or silently skip every command (negative step).
    total_generated = 0
    batch_size = max(1, parallel)
    for batch_start in range(0, len(commands), batch_size):
        batch = commands[batch_start:batch_start + batch_size]
        procs = []
        for cmd, out_file, cell in batch:
            proc = subprocess.Popen(
                cmd,
                # The workers' console output is never read here. Leaving it
                # on PIPE while calling wait() can deadlock once a worker
                # fills the pipe buffer, so discard it instead.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                cwd=str(script_dir),
            )
            procs.append((proc, out_file, cell))

        for proc, out_file, cell in procs:
            proc.wait()
            label = f"{cell['track']}/{cell['area']}/{cell['level']}"
            if not out_file.exists():
                console.print(f" [red]✗[/] {label} no output")
                continue
            try:
                data = json.loads(out_file.read_text(encoding="utf-8"))
                generated = len(data)
            except (OSError, ValueError, TypeError):
                # Unreadable file, undecodable/invalid JSON, or a JSON value
                # that has no length.
                console.print(f" [red]✗[/] {label} parse error")
                continue
            total_generated += generated
            console.print(
                f" [green]✓[/] {label} "
                f"+{generated} ({cell.get('reason', 'coverage')})"
            )

    # Merge
    from vault_fill import cmd_merge
    cmd_merge()
    return total_generated
def main(max_rounds: int = 20, parallel: int = 8, dry_run: bool = False):
    """Main loop: fill + balance until saturated.

    Each round runs ``build_corpus.py``, reloads the cube, prints the reports
    and, unless the cube is saturated, generates questions for every deficit
    and imbalanced cell. A final summary is always printed.

    Args:
        max_rounds: upper bound on the number of rounds.
        parallel: passed to ``run_round`` as the worker batch size.
        dry_run: when True, print the first round's reports and stop without
            generating anything.
    """
    log.info("=" * 60)
    log.info(f"VAULT LOOP STARTED | max_rounds={max_rounds} parallel={parallel}")
    log.info("=" * 60)

    script_dir = Path(__file__).parent
    # Pre-bind so the final summary still works when max_rounds <= 0 and the
    # loop body never runs.
    round_num = 0
    for round_num in range(1, max_rounds + 1):
        console.print()
        console.print(Rule(f"[bold cyan]Round {round_num} / {max_rounds}[/]", style="cyan"))

        # Rebuild corpus first
        build = subprocess.run(
            ["python3", "build_corpus.py"],
            capture_output=True,
            cwd=str(script_dir),
        )
        if build.returncode != 0:
            # Do not abort the run, but make the failure visible: the
            # analysis below reads whatever corpus.json is currently on disk.
            log.warning(
                f"build_corpus.py exited with code {build.returncode}; "
                f"continuing with the corpus.json currently on disk"
            )

        # Analyze
        cube, total = load_cube()
        deficit_cells = find_deficit_cells(cube)
        balance_report, imbalanced_cells = compute_balance(cube)
        total_deficit = sum(c["deficit"] for c in deficit_cells)
        log.info(
            f"Round {round_num}: {total} questions | "
            f"deficit={total_deficit} in {len(deficit_cells)} cells | "
            f"imbalanced={len(imbalanced_cells)} cells"
        )

        # Print reports
        console.print(Panel(
            f"[bold]{total}[/] questions\n"
            f"[bold red]{len(deficit_cells)}[/] deficit cells ({total_deficit} questions)\n"
            f"[bold yellow]{len(imbalanced_cells)}[/] imbalanced cells",
            title=f"[bold]Round {round_num} Status",
            border_style="cyan",
        ))
        print_balance_report(balance_report, round_num)
        print_area_breakdown(balance_report)

        # Check saturation
        if is_saturated(deficit_cells, balance_report):
            log.info(f"SATURATED at round {round_num}! Deficit ≤ 10 and all tracks balanced.")
            console.print(Panel(
                f"[bold green]SATURATED![/]\n\n"
                f"Total questions: {total}\n"
                f"Deficit: {total_deficit}\n"
                f"All tracks have CV < 0.30",
                title="[bold green]Cube Complete",
                border_style="green",
            ))
            break

        if dry_run:
            console.print("[dim]Dry run — not generating.[/]")
            break

        # Combine deficit + imbalanced cells, dedup by (track, area, level).
        # Deficit cells come first so a coverage entry wins over a balance
        # entry for the same cell.
        seen = set()
        cells_to_fill = []
        for c in [*deficit_cells, *imbalanced_cells]:
            key = (c["track"], c["area"], c["level"])
            if key not in seen:
                seen.add(key)
                cells_to_fill.append(c)

        if not cells_to_fill:
            log.info("Nothing to generate. Stopping.")
            break

        log.info(f"Generating {len(cells_to_fill)} cells ({sum(c['deficit'] for c in cells_to_fill)} questions)")
        generated = run_round(cells_to_fill, parallel=parallel)
        log.info(f"Round {round_num} complete: +{generated} questions")

        # Cooldown between rounds (respect Gemini rate limits)
        if round_num < max_rounds:
            console.print("[dim]Cooling down 10s before next round...[/]")
            time.sleep(10)

    # Final summary
    cube, total = load_cube()
    deficit_cells = find_deficit_cells(cube)
    balance_report, _ = compute_balance(cube)
    total_deficit = sum(c["deficit"] for c in deficit_cells)
    log.info(f"FINAL: {total} questions | deficit={total_deficit} | rounds={round_num}")
    console.print()
    console.print(Rule("[bold green]Final State[/]", style="green"))
    print_balance_report(balance_report)
    print_area_breakdown(balance_report)
    console.print(Panel(
        f"[bold]{total}[/] total questions\n"
        f"[bold]{total_deficit}[/] remaining deficit\n"
        f"[bold]{round_num}[/] rounds completed\n"
        f"Log: {LOG_FILE}",
        title="[bold green]Vault Loop Complete",
        border_style="green",
    ))
if __name__ == "__main__":
    import argparse

    # Command-line entry point: each flag maps one-to-one onto a main() keyword.
    cli = argparse.ArgumentParser(description="Vault Loop — overnight balanced generation")
    cli.add_argument(
        "--max-rounds",
        type=int,
        default=20,
        help="Max generation rounds (default: 20)",
    )
    cli.add_argument(
        "--parallel",
        type=int,
        default=8,
        help="Max parallel Gemini calls (default: 8)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show balance report without generating",
    )
    options = cli.parse_args()
    # The parsed namespace holds exactly max_rounds, parallel and dry_run.
    main(**vars(options))