# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.3'
#     jupytext_version: 1.17.1
# ---

# %% [markdown]
"""
# Benchmarking - Systematic ML Performance Evaluation

Welcome to the Benchmarking module! This is where we learn to systematically evaluate ML systems using industry-standard methodology inspired by MLPerf.

## Learning Goals
- Understand the four-component MLPerf benchmarking architecture
- Implement different benchmark scenarios (latency, throughput, offline)
- Apply statistical validation for meaningful results
- Create professional performance reports for ML projects
- Learn to avoid common benchmarking pitfalls

## Build → Use → Analyze
1. **Build**: Benchmarking framework with proper statistical validation
2. **Use**: Apply systematic evaluation to your TinyTorch models
3. **Analyze**: Generate professional reports with statistical confidence
"""

# %% nbgrader={"grade": false, "grade_id": "benchmarking-imports", "locked": false, "schema_version": 3, "solution": false, "task": false}
#| default_exp core.benchmarking

#| export
import numpy as np
import matplotlib.pyplot as plt
import time
import statistics
import json
import math
from typing import Dict, List, Tuple, Optional, Any, Callable
from dataclasses import dataclass
from enum import Enum
import os
import sys

# Import our TinyTorch dependencies
try:
    from tinytorch.core.tensor import Tensor
    from tinytorch.core.networks import Sequential
    from tinytorch.core.layers import Dense
    from tinytorch.core.activations import ReLU, Softmax
    from tinytorch.core.dataloader import DataLoader
except ImportError:
    # For development, import from local modules
    parent_dirs = [
        os.path.join(os.path.dirname(__file__), '..', '01_tensor'),
        os.path.join(os.path.dirname(__file__), '..', '03_layers'),
        os.path.join(os.path.dirname(__file__), '..', '02_activations'),
        os.path.join(os.path.dirname(__file__), '..', '04_networks'),
        os.path.join(os.path.dirname(__file__), '..', '06_dataloader')
    ]
    for path in parent_dirs:
        if path not in sys.path:
            sys.path.append(path)

    try:
        from tensor_dev import Tensor
        from networks_dev import Sequential
        from layers_dev import Dense
        from activations_dev import ReLU, Softmax
        from dataloader_dev import DataLoader
    except ImportError:
        # Fallback for missing modules
        print("⚠️ Some TinyTorch modules not available - using minimal implementations")

# %% nbgrader={"grade": false, "grade_id": "benchmarking-setup", "locked": false, "schema_version": 3, "solution": false, "task": false}
#| hide
#| export
def _should_show_plots():
    """Check if we should show plots (disable during testing)"""
    is_pytest = (
        'pytest' in sys.modules or
        'test' in sys.argv or
        os.environ.get('PYTEST_CURRENT_TEST') is not None or
        any('test' in arg for arg in sys.argv) or
        any('pytest' in arg for arg in sys.argv)
    )

    return not is_pytest

# %% nbgrader={"grade": false, "grade_id": "benchmarking-welcome", "locked": false, "schema_version": 3, "solution": false, "task": false}
print("📊 TinyTorch Benchmarking Module")
print(f"NumPy version: {np.__version__}")
print(f"Python version: {sys.version_info.major}.{sys.version_info.minor}")
print("Ready to build professional ML benchmarking tools!")

# %% [markdown]
"""
## 📦 Where This Code Lives in the Final Package

**Learning Side:** You work in `modules/source/12_benchmarking/benchmarking_dev.py`
**Building Side:** Code exports to `tinytorch.core.benchmarking`

```python
# Final package structure:
from tinytorch.core.benchmarking import TinyTorchPerf, BenchmarkScenarios
from tinytorch.core.benchmarking import StatisticalValidator, PerformanceReporter
```

**Why this matters:**
- **Learning:** Deep understanding of systematic evaluation
- **Production:** Professional benchmarking methodology
- **Projects:** Tools for validating your ML project performance
- **Career:** Industry-standard skills for ML engineering roles
"""

# %% [markdown]
"""
## What is ML Benchmarking?

### The Systematic Evaluation Problem
When you build ML systems, you need to answer critical questions:
- **Is my model actually better?** Statistical significance vs random variation
- **How does it perform in production?** Latency, throughput, resource usage
- **Which approach should I choose?** Systematic comparison methodology
- **Can I trust my results?** Avoiding common benchmarking pitfalls

### The MLPerf Architecture
MLPerf (Machine Learning Performance) defines the industry standard for ML benchmarking:

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Load Generator  │───▶│  System Under   │───▶│     Dataset     │
│   (Controls     │    │  Test (Your ML  │    │  (Standardized  │
│    Queries)     │    │     Model)      │    │   Evaluation)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```

### The Four Components
1. **System Under Test (SUT)**: Your ML model/system being evaluated
2. **Dataset**: Standardized evaluation data (CIFAR-10, ImageNet, etc.)
3. **Model**: The specific architecture and weights being tested
4. **Load Generator**: Controls how evaluation queries are sent to the SUT

### Why This Matters
- **Reproducibility**: Others can verify your results
- **Comparability**: Fair comparison between different approaches
- **Statistical validity**: Meaningful conclusions from your data
- **Industry standards**: Skills you'll use in ML engineering careers

### Real-World Examples
- **Google**: Uses similar patterns for production ML system evaluation
- **Meta**: A/B testing frameworks follow these principles
- **OpenAI**: GPT model comparisons use systematic benchmarking
- **Research**: All major ML conferences require proper evaluation methodology
"""

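
# %% [markdown]
"""
### The Four Components as Code

To make the architecture concrete, here is a minimal sketch of how the four components fit together. All names below are illustrative only - they are not part of the TinyTorch API you will build in this module:

```python
def system_under_test(sample):            # 1. SUT: the ML system being evaluated
    return sample                         #    placeholder "inference"

dataset = list(range(100))                # 2. Dataset: standardized evaluation data
model = system_under_test                 # 3. Model: the architecture/weights under test

def load_generator(model, dataset, num_queries=10):   # 4. Load Generator: issues queries
    import time
    latencies = []
    for i in range(num_queries):
        start = time.perf_counter()
        model(dataset[i % len(dataset)])
        latencies.append(time.perf_counter() - start)
    return latencies

latencies = load_generator(model, dataset)
```

The classes you implement below (`BenchmarkScenarios`, `TinyTorchPerf`) play the load-generator role; your model and dataset fill the other slots.
"""
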
# %% [markdown]
"""
## 🔧 DEVELOPMENT
"""

# %% [markdown]
"""
## Step 1: Benchmark Scenarios - How to Measure Performance

### The Three Standard Scenarios
Different use cases require different performance measurements:

#### 1. Single-Stream Scenario
- **Use case**: Mobile/edge inference, interactive applications
- **Pattern**: Send next query only after previous completes
- **Metric**: 90th percentile latency (tail latency)
- **Why**: Users care about worst-case response time

#### 2. Server Scenario
- **Use case**: Production web services, API endpoints
- **Pattern**: Poisson distribution of concurrent queries
- **Metric**: Queries per second (QPS) at acceptable latency
- **Why**: Servers handle multiple simultaneous requests

#### 3. Offline Scenario
- **Use case**: Batch processing, data center workloads
- **Pattern**: Send all samples at once for batch processing
- **Metric**: Throughput (samples per second)
- **Why**: Batch jobs care about total processing time

### Mathematical Foundation
Each scenario tests different aspects:
- **Latency**: Time for single sample = f(model_complexity, hardware)
- **Throughput**: Samples per second = f(parallelism, batch_size)
- **Efficiency**: Resource utilization = f(memory, compute, bandwidth)

### Why Multiple Scenarios?
Real ML systems have different requirements:
- **Chatbot**: Low latency for good user experience
- **Image API**: High throughput for many concurrent users
- **Data pipeline**: Maximum batch processing efficiency
"""

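
# %% [markdown]
"""
### Worked Example: From Raw Latencies to Metrics

Before implementing the scenarios, it helps to see how the headline metrics fall out of a list of per-query latencies. A small illustrative sketch (the latency values are synthetic, not measurements):

```python
import numpy as np

# Pretend we timed 1000 queries; values are in seconds
latencies = np.sort(np.random.exponential(0.010, size=1000))

mean_latency = latencies.mean()
p90_latency = latencies[int(0.9 * len(latencies))]    # tail latency: the single-stream metric
throughput = len(latencies) / latencies.sum()         # samples/sec if queries ran back-to-back

print(f"mean={mean_latency*1000:.2f} ms, p90={p90_latency*1000:.2f} ms, {throughput:.1f} samples/s")
```
"""
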
# %% [markdown]
"""
## Step 2: Statistical Validation - Ensuring Meaningful Results

### The Significance Problem
Common benchmarking mistakes:
```python
# BAD: Single run, no statistical validation
result_a = model_a.run_once()  # 94.2% accuracy
result_b = model_b.run_once()  # 94.7% accuracy
print("Model B is better!")  # Maybe, maybe not...
```

### The MLPerf Solution
Proper statistical validation:
```python
# GOOD: Multiple runs with confidence intervals
results_a = [model_a.run() for _ in range(10)]  # [93.8, 94.1, 94.3, ...]
results_b = [model_b.run() for _ in range(10)]  # [94.2, 94.5, 94.9, ...]
significance = statistical_test(results_a, results_b)
print(f"Model B is {significance.p_value < 0.05} better with p={significance.p_value}")
```

### Key Statistical Concepts
- **Confidence intervals**: Range of likely true values
- **P-values**: Probability that difference is due to chance
- **Effect size**: Magnitude of improvement (not just significance)
- **Multiple comparisons**: Adjusting for testing many approaches

### Sample Size Calculation
MLPerf uses this formula for minimum samples:
```
n = Φ^(-1)((1-C)/2)^2 * p(1-p) / MOE^2
```
Where:
- C = confidence level (0.99)
- p = percentile (0.90 for 90th percentile)
- MOE = margin of error ((1-p)/20)

For 90th percentile with 99% confidence: **n = 24,576 samples**
"""

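
# %% [markdown]
"""
### Checking the Sample Size Arithmetic

The 24,576 figure can be sanity-checked with the standard library. A minimal sketch (the raw formula gives roughly 23,900; 24,576 is the rounded-up value MLPerf actually uses):

```python
from statistics import NormalDist

C = 0.99              # confidence level
p = 0.90              # target percentile
moe = (1 - p) / 20    # margin of error

z = NormalDist().inv_cdf((1 - C) / 2)   # ≈ -2.576; it is squared below, so the sign is irrelevant
n = z**2 * p * (1 - p) / moe**2
print(round(n))                          # ≈ 23886
```
"""
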
# %% nbgrader={"grade": false, "grade_id": "benchmark-scenarios", "locked": false, "schema_version": 3, "solution": true, "task": false}
#| export
class BenchmarkScenario(Enum):
    """Standard benchmark scenarios from MLPerf"""
    SINGLE_STREAM = "single_stream"
    SERVER = "server"
    OFFLINE = "offline"

@dataclass
class BenchmarkResult:
    """Results from a benchmark run"""
    scenario: BenchmarkScenario
    latencies: List[float]  # All latency measurements in seconds
    throughput: float       # Samples per second
    accuracy: float         # Model accuracy (0-1)
    metadata: Optional[Dict[str, Any]] = None

#| export
class BenchmarkScenarios:
    """
    Implements the three standard MLPerf benchmark scenarios.

    TODO: Implement the three benchmark scenarios following MLPerf patterns.

    UNDERSTANDING THE SCENARIOS:
    1. Single-Stream: Send queries one at a time, measure latency
    2. Server: Send queries following Poisson distribution, measure QPS
    3. Offline: Send all queries at once, measure total throughput

    IMPLEMENTATION APPROACH:
    1. Each scenario should run the model multiple times
    2. Collect latency measurements for each run
    3. Calculate appropriate metrics for each scenario
    4. Return BenchmarkResult with all measurements

    EXAMPLE USAGE:
    scenarios = BenchmarkScenarios()
    result = scenarios.single_stream(model, dataset, num_queries=1000)
    print(f"90th percentile latency: {result.latencies[int(0.9 * len(result.latencies))]} seconds")
    """

    def __init__(self):
        self.results = []

    def single_stream(self, model: Callable, dataset: List, num_queries: int = 1000) -> BenchmarkResult:
        """
        Run single-stream benchmark scenario.

        TODO: Implement single-stream benchmarking.

        STEP-BY-STEP:
        1. Initialize empty list for latencies
        2. For each query (up to num_queries):
           a. Get next sample from dataset (cycle if needed)
           b. Record start time
           c. Run model on sample
           d. Record end time
           e. Calculate latency = end - start
           f. Add latency to list
        3. Calculate throughput = num_queries / total_time
        4. Calculate accuracy if possible
        5. Return BenchmarkResult with SINGLE_STREAM scenario

        HINTS:
        - Use time.perf_counter() for precise timing
        - Use dataset[i % len(dataset)] to cycle through samples
        - Sort latencies for percentile calculations
        """
        ### BEGIN SOLUTION
        latencies = []
        correct_predictions = 0
        total_start_time = time.perf_counter()

        for i in range(num_queries):
            # Get sample (cycle through dataset)
            sample = dataset[i % len(dataset)]

            # Time the inference
            start_time = time.perf_counter()
            result = model(sample)
            end_time = time.perf_counter()

            latency = end_time - start_time
            latencies.append(latency)

            # Simple accuracy calculation (if possible)
            if hasattr(sample, 'target') and hasattr(result, 'data'):
                predicted = np.argmax(result.data)
                if predicted == sample.target:
                    correct_predictions += 1

        total_time = time.perf_counter() - total_start_time
        throughput = num_queries / total_time
        accuracy = correct_predictions / num_queries if num_queries > 0 else 0.0

        return BenchmarkResult(
            scenario=BenchmarkScenario.SINGLE_STREAM,
            latencies=sorted(latencies),
            throughput=throughput,
            accuracy=accuracy,
            metadata={"num_queries": num_queries}
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def server(self, model: Callable, dataset: List, target_qps: float = 10.0,
               duration: float = 60.0) -> BenchmarkResult:
        """
        Run server benchmark scenario with Poisson-distributed queries.

        TODO: Implement server benchmarking.

        STEP-BY-STEP:
        1. Calculate inter-arrival time = 1.0 / target_qps
        2. Run for specified duration:
           a. Wait for next query arrival (Poisson distribution)
           b. Get sample from dataset
           c. Record start time
           d. Run model
           e. Record end time and latency
        3. Calculate actual QPS = total_queries / duration
        4. Return results

        HINTS:
        - Use np.random.exponential(inter_arrival_time) for Poisson
        - Track both query arrival times and completion times
        - Server scenario cares about sustained throughput
        """
        ### BEGIN SOLUTION
        latencies = []
        inter_arrival_time = 1.0 / target_qps
        start_time = time.perf_counter()
        current_time = start_time
        query_count = 0

        while (current_time - start_time) < duration:
            # Wait for next query (Poisson distribution)
            wait_time = np.random.exponential(inter_arrival_time)
            time.sleep(min(wait_time, 0.001))  # Small sleep to simulate waiting

            # Get sample
            sample = dataset[query_count % len(dataset)]

            # Time the inference
            query_start = time.perf_counter()
            result = model(sample)
            query_end = time.perf_counter()

            latency = query_end - query_start
            latencies.append(latency)

            query_count += 1
            current_time = time.perf_counter()

        actual_duration = current_time - start_time
        actual_qps = query_count / actual_duration

        return BenchmarkResult(
            scenario=BenchmarkScenario.SERVER,
            latencies=sorted(latencies),
            throughput=actual_qps,
            accuracy=0.0,  # Would need labels for accuracy
            metadata={"target_qps": target_qps, "actual_qps": actual_qps, "duration": actual_duration}
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def offline(self, model: Callable, dataset: List, batch_size: int = 32) -> BenchmarkResult:
        """
        Run offline benchmark scenario with batch processing.

        TODO: Implement offline benchmarking.

        STEP-BY-STEP:
        1. Group dataset into batches of batch_size
        2. For each batch:
           a. Record start time
           b. Run model on entire batch
           c. Record end time
           d. Calculate batch latency
        3. Calculate total throughput = total_samples / total_time
        4. Return results

        HINTS:
        - Process data in batches for efficiency
        - Measure total time for all batches
        - Offline cares about maximum throughput
        """
        ### BEGIN SOLUTION
        latencies = []
        total_samples = len(dataset)
        total_start_time = time.perf_counter()

        for batch_start in range(0, total_samples, batch_size):
            batch_end = min(batch_start + batch_size, total_samples)
            batch = dataset[batch_start:batch_end]

            # Time the batch inference
            batch_start_time = time.perf_counter()
            for sample in batch:
                result = model(sample)
            batch_end_time = time.perf_counter()

            batch_latency = batch_end_time - batch_start_time
            latencies.append(batch_latency)

        total_time = time.perf_counter() - total_start_time
        throughput = total_samples / total_time

        return BenchmarkResult(
            scenario=BenchmarkScenario.OFFLINE,
            latencies=latencies,
            throughput=throughput,
            accuracy=0.0,  # Would need labels for accuracy
            metadata={"batch_size": batch_size, "total_samples": total_samples}
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

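
# %% [markdown]
"""
### Aside: Poisson Arrivals in the Server Scenario

The server scenario approximates Poisson query arrivals by sleeping for an exponentially distributed gap between queries (capped so tests stay fast). A standalone sketch of what uncapped Poisson inter-arrival sampling looks like, using only NumPy:

```python
import numpy as np

target_qps = 10.0
inter_arrival = 1.0 / target_qps                      # mean gap between queries, in seconds

gaps = np.random.exponential(inter_arrival, size=5)   # exponential gaps => Poisson arrival process
arrival_times = np.cumsum(gaps)                       # absolute timestamps of each query
print(arrival_times)                                  # e.g. [0.07 0.21 0.25 0.44 0.52]
```
"""
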
# %% [markdown]
"""
### 🧪 Unit Test: Benchmark Scenarios

Let's test our benchmark scenarios with a simple mock model.
"""

# %% nbgrader={"grade": false, "grade_id": "test-scenarios", "locked": false, "schema_version": 3, "solution": false, "task": false}
def test_unit_benchmark_scenarios():
    """Unit test for the BenchmarkScenarios class."""
    print("🔬 Unit Test: Benchmark Scenarios...")

    # Create a simple mock model and dataset
    def mock_model(sample):
        # Simulate some processing time
        time.sleep(0.001)  # 1ms processing
        return {"prediction": np.random.rand(10)}

    mock_dataset = [{"data": np.random.rand(10)} for _ in range(100)]

    # Test scenarios
    scenarios = BenchmarkScenarios()

    # Test single-stream
    single_result = scenarios.single_stream(mock_model, mock_dataset, num_queries=10)
    assert single_result.scenario == BenchmarkScenario.SINGLE_STREAM
    assert len(single_result.latencies) == 10
    assert single_result.throughput > 0
    print(f"✅ Single-stream: {len(single_result.latencies)} measurements")

    # Test server (short duration for testing)
    server_result = scenarios.server(mock_model, mock_dataset, target_qps=5.0, duration=2.0)
    assert server_result.scenario == BenchmarkScenario.SERVER
    assert len(server_result.latencies) > 0
    assert server_result.throughput > 0
    print(f"✅ Server: {len(server_result.latencies)} queries processed")

    # Test offline
    offline_result = scenarios.offline(mock_model, mock_dataset, batch_size=5)
    assert offline_result.scenario == BenchmarkScenario.OFFLINE
    assert len(offline_result.latencies) > 0
    assert offline_result.throughput > 0
    print(f"✅ Offline: {len(offline_result.latencies)} batches processed")

    print("✅ All benchmark scenarios working correctly!")

# Run the test
test_unit_benchmark_scenarios()

# %% [markdown]
"""
## Step 3: Building the Statistical Validator

### The Confidence Problem
How do we know if one model is actually better than another?

### Statistical Testing for ML
We need to test the null hypothesis: "There is no significant difference between models"
"""

# %% nbgrader={"grade": false, "grade_id": "statistical-validator", "locked": false, "schema_version": 3, "solution": true, "task": false}
#| export
@dataclass
class StatisticalValidation:
    """Results from statistical validation"""
    is_significant: bool
    p_value: float
    effect_size: float
    confidence_interval: Tuple[float, float]
    recommendation: str

#| export
class StatisticalValidator:
    """
    Validates benchmark results using proper statistical methods.

    TODO: Implement statistical validation for benchmark results.

    UNDERSTANDING STATISTICAL TESTING:
    1. Null hypothesis: No difference between models
    2. T-test: Compare means of two groups
    3. P-value: Probability of seeing this difference by chance
    4. Effect size: Magnitude of the difference
    5. Confidence interval: Range of likely true values

    IMPLEMENTATION APPROACH:
    1. Calculate basic statistics (mean, std, n)
    2. Perform t-test to get p-value
    3. Calculate effect size (Cohen's d)
    4. Calculate confidence interval
    5. Provide clear recommendation
    """

    def __init__(self, confidence_level: float = 0.95):
        self.confidence_level = confidence_level
        self.alpha = 1 - confidence_level

    def validate_comparison(self, results_a: List[float], results_b: List[float]) -> StatisticalValidation:
        """
        Compare two sets of benchmark results statistically.

        TODO: Implement statistical comparison.

        STEP-BY-STEP:
        1. Calculate basic statistics for both groups
        2. Perform two-sample t-test
        3. Calculate effect size (Cohen's d)
        4. Calculate confidence interval for the difference
        5. Generate recommendation based on results

        HINTS:
        - Use scipy.stats.ttest_ind for t-test (or implement manually)
        - Cohen's d = (mean_a - mean_b) / pooled_std
        - CI = difference ± (critical_value * standard_error)
        """
        ### BEGIN SOLUTION
        import math

        # Basic statistics
        mean_a = statistics.mean(results_a)
        mean_b = statistics.mean(results_b)
        std_a = statistics.stdev(results_a)
        std_b = statistics.stdev(results_b)
        n_a = len(results_a)
        n_b = len(results_b)

        # Two-sample t-test (simplified)
        pooled_std = math.sqrt(((n_a - 1) * std_a**2 + (n_b - 1) * std_b**2) / (n_a + n_b - 2))
        standard_error = pooled_std * math.sqrt(1/n_a + 1/n_b)

        if standard_error == 0:
            t_stat = 0
            p_value = 1.0
        else:
            t_stat = (mean_a - mean_b) / standard_error
            # Simplified p-value calculation (assuming normal distribution)
            p_value = 2 * (1 - abs(t_stat) / (abs(t_stat) + math.sqrt(n_a + n_b - 2)))

        # Effect size (Cohen's d)
        effect_size = (mean_a - mean_b) / pooled_std if pooled_std > 0 else 0

        # Confidence interval for difference
        difference = mean_a - mean_b
        critical_value = 1.96  # Approximate for 95% CI
        margin_of_error = critical_value * standard_error
        ci_lower = difference - margin_of_error
        ci_upper = difference + margin_of_error

        # Determine significance
        is_significant = p_value < self.alpha

        # Generate recommendation
        if is_significant:
            if effect_size > 0.8:
                recommendation = "Large significant difference - strong evidence for improvement"
            elif effect_size > 0.5:
                recommendation = "Medium significant difference - good evidence for improvement"
            else:
                recommendation = "Small significant difference - weak evidence for improvement"
        else:
            recommendation = "No significant difference - insufficient evidence for improvement"

        return StatisticalValidation(
            is_significant=is_significant,
            p_value=p_value,
            effect_size=effect_size,
            confidence_interval=(ci_lower, ci_upper),
            recommendation=recommendation
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def validate_benchmark_result(self, result: BenchmarkResult,
                                  min_samples: int = 100) -> StatisticalValidation:
        """
        Validate that a benchmark result has sufficient statistical power.

        TODO: Implement validation for single benchmark result.

        STEP-BY-STEP:
        1. Check if we have enough samples
        2. Calculate confidence interval for the metric
        3. Check for common pitfalls (outliers, etc.)
        4. Provide recommendations
        """
        ### BEGIN SOLUTION
        latencies = result.latencies
        n = len(latencies)

        if n < min_samples:
            return StatisticalValidation(
                is_significant=False,
                p_value=1.0,
                effect_size=0.0,
                confidence_interval=(0.0, 0.0),
                recommendation=f"Insufficient samples: {n} < {min_samples}. Need more data."
            )

        # Calculate confidence interval for mean latency
        mean_latency = statistics.mean(latencies)
        std_latency = statistics.stdev(latencies)
        standard_error = std_latency / math.sqrt(n)

        critical_value = 1.96  # 95% CI
        margin_of_error = critical_value * standard_error
        ci_lower = mean_latency - margin_of_error
        ci_upper = mean_latency + margin_of_error

        # Check for outliers (simple check)
        q1 = latencies[int(0.25 * n)]
        q3 = latencies[int(0.75 * n)]
        iqr = q3 - q1
        outlier_threshold = q3 + 1.5 * iqr
        outliers = [l for l in latencies if l > outlier_threshold]

        if len(outliers) > 0.1 * n:  # More than 10% outliers
            recommendation = f"Warning: {len(outliers)} outliers detected. Results may be unreliable."
        else:
            recommendation = "Benchmark result appears statistically valid."

        return StatisticalValidation(
            is_significant=True,
            p_value=0.0,  # Not applicable for single result
            effect_size=std_latency / mean_latency,  # Coefficient of variation
            confidence_interval=(ci_lower, ci_upper),
            recommendation=recommendation
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

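
# %% [markdown]
"""
### Aside: Cross-Checking with SciPy

The p-value above is a deliberately simplified approximation so this module has no SciPy dependency. If SciPy happens to be installed in your environment (it is not imported by this module), you can cross-check a comparison with a proper two-sample t-test:

```python
import numpy as np
from scipy import stats

results_a = np.random.normal(0.10, 0.01, size=100)   # synthetic latencies, model A
results_b = np.random.normal(0.12, 0.01, size=100)   # synthetic latencies, model B

t_stat, p_value = stats.ttest_ind(results_a, results_b)
print(f"t={t_stat:.2f}, p={p_value:.4g}")             # p < 0.05 suggests a real difference
```
"""
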
# %% [markdown]
"""
### 🧪 Unit Test: Statistical Validation

Let's test our statistical validation with simulated data.
"""

# %% nbgrader={"grade": false, "grade_id": "test-validation", "locked": false, "schema_version": 3, "solution": false, "task": false}
def test_unit_statistical_validation():
    """Unit test for the StatisticalValidator class."""
    print("🔬 Unit Test: Statistical Validation...")

    validator = StatisticalValidator(confidence_level=0.95)

    # Test 1: No significant difference
    results_a = [0.1 + 0.01 * np.random.randn() for _ in range(100)]
    results_b = [0.1 + 0.01 * np.random.randn() for _ in range(100)]

    validation = validator.validate_comparison(results_a, results_b)
    print(f"✅ No difference test: significant={validation.is_significant}, p={validation.p_value:.4f}")

    # Test 2: Clear significant difference
    results_a = [0.1 + 0.01 * np.random.randn() for _ in range(100)]
    results_b = [0.2 + 0.01 * np.random.randn() for _ in range(100)]

    validation = validator.validate_comparison(results_a, results_b)
    print(f"✅ Clear difference test: significant={validation.is_significant}, p={validation.p_value:.4f}")
    print(f"   Effect size: {validation.effect_size:.3f}")
    print(f"   Recommendation: {validation.recommendation}")

    # Test 3: Single result validation
    mock_result = BenchmarkResult(
        scenario=BenchmarkScenario.SINGLE_STREAM,
        latencies=[0.1 + 0.01 * np.random.randn() for _ in range(200)],
        throughput=1000,
        accuracy=0.95
    )

    validation = validator.validate_benchmark_result(mock_result)
    print(f"✅ Single result validation: {validation.recommendation}")
    print(f"   Confidence interval: ({validation.confidence_interval[0]:.4f}, {validation.confidence_interval[1]:.4f})")

    print("✅ Statistical validation tests passed!")

# Run the test
test_unit_statistical_validation()

# %% [markdown]
"""
## Step 4: The TinyTorchPerf Framework - Putting It All Together

### The Complete MLPerf-Inspired Framework
Now we combine all components into a professional benchmarking framework.
"""

# %% nbgrader={"grade": false, "grade_id": "tinytorch-perf", "locked": false, "schema_version": 3, "solution": true, "task": false}
#| export
class TinyTorchPerf:
    """
    Complete MLPerf-inspired benchmarking framework for TinyTorch.

    TODO: Implement the complete benchmarking framework.

    UNDERSTANDING THE FRAMEWORK:
    1. Combines all benchmark scenarios
    2. Integrates statistical validation
    3. Provides easy-to-use API
    4. Generates professional reports

    IMPLEMENTATION APPROACH:
    1. Initialize with model and dataset
    2. Provide methods for each scenario
    3. Include statistical validation
    4. Generate comprehensive reports
    """

    def __init__(self):
        self.scenarios = BenchmarkScenarios()
        self.validator = StatisticalValidator()
        self.model = None
        self.dataset = None
        self.results = {}

    def set_model(self, model: Callable):
        """Set the model to benchmark."""
        self.model = model

    def set_dataset(self, dataset: List):
        """Set the dataset for benchmarking."""
        self.dataset = dataset

    def run_single_stream(self, num_queries: int = 1000) -> BenchmarkResult:
        """
        Run single-stream benchmark.

        TODO: Implement single-stream benchmark with validation.

        STEP-BY-STEP:
        1. Check that model and dataset are set
        2. Run single-stream scenario
        3. Validate results statistically
        4. Store results
        5. Return result
        """
        ### BEGIN SOLUTION
        if self.model is None or self.dataset is None:
            raise ValueError("Model and dataset must be set before running benchmarks")

        result = self.scenarios.single_stream(self.model, self.dataset, num_queries)
        validation = self.validator.validate_benchmark_result(result)

        self.results['single_stream'] = {
            'result': result,
            'validation': validation
        }

        return result
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def run_server(self, target_qps: float = 10.0, duration: float = 60.0) -> BenchmarkResult:
        """
        Run server benchmark.

        TODO: Implement server benchmark with validation.
        """
        ### BEGIN SOLUTION
        if self.model is None or self.dataset is None:
            raise ValueError("Model and dataset must be set before running benchmarks")

        result = self.scenarios.server(self.model, self.dataset, target_qps, duration)
        validation = self.validator.validate_benchmark_result(result)

        self.results['server'] = {
            'result': result,
            'validation': validation
        }

        return result
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def run_offline(self, batch_size: int = 32) -> BenchmarkResult:
        """
        Run offline benchmark.

        TODO: Implement offline benchmark with validation.
        """
        ### BEGIN SOLUTION
        if self.model is None or self.dataset is None:
            raise ValueError("Model and dataset must be set before running benchmarks")

        result = self.scenarios.offline(self.model, self.dataset, batch_size)
        validation = self.validator.validate_benchmark_result(result)

        self.results['offline'] = {
            'result': result,
            'validation': validation
        }

        return result
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def run_all_scenarios(self, quick_test: bool = False) -> Dict[str, BenchmarkResult]:
        """
        Run all benchmark scenarios.

        TODO: Implement comprehensive benchmarking.
        """
        ### BEGIN SOLUTION
        if quick_test:
            # Quick test with smaller parameters
            single_result = self.run_single_stream(num_queries=100)
            server_result = self.run_server(target_qps=5.0, duration=10.0)
            offline_result = self.run_offline(batch_size=16)
        else:
            # Full benchmarking
            single_result = self.run_single_stream(num_queries=1000)
            server_result = self.run_server(target_qps=10.0, duration=60.0)
            offline_result = self.run_offline(batch_size=32)

        return {
            'single_stream': single_result,
            'server': server_result,
            'offline': offline_result
        }
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def compare_models(self, model_a: Callable, model_b: Callable,
                       scenario: str = 'single_stream') -> StatisticalValidation:
        """
        Compare two models statistically.

        TODO: Implement model comparison.
        """
        ### BEGIN SOLUTION
        # Run both models on the same scenario
        self.set_model(model_a)
        if scenario == 'single_stream':
            result_a = self.run_single_stream(num_queries=100)
        elif scenario == 'server':
            result_a = self.run_server(target_qps=5.0, duration=10.0)
        else:  # offline
            result_a = self.run_offline(batch_size=16)

        self.set_model(model_b)
        if scenario == 'single_stream':
            result_b = self.run_single_stream(num_queries=100)
        elif scenario == 'server':
            result_b = self.run_server(target_qps=5.0, duration=10.0)
        else:  # offline
            result_b = self.run_offline(batch_size=16)

        # Compare latencies
        return self.validator.validate_comparison(result_a.latencies, result_b.latencies)
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def generate_report(self) -> str:
        """
        Generate a comprehensive benchmark report.

        TODO: Implement professional report generation.
        """
        ### BEGIN SOLUTION
        report = "# TinyTorch Benchmark Report\n\n"

        for scenario_name, scenario_data in self.results.items():
            result = scenario_data['result']
            validation = scenario_data['validation']

            report += f"## {scenario_name.replace('_', ' ').title()} Scenario\n\n"
            report += f"- **Throughput**: {result.throughput:.2f} samples/second\n"
            report += f"- **Mean Latency**: {statistics.mean(result.latencies)*1000:.2f} ms\n"
            report += f"- **90th Percentile**: {result.latencies[int(0.9*len(result.latencies))]*1000:.2f} ms\n"
            report += f"- **95th Percentile**: {result.latencies[int(0.95*len(result.latencies))]*1000:.2f} ms\n"
            report += f"- **Statistical Validation**: {validation.recommendation}\n\n"

        return report
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

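
# %% [markdown]
"""
### Putting TinyTorchPerf to Work

A typical end-to-end run looks like the sketch below. The model and dataset here are placeholders; the unit test in the next cell exercises the same flow:

```python
def my_model(sample):                      # any callable that takes one sample
    return {"prediction": sample["data"].sum()}

dataset = [{"data": np.random.rand(10)} for _ in range(256)]

perf = TinyTorchPerf()
perf.set_model(my_model)
perf.set_dataset(dataset)
results = perf.run_all_scenarios(quick_test=True)
print(perf.generate_report())
```
"""
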
# %% [markdown]
"""
### 🧪 Unit Test: TinyTorchPerf Framework

Let's test our complete benchmarking framework.
"""

# %% nbgrader={"grade": false, "grade_id": "test-framework", "locked": false, "schema_version": 3, "solution": false, "task": false}
def test_unit_tinytorch_perf():
    """Unit test for the TinyTorchPerf framework."""
    print("🔬 Unit Test: TinyTorchPerf Framework...")

    # Create test model and dataset
    def test_model(sample):
        time.sleep(0.001)  # Simulate processing
        return {"prediction": np.random.rand(5)}

    test_dataset = [{"data": np.random.rand(10)} for _ in range(50)]

    # Test the framework
    benchmark = TinyTorchPerf()
    benchmark.set_model(test_model)
    benchmark.set_dataset(test_dataset)

    # Test individual scenarios
    single_result = benchmark.run_single_stream(num_queries=20)
    assert single_result.scenario == BenchmarkScenario.SINGLE_STREAM
    print(f"✅ Single-stream: {single_result.throughput:.2f} samples/sec")

    server_result = benchmark.run_server(target_qps=5.0, duration=2.0)
    assert server_result.scenario == BenchmarkScenario.SERVER
    print(f"✅ Server: {server_result.throughput:.2f} QPS")

    offline_result = benchmark.run_offline(batch_size=10)
    assert offline_result.scenario == BenchmarkScenario.OFFLINE
    print(f"✅ Offline: {offline_result.throughput:.2f} samples/sec")

    # Test comprehensive benchmarking
    all_results = benchmark.run_all_scenarios(quick_test=True)
    assert len(all_results) == 3
    print(f"✅ All scenarios: {list(all_results.keys())}")

    # Test model comparison
    def slower_model(sample):
        time.sleep(0.002)  # Twice as slow
        return {"prediction": np.random.rand(5)}

    comparison = benchmark.compare_models(test_model, slower_model)
    print(f"✅ Model comparison: {comparison.recommendation}")

    # Test report generation
    report = benchmark.generate_report()
    assert "TinyTorch Benchmark Report" in report
    print("✅ Report generation working")

    print("✅ Complete TinyTorchPerf framework working!")

# Run the test
test_unit_tinytorch_perf()

# %% [markdown]
"""
## Step 5: Professional Reporting - Project-Ready Results

### Why Professional Reports Matter
Your ML projects need:
- **Clear performance metrics** for presentations
- **Statistical validation** for credibility
- **Comparison baselines** for context
- **Professional formatting** for academic/industry standards
"""

# %% nbgrader={"grade": false, "grade_id": "performance-reporter", "locked": false, "schema_version": 3, "solution": true, "task": false}
#| export
class PerformanceReporter:
    """
    Generates professional performance reports for ML projects.

    TODO: Implement professional report generation.

    UNDERSTANDING PROFESSIONAL REPORTS:
    1. Executive summary with key metrics
    2. Detailed methodology section
    3. Statistical validation results
    4. Comparison with baselines
    5. Recommendations for improvement
    """

    def __init__(self):
        self.reports = []

    def generate_project_report(self, benchmark_results: Dict[str, BenchmarkResult],
                                model_name: str = "TinyTorch Model") -> str:
        """
        Generate a professional performance report for ML projects.

        TODO: Implement project report generation.

        STEP-BY-STEP:
        1. Create executive summary
        2. Add methodology section
        3. Present detailed results
        4. Include statistical validation
        5. Add recommendations
        """
        ### BEGIN SOLUTION
        report = f"""# {model_name} Performance Report

## Executive Summary

This report presents comprehensive performance benchmarking results for {model_name} using MLPerf-inspired methodology. The evaluation covers three standard scenarios: single-stream (latency), server (throughput), and offline (batch processing).

### Key Findings
"""

        # Add key metrics
        for scenario_name, result in benchmark_results.items():
            mean_latency = statistics.mean(result.latencies) * 1000
            p90_latency = result.latencies[int(0.9 * len(result.latencies))] * 1000

            report += f"- **{scenario_name.replace('_', ' ').title()}**: {result.throughput:.2f} samples/sec, "
            report += f"{mean_latency:.2f}ms mean latency, {p90_latency:.2f}ms 90th percentile\n"

        report += """
## Methodology

### Benchmark Framework
- **Architecture**: MLPerf-inspired four-component system
- **Scenarios**: Single-stream, server, and offline evaluation
- **Statistical Validation**: Multiple runs with confidence intervals
- **Metrics**: Latency distribution, throughput, accuracy

### Test Environment
- **Hardware**: Standard development machine
- **Software**: TinyTorch framework
- **Dataset**: Standardized evaluation dataset
- **Validation**: Statistical significance testing

## Detailed Results

"""

        # Add detailed results for each scenario
        for scenario_name, result in benchmark_results.items():
            report += f"### {scenario_name.replace('_', ' ').title()} Scenario\n\n"

            latencies_ms = [l * 1000 for l in result.latencies]

            report += f"- **Sample Count**: {len(result.latencies)}\n"
            report += f"- **Mean Latency**: {statistics.mean(latencies_ms):.2f} ms\n"
            report += f"- **Median Latency**: {statistics.median(latencies_ms):.2f} ms\n"
            report += f"- **90th Percentile**: {latencies_ms[int(0.9 * len(latencies_ms))]:.2f} ms\n"
            report += f"- **95th Percentile**: {latencies_ms[int(0.95 * len(latencies_ms))]:.2f} ms\n"
            report += f"- **Standard Deviation**: {statistics.stdev(latencies_ms):.2f} ms\n"
            report += f"- **Throughput**: {result.throughput:.2f} samples/second\n"

            if result.accuracy > 0:
                report += f"- **Accuracy**: {result.accuracy:.4f}\n"

            report += "\n"

        # Use an f-string here so {model_name} is interpolated in the conclusion
        report += f"""## Statistical Validation

All results include proper statistical validation:
- Multiple independent runs for reliability
- Confidence intervals for key metrics
- Outlier detection and handling
- Significance testing for comparisons

## Recommendations

Based on the benchmark results:
1. **Performance Characteristics**: Model shows consistent performance across scenarios
2. **Optimization Opportunities**: Focus on reducing tail latency for production deployment
3. **Scalability**: Server scenario results indicate good potential for production scaling
4. **Further Testing**: Consider testing with larger datasets and different hardware configurations

## Conclusion

This comprehensive benchmarking demonstrates {model_name}'s performance characteristics using industry-standard methodology. The results provide a solid foundation for production deployment decisions and further optimization efforts.
"""

        return report
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def save_report(self, report: str, filename: str = "benchmark_report.md"):
        """Save report to file."""
        with open(filename, 'w') as f:
            f.write(report)
        print(f"📄 Report saved to {filename}")

def plot_benchmark_results(benchmark_results: Dict[str, BenchmarkResult]):
    """Visualize benchmark results."""
    if not _should_show_plots():
        return

    # Create visualizations
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))

    # Latency distribution for single-stream
    if 'single_stream' in benchmark_results:
        axes[0].hist(benchmark_results['single_stream'].latencies, bins=50, color='skyblue')
        axes[0].set_title("Single-Stream Latency Distribution")
        axes[0].set_xlabel("Latency (s)")
        axes[0].set_ylabel("Frequency")

    # Server scenario latency
    if 'server' in benchmark_results:
        axes[1].plot(benchmark_results['server'].latencies, marker='o', linestyle='-', color='salmon')
        axes[1].set_title("Server Scenario Latency Over Time")
        axes[1].set_xlabel("Query Index")
        axes[1].set_ylabel("Latency (s)")

    # Offline scenario throughput
    if 'offline' in benchmark_results:
        offline_result = benchmark_results['offline']
        throughput = len(offline_result.latencies) / sum(offline_result.latencies)
        axes[2].bar(['Throughput'], [throughput], color='lightgreen')
        axes[2].set_title("Offline Scenario Throughput")
        axes[2].set_ylabel("Samples per second")

    plt.tight_layout()
    plt.show()

# %% [markdown]
"""
### 🧪 Unit Test: Performance Reporter

Let's test our professional reporting system.
"""

# %% nbgrader={"grade": false, "grade_id": "test-reporter", "locked": false, "schema_version": 3, "solution": false, "task": false}
def test_unit_performance_reporter():
    """Unit test for the PerformanceReporter class."""
    print("🔬 Unit Test: Performance Reporter...")

    # Create mock benchmark results
    mock_results = {
        'single_stream': BenchmarkResult(
            scenario=BenchmarkScenario.SINGLE_STREAM,
            latencies=[0.01 + 0.002 * np.random.randn() for _ in range(100)],
            throughput=95.0,
            accuracy=0.942
        ),
        'server': BenchmarkResult(
            scenario=BenchmarkScenario.SERVER,
            latencies=[0.012 + 0.003 * np.random.randn() for _ in range(150)],
            throughput=87.0,
            accuracy=0.938
        ),
        'offline': BenchmarkResult(
            scenario=BenchmarkScenario.OFFLINE,
            latencies=[0.008 + 0.001 * np.random.randn() for _ in range(50)],
            throughput=120.0,
            accuracy=0.945
        )
    }

    # Test report generation
    reporter = PerformanceReporter()
    report = reporter.generate_project_report(mock_results, "My Project Model")

    # Verify report content
    assert "Performance Report" in report
    assert "Executive Summary" in report
    assert "Methodology" in report
    assert "Detailed Results" in report
    assert "Statistical Validation" in report
    assert "Recommendations" in report

    print("✅ Report generated successfully")
    print(f"✅ Report length: {len(report)} characters")
    print("✅ Contains all required sections")

    # Test saving
    reporter.save_report(report, "test_report.md")
    print("✅ Report saving working")

    # Test plotting
    plot_benchmark_results(mock_results)
    print("✅ Plotting working")

    print("✅ Performance reporter tests passed!")

# Run the test
test_unit_performance_reporter()

# %% [markdown]
"""
## Comprehensive Integration Test

Let's test everything together with a realistic TinyTorch model.
"""

# %% nbgrader={"grade": false, "grade_id": "integration-test", "locked": false, "schema_version": 3, "solution": false, "task": false}
def test_module_comprehensive_benchmarking():
    """Comprehensive integration test for the entire benchmarking system."""
    print("🔬 Integration Test: Comprehensive Benchmarking...")

    # Create a realistic TinyTorch model
    def create_simple_model():
        """Create a simple classification model for testing."""
        def model(sample):
            # Simulate a simple neural network
            x = np.array(sample['data'])

            # Layer 1: 10 -> 5
            W1 = np.random.randn(10, 5) * 0.1
            b1 = np.zeros(5)
            h1 = np.maximum(0, x @ W1 + b1)  # ReLU

            # Layer 2: 5 -> 3
            W2 = np.random.randn(5, 3) * 0.1
            b2 = np.zeros(3)
            output = h1 @ W2 + b2

            # Simulate some processing time
            time.sleep(0.001)

            return {"prediction": output}

        return model

    # Create test dataset
    test_dataset = []
    for i in range(100):
        sample = {
            'data': np.random.randn(10),
            'target': np.random.randint(0, 3)
        }
        test_dataset.append(sample)

    # Test complete workflow
    model = create_simple_model()

    # 1. Run comprehensive benchmarking
    benchmark = TinyTorchPerf()
    benchmark.set_model(model)
    benchmark.set_dataset(test_dataset)

    print("📊 Running comprehensive benchmarking...")
    all_results = benchmark.run_all_scenarios(quick_test=True)

    # 2. Generate professional report
    reporter = PerformanceReporter()
    report = reporter.generate_project_report(all_results, "TinyTorch CNN Model")

    # 3. Validate results
    for scenario_name, result in all_results.items():
        assert result.throughput > 0, f"{scenario_name} should have positive throughput"
        assert len(result.latencies) > 0, f"{scenario_name} should have latency measurements"
        print(f"✅ {scenario_name}: {result.throughput:.2f} samples/sec")

    # 4. Test model comparison
    def create_slower_model():
        """Create a slower model for comparison."""
        def model(sample):
            x = np.array(sample['data'])
            W1 = np.random.randn(10, 5) * 0.1
            b1 = np.zeros(5)
            h1 = np.maximum(0, x @ W1 + b1)

            W2 = np.random.randn(5, 3) * 0.1
            b2 = np.zeros(3)
            output = h1 @ W2 + b2

            time.sleep(0.002)  # Slower
            return {"prediction": output}

        return model

    slower_model = create_slower_model()
    comparison = benchmark.compare_models(model, slower_model)
    print(f"✅ Model comparison: {comparison.recommendation}")

    # 5. Test report quality
    assert len(report) > 1000, "Report should be comprehensive"
    print(f"✅ Generated {len(report)} character report")

    print("✅ Comprehensive integration test passed!")
    print("🎉 Complete benchmarking system working!")

# Run the comprehensive test
test_module_comprehensive_benchmarking()

# %% [markdown]
"""
## 🧪 Module Testing

Time to test your implementation! This section uses TinyTorch's standardized testing framework to ensure your implementation works correctly.

**This testing section is locked** - it provides consistent feedback across all modules and cannot be modified.
"""

# %% [markdown]
"""
## 🤖 AUTO TESTING
"""

# %% nbgrader={"grade": false, "grade_id": "standardized-testing", "locked": true, "schema_version": 3, "solution": false, "task": false}
# =============================================================================
# STANDARDIZED MODULE TESTING - DO NOT MODIFY
# This cell is locked to ensure consistent testing across all TinyTorch modules
# =============================================================================

if __name__ == "__main__":
    from tito.tools.testing import run_module_tests_auto

    # Automatically discover and run all tests in this module
    success = run_module_tests_auto("Benchmarking")

# %% [markdown]
"""
## 🎯 MODULE SUMMARY: Performance Benchmarking

### What You've Built
You've implemented a comprehensive MLPerf-inspired benchmarking framework:

1. **Benchmark Scenarios**: Single-stream (latency), server (throughput), and offline (batch processing)
2. **Statistical Validation**: Confidence intervals, significance testing, and effect size calculation
3. **MLPerf Architecture**: Four-component system with load generator, model, dataset, and evaluation
4. **Professional Reporting**: Generate conference-quality performance reports with proper methodology
5. **Model Comparison**: Systematic comparison framework with statistical validation

### Key Insights
- **Systematic evaluation beats intuition**: Proper benchmarking reveals true performance characteristics
- **Statistics matter**: Single measurements are meaningless; confidence intervals provide real insights
- **Scenarios capture reality**: Different use cases (mobile, server, batch) require different metrics
- **Reproducibility is crucial**: Others must be able to verify your results
- **Professional presentation**: Clear methodology and statistical validation build credibility

### Real-World Connections
- **MLPerf**: Uses identical four-component architecture and scenario patterns
- **Production systems**: A/B testing frameworks follow these statistical principles
- **Research papers**: Proper experimental methodology is required for publication
- **ML engineering**: Systematic evaluation prevents costly production mistakes
- **Open source**: Contributing benchmarks to libraries like PyTorch and TensorFlow

### Next Steps
In real ML systems, you'd:
1. **GPU benchmarking**: Extend to CUDA/OpenCL performance measurement
2. **Distributed evaluation**: Scale benchmarking across multiple machines
3. **Continuous monitoring**: Integrate with CI/CD pipelines for regression detection
4. **Domain-specific metrics**: Develop specialized benchmarks for your problem domain
5. **Hardware optimization**: Evaluate performance across different architectures

### 🏆 Achievement Unlocked
You've mastered systematic ML evaluation using industry-standard methodology. You understand how to design proper experiments, validate results statistically, and present findings professionally!

**You've completed the TinyTorch Benchmarking module!** 🎉
"""