# ╔═══════════════════════════════════════════════════════════════════════════════╗
# ║                           🚨 CRITICAL WARNING 🚨                                ║
# ║                        AUTOGENERATED! DO NOT EDIT!                              ║
# ║                                                                                 ║
# ║   This file is AUTOMATICALLY GENERATED from source modules.                    ║
# ║   ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported!             ║
# ║                                                                                 ║
# ║   ✅ TO EDIT:   src/06_optimizers/06_optimizers.py                             ║
# ║   ✅ TO EXPORT: Run 'tito module complete <module_name>'                       ║
# ║                                                                                 ║
# ║   🛡️ STUDENT PROTECTION: This file contains optimized implementations.         ║
# ║      Editing it directly may break module functionality and training.          ║
# ║                                                                                 ║
# ║   🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners)            ║
# ║      The tinytorch/ directory is generated code - edit source files instead!   ║
# ╚═══════════════════════════════════════════════════════════════════════════════╝
# %% auto 0
__all__ = ['DEFAULT_LEARNING_RATE_SGD', 'DEFAULT_LEARNING_RATE_ADAM', 'DEFAULT_MOMENTUM', 'DEFAULT_BETA1', 'DEFAULT_BETA2',
           'DEFAULT_EPS', 'DEFAULT_WEIGHT_DECAY_ADAMW', 'Optimizer', 'SGD', 'Adam', 'AdamW']

# %% ../../modules/06_optimizers/06_optimizers.ipynb 1
import numpy as np
from typing import List, Union, Optional, Dict, Any

# Import Tensor from Module 01 (now with gradient support from Module 05)
from .tensor import Tensor

# Constants for optimizer defaults
DEFAULT_LEARNING_RATE_SGD = 0.01    # Default learning rate for SGD
DEFAULT_LEARNING_RATE_ADAM = 0.001  # Default learning rate for Adam/AdamW
DEFAULT_MOMENTUM = 0.9              # Default momentum for SGD
DEFAULT_BETA1 = 0.9                 # First moment decay rate for Adam
DEFAULT_BETA2 = 0.999               # Second moment decay rate for Adam
DEFAULT_EPS = 1e-8                  # Small epsilon for numerical stability in Adam
DEFAULT_WEIGHT_DECAY_ADAMW = 0.01   # Default weight decay for AdamW

# %% ../../modules/06_optimizers/06_optimizers.ipynb 5
class Optimizer:
    """
    Base class for all optimizers.

    This class defines the common interface that all optimizers must implement:
    - zero_grad(): Clear gradients from parameters
    - step(): Update parameters based on gradients
    """

    def __init__(self, params: List[Tensor]):
        """
        Initialize optimizer with parameters to optimize.

        TODO: Set up the parameter list for optimization

        APPROACH:
        1. Store parameters as a list for iteration
        2. Validate that all parameters require gradients
        3. Initialize step counter for algorithms that need it

        EXAMPLE:
        >>> linear = Linear(784, 128)
        >>> optimizer = SGD(linear.parameters(), lr=0.01)

        HINT: Check that each parameter has requires_grad=True
        """
        ### BEGIN SOLUTION
        # Validate and store parameters
        if not isinstance(params, list):
            params = list(params)

        # Check that parameters require gradients
        for i, param in enumerate(params):
            # Trust that param is a Tensor from Module 01 with data, grad, requires_grad
            if not param.requires_grad:
                raise ValueError(f"Parameter {i} does not require gradients. Set requires_grad=True.")

        self.params = params
        self.step_count = 0  # For algorithms that need step counting
        ### END SOLUTION

    def zero_grad(self):
        """
        Clear gradients from all parameters.

        TODO: Reset all parameter gradients to None

        APPROACH:
        1. Iterate through all parameters
        2. Set each parameter's grad to None

        EXAMPLE:
        >>> optimizer.zero_grad()  # Clears all gradients
        >>> assert all(param.grad is None for param in optimizer.params)

        WHY: Gradients accumulate by default, so we need to clear them between batches
        """
        ### BEGIN SOLUTION
        for param in self.params:
            param.grad = None
        ### END SOLUTION

    def step(self):
        """
        Update parameters based on gradients.

        This is abstract - each optimizer implements its own update rule.
        """
        raise NotImplementedError("Subclasses must implement step()")

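# --- Illustrative usage (editor's sketch, not part of the generated module) ---
# A minimal sketch of the zero_grad -> backward -> step loop this interface is
# built for. It relies only on the attributes the optimizers below actually use
# (.data, .grad, .requires_grad); the Tensor constructor call and the manual
# gradient assignment are assumptions standing in for Module 05's autograd.
def _demo_optimizer_interface():
    # Hypothetical parameter: assumes Tensor(data, requires_grad=...) is accepted.
    w = Tensor(np.array([1.0, -2.0, 3.0]), requires_grad=True)
    opt = SGD([w], lr=0.1)  # SGD is defined later in this module

    for _ in range(3):
        opt.zero_grad()        # clear any accumulated gradients
        w.grad = 2.0 * w.data  # stand-in for loss.backward(); gradient of sum(w**2)
        opt.step()             # apply the update rule
    return w.data

# >>> _demo_optimizer_interface()  # each step shrinks w toward zero
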
# %% ../../modules/06_optimizers/06_optimizers.ipynb 9
class SGD(Optimizer):
    """
    Stochastic Gradient Descent with momentum.

    SGD is the foundational optimization algorithm that moves parameters
    in the direction opposite to gradients. With momentum, it remembers
    previous updates to reduce oscillations and accelerate convergence.
    """

    def __init__(self, params: List[Tensor], lr: float = DEFAULT_LEARNING_RATE_SGD, momentum: float = 0.0, weight_decay: float = 0.0):
        """
        Initialize SGD optimizer.

        TODO: Set up SGD with momentum and weight decay

        APPROACH:
        1. Call parent constructor to set up parameters
        2. Store learning rate, momentum, and weight decay
        3. Initialize momentum buffers for each parameter

        EXAMPLE:
        >>> optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)

        HINTS:
        - Momentum buffers should be initialized as None
        - They'll be created lazily on first step
        """
        ### BEGIN SOLUTION
        super().__init__(params)

        self.lr = lr
        self.momentum = momentum
        self.weight_decay = weight_decay

        # Initialize momentum buffers (created lazily)
        self.momentum_buffers = [None for _ in self.params]
        ### END SOLUTION

    def has_momentum(self) -> bool:
        """
        Check if this optimizer uses momentum.

        This explicit API method replaces the need for hasattr() checks
        in checkpointing code (Module 07).

        Returns:
            bool: True if momentum is enabled (momentum > 0), False otherwise

        EXAMPLE:
        >>> optimizer = SGD(params, lr=0.01, momentum=0.9)
        >>> optimizer.has_momentum()
        True
        """
        return self.momentum > 0

    def get_momentum_state(self) -> Optional[List]:
        """
        Get momentum buffers for checkpointing.

        This explicit API method provides safe access to momentum buffers
        without using hasattr(), making the API contract clear.

        Returns:
            Optional[List]: List of momentum buffers if momentum is enabled,
                None otherwise

        EXAMPLE:
        >>> optimizer = SGD(params, lr=0.01, momentum=0.9)
        >>> optimizer.step()  # Initialize buffers
        >>> state = optimizer.get_momentum_state()
        >>> # Later: optimizer.set_momentum_state(state)
        """
        if not self.has_momentum():
            return None
        return [buf.copy() if buf is not None else None
                for buf in self.momentum_buffers]

    def set_momentum_state(self, state: Optional[List]) -> None:
        """
        Restore momentum buffers from checkpointing.

        This explicit API method provides safe restoration of momentum state
        without using hasattr().

        Args:
            state: List of momentum buffers or None

        EXAMPLE:
        >>> optimizer = SGD(params, lr=0.01, momentum=0.9)
        >>> state = optimizer.get_momentum_state()
        >>> # Training interruption...
        >>> new_optimizer = SGD(params, lr=0.01, momentum=0.9)
        >>> new_optimizer.set_momentum_state(state)
        """
        if state is None or not self.has_momentum():
            return

        if len(state) != len(self.momentum_buffers):
            raise ValueError(
                f"State length {len(state)} doesn't match "
                f"optimizer parameters {len(self.momentum_buffers)}"
            )

        for i, buf in enumerate(state):
            if buf is not None:
                self.momentum_buffers[i] = buf.copy()

    def step(self):
        """
        Perform SGD update step with momentum.

        TODO: Implement SGD parameter update with momentum

        APPROACH:
        1. For each parameter with gradients:
           a. Apply weight decay if specified
           b. Update momentum buffer
           c. Update parameter using momentum

        FORMULA:
        - With weight decay: grad = grad + weight_decay * param
        - Momentum: v = momentum * v_prev + grad
        - Update: param = param - lr * v

        HINTS:
        - Skip parameters without gradients
        - Initialize momentum buffers on first use
        - Use in-place operations to save memory
        """
        ### BEGIN SOLUTION
        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            # Get gradient data - grad can be Tensor or numpy array
            grad = param.grad
            # Handle both Tensor (with .data) and numpy array (from autograd) cases
            if isinstance(grad, Tensor):
                grad_data = grad.data
            else:
                # grad is already a numpy array from autograd
                grad_data = grad

            # Apply weight decay
            if self.weight_decay != 0:
                grad_data = grad_data + self.weight_decay * param.data

            # Update momentum buffer
            if self.momentum != 0:
                if self.momentum_buffers[i] is None:
                    # Initialize momentum buffer
                    self.momentum_buffers[i] = np.zeros_like(param.data)

                # Update momentum: v = momentum * v_prev + grad
                self.momentum_buffers[i] = self.momentum * self.momentum_buffers[i] + grad_data
                grad_data = self.momentum_buffers[i]

            # Update parameter: param = param - lr * grad
            param.data = param.data - self.lr * grad_data

        # Increment step counter
        self.step_count += 1
        ### END SOLUTION

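# --- Illustrative usage (editor's sketch, not part of the generated module) ---
# A small worked example of SGD with momentum plus the checkpointing helpers
# above. The Tensor constructor call and manual .grad assignment are assumptions
# standing in for the real autograd flow; everything else uses this module's API.
def _demo_sgd_momentum_checkpoint():
    w = Tensor(np.array([5.0]), requires_grad=True)  # assumed Tensor(data, requires_grad=...)
    opt = SGD([w], lr=0.1, momentum=0.9)

    # First update: v = 0.9*0 + 2.0 = 2.0, so w <- 5.0 - 0.1*2.0 = 4.8
    w.grad = np.array([2.0])
    opt.step()

    # Save momentum buffers, build a fresh optimizer, and restore them
    state = opt.get_momentum_state()
    resumed = SGD([w], lr=0.1, momentum=0.9)
    resumed.set_momentum_state(state)

    # Second update continues the momentum: v = 0.9*2.0 + 2.0 = 3.8, w <- 4.8 - 0.38
    w.grad = np.array([2.0])
    resumed.step()
    return w.data

# >>> _demo_sgd_momentum_checkpoint()  # -> array([4.42])
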
# %% ../../modules/06_optimizers/06_optimizers.ipynb 13
class Adam(Optimizer):
    """
    Adam optimizer with adaptive learning rates.

    Adam computes individual adaptive learning rates for different parameters
    from estimates of first and second moments of the gradients.
    This makes it effective for problems with sparse gradients or noisy data.
    """

    def __init__(self, params: List[Tensor], lr: float = DEFAULT_LEARNING_RATE_ADAM, betas: tuple = (DEFAULT_BETA1, DEFAULT_BETA2), eps: float = DEFAULT_EPS, weight_decay: float = 0.0):
        """
        Initialize Adam optimizer.

        TODO: Set up Adam with adaptive learning rates

        APPROACH:
        1. Call parent constructor
        2. Store hyperparameters (lr, betas, eps, weight_decay)
        3. Initialize first and second moment buffers

        PARAMETERS:
        - lr: Learning rate (default: 0.001)
        - betas: Coefficients for computing running averages (default: (0.9, 0.999))
        - eps: Small constant for numerical stability (default: 1e-8)
        - weight_decay: L2 penalty coefficient (default: 0.0)

        EXAMPLE:
        >>> optimizer = Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999))
        """
        ### BEGIN SOLUTION
        super().__init__(params)

        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.weight_decay = weight_decay

        # Initialize moment buffers (created lazily)
        self.m_buffers = [None for _ in self.params]  # First moment (mean)
        self.v_buffers = [None for _ in self.params]  # Second moment (variance)
        ### END SOLUTION

    def step(self):
        """
        Perform Adam update step.

        TODO: Implement Adam parameter update with adaptive learning rates

        APPROACH:
        1. For each parameter with gradients:
           a. Apply weight decay if specified
           b. Update first moment estimate (momentum of gradient)
           c. Update second moment estimate (momentum of squared gradient)
           d. Compute bias-corrected moments
           e. Update parameter using adaptive learning rate

        FORMULAS:
        - m_t = β₁ * m_{t-1} + (1-β₁) * g_t
        - v_t = β₂ * v_{t-1} + (1-β₂) * g_t²
        - m̂_t = m_t / (1-β₁^t)
        - v̂_t = v_t / (1-β₂^t)
        - θ_t = θ_{t-1} - lr * m̂_t / (√v̂_t + ε)

        HINTS:
        - Initialize buffers as zeros on first use
        - Use step_count for bias correction
        - Square gradients element-wise for second moment
        """
        ### BEGIN SOLUTION
        # Increment step counter first (needed for bias correction)
        self.step_count += 1

        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            # Get gradient data - grad can be Tensor or numpy array
            grad = param.grad
            # Handle both Tensor (with .data) and numpy array (from autograd) cases
            if isinstance(grad, Tensor):
                grad_data = grad.data
            else:
                # grad is already a numpy array from autograd
                grad_data = grad

            # Apply weight decay
            if self.weight_decay != 0:
                grad_data = grad_data + self.weight_decay * param.data

            # Initialize buffers if needed
            if self.m_buffers[i] is None:
                self.m_buffers[i] = np.zeros_like(param.data)
                self.v_buffers[i] = np.zeros_like(param.data)

            # Update biased first moment estimate
            self.m_buffers[i] = self.beta1 * self.m_buffers[i] + (1 - self.beta1) * grad_data

            # Update biased second moment estimate
            self.v_buffers[i] = self.beta2 * self.v_buffers[i] + (1 - self.beta2) * (grad_data ** 2)

            # Compute bias correction
            bias_correction1 = 1 - self.beta1 ** self.step_count
            bias_correction2 = 1 - self.beta2 ** self.step_count

            # Compute bias-corrected moments
            m_hat = self.m_buffers[i] / bias_correction1
            v_hat = self.v_buffers[i] / bias_correction2

            # Update parameter
            param.data = param.data - self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
        ### END SOLUTION

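# --- Illustrative usage (editor's sketch, not part of the generated module) ---
# A sketch showing why bias correction matters: on the very first step the
# corrected moments give m_hat = g and v_hat = g**2, so the update is roughly
# lr * sign(g) regardless of the raw gradient magnitude. The Tensor constructor
# call and manual .grad assignment are assumptions, as in the earlier sketches.
def _demo_adam_first_step():
    small = Tensor(np.array([1.0]), requires_grad=True)
    large = Tensor(np.array([1.0]), requires_grad=True)
    opt = Adam([small, large], lr=0.001)

    small.grad = np.array([1e-3])  # tiny gradient
    large.grad = np.array([1e3])   # huge gradient
    opt.step()

    # Both parameters move by ~lr = 0.001 on step 1, thanks to bias correction
    # and the per-parameter 1/sqrt(v_hat) scaling.
    return small.data, large.data

# >>> _demo_adam_first_step()  # -> (array([0.999]), array([0.999])) approximately
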
# %% ../../modules/06_optimizers/06_optimizers.ipynb 17
class AdamW(Optimizer):
    """
    AdamW optimizer with decoupled weight decay.

    AdamW fixes a bug in Adam's weight decay implementation by decoupling
    weight decay from the gradient-based update. This leads to better
    regularization and is the preferred version for most applications.
    """

    def __init__(self, params: List[Tensor], lr: float = DEFAULT_LEARNING_RATE_ADAM, betas: tuple = (DEFAULT_BETA1, DEFAULT_BETA2), eps: float = DEFAULT_EPS, weight_decay: float = DEFAULT_WEIGHT_DECAY_ADAMW):
        """
        Initialize AdamW optimizer.

        TODO: Set up AdamW with decoupled weight decay

        APPROACH:
        1. Call parent constructor
        2. Store hyperparameters (note higher default weight_decay)
        3. Initialize moment buffers like Adam

        KEY DIFFERENCE from Adam:
        - Weight decay is applied directly to parameters, not added to gradients
        - This provides better regularization behavior

        EXAMPLE:
        >>> optimizer = AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
        """
        ### BEGIN SOLUTION
        super().__init__(params)

        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.weight_decay = weight_decay

        # Initialize moment buffers (same as Adam)
        self.m_buffers = [None for _ in self.params]
        self.v_buffers = [None for _ in self.params]
        ### END SOLUTION

    def step(self):
        """
        Perform AdamW update step with decoupled weight decay.

        TODO: Implement AdamW parameter update

        APPROACH:
        1. For each parameter with gradients:
           a. Update moments using gradients (NOT modified by weight decay)
           b. Compute bias-corrected moments
           c. Apply gradient-based update
           d. Apply weight decay directly to parameters

        KEY DIFFERENCE from Adam:
        - Weight decay: θ_t = θ_t - lr * weight_decay * θ_t (applied after gradient update)
        - NOT: grad = grad + weight_decay * param (Adam's coupled L2 approach)

        FORMULAS:
        - Same moment updates as Adam (using unmodified gradients)
        - Gradient update: θ_t = θ_{t-1} - lr * m̂_t / (√v̂_t + ε)
        - Weight decay: θ_t = θ_t * (1 - lr * weight_decay)

        HINT: Apply weight decay after gradient update for proper decoupling
        """
        ### BEGIN SOLUTION
        # Increment step counter first
        self.step_count += 1

        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            # Get gradient data - grad can be Tensor or numpy array
            grad = param.grad
            # Handle both Tensor (with .data) and numpy array (from autograd) cases
            if isinstance(grad, Tensor):
                grad_data = grad.data
            else:
                # grad is already a numpy array from autograd
                grad_data = grad

            # Initialize buffers if needed
            if self.m_buffers[i] is None:
                self.m_buffers[i] = np.zeros_like(param.data)
                self.v_buffers[i] = np.zeros_like(param.data)

            # Update moments using pure gradients
            self.m_buffers[i] = self.beta1 * self.m_buffers[i] + (1 - self.beta1) * grad_data
            self.v_buffers[i] = self.beta2 * self.v_buffers[i] + (1 - self.beta2) * (grad_data ** 2)

            # Compute bias correction
            bias_correction1 = 1 - self.beta1 ** self.step_count
            bias_correction2 = 1 - self.beta2 ** self.step_count

            # Compute bias-corrected moments
            m_hat = self.m_buffers[i] / bias_correction1
            v_hat = self.v_buffers[i] / bias_correction2

            # Apply gradient-based update
            param.data = param.data - self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

            # Apply decoupled weight decay
            if self.weight_decay != 0:
                param.data = param.data * (1 - self.lr * self.weight_decay)
        ### END SOLUTION

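# --- Illustrative usage (editor's sketch, not part of the generated module) ---
# A sketch contrasting coupled L2 (Adam above, weight_decay folded into the
# gradient) with AdamW's decoupled decay, using a zero gradient so only the
# regularization term acts. Adam's adaptive scaling renormalizes the L2 term,
# so the shrinkage is ~lr regardless of the weight's magnitude; AdamW shrinks
# the weight proportionally via (1 - lr * weight_decay). Tensor construction
# and manual .grad assignment are assumptions, as in the earlier sketches.
def _demo_adam_vs_adamw_decay():
    w_adam = Tensor(np.array([10.0]), requires_grad=True)
    w_adamw = Tensor(np.array([10.0]), requires_grad=True)

    adam = Adam([w_adam], lr=0.001, weight_decay=0.01)
    adamw = AdamW([w_adamw], lr=0.001, weight_decay=0.01)

    w_adam.grad = np.array([0.0])   # no task gradient, only regularization
    w_adamw.grad = np.array([0.0])
    adam.step()
    adamw.step()

    # Adam:  w shrinks by ~lr = 0.001 (L2 term is renormalized by sqrt(v_hat))
    # AdamW: w shrinks by lr * weight_decay * w = 0.0001 (proportional to w)
    return w_adam.data, w_adamw.data

# >>> _demo_adam_vs_adamw_decay()  # -> (array([9.999]), array([9.9999])) approximately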