# ╔═══════════════════════════════════════════════════════════════════════════════╗ # ║ 🚨 CRITICAL WARNING 🚨 ║ # ║ AUTOGENERATED! DO NOT EDIT! ║ # ║ ║ # ║ This file is AUTOMATICALLY GENERATED from source modules. ║ # ║ ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported! ║ # ║ ║ # ║ ✅ TO EDIT: src/XX_quantization/XX_quantization.py ║ # ║ ✅ TO EXPORT: Run 'tito module complete ' ║ # ║ ║ # ║ 🛡️ STUDENT PROTECTION: This file contains optimized implementations. ║ # ║ Editing it directly may break module functionality and training. ║ # ║ ║ # ║ 🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners) ║ # ║ The tinytorch/ directory is generated code - edit source files instead! ║ # ╚═══════════════════════════════════════════════════════════════════════════════╝ # %% auto 0 __all__ = ['INT8_MIN_VALUE', 'INT8_MAX_VALUE', 'INT8_RANGE', 'EPSILON', 'BYTES_PER_FLOAT32', 'BYTES_PER_INT8', 'MB_TO_BYTES', 'SimpleModel', 'QuantizedLinear', 'QuantizationComplete', 'quantize_int8', 'dequantize_int8', 'quantize_model'] # %% ../../modules/15_quantization/15_quantization.ipynb 3 import numpy as np import time from typing import Tuple, Dict, List, Optional import warnings # Import dependencies from other modules from ..core.tensor import Tensor from ..core.layers import Linear from ..core.activations import ReLU # Constants for INT8 quantization INT8_MIN_VALUE = -128 INT8_MAX_VALUE = 127 INT8_RANGE = 256 # Number of possible INT8 values (from -128 to 127 inclusive) EPSILON = 1e-8 # Small value for numerical stability (constant tensor detection) # Constants for memory calculations BYTES_PER_FLOAT32 = 4 # Standard float32 size in bytes BYTES_PER_INT8 = 1 # INT8 size in bytes MB_TO_BYTES = 1024 * 1024 # Megabytes to bytes conversion # SimpleModel helper for testing (TinyTorch doesn't use Sequential) class SimpleModel: """Simple model container for testing - demonstrates explicit composition.""" def __init__(self, *layers): self.layers = list(layers) def forward(self, x): for layer in self.layers: x = layer.forward(x) return x if __name__ == "__main__": print("✅ Quantization module imports complete") # %% ../../modules/15_quantization/15_quantization.ipynb 17 class QuantizedLinear: """Quantized version of Linear layer using INT8 arithmetic.""" def __init__(self, linear_layer: Linear): """ Create quantized version of existing linear layer. TODO: Quantize weights and bias, store quantization parameters APPROACH: 1. Quantize weights using quantize_int8 2. Quantize bias if it exists 3. Store original layer reference for forward pass 4. Store quantization parameters for dequantization IMPLEMENTATION STRATEGY: - Store quantized weights, scales, and zero points - Implement forward pass using dequantized computation (educational approach) - Production: Would use INT8 matrix multiplication libraries """ ### BEGIN SOLUTION self.original_layer = linear_layer # Quantize weights self.q_weight, self.weight_scale, self.weight_zero_point = quantize_int8(linear_layer.weight) # Quantize bias if it exists if linear_layer.bias is not None: self.q_bias, self.bias_scale, self.bias_zero_point = quantize_int8(linear_layer.bias) else: self.q_bias = None self.bias_scale = None self.bias_zero_point = None # Store input quantization parameters (set during calibration) self.input_scale = None self.input_zero_point = None ### END SOLUTION def calibrate(self, sample_inputs: List[Tensor]): """ Calibrate input quantization parameters using sample data. TODO: Calculate optimal input quantization parameters APPROACH: 1. Collect statistics from sample inputs 2. Calculate optimal scale and zero_point for inputs 3. Store for use in forward pass """ ### BEGIN SOLUTION # Collect all input values all_values = [] for inp in sample_inputs: all_values.extend(inp.data.flatten()) all_values = np.array(all_values) # Calculate input quantization parameters min_val = float(np.min(all_values)) max_val = float(np.max(all_values)) if abs(max_val - min_val) < EPSILON: self.input_scale = 1.0 self.input_zero_point = 0 else: self.input_scale = (max_val - min_val) / (INT8_RANGE - 1) self.input_zero_point = int(np.round(INT8_MIN_VALUE - min_val / self.input_scale)) self.input_zero_point = np.clip(self.input_zero_point, INT8_MIN_VALUE, INT8_MAX_VALUE) ### END SOLUTION def forward(self, x: Tensor) -> Tensor: """ Forward pass with quantized computation. TODO: Implement quantized forward pass APPROACH: 1. Quantize input (if calibrated) 2. Dequantize weights and input for computation (educational approach) 3. Perform matrix multiplication 4. Return FP32 result NOTE: Production quantization uses INT8 GEMM libraries for speed """ ### BEGIN SOLUTION # For educational purposes, we dequantize and compute in FP32 # Production systems use specialized INT8 GEMM operations # Dequantize weights weight_fp32 = dequantize_int8(self.q_weight, self.weight_scale, self.weight_zero_point) # Perform computation (same as original layer) result = x.matmul(weight_fp32) # Add bias if it exists if self.q_bias is not None: bias_fp32 = dequantize_int8(self.q_bias, self.bias_scale, self.bias_zero_point) result = Tensor(result.data + bias_fp32.data) return result ### END SOLUTION def __call__(self, x: Tensor) -> Tensor: """Allows the quantized linear layer to be called like a function.""" return self.forward(x) def parameters(self) -> List[Tensor]: """Return quantized parameters.""" params = [self.q_weight] if self.q_bias is not None: params.append(self.q_bias) return params def memory_usage(self) -> Dict[str, float]: """Calculate memory usage in bytes.""" ### BEGIN SOLUTION # Original FP32 usage original_weight_bytes = self.original_layer.weight.data.size * BYTES_PER_FLOAT32 original_bias_bytes = 0 if self.original_layer.bias is not None: original_bias_bytes = self.original_layer.bias.data.size * BYTES_PER_FLOAT32 # Quantized INT8 usage quantized_weight_bytes = self.q_weight.data.size * BYTES_PER_INT8 quantized_bias_bytes = 0 if self.q_bias is not None: quantized_bias_bytes = self.q_bias.data.size * BYTES_PER_INT8 # Add overhead for scales and zero points (small) # 2 floats: one scale for weights, one scale for bias (if present) overhead_bytes = BYTES_PER_FLOAT32 * 2 quantized_total = quantized_weight_bytes + quantized_bias_bytes + overhead_bytes original_total = original_weight_bytes + original_bias_bytes return { 'original_bytes': original_total, 'quantized_bytes': quantized_total, 'compression_ratio': original_total / quantized_total if quantized_total > 0 else 1.0 } ### END SOLUTION # %% ../../modules/15_quantization/15_quantization.ipynb 36 class QuantizationComplete: """ Complete quantization system for milestone use. Provides INT8 quantization with calibration for 4× memory reduction. """ @staticmethod def quantize_tensor(tensor: Tensor) -> Tuple[Tensor, float, int]: """Quantize FP32 tensor to INT8.""" data = tensor.data min_val = float(np.min(data)) max_val = float(np.max(data)) if abs(max_val - min_val) < EPSILON: return Tensor(np.zeros_like(data, dtype=np.int8)), 1.0, 0 scale = (max_val - min_val) / (INT8_RANGE - 1) zero_point = int(np.round(INT8_MIN_VALUE - min_val / scale)) zero_point = int(np.clip(zero_point, INT8_MIN_VALUE, INT8_MAX_VALUE)) quantized_data = np.round(data / scale + zero_point) quantized_data = np.clip(quantized_data, INT8_MIN_VALUE, INT8_MAX_VALUE).astype(np.int8) return Tensor(quantized_data), scale, zero_point @staticmethod def dequantize_tensor(q_tensor: Tensor, scale: float, zero_point: int) -> Tensor: """Dequantize INT8 tensor back to FP32.""" dequantized_data = (q_tensor.data.astype(np.float32) - zero_point) * scale return Tensor(dequantized_data) @staticmethod def quantize_model(model, calibration_data: Optional[List[Tensor]] = None) -> Dict[str, any]: """ Quantize all Linear layers in a model. Returns dictionary with quantization info and memory savings. """ quantized_layers = {} original_size = 0 quantized_size = 0 # Iterate through model parameters # SimpleModel has .layers, each layer has .parameters() method param_idx = 0 for layer in model.layers: for param in layer.parameters(): param_size = param.data.nbytes original_size += param_size # Quantize parameter q_param, scale, zp = QuantizationComplete.quantize_tensor(param) quantized_size += q_param.data.nbytes quantized_layers[f'param_{param_idx}'] = { 'quantized': q_param, 'scale': scale, 'zero_point': zp, 'original_shape': param.data.shape } param_idx += 1 return { 'quantized_layers': quantized_layers, 'original_size_mb': original_size / MB_TO_BYTES, 'quantized_size_mb': quantized_size / MB_TO_BYTES, 'compression_ratio': original_size / quantized_size if quantized_size > 0 else 1.0 } @staticmethod def compare_models(original_model, quantized_info: Dict) -> Dict[str, float]: """Compare memory usage between original and quantized models.""" return { 'original_mb': quantized_info['original_size_mb'], 'quantized_mb': quantized_info['quantized_size_mb'], 'compression_ratio': quantized_info['compression_ratio'], 'memory_saved_mb': quantized_info['original_size_mb'] - quantized_info['quantized_size_mb'] } # Convenience functions for backward compatibility def quantize_int8(tensor: Tensor) -> Tuple[Tensor, float, int]: """Quantize FP32 tensor to INT8.""" return QuantizationComplete.quantize_tensor(tensor) def dequantize_int8(q_tensor: Tensor, scale: float, zero_point: int) -> Tensor: """Dequantize INT8 tensor back to FP32.""" return QuantizationComplete.dequantize_tensor(q_tensor, scale, zero_point) def quantize_model(model, calibration_data: Optional[List[Tensor]] = None) -> Dict[str, any]: """Quantize entire model to INT8.""" return QuantizationComplete.quantize_model(model, calibration_data)