Files
TinyTorch/tests/test_gradient_flow.py
Vijay Janapa Reddi 90581b23c0 Update test suite for module restructuring
Updated test imports and paths after modules/source/ removal:
- Progressive integration tests for modules 03, 06, 08, 13, 14
- Checkpoint integration tests
- Module completion orchestrator
- Optimizer integration tests
- Gradient flow regression tests

Updated test documentation:
- tests/README.md with new module paths
- tests/TEST_STRATEGY.md with restructuring notes

All tests now reference modules/XX_name/ instead of modules/source/.
2025-11-10 19:42:23 -05:00

437 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Comprehensive Gradient Flow Tests for TinyTorch
================================================
Tests that gradients flow correctly through:
1. Simple networks (single layer)
2. Multi-layer networks (MLP)
3. Convolutional networks (CNN)
4. Attention mechanisms
5. Complete training loops
This ensures backpropagation works correctly end-to-end.
"""
import sys
import os
import numpy as np
# Add project root to path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
from tinytorch.core.tensor import Tensor
from tinytorch.core.layers import Linear, Dropout
from tinytorch.core.activations import ReLU, Sigmoid, Softmax
from tinytorch.core.losses import MSELoss, BinaryCrossEntropyLoss, CrossEntropyLoss
from tinytorch.core.optimizers import SGD, Adam
from tinytorch.core.spatial import Conv2d, MaxPool2d
from tinytorch.core.autograd import enable_autograd
# Enable autograd
enable_autograd()
def test_simple_linear_gradient_flow():
"""Test gradients flow through a single linear layer"""
print("\n" + "="*70)
print("TEST 1: Simple Linear Layer Gradient Flow")
print("="*70)
# Create simple network: Linear(2->1)
layer = Linear(2, 1)
# Input
x = Tensor([[1.0, 2.0]], requires_grad=True)
target = Tensor([[3.0]])
# Forward pass
output = layer.forward(x)
# Loss
loss_fn = MSELoss()
loss = loss_fn.forward(output, target)
print(f"Initial loss: {float(loss.data):.4f}")
print(f"Initial weight shape: {layer.weight.shape}")
print(f"Initial bias shape: {layer.bias.shape}")
# Backward pass
loss.backward()
# Check gradients exist
assert layer.weight.grad is not None, "Weight gradient is None!"
assert layer.bias.grad is not None, "Bias gradient is None!"
assert x.grad is not None, "Input gradient is None!"
# Check gradients are non-zero
weight_grad_norm = np.linalg.norm(layer.weight.grad.data)
bias_grad_norm = np.linalg.norm(layer.bias.grad.data)
input_grad_norm = np.linalg.norm(x.grad.data)
print(f"\n✓ Weight gradient norm: {weight_grad_norm:.6f}")
print(f"✓ Bias gradient norm: {bias_grad_norm:.6f}")
print(f"✓ Input gradient norm: {input_grad_norm:.6f}")
assert weight_grad_norm > 1e-6, f"Weight gradients too small: {weight_grad_norm}"
assert bias_grad_norm > 1e-6, f"Bias gradients too small: {bias_grad_norm}"
assert input_grad_norm > 1e-6, f"Input gradients too small: {input_grad_norm}"
print("\n✅ TEST PASSED: Gradients flow correctly through linear layer")
return True
def test_mlp_gradient_flow():
"""Test gradients flow through multi-layer perceptron"""
print("\n" + "="*70)
print("TEST 2: Multi-Layer Perceptron Gradient Flow")
print("="*70)
# Create MLP: Input(4) -> Linear(4->8) -> ReLU -> Linear(8->2)
layer1 = Linear(4, 8)
activation = ReLU()
layer2 = Linear(8, 2)
# Input and target
x = Tensor(np.random.randn(3, 4), requires_grad=True)
target = Tensor(np.array([[1, 0], [0, 1], [1, 0]]))
print(f"Input shape: {x.shape}")
print(f"Target shape: {target.shape}")
# Forward pass
h1 = layer1.forward(x)
h1_activated = activation.forward(h1)
output = layer2.forward(h1_activated)
print(f"Hidden layer shape: {h1.shape}")
print(f"Output shape: {output.shape}")
# Loss
loss_fn = MSELoss()
loss = loss_fn.forward(output, target)
print(f"Initial loss: {float(loss.data):.4f}")
# Backward pass
loss.backward()
# Check all layer gradients exist
assert layer1.weight.grad is not None, "Layer1 weight gradient is None!"
assert layer1.bias.grad is not None, "Layer1 bias gradient is None!"
assert layer2.weight.grad is not None, "Layer2 weight gradient is None!"
assert layer2.bias.grad is not None, "Layer2 bias gradient is None!"
# Check gradient magnitudes
l1_weight_norm = np.linalg.norm(layer1.weight.grad.data)
l1_bias_norm = np.linalg.norm(layer1.bias.grad.data)
l2_weight_norm = np.linalg.norm(layer2.weight.grad.data)
l2_bias_norm = np.linalg.norm(layer2.bias.grad.data)
print(f"\n✓ Layer1 weight gradient norm: {l1_weight_norm:.6f}")
print(f"✓ Layer1 bias gradient norm: {l1_bias_norm:.6f}")
print(f"✓ Layer2 weight gradient norm: {l2_weight_norm:.6f}")
print(f"✓ Layer2 bias gradient norm: {l2_bias_norm:.6f}")
assert l1_weight_norm > 1e-6, "Layer1 weight gradients too small"
assert l1_bias_norm > 1e-6, "Layer1 bias gradients too small"
assert l2_weight_norm > 1e-6, "Layer2 weight gradients too small"
assert l2_bias_norm > 1e-6, "Layer2 bias gradients too small"
print("\n✅ TEST PASSED: Gradients flow correctly through MLP")
return True
def test_mlp_training_updates():
"""Test that MLP actually learns (loss decreases)"""
print("\n" + "="*70)
print("TEST 3: MLP Training - Loss Reduction")
print("="*70)
# Create simple MLP
layer1 = Linear(2, 4)
activation = ReLU()
layer2 = Linear(4, 1)
# Simple dataset (XOR-like)
X = Tensor(np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]), requires_grad=False)
y = Tensor(np.array([[0.0], [1.0], [1.0], [0.0]]))
# Optimizer
optimizer = SGD([layer1.weight, layer1.bias, layer2.weight, layer2.bias], lr=0.1)
loss_fn = MSELoss()
losses = []
print("Training for 50 epochs...")
for epoch in range(50):
# Forward
h1 = layer1.forward(X)
h1_act = activation.forward(h1)
output = layer2.forward(h1_act)
# Loss
loss = loss_fn.forward(output, y)
losses.append(float(loss.data))
# Backward
optimizer.zero_grad()
loss.backward()
# Update
optimizer.step()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:2d}: Loss = {float(loss.data):.6f}")
# Check loss decreased
initial_loss = losses[0]
final_loss = losses[-1]
reduction = initial_loss - final_loss
reduction_pct = (reduction / initial_loss) * 100
print(f"\n✓ Initial loss: {initial_loss:.6f}")
print(f"✓ Final loss: {final_loss:.6f}")
print(f"✓ Reduction: {reduction:.6f} ({reduction_pct:.1f}%)")
assert final_loss < initial_loss, f"Loss didn't decrease! Initial: {initial_loss}, Final: {final_loss}"
assert reduction_pct > 10, f"Loss reduction too small: {reduction_pct:.1f}%"
print("\n✅ TEST PASSED: MLP learns successfully (loss decreases)")
return True
def test_cnn_gradient_flow():
"""Test gradients flow through convolutional layers"""
print("\n" + "="*70)
print("TEST 4: CNN Gradient Flow")
print("="*70)
# Create simple CNN: Conv2d -> ReLU -> Linear
conv = Conv2d(in_channels=1, out_channels=4, kernel_size=3, stride=1, padding=0)
activation = ReLU()
# Input: batch=2, channels=1, height=8, width=8
x = Tensor(np.random.randn(2, 1, 8, 8), requires_grad=True)
print(f"Input shape: {x.shape}")
print(f"Conv weight shape: {conv.weight.shape}")
# Forward through conv
conv_out = conv.forward(x)
print(f"Conv output shape: {conv_out.shape}")
activated = activation.forward(conv_out)
# Flatten for linear layer
batch_size = activated.shape[0]
flattened_size = np.prod(activated.shape[1:])
# Use reshape method to maintain gradient flow
flattened = activated.reshape(batch_size, flattened_size)
linear = Linear(flattened_size, 2)
output = linear.forward(flattened)
print(f"Flattened shape: {flattened.shape}")
print(f"Output shape: {output.shape}")
# Loss
target = Tensor(np.array([[1, 0], [0, 1]]))
loss_fn = MSELoss()
loss = loss_fn.forward(output, target)
print(f"Initial loss: {float(loss.data):.4f}")
# Backward
loss.backward()
# Check gradients
assert conv.weight.grad is not None, "Conv weight gradient is None!"
assert conv.bias.grad is not None, "Conv bias gradient is None!"
assert linear.weight.grad is not None, "Linear weight gradient is None!"
weight_grad_norm = np.linalg.norm(conv.weight.grad.data)
conv_bias_norm = np.linalg.norm(conv.bias.grad.data)
linear_grad_norm = np.linalg.norm(linear.weight.grad.data)
print(f"\n✓ Conv weight gradient norm: {weight_grad_norm:.6f}")
print(f"✓ Conv bias gradient norm: {conv_bias_norm:.6f}")
print(f"✓ Linear weight gradient norm: {linear_grad_norm:.6f}")
assert weight_grad_norm > 1e-6, f"Conv weight gradients too small: {weight_grad_norm}"
assert conv_bias_norm > 1e-6, f"Conv bias gradients too small: {conv_bias_norm}"
assert linear_grad_norm > 1e-6, f"Linear gradients too small: {linear_grad_norm}"
print("\n✅ TEST PASSED: Gradients flow correctly through CNN")
return True
def test_cnn_training_updates():
"""Test that CNN actually learns on simple data"""
print("\n" + "="*70)
print("TEST 5: CNN Training - Loss Reduction")
print("="*70)
# Simple CNN
conv = Conv2d(1, 2, kernel_size=3, stride=1, padding=1)
activation = ReLU()
# Simple data: 4 samples, 1 channel, 4x4 images
X = Tensor(np.random.randn(4, 1, 4, 4), requires_grad=False)
# After conv: (4, 2, 4, 4) -> flatten to (4, 32)
conv_out_size = 2 * 4 * 4 # channels * height * width
linear = Linear(conv_out_size, 2)
y = Tensor(np.array([[1, 0], [0, 1], [1, 0], [0, 1]]))
# Get parameters with gradients
params = []
for p in [conv.weight, conv.bias, linear.weight, linear.bias]:
if not p.requires_grad:
p.requires_grad = True
params.append(p)
# Optimizer
optimizer = SGD(params, lr=0.01)
loss_fn = MSELoss()
losses = []
print("Training for 30 epochs...")
for epoch in range(30):
# Forward
conv_out = conv.forward(X)
activated = activation.forward(conv_out)
# Flatten using reshape to maintain gradients
batch_size = activated.shape[0]
flattened = activated.reshape(batch_size, -1)
output = linear.forward(flattened)
# Loss
loss = loss_fn.forward(output, y)
losses.append(float(loss.data))
# Backward
optimizer.zero_grad()
loss.backward()
# Update
optimizer.step()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:2d}: Loss = {float(loss.data):.6f}")
# Check loss decreased
initial_loss = losses[0]
final_loss = losses[-1]
reduction = initial_loss - final_loss
reduction_pct = (reduction / initial_loss) * 100
print(f"\n✓ Initial loss: {initial_loss:.6f}")
print(f"✓ Final loss: {final_loss:.6f}")
print(f"✓ Reduction: {reduction:.6f} ({reduction_pct:.1f}%)")
assert final_loss < initial_loss, f"Loss didn't decrease! Initial: {initial_loss}, Final: {final_loss}"
print("\n✅ TEST PASSED: CNN learns successfully (loss decreases)")
return True
def test_gradient_accumulation():
"""Test that gradients accumulate correctly across batches"""
print("\n" + "="*70)
print("TEST 6: Gradient Accumulation")
print("="*70)
layer = Linear(2, 1)
# Two batches
x1 = Tensor([[1.0, 2.0]], requires_grad=True)
x2 = Tensor([[3.0, 4.0]], requires_grad=True)
target = Tensor([[1.0]])
loss_fn = MSELoss()
# Forward + backward on first batch (don't zero grad)
out1 = layer.forward(x1)
loss1 = loss_fn.forward(out1, target)
loss1.backward()
grad_after_first = np.array(layer.weight.grad.data)
# Forward + backward on second batch (gradients should accumulate)
out2 = layer.forward(x2)
loss2 = loss_fn.forward(out2, target)
loss2.backward()
grad_after_second = layer.weight.grad.data
# Gradients should have accumulated (not been replaced)
grad_diff = np.linalg.norm(grad_after_second - grad_after_first)
print(f"✓ Gradient after first batch norm: {np.linalg.norm(grad_after_first):.6f}")
print(f"✓ Gradient after second batch norm: {np.linalg.norm(grad_after_second):.6f}")
print(f"✓ Difference: {grad_diff:.6f}")
assert grad_diff > 1e-6, "Gradients didn't accumulate properly"
print("\n✅ TEST PASSED: Gradients accumulate correctly")
return True
def main():
"""Run all gradient flow tests"""
print("\n" + "="*70)
print(" TINYTORCH GRADIENT FLOW TEST SUITE")
print("="*70)
tests = [
("Simple Linear", test_simple_linear_gradient_flow),
("MLP Gradient Flow", test_mlp_gradient_flow),
("MLP Training", test_mlp_training_updates),
("CNN Gradient Flow", test_cnn_gradient_flow),
("CNN Training", test_cnn_training_updates),
("Gradient Accumulation", test_gradient_accumulation),
]
results = []
for name, test_func in tests:
try:
result = test_func()
results.append((name, "PASSED" if result else "FAILED"))
except Exception as e:
print(f"\n❌ TEST FAILED: {name}")
print(f"Error: {str(e)}")
import traceback
traceback.print_exc()
results.append((name, "FAILED"))
# Summary
print("\n" + "="*70)
print(" TEST SUMMARY")
print("="*70)
passed = sum(1 for _, status in results if status == "PASSED")
total = len(results)
for name, status in results:
symbol = "" if status == "PASSED" else ""
print(f"{symbol} {name}: {status}")
print(f"\nTotal: {passed}/{total} tests passed")
if passed == total:
print("\n🎉 ALL TESTS PASSED! Gradients flow correctly through TinyTorch.")
return 0
else:
print(f"\n⚠️ {total - passed} tests failed. Please review the errors above.")
return 1
if __name__ == "__main__":
exit(main())