Files
TinyTorch/tinytorch/core/spatial.py
Vijay Janapa Reddi ee9355584f Fix all module tests after merge - 20/20 passing
Fixes after merge conflicts:
- Fix tensor reshape error message format
- Fix __init__.py imports (remove BatchNorm2d, fix enable_autograd call)
- Fix attention mask broadcasting for multi-head attention
- Fix memoization module to use matmul instead of @ operator
- Fix capstone module count_parameters and CosineSchedule usage
- Add missing imports to benchmark.py (dataclass, Profiler, platform, os)
- Simplify capstone pipeline test to avoid data shape mismatch

All 20 modules now pass tito test --all
2025-12-03 08:14:27 -08:00

765 lines
29 KiB
Python
Generated
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ╔═══════════════════════════════════════════════════════════════════════════════╗
# ║ 🚨 CRITICAL WARNING 🚨 ║
# ║ AUTOGENERATED! DO NOT EDIT! ║
# ║ ║
# ║ This file is AUTOMATICALLY GENERATED from source modules. ║
# ║ ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported! ║
# ║ ║
# ║ ✅ TO EDIT: src/09_spatial/09_spatial.py ║
# ║ ✅ TO EXPORT: Run 'tito module complete <module_name>' ║
# ║ ║
# ║ 🛡️ STUDENT PROTECTION: This file contains optimized implementations. ║
# ║ Editing it directly may break module functionality and training. ║
# ║ ║
# ║ 🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners) ║
# ║ The tinytorch/ directory is generated code - edit source files instead! ║
# ╚═══════════════════════════════════════════════════════════════════════════════╝
# %% auto 0
# Public names of this module, i.e. what `from <module> import *` exposes.
__all__ = ['DEFAULT_KERNEL_SIZE', 'DEFAULT_STRIDE', 'DEFAULT_PADDING', 'Conv2dBackward', 'Conv2d', 'MaxPool2dBackward',
'MaxPool2d', 'AvgPool2d', 'SimpleCNN']
# %% ../../modules/09_spatial/spatial.ipynb 1
import numpy as np
import time
from .tensor import Tensor
from .autograd import Function
# Default convolution hyperparameters, exported through __all__.
# NOTE(review): Conv2d's signature in this file spells out stride=1 and
# padding=0 literally instead of referencing these names, so the two must be
# kept in sync by hand.
DEFAULT_KERNEL_SIZE = 3 # Default kernel side length (3x3 window)
DEFAULT_STRIDE = 1 # Default step between neighbouring windows
DEFAULT_PADDING = 0 # Default number of padded cells per spatial side
# %% ../../modules/09_spatial/spatial.ipynb 6
class Conv2dBackward(Function):
    """
    Backward pass for a 2D convolution.

    Produces the three gradients of a Conv2d forward pass:
    - grad_input: gradient w.r.t. the layer input
    - grad_weight: gradient w.r.t. the filters
    - grad_bias: gradient w.r.t. the bias (None when the layer has no bias)

    The computation is kept element-by-element on purpose so that it mirrors
    the loop-based forward pass.
    """

    def __init__(self, x, weight, bias, stride, padding, kernel_size, padded_shape):
        # Hand the differentiable inputs to the autograd base class; the bias
        # is only registered when the layer actually has one.
        tracked = (x, weight) if bias is None else (x, weight, bias)
        super().__init__(*tracked)
        self.x = x
        self.weight = weight
        self.bias = bias
        self.stride = stride
        self.padding = padding
        self.kernel_size = kernel_size
        # Stored for reference only; apply() re-pads the input itself.
        self.padded_shape = padded_shape

    def apply(self, grad_output):
        """
        Compute gradients for the convolution input and parameters.

        Args:
            grad_output: Gradient flowing back from the next layer, indexed as
                (batch_size, out_channels, out_height, out_width)

        Returns:
            Tuple of numpy values (grad_input, grad_weight, grad_bias);
            grad_bias is None when the layer has no bias.
        """
        n_batch, n_out, out_rows, out_cols = grad_output.shape
        _, n_in, _, _ = self.x.shape
        k_rows, k_cols = self.kernel_size
        pad = self.padding
        step = self.stride
        filters = self.weight.data

        # Rebuild the zero-padded input the forward pass convolved over.
        if pad > 0:
            padded = np.pad(self.x.data,
                            ((0, 0), (0, 0), (pad, pad), (pad, pad)),
                            mode='constant', constant_values=0)
        else:
            padded = self.x.data

        d_padded = np.zeros_like(padded)
        d_weight = np.zeros_like(filters)
        d_bias = np.zeros_like(self.bias.data) if self.bias is not None else None

        # Visit every output cell, then every (kernel row, kernel col, input
        # channel) that contributed to it, in the same order as nested loops.
        for b, oc, oh, ow in np.ndindex(n_batch, n_out, out_rows, out_cols):
            top = oh * step
            left = ow * step
            upstream = grad_output[b, oc, oh, ow]
            for kr, kc, ic in np.ndindex(k_rows, k_cols, n_in):
                row = top + kr
                col = left + kc
                # d(out)/d(weight) is the input value under the kernel tap.
                d_weight[oc, ic, kr, kc] += padded[b, ic, row, col] * upstream
                # d(out)/d(input) is the kernel tap itself.
                d_padded[b, ic, row, col] += filters[oc, ic, kr, kc] * upstream

        # The bias is added to every cell of its channel, so its gradient is
        # the sum of the upstream gradient over batch and spatial axes.
        if d_bias is not None:
            for oc in range(n_out):
                d_bias[oc] = grad_output[:, oc].sum()

        # Drop the gradient that landed on the padding border.
        d_input = d_padded[:, :, pad:-pad, pad:-pad] if pad > 0 else d_padded

        return d_input, d_weight, d_bias
