# ╔═══════════════════════════════════════════════════════════════════════════════╗
# ║                        🚨 CRITICAL WARNING 🚨                                 ║
# ║                     AUTOGENERATED! DO NOT EDIT!                               ║
# ║                                                                               ║
# ║  This file is AUTOMATICALLY GENERATED from source modules.                    ║
# ║  ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported!             ║
# ║                                                                               ║
# ║  ✅ TO EDIT: src/XX_benchmark/XX_benchmark.py                                  ║
# ║  ✅ TO EXPORT: Run 'tito module complete '                                     ║
# ║                                                                               ║
# ║  🛡️ STUDENT PROTECTION: This file contains optimized implementations.         ║
# ║     Editing it directly may break module functionality and training.          ║
# ║                                                                               ║
# ║  🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners)            ║
# ║     The tinytorch/ directory is generated code - edit source files instead!   ║
# ╚═══════════════════════════════════════════════════════════════════════════════╝

# %% auto 0
__all__ = ['DEFAULT_WARMUP_RUNS', 'DEFAULT_MEASUREMENT_RUNS', 'BenchmarkResult', 'test_unit_benchmark_result',
           'Benchmark', 'test_unit_benchmark', 'BenchmarkSuite', 'test_unit_benchmark_suite', 'TinyMLPerf',
           'test_unit_tinymlperf']

# %% ../../modules/19_benchmarking/19_benchmarking.ipynb 0
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any, Callable, Tuple
from contextlib import contextmanager
from pathlib import Path
import json
import time
import platform
import os
import statistics

import numpy as np

from tinytorch.profiling.profiler import Profiler

# matplotlib is only needed by the plotting helpers; every other feature of
# this module works without it, so a missing install must not break import.
try:
    import matplotlib.pyplot as plt
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    plt = None
    MATPLOTLIB_AVAILABLE = False

# Constants for benchmarking defaults
DEFAULT_WARMUP_RUNS = 5  # Default warmup runs for JIT compilation and cache warming
DEFAULT_MEASUREMENT_RUNS = 10  # Default measurement runs for statistical significance


class _ElapsedTimer:
    """Mutable holder for the duration measured by :func:`precise_timer`."""

    def __init__(self):
        self.elapsed = 0.0  # seconds; filled in when the ``with`` block exits


@contextmanager
def precise_timer():
    """
    Time the enclosed ``with`` block using ``time.perf_counter``.

    Yields an object whose ``elapsed`` attribute holds the wall-clock duration
    of the block in seconds. The attribute is only meaningful after the block
    has exited; it is set even when the block raises.
    """
    timer = _ElapsedTimer()
    started = time.perf_counter()
    try:
        yield timer
    finally:
        timer.elapsed = time.perf_counter() - started


# %% ../../modules/19_benchmarking/19_benchmarking.ipynb 9
@dataclass
class BenchmarkResult:
    """
    Container for benchmark measurements with statistical analysis.

    TODO: Implement a robust result container that stores measurements and metadata

    APPROACH:
    1. Store raw measurements and computed statistics
    2. Include metadata about test conditions
    3. Provide methods for statistical analysis
    4. Support serialization for result persistence

    EXAMPLE:
    >>> result = BenchmarkResult("model_accuracy", [0.95, 0.94, 0.96])
    >>> print(f"Mean: {result.mean:.3f} ± {result.std:.3f}")
    Mean: 0.950 ± 0.010

    HINTS:
    - Use statistics module for robust mean/std calculations
    - Store both raw data and summary statistics
    - Include confidence intervals for professional reporting

    Derived attributes (set in ``__post_init__``): ``mean``, ``std`` (sample
    standard deviation, 0.0 for a single value), ``median``, ``min_val``,
    ``max_val``, ``count``, ``ci_lower`` and ``ci_upper``.
    """
    ### BEGIN SOLUTION
    metric_name: str
    values: List[float]
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """
        Compute summary statistics after initialization.

        Raises:
            ValueError: if ``values`` is empty.
        """
        if not self.values:
            raise ValueError(
                "BenchmarkResult requires at least one measurement.\n"
                " Issue: Cannot compute statistics without any measurements.\n"
                " Fix: Ensure benchmark runs produce at least one measurement before creating BenchmarkResult."
            )

        self.count = len(self.values)
        self.mean = statistics.mean(self.values)
        # statistics.stdev needs at least two points; a single sample has no spread.
        self.std = statistics.stdev(self.values) if self.count > 1 else 0.0
        self.median = statistics.median(self.values)
        self.min_val = min(self.values)
        self.max_val = max(self.values)

        # 95% confidence interval for the mean
        if self.count > 1:
            # Normal approximation (z = 1.96). For small samples this is
            # narrower than the exact Student-t interval would be.
            t_score = 1.96  # Approximate for large samples
            margin_error = t_score * (self.std / np.sqrt(self.count))
            self.ci_lower = self.mean - margin_error
            self.ci_upper = self.mean + margin_error
        else:
            self.ci_lower = self.ci_upper = self.mean

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            'metric_name': self.metric_name,
            'values': self.values,
            'mean': self.mean,
            'std': self.std,
            'median': self.median,
            'min': self.min_val,
            'max': self.max_val,
            'count': self.count,
            'ci_lower': self.ci_lower,
            'ci_upper': self.ci_upper,
            'metadata': self.metadata
        }

    def __str__(self) -> str:
        return f"{self.metric_name}: {self.mean:.4f} ± {self.std:.4f} (n={self.count})"
    ### END SOLUTION


