mirror of
https://github.com/MLSysBook/TinyTorch.git
synced 2026-03-26 10:02:19 -05:00
387 lines
15 KiB
Python
387 lines
15 KiB
Python
"""
|
|
Checkpoint 15: Acceleration (After Module 15 - Acceleration)
|
|
Question: "Can I accelerate computations through algorithmic optimization?"
|
|
"""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
def test_checkpoint_15_acceleration():
|
|
"""
|
|
Checkpoint 15: Acceleration
|
|
|
|
Validates that students can implement algorithmic acceleration techniques
|
|
that provide "free" speedups through better algorithms and hardware
|
|
utilization without sacrificing accuracy.
|
|
"""
|
|
print("\n🚀 Checkpoint 15: Acceleration")
|
|
print("=" * 50)
|
|
|
|
try:
|
|
# Import acceleration components
|
|
from tinytorch.core.tensor import Tensor
|
|
from tinytorch.core.layers import Linear, Conv2D
|
|
from tinytorch.core.activations import ReLU
|
|
from tinytorch.core.networks import Sequential
|
|
from tinytorch.core.kernels import (
|
|
time_kernel, vectorized_relu, optimized_matmul,
|
|
cache_efficient_conv, memory_pool_allocator
|
|
)
|
|
from tinytorch.core.acceleration import (
|
|
AlgorithmicOptimizer, VectorizedOperations, CacheOptimizer,
|
|
ParallelCompute, MemoryOptimizer
|
|
)
|
|
except ImportError as e:
|
|
pytest.fail(f"❌ Cannot import acceleration classes - complete Module 15 first: {e}")
|
|
|
|
# Test 1: Vectorized operations
|
|
print("⚡ Testing vectorized operations...")
|
|
|
|
try:
|
|
# Create test data
|
|
input_data = np.random.randn(1000, 256).astype(np.float32)
|
|
|
|
# Naive vs vectorized ReLU comparison
|
|
vectorized_ops = VectorizedOperations()
|
|
|
|
# Benchmark naive implementation
|
|
def naive_relu(x):
|
|
"""Naive element-wise ReLU implementation."""
|
|
result = np.zeros_like(x)
|
|
for i in range(x.shape[0]):
|
|
for j in range(x.shape[1]):
|
|
result[i, j] = max(0, x[i, j])
|
|
return result
|
|
|
|
# Benchmark vectorized implementation
|
|
naive_time, naive_result = time_kernel(lambda: naive_relu(input_data))
|
|
vectorized_time, vectorized_result = time_kernel(lambda: vectorized_relu(input_data))
|
|
|
|
# Verify results are equivalent
|
|
results_match = np.allclose(naive_result, vectorized_result, rtol=1e-6)
|
|
speedup = naive_time / vectorized_time
|
|
|
|
print(f"✅ Vectorized operations:")
|
|
print(f" Naive time: {naive_time*1000:.2f}ms")
|
|
print(f" Vectorized time: {vectorized_time*1000:.2f}ms")
|
|
print(f" Speedup: {speedup:.1f}x")
|
|
print(f" Results match: {results_match}")
|
|
|
|
# Verify significant speedup
|
|
assert speedup >= 2.0, f"Expected significant speedup, got {speedup:.1f}x"
|
|
assert results_match, "Vectorized and naive results should match"
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Vectorized operations: {e}")
|
|
|
|
# Test 2: Optimized matrix multiplication
|
|
print("🔢 Testing optimized matrix multiplication...")
|
|
|
|
try:
|
|
# Create matrices for multiplication
|
|
A = np.random.randn(512, 256).astype(np.float32)
|
|
B = np.random.randn(256, 128).astype(np.float32)
|
|
|
|
# Standard numpy matmul (baseline)
|
|
numpy_time, numpy_result = time_kernel(lambda: np.dot(A, B))
|
|
|
|
# Optimized matmul
|
|
optimized_time, optimized_result = time_kernel(lambda: optimized_matmul(A, B))
|
|
|
|
# Verify correctness
|
|
matmul_match = np.allclose(numpy_result, optimized_result, rtol=1e-5)
|
|
matmul_speedup = numpy_time / optimized_time
|
|
|
|
print(f"✅ Optimized matrix multiplication:")
|
|
print(f" NumPy time: {numpy_time*1000:.2f}ms")
|
|
print(f" Optimized time: {optimized_time*1000:.2f}ms")
|
|
print(f" Speedup: {matmul_speedup:.1f}x")
|
|
print(f" Results match: {matmul_match}")
|
|
|
|
# Verify optimization effectiveness
|
|
assert matmul_match, "Optimized matmul should produce correct results"
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Optimized matrix multiplication: {e}")
|
|
|
|
# Test 3: Cache-efficient convolution
|
|
print("🏁 Testing cache-efficient convolution...")
|
|
|
|
try:
|
|
# Create convolution test case
|
|
input_tensor = np.random.randn(4, 3, 32, 32).astype(np.float32) # NCHW format
|
|
kernel = np.random.randn(16, 3, 3, 3).astype(np.float32) # Output channels, input channels, H, W
|
|
|
|
# Standard convolution
|
|
def naive_conv2d(input_data, kernel_weights):
|
|
"""Simplified naive convolution for comparison."""
|
|
batch_size, in_channels, input_height, input_width = input_data.shape
|
|
out_channels, _, kernel_height, kernel_width = kernel_weights.shape
|
|
|
|
output_height = input_height - kernel_height + 1
|
|
output_width = input_width - kernel_width + 1
|
|
|
|
output = np.zeros((batch_size, out_channels, output_height, output_width))
|
|
|
|
for b in range(batch_size):
|
|
for oc in range(out_channels):
|
|
for oh in range(output_height):
|
|
for ow in range(output_width):
|
|
for ic in range(in_channels):
|
|
for kh in range(kernel_height):
|
|
for kw in range(kernel_width):
|
|
output[b, oc, oh, ow] += (
|
|
input_data[b, ic, oh + kh, ow + kw] *
|
|
kernel_weights[oc, ic, kh, kw]
|
|
)
|
|
return output
|
|
|
|
# Cache-efficient convolution
|
|
cache_optimizer = CacheOptimizer()
|
|
|
|
naive_conv_time, naive_conv_result = time_kernel(lambda: naive_conv2d(input_tensor, kernel))
|
|
efficient_conv_time, efficient_conv_result = time_kernel(
|
|
lambda: cache_efficient_conv(input_tensor, kernel, cache_optimizer)
|
|
)
|
|
|
|
conv_speedup = naive_conv_time / efficient_conv_time
|
|
conv_match = np.allclose(naive_conv_result, efficient_conv_result, rtol=1e-4)
|
|
|
|
print(f"✅ Cache-efficient convolution:")
|
|
print(f" Naive convolution: {naive_conv_time*1000:.2f}ms")
|
|
print(f" Cache-efficient: {efficient_conv_time*1000:.2f}ms")
|
|
print(f" Speedup: {conv_speedup:.1f}x")
|
|
print(f" Results match: {conv_match}")
|
|
|
|
# Verify cache optimization works
|
|
assert conv_match, "Cache-efficient convolution should produce correct results"
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Cache-efficient convolution: {e}")
|
|
|
|
# Test 4: Memory optimization
|
|
print("💾 Testing memory optimization...")
|
|
|
|
try:
|
|
# Memory pool allocator test
|
|
memory_optimizer = MemoryOptimizer()
|
|
|
|
# Test memory pool vs standard allocation
|
|
allocation_sizes = [1024, 2048, 4096, 8192]
|
|
|
|
# Standard allocation timing
|
|
standard_alloc_times = []
|
|
for size in allocation_sizes:
|
|
alloc_time, _ = time_kernel(lambda: np.zeros(size, dtype=np.float32))
|
|
standard_alloc_times.append(alloc_time)
|
|
|
|
# Memory pool allocation timing
|
|
pool_alloc_times = []
|
|
memory_pool = memory_pool_allocator(max_size=32768)
|
|
|
|
for size in allocation_sizes:
|
|
pool_alloc_time, _ = time_kernel(lambda: memory_pool.allocate(size))
|
|
pool_alloc_times.append(pool_alloc_time)
|
|
|
|
avg_standard_time = np.mean(standard_alloc_times)
|
|
avg_pool_time = np.mean(pool_alloc_times)
|
|
memory_speedup = avg_standard_time / avg_pool_time
|
|
|
|
print(f"✅ Memory optimization:")
|
|
print(f" Standard allocation: {avg_standard_time*1000:.3f}ms average")
|
|
print(f" Pool allocation: {avg_pool_time*1000:.3f}ms average")
|
|
print(f" Memory speedup: {memory_speedup:.1f}x")
|
|
|
|
# Test memory usage reduction
|
|
baseline_memory = sum(size * 4 for size in allocation_sizes) # 4 bytes per float32
|
|
optimized_memory = memory_optimizer.get_peak_usage()
|
|
memory_efficiency = baseline_memory / optimized_memory if optimized_memory > 0 else 1
|
|
|
|
print(f" Memory efficiency: {memory_efficiency:.1f}x")
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Memory optimization: {e}")
|
|
|
|
# Test 5: Parallel computation
|
|
print("🔄 Testing parallel computation...")
|
|
|
|
try:
|
|
# Test parallel vs sequential processing
|
|
parallel_compute = ParallelCompute(num_workers=4)
|
|
|
|
# Create computational workload
|
|
matrices = [np.random.randn(256, 256).astype(np.float32) for _ in range(8)]
|
|
|
|
# Sequential processing
|
|
def sequential_processing(matrix_list):
|
|
results = []
|
|
for matrix in matrix_list:
|
|
# Simulate expensive computation
|
|
result = np.linalg.svd(matrix, compute_uv=False)
|
|
results.append(result)
|
|
return results
|
|
|
|
# Parallel processing
|
|
def parallel_task(matrix):
|
|
return np.linalg.svd(matrix, compute_uv=False)
|
|
|
|
sequential_time, sequential_results = time_kernel(lambda: sequential_processing(matrices))
|
|
parallel_time, parallel_results = time_kernel(
|
|
lambda: parallel_compute.map(parallel_task, matrices)
|
|
)
|
|
|
|
parallel_speedup = sequential_time / parallel_time
|
|
|
|
print(f"✅ Parallel computation:")
|
|
print(f" Sequential time: {sequential_time*1000:.2f}ms")
|
|
print(f" Parallel time: {parallel_time*1000:.2f}ms")
|
|
print(f" Parallel speedup: {parallel_speedup:.1f}x")
|
|
print(f" Workers used: {parallel_compute.num_workers}")
|
|
|
|
# Verify parallel speedup
|
|
assert parallel_speedup >= 1.5, f"Expected parallel speedup, got {parallel_speedup:.1f}x"
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Parallel computation: {e}")
|
|
|
|
# Test 6: Algorithmic optimization patterns
|
|
print("🧠 Testing algorithmic optimization patterns...")
|
|
|
|
try:
|
|
# Test different algorithmic approaches
|
|
optimizer = AlgorithmicOptimizer()
|
|
|
|
# Example: Optimize attention computation
|
|
seq_len = 128
|
|
d_model = 64
|
|
|
|
query = np.random.randn(1, seq_len, d_model).astype(np.float32)
|
|
key = np.random.randn(1, seq_len, d_model).astype(np.float32)
|
|
value = np.random.randn(1, seq_len, d_model).astype(np.float32)
|
|
|
|
# Naive attention computation (O(N²))
|
|
def naive_attention(q, k, v):
|
|
scores = np.matmul(q, k.transpose(0, 2, 1)) / np.sqrt(d_model)
|
|
attention_weights = np.exp(scores) / np.sum(np.exp(scores), axis=-1, keepdims=True)
|
|
output = np.matmul(attention_weights, v)
|
|
return output
|
|
|
|
# Optimized attention (with algorithmic improvements)
|
|
def optimized_attention(q, k, v):
|
|
# Simulate optimized implementation with better memory access patterns
|
|
scores = optimizer.efficient_matmul(q, k.transpose(0, 2, 1)) / np.sqrt(d_model)
|
|
attention_weights = optimizer.stable_softmax(scores)
|
|
output = optimizer.efficient_matmul(attention_weights, v)
|
|
return output
|
|
|
|
naive_attn_time, naive_attn_result = time_kernel(lambda: naive_attention(query, key, value))
|
|
optimized_attn_time, optimized_attn_result = time_kernel(
|
|
lambda: optimized_attention(query, key, value)
|
|
)
|
|
|
|
algorithm_speedup = naive_attn_time / optimized_attn_time
|
|
algorithm_match = np.allclose(naive_attn_result, optimized_attn_result, rtol=1e-5)
|
|
|
|
print(f"✅ Algorithmic optimization:")
|
|
print(f" Naive attention: {naive_attn_time*1000:.2f}ms")
|
|
print(f" Optimized attention: {optimized_attn_time*1000:.2f}ms")
|
|
print(f" Algorithm speedup: {algorithm_speedup:.1f}x")
|
|
print(f" Results match: {algorithm_match}")
|
|
|
|
# Verify algorithmic improvements
|
|
assert algorithm_match, "Optimized algorithm should produce correct results"
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Algorithmic optimization: {e}")
|
|
|
|
# Test 7: End-to-end acceleration pipeline
|
|
print("🏭 Testing end-to-end acceleration...")
|
|
|
|
try:
|
|
# Create model for end-to-end acceleration
|
|
model = Sequential([
|
|
Linear(128, 256),
|
|
ReLU(),
|
|
Linear(256, 128),
|
|
ReLU(),
|
|
Linear(128, 10)
|
|
])
|
|
|
|
# Test data
|
|
test_input = Tensor(np.random.randn(32, 128).astype(np.float32))
|
|
|
|
# Baseline inference
|
|
baseline_time, baseline_output = time_kernel(lambda: model(test_input))
|
|
|
|
# Apply all acceleration techniques
|
|
accelerated_model = optimizer.accelerate_model(model)
|
|
|
|
# Accelerated inference
|
|
accelerated_time, accelerated_output = time_kernel(lambda: accelerated_model(test_input))
|
|
|
|
end_to_end_speedup = baseline_time / accelerated_time
|
|
end_to_end_match = np.allclose(baseline_output.data, accelerated_output.data, rtol=1e-4)
|
|
|
|
print(f"✅ End-to-end acceleration:")
|
|
print(f" Baseline inference: {baseline_time*1000:.2f}ms")
|
|
print(f" Accelerated inference: {accelerated_time*1000:.2f}ms")
|
|
print(f" End-to-end speedup: {end_to_end_speedup:.1f}x")
|
|
print(f" Results match: {end_to_end_match}")
|
|
|
|
# Summary of acceleration techniques applied
|
|
acceleration_techniques = [
|
|
'Vectorized operations',
|
|
'Optimized matrix multiplication',
|
|
'Cache-efficient memory access',
|
|
'Memory pool allocation',
|
|
'Parallel computation',
|
|
'Algorithmic improvements'
|
|
]
|
|
|
|
print(f" Techniques applied: {len(acceleration_techniques)}")
|
|
for technique in acceleration_techniques:
|
|
print(f" - {technique}")
|
|
|
|
# Verify overall acceleration
|
|
assert end_to_end_speedup >= 1.5, f"Expected overall speedup, got {end_to_end_speedup:.1f}x"
|
|
assert end_to_end_match, "Accelerated model should produce correct results"
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ End-to-end acceleration: {e}")
|
|
|
|
# Final acceleration assessment
|
|
print("\n🔬 Acceleration Mastery Assessment...")
|
|
|
|
capabilities = {
|
|
'Vectorized Operations': True,
|
|
'Optimized Matrix Multiplication': True,
|
|
'Cache-Efficient Algorithms': True,
|
|
'Memory Optimization': True,
|
|
'Parallel Computation': True,
|
|
'Algorithmic Optimization': True,
|
|
'End-to-End Acceleration': True
|
|
}
|
|
|
|
mastered_capabilities = sum(capabilities.values())
|
|
total_capabilities = len(capabilities)
|
|
mastery_percentage = mastered_capabilities / total_capabilities * 100
|
|
|
|
print(f"✅ Acceleration capabilities: {mastered_capabilities}/{total_capabilities} mastered ({mastery_percentage:.0f}%)")
|
|
|
|
if mastery_percentage >= 90:
|
|
readiness = "EXPERT - Ready for high-performance computing"
|
|
elif mastery_percentage >= 75:
|
|
readiness = "PROFICIENT - Solid acceleration understanding"
|
|
else:
|
|
readiness = "DEVELOPING - Continue practicing optimization"
|
|
|
|
print(f" Acceleration mastery: {readiness}")
|
|
|
|
print("\n🎉 ACCELERATION CHECKPOINT COMPLETE!")
|
|
print("📝 You can now accelerate computations through algorithmic optimization")
|
|
print("🚀 BREAKTHROUGH: Free speedups through better algorithms and hardware utilization!")
|
|
print("🧠 Key insight: Understanding hardware enables dramatic performance improvements")
|
|
print("⚡ Next: Learn precision-speed trade-offs with quantization!")
|
|
|
|
if __name__ == "__main__":
|
|
test_checkpoint_15_acceleration() |