# ╔═══════════════════════════════════════════════════════════════════════════════╗
# ║                        🚨 CRITICAL WARNING 🚨                                  ║
# ║                     AUTOGENERATED! DO NOT EDIT!                               ║
# ║                                                                               ║
# ║  This file is AUTOMATICALLY GENERATED from source modules.                    ║
# ║  ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported!             ║
# ║                                                                               ║
# ║  ✅ TO EDIT: src/07_training/07_training.py                                    ║
# ║  ✅ TO EXPORT: Run 'tito module complete '                                     ║
# ║                                                                               ║
# ║  🛡️ STUDENT PROTECTION: This file contains optimized implementations.          ║
# ║     Editing it directly may break module functionality and training.          ║
# ║                                                                               ║
# ║  🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners)             ║
# ║     The tinytorch/ directory is generated code - edit source files instead!   ║
# ╚═══════════════════════════════════════════════════════════════════════════════╝
# %% auto 0
__all__ = ['DEFAULT_MAX_LR', 'DEFAULT_MIN_LR', 'DEFAULT_TOTAL_EPOCHS', 'CosineSchedule', 'clip_grad_norm', 'Trainer']

# %% ../../modules/07_training/07_training.ipynb 1
import numpy as np
import pickle
import time
from typing import Dict, List, Optional, Tuple, Any, Callable
from pathlib import Path
import sys
import os

# Import dependencies from other modules
from .tensor import Tensor
from .layers import Linear
from .losses import MSELoss, CrossEntropyLoss
from .optimizers import SGD, AdamW

# Constants for learning rate scheduling defaults
DEFAULT_MAX_LR = 0.1  # Default maximum learning rate for cosine schedule
DEFAULT_MIN_LR = 0.01  # Default minimum learning rate for cosine schedule
DEFAULT_TOTAL_EPOCHS = 100  # Default total epochs for learning rate schedule


# %% ../../modules/07_training/07_training.ipynb 6
class CosineSchedule:
    """
    Cosine annealing learning rate schedule.

    Starts at max_lr and decreases along a cosine curve to min_lr over
    total_epochs epochs. From total_epochs onward the rate stays at min_lr.

    APPROACH:
    1. Store max_lr, min_lr, and total_epochs
    2. In get_lr(), compute cosine factor: (1 + cos(π * epoch / total_epochs)) / 2
    3. Interpolate: min_lr + (max_lr - min_lr) * cosine_factor

    EXAMPLE:
    >>> schedule = CosineSchedule(max_lr=0.1, min_lr=0.01, total_epochs=100)
    >>> print(schedule.get_lr(0))    # Start: 0.1
    >>> print(schedule.get_lr(50))   # Middle: ~0.055
    >>> print(schedule.get_lr(100))  # End: 0.01
    """
    ### BEGIN SOLUTION
    def __init__(self, max_lr: float = DEFAULT_MAX_LR, min_lr: float = DEFAULT_MIN_LR,
                 total_epochs: int = DEFAULT_TOTAL_EPOCHS):
        """
        Store the schedule's end points and length.

        Args:
            max_lr: Learning rate returned for epoch 0
            min_lr: Learning rate returned once epoch >= total_epochs
            total_epochs: Number of epochs over which the rate is annealed
        """
        self.max_lr = max_lr
        self.min_lr = min_lr
        self.total_epochs = total_epochs

    def get_lr(self, epoch: int) -> float:
        """
        Get learning rate for the given epoch.

        Args:
            epoch: Zero-based epoch index

        Returns:
            min_lr when epoch >= total_epochs, otherwise the cosine
            interpolation between max_lr and min_lr, as a builtin float.
        """
        # Past the end of the schedule the rate is pinned at the floor.
        if epoch >= self.total_epochs:
            return self.min_lr

        # Cosine annealing formula: factor runs from 1 (epoch 0) towards 0.
        cosine_factor = (1 + np.cos(np.pi * epoch / self.total_epochs)) / 2
        # np.cos yields a NumPy scalar; cast so the value matches the annotation.
        return float(self.min_lr + (self.max_lr - self.min_lr) * cosine_factor)
    ### END SOLUTION