def test_unit_benchmark_result():
    """🔬 Test BenchmarkResult statistical calculations."""
    print("🔬 Unit Test: BenchmarkResult...")

    # Test basic statistics
    values = [1.0, 2.0, 3.0, 4.0, 5.0]
    result = BenchmarkResult("test_metric", values)

    assert result.mean == 3.0
    assert abs(result.std - statistics.stdev(values)) < 1e-10
    assert result.median == 3.0
    assert result.min_val == 1.0
    assert result.max_val == 5.0
    assert result.count == 5

    # Test confidence intervals
    assert result.ci_lower < result.mean < result.ci_upper

    # Test serialization
    result_dict = result.to_dict()
    assert result_dict['metric_name'] == "test_metric"
    assert result_dict['mean'] == 3.0

    print("✅ BenchmarkResult works correctly!")


if __name__ == "__main__":
    test_unit_benchmark_result()


# %% ../../modules/19_benchmarking/19_benchmarking.ipynb 13
class Benchmark:
    """
    Professional benchmarking system for ML models and operations.

    TODO: Implement a comprehensive benchmark runner with statistical rigor

    APPROACH:
    1. Support multiple models, datasets, and metrics
    2. Run repeated measurements with proper warmup
    3. Control for system variance and compute confidence intervals
    4. Generate structured results for analysis

    EXAMPLE:
    >>> benchmark = Benchmark(models=[model1, model2], datasets=[test_data])
    >>> results = benchmark.run_accuracy_benchmark()
    >>> rows = benchmark.compare_models("latency")

    HINTS:
    - Use warmup runs to stabilize performance
    - Collect multiple samples for statistical significance
    - Store metadata about system conditions
    - Provide different benchmark types (accuracy, latency, memory)

    Every ``run_*`` method returns a dict keyed by model name, where the name
    is the model's ``name`` attribute or ``model_<index>`` when it has none.
    """
    ### BEGIN SOLUTION
    def __init__(self, models: List[Any], datasets: List[Any],
                 warmup_runs: int = DEFAULT_WARMUP_RUNS,
                 measurement_runs: int = DEFAULT_MEASUREMENT_RUNS):
        """Initialize benchmark with models and datasets."""
        self.models = models
        self.datasets = datasets
        self.warmup_runs = warmup_runs
        self.measurement_runs = measurement_runs
        self.results = {}

        # Use Profiler from Module 14 for measurements
        self.profiler = Profiler()

        # System information for metadata (using Python standard library)
        self.system_info = {
            'platform': platform.platform(),
            'processor': platform.processor(),
            'python_version': platform.python_version(),
            'cpu_count': os.cpu_count() or 1,  # os.cpu_count() can return None
        }
        # Note: System total memory not available via standard library
        # Process memory measurement uses tracemalloc (via Profiler)

    def run_latency_benchmark(self, input_shape: Tuple[int, ...] = (1, 28, 28)) -> Dict[str, BenchmarkResult]:
        """
        Benchmark model inference latency using Profiler.

        Args:
            input_shape: shape of the random float32 input fed to every model.

        Returns:
            Mapping of model name to a BenchmarkResult holding
            ``measurement_runs`` individual latency samples.

        Raises:
            ValueError: (from BenchmarkResult) if ``measurement_runs`` is 0.
        """
        # Function-local import, as in the original, so importing this module
        # does not pull in the tensor implementation.
        from tinytorch.core.tensor import Tensor

        results = {}
        for i, model in enumerate(self.models):
            model_name = getattr(model, 'name', f'model_{i}')

            # Create input tensor for profiling
            input_tensor = Tensor(np.random.randn(*input_shape).astype(np.float32))

            # Warm-up pass through the Profiler. Its return value is a single
            # aggregate, so it cannot feed the statistics and is discarded.
            self.profiler.measure_latency(
                model, input_tensor,
                warmup=self.warmup_runs,
                iterations=self.measurement_runs
            )

            # Collect one sample per call so BenchmarkResult sees the spread.
            latencies = [
                self.profiler.measure_latency(model, input_tensor, warmup=0, iterations=1)
                for _ in range(self.measurement_runs)
            ]

            results[model_name] = BenchmarkResult(
                f"{model_name}_latency_ms",
                latencies,
                metadata={'input_shape': input_shape, **self.system_info}
            )

        return results

    def run_accuracy_benchmark(self) -> Dict[str, BenchmarkResult]:
        """
        Benchmark model accuracy across datasets.

        Models exposing ``evaluate(dataset)`` are evaluated for real; all
        others receive a simulated accuracy for demonstration purposes. If
        evaluation raises, a simulated fallback value is used instead.

        Raises:
            ValueError: (from BenchmarkResult) if ``datasets`` is empty.
        """
        results = {}

        for i, model in enumerate(self.models):
            model_name = getattr(model, 'name', f'model_{i}')
            accuracies = []

            for dataset in self.datasets:
                # Simulate accuracy measurement
                # In practice, this would evaluate the model on the dataset
                try:
                    if hasattr(model, 'evaluate'):
                        accuracy = model.evaluate(dataset)
                    else:
                        # Simulate accuracy for demonstration
                        base_accuracy = 0.85 + i * 0.05  # Different models have different base accuracies
                        accuracy = base_accuracy + np.random.normal(0, 0.02)  # Add noise
                        accuracy = max(0.0, min(1.0, accuracy))  # Clamp to [0, 1]
                except Exception:
                    # Best-effort fallback simulation. Catching Exception (not a
                    # bare except) lets Ctrl-C and SystemExit still propagate.
                    accuracy = 0.80 + np.random.normal(0, 0.05)
                    accuracy = max(0.0, min(1.0, accuracy))

                accuracies.append(accuracy)

            results[model_name] = BenchmarkResult(
                f"{model_name}_accuracy",
                accuracies,
                metadata={'num_datasets': len(self.datasets), **self.system_info}
            )

        return results

    def run_memory_benchmark(self, input_shape: Tuple[int, ...] = (1, 28, 28)) -> Dict[str, BenchmarkResult]:
        """
        Benchmark model memory usage using Profiler.

        Args:
            input_shape: input shape handed to ``Profiler.measure_memory``.

        Returns:
            Mapping of model name to a BenchmarkResult of memory samples in MB.

        Raises:
            ValueError: (from BenchmarkResult) if ``measurement_runs`` is 0.
        """
        results = {}

        for i, model in enumerate(self.models):
            model_name = getattr(model, 'name', f'model_{i}')
            memory_usages = []

            for _ in range(self.measurement_runs):
                # Use Profiler to measure memory
                memory_stats = self.profiler.measure_memory(model, input_shape)
                # Use peak_memory_mb as the primary metric
                memory_used = memory_stats['peak_memory_mb']

                # If no significant memory change detected, estimate from parameters
                if memory_used < 1.0:
                    param_count = self.profiler.count_parameters(model)
                    memory_used = param_count * 4 / (1024**2)  # 4 bytes per float32

                memory_usages.append(max(0, memory_used))

            results[model_name] = BenchmarkResult(
                f"{model_name}_memory_mb",
                memory_usages,
                metadata={'input_shape': input_shape, **self.system_info}
            )

        return results

    def compare_models(self, metric: str = "latency"):
        """
        Compare models across a specific metric.

        Args:
            metric: one of ``"latency"``, ``"accuracy"`` or ``"memory"``.

        Returns:
            List of dicts (one per model) with keys ``model``, ``metric``,
            ``mean``, ``std``, ``ci_lower``, ``ci_upper`` and ``count``.

        Raises:
            ValueError: if ``metric`` is not a supported name.
        """
        if metric == "latency":
            results = self.run_latency_benchmark()
        elif metric == "accuracy":
            results = self.run_accuracy_benchmark()
        elif metric == "memory":
            results = self.run_memory_benchmark()
        else:
            raise ValueError(
                f"Unknown metric: '{metric}'.\n"
                f" Available metrics: 'latency', 'memory', 'accuracy'.\n"
                f" Fix: Use one of the supported metric names."
            )

        # Return structured list of dicts for easy comparison
        # (No pandas dependency - students can convert to DataFrame if needed)
        # The result keys are already plain model names, so they are used
        # verbatim; stripping "_latency"/"_ms"/"_mb" substrings would corrupt
        # model names that happen to contain them.
        return [
            {
                'model': model_name,
                'metric': metric,
                'mean': result.mean,
                'std': result.std,
                'ci_lower': result.ci_lower,
                'ci_upper': result.ci_upper,
                'count': result.count
            }
            for model_name, result in results.items()
        ]
    ### END SOLUTION


