Files
Vijay Janapa Reddi 390cd59035 feat(staffml): add curated 79-topic taxonomy with ikigai competency model
Replace the 839 LLM-extracted concepts with 79 human-curated topics
organized into 13 competency areas. Each topic has typed edges
(prerequisite, broader, narrower, related) using SKOS vocabulary.

Introduce the ikigai competency model: 4 fundamental skills (recall,
analyze, design, implement) whose intersections produce 11 cognitive
zones for classifying HOW questions test topics.

Schema defined in LinkML (staffml_taxonomy.yaml) which generates
Pydantic, JSON Schema, and TypeScript from a single source of truth.

Key files:
- schema/staffml_taxonomy.yaml: LinkML schema definition
- schema/taxonomy_data.yaml: 79 topics + 123 typed edges
- schema/zones.py: ikigai zone model (4 skills x 11 zones)
- schema/graph.py: NetworkX graph explorer + Graphviz DOT export
- schema/resolve.py: maps corpus questions to new topic+zone system
- topics.json: simplified JSON view (auto-generated from YAML)
- topic_schema.py: Pydantic validator with DAG cycle detection
2026-03-30 23:25:57 -04:00

273 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Topic resolver — maps old taxonomy fields to the new 79-topic system.
This is the bridge between the old corpus (primary_concept, reasoning_mode,
reasoning_competency, knowledge_area) and the new system (topic, zone).
Usage:
# As a library
from schema.resolve import resolve_topic, resolve_zone, migrate_question
# As a CLI — dry-run migration on the whole corpus
python3 resolve.py # Show mapping stats
python3 resolve.py --apply # Write topic + zone into corpus.json
python3 resolve.py --question cloud-0042 # Show mapping for one question
"""
from __future__ import annotations
import argparse
import json
import sys
from collections import Counter, defaultdict
from pathlib import Path
import yaml
# Directory one level above the directory that contains this file.
VAULT = Path(__file__).resolve().parent.parent
# Question corpus that main() reads and, with --apply, rewrites in place.
CORPUS_PATH = VAULT / "corpus.json"
# Taxonomy YAML that sits next to this file; read by load_topics().
TAXONOMY_DATA = Path(__file__).resolve().parent / "taxonomy_data.yaml"
# NOTE(review): `zones` is imported as a top-level module, so this resolves
# only when the directory holding zones.py is on sys.path (the case when this
# file is run directly as a script). The `from schema.resolve import ...`
# usage shown in the module docstring may need a package-relative import
# instead — confirm against how callers set up their path.
from zones import (
REASONING_MODE_TO_ZONE,
ZONE_LEVEL_AFFINITY,
ALL_ZONES,
)
# ── Load taxonomy ────────────────────────────────────────────
def load_topics() -> dict[str, dict]:
    """Load the taxonomy topics keyed by topic ID.

    Reads the YAML document at ``TAXONOMY_DATA`` and indexes every entry of
    its top-level ``topics`` list by that entry's ``id`` field.

    Returns:
        Mapping of ``{topic_id: topic_dict}``.

    Raises:
        OSError: If the taxonomy file cannot be opened.
        KeyError: If the document has no ``topics`` key or an entry lacks
            an ``id``.
    """
    # Decode explicitly as UTF-8 so loading does not depend on the platform's
    # default locale encoding (which is not UTF-8 everywhere).
    with open(TAXONOMY_DATA, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return {t["id"]: t for t in data["topics"]}
def build_concept_to_topic_map(
    topics: dict[str, dict],
) -> tuple[dict[str, str], dict[str, set[str]]]:
    """Build the lookup tables used to resolve old concepts to topic IDs.

    Args:
        topics: Mapping of ``{topic_id: topic_dict}``; every topic dict must
            carry a ``name`` string.

    Returns:
        A ``(mapping, topic_keywords)`` pair:

        * ``mapping`` maps every topic ID to itself, so an old concept whose
          value already equals a topic ID resolves by direct lookup.
        * ``topic_keywords`` maps every topic ID to the set of lower-cased
          words of its name, with a few stop words removed. It is used for
          keyword-overlap matching.

    Raises:
        KeyError: If a topic dict has no ``name`` key.
    """
    stop_words = {"the", "and", "for", "a"}

    # Every topic ID is an exact match for itself.
    mapping = {tid: tid for tid in topics}

    topic_keywords = {}
    for tid, topic in topics.items():
        # "&" is dropped and "/" is treated as a word separator before the
        # name is split on whitespace.
        cleaned = topic["name"].lower().replace("&", "").replace("/", " ")
        topic_keywords[tid] = set(cleaned.split()) - stop_words

    return mapping, topic_keywords
def resolve_topic(concept: str, area: str, topics: dict,
                  mapping: dict, keywords: dict) -> str | None:
    """Map an old ``primary_concept`` (plus its area) onto a topic ID.

    The strategies below are tried in order and the first hit wins:

    1. ``concept`` is a key of ``mapping``.
    2. ``concept`` starts with a topic ID (longest IDs are tried first).
    3. A topic ID occurs anywhere inside ``concept`` (longest first).
    4. Keyword overlap between the words of ``concept`` and a topic's
       keywords, restricted to topics in ``area`` when ``area`` is non-empty.
    5. The first topic (in ``topics`` iteration order) whose area equals
       ``area``.

    Returns ``None`` when ``concept`` is empty or nothing matches.
    """
    if not concept:
        return None

    # 1. Exact match
    if concept in mapping:
        return mapping[concept]

    # Longest IDs first so the most specific topic is preferred.
    longest_first = sorted(topics, key=len, reverse=True)

    # 2. Prefix match
    prefix_hit = next(
        (tid for tid in longest_first if concept.startswith(tid)), None
    )
    if prefix_hit is not None:
        return prefix_hit

    # 3. Substring match
    inner_hit = next((tid for tid in longest_first if tid in concept), None)
    if inner_hit is not None:
        return inner_hit

    # 4. Keyword overlap; ties keep the earliest topic in iteration order.
    tokens = set(concept.lower().replace("-", " ").split())
    winner = None
    top_overlap = 0
    for tid, info in topics.items():
        in_scope = info["area"] == area or not area
        if not in_scope:
            continue
        overlap = len(tokens & keywords[tid])
        if overlap > top_overlap:
            top_overlap = overlap
            winner = tid
    if top_overlap >= 1:
        return winner

    # 5. Last resort: first topic listed for the area, if any.
    same_area = [tid for tid, info in topics.items() if info["area"] == area]
    return same_area[0] if same_area else None
def resolve_zone(reasoning_mode: str | None, level: str | None) -> str:
    """Translate an old ``reasoning_mode`` / ``level`` pair into a zone name.

    A ``reasoning_mode`` found in ``REASONING_MODE_TO_ZONE`` takes priority.
    Otherwise the zone is inferred from ``level``; a missing or unrecognised
    level falls back to ``"recall"``.
    """
    # 1. A known reasoning mode decides the zone on its own.
    if reasoning_mode and reasoning_mode in REASONING_MODE_TO_ZONE:
        return REASONING_MODE_TO_ZONE[reasoning_mode]

    # 2. Otherwise infer the zone from the difficulty level.
    if not level:
        return "recall"

    level_groups = (
        (("L1", "L2"), "recall"),
        (("L3",), "fluency"),
        (("L4",), "diagnosis"),
        (("L5",), "evaluation"),
        (("L6", "L6+"), "mastery"),
    )
    for members, zone in level_groups:
        if level in members:
            return zone
    return "recall"
def migrate_question(q: dict, topics: dict, mapping: dict,
                     keywords: dict) -> dict:
    """Return a copy of ``q`` extended with ``topic`` and ``zone`` fields.

    The concept is taken from ``primary_concept`` (falling back to
    ``taxonomy_concept``). The ``competency_area`` is lower-cased and
    hyphenated, and if it is not an area used by any topic it is translated
    through a fixed table of legacy area names. ``zone`` is always set;
    ``topic`` is set only when a topic could be resolved. ``q`` itself is
    not modified.
    """
    concept = q.get("primary_concept", "") or q.get("taxonomy_concept", "")
    raw_area = q.get("competency_area", "")

    # Normalise the area, then translate legacy names that no topic uses.
    area_norm = raw_area.lower().replace(" ", "-") if raw_area else ""
    known_areas = {t["area"] for t in topics.values()}
    if area_norm not in known_areas:
        legacy_areas = {
            "mlops": "deployment",
            "serving-systems": "deployment",
            "distributed-training": "parallelism",
            "distributed": "parallelism",
            "security": "reliability",
            "safety": "reliability",
            "sustainability": "power",
            "inference": "latency",
            "performance": "latency",
            "benchmarking": "latency",
            "frameworks": "optimization",
            "compilation": "optimization",
            "model-architecture": "architecture",
            "data-engineering": "data",
            "economics": "cross-cutting",
            "monitoring": "reliability",
            "fault-tolerance": "reliability",
        }
        area_norm = legacy_areas.get(area_norm, area_norm)

    topic = resolve_topic(concept, area_norm, topics, mapping, keywords)
    zone = resolve_zone(q.get("reasoning_mode"), q.get("level"))

    migrated = dict(q)
    if topic:
        migrated["topic"] = topic
    migrated["zone"] = zone
    return migrated
# ── CLI ──────────────────────────────────────────────────────
def main():
    """CLI entry point: report, and optionally apply, the corpus migration.

    Without options, every question in ``CORPUS_PATH`` is migrated in memory
    and mapping statistics are printed. With ``--question ID`` only that
    question's mapping is shown (exit status 1 if the ID is not found). With
    ``--apply`` the resolved ``topic`` / ``zone`` fields are written back
    into ``corpus.json``.
    """
    parser = argparse.ArgumentParser(description="Migrate corpus to new topic+zone system")
    parser.add_argument("--apply", action="store_true",
                        help="Write topic and zone fields into corpus.json")
    parser.add_argument("--question", help="Show mapping for a single question ID")
    args = parser.parse_args()

    topics = load_topics()
    mapping, keywords = build_concept_to_topic_map(topics)

    # Read as UTF-8 explicitly: --apply writes non-ASCII characters verbatim
    # (ensure_ascii=False), so reading and writing must agree on the encoding
    # regardless of the platform default.
    with open(CORPUS_PATH, encoding="utf-8") as f:
        corpus = json.load(f)

    print(f"Corpus: {len(corpus)} questions")
    print(f"Topics: {len(topics)}")
    print()

    if args.question:
        # .get() so an entry without an "id" is skipped instead of raising.
        q = next((q for q in corpus if q.get("id") == args.question), None)
        if q is None:
            print(f"Question '{args.question}' not found")
            sys.exit(1)
        migrated = migrate_question(q, topics, mapping, keywords)
        print(f"Question: {q['id']}")
        print(f" Title: {q.get('title', '?')}")
        print(f" Track: {q.get('track', '?')}")
        print(f" Level: {q.get('level', '?')}")
        print(f" Old primary_concept: {q.get('primary_concept', '?')}")
        print(f" Old competency_area: {q.get('competency_area', '?')}")
        print(f" Old reasoning_mode: {q.get('reasoning_mode', '?')}")
        print(f" → topic: {migrated.get('topic', 'UNMAPPED')}")
        print(f" → zone: {migrated.get('zone', 'UNMAPPED')}")
        return

    # Migrate every question exactly once; the statistics, the coverage
    # matrix and --apply all reuse these results.
    migrations = [migrate_question(q, topics, mapping, keywords) for q in corpus]

    mapped = 0
    unmapped = 0
    topic_counts = Counter()
    zone_counts = Counter()
    unmapped_concepts = Counter()
    topic_zones = defaultdict(set)
    for q, migrated in zip(corpus, migrations):
        if migrated.get("topic"):
            mapped += 1
            topic_counts[migrated["topic"]] += 1
            topic_zones[migrated["topic"]].add(migrated["zone"])
        else:
            unmapped += 1
            unmapped_concepts[q.get("primary_concept", "EMPTY")] += 1
        zone_counts[migrated["zone"]] += 1

    # Guard the percentages against an empty corpus (would divide by zero).
    denom = len(corpus) or 1
    print(f"Mapped: {mapped} ({100*mapped/denom:.1f}%)")
    print(f"Unmapped: {unmapped} ({100*unmapped/denom:.1f}%)")

    print("\nTopic distribution (top 20):")
    for t, cnt in topic_counts.most_common(20):
        name = topics[t]["name"]
        print(f" {name:40s} {cnt:>5}")

    print("\nZone distribution:")
    for z, cnt in sorted(zone_counts.items(), key=lambda x: -x[1]):
        print(f" {z:15s} {cnt:>5}")

    if unmapped_concepts:
        print("\nTop unmapped concepts:")
        for c, cnt in unmapped_concepts.most_common(15):
            print(f" {c}: {cnt}")

    # Coverage matrix: topic × zone
    print("\nTopic × Zone coverage (topics with questions in 3+ zones):")
    multi_zone = {t: zones for t, zones in topic_zones.items() if len(zones) >= 3}
    for t in sorted(multi_zone, key=lambda x: -len(multi_zone[x]))[:15]:
        name = topics[t]["name"]
        zones = ", ".join(sorted(multi_zone[t]))
        print(f" {name:40s} [{len(multi_zone[t])} zones] {zones}")

    if args.apply:
        print("\nApplying migration to corpus.json...")
        for q, migrated in zip(corpus, migrations):
            # zone is always written; topic only when one was resolved.
            if migrated.get("topic"):
                q["topic"] = migrated["topic"]
            q["zone"] = migrated["zone"]
        with open(CORPUS_PATH, "w", encoding="utf-8") as f:
            json.dump(corpus, f, indent=2, ensure_ascii=False)
            f.write("\n")
        print(f" Done. {mapped} questions now have topic + zone fields.")


# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()