# %% ../../modules/07_training/07_training.ipynb 10
def _grad_array(grad):
    """Return the raw array behind a gradient that is either an ndarray or an object exposing .data."""
    return grad if isinstance(grad, np.ndarray) else grad.data


def _scale_gradients(parameters, factor: float) -> None:
    """Multiply the gradient of every parameter that has one by factor."""
    for param in parameters:
        grad = param.grad
        if grad is None:
            continue
        if isinstance(grad, np.ndarray):
            # Plain array gradient: rebind the attribute to the scaled array.
            param.grad = grad * factor
        else:
            # Gradient wrapper: keep the wrapper object, replace its payload.
            grad.data = grad.data * factor


def clip_grad_norm(parameters: List, max_norm: float = 1.0) -> float:
    """
    Clip gradients by global norm to prevent exploding gradients.

    Instead of clipping each gradient individually, the norm is computed
    across all parameters and every gradient is scaled by the same factor
    when that norm exceeds max_norm.

    APPROACH:
    1. Compute total norm: sqrt(sum of squared gradients across all parameters)
    2. If total_norm > max_norm, compute clip_coef = max_norm / total_norm
    3. Scale all gradients by clip_coef
    4. Return the original norm for monitoring

    EXAMPLE:
    >>> params = [Tensor([1, 2, 3], requires_grad=True)]
    >>> params[0].grad = Tensor([10, 20, 30])  # Large gradients
    >>> original_norm = clip_grad_norm(params, max_norm=1.0)
    >>> print(f"Clipped norm: {np.linalg.norm(params[0].grad.data):.2f}")  # Should be ≤ 1.0

    Args:
        parameters: Objects with a .grad attribute that is None, a NumPy
            array, or an object exposing the array as .data
        max_norm: Largest global gradient norm allowed after clipping

    Returns:
        The global gradient norm measured before any clipping (0.0 when
        parameters is empty).
    """
    ### BEGIN SOLUTION
    if not parameters:
        return 0.0

    # Global norm: accumulate squared entries over every available gradient.
    total_norm = 0.0
    for param in parameters:
        if param.grad is not None:
            total_norm += np.sum(_grad_array(param.grad) ** 2)
    total_norm = np.sqrt(total_norm)

    # Clip only when the norm exceeds the threshold; otherwise leave gradients untouched.
    if total_norm > max_norm:
        _scale_gradients(parameters, max_norm / total_norm)

    return float(total_norm)
    ### END SOLUTION


