mirror of
https://github.com/MLSysBook/TinyTorch.git
synced 2026-03-10 15:11:59 -05:00
Package exports:
- Fix tinytorch/__init__.py to export all required components for milestones
- Add Dense as an alias for Linear for compatibility
- Add loss functions (MSELoss, CrossEntropyLoss, BinaryCrossEntropyLoss)
- Export spatial operations, data loaders, and transformer components

Test infrastructure:
- Create tests/conftest.py to handle path setup
- Create tests/test_utils.py with shared test utilities
- Rename test_progressive_integration.py files to include the module number
- Fix syntax errors in test files (spaces in class names)
- Remove a stale test file referencing non-existent modules

Documentation:
- Update README.md with correct milestone file names
- Fix milestone requirements to match actual module dependencies

Export system:
- Run `tito export --all` to regenerate the package from source modules
- Ensure all 20 modules are properly exported
409 lines
16 KiB
Python
Generated
409 lines
16 KiB
Python
Generated
# ╔═══════════════════════════════════════════════════════════════════════════════╗
|
|
# ║ 🚨 CRITICAL WARNING 🚨 ║
|
|
# ║ AUTOGENERATED! DO NOT EDIT! ║
|
|
# ║ ║
|
|
# ║ This file is AUTOMATICALLY GENERATED from source modules. ║
|
|
# ║ ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported! ║
|
|
# ║ ║
|
|
# ║ ✅ TO EDIT: src/07_training/07_training.py ║
|
|
# ║ ✅ TO EXPORT: Run 'tito module complete <module_name>' ║
|
|
# ║ ║
|
|
# ║ 🛡️ STUDENT PROTECTION: This file contains optimized implementations. ║
|
|
# ║ Editing it directly may break module functionality and training. ║
|
|
# ║ ║
|
|
# ║ 🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners) ║
|
|
# ║ The tinytorch/ directory is generated code - edit source files instead! ║
|
|
# ╚═══════════════════════════════════════════════════════════════════════════════╝
|
|
# %% auto 0
# Public API of this module; maintained automatically by the exporter.
__all__ = ['DEFAULT_MAX_LR', 'DEFAULT_MIN_LR', 'DEFAULT_TOTAL_EPOCHS', 'CosineSchedule', 'clip_grad_norm', 'Trainer']
|
|
|
|
# %% ../../modules/07_training/07_training.ipynb 1
|
|
import numpy as np
|
|
import pickle
|
|
import time
|
|
from typing import Dict, List, Optional, Tuple, Any, Callable
|
|
from pathlib import Path
|
|
import sys
|
|
import os
|
|
|
|
# Import dependencies from other modules
|
|
from .tensor import Tensor
|
|
from .layers import Linear
|
|
from .losses import MSELoss, CrossEntropyLoss
|
|
from .optimizers import SGD, AdamW
|
|
|
|
# Constants for learning rate scheduling defaults
DEFAULT_MAX_LR = 0.1  # Default maximum learning rate for cosine schedule
DEFAULT_MIN_LR = 0.01  # Default minimum learning rate for cosine schedule
DEFAULT_TOTAL_EPOCHS = 100  # Default total epochs for learning rate schedule


# %% ../../modules/07_training/07_training.ipynb 6
class CosineSchedule:
    """
    Cosine annealing learning rate schedule.

    The learning rate starts at ``max_lr`` and follows half a cosine wave
    down to ``min_lr`` over ``total_epochs`` epochs: aggressive steps early
    in training, gentle fine-tuning steps near the end.

    Example:
        >>> schedule = CosineSchedule(max_lr=0.1, min_lr=0.01, total_epochs=100)
        >>> schedule.get_lr(0)    # start of training -> 0.1
        >>> schedule.get_lr(50)   # halfway           -> ~0.055
        >>> schedule.get_lr(100)  # end of training   -> 0.01
    """
    ### BEGIN SOLUTION
    def __init__(self, max_lr: float = DEFAULT_MAX_LR, min_lr: float = DEFAULT_MIN_LR, total_epochs: int = DEFAULT_TOTAL_EPOCHS):
        # Remember the schedule's endpoints and its duration.
        self.max_lr = max_lr
        self.min_lr = min_lr
        self.total_epochs = total_epochs

    def get_lr(self, epoch: int) -> float:
        """Return the learning rate to use at the given epoch."""
        # Beyond the schedule's horizon, hold the floor value.
        if epoch >= self.total_epochs:
            return self.min_lr

        # Cosine factor decays smoothly from 1 (epoch 0) to 0 (final epoch);
        # interpolate between min_lr and max_lr with it.
        cosine_factor = (1 + np.cos(np.pi * epoch / self.total_epochs)) / 2
        span = self.max_lr - self.min_lr
        return self.min_lr + span * cosine_factor
    ### END SOLUTION
