# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/source/12_benchmarking/benchmarking_dev.ipynb.

# %% auto 0
__all__ = ['BenchmarkScenario', 'BenchmarkResult', 'BenchmarkScenarios', 'StatisticalValidation', 'StatisticalValidator',
           'TinyTorchPerf', 'PerformanceReporter']

# %% ../../modules/source/12_benchmarking/benchmarking_dev.ipynb 1
import numpy as np
import matplotlib.pyplot as plt
import time
import statistics
import json
import math
from typing import Dict, List, Tuple, Optional, Any, Callable
from dataclasses import dataclass
from enum import Enum
import os
import sys

# Import our TinyTorch dependencies
try:
    from tinytorch.core.tensor import Tensor
    from tinytorch.core.networks import Sequential
    from tinytorch.core.layers import Dense
    from tinytorch.core.activations import ReLU, Softmax
    from tinytorch.core.dataloader import DataLoader
except ImportError:
    # For development, import from the local module source directories
    parent_dirs = [
        os.path.join(os.path.dirname(__file__), '..', '01_tensor'),
        os.path.join(os.path.dirname(__file__), '..', '03_layers'),
        os.path.join(os.path.dirname(__file__), '..', '02_activations'),
        os.path.join(os.path.dirname(__file__), '..', '04_networks'),
        os.path.join(os.path.dirname(__file__), '..', '06_dataloader')
    ]
    for path in parent_dirs:
        if path not in sys.path:
            sys.path.append(path)

    try:
        from tensor_dev import Tensor
        from networks_dev import Sequential
        from layers_dev import Dense
        from activations_dev import ReLU, Softmax
        from dataloader_dev import DataLoader
    except ImportError:
        # Fallback for missing modules
        print("⚠️ Some TinyTorch modules not available - using minimal implementations")

# %% ../../modules/source/12_benchmarking/benchmarking_dev.ipynb 2
def _should_show_plots():
    """Check if we should show plots (disable during testing)"""
    # any('test' in arg ...) already covers exact 'test' and 'pytest' arguments
    is_pytest = (
        'pytest' in sys.modules or
        os.environ.get('PYTEST_CURRENT_TEST') is not None or
        any('test' in arg for arg in sys.argv)
    )
    return not is_pytest

# %% ../../modules/source/12_benchmarking/benchmarking_dev.ipynb 8
class BenchmarkScenario(Enum):
    """Standard benchmark scenarios from MLPerf"""
    SINGLE_STREAM = "single_stream"
    SERVER = "server"
    OFFLINE = "offline"

@dataclass
class BenchmarkResult:
    """Results from a benchmark run"""
    scenario: BenchmarkScenario
    latencies: List[float]  # All latency measurements in seconds
    throughput: float       # Samples per second
    accuracy: float         # Model accuracy (0-1)
    metadata: Optional[Dict[str, Any]] = None
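
# A minimal helper sketch (not part of the exported API): several places in
# this module read percentiles by indexing into a sorted latency list. This
# hypothetical helper makes that convention explicit and reusable.
def _percentile(latencies: List[float], q: float) -> float:
    """Return the q-th percentile (0 < q < 1) of a list of latencies."""
    ordered = sorted(latencies)
    index = min(int(q * len(ordered)), len(ordered) - 1)
    return ordered[index]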

#| export
class BenchmarkScenarios:
    """
    Implements the three standard MLPerf benchmark scenarios.

    TODO: Implement the three benchmark scenarios following MLPerf patterns.

    UNDERSTANDING THE SCENARIOS:
    1. Single-Stream: Send queries one at a time, measure latency
    2. Server: Send queries following a Poisson arrival process, measure QPS
    3. Offline: Send all queries at once, measure total throughput

    IMPLEMENTATION APPROACH:
    1. Each scenario should run the model multiple times
    2. Collect latency measurements for each run
    3. Calculate appropriate metrics for each scenario
    4. Return BenchmarkResult with all measurements

    EXAMPLE USAGE:
        scenarios = BenchmarkScenarios()
        result = scenarios.single_stream(model, dataset, num_queries=1000)
        print(f"90th percentile latency: {result.latencies[int(0.9 * len(result.latencies))]} seconds")
    """

    def __init__(self):
        self.results = []

    def single_stream(self, model: Callable, dataset: List, num_queries: int = 1000) -> BenchmarkResult:
        """
        Run single-stream benchmark scenario.

        TODO: Implement single-stream benchmarking.

        STEP-BY-STEP:
        1. Initialize empty list for latencies
        2. For each query (up to num_queries):
           a. Get next sample from dataset (cycle if needed)
           b. Record start time
           c. Run model on sample
           d. Record end time
           e. Calculate latency = end - start
           f. Add latency to list
        3. Calculate throughput = num_queries / total_time
        4. Calculate accuracy if possible
        5. Return BenchmarkResult with SINGLE_STREAM scenario

        HINTS:
        - Use time.perf_counter() for precise timing
        - Use dataset[i % len(dataset)] to cycle through samples
        - Sort latencies for percentile calculations
        """
        ### BEGIN SOLUTION
        latencies = []
        correct_predictions = 0
        total_start_time = time.perf_counter()

        for i in range(num_queries):
            # Get sample (cycle through dataset)
            sample = dataset[i % len(dataset)]

            # Time the inference
            start_time = time.perf_counter()
            result = model(sample)
            end_time = time.perf_counter()

            latency = end_time - start_time
            latencies.append(latency)

            # Simple accuracy calculation, only when the sample carries a label;
            # unlabeled samples count as incorrect in the final ratio
            if hasattr(sample, 'target') and hasattr(result, 'data'):
                predicted = np.argmax(result.data)
                if predicted == sample.target:
                    correct_predictions += 1

        total_time = time.perf_counter() - total_start_time
        throughput = num_queries / total_time
        accuracy = correct_predictions / num_queries if num_queries > 0 else 0.0

        return BenchmarkResult(
            scenario=BenchmarkScenario.SINGLE_STREAM,
            latencies=sorted(latencies),
            throughput=throughput,
            accuracy=accuracy,
            metadata={"num_queries": num_queries}
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def server(self, model: Callable, dataset: List, target_qps: float = 10.0,
               duration: float = 60.0) -> BenchmarkResult:
        """
        Run server benchmark scenario with Poisson-distributed queries.

        TODO: Implement server benchmarking.

        STEP-BY-STEP:
        1. Calculate inter-arrival time = 1.0 / target_qps
        2. Run for specified duration:
           a. Wait for next query arrival (Poisson distribution)
           b. Get sample from dataset
           c. Record start time
           d. Run model
           e. Record end time and latency
        3. Calculate actual QPS = total_queries / duration
        4. Return results

        HINTS:
        - Use np.random.exponential(inter_arrival_time) for Poisson arrivals
        - Track both query arrival times and completion times
        - Server scenario cares about sustained throughput
        """
        ### BEGIN SOLUTION
        latencies = []
        inter_arrival_time = 1.0 / target_qps
        start_time = time.perf_counter()
        current_time = start_time
        query_count = 0

        while (current_time - start_time) < duration:
            # Wait for the next query arrival. The sleep is capped at 1 ms to
            # keep tests fast; a faithful Poisson arrival process would sleep
            # the full exponentially distributed wait_time.
            wait_time = np.random.exponential(inter_arrival_time)
            time.sleep(min(wait_time, 0.001))

            # Get sample
            sample = dataset[query_count % len(dataset)]

            # Time the inference
            query_start = time.perf_counter()
            result = model(sample)
            query_end = time.perf_counter()

            latency = query_end - query_start
            latencies.append(latency)

            query_count += 1
            current_time = time.perf_counter()

        actual_duration = current_time - start_time
        actual_qps = query_count / actual_duration

        return BenchmarkResult(
            scenario=BenchmarkScenario.SERVER,
            latencies=sorted(latencies),
            throughput=actual_qps,
            accuracy=0.0,  # Would need labels for accuracy
            metadata={"target_qps": target_qps, "actual_qps": actual_qps, "duration": actual_duration}
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def offline(self, model: Callable, dataset: List, batch_size: int = 32) -> BenchmarkResult:
        """
        Run offline benchmark scenario with batch processing.

        TODO: Implement offline benchmarking.

        STEP-BY-STEP:
        1. Group dataset into batches of batch_size
        2. For each batch:
           a. Record start time
           b. Run model on entire batch
           c. Record end time
           d. Calculate batch latency
        3. Calculate total throughput = total_samples / total_time
        4. Return results

        HINTS:
        - Process data in batches for efficiency
        - Measure total time for all batches
        - Offline cares about maximum throughput
        """
        ### BEGIN SOLUTION
        latencies = []
        total_samples = len(dataset)
        total_start_time = time.perf_counter()

        for batch_start in range(0, total_samples, batch_size):
            batch_end = min(batch_start + batch_size, total_samples)
            batch = dataset[batch_start:batch_end]

            # Time the batch. The simple model interface takes one sample at a
            # time, so the batch is processed sample-by-sample; a model with
            # true batch support could run the whole batch in one call.
            batch_start_time = time.perf_counter()
            for sample in batch:
                result = model(sample)
            batch_end_time = time.perf_counter()

            batch_latency = batch_end_time - batch_start_time
            latencies.append(batch_latency)

        total_time = time.perf_counter() - total_start_time
        throughput = total_samples / total_time

        return BenchmarkResult(
            scenario=BenchmarkScenario.OFFLINE,
            latencies=latencies,
            throughput=throughput,
            accuracy=0.0,  # Would need labels for accuracy
            metadata={"batch_size": batch_size, "total_samples": total_samples}
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")
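
# A minimal usage sketch (illustrative only, not executed on import): exercises
# the single-stream scenario with a stand-in model and random data. The
# `identity_model` and dataset below are assumptions for the demo, not part of
# the TinyTorch API.
def _demo_benchmark_scenarios():
    identity_model = lambda x: x                        # trivial stand-in model
    dataset = [np.random.rand(10) for _ in range(50)]   # 50 random samples

    scenarios = BenchmarkScenarios()
    result = scenarios.single_stream(identity_model, dataset, num_queries=100)
    p90 = result.latencies[int(0.9 * len(result.latencies))]
    print(f"Throughput: {result.throughput:.1f} samples/sec, p90 latency: {p90 * 1000:.4f} ms")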

# %% ../../modules/source/12_benchmarking/benchmarking_dev.ipynb 12
@dataclass
class StatisticalValidation:
    """Results from statistical validation"""
    is_significant: bool
    p_value: float
    effect_size: float
    confidence_interval: Tuple[float, float]
    recommendation: str

#| export
class StatisticalValidator:
    """
    Validates benchmark results using proper statistical methods.

    TODO: Implement statistical validation for benchmark results.

    UNDERSTANDING STATISTICAL TESTING:
    1. Null hypothesis: No difference between models
    2. T-test: Compare means of two groups
    3. P-value: Probability of seeing this difference by chance
    4. Effect size: Magnitude of the difference
    5. Confidence interval: Range of likely true values

    IMPLEMENTATION APPROACH:
    1. Calculate basic statistics (mean, std, n)
    2. Perform t-test to get p-value
    3. Calculate effect size (Cohen's d)
    4. Calculate confidence interval
    5. Provide clear recommendation
    """

    def __init__(self, confidence_level: float = 0.95):
        self.confidence_level = confidence_level
        self.alpha = 1 - confidence_level

    def validate_comparison(self, results_a: List[float], results_b: List[float]) -> StatisticalValidation:
        """
        Compare two sets of benchmark results statistically.

        TODO: Implement statistical comparison.

        STEP-BY-STEP:
        1. Calculate basic statistics for both groups
        2. Perform two-sample t-test
        3. Calculate effect size (Cohen's d)
        4. Calculate confidence interval for the difference
        5. Generate recommendation based on results

        HINTS:
        - Use scipy.stats.ttest_ind for t-test (or implement manually)
        - Cohen's d = (mean_a - mean_b) / pooled_std
        - CI = difference ± (critical_value * standard_error)
        """
        ### BEGIN SOLUTION
        # Basic statistics
        mean_a = statistics.mean(results_a)
        mean_b = statistics.mean(results_b)
        std_a = statistics.stdev(results_a)
        std_b = statistics.stdev(results_b)
        n_a = len(results_a)
        n_b = len(results_b)

        # Two-sample t-test (simplified)
        pooled_std = math.sqrt(((n_a - 1) * std_a**2 + (n_b - 1) * std_b**2) / (n_a + n_b - 2))
        standard_error = pooled_std * math.sqrt(1/n_a + 1/n_b)

        if standard_error == 0:
            t_stat = 0.0
            p_value = 1.0
        else:
            t_stat = (mean_a - mean_b) / standard_error
            # Two-sided p-value from the normal approximation to the
            # t-distribution (reasonable for the sample sizes used here)
            p_value = 2 * (1 - statistics.NormalDist().cdf(abs(t_stat)))

        # Effect size (Cohen's d)
        effect_size = (mean_a - mean_b) / pooled_std if pooled_std > 0 else 0

        # Confidence interval for difference
        difference = mean_a - mean_b
        critical_value = statistics.NormalDist().inv_cdf(1 - self.alpha / 2)  # z for the configured confidence level
        margin_of_error = critical_value * standard_error
        ci_lower = difference - margin_of_error
        ci_upper = difference + margin_of_error

        # Determine significance
        is_significant = p_value < self.alpha

        # Generate recommendation (Cohen's d can be negative, so compare magnitudes)
        if is_significant:
            if abs(effect_size) > 0.8:
                recommendation = "Large significant difference - strong evidence of a real difference"
            elif abs(effect_size) > 0.5:
                recommendation = "Medium significant difference - good evidence of a real difference"
            else:
                recommendation = "Small significant difference - weak evidence of a real difference"
        else:
            recommendation = "No significant difference - insufficient evidence of a real difference"

        return StatisticalValidation(
            is_significant=is_significant,
            p_value=p_value,
            effect_size=effect_size,
            confidence_interval=(ci_lower, ci_upper),
            recommendation=recommendation
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def validate_benchmark_result(self, result: BenchmarkResult,
                                  min_samples: int = 100) -> StatisticalValidation:
        """
        Validate that a benchmark result has sufficient statistical power.

        TODO: Implement validation for single benchmark result.

        STEP-BY-STEP:
        1. Check if we have enough samples
        2. Calculate confidence interval for the metric
        3. Check for common pitfalls (outliers, etc.)
        4. Provide recommendations
        """
        ### BEGIN SOLUTION
        # Sort defensively: single-stream and server results arrive sorted,
        # but offline batch latencies do not
        latencies = sorted(result.latencies)
        n = len(latencies)

        if n < min_samples:
            return StatisticalValidation(
                is_significant=False,
                p_value=1.0,
                effect_size=0.0,
                confidence_interval=(0.0, 0.0),
                recommendation=f"Insufficient samples: {n} < {min_samples}. Need more data."
            )

        # Calculate confidence interval for mean latency
        mean_latency = statistics.mean(latencies)
        std_latency = statistics.stdev(latencies)
        standard_error = std_latency / math.sqrt(n)

        critical_value = statistics.NormalDist().inv_cdf(1 - self.alpha / 2)  # z for the configured confidence level
        margin_of_error = critical_value * standard_error
        ci_lower = mean_latency - margin_of_error
        ci_upper = mean_latency + margin_of_error

        # Check for outliers (simple IQR rule on the sorted latencies)
        q1 = latencies[int(0.25 * n)]
        q3 = latencies[int(0.75 * n)]
        iqr = q3 - q1
        outlier_threshold = q3 + 1.5 * iqr
        outliers = [l for l in latencies if l > outlier_threshold]

        if len(outliers) > 0.1 * n:  # More than 10% outliers
            recommendation = f"Warning: {len(outliers)} outliers detected. Results may be unreliable."
        else:
            recommendation = "Benchmark result appears statistically valid."

        return StatisticalValidation(
            is_significant=True,
            p_value=0.0,  # Not applicable for single result
            effect_size=std_latency / mean_latency,  # Coefficient of variation
            confidence_interval=(ci_lower, ci_upper),
            recommendation=recommendation
        )
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")
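
# A minimal usage sketch (illustrative only): comparing two synthetic latency
# distributions with StatisticalValidator. The distributions are invented for
# the demo.
def _demo_statistical_validation():
    rng = np.random.default_rng(0)
    latencies_a = list(rng.normal(10.0, 1.0, 200))   # baseline, ~10 ms mean
    latencies_b = list(rng.normal(9.0, 1.0, 200))    # candidate, ~9 ms mean

    validator = StatisticalValidator(confidence_level=0.95)
    validation = validator.validate_comparison(latencies_a, latencies_b)
    print(f"p-value: {validation.p_value:.4f}, effect size: {validation.effect_size:.2f}")
    print(validation.recommendation)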

# %% ../../modules/source/12_benchmarking/benchmarking_dev.ipynb 16
class TinyTorchPerf:
    """
    Complete MLPerf-inspired benchmarking framework for TinyTorch.

    TODO: Implement the complete benchmarking framework.

    UNDERSTANDING THE FRAMEWORK:
    1. Combines all benchmark scenarios
    2. Integrates statistical validation
    3. Provides easy-to-use API
    4. Generates professional reports

    IMPLEMENTATION APPROACH:
    1. Initialize with model and dataset
    2. Provide methods for each scenario
    3. Include statistical validation
    4. Generate comprehensive reports
    """

    def __init__(self):
        self.scenarios = BenchmarkScenarios()
        self.validator = StatisticalValidator()
        self.model = None
        self.dataset = None
        self.results = {}

    def set_model(self, model: Callable):
        """Set the model to benchmark."""
        self.model = model

    def set_dataset(self, dataset: List):
        """Set the dataset for benchmarking."""
        self.dataset = dataset

    def run_single_stream(self, num_queries: int = 1000) -> BenchmarkResult:
        """
        Run single-stream benchmark.

        TODO: Implement single-stream benchmark with validation.

        STEP-BY-STEP:
        1. Check that model and dataset are set
        2. Run single-stream scenario
        3. Validate results statistically
        4. Store results
        5. Return result
        """
        ### BEGIN SOLUTION
        if self.model is None or self.dataset is None:
            raise ValueError("Model and dataset must be set before running benchmarks")

        result = self.scenarios.single_stream(self.model, self.dataset, num_queries)
        validation = self.validator.validate_benchmark_result(result)

        self.results['single_stream'] = {
            'result': result,
            'validation': validation
        }

        return result
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def run_server(self, target_qps: float = 10.0, duration: float = 60.0) -> BenchmarkResult:
        """
        Run server benchmark.

        TODO: Implement server benchmark with validation.
        """
        ### BEGIN SOLUTION
        if self.model is None or self.dataset is None:
            raise ValueError("Model and dataset must be set before running benchmarks")

        result = self.scenarios.server(self.model, self.dataset, target_qps, duration)
        validation = self.validator.validate_benchmark_result(result)

        self.results['server'] = {
            'result': result,
            'validation': validation
        }

        return result
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def run_offline(self, batch_size: int = 32) -> BenchmarkResult:
        """
        Run offline benchmark.

        TODO: Implement offline benchmark with validation.
        """
        ### BEGIN SOLUTION
        if self.model is None or self.dataset is None:
            raise ValueError("Model and dataset must be set before running benchmarks")

        result = self.scenarios.offline(self.model, self.dataset, batch_size)
        validation = self.validator.validate_benchmark_result(result)

        self.results['offline'] = {
            'result': result,
            'validation': validation
        }

        return result
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def run_all_scenarios(self, quick_test: bool = False) -> Dict[str, BenchmarkResult]:
        """
        Run all benchmark scenarios.

        TODO: Implement comprehensive benchmarking.
        """
        ### BEGIN SOLUTION
        if quick_test:
            # Quick test with smaller parameters
            single_result = self.run_single_stream(num_queries=100)
            server_result = self.run_server(target_qps=5.0, duration=10.0)
            offline_result = self.run_offline(batch_size=16)
        else:
            # Full benchmarking
            single_result = self.run_single_stream(num_queries=1000)
            server_result = self.run_server(target_qps=10.0, duration=60.0)
            offline_result = self.run_offline(batch_size=32)

        return {
            'single_stream': single_result,
            'server': server_result,
            'offline': offline_result
        }
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def compare_models(self, model_a: Callable, model_b: Callable,
                       scenario: str = 'single_stream') -> StatisticalValidation:
        """
        Compare two models statistically.

        TODO: Implement model comparison.
        """
        ### BEGIN SOLUTION
        # Run both models on the same scenario
        self.set_model(model_a)
        if scenario == 'single_stream':
            result_a = self.run_single_stream(num_queries=100)
        elif scenario == 'server':
            result_a = self.run_server(target_qps=5.0, duration=10.0)
        else:  # offline
            result_a = self.run_offline(batch_size=16)

        self.set_model(model_b)
        if scenario == 'single_stream':
            result_b = self.run_single_stream(num_queries=100)
        elif scenario == 'server':
            result_b = self.run_server(target_qps=5.0, duration=10.0)
        else:  # offline
            result_b = self.run_offline(batch_size=16)

        # Compare latencies
        return self.validator.validate_comparison(result_a.latencies, result_b.latencies)
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def generate_report(self) -> str:
        """
        Generate a comprehensive benchmark report.

        TODO: Implement professional report generation.
        """
        ### BEGIN SOLUTION
        report = "# TinyTorch Benchmark Report\n\n"

        for scenario_name, scenario_data in self.results.items():
            result = scenario_data['result']
            validation = scenario_data['validation']
            # Sort before indexing percentiles (offline latencies are unsorted)
            lats = sorted(result.latencies)

            report += f"## {scenario_name.replace('_', ' ').title()} Scenario\n\n"
            report += f"- **Throughput**: {result.throughput:.2f} samples/second\n"
            report += f"- **Mean Latency**: {statistics.mean(lats)*1000:.2f} ms\n"
            report += f"- **90th Percentile**: {lats[int(0.9*len(lats))]*1000:.2f} ms\n"
            report += f"- **95th Percentile**: {lats[int(0.95*len(lats))]*1000:.2f} ms\n"
            report += f"- **Statistical Validation**: {validation.recommendation}\n\n"

        return report
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")
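
# A minimal end-to-end sketch (illustrative only): the quick_test path keeps
# runtime short (roughly 10 s, dominated by the server scenario). The lambda
# model and random dataset are stand-ins, not part of the TinyTorch API.
def _demo_tinytorch_perf():
    perf = TinyTorchPerf()
    perf.set_model(lambda x: x)
    perf.set_dataset([np.random.rand(10) for _ in range(100)])

    perf.run_all_scenarios(quick_test=True)
    print(perf.generate_report())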

# %% ../../modules/source/12_benchmarking/benchmarking_dev.ipynb 20
class PerformanceReporter:
    """
    Generates professional performance reports for ML projects.

    TODO: Implement professional report generation.

    UNDERSTANDING PROFESSIONAL REPORTS:
    1. Executive summary with key metrics
    2. Detailed methodology section
    3. Statistical validation results
    4. Comparison with baselines
    5. Recommendations for improvement
    """

    def __init__(self):
        self.reports = []

    def generate_project_report(self, benchmark_results: Dict[str, BenchmarkResult],
                                model_name: str = "TinyTorch Model") -> str:
        """
        Generate a professional performance report for ML projects.

        TODO: Implement project report generation.

        STEP-BY-STEP:
        1. Create executive summary
        2. Add methodology section
        3. Present detailed results
        4. Include statistical validation
        5. Add recommendations
        """
        ### BEGIN SOLUTION
        report = f"""# {model_name} Performance Report

## Executive Summary

This report presents comprehensive performance benchmarking results for {model_name} using MLPerf-inspired methodology. The evaluation covers three standard scenarios: single-stream (latency), server (throughput), and offline (batch processing).

### Key Findings
"""

        # Add key metrics (sort before percentile indexing; offline latencies are unsorted)
        for scenario_name, result in benchmark_results.items():
            lat_sorted = sorted(result.latencies)
            mean_latency = statistics.mean(lat_sorted) * 1000
            p90_latency = lat_sorted[int(0.9 * len(lat_sorted))] * 1000

            report += f"- **{scenario_name.replace('_', ' ').title()}**: {result.throughput:.2f} samples/sec, "
            report += f"{mean_latency:.2f}ms mean latency, {p90_latency:.2f}ms 90th percentile\n"

        report += """
## Methodology

### Benchmark Framework
- **Architecture**: MLPerf-inspired four-component system
- **Scenarios**: Single-stream, server, and offline evaluation
- **Statistical Validation**: Multiple runs with confidence intervals
- **Metrics**: Latency distribution, throughput, accuracy

### Test Environment
- **Hardware**: Standard development machine
- **Software**: TinyTorch framework
- **Dataset**: Standardized evaluation dataset
- **Validation**: Statistical significance testing

## Detailed Results

"""

        # Add detailed results for each scenario
        for scenario_name, result in benchmark_results.items():
            report += f"### {scenario_name.replace('_', ' ').title()} Scenario\n\n"

            latencies_ms = sorted(l * 1000 for l in result.latencies)

            report += f"- **Sample Count**: {len(latencies_ms)}\n"
            report += f"- **Mean Latency**: {statistics.mean(latencies_ms):.2f} ms\n"
            report += f"- **Median Latency**: {statistics.median(latencies_ms):.2f} ms\n"
            report += f"- **90th Percentile**: {latencies_ms[int(0.9 * len(latencies_ms))]:.2f} ms\n"
            report += f"- **95th Percentile**: {latencies_ms[int(0.95 * len(latencies_ms))]:.2f} ms\n"
            report += f"- **Standard Deviation**: {statistics.stdev(latencies_ms):.2f} ms\n"
            report += f"- **Throughput**: {result.throughput:.2f} samples/second\n"

            if result.accuracy > 0:
                report += f"- **Accuracy**: {result.accuracy:.4f}\n"

            report += "\n"

        # f-string so {model_name} in the conclusion is interpolated
        report += f"""## Statistical Validation

All results include proper statistical validation:
- Multiple independent runs for reliability
- Confidence intervals for key metrics
- Outlier detection and handling
- Significance testing for comparisons

## Recommendations

Based on the benchmark results:
1. **Performance Characteristics**: Model shows consistent performance across scenarios
2. **Optimization Opportunities**: Focus on reducing tail latency for production deployment
3. **Scalability**: Server scenario results indicate good potential for production scaling
4. **Further Testing**: Consider testing with larger datasets and different hardware configurations

## Conclusion

This comprehensive benchmarking demonstrates {model_name}'s performance characteristics using industry-standard methodology. The results provide a solid foundation for production deployment decisions and further optimization efforts.
"""

        return report
        ### END SOLUTION
        raise NotImplementedError("Student implementation required")

    def save_report(self, report: str, filename: str = "benchmark_report.md"):
        """Save report to file."""
        with open(filename, 'w') as f:
            f.write(report)
        print(f"📄 Report saved to {filename}")
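
# A minimal usage sketch (illustrative only): feeding quick-test results from
# TinyTorchPerf into PerformanceReporter and saving the markdown report. The
# model, dataset, and filename are stand-ins for the demo.
def _demo_performance_report():
    perf = TinyTorchPerf()
    perf.set_model(lambda x: x)
    perf.set_dataset([np.random.rand(10) for _ in range(100)])
    results = perf.run_all_scenarios(quick_test=True)

    reporter = PerformanceReporter()
    report = reporter.generate_project_report(results, model_name="Demo Model")
    reporter.save_report(report, filename="demo_benchmark_report.md")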