---
title: "Distributed Training: 3D Parallelism and Scaling Efficiency"
subtitle: "Discover why 1024 GPUs rarely deliver 1024× speedup — and how to minimize the gap."
---

::: {.callout-note}
## Background: Why distributed training?

Some models are too large to fit in a single GPU's memory, and some training jobs would take months on one GPU. **Distributed training** splits the work across many GPUs. This tutorial explores the three main ways to split work and the overhead each one introduces. You should complete the Hello World and LLM Serving tutorials before this one.
:::

Scaling a training job from 1 GPU to 1024 GPUs incurs overhead at every step.
Communication, pipeline stalls, and coordination each chip away at theoretical speedup.
Understanding where that efficiency goes, and how to recover it, is what separates
a well-tuned distributed training job from an expensive waste of cluster time.

By the end of this tutorial you will understand:

- How **Data Parallelism**, **Tensor Parallelism**, and **Pipeline Parallelism** decompose work across GPUs
- Why synchronization (ring all-reduce) overhead depends on model size and network bandwidth
- Why **pipeline bubbles** reduce effective GPU utilization
- How to calculate **scaling efficiency** for a real cluster

::: {.callout-tip}
## 3D Parallelism at a Glance

Modern distributed training uses three orthogonal strategies simultaneously:

| Strategy | What it splits | Main overhead |
|:---|:---|:---|
| **Data Parallelism (DP)** | Batch across GPUs | All-reduce gradients after backward pass |
| **Tensor Parallelism (TP)** | Individual matrix ops within a layer | All-gather within each forward/backward |
| **Pipeline Parallelism (PP)** | Layer groups across nodes | Pipeline bubble at start/end of batch |

The product $\text{DP} \times \text{TP} \times \text{PP} = \text{total GPUs}$; every valid configuration is a factorization of the GPU count (see the short enumeration sketch after this callout).
:::
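
To make that constraint concrete, here is a minimal sketch in plain Python (no MLSYSIM imports; the 256-GPU and 8-GPUs-per-node figures are the illustrative values used for the cluster later in this tutorial) that enumerates every valid (DP, TP, PP) factorization when TP is kept inside a node:

```python
# Every valid 3D configuration is a factorization of the GPU count.
# Assumptions for illustration: 256 GPUs total, 8 GPUs per node, and TP
# restricted to a single node (so TP must divide 8).
total_gpus = 256

configs_3d = [
    (total_gpus // (tp * pp), tp, pp)          # (DP, TP, PP)
    for tp in (1, 2, 4, 8)                     # TP stays within one node
    for pp in range(1, total_gpus // tp + 1)
    if (total_gpus // tp) % pp == 0            # PP must divide what remains
]

print(f"{len(configs_3d)} valid (DP, TP, PP) factorizations; the first few:")
for dp, tp, pp in configs_3d[:5]:
    print(f"  DP={dp:<4} TP={tp:<2} PP={pp:<4} -> {dp * tp * pp} GPUs")
```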

---

## 1. Setup

```{python}
#| echo: false
#| output: false
# Build-system path setup — hidden from students
import sys, os, importlib.util

current_dir = os.getcwd()
root_path = os.path.abspath(os.path.join(current_dir, "../../../"))
if not os.path.exists(os.path.join(root_path, "mlsysim")):
    root_path = os.path.abspath("../../")

package_path = os.path.join(root_path, "mlsysim")
init_file = os.path.join(package_path, "__init__.py")

spec = importlib.util.spec_from_file_location("mlsysim", init_file)
mlsysim_mod = importlib.util.module_from_spec(spec)
sys.modules["mlsysim"] = mlsysim_mod
spec.loader.exec_module(mlsysim_mod)

import mlsysim
```

```python
import mlsysim
from mlsysim import DistributedSolver
```

```{python}
from mlsysim import DistributedSolver

# Llama-3.1-70B: the model requires distributed training — too large for a single GPU
model = mlsysim.Models.Llama3_70B

# A research-scale cluster: 32 DGX H100 nodes × 8 GPUs = 256 H100s
# (DGX is NVIDIA's pre-built server containing 8 H100 GPUs connected via NVLink)
cluster = mlsysim.Systems.Clusters.Research_256

print(f"Model: {model.name} ({model.parameters.to('Gparam'):.0f} params)")
print(f"Cluster: {cluster.name}")
print(f" Nodes: {cluster.count} × {cluster.node.accelerators_per_node} GPUs/node")
print(f" Total: {cluster.total_accelerators} accelerators")
print(f" Fabric: {cluster.fabric.name} @ {cluster.fabric.bandwidth.to('GB/s'):.0f} GB/s/link")
```

---

## 2. Visualizing 3D Parallelism

Before working through the numbers, consider how 3D parallelism decomposes a training job across a cluster. Each dimension splits work differently and introduces a different type of overhead:

**Data Parallelism (DP=4)** — each GPU holds a full model copy and processes 1/4 of the batch. After the backward pass, gradients are synchronized via All-Reduce.

```{mermaid}
%%| fig-cap: "Data Parallelism: replicate the model, split the batch, synchronize gradients."
flowchart LR
    R1["Replica 1<br/>Batch 1/4"] <-.->|"All-Reduce"| R2["Replica 2<br/>Batch 2/4"]
    R2 <-.->|"All-Reduce"| R3["Replica 3<br/>Batch 3/4"]
    R3 <-.->|"All-Reduce"| R4["Replica 4<br/>Batch 4/4"]
```

**Tensor Parallelism (TP=2)** — each layer is split across GPUs. Requires fast interconnect (NVLink).

```{mermaid}
%%| fig-cap: "Tensor Parallelism: split each layer across GPUs, communicate via NVLink."
flowchart LR
    G1["GPU 0<br/>Left half of each layer"] <-->|"All-Gather<br/>(NVLink)"| G2["GPU 1<br/>Right half of each layer"]
```

**Pipeline Parallelism (PP=4)** — model layers are partitioned across stages. Activations flow forward; gradients flow backward.

```{mermaid}
%%| fig-cap: "Pipeline Parallelism: partition layers across stages, activations flow forward."
flowchart LR
    S1["Stage 1<br/>Layers 1–20"] --> S2["Stage 2<br/>Layers 21–40"]
    S2 --> S3["Stage 3<br/>Layers 41–60"]
    S3 --> S4["Stage 4<br/>Layers 61–80"]
```

The key insight: **DP** uses inter-node bandwidth (network fabric), **TP** uses intra-node bandwidth (NVLink), and **PP** introduces idle time (pipeline bubbles). The optimal configuration balances all three overheads.

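
To see why TP is kept inside a node while DP crosses nodes, compare the two bandwidth domains. The numbers below are rough, publicly quoted figures assumed for illustration, not values read from the MLSYSIM registries:

```python
# Rough, publicly quoted bandwidths, assumed here for illustration only
# (check the Fleet Zoo / hardware catalog for the values MLSYSIM uses):
nvlink_gb_s = 900   # intra-node: aggregate NVLink bandwidth per H100 (~900 GB/s)
ib_ndr_gb_s = 50    # inter-node: one InfiniBand NDR link, 400 Gb/s ≈ 50 GB/s

print(f"Intra-node NVLink is ~{nvlink_gb_s / ib_ndr_gb_s:.0f}× the bandwidth "
      f"of a single inter-node NDR link")
# TP exchanges activations inside every layer's forward and backward pass,
# so it lives in the fast NVLink domain; DP synchronizes gradients once per
# step, so it can tolerate the slower inter-node fabric.
```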

---

## 3. Baseline: Pure Data Parallelism

Start with the simplest configuration — no model splitting, just replicate the full model
on every GPU and split the batch. The per-GPU compute time is determined by the same
roofline model you used in the Hello World tutorial. The new element here is **communication
overhead**: after each training step, all GPUs must synchronize their gradients via the
network before the next step can begin.

```{python}
solver = DistributedSolver()

result_dp = solver.solve(
    model=model,
    fleet=cluster,
    batch_size=256,
    precision="fp16",
    tp_size=1,  # no tensor parallelism
    pp_size=1,  # no pipeline parallelism
)

node_perf = result_dp["node_performance"]
print(f"Single-GPU compute time: {node_perf.latency.to('ms'):.1f} ms/step")
print(f"DP all-reduce overhead: {result_dp['dp_communication_latency'].to('ms'):.2f} ms")
print(f"Pipeline bubble: {result_dp['pipeline_bubble_latency'].to('ms'):.2f} ms")
print()
print(f"Total step latency: {result_dp['step_latency_total'].to('ms'):.1f} ms")
print(f"Scaling efficiency: {result_dp['scaling_efficiency']:.1%}")
print(f"Effective throughput: {result_dp['effective_throughput'].magnitude:.0f} samples/s")
print(f"Parallelism: DP={result_dp['parallelism']['dp']} TP={result_dp['parallelism']['tp']} PP={result_dp['parallelism']['pp']}")
```

::: {.callout-note}
## What does scaling efficiency mean?

If scaling efficiency is 80%, then your 256-GPU cluster is delivering the equivalent of
about 205 fully-utilized GPUs. The other ~51 GPUs' worth of compute is being spent on
communication overhead. This is the **communication tax** of distributed training.

The tax is paid in **ring all-reduce**: after the backward pass, every GPU must synchronize
gradients with every other GPU. The time to do this grows with model size and shrinks with
network bandwidth.
:::

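
The simulator reports scaling efficiency for you, but it is worth being able to estimate it by hand from the three latencies printed above. A reasonable working definition (an assumption about how the pieces combine, not a statement of MLSYSIM's internals) is the fraction of each step spent on useful compute:

$$\text{Scaling efficiency} \approx \frac{t_{\text{compute}}}{t_{\text{compute}} + t_{\text{allreduce}} + t_{\text{bubble}}}$$

A quick sketch with made-up numbers shows how this maps back to the 205-GPU figure in the callout:

```python
# Illustrative numbers only: 100 ms of compute, 25 ms of combined overhead.
t_compute_ms, t_overhead_ms = 100.0, 25.0
efficiency = t_compute_ms / (t_compute_ms + t_overhead_ms)
print(f"{efficiency:.0%} efficiency -> {efficiency * 256:.0f} GPU-equivalents out of 256")
```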

---

## 4. Ring All-Reduce: The Network Tax

The `DP all-reduce overhead` comes from the **ring all-reduce algorithm**, which is the
standard method for gradient synchronization. Its time depends on:

$$t_{\text{allreduce}} = 2 \times \frac{M \times (N-1)}{N \times B_{\text{eff}}}$$

Where $M$ is the message size (the full gradient: 2 bytes per parameter in fp16, i.e. 140 GB for a 70B-parameter model), $N$ is the number
of data-parallel replicas, and $B_{\text{eff}}$ is the effective inter-node bandwidth.

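
As a quick sanity check, here is a back-of-the-envelope version of that formula in plain Python. The participant count and effective bandwidth are assumed numbers for illustration, not values taken from MLSYSIM's fabric catalog:

```python
# Back-of-the-envelope ring all-reduce time for Llama-70B gradients.
# Assumed, illustrative inputs (not read from the MLSYSIM registries):
#   - 70e9 parameters × 2 bytes (fp16) = 140 GB gradient message
#   - 32 ring participants (one per DGX node, assuming gradients are first
#     reduced inside each node over NVLink)
#   - 25 GB/s effective inter-node bandwidth
M_bytes = 70e9 * 2      # gradient message size, bytes
N = 32                  # data-parallel ring participants
B_eff = 25e9            # effective bandwidth, bytes/s

t_allreduce_s = 2 * M_bytes * (N - 1) / (N * B_eff)
print(f"Estimated all-reduce time: {t_allreduce_s:.1f} s per step")  # ≈ 10.9 s
```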

The following sweep shows how fabric bandwidth affects overhead:

```{python}
from mlsysim import Fleet, Systems

fabrics = [
    ("100GbE", Systems.Fabrics.Ethernet_100G),
    ("IB HDR", Systems.Fabrics.InfiniBand_HDR),
    ("IB NDR", Systems.Fabrics.InfiniBand_NDR),
]

print(f"{'Fabric':>10} {'BW (GB/s)':>10} {'Comm overhead':>14} {'Efficiency':>11}")
print("-" * 52)

for fab_name, fabric in fabrics:
    custom_cluster = Fleet(
        name="Custom",
        node=Systems.Nodes.DGX_H100,
        count=32,
        fabric=fabric
    )
    r = solver.solve(
        model=model,
        fleet=custom_cluster,
        batch_size=256,
        precision="fp16"
    )
    print(
        f"{fab_name:>10} "
        f"{fabric.bandwidth.to('GB/s'):>10.0f~} "
        f"{r['dp_communication_latency'].to('ms'):>14.2f~} "
        f"{r['scaling_efficiency']:>11.1%}"
    )
```

::: {.callout-warning}
## Fabric choice determines scaling efficiency

Upgrading from 100GbE to InfiniBand NDR substantially increases the effective inter-node bandwidth.
On a model the size of Llama-70B (140 GB of gradients per step in fp16), that difference
is significant. For smaller models, it matters less — compute time dominates.
:::


---

## 5. Pipeline Parallelism and the Bubble

**Pipeline Parallelism** splits the model's layers across multiple nodes. Node 1 runs layers
1–20, node 2 runs layers 21–40, etc. This allows a much larger model to be trained than
fits on a single node.

The downside: a **pipeline bubble**. The first microbatch must flow through all stages before
the last stage can start processing the second microbatch. During that startup phase, most
GPUs are idle.

$$\text{Bubble fraction} = \frac{P - 1}{P - 1 + M}$$

Where $P$ is the pipeline depth (number of stages) and $M$ is the number of microbatches.

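
Before running the simulator, the formula is cheap to evaluate directly. A small sketch in plain Python (nothing MLSYSIM-specific) makes the trade-off visible:

```python
def bubble_fraction(p: int, m: int) -> float:
    """Fraction of each step lost to the pipeline's fill/drain bubble."""
    return (p - 1) / (p - 1 + m)

# Deeper pipelines need proportionally more microbatches to stay efficient.
for m in (1, 4, 16):
    print(f"P=8, M={m:<2} -> bubble = {bubble_fraction(8, m):.1%}")
```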

```{python}
print(f"{'PP stages':>10} {'Microbatches':>13} {'Bubble %':>9} {'Comm (ms)':>10} {'Efficiency':>11}")
print("-" * 60)

for pp_size in [1, 2, 4, 8]:
    for m in [1, 4, 16]:
        # Only show interesting combinations
        if pp_size == 1 and m > 1:
            continue
        r = solver.solve(
            model=model,
            fleet=cluster,
            batch_size=256,
            precision="fp16",
            tp_size=1,
            pp_size=pp_size,
            microbatch_count=m
        )
        bubble_pct = r["bubble_fraction"] * 100
        print(
            f"{pp_size:>10} "
            f"{m:>13} "
            f"{bubble_pct:>9.1f}% "
            f"{r['pipeline_bubble_latency'].to('ms'):>10.1f~} "
            f"{r['scaling_efficiency']:>11.1%}"
        )
```

::: {.callout-tip}
## Recovering bubble efficiency

Increasing the number of **microbatches** ($M$) reduces the bubble fraction. With $M = 16$
and $P = 8$, the bubble is only $7/(7+16) \approx 30\%$ of the pipeline, down from $88\%$ with
$M = 1$.

In practice, frameworks like Megatron-LM use **interleaved pipeline schedules** that further
reduce the bubble. But even with the standard 1F1B schedule, choosing $M \gg P$ is essential.
:::


---

## 6. Finding the Optimal Configuration

Now combine all three parallelism strategies and find the configuration that maximizes
scaling efficiency for the `Research_256` cluster. In practice, 70-80% scaling efficiency
on hundreds of GPUs is considered excellent. Below 50% typically signals a suboptimal
parallelism configuration or insufficient network bandwidth.

```{python}
configs = [
    # (description, tp, pp, m)
    ("DP only", 1, 1, 1),
    ("DP + TP=2", 2, 1, 1),
    ("DP + PP=4, M=16", 1, 4, 16),
    ("DP + TP=2 + PP=4, M=16", 2, 4, 16),
    ("DP + TP=8 + PP=4, M=16", 8, 4, 16),
]

print(f"{'Config':<26} {'DP':>4} {'TP':>4} {'PP':>4} {'Efficiency':>11} {'Throughput':>14}")
print("-" * 72)

for desc, tp, pp, m in configs:
    try:
        r = solver.solve(
            model=model,
            fleet=cluster,
            batch_size=256,
            precision="fp16",
            tp_size=tp,
            pp_size=pp,
            microbatch_count=m
        )
        print(
            f"{desc:<26} "
            f"{r['parallelism']['dp']:>4} "
            f"{r['parallelism']['tp']:>4} "
            f"{r['parallelism']['pp']:>4} "
            f"{r['scaling_efficiency']:>11.1%} "
            f"{r['effective_throughput'].magnitude:>14.1f}"
        )
    except ValueError as e:
        print(f"{desc:<26} {'INFEASIBLE':>44} ({e})")
```
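
The table above compares a handful of hand-picked configurations. To pick a winner programmatically (the same pattern Exercise 2 below asks for), collect the feasible results and take the maximum. A minimal sketch:

```python
# Collect the feasible results from a sweep like the one above, then rank
# them by the solver's reported scaling efficiency.
results = []
for desc, tp, pp, m in configs:
    try:
        r = solver.solve(model=model, fleet=cluster, batch_size=256,
                         precision="fp16", tp_size=tp, pp_size=pp,
                         microbatch_count=m)
        results.append((desc, r))
    except ValueError:
        pass  # skip infeasible configurations

best_desc, best = max(results, key=lambda item: item[1]["scaling_efficiency"])
print(f"Best of the configs tried: {best_desc} "
      f"({best['scaling_efficiency']:.1%} scaling efficiency)")
```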

---

## Your Turn

::: {.callout-caution}
## Exercises

**Exercise 1: Predict before you observe.**
For a 256-GPU cluster training Llama-3.1-70B, predict: will DP=256, TP=1, PP=1 have higher or lower scaling efficiency than DP=32, TP=4, PP=2? Write your prediction and reasoning, then run both configurations. Were you right?

**Exercise 2: Find the optimal configuration.**
Sweep all valid 3D parallelism configurations for 256 GPUs (where DP × TP × PP = 256). Which configuration maximizes scaling efficiency? Is it the same for Ethernet 100G vs. InfiniBand NDR? (Hint: valid TP values are divisors of 8, the number of GPUs per node: 1, 2, 4, 8. For each TP, valid PP values are divisors of 256/TP.)

**Exercise 3: The microbatch lever.**
With PP=8, sweep microbatch count M from 1 to 64. Plot the pipeline bubble fraction vs. M. At what value of M does the bubble fraction drop below 10%? (Use the formula from Section 5: bubble = (P-1)/(P-1+M). Predict the answer analytically before running the sweep.)

**Self-check:** Why must tensor parallelism (TP) stay within a single node on most clusters? What would happen to communication overhead if TP crossed node boundaries?
:::


---

## What You Learned

- **3D Parallelism** decomposes the training problem across $\text{DP} \times \text{TP} \times \text{PP}$ GPUs,
  each with distinct communication costs.
- **Ring all-reduce** is the network tax of data parallelism. It grows with model size and
  shrinks with fabric bandwidth. Switching from 100GbE to InfiniBand can recover 10-30%
  efficiency on large models.
- **Pipeline bubbles** waste GPU cycles proportional to $\frac{P-1}{P-1+M}$. Use large
  microbatch counts ($M \gg P$) to minimize waste.
- **Scaling efficiency below 100%** is normal and unavoidable. A well-tuned job at 70-80%
  efficiency on hundreds of GPUs is excellent. Below 50% signals a configuration problem.

---

## Next Steps

- **[LLM Serving Lab](llm_serving.qmd)**: After training, learn how to model the serving cost of the same model
- **[Math Foundations](../math.qmd)**: Full derivations for ring all-reduce, pipeline bubble, and MFU
- **[Fleet Zoo](../zoo/fleets.qmd)**: Browse the available cluster configurations and their network specs