|
|
|
|
# %% ../../modules/07_training/07_training.ipynb 10
def _grad_array(param) -> Optional[np.ndarray]:
    """Return the raw ndarray behind ``param.grad``, or None if absent.

    Gradients may be stored either as bare numpy arrays or as Tensor
    objects exposing a ``.data`` ndarray; this normalizes both cases so
    callers can work directly with the underlying buffer.
    """
    grad = param.grad
    if grad is None:
        return None
    if isinstance(grad, np.ndarray):
        return grad
    # Trust that Tensor gradients expose a .data ndarray.
    return grad.data


def clip_grad_norm(parameters: List, max_norm: float = 1.0) -> float:
    """
    Clip gradients by global norm to prevent exploding gradients.

    Instead of clipping each gradient individually, the global norm
    (sqrt of the sum of squared gradients across ALL parameters) is
    computed, and every gradient is scaled uniformly by
    ``max_norm / total_norm`` when that norm exceeds ``max_norm``.
    This is crucial for training stability, especially with RNNs and
    deep networks.

    Args:
        parameters: Iterable of parameters; entries may have ``.grad`` set
            to an ndarray, a Tensor with ``.data``, or None.
        max_norm: Maximum allowed global gradient norm.

    Returns:
        The global norm BEFORE clipping (useful for monitoring).

    Example:
        >>> params = [Tensor([1, 2, 3], requires_grad=True)]
        >>> params[0].grad = Tensor([10, 20, 30])  # Large gradients
        >>> original_norm = clip_grad_norm(params, max_norm=1.0)
        >>> # np.linalg.norm(params[0].grad.data) is now <= 1.0
    """
    ### BEGIN SOLUTION
    # Normalize every gradient to its backing ndarray; skip missing grads.
    grads = [g for g in (_grad_array(p) for p in parameters) if g is not None]
    if not grads:
        # No parameters, or none with gradients: nothing to clip.
        return 0.0

    # Global norm across all parameters.
    total_norm = float(np.sqrt(sum(np.sum(g ** 2) for g in grads)))

    # Scale uniformly only when the norm exceeds the threshold. Scaling is
    # done in place on the backing arrays, as the function contract states
    # (the previous version rebound ndarray grads to fresh arrays instead).
    if total_norm > max_norm:
        clip_coef = max_norm / total_norm
        for g in grads:
            g *= clip_coef

    return total_norm
    ### END SOLUTION
