mirror of
https://github.com/harvard-edge/cs249r_book.git
synced 2026-03-09 07:15:51 -05:00
refactor(tests): reorganize test folders and fix misplaced files
Folder consolidation: - Merge system/ into integration/ (removed duplicate folder) - Remove performance/ (only had framework, no tests) File relocations: - Move test_dense_layer.py, test_dense_integration.py from 04_losses/ to 03_layers/ - Move test_network_capability.py from 04_losses/ to integration/ - Move test_kv_cache_integration.py from 14_profiling/ to 18_memoization/ - Move system/ tests (forward_passes, gradients, shapes, etc.) to integration/ Removed duplicates: - system/test_gradient_flow_overall.py (duplicate of integration version) - system/test_integration.py (redundant with integration/ folder) - system/test_milestones.py (duplicate of milestones/ tests) Final structure: 26 folders, 100 test files
This commit is contained in:
@@ -1,248 +0,0 @@
|
||||
# TinyTorch Performance Testing Framework
|
||||
|
||||
This directory contains comprehensive performance tests that validate whether TinyTorch's optimization modules actually deliver their claimed benefits through **scientific measurement**.
|
||||
|
||||
## Overview
|
||||
|
||||
The performance testing framework addresses a critical question: **Do the optimization modules really work?**
|
||||
|
||||
Rather than accepting theoretical claims, we measure:
|
||||
- **Actual speedups** with confidence intervals
|
||||
- **Real memory usage** with proper profiling
|
||||
- **Genuine accuracy preservation** with statistical validation
|
||||
- **Honest reporting** of both successes and failures
|
||||
|
||||
## Framework Design Principles
|
||||
|
||||
### Scientific Rigor
|
||||
- **Statistical methodology**: Multiple runs, warmup periods, confidence intervals
|
||||
- **Proper baselines**: Compare against realistic implementations, not strawmen
|
||||
- **Noise reduction**: Control for GC, system load, measurement overhead
|
||||
- **Reproducibility**: Consistent results across runs and environments
|
||||
|
||||
### Honest Assessment
|
||||
- **Report failures**: When optimizations don't work, we say so
|
||||
- **Measure real workloads**: Use realistic data sizes and operations
|
||||
- **Validate claims**: Test specific performance assertions (e.g., "4× speedup")
|
||||
- **Systems focus**: Measure what matters for ML systems engineering
|
||||
|
||||
### Comprehensive Coverage
|
||||
- **All optimization modules**: 15 (Profiling), 16 (Acceleration), 17 (Quantization), 19 (Caching), 20 (Benchmarking)
|
||||
- **Multiple metrics**: Speed, memory, accuracy, complexity, correctness
|
||||
- **Scaling behavior**: How do optimizations perform with different input sizes?
|
||||
- **Edge cases**: Do optimizations work across different scenarios?
|
||||
|
||||
## Framework Components
|
||||
|
||||
### 1. `performance_test_framework.py` - Core Infrastructure
|
||||
- **ScientificTimer**: High-precision timing with statistical rigor
|
||||
- **PerformanceComparator**: Statistical comparison of implementations
|
||||
- **WorkloadGenerator**: Realistic ML workloads for testing
|
||||
- **PerformanceTestSuite**: Orchestrates complete test execution
|
||||
|
||||
### 2. Module-Specific Test Files
|
||||
- **`test_module_15_profiling.py`**: Validates profiling tool accuracy
|
||||
- **`test_module_16_acceleration.py`**: Measures acceleration speedups
|
||||
- **`test_module_17_quantization.py`**: Tests quantization benefits and accuracy
|
||||
- **`test_module_19_caching.py`**: Validates KV cache complexity reduction
|
||||
- **`test_module_20_benchmarking.py`**: Tests benchmarking system reliability
|
||||
|
||||
### 3. `run_all_performance_tests.py` - Complete Validation
|
||||
- Executes all module tests systematically
|
||||
- Generates comprehensive analysis report
|
||||
- Provides honest assessment of optimization effectiveness
|
||||
- Saves detailed results for further analysis
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Run All Tests
|
||||
```bash
|
||||
cd tests/performance
|
||||
python run_all_performance_tests.py
|
||||
```
|
||||
|
||||
This will:
|
||||
1. Test all optimization modules (15-20)
|
||||
2. Generate detailed performance measurements
|
||||
3. Provide statistical analysis of results
|
||||
4. Create honest assessment of what works and what doesn't
|
||||
5. Save complete results to `validation_results/`
|
||||
|
||||
### Run Individual Module Tests
|
||||
```bash
|
||||
python test_module_15_profiling.py # Test profiling tools
|
||||
python test_module_16_acceleration.py # Test acceleration techniques
|
||||
python test_module_17_quantization.py # Test quantization benefits
|
||||
python test_module_19_caching.py # Test KV caching speedups
|
||||
python test_module_20_benchmarking.py # Test benchmarking reliability
|
||||
```
|
||||
|
||||
## Understanding Test Results
|
||||
|
||||
### Success Criteria
|
||||
Each test reports **specific, measurable success criteria**:
|
||||
|
||||
**Module 15 (Profiling)**:
|
||||
- Timer accuracy: Can detect known performance differences
|
||||
- Memory profiler: Correctly tracks memory allocations
|
||||
- FLOP counter: Accurately calculates operation counts
|
||||
- Low overhead: Profiling doesn't significantly slow operations
|
||||
|
||||
**Module 16 (Acceleration)**:
|
||||
- Naive vs blocked: Cache-friendly algorithms show improvement
|
||||
- Blocked vs NumPy: NumPy demonstrates hardware acceleration benefits
|
||||
- Full spectrum: 5-100× speedups from naive loops to optimized libraries
|
||||
- Backend system: Smart dispatch works with minimal overhead
|
||||
|
||||
**Module 15 (Quantization)**:
|
||||
- Memory reduction: 3-4× reduction in model size
|
||||
- Inference speedup: Faster execution (hardware dependent)
|
||||
- Accuracy preservation: <5% degradation in model quality
|
||||
- Quantization precision: Round-trip error within acceptable bounds
|
||||
|
||||
**Module 19 (Caching)**:
|
||||
- Memory efficiency: Cache scales linearly with sequence length
|
||||
- Correctness: Cached values retrieved accurately
|
||||
- Complexity reduction: O(N²) → O(N) scaling demonstrated
|
||||
- Practical speedups: Measurable improvement in sequential generation
|
||||
|
||||
**Module 20 (Benchmarking)**:
|
||||
- Reproducibility: Consistent results across runs
|
||||
- Performance detection: Can identify real optimization differences
|
||||
- Fair comparison: Different events provide meaningful competition
|
||||
- Scoring accuracy: Relative performance measured correctly
|
||||
|
||||
### Interpreting Results
|
||||
|
||||
**✅ PASS**: Optimization delivers claimed benefits with statistical significance
|
||||
**⚠️ PARTIAL**: Some benefits shown but not all claims validated
|
||||
**❌ FAIL**: Optimization doesn't provide meaningful improvements
|
||||
**🚨 ERROR**: Implementation issues prevent proper testing
|
||||
|
||||
### Statistical Validity
|
||||
All timing comparisons include:
|
||||
- **Confidence intervals**: 95% confidence bounds on measurements
|
||||
- **Significance testing**: Statistical tests for meaningful differences
|
||||
- **Variance analysis**: Coefficient of variation to assess measurement quality
|
||||
- **Sample sizes**: Sufficient runs for statistical power
|
||||
|
||||
## Test Categories
|
||||
|
||||
### 1. Correctness Tests
|
||||
Verify that optimizations produce correct results:
|
||||
- Mathematical equivalence of optimized vs baseline implementations
|
||||
- Numerical precision within acceptable bounds
|
||||
- Edge case handling (empty inputs, extreme values)
|
||||
|
||||
### 2. Performance Tests
|
||||
Measure actual performance improvements:
|
||||
- **Timing**: Wall-clock time with proper statistical methodology
|
||||
- **Memory**: Peak usage, allocation patterns, memory efficiency
|
||||
- **Throughput**: Operations per second, batching efficiency
|
||||
- **Scaling**: How performance changes with input size
|
||||
|
||||
### 3. Systems Tests
|
||||
Evaluate systems engineering aspects:
|
||||
- **Cache behavior**: Memory access patterns and cache efficiency
|
||||
- **Resource utilization**: CPU, memory, bandwidth usage
|
||||
- **Overhead analysis**: Cost of optimizations vs benefits
|
||||
- **Integration**: How optimizations work together
|
||||
|
||||
### 4. Robustness Tests
|
||||
Test optimization reliability:
|
||||
- **Input variation**: Different data distributions, sizes, types
|
||||
- **Environmental factors**: Different hardware, system loads
|
||||
- **Error handling**: Graceful degradation when optimizations can't be applied
|
||||
- **Consistency**: Reliable performance across multiple runs
|
||||
|
||||
## Key Insights from Testing
|
||||
|
||||
### What We've Learned
|
||||
|
||||
**Profiling Tools (Module 14)**:
|
||||
- Timer accuracy varies significantly with operation complexity
|
||||
- Memory profiling has substantial overhead on small operations
|
||||
- FLOP counting can be accurate but requires careful implementation
|
||||
- Production profiling needs minimal overhead for practical use
|
||||
|
||||
**Quantization (Module 15)**:
|
||||
- Memory reduction: Reliable 3-4× improvement in model size
|
||||
- Speed improvement: Depends heavily on hardware INT8 support
|
||||
- Accuracy preservation: Achievable with proper calibration
|
||||
- Educational vs production: Large gap in actual speedup implementation
|
||||
|
||||
**Compression (Module 16)**:
|
||||
- Pruning reduces parameters 50%+ with minimal accuracy loss
|
||||
- Structured vs unstructured pruning tradeoffs
|
||||
- Magnitude-based pruning is simple but effective
|
||||
|
||||
**KV Caching (Module 18)**:
|
||||
- Complexity reduction: Demonstrable O(N²) → O(N) improvement
|
||||
- Memory growth: Linear scaling validates cache design
|
||||
- Practical speedups: Most visible in longer sequences (>32 tokens)
|
||||
- Implementation complexity: Easy to introduce subtle bugs
|
||||
|
||||
**Acceleration (Module 17)**:
|
||||
- NumPy vs naive loops: 10-100× speedups easily achievable
|
||||
- Cache blocking: 20-50% improvements on appropriate workloads
|
||||
- Backend dispatch: Can add 5-20% overhead if not implemented carefully
|
||||
- Scaling behavior: Benefits increase with problem size (memory-bound operations)
|
||||
|
||||
**Benchmarking (Module 19)**:
|
||||
- Reproducibility: Achievable with proper methodology
|
||||
- Fair comparison: Requires careful workload design
|
||||
- Performance detection: Can identify differences >20% reliably
|
||||
- Competition scoring: Relative metrics more reliable than absolute
|
||||
|
||||
### Unexpected Findings
|
||||
|
||||
1. **Profiling overhead**: More significant than expected on small operations
|
||||
2. **Quantization educational gap**: Real speedups require hardware support
|
||||
3. **Cache behavior**: Memory access patterns matter more than algorithmic complexity
|
||||
4. **Statistical measurement**: High variance requires many runs for reliable results
|
||||
5. **Integration effects**: Optimizations can interfere with each other
|
||||
|
||||
## Limitations and Future Work
|
||||
|
||||
### Current Limitations
|
||||
- **Hardware dependency**: Some optimizations require specific hardware (INT8, vectorization)
|
||||
- **Workload scope**: Limited to synthetic benchmarks, not real ML applications
|
||||
- **Environmental factors**: Results may vary significantly across different systems
|
||||
- **Educational constraints**: Some "optimizations" are pedagogical rather than production-ready
|
||||
|
||||
### Future Enhancements
|
||||
- **Continuous integration**: Automated performance testing on code changes
|
||||
- **Hardware matrix**: Testing across different CPU/GPU configurations
|
||||
- **Real workload integration**: Performance testing on actual student ML projects
|
||||
- **Regression detection**: Automated alerts when optimizations regress
|
||||
- **Comparative analysis**: Benchmarking against PyTorch/TensorFlow equivalents
|
||||
|
||||
## Contributing
|
||||
|
||||
### Adding New Performance Tests
|
||||
1. **Create test file**: `test_module_XX_description.py`
|
||||
2. **Use framework**: Import and extend `PerformanceTestSuite`
|
||||
3. **Scientific methodology**: Multiple runs, proper baselines, statistical analysis
|
||||
4. **Honest reporting**: Report both successes and failures
|
||||
5. **Integration**: Add to `run_all_performance_tests.py`
|
||||
|
||||
### Test Quality Standards
|
||||
- **Reproducible**: Same results across runs (within statistical bounds)
|
||||
- **Meaningful**: Test realistic scenarios students will encounter
|
||||
- **Scientific**: Proper statistical methodology and significance testing
|
||||
- **Honest**: Report when optimizations don't work as claimed
|
||||
- **Documented**: Clear explanation of what's being tested and why
|
||||
|
||||
## Results Archive
|
||||
|
||||
Performance test results are saved to `validation_results/` with timestamps for historical comparison and regression analysis.
|
||||
|
||||
Each results file contains:
|
||||
- **Raw measurements**: All timing, memory, and accuracy data
|
||||
- **Statistical analysis**: Confidence intervals, significance tests
|
||||
- **Assessment**: Human-readable evaluation of optimization effectiveness
|
||||
- **Metadata**: Test environment, configuration, timestamps
|
||||
|
||||
---
|
||||
|
||||
**The goal of this framework is scientific honesty about optimization effectiveness. We measure what actually works, report what doesn't, and help students understand the real performance characteristics of ML systems optimizations.**
|
||||
@@ -1,295 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Scientific Performance Testing Framework for TinyTorch
|
||||
====================================================
|
||||
|
||||
This framework provides rigorous, scientific performance measurement
|
||||
with proper statistical analysis and confidence intervals.
|
||||
|
||||
Key Features:
|
||||
- Statistical timing with warmup and multiple runs
|
||||
- Memory profiling with peak usage tracking
|
||||
- Confidence intervals and significance testing
|
||||
- Controlled environment for reliable measurements
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
import gc
|
||||
import tracemalloc
|
||||
from typing import Dict, List, Tuple, Callable, Any, Optional
|
||||
import statistics
|
||||
|
||||
|
||||
class PerformanceTimer:
|
||||
"""Statistical timing with proper warmup and confidence intervals."""
|
||||
|
||||
def __init__(self, warmup_runs: int = 3, timing_runs: int = 10):
|
||||
self.warmup_runs = warmup_runs
|
||||
self.timing_runs = timing_runs
|
||||
|
||||
def measure(self, func: Callable, *args, **kwargs) -> Dict[str, float]:
|
||||
"""Measure function performance with statistical rigor."""
|
||||
# Force garbage collection before measurement
|
||||
gc.collect()
|
||||
|
||||
# Warmup runs (not timed)
|
||||
for _ in range(self.warmup_runs):
|
||||
func(*args, **kwargs)
|
||||
|
||||
# Actual timing runs
|
||||
times = []
|
||||
for _ in range(self.timing_runs):
|
||||
gc.collect() # Clean state for each run
|
||||
|
||||
start_time = time.perf_counter()
|
||||
result = func(*args, **kwargs)
|
||||
end_time = time.perf_counter()
|
||||
|
||||
times.append(end_time - start_time)
|
||||
|
||||
# Statistical analysis
|
||||
mean_time = statistics.mean(times)
|
||||
std_time = statistics.stdev(times) if len(times) > 1 else 0.0
|
||||
median_time = statistics.median(times)
|
||||
min_time = min(times)
|
||||
max_time = max(times)
|
||||
|
||||
# 95% confidence interval
|
||||
if len(times) > 1:
|
||||
confidence_95 = 1.96 * std_time / (len(times) ** 0.5)
|
||||
else:
|
||||
confidence_95 = 0.0
|
||||
|
||||
return {
|
||||
'mean': mean_time,
|
||||
'std': std_time,
|
||||
'median': median_time,
|
||||
'min': min_time,
|
||||
'max': max_time,
|
||||
'runs': len(times),
|
||||
'confidence_95': confidence_95,
|
||||
'coefficient_of_variation': std_time / mean_time if mean_time > 0 else 0.0,
|
||||
'result': result # Store last result for validation
|
||||
}
|
||||
|
||||
|
||||
class MemoryProfiler:
|
||||
"""Memory usage profiling with peak usage tracking."""
|
||||
|
||||
def measure(self, func: Callable, *args, **kwargs) -> Dict[str, Any]:
|
||||
"""Measure memory usage during function execution."""
|
||||
tracemalloc.start()
|
||||
|
||||
# Baseline memory
|
||||
baseline_mem = tracemalloc.get_traced_memory()[0]
|
||||
|
||||
# Execute function
|
||||
result = func(*args, **kwargs)
|
||||
|
||||
# Peak memory during execution
|
||||
current_mem, peak_mem = tracemalloc.get_traced_memory()
|
||||
tracemalloc.stop()
|
||||
|
||||
return {
|
||||
'baseline_bytes': baseline_mem,
|
||||
'peak_bytes': peak_mem,
|
||||
'current_bytes': current_mem,
|
||||
'allocated_bytes': peak_mem - baseline_mem,
|
||||
'baseline_mb': baseline_mem / 1024 / 1024,
|
||||
'peak_mb': peak_mem / 1024 / 1024,
|
||||
'allocated_mb': (peak_mem - baseline_mem) / 1024 / 1024,
|
||||
'result': result
|
||||
}
|
||||
|
||||
|
||||
class AccuracyTester:
|
||||
"""Test accuracy preservation during optimizations."""
|
||||
|
||||
@staticmethod
|
||||
def compare_outputs(original: Any, optimized: Any, tolerance: float = 1e-6) -> Dict[str, float]:
|
||||
"""Compare two outputs for numerical equivalence."""
|
||||
if hasattr(original, 'data'):
|
||||
original = original.data
|
||||
if hasattr(optimized, 'data'):
|
||||
optimized = optimized.data
|
||||
|
||||
# Convert to numpy arrays
|
||||
orig_array = np.array(original)
|
||||
opt_array = np.array(optimized)
|
||||
|
||||
# Check shapes match
|
||||
if orig_array.shape != opt_array.shape:
|
||||
return {
|
||||
'shapes_match': False,
|
||||
'max_diff': float('inf'),
|
||||
'mean_diff': float('inf'),
|
||||
'accuracy_preserved': False
|
||||
}
|
||||
|
||||
# Calculate differences
|
||||
diff = np.abs(orig_array - opt_array)
|
||||
max_diff = np.max(diff)
|
||||
mean_diff = np.mean(diff)
|
||||
|
||||
# Relative accuracy
|
||||
if np.max(np.abs(orig_array)) > 0:
|
||||
relative_error = max_diff / np.max(np.abs(orig_array))
|
||||
else:
|
||||
relative_error = max_diff
|
||||
|
||||
accuracy_preserved = max_diff < tolerance
|
||||
|
||||
return {
|
||||
'shapes_match': True,
|
||||
'max_diff': float(max_diff),
|
||||
'mean_diff': float(mean_diff),
|
||||
'relative_error': float(relative_error),
|
||||
'accuracy_preserved': accuracy_preserved,
|
||||
'tolerance': tolerance
|
||||
}
|
||||
|
||||
|
||||
class PerformanceTester:
|
||||
"""Main performance testing framework combining timing, memory, and accuracy."""
|
||||
|
||||
def __init__(self, warmup_runs: int = 3, timing_runs: int = 10):
|
||||
self.timer = PerformanceTimer(warmup_runs, timing_runs)
|
||||
self.memory = MemoryProfiler()
|
||||
self.accuracy = AccuracyTester()
|
||||
|
||||
def compare_performance(self,
|
||||
baseline_func: Callable,
|
||||
optimized_func: Callable,
|
||||
args: Tuple = (),
|
||||
kwargs: Dict = None,
|
||||
test_name: str = "Performance Test") -> Dict[str, Any]:
|
||||
"""Compare baseline vs optimized implementations comprehensively."""
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
print(f"\n🧪 {test_name}")
|
||||
print("=" * 50)
|
||||
|
||||
# Test baseline performance
|
||||
print(" Testing baseline implementation...")
|
||||
baseline_timing = self.timer.measure(baseline_func, *args, **kwargs)
|
||||
baseline_memory = self.memory.measure(baseline_func, *args, **kwargs)
|
||||
|
||||
# Test optimized performance
|
||||
print(" Testing optimized implementation...")
|
||||
optimized_timing = self.timer.measure(optimized_func, *args, **kwargs)
|
||||
optimized_memory = self.memory.measure(optimized_func, *args, **kwargs)
|
||||
|
||||
# Compare accuracy
|
||||
accuracy_comparison = self.accuracy.compare_outputs(
|
||||
baseline_timing['result'],
|
||||
optimized_timing['result']
|
||||
)
|
||||
|
||||
# Calculate speedup
|
||||
speedup = baseline_timing['mean'] / optimized_timing['mean']
|
||||
memory_ratio = optimized_memory['peak_mb'] / baseline_memory['peak_mb']
|
||||
|
||||
# Statistical significance of speedup
|
||||
baseline_ci = baseline_timing['confidence_95']
|
||||
optimized_ci = optimized_timing['confidence_95']
|
||||
speedup_significant = (baseline_timing['mean'] - baseline_ci) > (optimized_timing['mean'] + optimized_ci)
|
||||
|
||||
results = {
|
||||
'test_name': test_name,
|
||||
'baseline': {
|
||||
'timing': baseline_timing,
|
||||
'memory': baseline_memory
|
||||
},
|
||||
'optimized': {
|
||||
'timing': optimized_timing,
|
||||
'memory': optimized_memory
|
||||
},
|
||||
'comparison': {
|
||||
'speedup': speedup,
|
||||
'memory_ratio': memory_ratio,
|
||||
'accuracy': accuracy_comparison,
|
||||
'speedup_significant': speedup_significant
|
||||
}
|
||||
}
|
||||
|
||||
# Print results
|
||||
self._print_results(results)
|
||||
|
||||
return results
|
||||
|
||||
def _print_results(self, results: Dict[str, Any]):
|
||||
"""Print formatted test results."""
|
||||
baseline = results['baseline']
|
||||
optimized = results['optimized']
|
||||
comparison = results['comparison']
|
||||
|
||||
print(f"\n 📊 Results:")
|
||||
print(f" Baseline: {baseline['timing']['mean']*1000:.3f} ± {baseline['timing']['confidence_95']*1000:.3f} ms")
|
||||
print(f" Optimized: {optimized['timing']['mean']*1000:.3f} ± {optimized['timing']['confidence_95']*1000:.3f} ms")
|
||||
print(f" Speedup: {comparison['speedup']:.2f}× {'✅ significant' if comparison['speedup_significant'] else '⚠️ not significant'}")
|
||||
|
||||
print(f"\n Memory Usage:")
|
||||
print(f" Baseline: {baseline['memory']['peak_mb']:.2f} MB")
|
||||
print(f" Optimized: {optimized['memory']['peak_mb']:.2f} MB")
|
||||
print(f" Ratio: {comparison['memory_ratio']:.2f}× {'(less memory)' if comparison['memory_ratio'] < 1 else '(more memory)'}")
|
||||
|
||||
print(f"\n Accuracy:")
|
||||
if comparison['accuracy']['shapes_match']:
|
||||
print(f" Max diff: {comparison['accuracy']['max_diff']:.2e}")
|
||||
print(f" Accuracy: {'✅ preserved' if comparison['accuracy']['accuracy_preserved'] else '❌ lost'}")
|
||||
else:
|
||||
print(f" Shapes: ❌ don't match")
|
||||
|
||||
# Overall assessment
|
||||
overall_success = (
|
||||
comparison['speedup'] > 1.1 and # At least 10% speedup
|
||||
comparison['speedup_significant'] and # Statistically significant
|
||||
comparison['accuracy']['accuracy_preserved'] # Accuracy preserved
|
||||
)
|
||||
|
||||
print(f"\n 🎯 Overall: {'✅ OPTIMIZATION SUCCESSFUL' if overall_success else '⚠️ NEEDS IMPROVEMENT'}")
|
||||
|
||||
|
||||
def create_test_data(size: int = 1000) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""Create standard test data for benchmarks."""
|
||||
np.random.seed(42) # Reproducible results
|
||||
X = np.random.randn(size, size).astype(np.float32)
|
||||
y = np.random.randn(size, size).astype(np.float32)
|
||||
return X, y
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Demo of the framework
|
||||
print("🧪 TinyTorch Performance Testing Framework")
|
||||
print("=========================================")
|
||||
|
||||
# Example: Compare naive vs numpy matrix multiplication
|
||||
def naive_matmul(a, b):
|
||||
"""Naive O(n³) matrix multiplication."""
|
||||
n, m = a.shape[0], b.shape[1]
|
||||
k = a.shape[1]
|
||||
result = np.zeros((n, m), dtype=np.float32)
|
||||
for i in range(n):
|
||||
for j in range(m):
|
||||
for idx in range(k):
|
||||
result[i, j] += a[i, idx] * b[idx, j]
|
||||
return result
|
||||
|
||||
def optimized_matmul(a, b):
|
||||
"""NumPy optimized matrix multiplication."""
|
||||
return np.dot(a, b)
|
||||
|
||||
# Test with small matrices for speed
|
||||
test_size = 100
|
||||
A, B = create_test_data(test_size)
|
||||
|
||||
tester = PerformanceTester(warmup_runs=2, timing_runs=5)
|
||||
results = tester.compare_performance(
|
||||
naive_matmul, optimized_matmul,
|
||||
args=(A, B),
|
||||
test_name="Matrix Multiplication: Naive vs NumPy"
|
||||
)
|
||||
|
||||
print(f"\nFramework demonstrates real {results['comparison']['speedup']:.1f}× speedup!")
|
||||
@@ -1,451 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Comprehensive Gradient Flow Tests for TinyTorch
|
||||
================================================
|
||||
|
||||
Tests that gradients flow correctly through:
|
||||
1. Simple networks (single layer)
|
||||
2. Multi-layer networks (MLP)
|
||||
3. Convolutional networks (CNN)
|
||||
4. Attention mechanisms
|
||||
5. Complete training loops
|
||||
|
||||
This ensures backpropagation works correctly end-to-end.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import numpy as np
|
||||
|
||||
# Add project root to path
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
from tinytorch.core.tensor import Tensor
|
||||
from tinytorch.core.layers import Linear, Dropout
|
||||
from tinytorch.core.activations import ReLU, Sigmoid, Softmax
|
||||
from tinytorch.core.losses import MSELoss, BinaryCrossEntropyLoss, CrossEntropyLoss
|
||||
from tinytorch.core.optimizers import SGD, Adam
|
||||
from tinytorch.core.spatial import Conv2d, MaxPool2d
|
||||
from tinytorch.core.autograd import enable_autograd
|
||||
|
||||
# Enable autograd
|
||||
enable_autograd()
|
||||
|
||||
def test_simple_linear_gradient_flow():
|
||||
"""Test gradients flow through a single linear layer"""
|
||||
print("\n" + "="*70)
|
||||
print("TEST 1: Simple Linear Layer Gradient Flow")
|
||||
print("="*70)
|
||||
|
||||
# Create simple network: Linear(2->1)
|
||||
layer = Linear(2, 1)
|
||||
|
||||
# Create optimizer - this enables requires_grad on layer parameters
|
||||
# (reflects real usage: students always create optimizer before training)
|
||||
optimizer = SGD(layer.parameters(), lr=0.01)
|
||||
|
||||
# Input
|
||||
x = Tensor([[1.0, 2.0]], requires_grad=True)
|
||||
target = Tensor([[3.0]])
|
||||
|
||||
# Forward pass
|
||||
output = layer.forward(x)
|
||||
|
||||
# Loss
|
||||
loss_fn = MSELoss()
|
||||
loss = loss_fn.forward(output, target)
|
||||
|
||||
print(f"Initial loss: {float(loss.data):.4f}")
|
||||
print(f"Initial weight shape: {layer.weight.shape}")
|
||||
print(f"Initial bias shape: {layer.bias.shape}")
|
||||
|
||||
# Backward pass
|
||||
loss.backward()
|
||||
|
||||
# Check gradients exist
|
||||
assert layer.weight.grad is not None, "Weight gradient is None!"
|
||||
assert layer.bias.grad is not None, "Bias gradient is None!"
|
||||
assert x.grad is not None, "Input gradient is None!"
|
||||
|
||||
# Check gradients are non-zero
|
||||
weight_grad_norm = np.linalg.norm(layer.weight.grad.data)
|
||||
bias_grad_norm = np.linalg.norm(layer.bias.grad.data)
|
||||
input_grad_norm = np.linalg.norm(x.grad.data)
|
||||
|
||||
print(f"\n✓ Weight gradient norm: {weight_grad_norm:.6f}")
|
||||
print(f"✓ Bias gradient norm: {bias_grad_norm:.6f}")
|
||||
print(f"✓ Input gradient norm: {input_grad_norm:.6f}")
|
||||
|
||||
assert weight_grad_norm > 1e-6, f"Weight gradients too small: {weight_grad_norm}"
|
||||
assert bias_grad_norm > 1e-6, f"Bias gradients too small: {bias_grad_norm}"
|
||||
assert input_grad_norm > 1e-6, f"Input gradients too small: {input_grad_norm}"
|
||||
|
||||
print("\n✅ TEST PASSED: Gradients flow correctly through linear layer")
|
||||
return True
|
||||
|
||||
|
||||
def test_mlp_gradient_flow():
|
||||
"""Test gradients flow through multi-layer perceptron"""
|
||||
print("\n" + "="*70)
|
||||
print("TEST 2: Multi-Layer Perceptron Gradient Flow")
|
||||
print("="*70)
|
||||
|
||||
# Create MLP: Input(4) -> Linear(4->8) -> ReLU -> Linear(8->2)
|
||||
layer1 = Linear(4, 8)
|
||||
activation = ReLU()
|
||||
layer2 = Linear(8, 2)
|
||||
|
||||
# Create optimizer - this enables requires_grad on layer parameters
|
||||
optimizer = SGD(layer1.parameters() + layer2.parameters(), lr=0.01)
|
||||
|
||||
# Input and target
|
||||
x = Tensor(np.random.randn(3, 4), requires_grad=True)
|
||||
target = Tensor(np.array([[1, 0], [0, 1], [1, 0]]))
|
||||
|
||||
print(f"Input shape: {x.shape}")
|
||||
print(f"Target shape: {target.shape}")
|
||||
|
||||
# Forward pass
|
||||
h1 = layer1.forward(x)
|
||||
h1_activated = activation.forward(h1)
|
||||
output = layer2.forward(h1_activated)
|
||||
|
||||
print(f"Hidden layer shape: {h1.shape}")
|
||||
print(f"Output shape: {output.shape}")
|
||||
|
||||
# Loss
|
||||
loss_fn = MSELoss()
|
||||
loss = loss_fn.forward(output, target)
|
||||
|
||||
print(f"Initial loss: {float(loss.data):.4f}")
|
||||
|
||||
# Backward pass
|
||||
loss.backward()
|
||||
|
||||
# Check all layer gradients exist
|
||||
assert layer1.weight.grad is not None, "Layer1 weight gradient is None!"
|
||||
assert layer1.bias.grad is not None, "Layer1 bias gradient is None!"
|
||||
assert layer2.weight.grad is not None, "Layer2 weight gradient is None!"
|
||||
assert layer2.bias.grad is not None, "Layer2 bias gradient is None!"
|
||||
|
||||
# Check gradient magnitudes
|
||||
l1_weight_norm = np.linalg.norm(layer1.weight.grad.data)
|
||||
l1_bias_norm = np.linalg.norm(layer1.bias.grad.data)
|
||||
l2_weight_norm = np.linalg.norm(layer2.weight.grad.data)
|
||||
l2_bias_norm = np.linalg.norm(layer2.bias.grad.data)
|
||||
|
||||
print(f"\n✓ Layer1 weight gradient norm: {l1_weight_norm:.6f}")
|
||||
print(f"✓ Layer1 bias gradient norm: {l1_bias_norm:.6f}")
|
||||
print(f"✓ Layer2 weight gradient norm: {l2_weight_norm:.6f}")
|
||||
print(f"✓ Layer2 bias gradient norm: {l2_bias_norm:.6f}")
|
||||
|
||||
assert l1_weight_norm > 1e-6, "Layer1 weight gradients too small"
|
||||
assert l1_bias_norm > 1e-6, "Layer1 bias gradients too small"
|
||||
assert l2_weight_norm > 1e-6, "Layer2 weight gradients too small"
|
||||
assert l2_bias_norm > 1e-6, "Layer2 bias gradients too small"
|
||||
|
||||
print("\n✅ TEST PASSED: Gradients flow correctly through MLP")
|
||||
return True
|
||||
|
||||
|
||||
def test_mlp_training_updates():
|
||||
"""Test that MLP actually learns (loss decreases)"""
|
||||
print("\n" + "="*70)
|
||||
print("TEST 3: MLP Training - Loss Reduction")
|
||||
print("="*70)
|
||||
|
||||
# Create simple MLP
|
||||
layer1 = Linear(2, 4)
|
||||
activation = ReLU()
|
||||
layer2 = Linear(4, 1)
|
||||
|
||||
# Simple dataset (XOR-like)
|
||||
X = Tensor(np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]), requires_grad=False)
|
||||
y = Tensor(np.array([[0.0], [1.0], [1.0], [0.0]]))
|
||||
|
||||
# Optimizer
|
||||
optimizer = SGD([layer1.weight, layer1.bias, layer2.weight, layer2.bias], lr=0.1)
|
||||
loss_fn = MSELoss()
|
||||
|
||||
losses = []
|
||||
|
||||
print("Training for 50 epochs...")
|
||||
for epoch in range(50):
|
||||
# Forward
|
||||
h1 = layer1.forward(X)
|
||||
h1_act = activation.forward(h1)
|
||||
output = layer2.forward(h1_act)
|
||||
|
||||
# Loss
|
||||
loss = loss_fn.forward(output, y)
|
||||
losses.append(float(loss.data))
|
||||
|
||||
# Backward
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
|
||||
# Update
|
||||
optimizer.step()
|
||||
|
||||
if (epoch + 1) % 10 == 0:
|
||||
print(f"Epoch {epoch+1:2d}: Loss = {float(loss.data):.6f}")
|
||||
|
||||
# Check loss decreased
|
||||
initial_loss = losses[0]
|
||||
final_loss = losses[-1]
|
||||
reduction = initial_loss - final_loss
|
||||
reduction_pct = (reduction / initial_loss) * 100
|
||||
|
||||
print(f"\n✓ Initial loss: {initial_loss:.6f}")
|
||||
print(f"✓ Final loss: {final_loss:.6f}")
|
||||
print(f"✓ Reduction: {reduction:.6f} ({reduction_pct:.1f}%)")
|
||||
|
||||
assert final_loss < initial_loss, f"Loss didn't decrease! Initial: {initial_loss}, Final: {final_loss}"
|
||||
assert reduction_pct > 10, f"Loss reduction too small: {reduction_pct:.1f}%"
|
||||
|
||||
print("\n✅ TEST PASSED: MLP learns successfully (loss decreases)")
|
||||
return True
|
||||
|
||||
|
||||
def test_cnn_gradient_flow():
|
||||
"""Test gradients flow through convolutional layers"""
|
||||
print("\n" + "="*70)
|
||||
print("TEST 4: CNN Gradient Flow")
|
||||
print("="*70)
|
||||
|
||||
# Create simple CNN: Conv2d -> ReLU -> Linear
|
||||
conv = Conv2d(in_channels=1, out_channels=4, kernel_size=3, stride=1, padding=0)
|
||||
activation = ReLU()
|
||||
|
||||
# Input: batch=2, channels=1, height=8, width=8
|
||||
x = Tensor(np.random.randn(2, 1, 8, 8), requires_grad=True)
|
||||
|
||||
print(f"Input shape: {x.shape}")
|
||||
print(f"Conv weight shape: {conv.weight.shape}")
|
||||
|
||||
# Forward through conv
|
||||
conv_out = conv.forward(x)
|
||||
print(f"Conv output shape: {conv_out.shape}")
|
||||
|
||||
activated = activation.forward(conv_out)
|
||||
|
||||
# Flatten for linear layer
|
||||
batch_size = activated.shape[0]
|
||||
flattened_size = np.prod(activated.shape[1:])
|
||||
# Use reshape method to maintain gradient flow
|
||||
flattened = activated.reshape(batch_size, flattened_size)
|
||||
|
||||
linear = Linear(flattened_size, 2)
|
||||
|
||||
# Create optimizer - enables requires_grad on all layer parameters
|
||||
all_params = [conv.weight, conv.bias, linear.weight, linear.bias]
|
||||
optimizer = SGD(all_params, lr=0.01)
|
||||
|
||||
output = linear.forward(flattened)
|
||||
|
||||
print(f"Flattened shape: {flattened.shape}")
|
||||
print(f"Output shape: {output.shape}")
|
||||
|
||||
# Loss
|
||||
target = Tensor(np.array([[1, 0], [0, 1]]))
|
||||
loss_fn = MSELoss()
|
||||
loss = loss_fn.forward(output, target)
|
||||
|
||||
print(f"Initial loss: {float(loss.data):.4f}")
|
||||
|
||||
# Backward
|
||||
loss.backward()
|
||||
|
||||
# Check gradients
|
||||
assert conv.weight.grad is not None, "Conv weight gradient is None!"
|
||||
assert conv.bias.grad is not None, "Conv bias gradient is None!"
|
||||
assert linear.weight.grad is not None, "Linear weight gradient is None!"
|
||||
|
||||
weight_grad_norm = np.linalg.norm(conv.weight.grad.data)
|
||||
conv_bias_norm = np.linalg.norm(conv.bias.grad.data)
|
||||
linear_grad_norm = np.linalg.norm(linear.weight.grad.data)
|
||||
|
||||
print(f"\n✓ Conv weight gradient norm: {weight_grad_norm:.6f}")
|
||||
print(f"✓ Conv bias gradient norm: {conv_bias_norm:.6f}")
|
||||
print(f"✓ Linear weight gradient norm: {linear_grad_norm:.6f}")
|
||||
|
||||
assert weight_grad_norm > 1e-6, f"Conv weight gradients too small: {weight_grad_norm}"
|
||||
assert conv_bias_norm > 1e-6, f"Conv bias gradients too small: {conv_bias_norm}"
|
||||
assert linear_grad_norm > 1e-6, f"Linear gradients too small: {linear_grad_norm}"
|
||||
|
||||
print("\n✅ TEST PASSED: Gradients flow correctly through CNN")
|
||||
return True
|
||||
|
||||
|
||||
def test_cnn_training_updates():
|
||||
"""Test that CNN actually learns on simple data"""
|
||||
print("\n" + "="*70)
|
||||
print("TEST 5: CNN Training - Loss Reduction")
|
||||
print("="*70)
|
||||
|
||||
# Simple CNN
|
||||
conv = Conv2d(1, 2, kernel_size=3, stride=1, padding=1)
|
||||
activation = ReLU()
|
||||
|
||||
# Simple data: 4 samples, 1 channel, 4x4 images
|
||||
X = Tensor(np.random.randn(4, 1, 4, 4), requires_grad=False)
|
||||
|
||||
# After conv: (4, 2, 4, 4) -> flatten to (4, 32)
|
||||
conv_out_size = 2 * 4 * 4 # channels * height * width
|
||||
linear = Linear(conv_out_size, 2)
|
||||
|
||||
y = Tensor(np.array([[1, 0], [0, 1], [1, 0], [0, 1]]))
|
||||
|
||||
# Get parameters with gradients
|
||||
params = []
|
||||
for p in [conv.weight, conv.bias, linear.weight, linear.bias]:
|
||||
if not p.requires_grad:
|
||||
p.requires_grad = True
|
||||
params.append(p)
|
||||
|
||||
# Optimizer
|
||||
optimizer = SGD(params, lr=0.01)
|
||||
loss_fn = MSELoss()
|
||||
|
||||
losses = []
|
||||
|
||||
print("Training for 30 epochs...")
|
||||
for epoch in range(30):
|
||||
# Forward
|
||||
conv_out = conv.forward(X)
|
||||
activated = activation.forward(conv_out)
|
||||
|
||||
# Flatten using reshape to maintain gradients
|
||||
batch_size = activated.shape[0]
|
||||
flattened = activated.reshape(batch_size, -1)
|
||||
|
||||
output = linear.forward(flattened)
|
||||
|
||||
# Loss
|
||||
loss = loss_fn.forward(output, y)
|
||||
losses.append(float(loss.data))
|
||||
|
||||
# Backward
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
|
||||
# Update
|
||||
optimizer.step()
|
||||
|
||||
if (epoch + 1) % 10 == 0:
|
||||
print(f"Epoch {epoch+1:2d}: Loss = {float(loss.data):.6f}")
|
||||
|
||||
# Check loss decreased
|
||||
initial_loss = losses[0]
|
||||
final_loss = losses[-1]
|
||||
reduction = initial_loss - final_loss
|
||||
reduction_pct = (reduction / initial_loss) * 100
|
||||
|
||||
print(f"\n✓ Initial loss: {initial_loss:.6f}")
|
||||
print(f"✓ Final loss: {final_loss:.6f}")
|
||||
print(f"✓ Reduction: {reduction:.6f} ({reduction_pct:.1f}%)")
|
||||
|
||||
assert final_loss < initial_loss, f"Loss didn't decrease! Initial: {initial_loss}, Final: {final_loss}"
|
||||
|
||||
print("\n✅ TEST PASSED: CNN learns successfully (loss decreases)")
|
||||
return True
|
||||
|
||||
|
||||
def test_gradient_accumulation():
|
||||
"""Test that gradients accumulate correctly across batches"""
|
||||
print("\n" + "="*70)
|
||||
print("TEST 6: Gradient Accumulation")
|
||||
print("="*70)
|
||||
|
||||
layer = Linear(2, 1)
|
||||
|
||||
# Create optimizer - enables requires_grad on layer parameters
|
||||
optimizer = SGD(layer.parameters(), lr=0.01)
|
||||
|
||||
# Two batches
|
||||
x1 = Tensor([[1.0, 2.0]], requires_grad=True)
|
||||
x2 = Tensor([[3.0, 4.0]], requires_grad=True)
|
||||
target = Tensor([[1.0]])
|
||||
|
||||
loss_fn = MSELoss()
|
||||
|
||||
# Forward + backward on first batch (don't zero grad)
|
||||
out1 = layer.forward(x1)
|
||||
loss1 = loss_fn.forward(out1, target)
|
||||
loss1.backward()
|
||||
|
||||
grad_after_first = np.array(layer.weight.grad.data)
|
||||
|
||||
# Forward + backward on second batch (gradients should accumulate)
|
||||
out2 = layer.forward(x2)
|
||||
loss2 = loss_fn.forward(out2, target)
|
||||
loss2.backward()
|
||||
|
||||
grad_after_second = layer.weight.grad.data
|
||||
|
||||
# Gradients should have accumulated (not been replaced)
|
||||
grad_diff = np.linalg.norm(grad_after_second - grad_after_first)
|
||||
|
||||
print(f"✓ Gradient after first batch norm: {np.linalg.norm(grad_after_first):.6f}")
|
||||
print(f"✓ Gradient after second batch norm: {np.linalg.norm(grad_after_second):.6f}")
|
||||
print(f"✓ Difference: {grad_diff:.6f}")
|
||||
|
||||
assert grad_diff > 1e-6, "Gradients didn't accumulate properly"
|
||||
|
||||
print("\n✅ TEST PASSED: Gradients accumulate correctly")
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
"""Run all gradient flow tests"""
|
||||
print("\n" + "="*70)
|
||||
print(" TINYTORCH GRADIENT FLOW TEST SUITE")
|
||||
print("="*70)
|
||||
|
||||
tests = [
|
||||
("Simple Linear", test_simple_linear_gradient_flow),
|
||||
("MLP Gradient Flow", test_mlp_gradient_flow),
|
||||
("MLP Training", test_mlp_training_updates),
|
||||
("CNN Gradient Flow", test_cnn_gradient_flow),
|
||||
("CNN Training", test_cnn_training_updates),
|
||||
("Gradient Accumulation", test_gradient_accumulation),
|
||||
]
|
||||
|
||||
results = []
|
||||
|
||||
for name, test_func in tests:
|
||||
try:
|
||||
result = test_func()
|
||||
results.append((name, "PASSED" if result else "FAILED"))
|
||||
except Exception as e:
|
||||
print(f"\n❌ TEST FAILED: {name}")
|
||||
print(f"Error: {str(e)}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
results.append((name, "FAILED"))
|
||||
|
||||
# Summary
|
||||
print("\n" + "="*70)
|
||||
print(" TEST SUMMARY")
|
||||
print("="*70)
|
||||
|
||||
passed = sum(1 for _, status in results if status == "PASSED")
|
||||
total = len(results)
|
||||
|
||||
for name, status in results:
|
||||
symbol = "✅" if status == "PASSED" else "❌"
|
||||
print(f"{symbol} {name}: {status}")
|
||||
|
||||
print(f"\nTotal: {passed}/{total} tests passed")
|
||||
|
||||
if passed == total:
|
||||
print("\n🎉 ALL TESTS PASSED! Gradients flow correctly through TinyTorch.")
|
||||
return 0
|
||||
else:
|
||||
print(f"\n⚠️ {total - passed} tests failed. Please review the errors above.")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit(main())
|
||||
@@ -1,636 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
Integration Tests for TinyTorch
|
||||
================================
|
||||
Tests complete pipelines work end-to-end.
|
||||
Validates that all components work together correctly.
|
||||
|
||||
Test Categories:
|
||||
- Complete training loops
|
||||
- Data loading pipelines
|
||||
- Model save/load
|
||||
- Checkpoint/resume
|
||||
- Multi-component architectures
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import numpy as np
|
||||
import tempfile
|
||||
import pytest
|
||||
|
||||
# Add project root to path
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
from tinytorch.core.tensor import Tensor
|
||||
from tinytorch.core.layers import Linear
|
||||
from tinytorch.core.activations import ReLU, Sigmoid
|
||||
from tinytorch.core.losses import MSELoss as MeanSquaredError, CrossEntropyLoss
|
||||
from tinytorch.core.optimizers import SGD, Adam
|
||||
from tinytorch.core.spatial import Conv2d
|
||||
from tinytorch.core.dataloader import Dataset, DataLoader
|
||||
|
||||
class Sequential:
|
||||
"""Simple sequential container for testing."""
|
||||
def __init__(self, layers):
|
||||
self.layers = layers
|
||||
def __call__(self, x):
|
||||
for layer in self.layers:
|
||||
x = layer(x)
|
||||
return x
|
||||
def parameters(self):
|
||||
params = []
|
||||
for layer in self.layers:
|
||||
if hasattr(layer, 'parameters'):
|
||||
params.extend(layer.parameters())
|
||||
return params
|
||||
|
||||
class F:
|
||||
"""Functional interface for testing."""
|
||||
@staticmethod
|
||||
def relu(x):
|
||||
from tinytorch.core.activations import ReLU
|
||||
return ReLU()(x)
|
||||
@staticmethod
|
||||
def sigmoid(x):
|
||||
from tinytorch.core.activations import Sigmoid
|
||||
return Sigmoid()(x)
|
||||
@staticmethod
|
||||
def max_pool2d(x, kernel_size):
|
||||
from tinytorch.core.spatial import MaxPool2d
|
||||
return MaxPool2d(kernel_size)(x)
|
||||
@staticmethod
|
||||
def flatten(x, start_dim=1):
|
||||
import numpy as np
|
||||
shape = x.shape
|
||||
new_shape = shape[:start_dim] + (np.prod(shape[start_dim:]),)
|
||||
return x.reshape(*new_shape)
|
||||
|
||||
|
||||
# ============== Complete Training Loop Tests ==============
|
||||
|
||||
def test_basic_training_loop():
|
||||
"""Complete training loop with all components."""
|
||||
# Create simple dataset
|
||||
X_train = Tensor(np.random.randn(100, 10))
|
||||
y_train = Tensor(np.random.randn(100, 5))
|
||||
|
||||
# Build model
|
||||
model = Sequential([
|
||||
Linear(10, 20),
|
||||
ReLU(),
|
||||
Linear(20, 5)
|
||||
])
|
||||
|
||||
# Setup training
|
||||
optimizer = SGD(model.parameters(), lr=0.01)
|
||||
criterion = MeanSquaredError()
|
||||
|
||||
# Training loop
|
||||
initial_loss = None
|
||||
final_loss = None
|
||||
|
||||
for epoch in range(10):
|
||||
# Forward pass
|
||||
y_pred = model(X_train)
|
||||
loss = criterion(y_pred, y_train)
|
||||
|
||||
if epoch == 0:
|
||||
initial_loss = float(loss.data) if hasattr(loss, 'data') else float(loss)
|
||||
if epoch == 9:
|
||||
final_loss = float(loss.data) if hasattr(loss, 'data') else float(loss)
|
||||
|
||||
# Backward pass
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
# If autograd not available, just test forward passes
|
||||
pass
|
||||
|
||||
# Loss should decrease (or at least not increase much)
|
||||
assert final_loss is not None, "Training loop didn't complete"
|
||||
if initial_loss and final_loss:
|
||||
assert final_loss <= initial_loss * 1.1, "Loss increased during training"
|
||||
|
||||
|
||||
def test_minibatch_training():
|
||||
"""Training with mini-batches."""
|
||||
# Create dataset
|
||||
dataset_size = 128
|
||||
batch_size = 16
|
||||
|
||||
X_train = Tensor(np.random.randn(dataset_size, 10))
|
||||
y_train = Tensor(np.random.randn(dataset_size, 5))
|
||||
|
||||
# Model
|
||||
model = Sequential([
|
||||
Linear(10, 20),
|
||||
ReLU(),
|
||||
Linear(20, 5)
|
||||
])
|
||||
|
||||
optimizer = Adam(model.parameters(), lr=0.001)
|
||||
criterion = MeanSquaredError()
|
||||
|
||||
# Mini-batch training
|
||||
n_batches = dataset_size // batch_size
|
||||
losses = []
|
||||
|
||||
for epoch in range(2):
|
||||
epoch_loss = 0
|
||||
for batch_idx in range(n_batches):
|
||||
# Get batch
|
||||
start_idx = batch_idx * batch_size
|
||||
end_idx = start_idx + batch_size
|
||||
X_batch = Tensor(X_train.data[start_idx:end_idx])
|
||||
y_batch = Tensor(y_train.data[start_idx:end_idx])
|
||||
|
||||
# Training step
|
||||
y_pred = model(X_batch)
|
||||
loss = criterion(y_pred, y_batch)
|
||||
epoch_loss += float(loss.data) if hasattr(loss, 'data') else float(loss)
|
||||
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
losses.append(epoch_loss / n_batches)
|
||||
|
||||
# Training should complete without errors
|
||||
assert len(losses) == 2, "Mini-batch training didn't complete"
|
||||
|
||||
|
||||
def test_classification_training():
|
||||
"""Classification task with cross-entropy loss."""
|
||||
# Create classification dataset
|
||||
n_samples = 100
|
||||
n_classes = 3
|
||||
n_features = 10
|
||||
|
||||
X_train = Tensor(np.random.randn(n_samples, n_features))
|
||||
y_train = Tensor(np.random.randint(0, n_classes, n_samples))
|
||||
|
||||
# Classification model
|
||||
model = Sequential([
|
||||
Linear(n_features, 20),
|
||||
ReLU(),
|
||||
Linear(20, n_classes)
|
||||
])
|
||||
|
||||
optimizer = Adam(model.parameters(), lr=0.01)
|
||||
criterion = CrossEntropyLoss()
|
||||
|
||||
# Training
|
||||
for epoch in range(5):
|
||||
logits = model(X_train)
|
||||
loss = criterion(logits, y_train)
|
||||
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
# Should produce valid class predictions
|
||||
final_logits = model(X_train)
|
||||
predictions = np.argmax(final_logits.data, axis=1)
|
||||
assert predictions.shape == (n_samples,), "Invalid prediction shape"
|
||||
assert np.all((predictions >= 0) & (predictions < n_classes)), "Invalid class predictions"
|
||||
|
||||
|
||||
# ============== Data Loading Pipeline Tests ==============
|
||||
|
||||
def test_dataset_iteration():
|
||||
"""Dataset and DataLoader work together."""
|
||||
class SimpleDataset(Dataset):
|
||||
def __init__(self, size):
|
||||
self.X = np.random.randn(size, 10)
|
||||
self.y = np.random.randn(size, 5)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.X)
|
||||
|
||||
def __getitem__(self, idx):
|
||||
return Tensor(self.X[idx]), Tensor(self.y[idx])
|
||||
|
||||
dataset = SimpleDataset(100)
|
||||
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)
|
||||
|
||||
# Iterate through dataloader
|
||||
batch_count = 0
|
||||
for X_batch, y_batch in dataloader:
|
||||
assert X_batch.shape == (10, 10), f"Wrong batch shape: {X_batch.shape}"
|
||||
assert y_batch.shape == (10, 5), f"Wrong target shape: {y_batch.shape}"
|
||||
batch_count += 1
|
||||
|
||||
assert batch_count == 10, f"Expected 10 batches, got {batch_count}"
|
||||
|
||||
|
||||
def test_data_augmentation_pipeline():
|
||||
"""Data augmentation in loading pipeline."""
|
||||
class AugmentedDataset(Dataset):
|
||||
def __init__(self, size):
|
||||
self.X = np.random.randn(size, 3, 32, 32)
|
||||
self.y = np.random.randint(0, 10, size)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.X)
|
||||
|
||||
def __getitem__(self, idx):
|
||||
# Simple augmentation: random flip
|
||||
x = self.X[idx]
|
||||
if np.random.random() > 0.5:
|
||||
x = np.flip(x, axis=-1) # Horizontal flip
|
||||
return Tensor(x), Tensor(self.y[idx])
|
||||
|
||||
dataset = AugmentedDataset(50)
|
||||
dataloader = DataLoader(dataset, batch_size=5, shuffle=False)
|
||||
|
||||
# Should handle augmented data
|
||||
for X_batch, y_batch in dataloader:
|
||||
assert X_batch.shape == (5, 3, 32, 32), "Augmented batch wrong shape"
|
||||
break # Just test first batch
|
||||
|
||||
|
||||
# ============== Model Save/Load Tests ==============
|
||||
|
||||
def test_model_save_load():
|
||||
"""Save and load model weights."""
|
||||
model = Sequential([
|
||||
Linear(10, 20),
|
||||
ReLU(),
|
||||
Linear(20, 5)
|
||||
])
|
||||
|
||||
# Get initial predictions
|
||||
x_test = Tensor(np.random.randn(3, 10))
|
||||
initial_output = model(x_test)
|
||||
|
||||
# Save model
|
||||
with tempfile.NamedTemporaryFile(suffix='.pkl', delete=False) as f:
|
||||
temp_path = f.name
|
||||
|
||||
try:
|
||||
# Save weights
|
||||
import pickle
|
||||
weights = {}
|
||||
for i, layer in enumerate(model.layers):
|
||||
if hasattr(layer, 'weight'):
|
||||
weights[f'layer_{i}_weights'] = layer.weight.data
|
||||
if hasattr(layer, 'bias') and layer.bias is not None:
|
||||
weights[f'layer_{i}_bias'] = layer.bias.data
|
||||
|
||||
with open(temp_path, 'wb') as f:
|
||||
pickle.dump(weights, f)
|
||||
|
||||
# Modify model (to ensure load works)
|
||||
for layer in model.layers:
|
||||
if hasattr(layer, 'weight'):
|
||||
layer.weight.data = np.random.randn(*layer.weight.shape)
|
||||
|
||||
# Load weights
|
||||
with open(temp_path, 'rb') as f:
|
||||
loaded_weights = pickle.load(f)
|
||||
|
||||
for i, layer in enumerate(model.layers):
|
||||
if hasattr(layer, 'weight'):
|
||||
layer.weight.data = loaded_weights[f'layer_{i}_weights']
|
||||
if f'layer_{i}_bias' in loaded_weights:
|
||||
layer.bias.data = loaded_weights[f'layer_{i}_bias']
|
||||
|
||||
# Check outputs match
|
||||
loaded_output = model(x_test)
|
||||
assert np.allclose(initial_output.data, loaded_output.data), \
|
||||
"Model outputs differ after save/load"
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
if os.path.exists(temp_path):
|
||||
os.remove(temp_path)
|
||||
|
||||
|
||||
def test_checkpoint_resume_training():
|
||||
"""Save checkpoint and resume training."""
|
||||
# Initial training
|
||||
model = Linear(10, 5)
|
||||
optimizer = SGD(model.parameters(), lr=0.01)
|
||||
|
||||
X = Tensor(np.random.randn(20, 10))
|
||||
y = Tensor(np.random.randn(20, 5))
|
||||
|
||||
# Train for a few steps
|
||||
losses_before = []
|
||||
for _ in range(3):
|
||||
y_pred = model(X)
|
||||
loss = MeanSquaredError()(y_pred, y)
|
||||
losses_before.append(float(loss.data) if hasattr(loss, 'data') else float(loss))
|
||||
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
# Save checkpoint
|
||||
checkpoint = {
|
||||
'model_weights': model.weight.data.copy(),
|
||||
'model_bias': model.bias.data.copy() if model.bias is not None else None,
|
||||
'optimizer_state': {'step': 3}, # Simplified
|
||||
'losses': losses_before
|
||||
}
|
||||
|
||||
# Continue training
|
||||
for _ in range(3):
|
||||
y_pred = model(X)
|
||||
loss = MeanSquaredError()(y_pred, y)
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
# Restore checkpoint
|
||||
model.weight.data = checkpoint['model_weights']
|
||||
if checkpoint['model_bias'] is not None:
|
||||
model.bias.data = checkpoint['model_bias']
|
||||
|
||||
# Verify restoration worked
|
||||
y_pred = model(X)
|
||||
restored_loss = MeanSquaredError()(y_pred, y)
|
||||
restored_loss_val = float(restored_loss.data) if hasattr(restored_loss, 'data') else float(restored_loss)
|
||||
|
||||
# Loss should be close to checkpoint loss (not the continued training loss)
|
||||
assert abs(restored_loss_val - losses_before[-1]) < abs(restored_loss_val - losses_before[0]), \
|
||||
"Checkpoint restore failed"
|
||||
|
||||
|
||||
# ============== Multi-Component Architecture Tests ==============
|
||||
|
||||
def test_cnn_to_fc_integration():
|
||||
"""CNN features feed into FC classifier."""
|
||||
class CNNClassifier:
|
||||
def __init__(self):
|
||||
# CNN feature extractor
|
||||
self.conv1 = Conv2d(3, 16, kernel_size=3)
|
||||
self.conv2 = Conv2d(16, 32, kernel_size=3)
|
||||
# Classifier head
|
||||
self.fc1 = Linear(32 * 6 * 6, 128)
|
||||
self.fc2 = Linear(128, 10)
|
||||
|
||||
def forward(self, x):
|
||||
# Feature extraction
|
||||
x = F.relu(self.conv1(x))
|
||||
x = F.max_pool2d(x, 2)
|
||||
x = F.relu(self.conv2(x))
|
||||
x = F.max_pool2d(x, 2)
|
||||
# Classification
|
||||
x = F.flatten(x, start_dim=1)
|
||||
x = F.relu(self.fc1(x))
|
||||
return self.fc2(x)
|
||||
|
||||
def parameters(self):
|
||||
params = []
|
||||
for layer in [self.conv1, self.conv2, self.fc1, self.fc2]:
|
||||
if hasattr(layer, 'parameters'):
|
||||
params.extend(layer.parameters())
|
||||
return params
|
||||
|
||||
model = CNNClassifier()
|
||||
x = Tensor(np.random.randn(8, 3, 32, 32))
|
||||
|
||||
# Forward pass should work
|
||||
output = model.forward(x)
|
||||
assert output.shape == (8, 10), f"Wrong output shape: {output.shape}"
|
||||
|
||||
# Training step should work
|
||||
y_true = Tensor(np.random.randint(0, 10, 8))
|
||||
loss = CrossEntropyLoss()(output, y_true)
|
||||
|
||||
optimizer = Adam(model.parameters(), lr=0.001)
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass # Autograd might not be implemented
|
||||
|
||||
|
||||
def test_encoder_decoder_integration():
|
||||
"""Encoder-decoder architecture integration."""
|
||||
class SimpleAutoencoder:
|
||||
def __init__(self, input_dim=784, latent_dim=32):
|
||||
# Encoder
|
||||
self.enc1 = Linear(input_dim, 128)
|
||||
self.enc2 = Linear(128, latent_dim)
|
||||
# Decoder
|
||||
self.dec1 = Linear(latent_dim, 128)
|
||||
self.dec2 = Linear(128, input_dim)
|
||||
|
||||
def encode(self, x):
|
||||
x = F.relu(self.enc1(x))
|
||||
return self.enc2(x)
|
||||
|
||||
def decode(self, z):
|
||||
z = F.relu(self.dec1(z))
|
||||
return F.sigmoid(self.dec2(z))
|
||||
|
||||
def forward(self, x):
|
||||
z = self.encode(x)
|
||||
return self.decode(z)
|
||||
|
||||
def parameters(self):
|
||||
params = []
|
||||
for layer in [self.enc1, self.enc2, self.dec1, self.dec2]:
|
||||
if hasattr(layer, 'parameters'):
|
||||
params.extend(layer.parameters())
|
||||
return params
|
||||
|
||||
model = SimpleAutoencoder()
|
||||
x = Tensor(np.random.randn(16, 784))
|
||||
|
||||
# Test encoding
|
||||
latent = model.encode(x)
|
||||
assert latent.shape == (16, 32), f"Wrong latent shape: {latent.shape}"
|
||||
|
||||
# Test full forward
|
||||
reconstruction = model.forward(x)
|
||||
assert reconstruction.shape == x.shape, "Reconstruction shape mismatch"
|
||||
|
||||
# Test training
|
||||
loss = MeanSquaredError()(reconstruction, x)
|
||||
optimizer = Adam(model.parameters(), lr=0.001)
|
||||
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def test_multi_loss_training():
|
||||
"""Training with multiple loss functions."""
|
||||
# Model with multiple outputs
|
||||
class MultiOutputModel:
|
||||
def __init__(self):
|
||||
self.shared = Linear(10, 20)
|
||||
self.head1 = Linear(20, 5) # Regression head
|
||||
self.head2 = Linear(20, 3) # Classification head
|
||||
|
||||
def forward(self, x):
|
||||
shared_features = F.relu(self.shared(x))
|
||||
out1 = self.head1(shared_features)
|
||||
out2 = self.head2(shared_features)
|
||||
return out1, out2
|
||||
|
||||
def parameters(self):
|
||||
params = []
|
||||
for layer in [self.shared, self.head1, self.head2]:
|
||||
if hasattr(layer, 'parameters'):
|
||||
params.extend(layer.parameters())
|
||||
return params
|
||||
|
||||
model = MultiOutputModel()
|
||||
optimizer = Adam(model.parameters(), lr=0.001)
|
||||
|
||||
# Data
|
||||
X = Tensor(np.random.randn(32, 10))
|
||||
y_reg = Tensor(np.random.randn(32, 5)) # Regression targets
|
||||
y_cls = Tensor(np.random.randint(0, 3, 32)) # Classification targets
|
||||
|
||||
# Forward
|
||||
out_reg, out_cls = model.forward(X)
|
||||
|
||||
# Multiple losses
|
||||
loss_reg = MeanSquaredError()(out_reg, y_reg)
|
||||
loss_cls = CrossEntropyLoss()(out_cls, y_cls)
|
||||
|
||||
# Combined loss
|
||||
total_loss_val = (float(loss_reg.data) if hasattr(loss_reg, 'data') else float(loss_reg)) + \
|
||||
(float(loss_cls.data) if hasattr(loss_cls, 'data') else float(loss_cls))
|
||||
|
||||
# Should handle multiple losses
|
||||
assert total_loss_val > 0, "Combined loss calculation failed"
|
||||
|
||||
|
||||
# ============== End-to-End Pipeline Tests ==============
|
||||
|
||||
def test_mnist_pipeline():
|
||||
"""Complete MNIST training pipeline."""
|
||||
# Simplified MNIST-like data
|
||||
X_train = Tensor(np.random.randn(100, 784)) # Flattened 28x28
|
||||
y_train = Tensor(np.random.randint(0, 10, 100))
|
||||
|
||||
X_val = Tensor(np.random.randn(20, 784))
|
||||
y_val = Tensor(np.random.randint(0, 10, 20))
|
||||
|
||||
# MNIST model
|
||||
model = Sequential([
|
||||
Linear(784, 256),
|
||||
ReLU(),
|
||||
Linear(256, 128),
|
||||
ReLU(),
|
||||
Linear(128, 10)
|
||||
])
|
||||
|
||||
optimizer = Adam(model.parameters(), lr=0.001)
|
||||
criterion = CrossEntropyLoss()
|
||||
|
||||
# Training
|
||||
train_losses = []
|
||||
for epoch in range(3):
|
||||
# Training
|
||||
logits = model(X_train)
|
||||
loss = criterion(logits, y_train)
|
||||
train_losses.append(float(loss.data) if hasattr(loss, 'data') else float(loss))
|
||||
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
# Validation
|
||||
val_logits = model(X_val)
|
||||
val_loss = criterion(val_logits, y_val)
|
||||
|
||||
# Accuracy
|
||||
predictions = np.argmax(val_logits.data, axis=1)
|
||||
accuracy = np.mean(predictions == y_val.data)
|
||||
|
||||
# Pipeline should complete
|
||||
assert len(train_losses) == 3, "Training didn't complete"
|
||||
assert 0 <= accuracy <= 1, "Invalid accuracy"
|
||||
|
||||
|
||||
def test_cifar10_pipeline():
|
||||
"""Complete CIFAR-10 training pipeline."""
|
||||
# Simplified CIFAR-like data
|
||||
X_train = Tensor(np.random.randn(50, 3, 32, 32))
|
||||
y_train = Tensor(np.random.randint(0, 10, 50))
|
||||
|
||||
# Simple CNN for CIFAR
|
||||
class SimpleCIFARNet:
|
||||
def __init__(self):
|
||||
self.conv1 = Conv2d(3, 32, kernel_size=3)
|
||||
self.conv2 = Conv2d(32, 64, kernel_size=3)
|
||||
self.fc = Linear(64 * 6 * 6, 10)
|
||||
|
||||
def forward(self, x):
|
||||
x = F.relu(self.conv1(x))
|
||||
x = F.max_pool2d(x, 2)
|
||||
x = F.relu(self.conv2(x))
|
||||
x = F.max_pool2d(x, 2)
|
||||
x = F.flatten(x, start_dim=1)
|
||||
return self.fc(x)
|
||||
|
||||
def parameters(self):
|
||||
params = []
|
||||
for layer in [self.conv1, self.conv2, self.fc]:
|
||||
if hasattr(layer, 'parameters'):
|
||||
params.extend(layer.parameters())
|
||||
return params
|
||||
|
||||
model = SimpleCIFARNet()
|
||||
optimizer = SGD(model.parameters(), lr=0.01)
|
||||
criterion = CrossEntropyLoss()
|
||||
|
||||
# Quick training
|
||||
for epoch in range(2):
|
||||
output = model.forward(X_train)
|
||||
loss = criterion(output, y_train)
|
||||
|
||||
try:
|
||||
optimizer.zero_grad()
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
except:
|
||||
pass
|
||||
|
||||
# Final predictions
|
||||
final_output = model.forward(X_train)
|
||||
predictions = np.argmax(final_output.data, axis=1)
|
||||
|
||||
# Should produce valid predictions
|
||||
assert predictions.shape == (50,), "Wrong prediction shape"
|
||||
assert np.all((predictions >= 0) & (predictions < 10)), "Invalid predictions"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# When run directly, use pytest
|
||||
import subprocess
|
||||
result = subprocess.run(["pytest", __file__, "-v"], capture_output=True, text=True)
|
||||
print(result.stdout)
|
||||
if result.stderr:
|
||||
print(result.stderr)
|
||||
sys.exit(result.returncode)
|
||||
@@ -1,267 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
TinyTorch Milestone Validation Tests
|
||||
=====================================
|
||||
Ensures all three major milestones work end-to-end.
|
||||
Students should be able to build and run these examples successfully.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import numpy as np
|
||||
|
||||
# Add project root to path
|
||||
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
|
||||
sys.path.insert(0, project_root)
|
||||
|
||||
from tinytorch.core.tensor import Tensor
|
||||
from tinytorch.core.losses import MSELoss as MeanSquaredError
|
||||
from tinytorch.core.layers import Linear
|
||||
from tinytorch.core.activations import ReLU, Sigmoid
|
||||
from tinytorch.core.spatial import Conv2d
|
||||
from tinytorch.core.transformers import TransformerBlock
|
||||
from tinytorch.core.embeddings import Embedding, PositionalEncoding
|
||||
|
||||
class F:
|
||||
"""Functional interface for testing."""
|
||||
@staticmethod
|
||||
def relu(x):
|
||||
from tinytorch.core.activations import ReLU
|
||||
return ReLU()(x)
|
||||
@staticmethod
|
||||
def max_pool2d(x, kernel_size):
|
||||
from tinytorch.core.spatial import MaxPool2d
|
||||
return MaxPool2d(kernel_size)(x)
|
||||
@staticmethod
|
||||
def flatten(x, start_dim=1):
|
||||
import numpy as np
|
||||
shape = x.shape
|
||||
new_shape = shape[:start_dim] + (np.prod(shape[start_dim:]),)
|
||||
return x.reshape(*new_shape)
|
||||
|
||||
|
||||
def test_milestone1_xor():
|
||||
"""Test Milestone 1: XOR Problem with Perceptron."""
|
||||
print("\n" + "="*60)
|
||||
print("MILESTONE 1: XOR Problem (Perceptron)")
|
||||
print("="*60)
|
||||
|
||||
# XOR dataset
|
||||
X = Tensor([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]])
|
||||
y = Tensor([[0.0], [1.0], [1.0], [0.0]])
|
||||
|
||||
# Build simple neural network (perceptron with hidden layer)
|
||||
class Sequential:
|
||||
def __init__(self, layers):
|
||||
self.layers = layers
|
||||
def __call__(self, x):
|
||||
for layer in self.layers:
|
||||
x = layer(x)
|
||||
return x
|
||||
model = Sequential([
|
||||
Linear(2, 4),
|
||||
ReLU(),
|
||||
Linear(4, 1),
|
||||
Sigmoid()
|
||||
])
|
||||
|
||||
# Forward pass test
|
||||
output = model(X)
|
||||
print(f"Input shape: {X.shape}")
|
||||
print(f"Output shape: {output.shape}")
|
||||
print(f"✅ XOR network structure works!")
|
||||
|
||||
# Loss function test
|
||||
criterion = MeanSquaredError()
|
||||
loss = criterion(output, y)
|
||||
print(f"Loss value: {loss.data if hasattr(loss, 'data') else loss}")
|
||||
print(f"✅ Loss computation works!")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_milestone2_cnn():
|
||||
"""Test Milestone 2: CNN for CIFAR-10."""
|
||||
print("\n" + "="*60)
|
||||
print("MILESTONE 2: CNN for Image Classification")
|
||||
print("="*60)
|
||||
|
||||
# Create simple CNN
|
||||
class SimpleCNN:
|
||||
def __init__(self):
|
||||
self.conv1 = Conv2d(3, 32, kernel_size=(3, 3))
|
||||
self.conv2 = Conv2d(32, 64, kernel_size=(3, 3))
|
||||
# Correct dimensions after convs and pools
|
||||
self.fc1 = Linear(64 * 6 * 6, 256)
|
||||
self.fc2 = Linear(256, 10)
|
||||
|
||||
def forward(self, x):
|
||||
# Conv block 1
|
||||
x = self.conv1(x)
|
||||
x = F.relu(x)
|
||||
x = F.max_pool2d(x, 2)
|
||||
|
||||
# Conv block 2
|
||||
x = self.conv2(x)
|
||||
x = F.relu(x)
|
||||
x = F.max_pool2d(x, 2)
|
||||
|
||||
# Classification head
|
||||
x = F.flatten(x, start_dim=1)
|
||||
x = self.fc1(x)
|
||||
x = F.relu(x)
|
||||
return self.fc2(x)
|
||||
|
||||
# Test with dummy CIFAR-10 batch
|
||||
model = SimpleCNN()
|
||||
batch_size = 4
|
||||
x = Tensor(np.random.randn(batch_size, 3, 32, 32))
|
||||
|
||||
print(f"Input shape (CIFAR batch): {x.shape}")
|
||||
|
||||
# Test each stage
|
||||
x1 = model.conv1(x)
|
||||
print(f"After conv1: {x1.shape} (expected: {batch_size}, 32, 30, 30)")
|
||||
|
||||
x2 = F.max_pool2d(x1, 2)
|
||||
print(f"After pool1: {x2.shape} (expected: {batch_size}, 32, 15, 15)")
|
||||
|
||||
x3 = model.conv2(x2)
|
||||
print(f"After conv2: {x3.shape} (expected: {batch_size}, 64, 13, 13)")
|
||||
|
||||
x4 = F.max_pool2d(x3, 2)
|
||||
print(f"After pool2: {x4.shape} (expected: {batch_size}, 64, 6, 6)")
|
||||
|
||||
# Full forward pass
|
||||
output = model.forward(x)
|
||||
print(f"Final output: {output.shape} (expected: {batch_size}, 10)")
|
||||
|
||||
assert output.shape == (batch_size, 10), f"Output shape mismatch: {output.shape}"
|
||||
print(f"✅ CNN architecture works for CIFAR-10!")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def test_milestone3_tinygpt():
|
||||
"""Test Milestone 3: TinyGPT Language Model."""
|
||||
print("\n" + "="*60)
|
||||
print("MILESTONE 3: TinyGPT Language Model")
|
||||
print("="*60)
|
||||
|
||||
# GPT parameters
|
||||
vocab_size = 100
|
||||
embed_dim = 64
|
||||
seq_length = 10
|
||||
batch_size = 2
|
||||
num_heads = 4
|
||||
|
||||
# Build simple GPT
|
||||
class SimpleGPT:
|
||||
def __init__(self):
|
||||
self.embedding = Embedding(vocab_size, embed_dim)
|
||||
self.pos_encoding = PositionalEncoding(max_seq_len=seq_length, embed_dim=embed_dim)
|
||||
self.transformer = TransformerBlock(embed_dim, num_heads, ff_dim=embed_dim * 4)
|
||||
self.output_proj = Linear(embed_dim, vocab_size)
|
||||
|
||||
def forward(self, x):
|
||||
# Embed tokens
|
||||
x = self.embedding(x)
|
||||
x = self.pos_encoding(x)
|
||||
|
||||
# Transform
|
||||
x = self.transformer(x)
|
||||
|
||||
# Project to vocabulary (with reshaping for Linear)
|
||||
batch, seq, embed = x.shape
|
||||
x_2d = x.reshape(batch * seq, embed)
|
||||
logits_2d = self.output_proj(x_2d)
|
||||
logits = logits_2d.reshape(batch, seq, vocab_size)
|
||||
|
||||
return logits
|
||||
|
||||
# Test with dummy tokens
|
||||
model = SimpleGPT()
|
||||
input_ids = Tensor(np.random.randint(0, vocab_size, (batch_size, seq_length)))
|
||||
|
||||
print(f"Input tokens shape: {input_ids.shape}")
|
||||
|
||||
# Test embedding
|
||||
embedded = model.embedding(input_ids)
|
||||
print(f"After embedding: {embedded.shape} (expected: {batch_size}, {seq_length}, {embed_dim})")
|
||||
|
||||
# Test position encoding
|
||||
with_pos = model.pos_encoding(embedded)
|
||||
print(f"After pos encoding: {with_pos.shape} (expected: {batch_size}, {seq_length}, {embed_dim})")
|
||||
|
||||
# Test transformer
|
||||
transformed = model.transformer(with_pos)
|
||||
print(f"After transformer: {transformed.shape} (expected: {batch_size}, {seq_length}, {embed_dim})")
|
||||
|
||||
# Full forward pass
|
||||
output = model.forward(input_ids)
|
||||
print(f"Final logits: {output.shape} (expected: {batch_size}, {seq_length}, {vocab_size})")
|
||||
|
||||
assert output.shape == (batch_size, seq_length, vocab_size), f"Output shape mismatch: {output.shape}"
|
||||
print(f"✅ TinyGPT architecture works!")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def run_all_milestone_tests():
|
||||
"""Run all milestone validation tests."""
|
||||
print("\n" + "🎯"*30)
|
||||
print("TINYTORCH MILESTONE VALIDATION SUITE")
|
||||
print("Testing that all major learning milestones work correctly")
|
||||
print("🎯"*30)
|
||||
|
||||
results = []
|
||||
|
||||
# Test each milestone
|
||||
try:
|
||||
result1 = test_milestone1_xor()
|
||||
results.append(("XOR/Perceptron", result1))
|
||||
except Exception as e:
|
||||
print(f"❌ XOR test failed: {e}")
|
||||
results.append(("XOR/Perceptron", False))
|
||||
|
||||
try:
|
||||
result2 = test_milestone2_cnn()
|
||||
results.append(("CNN/CIFAR-10", result2))
|
||||
except Exception as e:
|
||||
print(f"❌ CNN test failed: {e}")
|
||||
results.append(("CNN/CIFAR-10", False))
|
||||
|
||||
try:
|
||||
result3 = test_milestone3_tinygpt()
|
||||
results.append(("TinyGPT", result3))
|
||||
except Exception as e:
|
||||
print(f"❌ TinyGPT test failed: {e}")
|
||||
results.append(("TinyGPT", False))
|
||||
|
||||
# Summary
|
||||
print("\n" + "="*60)
|
||||
print("📊 MILESTONE TEST SUMMARY")
|
||||
print("="*60)
|
||||
|
||||
all_passed = True
|
||||
for name, passed in results:
|
||||
status = "✅ PASSED" if passed else "❌ FAILED"
|
||||
print(f"{name}: {status}")
|
||||
all_passed = all_passed and passed
|
||||
|
||||
if all_passed:
|
||||
print("\n🎉 ALL MILESTONES WORKING!")
|
||||
print("Students can successfully build:")
|
||||
print(" 1. Neural networks that solve XOR")
|
||||
print(" 2. CNNs that process real images")
|
||||
print(" 3. Transformers for language modeling")
|
||||
print("\n✨ The learning sandbox is robust!")
|
||||
else:
|
||||
print("\n⚠️ Some milestones need attention")
|
||||
|
||||
return all_passed
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_all_milestone_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
Reference in New Issue
Block a user