# ╔═══════════════════════════════════════════════════════════════════════════════╗
# ║                        🚨 CRITICAL WARNING 🚨                                ║
# ║                     AUTOGENERATED! DO NOT EDIT!                               ║
# ║                                                                               ║
# ║  This file is AUTOMATICALLY GENERATED from source modules.                    ║
# ║  ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported!             ║
# ║                                                                               ║
# ║  ✅ TO EDIT: src/XX_profiler/XX_profiler.py                                   ║
# ║  ✅ TO EXPORT: Run 'tito module complete '                                    ║
# ║                                                                               ║
# ║  🛡️ STUDENT PROTECTION: This file contains optimized implementations.        ║
# ║     Editing it directly may break module functionality and training.          ║
# ║                                                                               ║
# ║  🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners)            ║
# ║     The tinytorch/ directory is generated code - edit source files instead!   ║
# ╚═══════════════════════════════════════════════════════════════════════════════╝
# %% auto 0
__all__ = ['BYTES_PER_FLOAT32', 'KB_TO_BYTES', 'MB_TO_BYTES', 'Profiler', 'quick_profile',
           'analyze_weight_distribution']

# %% ../../modules/14_profiling/14_profiling.ipynb 1
import sys
import os
import time
import numpy as np
import tracemalloc
from typing import Dict, List, Any, Optional, Tuple
from collections import defaultdict
import gc

# Import from TinyTorch package (previous modules must be completed and exported)
from ..core.tensor import Tensor
from ..core.layers import Linear
from ..core.spatial import Conv2d

# Constants for memory and performance measurement
BYTES_PER_FLOAT32 = 4  # Standard float32 size in bytes
KB_TO_BYTES = 1024  # Kilobytes to bytes conversion
MB_TO_BYTES = 1024 * 1024  # Megabytes to bytes conversion


