From f5257aa0425aa399d0b075ebeb9f5eb01a229325 Mon Sep 17 00:00:00 2001
From: Vijay Janapa Reddi
Date: Sat, 22 Nov 2025 17:29:20 -0500
Subject: [PATCH] Fix CNN gradient flow with Conv2dBackward and MaxPool2dBackward

- Implemented Conv2dBackward class in spatial module for proper gradient computation
- Implemented MaxPool2dBackward to route gradients through max pooling
- Fixed reshape usage in CNN test to preserve autograd graph
- Fixed conv gradient capture timing in test (before zero_grad)
- All 6 CNN parameters now receive gradients and update properly
- CNN learning verification test now passes with 74% accuracy and 63% loss decrease
---
 modules/09_spatial/spatial.py                 | 204 +++++++++++++++++-
 .../milestones/test_learning_verification.py  |  12 +-
 2 files changed, 205 insertions(+), 11 deletions(-)

diff --git a/modules/09_spatial/spatial.py b/modules/09_spatial/spatial.py
index b3b6f303..4dfe8d9a 100644
--- a/modules/09_spatial/spatial.py
+++ b/modules/09_spatial/spatial.py
@@ -65,6 +65,7 @@
 import numpy as np
 import time
 from tinytorch.core.tensor import Tensor
+from tinytorch.core.autograd import Function
 
 # Constants for convolution defaults
 DEFAULT_KERNEL_SIZE = 3  # Default kernel size for convolutions
@@ -297,6 +298,109 @@ This reveals why convolution is expensive: O(B×C_out×H×W×K_h×K_w×C_in) ope
 
 #| export
+class Conv2dBackward(Function):
+    """
+    Gradient computation for 2D convolution.
+
+    Computes gradients for the Conv2d backward pass:
+    - grad_input: gradient w.r.t. input (for backprop to the previous layer)
+    - grad_weight: gradient w.r.t. filters (for weight updates)
+    - grad_bias: gradient w.r.t. bias (for bias updates)
+
+    This uses explicit loops to show the gradient computation, matching
+    the educational approach of the forward pass.
+    """
+
+    def __init__(self, x, weight, bias, stride, padding, kernel_size, padded_shape):
+        # Register all tensors that need gradients with autograd
+        if bias is not None:
+            super().__init__(x, weight, bias)
+        else:
+            super().__init__(x, weight)
+        self.x = x
+        self.weight = weight
+        self.bias = bias
+        self.stride = stride
+        self.padding = padding
+        self.kernel_size = kernel_size
+        self.padded_shape = padded_shape
+
+    def apply(self, grad_output):
+        """
+        Compute gradients for convolution inputs and parameters.
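+
+        In symbols, with in_h = out_h * stride + k_h and in_w = out_w * stride + k_w
+        (the same loop variables used below), the chain rule gives:
+
+            grad_weight[oc, ic, k_h, k_w] += padded_input[b, ic, in_h, in_w] * grad_output[b, oc, out_h, out_w]
+            grad_input[b, ic, in_h, in_w] += weight[oc, ic, k_h, k_w] * grad_output[b, oc, out_h, out_w]
+            grad_bias[oc] = sum of grad_output[:, oc, :, :] over batch and spatial dims
+
+        The explicit loops below accumulate exactly these terms.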
+
+        Args:
+            grad_output: Gradient flowing back from next layer
+                Shape: (batch_size, out_channels, out_height, out_width)
+
+        Returns:
+            Tuple of (grad_input, grad_weight, grad_bias)
+        """
+        batch_size, out_channels, out_height, out_width = grad_output.shape
+        _, in_channels, in_height, in_width = self.x.shape
+        kernel_h, kernel_w = self.kernel_size
+
+        # Apply padding to input if needed (for gradient computation)
+        if self.padding > 0:
+            padded_input = np.pad(self.x.data,
+                ((0, 0), (0, 0), (self.padding, self.padding), (self.padding, self.padding)),
+                mode='constant', constant_values=0)
+        else:
+            padded_input = self.x.data
+
+        # Initialize gradients
+        grad_input_padded = np.zeros_like(padded_input)
+        grad_weight = np.zeros_like(self.weight.data)
+        grad_bias = None if self.bias is None else np.zeros_like(self.bias.data)
+
+        # Compute gradients using explicit loops (educational approach)
+        for b in range(batch_size):
+            for out_ch in range(out_channels):
+                for out_h in range(out_height):
+                    for out_w in range(out_width):
+                        # Position in input
+                        in_h_start = out_h * self.stride
+                        in_w_start = out_w * self.stride
+
+                        # Gradient value flowing back to this position
+                        grad_val = grad_output[b, out_ch, out_h, out_w]
+
+                        # Distribute gradient to weight and input
+                        for k_h in range(kernel_h):
+                            for k_w in range(kernel_w):
+                                for in_ch in range(in_channels):
+                                    # Input position
+                                    in_h = in_h_start + k_h
+                                    in_w = in_w_start + k_w
+
+                                    # Gradient w.r.t. weight
+                                    grad_weight[out_ch, in_ch, k_h, k_w] += (
+                                        padded_input[b, in_ch, in_h, in_w] * grad_val
+                                    )
+
+                                    # Gradient w.r.t. input
+                                    grad_input_padded[b, in_ch, in_h, in_w] += (
+                                        self.weight.data[out_ch, in_ch, k_h, k_w] * grad_val
+                                    )
+
+        # Compute gradient w.r.t. bias (sum over batch and spatial dimensions)
+        if grad_bias is not None:
+            for out_ch in range(out_channels):
+                grad_bias[out_ch] = grad_output[:, out_ch, :, :].sum()
+
+        # Remove padding from input gradient
+        if self.padding > 0:
+            grad_input = grad_input_padded[:, :,
+                self.padding:-self.padding,
+                self.padding:-self.padding]
+        else:
+            grad_input = grad_input_padded
+
+        # Return gradients as numpy arrays (autograd system handles storage)
+        # Following TinyTorch protocol: return (grad_input, grad_weight, grad_bias)
+        return grad_input, grad_weight, grad_bias
+
+
 class Conv2d:
     """
     2D Convolution layer for spatial feature extraction.
@@ -456,11 +560,13 @@ class Conv2d:
         # Return Tensor with gradient tracking enabled
         result = Tensor(output, requires_grad=(x.requires_grad or self.weight.requires_grad))
 
-        # Note: This simple implementation uses manual loops and doesn't integrate
-        # with autograd's computation graph. For full gradient support, Conv2d
-        # needs a backward() implementation or should use tensor operations that
-        # autograd tracks automatically. This is left as a future enhancement.
-        # Current implementation works for inference and demonstrates O(N²M²K²) complexity.
+        # Attach backward function for gradient computation (following TinyTorch protocol)
+        if result.requires_grad:
+            result._grad_fn = Conv2dBackward(
+                x, self.weight, self.bias,
+                self.stride, self.padding, self.kernel_size,
+                padded_input.shape
+            )
 
         return result
 ### END SOLUTION
@@ -692,6 +798,83 @@ For input (1, 64, 224, 224) with 2×2 pooling:
 
 #| export
+class MaxPool2dBackward(Function):
+    """
+    Gradient computation for 2D max pooling.
+
+    Max pooling gradients flow only to the positions that were selected
+    as the maximum in the forward pass.
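+
+    All other input positions receive zero gradient: each pooling window
+    routes its entire incoming gradient to the single input cell that
+    produced the window's maximum (ties go to the first maximum found).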
+    """
+
+    def __init__(self, x, output_shape, kernel_size, stride, padding):
+        super().__init__(x)
+        self.x = x
+        self.output_shape = output_shape
+        self.kernel_size = kernel_size
+        self.stride = stride
+        self.padding = padding
+        # Max positions are recomputed in apply() from the stored input,
+        # so no extra state needs to be cached here.
+
+    def apply(self, grad_output):
+        """
+        Route gradients back to max positions.
+
+        Args:
+            grad_output: Gradient from next layer
+
+        Returns:
+            Gradient w.r.t. input
+        """
+        batch_size, channels, in_height, in_width = self.x.shape
+        _, _, out_height, out_width = self.output_shape
+        kernel_h, kernel_w = self.kernel_size
+
+        # Apply padding if needed (-inf padding can never win a max window)
+        if self.padding > 0:
+            padded_input = np.pad(self.x.data,
+                ((0, 0), (0, 0), (self.padding, self.padding), (self.padding, self.padding)),
+                mode='constant', constant_values=-np.inf)
+            grad_input_padded = np.zeros_like(padded_input)
+        else:
+            padded_input = self.x.data
+            grad_input_padded = np.zeros_like(self.x.data)
+
+        # Route gradients to max positions
+        for b in range(batch_size):
+            for c in range(channels):
+                for out_h in range(out_height):
+                    for out_w in range(out_width):
+                        in_h_start = out_h * self.stride
+                        in_w_start = out_w * self.stride
+
+                        # Find max position in this window
+                        max_val = -np.inf
+                        max_h, max_w = 0, 0
+                        for k_h in range(kernel_h):
+                            for k_w in range(kernel_w):
+                                in_h = in_h_start + k_h
+                                in_w = in_w_start + k_w
+                                val = padded_input[b, c, in_h, in_w]
+                                if val > max_val:
+                                    max_val = val
+                                    max_h, max_w = in_h, in_w
+
+                        # Route gradient to max position
+                        grad_input_padded[b, c, max_h, max_w] += grad_output[b, c, out_h, out_w]
+
+        # Remove padding
+        if self.padding > 0:
+            grad_input = grad_input_padded[:, :,
+                self.padding:-self.padding,
+                self.padding:-self.padding]
+        else:
+            grad_input = grad_input_padded
+
+        # Return as tuple (following Function protocol)
+        return (grad_input,)
+
+
 class MaxPool2d:
     """
     2D Max Pooling layer for spatial dimension reduction.
@@ -815,7 +998,16 @@ class MaxPool2d:
                     # Store result
                     output[b, c, out_h, out_w] = max_val
 
-        return Tensor(output)
+        # Return Tensor with gradient tracking
+        result = Tensor(output, requires_grad=x.requires_grad)
+
+        # Attach backward function for gradient computation
+        if result.requires_grad:
+            result._grad_fn = MaxPool2dBackward(
+                x, output.shape, self.kernel_size, self.stride, self.padding
+            )
+
+        return result
 ### END SOLUTION
 
     def parameters(self):
diff --git a/tests/milestones/test_learning_verification.py b/tests/milestones/test_learning_verification.py
index 83d559e7..671a1f4a 100644
--- a/tests/milestones/test_learning_verification.py
+++ b/tests/milestones/test_learning_verification.py
@@ -688,9 +688,9 @@ def test_cnn_learning():
         x = relu2(x)
         # No second pooling - would create 0x0!
 
-        # Flatten and classify
+        # Flatten and classify (using Tensor.reshape to preserve autograd)
         batch_size = x.shape[0]
-        x = Tensor(x.data.reshape(batch_size, -1))
+        x = x.reshape(batch_size, -1)
         x = fc(x)
         return x
 
@@ -709,6 +709,7 @@ def test_cnn_learning():
     epochs = 15
     loss_history = []
     test_acc_history = []
+    conv_grad_mean = 0.0  # Track conv gradient magnitude
 
     console.print("\n🔬 Training CNN on TinyDigits...")
 
@@ -724,9 +725,11 @@ def test_cnn_learning():
             # Backward pass
             loss.backward()
 
-            # Check gradients on first batch
+            # Check gradients on first batch (before zero_grad clears them!)
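+            # (param.grad values persist only until zero_grad clears them,
+            # so this capture must happen here inside the training loop.)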
             if epoch == 0 and batch_count == 0:
                 grad_stats = check_gradient_flow(params)
+                # Also capture conv gradient magnitude before it gets zeroed
+                conv_grad_mean = np.abs(conv1.weight.grad.data).mean() if conv1.weight.grad is not None else 0.0
 
             # Update weights
             optimizer.step()
@@ -779,8 +782,7 @@ def test_cnn_learning():
         f"{grad_stats['params_with_grad']}/{grad_stats['total_params']}",
         "✅ PASS" if grad_stats['params_with_grad'] == grad_stats['total_params'] else "❌ FAIL"
     )
-    # Check convolutional gradients exist
-    conv_grad_mean = np.abs(conv1.weight.grad.data).mean() if conv1.weight.grad is not None else 0.0
+    # Check convolutional gradients exist (captured during training before zero_grad)
     table.add_row(
         "Conv Gradients",
         f"{conv_grad_mean:.6f}",
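
The grad_weight rule that Conv2dBackward implements can be sanity-checked independently of the test suite with a finite-difference probe. The following is a minimal, self-contained NumPy sketch: it imports nothing from this patch, every name is local to the example, and the tiny forward convolution is a stand-in for the real Conv2d, not the library implementation.

import numpy as np

def conv2d_forward(x, w, stride=1):
    # Minimal single-image convolution (no padding): x is (C_in, H, W),
    # w is (C_out, C_in, K, K); returns (C_out, H_out, W_out).
    c_out, c_in, k, _ = w.shape
    h_out = (x.shape[1] - k) // stride + 1
    w_out = (x.shape[2] - k) // stride + 1
    out = np.zeros((c_out, h_out, w_out))
    for oc in range(c_out):
        for oh in range(h_out):
            for ow in range(w_out):
                hs, ws = oh * stride, ow * stride
                out[oc, oh, ow] = np.sum(x[:, hs:hs + k, ws:ws + k] * w[oc])
    return out

rng = np.random.default_rng(0)
x = rng.standard_normal((2, 5, 5))     # C_in=2, 5x5 input
w = rng.standard_normal((3, 2, 3, 3))  # C_out=3, 3x3 kernels
g = rng.standard_normal((3, 3, 3))     # upstream gradient, matches output shape

# Analytic grad_weight, using the same accumulation rule as the loops in
# Conv2dBackward: grad_w[oc] += input_window * grad_output[oc, oh, ow]
grad_w = np.zeros_like(w)
for oc in range(3):
    for oh in range(3):
        for ow in range(3):
            grad_w[oc] += x[:, oh:oh + 3, ow:ow + 3] * g[oc, oh, ow]

# Central finite difference on one weight entry of the scalar "loss" sum(out * g)
eps = 1e-5
wp, wm = w.copy(), w.copy()
wp[0, 0, 0, 0] += eps
wm[0, 0, 0, 0] -= eps
fd = np.sum((conv2d_forward(x, wp) - conv2d_forward(x, wm)) * g) / (2 * eps)
assert np.isclose(fd, grad_w[0, 0, 0, 0])  # analytic and numeric gradients agree

The same style of probe works for MaxPool2dBackward by perturbing one input cell instead of a weight; only the cell that wins its pooling window should show a nonzero effect.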