def test_unit_benchmark():
    """🔬 Test Benchmark class functionality."""
    print("🔬 Unit Test: Benchmark...")

    # Create mock models for testing
    class MockModel:
        def __init__(self, name):
            self.name = name

        def forward(self, x):
            time.sleep(0.001)  # Simulate computation
            return x

    models = [MockModel("fast_model"), MockModel("slow_model")]
    datasets = [{"data": "test1"}, {"data": "test2"}]

    benchmark = Benchmark(models, datasets, warmup_runs=2, measurement_runs=3)

    # Test latency benchmark
    latency_results = benchmark.run_latency_benchmark()
    assert len(latency_results) == 2
    assert "fast_model" in latency_results
    assert all(isinstance(result, BenchmarkResult) for result in latency_results.values())

    # Test accuracy benchmark
    accuracy_results = benchmark.run_accuracy_benchmark()
    assert len(accuracy_results) == 2
    assert all(0 <= result.mean <= 1 for result in accuracy_results.values())

    # Test memory benchmark
    memory_results = benchmark.run_memory_benchmark()
    assert len(memory_results) == 2
    assert all(result.mean >= 0 for result in memory_results.values())

    # Test comparison (returns list of dicts, not DataFrame)
    comparison_data = benchmark.compare_models("latency")
    assert len(comparison_data) == 2
    assert isinstance(comparison_data, list)
    assert all(isinstance(item, dict) for item in comparison_data)
    assert "model" in comparison_data[0]
    assert "mean" in comparison_data[0]

    print("✅ Benchmark works correctly!")


if __name__ == "__main__":
    test_unit_benchmark()