class Conv2d:
    """
    2D convolution layer for spatial feature extraction.

    The forward pass is written with explicit nested loops so that the
    computational cost and memory access pattern stay visible.

    Args:
        in_channels: Number of input channels
        out_channels: Number of output feature maps
        kernel_size: Size of convolution kernel (int or (height, width) pair)
        stride: Stride of convolution, shared by both spatial axes (default: 1)
        padding: Zero-padding added to each spatial side (default: 0)
        bias: Whether to add learnable bias (default: True)
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=True):
        """
        Store the hyperparameters and create the trainable parameters.

        Weights have shape (out_channels, in_channels, kernel_h, kernel_w) and
        are drawn from a normal distribution with
        std = sqrt(2 / (in_channels * kernel_h * kernel_w)) (He initialization).
        The bias, when enabled, starts at zero.
        """
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels

        # Accept a single integer (Python or NumPy) or a (height, width) pair.
        if isinstance(kernel_size, (int, np.integer)):
            self.kernel_size = (int(kernel_size), int(kernel_size))
        else:
            self.kernel_size = tuple(kernel_size)

        self.stride = stride
        self.padding = padding

        # He initialization for ReLU networks
        kernel_h, kernel_w = self.kernel_size
        fan_in = in_channels * kernel_h * kernel_w
        std = np.sqrt(2.0 / fan_in)

        self.weight = Tensor(
            np.random.normal(0, std, (out_channels, in_channels, kernel_h, kernel_w)),
            requires_grad=True,
        )
        self.bias = Tensor(np.zeros(out_channels), requires_grad=True) if bias else None

    def forward(self, x):
        """
        Convolve a batch of images with the layer's filters.

        Args:
            x: Tensor indexed as (batch, in_channels, height, width)

        Returns:
            Tensor of shape (batch, out_channels, out_height, out_width) where
            out = (in + 2 * padding - kernel) // stride + 1 per spatial axis.
            When the input, the weight or the bias requires gradients, a
            Conv2dBackward node is attached as ``_grad_fn``.

        Raises:
            ValueError: If the input is not 4D, its channel count differs from
                ``in_channels``, or the kernel does not fit in the padded input.

        Example:
            >>> conv = Conv2d(3, 16, kernel_size=3, padding=1)
            >>> x = Tensor(np.random.randn(2, 3, 32, 32))  # batch=2, RGB, 32x32
            >>> out = conv(x)
            >>> print(out.shape)  # Should be (2, 16, 32, 32)
        """
        if len(x.shape) != 4:
            raise ValueError(f"Expected 4D input (batch, channels, height, width), got {x.shape}")

        batch_size, in_channels, in_height, in_width = x.shape
        # Without this check a narrower input would silently use only part of
        # each filter, and a wider one would fail with an IndexError in the loop.
        if in_channels != self.in_channels:
            raise ValueError(
                f"Expected input with {self.in_channels} channels, "
                f"got {in_channels} (input shape {x.shape})"
            )

        out_channels = self.out_channels
        kernel_h, kernel_w = self.kernel_size

        padded_height = in_height + 2 * self.padding
        padded_width = in_width + 2 * self.padding
        if padded_height < kernel_h or padded_width < kernel_w:
            raise ValueError(
                f"Kernel size {self.kernel_size} is larger than the padded input "
                f"({padded_height}, {padded_width})"
            )

        # Calculate output dimensions
        out_height = (padded_height - kernel_h) // self.stride + 1
        out_width = (padded_width - kernel_w) // self.stride + 1

        # Apply padding if needed
        if self.padding > 0:
            padded_input = np.pad(
                x.data,
                ((0, 0), (0, 0), (self.padding, self.padding), (self.padding, self.padding)),
                mode='constant', constant_values=0,
            )
        else:
            padded_input = x.data

        output = np.zeros((batch_size, out_channels, out_height, out_width))

        # Explicit nested-loop convolution to show the full cost of the operation
        for b in range(batch_size):
            for out_ch in range(out_channels):
                for out_h in range(out_height):
                    for out_w in range(out_width):
                        # Top-left corner of the input window for this output cell
                        in_h_start = out_h * self.stride
                        in_w_start = out_w * self.stride

                        conv_sum = 0.0
                        for k_h in range(kernel_h):
                            for k_w in range(kernel_w):
                                for in_ch in range(in_channels):
                                    input_val = padded_input[b, in_ch,
                                                             in_h_start + k_h,
                                                             in_w_start + k_w]
                                    weight_val = self.weight.data[out_ch, in_ch, k_h, k_w]
                                    conv_sum += input_val * weight_val

                        output[b, out_ch, out_h, out_w] = conv_sum

        # Broadcast each channel's bias across batch and spatial dimensions
        if self.bias is not None:
            for out_ch in range(out_channels):
                output[:, out_ch, :, :] += self.bias.data[out_ch]

        # The output needs gradients when any of its inputs or parameters does.
        needs_grad = (
            x.requires_grad
            or self.weight.requires_grad
            or (self.bias is not None and self.bias.requires_grad)
        )
        result = Tensor(output, requires_grad=needs_grad)

        # Attach backward function for gradient computation
        if result.requires_grad:
            result._grad_fn = Conv2dBackward(
                x, self.weight, self.bias,
                self.stride, self.padding, self.kernel_size,
                padded_input.shape
            )
        return result

    def parameters(self):
        """Return trainable parameters (weight, plus bias when present)."""
        params = [self.weight]
        if self.bias is not None:
            params.append(self.bias)
        return params

    def __call__(self, x):
        """Enable model(x) syntax."""
        return self.forward(x)
# %% ../../modules/09_spatial/spatial.ipynb 11
class MaxPool2dBackward(Function):
    """
    Gradient computation for 2D max pooling.

    Max pooling gradients flow only to the positions that were selected
    as the maximum in the forward pass; every other input cell receives zero.
    """

    def __init__(self, x, output_shape, kernel_size, stride, padding):
        super().__init__(x)
        self.x = x
        self.output_shape = output_shape
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        # Not populated by apply(), which re-derives the max positions from
        # the input; kept so the attribute stays available.
        self.max_positions = {}

    def apply(self, grad_output):
        """
        Route gradients back to max positions.

        Args:
            grad_output: Gradient from the next layer, indexed as
                (batch, channels, out_height, out_width)

        Returns:
            1-tuple holding the gradient w.r.t. the (unpadded) input.
        """
        batch_size, channels, in_height, in_width = self.x.shape
        _, _, out_height, out_width = self.output_shape
        kernel_h, kernel_w = self.kernel_size

        # Pad with -inf so padded cells can never win the max comparison.
        if self.padding > 0:
            padded_input = np.pad(
                self.x.data,
                ((0, 0), (0, 0), (self.padding, self.padding), (self.padding, self.padding)),
                mode='constant', constant_values=-np.inf,
            )
            grad_input_padded = np.zeros_like(padded_input)
        else:
            padded_input = self.x.data
            grad_input_padded = np.zeros_like(self.x.data)

        for b in range(batch_size):
            for c in range(channels):
                for out_h in range(out_height):
                    for out_w in range(out_width):
                        in_h_start = out_h * self.stride
                        in_w_start = out_w * self.stride

                        # Start the search at the window's own top-left cell.
                        # If no element compares greater than -inf (all-NaN or
                        # all -inf window) the gradient then still lands inside
                        # this window instead of at absolute position (0, 0).
                        max_val = -np.inf
                        max_h, max_w = in_h_start, in_w_start
                        for k_h in range(kernel_h):
                            for k_w in range(kernel_w):
                                in_h = in_h_start + k_h
                                in_w = in_w_start + k_w
                                val = padded_input[b, c, in_h, in_w]
                                # Strict comparison: the first maximum wins ties.
                                if val > max_val:
                                    max_val = val
                                    max_h, max_w = in_h, in_w

                        # Overlapping windows may pick the same cell, hence +=.
                        grad_input_padded[b, c, max_h, max_w] += grad_output[b, c, out_h, out_w]

        # Remove padding
        if self.padding > 0:
            grad_input = grad_input_padded[:, :,
                                           self.padding:-self.padding,
                                           self.padding:-self.padding]
        else:
            grad_input = grad_input_padded

        # Return as tuple (following Function protocol)
        return (grad_input,)
class MaxPool2d:
    """
    2D max pooling layer for spatial dimension reduction.

    Slides a window over the two spatial axes and keeps the largest value
    found in each window.

    Args:
        kernel_size: Size of pooling window (int or (height, width) pair)
        stride: Step between windows, shared by both axes
            (default: the first entry of kernel_size)
        padding: Number of cells added to each spatial side; they are filled
            with -inf so they never win the max (default: 0)
    """

    def __init__(self, kernel_size, stride=None, padding=0):
        """Store the window size, stride and padding of the pooling layer."""
        super().__init__()
        # An integer means a square window.
        self.kernel_size = (
            (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        )
        # Without an explicit stride the step equals the window height.
        self.stride = self.kernel_size[0] if stride is None else stride
        self.padding = padding

    def forward(self, x):
        """
        Apply max pooling to a 4D tensor.

        Args:
            x: Tensor indexed as (batch, channels, height, width)

        Returns:
            Tensor of shape (batch, channels, out_height, out_width) where
            out = (in + 2 * padding - kernel) // stride + 1 per spatial axis.
            A MaxPool2dBackward node is attached when gradients are required.

        Raises:
            ValueError: If the input is not 4D.

        Example:
            >>> pool = MaxPool2d(kernel_size=2, stride=2)
            >>> x = Tensor(np.random.randn(1, 3, 8, 8))
            >>> out = pool(x)
            >>> print(out.shape)  # Should be (1, 3, 4, 4)
        """
        if len(x.shape) != 4:
            raise ValueError(f"Expected 4D input (batch, channels, height, width), got {x.shape}")

        n_batch, n_channels, rows, cols = x.shape
        win_h, win_w = self.kernel_size
        pad = self.padding
        step = self.stride

        out_rows = (rows + 2 * pad - win_h) // step + 1
        out_cols = (cols + 2 * pad - win_w) // step + 1

        if pad > 0:
            grid = np.pad(x.data,
                          ((0, 0), (0, 0), (pad, pad), (pad, pad)),
                          mode='constant', constant_values=-np.inf)
        else:
            grid = x.data

        pooled = np.zeros((n_batch, n_channels, out_rows, out_cols))

        # One iteration per output cell, in row-major order.
        for b, c, r, q in np.ndindex(n_batch, n_channels, out_rows, out_cols):
            top = r * step
            left = q * step
            best = -np.inf
            for dr, dc in np.ndindex(win_h, win_w):
                candidate = grid[b, c, top + dr, left + dc]
                # Only a strictly larger value replaces the running maximum.
                if candidate > best:
                    best = candidate
            pooled[b, c, r, q] = best

        result = Tensor(pooled, requires_grad=x.requires_grad)
        if result.requires_grad:
            result._grad_fn = MaxPool2dBackward(
                x, pooled.shape, self.kernel_size, self.stride, self.padding
            )
        return result

    def parameters(self):
        """Return empty list (pooling has no parameters)."""
        return []

    def __call__(self, x):
        """Enable model(x) syntax."""
        return self.forward(x)
# %% ../../modules/09_spatial/spatial.ipynb 13
class AvgPool2d:
    """
    2D average pooling layer for spatial dimension reduction.

    Slides a window over the two spatial axes and replaces each window by
    the mean of its values.

    Args:
        kernel_size: Size of pooling window (int or (height, width) pair)
        stride: Step between windows, shared by both axes
            (default: the first entry of kernel_size)
        padding: Zero-padding added to each spatial side; padded zeros are
            counted in the average (default: 0)
    """

    def __init__(self, kernel_size, stride=None, padding=0):
        """Store the window size, stride and padding of the pooling layer."""
        super().__init__()
        # An integer means a square window.
        self.kernel_size = (
            (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        )
        # Without an explicit stride the step equals the window height.
        self.stride = self.kernel_size[0] if stride is None else stride
        self.padding = padding

    def forward(self, x):
        """
        Apply average pooling to a 4D tensor.

        Args:
            x: Tensor indexed as (batch, channels, height, width)

        Returns:
            Tensor of shape (batch, channels, out_height, out_width) where
            out = (in + 2 * padding - kernel) // stride + 1 per spatial axis.

        Raises:
            ValueError: If the input is not 4D.

        NOTE(review): the result is built as a plain ``Tensor(output)`` with no
        backward function attached, unlike MaxPool2d in this file.
        """
        if len(x.shape) != 4:
            raise ValueError(f"Expected 4D input (batch, channels, height, width), got {x.shape}")

        n_batch, n_channels, rows, cols = x.shape
        win_h, win_w = self.kernel_size
        pad = self.padding
        step = self.stride

        out_rows = (rows + 2 * pad - win_h) // step + 1
        out_cols = (cols + 2 * pad - win_w) // step + 1

        if pad > 0:
            grid = np.pad(x.data,
                          ((0, 0), (0, 0), (pad, pad), (pad, pad)),
                          mode='constant', constant_values=0)
        else:
            grid = x.data

        averaged = np.zeros((n_batch, n_channels, out_rows, out_cols))

        # One iteration per output cell, in row-major order.
        for b, c, r, q in np.ndindex(n_batch, n_channels, out_rows, out_cols):
            top = r * step
            left = q * step
            total = 0.0
            for dr, dc in np.ndindex(win_h, win_w):
                total += grid[b, c, top + dr, left + dc]
            # Always divide by the full window area.
            averaged[b, c, r, q] = total / (win_h * win_w)

        return Tensor(averaged)

    def parameters(self):
        """Return empty list (pooling has no parameters)."""
        return []

    def __call__(self, x):
        """Enable model(x) syntax."""
        return self.forward(x)
# %% ../../modules/09_spatial/spatial.ipynb 21
class SimpleCNN:
    """
    Simple CNN demonstrating spatial operations integration.

    Architecture:
    - Conv2d(3→16, 3×3, padding=1) + ReLU + MaxPool(2×2)
    - Conv2d(16→32, 3×3, padding=1) + ReLU + MaxPool(2×2)
    - Flatten

    A final Linear(features→num_classes) layer is not wired in yet, so the
    model currently outputs the flattened feature vector.
    """

    def __init__(self, num_classes=10):
        """
        Build the two convolution/pooling stages.

        Args:
            num_classes: Number of target classes; stored for the future
                classification layer and not used by forward() yet.
        """
        super().__init__()
        # Convolutional layers
        self.conv1 = Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)
        self.pool1 = MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        self.pool2 = MaxPool2d(kernel_size=2, stride=2)

        self.num_classes = num_classes

        # Feature count for a 32×32 input: each padded 3×3 convolution keeps
        # the spatial size and each 2×2 pool halves it (32 → 16 → 8), leaving
        # 32 channels × 8 × 8 = 2048 features for the future Linear layer.
        self.flattened_size = 32 * 8 * 8

    def forward(self, x):
        """
        Run the input through both conv blocks and flatten the result.

        Args:
            x: Tensor indexed as (batch, 3, height, width)

        Returns:
            Tensor of shape (batch, features) holding the flattened output of
            the second pooling layer.

        NOTE(review): the returned Tensor is rebuilt from raw data, so no
        backward function is attached to it here.
        """
        # First conv block
        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool1(x)

        # Second conv block
        x = self.conv2(x)
        x = self.relu(x)
        x = self.pool2(x)

        # Collapse channel and spatial axes into one feature axis per sample
        batch_size = x.shape[0]
        x_flat = x.data.reshape(batch_size, -1)

        return Tensor(x_flat)

    def relu(self, x):
        """
        Simple ReLU implementation for CNN.

        NOTE(review): builds a fresh Tensor from ``x.data`` without passing
        ``requires_grad`` or attaching a backward function.
        """
        return Tensor(np.maximum(0, x.data))

    def parameters(self):
        """Return all trainable parameters (conv1 followed by conv2)."""
        params = []
        params.extend(self.conv1.parameters())
        params.extend(self.conv2.parameters())
        # Linear layer parameters would be added here
        return params

    def __call__(self, x):
        """Enable model(x) syntax."""
        return self.forward(x)