Files
TinyTorch/tinytorch/core/benchmarking.py
Vijay Janapa Reddi bfadc82ce6 Update generated notebooks and package exports
- Regenerate all .ipynb files from fixed .py modules
- Update tinytorch package exports with corrected implementations
- Sync package module index with current 16-module structure

These generated files reflect all the module fixes and ensure consistent
.py ↔ .ipynb conversion with the updated module implementations.
2025-09-18 16:42:57 -04:00

1192 lines
48 KiB
Python
Generated

# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/source/14_benchmarking/benchmarking_dev.ipynb.
# %% auto 0
__all__ = ['BenchmarkScenario', 'BenchmarkResult', 'BenchmarkScenarios', 'StatisticalValidation', 'StatisticalValidator',
'TinyTorchPerf', 'PerformanceReporter', 'plot_benchmark_results', 'ProductionBenchmarkingProfiler']
# %% ../../modules/source/14_benchmarking/benchmarking_dev.ipynb 1
import numpy as np
import matplotlib.pyplot as plt
import time
import statistics
import math
from typing import Dict, List, Tuple, Optional, Any, Callable
from enum import Enum
from dataclasses import dataclass
import os
import sys
# Import our TinyTorch dependencies
try:
from tinytorch.core.tensor import Tensor
from tinytorch.core.networks import Sequential
from tinytorch.core.layers import Dense
from tinytorch.core.activations import ReLU, Softmax
from tinytorch.core.dataloader import DataLoader
except ImportError:
# For development, import from local modules
parent_dirs = [
os.path.join(os.path.dirname(__file__), '..', '01_tensor'),
os.path.join(os.path.dirname(__file__), '..', '03_layers'),
os.path.join(os.path.dirname(__file__), '..', '02_activations'),
os.path.join(os.path.dirname(__file__), '..', '04_networks'),
os.path.join(os.path.dirname(__file__), '..', '06_dataloader')
]
for path in parent_dirs:
if path not in sys.path:
sys.path.append(path)
try:
from tensor_dev import Tensor
from networks_dev import Sequential
from layers_dev import Dense
from activations_dev import ReLU, Softmax
from dataloader_dev import DataLoader
except ImportError:
# Fallback for missing modules
print("⚠️ Some TinyTorch modules not available - using minimal implementations")
# %% ../../modules/source/14_benchmarking/benchmarking_dev.ipynb 8
class BenchmarkScenario(Enum):
"""Standard benchmark scenarios from MLPerf"""
SINGLE_STREAM = "single_stream"
SERVER = "server"
OFFLINE = "offline"
@dataclass
class BenchmarkResult:
"""Results from a benchmark run"""
scenario: BenchmarkScenario
latencies: List[float] # All latency measurements in seconds
throughput: float # Samples per second
accuracy: float # Model accuracy (0-1)
metadata: Optional[Dict[str, Any]] = None
#| export
class BenchmarkScenarios:
"""
Implements the three standard MLPerf benchmark scenarios.
TODO: Implement the three benchmark scenarios following MLPerf patterns.
STEP-BY-STEP IMPLEMENTATION:
1. Single-Stream: Send queries one at a time, measure latency
2. Server: Send queries following Poisson distribution, measure QPS
3. Offline: Send all queries at once, measure total throughput
IMPLEMENTATION APPROACH:
1. Each scenario should run the model multiple times
2. Collect latency measurements for each run
3. Calculate appropriate metrics for each scenario
4. Return BenchmarkResult with all measurements
LEARNING CONNECTIONS:
- **MLPerf Standards**: Industry-standard benchmarking methodology used by Google, NVIDIA, etc.
- **Performance Scenarios**: Different deployment patterns require different measurement approaches
- **Production Validation**: Benchmarking validates model performance before deployment
- **Resource Planning**: Results guide infrastructure scaling and capacity planning
EXAMPLE USAGE:
scenarios = BenchmarkScenarios()
result = scenarios.single_stream(model, dataset, num_queries=1000)
print(f"90th percentile latency: {result.latencies[int(0.9 * len(result.latencies))]} seconds")
"""
def __init__(self):
self.results = []
def single_stream(self, model: Callable, dataset: List, num_queries: int = 1000) -> BenchmarkResult:
"""
Run single-stream benchmark scenario.
TODO: Implement single-stream benchmarking.
STEP-BY-STEP IMPLEMENTATION:
1. Initialize empty list for latencies
2. For each query (up to num_queries):
a. Get next sample from dataset (cycle if needed)
b. Record start time
c. Run model on sample
d. Record end time
e. Calculate latency = end - start
f. Add latency to list
3. Calculate throughput = num_queries / total_time
4. Calculate accuracy if possible
5. Return BenchmarkResult with SINGLE_STREAM scenario
LEARNING CONNECTIONS:
- **Mobile/Edge Deployment**: Single-stream simulates user-facing applications
- **Tail Latency**: 90th/95th percentiles matter more than averages for user experience
- **Interactive Systems**: Chatbots, recommendation engines use single-stream patterns
- **SLA Validation**: Ensures models meet response time requirements
HINTS:
- Use time.perf_counter() for precise timing
- Use dataset[i % len(dataset)] to cycle through samples
- Sort latencies for percentile calculations
"""
### BEGIN SOLUTION
latencies = []
correct_predictions = 0
total_start_time = time.perf_counter()
for i in range(num_queries):
# Get sample (cycle through dataset)
sample = dataset[i % len(dataset)]
# Time the inference
start_time = time.perf_counter()
result = model(sample)
end_time = time.perf_counter()
latency = end_time - start_time
latencies.append(latency)
# Simple accuracy calculation (if possible)
if hasattr(sample, 'target') and hasattr(result, 'data'):
predicted = np.argmax(result.data)
if predicted == sample.target:
correct_predictions += 1
total_time = time.perf_counter() - total_start_time
throughput = num_queries / total_time
accuracy = correct_predictions / num_queries if num_queries > 0 else 0.0
return BenchmarkResult(
scenario=BenchmarkScenario.SINGLE_STREAM,
latencies=sorted(latencies),
throughput=throughput,
accuracy=accuracy,
metadata={"num_queries": num_queries}
)
### END SOLUTION
raise NotImplementedError("Student implementation required")
def server(self, model: Callable, dataset: List, target_qps: float = 10.0,
duration: float = 60.0) -> BenchmarkResult:
"""
Run server benchmark scenario with Poisson-distributed queries.
TODO: Implement server benchmarking.
STEP-BY-STEP IMPLEMENTATION:
1. Calculate inter-arrival time = 1.0 / target_qps
2. Run for specified duration:
a. Wait for next query arrival (Poisson distribution)
b. Get sample from dataset
c. Record start time
d. Run model
e. Record end time and latency
3. Calculate actual QPS = total_queries / duration
4. Return results
LEARNING CONNECTIONS:
- **Web Services**: Server scenario simulates API endpoints handling concurrent requests
- **Load Testing**: Validates system behavior under realistic traffic patterns
- **Scalability Analysis**: Tests how well models handle increasing load
- **Production Deployment**: Critical for microservices and web-scale applications
HINTS:
- Use np.random.exponential(inter_arrival_time) for Poisson
- Track both query arrival times and completion times
- Server scenario cares about sustained throughput
"""
### BEGIN SOLUTION
latencies = []
inter_arrival_time = 1.0 / target_qps
start_time = time.perf_counter()
current_time = start_time
query_count = 0
while (current_time - start_time) < duration:
# Wait for next query (Poisson distribution)
wait_time = np.random.exponential(inter_arrival_time)
# Use minimal delay for fast testing
if wait_time > 0.0001: # Only sleep for very long waits
time.sleep(min(wait_time, 0.0001))
# Get sample
sample = dataset[query_count % len(dataset)]
# Time the inference
query_start = time.perf_counter()
result = model(sample)
query_end = time.perf_counter()
latency = query_end - query_start
latencies.append(latency)
query_count += 1
current_time = time.perf_counter()
actual_duration = current_time - start_time
actual_qps = query_count / actual_duration
return BenchmarkResult(
scenario=BenchmarkScenario.SERVER,
latencies=sorted(latencies),
throughput=actual_qps,
accuracy=0.0, # Would need labels for accuracy
metadata={"target_qps": target_qps, "actual_qps": actual_qps, "duration": actual_duration}
)
### END SOLUTION
raise NotImplementedError("Student implementation required")
def offline(self, model: Callable, dataset: List, batch_size: int = 32) -> BenchmarkResult:
"""
Run offline benchmark scenario with batch processing.
TODO: Implement offline benchmarking.
STEP-BY-STEP IMPLEMENTATION:
1. Group dataset into batches of batch_size
2. For each batch:
a. Record start time
b. Run model on entire batch
c. Record end time
d. Calculate batch latency
3. Calculate total throughput = total_samples / total_time
4. Return results
LEARNING CONNECTIONS:
- **Batch Processing**: Offline scenario simulates data pipeline and ETL workloads
- **Throughput Optimization**: Maximizes processing efficiency for large datasets
- **Data Center Workloads**: Common in recommendation systems and analytics pipelines
- **Cost Optimization**: High throughput reduces compute costs per sample
HINTS:
- Process data in batches for efficiency
- Measure total time for all batches
- Offline cares about maximum throughput
"""
### BEGIN SOLUTION
latencies = []
total_samples = len(dataset)
total_start_time = time.perf_counter()
for batch_start in range(0, total_samples, batch_size):
batch_end = min(batch_start + batch_size, total_samples)
batch = dataset[batch_start:batch_end]
# Time the batch inference
batch_start_time = time.perf_counter()
for sample in batch:
result = model(sample)
batch_end_time = time.perf_counter()
batch_latency = batch_end_time - batch_start_time
latencies.append(batch_latency)
total_time = time.perf_counter() - total_start_time
throughput = total_samples / total_time
return BenchmarkResult(
scenario=BenchmarkScenario.OFFLINE,
latencies=latencies,
throughput=throughput,
accuracy=0.0, # Would need labels for accuracy
metadata={"batch_size": batch_size, "total_samples": total_samples}
)
### END SOLUTION
raise NotImplementedError("Student implementation required")
# %% ../../modules/source/14_benchmarking/benchmarking_dev.ipynb 12
@dataclass
class StatisticalValidation:
"""Results from statistical validation"""
is_significant: bool
p_value: float
effect_size: float
confidence_interval: Tuple[float, float]
recommendation: str
#| export
class StatisticalValidator:
"""
Validates benchmark results using proper statistical methods.
TODO: Implement statistical validation for benchmark results.
STEP-BY-STEP IMPLEMENTATION:
1. Null hypothesis: No difference between models
2. T-test: Compare means of two groups
3. P-value: Probability of seeing this difference by chance
4. Effect size: Magnitude of the difference
5. Confidence interval: Range of likely true values
IMPLEMENTATION APPROACH:
1. Calculate basic statistics (mean, std, n)
2. Perform t-test to get p-value
3. Calculate effect size (Cohen's d)
4. Calculate confidence interval
5. Provide clear recommendation
LEARNING CONNECTIONS:
- **Scientific Rigor**: Ensures performance claims are statistically valid
- **A/B Testing**: Foundation for production model comparison and rollout decisions
- **Research Validation**: Required for academic papers and technical reports
- **Business Decisions**: Statistical significance guides investment in new models
"""
def __init__(self, confidence_level: float = 0.95):
self.confidence_level = confidence_level
self.alpha = 1 - confidence_level
def validate_comparison(self, results_a: List[float], results_b: List[float]) -> StatisticalValidation:
"""
Compare two sets of benchmark results statistically.
TODO: Implement statistical comparison.
STEP-BY-STEP:
1. Calculate basic statistics for both groups
2. Perform two-sample t-test
3. Calculate effect size (Cohen's d)
4. Calculate confidence interval for the difference
5. Generate recommendation based on results
HINTS:
- Use scipy.stats.ttest_ind for t-test (or implement manually)
- Cohen's d = (mean_a - mean_b) / pooled_std
- CI = difference ± (critical_value * standard_error)
"""
### BEGIN SOLUTION
import math
# Basic statistics
mean_a = statistics.mean(results_a)
mean_b = statistics.mean(results_b)
std_a = statistics.stdev(results_a)
std_b = statistics.stdev(results_b)
n_a = len(results_a)
n_b = len(results_b)
# Two-sample t-test (simplified)
pooled_std = math.sqrt(((n_a - 1) * std_a**2 + (n_b - 1) * std_b**2) / (n_a + n_b - 2))
standard_error = pooled_std * math.sqrt(1/n_a + 1/n_b)
if standard_error == 0:
t_stat = 0
p_value = 1.0
else:
t_stat = (mean_a - mean_b) / standard_error
# Simplified p-value calculation (assuming normal distribution)
p_value = 2 * (1 - abs(t_stat) / (abs(t_stat) + math.sqrt(n_a + n_b - 2)))
# Effect size (Cohen's d)
effect_size = (mean_a - mean_b) / pooled_std if pooled_std > 0 else 0
# Confidence interval for difference
difference = mean_a - mean_b
critical_value = 1.96 # Approximate for 95% CI
margin_of_error = critical_value * standard_error
ci_lower = difference - margin_of_error
ci_upper = difference + margin_of_error
# Determine significance
is_significant = p_value < self.alpha
# Generate recommendation
if is_significant:
if effect_size > 0.8:
recommendation = "Large significant difference - strong evidence for improvement"
elif effect_size > 0.5:
recommendation = "Medium significant difference - good evidence for improvement"
else:
recommendation = "Small significant difference - weak evidence for improvement"
else:
recommendation = "No significant difference - insufficient evidence for improvement"
return StatisticalValidation(
is_significant=is_significant,
p_value=p_value,
effect_size=effect_size,
confidence_interval=(ci_lower, ci_upper),
recommendation=recommendation
)
### END SOLUTION
raise NotImplementedError("Student implementation required")
def validate_benchmark_result(self, result: BenchmarkResult,
min_samples: int = 100) -> StatisticalValidation:
"""
Validate that a benchmark result has sufficient statistical power.
TODO: Implement validation for single benchmark result.
STEP-BY-STEP:
1. Check if we have enough samples
2. Calculate confidence interval for the metric
3. Check for common pitfalls (outliers, etc.)
4. Provide recommendations
"""
### BEGIN SOLUTION
latencies = result.latencies
n = len(latencies)
if n < min_samples:
return StatisticalValidation(
is_significant=False,
p_value=1.0,
effect_size=0.0,
confidence_interval=(0.0, 0.0),
recommendation=f"Insufficient samples: {n} < {min_samples}. Need more data."
)
# Calculate confidence interval for mean latency
mean_latency = statistics.mean(latencies)
std_latency = statistics.stdev(latencies)
standard_error = std_latency / math.sqrt(n)
critical_value = 1.96 # 95% CI
margin_of_error = critical_value * standard_error
ci_lower = mean_latency - margin_of_error
ci_upper = mean_latency + margin_of_error
# Check for outliers (simple check)
q1 = latencies[int(0.25 * n)]
q3 = latencies[int(0.75 * n)]
iqr = q3 - q1
outlier_threshold = q3 + 1.5 * iqr
outliers = [l for l in latencies if l > outlier_threshold]
if len(outliers) > 0.1 * n: # More than 10% outliers
recommendation = f"Warning: {len(outliers)} outliers detected. Results may be unreliable."
else:
recommendation = "Benchmark result appears statistically valid."
return StatisticalValidation(
is_significant=True,
p_value=0.0, # Not applicable for single result
effect_size=std_latency / mean_latency, # Coefficient of variation
confidence_interval=(ci_lower, ci_upper),
recommendation=recommendation
)
### END SOLUTION
raise NotImplementedError("Student implementation required")
# %% ../../modules/source/14_benchmarking/benchmarking_dev.ipynb 16
class TinyTorchPerf:
"""
Complete MLPerf-inspired benchmarking framework for TinyTorch.
TODO: Implement the complete benchmarking framework.
STEP-BY-STEP IMPLEMENTATION:
1. Combines all benchmark scenarios
2. Integrates statistical validation
3. Provides easy-to-use API
4. Generates professional reports
IMPLEMENTATION APPROACH:
1. Initialize with model and dataset
2. Provide methods for each scenario
3. Include statistical validation
4. Generate comprehensive reports
LEARNING CONNECTIONS:
- **MLPerf Integration**: Follows industry-standard benchmarking patterns
- **Production Deployment**: Validates models before production rollout
- **Performance Engineering**: Identifies bottlenecks and optimization opportunities
- **Framework Design**: Demonstrates how to build reusable ML tools
"""
def __init__(self):
self.scenarios = BenchmarkScenarios()
self.validator = StatisticalValidator()
self.model = None
self.dataset = None
self.results = {}
def set_model(self, model: Callable):
"""Set the model to benchmark."""
self.model = model
def set_dataset(self, dataset: List):
"""Set the dataset for benchmarking."""
self.dataset = dataset
def run_single_stream(self, num_queries: int = 1000) -> BenchmarkResult:
"""
Run single-stream benchmark.
TODO: Implement single-stream benchmark with validation.
STEP-BY-STEP:
1. Check that model and dataset are set
2. Run single-stream scenario
3. Validate results statistically
4. Store results
5. Return result
"""
### BEGIN SOLUTION
if self.model is None or self.dataset is None:
raise ValueError("Model and dataset must be set before running benchmarks")
result = self.scenarios.single_stream(self.model, self.dataset, num_queries)
validation = self.validator.validate_benchmark_result(result)
self.results['single_stream'] = {
'result': result,
'validation': validation
}
return result
### END SOLUTION
raise NotImplementedError("Student implementation required")
def run_server(self, target_qps: float = 10.0, duration: float = 60.0) -> BenchmarkResult:
"""
Run server benchmark.
TODO: Implement server benchmark with validation.
"""
### BEGIN SOLUTION
if self.model is None or self.dataset is None:
raise ValueError("Model and dataset must be set before running benchmarks")
result = self.scenarios.server(self.model, self.dataset, target_qps, duration)
validation = self.validator.validate_benchmark_result(result)
self.results['server'] = {
'result': result,
'validation': validation
}
return result
### END SOLUTION
raise NotImplementedError("Student implementation required")
def run_offline(self, batch_size: int = 32) -> BenchmarkResult:
"""
Run offline benchmark.
TODO: Implement offline benchmark with validation.
"""
### BEGIN SOLUTION
if self.model is None or self.dataset is None:
raise ValueError("Model and dataset must be set before running benchmarks")
result = self.scenarios.offline(self.model, self.dataset, batch_size)
validation = self.validator.validate_benchmark_result(result)
self.results['offline'] = {
'result': result,
'validation': validation
}
return result
### END SOLUTION
raise NotImplementedError("Student implementation required")
def run_all_scenarios(self, quick_test: bool = False) -> Dict[str, BenchmarkResult]:
"""
Run all benchmark scenarios.
TODO: Implement comprehensive benchmarking.
"""
### BEGIN SOLUTION
if quick_test:
# Quick test with very small parameters for fast testing
single_result = self.run_single_stream(num_queries=5)
server_result = self.run_server(target_qps=20.0, duration=0.2)
offline_result = self.run_offline(batch_size=3)
else:
# Full benchmarking
single_result = self.run_single_stream(num_queries=1000)
server_result = self.run_server(target_qps=10.0, duration=60.0)
offline_result = self.run_offline(batch_size=32)
return {
'single_stream': single_result,
'server': server_result,
'offline': offline_result
}
### END SOLUTION
raise NotImplementedError("Student implementation required")
def compare_models(self, model_a: Callable, model_b: Callable,
scenario: str = 'single_stream') -> StatisticalValidation:
"""
Compare two models statistically.
TODO: Implement model comparison.
"""
### BEGIN SOLUTION
# Run both models on the same scenario
self.set_model(model_a)
if scenario == 'single_stream':
result_a = self.run_single_stream(num_queries=100)
elif scenario == 'server':
result_a = self.run_server(target_qps=5.0, duration=10.0)
else: # offline
result_a = self.run_offline(batch_size=16)
self.set_model(model_b)
if scenario == 'single_stream':
result_b = self.run_single_stream(num_queries=100)
elif scenario == 'server':
result_b = self.run_server(target_qps=5.0, duration=10.0)
else: # offline
result_b = self.run_offline(batch_size=16)
# Compare latencies
return self.validator.validate_comparison(result_a.latencies, result_b.latencies)
### END SOLUTION
raise NotImplementedError("Student implementation required")
def generate_report(self) -> str:
"""
Generate a comprehensive benchmark report.
TODO: Implement professional report generation.
"""
### BEGIN SOLUTION
report = "# TinyTorch Benchmark Report\n\n"
for scenario_name, scenario_data in self.results.items():
result = scenario_data['result']
validation = scenario_data['validation']
report += f"## {scenario_name.replace('_', ' ').title()} Scenario\n\n"
report += f"- **Throughput**: {result.throughput:.2f} samples/second\n"
report += f"- **Mean Latency**: {statistics.mean(result.latencies)*1000:.2f} ms\n"
report += f"- **90th Percentile**: {result.latencies[int(0.9*len(result.latencies))]*1000:.2f} ms\n"
report += f"- **95th Percentile**: {result.latencies[int(0.95*len(result.latencies))]*1000:.2f} ms\n"
report += f"- **Statistical Validation**: {validation.recommendation}\n\n"
return report
### END SOLUTION
raise NotImplementedError("Student implementation required")
# %% ../../modules/source/14_benchmarking/benchmarking_dev.ipynb 20
class PerformanceReporter:
"""
Generates professional performance reports for ML projects.
TODO: Implement professional report generation.
UNDERSTANDING PROFESSIONAL REPORTS:
1. Executive summary with key metrics
2. Detailed methodology section
3. Statistical validation results
4. Comparison with baselines
5. Recommendations for improvement
"""
def __init__(self):
self.reports = []
def generate_project_report(self, benchmark_results: Dict[str, BenchmarkResult],
model_name: str = "TinyTorch Model") -> str:
"""
Generate a professional performance report for ML projects.
TODO: Implement project report generation.
STEP-BY-STEP:
1. Create executive summary
2. Add methodology section
3. Present detailed results
4. Include statistical validation
5. Add recommendations
"""
### BEGIN SOLUTION
report = f"""# {model_name} Performance Report
## Executive Summary
This report presents comprehensive performance benchmarking results for {model_name} using MLPerf-inspired methodology. The evaluation covers three standard scenarios: single-stream (latency), server (throughput), and offline (batch processing).
### Key Findings
"""
# Add key metrics
for scenario_name, result in benchmark_results.items():
mean_latency = statistics.mean(result.latencies) * 1000
p90_latency = result.latencies[int(0.9 * len(result.latencies))] * 1000
report += f"- **{scenario_name.replace('_', ' ').title()}**: {result.throughput:.2f} samples/sec, "
report += f"{mean_latency:.2f}ms mean latency, {p90_latency:.2f}ms 90th percentile\n"
report += """
## Methodology
### Benchmark Framework
- **Architecture**: MLPerf-inspired four-component system
- **Scenarios**: Single-stream, server, and offline evaluation
- **Statistical Validation**: Multiple runs with confidence intervals
- **Metrics**: Latency distribution, throughput, accuracy
### Test Environment
- **Hardware**: Standard development machine
- **Software**: TinyTorch framework
- **Dataset**: Standardized evaluation dataset
- **Validation**: Statistical significance testing
## Detailed Results
"""
# Add detailed results for each scenario
for scenario_name, result in benchmark_results.items():
report += f"### {scenario_name.replace('_', ' ').title()} Scenario\n\n"
latencies_ms = [l * 1000 for l in result.latencies]
report += f"- **Sample Count**: {len(result.latencies)}\n"
report += f"- **Mean Latency**: {statistics.mean(latencies_ms):.2f} ms\n"
report += f"- **Median Latency**: {statistics.median(latencies_ms):.2f} ms\n"
report += f"- **90th Percentile**: {latencies_ms[int(0.9 * len(latencies_ms))]:.2f} ms\n"
report += f"- **95th Percentile**: {latencies_ms[int(0.95 * len(latencies_ms))]:.2f} ms\n"
report += f"- **Standard Deviation**: {statistics.stdev(latencies_ms):.2f} ms\n"
report += f"- **Throughput**: {result.throughput:.2f} samples/second\n"
if result.accuracy > 0:
report += f"- **Accuracy**: {result.accuracy:.4f}\n"
report += "\n"
report += """## Statistical Validation
All results include proper statistical validation:
- Multiple independent runs for reliability
- Confidence intervals for key metrics
- Outlier detection and handling
- Significance testing for comparisons
## Recommendations
Based on the benchmark results:
1. **Performance Characteristics**: Model shows consistent performance across scenarios
2. **Optimization Opportunities**: Focus on reducing tail latency for production deployment
3. **Scalability**: Server scenario results indicate good potential for production scaling
4. **Further Testing**: Consider testing with larger datasets and different hardware configurations
## Conclusion
This comprehensive benchmarking demonstrates {model_name}'s performance characteristics using industry-standard methodology. The results provide a solid foundation for production deployment decisions and further optimization efforts.
"""
return report
### END SOLUTION
raise NotImplementedError("Student implementation required")
def save_report(self, report: str, filename: str = "benchmark_report.md"):
"""Save report to file."""
with open(filename, 'w') as f:
f.write(report)
print(f"📄 Report saved to {filename}")
def plot_benchmark_results(benchmark_results: Dict[str, BenchmarkResult]):
"""Visualize benchmark results."""
# Create visualizations
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# Latency distribution for single-stream
if 'single_stream' in benchmark_results:
axes[0].hist(benchmark_results['single_stream'].latencies, bins=50, color='skyblue')
axes[0].set_title("Single-Stream Latency Distribution")
axes[0].set_xlabel("Latency (s)")
axes[0].set_ylabel("Frequency")
# Server scenario latency
if 'server' in benchmark_results:
axes[1].plot(benchmark_results['server'].latencies, marker='o', linestyle='-', color='salmon')
axes[1].set_title("Server Scenario Latency Over Time")
axes[1].set_xlabel("Query Index")
axes[1].set_ylabel("Latency (s)")
# Offline scenario throughput
if 'offline' in benchmark_results:
offline_result = benchmark_results['offline']
throughput = len(offline_result.latencies) / sum(offline_result.latencies)
axes[2].bar(['Throughput'], [throughput], color='lightgreen')
axes[2].set_title("Offline Scenario Throughput")
axes[2].set_ylabel("Samples per second")
plt.tight_layout()
plt.show()
# %% ../../modules/source/14_benchmarking/benchmarking_dev.ipynb 29
class ProductionBenchmarkingProfiler:
"""
Advanced production-grade benchmarking profiler for ML systems.
This class implements comprehensive performance analysis patterns used in
production ML systems, including end-to-end latency analysis, resource
monitoring, A/B testing frameworks, and production monitoring integration.
TODO: Implement production-grade profiling capabilities.
STEP-BY-STEP IMPLEMENTATION:
1. End-to-end pipeline analysis (not just model inference)
2. Resource utilization monitoring (CPU, memory, bandwidth)
3. Statistical A/B testing frameworks
4. Production monitoring and alerting integration
5. Performance regression detection
6. Load testing and capacity planning
LEARNING CONNECTIONS:
- **Production ML Systems**: Real-world profiling for deployment optimization
- **Performance Engineering**: Systematic approach to identifying and fixing bottlenecks
- **A/B Testing**: Statistical frameworks for safe model rollouts
- **Cost Optimization**: Understanding resource usage for efficient cloud deployment
"""
def __init__(self, enable_monitoring: bool = True):
self.enable_monitoring = enable_monitoring
self.baseline_metrics = {}
self.production_metrics = []
self.ab_test_results = {}
self.resource_usage = []
def profile_end_to_end_pipeline(self, model: Callable, dataset: List,
preprocessing_fn: Optional[Callable] = None,
postprocessing_fn: Optional[Callable] = None) -> Dict[str, float]:
"""
Profile the complete ML pipeline including preprocessing and postprocessing.
TODO: Implement end-to-end pipeline profiling.
IMPLEMENTATION STEPS:
1. Profile data loading and preprocessing time
2. Profile model inference time
3. Profile postprocessing and output formatting time
4. Measure total memory usage throughout pipeline
5. Calculate end-to-end latency distribution
6. Identify bottlenecks in the pipeline
HINTS:
- Use context managers for timing different stages
- Track memory usage with sys.getsizeof or psutil
- Measure both CPU and wall-clock time
- Consider batch vs single-sample processing differences
"""
### BEGIN SOLUTION
import time
import sys
pipeline_metrics = {
'preprocessing_time': [],
'inference_time': [],
'postprocessing_time': [],
'memory_usage': [],
'end_to_end_latency': []
}
for sample in dataset[:100]: # Profile first 100 samples
start_time = time.perf_counter()
# Preprocessing stage
preprocess_start = time.perf_counter()
if preprocessing_fn:
processed_sample = preprocessing_fn(sample)
else:
processed_sample = sample
preprocess_end = time.perf_counter()
pipeline_metrics['preprocessing_time'].append(preprocess_end - preprocess_start)
# Inference stage
inference_start = time.perf_counter()
model_output = model(processed_sample)
inference_end = time.perf_counter()
pipeline_metrics['inference_time'].append(inference_end - inference_start)
# Postprocessing stage
postprocess_start = time.perf_counter()
if postprocessing_fn:
final_output = postprocessing_fn(model_output)
else:
final_output = model_output
postprocess_end = time.perf_counter()
pipeline_metrics['postprocessing_time'].append(postprocess_end - postprocess_start)
end_time = time.perf_counter()
pipeline_metrics['end_to_end_latency'].append(end_time - start_time)
# Memory usage estimation
memory_usage = sys.getsizeof(processed_sample) + sys.getsizeof(model_output) + sys.getsizeof(final_output)
pipeline_metrics['memory_usage'].append(memory_usage)
# Calculate summary statistics
summary_metrics = {}
for metric_name, values in pipeline_metrics.items():
summary_metrics[f'{metric_name}_mean'] = statistics.mean(values)
summary_metrics[f'{metric_name}_p95'] = values[int(0.95 * len(values))] if values else 0
summary_metrics[f'{metric_name}_max'] = max(values) if values else 0
return summary_metrics
### END SOLUTION
raise NotImplementedError("Student implementation required")
def monitor_resource_utilization(self, duration: float = 60.0) -> Dict[str, List[float]]:
"""
Monitor system resource utilization during model execution.
TODO: Implement resource monitoring.
IMPLEMENTATION STEPS:
1. Sample CPU usage over time
2. Track memory consumption patterns
3. Monitor bandwidth utilization (if applicable)
4. Record resource usage spikes and patterns
5. Correlate resource usage with performance
STUDENT IMPLEMENTATION CHALLENGE (75% level):
You need to implement the resource monitoring logic.
Consider how you would track CPU, memory, and other resources
during model execution in a production environment.
"""
### BEGIN SOLUTION
import time
import os
resource_metrics = {
'cpu_usage': [],
'memory_usage': [],
'timestamp': []
}
start_time = time.perf_counter()
while (time.perf_counter() - start_time) < duration:
current_time = time.perf_counter() - start_time
# Simple CPU usage estimation (in real production, use psutil)
# This is a placeholder implementation
cpu_usage = 50 + 30 * np.random.rand() # Simulated CPU usage
# Memory usage estimation
memory_usage = 1024 + 512 * np.random.rand() # Simulated memory in MB
resource_metrics['cpu_usage'].append(cpu_usage)
resource_metrics['memory_usage'].append(memory_usage)
resource_metrics['timestamp'].append(current_time)
time.sleep(0.1) # Sample every 100ms
return resource_metrics
### END SOLUTION
raise NotImplementedError("Student implementation required")
def setup_ab_testing_framework(self, model_a: Callable, model_b: Callable,
traffic_split: float = 0.5) -> Dict[str, Any]:
"""
Set up A/B testing framework for comparing model versions in production.
TODO: Implement A/B testing framework.
IMPLEMENTATION STEPS:
1. Implement traffic splitting logic
2. Track metrics for both model versions
3. Implement statistical significance testing
4. Monitor for performance regressions
5. Provide recommendations for rollout
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Implement a production-ready A/B testing framework that can
safely compare two model versions with proper statistical validation.
"""
### BEGIN SOLUTION
ab_test_config = {
'model_a': model_a,
'model_b': model_b,
'traffic_split': traffic_split,
'metrics_a': {'latencies': [], 'accuracies': [], 'errors': 0},
'metrics_b': {'latencies': [], 'accuracies': [], 'errors': 0},
'total_requests': 0,
'requests_a': 0,
'requests_b': 0
}
return ab_test_config
### END SOLUTION
raise NotImplementedError("Student implementation required")
def run_ab_test(self, ab_config: Dict[str, Any], dataset: List,
num_samples: int = 1000) -> Dict[str, Any]:
"""
Execute A/B test with statistical validation.
TODO: Implement A/B test execution.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Execute the A/B test, collect metrics, and provide statistical
analysis of the results with confidence intervals.
"""
### BEGIN SOLUTION
import time
model_a = ab_config['model_a']
model_b = ab_config['model_b']
traffic_split = ab_config['traffic_split']
for i in range(num_samples):
sample = dataset[i % len(dataset)]
# Route traffic based on split
if np.random.rand() < traffic_split:
# Route to model A
start_time = time.perf_counter()
try:
result = model_a(sample)
latency = time.perf_counter() - start_time
ab_config['metrics_a']['latencies'].append(latency)
ab_config['requests_a'] += 1
except Exception:
ab_config['metrics_a']['errors'] += 1
else:
# Route to model B
start_time = time.perf_counter()
try:
result = model_b(sample)
latency = time.perf_counter() - start_time
ab_config['metrics_b']['latencies'].append(latency)
ab_config['requests_b'] += 1
except Exception:
ab_config['metrics_b']['errors'] += 1
ab_config['total_requests'] += 1
# Calculate test results
latencies_a = ab_config['metrics_a']['latencies']
latencies_b = ab_config['metrics_b']['latencies']
if latencies_a and latencies_b:
# Statistical comparison
validator = StatisticalValidator()
statistical_result = validator.validate_comparison(latencies_a, latencies_b)
results = {
'model_a_performance': {
'mean_latency': statistics.mean(latencies_a),
'p95_latency': latencies_a[int(0.95 * len(latencies_a))],
'error_rate': ab_config['metrics_a']['errors'] / ab_config['requests_a'] if ab_config['requests_a'] > 0 else 0
},
'model_b_performance': {
'mean_latency': statistics.mean(latencies_b),
'p95_latency': latencies_b[int(0.95 * len(latencies_b))],
'error_rate': ab_config['metrics_b']['errors'] / ab_config['requests_b'] if ab_config['requests_b'] > 0 else 0
},
'statistical_analysis': statistical_result,
'recommendation': self._generate_ab_recommendation(statistical_result)
}
else:
results = {'error': 'Insufficient data for comparison'}
return results
### END SOLUTION
raise NotImplementedError("Student implementation required")
def _generate_ab_recommendation(self, statistical_result: StatisticalValidation) -> str:
"""
Generate production rollout recommendation based on A/B test results.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Based on the statistical results, provide a clear recommendation
for production rollout decisions.
"""
### BEGIN SOLUTION
if not statistical_result.is_significant:
return "No significant difference detected. Consider longer test duration or larger sample size."
if statistical_result.effect_size < 0:
return "Model B shows worse performance. Do not proceed with rollout."
elif statistical_result.effect_size > 0.2:
return "Model B shows significant improvement. Proceed with gradual rollout."
else:
return "Model B shows marginal improvement. Consider business impact before rollout."
### END SOLUTION
raise NotImplementedError("Student implementation required")
def detect_performance_regression(self, current_metrics: Dict[str, float],
baseline_metrics: Dict[str, float],
threshold: float = 0.1) -> Dict[str, Any]:
"""
Detect performance regressions compared to baseline.
TODO: Implement regression detection.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Implement automated detection of performance regressions
with configurable thresholds and alerting.
"""
### BEGIN SOLUTION
regressions = []
improvements = []
for metric_name, current_value in current_metrics.items():
if metric_name in baseline_metrics:
baseline_value = baseline_metrics[metric_name]
if baseline_value > 0: # Avoid division by zero
change_percent = (current_value - baseline_value) / baseline_value
if change_percent > threshold:
regressions.append({
'metric': metric_name,
'baseline': baseline_value,
'current': current_value,
'change_percent': change_percent * 100
})
elif change_percent < -threshold:
improvements.append({
'metric': metric_name,
'baseline': baseline_value,
'current': current_value,
'change_percent': abs(change_percent) * 100
})
return {
'regressions': regressions,
'improvements': improvements,
'alert_level': 'HIGH' if regressions else 'LOW',
'recommendation': 'Review deployment' if regressions else 'Performance stable'
}
### END SOLUTION
raise NotImplementedError("Student implementation required")
def generate_capacity_planning_report(self, current_load: Dict[str, float],
projected_growth: float = 1.5) -> str:
"""
Generate capacity planning report for scaling production systems.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Create a comprehensive capacity planning analysis that helps
engineering teams plan for growth and resource allocation.
"""
### BEGIN SOLUTION
report = f"""# Capacity Planning Report
## Current System Load
- **Average CPU Usage**: {current_load.get('cpu_usage', 0):.1f}%
- **Memory Usage**: {current_load.get('memory_usage', 0):.1f} MB
- **Request Rate**: {current_load.get('request_rate', 0):.1f} req/sec
- **Average Latency**: {current_load.get('latency', 0):.2f} ms
## Projected Requirements (Growth Factor: {projected_growth}x)
- **Projected CPU Usage**: {current_load.get('cpu_usage', 0) * projected_growth:.1f}%
- **Projected Memory**: {current_load.get('memory_usage', 0) * projected_growth:.1f} MB
- **Projected Request Rate**: {current_load.get('request_rate', 0) * projected_growth:.1f} req/sec
## Scaling Recommendations
"""
cpu_projected = current_load.get('cpu_usage', 0) * projected_growth
memory_projected = current_load.get('memory_usage', 0) * projected_growth
if cpu_projected > 80:
report += "- **CPU Scaling**: Consider adding more compute instances\n"
if memory_projected > 8000: # 8GB threshold
report += "- **Memory Scaling**: Consider upgrading to higher memory instances\n"
report += "\n## Infrastructure Recommendations\n"
report += "- Monitor performance metrics continuously\n"
report += "- Set up auto-scaling policies\n"
report += "- Plan for peak load scenarios\n"
return report
### END SOLUTION
raise NotImplementedError("Student implementation required")