# %% ../../modules/19_benchmarking/19_benchmarking.ipynb 15
class BenchmarkSuite:
    """
    Comprehensive benchmark suite for ML systems evaluation.

    TODO: Implement a full benchmark suite that runs multiple test categories

    APPROACH:
    1. Combine multiple benchmark types (latency, accuracy, memory, energy)
    2. Generate comprehensive reports with visualizations
    3. Support different model categories and hardware configurations
    4. Provide recommendations based on results

    EXAMPLE:
    >>> suite = BenchmarkSuite(models, datasets)
    >>> results = suite.run_full_benchmark()
    >>> report = suite.generate_report()

    HINTS:
    - Organize results by benchmark type and model
    - Create Pareto frontier analysis for trade-offs
    - Include system information and test conditions
    - Generate actionable insights and recommendations

    ``self.results`` maps a metric name (``latency``, ``accuracy``,
    ``memory``, ``energy``) to a dict of model name -> BenchmarkResult.
    """
    ### BEGIN SOLUTION
    # Metrics for which a smaller value is the better one.
    _LOWER_IS_BETTER = ('latency', 'memory', 'energy')

    def __init__(self, models: List[Any], datasets: List[Any],
                 output_dir: str = "benchmark_results"):
        """
        Initialize comprehensive benchmark suite.

        Creates ``output_dir`` (including missing parent directories) so that
        reports and plots can be written there later.
        """
        self.models = models
        self.datasets = datasets
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.benchmark = Benchmark(models, datasets)
        self.results = {}

    def run_full_benchmark(self) -> Dict[str, Dict[str, BenchmarkResult]]:
        """Run all benchmark categories and return the collected results."""
        print("🔬 Running comprehensive benchmark suite...")

        # Run all benchmark types
        print(" 📊 Measuring latency...")
        self.results['latency'] = self.benchmark.run_latency_benchmark()

        print(" 🎯 Measuring accuracy...")
        self.results['accuracy'] = self.benchmark.run_accuracy_benchmark()

        print(" 💾 Measuring memory usage...")
        self.results['memory'] = self.benchmark.run_memory_benchmark()

        # Simulate energy benchmark (would require specialized hardware)
        print(" ⚡ Estimating energy efficiency...")
        self.results['energy'] = self._estimate_energy_efficiency()

        return self.results

    def _estimate_energy_efficiency(self) -> Dict[str, BenchmarkResult]:
        """
        Estimate energy efficiency (simplified simulation).

        When latency and memory results are available, energy is derived from
        them sample by sample; otherwise random placeholder values are used.
        """
        energy_results = {}
        latency_by_model = self.results.get('latency')
        memory_by_model = self.results.get('memory')

        # Energy roughly correlates with latency * memory usage
        if latency_by_model is not None and memory_by_model is not None:
            for i, model in enumerate(self.models):
                model_name = getattr(model, 'name', f'model_{i}')
                latency_result = latency_by_model.get(model_name)
                memory_result = memory_by_model.get(model_name)
                if latency_result is None or memory_result is None:
                    continue

                # Energy ∝ power × time, power ∝ memory usage
                # Simplified energy model:
                #   energy = base + latency_factor * time + memory_factor * memory
                energy_values = [
                    0.1 + (lat / 1000) * 2.0 + mem * 0.01  # Joules
                    for lat, mem in zip(latency_result.values, memory_result.values)
                ]

                energy_results[model_name] = BenchmarkResult(
                    f"{model_name}_energy_joules",
                    energy_values,
                    metadata={'estimated': True, **self.benchmark.system_info}
                )

        # Fallback if no latency/memory results
        if not energy_results:
            for i, model in enumerate(self.models):
                model_name = getattr(model, 'name', f'model_{i}')
                # Simulate energy measurements
                energy_values = [0.5 + np.random.normal(0, 0.1) for _ in range(5)]

                energy_results[model_name] = BenchmarkResult(
                    f"{model_name}_energy_joules",
                    energy_values,
                    metadata={'estimated': True, **self.benchmark.system_info}
                )

        return energy_results

    def plot_results(self, save_plots: bool = True):
        """
        Generate visualization plots for benchmark results.

        Draws a 2x2 grid of bar charts (one per metric) with the best model of
        each metric highlighted in green. Does nothing but print a notice when
        there are no results or matplotlib is not installed.

        Args:
            save_plots: when True, also write ``benchmark_comparison.png``
                into ``output_dir``.
        """
        if not self.results:
            print("No results to plot. Run benchmark first.")
            return

        if not MATPLOTLIB_AVAILABLE:
            print("⚠️ matplotlib not available - skipping plots. Install with: pip install matplotlib")
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('ML Model Benchmark Results', fontsize=16, fontweight='bold')

        # Plot each metric type
        metrics = ['latency', 'accuracy', 'memory', 'energy']
        units = ['ms', 'accuracy', 'MB', 'J']

        for idx, (metric, unit) in enumerate(zip(metrics, units)):
            ax = axes[idx // 2, idx % 2]

            if metric in self.results:
                # Result keys are the model names themselves.
                metric_results = self.results[metric]
                model_names = list(metric_results)
                means = [metric_results[name].mean for name in model_names]
                stds = [metric_results[name].std for name in model_names]

                bars = ax.bar(model_names, means, yerr=stds, capsize=5, alpha=0.7)
                ax.set_title(f'{metric.capitalize()} Comparison')
                ax.set_ylabel(f'{metric.capitalize()} ({unit})')
                ax.tick_params(axis='x', rotation=45)

                # Color bars by performance (green = better)
                if means:  # min()/max() would raise on an empty metric
                    if metric in self._LOWER_IS_BETTER:
                        # Lower is better
                        best_idx = means.index(min(means))
                    else:
                        # Higher is better (accuracy)
                        best_idx = means.index(max(means))

                    for i, bar in enumerate(bars):
                        if i == best_idx:
                            bar.set_color('green')
                            bar.set_alpha(0.8)
            else:
                ax.text(0.5, 0.5, f'No {metric} data', ha='center', va='center', transform=ax.transAxes)
                ax.set_title(f'{metric.capitalize()} Comparison')

        plt.tight_layout()

        if save_plots:
            plot_path = self.output_dir / 'benchmark_comparison.png'
            plt.savefig(plot_path, dpi=300, bbox_inches='tight')
            print(f"📊 Plots saved to {plot_path}")

        plt.show()

    def plot_pareto_frontier(self, x_metric: str = 'latency', y_metric: str = 'accuracy'):
        """
        Plot Pareto frontier for two competing objectives.

        Scatter-plots every model that has results for both metrics and saves
        the figure as ``pareto_<x>_vs_<y>.png`` in ``output_dir``.
        """
        if not MATPLOTLIB_AVAILABLE:
            print("⚠️ matplotlib not available - skipping plots. Install with: pip install matplotlib")
            return

        if x_metric not in self.results or y_metric not in self.results:
            print(f"Missing data for {x_metric} or {y_metric}")
            return

        plt.figure(figsize=(10, 8))

        x_results = self.results[x_metric]
        y_results = self.results[y_metric]

        # Pair results by exact model name. Substring matching would pair
        # e.g. "model_1" with "model_10".
        model_names = [name for name in x_results if name in y_results]
        x_values = [x_results[name].mean for name in model_names]
        y_values = [y_results[name].mean for name in model_names]

        # Plot points
        plt.scatter(x_values, y_values, s=100, alpha=0.7)

        # Label points
        for name, x_val, y_val in zip(model_names, x_values, y_values):
            plt.annotate(name, (x_val, y_val), xytext=(5, 5), textcoords='offset points')

        # Determine if lower or higher is better for each metric
        x_lower_better = x_metric in self._LOWER_IS_BETTER
        y_lower_better = y_metric in self._LOWER_IS_BETTER

        plt.xlabel(f'{x_metric.capitalize()} ({"lower" if x_lower_better else "higher"} is better)')
        plt.ylabel(f'{y_metric.capitalize()} ({"lower" if y_lower_better else "higher"} is better)')
        plt.title(f'Pareto Frontier: {x_metric.capitalize()} vs {y_metric.capitalize()}')
        plt.grid(True, alpha=0.3)

        # Save plot
        plot_path = self.output_dir / f'pareto_{x_metric}_vs_{y_metric}.png'
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        print(f"📊 Pareto plot saved to {plot_path}")
        plt.show()

    def generate_report(self) -> str:
        """
        Generate comprehensive benchmark report.

        Builds a Markdown report (system information, per-metric results and
        recommendations), writes it to ``benchmark_report.md`` in
        ``output_dir`` and returns the report text. Returns a short notice
        without writing anything when no benchmark has been run.
        """
        if not self.results:
            return "No benchmark results available. Run benchmark first."

        report_lines = []
        report_lines.append("# ML Model Benchmark Report")
        report_lines.append("=" * 50)
        report_lines.append("")

        # System information
        report_lines.append("## System Information")
        for key, value in self.benchmark.system_info.items():
            report_lines.append(f"- {key}: {value}")
        report_lines.append("")

        # Results summary
        report_lines.append("## Benchmark Results Summary")
        report_lines.append("")

        for metric_type, results in self.results.items():
            report_lines.append(f"### {metric_type.capitalize()} Results")
            report_lines.append("")

            if not results:
                # Nothing measured for this metric; min()/max() would raise.
                report_lines.append("- No results recorded")
                report_lines.append("")
                continue

            # Find best performer
            if metric_type in self._LOWER_IS_BETTER:
                # Lower is better
                best_model = min(results.items(), key=lambda x: x[1].mean)
                comparison_text = "fastest" if metric_type == 'latency' else "most efficient"
            else:
                # Higher is better
                best_model = max(results.items(), key=lambda x: x[1].mean)
                comparison_text = "most accurate"

            report_lines.append(f"**Best performer**: {best_model[0]} ({comparison_text})")
            report_lines.append("")

            # Detailed results (keys are the model names, used verbatim)
            for model_name, result in results.items():
                report_lines.append(f"- **{model_name}**: {result.mean:.4f} ± {result.std:.4f}")
            report_lines.append("")

        # Recommendations
        report_lines.append("## Recommendations")
        report_lines.append("")

        latency_results = self.results.get('latency')
        accuracy_results = self.results.get('accuracy')
        have_both = latency_results is not None and accuracy_results is not None

        if len(self.results) >= 2 and have_both:
            # Find overall best trade-off model
            report_lines.append("### Accuracy vs Speed Trade-off")

            scores = {}
            if latency_results and accuracy_results:
                # Simple scoring: min-max normalize both metrics and average.
                lat_vals = [r.mean for r in latency_results.values()]
                acc_vals = [r.mean for r in accuracy_results.values()]
                lat_min = min(lat_vals)
                acc_min = min(acc_vals)
                # The epsilon keeps the division defined when all models tie.
                lat_span = max(lat_vals) - lat_min + 1e-8
                acc_span = max(acc_vals) - acc_min + 1e-8

                for model_name, lat_result in latency_results.items():
                    acc_result = accuracy_results.get(model_name)
                    if acc_result is None:
                        continue
                    # Normalize: latency (lower better), accuracy (higher better)
                    norm_latency = 1 - (lat_result.mean - lat_min) / lat_span
                    norm_accuracy = (acc_result.mean - acc_min) / acc_span
                    # Combined score (equal weight)
                    scores[model_name] = (norm_latency + norm_accuracy) / 2

            if scores:
                best_overall = max(scores.items(), key=lambda x: x[1])
                report_lines.append(f"- **Best overall trade-off**: {best_overall[0]} (score: {best_overall[1]:.3f})")
            report_lines.append("")

        report_lines.append("### Usage Recommendations")
        if have_both and latency_results and accuracy_results:
            # Find highest accuracy model and lowest latency model
            best_acc_model = max(accuracy_results.items(), key=lambda x: x[1].mean)
            best_lat_model = min(latency_results.items(), key=lambda x: x[1].mean)

            report_lines.append(f"- **For maximum accuracy**: Use {best_acc_model[0]}")
            report_lines.append(f"- **For minimum latency**: Use {best_lat_model[0]}")
            report_lines.append("- **For production deployment**: Consider the best overall trade-off model above")

        report_lines.append("")
        report_lines.append("---")
        report_lines.append("Report generated by TinyTorch Benchmarking Suite")

        # Save report. The text contains non-ASCII characters ("±"), so the
        # encoding is pinned instead of relying on the platform default.
        report_text = "\n".join(report_lines)
        report_path = self.output_dir / 'benchmark_report.md'
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(report_text)

        print(f"📄 Report saved to {report_path}")
        return report_text
    ### END SOLUTION


def test_unit_benchmark_suite():
    """🔬 Test BenchmarkSuite comprehensive functionality."""
    print("🔬 Unit Test: BenchmarkSuite...")

    # Create mock models
    class MockModel:
        def __init__(self, name):
            self.name = name

        def forward(self, x):
            time.sleep(0.001)
            return x

    models = [MockModel("efficient_model"), MockModel("accurate_model")]
    datasets = [{"test": "data"}]

    # Create temporary directory for test output
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        suite = BenchmarkSuite(models, datasets, output_dir=tmp_dir)

        # Run full benchmark
        results = suite.run_full_benchmark()

        # Verify all benchmark types completed
        assert 'latency' in results
        assert 'accuracy' in results
        assert 'memory' in results
        assert 'energy' in results

        # Verify results structure
        for metric_results in results.values():
            assert len(metric_results) == 2  # Two models
            assert all(isinstance(result, BenchmarkResult) for result in metric_results.values())

        # Test report generation
        report = suite.generate_report()
        assert "Benchmark Report" in report
        assert "System Information" in report
        assert "Recommendations" in report

        # Verify files are created
        output_path = Path(tmp_dir)
        assert (output_path / 'benchmark_report.md').exists()

    print("✅ BenchmarkSuite works correctly!")


if __name__ == "__main__":
    test_unit_benchmark_suite()


# %% ../../modules/19_benchmarking/19_benchmarking.ipynb 17
class TinyMLPerf:
    """
    TinyMLPerf-style standardized benchmarking for edge ML systems.

    TODO: Implement standardized benchmarks following TinyMLPerf methodology

    APPROACH:
    1. Define standard benchmark tasks and datasets
    2. Implement standardized measurement protocols
    3. Ensure reproducible results across different systems
    4. Generate compliance reports for fair comparison

    EXAMPLE:
    >>> perf = TinyMLPerf()
    >>> results = perf.run_all_benchmarks(model)
    >>> perf.generate_compliance_report(results)

    HINTS:
    - Use fixed random seeds for reproducibility
    - Implement warm-up and measurement phases
    - Follow TinyMLPerf power and latency measurement standards
    - Generate standardized result formats

    Note: inputs and ground-truth labels are synthetic, so the reported
    accuracy is a simulation rather than a real evaluation.
    """
    ### BEGIN SOLUTION
    # Benchmarks whose synthetic labels are binary (0/1).
    _BINARY_BENCHMARKS = ('keyword_spotting', 'visual_wake_words')

    def __init__(self, random_seed: int = 42):
        """
        Initialize TinyMLPerf benchmark suite.

        Seeds NumPy's global random generator with ``random_seed``.
        """
        self.random_seed = random_seed
        np.random.seed(random_seed)

        # Standard TinyMLPerf benchmark configurations
        self.benchmarks = {
            'keyword_spotting': {
                'input_shape': (1, 16000),  # 1 second of 16kHz audio
                'target_accuracy': 0.90,
                'max_latency_ms': 100,
                'description': 'Wake word detection'
            },
            'visual_wake_words': {
                'input_shape': (1, 96, 96, 3),  # 96x96 RGB image
                'target_accuracy': 0.80,
                'max_latency_ms': 200,
                'description': 'Person detection in images'
            },
            'anomaly_detection': {
                'input_shape': (1, 640),  # Machine sensor data
                'target_accuracy': 0.85,
                'max_latency_ms': 50,
                'description': 'Industrial anomaly detection'
            },
            'image_classification': {
                'input_shape': (1, 32, 32, 3),  # CIFAR-10 style
                'target_accuracy': 0.75,
                'max_latency_ms': 150,
                'description': 'Tiny image classification'
            }
        }

    @staticmethod
    def _resolve_inference_fn(model: Any):
        """Return the model's inference callable (forward, predict, or the model itself), or None."""
        if hasattr(model, 'forward'):
            return model.forward
        if hasattr(model, 'predict'):
            return model.predict
        if callable(model):
            return model
        return None

    @staticmethod
    def _flatten_prediction(pred: Any) -> np.ndarray:
        """Coerce one model output to a flat NumPy array of at least one element."""
        # Tensor-like outputs expose their payload as ``.data``. NumPy arrays
        # also have a ``.data`` attribute, but it is a memoryview without
        # ``.flatten()``, so everything goes through np.asarray first.
        raw = pred.data if hasattr(pred, 'data') else pred
        return np.asarray(raw).flatten()

    def run_standard_benchmark(self, model: Any, benchmark_name: str, num_runs: int = 100) -> Dict[str, Any]:
        """
        Run a standardized TinyMLPerf benchmark.

        Args:
            model: object exposing ``forward`` or ``predict``, or a callable.
                Anything else gets simulated random predictions.
            benchmark_name: key of ``self.benchmarks``.
            num_runs: number of timed inferences (must be at least 1). The
                first ``max(1, num_runs // 10)`` inputs are also used for an
                untimed warm-up.

        Returns:
            Dict of plain Python values: accuracy, latency statistics in ms,
            throughput, the targets, and the compliance flags.

        Raises:
            ValueError: if ``benchmark_name`` is unknown or ``num_runs < 1``.
        """
        if benchmark_name not in self.benchmarks:
            raise ValueError(
                f"Unknown benchmark: '{benchmark_name}'.\n"
                f" Available benchmarks: {list(self.benchmarks.keys())}.\n"
                f" Fix: Use one of the supported benchmark names from the list above."
            )
        if num_runs < 1:
            # Without this guard the warm-up indexes an empty input list.
            raise ValueError(f"num_runs must be at least 1, got {num_runs}.")

        config = self.benchmarks[benchmark_name]
        is_binary = benchmark_name in self._BINARY_BENCHMARKS
        print(f"🔬 Running TinyMLPerf {benchmark_name} benchmark...")
        print(f" Target: {config['target_accuracy']:.1%} accuracy, "
              f"<{config['max_latency_ms']}ms latency")

        # Generate standardized test inputs
        input_shape = config['input_shape']
        test_inputs = []
        for i in range(num_runs):
            # Use deterministic random generation for reproducibility
            np.random.seed(self.random_seed + i)
            if len(input_shape) == 2:
                # Audio/sequence data
                test_input = np.random.randn(*input_shape).astype(np.float32)
            else:
                # Image data
                test_input = np.random.randint(0, 256, input_shape).astype(np.float32) / 255.0
            test_inputs.append(test_input)

        infer = self._resolve_inference_fn(model)

        # Warmup phase (10% of runs)
        warmup_runs = max(1, num_runs // 10)
        print(f" Warming up ({warmup_runs} runs)...")
        if infer is not None:
            for warmup_input in test_inputs[:warmup_runs]:
                infer(warmup_input)

        # Measurement phase
        print(f" Measuring performance ({num_runs} runs)...")
        latencies = []
        predictions = []

        for test_input in test_inputs:
            with precise_timer() as timer:
                try:
                    if infer is not None:
                        output = infer(test_input)
                    else:
                        # Simulate prediction
                        output = np.random.rand(2) if is_binary else np.random.rand(10)
                    predictions.append(output)
                except Exception:
                    # Best-effort fallback simulation so one failing inference
                    # does not abort the run. Ctrl-C and SystemExit propagate.
                    predictions.append(np.random.rand(2))

            # ``elapsed`` is only final once the ``with`` block has exited.
            latencies.append(timer.elapsed * 1000)  # Convert to ms

        # Simulate accuracy calculation (would use real labels in practice)
        # Generate synthetic ground truth labels
        np.random.seed(self.random_seed)
        flat_predictions = [self._flatten_prediction(pred) for pred in predictions]
        if is_binary:
            # Binary classification
            true_labels = np.random.randint(0, 2, num_runs)
            predicted_labels = []
            for pred_array in flat_predictions:
                if len(pred_array) >= 2:
                    # Two scores: pick the larger one.
                    predicted_labels.append(1 if pred_array[1] > pred_array[0] else 0)
                else:
                    # Single score: threshold at 0.5.
                    predicted_labels.append(1 if pred_array[0] > 0.5 else 0)
        else:
            # Multi-class classification
            num_classes = 10 if benchmark_name == 'image_classification' else 5
            true_labels = np.random.randint(0, num_classes, num_runs)
            # The modulo folds outputs with more entries than classes back
            # into the valid label range.
            predicted_labels = [np.argmax(pred_array) % num_classes for pred_array in flat_predictions]

        # Calculate accuracy
        correct_predictions = sum(1 for true, pred in zip(true_labels, predicted_labels) if true == pred)
        accuracy = correct_predictions / num_runs

        # Simulated adjustment keyed on the model's name (demonstration only)
        model_name = getattr(model, 'name', 'unknown_model')
        if 'efficient' in model_name.lower():
            accuracy = min(0.95, accuracy + 0.1)
        elif 'accurate' in model_name.lower():
            accuracy = min(0.98, accuracy + 0.2)

        # Compile results (cast to builtin types so the dict is JSON-serializable)
        mean_latency = float(np.mean(latencies))
        accuracy_met = bool(accuracy >= config['target_accuracy'])
        latency_met = bool(mean_latency <= config['max_latency_ms'])

        results = {
            'benchmark_name': benchmark_name,
            'model_name': model_name,
            'accuracy': float(accuracy),
            'mean_latency_ms': mean_latency,
            'std_latency_ms': float(np.std(latencies)),
            'p50_latency_ms': float(np.percentile(latencies, 50)),
            'p90_latency_ms': float(np.percentile(latencies, 90)),
            'p99_latency_ms': float(np.percentile(latencies, 99)),
            'max_latency_ms': float(np.max(latencies)),
            'throughput_fps': float(1000 / mean_latency),
            'target_accuracy': float(config['target_accuracy']),
            'target_latency_ms': float(config['max_latency_ms']),
            'accuracy_met': accuracy_met,
            'latency_met': latency_met,
            'compliant': accuracy_met and latency_met,
            'num_runs': int(num_runs),
            'random_seed': int(self.random_seed)
        }

        print(f" Results: {accuracy:.1%} accuracy, {mean_latency:.1f}ms latency")
        print(f" Compliance: {'✅ PASS' if results['compliant'] else '❌ FAIL'}")

        return results

    def run_all_benchmarks(self, model: Any) -> Dict[str, Dict[str, Any]]:
        """
        Run all TinyMLPerf benchmarks on a model.

        A benchmark that raises is recorded as ``{'error': <message>}`` so the
        remaining benchmarks still run.
        """
        all_results = {}

        print(f"🚀 Running full TinyMLPerf suite on {getattr(model, 'name', 'model')}...")
        print("=" * 60)

        for benchmark_name in self.benchmarks:
            try:
                results = self.run_standard_benchmark(model, benchmark_name)
                all_results[benchmark_name] = results
                print()
            except Exception as e:
                print(f" ❌ Failed to run {benchmark_name}: {e}")
                all_results[benchmark_name] = {'error': str(e)}

        return all_results

    def generate_compliance_report(self, results: Dict[str, Dict[str, Any]],
                                   output_path: str = "tinymlperf_report.json") -> str:
        """
        Generate TinyMLPerf compliance report.

        Writes a JSON report to ``output_path`` and a Markdown summary next to
        it (``<name>_summary.md``), then returns the summary text.

        Args:
            results: mapping of benchmark name to a result dict as produced by
                ``run_standard_benchmark``; entries containing an ``'error'``
                key are skipped.
            output_path: destination of the JSON report.
        """
        output_path = os.fspath(output_path)

        # Calculate overall compliance
        compliant_benchmarks = []
        total_benchmarks = 0

        report_data = {
            'tinymlperf_version': '1.0',
            'random_seed': self.random_seed,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'model_name': 'unknown',
            'benchmarks': {},
            'summary': {}
        }

        for benchmark_name, result in results.items():
            if 'error' in result:
                continue

            total_benchmarks += 1
            if result.get('compliant', False):
                compliant_benchmarks.append(benchmark_name)

            # Set model name from first successful result
            if report_data['model_name'] == 'unknown':
                report_data['model_name'] = result.get('model_name', 'unknown')

            # Store benchmark results
            report_data['benchmarks'][benchmark_name] = {
                'accuracy': result['accuracy'],
                'mean_latency_ms': result['mean_latency_ms'],
                'p99_latency_ms': result['p99_latency_ms'],
                'throughput_fps': result['throughput_fps'],
                'target_accuracy': result['target_accuracy'],
                'target_latency_ms': result['target_latency_ms'],
                'accuracy_met': result['accuracy_met'],
                'latency_met': result['latency_met'],
                'compliant': result['compliant']
            }

        # Summary statistics
        if total_benchmarks > 0:
            compliance_rate = len(compliant_benchmarks) / total_benchmarks
            report_data['summary'] = {
                'total_benchmarks': total_benchmarks,
                'compliant_benchmarks': len(compliant_benchmarks),
                'compliance_rate': compliance_rate,
                'overall_compliant': compliance_rate == 1.0,
                'compliant_benchmark_names': compliant_benchmarks
            }

        # Save report
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2)

        # Generate human-readable summary
        summary_lines = []
        summary_lines.append("# TinyMLPerf Compliance Report")
        summary_lines.append("=" * 40)
        summary_lines.append(f"Model: {report_data['model_name']}")
        summary_lines.append(f"Date: {report_data['timestamp']}")
        summary_lines.append("")

        if total_benchmarks > 0:
            summary_lines.append(f"## Overall Result: {'✅ COMPLIANT' if report_data['summary']['overall_compliant'] else '❌ NON-COMPLIANT'}")
            summary_lines.append(f"Compliance Rate: {compliance_rate:.1%} ({len(compliant_benchmarks)}/{total_benchmarks})")
            summary_lines.append("")

            summary_lines.append("## Benchmark Details:")
            for benchmark_name, result in report_data['benchmarks'].items():
                status = "✅ PASS" if result['compliant'] else "❌ FAIL"
                summary_lines.append(f"- **{benchmark_name}**: {status}")
                summary_lines.append(f" - Accuracy: {result['accuracy']:.1%} (target: {result['target_accuracy']:.1%})")
                summary_lines.append(f" - Latency: {result['mean_latency_ms']:.1f}ms (target: <{result['target_latency_ms']}ms)")
                summary_lines.append("")
        else:
            summary_lines.append("No successful benchmark runs.")

        summary_text = "\n".join(summary_lines)

        # Save human-readable report. Only a trailing ".json" is swapped out:
        # replacing every occurrence would rewrite directory names, and a path
        # without the suffix would otherwise overwrite the JSON file itself.
        json_suffix = '.json'
        if output_path.endswith(json_suffix):
            summary_path = output_path[:-len(json_suffix)] + '_summary.md'
        else:
            summary_path = output_path + '_summary.md'
        # The summary contains emoji, so the encoding must not depend on the
        # platform default.
        with open(summary_path, 'w', encoding='utf-8') as f:
            f.write(summary_text)

        print(f"📄 TinyMLPerf report saved to {output_path}")
        print(f"📄 Summary saved to {summary_path}")

        return summary_text
    ### END SOLUTION


def test_unit_tinymlperf():
    """🔬 Test TinyMLPerf standardized benchmarking."""
    print("🔬 Unit Test: TinyMLPerf...")

    # Create mock model for testing
    class MockModel:
        def __init__(self, name):
            self.name = name

        def forward(self, x):
            time.sleep(0.001)  # Simulate computation
            # Return appropriate output shape for different benchmarks
            if hasattr(x, 'shape'):
                if len(x.shape) == 2:  # Audio/sequence
                    return np.random.rand(2)  # Binary classification
                else:  # Image
                    return np.random.rand(10)  # Multi-class
            return np.random.rand(2)

    model = MockModel("test_model")
    perf = TinyMLPerf(random_seed=42)

    # Test individual benchmark
    result = perf.run_standard_benchmark(model, 'keyword_spotting', num_runs=5)

    # Verify result structure
    required_keys = ['accuracy', 'mean_latency_ms', 'throughput_fps', 'compliant']
    assert all(key in result for key in required_keys)
    assert 0 <= result['accuracy'] <= 1
    assert result['mean_latency_ms'] > 0
    assert result['throughput_fps'] > 0

    # Test full benchmark suite (with fewer runs for speed)
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Run subset of benchmarks for testing
        subset_results = {}
        for benchmark in ['keyword_spotting', 'image_classification']:
            subset_results[benchmark] = perf.run_standard_benchmark(model, benchmark, num_runs=3)

        # Test compliance report generation
        report_path = f"{tmp_dir}/test_report.json"
        summary = perf.generate_compliance_report(subset_results, report_path)

        # Verify report was created
        assert Path(report_path).exists()
        assert "TinyMLPerf Compliance Report" in summary
        assert "Compliance Rate" in summary

    print("✅ TinyMLPerf works correctly!")


if __name__ == "__main__":
    test_unit_tinymlperf()