# %% ../../modules/07_training/07_training.ipynb 14
class Trainer:
    """
    Complete training orchestrator for neural networks.

    Ties together a model, an optimizer, a loss function and an optional
    learning rate scheduler, and provides:

    1. train_epoch(): loop through data, compute loss, update parameters
       (with optional gradient accumulation and gradient clipping)
    2. evaluate(): the same loop without gradient updates
    3. save/load_checkpoint(): persist training state for resumption

    The model is expected to provide forward() and parameters(), the loss
    function forward(), and the optimizer step(), zero_grad() and an lr
    attribute; these are the members this class uses.
    """
    ### BEGIN SOLUTION
    def __init__(self, model, optimizer, loss_fn, scheduler=None, grad_clip_norm=None):
        """
        Initialize trainer with model and training components.

        Args:
            model: Neural network to train
            optimizer: Parameter update strategy (SGD, Adam, etc.)
            loss_fn: Loss function (CrossEntropy, MSE, etc.)
            scheduler: Optional learning rate scheduler (object with get_lr(epoch))
            grad_clip_norm: Optional gradient clipping threshold
        """
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.scheduler = scheduler
        self.grad_clip_norm = grad_clip_norm

        # Training state
        self.epoch = 0  # number of completed train_epoch() calls
        self.step = 0  # number of optimizer updates performed
        self.training_mode = True

        # History tracking
        self.history = {
            'train_loss': [],
            'eval_loss': [],
            'learning_rates': []
        }

    def _optimizer_update(self, num_accumulated: int) -> None:
        """Average the accumulated gradients, clip if configured, then step and reset the optimizer."""
        params = None
        if num_accumulated > 1:
            # backward() ran once per batch on the unscaled loss, so the stored
            # gradients are a sum over the window. Divide by the window size so
            # the update matches one large batch instead of growing with it.
            params = list(self.model.parameters())
            _scale_gradients(params, 1.0 / num_accumulated)

        if self.grad_clip_norm is not None:
            if params is None:
                params = list(self.model.parameters())
            clip_grad_norm(params, self.grad_clip_norm)

        self.optimizer.step()
        self.optimizer.zero_grad()
        self.step += 1

    def train_epoch(self, dataloader, accumulation_steps=1):
        """
        Train for one epoch through the dataset.

        Args:
            dataloader: Iterable yielding (inputs, targets) batches
            accumulation_steps: Number of batches to accumulate before update

        Returns:
            Average loss for the epoch (mean over optimizer updates of the
            mean batch loss within each accumulation window)

        Raises:
            ValueError: If accumulation_steps is smaller than 1
        """
        if accumulation_steps < 1:
            raise ValueError(
                f"accumulation_steps must be >= 1, got {accumulation_steps}"
            )

        self.model.training = True
        self.training_mode = True

        # Apply the scheduled rate before the batches run so this epoch trains
        # at get_lr(self.epoch); setting it afterwards would lag the schedule
        # by one epoch and leave epoch 0 on the optimizer's initial rate.
        current_lr = None
        if self.scheduler is not None:
            current_lr = self.scheduler.get_lr(self.epoch)
            # Update optimizer learning rate (trust it has lr attribute)
            self.optimizer.lr = current_lr

        total_loss = 0.0  # sum of per-window mean losses
        num_updates = 0  # number of optimizer updates this epoch
        window_loss = 0.0  # sum of batch losses in the current window
        pending = 0  # batches whose gradients are not yet applied

        for inputs, targets in dataloader:
            # Forward pass
            outputs = self.model.forward(inputs)
            loss = self.loss_fn.forward(outputs, targets)

            # Backward pass (gradients add up across the window)
            loss.backward()
            window_loss = window_loss + loss.data
            pending += 1

            # Update parameters every accumulation_steps batches
            if pending == accumulation_steps:
                self._optimizer_update(pending)
                total_loss = total_loss + window_loss / pending
                num_updates += 1
                window_loss = 0.0
                pending = 0

        # Flush a trailing partial window. Counting batches (rather than
        # testing the loss value) also covers zero or negative losses, so
        # stale gradients never leak into the next epoch.
        if pending > 0:
            self._optimizer_update(pending)
            total_loss = total_loss + window_loss / pending
            num_updates += 1

        avg_loss = total_loss / max(num_updates, 1)
        self.history['train_loss'].append(avg_loss)

        if current_lr is not None:
            self.history['learning_rates'].append(current_lr)

        self.epoch += 1
        return avg_loss

    def evaluate(self, dataloader):
        """
        Evaluate model on dataset without updating parameters.

        Accuracy is only computed for outputs with more than one dimension,
        using argmax over axis 1; for other outputs it is reported as 0.0.

        Args:
            dataloader: Iterable yielding (inputs, targets) batches; it does
                not need to support len()

        Returns:
            Tuple of (average loss over batches, accuracy)
        """
        self.model.training = False
        self.training_mode = False

        total_loss = 0.0
        num_batches = 0
        correct = 0
        total = 0

        for inputs, targets in dataloader:
            # Forward pass only
            outputs = self.model.forward(inputs)
            loss = self.loss_fn.forward(outputs, targets)
            total_loss = total_loss + loss.data
            num_batches += 1

            # Calculate accuracy (for classification)
            # Trust that Tensors have .data attribute
            if len(outputs.data.shape) > 1:  # Multi-class
                predictions = np.argmax(outputs.data, axis=1)
                if len(targets.data.shape) == 1:  # Integer targets
                    labels = targets.data
                else:  # One-hot targets
                    labels = np.argmax(targets.data, axis=1)
                correct += np.sum(predictions == labels)
                total += len(predictions)

        # Count the batches actually seen instead of calling len(dataloader),
        # so generators and other unsized iterables can be evaluated too.
        avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
        accuracy = correct / total if total > 0 else 0.0

        self.history['eval_loss'].append(avg_loss)

        return avg_loss, accuracy

    def save_checkpoint(self, path: str):
        """
        Save complete training state for resumption.

        Creates the parent directory if needed and pickles a dict with the
        epoch/step counters, model, optimizer and scheduler state, the
        history and the training-mode flag.

        Args:
            path: File path to save checkpoint
        """
        checkpoint = {
            'epoch': self.epoch,
            'step': self.step,
            'model_state': self._get_model_state(),
            'optimizer_state': self._get_optimizer_state(),
            'scheduler_state': self._get_scheduler_state(),
            'history': self.history,
            'training_mode': self.training_mode
        }

        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump(checkpoint, f)

    def load_checkpoint(self, path: str):
        """
        Load training state from checkpoint.

        Args:
            path: File path to load checkpoint from
        """
        # SECURITY NOTE: pickle.load can execute arbitrary code embedded in
        # the file. Only load checkpoints that come from a trusted source.
        with open(path, 'rb') as f:
            checkpoint = pickle.load(f)

        self.epoch = checkpoint['epoch']
        self.step = checkpoint['step']
        self.history = checkpoint['history']
        self.training_mode = checkpoint['training_mode']

        # Restore states (simplified for educational purposes)
        if 'model_state' in checkpoint:
            self._set_model_state(checkpoint['model_state'])
        if 'optimizer_state' in checkpoint:
            self._set_optimizer_state(checkpoint['optimizer_state'])
        if 'scheduler_state' in checkpoint:
            self._set_scheduler_state(checkpoint['scheduler_state'])

    def _get_model_state(self):
        """Extract model parameters for checkpointing, keyed by their position in parameters()."""
        # Trust model has parameters() method
        return {i: param.data.copy() for i, param in enumerate(self.model.parameters())}

    def _set_model_state(self, state):
        """Restore model parameters from checkpoint; positions missing from state are left as they are."""
        # Trust model has parameters() method
        for i, param in enumerate(self.model.parameters()):
            if i in state:
                param.data = state[i].copy()

    def _get_optimizer_state(self):
        """Extract optimizer state (learning rate and, when available, momentum buffers) for checkpointing."""
        state = {}
        # Trust optimizer has lr attribute (from Modules 06)
        state['lr'] = self.optimizer.lr
        # Use explicit API for momentum state (Module 06)
        # Optimizers without has_momentum() simply contribute no buffers.
        if hasattr(self.optimizer, 'has_momentum') and self.optimizer.has_momentum():
            momentum_state = self.optimizer.get_momentum_state()
            if momentum_state is not None:
                state['momentum_buffers'] = momentum_state
        return state

    def _set_optimizer_state(self, state):
        """Restore optimizer state from checkpoint."""
        if 'lr' in state:
            # Trust optimizer has lr attribute (from Modules 06)
            self.optimizer.lr = state['lr']
        # Use explicit API for momentum state (Module 06)
        # Buffers are only restored into optimizers that report momentum support.
        if 'momentum_buffers' in state:
            if hasattr(self.optimizer, 'has_momentum') and self.optimizer.has_momentum():
                self.optimizer.set_momentum_state(state['momentum_buffers'])

    def _get_scheduler_state(self):
        """Extract scheduler state for checkpointing (None when no scheduler is configured)."""
        if self.scheduler is None:
            return None
        return {
            'max_lr': getattr(self.scheduler, 'max_lr', None),
            'min_lr': getattr(self.scheduler, 'min_lr', None),
            'total_epochs': getattr(self.scheduler, 'total_epochs', None)
        }

    def _set_scheduler_state(self, state):
        """Restore scheduler state from checkpoint."""
        if state is None or self.scheduler is None:
            return

        # Educational Note: hasattr() is legitimate here because:
        # 1. Schedulers are user-extensible with custom attributes
        # 2. State dict may have keys from different scheduler types
        # 3. We safely skip attributes that don't exist on current scheduler
        # This is duck-typing for polymorphic checkpoint restoration
        for key, value in state.items():
            if hasattr(self.scheduler, key):
                setattr(self.scheduler, key, value)
    ### END SOLUTION