---
title: "Extending the Engine"
subtitle: "The 3-Tier API Contract for building custom Models, Solvers, and Optimizers."
---
MLSys·im is designed to be fully extensible. Researchers and students can add custom analytical tools to resolve new constraints or search new design spaces.

To ensure mathematical rigor and prevent "spaghetti code," all extensions must adhere to the **3-Tier API Contract**. This contract forces you to explicitly define the mathematical nature of the tool you are building.

---

## The 3-Tier API Contract

Before you write code, ask yourself: *What kind of math am I doing?*
### 1. `BaseModel`: The Physics Engine

* **The Math:** $Y = f(X)$. Forward propagation.
* **When to use:** You want to evaluate a physical or logical state. You have a fixed hardware config and a fixed workload, and you want to predict latency, cost, memory footprint, or energy.
* **Rule:** A Model **cannot** make decisions or loop through options. It must be a deterministic calculation of a single state.
### 2. `BaseSolver`: The Math Engine

* **The Math:** $X = f^{-1}(Y)$ or $\nabla f$. Algebraic inversion or calculus.
* **When to use:** You have a specific target (like a latency SLA or a memory budget) and you want to algebraically solve for the exact hardware or model parameter required to hit it.
* **Rule:** A Solver should yield a mathematically precise answer derived from inverting a Model's equations.
### 3. `BaseOptimizer`: The Engineering Engine

* **The Math:** $\max_{x \in X} f(x) \ \text{s.t.}\ g(x) \le c$. Constrained optimization.
* **When to use:** You want to search a design space (discrete or continuous) to find the "best" configuration, balancing competing trade-offs (e.g., maximizing throughput while minimizing carbon).
* **Rule:** An Optimizer must internally call `Models` to evaluate candidates. It must return an `OptimizerResult` tracking the objective value and the size of the search space.
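The contract is easiest to see in miniature. The toy functions below are purely illustrative (a hypothetical latency model $T = W / BW$, not the real MLSys·im classes): the Model evaluates forward, the Solver inverts the same equation, and the Optimizer searches candidates by calling the Model.

```python
# Toy sketch of the three tiers (hypothetical model: latency = bytes / bandwidth).
# These are NOT the real MLSys·im base classes -- just the three kinds of math.

def latency_model(weight_bytes: float, bandwidth: float) -> float:
    """Tier 1 (Model): Y = f(X). Forward evaluation of a single fixed state."""
    return weight_bytes / bandwidth

def bandwidth_solver(weight_bytes: float, target_latency: float) -> float:
    """Tier 2 (Solver): X = f^-1(Y). Algebraic inversion of the Model's equation."""
    return weight_bytes / target_latency

def cheapest_optimizer(candidates: list, weight_bytes: float, latency_budget: float) -> dict:
    """Tier 3 (Optimizer): constrained search that calls the Model on each candidate."""
    feasible = [c for c in candidates
                if latency_model(weight_bytes, c["bw"]) <= latency_budget]
    return min(feasible, key=lambda c: c["cost"])
```

Note that the optimizer never re-derives the physics: it delegates every evaluation to the model, which is exactly the layering the contract enforces.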
---

## 1. Building a Custom Model

Every resolver follows the same pattern: declare inputs (`requires`), declare outputs (`produces`), and implement `solve()`.

Let's build a custom `PowerEfficiencyModel` that calculates TFLOPs per Watt.
```python
from mlsysim.core.solver import BaseModel
from mlsysim.core.results import SolverResult
from mlsysim.core.constants import Q_
from mlsysim.hardware.types import HardwareNode
from mlsysim.core.types import Quantity


# 1. Define the strictly typed Output
class PowerEfficiencyResult(SolverResult):
    flops_per_watt: Quantity
    is_efficient: bool


# 2. Implement the Model
class PowerEfficiencyModel(BaseModel):
    """Evaluates the compute efficiency per watt of an accelerator."""

    requires = ("hardware",)
    produces = PowerEfficiencyResult

    def solve(self, hardware: HardwareNode) -> PowerEfficiencyResult:
        if hardware.tdp is None:
            raise ValueError(f"{hardware.name} has no TDP specified.")

        fpw = hardware.compute.peak_flops / hardware.tdp

        threshold = Q_("1 TFLOPs/s / W")
        is_eff = fpw > threshold

        return PowerEfficiencyResult(
            flops_per_watt=fpw.to("TFLOPs/s/W"),
            is_efficient=is_eff,
        )
```
---

## 2. Building a Custom Solver

A solver algebraically inverts an equation. For example, if $T = \frac{W}{BW}$ and we have a target $T$, we solve for $BW = \frac{W}{T}$.
```python
from mlsysim.core.solver import BaseSolver
from mlsysim.core.results import SolverResult
from mlsysim.models.types import Workload
from mlsysim.core.types import Quantity


class RequiredBandwidthResult(SolverResult):
    required_bw: Quantity


class RequiredBandwidthSolver(BaseSolver):
    """Solves for the exact memory bandwidth needed to hit an SLA."""

    requires = ("workload", "target_latency")
    produces = RequiredBandwidthResult

    def solve(self, model: Workload, target_latency: Quantity) -> RequiredBandwidthResult:
        weight_bytes = model.size_in_bytes()
        t_target = target_latency.to("s")

        # Algebraic inversion: T = W / BW  =>  BW = W / T
        required_bw = (weight_bytes / t_target).to("GB/s")

        return RequiredBandwidthResult(required_bw=required_bw)
```
---

## 3. Building a Custom Optimizer

An Optimizer explores a design space. It MUST inherit from `BaseOptimizer`, and its result MUST inherit from `OptimizerResult`.

Let's build a `CheapestHardwareOptimizer` that searches the `HardwareZoo` for the cheapest chip that satisfies a minimum TFLOP requirement.
```python
from mlsysim.core.solver import BaseOptimizer
from mlsysim.core.results import OptimizerResult
from mlsysim.core.constants import Q_
from mlsysim.hardware.registry import Hardware


# Inherit from OptimizerResult, which requires specific fields
class CheapestHardwareResult(OptimizerResult):
    cheapest_cost: float
    hardware_name: str


class CheapestHardwareOptimizer(BaseOptimizer):
    requires = ("min_tflops",)
    produces = CheapestHardwareResult

    def solve(self, min_tflops: float) -> CheapestHardwareResult:
        candidates = []
        target = Q_(min_tflops, "TFLOPs/s")

        # 1. Define Search Space
        for hw in Hardware.list():
            if hw.unit_cost is None:
                continue

            # 2. Evaluate Constraint
            if hw.compute.peak_flops >= target:
                candidates.append({
                    "name": hw.name,
                    "cost": hw.unit_cost.magnitude,
                })

        if not candidates:
            raise ValueError("No hardware meets the requirement.")

        # 3. Optimize Objective (Minimize cost)
        best = min(candidates, key=lambda x: x["cost"])

        # 4. Return standard OptimizerResult structure
        return CheapestHardwareResult(
            objective_value=best["cost"],            # Standard field
            best_config={"hardware": best["name"]},  # Standard field
            total_searched=len(Hardware.list()),     # Standard field
            cheapest_cost=best["cost"],
            hardware_name=best["name"],
        )
```
---

## 4. Composable Pipelines & Callbacks

MLSys·im doesn't just evaluate models in a vacuum. It uses a **Composable Pipeline** architecture: you can snap your custom solvers into a pipeline and execute them sequentially, with outputs from earlier stages passing automatically to later stages.
```python
import mlsysim
from mlsysim.core.pipeline import Pipeline
from mlsysim.core.solver import DistributedModel, EconomicsModel

# Build a pipeline with your custom solver in the middle
my_pipeline = Pipeline([
    DistributedModel(),
    PowerEfficiencyModel(),  # Your custom model
    EconomicsModel(),
])

# Run the pipeline
results = my_pipeline.run(
    model=mlsysim.Models.Llama3_8B,
    fleet=mlsysim.Systems.Clusters.Frontier_8K,
    duration_days=30,
)

# Access individual stage results
print(results["PowerEfficiencyModel"].flops_per_watt)
```
### Callbacks (Middleware)

You can intercept pipeline execution to log metrics to MLOps platforms like Weights & Biases or Datadog by implementing a callback.
```python
import wandb

from mlsysim.core.results import SolverResult


class WandbLogger:
    def on_stage_end(self, stage_name: str, result: SolverResult):
        # Log every result field under a stage-scoped key,
        # e.g. "PowerEfficiencyModel/flops_per_watt"
        wandb.log({f"{stage_name}/{k}": v for k, v in result.dict().items()})


my_pipeline.register_callback(WandbLogger())
```
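If you only want local visibility, the same hook (assuming the `on_stage_end` signature above) works with no MLOps dependency at all:

```python
class ConsoleLogger:
    """Dependency-free callback: prints every stage result field to stdout."""

    def on_stage_end(self, stage_name: str, result) -> None:
        # Mirror the stage-scoped key layout: "<stage>/<field> = <value>"
        for key, value in result.dict().items():
            print(f"{stage_name}/{key} = {value}")
```

Register it the same way: `my_pipeline.register_callback(ConsoleLogger())`.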
---

## 5. Registering Your Custom Resolver (Plugins)

You don't need to fork `mlsysim` to use your custom models. You can package your code as a standard Python module and register it via `pyproject.toml` entry points. When a user installs your package, `mlsysim` will automatically discover and load your custom solvers into the engine.

In your plugin's `pyproject.toml`, add:
```toml
[project.entry-points."mlsysim.solvers"]
power_efficiency = "my_custom_package.solvers:PowerEfficiencyModel"
required_bandwidth = "my_custom_package.solvers:RequiredBandwidthSolver"
cheapest_hardware = "my_custom_package.solvers:CheapestHardwareOptimizer"
```
Once installed via `pip install .`, you can check that `mlsysim` has discovered your models by running:

```python
from mlsysim.core.resolver_factory import ResolverFactory

# This will now include 'PowerEfficiencyModel', etc.
print(ResolverFactory.list_available().keys())
```
---

## 6. Testing Your Extension

Every custom resolver should have at least one unit test that verifies dimensional correctness and expected output:
```python
import pytest

from mlsysim import Hardware
from my_custom_package.solvers import PowerEfficiencyModel


def test_power_efficiency_h100():
    model = PowerEfficiencyModel()
    result = model.solve(hardware=Hardware.Cloud.H100)

    assert result.flops_per_watt.units == "TFLOPs/s/W"
    assert result.flops_per_watt.magnitude > 0
    assert isinstance(result.is_efficient, bool)


def test_power_efficiency_no_tdp():
    """Hardware without TDP should raise ValueError."""
    from mlsysim.core.constants import Q_
    from mlsysim.hardware.types import ComputeCore, HardwareNode, MemoryHierarchy

    hw = HardwareNode(
        name="NoTDP",
        release_year=2024,
        compute=ComputeCore(peak_flops=Q_("100 TFLOPs/s")),
        memory=MemoryHierarchy(capacity=Q_("80 GB"), bandwidth=Q_("2 TB/s")),
        tdp=None,
    )

    with pytest.raises(ValueError, match="has no TDP"):
        PowerEfficiencyModel().solve(hardware=hw)
```
Run tests with:

```bash
pytest tests/test_my_extension.py -v
```
---
|
|
|
|
## Why Strict Typing?
|
|
|
|
By forcing inputs and outputs to use `pint.Quantity`, MLSys·im guarantees dimensional consistency. The `Pipeline` module uses these class signatures (`requires` and `produces`) to automatically stitch different Models, Solvers, and Optimizers together into a single execution DAG.
|
|
|
|
This means your custom extension automatically works with:

- **`Engine.sweep()`** — sweep your model across hardware
- **`SystemEvaluator.evaluate()`** — include your model in the full scorecard
- **The CLI** — expose your extension via `mlsysim eval` with YAML input
- **MCP agents** — AI agents can discover and invoke your extension