Files
TinyTorch/tinytorch/core/spatial.py
Vijay Janapa Reddi 89566d26f3 Update generated notebooks and package exports
- Regenerate all .ipynb files from fixed .py modules
- Update tinytorch package exports with corrected implementations
- Sync package module index with current 16-module structure

These generated files reflect all the module fixes and ensure consistent
.py ↔ .ipynb conversion with the updated module implementations.
2025-09-18 16:42:57 -04:00

477 lines
19 KiB
Python

# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/source/06_spatial/spatial_dev.ipynb.
# %% auto 0
__all__ = ['conv2d_naive', 'Conv2D', 'flatten', 'ConvolutionProfiler']
# %% ../../modules/source/06_spatial/spatial_dev.ipynb 1
import numpy as np
import os
import sys
from typing import List, Tuple, Optional
# Import from the main package - try package first, then local modules
try:
from tinytorch.core.tensor import Tensor
from tinytorch.core.layers import Dense
from tinytorch.core.activations import ReLU
except ImportError:
# For development, import from local modules
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '01_tensor'))
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '02_activations'))
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '03_layers'))
from tensor_dev import Tensor
from activations_dev import ReLU
from layers_dev import Dense
# %% ../../modules/source/06_spatial/spatial_dev.ipynb 6
def conv2d_naive(input: np.ndarray, kernel: np.ndarray) -> np.ndarray:
    """
    Naive 2D convolution (single channel, no stride, no padding).

    Slides ``kernel`` over every position of ``input`` where it fits
    entirely and sums the element-wise products (the kernel is not flipped).

    Args:
        input: 2D input array (H, W)
        kernel: 2D filter (kH, kW)

    Returns:
        2D output array (H-kH+1, W-kW+1). Its dtype is the NumPy promotion of
        the input and kernel dtypes, so an integer image convolved with a
        float kernel keeps its fractional values and an unsigned image
        convolved with negative weights does not wrap around.

    Raises:
        ValueError: If either argument is not 2D, or if the kernel is so much
            larger than the input that an output dimension would be negative.

    TODO: Implement the sliding window convolution using for-loops.

    STEP-BY-STEP IMPLEMENTATION:
    1. Get input dimensions: H, W = input.shape
    2. Get kernel dimensions: kH, kW = kernel.shape
    3. Calculate output dimensions: out_H = H - kH + 1, out_W = W - kW + 1
    4. Create output array: np.zeros((out_H, out_W)) with a dtype wide enough
       for both operands
    5. Use nested loops to slide the kernel:
       - i loop: output rows (0 to out_H-1)
       - j loop: output columns (0 to out_W-1)
       - di loop: kernel rows (0 to kH-1)
       - dj loop: kernel columns (0 to kW-1)
    6. For each (i,j), compute: output[i,j] += input[i+di, j+dj] * kernel[di, dj]

    LEARNING CONNECTIONS:
    - **Computer Vision Foundation**: Convolution is the core operation in CNNs and image processing
    - **Feature Detection**: Different kernels detect edges, textures, and patterns in images
    - **Spatial Hierarchies**: Convolution preserves spatial relationships while extracting features
    - **Production CNNs**: Understanding the basic operation helps optimize GPU implementations

    EXAMPLE:
    Input: [[1, 2, 3],    Kernel: [[1, 0],
            [4, 5, 6],             [0, -1]]
            [7, 8, 9]]
    Output[0,0] = 1*1 + 2*0 + 4*0 + 5*(-1) = 1 - 5 = -4
    Output[0,1] = 2*1 + 3*0 + 5*0 + 6*(-1) = 2 - 6 = -4
    Output[1,0] = 4*1 + 5*0 + 7*0 + 8*(-1) = 4 - 8 = -4
    Output[1,1] = 5*1 + 6*0 + 8*0 + 9*(-1) = 5 - 9 = -4

    HINTS:
    - Start with output = np.zeros((out_H, out_W))
    - Use four nested loops: for i in range(out_H): for j in range(out_W): for di in range(kH): for dj in range(kW):
    - Accumulate the sum: output[i,j] += input[i+di, j+dj] * kernel[di, dj]
    """
    ### BEGIN SOLUTION
    # Get input and kernel dimensions (unpacking rejects non-2D arguments)
    H, W = input.shape
    kH, kW = kernel.shape

    # Calculate output dimensions
    out_H, out_W = H - kH + 1, W - kW + 1
    if out_H < 0 or out_W < 0:
        raise ValueError(
            f"kernel of shape {kernel.shape} does not fit inside "
            f"input of shape {input.shape}"
        )

    # Accumulate in the promoted dtype: allocating with input.dtype alone
    # would truncate float products into an integer buffer.
    out_dtype = np.result_type(input.dtype, kernel.dtype)
    output = np.zeros((out_H, out_W), dtype=out_dtype)

    # Sliding window convolution with four nested loops
    for i in range(out_H):
        for j in range(out_W):
            for di in range(kH):
                for dj in range(kW):
                    output[i, j] += input[i + di, j + dj] * kernel[di, dj]
    return output
    ### END SOLUTION
