mirror of
https://github.com/harvard-edge/cs249r_book.git
synced 2026-05-22 14:03:46 -05:00
apply_format_skip_level.py applies marker-compliant common_mistake / napkin_math corrections for published qids whose proposed fix got skipped during Phase 5 because the row was entangled with a level relabel (relabel-up or chain-monotonicity-block) or a high-risk realistic_solution rewrite. The script applies ONLY the format fields when the current YAML's value is malformed AND the proposed value matches the AUTHORING.md markers. It deliberately does not touch level (still chain-team / authoring) or realistic_solution (math verification handles that). Phase 6 pre-flight: a survey on 2026-05-04 found 77 published YAMLs with malformed markers. This pass fixes 41 of them. Remaining 36 have no marker-compliant proposal in the audit and need a fresh authoring round before the LinkML pattern can land cleanly.
154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""Apply marker-compliant common_mistake / napkin_math corrections for
|
|
published qids whose proposed format fix got skipped during Phase 5.
|
|
|
|
Phase 6 (schema tightening) wants a LinkML pattern requiring the
|
|
authoring markers (Pitfall/Rationale/Consequence and
|
|
Assumptions/Calculations/Conclusion). A pre-flight survey on
|
|
2026-05-04 found ~77 published YAMLs that still have malformed
|
|
markers. About 49 of those have a marker-compliant proposed cm/nm fix
|
|
in the audit's suggested_corrections, but mass_apply skipped them
|
|
because they were entangled with a level relabel that hit relabel-up
|
|
or chain-monotonicity-block, OR they had a realistic_solution that
|
|
classified the row as high-risk.
|
|
|
|
This script applies ONLY the cm/nm fields when:
|
|
1. The current YAML's field is malformed (fails the marker regex), AND
|
|
2. The audit's proposed value IS marker-compliant.
|
|
|
|
It deliberately ignores level (still a chain-team / authoring decision)
|
|
and realistic_solution (handled by the math-verify pipeline).
|
|
|
|
Usage:
|
|
|
|
python3 interviews/vault-cli/scripts/apply_format_skip_level.py \\
|
|
--audit interviews/vault/_pipeline/runs/full-corpus-20260503-merged/01_audit.json
|
|
|
|
CORPUS_HARDENING_PLAN.md Phase 6 — format-skip-level cleanup.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import Counter
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[3]
|
|
sys.path.insert(0, str(REPO_ROOT / "interviews" / "vault-cli" / "src"))
|
|
|
|
from vault_cli.models import Question # noqa: E402
|
|
from vault_cli.yaml_io import dump_str, load_file # noqa: E402
|
|
|
|
QUESTIONS_DIR = REPO_ROOT / "interviews" / "vault" / "questions"
|
|
|
|
CM_PATTERN = re.compile(
|
|
r"(?s).*\*\*The Pitfall:\*\*.*\*\*The Rationale:\*\*.*\*\*The Consequence:\*\*.*"
|
|
)
|
|
NM_PATTERN = re.compile(
|
|
r"(?s).*\*\*Assumptions.*\*\*Calculations:\*\*.*\*\*Conclusion.*"
|
|
)
|
|
|
|
|
|
def find_question_file(qid: str) -> Path | None:
|
|
for p in QUESTIONS_DIR.rglob(f"{qid}.yaml"):
|
|
return p
|
|
return None
|
|
|
|
|
|
def write_yaml(path: Path, body: dict) -> None:
|
|
text = dump_str(body)
|
|
tmp = path.with_suffix(path.suffix + ".tmp")
|
|
tmp.write_text(text, encoding="utf-8")
|
|
os.replace(tmp, path)
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser(description=__doc__,
|
|
formatter_class=argparse.RawDescriptionHelpFormatter)
|
|
ap.add_argument("--audit", type=Path, required=True,
|
|
help="01_audit.json with suggested_corrections")
|
|
ap.add_argument("--dry-run", action="store_true",
|
|
help="show plan without writing")
|
|
args = ap.parse_args()
|
|
|
|
audit = json.loads(args.audit.read_text(encoding="utf-8"))
|
|
rows_by_qid = {r["qid"]: r for r in audit["rows"]}
|
|
|
|
counters: Counter[str] = Counter()
|
|
dispositions: list[dict] = []
|
|
|
|
for yp in QUESTIONS_DIR.rglob("*.yaml"):
|
|
body = load_file(yp)
|
|
if not isinstance(body, dict):
|
|
continue
|
|
if body.get("status") != "published":
|
|
continue
|
|
qid = body.get("id")
|
|
row = rows_by_qid.get(qid)
|
|
if not row:
|
|
continue
|
|
sc = row.get("suggested_corrections") or {}
|
|
|
|
details = body.get("details") or {}
|
|
cm_now = (details.get("common_mistake") or "").strip()
|
|
nm_now = (details.get("napkin_math") or "").strip()
|
|
cm_bad = bool(cm_now) and not CM_PATTERN.match(cm_now)
|
|
nm_bad = bool(nm_now) and not NM_PATTERN.match(nm_now)
|
|
if not (cm_bad or nm_bad):
|
|
continue
|
|
|
|
proposed = json.loads(json.dumps(body))
|
|
pdetails = proposed.setdefault("details", {})
|
|
applied_fields: list[str] = []
|
|
|
|
if cm_bad and sc.get("common_mistake") and CM_PATTERN.match(sc["common_mistake"]):
|
|
pdetails["common_mistake"] = sc["common_mistake"]
|
|
applied_fields.append("common_mistake")
|
|
if nm_bad and sc.get("napkin_math") and NM_PATTERN.match(sc["napkin_math"]):
|
|
pdetails["napkin_math"] = sc["napkin_math"]
|
|
applied_fields.append("napkin_math")
|
|
|
|
if not applied_fields:
|
|
counters["no-marker-compliant-fix"] += 1
|
|
dispositions.append({"qid": qid, "result": "no-marker-compliant-fix",
|
|
"cm_bad": cm_bad, "nm_bad": nm_bad})
|
|
continue
|
|
|
|
try:
|
|
Question.model_validate(proposed)
|
|
except Exception as e:
|
|
counters["pydantic-fail"] += 1
|
|
dispositions.append({"qid": qid, "result": "pydantic-fail",
|
|
"error": str(e)[:300]})
|
|
continue
|
|
|
|
if args.dry_run:
|
|
print(f" [dry] {qid}: would apply {applied_fields}")
|
|
else:
|
|
write_yaml(yp, proposed)
|
|
counters["applied"] += 1
|
|
dispositions.append({"qid": qid, "result": "applied",
|
|
"fields": applied_fields})
|
|
|
|
print(f"\ncounters: {dict(counters)}")
|
|
|
|
out_path = args.audit.parent / "06_format_skip_level.json"
|
|
if not args.dry_run:
|
|
out_path.write_text(json.dumps({
|
|
"generated_at": datetime.now(UTC).isoformat(timespec="seconds"),
|
|
"summary": dict(counters),
|
|
"dispositions": dispositions,
|
|
}, indent=2) + "\n", encoding="utf-8")
|
|
print(f"wrote {out_path}")
|
|
|
|
return 0 if counters["applied"] > 0 else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|