# %% ../../modules/14_profiling/14_profiling.ipynb 6
class Profiler:
    """
    ML model profiler for performance analysis.

    Measures parameter counts, estimated FLOPs, memory usage, and forward-pass
    latency. Several figures (FLOPs, activation memory, backward-pass cost)
    are closed-form estimates rather than measurements; each method's
    docstring states which.
    """

    # Floor applied to a latency (in seconds) before it is used as a divisor,
    # so a 0 ms reading from a coarse timer cannot raise ZeroDivisionError.
    _MIN_LATENCY_SECONDS = 1e-6

    def __init__(self):
        """
        Initialize profiler with empty measurement state.

        EXAMPLE:
        >>> profiler = Profiler()
        >>> profiler.measurements
        {}
        """
        # Generic result store; no method in this class writes to it.
        self.measurements = {}
        # Per-operation counters; missing keys default to 0.
        self.operation_counts = defaultdict(int)
        # Placeholder for memory-tracking state; unused by the methods below.
        self.memory_tracker = None

    @staticmethod
    def _as_pair(value) -> Tuple[int, int]:
        """Return ``value`` as an ``(h, w)`` pair; a scalar is duplicated."""
        if isinstance(value, (tuple, list)):
            if len(value) == 1:
                return value[0], value[0]
            return value[0], value[1]
        return value, value

    @staticmethod
    def _resolve_forward(model):
        """Return ``model.forward`` when it is callable, otherwise ``model`` itself."""
        forward = getattr(model, 'forward', None)
        return forward if callable(forward) else model

    def _next_shape(self, layer, input_shape: Tuple[int, ...]) -> Tuple[int, ...]:
        """Estimate the shape ``layer`` produces, for chaining FLOP counts."""
        is_conv = layer.__class__.__name__ == 'Conv2d'
        if is_conv and hasattr(layer, 'out_channels') and len(input_shape) >= 3:
            # Same simplification as the Conv2d FLOP estimate: padding and
            # kernel extent are ignored, spatial size is input // stride.
            stride_h, stride_w = self._as_pair(getattr(layer, 'stride', 1))
            return input_shape[:-3] + (layer.out_channels,
                                       input_shape[-2] // stride_h,
                                       input_shape[-1] // stride_w)
        if hasattr(layer, 'weight'):
            # Treats the weight as (in_features, out_features), as the Linear
            # FLOP formula does. NOTE(review): confirm for other weighted layers.
            return input_shape[:-1] + (layer.weight.shape[1],)
        # Layers without weights are assumed to preserve shape.
        return input_shape

    def count_parameters(self, model) -> int:
        """
        Count the parameters held by a model or a single layer.

        Lookup order:
        1. ``model.layers``: sum the count of every contained layer (recursive)
        2. ``model.parameters()``: sum ``param.data.size`` over the result
        3. ``model.weight`` (plus ``model.bias`` when present and not None)
        4. Otherwise 0 (e.g. activations)

        Args:
            model: Model or layer to inspect.

        Returns:
            int: Total number of parameter elements.

        EXAMPLE:
        >>> linear = Linear(128, 64)  # 128*64 + 64 = 8256 parameters
        >>> profiler = Profiler()
        >>> count = profiler.count_parameters(linear)
        >>> print(count)
        8256
        """
        total_params = 0

        if hasattr(model, 'layers'):
            # Recurse so layers lacking parameters() count as 0 instead of
            # raising AttributeError.
            for layer in model.layers:
                total_params += self.count_parameters(layer)
        elif hasattr(model, 'parameters'):
            for param in model.parameters():
                total_params += param.data.size
        elif hasattr(model, 'weight'):
            total_params += model.weight.data.size
            if getattr(model, 'bias', None) is not None:
                total_params += model.bias.data.size

        return int(total_params)

    def count_flops(self, model, input_shape: Tuple[int, ...]) -> int:
        """
        Estimate FLOPs (floating point operations) for one forward pass.

        Dispatch is on the class *name*, so any class called ``Linear`` or
        ``Conv2d`` is handled, not only the imported ones.

        LAYER-SPECIFIC FLOP FORMULAS:
        - Linear: input_features × output_features × 2 (multiply + add)
        - Conv2d: output_h × output_w × kernel_h × kernel_w × in_channels
          × out_channels × 2, with output size estimated as input // stride
          (padding and kernel extent are ignored)
        - Sequential / anything with ``.layers``: sum over the layers
        - Anything else: 1 FLOP per element of ``input_shape``

        The Linear and Conv2d formulas ignore the batch dimension; the
        per-element fallback does include it.

        Args:
            model: Layer or model to analyze.
            input_shape: Shape of the input; the last dimension is the feature
                count for Linear, the last two are height/width for Conv2d.

        Returns:
            int: Estimated FLOP count.

        EXAMPLE:
        >>> linear = Linear(128, 64)
        >>> profiler = Profiler()
        >>> flops = profiler.count_flops(linear, (1, 128))
        >>> print(flops)  # 128 * 64 * 2 = 16384
        16384
        """
        total_flops = 0
        model_name = model.__class__.__name__

        if model_name == 'Linear':
            in_features = input_shape[-1]
            out_features = model.weight.shape[1] if hasattr(model, 'weight') else 1
            total_flops = in_features * out_features * 2

        elif model_name == 'Conv2d':
            # Without the needed attributes the estimate stays 0.
            if hasattr(model, 'kernel_size') and hasattr(model, 'in_channels'):
                # kernel_size / stride may be an int or an (h, w) pair.
                kernel_h, kernel_w = self._as_pair(model.kernel_size)
                stride_h, stride_w = self._as_pair(getattr(model, 'stride', 1))
                input_h, input_w = input_shape[-2], input_shape[-1]
                output_h = input_h // stride_h
                output_w = input_w // stride_w
                total_flops = (output_h * output_w * kernel_h * kernel_w *
                               model.in_channels * model.out_channels * 2)

        elif model_name == 'Sequential' or hasattr(model, 'layers'):
            # tuple() lets a list-typed shape be concatenated in _next_shape.
            current_shape = tuple(input_shape)
            for layer in model.layers:
                total_flops += self.count_flops(layer, current_shape)
                current_shape = self._next_shape(layer, current_shape)

        else:
            # Activation or other: assume 1 FLOP per element.
            total_flops = np.prod(input_shape)

        # np.prod returns a NumPy scalar; honor the declared int return type.
        return int(total_flops)

    def measure_memory(self, model, input_shape: Tuple[int, ...]) -> Dict[str, float]:
        """
        Measure memory usage during one forward pass on random input.

        Parameter memory is computed as parameter count × 4 bytes. Activation
        memory is a rough estimate (2 × the input's byte size). Peak memory is
        measured with tracemalloc around input creation and the forward pass.

        If tracemalloc is already tracing when this is called, the existing
        trace is left running; the reported peak may then include allocations
        made before this call.

        Args:
            model: Model exposing ``forward(x)`` or callable as ``model(x)``.
            input_shape: Shape of the random input tensor to create.

        Returns:
            dict with keys:
            - 'parameter_memory_mb': Memory for model parameters
            - 'activation_memory_mb': Estimated memory for activations
            - 'peak_memory_mb': Measured peak, never below parameter +
              activation memory
            - 'memory_efficiency': (parameter + activation) / measured peak,
              capped at 1.0

        EXAMPLE:
        >>> linear = Linear(1024, 512)
        >>> profiler = Profiler()
        >>> memory = profiler.measure_memory(linear, (32, 1024))
        >>> print(f"Parameters: {memory['parameter_memory_mb']:.1f} MB")
        Parameters: 2.0 MB
        """
        param_count = self.count_parameters(model)
        parameter_memory_mb = (param_count * BYTES_PER_FLOAT32) / MB_TO_BYTES

        run_forward = self._resolve_forward(model)

        # Only stop tracing if this call started it, so a caller's own
        # tracemalloc session survives.
        started_here = not tracemalloc.is_tracing()
        if started_here:
            tracemalloc.start()
        try:
            baseline_memory = tracemalloc.get_traced_memory()[0]

            dummy_input = Tensor(np.random.randn(*input_shape))
            input_memory_bytes = dummy_input.data.nbytes

            _ = run_forward(dummy_input)

            _current_memory, peak_memory = tracemalloc.get_traced_memory()
        finally:
            # Runs even when the forward pass raises.
            if started_here:
                tracemalloc.stop()

        # Rough estimate: input plus one equally sized output.
        activation_memory_mb = (input_memory_bytes * 2) / MB_TO_BYTES
        peak_memory_mb = (peak_memory - baseline_memory) / MB_TO_BYTES

        useful_memory = parameter_memory_mb + activation_memory_mb
        # 0.001 MB floor avoids division by zero.
        memory_efficiency = useful_memory / max(peak_memory_mb, 0.001)

        return {
            'parameter_memory_mb': parameter_memory_mb,
            'activation_memory_mb': activation_memory_mb,
            'peak_memory_mb': max(peak_memory_mb, useful_memory),
            'memory_efficiency': min(memory_efficiency, 1.0)
        }

    def measure_latency(self, model, input_tensor, warmup: int = 10, iterations: int = 100) -> float:
        """
        Measure forward-pass latency and return the median in milliseconds.

        Runs ``warmup`` untimed passes, then times ``iterations`` passes with
        ``time.perf_counter()``. The median is used so outliers have little
        influence.

        Args:
            model: Model exposing ``forward(x)`` or callable as ``model(x)``.
            input_tensor: Input passed to every forward call.
            warmup: Number of untimed warmup runs (default 10).
            iterations: Number of timed runs (default 100); must be >= 1.

        Returns:
            float: Median latency in milliseconds.

        Raises:
            ValueError: If ``iterations`` is less than 1.

        EXAMPLE:
        >>> linear = Linear(128, 64)
        >>> input_tensor = Tensor(np.random.randn(1, 128))
        >>> profiler = Profiler()
        >>> latency = profiler.measure_latency(linear, input_tensor)
        >>> print(f"Latency: {latency:.2f} ms")  # value varies by machine
        Latency: 0.15 ms
        """
        if iterations < 1:
            # A median over zero samples would be NaN and silently poison
            # every derived metric.
            raise ValueError("iterations must be at least 1")

        # Resolve once so the lookup is not part of the timed region.
        run_forward = self._resolve_forward(model)

        for _ in range(warmup):
            _ = run_forward(input_tensor)

        times = []
        for _ in range(iterations):
            start_time = time.perf_counter()
            _ = run_forward(input_tensor)
            end_time = time.perf_counter()
            times.append((end_time - start_time) * 1000)  # Convert to milliseconds

        return float(np.median(times))

    def profile_layer(self, layer, input_shape: Tuple[int, ...]) -> Dict[str, Any]:
        """
        Profile a single layer: parameters, FLOPs, memory, and latency.

        Latency uses a short run (3 warmup, 10 timed iterations) on a random
        input of ``input_shape``.

        Args:
            layer: Layer to profile.
            input_shape: Shape of the random input to create.

        Returns:
            dict with 'layer_type', 'parameters', 'flops', 'latency_ms',
            'gflops_per_second', plus every key from ``measure_memory``.

        EXAMPLE:
        >>> linear = Linear(256, 128)
        >>> profiler = Profiler()
        >>> profile = profiler.profile_layer(linear, (32, 256))
        >>> print(f"Layer uses {profile['parameters']} parameters")
        Layer uses 32896 parameters
        """
        dummy_input = Tensor(np.random.randn(*input_shape))

        params = self.count_parameters(layer)
        flops = self.count_flops(layer, input_shape)
        memory = self.measure_memory(layer, input_shape)
        latency = self.measure_latency(layer, dummy_input, warmup=3, iterations=10)

        gflops_per_second = (flops / 1e9) / max(latency / 1000, self._MIN_LATENCY_SECONDS)

        return {
            'layer_type': layer.__class__.__name__,
            'parameters': params,
            'flops': flops,
            'latency_ms': latency,
            'gflops_per_second': gflops_per_second,
            **memory
        }

    def profile_forward_pass(self, model, input_tensor) -> Dict[str, Any]:
        """
        Profile a model's forward pass and derive throughput metrics.

        DERIVED METRICS:
        - GFLOP/s = (FLOPs / 1e9) / (latency_ms / 1000)
        - Memory bandwidth (MB/s) = peak_memory_mb / (latency_ms / 1000)
        - Computational efficiency = GFLOP/s / 100, capped at 1.0
          (assumes a 100 GFLOP/s theoretical CPU peak)
        - Bottleneck: 'memory' when bandwidth > GFLOP/s × 100 (a rough
          heuristic), otherwise 'compute'

        Args:
            model: Model to profile.
            input_tensor: Input tensor; its ``.shape`` drives the FLOP and
                memory estimates.

        Returns:
            dict with 'parameters', 'flops', 'latency_ms', every key from
            ``measure_memory``, 'gflops_per_second', 'memory_bandwidth_mbs',
            'computational_efficiency', 'is_memory_bound', 'is_compute_bound',
            and 'bottleneck'.

        EXAMPLE:
        >>> model = Linear(256, 128)
        >>> input_data = Tensor(np.random.randn(32, 256))
        >>> profiler = Profiler()
        >>> profile = profiler.profile_forward_pass(model, input_data)
        >>> print(f"Throughput: {profile['gflops_per_second']:.2f} GFLOP/s")  # varies by machine
        Throughput: 2.45 GFLOP/s
        """
        # Basic measurements
        param_count = self.count_parameters(model)
        flops = self.count_flops(model, input_tensor.shape)
        memory_stats = self.measure_memory(model, input_tensor.shape)
        latency_ms = self.measure_latency(model, input_tensor, warmup=5, iterations=20)

        # Derived metrics; the latency is floored so divisions are safe.
        latency_seconds = max(latency_ms / 1000.0, self._MIN_LATENCY_SECONDS)
        gflops_per_second = (flops / 1e9) / latency_seconds
        memory_bandwidth = memory_stats['peak_memory_mb'] / latency_seconds

        # Efficiency metrics
        theoretical_peak_gflops = 100.0  # Assume 100 GFLOP/s theoretical peak for CPU
        computational_efficiency = min(gflops_per_second / theoretical_peak_gflops, 1.0)

        # Bottleneck analysis
        is_memory_bound = memory_bandwidth > gflops_per_second * 100  # Rough heuristic
        is_compute_bound = not is_memory_bound

        return {
            # Basic measurements
            'parameters': param_count,
            'flops': flops,
            'latency_ms': latency_ms,
            **memory_stats,

            # Derived metrics
            'gflops_per_second': gflops_per_second,
            'memory_bandwidth_mbs': memory_bandwidth,
            'computational_efficiency': computational_efficiency,

            # Bottleneck analysis
            'is_memory_bound': is_memory_bound,
            'is_compute_bound': is_compute_bound,
            'bottleneck': 'memory' if is_memory_bound else 'compute'
        }

    def profile_backward_pass(self, model, input_tensor, _loss_fn=None) -> Dict[str, Any]:
        """
        Estimate the cost of one training iteration (forward + backward).

        No backward pass is executed: the forward pass is profiled and the
        backward figures are derived from it.

        BACKWARD PASS ESTIMATES:
        - FLOPs: 2× forward pass
        - Latency: 2× forward pass
        - Gradient memory: equal to parameter memory
        - Optimizer memory: SGD 0, Adam/AdamW 2× gradient memory

        Args:
            model: Model to profile.
            input_tensor: Input tensor for the forward pass.
            _loss_fn: Accepted for interface compatibility; unused.

        Returns:
            dict with forward_* figures, backward_* estimates,
            'gradient_memory_mb', total_* figures (including
            'total_gflops_per_second'), 'optimizer_memory_estimates',
            'memory_efficiency', and 'bottleneck'.

        EXAMPLE:
        >>> model = Linear(128, 64)
        >>> input_data = Tensor(np.random.randn(16, 128))
        >>> profiler = Profiler()
        >>> profile = profiler.profile_backward_pass(model, input_data)
        >>> print(f"Training iteration: {profile['total_latency_ms']:.2f} ms")  # varies by machine
        Training iteration: 0.45 ms
        """
        forward_profile = self.profile_forward_pass(model, input_tensor)

        # Estimate backward pass (typically 2× forward)
        backward_flops = forward_profile['flops'] * 2
        backward_latency_ms = forward_profile['latency_ms'] * 2

        # Gradient memory (equal to parameter memory)
        gradient_memory_mb = forward_profile['parameter_memory_mb']

        # Total training iteration
        total_flops = forward_profile['flops'] + backward_flops
        total_latency_ms = forward_profile['latency_ms'] + backward_latency_ms
        total_memory_mb = (forward_profile['parameter_memory_mb'] +
                           forward_profile['activation_memory_mb'] +
                           gradient_memory_mb)

        # Same latency floor as the forward-pass metrics, so a 0 ms reading
        # cannot raise ZeroDivisionError.
        total_latency_seconds = max(total_latency_ms / 1000.0, self._MIN_LATENCY_SECONDS)
        total_gflops_per_second = (total_flops / 1e9) / total_latency_seconds

        # Optimizer memory estimates
        optimizer_memory_estimates = {
            'sgd': 0,  # No extra memory
            'adam': gradient_memory_mb * 2,  # Momentum + velocity
            'adamw': gradient_memory_mb * 2,  # Same as Adam
        }

        return {
            # Forward pass
            'forward_flops': forward_profile['flops'],
            'forward_latency_ms': forward_profile['latency_ms'],
            'forward_memory_mb': forward_profile['peak_memory_mb'],

            # Backward pass estimates
            'backward_flops': backward_flops,
            'backward_latency_ms': backward_latency_ms,
            'gradient_memory_mb': gradient_memory_mb,

            # Total training iteration
            'total_flops': total_flops,
            'total_latency_ms': total_latency_ms,
            'total_memory_mb': total_memory_mb,
            'total_gflops_per_second': total_gflops_per_second,

            # Optimizer memory requirements
            'optimizer_memory_estimates': optimizer_memory_estimates,

            # Training insights
            'memory_efficiency': forward_profile['memory_efficiency'],
            'bottleneck': forward_profile['bottleneck']
        }


# %% ../../modules/14_profiling/14_profiling.ipynb 8
def quick_profile(model, input_tensor, profiler=None):
    """
    Profile a forward pass and print the key metrics.

    Args:
        model: Model to profile
        input_tensor: Input data for profiling
        profiler: Optional Profiler instance (creates new one if None)

    Returns:
        dict: The full result of ``Profiler.profile_forward_pass``

    Example:
        >>> model = Linear(128, 64)
        >>> input_data = Tensor(np.random.randn(16, 128))
        >>> results = quick_profile(model, input_data)
        >>> # Displays formatted output automatically
    """
    if profiler is None:
        profiler = Profiler()

    profile = profiler.profile_forward_pass(model, input_tensor)

    # Display formatted results
    print("🔬 Quick Profile Results:")
    print(f" Parameters: {profile['parameters']:,}")
    print(f" FLOPs: {profile['flops']:,}")
    print(f" Latency: {profile['latency_ms']:.2f} ms")
    print(f" Memory: {profile['peak_memory_mb']:.2f} MB")
    print(f" Bottleneck: {profile['bottleneck']}")
    print(f" Efficiency: {profile['computational_efficiency']*100:.1f}%")

    return profile


# %% ../../modules/14_profiling/14_profiling.ipynb 9
def analyze_weight_distribution(model, percentiles=(10, 25, 50, 75, 90)):
    """
    Analyze the distribution of absolute weight values.

    Helps understand which weights are small and might be prunable.
    Used by Module 17 (Compression) to motivate pruning.

    Weights come from ``model.parameters()`` when available, otherwise from
    ``model.weight``. All statistics are computed on absolute values.

    Args:
        model: Model to analyze
        percentiles: Iterable of percentiles to compute

    Returns:
        dict: 'total_weights', 'mean', 'std', 'min', 'max', one
        'percentile_<p>' entry per requested percentile, and the percentage
        of weights below 0.001 / 0.01 / 0.1 under the keys
        'below_threshold_0001' / 'below_threshold_001' / 'below_threshold_01'.
        Returns ``{'error': 'No weights found'}`` when the model exposes no
        weights or they are empty.

    Example:
        >>> model = Linear(512, 512)
        >>> stats = analyze_weight_distribution(model)
        >>> print(f"Weights < 0.01: {stats['below_threshold_001']:.1f}%")
    """
    # Collect every weight array as a flat view.
    if hasattr(model, 'parameters'):
        arrays = [np.asarray(param.data).ravel() for param in model.parameters()]
    elif hasattr(model, 'weight'):
        arrays = [np.asarray(model.weight.data).ravel()]
    else:
        return {'error': 'No weights found'}

    if not arrays:
        return {'error': 'No weights found'}

    # float64 keeps the statistics identical to computing them from Python
    # floats, without materializing a Python list of every weight.
    weights = np.concatenate(arrays).astype(np.float64, copy=False)
    if weights.size == 0:
        # np.min / np.max raise on empty input and the percentage below
        # would divide by zero.
        return {'error': 'No weights found'}

    abs_weights = np.abs(weights)

    # Calculate statistics
    stats = {
        'total_weights': int(weights.size),
        'mean': float(np.mean(abs_weights)),
        'std': float(np.std(abs_weights)),
        'min': float(np.min(abs_weights)),
        'max': float(np.max(abs_weights)),
    }

    # Percentile analysis
    for p in percentiles:
        stats[f'percentile_{p}'] = float(np.percentile(abs_weights, p))

    # Threshold analysis (useful for pruning)
    for threshold in [0.001, 0.01, 0.1]:
        below = np.sum(abs_weights < threshold) / weights.size * 100
        stats[f'below_threshold_{str(threshold).replace(".", "")}'] = float(below)

    return stats