# %% ../../modules/source/06_spatial/spatial_dev.ipynb 10
class Conv2D:
    """
    2D Convolutional Layer (single channel, single filter, no stride/pad).

    A learnable convolutional layer that applies a kernel to detect spatial patterns.
    Perfect for building the foundation of convolutional neural networks.

    Attributes set by ``__init__``:
        kernel_size: (kH, kW) tuple describing the kernel shape
        kernel: float32 NumPy array of shape (kH, kW)
    """

    def __init__(self, kernel_size: Tuple[int, int]):
        """
        Initialize Conv2D layer with random kernel.

        Args:
            kernel_size: (kH, kW) - size of the convolution kernel

        Raises:
            ValueError: If kernel_size does not hold exactly two values or
                either value is smaller than 1.

        TODO: Initialize a random kernel with small values.

        APPROACH:
        1. Store kernel_size as instance variable
        2. Initialize random kernel with small values
        3. Use proper initialization for stable training

        EXAMPLE:
        Conv2D((2, 2)) creates:
        - kernel: shape (2, 2) with small random values

        HINTS:
        - Store kernel_size as self.kernel_size
        - Initialize kernel: np.random.randn(kH, kW) * 0.1 (small values)
        - Convert to float32 for consistency
        """
        ### BEGIN SOLUTION
        kH, kW = kernel_size
        if kH < 1 or kW < 1:
            raise ValueError(
                f"kernel_size must contain positive integers, got {kernel_size!r}"
            )
        # Stored as a tuple so equality checks against (kH, kW) tuples work
        # even when the caller passed a list.
        self.kernel_size = (kH, kW)
        # Initialize random kernel with small values
        self.kernel = np.random.randn(kH, kW).astype(np.float32) * 0.1
        ### END SOLUTION

    def forward(self, x):
        """
        Forward pass through the Conv2D layer.

        Args:
            x: Input tensor of shape (H, W) or (batch_size, H, W)

        Returns:
            Output tensor after convolution: (H-kH+1, W-kW+1) for a single
            image, or (batch_size, H-kH+1, W-kW+1) for a batch.

        Raises:
            ValueError: If x is neither 2D nor 3D, or the kernel does not fit
                inside the input.
        """
        ndim = len(x.shape)
        if ndim == 3:
            # Convolve every image of the batch independently
            results = [conv2d_naive(image, self.kernel) for image in x.data]
            if results:
                output_data = np.stack(results)
            else:
                # np.stack cannot stack an empty list; build the empty
                # batch with the correct trailing shape explicitly.
                _, H, W = x.shape
                kH, kW = self.kernel.shape
                output_data = np.zeros(
                    (0, H - kH + 1, W - kW + 1),
                    dtype=np.result_type(x.data.dtype, self.kernel.dtype),
                )
        elif ndim == 2:
            # Single image case
            output_data = conv2d_naive(x.data, self.kernel)
        else:
            raise ValueError(
                "Conv2D expects a 2D (H, W) or 3D (batch_size, H, W) input, "
                f"got shape {tuple(x.shape)}"
            )
        return Tensor(output_data)

    def __call__(self, x):
        """Make layer callable: layer(x) same as layer.forward(x)"""
        return self.forward(x)
# %% ../../modules/source/06_spatial/spatial_dev.ipynb 14
def flatten(x):
    """
    Collapse a tensor into a single row (for connecting to Dense layers).

    Every element of ``x.data`` is laid out in one row and the result is
    wrapped in the same tensor class as the input. The output always has
    exactly one row, whatever the input shape.

    Args:
        x: Input tensor to flatten (must expose a NumPy array as ``x.data``)

    Returns:
        A new tensor of type ``type(x)`` with shape (1, N), where N is the
        total number of elements in ``x``.

    TODO: Implement flattening operation.

    STEP-BY-STEP IMPLEMENTATION:
    1. Get the numpy array from the tensor
    2. Use .flatten() to convert to 1D
    3. Add a leading dimension of size 1
    4. Return a tensor wrapped around the result

    LEARNING CONNECTIONS:
    - **CNN to MLP Transition**: Flattening connects convolutional and dense layers
    - **Spatial to Vector**: Converts 2D feature maps to vectors for classification
    - **Memory Layout**: Understanding how tensors are stored and reshaped in memory
    - **Framework Design**: All major frameworks (PyTorch, TensorFlow) use similar patterns

    EXAMPLE:
    Input: Tensor([[1, 2], [3, 4]])  # shape (2, 2)
    Output: Tensor([[1, 2, 3, 4]])   # shape (1, 4)

    HINTS:
    - Use x.data.flatten() to get 1D array
    - Add the leading dimension: result[None, :]
    - Return Tensor(result)
    """
    ### BEGIN SOLUTION
    tensor_cls = type(x)
    # flatten() returns a fresh 1D copy; reshaping that copy to a single row
    # adds the leading dimension without touching the input's memory.
    values = x.data.flatten()
    single_row = values.reshape((1, values.size))
    return tensor_cls(single_row)
    ### END SOLUTION
