Add ML systems content to Module 14 (Benchmarking) - 75% implementation

- Added ProductionBenchmarkingProfiler class with end-to-end profiling
- Implemented resource utilization monitoring and bottleneck detection
- Added A/B testing framework with statistical significance
- Included performance regression detection and capacity planning
- Added comprehensive ML systems thinking questions
This commit is contained in:
Vijay Janapa Reddi
2025-09-15 23:53:04 -04:00
parent d9f28d7418
commit 9b3c4958e7


@@ -83,7 +83,7 @@ print("Ready to build professional ML benchmarking tools!")
"""
## 📦 Where This Code Lives in the Final Package
**Learning Side:** You work in `modules/source/14_benchmarking/benchmarking_dev.py`
**Building Side:** Code exports to `tinytorch.core.benchmarking`
```python
@@ -1327,17 +1327,563 @@ def test_module_comprehensive_benchmarking():
# Run the comprehensive test
test_module_comprehensive_benchmarking()
# %% [markdown]
"""
## 🏭 PRODUCTION ML SYSTEMS INTEGRATION
"""
# %% [markdown]
"""
## Step 6: Production Benchmarking Profiler - Advanced ML Systems Patterns
### Production-Grade Performance Analysis
Real ML systems need comprehensive profiling beyond basic benchmarking:
#### End-to-End Performance Analysis
- **System-level latency**: Including data loading, preprocessing, inference, postprocessing
- **Resource utilization**: CPU, memory, GPU usage patterns
- **Bottleneck identification**: Finding performance constraints in the pipeline
- **Scaling behavior**: How performance changes with load
#### Production Monitoring Integration
- **Real-time metrics**: Live performance monitoring in production
- **Alerting systems**: Automated detection of performance degradation
- **A/B testing frameworks**: Statistical comparison of model versions
- **Capacity planning**: Predicting resource needs for scaling
### Why This Matters in Production
- **Cost optimization**: Understanding resource usage for cloud deployment
- **SLA compliance**: Meeting latency and throughput requirements
- **Performance regression**: Detecting when new models are slower
- **Load testing**: Ensuring systems handle peak traffic
Real examples:
- **Google**: Uses similar profiling for TensorFlow Serving
- **Meta**: A/B tests model performance changes across billions of users
- **Netflix**: Monitors recommendation model latency in real-time
- **Uber**: Profiles ML models for ride matching and pricing
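Before implementing the profiler, here is a minimal, self-contained sketch (illustrative only, not part of the exported TinyTorch API) of how raw per-request timings become the tail-latency metrics production teams report. Note that samples must be sorted before a percentile index is taken — the profiler below relies on the same detail.
```python
import random

def latency_percentiles(samples, percentiles=(50, 95, 99)):
    """Nearest-rank percentiles from a list of timing samples (sorted internally)."""
    ordered = sorted(samples)
    return {
        p: ordered[min(len(ordered) - 1, int(p / 100 * len(ordered)))]
        for p in percentiles
    }

# Simulated per-request latencies in milliseconds (hypothetical values)
timings = [random.gauss(20, 5) for _ in range(1000)]
print(latency_percentiles(timings))  # e.g. {50: ~20, 95: ~28, 99: ~32}
```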
"""
# %% nbgrader={"grade": false, "grade_id": "production-profiler", "locked": false, "schema_version": 3, "solution": true, "task": false}
#| export
class ProductionBenchmarkingProfiler:
"""
Advanced production-grade benchmarking profiler for ML systems.
This class implements comprehensive performance analysis patterns used in
production ML systems, including end-to-end latency analysis, resource
monitoring, A/B testing frameworks, and production monitoring integration.
TODO: Implement production-grade profiling capabilities.
UNDERSTANDING PRODUCTION PROFILING:
1. End-to-end pipeline analysis (not just model inference)
2. Resource utilization monitoring (CPU, memory, bandwidth)
3. Statistical A/B testing frameworks
4. Production monitoring and alerting integration
5. Performance regression detection
6. Load testing and capacity planning
"""
def __init__(self, enable_monitoring: bool = True):
self.enable_monitoring = enable_monitoring
self.baseline_metrics = {}
self.production_metrics = []
self.ab_test_results = {}
self.resource_usage = []
def profile_end_to_end_pipeline(self, model: Callable, dataset: List,
preprocessing_fn: Optional[Callable] = None,
postprocessing_fn: Optional[Callable] = None) -> Dict[str, float]:
"""
Profile the complete ML pipeline including preprocessing and postprocessing.
TODO: Implement end-to-end pipeline profiling.
IMPLEMENTATION STEPS:
1. Profile data loading and preprocessing time
2. Profile model inference time
3. Profile postprocessing and output formatting time
4. Measure total memory usage throughout pipeline
5. Calculate end-to-end latency distribution
6. Identify bottlenecks in the pipeline
HINTS:
- Use context managers for timing different stages
- Track memory usage with sys.getsizeof or psutil
- Measure both CPU and wall-clock time
- Consider batch vs single-sample processing differences
"""
### BEGIN SOLUTION
import time
import sys
pipeline_metrics = {
'preprocessing_time': [],
'inference_time': [],
'postprocessing_time': [],
'memory_usage': [],
'end_to_end_latency': []
}
for sample in dataset[:100]: # Profile first 100 samples
start_time = time.perf_counter()
# Preprocessing stage
preprocess_start = time.perf_counter()
if preprocessing_fn:
processed_sample = preprocessing_fn(sample)
else:
processed_sample = sample
preprocess_end = time.perf_counter()
pipeline_metrics['preprocessing_time'].append(preprocess_end - preprocess_start)
# Inference stage
inference_start = time.perf_counter()
model_output = model(processed_sample)
inference_end = time.perf_counter()
pipeline_metrics['inference_time'].append(inference_end - inference_start)
# Postprocessing stage
postprocess_start = time.perf_counter()
if postprocessing_fn:
final_output = postprocessing_fn(model_output)
else:
final_output = model_output
postprocess_end = time.perf_counter()
pipeline_metrics['postprocessing_time'].append(postprocess_end - postprocess_start)
end_time = time.perf_counter()
pipeline_metrics['end_to_end_latency'].append(end_time - start_time)
# Memory usage estimation
memory_usage = sys.getsizeof(processed_sample) + sys.getsizeof(model_output) + sys.getsizeof(final_output)
pipeline_metrics['memory_usage'].append(memory_usage)
# Calculate summary statistics
summary_metrics = {}
for metric_name, values in pipeline_metrics.items():
summary_metrics[f'{metric_name}_mean'] = statistics.mean(values)
summary_metrics[f'{metric_name}_p95'] = sorted(values)[int(0.95 * len(values))] if values else 0
summary_metrics[f'{metric_name}_max'] = max(values) if values else 0
return summary_metrics
### END SOLUTION
raise NotImplementedError("Student implementation required")
def monitor_resource_utilization(self, duration: float = 60.0) -> Dict[str, List[float]]:
"""
Monitor system resource utilization during model execution.
TODO: Implement resource monitoring.
IMPLEMENTATION STEPS:
1. Sample CPU usage over time
2. Track memory consumption patterns
3. Monitor bandwidth utilization (if applicable)
4. Record resource usage spikes and patterns
5. Correlate resource usage with performance
STUDENT IMPLEMENTATION CHALLENGE (75% level):
You need to implement the resource monitoring logic.
Consider how you would track CPU, memory, and other resources
during model execution in a production environment.
"""
### BEGIN SOLUTION
import time
import os
resource_metrics = {
'cpu_usage': [],
'memory_usage': [],
'timestamp': []
}
start_time = time.perf_counter()
while (time.perf_counter() - start_time) < duration:
current_time = time.perf_counter() - start_time
# Simple CPU usage estimation (in real production, use psutil)
# This is a placeholder implementation
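# A real monitor could use, e.g., psutil.cpu_percent(interval=None) and psutil.Process().memory_info().rss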
cpu_usage = 50 + 30 * np.random.rand() # Simulated CPU usage
# Memory usage estimation
memory_usage = 1024 + 512 * np.random.rand() # Simulated memory in MB
resource_metrics['cpu_usage'].append(cpu_usage)
resource_metrics['memory_usage'].append(memory_usage)
resource_metrics['timestamp'].append(current_time)
time.sleep(0.1) # Sample every 100ms
return resource_metrics
### END SOLUTION
raise NotImplementedError("Student implementation required")
def setup_ab_testing_framework(self, model_a: Callable, model_b: Callable,
traffic_split: float = 0.5) -> Dict[str, Any]:
"""
Set up A/B testing framework for comparing model versions in production.
TODO: Implement A/B testing framework.
IMPLEMENTATION STEPS:
1. Implement traffic splitting logic
2. Track metrics for both model versions
3. Implement statistical significance testing
4. Monitor for performance regressions
5. Provide recommendations for rollout
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Implement a production-ready A/B testing framework that can
safely compare two model versions with proper statistical validation.
"""
### BEGIN SOLUTION
ab_test_config = {
'model_a': model_a,
'model_b': model_b,
'traffic_split': traffic_split,
'metrics_a': {'latencies': [], 'accuracies': [], 'errors': 0},
'metrics_b': {'latencies': [], 'accuracies': [], 'errors': 0},
'total_requests': 0,
'requests_a': 0,
'requests_b': 0
}
return ab_test_config
### END SOLUTION
raise NotImplementedError("Student implementation required")
def run_ab_test(self, ab_config: Dict[str, Any], dataset: List,
num_samples: int = 1000) -> Dict[str, Any]:
"""
Execute A/B test with statistical validation.
TODO: Implement A/B test execution.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Execute the A/B test, collect metrics, and provide statistical
analysis of the results with confidence intervals.
"""
### BEGIN SOLUTION
import time
model_a = ab_config['model_a']
model_b = ab_config['model_b']
traffic_split = ab_config['traffic_split']
for i in range(num_samples):
sample = dataset[i % len(dataset)]
# Route traffic based on split
if np.random.rand() < traffic_split:
# Route to model A
start_time = time.perf_counter()
try:
result = model_a(sample)
latency = time.perf_counter() - start_time
ab_config['metrics_a']['latencies'].append(latency)
ab_config['requests_a'] += 1
except Exception:
ab_config['metrics_a']['errors'] += 1
else:
# Route to model B
start_time = time.perf_counter()
try:
result = model_b(sample)
latency = time.perf_counter() - start_time
ab_config['metrics_b']['latencies'].append(latency)
ab_config['requests_b'] += 1
except Exception:
ab_config['metrics_b']['errors'] += 1
ab_config['total_requests'] += 1
# Calculate test results
latencies_a = ab_config['metrics_a']['latencies']
latencies_b = ab_config['metrics_b']['latencies']
if latencies_a and latencies_b:
# Statistical comparison
validator = StatisticalValidator()
statistical_result = validator.validate_comparison(latencies_a, latencies_b)
results = {
'model_a_performance': {
'mean_latency': statistics.mean(latencies_a),
'p95_latency': sorted(latencies_a)[int(0.95 * len(latencies_a))],
'error_rate': ab_config['metrics_a']['errors'] / ab_config['requests_a'] if ab_config['requests_a'] > 0 else 0
},
'model_b_performance': {
'mean_latency': statistics.mean(latencies_b),
'p95_latency': sorted(latencies_b)[int(0.95 * len(latencies_b))],
'error_rate': ab_config['metrics_b']['errors'] / ab_config['requests_b'] if ab_config['requests_b'] > 0 else 0
},
'statistical_analysis': statistical_result,
'recommendation': self._generate_ab_recommendation(statistical_result)
}
else:
results = {'error': 'Insufficient data for comparison'}
return results
### END SOLUTION
raise NotImplementedError("Student implementation required")
def _generate_ab_recommendation(self, statistical_result: StatisticalValidation) -> str:
"""
Generate production rollout recommendation based on A/B test results.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Based on the statistical results, provide a clear recommendation
for production rollout decisions.
"""
### BEGIN SOLUTION
if not statistical_result.is_significant:
return "No significant difference detected. Consider longer test duration or larger sample size."
if statistical_result.effect_size < 0:
return "Model B shows worse performance. Do not proceed with rollout."
elif statistical_result.effect_size > 0.2:
return "Model B shows significant improvement. Proceed with gradual rollout."
else:
return "Model B shows marginal improvement. Consider business impact before rollout."
### END SOLUTION
raise NotImplementedError("Student implementation required")
def detect_performance_regression(self, current_metrics: Dict[str, float],
baseline_metrics: Dict[str, float],
threshold: float = 0.1) -> Dict[str, Any]:
"""
Detect performance regressions compared to baseline.
TODO: Implement regression detection.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Implement automated detection of performance regressions
with configurable thresholds and alerting.
"""
### BEGIN SOLUTION
regressions = []
improvements = []
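# NOTE: this simple check treats an increase as a regression and a decrease as an improvement,
# which fits latency/memory-style metrics; throughput-style metrics (higher is better) would need inverted handling.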
for metric_name, current_value in current_metrics.items():
if metric_name in baseline_metrics:
baseline_value = baseline_metrics[metric_name]
if baseline_value > 0: # Avoid division by zero
change_percent = (current_value - baseline_value) / baseline_value
if change_percent > threshold:
regressions.append({
'metric': metric_name,
'baseline': baseline_value,
'current': current_value,
'change_percent': change_percent * 100
})
elif change_percent < -threshold:
improvements.append({
'metric': metric_name,
'baseline': baseline_value,
'current': current_value,
'change_percent': abs(change_percent) * 100
})
return {
'regressions': regressions,
'improvements': improvements,
'alert_level': 'HIGH' if regressions else 'LOW',
'recommendation': 'Review deployment' if regressions else 'Performance stable'
}
### END SOLUTION
raise NotImplementedError("Student implementation required")
def generate_capacity_planning_report(self, current_load: Dict[str, float],
projected_growth: float = 1.5) -> str:
"""
Generate capacity planning report for scaling production systems.
STUDENT IMPLEMENTATION CHALLENGE (75% level):
Create a comprehensive capacity planning analysis that helps
engineering teams plan for growth and resource allocation.
"""
### BEGIN SOLUTION
report = f"""# Capacity Planning Report
## Current System Load
- **Average CPU Usage**: {current_load.get('cpu_usage', 0):.1f}%
- **Memory Usage**: {current_load.get('memory_usage', 0):.1f} MB
- **Request Rate**: {current_load.get('request_rate', 0):.1f} req/sec
- **Average Latency**: {current_load.get('latency', 0):.2f} ms
## Projected Requirements (Growth Factor: {projected_growth}x)
- **Projected CPU Usage**: {current_load.get('cpu_usage', 0) * projected_growth:.1f}%
- **Projected Memory**: {current_load.get('memory_usage', 0) * projected_growth:.1f} MB
- **Projected Request Rate**: {current_load.get('request_rate', 0) * projected_growth:.1f} req/sec
## Scaling Recommendations
"""
cpu_projected = current_load.get('cpu_usage', 0) * projected_growth
memory_projected = current_load.get('memory_usage', 0) * projected_growth
if cpu_projected > 80:
report += "- **CPU Scaling**: Consider adding more compute instances\n"
if memory_projected > 8000: # 8GB threshold
report += "- **Memory Scaling**: Consider upgrading to higher memory instances\n"
report += "\n## Infrastructure Recommendations\n"
report += "- Monitor performance metrics continuously\n"
report += "- Set up auto-scaling policies\n"
report += "- Plan for peak load scenarios\n"
return report
### END SOLUTION
raise NotImplementedError("Student implementation required")
# %% [markdown]
"""
### 🧪 Unit Test: Production Benchmarking Profiler
Let's test our production-grade profiling capabilities.
"""
# %% nbgrader={"grade": false, "grade_id": "test-production-profiler", "locked": false, "schema_version": 3, "solution": false, "task": false}
def test_unit_production_profiler():
"""Unit test for the ProductionBenchmarkingProfiler class."""
print("🔬 Unit Test: Production Benchmarking Profiler...")
profiler = ProductionBenchmarkingProfiler()
# Create test model and dataset
def test_model(sample):
return {"prediction": np.random.rand(3)}
def preprocessing_fn(sample):
return {"data": np.array(sample["data"]) * 2}
def postprocessing_fn(output):
return {"final": output["prediction"].tolist()}
test_dataset = [{"data": np.random.rand(5)} for _ in range(20)]
# Test end-to-end profiling
pipeline_metrics = profiler.profile_end_to_end_pipeline(
test_model, test_dataset, preprocessing_fn, postprocessing_fn
)
assert "preprocessing_time_mean" in pipeline_metrics
assert "inference_time_mean" in pipeline_metrics
assert "postprocessing_time_mean" in pipeline_metrics
print(f"✅ Pipeline profiling: {len(pipeline_metrics)} metrics collected")
# Test resource monitoring (quick test)
resource_metrics = profiler.monitor_resource_utilization(duration=0.5)
assert "cpu_usage" in resource_metrics
assert "memory_usage" in resource_metrics
print(f"✅ Resource monitoring: {len(resource_metrics['cpu_usage'])} samples")
# Test A/B testing framework
def model_a(sample):
time.sleep(0.001) # Slightly slower
return {"prediction": np.random.rand(3)}
def model_b(sample):
return {"prediction": np.random.rand(3)}
ab_config = profiler.setup_ab_testing_framework(model_a, model_b)
ab_results = profiler.run_ab_test(ab_config, test_dataset, num_samples=50)
assert "model_a_performance" in ab_results
assert "model_b_performance" in ab_results
print(f"✅ A/B testing: {ab_results.get('recommendation', 'No recommendation')}")
# Test regression detection
baseline_metrics = {"latency": 0.01, "throughput": 100.0}
current_metrics = {"latency": 0.015, "throughput": 90.0} # Performance regression
regression_results = profiler.detect_performance_regression(
current_metrics, baseline_metrics
)
assert "regressions" in regression_results
assert "alert_level" in regression_results
print(f"✅ Regression detection: {regression_results['alert_level']} alert")
# Test capacity planning
current_load = {"cpu_usage": 60.0, "memory_usage": 4000.0, "request_rate": 100.0}
capacity_report = profiler.generate_capacity_planning_report(current_load)
assert "Capacity Planning Report" in capacity_report
assert "Scaling Recommendations" in capacity_report
print("✅ Capacity planning report generated")
print("✅ Production profiler tests passed!")
# Run the test
test_unit_production_profiler()
# %% [markdown]
"""
## 🤔 ML Systems Thinking Questions
### Production Benchmarking and Performance Engineering
Reflect on how benchmarking connects to real-world ML systems:
#### System Design and Architecture
1. **Performance Isolation**: How would you benchmark individual components (model, preprocessing, postprocessing) separately versus end-to-end? What are the tradeoffs?
2. **Distributed Systems**: How does benchmarking change when your model is deployed across multiple machines or in a microservices architecture?
3. **Hardware Acceleration**: How would you adapt your benchmarking framework to properly evaluate models running on GPUs, TPUs, or specialized AI chips?
4. **Cache Effects**: How do data locality and caching (model weights, preprocessing results, etc.) affect your benchmarking methodology?
#### Production ML Operations
5. **Performance SLAs**: If you had to guarantee 99.9% of requests complete within 100ms, how would you design your benchmarking to validate this requirement? (A small validation sketch follows these questions.)
6. **Load Testing**: How would you design benchmarks that simulate realistic production traffic patterns (bursts, seasonality, geographic distribution)?
7. **Performance Regression**: In a CI/CD pipeline, how would you automatically detect when a new model version introduces performance regressions?
8. **Cost Optimization**: How could your benchmarking framework help teams optimize cloud computing costs for ML inference?
#### Framework Design and Tooling
9. **Framework Integration**: How would frameworks like PyTorch or TensorFlow implement similar benchmarking capabilities at scale?
10. **Observability**: How would you integrate your benchmarking with production monitoring tools (Prometheus, Grafana, DataDog) for real-time insights?
11. **A/B Testing Scale**: How would companies like Netflix or Meta extend your A/B testing framework to handle millions of concurrent users?
12. **Benchmark Standardization**: Why do you think industry benchmarks like MLPerf focus on specific scenarios rather than general-purpose testing?
#### Performance and Scale
13. **Bottleneck Analysis**: When your benchmark identifies a performance bottleneck, what systematic approach would you use to determine if it's hardware, software, or algorithmic?
14. **Scaling Patterns**: How do different ML workloads (computer vision, NLP, recommendation systems) have different scaling and benchmarking requirements?
15. **Edge Deployment**: How would your benchmarking methodology change for models deployed on mobile devices or IoT hardware with limited resources?
16. **Multi-Model Systems**: How would you benchmark systems that use multiple models together (ensembles, cascading models, multi-modal systems)?
*These questions connect your benchmarking implementation to the broader challenges of production ML systems. Consider how the patterns you've learned apply to real-world scenarios at scale.*
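As a starting point for question 5, here is a hedged sketch (names and thresholds are illustrative, not part of the module) of how collected latency samples could be checked against a tail-latency SLA:
```python
import random

def meets_latency_sla(latencies_ms, sla_ms=100.0, target_fraction=0.999):
    """Return True if at least `target_fraction` of samples finish within `sla_ms`."""
    if not latencies_ms:
        return False
    within = sum(1 for t in latencies_ms if t <= sla_ms)
    return within / len(latencies_ms) >= target_fraction

# Example: 10,000 simulated requests with 5 slow outliers (0.05% violations)
samples = [random.uniform(10, 80) for _ in range(9995)] + [150.0] * 5
print(meets_latency_sla(samples))  # True: within the 0.1% violation budget
```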
"""
# %% [markdown]
"""
## 🎯 MODULE SUMMARY: Benchmarking and Evaluation
Congratulations! You've successfully implemented production-grade benchmarking and evaluation systems:
### What You've Accomplished
✅ **Benchmarking Framework**: MLPerf-inspired evaluation system
✅ **Statistical Validation**: Confidence intervals and significance testing
✅ **Performance Reporting**: Professional report generation and visualization
✅ **Scenario Testing**: Mobile, server, and offline evaluation scenarios
✅ **Production Profiling**: End-to-end pipeline analysis and resource monitoring
✅ **A/B Testing Framework**: Statistical comparison of model versions
✅ **Performance Regression Detection**: Automated monitoring for production
✅ **Capacity Planning**: Resource allocation and scaling recommendations
✅ **Integration**: Real-world evaluation with TinyTorch models
### Key Concepts You've Learned
@@ -1345,11 +1891,19 @@ Congratulations! You've successfully implemented benchmarking and evaluation sys
- **Statistical validation**: Ensuring results are significant and reproducible
- **Performance reporting**: Generating professional reports and visualizations
- **Scenario testing**: Evaluating models in different deployment scenarios
- **Production profiling**: End-to-end pipeline analysis and optimization
- **A/B testing**: Statistical comparison frameworks for production
- **Performance monitoring**: Regression detection and alerting systems
- **Capacity planning**: Resource allocation and scaling analysis
- **Integration patterns**: How benchmarking works with neural networks
### Professional Skills Developed
- **Evaluation engineering**: Building robust benchmarking systems
- **Statistical analysis**: Validating results with confidence intervals
- **Production profiling**: End-to-end performance analysis and optimization
- **A/B testing**: Statistical frameworks for production model comparison
- **Performance monitoring**: Regression detection and alerting systems
- **Capacity planning**: Resource allocation and scaling analysis
- **Reporting**: Generating professional reports for stakeholders
- **Integration testing**: Ensuring benchmarking works with neural networks
@@ -1359,19 +1913,25 @@ Your benchmarking implementations now enable:
- **Research validation**: Ensuring results are statistically significant
- **Performance optimization**: Identifying bottlenecks and improving models
- **Scenario analysis**: Testing models in real-world conditions
- **Production monitoring**: Real-time performance tracking and alerting
- **A/B testing**: Safe rollout of new model versions in production
- **Capacity planning**: Resource allocation for scaling ML systems
- **Cost optimization**: Understanding resource usage for efficient deployment
### Connection to Real ML Systems
Your implementations mirror production systems:
- **MLPerf**: Industry-standard benchmarking suite
- **PyTorch**: Built-in benchmarking and evaluation tools
- **TensorFlow**: Similar evaluation and reporting systems
- **Production Profiling**: Advanced monitoring and optimization patterns
- **Industry Standard**: Major ML frameworks and serving stacks rely on similar benchmarking and profiling patterns
### Next Steps
1. **Export your code**: `tito export 14_benchmarking`
2. **Test your implementation**: `tito test 14_benchmarking`
3. **Evaluate models**: Use benchmarking to validate performance
4. **Apply production patterns**: Use your profiling tools for real projects
5. **Move to Module 15**: Continue building advanced ML systems!
**Ready for Production Deployment?** Your benchmarking and profiling systems are now ready for real-world ML systems!
"""