Files
cs249r_book/interviews/vault-cli/scripts/build_chains_with_gemini.py
Vijay Janapa Reddi a74c98576e Merge origin/dev into yaml-audit
Sync the yaml-audit branch with the latest dev work since the previous
sync (5c5af75ed). Brings in 73 commits including:

  - CI security fixes: postcss XSS bump, uuid bounds bump, codeql
    paths-ignore for vendored bundles, read-only token on
    staffml-validate-vault workflow
  - kits/ dark mode polish: code-block readability, dropdown contrast
  - vault-cli/: pre-commit ruff hook + 20 ruff fixes, all-contributors
    auto-credit workflow change to pull_request_target
  - dev's earlier merge of yaml-audit (836d481b5) carrying the
    pre-trailer-strip Phase 1/2/3 history; this merge harmonises that
    with the current trailer-clean yaml-audit tip
  - misc bug fixes (tinytorch perceptron seed, infra workflows,
    socratiq vite dev injector)

Conflicts resolved (if any) preserve the yaml-audit-side authoritative
state for vault/* files (we own those) and the dev-side authoritative
state for .github/workflows/* and other shared infrastructure.

# Conflicts:
#	.github/workflows/all-contributors-auto-credit.yml
#	.github/workflows/staffml-preview-dev.yml
#	interviews/staffml/src/data/corpus-summary.json
#	interviews/staffml/src/data/vault-manifest.json
#	interviews/staffml/tests/chain-and-vault-smoke.mjs
#	interviews/vault-cli/README.md
#	interviews/vault-cli/docs/CHAIN_ROADMAP.md
#	interviews/vault-cli/scripts/build_chains_with_gemini.py
#	interviews/vault-cli/scripts/generate_question_for_gap.py
#	interviews/vault-cli/scripts/merge_chain_passes.py
#	interviews/vault-cli/scripts/validate_drafts.py
#	interviews/vault-cli/src/vault_cli/legacy_export.py
#	interviews/vault-cli/tests/test_chain_validation.py
#	interviews/vault/.gitignore
#	interviews/vault/ARCHITECTURE.md
#	interviews/vault/chains.json
#	interviews/vault/id-registry.yaml
#	interviews/vault/questions/edge/optimization/edge-2536.yaml
#	interviews/vault/questions/mobile/deployment/mobile-2147.yaml
#	tinytorch/src/03_layers/03_layers.py
2026-05-02 11:06:43 -04:00

591 lines
24 KiB
Python

#!/usr/bin/env python3
"""Build pedagogical chains within (track, topic) buckets via Gemini CLI.
For each bucket of published questions, prompts gemini-3.1-pro-preview to
identify natural chains (groups of 2-6 questions progressing through Bloom
levels, where one builds on another). Output is validated against the
chain schema and written to a staging file for human review before
replacing chains.json.
Design decisions:
- Sidecar architecture: chains.json is the authoritative registry.
This script writes a *new* staging chains.json — never edits YAMLs.
- Adaptive batching: packs multiple small buckets per call to use
Gemini's 1M context efficiently without maxing it (target ~80K
input tokens per call). Aim: full corpus in ≤90 calls (250/day cap).
- Validation: every chain is checked structurally — all member ids
exist in input, level non-decreasing, 2 ≤ size ≤ 6, single-topic.
Usage:
python3 build_chains_with_gemini.py --dry-run # preview batching plan
python3 build_chains_with_gemini.py --bucket cloud:kv-cache # one bucket
python3 build_chains_with_gemini.py --all # full corpus
python3 build_chains_with_gemini.py --output proposed_chains.json --all
Modes:
--mode strict (default): Δ ∈ {1, 2} between consecutive members. This is
the cleanest pedagogical shape and what we want for primary chains.
--mode lenient: Δ ∈ {1, 2, 3}. Used for second-pass coverage on buckets
the strict pass missed; resulting chains are tagged tier=secondary.
Earlier revisions of lenient mode also allowed Δ=0 for
"shared scenario, different angle" pairs; that constraint did not
bind in practice (audit found 54/55 Δ=0 chains had no shared
scenario), so Δ=0 was removed 2026-05-02.
Bucket scoping:
--buckets-from <chain-coverage.json>: limit the run to the
``uncovered_buckets`` list in a coverage report (output of
diagnose_chain_coverage.py). Use with --mode lenient for the
Phase 1.4 second-pass sweep.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import time
from collections import defaultdict
from pathlib import Path
import yaml
# Repo-relative layout: this script lives in vault-cli/scripts/, so two
# parents up then /vault reaches the question corpus.
VAULT_DIR = Path(__file__).resolve().parents[2] / "vault"
QUESTIONS_DIR = VAULT_DIR / "questions"
# AI-pipeline intermediate artifacts live under _pipeline/ (gitignored).
# See interviews/CLAUDE.md for the convention.
PIPELINE_DIR = VAULT_DIR / "_pipeline"
DEFAULT_OUTPUT = PIPELINE_DIR / "chains.proposed.json"
# Model id passed to the gemini CLI via -m (see call_gemini).
GEMINI_MODEL = "gemini-3.1-pro-preview"
# Rough chars->tokens ratio documenting how the 320K char cap below maps
# to the ~80K-token target mentioned in the module docstring.
TOKENS_PER_CHAR = 0.25
MAX_INPUT_CHARS_PER_CALL = 320_000  # ~80K tokens — safely under 1M
MAX_QUESTIONS_PER_CALL = 250  # Gemini quality degrades on huge tasks
SCENARIO_CHAR_BUDGET = 280  # truncate per question for prompt budget
# Ordinal rank per Bloom level; validate_chain falls back to rank 0 for
# unknown/missing levels, which then fails the Δ check.
LEVEL_RANK = {"L1": 1, "L2": 2, "L3": 3, "L4": 4, "L5": 5, "L6+": 6}
def load_corpus() -> dict[str, dict]:
    """Load every published question YAML into an id -> data mapping.

    Files that are unreadable, malformed, or missing required keys are
    skipped silently (best-effort corpus load).
    """
    questions: dict[str, dict] = {}
    for yaml_path in QUESTIONS_DIR.rglob("*.yaml"):
        try:
            data = yaml.safe_load(yaml_path.read_text())
            # A missing status is treated the same as "published".
            status = data.get("status")
            if status is not None and status != "published":
                continue
            questions[data["id"]] = data
        except Exception:
            continue
    return questions
def bucket_corpus(corpus: dict[str, dict]) -> dict[tuple[str, str], list[str]]:
    """Group question ids into (track, topic) buckets, each sorted by id."""
    grouped: dict[tuple[str, str], list[str]] = defaultdict(list)
    for question_id, record in corpus.items():
        bucket_key = (record.get("track"), record.get("topic"))
        grouped[bucket_key].append(question_id)
    # Return a plain dict with each bucket's id list in sorted order.
    return {key: sorted(ids) for key, ids in grouped.items()}
def question_payload(corpus: dict[str, dict], qid: str) -> dict:
    """Build the compact per-question JSON record embedded in the prompt.

    The scenario field is truncated to SCENARIO_CHAR_BUDGET chars to keep
    the per-call prompt within budget.
    """
    record = corpus[qid]
    clipped_scenario = (record.get("scenario") or "")[:SCENARIO_CHAR_BUDGET]
    return {
        "id": qid,
        "level": record.get("level"),
        "title": record.get("title", ""),
        "question": record.get("question") or "",
        "scenario": clipped_scenario,
        "competency_area": record.get("competency_area"),
    }
def estimate_chars(buckets: list[tuple[tuple[str, str], list[str]]],
                   corpus: dict[str, dict]) -> int:
    """Roughly estimate the prompt characters contributed by these buckets."""

    def per_question(qid: str) -> int:
        record = corpus[qid]
        # Scenario is capped at SCENARIO_CHAR_BUDGET in the actual payload.
        scenario_len = min(len(record.get("scenario", "")), SCENARIO_CHAR_BUDGET)
        question_len = len(record.get("question", "") or "")
        # +40 approximates per-question JSON framing overhead.
        return len(record.get("title", "")) + scenario_len + question_len + 40

    return sum(per_question(qid) for _, qids in buckets for qid in qids)
def plan_batches(buckets: dict[tuple[str, str], list[str]],
                 corpus: dict[str, dict]) -> list[list[tuple[tuple[str, str], list[str]]]]:
    """Pack buckets into batches that each fit under the per-call caps.

    A batch is flushed when adding the next bucket would exceed either the
    character budget or the question-count cap; an oversized bucket still
    gets a batch of its own.
    """
    # Largest buckets first so batches pack tightly.
    ordered = sorted(buckets.items(), key=lambda kv: len(kv[1]), reverse=True)
    batches: list[list[tuple[tuple[str, str], list[str]]]] = []
    current: list[tuple[tuple[str, str], list[str]]] = []
    chars_used = 0
    questions_used = 0
    for bucket_key, qids in ordered:
        bucket_chars = estimate_chars([(bucket_key, qids)], corpus)
        over_chars = chars_used + bucket_chars > MAX_INPUT_CHARS_PER_CALL
        over_count = questions_used + len(qids) > MAX_QUESTIONS_PER_CALL
        if current and (over_chars or over_count):
            batches.append(current)
            current, chars_used, questions_used = [], 0, 0
        current.append((bucket_key, qids))
        chars_used += bucket_chars
        questions_used += len(qids)
    if current:
        batches.append(current)
    return batches
# Strict-mode prompt (Δ ∈ {1, 2}). Doubled braces ({{ }}) are literal braces
# in the rendered prompt: build_prompt expands this via str.format(), so
# only the {buckets_json} placeholder is substituted.
STRICT_PROMPT_TEMPLATE = """You are an expert ML systems educator helping curate pedagogical chains
of interview questions. A "chain" is a sequence of 2-6 questions within a
SINGLE topic that progress through Bloom levels (L1 → L2 → ... up to L6+),
where each question naturally builds on its predecessor — same scenario or
concept, increasing in cognitive demand.
You will be given several BUCKETS, each containing all published questions
for one (track, topic) pair. For each bucket, identify the BEST natural
chains. A bucket may yield 0 chains (no good progressions), 1 chain (one
arc through the topic), or several chains (multiple distinct arcs).
LEVEL PROGRESSION RULES (HARD):
- Each consecutive pair of members MUST satisfy: cand_level - prev_level ∈ {{1, 2}}
- PREFER strict +1 progression (L1→L2→L3→L4→L5→L6+) — this is the cleanest
pedagogical shape and should account for the majority of chains
- ACCEPT a +2 jump (e.g., L1→L3 or L3→L5) ONLY when no Δ=1 candidate is
available within the bucket and the conceptual progression is genuinely
natural — i.e., the harder question still meaningfully builds on the
easier one even with one Bloom step skipped
- REJECT Δ=0 (same-level pairs) — same Bloom level isn't a progression
- REJECT Δ ≥ 3 (e.g., L1→L4) and any backward step — too large to be a
coherent single-step pedagogical move
OTHER CONSTRAINTS:
- 2 ≤ chain size ≤ 6 members
- All members from the SAME (track, topic) bucket
- A question MAY appear in UP TO 2 different chains if and only if:
(a) The question is L1 or L2 (a foundational anchor)
(b) The two chains diverge into genuinely distinct sub-progressions
AFTER this anchor — not the same arc viewed twice
(c) Each chain is individually coherent and pedagogically valuable
Default to 1 chain per question; multi-membership is the exception.
- Prefer chains where Q[i+1] genuinely builds on Q[i] (shared scenario,
sequential reasoning) over loosely related same-topic questions
- Don't force chains — if questions are unrelated, return 0 chains for
that bucket. Quality over coverage.
GAP DETECTION (free signal — emit alongside chains):
For each bucket, also identify "missing-rung" gaps: pedagogical arcs that
WOULD form a clean strict +1 chain if the bucket had a question at a
specific Bloom level it currently lacks. Example: bucket has L1, L3, L5
on the same scenario thread → propose a missing-L2 and missing-L4
question that would link them. These gaps drive future authoring; we
don't act on them in this pass.
Return STRICT JSON in this exact shape, no prose:
{{
"buckets": [
{{
"track": "<track>",
"topic": "<topic>",
"chains": [
{{
"questions": ["<qid1>", "<qid2>", ...],
"rationale": "<one sentence — what does this chain teach?>"
}}
],
"gaps": [
{{
"missing_level": "L<N>",
"between": ["<qid_lower>", "<qid_higher>"],
"rationale": "<what concept the missing question should cover to bridge these two>"
}}
]
}}
]
}}
INPUT (buckets to process):
{buckets_json}
"""
# Lenient prompt for the second-pass coverage sweep (Phase 1.4 of
# CHAIN_ROADMAP.md). Same structural envelope as STRICT, but with relaxed
# Δ rules so we can wring at least one chain out of buckets the strict pass
# rejected. Chains produced under this prompt are tagged tier=secondary.
# As in STRICT_PROMPT_TEMPLATE, {{ }} are literal braces; build_prompt
# substitutes only {buckets_json} via str.format().
LENIENT_PROMPT_TEMPLATE = """You are an expert ML systems educator helping curate pedagogical chains
of interview questions. A "chain" is a sequence of 2-6 questions within a
SINGLE topic that progress through Bloom levels (L1 → L2 → ... up to L6+),
where each question naturally builds on its predecessor — same scenario or
concept, increasing in cognitive demand.
You will be given several BUCKETS, each containing all published questions
for one (track, topic) pair. These are buckets a stricter first pass was
unable to chain — your job is to find at least one coherent progression
per bucket if any pedagogical clustering exists at all. Only return zero
chains for a bucket when its questions are genuinely unrelated even on
the loosest reading.
LEVEL PROGRESSION RULES (LENIENT MODE):
- Each consecutive pair of members satisfies: cand_level - prev_level ∈ {{1, 2, 3}}
- STRONGLY PREFER strict +1 progression where it exists
- +2 jumps acceptable when no Δ=1 candidate is available
- +3 jumps allowed only when no smaller intermediate exists in the bucket
- REJECT Δ=0 (same-level pair). Earlier versions of this prompt allowed
Δ=0 for "shared scenario / different angle" pairs, but in practice
that constraint did not bind — Gemini routinely produced Δ=0 chains
that were just two unrelated same-level same-topic questions.
If two same-level questions share a scenario thread, model them as
siblings (separate registry entries pointing at the anchor), not
as a chain.
- REJECT any backward step (Δ < 0)
OTHER CONSTRAINTS:
- 2 ≤ chain size ≤ 6 members
- All members from the SAME (track, topic) bucket
- A question MAY appear in UP TO 2 different chains if and only if:
(a) The question is L1 or L2 (a foundational anchor)
(b) The two chains diverge into genuinely distinct sub-progressions
AFTER this anchor — not the same arc viewed twice
(c) Each chain is individually coherent and pedagogically valuable
Default to 1 chain per question; multi-membership is the exception.
- Prefer chains where Q[i+1] genuinely builds on Q[i] (shared scenario,
sequential reasoning) over loosely related same-topic questions
- Quality still matters — but err on the side of producing at least one
chain per bucket rather than rejecting the bucket entirely
GAP DETECTION (free signal — emit alongside chains):
For each bucket, also identify "missing-rung" gaps: pedagogical arcs that
WOULD form a clean strict +1 chain if the bucket had a question at a
specific Bloom level it currently lacks. Example: bucket has L1, L3, L5
on the same scenario thread → propose a missing-L2 and missing-L4
question that would link them. These gaps drive future authoring; we
don't act on them in this pass.
Return STRICT JSON in this exact shape, no prose:
{{
"buckets": [
{{
"track": "<track>",
"topic": "<topic>",
"chains": [
{{
"questions": ["<qid1>", "<qid2>", ...],
"rationale": "<one sentence — what does this chain teach?>"
}}
],
"gaps": [
{{
"missing_level": "L<N>",
"between": ["<qid_lower>", "<qid_higher>"],
"rationale": "<what concept the missing question should cover to bridge these two>"
}}
]
}}
]
}}
INPUT (buckets to process):
{buckets_json}
"""
# Map mode -> prompt template + accepted Δ set. Single source of truth so
# build_prompt and validate_chain stay in lockstep when modes are added:
# "prompt_template" is consumed by build_prompt, "allowed_deltas" by
# validate_chain.
MODE_CONFIG = {
    "strict": {
        "prompt_template": STRICT_PROMPT_TEMPLATE,
        "allowed_deltas": frozenset({1, 2}),
    },
    "lenient": {
        "prompt_template": LENIENT_PROMPT_TEMPLATE,
        "allowed_deltas": frozenset({1, 2, 3}),
    },
}
# Backwards-compatible alias for any external readers — strict was the
# original (and only) prompt before Phase 1.2.
PROMPT_TEMPLATE = STRICT_PROMPT_TEMPLATE
def build_prompt(batch: list[tuple[tuple[str, str], list[str]]],
                 corpus: dict[str, dict],
                 mode: str = "strict") -> str:
    """Render the mode's prompt template over this batch's bucket payloads."""
    bucket_payloads = [
        {
            "track": track,
            "topic": topic,
            "questions": [question_payload(corpus, qid) for qid in qids],
        }
        for (track, topic), qids in batch
    ]
    template = MODE_CONFIG[mode]["prompt_template"]
    return template.format(buckets_json=json.dumps(bucket_payloads, indent=2))
def call_gemini(prompt: str, model: str = GEMINI_MODEL, timeout: int = 600) -> dict | None:
    """Run ``gemini -m <model> -p <prompt> --yolo`` and parse the JSON reply.

    Gemini CLI sometimes exits non-zero even when stdout contains a valid
    JSON response (e.g., YOLO-mode info messages, transient 429s that the
    CLI internally retries past). We try to parse stdout regardless and
    only treat unparsable output as failure.

    Returns the parsed JSON object, or None on timeout, missing CLI binary,
    or unparsable output.
    """
    try:
        result = subprocess.run(
            ["gemini", "-m", model, "-p", prompt, "--yolo"],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return None
    except FileNotFoundError:
        # gemini CLI not on PATH — fail this call instead of crashing the
        # whole run with an uncaught exception.
        print("  gemini CLI not found on PATH", file=sys.stderr)
        return None
    out = (result.stdout or "").strip()
    # Strip markdown code fences if present (``` or ```json).
    if out.startswith("```"):
        out = out.strip("`")
        if out.startswith("json"):
            out = out[4:].lstrip()
    # Extract the outermost { ... } span from any surrounding chatter.
    i = out.find("{")
    j = out.rfind("}")
    # Also reject j < i (e.g. a stray "}" before any "{"), which would
    # otherwise produce a nonsense slice.
    if i == -1 or j == -1 or j < i:
        # No JSON in stdout — surface stderr so the operator sees what happened
        if result.returncode != 0:
            print(f" gemini exit {result.returncode}, no JSON: {(result.stderr or '')[:200]}",
                  file=sys.stderr)
        return None
    try:
        return json.loads(out[i:j + 1])
    except json.JSONDecodeError as e:
        print(f" JSON parse failed: {e}", file=sys.stderr)
        return None
def validate_chain(
    chain: dict,
    bucket_qids: set[str],
    corpus: dict[str, dict],
    mode: str = "strict",
) -> tuple[bool, str]:
    """Structurally validate one Gemini-proposed chain.

    Δ-rule depends on mode:
        strict  → Δ ∈ {1, 2} (clean +1 progression, +2 if no intermediate)
        lenient → Δ ∈ {1, 2, 3} (Δ=3 last-resort when no smaller rung exists)
    Both modes reject backward steps, Δ=0 (same-level edges), and require
    the chain to be single-topic.

    Returns (ok, reason); reason is "" when ok is True.
    """
    config = MODE_CONFIG.get(mode)
    if config is None:
        return False, f"unknown mode {mode!r}"
    allowed_deltas = config["allowed_deltas"]
    members = chain.get("questions", [])
    if not 2 <= len(members) <= 6:
        return False, f"size {len(members)} out of [2, 6]"
    seen: set[str] = set()
    ranks: list[int] = []
    member_topics: set[str] = set()
    for qid in members:
        if qid not in bucket_qids:
            return False, f"qid {qid} not in bucket"
        if qid in seen:
            return False, f"qid {qid} duplicated"
        seen.add(qid)
        record = corpus[qid]
        # Unknown/missing level ranks as 0, which always fails the Δ check.
        ranks.append(LEVEL_RANK.get(record.get("level"), 0))
        member_topics.add(record.get("topic"))
    deltas = [nxt - prev for prev, nxt in zip(ranks, ranks[1:])]
    offending = [delta for delta in deltas if delta not in allowed_deltas]
    if offending:
        return False, (
            f"levels {ranks} have Δ={deltas} "
            f"(need each Δ ∈ {sorted(allowed_deltas)} under mode={mode!r})"
        )
    if len(member_topics) != 1:
        return False, f"multi-topic: {member_topics}"
    return True, ""
def process_batch(batch: list[tuple[tuple[str, str], list[str]]],
                  corpus: dict[str, dict],
                  call_idx: int,
                  mode: str = "strict") -> tuple[list[dict], list[dict]]:
    """Call Gemini on this batch. Returns (validated_chains, raw_gaps).

    In lenient mode, accepted chains carry tier="secondary"; strict-mode
    chains are emitted without a tier field (primary tagging is backfilled
    in the merge step — see merge_chain_passes.py / Phase 1.5).
    """
    prompt = build_prompt(batch, corpus, mode=mode)
    n_questions = sum(len(qids) for _, qids in batch)
    print(f" [call {call_idx}] {len(batch)} buckets, {n_questions} questions, "
          f"{len(prompt)//1000}K char prompt (mode={mode})")
    response = call_gemini(prompt)
    if response is None:
        # call_gemini already logged the failure detail to stderr.
        print(f" [call {call_idx}] no response")
        return [], []
    out_chains: list[dict] = []
    out_gaps: list[dict] = []
    # Per-call sequence number used to build unique chain ids.
    chain_seq = 0
    # Lenient-pass chains get a distinguishable id suffix.
    chain_id_suffix = "-secondary" if mode == "lenient" else ""
    for bucket_resp in response.get("buckets", []):
        track = bucket_resp.get("track")
        topic = bucket_resp.get("topic")
        # Resolve the response bucket back to the input bucket's qid set;
        # ids outside this set are rejected by validate_chain.
        bucket_qids = set()
        for (t, p), qids in batch:
            if t == track and p == topic:
                bucket_qids = set(qids)
                break
        if not bucket_qids:
            # Gemini referenced a bucket we never sent — skip it.
            print(f" [call {call_idx}] response references unknown bucket ({track},{topic})")
            continue
        for ch in bucket_resp.get("chains", []):
            ok, why = validate_chain(ch, bucket_qids, corpus, mode=mode)
            if not ok:
                print(f" [call {call_idx}] dropped invalid chain in {track}/{topic}: {why}")
                continue
            chain_seq += 1
            chain_id = f"{track}-chain-auto{chain_id_suffix}-{call_idx:03d}-{chain_seq:02d}"
            entry = {
                "chain_id": chain_id,
                "track": track,
                "topic": topic,
                # Chains are single-topic, so the first member's
                # competency_area stands in for the whole chain.
                "competency_area": corpus[ch["questions"][0]].get("competency_area"),
                "levels": [corpus[qid].get("level") for qid in ch["questions"]],
                "questions": [
                    {
                        "level": corpus[qid].get("level"),
                        "id": qid,
                        "title": corpus[qid].get("title", ""),
                        "bloom": corpus[qid].get("bloom_level"),
                    }
                    for qid in ch["questions"]
                ],
                "rationale": ch.get("rationale", ""),
                "_origin": "gemini-3.1-pro-preview",
            }
            if mode == "lenient":
                entry["tier"] = "secondary"
            out_chains.append(entry)
        # Capture gap recommendations as-is (not validated structurally —
        # they describe questions that DON'T exist yet). We store them for
        # a follow-up authoring pass.
        for gap in bucket_resp.get("gaps", []) or []:
            gap_record = {
                "track": track,
                "topic": topic,
                "missing_level": gap.get("missing_level"),
                "between": gap.get("between") or [],
                "rationale": gap.get("rationale", ""),
                "_origin": "gemini-3.1-pro-preview",
                "_source_call": call_idx,
            }
            out_gaps.append(gap_record)
    print(f" [call {call_idx}] accepted {len(out_chains)} chain(s), "
          f"{len(out_gaps)} gap(s)")
    return out_chains, out_gaps
def load_buckets_filter(path: Path) -> list[tuple[str, str]]:
    """Read (track, topic) pairs from a chain-coverage.json report.

    The report is the output of diagnose_chain_coverage.py — we use its
    ``uncovered_buckets`` array (≥3 questions, 0 chains) as the input set
    for Phase 1.4. A missing or empty array yields [].
    """
    report = json.loads(Path(path).read_text(encoding="utf-8"))
    uncovered = report.get("uncovered_buckets") or []
    return [(row["track"], row["topic"]) for row in uncovered]
def main() -> int:
    """CLI entry point: plan batches, call Gemini, write staging outputs.

    Returns 0 on success (including --dry-run and empty selections), 1 on
    operator errors (unknown bucket, batch plan over the call budget).
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--all", action="store_true", help="Process every bucket")
    ap.add_argument("--bucket", help="Process one bucket: <track>:<topic>")
    ap.add_argument(
        "--buckets-from",
        type=Path,
        help="Restrict to uncovered_buckets in a chain-coverage.json report "
             "(output of diagnose_chain_coverage.py). Pair with --mode lenient.",
    )
    ap.add_argument("--dry-run", action="store_true", help="Show plan, don't call Gemini")
    ap.add_argument("--output", default=str(DEFAULT_OUTPUT))
    ap.add_argument(
        "--mode",
        choices=sorted(MODE_CONFIG.keys()),
        default="strict",
        help="strict (default): Δ ∈ {1,2}; lenient: Δ ∈ {1,2,3}, "
             "tags chains tier=secondary",
    )
    ap.add_argument("--max-calls", type=int, default=200,
                    help="Daily cap (Gemini Pro is 250 calls/day; reserve some buffer)")
    args = ap.parse_args()

    corpus = load_corpus()
    buckets = bucket_corpus(corpus)
    print(f"corpus: {len(corpus)} published questions in {len(buckets)} (track, topic) buckets")
    print(f"mode: {args.mode}")

    # Exactly one bucket selector must be given.
    selectors = [bool(args.all), bool(args.bucket), bool(args.buckets_from)]
    if sum(selectors) > 1:
        ap.error("--all, --bucket, and --buckets-from are mutually exclusive")
    if not any(selectors):
        ap.error("specify --all, --bucket <track>:<topic>, or --buckets-from <path>")

    if args.bucket:
        if ":" not in args.bucket:
            # Previously a bare ValueError from split-unpacking; fail cleanly.
            ap.error("--bucket expects <track>:<topic>")
        track, topic = args.bucket.split(":", 1)
        if (track, topic) not in buckets:
            print(f"unknown bucket: {args.bucket}")
            return 1
        buckets = {(track, topic): buckets[(track, topic)]}
    elif args.buckets_from:
        wanted = load_buckets_filter(args.buckets_from)
        missing = [b for b in wanted if b not in buckets]
        if missing:
            print(f"WARNING: {len(missing)} buckets in coverage report not found in corpus "
                  f"(skipping): {missing[:3]}{'...' if len(missing) > 3 else ''}")
        buckets = {b: buckets[b] for b in wanted if b in buckets}
        print(f"buckets-from filter: {len(buckets)} buckets selected from "
              f"{args.buckets_from.name}")

    batches = plan_batches(buckets, corpus)
    if not batches:
        # E.g. a --buckets-from report whose buckets all vanished from the
        # corpus. Previously this crashed on min()/mean of an empty list.
        print("\nno buckets selected — nothing to do")
        return 0
    sizes = [sum(len(qids) for _, qids in b) for b in batches]
    print(f"\nbatching plan: {len(batches)} calls")
    print(f" questions/call — min {min(sizes)}, mean {sum(sizes)//len(sizes)}, max {max(sizes)}")
    print(f" daily cap: {args.max_calls}; budget OK: {len(batches) <= args.max_calls}")
    if args.dry_run:
        return 0
    if len(batches) > args.max_calls:
        print(f"\nWARNING: {len(batches)} batches exceeds max-calls {args.max_calls}")
        return 1

    all_chains: list[dict] = []
    all_gaps: list[dict] = []
    output_path = Path(args.output)
    # Derive the gaps sidecar name. For the default chains.proposed.json this
    # yields gaps.proposed.json. For a custom --output whose stem doesn't
    # contain "chains.proposed", replace() is a no-op — previously that made
    # gaps_path identical to the output path, so the gaps write clobbered
    # the chains file. Fall back to a distinct "<stem>.gaps.json" name.
    gaps_name = output_path.stem.replace("chains.proposed", "gaps.proposed") + ".json"
    if gaps_name == output_path.name:
        gaps_name = output_path.stem + ".gaps.json"
    gaps_path = output_path.with_name(gaps_name)
    # _pipeline/ is gitignored and may not exist on a fresh checkout.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    inter_call_delay_s = 8  # backoff: avoid Gemini-side 429 from rapid-fire calls
    for i, batch in enumerate(batches, start=1):
        if i > 1:
            time.sleep(inter_call_delay_s)
        chains, gaps = process_batch(batch, corpus, i, mode=args.mode)
        all_chains.extend(chains)
        all_gaps.extend(gaps)
        # Checkpoint after every call so a crash or quota stop keeps the
        # results accepted so far.
        output_path.write_text(json.dumps(all_chains, indent=2) + "\n")
        gaps_path.write_text(json.dumps(all_gaps, indent=2) + "\n")

    print(f"\nDONE: {len(all_chains)} chains accepted across {len(batches)} calls; "
          f"{len(all_gaps)} corpus gaps identified for future authoring")
    print(f"output: {args.output}")
    print(f"gaps: {gaps_path}")
    print("review the staging file before replacing interviews/vault/chains.json")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    sys.exit(main())