# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/14_transformers/transformers_dev.ipynb. # %% auto 0 __all__ = ['LayerNorm', 'PositionwiseFeedForward', 'TransformerBlock', 'Transformer', 'TransformerProfiler', 'analyze_transformer_system_design'] # %% ../../modules/14_transformers/transformers_dev.ipynb 1 import math import numpy as np import os import sys from typing import Union, List, Optional, Tuple, Dict # Import our Tensor class - try from package first, then from local module try: from tinytorch.core.tensor import Tensor except ImportError: # For development, import from local tensor module sys.path.append(os.path.join(os.path.dirname(__file__), '..', '02_tensor')) from tensor_dev import Tensor # Try to import attention classes try: from tinytorch.core.attention import ScaledDotProductAttention, MultiHeadAttention, KVCache except ImportError: # For development, import from local module sys.path.append(os.path.join(os.path.dirname(__file__), '..', '13_attention')) try: from attention_dev import ScaledDotProductAttention, MultiHeadAttention, KVCache except ImportError: # Create minimal mock classes if not available class MultiHeadAttention: def __init__(self, embed_dim, num_heads): self.embed_dim = embed_dim self.num_heads = num_heads def forward(self, q, k, v, mask=None): return q # Mock implementation class ScaledDotProductAttention: def __init__(self): pass class KVCache: def __init__(self, *args, **kwargs): pass # Try to import embedding classes try: from tinytorch.core.embeddings import Embedding, PositionalEncoding except ImportError: # For development, import from local module sys.path.append(os.path.join(os.path.dirname(__file__), '..', '12_embeddings')) try: from embeddings_dev import Embedding, PositionalEncoding except ImportError: # Create minimal mock classes if not available class Embedding: def __init__(self, vocab_size, embedding_dim): self.vocab_size = vocab_size self.embedding_dim = embedding_dim class PositionalEncoding: def __init__(self, embedding_dim, max_seq_length=5000): self.embedding_dim = embedding_dim # %% ../../modules/14_transformers/transformers_dev.ipynb 6 class LayerNorm: """ Layer Normalization for transformers. Normalizes across the feature dimension (last axis) for each sample, making training more stable and enabling deeper networks. """ def __init__(self, normalized_shape: Union[int, Tuple[int]], eps: float = 1e-5): """ Initialize layer normalization with learnable parameters. TODO: Implement layer normalization initialization. STEP-BY-STEP IMPLEMENTATION: 1. Store normalization configuration 2. Initialize learnable scale (gamma) and shift (beta) parameters 3. Set epsilon for numerical stability 4. Set up parameter tracking for optimization MATHEMATICAL FOUNDATION: LayerNorm(x) = γ * (x - μ) / σ + β Where: - μ = mean across feature dimensions - σ = std across feature dimensions - γ = learnable scale parameter - β = learnable shift parameter Args: normalized_shape: Shape of features to normalize (e.g., embedding_dim) eps: Small value for numerical stability """ ### BEGIN SOLUTION if isinstance(normalized_shape, int): self.normalized_shape = (normalized_shape,) else: self.normalized_shape = normalized_shape self.eps = eps # Initialize learnable parameters # Gamma (scale): initialized to ones # Beta (bias): initialized to zeros self.gamma = Tensor(np.ones(self.normalized_shape)) self.beta = Tensor(np.zeros(self.normalized_shape)) # Track parameters for optimization self.parameters = [self.gamma, self.beta] ### END SOLUTION def forward(self, x: Tensor) -> Tensor: """ Apply layer normalization to input tensor. TODO: Implement layer normalization forward pass. STEP-BY-STEP IMPLEMENTATION: 1. Calculate mean across feature dimensions 2. Calculate standard deviation across feature dimensions 3. Normalize: (x - mean) / (std + eps) 4. Apply learnable scale and shift: gamma * normalized + beta NUMERICAL STABILITY: - Add eps to variance before taking sqrt - Use unbiased variance calculation EXAMPLE: layer_norm = LayerNorm(256) x = Tensor(np.random.randn(32, 128, 256)) # (batch, seq, features) normalized = layer_norm.forward(x) # Same shape as input Args: x: Input tensor with shape (..., *normalized_shape) Returns: Normalized tensor with same shape as input """ ### BEGIN SOLUTION # Calculate mean and variance across the feature dimensions (last axes) # For shape (..., *normalized_shape), we want to normalize over the last len(normalized_shape) axes # Determine axes to normalize over axes_to_normalize = tuple(range(len(x.shape) - len(self.normalized_shape), len(x.shape))) # Calculate mean mean = np.mean(x.data, axis=axes_to_normalize, keepdims=True) # Calculate variance variance = np.var(x.data, axis=axes_to_normalize, keepdims=True) # Normalize normalized = (x.data - mean) / np.sqrt(variance + self.eps) # Apply learnable scale and shift # Reshape gamma and beta to be broadcastable gamma_broadcasted = self.gamma.data.reshape([1] * (len(x.shape) - len(self.normalized_shape)) + list(self.normalized_shape)) beta_broadcasted = self.beta.data.reshape([1] * (len(x.shape) - len(self.normalized_shape)) + list(self.normalized_shape)) output = gamma_broadcasted * normalized + beta_broadcasted return Tensor(output) ### END SOLUTION def __call__(self, x: Tensor) -> Tensor: """Make the class callable.""" return self.forward(x) def get_memory_usage(self) -> Dict[str, float]: """ Calculate memory usage of layer normalization parameters. This function is PROVIDED to show memory analysis. """ # Parameter memory param_memory_mb = sum(param.data.nbytes for param in self.parameters) / (1024 * 1024) return { 'parameter_memory_mb': param_memory_mb, 'total_parameters': sum(param.data.size for param in self.parameters), 'normalized_shape': self.normalized_shape } # %% ../../modules/14_transformers/transformers_dev.ipynb 10 class PositionwiseFeedForward: """ Position-wise feed-forward network used in transformer blocks. Applies the same feed-forward network to each position in the sequence: FFN(x) = max(0, xW₁ + b₁)W₂ + b₂ """ def __init__(self, embed_dim: int, hidden_dim: int, dropout: float = 0.0): """ Initialize position-wise feed-forward network. TODO: Implement feed-forward network initialization. STEP-BY-STEP IMPLEMENTATION: 1. Store network configuration 2. Initialize weight matrices and bias vectors for two linear layers 3. Set up parameter tracking for optimization 4. Store dropout rate for training ARCHITECTURE: - Input: (batch, seq_len, embed_dim) - Linear 1: embed_dim → hidden_dim - ReLU activation - Linear 2: hidden_dim → embed_dim - Output: (batch, seq_len, embed_dim) PARAMETER INITIALIZATION: Use Xavier/Glorot initialization for stable training Args: embed_dim: Embedding dimension (input and output size) hidden_dim: Hidden layer dimension (typically 4 * embed_dim) dropout: Dropout rate for regularization """ ### BEGIN SOLUTION self.embed_dim = embed_dim self.hidden_dim = hidden_dim self.dropout = dropout # Initialize weights using Xavier initialization # W1: embed_dim → hidden_dim xavier_bound_1 = math.sqrt(6.0 / (embed_dim + hidden_dim)) self.w1 = Tensor(np.random.uniform(-xavier_bound_1, xavier_bound_1, (embed_dim, hidden_dim))) self.b1 = Tensor(np.zeros(hidden_dim)) # W2: hidden_dim → embed_dim xavier_bound_2 = math.sqrt(6.0 / (hidden_dim + embed_dim)) self.w2 = Tensor(np.random.uniform(-xavier_bound_2, xavier_bound_2, (hidden_dim, embed_dim))) self.b2 = Tensor(np.zeros(embed_dim)) # Track parameters for optimization self.parameters = [self.w1, self.b1, self.w2, self.b2] ### END SOLUTION def forward(self, x: Tensor) -> Tensor: """ Apply position-wise feed-forward transformation. TODO: Implement feed-forward forward pass. STEP-BY-STEP IMPLEMENTATION: 1. Apply first linear transformation: x @ W1 + b1 2. Apply ReLU activation: max(0, linear1) 3. Apply second linear transformation: relu @ W2 + b2 4. Return result with same shape as input MATHEMATICAL FORMULATION: hidden = ReLU(x @ W1 + b1) output = hidden @ W2 + b2 Args: x: Input tensor with shape (batch_size, seq_len, embed_dim) Returns: Output tensor with shape (batch_size, seq_len, embed_dim) """ ### BEGIN SOLUTION # Reshape input for matrix multiplication if needed original_shape = x.shape if len(x.shape) == 3: batch_size, seq_len, embed_dim = x.shape # Reshape to (batch_size * seq_len, embed_dim) for efficient computation x_reshaped = x.data.reshape(-1, embed_dim) else: x_reshaped = x.data # First linear transformation: x @ W1 + b1 hidden = np.matmul(x_reshaped, self.w1.data) + self.b1.data # ReLU activation hidden_relu = np.maximum(0, hidden) # Second linear transformation: hidden @ W2 + b2 output = np.matmul(hidden_relu, self.w2.data) + self.b2.data # Reshape back to original shape if len(original_shape) == 3: output = output.reshape(original_shape) return Tensor(output) ### END SOLUTION def __call__(self, x: Tensor) -> Tensor: """Make the class callable.""" return self.forward(x) def get_memory_usage(self) -> Dict[str, float]: """ Calculate memory usage of feed-forward parameters. This function is PROVIDED to show memory analysis. """ # Parameter memory param_memory_mb = sum(param.data.nbytes for param in self.parameters) / (1024 * 1024) # Calculate parameter counts w1_params = self.embed_dim * self.hidden_dim w2_params = self.hidden_dim * self.embed_dim bias_params = self.hidden_dim + self.embed_dim total_params = w1_params + w2_params + bias_params return { 'parameter_memory_mb': param_memory_mb, 'total_parameters': total_params, 'w1_parameters': w1_params, 'w2_parameters': w2_params, 'bias_parameters': bias_params, 'embed_dim': self.embed_dim, 'hidden_dim': self.hidden_dim } # %% ../../modules/14_transformers/transformers_dev.ipynb 14 class TransformerBlock: """ Complete transformer block with self-attention and feed-forward layers. Combines multi-head self-attention, layer normalization, residual connections, and position-wise feed-forward networks into the standard transformer architecture. """ def __init__(self, embed_dim: int, num_heads: int, hidden_dim: int, dropout: float = 0.0, pre_norm: bool = True): """ Initialize transformer block with all components. TODO: Implement transformer block initialization. STEP-BY-STEP IMPLEMENTATION: 1. Store block configuration 2. Create multi-head attention layer 3. Create two layer normalization layers (for attention and FFN) 4. Create position-wise feed-forward network 5. Set up parameter tracking from all sub-components ARCHITECTURE CHOICE: Pre-norm vs Post-norm - Pre-norm: LayerNorm → Attention → Residual (more stable) - Post-norm: Attention → LayerNorm → Residual (original paper) Args: embed_dim: Embedding dimension num_heads: Number of attention heads hidden_dim: Feed-forward hidden dimension (typically 4 * embed_dim) dropout: Dropout rate for regularization pre_norm: Whether to use pre-normalization (recommended) """ ### BEGIN SOLUTION self.embed_dim = embed_dim self.num_heads = num_heads self.hidden_dim = hidden_dim self.dropout = dropout self.pre_norm = pre_norm # Multi-head self-attention self.attention = MultiHeadAttention(embed_dim=embed_dim, num_heads=num_heads) # Layer normalization layers self.norm1 = LayerNorm(embed_dim) # For attention self.norm2 = LayerNorm(embed_dim) # For feed-forward # Position-wise feed-forward network self.ffn = PositionwiseFeedForward(embed_dim=embed_dim, hidden_dim=hidden_dim, dropout=dropout) # Collect all parameters from sub-components self.parameters = [] if hasattr(self.attention, 'parameters'): self.parameters.extend(self.attention.parameters) self.parameters.extend(self.norm1.parameters) self.parameters.extend(self.norm2.parameters) self.parameters.extend(self.ffn.parameters) ### END SOLUTION def forward(self, x: Tensor, mask: Optional[Tensor] = None, return_attention_weights: bool = False) -> Union[Tensor, Tuple[Tensor, Tensor]]: """ Process input through complete transformer block. TODO: Implement transformer block forward pass. STEP-BY-STEP IMPLEMENTATION (Pre-norm): 1. Self-attention with residual: x + attention(norm1(x)) 2. Feed-forward with residual: attn_out + ffn(norm2(attn_out)) 3. Return final output (and optionally attention weights) RESIDUAL CONNECTIONS: Essential for training deep networks - allow gradients to flow directly Args: x: Input tensor with shape (batch_size, seq_len, embed_dim) mask: Optional attention mask return_attention_weights: Whether to return attention weights Returns: Transformer block output with same shape as input Optionally also attention weights """ ### BEGIN SOLUTION if self.pre_norm: # Pre-normalization: LayerNorm before attention/FFN # Self-attention with residual connection norm1_x = self.norm1(x) if return_attention_weights: attn_output, attn_weights = self.attention.forward( norm1_x, norm1_x, norm1_x, mask=mask, return_attention_weights=True ) else: attn_output = self.attention.forward(norm1_x, norm1_x, norm1_x, mask=mask) # Residual connection x = Tensor(x.data + attn_output.data) # Feed-forward with residual connection norm2_x = self.norm2(x) ffn_output = self.ffn.forward(norm2_x) # Residual connection output = Tensor(x.data + ffn_output.data) else: # Post-normalization: LayerNorm after attention/FFN (original transformer) # Self-attention with residual connection if return_attention_weights: attn_output, attn_weights = self.attention.forward( x, x, x, mask=mask, return_attention_weights=True ) else: attn_output = self.attention.forward(x, x, x, mask=mask) # Residual + LayerNorm attn_residual = Tensor(x.data + attn_output.data) norm1_output = self.norm1(attn_residual) # Feed-forward with residual connection ffn_output = self.ffn.forward(norm1_output) # Residual + LayerNorm ffn_residual = Tensor(norm1_output.data + ffn_output.data) output = self.norm2(ffn_residual) if return_attention_weights: return output, attn_weights else: return output ### END SOLUTION def __call__(self, x: Tensor, mask: Optional[Tensor] = None, return_attention_weights: bool = False) -> Union[Tensor, Tuple[Tensor, Tensor]]: """Make the class callable.""" return self.forward(x, mask, return_attention_weights) def get_memory_usage(self) -> Dict[str, float]: """ Calculate memory usage of transformer block components. This function is PROVIDED to show memory analysis. """ # Get memory usage from components if hasattr(self.attention, 'get_memory_usage'): attention_memory = self.attention.get_memory_usage()['total_parameter_memory_mb'] else: attention_memory = 0.0 norm1_memory = self.norm1.get_memory_usage()['parameter_memory_mb'] norm2_memory = self.norm2.get_memory_usage()['parameter_memory_mb'] ffn_memory = self.ffn.get_memory_usage()['parameter_memory_mb'] total_memory = attention_memory + norm1_memory + norm2_memory + ffn_memory total_params = len(self.parameters) if hasattr(self, 'parameters') else 0 return { 'total_memory_mb': total_memory, 'attention_memory_mb': attention_memory, 'norm_memory_mb': norm1_memory + norm2_memory, 'ffn_memory_mb': ffn_memory, 'total_parameters': sum(p.data.size for p in self.parameters) if hasattr(self, 'parameters') else 0, 'embed_dim': self.embed_dim, 'num_heads': self.num_heads, 'hidden_dim': self.hidden_dim, 'pre_norm': self.pre_norm } # %% ../../modules/14_transformers/transformers_dev.ipynb 18 class Transformer: """ Complete transformer model for language processing. Stacks multiple transformer blocks with token embeddings and positional encoding to create a complete language model architecture. """ def __init__(self, vocab_size: int, embed_dim: int, num_heads: int, num_layers: int, hidden_dim: int, max_seq_length: int = 1024, dropout: float = 0.0, pre_norm: bool = True): """ Initialize complete transformer model. TODO: Implement transformer model initialization. STEP-BY-STEP IMPLEMENTATION: 1. Store model configuration 2. Create token embedding layer 3. Create positional encoding 4. Create stack of transformer blocks 5. Create output projection layer (for language modeling) 6. Set up parameter tracking from all components LANGUAGE MODELING HEAD: Final linear layer that projects hidden states to vocabulary logits Args: vocab_size: Size of vocabulary embed_dim: Embedding dimension num_heads: Number of attention heads per layer num_layers: Number of transformer blocks hidden_dim: Feed-forward hidden dimension max_seq_length: Maximum sequence length for positional encoding dropout: Dropout rate pre_norm: Whether to use pre-normalization """ ### BEGIN SOLUTION self.vocab_size = vocab_size self.embed_dim = embed_dim self.num_heads = num_heads self.num_layers = num_layers self.hidden_dim = hidden_dim self.max_seq_length = max_seq_length self.dropout = dropout self.pre_norm = pre_norm # Token embedding layer self.token_embedding = Embedding(vocab_size=vocab_size, embedding_dim=embed_dim) # Positional encoding self.pos_encoding = PositionalEncoding(embedding_dim=embed_dim, max_seq_length=max_seq_length) # Stack of transformer blocks self.transformer_blocks = [] for _ in range(num_layers): block = TransformerBlock( embed_dim=embed_dim, num_heads=num_heads, hidden_dim=hidden_dim, dropout=dropout, pre_norm=pre_norm ) self.transformer_blocks.append(block) # Final layer normalization (for pre-norm architecture) if pre_norm: self.final_norm = LayerNorm(embed_dim) else: self.final_norm = None # Language modeling head (projects to vocabulary) xavier_bound = math.sqrt(6.0 / (embed_dim + vocab_size)) self.lm_head = Tensor(np.random.uniform(-xavier_bound, xavier_bound, (embed_dim, vocab_size))) # Collect all parameters self.parameters = [] if hasattr(self.token_embedding, 'parameters'): self.parameters.extend(self.token_embedding.parameters) for block in self.transformer_blocks: if hasattr(block, 'parameters'): self.parameters.extend(block.parameters) if self.final_norm: self.parameters.extend(self.final_norm.parameters) self.parameters.append(self.lm_head) ### END SOLUTION def forward(self, input_ids: Tensor, mask: Optional[Tensor] = None, return_attention_weights: bool = False) -> Union[Tensor, Tuple[Tensor, List[Tensor]]]: """ Process input through complete transformer model. TODO: Implement transformer model forward pass. STEP-BY-STEP IMPLEMENTATION: 1. Convert token IDs to embeddings 2. Add positional encoding 3. Process through all transformer blocks 4. Apply final normalization (if pre-norm) 5. Apply language modeling head 6. Return logits (and optionally attention weights) Args: input_ids: Token indices with shape (batch_size, seq_len) mask: Optional attention mask return_attention_weights: Whether to return all attention weights Returns: Logits with shape (batch_size, seq_len, vocab_size) Optionally also list of attention weights from each layer """ ### BEGIN SOLUTION # Token embeddings embeddings = self.token_embedding.forward(input_ids) # Add positional encoding x = self.pos_encoding.forward(embeddings) # Process through transformer blocks all_attention_weights = [] for block in self.transformer_blocks: if return_attention_weights: x, attn_weights = block.forward(x, mask=mask, return_attention_weights=True) all_attention_weights.append(attn_weights) else: x = block.forward(x, mask=mask) # Final layer normalization (for pre-norm) if self.final_norm: x = self.final_norm.forward(x) # Language modeling head # x: (batch_size, seq_len, embed_dim) # lm_head: (embed_dim, vocab_size) # output: (batch_size, seq_len, vocab_size) batch_size, seq_len, embed_dim = x.shape x_reshaped = x.data.reshape(-1, embed_dim) # (batch_size * seq_len, embed_dim) logits_reshaped = np.matmul(x_reshaped, self.lm_head.data) # (batch_size * seq_len, vocab_size) logits = logits_reshaped.reshape(batch_size, seq_len, self.vocab_size) if return_attention_weights: return Tensor(logits), all_attention_weights else: return Tensor(logits) ### END SOLUTION def __call__(self, input_ids: Tensor, mask: Optional[Tensor] = None, return_attention_weights: bool = False) -> Union[Tensor, Tuple[Tensor, List[Tensor]]]: """Make the class callable.""" return self.forward(input_ids, mask, return_attention_weights) def generate(self, input_ids: Tensor, max_new_tokens: int = 50, temperature: float = 1.0) -> Tensor: """ Generate text autoregressively. This function is PROVIDED to show text generation capability. """ batch_size, current_seq_len = input_ids.shape if current_seq_len >= self.max_seq_length: raise ValueError(f"Input sequence length {current_seq_len} exceeds max {self.max_seq_length}") generated_ids = input_ids.data.copy() for _ in range(max_new_tokens): # Create causal mask seq_len = generated_ids.shape[1] causal_mask = np.triu(np.ones((seq_len, seq_len)), k=1) causal_mask = 1 - causal_mask # Forward pass logits = self.forward(Tensor(generated_ids), mask=Tensor(causal_mask)) # Get logits for last position last_logits = logits.data[:, -1, :] # (batch_size, vocab_size) # Apply temperature last_logits = last_logits / temperature # Sample next token (using simple sampling) # Convert to probabilities exp_logits = np.exp(last_logits - np.max(last_logits, axis=-1, keepdims=True)) probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True) # Sample from distribution next_tokens = [] for i in range(batch_size): next_token = np.random.choice(self.vocab_size, p=probs[i]) next_tokens.append(next_token) next_tokens = np.array(next_tokens).reshape(batch_size, 1) # Append to sequence generated_ids = np.concatenate([generated_ids, next_tokens], axis=1) # Stop if we reach max sequence length if generated_ids.shape[1] >= self.max_seq_length: break return Tensor(generated_ids) def get_memory_usage(self) -> Dict[str, float]: """ Calculate memory usage of complete transformer model. This function is PROVIDED to show memory analysis. """ # Token embedding memory if hasattr(self.token_embedding, 'get_memory_usage'): embedding_memory = self.token_embedding.get_memory_usage()['total_memory_mb'] else: embedding_memory = self.vocab_size * self.embed_dim * 4 / (1024 * 1024) # Transformer blocks memory block_memory = 0 if self.transformer_blocks: single_block_memory = self.transformer_blocks[0].get_memory_usage()['total_memory_mb'] block_memory = single_block_memory * self.num_layers # Final norm memory final_norm_memory = 0 if self.final_norm: final_norm_memory = self.final_norm.get_memory_usage()['parameter_memory_mb'] # Language modeling head memory lm_head_memory = self.lm_head.data.nbytes / (1024 * 1024) total_memory = embedding_memory + block_memory + final_norm_memory + lm_head_memory total_params = sum(p.data.size for p in self.parameters) if hasattr(self, 'parameters') else 0 return { 'total_memory_mb': total_memory, 'embedding_memory_mb': embedding_memory, 'transformer_blocks_memory_mb': block_memory, 'lm_head_memory_mb': lm_head_memory, 'total_parameters': total_params, 'vocab_size': self.vocab_size, 'embed_dim': self.embed_dim, 'num_layers': self.num_layers, 'num_heads': self.num_heads, 'hidden_dim': self.hidden_dim } # %% ../../modules/14_transformers/transformers_dev.ipynb 22 import time class TransformerProfiler: """ Performance profiling toolkit for transformer architectures. Helps ML engineers understand computational costs, memory scaling, and architectural trade-offs in transformer-based models. """ def __init__(self): self.results = {} def measure_scaling_with_depth(self, base_config: Dict, layer_counts: List[int]) -> Dict: """ Measure how transformer performance scales with number of layers. TODO: Implement transformer depth scaling measurement. STEP-BY-STEP IMPLEMENTATION: 1. Create transformers with different layer counts 2. Measure memory usage and computation time for each 3. Calculate scaling patterns (should be linear with depth) 4. Analyze parameter growth and memory requirements 5. Return comprehensive scaling analysis EXPECTED SCALING: - Parameters: Linear with depth - Memory: Linear with depth - Computation: Linear with depth - Quality: Generally improves with depth (to a point) Args: base_config: Base transformer configuration layer_counts: List of layer counts to test Returns: Dictionary with scaling analysis results """ ### BEGIN SOLUTION scaling_results = {} # Test input batch_size = 4 seq_len = 32 vocab_size = base_config['vocab_size'] test_input = Tensor(np.random.randint(0, vocab_size, (batch_size, seq_len))) for num_layers in layer_counts: # Create transformer with this depth transformer = Transformer( vocab_size=base_config['vocab_size'], embed_dim=base_config['embed_dim'], num_heads=base_config['num_heads'], num_layers=num_layers, hidden_dim=base_config['hidden_dim'], max_seq_length=base_config.get('max_seq_length', 128) ) # Measure memory usage memory_stats = transformer.get_memory_usage() # Measure computation time start_time = time.time() logits = transformer.forward(test_input) end_time = time.time() computation_time_ms = (end_time - start_time) * 1000 # Calculate throughput total_tokens = batch_size * seq_len tokens_per_second = total_tokens / (end_time - start_time) if end_time > start_time else 0 scaling_results[num_layers] = { 'num_layers': num_layers, 'total_parameters': memory_stats['total_parameters'], 'total_memory_mb': memory_stats['total_memory_mb'], 'computation_time_ms': computation_time_ms, 'tokens_per_second': tokens_per_second, 'memory_per_layer_mb': memory_stats['transformer_blocks_memory_mb'] / num_layers if num_layers > 0 else 0, 'parameters_per_layer': (memory_stats['total_parameters'] - base_config['vocab_size'] * base_config['embed_dim'] * 2) // num_layers if num_layers > 0 else 0 } return scaling_results ### END SOLUTION def analyze_width_vs_depth_tradeoffs(self, base_params: int, configurations: List[Dict]) -> Dict: """ Compare different ways to allocate a fixed parameter budget. This function is PROVIDED to show parameter allocation analysis. """ print(f"📊 WIDTH vs DEPTH TRADE-OFF ANALYSIS") print(f"Target parameter budget: ~{base_params:,} parameters") print("=" * 70) results = {} # Test input batch_size = 4 seq_len = 32 test_input = Tensor(np.random.randint(0, 1000, (batch_size, seq_len))) print(f"{'Config':<15} {'Layers':<7} {'Embed':<6} {'Heads':<6} {'Hidden':<7} {'Params':<12} {'Time (ms)':<10} {'Memory'}") print("-" * 80) for i, config in enumerate(configurations): try: # Create transformer transformer = Transformer( vocab_size=1000, # Fixed vocab size embed_dim=config['embed_dim'], num_heads=config['num_heads'], num_layers=config['num_layers'], hidden_dim=config['hidden_dim'], max_seq_length=128 ) # Get actual parameter count memory_stats = transformer.get_memory_usage() actual_params = memory_stats['total_parameters'] # Measure performance start_time = time.time() logits = transformer.forward(test_input) computation_time = (time.time() - start_time) * 1000 config_name = f"Config_{i+1}" results[config_name] = { 'config': config, 'actual_parameters': actual_params, 'computation_time_ms': computation_time, 'memory_mb': memory_stats['total_memory_mb'], 'parameter_efficiency': abs(actual_params - base_params) / base_params } print(f"{config_name:<15} {config['num_layers']:<7} {config['embed_dim']:<6} " f"{config['num_heads']:<6} {config['hidden_dim']:<7} {actual_params:<12,} " f"{computation_time:<10.2f} {memory_stats['total_memory_mb']:.1f}MB") except Exception as e: print(f"{config_name:<15} ERROR: {str(e)[:50]}") # Analysis print(f"\n💡 TRADE-OFF INSIGHTS:") print(f" - Deeper models: Better at learning complex patterns, more sequential") print(f" - Wider models: More parallelizable, can capture diverse features") print(f" - More heads: Richer attention patterns, more computation") print(f" - Hidden dimension: Affects FFN capacity, major parameter contributor") return results def simulate_production_scaling(self, model_sizes: List[str]) -> Dict: """ Simulate memory and computation requirements for production model sizes. This function is PROVIDED to show production scaling analysis. """ print(f"\n🏭 PRODUCTION MODEL SCALING SIMULATION") print("=" * 60) # Production model configurations (simplified) size_configs = { 'Small': {'vocab_size': 50000, 'embed_dim': 512, 'num_heads': 8, 'num_layers': 6, 'hidden_dim': 2048}, 'Medium': {'vocab_size': 50000, 'embed_dim': 768, 'num_heads': 12, 'num_layers': 12, 'hidden_dim': 3072}, 'Large': {'vocab_size': 50000, 'embed_dim': 1024, 'num_heads': 16, 'num_layers': 24, 'hidden_dim': 4096}, 'XL': {'vocab_size': 50000, 'embed_dim': 1280, 'num_heads': 20, 'num_layers': 36, 'hidden_dim': 5120} } results = {} print(f"{'Model Size':<12} {'Parameters':<12} {'Memory (GB)':<12} {'Training GPU':<12} {'Inference'}") print("-" * 70) for size in model_sizes: if size not in size_configs: continue config = size_configs[size] # Estimate parameters # Embedding: vocab_size * embed_dim * 2 (input + output) embedding_params = config['vocab_size'] * config['embed_dim'] * 2 # Per layer: # - Attention: 4 * embed_dim^2 (Q, K, V, O projections) # - FFN: 2 * embed_dim * hidden_dim + embed_dim + hidden_dim (weights + biases) # - LayerNorm: 2 * embed_dim * 2 (two norms per layer) attention_params_per_layer = 4 * config['embed_dim'] ** 2 ffn_params_per_layer = 2 * config['embed_dim'] * config['hidden_dim'] + config['embed_dim'] + config['hidden_dim'] norm_params_per_layer = 4 * config['embed_dim'] layer_params = attention_params_per_layer + ffn_params_per_layer + norm_params_per_layer total_params = embedding_params + layer_params * config['num_layers'] # Estimate memory (parameters + activations + gradients for training) param_memory_gb = total_params * 4 / (1024**3) # 4 bytes per float32 # Training memory: parameters + gradients + optimizer states + activations training_memory_gb = param_memory_gb * 4 # Rough estimate (param + grad + 2x optimizer states) # Inference memory: just parameters + activations inference_memory_gb = param_memory_gb * 1.5 # Parameters + activation memory # GPU requirements (very rough estimates) if training_memory_gb < 24: training_gpu = "Single RTX 4090" elif training_memory_gb < 80: training_gpu = "Single A100" else: training_gpu = "Multi-GPU" if inference_memory_gb < 12: inference_req = "RTX 4060 Ti" elif inference_memory_gb < 24: inference_req = "RTX 4090" else: inference_req = "A100+" results[size] = { 'config': config, 'total_parameters': total_params, 'training_memory_gb': training_memory_gb, 'inference_memory_gb': inference_memory_gb, 'training_gpu_req': training_gpu, 'inference_gpu_req': inference_req } print(f"{size:<12} {total_params/1e6:.1f}M {training_memory_gb:.1f} {training_gpu:<12} {inference_req}") print(f"\n📈 SCALING OBSERVATIONS:") print(f" - Model size grows super-linearly with dimension increases") print(f" - Memory requirements dominate deployment decisions") print(f" - Training requires 3-4x more memory than inference") print(f" - Multi-GPU becomes necessary for large models") return results def analyze_transformer_system_design(): """ Comprehensive analysis of transformer system design choices and trade-offs. This function is PROVIDED to show systems-level design thinking. """ print("🏗️ TRANSFORMER SYSTEM DESIGN ANALYSIS") print("=" * 60) # Architecture decision analysis design_choices = { 'Layer Normalization': { 'Pre-norm': {'stability': 'High', 'training': 'Easier', 'performance': 'Good'}, 'Post-norm': {'stability': 'Lower', 'training': 'Harder', 'performance': 'Potentially better'} }, 'Attention Patterns': { 'Full attention': {'complexity': 'O(N²)', 'quality': 'Best', 'scalability': 'Limited'}, 'Sparse attention': {'complexity': 'O(N√N)', 'quality': 'Good', 'scalability': 'Better'}, 'Linear attention': {'complexity': 'O(N)', 'quality': 'Reduced', 'scalability': 'Excellent'} }, 'Feed-Forward Size': { '2x embed_dim': {'parameters': 'Low', 'capacity': 'Limited', 'speed': 'Fast'}, '4x embed_dim': {'parameters': 'Standard', 'capacity': 'Good', 'speed': 'Medium'}, '8x embed_dim': {'parameters': 'High', 'capacity': 'High', 'speed': 'Slow'} } } print("🎯 ARCHITECTURAL DESIGN CHOICES:") for category, choices in design_choices.items(): print(f"\n{category}:") for choice, properties in choices.items(): prop_str = ", ".join([f"{k}: {v}" for k, v in properties.items()]) print(f" - {choice}: {prop_str}") # Memory scaling analysis print(f"\n📊 MEMORY SCALING PATTERNS:") print(f"Component breakdown for typical transformer:") print(f" - Token embeddings: vocab_size × embed_dim parameters") print(f" - Position encodings: 0 parameters (sinusoidal) or seq_len × embed_dim (learned)") print(f" - Attention layers: 4 × embed_dim² parameters per layer") print(f" - Feed-forward: 2 × embed_dim × hidden_dim parameters per layer") print(f" - Layer normalization: 2 × embed_dim parameters per layer") print(f" - Output projection: embed_dim × vocab_size parameters") print(f"\n🔧 OPTIMIZATION STRATEGIES:") optimization_techniques = [ "Gradient checkpointing: Trade computation for memory", "Mixed precision training: Use FP16 for 2x memory reduction", "Parameter sharing: Share weights across layers", "Sparse attention: Reduce quadratic scaling", "Model parallelism: Distribute layers across GPUs", "Pipeline parallelism: Process different batch elements on different GPUs", "Activation checkpointing: Recompute activations instead of storing" ] for technique in optimization_techniques: print(f" - {technique}") print(f"\n🎯 PRODUCTION DEPLOYMENT CONSIDERATIONS:") deployment_factors = [ "Batch size: Larger batches improve GPU utilization but increase memory", "Sequence length: Quadratic impact on attention memory", "Model depth: Linear impact on memory and computation", "Model width: Quadratic impact on attention parameters", "Precision: FP32 vs FP16 vs INT8 trade-offs", "Hardware: GPU memory and compute capabilities", "Latency requirements: Real-time vs batch processing", "Throughput requirements: Tokens per second targets" ] for factor in deployment_factors: print(f" - {factor}")