#!/usr/bin/env python3
"""
Comprehensive Gradient Flow Tests for TinyTorch
================================================

Tests that gradients flow correctly through:
1. Simple networks (single linear layer)
2. Multi-layer networks (MLP)
3. Convolutional networks (CNN)
4. Complete training loops (MLP and CNN)
5. Gradient accumulation across multiple backward passes

This ensures backpropagation works correctly end-to-end.
"""

import sys
import os
import numpy as np

# Add project root to path
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)

from tinytorch.core.tensor import Tensor
from tinytorch.core.layers import Linear, Dropout
from tinytorch.core.activations import ReLU, Sigmoid, Softmax
from tinytorch.core.losses import MSELoss, BinaryCrossEntropyLoss, CrossEntropyLoss
from tinytorch.core.optimizers import SGD, Adam
from tinytorch.core.spatial import Conv2d, MaxPool2d
from tinytorch.core.autograd import enable_autograd

# Enable autograd
enable_autograd()

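# Note on the call above (an assumption based on the function's name and the
# "# Enable autograd" comment, not verified against the implementation):
# enable_autograd() is expected to switch Tensor operations into graph-recording
# mode so that loss.backward() can propagate gradients. Every test below relies
# on it having been called before any forward pass.
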
def test_simple_linear_gradient_flow():
    """Test gradients flow through a single linear layer"""
    print("\n" + "="*70)
    print("TEST 1: Simple Linear Layer Gradient Flow")
    print("="*70)

    # Create simple network: Linear(2->1)
    layer = Linear(2, 1)

    # Input
    x = Tensor([[1.0, 2.0]], requires_grad=True)
    target = Tensor([[3.0]])

    # Forward pass
    output = layer.forward(x)

    # Loss
    loss_fn = MSELoss()
    loss = loss_fn.forward(output, target)

    print(f"Initial loss: {float(loss.data):.4f}")
    print(f"Initial weight shape: {layer.weight.shape}")
    print(f"Initial bias shape: {layer.bias.shape}")

    # Backward pass
    loss.backward()

    # Check gradients exist
    assert layer.weight.grad is not None, "Weight gradient is None!"
    assert layer.bias.grad is not None, "Bias gradient is None!"
    assert x.grad is not None, "Input gradient is None!"

    # Check gradients are non-zero
    weight_grad_norm = np.linalg.norm(layer.weight.grad.data)
    bias_grad_norm = np.linalg.norm(layer.bias.grad.data)
    input_grad_norm = np.linalg.norm(x.grad.data)

    print(f"\n✓ Weight gradient norm: {weight_grad_norm:.6f}")
    print(f"✓ Bias gradient norm: {bias_grad_norm:.6f}")
    print(f"✓ Input gradient norm: {input_grad_norm:.6f}")

    assert weight_grad_norm > 1e-6, f"Weight gradients too small: {weight_grad_norm}"
    assert bias_grad_norm > 1e-6, f"Bias gradients too small: {bias_grad_norm}"
    assert input_grad_norm > 1e-6, f"Input gradients too small: {input_grad_norm}"

    print("\n✅ TEST PASSED: Gradients flow correctly through linear layer")
    return True

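# Optional, illustrative sanity helper (not wired into main()). It compares the
# autograd weight gradient of the same Linear(2, 1) + MSELoss pipeline used in
# Test 1 against a central finite-difference estimate. It only uses APIs already
# exercised above, but it assumes that `layer.weight.data` is a writable NumPy
# array (a view, not a copy) and that MSELoss averages over elements; the name,
# eps, and atol are illustrative choices, so treat this as a sketch rather than
# part of the suite.
def _finite_difference_weight_check(eps=1e-5, atol=1e-4):
    layer = Linear(2, 1)
    x = Tensor([[1.0, 2.0]], requires_grad=True)
    target = Tensor([[3.0]])
    loss_fn = MSELoss()

    # Autograd gradient for the weight
    loss = loss_fn.forward(layer.forward(x), target)
    loss.backward()
    analytic = np.array(layer.weight.grad.data)

    # Central finite differences, one weight entry at a time
    numeric = np.zeros_like(analytic)
    w = layer.weight.data
    for idx in np.ndindex(*analytic.shape):
        original = w[idx]
        w[idx] = original + eps
        loss_plus = float(loss_fn.forward(layer.forward(x), target).data)
        w[idx] = original - eps
        loss_minus = float(loss_fn.forward(layer.forward(x), target).data)
        w[idx] = original
        numeric[idx] = (loss_plus - loss_minus) / (2 * eps)

    return np.allclose(analytic, numeric, atol=atol)
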
def test_mlp_gradient_flow():
    """Test gradients flow through multi-layer perceptron"""
    print("\n" + "="*70)
    print("TEST 2: Multi-Layer Perceptron Gradient Flow")
    print("="*70)

    # Create MLP: Input(4) -> Linear(4->8) -> ReLU -> Linear(8->2)
    layer1 = Linear(4, 8)
    activation = ReLU()
    layer2 = Linear(8, 2)

    # Input and target
    x = Tensor(np.random.randn(3, 4), requires_grad=True)
    target = Tensor(np.array([[1, 0], [0, 1], [1, 0]]))

    print(f"Input shape: {x.shape}")
    print(f"Target shape: {target.shape}")

    # Forward pass
    h1 = layer1.forward(x)
    h1_activated = activation.forward(h1)
    output = layer2.forward(h1_activated)

    print(f"Hidden layer shape: {h1.shape}")
    print(f"Output shape: {output.shape}")

    # Loss
    loss_fn = MSELoss()
    loss = loss_fn.forward(output, target)

    print(f"Initial loss: {float(loss.data):.4f}")

    # Backward pass
    loss.backward()

    # Check all layer gradients exist
    assert layer1.weight.grad is not None, "Layer1 weight gradient is None!"
    assert layer1.bias.grad is not None, "Layer1 bias gradient is None!"
    assert layer2.weight.grad is not None, "Layer2 weight gradient is None!"
    assert layer2.bias.grad is not None, "Layer2 bias gradient is None!"

    # Check gradient magnitudes
    l1_weight_norm = np.linalg.norm(layer1.weight.grad.data)
    l1_bias_norm = np.linalg.norm(layer1.bias.grad.data)
    l2_weight_norm = np.linalg.norm(layer2.weight.grad.data)
    l2_bias_norm = np.linalg.norm(layer2.bias.grad.data)

    print(f"\n✓ Layer1 weight gradient norm: {l1_weight_norm:.6f}")
    print(f"✓ Layer1 bias gradient norm: {l1_bias_norm:.6f}")
    print(f"✓ Layer2 weight gradient norm: {l2_weight_norm:.6f}")
    print(f"✓ Layer2 bias gradient norm: {l2_bias_norm:.6f}")

    assert l1_weight_norm > 1e-6, "Layer1 weight gradients too small"
    assert l1_bias_norm > 1e-6, "Layer1 bias gradients too small"
    assert l2_weight_norm > 1e-6, "Layer2 weight gradients too small"
    assert l2_bias_norm > 1e-6, "Layer2 bias gradients too small"

    print("\n✅ TEST PASSED: Gradients flow correctly through MLP")
    return True

def test_mlp_training_updates():
    """Test that MLP actually learns (loss decreases)"""
    print("\n" + "="*70)
    print("TEST 3: MLP Training - Loss Reduction")
    print("="*70)

    # Create simple MLP
    layer1 = Linear(2, 4)
    activation = ReLU()
    layer2 = Linear(4, 1)

    # Simple dataset (XOR-like)
    X = Tensor(np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]), requires_grad=False)
    y = Tensor(np.array([[0.0], [1.0], [1.0], [0.0]]))

    # Optimizer
    optimizer = SGD([layer1.weight, layer1.bias, layer2.weight, layer2.bias], lr=0.1)
    loss_fn = MSELoss()

    losses = []

    print("Training for 50 epochs...")
    for epoch in range(50):
        # Forward
        h1 = layer1.forward(X)
        h1_act = activation.forward(h1)
        output = layer2.forward(h1_act)

        # Loss
        loss = loss_fn.forward(output, y)
        losses.append(float(loss.data))

        # Backward
        optimizer.zero_grad()
        loss.backward()

        # Update
        optimizer.step()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1:2d}: Loss = {float(loss.data):.6f}")

    # Check loss decreased
    initial_loss = losses[0]
    final_loss = losses[-1]
    reduction = initial_loss - final_loss
    reduction_pct = (reduction / initial_loss) * 100

    print(f"\n✓ Initial loss: {initial_loss:.6f}")
    print(f"✓ Final loss: {final_loss:.6f}")
    print(f"✓ Reduction: {reduction:.6f} ({reduction_pct:.1f}%)")

    assert final_loss < initial_loss, f"Loss didn't decrease! Initial: {initial_loss}, Final: {final_loss}"
    assert reduction_pct > 10, f"Loss reduction too small: {reduction_pct:.1f}%"

    print("\n✅ TEST PASSED: MLP learns successfully (loss decreases)")
    return True

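# Training-loop note (illustrative): the loop above follows the conventional
# order zero_grad() -> forward -> loss -> backward() -> step(). Under the usual
# SGD semantics (an assumption about TinyTorch's implementation, not verified
# here), step() applies roughly `param.data -= lr * param.grad.data` to every
# parameter passed to the optimizer, which is why all four weight/bias tensors
# are listed explicitly when constructing SGD.
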
def test_cnn_gradient_flow():
    """Test gradients flow through convolutional layers"""
    print("\n" + "="*70)
    print("TEST 4: CNN Gradient Flow")
    print("="*70)

    # Create simple CNN: Conv2d -> ReLU -> Linear
    conv = Conv2d(in_channels=1, out_channels=4, kernel_size=3, stride=1, padding=0)
    activation = ReLU()

    # Input: batch=2, channels=1, height=8, width=8
    x = Tensor(np.random.randn(2, 1, 8, 8), requires_grad=True)

    print(f"Input shape: {x.shape}")
    print(f"Conv weight shape: {conv.weight.shape}")

    # Forward through conv
    conv_out = conv.forward(x)
    print(f"Conv output shape: {conv_out.shape}")

    activated = activation.forward(conv_out)

    # Flatten for linear layer
    batch_size = activated.shape[0]
    flattened_size = np.prod(activated.shape[1:])
    # Use reshape method to maintain gradient flow
    flattened = activated.reshape(batch_size, flattened_size)

    linear = Linear(flattened_size, 2)
    output = linear.forward(flattened)

    print(f"Flattened shape: {flattened.shape}")
    print(f"Output shape: {output.shape}")

    # Loss
    target = Tensor(np.array([[1, 0], [0, 1]]))
    loss_fn = MSELoss()
    loss = loss_fn.forward(output, target)

    print(f"Initial loss: {float(loss.data):.4f}")

    # Backward
    loss.backward()

    # Check gradients
    assert conv.weight.grad is not None, "Conv weight gradient is None!"
    assert conv.bias.grad is not None, "Conv bias gradient is None!"
    assert linear.weight.grad is not None, "Linear weight gradient is None!"

    weight_grad_norm = np.linalg.norm(conv.weight.grad.data)
    conv_bias_norm = np.linalg.norm(conv.bias.grad.data)
    linear_grad_norm = np.linalg.norm(linear.weight.grad.data)

    print(f"\n✓ Conv weight gradient norm: {weight_grad_norm:.6f}")
    print(f"✓ Conv bias gradient norm: {conv_bias_norm:.6f}")
    print(f"✓ Linear weight gradient norm: {linear_grad_norm:.6f}")

    assert weight_grad_norm > 1e-6, f"Conv weight gradients too small: {weight_grad_norm}"
    assert conv_bias_norm > 1e-6, f"Conv bias gradients too small: {conv_bias_norm}"
    assert linear_grad_norm > 1e-6, f"Linear gradients too small: {linear_grad_norm}"

    print("\n✅ TEST PASSED: Gradients flow correctly through CNN")
    return True

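# Why the flatten step uses reshape: building a fresh Tensor from the flattened
# NumPy buffer (for example `Tensor(activated.data.reshape(batch_size, -1))`,
# shown only for illustration) would detach the result from the computational
# graph, and conv.weight.grad would remain None after backward(). Calling
# activated.reshape(...) keeps the graph connected from the loss back to the
# convolution. The exact detachment behaviour is an assumption about TinyTorch's
# autograd, consistent with the "maintain gradient flow" comments above.
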
def test_cnn_training_updates():
    """Test that CNN actually learns on simple data"""
    print("\n" + "="*70)
    print("TEST 5: CNN Training - Loss Reduction")
    print("="*70)

    # Simple CNN
    conv = Conv2d(1, 2, kernel_size=3, stride=1, padding=1)
    activation = ReLU()

    # Simple data: 4 samples, 1 channel, 4x4 images
    X = Tensor(np.random.randn(4, 1, 4, 4), requires_grad=False)

    # After conv: (4, 2, 4, 4) -> flatten to (4, 32)
    conv_out_size = 2 * 4 * 4  # channels * height * width
    linear = Linear(conv_out_size, 2)

    y = Tensor(np.array([[1, 0], [0, 1], [1, 0], [0, 1]]))

    # Get parameters with gradients
    params = []
    for p in [conv.weight, conv.bias, linear.weight, linear.bias]:
        if not p.requires_grad:
            p.requires_grad = True
        params.append(p)

    # Optimizer
    optimizer = SGD(params, lr=0.01)
    loss_fn = MSELoss()

    losses = []

    print("Training for 30 epochs...")
    for epoch in range(30):
        # Forward
        conv_out = conv.forward(X)
        activated = activation.forward(conv_out)

        # Flatten using reshape to maintain gradients
        batch_size = activated.shape[0]
        flattened = activated.reshape(batch_size, -1)

        output = linear.forward(flattened)

        # Loss
        loss = loss_fn.forward(output, y)
        losses.append(float(loss.data))

        # Backward
        optimizer.zero_grad()
        loss.backward()

        # Update
        optimizer.step()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1:2d}: Loss = {float(loss.data):.6f}")

    # Check loss decreased
    initial_loss = losses[0]
    final_loss = losses[-1]
    reduction = initial_loss - final_loss
    reduction_pct = (reduction / initial_loss) * 100

    print(f"\n✓ Initial loss: {initial_loss:.6f}")
    print(f"✓ Final loss: {final_loss:.6f}")
    print(f"✓ Reduction: {reduction:.6f} ({reduction_pct:.1f}%)")

    assert final_loss < initial_loss, f"Loss didn't decrease! Initial: {initial_loss}, Final: {final_loss}"

    print("\n✅ TEST PASSED: CNN learns successfully (loss decreases)")
    return True

def test_gradient_accumulation():
    """Test that gradients accumulate correctly across batches"""
    print("\n" + "="*70)
    print("TEST 6: Gradient Accumulation")
    print("="*70)

    layer = Linear(2, 1)

    # Two batches
    x1 = Tensor([[1.0, 2.0]], requires_grad=True)
    x2 = Tensor([[3.0, 4.0]], requires_grad=True)
    target = Tensor([[1.0]])

    loss_fn = MSELoss()

    # Forward + backward on first batch (don't zero grad)
    out1 = layer.forward(x1)
    loss1 = loss_fn.forward(out1, target)
    loss1.backward()

    grad_after_first = np.array(layer.weight.grad.data)

    # Forward + backward on second batch (gradients should accumulate)
    out2 = layer.forward(x2)
    loss2 = loss_fn.forward(out2, target)
    loss2.backward()

    grad_after_second = layer.weight.grad.data

    # Gradients should have accumulated (not been replaced)
    grad_diff = np.linalg.norm(grad_after_second - grad_after_first)

    print(f"✓ Gradient after first batch norm: {np.linalg.norm(grad_after_first):.6f}")
    print(f"✓ Gradient after second batch norm: {np.linalg.norm(grad_after_second):.6f}")
    print(f"✓ Difference: {grad_diff:.6f}")

    assert grad_diff > 1e-6, "Gradients didn't accumulate properly"

    print("\n✅ TEST PASSED: Gradients accumulate correctly")
    return True

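# Accumulation note (illustrative): the expected semantics match the usual
# autograd convention in which backward() adds into .grad rather than replacing
# it, so after the second backward pass
#     grad_after_second ≈ grad_after_first + dL2/dW
# where dL2/dW is the gradient the second batch would produce on its own.
# Strictly speaking, the assert above only proves the gradient changed; a test
# that zeroes the grad, recomputes dL2/dW in isolation, and checks the sum would
# distinguish accumulation from replacement. That stronger check is left as a
# comment because it assumes a per-tensor grad-reset API not exercised here.
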
def main():
    """Run all gradient flow tests"""
    print("\n" + "="*70)
    print(" TINYTORCH GRADIENT FLOW TEST SUITE")
    print("="*70)

    tests = [
        ("Simple Linear", test_simple_linear_gradient_flow),
        ("MLP Gradient Flow", test_mlp_gradient_flow),
        ("MLP Training", test_mlp_training_updates),
        ("CNN Gradient Flow", test_cnn_gradient_flow),
        ("CNN Training", test_cnn_training_updates),
        ("Gradient Accumulation", test_gradient_accumulation),
    ]

    results = []

    for name, test_func in tests:
        try:
            result = test_func()
            results.append((name, "PASSED" if result else "FAILED"))
        except Exception as e:
            print(f"\n❌ TEST FAILED: {name}")
            print(f"Error: {str(e)}")
            import traceback
            traceback.print_exc()
            results.append((name, "FAILED"))

    # Summary
    print("\n" + "="*70)
    print(" TEST SUMMARY")
    print("="*70)

    passed = sum(1 for _, status in results if status == "PASSED")
    total = len(results)

    for name, status in results:
        symbol = "✅" if status == "PASSED" else "❌"
        print(f"{symbol} {name}: {status}")

    print(f"\nTotal: {passed}/{total} tests passed")

    if passed == total:
        print("\n🎉 ALL TESTS PASSED! Gradients flow correctly through TinyTorch.")
        return 0
    else:
        print(f"\n⚠️ {total - passed} tests failed. Please review the errors above.")
        return 1


if __name__ == "__main__":
    sys.exit(main())