Files
cs249r_book/interviews/vault-cli/scripts/apply_corrections.py
Vijay Janapa Reddi 1722133faa feat(vault-cli): apply_corrections.py — interactive accept/reject for Gemini-proposed fixes
Phase 5's interactive review tool. Reads a 01_audit.json from a
--propose-fixes run, walks rows with non-empty suggested_corrections,
shows a unified-diff per modified field, and prompts accept/reject/
edit/skip/quit. Validates every accepted body against Pydantic before
writing.

Per CORPUS_HARDENING_PLAN.md correction policy:
  - math errors: rewrite napkin_math AND realistic_solution as a unit
  - level inflation: relabel DOWN, never rewrite up to match
  - format markers: add markers without changing prose semantics

Resumable: dispositions persist to 02_dispositions.json after each
decision; re-running skips already-decided qids. --auto-accept-format
auto-accepts format-marker-only fixes (lower-risk).

Smoke-tested against the in-flight Phase 4 audit: 0 candidates (no
--propose-fixes data yet) and exits clean.

CORPUS_HARDENING_PLAN.md Phase 5.
2026-05-03 11:03:53 -04:00

481 lines
18 KiB
Python

#!/usr/bin/env python3
"""Interactive accept/reject for Gemini-proposed corrections.
Reads a 01_audit.json file produced by audit_corpus_batched.py
--propose-fixes, walks each row that has a non-empty
``suggested_corrections`` block, and prompts the operator to:
[a]ccept — apply the proposed correction(s) to the YAML
[r]eject — leave the YAML untouched
[e]dit — open $EDITOR with the correction loaded; save to apply
[s]kip — defer to a later session
[q]uit — stop reviewing; persist dispositions and exit
[h]elp — print this menu
Per the CORPUS_HARDENING_PLAN.md correction policy:
- Math errors: when math_correct=fail, the proposed correction
typically rewrites BOTH napkin_math AND realistic_solution as a
unit (the solution often depends on the napkin number). Review
them together; accept-or-reject the pair, never split.
- Level inflation: when level_fit=fail, the proposed correction
relabels DOWN (e.g. L4 → L2) — never attempts to rewrite the
question to match a higher claimed level.
- Format markers: when format_compliance=fail, the proposed
correction adds the missing markers but should NOT change the
underlying prose semantics. If the diff shows prose changes
beyond marker insertion, that's a sign the prompt drifted —
reject and re-run propose-fixes with a tighter prompt.
CORPUS_HARDENING_PLAN.md Phase 5.
Usage:
python3 interviews/vault-cli/scripts/apply_corrections.py \\
--input interviews/vault/_pipeline/runs/<dir>/01_audit.json
# Filter to a specific track or gate:
apply_corrections.py --input <path> --filter-track cloud
apply_corrections.py --input <path> --filter-gate format_compliance
# Auto-accept format-marker-only corrections (low-risk; review
# everything else):
apply_corrections.py --input <path> --auto-accept-format
# Resume an earlier session: dispositions persist to a sidecar so
# already-accepted corrections aren't re-prompted.
apply_corrections.py --input <path>
"""
from __future__ import annotations
import argparse
import difflib
import json
import os
import subprocess
import sys
import tempfile
from datetime import UTC, datetime
from pathlib import Path
# Locate vault_cli for round-trip-safe YAML I/O + Pydantic validation.
REPO_ROOT = Path(__file__).resolve().parents[3]
sys.path.insert(0, str(REPO_ROOT / "interviews" / "vault-cli" / "src"))
from vault_cli.models import Question # noqa: E402
from vault_cli.yaml_io import dump_str, load_file # noqa: E402
VAULT_DIR = REPO_ROOT / "interviews" / "vault"
QUESTIONS_DIR = VAULT_DIR / "questions"
# Disposition statuses written to the sidecar.
DISPOSITION_ACCEPTED = "accepted"
DISPOSITION_REJECTED = "rejected"
DISPOSITION_SKIPPED = "skipped"
DISPOSITION_EDITED = "edited"
DISPOSITION_FAILED = "failed-to-apply"
# ─── locating + loading ──────────────────────────────────────────────────
def find_question_file(qid: str) -> Path | None:
"""Locate a question YAML by id."""
for path in QUESTIONS_DIR.rglob(f"{qid}.yaml"):
return path
return None
def load_dispositions(path: Path) -> dict[str, dict]:
if not path.exists():
return {}
try:
d = json.loads(path.read_text(encoding="utf-8"))
return d.get("dispositions", {}) if isinstance(d, dict) else {}
except json.JSONDecodeError:
return {}
def save_dispositions(path: Path, dispositions: dict[str, dict],
input_path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
out = {
"schema_version": 1,
"generated_at": datetime.now(UTC).isoformat(timespec="seconds"),
"input_audit": str(input_path),
"dispositions": dispositions,
}
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(out, indent=2) + "\n", encoding="utf-8")
os.replace(tmp, path)
# ─── correction application ──────────────────────────────────────────────
def apply_correction_to_dict(body: dict, correction: dict) -> dict:
"""Apply a correction dict to a question body in-place style.
Returns a NEW dict (doesn't mutate input). Fields supported:
- title → top-level
- level → top-level (relabel-down per Q3)
- common_mistake → details.common_mistake
- napkin_math → details.napkin_math
- realistic_solution → details.realistic_solution
Other keys in correction are ignored with a warning.
"""
out = json.loads(json.dumps(body)) # deep copy via JSON round-trip
details = out.setdefault("details", {})
if "title" in correction:
out["title"] = correction["title"]
if "level" in correction:
out["level"] = correction["level"]
if "common_mistake" in correction:
details["common_mistake"] = correction["common_mistake"]
if "napkin_math" in correction:
details["napkin_math"] = correction["napkin_math"]
if "realistic_solution" in correction:
details["realistic_solution"] = correction["realistic_solution"]
unknown = set(correction.keys()) - {
"title", "level", "common_mistake", "napkin_math", "realistic_solution",
}
if unknown:
print(f" WARN: ignoring unknown correction keys: {sorted(unknown)}",
file=sys.stderr)
return out
def validate_proposed(body: dict) -> tuple[bool, str]:
"""Run Pydantic validation on the proposed body. Returns (ok, error)."""
try:
Question.model_validate(body)
return True, ""
except Exception as e:
return False, str(e)[:300]
def write_yaml(path: Path, body: dict) -> None:
"""Atomic write: temp file then os.replace."""
text = dump_str(body)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(text, encoding="utf-8")
os.replace(tmp, path)
# ─── diff rendering ──────────────────────────────────────────────────────
def render_field_diff(label: str, before: str | None, after: str | None) -> str:
"""Return a unified-diff snippet for one field; empty string if unchanged."""
if before == after:
return ""
before_lines = (before or "").splitlines(keepends=True)
after_lines = (after or "").splitlines(keepends=True)
diff = list(difflib.unified_diff(
before_lines, after_lines,
fromfile=f"{label} (current)",
tofile=f"{label} (proposed)",
n=2,
))
if not diff:
return ""
return "".join(diff)
def render_correction(body: dict, correction: dict) -> str:
"""Pretty-print the diff between current YAML body and proposed correction."""
parts: list[str] = []
details = body.get("details") or {}
for field, label in [
("title", "title"),
("level", "level"),
("common_mistake", "common_mistake"),
("napkin_math", "napkin_math"),
("realistic_solution", "realistic_solution"),
]:
if field not in correction:
continue
current = (body.get(field) if field in ("title", "level")
else details.get(field))
proposed = correction.get(field)
if current == proposed:
continue
diff = render_field_diff(label, str(current or ""), str(proposed or ""))
if diff:
parts.append(diff)
else:
parts.append(f"--- {label}: {current!r}{proposed!r}\n")
return "".join(parts) if parts else "(no field changes)\n"
# ─── interactive prompt ──────────────────────────────────────────────────
HELP_MENU = """\
[a]ccept apply the proposed correction(s) to the YAML
[r]eject leave the YAML untouched
[e]dit open the proposed YAML in $EDITOR; save to apply, exit empty to reject
[s]kip defer to a later session
[q]uit stop reviewing; save dispositions and exit
[h]elp show this menu
"""
def prompt_choice() -> str:
while True:
try:
ans = input(" [a/r/e/s/q/h]> ").strip().lower()
except EOFError:
return "q"
if ans in {"a", "r", "e", "s", "q", "h", "accept", "reject",
"edit", "skip", "quit", "help"}:
return ans[0]
def edit_in_editor(initial_body: dict) -> dict | None:
"""Open the proposed body in $EDITOR. Return the edited body, or None
if the user emptied the file or didn't change it."""
editor = os.environ.get("EDITOR", "vi")
with tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False, encoding="utf-8"
) as f:
f.write(dump_str(initial_body))
tmp_path = Path(f.name)
try:
subprocess.run([editor, str(tmp_path)], check=False)
text = tmp_path.read_text(encoding="utf-8")
if not text.strip():
return None
# Round-trip through yaml_io to apply the same hardening + Pydantic
# to ensure the edited file is valid before applying.
try:
return load_file(tmp_path)
except Exception as e:
print(f" edited file failed to parse: {e}", file=sys.stderr)
return None
finally:
tmp_path.unlink(missing_ok=True)
# ─── auto-accept heuristic ──────────────────────────────────────────────
def is_format_only_correction(correction: dict, body: dict) -> bool:
"""True iff the correction touches ONLY common_mistake and/or
napkin_math, AND the proposed text contains the canonical markers
while the current text does not.
Used by --auto-accept-format. Lower-risk than auto-accepting other
correction types because format markers are mechanical structure
additions, not semantic rewrites.
"""
keys = set(correction.keys())
if not keys.issubset({"common_mistake", "napkin_math"}):
return False
if "common_mistake" in correction:
new = correction["common_mistake"] or ""
for marker in ("**The Pitfall:**", "**The Rationale:**", "**The Consequence:**"):
if marker not in new:
return False
if "napkin_math" in correction:
new = correction["napkin_math"] or ""
if not all(m in new for m in ("**Calculations:**",)):
return False
# accepts both "Assumptions:" and "Assumptions & Constraints:"
if "**Assumptions" not in new:
return False
if "**Conclusion" not in new:
return False
return True
# ─── main loop ───────────────────────────────────────────────────────────
def review_one(
row: dict,
*,
auto_accept_format: bool,
) -> tuple[str, str]:
"""Returns (disposition, message). May write to disk."""
qid = row.get("qid")
correction = row.get("suggested_corrections") or {}
if not correction:
return DISPOSITION_SKIPPED, "no correction proposed"
yaml_path = find_question_file(qid)
if not yaml_path:
return DISPOSITION_FAILED, f"YAML not found for {qid}"
try:
body = load_file(yaml_path)
except Exception as e:
return DISPOSITION_FAILED, f"YAML load failed: {e}"
if not isinstance(body, dict):
return DISPOSITION_FAILED, "YAML did not parse to a dict"
# Print summary of the row's gate verdicts, then the diff.
print(f"\n─── {qid} ─── [{body.get('track')}/{body.get('level')}] "
f"{body.get('title', '')[:60]}")
gate_summary = ", ".join(
f"{g}={row.get(g, '?')}"
for g in ("format_compliance", "level_fit", "coherence",
"math_correct", "title_quality")
)
print(f" gates: {gate_summary}")
if row.get("level_fit_rationale") or row.get("coherence_rationale"):
for k in ("level_fit_rationale", "coherence_rationale"):
if row.get(k):
print(f" {k}: {row[k]}")
print()
print(render_correction(body, correction))
if auto_accept_format and is_format_only_correction(correction, body):
print(" [auto-accept] format-marker-only correction")
choice = "a"
else:
choice = prompt_choice()
if choice == "h":
print(HELP_MENU)
return review_one(row, auto_accept_format=auto_accept_format)
if choice == "q":
return "quit", ""
if choice == "s":
return DISPOSITION_SKIPPED, ""
if choice == "r":
return DISPOSITION_REJECTED, ""
if choice == "e":
proposed_body = apply_correction_to_dict(body, correction)
edited = edit_in_editor(proposed_body)
if edited is None:
return DISPOSITION_REJECTED, "editor returned empty"
ok, why = validate_proposed(edited)
if not ok:
return DISPOSITION_FAILED, f"edited body fails validation: {why}"
write_yaml(yaml_path, edited)
return DISPOSITION_EDITED, ""
# accept
proposed_body = apply_correction_to_dict(body, correction)
ok, why = validate_proposed(proposed_body)
if not ok:
return DISPOSITION_FAILED, f"proposed body fails validation: {why}"
write_yaml(yaml_path, proposed_body)
return DISPOSITION_ACCEPTED, ""
def main() -> int:
ap = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
ap.add_argument("--input", type=Path, required=True,
help="path to 01_audit.json from a --propose-fixes run")
ap.add_argument("--dispositions-out", type=Path, default=None,
help="sidecar JSON to persist accept/reject decisions "
"(default: <input-dir>/02_dispositions.json)")
ap.add_argument("--filter-track", type=str, default=None,
help="only review qids in this track")
ap.add_argument("--filter-gate", type=str, default=None,
help="only review rows where this gate failed "
"(format_compliance / level_fit / coherence / "
"math_correct / title_quality)")
ap.add_argument("--auto-accept-format", action="store_true",
help="auto-accept format-marker-only corrections "
"(lower-risk: just adds the canonical markers)")
ap.add_argument("--limit", type=int, default=None,
help="cap how many corrections to review this session")
args = ap.parse_args()
if not args.input.exists():
print(f"input not found: {args.input}", file=sys.stderr)
return 1
audit = json.loads(args.input.read_text(encoding="utf-8"))
rows = audit.get("rows", [])
print(f"loaded {len(rows)} audit rows from {args.input}")
# Filter to rows that have a correction.
candidates = [r for r in rows if r.get("suggested_corrections")]
print(f" with proposed corrections: {len(candidates)}")
if args.filter_track:
# Need to look up track from the YAML since rows don't always carry it.
before = len(candidates)
filtered = []
for r in candidates:
yp = find_question_file(r.get("qid"))
if not yp:
continue
try:
b = load_file(yp)
if b.get("track") == args.filter_track:
filtered.append(r)
except Exception:
continue
candidates = filtered
print(f" after --filter-track={args.filter_track}: "
f"{len(candidates)} (was {before})")
if args.filter_gate:
candidates = [r for r in candidates if r.get(args.filter_gate) == "fail"]
print(f" after --filter-gate={args.filter_gate}: {len(candidates)}")
# Load prior dispositions; skip qids already accepted/rejected.
disp_path = args.dispositions_out or (args.input.parent / "02_dispositions.json")
dispositions = load_dispositions(disp_path)
candidates = [r for r in candidates
if dispositions.get(r.get("qid"), {}).get("disposition")
not in {DISPOSITION_ACCEPTED, DISPOSITION_REJECTED,
DISPOSITION_EDITED, DISPOSITION_FAILED}]
print(f" remaining (not yet accepted/rejected/edited): {len(candidates)}")
if args.limit:
candidates = candidates[: args.limit]
print(f" capped at --limit={args.limit}")
if not candidates:
print("nothing to review.")
return 0
counters = {DISPOSITION_ACCEPTED: 0, DISPOSITION_REJECTED: 0,
DISPOSITION_EDITED: 0, DISPOSITION_SKIPPED: 0,
DISPOSITION_FAILED: 0}
for i, row in enumerate(candidates, start=1):
print(f"\n[{i}/{len(candidates)}]", end="")
result, msg = review_one(row, auto_accept_format=args.auto_accept_format)
if result == "quit":
print("\n[quit] stopping; persisting dispositions...")
break
counters[result] = counters.get(result, 0) + 1
dispositions[row["qid"]] = {
"disposition": result,
"decided_at": datetime.now(UTC).isoformat(timespec="seconds"),
"message": msg,
}
save_dispositions(disp_path, dispositions, args.input)
print()
print("session summary:")
for k, v in counters.items():
print(f" {k:20s} {v}")
print(f"\nwrote dispositions to {disp_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())