feat(mlsysim): align analytical solvers with industry-standard literature

Updated solvers to use literature-grade models for:
- Roofline Performance (Williams et al. 2009)
- Transformer Scaling (6PD rule, Kaplan et al. 2020)
- Training Memory (Shoeybi et al. 2019)
- Pipeline Parallelism (Huang et al. 2019)
- LLM Serving (Pope et al. 2023)
- Reliability checkpointing (Young 1974; Daly 2006)

Introduced Hierarchical Communication Modeling and MFU/HFU metrics.
Fixed test suite imports and return key mismatches.
Updated Smart Doorbell scorecard reference in ml_systems.qmd.
Restored core __init__.py exports for backward compatibility.
Author: Vijay Janapa Reddi
Date:   2026-03-07 15:02:26 -05:00
Parent: 99925bed34
Commit: f213260153

11 changed files with 1008 additions and 73 deletions


@@ -719,7 +719,7 @@ class LighthouseModels:
gpt2_params_b = m_gpt2.parameters.m_as(Bparam)
# Step 1: DLRM Embedding Size
dlrm_embedding_gb = m_dlrm.model_size.m_as(GB)
dlrm_embedding_gb = m_dlrm.size_in_bytes().m_as(GB)
# MobileNet
# Step 2: ResNet-50 ~4.1 GFLOPs, MobileNetV2 ~300 MFLOPs
@@ -764,7 +764,7 @@ Throughout this book, we use five Lighthouse Models introduced in @sec-introduct
:::
To ground the abstract interdependencies of the Iron Law in concrete practice, we analyze the Lighthouse Models introduced in @sec-introduction. The following summaries recap each workload from a systems perspective, connecting them to the specific Iron Law bottlenecks they exemplify.
To ground the abstract interdependencies of the Iron Law in concrete practice, we analyze the Lighthouse Models introduced in @sec-introduction. The following summaries recap each workload from a systems perspective, connecting them to the specific Iron Law bottlenecks they exemplify, as visualized in the scorecard for our central Smart Doorbell narrative (@fig-doorbell-scorecard).
The first lighthouse, **ResNet-50**\index{ResNet-50!systems characteristics}, classifies images into 1,000 categories, processing each image through approximately `{python} LighthouseModels.resnet_gflops_str` billion floating-point operations using `{python} LighthouseModels.resnet_params_m_str` million parameters (`{python} LighthouseModels.resnet_fp32_mb_str` MB at FP32). Used in medical imaging diagnostics, autonomous vehicle perception pipelines, and as the backbone for content moderation systems, its regular, compute-dense structure makes it the canonical benchmark for hardware accelerator performance.
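The parameters-to-megabytes arithmetic behind these figures can be sketched without the `LighthouseModels` namespace (the 25.6 M parameter count is the commonly cited ResNet-50 figure, assumed here for illustration):

```python
# Back-of-envelope model size: parameters x bytes per parameter.
RESNET50_PARAMS = 25.6e6  # commonly cited figure, assumed here

def model_size_mb(n_params: float, bytes_per_param: int) -> float:
    """Model size in MB for a given numeric precision."""
    return n_params * bytes_per_param / 1e6

fp32_mb = model_size_mb(RESNET50_PARAMS, 4)  # FP32 = 4 bytes/param -> ~102 MB
int8_mb = model_size_mb(RESNET50_PARAMS, 1)  # INT8 = 1 byte/param  -> ~26 MB
```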
@@ -776,6 +776,20 @@ The mobile lighthouse, **MobileNet**\index{MobileNet!depthwise separable convolu
The TinyML lighthouse, **Keyword Spotting (KWS)**\index{Keyword Spotting (KWS)!TinyML archetype}, represents the always-on sensing archetype. Used in applications like Smart Doorbells, it detects wake words ("Ding Dong", "Hello") using a depthwise separable CNN with approximately `{python} LighthouseModels.kws_params_str` parameters (small variants; the DS-CNN benchmark in MLPerf Tiny uses ~200K) fitting in under `{python} LighthouseModels.kws_size_kb_str` KB, running continuously at under 1 milliwatt.
::: {#fig-doorbell-scorecard fig-env="figure" fig-pos="t" fig-cap="**The Hierarchy of Constraints: Smart Doorbell Scorecard.** This visual evaluation of the Smart Doorbell lighthouse reveals the fundamental systems trade-off. While the model successfully fits within the kilobyte-scale memory budget (Level 1: PASS), it fails the real-time latency requirement (Level 2: FAIL) on the ESP32-S3 at baseline precision. This indicates that further optimization—such as quantization or architectural pruning—is mandatory before deployment." fig-alt="A horizontal bar chart showing two levels of constraints. Level 1: Memory (RAM) shows a green bar (PASS). Level 2: Latency (SLA) shows a red bar (FAIL), exceeding the limit line."}
```{python}
#| echo: false
#| label: doorbell-scorecard
import mlsysim
import matplotlib.pyplot as plt
doorbell_eval = mlsysim.Applications.Doorbell.evaluate()
fig, ax = mlsysim.plot_evaluation_scorecard(doorbell_eval)
plt.show()
```
:::
The huge range in compute requirements (20 MFLOPs → 4 GFLOPs) and memory (800 KB → 100 GB) explains why no single deployment paradigm fits all workloads. A keyword spotter runs comfortably on a \$2 microcontroller; a recommendation system requires a warehouse-scale computer. These five Lighthouse Models will serve as concrete anchors throughout the book, each isolating a distinct system bottleneck that we will revisit in every chapter.
Analytical tools alone remain abstract until grounded in real silicon. The next step translates the Iron Law, Bottleneck Principle, and Workload Archetypes into quantitative engineering decisions by examining how system balance (the interplay of compute, memory, and I/O) varies across real hardware platforms.
@@ -978,11 +992,11 @@ class ResnetCloud:
cloud_stats = calc_bottleneck(
ops=RESNET50_FLOPs,
model_bytes=ResnetSetup.resnet_fp16_bytes_value, # from resnet-setup cell
device_flops=h_a100.peak_flops,
device_bw=h_a100.memory_bw,
device_flops=h_a100.compute.peak_flops,
device_bw=h_a100.memory.bandwidth,
)
a100_tflops_value = h_a100.peak_flops.m_as(TFLOPs / second)
a100_bw_tbs_value = h_a100.memory_bw.m_as(TB / second)
a100_tflops_value = h_a100.compute.peak_flops.m_as(TFLOPs / second)
a100_bw_tbs_value = h_a100.memory.bandwidth.m_as(TB / second)
cloud_compute_ms_value = cloud_stats["compute_ms"]
cloud_memory_ms_value = cloud_stats["memory_ms"]
cloud_ratio_x_value = cloud_stats["ratio"]
@@ -991,9 +1005,9 @@ class ResnetCloud:
# --- LaTeX fraction components (for nice rendering) ---
resnet_flops_latex = sci_latex(RESNET50_FLOPs.to(flop))
a100_flops_latex = sci_latex(h_a100.peak_flops.to(flop / second))
a100_flops_latex = sci_latex(h_a100.compute.peak_flops.to(flop / second))
resnet_fp16_bytes_latex = sci_latex(ResnetSetup.resnet_fp16_bytes_value.to(byte))
a100_bw_latex = sci_latex(h_a100.memory_bw.to(byte / second))
a100_bw_latex = sci_latex(h_a100.memory.bandwidth.to(byte / second))
cloud_compute_frac = md_frac(resnet_flops_latex, a100_flops_latex, f"{cloud_compute_ms_value:.3f}", "ms")
cloud_memory_frac = md_frac(resnet_fp16_bytes_latex, a100_bw_latex, f"{cloud_memory_ms_value:.3f}", "ms")
cloud_ai_frac = md_frac(resnet_flops_latex, resnet_fp16_bytes_latex, f"{cloud_ai_value:.0f}", "FLOPs/byte")
@@ -1041,31 +1055,31 @@ class ResnetMobile:
"""Namespace for Resnet Mobile."""
# ┌── 2. EXECUTE (The Compute) ────────────────────────────────────────
h_phone = Hardware.Edge.Generic_Phone
h_phone = Hardware.Mobile.iPhone15Pro
m_resnet = Models.ResNet50
h_a100 = Hardware.A100
mobile_stats = calc_bottleneck(
ops=m_resnet.inference_flops,
model_bytes=ResnetSetup.resnet_int8_bytes_value, # from resnet-setup cell
device_flops=h_phone.peak_flops,
device_bw=h_phone.memory_bw,
device_flops=h_phone.compute.peak_flops,
device_bw=h_phone.memory.bandwidth,
)
mobile_tops_value = h_phone.peak_flops.m_as(TFLOPs / second)
mobile_bw_gbs_value = h_phone.memory_bw.m_as(GB / second)
mobile_tops_value = h_phone.compute.peak_flops.m_as(TFLOPs / second)
mobile_bw_gbs_value = h_phone.memory.bandwidth.m_as(GB / second)
mobile_compute_ms_value = mobile_stats["compute_ms"]
mobile_memory_ms_value = mobile_stats["memory_ms"]
mobile_ratio_x_value = mobile_stats["ratio"]
mobile_bottleneck_value = mobile_stats["bottleneck"]
# --- Cross-platform comparison ---
bw_advantage_x_value = h_a100.memory_bw / h_phone.memory_bw
bw_advantage_x_value = h_a100.memory.bandwidth / h_phone.memory.bandwidth
inference_speed_x_value = mobile_memory_ms_value / ResnetCloud.cloud_stats["memory_ms"] # uses cloud_stats
# --- LaTeX fraction components (for nice rendering) ---
mobile_npu_flops_latex = sci_latex(h_phone.peak_flops.to(flop / second))
mobile_npu_flops_latex = sci_latex(h_phone.compute.peak_flops.to(flop / second))
resnet_int8_bytes_latex = sci_latex(ResnetSetup.resnet_int8_bytes_value.to(byte))
mobile_npu_bw_latex = sci_latex(h_phone.memory_bw.to(byte / second))
mobile_npu_bw_latex = sci_latex(h_phone.memory.bandwidth.to(byte / second))
mobile_compute_frac = md_frac(ResnetCloud.resnet_flops_latex, mobile_npu_flops_latex, f"{mobile_compute_ms_value:.2f}", "ms")
mobile_memory_frac = md_frac(resnet_int8_bytes_latex, mobile_npu_bw_latex, f"{mobile_memory_ms_value:.2f}", "ms")
@@ -1150,12 +1164,12 @@ class MobileHardwareSpecs:
"""Namespace for mobile hardware specification ranges."""
# ┌── 1. LOAD (Constants) ───────────────────────────────────────────────
h_phone = Hardware.Edge.Generic_Phone
h_phone = Hardware.Mobile.iPhone15Pro
mobile_ram_range = MOBILE_RAM_RANGE_GB
mobile_storage_range = MOBILE_STORAGE_RANGE
# ┌── 2. EXECUTE (The Compute) ─────────────────────────────────────────
mobile_bw_range = f"{int(h_phone.memory_bw.m_as('GB/s')/2)}-{int(h_phone.memory_bw.m_as('GB/s'))}"
mobile_bw_range = f"{int(h_phone.memory.bandwidth.m_as('GB/s')/2)}-{int(h_phone.memory.bandwidth.m_as('GB/s'))}"
# ┌── 3. GUARD (Invariants) ───────────────────────────────────────────
@@ -1953,7 +1967,7 @@ The benefits of lower bandwidth usage and reduced latency become stark when we e
# │ BandwidthBottleneck.video_height_str,
# │ BandwidthBottleneck.bytes_per_pixel_str
# └─────────────────────────────────────────────────────────────────────────────
from mlsysim import Hardware
from mlsysim import Hardware, Systems
from mlsysim.core.formulas import calc_monthly_egress_cost
from mlsysim.fmt import fmt_percent, fmt, check
from mlsysim.core.constants import (
@@ -1974,7 +1988,7 @@ class BandwidthBottleneck:
width = VIDEO_1080P_WIDTH
height = VIDEO_1080P_HEIGHT
bpp = VIDEO_BYTES_PER_PIXEL_RGB
network = Hardware.Networks.Ethernet_10G
network = Systems.Fabrics.Ethernet_10G
# ┌── 2. EXECUTE (The Compute) ─────────────────────────────────────────
bytes_per_frame = width * height * bpp
@@ -2269,7 +2283,7 @@ class EdgeSizing:
coral_power_opex = coral_tco - coral_fleet_capex
# ┌── 3. GUARD (Invariants) ───────────────────────────────────────────
if required_tflops > coral.peak_flops.m_as(TFLOPs/second):
if required_tflops > coral.compute.peak_flops.m_as(TFLOPs/second):
# Note: Coral is 4 TOPS (INT8). YOLO is FP32/INT8?
# The original code used 4 TOPS vs 2 TFLOPS required.
pass
@@ -2287,15 +2301,15 @@ class EdgeSizing:
coral_cost_str = f"{coral_cost}"
coral_power_w_str = f"{coral.tdp.m_as(watt):.0f}"
coral_tops_str = f"{coral.peak_flops.m_as(TFLOPs/second):.0f}"
coral_tops_str = f"{coral.compute.peak_flops.m_as(TFLOPs/second):.0f}"
jetson_cost_str = f"{jetson_cost}"
jetson_power_range_str = "10-40"
jetson_tops_str = f"{jetson.peak_flops.m_as(TFLOPs/second):.0f}"
jetson_tops_str = f"{jetson.compute.peak_flops.m_as(TFLOPs/second):.0f}"
nuc_cost_str = f"{nuc_cost}"
nuc_power_w_str = f"{nuc.tdp.m_as(watt):.0f}"
nuc_tops_str = f"{nuc.peak_flops.m_as(TFLOPs/second):.0f}"
nuc_tops_str = f"{nuc.compute.peak_flops.m_as(TFLOPs/second):.0f}"
coral_fleet_k_str = fmt(coral_fleet_capex / 1000, precision=0)
coral_tco_k_str = fmt(coral_tco / 1000, precision=0)
@@ -2504,7 +2518,7 @@ class BatteryTax:
"""
# ┌── 1. LOAD (Constants) ───────────────────────────────────────────────
phone = Hardware.Edge.Generic_Phone
phone = Hardware.Mobile.iPhone15Pro
power_draw = OBJECT_DETECTOR_POWER_W
# ┌── 2. EXECUTE (The Compute) ─────────────────────────────────────────
@@ -2614,7 +2628,7 @@ class MobileBatteryCapacity:
"""Namespace for mobile battery capacity."""
# ┌── 1. LOAD (Constants) ───────────────────────────────────────────────
h_phone = Hardware.Edge.Generic_Phone
h_phone = Hardware.Mobile.iPhone15Pro
phone_battery_wh = h_phone.battery_capacity.m_as('Wh') if h_phone.battery_capacity else 15
# ┌── 2. EXECUTE (The Compute) ─────────────────────────────────────────


@@ -1,12 +1,19 @@
# mlsysim.core — Physics, Constants, and Analytical Solver
# mlsysim.core — Constants, Formulas, and Analytical Solvers
from . import constants
from . import config
from . import evaluation
from .constants import ureg, Q_
from .formulas import *
from .hardware import Hardware, HardwareSpec
from .models import Models, ModelSpec
# Point to the new vetted registries
from ..hardware.registry import Hardware
from ..models.registry import Models
from ..systems.registry import Systems, Tiers
from ..infra.registry import Infra
from .systems import Archetypes, Systems as LegacySystems
from .datacenters import Datacenters
from .deployment import Tiers as LegacyTiers
from .engine import Engine
from .clusters import Clusters, Nodes, ClusterSpec, NodeSpec
from .datacenters import Datacenters, Grids, Racks
from .systems import Systems, Archetypes
from .deployment import Tiers, DeploymentTier
from .scenarios import Scenarios, Applications, Fleet
from .scenarios import Scenario, Scenarios, Applications, Fleet


@@ -1,72 +1,98 @@
# engine.py
# The central computational engine for ML Systems analysis.
# Ties Models, Systems, and Formulas into a single "Solver".
from dataclasses import dataclass
from .models import ModelSpec
from .systems import SystemArchetype
from .constants import ureg, Q_, BYTES_FP32, BYTES_FP16, BYTES_INT8
from pydantic import BaseModel, ConfigDict, Field
from typing import Optional, Any, Annotated
from .constants import ureg, Q_, BYTES_FP32, BYTES_FP16, BYTES_INT8, BYTES_INT4
from .formulas import calc_bottleneck
from .exceptions import OOMError
from ..models.types import Workload, TransformerWorkload, CNNWorkload
from ..hardware.types import HardwareNode, Quantity
@dataclass(frozen=True)
class PerformanceProfile:
"""The result of a system simulation."""
latency: Q_
latency_compute: Q_
latency_memory: Q_
latency_overhead: Q_
throughput: Q_
class PerformanceProfile(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
latency: Quantity
latency_compute: Quantity
latency_memory: Quantity
latency_overhead: Quantity
throughput: Quantity
bottleneck: str
arithmetic_intensity: Q_
energy: Q_
memory_footprint: Q_
peak_flops_actual: Q_
peak_bw_actual: Q_
arithmetic_intensity: Quantity
energy: Quantity
memory_footprint: Quantity
peak_flops_actual: Quantity
peak_bw_actual: Quantity
mfu: float # Model FLOPs Utilization
hfu: float # Hardware FLOPs Utilization
feasible: bool
class Engine:
"""
Unified solver for ML Systems trade-offs.
This engine implements the 'Roofline Performance Model' (Williams et al. 2009)
to identify whether a workload is compute-bound or memory-bound.
"""
@staticmethod
def solve(model: ModelSpec, system: SystemArchetype, batch_size=1, precision="fp16", efficiency=0.5) -> PerformanceProfile:
hw = system.hardware
def solve(model: Workload, hardware: HardwareNode, batch_size=1, precision="fp16", efficiency=0.5, raise_errors=False) -> PerformanceProfile:
# 1. Map Precision
if precision == "fp32":
bpp = BYTES_FP32
peak_flops = hw.peak_flops_fp32 or hw.peak_flops
peak_flops = hardware.compute.precision_flops.get("fp32", hardware.compute.peak_flops)
elif precision == "int8":
bpp = BYTES_INT8
peak_flops = hw.int8_flops or hw.peak_flops
peak_flops = hardware.compute.precision_flops.get("int8", hardware.compute.peak_flops)
elif precision == "int4":
bpp = BYTES_INT4
peak_flops = hardware.compute.precision_flops.get("int4", hardware.compute.peak_flops)
else: # Default fp16
bpp = BYTES_FP16
peak_flops = hw.peak_flops
peak_flops = hardware.compute.peak_flops
# 2. Workload
ops_per_inference = model.inference_flops or (2 * model.parameters.to(ureg.count).magnitude * ureg.flop)
if hasattr(model, "inference_flops") and model.inference_flops:
ops_per_inference = model.inference_flops
else:
# Fallback for transformers: 2 * Params
if hasattr(model, "parameters") and model.parameters:
ops_per_inference = 2 * model.parameters.to(ureg.count).magnitude * ureg.flop
else:
ops_per_inference = 0 * ureg.flop
total_ops = ops_per_inference * batch_size
memory_bytes = model.size_in_bytes(bpp)
# 3. Physics (Iron Law)
# Note: We use the hardware's memory bandwidth directly.
# 3. Iron Law (Roofline)
results = calc_bottleneck(
ops=total_ops,
model_bytes=memory_bytes,
device_flops=peak_flops * efficiency,
device_bw=hw.memory_bw
device_bw=hardware.memory.bandwidth
)
t_comp = results["compute_ms"] * ureg.ms
t_mem = results["memory_ms"] * ureg.ms
t_overhead = hw.dispatch_tax
t_overhead = hardware.dispatch_tax
# Total Latency (Pipelined Assumption: overlapping data and compute)
latency = max(t_comp, t_mem) + t_overhead
# 4. Feasibility Check
feasible = memory_bytes <= system.ram
# 4. Feasibility Check (Simple memory check)
feasible = memory_bytes <= hardware.memory.capacity
if raise_errors and not feasible:
raise OOMError(
f"OOM: {model.name} requires {memory_bytes.to('GB')} but {hardware.name} only has {hardware.memory.capacity.to('GB')}.",
required_bytes=memory_bytes,
available_bytes=hardware.memory.capacity
)
# 5. Utilization Metrics
# MFU: Model FLOPs Utilization (Actual / Peak)
# HFU: Hardware FLOPs Utilization
throughput_samples_per_sec = (batch_size / latency).to(1/ureg.second).magnitude
actual_flops_delivered = ops_per_inference.magnitude * throughput_samples_per_sec
mfu = actual_flops_delivered / peak_flops.magnitude if peak_flops.magnitude > 0 else 0.0
hfu = mfu / efficiency if efficiency > 0 else 0.0 # HFU is normalized by achieved compute efficiency
return PerformanceProfile(
latency=latency,
@@ -76,9 +102,11 @@ class Engine:
throughput=(batch_size / latency).to(1/ureg.second),
bottleneck=results["bottleneck"],
arithmetic_intensity=results["intensity"] * (ureg.flop / ureg.byte),
energy=(hw.tdp * latency).to(ureg.joule) if hw.tdp else 0 * ureg.joule,
energy=(hardware.tdp * latency).to(ureg.joule) if hardware.tdp else 0 * ureg.joule,
memory_footprint=memory_bytes,
peak_flops_actual=peak_flops * efficiency,
peak_bw_actual=hw.memory_bw,
peak_bw_actual=hardware.memory.bandwidth,
mfu=mfu,
hfu=hfu,
feasible=feasible
)
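Stripped of Pint units and registry types, the roofline logic in `Engine.solve` reduces to a few lines. A minimal sketch under the same pipelined-overlap assumption (names and the A100/ResNet numbers are ours, not the `mlsysim` API):

```python
def roofline_latency_ms(ops, model_bytes, peak_flops, bandwidth,
                        efficiency=0.5, overhead_ms=0.0):
    """Return (latency_ms, bottleneck): max(compute, memory) + dispatch overhead."""
    t_comp_ms = ops / (peak_flops * efficiency) * 1e3
    t_mem_ms = model_bytes / bandwidth * 1e3
    bottleneck = "Memory" if t_mem_ms > t_comp_ms else "Compute"
    return max(t_comp_ms, t_mem_ms) + overhead_ms, bottleneck

# Illustrative A100-class numbers: ResNet-50 ~4.1 GFLOPs, ~51.2 MB FP16 weights,
# 312 TFLOP/s FP16 peak, 2 TB/s HBM bandwidth.
lat, bn = roofline_latency_ms(4.1e9, 51.2e6, 312e12, 2.0e12)
```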


@@ -1,10 +1,10 @@
# formulas.py
# Canonical equations for Machine Learning Systems
# centralizing the logic for TCO, Physics, and Performance math.
# centralizing the logic for TCO, Roofline, and Performance math.
import math
import pint
from .constants import ureg, SPEED_OF_LIGHT_FIBER_KM_S, MS, MB, GB, hour, second, byte
from .constants import ureg, Q_, SPEED_OF_LIGHT_FIBER_KM_S, MS, MB, GB, hour, second, byte
def _ensure_unit(val, unit):
"""Helper to attach unit if value is a raw number."""
@@ -20,7 +20,7 @@ def calc_network_latency_ms(distance_km):
def dTime(total_ops, num_devices, peak_flops_per_device, efficiency_eta):
"""
Core training time calculation (physics-first).
Core training time calculation (first-principles).
Returns a Pint Quantity in seconds.
"""
# ops / (n * p * eta)
@@ -68,6 +68,16 @@ def calc_bottleneck(ops, model_bytes, device_flops, device_bw):
memory_time = model_bytes / device_bw
t_comp_ms = compute_time.m_as(ureg.millisecond)
t_mem_ms = memory_time.m_as(ureg.millisecond)
if t_comp_ms == 0:
return {
"compute_ms": 0.0,
"memory_ms": t_mem_ms,
"bottleneck": "Memory",
"ratio": float('inf'),
"intensity": 0.0
}
is_memory_bound = t_mem_ms > t_comp_ms
ratio = t_mem_ms / t_comp_ms if is_memory_bound else t_comp_ms / t_mem_ms
intensity = ops / model_bytes
@@ -175,6 +185,96 @@ def calc_tree_allreduce_time(message_bytes, n_gpus, bandwidth_bytes_s, latency_s
return (bw_term + lat_term).to(ureg.second)
def calc_transformer_training_flops(n_params, n_tokens):
"""
Estimate total training FLOPs for a Transformer model (6PD rule).
T ≈ 6 × P × D
Source: Kaplan et al. (2020), "Scaling Laws for Neural Language Models"
Args:
n_params: Number of parameters (P)
n_tokens: Number of training tokens (D)
Returns:
Quantity[flop]: Total training FLOPs
"""
p = _ensure_unit(n_params, ureg.param).to(ureg.count).magnitude
d = _ensure_unit(n_tokens, ureg.count).magnitude
return (6 * p * d) * ureg.flop
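A unit-free version of the 6PD estimate, applied to GPT-3-scale numbers (175 B parameters, 300 B tokens; figures used only as an illustration, not taken from `mlsysim`):

```python
def training_flops_6pd(n_params: float, n_tokens: float) -> float:
    """Kaplan-style estimate: total training compute ~ 6 * P * D FLOPs."""
    return 6.0 * n_params * n_tokens

flops = training_flops_6pd(175e9, 300e9)  # -> 3.15e23 FLOPs
```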
def calc_activation_memory(n_layers, seq_len, batch_size, hidden_dim, n_heads=None,
precision_bytes=2, strategy="selective"):
"""
Estimate activation memory for a Transformer layer.
Source: Korthikanti et al. (2023), "Reducing Activation Recomputation in Large Transformer Models"
Args:
n_layers: Number of layers (L)
seq_len: Sequence length (S)
batch_size: Batch size (B)
hidden_dim: Hidden dimension (H)
n_heads: Number of attention heads (A)
precision_bytes: Bytes per element (default 2 for FP16)
strategy: Recompute strategy ('none', 'selective', 'full')
Returns:
Quantity[byte]: Total activation memory
"""
s, b, h = seq_len, batch_size, hidden_dim
# Basic activation per layer: 34 * s * b * h (without recompute)
# With selective recompute, it's significantly lower.
if strategy == "full":
# Only store inputs to the block
bytes_per_layer = 2 * s * b * h * precision_bytes
elif strategy == "selective":
# Store some intermediate activations to avoid full recompute
# Reference estimate: ~10 * s * b * h bytes
bytes_per_layer = 10 * s * b * h * precision_bytes
else:
# No recompute: store everything
bytes_per_layer = 34 * s * b * h * precision_bytes
return (n_layers * bytes_per_layer) * ureg.byte
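The three recompute strategies above differ only in the per-layer multiplier (34, 10, or 2 times s·b·h). A unitless sketch, with a GPT-2-like configuration assumed purely for illustration:

```python
def activation_bytes(n_layers, seq_len, batch, hidden,
                     precision_bytes=2, strategy="selective"):
    """Simplified per-layer activation model: 34/10/2 x s*b*h bytes."""
    factor = {"none": 34, "selective": 10, "full": 2}[strategy]
    return n_layers * factor * seq_len * batch * hidden * precision_bytes

# Assumed GPT-2-like config: 48 layers, 1024 tokens, batch 8, hidden 1600, FP16.
gb_none = activation_bytes(48, 1024, 8, 1600, strategy="none") / 1e9       # ~42.8 GB
gb_selective = activation_bytes(48, 1024, 8, 1600, strategy="selective") / 1e9  # ~12.6 GB
```

Selective recompute cuts activation memory by the 34/10 ratio in this model, at the cost of recomputing some attention intermediates in the backward pass.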
def calc_hierarchical_allreduce_time(message_bytes, n_nodes, gpus_per_node,
intra_node_bw, inter_node_bw,
intra_node_lat=Q_("500 ns"), inter_node_lat=Q_("5 us")):
"""
Hierarchical AllReduce time estimate (Intra-node NVLink + Inter-node IB).
T = T_intra + T_inter + T_intra
Source: Standard implementation in NCCL / Horovod.
Args:
message_bytes: Message size (M)
n_nodes: Number of nodes
gpus_per_node: GPUs per node (usually 8)
intra_node_bw: Intra-node bandwidth (NVLink)
inter_node_bw: Inter-node bandwidth (InfiniBand)
intra_node_lat: Intra-node latency
inter_node_lat: Inter-node latency
Returns:
Quantity[second]: Estimated communication time
"""
# 1. Intra-node Reduce (to one GPU per node)
t_reduce = calc_ring_allreduce_time(message_bytes, gpus_per_node, intra_node_bw, intra_node_lat)
# 2. Inter-node AllReduce (between lead GPUs of each node)
t_allreduce_inter = calc_ring_allreduce_time(message_bytes, n_nodes, inter_node_bw, inter_node_lat)
# 3. Intra-node Broadcast (back to all GPUs)
t_broadcast = t_reduce # Symmetry assumption
return t_reduce + t_allreduce_inter + t_broadcast
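The hierarchical composition can be sketched with the standard ring AllReduce cost, 2(n−1)/n · M/BW plus latency hops (a plain-float simplification of the Pint-based helpers; cluster numbers are illustrative assumptions):

```python
def ring_allreduce_s(msg_bytes, n, bw_bytes_s, lat_s):
    """Ring AllReduce: 2(n-1)/n bandwidth term + 2(n-1) latency hops."""
    if n <= 1:
        return 0.0
    return 2 * (n - 1) / n * msg_bytes / bw_bytes_s + 2 * (n - 1) * lat_s

def hierarchical_allreduce_s(msg_bytes, n_nodes, gpus_per_node, intra_bw, inter_bw,
                             intra_lat=500e-9, inter_lat=5e-6):
    """Intra-node reduce, inter-node allreduce, intra-node broadcast."""
    t_intra = ring_allreduce_s(msg_bytes, gpus_per_node, intra_bw, intra_lat)
    t_inter = ring_allreduce_s(msg_bytes, n_nodes, inter_bw, inter_lat)
    return t_intra + t_inter + t_intra  # broadcast assumed symmetric to reduce

# Assumed: 1 GB gradients, 4 nodes x 8 GPUs, 600 GB/s NVLink, 50 GB/s InfiniBand.
t = hierarchical_allreduce_s(1e9, 4, 8, 600e9, 50e9)  # dominated by the inter-node hop
```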
def calc_young_daly_interval(checkpoint_cost_s, mtbf_s):
"""
Optimal checkpoint interval (Young-Daly model).
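The first-order optimum this docstring refers to is τ_opt = √(2 · δ · MTBF), where δ is the checkpoint cost. A standalone sketch (`mlsysim`'s version returns a Pint Quantity; the fleet numbers here are assumptions):

```python
import math

def young_daly_interval_s(checkpoint_cost_s: float, mtbf_s: float) -> float:
    """First-order optimal checkpoint interval (Young 1974 / Daly 2006)."""
    return math.sqrt(2.0 * checkpoint_cost_s * mtbf_s)

# Assumed: 60 s checkpoints on a fleet with a 6-hour MTBF.
tau = young_daly_interval_s(60.0, 6 * 3600)  # ~1610 s, i.e. checkpoint every ~27 min
```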

mlsysim/core/solver.py (new file, 423 lines)

@@ -0,0 +1,423 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ConfigDict
from .engine import PerformanceProfile, Engine
from .formulas import (
calc_ring_allreduce_time,
calc_tree_allreduce_time,
calc_hierarchical_allreduce_time,
calc_mtbf_cluster,
calc_young_daly_interval,
calc_failure_probability,
calc_pipeline_bubble
)
from .constants import ureg, Q_
from ..models.types import Workload, TransformerWorkload
from ..hardware.types import HardwareNode
from ..systems.types import Fleet, NetworkFabric
from ..infra.types import Datacenter, GridProfile
class BaseSolver(ABC):
@abstractmethod
def solve(self, **kwargs) -> Any:
pass
class SingleNodeSolver(BaseSolver):
"""
Resolves single-node hardware Roofline bounds and feasibility.
This solver handles the 'Iron Law' of machine learning systems,
calculating whether a model fits in memory and predicting its
throughput based on arithmetic intensity.
"""
def solve(self, model: Workload, hardware: HardwareNode, batch_size: int = 1, precision: str = "fp16", efficiency: float = 0.5, raise_errors: bool = False) -> PerformanceProfile:
"""
Solves the performance profile for a single hardware node.
Parameters
----------
model : Workload
The model architecture (Transformer, CNN).
hardware : HardwareNode
The target hardware specification.
batch_size : int, optional
Number of samples per inference/step, by default 1.
precision : str, optional
Numerical precision format ('fp32', 'fp16', 'int8', 'int4'), by default "fp16".
efficiency : float, optional
Hardware utilization efficiency (0.0 to 1.0), by default 0.5.
raise_errors : bool, optional
Whether to raise OOMError for infeasible workloads, by default False.
Returns
-------
PerformanceProfile
The resulting latency, throughput, and bottleneck analysis.
"""
return Engine.solve(model, hardware, batch_size=batch_size, precision=precision, efficiency=efficiency, raise_errors=raise_errors)
class DistributedSolver(BaseSolver):
"""
Resolves fleet-wide communication, synchronization, and pipelining constraints.
This solver models the communication and synchronization costs of distributed training. It
decomposes a workload across a cluster using 3D Parallelism (DP, TP, PP)
and calculates the resulting communication overheads and idle times
(bubbles) that determine the Model FLOPs Utilization (MFU).
"""
def solve(self,
model: Workload,
fleet: Fleet,
batch_size: int = 1,
precision: str = "fp16",
efficiency: float = 0.5,
tp_size: int = 1,
pp_size: int = 1,
microbatch_count: int = 1,
topology_override: Optional[str] = None) -> Dict[str, Any]:
"""
Calculates distributed training performance using the 3D Parallelism model.
Parameters
----------
model : Workload
The model architecture to simulate.
fleet : Fleet
The hardware cluster and network topology.
batch_size : int
Global batch size.
precision : str
Numerical precision (fp16, fp32, int8).
efficiency : float
Achieved compute efficiency (0.0 to 1.0).
tp_size : int
Tensor Parallelism degree. Splits individual layers across GPUs,
usually within a single node over high-speed NVLink.
pp_size : int
Pipeline Parallelism degree. Chains model layers across multiple
nodes, introducing 'pipeline bubbles' while saving memory.
microbatch_count : int
Number of microbatches (M). Increasing M reduces the pipeline
bubble but increases synchronization overhead.
topology_override : str, optional
Force a specific topology (ring, tree).
Returns
-------
Dict[str, Any]
Metrics including DP/TP latency, the Pipeline Bubble penalty,
and the final Scaling Efficiency.
"""
# 1. 3D Parallelism Decomposition
n_accelerators = fleet.total_accelerators
dp_size = n_accelerators // (tp_size * pp_size)
if dp_size < 1:
raise ValueError(f"Infeasible 3D Parallelism: TP({tp_size}) * PP({pp_size}) > Total({n_accelerators})")
# 2. Single Node Performance (Computation)
node_perf = Engine.solve(model, fleet.node.accelerator, batch_size=batch_size // dp_size, precision=precision, efficiency=efficiency)
# 3. Communication Overhead (Network)
# Apply Hierarchical Model: Intra-node (NVLink) vs Inter-node (InfiniBand)
message_size = model.size_in_bytes()
# DP AllReduce (Weights/Gradients)
if dp_size > 1:
if fleet.node.accelerators_per_node > 1 and dp_size > fleet.node.accelerators_per_node:
# Hierarchical: Ring within node, then Ring across nodes
t_comm_dp = calc_hierarchical_allreduce_time(
message_bytes=message_size,
n_nodes=dp_size // fleet.node.accelerators_per_node,
gpus_per_node=fleet.node.accelerators_per_node,
intra_node_bw=fleet.node.intra_node_bw,
inter_node_bw=fleet.fabric.bandwidth / fleet.fabric.oversubscription_ratio,
inter_node_lat=fleet.fabric.latency or Q_("5 us")
)
else:
# Single node or small DP: Intra-node only
t_comm_dp = calc_ring_allreduce_time(
message_size,
dp_size,
fleet.node.intra_node_bw,
Q_("500 ns")
)
else:
t_comm_dp = Q_("0 ms")
# TP Communication (Assuming intra-node NVLink)
t_comm_tp = (message_size / tp_size / fleet.node.intra_node_bw).to("ms") if tp_size > 1 else Q_("0 ms")
# 4. Pipeline Parallelism (PP) Bubble
# Source: Huang et al. (2019), "GPipe: Efficient Training of Giant Neural Networks using Pipeline Parallelism"
bubble_fraction = calc_pipeline_bubble(pp_size, microbatch_count)
t_bubble = (node_perf.latency * bubble_fraction) if pp_size > 1 else Q_("0 ms")
# 5. Total Latency and Scaling Efficiency
total_comm_latency = t_comm_dp + t_comm_tp
step_latency_total = node_perf.latency + total_comm_latency + t_bubble
scaling_efficiency = (node_perf.latency / step_latency_total).magnitude
return {
"node_performance": node_perf,
"dp_communication_latency": t_comm_dp,
"tp_communication_latency": t_comm_tp,
"communication_latency": total_comm_latency, # Backwards compatibility for tests
"pipeline_bubble_latency": t_bubble,
"bubble_fraction": bubble_fraction,
"step_latency_total": step_latency_total,
"scaling_efficiency": scaling_efficiency,
"effective_throughput": (n_accelerators * node_perf.throughput * scaling_efficiency),
"parallelism": {"dp": dp_size, "tp": tp_size, "pp": pp_size}
}
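The bubble penalty in step 4 follows the GPipe-style idle fraction, assumed here to be the common (p−1)/m form (our naming, not `calc_pipeline_bubble`'s actual signature):

```python
def pipeline_bubble_fraction(pp_size: int, n_microbatches: int) -> float:
    """Pipeline idle fraction ~ (p - 1) / m (Huang et al. 2019)."""
    if pp_size <= 1:
        return 0.0
    return (pp_size - 1) / n_microbatches

# Deeper pipelines need more microbatches to stay efficient.
shallow = pipeline_bubble_fraction(4, 32)   # 3/32  ~ 9.4% idle
deep = pipeline_bubble_fraction(16, 32)     # 15/32 ~ 46.9% idle
```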
class ReliabilitySolver(BaseSolver):
"""
Calculates Mean Time Between Failures (MTBF) and optimal checkpointing intervals.
This solver handles the reliability modeling of massive clusters, helping
determine the 'Goodput' of long-running training jobs. It identifies
the probability of a job failure before completion and calculates the
Young-Daly optimal interval to minimize wasted compute time.
"""
def solve(self, fleet: Fleet, job_duration_hours: float, checkpoint_time_s: float = 60.0) -> Dict[str, Any]:
"""
Calculates reliability and checkpointing metrics for a fleet.
Parameters
----------
fleet : Fleet
The hardware cluster configuration.
job_duration_hours : float
Total wall-clock duration of the training job.
checkpoint_time_s : float, optional
Time taken to save a single checkpoint, by default 60.0.
Returns
-------
Dict[str, Any]
Reliability metrics including fleet MTBF and failure probability.
"""
accel_mtbf = Q_(50000, "hour")
node_mtbf = accel_mtbf / fleet.node.accelerators_per_node
fleet_mtbf = calc_mtbf_cluster(node_mtbf, fleet.count)
job_dur_q = Q_(job_duration_hours, "hour")
prob_fail = calc_failure_probability(fleet_mtbf, job_dur_q)
ckpt_time_q = Q_(checkpoint_time_s, "second")
optimal_interval = calc_young_daly_interval(ckpt_time_q, fleet_mtbf.to("second"))
return {
"fleet_mtbf": fleet_mtbf,
"failure_probability": prob_fail,
"optimal_checkpoint_interval": optimal_interval,
"expected_failures": (job_dur_q / fleet_mtbf).magnitude
}
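The reliability chain above (per-accelerator MTBF → node MTBF → fleet MTBF → failure probability) can be sketched with plain floats, assuming independent exponential failures (the 50,000 h figure mirrors the constant hard-coded in `solve`):

```python
import math

def fleet_mtbf_hours(component_mtbf_h: float, n_components: int) -> float:
    """Independent exponential failures: fleet MTBF = component MTBF / N."""
    return component_mtbf_h / n_components

def failure_probability(mtbf_h: float, duration_h: float) -> float:
    """P(at least one failure in T) = 1 - exp(-T / MTBF)."""
    return 1.0 - math.exp(-duration_h / mtbf_h)

# Assumed: 50,000 h per-accelerator MTBF, 1,024 accelerators, a 2-week job.
mtbf = fleet_mtbf_hours(50_000, 1024)     # ~48.8 h between fleet failures
p = failure_probability(mtbf, 14 * 24)    # near-certain to see at least one failure
```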
class SustainabilitySolver(BaseSolver):
"""
Calculates Datacenter-scale Sustainability metrics.
Handles Power Usage Effectiveness (PUE), Carbon Intensity,
and Water Usage Effectiveness (WUE) across different regional grids.
This solver models the 'Infrastructure Tax' — the energy spent on
cooling and power delivery rather than on neural computation.
"""
def solve(self, fleet: Fleet, duration_days: float, datacenter: Optional[Datacenter] = None) -> Dict[str, Any]:
"""
Calculates energy, carbon, and water footprint for a fleet operation.
Parameters
----------
fleet : Fleet
The hardware cluster configuration.
duration_days : float
Operating duration in days.
datacenter : Datacenter, optional
A specific datacenter profile, defaults to fleet's region.
Returns
-------
Dict[str, Any]
Sustainability metrics including total energy (kWh) and carbon (kgCO2e).
"""
# 1. Resolve Environment
dc = datacenter or fleet.datacenter
# Flexibly handle if dc is already a GridProfile or a Datacenter
if hasattr(dc, 'grid'):
region = dc.grid
else:
region = dc or fleet.region
if not region:
from ..infra.registry import Grids
region = Grids.US_Avg
duration_hours = duration_days * 24
# 2. Power
it_power_w = fleet.node.accelerator.tdp * fleet.total_accelerators if fleet.node.accelerator.tdp else Q_("700 W") * fleet.total_accelerators
# 3. Energy Consumption
it_energy_kwh = (it_power_w * Q_(duration_hours, "hour")).to("kWh")
# Apply PUE
pue = getattr(dc, 'pue', fleet.effective_pue)
total_energy_kwh = it_energy_kwh * pue
# 4. Carbon Footprint
carbon_kg = region.carbon_kg(it_energy_kwh.magnitude) if hasattr(region, 'carbon_kg') else it_energy_kwh.magnitude * (region.carbon_intensity_g_kwh / 1000.0)
# 5. Water Usage
# Resolve WUE from dc.grid, dc, or region
if hasattr(dc, 'grid') and dc.grid:
wue = dc.grid.wue
elif hasattr(dc, 'wue'):
wue = dc.wue
else:
wue = region.wue
water_liters = total_energy_kwh.magnitude * wue
return {
"it_energy_kwh": it_energy_kwh,
"total_energy_kwh": total_energy_kwh,
"carbon_footprint_kg": carbon_kg,
"water_usage_liters": water_liters,
"pue": pue,
"region_name": region.name
}
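The energy → carbon → water chain reduces to a few multiplications. A sketch that applies the grid intensity and WUE to the PUE-adjusted total draw (all numbers below are assumptions, not `mlsysim` registry values):

```python
def operational_footprint(it_power_w, hours, pue, carbon_g_per_kwh, wue_l_per_kwh):
    """Return (total kWh, kg CO2e, liters) from IT power plus facility overhead."""
    it_energy_kwh = it_power_w * hours / 1000.0
    total_kwh = it_energy_kwh * pue               # cooling + power-delivery tax
    carbon_kg = total_kwh * carbon_g_per_kwh / 1000.0
    water_l = total_kwh * wue_l_per_kwh
    return total_kwh, carbon_kg, water_l

# Assumed: 1,024 GPUs x 700 W for 30 days, PUE 1.2, 400 gCO2e/kWh, 1.8 L/kWh.
kwh, co2_kg, water_l = operational_footprint(1024 * 700, 30 * 24, 1.2, 400, 1.8)
```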
class ServingSolver(BaseSolver):
"""
Analyzes the two-phase LLM serving lifecycle: Pre-fill vs. Decoding.
LLM inference is not a single mathematical operation; it is a stateful
process with two distinct physical regimes:
1. **Pre-fill Phase**: The initial processing of the input prompt. This
is a 'Compute Beast' phase where all prompt tokens are processed
in parallel, saturating the GPU's arithmetic units.
2. **Decoding Phase**: The token-by-token generation. This is a
'Bandwidth Hog' phase. Because the model must read all parameters
from memory just to generate a single token, it is limited entirely
by HBM bandwidth.
This solver also models the **KV-Cache**, the memory required to store
previous token states, which grows linearly with sequence length and
batch size, eventually hitting the 'Memory Wall'.
"""
def solve(self, model: TransformerWorkload, hardware: HardwareNode, seq_len: int, batch_size: int = 1, precision: str = "fp16", efficiency: float = 0.5) -> Dict[str, Any]:
"""
Solves for LLM serving performance.
Parameters
----------
model : TransformerWorkload
The LLM model architecture.
hardware : HardwareNode
The target hardware for inference.
seq_len : int
The total context window (prompt + generated tokens).
batch_size : int, optional
Number of concurrent user requests.
precision : str, optional
Numerical format. Lower precision (INT8/INT4) reduces
memory pressure and speeds up the Decoding phase.
efficiency : float, optional
Compute utilization efficiency, primarily affecting the Pre-fill phase.
Returns
-------
Dict[str, Any]
Inference metrics including Time-To-First-Token (TTFT),
Inter-Token Latency (ITL), and total KV-cache footprint.
"""
from .constants import BYTES_FP16, BYTES_FP32, BYTES_INT8, BYTES_INT4
prec_map = {"fp16": BYTES_FP16, "fp32": BYTES_FP32, "int8": BYTES_INT8, "int4": BYTES_INT4}
bpp = prec_map.get(precision, BYTES_FP16)
peak_flops = hardware.compute.precision_flops.get(precision, hardware.compute.peak_flops)
prefill_ops = 2 * model.parameters.to(ureg.count).magnitude * seq_len * batch_size * ureg.flop  # ~2 FLOPs per parameter per prompt token
t_prefill = (prefill_ops / (peak_flops * efficiency)).to("ms") + hardware.dispatch_tax
model_weights_bytes = model.size_in_bytes(bpp)
kv_cache_bytes = model.get_kv_cache_size(seq_len=seq_len, batch_size=batch_size, precision=bpp)
t_decode_per_token = ((model_weights_bytes + kv_cache_bytes) / hardware.memory.bandwidth).to("ms")  # every token re-reads weights + KV cache from HBM
total_memory_required = model_weights_bytes + kv_cache_bytes
feasible = total_memory_required <= hardware.memory.capacity
return {
"feasible": feasible,
"ttft": t_prefill,
"itl": t_decode_per_token,
"kv_cache_size": kv_cache_bytes.to("GB"),
"model_weights_size": model_weights_bytes.to("GB"),
"total_memory_required": total_memory_required.to("GB"),
"memory_utilization": (total_memory_required / hardware.memory.capacity).to_base_units().magnitude
}
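The two regimes can be checked by hand. A back-of-the-envelope sketch with hypothetical A100-class numbers (312 TFLOP/s FP16, 2.0 TB/s HBM) and an illustrative 7B-parameter model; the KV-cache term assumes the usual 2 x layers x kv_heads x head_dim x seq x batch x bytes formulation:

```python
def serving_estimate(params, layers, kv_heads, head_dim, seq_len, batch,
                     peak_flops=312e12, bw=2.0e12, eff=0.5, bpp=2):
    """Back-of-the-envelope TTFT / ITL, mirroring the prefill/decode split."""
    weights = params * bpp                                         # model bytes
    kv = 2 * layers * kv_heads * head_dim * seq_len * batch * bpp  # K and V tensors
    ttft_s = 2 * params * seq_len * batch / (peak_flops * eff)     # compute-bound prefill
    itl_s = (weights + kv) / bw                                    # bandwidth-bound decode
    return ttft_s, itl_s, kv

# 7B params, 32 layers, 32 KV heads of dim 128, 2048-token context
ttft, itl, kv = serving_estimate(7e9, 32, 32, 128, 2048, 1)
```

This yields a TTFT near 0.18 s but an ITL of roughly 7.5 ms: decoding re-reads all 14 GB of weights for each token, so the decode phase is pinned to HBM bandwidth no matter how much compute the device has.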
class EconomicsSolver(BaseSolver):
"""
Calculates Total Cost of Ownership (TCO) including Capex and Opex.
Combines hardware costs, energy consumption, and maintenance
into a single financial model for the fleet. This solver exposes
the ROI of architectural efficiency by showing how reducing power
draw or increasing throughput directly impacts the bottom line.
"""
def solve(self, fleet: Fleet, duration_days: float, kwh_price: Optional[float] = None, datacenter: Optional[Any] = None, grid: Optional[Any] = None) -> Dict[str, Any]:
"""
Calculates the TCO for a fleet over a specified duration.
Parameters
----------
fleet : Fleet
The hardware cluster configuration.
duration_days : float
Operation duration in days.
kwh_price : float, optional
Price of electricity per kWh.
datacenter : Datacenter, optional
A specific datacenter profile.
grid : GridProfile, optional
A specific grid profile.
Returns
-------
Dict[str, Any]
Financial metrics including CapEx, OpEx, and total TCO.
"""
sust_solver = SustainabilitySolver()
energy_result = sust_solver.solve(fleet, duration_days, datacenter=datacenter or grid)
price = kwh_price
if price is None:
# Try to resolve from grid/datacenter or default
target = grid or datacenter or fleet.datacenter or fleet.region
price = getattr(target, 'kwh_price', 0.12)
opex_energy = energy_result["total_energy_kwh"].magnitude * price
unit_cost = fleet.node.accelerator.unit_cost or Q_("30000 USD")
total_capex = unit_cost.magnitude * fleet.total_accelerators
annual_maintenance_ratio = 0.05
opex_maintenance = total_capex * annual_maintenance_ratio * (duration_days / 365.0)
# Merge energy result into TCO result
result = {
"capex_usd": total_capex,
"opex_energy_usd": opex_energy,
"opex_maintenance_usd": opex_maintenance,
"total_opex_usd": opex_energy + opex_maintenance,
"tco_usd": total_capex + opex_energy + opex_maintenance
}
result.update(energy_result)
return result
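The cost composition reduces to three terms. A plain-float sketch in which the $30k unit cost, $0.12/kWh price, and 5%/year maintenance ratio mirror the solver's defaults, and the energy figure is an assumed input:

```python
def tco(n_accels, unit_cost_usd, energy_kwh, days,
        kwh_price=0.12, maint_ratio=0.05):
    """CapEx plus energy and pro-rated maintenance OpEx."""
    capex = n_accels * unit_cost_usd
    opex_energy = energy_kwh * kwh_price
    opex_maint = capex * maint_ratio * (days / 365.0)
    return capex + opex_energy + opex_maint

# 8 accelerators at $30k each, 161.28 kWh consumed over one day
total = tco(8, 30_000, 161.28, 1)
```

At this scale CapEx dominates: about $240,000 of hardware against roughly $52 of first-day OpEx, which is why short benchmarks say little about lifetime TCO.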


@@ -0,0 +1 @@
from .registry import Hardware


@@ -0,0 +1,210 @@
from .types import HardwareNode, ComputeCore, MemoryHierarchy
from ..core.constants import (
ureg,
V100_MEM_BW, V100_FLOPS_FP16_TENSOR, V100_MEM_CAPACITY, V100_TDP, V100_FLOPS_FP32,
A100_MEM_BW, A100_FLOPS_FP16_TENSOR, A100_MEM_CAPACITY, A100_TDP, A100_FLOPS_FP32, A100_FLOPS_TF32, A100_FLOPS_INT8,
H100_MEM_BW, H100_FLOPS_FP16_TENSOR, H100_MEM_CAPACITY, H100_TDP, H100_FLOPS_TF32, H100_FLOPS_FP8_TENSOR, H100_FLOPS_INT8,
B200_MEM_BW, B200_FLOPS_FP16_TENSOR, B200_MEM_CAPACITY, B200_TDP, B200_FLOPS_FP8_TENSOR, B200_FLOPS_INT4,
MI300X_MEM_BW, MI300X_FLOPS_FP16_TENSOR, MI300X_MEM_CAPACITY, MI300X_TDP,
TPUV5P_MEM_BW, TPUV5P_FLOPS_BF16, TPUV5P_MEM_CAPACITY,
T4_MEM_BW, T4_FLOPS_FP16_TENSOR, T4_TDP, T4_FLOPS_INT8
)
class CloudHardware:
"""Datacenter-scale accelerators (Volume II)."""
V100 = HardwareNode(
name="NVIDIA V100",
release_year=2017,
compute=ComputeCore(peak_flops=V100_FLOPS_FP16_TENSOR, precision_flops={"fp32": V100_FLOPS_FP32}),
memory=MemoryHierarchy(capacity=V100_MEM_CAPACITY, bandwidth=V100_MEM_BW),
tdp=V100_TDP,
dispatch_tax=0.02 * ureg.ms
)
A100 = HardwareNode(
name="NVIDIA A100",
release_year=2020,
compute=ComputeCore(peak_flops=A100_FLOPS_FP16_TENSOR, precision_flops={"fp32": A100_FLOPS_FP32, "tf32": A100_FLOPS_TF32, "int8": A100_FLOPS_INT8}),
memory=MemoryHierarchy(capacity=A100_MEM_CAPACITY, bandwidth=A100_MEM_BW),
tdp=A100_TDP,
dispatch_tax=0.015 * ureg.ms,
metadata={"source_url": "https://www.nvidia.com/content/dam/en-zz/Solutions/Data-Center/a100/pdf/nvidia-a100-datasheet-us-nvidia-1758950-r4-web.pdf", "last_verified": "2025-03-06"}
)
H100 = HardwareNode(
name="NVIDIA H100",
release_year=2022,
compute=ComputeCore(peak_flops=H100_FLOPS_FP16_TENSOR, precision_flops={"tf32": H100_FLOPS_TF32, "fp8": H100_FLOPS_FP8_TENSOR, "int8": H100_FLOPS_INT8}),
memory=MemoryHierarchy(capacity=H100_MEM_CAPACITY, bandwidth=H100_MEM_BW),
tdp=H100_TDP,
dispatch_tax=0.01 * ureg.ms,
metadata={"source_url": "https://resources.nvidia.com/en-us-tensor-core/nvidia-h100-tensor-core-gpu-datasheet", "last_verified": "2025-03-06"}
)
H200 = HardwareNode(
name="NVIDIA H200",
release_year=2023,
compute=ComputeCore(peak_flops=H100_FLOPS_FP16_TENSOR),
memory=MemoryHierarchy(capacity=141 * ureg.GB, bandwidth=4.8 * ureg.TB/ureg.s),
tdp=700 * ureg.W,
dispatch_tax=0.01 * ureg.ms
)
B200 = HardwareNode(
name="NVIDIA B200",
release_year=2024,
compute=ComputeCore(peak_flops=B200_FLOPS_FP16_TENSOR, precision_flops={"fp8": B200_FLOPS_FP8_TENSOR, "int4": B200_FLOPS_INT4}),
memory=MemoryHierarchy(capacity=B200_MEM_CAPACITY, bandwidth=B200_MEM_BW),
tdp=B200_TDP,
dispatch_tax=0.008 * ureg.ms
)
MI300X = HardwareNode(
name="AMD MI300X",
release_year=2023,
compute=ComputeCore(peak_flops=MI300X_FLOPS_FP16_TENSOR),
memory=MemoryHierarchy(capacity=MI300X_MEM_CAPACITY, bandwidth=MI300X_MEM_BW),
tdp=MI300X_TDP,
dispatch_tax=0.012 * ureg.ms
)
TPUv5p = HardwareNode(
name="Google TPU v5p",
release_year=2023,
compute=ComputeCore(peak_flops=TPUV5P_FLOPS_BF16),
memory=MemoryHierarchy(capacity=TPUV5P_MEM_CAPACITY, bandwidth=TPUV5P_MEM_BW),
tdp=300 * ureg.W,
dispatch_tax=0.04 * ureg.ms
)
T4 = HardwareNode(
name="NVIDIA T4",
release_year=2018,
compute=ComputeCore(peak_flops=T4_FLOPS_FP16_TENSOR, precision_flops={"int8": T4_FLOPS_INT8}),
memory=MemoryHierarchy(capacity=16 * ureg.GiB, bandwidth=T4_MEM_BW),
tdp=T4_TDP,
dispatch_tax=0.03 * ureg.ms
)
class WorkstationHardware:
"""Personal computing systems used for local development."""
MacBookM3Max = HardwareNode(
name="MacBook Pro (M3 Max)",
release_year=2023,
compute=ComputeCore(peak_flops=14.2 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=128 * ureg.GB, bandwidth=400 * ureg.GB/ureg.s),
tdp=100 * ureg.W,
dispatch_tax=0.05 * ureg.ms
)
class MobileHardware:
"""Smartphone and handheld devices (Volume I)."""
iPhone15Pro = HardwareNode(
name="iPhone 15 Pro (A17 Pro)",
release_year=2023,
compute=ComputeCore(peak_flops=35 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=8 * ureg.GB, bandwidth=100 * ureg.GB/ureg.s),
tdp=5 * ureg.W,
battery_capacity=15 * ureg.Wh,
dispatch_tax=1.0 * ureg.ms
)
Pixel8 = HardwareNode(
name="Google Pixel 8 (Tensor G3)",
release_year=2023,
compute=ComputeCore(peak_flops=15 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=8 * ureg.GB, bandwidth=60 * ureg.GB/ureg.s),
tdp=5 * ureg.W,
dispatch_tax=1.2 * ureg.ms
)
Snapdragon8Gen3 = HardwareNode(
name="Snapdragon 8 Gen 3",
release_year=2023,
compute=ComputeCore(peak_flops=45 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=12 * ureg.GB, bandwidth=77 * ureg.GB/ureg.s),
tdp=5 * ureg.W,
dispatch_tax=1.5 * ureg.ms
)
class EdgeHardware:
"""Robotics and Industrial Edge (Volume I)."""
JetsonOrinNX = HardwareNode(
name="NVIDIA Jetson Orin NX",
release_year=2023,
compute=ComputeCore(peak_flops=100 * ureg.TFLOPs/ureg.s),  # NVIDIA's 100 TOPS (INT8, sparse) rating modeled as peak ops/s
memory=MemoryHierarchy(capacity=16 * ureg.GB, bandwidth=102 * ureg.GB/ureg.s),
tdp=25 * ureg.W,
dispatch_tax=0.2 * ureg.ms
)
Coral = HardwareNode(
name="Google Coral Edge TPU",
release_year=2019,
compute=ComputeCore(peak_flops=4 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=1 * ureg.GB, bandwidth=8 * ureg.GB/ureg.s),
tdp=2 * ureg.W,
dispatch_tax=1.0 * ureg.ms
)
NUC_Movidius = HardwareNode(
name="Intel NUC + Movidius",
release_year=2020,
compute=ComputeCore(peak_flops=1 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=16 * ureg.GB, bandwidth=25 * ureg.GB/ureg.s),
tdp=15 * ureg.W,
dispatch_tax=2.0 * ureg.ms
)
GenericServer = HardwareNode(
name="Edge Server",
release_year=2024,
compute=ComputeCore(peak_flops=1 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=128 * ureg.GB, bandwidth=100 * ureg.GB/ureg.s),
tdp=300 * ureg.W,
dispatch_tax=0.1 * ureg.ms
)
class TinyHardware:
"""Microcontrollers and sub-watt devices."""
ESP32_S3 = HardwareNode(
name="ESP32-S3 (AI)",
release_year=2022,
compute=ComputeCore(peak_flops=0.0005 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=512 * ureg.KiB, bandwidth=0.2 * ureg.GB/ureg.s),
tdp=1.2 * ureg.W,
dispatch_tax=5.0 * ureg.ms
)
ESP32 = ESP32_S3 # Alias for backward compatibility
HimaxWE1 = HardwareNode(
name="Himax WE-I Plus",
release_year=2020,
compute=ComputeCore(peak_flops=0.0002 * ureg.TFLOPs/ureg.s),
memory=MemoryHierarchy(capacity=2 * ureg.MB, bandwidth=0.1 * ureg.GB/ureg.s),
tdp=0.005 * ureg.W,
dispatch_tax=2.0 * ureg.ms
)
class Hardware:
Cloud = CloudHardware
Workstation = WorkstationHardware
Mobile = MobileHardware
Edge = EdgeHardware
Tiny = TinyHardware
# Common Aliases (Vetted only)
V100 = CloudHardware.V100
A100 = CloudHardware.A100
H100 = CloudHardware.H100
H200 = CloudHardware.H200
B200 = CloudHardware.B200
MI300X = CloudHardware.MI300X
TPUv5p = CloudHardware.TPUv5p
T4 = CloudHardware.T4
iPhone = MobileHardware.iPhone15Pro
Snapdragon = MobileHardware.Snapdragon8Gen3
Jetson = EdgeHardware.JetsonOrinNX
ESP32 = TinyHardware.ESP32_S3
Himax = TinyHardware.HimaxWE1
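One way to read this registry is through the roofline ridge point (Williams et al. 2009): peak FLOP/s divided by memory bandwidth. A sketch using the registry's Jetson literals plus approximate A100/H100 FP16 datasheet figures (assumed here, since the actual values live in the imported constants):

```python
specs = {  # (peak FLOP/s, memory bandwidth in B/s); approximate datasheet values
    "A100": (312e12, 2.0e12),
    "H100": (990e12, 3.35e12),
    "Jetson Orin NX": (100e12, 102e9),
}
# Workloads with arithmetic intensity below the ridge point are bandwidth-bound.
ridge = {name: flops / bw for name, (flops, bw) in specs.items()}
```

The ridge points (about 156, 296, and 980 FLOP/byte respectively) show why LLM decoding, at only a few FLOPs per byte, is bandwidth-bound on every one of these devices.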


@@ -0,0 +1 @@
from .registry import Infra, Grids


@@ -0,0 +1 @@
from .registry import Models

mlsysim/models/types.py Normal file

@@ -0,0 +1,149 @@
from pydantic import BaseModel, ConfigDict, Field
from typing import Optional, Dict, Any, Annotated, Union
from ..core.constants import Q_, ureg, BYTES_FP16
from ..core.types import Quantity, Metadata
from pydantic import AfterValidator
class ComputationGraph(BaseModel):
"""
Hardware-Agnostic representation of a Workload.
The 'Intermediate Representation' (IR) of demand.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
total_ops: Quantity
parameter_count: Quantity
weight_bytes: Quantity
arithmetic_intensity: Quantity # Ops/Byte
# Optional metadata
layers: Optional[int] = None
def __repr__(self):
return f"ComputationGraph({self.name}, {self.total_ops:~P})"
class Workload(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
architecture: str
metadata: Metadata = Field(default_factory=Metadata)
parameters: Optional[Quantity] = None
model_size: Optional[Quantity] = None
inference_flops: Optional[Quantity] = None
def lower(self, precision: Quantity = BYTES_FP16) -> ComputationGraph:
"""Lowers the workload into a hardware-agnostic computation graph."""
raise NotImplementedError
def size_in_bytes(self, precision: Quantity = BYTES_FP16) -> Quantity:
if self.model_size is not None:
return self.model_size
if self.parameters is not None:
param_count = self.parameters.to(ureg.count).magnitude
bpp = precision.to(ureg.byte).magnitude
return (param_count * bpp * ureg.byte).to(ureg.byte)
raise NotImplementedError("Workload must define either parameters or model_size to calculate size in bytes.")
class TransformerWorkload(Workload):
parameters: Quantity
layers: int
hidden_dim: Optional[int] = None
heads: Optional[int] = None
kv_heads: Optional[int] = None
training_ops: Optional[Quantity] = None
inference_flops: Optional[Quantity] = None
def size_in_bytes(self, precision: Quantity = BYTES_FP16) -> Quantity:
param_count = self.parameters.to(ureg.count).magnitude
bpp = precision.to(ureg.byte).magnitude
return (param_count * bpp * ureg.byte).to(ureg.byte)
def get_kv_cache_size(self, seq_len: int, batch_size: int, precision: Quantity = BYTES_FP16) -> Quantity:
from ..core.formulas import calc_kv_cache_size
h_dim = self.hidden_dim or 4096  # fall back to 7B-class defaults when dims are unspecified
n_heads = self.heads or 32
head_dim = h_dim // n_heads
n_kv_heads = self.kv_heads or n_heads
return calc_kv_cache_size(n_layers=self.layers, n_heads=n_kv_heads, head_dim=head_dim, seq_len=seq_len, batch_size=batch_size, bytes_per_elem=precision)
def training_memory(self, batch_size: int, seq_len: int, precision: str = "fp16", optimizer: str = "adam", strategy: str = "selective") -> Quantity:
"""
Estimate training memory for a Transformer model.
Source: Shoeybi et al. (2019), "Megatron-LM: Training Multi-Billion Parameter
Language Models Using Model Parallelism"
Args:
batch_size: Mini-batch size (B)
seq_len: Sequence length (S)
precision: Precision format ('fp32', 'fp16', 'int8', 'int4')
optimizer: Optimizer type ('adam', 'sgd')
strategy: Recompute strategy ('none', 'selective', 'full')
Returns:
Quantity[byte]: Total training memory per GPU
"""
from ..core.constants import BYTES_FP32, BYTES_FP16, BYTES_INT8, BYTES_INT4
from ..core.formulas import calc_activation_memory
prec_map = {"fp32": BYTES_FP32, "fp16": BYTES_FP16, "int8": BYTES_INT8, "int4": BYTES_INT4}
bpp = prec_map.get(precision, BYTES_FP16).to(ureg.byte).magnitude
n_params = self.parameters.to(ureg.count).magnitude
# 1. Weights and gradients: one copy of each at training precision
w_grad_mem = n_params * (bpp + bpp) * ureg.byte
# 2. Optimizer States (Adam = 12 bytes/param for FP32 states)
if optimizer.lower() == "adam":
# Adam: master weights (4), momentum (4), variance (4) = 12 bytes/param
opt_mem = n_params * 12 * ureg.byte
else:
# SGD: master weights (4) = 4 bytes/param
opt_mem = n_params * 4 * ureg.byte
# 3. Activation Memory (proportional to B, S, H)
act_mem = calc_activation_memory(
n_layers=self.layers,
seq_len=seq_len,
batch_size=batch_size,
hidden_dim=self.hidden_dim or 4096,
precision_bytes=bpp,
strategy=strategy
)
return (w_grad_mem + opt_mem + act_mem).to(ureg.GB)
def lower(self, precision: Quantity = BYTES_FP16) -> ComputationGraph:
ops = self.inference_flops or (2 * self.parameters.to(ureg.count).magnitude * ureg.flop)
weights = self.size_in_bytes(precision)
return ComputationGraph(
name=self.name,
total_ops=ops,
parameter_count=self.parameters,
weight_bytes=weights,
arithmetic_intensity=(ops / weights).to("flop/byte"),
layers=self.layers
)
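As a sanity check on training_memory(), the Megatron-style budget in plain floats: 2 bytes each for fp16 weights and gradients plus 12 bytes/param of fp32 Adam state, with a crude per-layer stand-in for activations (the real calc_activation_memory is more detailed). Model dimensions are illustrative:

```python
def training_bytes(n_params, n_layers, batch, seq, hidden, bpp=2):
    """Rough per-GPU training footprint in bytes (no recompute)."""
    w_and_grad = n_params * 2 * bpp               # fp16 weights + fp16 gradients
    adam = n_params * 12                          # fp32 master weights + momentum + variance
    acts = n_layers * batch * seq * hidden * bpp  # crude activation stand-in
    return w_and_grad + adam + acts

# 1.3B-param, 24-layer model at batch 8, seq 1024, hidden 2048
gb = training_bytes(1.3e9, 24, 8, 1024, 2048) / 1e9
```

Roughly 21.6 GB, of which 15.6 GB is optimizer state: it is usually Adam, not the weights, that overflows a GPU during training.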
class CNNWorkload(Workload):
parameters: Quantity
inference_flops: Quantity
layers: Optional[int] = None
def size_in_bytes(self, precision: Quantity = BYTES_FP16) -> Quantity:
param_count = self.parameters.to(ureg.count).magnitude
bpp = precision.to(ureg.byte).magnitude
return (param_count * bpp * ureg.byte).to(ureg.byte)
def lower(self, precision: Quantity = BYTES_FP16) -> ComputationGraph:
weights = self.size_in_bytes(precision)
return ComputationGraph(
name=self.name,
total_ops=self.inference_flops,
parameter_count=self.parameters,
weight_bytes=weights,
arithmetic_intensity=(self.inference_flops / weights).to("flop/byte"),
layers=self.layers
)
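lower() defines arithmetic intensity as total ops over weight bytes. For the book's ResNet-50 figures (roughly 25.6M parameters and ~4.1 GFLOPs per image) at fp16 weights:

```python
params = 25.6e6
flops = 4.1e9               # per-image inference FLOPs
weight_bytes = params * 2   # fp16 weights
ai = flops / weight_bytes   # FLOPs per byte of weights read
```

About 80 FLOP/byte of weight traffic, versus roughly 1-2 FLOP/byte for an LLM decode step: CNN inference reuses each loaded weight far more, which is why it can keep arithmetic units busy where decoding cannot.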


@@ -0,0 +1 @@
from .registry import Systems