|
|
|
|
# %% ../../modules/07_training/07_training.ipynb 14
class Trainer:
    """
    Complete training orchestrator for neural networks.

    Handles the full training lifecycle: forward pass, loss computation,
    backward pass, optimization, learning rate scheduling, checkpointing,
    and evaluation. This is the central class that brings together the
    components built in previous modules (model, loss, optimizer, schedule).
    """
    ### BEGIN SOLUTION
    def __init__(self, model, optimizer, loss_fn, scheduler=None, grad_clip_norm=None):
        """
        Initialize trainer with model and training components.

        Args:
            model: Neural network to train (must expose forward() and parameters()).
            optimizer: Parameter update strategy (SGD, AdamW, ...) exposing
                step(), zero_grad() and a writable ``lr`` attribute.
            loss_fn: Loss function object exposing forward(outputs, targets).
            scheduler: Optional learning rate scheduler with get_lr(epoch).
            grad_clip_norm: Optional global-norm threshold for gradient clipping.
        """
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.scheduler = scheduler
        self.grad_clip_norm = grad_clip_norm

        # Training state
        self.epoch = 0  # epochs completed so far
        self.step = 0   # optimizer updates performed across all epochs
        self.training_mode = True

        # History tracking for monitoring/plotting
        self.history = {
            'train_loss': [],
            'eval_loss': [],
            'learning_rates': []
        }

    def _apply_update(self):
        """Clip gradients (if configured), step the optimizer, reset grads."""
        if self.grad_clip_norm is not None:
            params = self.model.parameters()
            clip_grad_norm(params, self.grad_clip_norm)
        self.optimizer.step()
        self.optimizer.zero_grad()

    def train_epoch(self, dataloader, accumulation_steps=1):
        """
        Train for one epoch through the dataset.

        Args:
            dataloader: Iterable yielding (inputs, targets) batches.
            accumulation_steps: Number of batches to accumulate before an
                optimizer update (effective batch-size multiplier).

        Returns:
            Average (window-mean) loss per optimizer update for the epoch.
        """
        self.model.training = True
        self.training_mode = True

        total_loss = 0.0
        num_batches = 0         # number of optimizer updates this epoch
        accumulated_loss = 0.0  # mean loss over the current accumulation window
        pending_batches = 0     # batches seen since the last optimizer update

        for batch_idx, (inputs, targets) in enumerate(dataloader):
            # Forward pass
            outputs = self.model.forward(inputs)
            loss = self.loss_fn.forward(outputs, targets)

            # Scale loss contribution so accumulated_loss is the window mean.
            accumulated_loss += loss.data / accumulation_steps
            pending_batches += 1

            # Backward pass (gradients accumulate until the optimizer steps).
            loss.backward()

            # Update parameters every accumulation_steps batches.
            if (batch_idx + 1) % accumulation_steps == 0:
                self._apply_update()
                total_loss += accumulated_loss
                accumulated_loss = 0.0
                pending_batches = 0
                num_batches += 1
                self.step += 1

        # Flush a trailing partial accumulation window, if any.
        # (Bug fix: this used to test `accumulated_loss > 0`, which silently
        # dropped leftover batches whose scaled losses summed to exactly 0.)
        if pending_batches > 0:
            self._apply_update()
            total_loss += accumulated_loss
            num_batches += 1

        avg_loss = total_loss / max(num_batches, 1)
        self.history['train_loss'].append(avg_loss)

        # Apply the LR schedule for the epoch just completed.
        if self.scheduler is not None:
            current_lr = self.scheduler.get_lr(self.epoch)
            # Trust the optimizer exposes a writable lr attribute (Module 06).
            self.optimizer.lr = current_lr
            self.history['learning_rates'].append(current_lr)

        self.epoch += 1
        return avg_loss

    def evaluate(self, dataloader):
        """
        Evaluate model on dataset without updating parameters.

        Args:
            dataloader: Iterable yielding (inputs, targets) batches.

        Returns:
            Tuple of (average loss, accuracy). Accuracy is 0.0 when the
            outputs are not multi-class (no predictions are counted).
        """
        self.model.training = False
        self.training_mode = False

        total_loss = 0.0
        num_batches = 0
        correct = 0
        total = 0

        for inputs, targets in dataloader:
            # Forward pass only — no backward/step, so parameters are untouched.
            outputs = self.model.forward(inputs)
            loss = self.loss_fn.forward(outputs, targets)

            total_loss += loss.data
            # Count batches during iteration so loaders without __len__
            # (e.g. generators) work too; equals len(dataloader) otherwise.
            num_batches += 1

            # Classification accuracy via argmax over the class dimension.
            # Trust that outputs/targets expose a .data ndarray.
            if len(outputs.data.shape) > 1:  # Multi-class outputs
                predictions = np.argmax(outputs.data, axis=1)
                if len(targets.data.shape) == 1:  # Integer class targets
                    correct += np.sum(predictions == targets.data)
                else:  # One-hot targets
                    correct += np.sum(predictions == np.argmax(targets.data, axis=1))
                total += len(predictions)

        avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
        accuracy = correct / total if total > 0 else 0.0

        self.history['eval_loss'].append(avg_loss)

        return avg_loss, accuracy

    def save_checkpoint(self, path: str):
        """
        Save complete training state for resumption.

        Args:
            path: File path to save the checkpoint to (parent directories
                are created as needed).
        """
        checkpoint = {
            'epoch': self.epoch,
            'step': self.step,
            'model_state': self._get_model_state(),
            'optimizer_state': self._get_optimizer_state(),
            'scheduler_state': self._get_scheduler_state(),
            'history': self.history,
            'training_mode': self.training_mode
        }

        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(checkpoint, f)

    def load_checkpoint(self, path: str):
        """
        Load training state from a checkpoint file.

        NOTE: uses pickle — only load checkpoints from trusted sources.

        Args:
            path: File path to load the checkpoint from.
        """
        with open(path, 'rb') as f:
            checkpoint = pickle.load(f)

        self.epoch = checkpoint['epoch']
        self.step = checkpoint['step']
        self.history = checkpoint['history']
        self.training_mode = checkpoint['training_mode']

        # Restore component states (simplified for educational purposes).
        if 'model_state' in checkpoint:
            self._set_model_state(checkpoint['model_state'])
        if 'optimizer_state' in checkpoint:
            self._set_optimizer_state(checkpoint['optimizer_state'])
        if 'scheduler_state' in checkpoint:
            self._set_scheduler_state(checkpoint['scheduler_state'])

    def _get_model_state(self):
        """Extract model parameters (index -> ndarray copy) for checkpointing."""
        # Trust model has a parameters() method yielding Tensors with .data.
        return {i: param.data.copy() for i, param in enumerate(self.model.parameters())}

    def _set_model_state(self, state):
        """Restore model parameters from a checkpoint state dict."""
        # Trust model has a parameters() method; extra saved entries are ignored.
        for i, param in enumerate(self.model.parameters()):
            if i in state:
                param.data = state[i].copy()

    def _get_optimizer_state(self):
        """Extract optimizer hyperparameters and momentum buffers."""
        state = {}
        # Trust optimizer has an lr attribute (Module 06 contract).
        state['lr'] = self.optimizer.lr
        # Momentum state goes through the explicit Module 06 API:
        # optimizers with momentum support expose get_momentum_state().
        if hasattr(self.optimizer, 'has_momentum') and self.optimizer.has_momentum():
            momentum_state = self.optimizer.get_momentum_state()
            if momentum_state is not None:
                state['momentum_buffers'] = momentum_state
        return state

    def _set_optimizer_state(self, state):
        """Restore optimizer hyperparameters and momentum buffers."""
        if 'lr' in state:
            # Trust optimizer has an lr attribute (Module 06 contract).
            self.optimizer.lr = state['lr']
        # Momentum state goes through the explicit Module 06 API:
        # optimizers with momentum support expose set_momentum_state().
        if 'momentum_buffers' in state:
            if hasattr(self.optimizer, 'has_momentum') and self.optimizer.has_momentum():
                self.optimizer.set_momentum_state(state['momentum_buffers'])

    def _get_scheduler_state(self):
        """Extract (cosine) scheduler configuration, or None if no scheduler."""
        if self.scheduler is None:
            return None
        return {
            'max_lr': getattr(self.scheduler, 'max_lr', None),
            'min_lr': getattr(self.scheduler, 'min_lr', None),
            'total_epochs': getattr(self.scheduler, 'total_epochs', None)
        }

    def _set_scheduler_state(self, state):
        """Restore scheduler attributes from a checkpoint state dict."""
        if state is None or self.scheduler is None:
            return
        # hasattr() is deliberate here: schedulers are user-extensible and
        # the state dict may come from a different scheduler type, so we
        # only restore attributes the current scheduler actually has —
        # duck-typing for polymorphic checkpoint restoration.
        for key, value in state.items():
            if hasattr(self.scheduler, key):
                setattr(self.scheduler, key, value)
    ### END SOLUTION
|