Mirror of https://github.com/MLSysBook/TinyTorch.git (synced 2026-05-07 11:23:01 -05:00)
- Exported 09_training module using nbdev directly from Python file
- Exported 08_optimizers module to resolve import dependencies
- All training components now available in tinytorch.core.training:
  * MeanSquaredError, CrossEntropyLoss, BinaryCrossEntropyLoss
  * Accuracy metric
  * Trainer class with complete training orchestration
- All optimizers now available in tinytorch.core.optimizers:
  * SGD, Adam optimizers
  * StepLR learning rate scheduler
- All components properly exported and functional
- Integration tests passing (17/17)
- Inline tests passing (6/6)
- tito CLI integration working correctly

Package exports:
- tinytorch.core.training: 688 lines, 5 main classes
- tinytorch.core.optimizers: 17,396 bytes, complete optimizer suite
- Clean separation of development vs package code
- Ready for production use and further development
503 lines
17 KiB
Python
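A minimal usage sketch of the exported optimizer suite, assuming a single Variable parameter whose gradient is set by hand as a stand-in for loss.backward(); the import paths follow the commit message above:

```python
from tinytorch.core.optimizers import SGD, StepLR
from tinytorch.core.autograd import Variable

w = Variable(2.0, requires_grad=True)
optimizer = SGD([w], learning_rate=0.1, momentum=0.9)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

for epoch in range(20):
    optimizer.zero_grad()
    w.grad = Variable(0.5)   # stand-in for a gradient produced by loss.backward()
    optimizer.step()         # SGD update with momentum
    scheduler.step()         # decay learning rate every 10 epochs
```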
# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/source/08_optimizers/optimizers_dev.ipynb.

# %% auto 0
__all__ = ['setup_import_paths', 'gradient_descent_step', 'SGD', 'Adam', 'StepLR']

# %% ../../modules/source/08_optimizers/optimizers_dev.ipynb 1
import math
import numpy as np
import sys
import os
from typing import List, Dict, Any, Optional, Union
from collections import defaultdict

# Helper function to set up import paths
def setup_import_paths():
    """Set up import paths for development modules."""
    import sys
    import os

    # Add module directories to path
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    tensor_dir = os.path.join(base_dir, '01_tensor')
    autograd_dir = os.path.join(base_dir, '07_autograd')

    if tensor_dir not in sys.path:
        sys.path.append(tensor_dir)
    if autograd_dir not in sys.path:
        sys.path.append(autograd_dir)

# Import our existing components
try:
    from tinytorch.core.tensor import Tensor
    from tinytorch.core.autograd import Variable
except ImportError:
    # For development, try local imports
    try:
        setup_import_paths()
        from tensor_dev import Tensor
        from autograd_dev import Variable
    except ImportError:
        # Create minimal fallback classes for testing
        print("Warning: Using fallback classes for testing")

        class Tensor:
            def __init__(self, data):
                self.data = np.array(data)
                self.shape = self.data.shape

            def __str__(self):
                return f"Tensor({self.data})"

        class Variable:
            def __init__(self, data, requires_grad=True):
                if isinstance(data, (int, float)):
                    self.data = Tensor([data])
                else:
                    self.data = Tensor(data)
                self.requires_grad = requires_grad
                self.grad = None

            def zero_grad(self):
                self.grad = None

            def __str__(self):
                return f"Variable({self.data.data})"

# %% ../../modules/source/08_optimizers/optimizers_dev.ipynb 6
def gradient_descent_step(parameter: Variable, learning_rate: float) -> None:
    """
    Perform one step of gradient descent on a parameter.

    Args:
        parameter: Variable with gradient information
        learning_rate: How much to update parameter

    TODO: Implement basic gradient descent parameter update.

    STEP-BY-STEP IMPLEMENTATION:
    1. Check if parameter has a gradient
    2. Get current parameter value and gradient
    3. Update parameter: new_value = old_value - learning_rate * gradient
    4. Update parameter data with new value
    5. Handle edge cases (no gradient, invalid values)

    EXAMPLE USAGE:
    ```python
    # Parameter with gradient
    w = Variable(2.0, requires_grad=True)
    w.grad = Variable(0.5)  # Gradient from loss

    # Update parameter
    gradient_descent_step(w, learning_rate=0.1)
    # w.data now contains: 2.0 - 0.1 * 0.5 = 1.95
    ```

    IMPLEMENTATION HINTS:
    - Check if parameter.grad is not None
    - Use parameter.grad.data.data to get gradient value
    - Update parameter.data with new Tensor
    - Don't modify gradient (it's used for logging)

    LEARNING CONNECTIONS:
    - This is the foundation of all neural network training
    - PyTorch's optimizer.step() does exactly this
    - The learning rate determines convergence speed
    """
    ### BEGIN SOLUTION
    if parameter.grad is not None:
        # Get current parameter value and gradient
        current_value = parameter.data.data
        gradient_value = parameter.grad.data.data

        # Update parameter: new_value = old_value - learning_rate * gradient
        new_value = current_value - learning_rate * gradient_value

        # Update parameter data
        parameter.data = Tensor(new_value)
    ### END SOLUTION

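# Sketch (illustration only, mirroring the docstring example above): a quick numeric
# check of gradient_descent_step, assuming the Variable/Tensor interface used in this
# file. Expected result: 2.0 - 0.1 * 0.5 = 1.95.
def _example_gradient_descent_step():
    w = Variable(2.0, requires_grad=True)
    w.grad = Variable(0.5)  # stand-in for a gradient from a loss
    gradient_descent_step(w, learning_rate=0.1)
    print(w)  # expected: Variable([1.95])
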
# %% ../../modules/source/08_optimizers/optimizers_dev.ipynb 10
class SGD:
    """
    SGD Optimizer with Momentum

    Implements stochastic gradient descent with momentum:
    v_t = momentum * v_{t-1} + gradient
    parameter = parameter - learning_rate * v_t
    """

    def __init__(self, parameters: List[Variable], learning_rate: float = 0.01,
                 momentum: float = 0.0, weight_decay: float = 0.0):
        """
        Initialize SGD optimizer.

        Args:
            parameters: List of Variables to optimize
            learning_rate: Learning rate (default: 0.01)
            momentum: Momentum coefficient (default: 0.0)
            weight_decay: L2 regularization coefficient (default: 0.0)

        TODO: Implement SGD optimizer initialization.

        APPROACH:
        1. Store parameters and hyperparameters
        2. Initialize momentum buffers for each parameter
        3. Set up state tracking for optimization
        4. Prepare for step() and zero_grad() methods

        EXAMPLE:
        ```python
        # Create optimizer
        optimizer = SGD([w1, w2, b1, b2], learning_rate=0.01, momentum=0.9)

        # In training loop:
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        ```

        HINTS:
        - Store parameters as a list
        - Initialize momentum buffers as empty dict
        - Use parameter id() as key for momentum tracking
        - Momentum buffers will be created lazily in step()
        """
        ### BEGIN SOLUTION
        self.parameters = parameters
        self.learning_rate = learning_rate
        self.momentum = momentum
        self.weight_decay = weight_decay

        # Initialize momentum buffers (created lazily)
        self.momentum_buffers = {}

        # Track optimization steps
        self.step_count = 0
        ### END SOLUTION

    def step(self) -> None:
        """
        Perform one optimization step.

        TODO: Implement SGD parameter update with momentum.

        APPROACH:
        1. Iterate through all parameters
        2. For each parameter with gradient:
           a. Get current gradient
           b. Apply weight decay if specified
           c. Update momentum buffer (or create if first time)
           d. Update parameter using momentum
        3. Increment step count

        MATHEMATICAL FORMULATION:
        - If weight_decay > 0: gradient = gradient + weight_decay * parameter
        - momentum_buffer = momentum * momentum_buffer + gradient
        - parameter = parameter - learning_rate * momentum_buffer

        IMPLEMENTATION HINTS:
        - Use id(param) as key for momentum buffers
        - Initialize buffer with zeros if not exists
        - Handle case where momentum = 0 (no momentum)
        - Update parameter.data with new Tensor
        """
        ### BEGIN SOLUTION
        for param in self.parameters:
            if param.grad is not None:
                # Get gradient
                gradient = param.grad.data.data

                # Apply weight decay (L2 regularization)
                if self.weight_decay > 0:
                    gradient = gradient + self.weight_decay * param.data.data

                # Get or create momentum buffer
                param_id = id(param)
                if param_id not in self.momentum_buffers:
                    self.momentum_buffers[param_id] = np.zeros_like(param.data.data)

                # Update momentum buffer
                self.momentum_buffers[param_id] = (
                    self.momentum * self.momentum_buffers[param_id] + gradient
                )

                # Update parameter
                param.data = Tensor(
                    param.data.data - self.learning_rate * self.momentum_buffers[param_id]
                )

        self.step_count += 1
        ### END SOLUTION

    def zero_grad(self) -> None:
        """
        Zero out gradients for all parameters.

        TODO: Implement gradient zeroing.

        APPROACH:
        1. Iterate through all parameters
        2. Set gradient to None for each parameter
        3. This prepares for next backward pass

        IMPLEMENTATION HINTS:
        - Simply set param.grad = None
        - This is called before loss.backward()
        - Prevents gradients from accumulating across steps
        """
        ### BEGIN SOLUTION
        for param in self.parameters:
            param.grad = None
        ### END SOLUTION

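# Sketch (illustration only): driving SGD with momentum by hand. Gradients are set
# directly as a stand-in for loss.backward(); with momentum=0.9 and a constant
# gradient, the momentum buffer grows toward gradient / (1 - momentum) = 10x the raw
# gradient, so later steps move the parameter about 10x as far as plain SGD would.
def _example_sgd_momentum():
    w = Variable(2.0, requires_grad=True)
    optimizer = SGD([w], learning_rate=0.1, momentum=0.9)
    for _ in range(5):
        optimizer.zero_grad()
        w.grad = Variable(0.5)  # constant gradient for illustration
        optimizer.step()
        print(optimizer.step_count, w)
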
# %% ../../modules/source/08_optimizers/optimizers_dev.ipynb 14
class Adam:
    """
    Adam Optimizer

    Implements Adam algorithm with adaptive learning rates:
    - First moment: exponential moving average of gradients
    - Second moment: exponential moving average of squared gradients
    - Bias correction: accounts for initialization bias
    - Adaptive updates: different learning rate per parameter
    """

    def __init__(self, parameters: List[Variable], learning_rate: float = 0.001,
                 beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8,
                 weight_decay: float = 0.0):
        """
        Initialize Adam optimizer.

        Args:
            parameters: List of Variables to optimize
            learning_rate: Learning rate (default: 0.001)
            beta1: Exponential decay rate for first moment (default: 0.9)
            beta2: Exponential decay rate for second moment (default: 0.999)
            epsilon: Small constant for numerical stability (default: 1e-8)
            weight_decay: L2 regularization coefficient (default: 0.0)

        TODO: Implement Adam optimizer initialization.

        APPROACH:
        1. Store parameters and hyperparameters
        2. Initialize first moment buffers (m_t)
        3. Initialize second moment buffers (v_t)
        4. Set up step counter for bias correction

        EXAMPLE:
        ```python
        # Create Adam optimizer
        optimizer = Adam([w1, w2, b1, b2], learning_rate=0.001)

        # In training loop:
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        ```

        HINTS:
        - Store all hyperparameters
        - Initialize moment buffers as empty dicts
        - Use parameter id() as key for tracking
        - Buffers will be created lazily in step()
        """
        ### BEGIN SOLUTION
        self.parameters = parameters
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.weight_decay = weight_decay

        # Initialize moment buffers (created lazily)
        self.first_moment = {}   # m_t
        self.second_moment = {}  # v_t

        # Track optimization steps for bias correction
        self.step_count = 0
        ### END SOLUTION

    def step(self) -> None:
        """
        Perform one optimization step using Adam algorithm.

        TODO: Implement Adam parameter update.

        APPROACH:
        1. Increment step count
        2. For each parameter with gradient:
           a. Get current gradient
           b. Apply weight decay if specified
           c. Update first moment (momentum)
           d. Update second moment (variance)
           e. Apply bias correction
           f. Update parameter with adaptive learning rate

        MATHEMATICAL FORMULATION:
        - m_t = beta1 * m_{t-1} + (1 - beta1) * gradient
        - v_t = beta2 * v_{t-1} + (1 - beta2) * gradient^2
        - m_hat = m_t / (1 - beta1^t)
        - v_hat = v_t / (1 - beta2^t)
        - parameter = parameter - learning_rate * m_hat / (sqrt(v_hat) + epsilon)

        IMPLEMENTATION HINTS:
        - Use id(param) as key for moment buffers
        - Initialize buffers with zeros if not exists
        - Use np.sqrt() for square root
        - Handle numerical stability with epsilon
        """
        ### BEGIN SOLUTION
        self.step_count += 1

        for param in self.parameters:
            if param.grad is not None:
                # Get gradient
                gradient = param.grad.data.data

                # Apply weight decay (L2 regularization)
                if self.weight_decay > 0:
                    gradient = gradient + self.weight_decay * param.data.data

                # Get or create moment buffers
                param_id = id(param)
                if param_id not in self.first_moment:
                    self.first_moment[param_id] = np.zeros_like(param.data.data)
                    self.second_moment[param_id] = np.zeros_like(param.data.data)

                # Update first moment (momentum)
                self.first_moment[param_id] = (
                    self.beta1 * self.first_moment[param_id] +
                    (1 - self.beta1) * gradient
                )

                # Update second moment (variance)
                self.second_moment[param_id] = (
                    self.beta2 * self.second_moment[param_id] +
                    (1 - self.beta2) * gradient * gradient
                )

                # Bias correction
                first_moment_corrected = (
                    self.first_moment[param_id] / (1 - self.beta1 ** self.step_count)
                )
                second_moment_corrected = (
                    self.second_moment[param_id] / (1 - self.beta2 ** self.step_count)
                )

                # Update parameter with adaptive learning rate
                param.data = Tensor(
                    param.data.data - self.learning_rate * first_moment_corrected /
                    (np.sqrt(second_moment_corrected) + self.epsilon)
                )
        ### END SOLUTION

    def zero_grad(self) -> None:
        """
        Zero out gradients for all parameters.

        TODO: Implement gradient zeroing (same as SGD).

        IMPLEMENTATION HINTS:
        - Set param.grad = None for all parameters
        - This is identical to SGD implementation
        """
        ### BEGIN SOLUTION
        for param in self.parameters:
            param.grad = None
        ### END SOLUTION

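# Sketch (illustration only): Adam's bias correction on the very first step. With the
# moment buffers initialized to zero, m_hat equals the raw gradient and v_hat its
# square, so the first update is roughly learning_rate * sign(gradient), independent
# of the gradient's magnitude. The gradient is set by hand as a stand-in for
# loss.backward().
def _example_adam_first_step():
    w = Variable(2.0, requires_grad=True)
    optimizer = Adam([w], learning_rate=0.001)
    w.grad = Variable(0.5)  # stand-in gradient
    optimizer.step()
    print(w)  # approximately Variable([1.999]): the parameter moved by ~learning_rate
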
# %% ../../modules/source/08_optimizers/optimizers_dev.ipynb 19
class StepLR:
    """
    Step Learning Rate Scheduler

    Decays learning rate by gamma every step_size epochs:
    learning_rate = initial_lr * (gamma ^ (epoch // step_size))
    """

    def __init__(self, optimizer: Union[SGD, Adam], step_size: int, gamma: float = 0.1):
        """
        Initialize step learning rate scheduler.

        Args:
            optimizer: Optimizer to schedule
            step_size: Number of epochs between decreases
            gamma: Multiplicative factor for learning rate decay

        TODO: Implement learning rate scheduler initialization.

        APPROACH:
        1. Store optimizer reference
        2. Store scheduling parameters
        3. Save initial learning rate
        4. Initialize step counter

        EXAMPLE:
        ```python
        optimizer = SGD([w1, w2], learning_rate=0.1)
        scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

        # In training loop:
        for epoch in range(100):
            train_one_epoch()
            scheduler.step()  # Update learning rate
        ```

        HINTS:
        - Store optimizer reference
        - Save initial learning rate from optimizer
        - Initialize step counter to 0
        - gamma is the decay factor (0.1 = 10x reduction)
        """
        ### BEGIN SOLUTION
        self.optimizer = optimizer
        self.step_size = step_size
        self.gamma = gamma
        self.initial_lr = optimizer.learning_rate
        self.step_count = 0
        ### END SOLUTION

    def step(self) -> None:
        """
        Update learning rate based on current step.

        TODO: Implement learning rate update.

        APPROACH:
        1. Increment step counter
        2. Calculate new learning rate using step decay formula
        3. Update optimizer's learning rate

        MATHEMATICAL FORMULATION:
        new_lr = initial_lr * (gamma ^ ((step_count - 1) // step_size))

        IMPLEMENTATION HINTS:
        - Use // for integer division
        - Use ** for exponentiation
        - Update optimizer.learning_rate directly
        """
        ### BEGIN SOLUTION
        self.step_count += 1

        # Calculate new learning rate
        decay_factor = self.gamma ** ((self.step_count - 1) // self.step_size)
        new_lr = self.initial_lr * decay_factor

        # Update optimizer's learning rate
        self.optimizer.learning_rate = new_lr
        ### END SOLUTION

    def get_lr(self) -> float:
        """
        Get current learning rate.

        TODO: Return current learning rate.

        IMPLEMENTATION HINTS:
        - Return optimizer.learning_rate
        """
        ### BEGIN SOLUTION
        return self.optimizer.learning_rate
        ### END SOLUTION
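
# Sketch (illustration only): the decay schedule StepLR produces. With step_size=10
# and gamma=0.1, the learning rate is 0.1 for scheduler steps 1-10, 0.01 for steps
# 11-20, and 0.001 for steps 21-30.
def _example_steplr_schedule():
    w = Variable(2.0, requires_grad=True)
    optimizer = SGD([w], learning_rate=0.1)
    scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
    for epoch in range(30):
        scheduler.step()
        print(epoch + 1, scheduler.get_lr())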