Files
TinyTorch/tinytorch/core/kernels.py
Vijay Janapa Reddi 6d11a2be40 Complete comprehensive system validation and cleanup
🎯 Major Accomplishments:
•  All 15 module dev files validated and unit tests passing
•  Comprehensive integration tests (11/11 pass)
•  All 3 examples working with PyTorch-like API (XOR, MNIST, CIFAR-10)
•  Training capability verified (4/4 tests pass, XOR shows 35.8% improvement)
•  Clean directory structure (modules/source/ → modules/)

🧹 Repository Cleanup:
• Removed experimental/debug files and old logos
• Deleted redundant documentation (API_SIMPLIFICATION_COMPLETE.md, etc.)
• Removed empty module directories and backup files
• Streamlined examples (kept modern API versions only)
• Cleaned up old TinyGPT implementation (moved to examples concept)

📊 Validation Results:
• Module unit tests: 15/15 
• Integration tests: 11/11 
• Example validation: 3/3 
• Training validation: 4/4 

🔧 Key Fixes:
• Fixed activations module requires_grad test
• Fixed networks module layer name test (Dense → Linear)
• Fixed spatial module Conv2D weights attribute issues
• Updated all documentation to reflect new structure

📁 Structure Improvements:
• Simplified modules/source/ → modules/ (removed unnecessary nesting)
• Added comprehensive validation test suites
• Created VALIDATION_COMPLETE.md and WORKING_MODULES.md documentation
• Updated book structure to reflect ML evolution story

🚀 System Status: READY FOR PRODUCTION
All components validated, examples working, training capability verified.
Test-first approach successfully implemented and proven.
2025-09-23 10:00:33 -04:00

1254 lines
50 KiB
Python
Generated

# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb.
# %% auto 0
__all__ = ['time_kernel', 'matmul_baseline', 'vectorized_relu', 'vectorized_operations', 'cache_friendly_matmul', 'parallel_relu',
'parallel_batch_processing', 'quantized_matmul', 'quantized_relu', 'KernelOptimizationProfiler']
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 1
import numpy as np
import sys
import os
import time
import psutil
from typing import Callable, Dict, Any, Optional, Tuple, List
# Import our existing components
try:
from tinytorch.core.tensor import Tensor
from tinytorch.core.layers import matmul_naive as matmul
from tinytorch.core.activations import ReLU, Sigmoid, Tanh
from tinytorch.core.cnn import Conv2D
except ImportError:
# For development, import from local modules
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.extend([
os.path.join(base_dir, '01_tensor'),
os.path.join(base_dir, '02_activations'),
os.path.join(base_dir, '03_layers'),
os.path.join(base_dir, '05_cnn'),
os.path.join(base_dir, 'utils')
])
try:
from tensor_dev import Tensor
from layers_dev import matmul_naive as matmul
from activations_dev import ReLU, Sigmoid, Tanh
from cnn_dev import Conv2D
except ImportError:
# Create minimal mock for development
class Tensor:
def __init__(self, data):
self.data = np.array(data)
self.shape = self.data.shape
def __str__(self):
return f"Tensor({self.data})"
# Simple timing utility for kernel performance measurement
def time_kernel(func, *args, **kwargs):
"""
Simple timing function for measuring kernel performance.
Returns:
tuple: (result, time_in_microseconds)
"""
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
microseconds = (end - start) * 1_000_000
return result, microseconds
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 7
def matmul_baseline(A: Tensor, B: Tensor) -> Tensor:
"""
Baseline matrix multiplication using TinyTorch's proven implementation.
This function demonstrates how to build on existing TinyTorch components
rather than reinventing the wheel. We use the standard matmul from Module 03
as our baseline for comparison with optimized kernels.
This is NOT a custom implementation - it's the standard TinyTorch matmul
wrapped for use in kernel comparisons and benchmarking.
TODO: Use TinyTorch's standard matmul implementation as a baseline.
STEP-BY-STEP IMPLEMENTATION:
1. Import the standard matmul function from tinytorch.core.layers
2. Extract numpy arrays from input Tensors
3. Use the proven implementation from TinyTorch
4. Wrap result back in Tensor format
5. Return the result
CODE REUSE PRINCIPLES:
1. Always use the packaged version for reliability
2. Don't duplicate working code - reference the source
3. Use descriptive names that indicate what the function actually does
4. Keep dependencies simple and reliable
EXAMPLE USAGE:
```python
A = Tensor([[1, 2], [3, 4]])
B = Tensor([[5, 6], [7, 8]])
C = matmul_baseline(A, B)
# Expected: [[19, 22], [43, 50]]
```
LEARNING CONNECTIONS:
- This shows how to use TinyTorch as a library
- Demonstrates reliable dependency management
- Serves as baseline for kernel performance comparisons
- Shows proper software engineering practices
"""
### BEGIN SOLUTION
# Extract numpy arrays from Tensors
A_data = A.data if hasattr(A, 'data') else A
B_data = B.data if hasattr(B, 'data') else B
# Use NumPy's matrix multiplication as our baseline
# This is our baseline - reliable, tested, and consistent
result_data = np.dot(A_data, B_data)
# Wrap the result back in a Tensor for consistency
result = Tensor(result_data)
return result
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 11
def vectorized_relu(x: Tensor) -> Tensor:
"""
Vectorized ReLU implementation demonstrating SIMD principles.
This function shows how to write operations that take advantage of
CPU vectorization capabilities for better performance.
TODO: Implement a vectorized ReLU that's optimized for performance.
STEP-BY-STEP IMPLEMENTATION:
1. Extract numpy array from Tensor
2. Use NumPy's vectorized operations (these compile to SIMD instructions)
3. Apply ReLU: f(x) = max(0, x) for all elements simultaneously
4. Return result as Tensor
VECTORIZATION TECHNIQUES:
1. Use np.maximum instead of loops - this is vectorized
2. Ensure input is contiguous in memory for better SIMD performance
3. Consider using specific dtypes (float32 vs float64) for SIMD alignment
4. Avoid conditional operations that break vectorization
EXAMPLE USAGE:
```python
x = Tensor([-2, -1, 0, 1, 2])
y = vectorized_relu(x)
# Expected: [0, 0, 0, 1, 2]
```
PERFORMANCE CONSIDERATIONS:
- np.maximum is vectorized and uses SIMD instructions
- Memory layout matters: contiguous arrays are faster
- Data type matters: float32 allows more SIMD parallelism than float64
- Avoid Python loops - they can't be vectorized
LEARNING CONNECTIONS:
- This is how PyTorch's ReLU is implemented under the hood
- GPU kernels use similar principles with thousands of parallel threads
- Modern CPUs can process 4-16 floats simultaneously with SIMD
"""
### BEGIN SOLUTION
# Extract numpy array
x_data = x.data if hasattr(x, 'data') else x
# Ensure contiguous memory layout for better SIMD performance
if not x_data.flags.c_contiguous:
x_data = np.ascontiguousarray(x_data)
# Vectorized ReLU using NumPy's maximum function
# This compiles to SIMD instructions on modern CPUs
result = np.maximum(0, x_data)
return Tensor(result)
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 12
def vectorized_operations(x: Tensor, y: Tensor) -> Dict[str, Tensor]:
"""
Demonstration of various vectorized operations.
Shows how multiple operations can be vectorized for better performance.
TODO: Implement a collection of vectorized operations.
STEP-BY-STEP IMPLEMENTATION:
1. Extract numpy arrays from input Tensors
2. Implement vectorized versions of common operations
3. Use NumPy's built-in vectorized functions
4. Return dictionary of results
OPERATIONS TO IMPLEMENT:
- element_wise_multiply: x * y (element-wise)
- element_wise_add: x + y (element-wise)
- squared_difference: (x - y)^2
- euclidean_distance: sqrt(sum((x - y)^2))
- dot_product: sum(x * y)
VECTORIZATION PRINCIPLES:
- Use NumPy operations instead of Python loops
- Combine operations when possible: (x - y)**2 instead of subtract then square
- Consider memory layout and data types
- Measure performance improvements
EXAMPLE USAGE:
```python
x = Tensor([1, 2, 3, 4])
y = Tensor([2, 3, 4, 5])
results = vectorized_operations(x, y)
# Returns dict with all vectorized operation results
```
"""
### BEGIN SOLUTION
# Extract numpy arrays
x_data = x.data if hasattr(x, 'data') else x
y_data = y.data if hasattr(y, 'data') else y
# Ensure arrays are the same shape for element-wise operations
assert x_data.shape == y_data.shape, f"Shape mismatch: {x_data.shape} vs {y_data.shape}"
# Vectorized operations
results = {
'element_wise_multiply': Tensor(x_data * y_data),
'element_wise_add': Tensor(x_data + y_data),
'squared_difference': Tensor((x_data - y_data) ** 2),
'euclidean_distance': Tensor(np.sqrt(np.sum((x_data - y_data) ** 2))),
'dot_product': Tensor(np.dot(x_data.flatten(), y_data.flatten()))
}
return results
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 15
def cache_friendly_matmul(A: Tensor, B: Tensor, block_size: int = 32) -> Tensor:
"""
Cache-friendly matrix multiplication using blocking technique.
This implementation uses cache blocking to improve memory access patterns
and achieve better performance on modern CPUs.
TODO: Implement cache-friendly matrix multiplication using blocking.
STEP-BY-STEP IMPLEMENTATION:
1. Extract numpy arrays and get dimensions
2. Pre-allocate output matrix
3. Use three nested loops for blocks: block_i, block_j, block_k
4. Within each block, use three nested loops for elements: i, j, k
5. Process data in cache-sized blocks for better locality
BLOCKING ALGORITHM:
1. Divide matrices into blocks of size block_size x block_size
2. For each block of C, compute contribution from corresponding A and B blocks
3. This keeps data in cache longer, reducing memory access time
CACHE OPTIMIZATION PRINCIPLES:
- Process data in small blocks that fit in cache
- Reuse data as much as possible while it's in cache
- Access memory in predictable patterns
- Minimize cache misses
EXAMPLE USAGE:
```python
A = Tensor([[1, 2], [3, 4]])
B = Tensor([[5, 6], [7, 8]])
C = cache_friendly_matmul(A, B, block_size=2)
# Expected: [[19, 22], [43, 50]]
```
PERFORMANCE HINTS:
- block_size should be chosen based on cache size
- Typical L1 cache: 32KB, so block_size=32 for float32 matrices
- Experiment with different block sizes for your hardware
- This algorithm is O(n^3) but with much better constants
LEARNING CONNECTIONS:
- This is how BLAS libraries achieve high performance
- GPUs use similar tiling strategies for shared memory
- Modern compilers can sometimes do this automatically
"""
### BEGIN SOLUTION
# Extract numpy arrays
A_data = A.data if hasattr(A, 'data') else A
B_data = B.data if hasattr(B, 'data') else B
# Get dimensions
m, k = A_data.shape
k2, n = B_data.shape
assert k == k2, f"Cannot multiply {A_data.shape} and {B_data.shape}"
# Pre-allocate output matrix
C = np.zeros((m, n), dtype=A_data.dtype)
# Cache-friendly blocked matrix multiplication
for block_i in range(0, m, block_size):
for block_j in range(0, n, block_size):
for block_k in range(0, k, block_size):
# Define block boundaries
end_i = min(block_i + block_size, m)
end_j = min(block_j + block_size, n)
end_k = min(block_k + block_size, k)
# Process block - good cache locality
for i in range(block_i, end_i):
for j in range(block_j, end_j):
for k_idx in range(block_k, end_k):
C[i, j] += A_data[i, k_idx] * B_data[k_idx, j]
return Tensor(C)
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 18
def parallel_relu(x: Tensor, num_workers: int = 4) -> Tensor:
"""
Parallel ReLU implementation using multiple CPU cores.
This function demonstrates data parallelism by splitting the input
across multiple worker processes.
TODO: Implement parallel ReLU using multiprocessing or threading.
STEP-BY-STEP IMPLEMENTATION:
1. Extract numpy array from Tensor
2. Split array into chunks for parallel processing
3. Define worker function that applies ReLU to a chunk
4. Use ThreadPoolExecutor to process chunks in parallel
5. Combine results from all workers
6. Return result as Tensor
PARALLELIZATION STRATEGY:
1. Split input into num_workers chunks
2. Each worker processes its chunk independently
3. Apply ReLU: max(0, x) to each chunk
4. Combine results preserving original order
EXAMPLE USAGE:
```python
x = Tensor(np.random.randn(1000))
y = parallel_relu(x, num_workers=4)
# Processes data using 4 parallel workers
```
PERFORMANCE CONSIDERATIONS:
- Overhead of parallel processing may not be worth it for small arrays
- Threading vs multiprocessing trade-offs
- Chunk size should be large enough to amortize overhead
- Consider memory bandwidth limitations
LEARNING CONNECTIONS:
- This is how PyTorch processes batches in parallel
- GPUs naturally do this with thousands of parallel threads
- Modern deep learning frameworks heavily use parallelism
"""
### BEGIN SOLUTION
from concurrent.futures import ThreadPoolExecutor
# Extract numpy array
x_data = x.data if hasattr(x, 'data') else x
# For small arrays, parallel processing isn't worth the overhead
if x_data.size < 1000:
return Tensor(np.maximum(0, x_data))
# Split array into chunks
chunk_size = max(1, x_data.size // num_workers)
chunks = []
flat_data = x_data.flatten()
for i in range(0, len(flat_data), chunk_size):
chunks.append(flat_data[i:i + chunk_size])
# Worker function
def relu_chunk(chunk):
return np.maximum(0, chunk)
# Process chunks in parallel
with ThreadPoolExecutor(max_workers=num_workers) as executor:
future_to_chunk = {executor.submit(relu_chunk, chunk): i for i, chunk in enumerate(chunks)}
results = [None] * len(chunks)
for future in future_to_chunk:
chunk_idx = future_to_chunk[future]
results[chunk_idx] = future.result()
# Combine results
combined_result = np.concatenate(results)
# Reshape back to original shape
result = combined_result.reshape(x_data.shape)
return Tensor(result)
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 19
def parallel_batch_processing(batch_data: List[Tensor], operation: Callable, num_workers: int = 4) -> List[Tensor]:
"""
Process a batch of tensors in parallel using multiple workers.
This function demonstrates how to parallelize operations across
multiple data samples, similar to how modern ML frameworks work.
TODO: Implement parallel batch processing.
STEP-BY-STEP IMPLEMENTATION:
1. Take a list of Tensors and an operation function
2. Use ThreadPoolExecutor to process multiple tensors simultaneously
3. Apply the operation to each tensor in parallel
4. Return list of results in original order
PARALLELIZATION STRATEGY:
1. Each worker processes one tensor at a time
2. Multiple workers can process different tensors simultaneously
3. Preserve order of results to match input order
EXAMPLE USAGE:
```python
batch = [Tensor(np.random.randn(100, 100)) for _ in range(8)]
relu_op = lambda x: vectorized_relu(x)
results = parallel_batch_processing(batch, relu_op, num_workers=4)
# Processes 8 tensors using 4 parallel workers
```
PERFORMANCE CONSIDERATIONS:
- Each tensor should be large enough to justify parallel overhead
- Balance number of workers with available CPU cores
- Consider memory usage with multiple workers
- Thread vs process pool trade-offs
LEARNING CONNECTIONS:
- This is how PyTorch's DataLoader processes batches
- Similar to how GPUs process multiple samples simultaneously
- Foundation for distributed training across multiple nodes
"""
### BEGIN SOLUTION
from concurrent.futures import ThreadPoolExecutor
# For small batches, parallel processing might not be worth it
if len(batch_data) < num_workers:
return [operation(tensor) for tensor in batch_data]
# Process batch in parallel
with ThreadPoolExecutor(max_workers=num_workers) as executor:
# Submit all tasks
future_to_index = {executor.submit(operation, tensor): i for i, tensor in enumerate(batch_data)}
# Collect results in original order
results = [None] * len(batch_data)
for future in future_to_index:
index = future_to_index[future]
results[index] = future.result()
return results
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 24
def quantized_matmul(A: Tensor, B: Tensor, scale_A: float = 1.0, scale_B: float = 1.0) -> Tensor:
"""
Quantized matrix multiplication kernel for compressed models.
This function demonstrates how to perform matrix multiplication
with quantized (int8) weights while maintaining numerical accuracy.
TODO: Implement quantized matrix multiplication.
STEP-BY-STEP IMPLEMENTATION:
1. Extract numpy arrays from Tensors
2. Quantize inputs to int8 using provided scales
3. Perform integer matrix multiplication
4. Rescale result back to appropriate range
5. Return result as Tensor
QUANTIZATION PROCESS:
1. Quantize: int8_value = round(float_value / scale)
2. Compute: int8_result = int8_A @ int8_B
3. Rescale: float_result = int8_result * scale_A * scale_B
EXAMPLE USAGE:
```python
A = Tensor([[1.0, 2.0], [3.0, 4.0]])
B = Tensor([[0.5, 1.5], [2.5, 3.5]])
C = quantized_matmul(A, B, scale_A=1.0/127, scale_B=1.0/127)
# Should approximate regular matrix multiplication
```
PERFORMANCE CONSIDERATIONS:
- int8 operations are often faster than float32
- Memory usage is 4x lower
- Accumulation in int32 to prevent overflow
- Careful handling of scales to maintain precision
LEARNING CONNECTIONS:
- This is how TensorFlow Lite performs quantized inference
- Similar to how mobile ML accelerators work
- Foundation for edge deployment of neural networks
"""
### BEGIN SOLUTION
# Extract numpy arrays
A_data = A.data if hasattr(A, 'data') else A
B_data = B.data if hasattr(B, 'data') else B
# Quantize inputs to int8
A_int8 = np.round(A_data / scale_A).astype(np.int8)
B_int8 = np.round(B_data / scale_B).astype(np.int8)
# Perform integer matrix multiplication
# Use int32 for accumulation to prevent overflow
C_int32 = np.dot(A_int8.astype(np.int32), B_int8.astype(np.int32))
# Rescale result back to float
C_float = C_int32 * scale_A * scale_B
return Tensor(C_float)
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 25
def quantized_relu(x: Tensor, scale: float = 1.0) -> Tensor:
"""
Quantized ReLU implementation for compressed models.
This function shows how to apply ReLU activation to quantized values
while maintaining the quantization format.
TODO: Implement quantized ReLU activation.
STEP-BY-STEP IMPLEMENTATION:
1. Extract numpy array from Tensor
2. Quantize input to int8 using provided scale
3. Apply ReLU in integer domain: max(0, x)
4. Keep result in int8 format (no rescaling needed for ReLU)
5. Convert back to float using scale
6. Return result as Tensor
QUANTIZED RELU PROCESS:
1. Quantize: int8_value = round(float_value / scale)
2. Apply ReLU: int8_result = max(0, int8_value)
3. Dequantize: float_result = int8_result * scale
EXAMPLE USAGE:
```python
x = Tensor([-1.0, 0.0, 1.0, 2.0])
y = quantized_relu(x, scale=1.0/127)
# Should produce [0.0, 0.0, 1.0, 2.0] (approximately)
```
OPTIMIZATION NOTES:
- ReLU in int8 is just max(0, x) - very fast
- No floating-point operations needed during activation
- Maintains quantization format throughout
- Can be vectorized efficiently
LEARNING CONNECTIONS:
- This is how quantized neural networks maintain speed
- Similar to how mobile processors optimize ML inference
- Foundation for real-time edge computing applications
"""
### BEGIN SOLUTION
# Extract numpy array
x_data = x.data if hasattr(x, 'data') else x
# Quantize input to int8
x_int8 = np.round(x_data / scale).astype(np.int8)
# Apply ReLU in integer domain
x_relu_int8 = np.maximum(0, x_int8)
# Convert back to float
x_relu_float = x_relu_int8 * scale
return Tensor(x_relu_float)
### END SOLUTION
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 29
class KernelOptimizationProfiler:
"""
Production-grade kernel optimization profiler for ML systems.
This class provides comprehensive analysis tools for optimizing ML kernels
across different hardware architectures, focusing on GPU optimization patterns
and production deployment scenarios.
Key Features:
- CUDA kernel performance analysis
- Memory coalescing pattern detection
- Warp divergence analysis
- Shared memory optimization
- Tensor core utilization metrics
- Kernel fusion opportunities
- Multi-GPU scaling analysis
"""
def __init__(self, hardware_config: Optional[Dict[str, Any]] = None):
"""
Initialize the kernel optimization profiler.
Args:
hardware_config: Dictionary containing hardware specifications
"""
self.hardware_config = hardware_config or self._detect_hardware()
self.profile_results = {}
self.optimization_recommendations = []
def _detect_hardware(self) -> Dict[str, Any]:
"""Detect current hardware configuration."""
return {
'cpu_cores': psutil.cpu_count(),
'memory_gb': psutil.virtual_memory().total // (1024**3),
'cache_sizes': {
'l1': 32768, # Typical L1 cache size in bytes
'l2': 262144, # Typical L2 cache size in bytes
'l3': 8388608 # Typical L3 cache size in bytes
},
'gpu_available': False, # Would check for CUDA/OpenCL in real implementation
'gpu_memory_gb': 0,
'tensor_cores': False,
'warp_size': 32 # NVIDIA GPU warp size
}
def analyze_cuda_kernel_performance(self, kernel_func: Callable, input_data: Tensor,
iterations: int = 100) -> Dict[str, Any]:
"""
Analyze CUDA kernel performance characteristics.
In a real implementation, this would interface with CUDA profiling tools
to measure actual GPU kernel performance metrics.
"""
# Simulate CUDA kernel analysis
total_time = 0
memory_bandwidth = 0
compute_utilization = 0
for _ in range(iterations):
result, execution_time = time_kernel(kernel_func, input_data)
total_time += execution_time
# Simulate GPU metrics calculation
data_size = input_data.data.nbytes
memory_bandwidth += (data_size * 2) / (execution_time / 1_000_000) # Read + Write
compute_utilization += np.random.uniform(0.3, 0.9) # Simulated utilization
avg_time = total_time / iterations
avg_bandwidth = memory_bandwidth / iterations
avg_utilization = compute_utilization / iterations
analysis = {
'avg_execution_time_us': avg_time,
'memory_bandwidth_gb_s': avg_bandwidth / (1024**3),
'compute_utilization': avg_utilization,
'theoretical_peak_bandwidth': 900, # GB/s for high-end GPU
'bandwidth_efficiency': min(100, (avg_bandwidth / (1024**3)) / 900 * 100),
'bottleneck_analysis': self._identify_bottlenecks(avg_bandwidth / (1024**3), avg_utilization)
}
self.profile_results['cuda_analysis'] = analysis
return analysis
def analyze_memory_coalescing(self, access_pattern: str, data_shape: Tuple[int, ...]) -> Dict[str, Any]:
"""
Analyze memory access patterns for GPU coalescing efficiency.
Memory coalescing is critical for GPU performance - threads in a warp
should access contiguous memory locations.
"""
coalescing_efficiency = 1.0
if access_pattern == 'row_major':
# Good coalescing for row-major access
coalescing_efficiency = 0.95
elif access_pattern == 'column_major':
# Poor coalescing for column-major access
coalescing_efficiency = 0.3
elif access_pattern == 'strided':
# Moderate coalescing for strided access
stride = data_shape[1] if len(data_shape) > 1 else 1
coalescing_efficiency = max(0.1, 1.0 / stride)
elif access_pattern == 'random':
# Very poor coalescing for random access
coalescing_efficiency = 0.1
analysis = {
'access_pattern': access_pattern,
'data_shape': data_shape,
'coalescing_efficiency': coalescing_efficiency,
'memory_transactions': self._calculate_memory_transactions(data_shape, coalescing_efficiency),
'optimization_potential': 1.0 - coalescing_efficiency
}
self.profile_results['memory_coalescing'] = analysis
return analysis
def analyze_warp_divergence(self, conditional_operations: int, total_operations: int) -> Dict[str, Any]:
"""
Analyze warp divergence patterns in kernel execution.
Warp divergence occurs when threads in a warp take different execution paths,
reducing parallelism efficiency.
"""
divergence_ratio = conditional_operations / total_operations
efficiency_loss = divergence_ratio * 0.5 # Simplified model
analysis = {
'conditional_operations': conditional_operations,
'total_operations': total_operations,
'divergence_ratio': divergence_ratio,
'efficiency_loss': efficiency_loss,
'warp_efficiency': 1.0 - efficiency_loss,
'optimization_suggestions': self._generate_divergence_optimizations(divergence_ratio)
}
self.profile_results['warp_divergence'] = analysis
return analysis
def analyze_shared_memory_usage(self, kernel_data_size: int, reuse_factor: float) -> Dict[str, Any]:
"""
Analyze shared memory optimization opportunities.
Shared memory is fast on-chip memory that can dramatically improve
performance when used effectively for data reuse.
"""
shared_memory_size = 48 * 1024 # 48KB typical shared memory per SM
bank_conflicts = self._estimate_bank_conflicts(kernel_data_size)
analysis = {
'data_size_bytes': kernel_data_size,
'shared_memory_available': shared_memory_size,
'utilization_ratio': min(1.0, kernel_data_size / shared_memory_size),
'reuse_factor': reuse_factor,
'bank_conflicts': bank_conflicts,
'performance_gain': min(10.0, reuse_factor * (1.0 - bank_conflicts)),
'optimization_opportunities': self._identify_shared_memory_optimizations(kernel_data_size, reuse_factor)
}
self.profile_results['shared_memory'] = analysis
return analysis
def analyze_tensor_core_utilization(self, operation_type: str, data_types: List[str]) -> Dict[str, Any]:
"""
Analyze tensor core utilization for mixed-precision operations.
Tensor cores provide massive acceleration for mixed-precision matrix operations
when data shapes and types are optimized correctly.
"""
tensor_core_compatible = (
operation_type in ['matmul', 'conv2d'] and
any(dtype in ['float16', 'bfloat16', 'int8'] for dtype in data_types)
)
if tensor_core_compatible:
theoretical_speedup = 4.0 # Typical tensor core speedup
actual_utilization = 0.7 # Realistic utilization
else:
theoretical_speedup = 1.0
actual_utilization = 0.0
analysis = {
'operation_type': operation_type,
'data_types': data_types,
'tensor_core_compatible': tensor_core_compatible,
'theoretical_speedup': theoretical_speedup,
'actual_utilization': actual_utilization,
'performance_gain': theoretical_speedup * actual_utilization,
'optimization_requirements': self._get_tensor_core_requirements()
}
self.profile_results['tensor_core'] = analysis
return analysis
def analyze_kernel_fusion_opportunities(self, operation_sequence: List[str]) -> Dict[str, Any]:
"""
Analyze opportunities for kernel fusion to reduce memory overhead.
Kernel fusion combines multiple operations into a single kernel,
reducing memory bandwidth requirements and improving performance.
"""
fusable_patterns = [
['matmul', 'relu'],
['conv2d', 'batchnorm', 'relu'],
['add', 'relu'],
['mul', 'add']
]
fusion_opportunities = []
memory_savings = 0
for pattern in fusable_patterns:
if self._sequence_contains_pattern(operation_sequence, pattern):
fusion_opportunities.append(pattern)
memory_savings += len(pattern) - 1 # Save intermediate results
analysis = {
'operation_sequence': operation_sequence,
'fusion_opportunities': fusion_opportunities,
'memory_savings_factor': memory_savings,
'performance_improvement': min(2.0, 1 + memory_savings * 0.3),
'implementation_complexity': len(fusion_opportunities) * 2
}
self.profile_results['kernel_fusion'] = analysis
return analysis
def analyze_multi_gpu_scaling(self, data_size: int, num_gpus: int) -> Dict[str, Any]:
"""
Analyze multi-GPU scaling patterns and communication overhead.
Multi-GPU deployments require careful optimization of data distribution
and communication patterns to achieve good scaling efficiency.
"""
communication_overhead = self._calculate_communication_overhead(data_size, num_gpus)
compute_scaling = min(num_gpus, data_size / 1000) # Simplified scaling model
analysis = {
'data_size': data_size,
'num_gpus': num_gpus,
'communication_overhead': communication_overhead,
'compute_scaling': compute_scaling,
'scaling_efficiency': compute_scaling / num_gpus,
'bottleneck_type': 'communication' if communication_overhead > 0.3 else 'compute',
'optimization_strategies': self._get_multi_gpu_optimizations(communication_overhead)
}
self.profile_results['multi_gpu'] = analysis
return analysis
def generate_optimization_report(self) -> str:
"""Generate comprehensive optimization report with recommendations."""
report = ["🚀 Kernel Optimization Analysis Report", "=" * 50, ""]
for analysis_type, results in self.profile_results.items():
report.append(f"📊 {analysis_type.replace('_', ' ').title()} Analysis:")
report.append("-" * 30)
for key, value in results.items():
if isinstance(value, float):
report.append(f" {key}: {value:.3f}")
elif isinstance(value, list):
report.append(f" {key}: {', '.join(map(str, value))}")
else:
report.append(f" {key}: {value}")
report.append("")
# Add optimization recommendations
report.append("🎯 Optimization Recommendations:")
report.append("-" * 30)
for rec in self.optimization_recommendations:
report.append(f"{rec}")
return "\n".join(report)
# Helper methods
def _identify_bottlenecks(self, bandwidth_gb_s: float, utilization: float) -> str:
"""Identify performance bottlenecks."""
if bandwidth_gb_s < 100:
return "Memory bandwidth limited"
elif utilization < 0.5:
return "Compute utilization limited"
else:
return "Well balanced"
def _calculate_memory_transactions(self, shape: Tuple[int, ...], efficiency: float) -> int:
"""Calculate memory transaction count."""
total_elements = np.prod(shape)
return int(total_elements / (32 * efficiency)) # 32 threads per warp
def _generate_divergence_optimizations(self, divergence_ratio: float) -> List[str]:
"""Generate warp divergence optimization suggestions."""
suggestions = []
if divergence_ratio > 0.3:
suggestions.append("Reduce conditional operations in inner loops")
suggestions.append("Use predicated execution instead of branching")
if divergence_ratio > 0.5:
suggestions.append("Restructure algorithm to minimize thread divergence")
return suggestions
def _estimate_bank_conflicts(self, data_size: int) -> float:
"""Estimate shared memory bank conflicts."""
# Simplified model - assumes some degree of bank conflicts
return min(0.5, data_size / (32 * 4)) # 32 banks, 4 bytes per bank
def _identify_shared_memory_optimizations(self, size: int, reuse: float) -> List[str]:
"""Identify shared memory optimization opportunities."""
optimizations = []
if reuse > 2.0:
optimizations.append("High reuse factor - shared memory beneficial")
if size < 16384: # 16KB
optimizations.append("Data fits in shared memory - implement tiling")
return optimizations
def _get_tensor_core_requirements(self) -> List[str]:
"""Get tensor core optimization requirements."""
return [
"Use mixed precision (float16/bfloat16)",
"Ensure matrix dimensions are multiples of 8",
"Use proper memory layout (NHWC for convolutions)"
]
def _sequence_contains_pattern(self, sequence: List[str], pattern: List[str]) -> bool:
"""Check if operation sequence contains fusable pattern."""
for i in range(len(sequence) - len(pattern) + 1):
if sequence[i:i+len(pattern)] == pattern:
return True
return False
def _calculate_communication_overhead(self, data_size: int, num_gpus: int) -> float:
"""Calculate multi-GPU communication overhead."""
# Simplified model based on data size and GPU count
return min(0.8, (data_size / 1000) / num_gpus + 0.1)
def _get_multi_gpu_optimizations(self, overhead: float) -> List[str]:
"""Get multi-GPU optimization strategies."""
strategies = []
if overhead > 0.3:
strategies.append("Implement gradient compression")
strategies.append("Use asynchronous communication")
if overhead > 0.5:
strategies.append("Increase batch size to amortize communication")
return strategies
# %% ../../modules/source/temp_holding/13_kernels/kernels_dev.ipynb 32
class KernelOptimizationProfiler:
"""
Production-grade kernel optimization profiler for ML systems.
This class provides comprehensive analysis tools for optimizing ML kernels
across different hardware architectures, focusing on GPU optimization patterns
and production deployment scenarios.
Key Features:
- CUDA kernel performance analysis
- Memory coalescing pattern detection
- Warp divergence analysis
- Shared memory optimization
- Tensor core utilization metrics
- Kernel fusion opportunities
- Multi-GPU scaling analysis
"""
def __init__(self, hardware_config: Optional[Dict[str, Any]] = None):
"""
Initialize the kernel optimization profiler.
Args:
hardware_config: Dictionary containing hardware specifications
"""
self.hardware_config = hardware_config or self._detect_hardware()
self.profile_results = {}
self.optimization_recommendations = []
def _detect_hardware(self) -> Dict[str, Any]:
"""Detect current hardware configuration."""
return {
'cpu_cores': psutil.cpu_count(),
'memory_gb': psutil.virtual_memory().total // (1024**3),
'cache_sizes': {
'l1': 32768, # Typical L1 cache size in bytes
'l2': 262144, # Typical L2 cache size in bytes
'l3': 8388608 # Typical L3 cache size in bytes
},
'gpu_available': False, # Would check for CUDA/OpenCL in real implementation
'gpu_memory_gb': 0,
'tensor_cores': False,
'warp_size': 32 # NVIDIA GPU warp size
}
def analyze_cuda_kernel_performance(self, kernel_func: Callable, input_data: Tensor,
iterations: int = 100) -> Dict[str, Any]:
"""
Analyze CUDA kernel performance characteristics.
In a real implementation, this would interface with CUDA profiling tools
to measure actual GPU kernel performance metrics.
"""
# Simulate CUDA kernel analysis
total_time = 0
memory_bandwidth = 0
compute_utilization = 0
for _ in range(iterations):
result, execution_time = time_kernel(kernel_func, input_data)
total_time += execution_time
# Simulate GPU metrics calculation
data_size = input_data.data.nbytes
memory_bandwidth += (data_size * 2) / (execution_time / 1_000_000) # Read + Write
compute_utilization += np.random.uniform(0.3, 0.9) # Simulated utilization
avg_time = total_time / iterations
avg_bandwidth = memory_bandwidth / iterations
avg_utilization = compute_utilization / iterations
analysis = {
'avg_execution_time_us': avg_time,
'memory_bandwidth_gb_s': avg_bandwidth / (1024**3),
'compute_utilization': avg_utilization,
'theoretical_peak_bandwidth': 900, # GB/s for high-end GPU
'bandwidth_efficiency': min(100, (avg_bandwidth / (1024**3)) / 900 * 100),
'bottleneck_analysis': self._identify_bottlenecks(avg_bandwidth / (1024**3), avg_utilization)
}
self.profile_results['cuda_analysis'] = analysis
return analysis
def analyze_memory_coalescing(self, access_pattern: str, data_shape: Tuple[int, ...]) -> Dict[str, Any]:
"""
Analyze memory access patterns for GPU coalescing efficiency.
Memory coalescing is critical for GPU performance - threads in a warp
should access contiguous memory locations.
"""
coalescing_efficiency = 1.0
if access_pattern == 'row_major':
# Good coalescing for row-major access
coalescing_efficiency = 0.95
elif access_pattern == 'column_major':
# Poor coalescing for column-major access
coalescing_efficiency = 0.3
elif access_pattern == 'strided':
# Moderate coalescing for strided access
stride = data_shape[1] if len(data_shape) > 1 else 1
coalescing_efficiency = max(0.1, 1.0 / stride)
elif access_pattern == 'random':
# Very poor coalescing for random access
coalescing_efficiency = 0.1
analysis = {
'access_pattern': access_pattern,
'data_shape': data_shape,
'coalescing_efficiency': coalescing_efficiency,
'memory_transactions': self._calculate_memory_transactions(data_shape, coalescing_efficiency),
'optimization_potential': 1.0 - coalescing_efficiency
}
self.profile_results['memory_coalescing'] = analysis
return analysis
def analyze_warp_divergence(self, conditional_operations: int, total_operations: int) -> Dict[str, Any]:
"""
Analyze warp divergence patterns in kernel execution.
Warp divergence occurs when threads in a warp take different execution paths,
reducing parallelism efficiency.
"""
divergence_ratio = conditional_operations / total_operations
efficiency_loss = divergence_ratio * 0.5 # Simplified model
analysis = {
'conditional_operations': conditional_operations,
'total_operations': total_operations,
'divergence_ratio': divergence_ratio,
'efficiency_loss': efficiency_loss,
'warp_efficiency': 1.0 - efficiency_loss,
'optimization_suggestions': self._generate_divergence_optimizations(divergence_ratio)
}
self.profile_results['warp_divergence'] = analysis
return analysis
def analyze_shared_memory_usage(self, kernel_data_size: int, reuse_factor: float) -> Dict[str, Any]:
"""
Analyze shared memory optimization opportunities.
Shared memory is fast on-chip memory that can dramatically improve
performance when used effectively for data reuse.
"""
shared_memory_size = 48 * 1024 # 48KB typical shared memory per SM
bank_conflicts = self._estimate_bank_conflicts(kernel_data_size)
analysis = {
'data_size_bytes': kernel_data_size,
'shared_memory_available': shared_memory_size,
'utilization_ratio': min(1.0, kernel_data_size / shared_memory_size),
'reuse_factor': reuse_factor,
'bank_conflicts': bank_conflicts,
'performance_gain': min(10.0, reuse_factor * (1.0 - bank_conflicts)),
'optimization_opportunities': self._identify_shared_memory_optimizations(kernel_data_size, reuse_factor)
}
self.profile_results['shared_memory'] = analysis
return analysis
def analyze_tensor_core_utilization(self, operation_type: str, data_types: List[str]) -> Dict[str, Any]:
"""
Analyze tensor core utilization for mixed-precision operations.
Tensor cores provide massive acceleration for mixed-precision matrix operations
when data shapes and types are optimized correctly.
"""
tensor_core_compatible = (
operation_type in ['matmul', 'conv2d'] and
any(dtype in ['float16', 'bfloat16', 'int8'] for dtype in data_types)
)
if tensor_core_compatible:
theoretical_speedup = 4.0 # Typical tensor core speedup
actual_utilization = 0.7 # Realistic utilization
else:
theoretical_speedup = 1.0
actual_utilization = 0.0
analysis = {
'operation_type': operation_type,
'data_types': data_types,
'tensor_core_compatible': tensor_core_compatible,
'theoretical_speedup': theoretical_speedup,
'actual_utilization': actual_utilization,
'performance_gain': theoretical_speedup * actual_utilization,
'optimization_requirements': self._get_tensor_core_requirements()
}
self.profile_results['tensor_core'] = analysis
return analysis
def analyze_kernel_fusion_opportunities(self, operation_sequence: List[str]) -> Dict[str, Any]:
"""
Analyze opportunities for kernel fusion to reduce memory overhead.
Kernel fusion combines multiple operations into a single kernel,
reducing memory bandwidth requirements and improving performance.
"""
fusable_patterns = [
['matmul', 'relu'],
['conv2d', 'batchnorm', 'relu'],
['add', 'relu'],
['mul', 'add']
]
fusion_opportunities = []
memory_savings = 0
for pattern in fusable_patterns:
if self._sequence_contains_pattern(operation_sequence, pattern):
fusion_opportunities.append(pattern)
memory_savings += len(pattern) - 1 # Save intermediate results
analysis = {
'operation_sequence': operation_sequence,
'fusion_opportunities': fusion_opportunities,
'memory_savings_factor': memory_savings,
'performance_improvement': min(2.0, 1 + memory_savings * 0.3),
'implementation_complexity': len(fusion_opportunities) * 2
}
self.profile_results['kernel_fusion'] = analysis
return analysis
def analyze_multi_gpu_scaling(self, data_size: int, num_gpus: int) -> Dict[str, Any]:
"""
Analyze multi-GPU scaling patterns and communication overhead.
Multi-GPU deployments require careful optimization of data distribution
and communication patterns to achieve good scaling efficiency.
"""
communication_overhead = self._calculate_communication_overhead(data_size, num_gpus)
compute_scaling = min(num_gpus, data_size / 1000) # Simplified scaling model
analysis = {
'data_size': data_size,
'num_gpus': num_gpus,
'communication_overhead': communication_overhead,
'compute_scaling': compute_scaling,
'scaling_efficiency': compute_scaling / num_gpus,
'bottleneck_type': 'communication' if communication_overhead > 0.3 else 'compute',
'optimization_strategies': self._get_multi_gpu_optimizations(communication_overhead)
}
self.profile_results['multi_gpu'] = analysis
return analysis
def generate_optimization_report(self) -> str:
"""Generate comprehensive optimization report with recommendations."""
report = ["🚀 Kernel Optimization Analysis Report", "=" * 50, ""]
for analysis_type, results in self.profile_results.items():
report.append(f"📊 {analysis_type.replace('_', ' ').title()} Analysis:")
report.append("-" * 30)
for key, value in results.items():
if isinstance(value, float):
report.append(f" {key}: {value:.3f}")
elif isinstance(value, list):
report.append(f" {key}: {', '.join(map(str, value))}")
else:
report.append(f" {key}: {value}")
report.append("")
# Add optimization recommendations
report.append("🎯 Optimization Recommendations:")
report.append("-" * 30)
for rec in self.optimization_recommendations:
report.append(f"{rec}")
return "\n".join(report)
# Helper methods
def _identify_bottlenecks(self, bandwidth_gb_s: float, utilization: float) -> str:
"""Identify performance bottlenecks."""
if bandwidth_gb_s < 100:
return "Memory bandwidth limited"
elif utilization < 0.5:
return "Compute utilization limited"
else:
return "Well balanced"
def _calculate_memory_transactions(self, shape: Tuple[int, ...], efficiency: float) -> int:
"""Calculate memory transaction count."""
total_elements = np.prod(shape)
return int(total_elements / (32 * efficiency)) # 32 threads per warp
def _generate_divergence_optimizations(self, divergence_ratio: float) -> List[str]:
"""Generate warp divergence optimization suggestions."""
suggestions = []
if divergence_ratio > 0.3:
suggestions.append("Reduce conditional operations in inner loops")
suggestions.append("Use predicated execution instead of branching")
if divergence_ratio > 0.5:
suggestions.append("Restructure algorithm to minimize thread divergence")
return suggestions
def _estimate_bank_conflicts(self, data_size: int) -> float:
"""Estimate shared memory bank conflicts."""
# Simplified model - assumes some degree of bank conflicts
return min(0.5, data_size / (32 * 4)) # 32 banks, 4 bytes per bank
def _identify_shared_memory_optimizations(self, size: int, reuse: float) -> List[str]:
"""Identify shared memory optimization opportunities."""
optimizations = []
if reuse > 2.0:
optimizations.append("High reuse factor - shared memory beneficial")
if size < 16384: # 16KB
optimizations.append("Data fits in shared memory - implement tiling")
return optimizations
def _get_tensor_core_requirements(self) -> List[str]:
"""Get tensor core optimization requirements."""
return [
"Use mixed precision (float16/bfloat16)",
"Ensure matrix dimensions are multiples of 8",
"Use proper memory layout (NHWC for convolutions)"
]
def _sequence_contains_pattern(self, sequence: List[str], pattern: List[str]) -> bool:
"""Check if operation sequence contains fusable pattern."""
for i in range(len(sequence) - len(pattern) + 1):
if sequence[i:i+len(pattern)] == pattern:
return True
return False
def _calculate_communication_overhead(self, data_size: int, num_gpus: int) -> float:
"""Calculate multi-GPU communication overhead."""
# Simplified model based on data size and GPU count
return min(0.8, (data_size / 1000) / num_gpus + 0.1)
def _get_multi_gpu_optimizations(self, overhead: float) -> List[str]:
"""Get multi-GPU optimization strategies."""
strategies = []
if overhead > 0.3:
strategies.append("Implement gradient compression")
strategies.append("Use asynchronous communication")
if overhead > 0.5:
strategies.append("Increase batch size to amortize communication")
return strategies