# %% ../../modules/source/06_spatial/spatial_dev.ipynb 30
import time
from collections import defaultdict
class ConvolutionProfiler:
    """
    Production Convolution Performance Analysis and Optimization

    Analyzes spatial computation efficiency, memory patterns, and optimization
    opportunities for production computer vision systems.
    """

    def __init__(self):
        """Initialize convolution profiler for spatial operations analysis."""
        self.profiling_data = defaultdict(list)
        self.memory_analysis = defaultdict(list)
        self.optimization_recommendations = []

    def profile_convolution_operation(self, conv_layer, input_tensor, kernel_sizes=((3, 3), (5, 5), (7, 7))):
        """
        Profile convolution operations across different kernel sizes.

        Args:
            conv_layer: Callable layer. It is reused for a kernel size only
                when its ``kernel_size`` attribute matches; otherwise a fresh
                Conv2D is created for that size.
            input_tensor: Tensor whose last two dimensions are (H, W); any
                leading dimensions are counted as batch. Must expose
                ``.shape`` and ``.data.nbytes``.
            kernel_sizes: Iterable of (kH, kW) pairs to profile.

        Returns:
            dict with keys:
            - 'detailed_results': maps "kHxkW" to a metrics dict (kernel_size,
              time_ms, throughput_mflops, flops, input_memory_mb,
              output_memory_mb, total_memory_mb, computational_intensity,
              output_size)
            - 'analysis': list of human-readable findings
            - 'recommendations': list of optimization suggestions

        TODO: Implement convolution operation profiling.

        STEP-BY-STEP IMPLEMENTATION:
        1. Profile different kernel sizes and their computational costs
        2. Measure memory usage patterns for spatial operations
        3. Analyze cache efficiency and memory access patterns
        4. Identify optimization opportunities for production systems

        LEARNING CONNECTIONS:
        - **Performance Optimization**: Understanding computational costs of different kernel sizes
        - **Memory Efficiency**: Cache-friendly access patterns improve performance significantly
        - **Production Scaling**: Profiling guides hardware selection and deployment strategies
        - **GPU Optimization**: Spatial operations are ideal for parallel processing

        APPROACH:
        1. Time convolution operations with different kernel sizes
        2. Analyze memory usage patterns for spatial operations
        3. Calculate computational intensity (FLOPs per operation)
        4. Identify memory bandwidth vs compute bottlenecks
        5. Generate optimization recommendations

        EXAMPLE:
        profiler = ConvolutionProfiler()
        conv = Conv2D(kernel_size=(3, 3))
        input_img = Tensor(np.random.randn(32, 32))  # 32x32 image
        report = profiler.profile_convolution_operation(conv, input_img)
        stats = report['detailed_results']['3x3']
        print(f"Convolution throughput: {stats['throughput_mflops']:.1f} MFLOPS")

        HINTS:
        - Use a monotonic high-resolution clock for timing measurements
        - Calculate memory footprint of input and output tensors
        - Estimate FLOPs: output_height * output_width * kernel_height * kernel_width
        - Compare performance across kernel sizes
        """
        ### BEGIN SOLUTION
        print("🔧 Profiling Convolution Operations...")
        results = {}
        iterations = 10
        bytes_per_mb = 1024 * 1024

        # Input geometry does not depend on the kernel, so compute it once.
        input_h, input_w = input_tensor.shape[-2:]
        batch = 1
        for dim in input_tensor.shape[:-2]:
            batch *= dim
        input_bytes = input_tensor.data.nbytes
        input_memory_mb = input_bytes / bytes_per_mb

        for kernel_size in kernel_sizes:
            print(f" Testing kernel size: {kernel_size}")
            kernel_h, kernel_w = kernel_size
            # Clamp to 1 so an oversized kernel still yields usable metrics
            output_h = max(1, input_h - kernel_h + 1)
            output_w = max(1, input_w - kernel_w + 1)

            test_conv = self._select_conv_layer(conv_layer, kernel_size)

            # Measure timing with a monotonic, high-resolution clock
            start_time = time.perf_counter()
            for _ in range(iterations):
                try:
                    test_conv(input_tensor)
                except Exception:
                    # Best-effort fallback: simulate the convolution by
                    # allocating an output of the expected size.
                    Tensor(np.random.randn(output_h, output_w))
            avg_time = (time.perf_counter() - start_time) / iterations

            # Estimate FLOPs (one multiply-accumulate per kernel tap, per
            # output pixel, per image in the batch)
            flops = batch * output_h * output_w * kernel_h * kernel_w
            mflops = flops / 1e6
            throughput_mflops = mflops / avg_time if avg_time > 0 else 0

            # Memory analysis (output and kernel assumed float32 = 4 bytes)
            output_memory_mb = (batch * output_h * output_w * 4) / bytes_per_mb
            kernel_memory_mb = (kernel_h * kernel_w * 4) / bytes_per_mb
            total_memory_mb = input_memory_mb + output_memory_mb + kernel_memory_mb

            # Calculate computational intensity (FLOPs per input byte)
            computational_intensity = flops / max(input_bytes, 1)

            results[f"{kernel_size[0]}x{kernel_size[1]}"] = {
                'kernel_size': kernel_size,
                'time_ms': avg_time * 1000,
                'throughput_mflops': throughput_mflops,
                'flops': flops,
                'input_memory_mb': input_memory_mb,
                'output_memory_mb': output_memory_mb,
                'total_memory_mb': total_memory_mb,
                'computational_intensity': computational_intensity,
                'output_size': (output_h, output_w)
            }
            print(f" Time: {avg_time*1000:.3f}ms, Throughput: {throughput_mflops:.1f} MFLOPS")

        # Store profiling data
        self.profiling_data['convolution_results'] = results

        # Generate analysis
        analysis = self._analyze_convolution_performance(results)
        return {
            'detailed_results': results,
            'analysis': analysis,
            'recommendations': self._generate_optimization_recommendations(results)
        }
        ### END SOLUTION

    def _select_conv_layer(self, conv_layer, kernel_size):
        """Return conv_layer if its kernel_size matches, else a new Conv2D of that size."""
        try:
            layer_size = getattr(conv_layer, 'kernel_size', None)
            if layer_size is not None and tuple(layer_size) == tuple(kernel_size):
                return conv_layer
            return Conv2D(kernel_size=kernel_size)
        except Exception:
            # Fallback for testing - profile whatever callable was supplied
            return conv_layer

    def _analyze_convolution_performance(self, results):
        """
        Analyze convolution performance patterns.

        Args:
            results: Mapping of kernel label to a metrics dict holding at
                least 'time_ms', 'flops' and 'total_memory_mb'.

        Returns:
            List of human-readable findings; empty when results is empty.
        """
        if not results:
            return []

        analysis = []

        # Find fastest and slowest configurations
        fastest = min(results, key=lambda k: results[k]['time_ms'])
        slowest = max(results, key=lambda k: results[k]['time_ms'])
        analysis.append(f"🚀 Fastest kernel: {fastest} ({results[fastest]['time_ms']:.3f}ms)")
        analysis.append(f"🐌 Slowest kernel: {slowest} ({results[slowest]['time_ms']:.3f}ms)")

        # Performance scaling analysis
        if len(results) > 1:
            small_kernel = min(results, key=lambda k: results[k]['flops'])
            large_kernel = max(results, key=lambda k: results[k]['flops'])
            small, large = results[small_kernel], results[large_kernel]

            if small['flops'] > 0:
                flops_ratio = large['flops'] / small['flops']
            else:
                flops_ratio = float('inf')
            analysis.append(f"📈 FLOPS scaling: {small_kernel} → {large_kernel} = {flops_ratio:.1f}x more computation")

            # A zero baseline can occur when the timer resolution is coarser
            # than the measured work; skip the ratio instead of dividing by 0.
            if small['time_ms'] > 0:
                time_ratio = large['time_ms'] / small['time_ms']
                analysis.append(f"⏱️ Time scaling: {time_ratio:.1f}x slower")
                if time_ratio < flops_ratio:
                    analysis.append("✅ Good computational efficiency - time scales better than FLOPs")
                else:
                    analysis.append("⚠️ Computational bottleneck - time scales worse than FLOPs")
            else:
                analysis.append("⏱️ Time scaling: unavailable (baseline time too small to measure)")

        # Memory analysis
        peak = max(results, key=lambda k: results[k]['total_memory_mb'])
        analysis.append(f"💾 Peak memory usage: {peak} ({results[peak]['total_memory_mb']:.2f} MB)")
        return analysis

    def _generate_optimization_recommendations(self, results):
        """
        Generate optimization recommendations based on profiling results.

        Args:
            results: Mapping of kernel label to a metrics dict holding at
                least 'computational_intensity', 'throughput_mflops',
                'total_memory_mb' and 'kernel_size'.

        Returns:
            List of recommendation strings; empty when results is empty.
        """
        if not results:
            return []

        recommendations = []

        # Analyze computational intensity
        intensities = [v['computational_intensity'] for v in results.values()]
        avg_intensity = sum(intensities) / len(intensities)
        if avg_intensity < 1.0:
            recommendations.append("🔧 Memory-bound operation: Consider memory layout optimization")
            recommendations.append("💡 Try: Tensor tiling, cache-friendly access patterns")
        else:
            recommendations.append("🔧 Compute-bound operation: Focus on computational optimization")
            recommendations.append("💡 Try: SIMD instructions, hardware acceleration")

        # Kernel size recommendations
        best_throughput = max(results.values(), key=lambda x: x['throughput_mflops'])
        recommendations.append(f"⚡ Optimal kernel size for throughput: {best_throughput['kernel_size']}")

        # Memory efficiency recommendations
        memory_efficiency = {k: v['throughput_mflops'] / v['total_memory_mb']
                             for k, v in results.items() if v['total_memory_mb'] > 0}
        if memory_efficiency:
            best_memory_efficiency = max(memory_efficiency.items(), key=lambda x: x[1])
            recommendations.append(f"💾 Most memory-efficient: {best_memory_efficiency[0]}")
        return recommendations

    def analyze_memory_patterns(self, input_sizes=((64, 64), (128, 128), (256, 256)), kernel_size=(3, 3)):
        """
        Analyze memory access patterns for different image sizes.

        This function is PROVIDED to demonstrate memory scaling analysis.
        Students use it to understand spatial computation memory requirements.

        Args:
            input_sizes: Iterable of (height, width) image sizes to analyze.
            kernel_size: (kH, kW) of the convolution kernel assumed for the
                output-size and kernel-memory estimates.

        Returns:
            List with one dict per input size, holding 'input_size',
            'input_memory_mb', 'output_memory_mb', 'total_memory_mb' and
            'memory_efficiency' (output elements per MB).
        """
        print("🔍 MEMORY PATTERN ANALYSIS")
        print("=" * 40)

        kernel_h, kernel_w = kernel_size
        bytes_per_mb = 1024 * 1024
        # Kernel memory (float32), identical for every input size
        kernel_memory = (kernel_h * kernel_w * 4) / bytes_per_mb

        memory_results = []
        for height, width in input_sizes:
            # Create test tensor so the input footprint reflects the dtype
            # the Tensor class actually stores
            test_tensor = Tensor(np.random.randn(height, width))
            input_memory = test_tensor.data.nbytes / bytes_per_mb  # MB

            # Estimate output size (no output when the kernel does not fit)
            output_h = max(0, height - kernel_h + 1)
            output_w = max(0, width - kernel_w + 1)
            output_memory = (output_h * output_w * 4) / bytes_per_mb  # MB, float32

            total_memory = input_memory + output_memory + kernel_memory
            memory_efficiency = (output_h * output_w) / total_memory  # operations per MB

            memory_results.append({
                'input_size': (height, width),
                'input_memory_mb': input_memory,
                'output_memory_mb': output_memory,
                'total_memory_mb': total_memory,
                'memory_efficiency': memory_efficiency
            })
            print(f" {height}x{width}: {total_memory:.2f} MB total, {memory_efficiency:.0f} ops/MB")

        # Analyze scaling between the first and last sizes
        if len(memory_results) >= 2:
            small = memory_results[0]
            large = memory_results[-1]
            # Compare pixel counts so non-square inputs scale correctly
            small_area = small['input_size'][0] * small['input_size'][1]
            large_area = large['input_size'][0] * large['input_size'][1]
            if small_area > 0 and large_area > 0:
                size_ratio = large_area / small_area
                memory_ratio = large['total_memory_mb'] / small['total_memory_mb']
                print(f"\n📈 Memory Scaling Analysis:")
                print(f" Input size increased {size_ratio:.1f}x")
                print(f" Memory usage increased {memory_ratio:.1f}x")
                print(f" Scaling efficiency: {(memory_ratio/size_ratio)*100:.1f}% (lower is better)")
        return memory_results