TinyTorch/modules/source/12_compression/compression_dev.ipynb
Vijay Janapa Reddi bfadc82ce6 Update generated notebooks and package exports
- Regenerate all .ipynb files from fixed .py modules
- Update tinytorch package exports with corrected implementations
- Sync package module index with current 16-module structure

These generated files reflect all the module fixes and ensure consistent
.py ↔ .ipynb conversion with the updated module implementations.
2025-09-18 16:42:57 -04:00

{
"cells": [
{
"cell_type": "markdown",
"id": "f571c637",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"# Compression - Model Optimization and Efficient Deployment Strategies\n",
"\n",
"Welcome to the Compression module! You'll implement techniques that make neural networks smaller, faster, and more efficient for deployment in resource-constrained environments.\n",
"\n",
"## Learning Goals\n",
"- Systems understanding: How model size and computational requirements affect deployment costs, latency, and energy consumption in production systems\n",
"- Core implementation skill: Build pruning, quantization, and knowledge distillation techniques that reduce model footprint while preserving performance\n",
"- Pattern recognition: Understand the accuracy vs efficiency trade-offs that drive deployment decisions in real ML systems\n",
"- Framework connection: See how your compression implementations relate to PyTorch's optimization tools and mobile deployment strategies\n",
"- Performance insight: Learn why compression techniques can improve both inference speed and training efficiency\n",
"\n",
"## Build → Use → Reflect\n",
"1. **Build**: Complete compression toolkit with magnitude pruning, quantization, and knowledge distillation\n",
"2. **Use**: Apply compression to trained neural networks and measure the accuracy vs efficiency trade-offs\n",
"3. **Reflect**: Why do modern ML systems require compression, and how do compression choices affect system design?\n",
"\n",
"## What You'll Achieve\n",
"By the end of this module, you'll understand:\n",
"- Deep technical understanding of how compression techniques reduce computational and memory requirements without destroying learned representations\n",
"- Practical capability to optimize neural networks for deployment in mobile devices, edge systems, and cost-sensitive environments\n",
"- Systems insight into why compression is essential for practical ML deployment and how it affects system architecture decisions\n",
"- Performance consideration of how different compression techniques affect inference speed, memory usage, and accuracy\n",
"- Connection to production ML systems and how compression enables ML deployment at scale\n",
"\n",
"## Systems Reality Check\n",
"💡 **Production Context**: Modern mobile AI relies heavily on compression - techniques like quantization can reduce model size by 4x while maintaining accuracy, enabling on-device inference\n",
"⚡ **Performance Note**: Compression often speeds up inference by reducing memory bandwidth requirements, even when computational complexity remains the same - memory is often the bottleneck"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a4356b8",
"metadata": {
"nbgrader": {
"grade": false,
"grade_id": "compression-imports",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"#| default_exp core.compression\n",
"\n",
"#| export\n",
"import numpy as np\n",
"import sys\n",
"import os\n",
"from typing import List, Dict, Any, Optional, Union, Tuple\n",
"\n",
"# Helper function to set up import paths\n",
"def setup_import_paths():\n",
" \"\"\"Set up import paths for development modules.\"\"\"\n",
" import sys\n",
" import os\n",
" \n",
" # Add module directories to path\n",
" base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))\n",
" module_dirs = [\n",
" '01_tensor', '02_activations', '03_layers', '04_networks', \n",
" '05_cnn', '06_dataloader', '07_autograd', '08_optimizers', '09_training'\n",
" ]\n",
" \n",
" for module_dir in module_dirs:\n",
" sys.path.append(os.path.join(base_dir, module_dir))\n",
"\n",
"# Set up paths\n",
"setup_import_paths()\n",
"\n",
"# Import all the building blocks we need\n",
"try:\n",
" from tinytorch.core.tensor import Tensor\n",
" from tinytorch.core.layers import Dense\n",
" from tinytorch.core.networks import Sequential\n",
" from tinytorch.core.training import CrossEntropyLoss, Trainer\n",
"except ImportError:\n",
" # For development, create mock classes or import from local modules\n",
" try:\n",
" from tensor_dev import Tensor\n",
" from layers_dev import Dense\n",
" from networks_dev import Sequential\n",
" from training_dev import CrossEntropyLoss, Trainer\n",
" except ImportError:\n",
" # Create minimal mock classes for development\n",
" class Tensor:\n",
" def __init__(self, data):\n",
" self.data = np.array(data)\n",
" self.shape = self.data.shape\n",
" \n",
" def __str__(self):\n",
" return f\"Tensor({self.data})\"\n",
" \n",
" class Dense:\n",
" def __init__(self, input_size, output_size):\n",
" self.input_size = input_size\n",
" self.output_size = output_size\n",
" self.weights = Tensor(np.random.randn(input_size, output_size) * 0.1)\n",
" self.bias = Tensor(np.zeros(output_size))\n",
" \n",
" def __str__(self):\n",
" return f\"Dense({self.input_size}, {self.output_size})\"\n",
" \n",
" class Sequential:\n",
" def __init__(self, layers=None):\n",
" self.layers = layers or []\n",
" \n",
" class CrossEntropyLoss:\n",
" def __init__(self):\n",
" pass\n",
" \n",
" class Trainer:\n",
" def __init__(self, model, optimizer, loss_function):\n",
" self.model = model\n",
" self.optimizer = optimizer\n",
" self.loss_function = loss_function"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc9ecd88",
"metadata": {
"nbgrader": {
"grade": false,
"grade_id": "compression-setup",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"print(\"🔥 TinyTorch Compression Module\")\n",
"print(f\"NumPy version: {np.__version__}\")\n",
"print(f\"Python version: {sys.version_info.major}.{sys.version_info.minor}\")\n",
"print(\"Ready to compress neural networks!\")"
]
},
{
"cell_type": "markdown",
"id": "b083c6be",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## 📦 Where This Code Lives in the Final Package\n",
"\n",
"**Learning Side:** You work in `modules/source/10_compression/compression_dev.py` \n",
"**Building Side:** Code exports to `tinytorch.core.compression`\n",
"\n",
"```python\n",
"# Final package structure:\n",
"from tinytorch.core.compression import (\n",
" prune_weights_by_magnitude, # Remove unimportant weights\n",
" quantize_layer_weights, # Reduce precision for memory savings\n",
" DistillationLoss, # Train compact models with teacher guidance\n",
" prune_layer_neurons, # Remove entire neurons/channels\n",
" CompressionMetrics # Measure model size and efficiency\n",
")\n",
"from tinytorch.core.layers import Dense # Target for compression\n",
"from tinytorch.core.networks import Sequential # Model architectures\n",
"```\n",
"\n",
"**Why this matters:**\n",
"- **Learning:** Focused module for understanding model efficiency\n",
"- **Production:** Proper organization like PyTorch's compression tools\n",
"- **Consistency:** All compression techniques live together in `core.compression`\n",
"- **Foundation:** Essential for deploying AI in resource-constrained environments"
]
},
{
"cell_type": "markdown",
"id": "942db810",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## What is Model Compression?\n",
"\n",
"### The Problem: AI Models Are Getting Huge\n",
"Modern neural networks are massive:\n",
"- **GPT-3**: 175 billion parameters (350GB memory)\n",
"- **ResNet-152**: 60 million parameters (240MB memory)\n",
"- **BERT-Large**: 340 million parameters (1.3GB memory)\n",
"\n",
"But deployment environments have constraints:\n",
"- **Mobile phones**: Limited memory and battery\n",
"- **Edge devices**: No internet, minimal compute\n",
"- **Real-time systems**: Strict latency requirements\n",
"- **Cost optimization**: Expensive inference in cloud\n",
"\n",
"### The Solution: Intelligent Compression\n",
"**Model compression** reduces model size while preserving performance:\n",
"- **Pruning**: Remove unimportant weights and neurons\n",
"- **Quantization**: Use fewer bits per parameter\n",
"- **Knowledge distillation**: Train small models to mimic large ones\n",
"- **Structured optimization**: Modify architectures for efficiency\n",
"\n",
"### Real-World Impact\n",
"- **Mobile AI**: Apps like Google Translate work offline\n",
"- **Autonomous vehicles**: Real-time processing with limited compute\n",
"- **IoT devices**: Smart cameras, voice assistants, sensors\n",
"- **Cost savings**: Reduced inference costs in production systems\n",
"\n",
"### What We'll Build\n",
"1. **Magnitude-based pruning**: Remove smallest weights\n",
"2. **Quantization**: Convert FP32 → INT8 for 75% memory reduction\n",
"3. **Knowledge distillation**: Large models teach small models\n",
"4. **Structured pruning**: Remove entire neurons systematically\n",
"5. **Compression metrics**: Measure efficiency and accuracy trade-offs\n",
"6. **Integrated optimization**: Combine techniques for maximum benefit"
]
},
{
"cell_type": "markdown",
"id": "6f290fdb",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## 🔧 DEVELOPMENT"
]
},
{
"cell_type": "markdown",
"id": "4f7f7e3c",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"## Step 1: Understanding Model Size and Parameters\n",
"\n",
"### What Makes Models Large?\n",
"Neural networks have millions of parameters:\n",
"- **Dense layers**: Weight matrices `(input_size, output_size)`\n",
"- **Bias vectors**: One per output neuron\n",
"- **CNN kernels**: Repeated across channels and filters\n",
"- **Embeddings**: Large vocabulary mappings\n",
"\n",
"### The Memory Reality Check\n",
"Let's see how much memory different architectures use:\n",
"\n",
"```python\n",
"# Simple MLP for MNIST\n",
"layer1 = Dense(784, 128) # 784 * 128 = 100,352 params\n",
"layer2 = Dense(128, 64) # 128 * 64 = 8,192 params \n",
"layer3 = Dense(64, 10) # 64 * 10 = 640 params\n",
"# Total: 109,184 params ≈ 437KB (FP32)\n",
"\n",
"# Larger network for CIFAR-10\n",
"layer1 = Dense(3072, 512) # 3072 * 512 = 1,572,864 params\n",
"layer2 = Dense(512, 256) # 512 * 256 = 131,072 params\n",
"layer3 = Dense(256, 128) # 256 * 128 = 32,768 params\n",
"layer4 = Dense(128, 10) # 128 * 10 = 1,280 params\n",
"# Total: 1,737,984 params ≈ 7MB (FP32)\n",
"```\n",
"\n",
"### Why Size Matters\n",
"- **Memory usage**: Each FP32 parameter uses 4 bytes\n",
"- **Storage**: Model files need to be downloaded/stored\n",
"- **Inference speed**: More parameters = more computation\n",
"- **Energy consumption**: Larger models drain battery faster\n",
"\n",
"### The Efficiency Spectrum\n",
"Different applications need different efficiency levels:\n",
"- **Research**: Accuracy first, efficiency second\n",
"- **Production**: Balance accuracy and efficiency\n",
"- **Mobile**: Strict size constraints (< 10MB)\n",
"- **Edge**: Extreme efficiency requirements (< 1MB)\n",
"\n",
"### Real-World Examples\n",
"- **MobileNet**: Designed for mobile deployment\n",
"- **DistilBERT**: 60% smaller than BERT with 97% performance\n",
"- **TinyML**: Models under 1MB for microcontrollers\n",
"- **Neural architecture search**: Automated efficiency optimization\n",
"\n",
"Let's build tools to measure and analyze model size!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a2e4583a",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "compression-metrics",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"class CompressionMetrics:\n",
" \"\"\"\n",
" Utilities for measuring model size, sparsity, and compression efficiency.\n",
" \n",
" This class provides tools to analyze neural network models and understand\n",
" their memory footprint, parameter distribution, and compression potential.\n",
" \"\"\"\n",
" \n",
" def __init__(self):\n",
" \"\"\"Initialize compression metrics analyzer.\"\"\"\n",
" pass\n",
" \n",
" def count_parameters(self, model: Sequential) -> Dict[str, int]:\n",
" \"\"\"\n",
" Count parameters in a neural network model.\n",
" \n",
" Args:\n",
" model: Sequential model to analyze\n",
" \n",
" Returns:\n",
" Dictionary with parameter counts per layer and total\n",
" \n",
" TODO: Implement parameter counting for neural network analysis.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Initialize counters for different parameter types\n",
" 2. Iterate through each layer in the model\n",
" 3. Count weights and biases for each layer\n",
" 4. Calculate total parameters across all layers\n",
" 5. Return detailed breakdown dictionary\n",
" \n",
" EXAMPLE OUTPUT:\n",
" {\n",
" 'layer_0_weights': 100352,\n",
" 'layer_0_bias': 128,\n",
" 'layer_1_weights': 8192,\n",
" 'layer_1_bias': 64,\n",
" 'layer_2_weights': 640,\n",
" 'layer_2_bias': 10,\n",
" 'total_parameters': 109386,\n",
" 'total_weights': 109184,\n",
" 'total_bias': 202\n",
" }\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Use hasattr() to check if layer has weights/bias attributes\n",
" - Weight matrices have shape (input_size, output_size)\n",
" - Bias vectors have shape (output_size,)\n",
" - Use np.prod() to calculate total elements from shape\n",
" - Track layer index for detailed reporting\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is like `model.numel()` in PyTorch\n",
" - Understanding where parameters are concentrated\n",
" - Foundation for compression target selection\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" param_counts = {}\n",
" total_params = 0\n",
" total_weights = 0\n",
" total_bias = 0\n",
" \n",
" for i, layer in enumerate(model.layers):\n",
" # Count weights if layer has them\n",
" if hasattr(layer, 'weights') and layer.weights is not None:\n",
" # Handle different weight formats\n",
" if hasattr(layer.weights, 'shape'):\n",
" weight_count = np.prod(layer.weights.shape)\n",
" else:\n",
" weight_count = np.prod(layer.weights.data.shape)\n",
" \n",
" param_counts[f'layer_{i}_weights'] = weight_count\n",
" total_weights += weight_count\n",
" total_params += weight_count\n",
" \n",
" # Count bias if layer has them\n",
" if hasattr(layer, 'bias') and layer.bias is not None:\n",
" # Handle different bias formats\n",
" if hasattr(layer.bias, 'shape'):\n",
" bias_count = np.prod(layer.bias.shape)\n",
" else:\n",
" bias_count = np.prod(layer.bias.data.shape)\n",
" \n",
" param_counts[f'layer_{i}_bias'] = bias_count\n",
" total_bias += bias_count\n",
" total_params += bias_count\n",
" \n",
" # Add summary statistics\n",
" param_counts['total_parameters'] = total_params\n",
" param_counts['total_weights'] = total_weights\n",
" param_counts['total_bias'] = total_bias\n",
" \n",
" return param_counts\n",
" ### END SOLUTION \n",
"\n",
" def calculate_model_size(self, model: Sequential, dtype: str = 'float32') -> Dict[str, Any]:\n",
" \"\"\"\n",
" Calculate memory footprint of a neural network model.\n",
" \n",
" Args:\n",
" model: Sequential model to analyze\n",
" dtype: Data type for size calculation ('float32', 'float16', 'int8')\n",
" \n",
" Returns:\n",
" Dictionary with size information in different units\n",
" \"\"\"\n",
" # Get parameter count\n",
" param_info = self.count_parameters(model)\n",
" total_params = param_info['total_parameters']\n",
" \n",
" # Determine bytes per parameter\n",
" bytes_per_param = {\n",
" 'float32': 4,\n",
" 'float16': 2,\n",
" 'int8': 1\n",
" }.get(dtype, 4)\n",
" \n",
" # Calculate sizes\n",
" total_bytes = total_params * bytes_per_param\n",
" size_kb = total_bytes / 1024\n",
" size_mb = size_kb / 1024\n",
" \n",
" return {\n",
" 'total_parameters': total_params,\n",
" 'bytes_per_parameter': bytes_per_param,\n",
" 'total_bytes': total_bytes,\n",
" 'size_kb': round(size_kb, 2),\n",
" 'size_mb': round(size_mb, 2),\n",
" 'dtype': dtype\n",
" }"
]
},
{
"cell_type": "markdown",
"id": "4da32b00",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: Compression Metrics Analysis\n",
"\n",
"This test validates your `CompressionMetrics` class implementation, ensuring it accurately calculates model parameters, memory usage, and compression statistics for optimization analysis."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c809bfa4",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "test-compression-metrics",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_compression_metrics():\n",
" \"\"\"Unit test for the CompressionMetrics class.\"\"\"\n",
" print(\"🔬 Unit Test: Compression Metrics...\")\n",
" \n",
" # Create a simple model for testing\n",
" layers = [\n",
" Dense(784, 128), # 784 * 128 + 128 = 100,480 params\n",
" Dense(128, 64), # 128 * 64 + 64 = 8,256 params\n",
" Dense(64, 10) # 64 * 10 + 10 = 650 params\n",
" ]\n",
" model = Sequential(layers)\n",
" \n",
" # Test parameter counting\n",
" metrics = CompressionMetrics()\n",
" param_counts = metrics.count_parameters(model)\n",
" \n",
" # Verify parameter counts\n",
" assert param_counts['layer_0_weights'] == 100352, f\"Expected 100352, got {param_counts['layer_0_weights']}\"\n",
" assert param_counts['layer_0_bias'] == 128, f\"Expected 128, got {param_counts['layer_0_bias']}\"\n",
" assert param_counts['total_parameters'] == 109386, f\"Expected 109386, got {param_counts['total_parameters']}\"\n",
" \n",
" print(\"📈 Progress: CompressionMetrics ✓\")\n",
" print(\"🎯 CompressionMetrics behavior:\")\n",
" print(\" - Counts parameters across all layers\")\n",
" print(\" - Provides detailed breakdown by layer\")\n",
" print(\" - Separates weight and bias counts\")\n",
" print(\" - Foundation for compression analysis\")\n",
" print()\n",
"\n",
"# Test will be run in main block "
]
},
{
"cell_type": "markdown",
"id": "a6ab5d0f",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"## Step 2: Magnitude-Based Pruning - Removing Unimportant Weights\n",
"\n",
"### What is Magnitude-Based Pruning?\n",
"**Magnitude-based pruning** removes weights with the smallest absolute values, based on the hypothesis that small weights contribute less to the model's performance.\n",
"\n",
"### The Algorithm\n",
"1. **Calculate magnitude**: `|weight|` for each parameter\n",
"2. **Set threshold**: Choose cutoff (e.g., 50th percentile)\n",
"3. **Create mask**: `mask = |weight| > threshold`\n",
"4. **Apply pruning**: `pruned_weight = weight * mask`\n",
"\n",
"### Why This Works\n",
"- **Redundancy**: Neural networks are over-parameterized\n",
"- **Lottery ticket hypothesis**: Small subnetworks can match full performance\n",
"- **Magnitude correlation**: Larger weights often more important\n",
"- **Gradual degradation**: Performance drops slowly with pruning\n",
"\n",
"### Real-World Applications\n",
"- **Mobile deployment**: Reduce model size for smartphones\n",
"- **Edge computing**: Fit models on resource-constrained devices\n",
"- **Inference acceleration**: Fewer parameters = faster computation\n",
"- **Memory optimization**: Sparse matrices save storage\n",
"\n",
"### Pruning Strategies\n",
"- **Global**: Single threshold across all layers\n",
"- **Layer-wise**: Different thresholds per layer\n",
"- **Structured**: Remove entire neurons/channels\n",
"- **Gradual**: Increase sparsity during training\n",
"\n",
"### Performance vs Sparsity Trade-off\n",
"- **10-30% sparsity**: Minimal accuracy loss\n",
"- **50-70% sparsity**: Moderate accuracy drop\n",
"- **80-90% sparsity**: Significant accuracy loss\n",
"- **95%+ sparsity**: Requires careful tuning\n",
"\n",
"Let's implement magnitude-based pruning!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "781ec53e",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "magnitude-pruning",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"def prune_weights_by_magnitude(layer: Dense, pruning_ratio: float = 0.5) -> Tuple[Dense, Dict[str, Any]]:\n",
" \"\"\"\n",
" Prune weights in a Dense layer by magnitude.\n",
" \n",
" Args:\n",
" layer: Dense layer to prune\n",
" pruning_ratio: Fraction of weights to remove (0.0 to 1.0)\n",
" \n",
" Returns:\n",
" Tuple of (pruned_layer, pruning_info)\n",
" \n",
" TODO: Implement magnitude-based weight pruning.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Get weight matrix from layer\n",
" 2. Calculate absolute values (magnitudes)\n",
" 3. Find threshold using percentile\n",
" 4. Create binary mask for weights above threshold\n",
" 5. Apply mask to weights (set small weights to zero)\n",
" 6. Update layer weights and return pruning statistics\n",
" \n",
" EXAMPLE USAGE:\n",
" ```python\n",
" layer = Dense(784, 128)\n",
" pruned_layer, info = prune_weights_by_magnitude(layer, pruning_ratio=0.3)\n",
" print(f\"Pruned {info['weights_removed']} weights, sparsity: {info['sparsity']:.2f}\")\n",
" ```\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Use np.percentile() with pruning_ratio * 100 for threshold\n",
" - Create mask with np.abs(weights) > threshold\n",
" - Apply mask by element-wise multiplication\n",
" - Count zeros to calculate sparsity\n",
" - Return original layer (modified) and statistics\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is the foundation of network pruning\n",
" - Magnitude pruning is simplest but effective\n",
" - Sparsity = fraction of weights that are zero\n",
" - Threshold selection affects accuracy vs compression trade-off\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" # Get current weights and ensure they're numpy arrays\n",
" weights = layer.weights.data\n",
" if not isinstance(weights, np.ndarray):\n",
" weights = np.array(weights)\n",
" \n",
" original_weights = weights.copy()\n",
" \n",
" # Calculate magnitudes and threshold\n",
" magnitudes = np.abs(weights)\n",
" threshold = np.percentile(magnitudes, pruning_ratio * 100)\n",
" \n",
" # Create mask and apply pruning\n",
" mask = magnitudes > threshold\n",
" pruned_weights = weights * mask\n",
" \n",
" # Update layer weights by creating a new Tensor\n",
" layer.weights = Tensor(pruned_weights)\n",
" \n",
" # Calculate pruning statistics\n",
" total_weights = weights.size\n",
" zero_weights = np.sum(pruned_weights == 0)\n",
" weights_removed = zero_weights - np.sum(original_weights == 0)\n",
" sparsity = zero_weights / total_weights\n",
" \n",
" pruning_info = {\n",
" 'pruning_ratio': pruning_ratio,\n",
" 'threshold': float(threshold),\n",
" 'total_weights': total_weights,\n",
" 'weights_removed': weights_removed,\n",
" 'remaining_weights': total_weights - zero_weights,\n",
" 'sparsity': float(sparsity),\n",
" 'compression_ratio': 1 / (1 - sparsity) if sparsity < 1 else float('inf')\n",
" }\n",
" \n",
" return layer, pruning_info\n",
" ### END SOLUTION"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d5b5b2d2",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "calculate-sparsity",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"def calculate_sparsity(layer: Dense) -> float:\n",
" \"\"\"\n",
" Calculate sparsity (fraction of zero weights) in a Dense layer.\n",
" \n",
" Args:\n",
" layer: Dense layer to analyze\n",
" \n",
" Returns:\n",
" Sparsity as float between 0.0 and 1.0\n",
" \n",
" TODO: Implement sparsity calculation.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Get weight matrix from layer\n",
" 2. Count total number of weights\n",
" 3. Count number of zero weights\n",
" 4. Calculate sparsity = zero_weights / total_weights\n",
" 5. Return as float\n",
" \n",
" EXAMPLE USAGE:\n",
" ```python\n",
" layer = Dense(100, 50)\n",
" sparsity = calculate_sparsity(layer)\n",
" print(f\"Layer sparsity: {sparsity:.2%}\")\n",
" ```\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Use np.sum() with condition to count zeros\n",
" - Use .size attribute for total elements\n",
" - Return 0.0 if no weights (edge case)\n",
" - Sparsity of 0.0 = dense, 1.0 = completely sparse\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - Sparsity is key metric for compression\n",
" - Higher sparsity = more compression\n",
" - Sparsity patterns affect hardware efficiency\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" if not hasattr(layer, 'weights') or layer.weights is None:\n",
" return 0.0\n",
" \n",
" weights = layer.weights.data\n",
" if not isinstance(weights, np.ndarray):\n",
" weights = np.array(weights)\n",
" \n",
" total_weights = weights.size\n",
" zero_weights = np.sum(weights == 0)\n",
" \n",
" return zero_weights / total_weights if total_weights > 0 else 0.0\n",
" ### END SOLUTION "
]
},
{
"cell_type": "markdown",
"id": "67eeac1a",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: Magnitude-Based Pruning\n",
"\n",
"This test validates your pruning implementation, ensuring it correctly identifies and removes the smallest weights while maintaining model functionality and calculating accurate sparsity metrics."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ac3403ca",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "test-pruning",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_magnitude_pruning():\n",
" \"\"\"Unit test for the magnitude-based pruning functionality.\"\"\"\n",
" print(\"🔬 Unit Test: Magnitude Pruning...\")\n",
" \n",
" # Create a simple Dense layer\n",
" layer = Dense(100, 50)\n",
" \n",
" # Test basic pruning\n",
" pruned_layer, info = prune_weights_by_magnitude(layer, pruning_ratio=0.3)\n",
" \n",
" # Verify pruning results\n",
" assert info['pruning_ratio'] == 0.3, f\"Expected 0.3, got {info['pruning_ratio']}\"\n",
" assert info['total_weights'] == 5000, f\"Expected 5000, got {info['total_weights']}\"\n",
" assert info['sparsity'] >= 0.3, f\"Sparsity should be at least 0.3, got {info['sparsity']}\"\n",
" \n",
" print(f\"✅ Basic pruning works: {info['sparsity']:.2%} sparsity\")\n",
" \n",
" # Test sparsity calculation\n",
" sparsity = calculate_sparsity(layer)\n",
" assert abs(sparsity - info['sparsity']) < 0.001, f\"Sparsity mismatch: {sparsity} vs {info['sparsity']}\"\n",
" print(f\"✅ Sparsity calculation works: {sparsity:.2%}\")\n",
" \n",
" # Test edge cases\n",
" empty_layer = Dense(10, 10)\n",
" empty_layer.weights = Tensor(np.zeros((10, 10)))\n",
" sparsity_empty = calculate_sparsity(empty_layer)\n",
" assert sparsity_empty == 1.0, f\"Empty layer should have 1.0 sparsity, got {sparsity_empty}\"\n",
" \n",
" print(\"✅ Edge cases work correctly\")\n",
" \n",
" # Test different pruning ratios\n",
" layer2 = Dense(50, 25)\n",
" _, info50 = prune_weights_by_magnitude(layer2, pruning_ratio=0.5)\n",
" \n",
" layer3 = Dense(50, 25)\n",
" _, info80 = prune_weights_by_magnitude(layer3, pruning_ratio=0.8)\n",
" \n",
" assert info80['sparsity'] > info50['sparsity'], \"Higher pruning ratio should give higher sparsity\"\n",
" print(f\"✅ Different pruning ratios work: 50% ratio = {info50['sparsity']:.2%}, 80% ratio = {info80['sparsity']:.2%}\")\n",
" \n",
" print(\"📈 Progress: Magnitude-Based Pruning ✓\")\n",
" print(\"🎯 Pruning behavior:\")\n",
" print(\" - Removes weights with smallest absolute values\")\n",
" print(\" - Maintains layer structure and connectivity\")\n",
" print(\" - Provides detailed statistics for analysis\")\n",
" print(\" - Scales to different pruning ratios\")\n",
" print()\n",
"\n",
"# Test will be run in main block "
]
},
{
"cell_type": "markdown",
"id": "4b221d5e",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"## Step 3: Quantization - Reducing Precision for Memory Efficiency\n",
"\n",
"### What is Quantization?\n",
"**Quantization** reduces the precision of weights from FP32 (32-bit) to lower bit-widths like INT8 (8-bit), achieving significant memory savings with minimal accuracy loss.\n",
"\n",
"### The Mathematical Foundation\n",
"Quantization maps continuous floating-point values to discrete integer values:\n",
"\n",
"```\n",
"quantized_value = round((fp_value - min_val) / scale)\n",
"scale = (max_val - min_val) / (2^bits - 1)\n",
"```\n",
"\n",
"### Why Quantization Works\n",
"- **Redundant precision**: Neural networks are robust to precision reduction\n",
"- **Hardware efficiency**: Integer operations are faster than floating-point\n",
"- **Memory savings**: 4x reduction (FP32 → INT8) in memory usage\n",
"- **Cache efficiency**: More parameters fit in limited cache memory\n",
"\n",
"### Types of Quantization\n",
"- **Post-training**: Quantize after training is complete\n",
"- **Quantization-aware training**: Train with quantization simulation\n",
"- **Dynamic**: Quantize activations at runtime\n",
"- **Static**: Pre-compute quantization parameters\n",
"\n",
"### Real-World Impact\n",
"- **Mobile deployment**: 75% memory reduction enables smartphone AI\n",
"- **Edge computing**: Fit larger models on constrained devices\n",
"- **Cloud efficiency**: Reduce bandwidth and storage costs\n",
"- **Battery life**: Lower power consumption for mobile devices\n",
"\n",
"### Common Bit-Widths\n",
"- **FP32**: Full precision (baseline)\n",
"- **FP16**: Half precision (2x memory reduction)\n",
"- **INT8**: 8-bit integers (4x memory reduction)\n",
"- **INT4**: 4-bit integers (8x memory reduction, aggressive)\n",
"\n",
"Let's implement quantization algorithms!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d9b403eb",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "quantization",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"def quantize_layer_weights(layer: Dense, bits: int = 8) -> Tuple[Dense, Dict[str, Any]]:\n",
" \"\"\"\n",
" Quantize layer weights to reduce precision.\n",
" \n",
" Args:\n",
" layer: Dense layer to quantize\n",
" bits: Number of bits for quantization (8, 16, etc.)\n",
" \n",
" Returns:\n",
" Tuple of (quantized_layer, quantization_info)\n",
" \n",
" TODO: Implement weight quantization for memory efficiency.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Get weight matrix from layer\n",
" 2. Find min and max values for quantization range\n",
" 3. Calculate scale factor: (max - min) / (2^bits - 1)\n",
" 4. Quantize: round((weights - min) / scale)\n",
" 5. Dequantize back to float: quantized * scale + min\n",
" 6. Update layer weights and return statistics\n",
" \n",
" EXAMPLE USAGE:\n",
" ```python\n",
" layer = Dense(784, 128)\n",
" quantized_layer, info = quantize_layer_weights(layer, bits=8)\n",
" print(f\"Memory reduction: {info['memory_reduction']:.1f}x\")\n",
" ```\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Use np.min() and np.max() to find weight range\n",
" - Clamp quantized values to valid range [0, 2^bits-1]\n",
" - Store original dtype for memory calculation\n",
" - Calculate theoretical memory savings\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is how mobile AI frameworks work\n",
" - Hardware accelerators optimize for INT8\n",
" - Precision-performance trade-off is key\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" # Get current weights and ensure they're numpy arrays\n",
" weights = layer.weights.data\n",
" if not isinstance(weights, np.ndarray):\n",
" weights = np.array(weights)\n",
" \n",
" original_weights = weights.copy()\n",
" original_dtype = weights.dtype\n",
" \n",
" # Find min and max for quantization range\n",
" w_min, w_max = np.min(weights), np.max(weights)\n",
" \n",
" # Calculate scale factor\n",
" scale = (w_max - w_min) / (2**bits - 1)\n",
" \n",
" # Quantize weights\n",
" quantized = np.round((weights - w_min) / scale)\n",
" quantized = np.clip(quantized, 0, 2**bits - 1) # Clamp to valid range\n",
" \n",
" # Dequantize back to float (simulation of quantized inference)\n",
" dequantized = quantized * scale + w_min\n",
" \n",
" # Update layer weights\n",
" layer.weights = Tensor(dequantized.astype(np.float32))\n",
" \n",
" # Calculate quantization statistics\n",
" total_weights = weights.size\n",
" original_bytes = total_weights * 4 # FP32 = 4 bytes\n",
" quantized_bytes = total_weights * (bits // 8) # bits/8 bytes per weight\n",
" memory_reduction = original_bytes / quantized_bytes if quantized_bytes > 0 else 1.0\n",
" \n",
" # Calculate quantization error\n",
" mse_error = np.mean((original_weights - dequantized) ** 2)\n",
" max_error = np.max(np.abs(original_weights - dequantized))\n",
" \n",
" quantization_info = {\n",
" 'bits': bits,\n",
" 'scale': float(scale),\n",
" 'min_val': float(w_min),\n",
" 'max_val': float(w_max),\n",
" 'total_weights': total_weights,\n",
" 'original_bytes': original_bytes,\n",
" 'quantized_bytes': quantized_bytes,\n",
" 'memory_reduction': float(memory_reduction),\n",
" 'mse_error': float(mse_error),\n",
" 'max_error': float(max_error),\n",
" 'original_dtype': str(original_dtype)\n",
" }\n",
" \n",
" return layer, quantization_info\n",
" ### END SOLUTION "
]
},
{
"cell_type": "markdown",
"id": "aa5fb04f",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: Weight Quantization\n",
"\n",
"This test validates your quantization implementation, ensuring it correctly converts FP32 weights to INT8 representation while minimizing accuracy loss and achieving significant memory reduction."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6d574271",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "test-quantization",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_quantization():\n",
" \"\"\"Unit test for the weight quantization functionality.\"\"\"\n",
" print(\"🔬 Unit Test: Weight Quantization...\")\n",
" \n",
" # Create a simple Dense layer\n",
" layer = Dense(100, 50)\n",
" original_weights = layer.weights.data.copy() if hasattr(layer.weights.data, 'copy') else np.array(layer.weights.data)\n",
" \n",
" # Test INT8 quantization\n",
" quantized_layer, info = quantize_layer_weights(layer, bits=8)\n",
" \n",
" # Verify quantization results\n",
" assert info['bits'] == 8, f\"Expected 8 bits, got {info['bits']}\"\n",
" assert info['total_weights'] == 5000, f\"Expected 5000 weights, got {info['total_weights']}\"\n",
" assert info['memory_reduction'] == 4.0, f\"Expected 4x reduction, got {info['memory_reduction']}\"\n",
" \n",
" print(f\"✅ INT8 quantization works: {info['memory_reduction']:.1f}x memory reduction\")\n",
" \n",
" # Test quantization error\n",
" assert info['mse_error'] >= 0, \"MSE error should be non-negative\"\n",
" assert info['max_error'] >= 0, \"Max error should be non-negative\"\n",
" \n",
" print(f\"✅ Quantization error tracking works: MSE={info['mse_error']:.6f}, Max={info['max_error']:.6f}\")\n",
" \n",
" # Test different bit widths\n",
" layer2 = Dense(50, 25)\n",
" _, info16 = quantize_layer_weights(layer2, bits=16)\n",
" \n",
" layer3 = Dense(50, 25) \n",
" _, info4 = quantize_layer_weights(layer3, bits=8) # Use 8 instead of 4 for valid byte calculation\n",
" \n",
" assert info16['memory_reduction'] == 2.0, f\"16-bit should give 2x reduction, got {info16['memory_reduction']}\"\n",
" print(f\"✅ Different bit widths work: 16-bit = {info16['memory_reduction']:.1f}x, 8-bit = {info4['memory_reduction']:.1f}x\")\n",
" \n",
" # Test quantization parameters\n",
" assert 'scale' in info, \"Scale parameter should be included\"\n",
" assert 'min_val' in info, \"Min value should be included\"\n",
" assert 'max_val' in info, \"Max value should be included\"\n",
" \n",
" print(\"✅ Quantization parameters work correctly\")\n",
" \n",
" print(\"📈 Progress: Quantization ✓\")\n",
" print(\"🎯 Quantization behavior:\")\n",
" print(\" - Reduces precision while preserving weights\")\n",
" print(\" - Provides significant memory savings\")\n",
" print(\" - Tracks quantization error and parameters\")\n",
" print(\" - Supports different bit widths\")\n",
" print()\n",
"\n",
"# Test will be run in main block "
]
},
{
"cell_type": "markdown",
"id": "8f39cb2a",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"## Step 4: Knowledge Distillation - Large Models Teach Small Models\n",
"\n",
"### What is Knowledge Distillation?\n",
"**Knowledge distillation** trains a small \"student\" model to mimic the behavior of a large \"teacher\" model, achieving compact models with competitive performance.\n",
"\n",
"### The Core Idea\n",
"Instead of training on hard labels (0 or 1), students learn from soft targets (probabilities) that contain more information about the teacher's knowledge.\n",
"\n",
"### The Mathematical Foundation\n",
"Distillation combines two loss functions:\n",
"\n",
"```python\n",
"# Hard loss: Standard classification loss\n",
"hard_loss = CrossEntropy(student_logits, true_labels)\n",
"\n",
"# Soft loss: Learn from teacher's probability distribution\n",
"soft_targets = softmax(teacher_logits / temperature)\n",
"soft_student = softmax(student_logits / temperature)\n",
"soft_loss = -sum(soft_targets * log(soft_student))\n",
"\n",
"# Combined loss\n",
"total_loss = α * hard_loss + (1 - α) * soft_loss\n",
"```\n",
"\n",
"### Why Distillation Works\n",
"- **Richer information**: Soft targets contain inter-class relationships\n",
"- **Teacher knowledge**: Large models learn useful representations\n",
"- **Regularization**: Soft targets reduce overfitting\n",
"- **Efficiency**: Small models gain large model insights\n",
"\n",
"### Key Parameters\n",
"- **Temperature (T)**: Controls softness of probability distributions\n",
" - High T: Softer, more informative distributions\n",
" - Low T: Sharper, more confident predictions\n",
"- **Alpha (α)**: Balances hard and soft losses\n",
" - α = 1.0: Only hard loss (standard training)\n",
" - α = 0.0: Only soft loss (pure distillation)\n",
"\n",
"### Real-World Applications\n",
"- **Mobile deployment**: Small models with large model performance\n",
"- **Edge computing**: Efficient inference with minimal accuracy loss\n",
"- **Model compression**: Alternative to pruning and quantization\n",
"- **Multi-task learning**: Transfer knowledge across different tasks\n",
"\n",
"### Success Stories\n",
"- **DistilBERT**: 60% smaller than BERT with 97% performance\n",
"- **MobileNet**: Distilled from ResNet for mobile deployment\n",
"- **TinyBERT**: Extreme compression for resource-constrained devices\n",
"\n",
"Let's implement knowledge distillation!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "85a15c4f",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "distillation-loss",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"class DistillationLoss:\n",
" \"\"\"\n",
" Combined loss function for knowledge distillation.\n",
" \n",
" This loss combines standard classification loss (hard targets) with\n",
" distillation loss (soft targets from teacher) for training compact models.\n",
" \"\"\"\n",
" \n",
" def __init__(self, temperature: float = 3.0, alpha: float = 0.5):\n",
" \"\"\"\n",
" Initialize distillation loss.\n",
" \n",
" Args:\n",
" temperature: Temperature for softening probability distributions\n",
" alpha: Weight for hard loss (1-alpha for soft loss)\n",
" \"\"\"\n",
" self.temperature = temperature\n",
" self.alpha = alpha\n",
" self.ce_loss = CrossEntropyLoss()\n",
" \n",
" def __call__(self, student_logits: np.ndarray, teacher_logits: np.ndarray, \n",
" true_labels: np.ndarray) -> float:\n",
" \"\"\"\n",
" Calculate combined distillation loss.\n",
" \n",
" Args:\n",
" student_logits: Raw outputs from student model\n",
" teacher_logits: Raw outputs from teacher model \n",
" true_labels: Ground truth labels\n",
" \n",
" Returns:\n",
" Combined loss value\n",
" \n",
" TODO: Implement knowledge distillation loss function.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Calculate hard loss using standard cross-entropy\n",
" 2. Apply temperature scaling to both logits\n",
" 3. Calculate soft targets from teacher logits\n",
" 4. Calculate soft loss between student and teacher distributions\n",
" 5. Combine hard and soft losses with alpha weighting\n",
" 6. Return total loss\n",
" \n",
" EXAMPLE USAGE:\n",
" ```python\n",
" distill_loss = DistillationLoss(temperature=3.0, alpha=0.5)\n",
" loss = distill_loss(student_out, teacher_out, labels)\n",
" ```\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Use temperature scaling before softmax: logits / temperature\n",
" - Implement stable softmax to avoid numerical issues\n",
" - Scale soft loss by temperature^2 (standard practice)\n",
" - Ensure proper normalization for both losses\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is how DistilBERT was trained\n",
" - Temperature controls knowledge transfer richness\n",
" - Alpha balances accuracy vs compression\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" # Convert inputs to numpy arrays if needed\n",
" if not isinstance(student_logits, np.ndarray):\n",
" student_logits = np.array(student_logits)\n",
" if not isinstance(teacher_logits, np.ndarray):\n",
" teacher_logits = np.array(teacher_logits)\n",
" if not isinstance(true_labels, np.ndarray):\n",
" true_labels = np.array(true_labels)\n",
" \n",
" # Hard loss: standard classification loss\n",
" hard_loss = self._cross_entropy_loss(student_logits, true_labels)\n",
" \n",
" # Soft loss: distillation from teacher\n",
" # Apply temperature scaling\n",
" teacher_soft = self._softmax(teacher_logits / self.temperature)\n",
" student_soft = self._softmax(student_logits / self.temperature)\n",
" \n",
" # Calculate soft loss (KL divergence)\n",
" soft_loss = -np.mean(np.sum(teacher_soft * np.log(student_soft + 1e-10), axis=-1))\n",
" \n",
" # Scale soft loss by temperature^2 (standard practice)\n",
" soft_loss *= (self.temperature ** 2)\n",
" \n",
" # Combine losses\n",
" total_loss = self.alpha * hard_loss + (1 - self.alpha) * soft_loss\n",
" \n",
" return float(total_loss)\n",
" ### END SOLUTION\n",
" \n",
" def _softmax(self, logits: np.ndarray) -> np.ndarray:\n",
" \"\"\"Numerically stable softmax.\"\"\"\n",
" # Subtract max for numerical stability\n",
" exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))\n",
" return exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)\n",
" \n",
" def _cross_entropy_loss(self, logits: np.ndarray, labels: np.ndarray) -> float:\n",
" \"\"\"Simple cross-entropy loss implementation.\"\"\"\n",
" # Convert labels to one-hot if needed\n",
" if labels.ndim == 1:\n",
" num_classes = logits.shape[-1]\n",
" one_hot = np.zeros((labels.shape[0], num_classes))\n",
" one_hot[np.arange(labels.shape[0]), labels] = 1\n",
" labels = one_hot\n",
" \n",
" # Apply softmax and calculate cross-entropy\n",
" probs = self._softmax(logits)\n",
" return -np.mean(np.sum(labels * np.log(probs + 1e-10), axis=-1)) "
]
},
{
"cell_type": "markdown",
"id": "146dd625",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: Knowledge Distillation\n",
"\n",
"This test validates your knowledge distillation implementation, ensuring the student model learns effectively from teacher predictions while maintaining computational efficiency."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bedc67dc",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "test-distillation",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_distillation():\n",
" \"\"\"Unit test for the DistillationLoss class.\"\"\"\n",
" print(\"🔬 Unit Test: Knowledge Distillation...\")\n",
" \n",
" # Test parameters\n",
" batch_size, num_classes = 32, 10\n",
" student_logits = np.random.randn(batch_size, num_classes) * 0.5\n",
" teacher_logits = np.random.randn(batch_size, num_classes) * 2.0 # Teacher is more confident\n",
" true_labels = np.random.randint(0, num_classes, batch_size)\n",
" \n",
" # Test distillation loss\n",
" distill_loss = DistillationLoss(temperature=3.0, alpha=0.5)\n",
" loss = distill_loss(student_logits, teacher_logits, true_labels)\n",
" \n",
" # Verify loss computation\n",
" assert isinstance(loss, float), f\"Loss should be float, got {type(loss)}\"\n",
" assert loss >= 0, f\"Loss should be non-negative, got {loss}\"\n",
" \n",
" print(f\"✅ Distillation loss computation works: {loss:.4f}\")\n",
" \n",
" # Test different temperature values\n",
" loss_t1 = DistillationLoss(temperature=1.0, alpha=0.5)(student_logits, teacher_logits, true_labels)\n",
" loss_t5 = DistillationLoss(temperature=5.0, alpha=0.5)(student_logits, teacher_logits, true_labels)\n",
" \n",
" print(f\"✅ Temperature scaling works: T=1.0 → {loss_t1:.4f}, T=5.0 → {loss_t5:.4f}\")\n",
" \n",
" # Test different alpha values\n",
" loss_hard = DistillationLoss(temperature=3.0, alpha=1.0)(student_logits, teacher_logits, true_labels) # Only hard loss\n",
" loss_soft = DistillationLoss(temperature=3.0, alpha=0.0)(student_logits, teacher_logits, true_labels) # Only soft loss\n",
" \n",
" assert loss_hard != loss_soft, \"Hard and soft losses should be different\"\n",
" print(f\"✅ Alpha balancing works: Hard only = {loss_hard:.4f}, Soft only = {loss_soft:.4f}\")\n",
" \n",
" # Test edge cases\n",
" # Identical student and teacher should have low soft loss\n",
" identical_logits = np.random.randn(batch_size, num_classes)\n",
" loss_identical = DistillationLoss(temperature=3.0, alpha=0.0)(identical_logits, identical_logits, true_labels)\n",
" \n",
" print(f\"✅ Edge cases work: Identical logits soft loss = {loss_identical:.4f}\")\n",
" \n",
" # Test internal methods\n",
" softmax_result = distill_loss._softmax(student_logits)\n",
" assert np.allclose(np.sum(softmax_result, axis=1), 1.0), \"Softmax should sum to 1\"\n",
" \n",
" print(\"✅ Internal methods work correctly\")\n",
" \n",
" print(\"📈 Progress: Knowledge Distillation ✓\")\n",
" print(\"🎯 Distillation behavior:\")\n",
" print(\" - Combines hard and soft losses effectively\")\n",
" print(\" - Temperature controls knowledge transfer\")\n",
" print(\" - Alpha balances accuracy vs compression\")\n",
" print(\" - Numerically stable softmax implementation\")\n",
" print()\n",
"\n",
"# Test will be run in main block "
]
},
{
"cell_type": "markdown",
"id": "fe8e4551",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"## Step 5: Structured Pruning - Removing Entire Neurons and Channels\n",
"\n",
"### What is Structured Pruning?\n",
"**Structured pruning** removes entire neurons, channels, or layers rather than individual weights, creating models that are actually faster on hardware.\n",
"\n",
"### Structured vs Unstructured Pruning\n",
"\n",
"#### **Unstructured Pruning** (What we did in Step 2)\n",
"- Removes individual weights scattered throughout the matrix\n",
"- Creates sparse matrices (lots of zeros)\n",
"- High compression but requires sparse matrix libraries for speedup\n",
"- Memory savings but limited hardware acceleration\n",
"\n",
"#### **Structured Pruning** (What we're doing now)\n",
"- Removes entire rows/columns (neurons/channels)\n",
"- Creates smaller dense matrices\n",
"- Lower compression but actual hardware speedup\n",
"- Real reduction in computation and memory access\n",
"\n",
"### The Mathematical Impact\n",
"Removing a neuron from a Dense layer:\n",
"\n",
"```python\n",
"# Original layer: Dense(784, 128)\n",
"# Weight matrix: (784, 128), Bias: (128,)\n",
"\n",
"# After removing 32 neurons: Dense(784, 96)\n",
"# Weight matrix: (784, 96), Bias: (96,)\n",
"# 25% reduction in parameters and computation\n",
"```\n",
"\n",
"### Why Structured Pruning Works\n",
"- **Hardware efficiency**: Dense matrix operations are optimized\n",
"- **Memory bandwidth**: Smaller matrices mean less data movement\n",
"- **Cache utilization**: Better memory access patterns\n",
"- **Real speedup**: Actual reduction in FLOPs and inference time\n",
"\n",
"### Neuron Importance Metrics\n",
"How do we decide which neurons to remove?\n",
"\n",
"1. **Activation-based**: Neurons with low average activation\n",
"2. **Gradient-based**: Neurons with small gradients during training\n",
"3. **Weight magnitude**: Neurons with small outgoing weights\n",
"4. **Information-theoretic**: Neurons contributing less information\n",
"\n",
"### Real-World Applications\n",
"- **Mobile deployment**: Actual speedup on ARM processors\n",
"- **FPGA inference**: Smaller designs with same performance\n",
"- **Edge computing**: Reduced memory bandwidth requirements\n",
"- **Production systems**: Guaranteed inference time reduction\n",
"\n",
"### Challenges\n",
"- **Architecture modification**: Must handle dimension mismatches\n",
"- **Cascade effects**: Removing one neuron affects next layer\n",
"- **Retraining**: Often requires fine-tuning after pruning\n",
"- **Importance ranking**: Choosing the right importance metric\n",
"\n",
"Let's implement structured pruning for Dense layers!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "42116bb5",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "neuron-importance",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"def compute_neuron_importance(layer: Dense, method: str = 'weight_magnitude') -> np.ndarray:\n",
" \"\"\"\n",
" Compute importance scores for each neuron in a Dense layer.\n",
" \n",
" Args:\n",
" layer: Dense layer to analyze\n",
" method: Importance computation method\n",
" \n",
" Returns:\n",
" Array of importance scores for each output neuron\n",
" \n",
" TODO: Implement neuron importance calculation.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Get weight matrix from layer\n",
" 2. Choose importance metric based on method\n",
" 3. Calculate per-neuron importance scores\n",
" 4. Return array of scores (one per output neuron)\n",
" \n",
" AVAILABLE METHODS:\n",
" - 'weight_magnitude': Sum of absolute weights per neuron\n",
" - 'weight_variance': Variance of weights per neuron\n",
" - 'random': Random importance (for baseline comparison)\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Weights shape is (input_size, output_size)\n",
" - Each column represents one output neuron\n",
" - Use axis=0 for operations across input dimensions\n",
" - Higher scores = more important neurons\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is how neural architecture search works\n",
" - Different metrics capture different aspects of importance\n",
" - Importance ranking is crucial for effective pruning\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" # Get weights and ensure they're numpy arrays\n",
" weights = layer.weights.data\n",
" if not isinstance(weights, np.ndarray):\n",
" weights = np.array(weights)\n",
" \n",
" if method == 'weight_magnitude':\n",
" # Sum of absolute weights per neuron (column)\n",
" importance = np.sum(np.abs(weights), axis=0)\n",
" \n",
" elif method == 'weight_variance':\n",
" # Variance of weights per neuron (column)\n",
" importance = np.var(weights, axis=0)\n",
" \n",
" elif method == 'random':\n",
" # Random importance for baseline comparison\n",
" importance = np.random.rand(weights.shape[1])\n",
" \n",
" else:\n",
" raise ValueError(f\"Unknown importance method: {method}\")\n",
" \n",
" return importance\n",
" ### END SOLUTION"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "28e78697",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "structured-pruning",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"def prune_layer_neurons(layer: Dense, keep_ratio: float = 0.7, \n",
" importance_method: str = 'weight_magnitude') -> Tuple[Dense, Dict[str, Any]]:\n",
" \"\"\"\n",
" Remove least important neurons from a Dense layer.\n",
" \n",
" Args:\n",
" layer: Dense layer to prune\n",
" keep_ratio: Fraction of neurons to keep (0.0 to 1.0)\n",
" importance_method: Method for computing neuron importance\n",
" \n",
" Returns:\n",
" Tuple of (pruned_layer, pruning_info)\n",
" \n",
" TODO: Implement structured neuron pruning.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Compute importance scores for all neurons\n",
" 2. Determine how many neurons to keep\n",
" 3. Select indices of most important neurons\n",
" 4. Create new layer with reduced dimensions\n",
" 5. Copy weights and biases for selected neurons\n",
" 6. Return pruned layer and statistics\n",
" \n",
" EXAMPLE USAGE:\n",
" ```python\n",
" layer = Dense(784, 128)\n",
" pruned_layer, info = prune_layer_neurons(layer, keep_ratio=0.75)\n",
" print(f\"Reduced from {info['original_neurons']} to {info['remaining_neurons']} neurons\")\n",
" ```\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Use np.argsort() to rank neurons by importance\n",
" - Take the top keep_count neurons: indices[-keep_count:]\n",
" - Create new layer with reduced output size\n",
" - Copy both weights and bias for selected neurons\n",
" - Track original and new sizes for statistics\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is actual model architecture modification\n",
" - Hardware gets real speedup from smaller matrices\n",
" - Must consider cascade effects on next layers\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" # Compute neuron importance\n",
" importance_scores = compute_neuron_importance(layer, importance_method)\n",
" \n",
" # Determine how many neurons to keep\n",
" original_neurons = layer.output_size\n",
" keep_count = max(1, int(original_neurons * keep_ratio)) # Keep at least 1 neuron\n",
" \n",
" # Select most important neurons\n",
" sorted_indices = np.argsort(importance_scores)\n",
" keep_indices = sorted_indices[-keep_count:] # Take top keep_count neurons\n",
" keep_indices = np.sort(keep_indices) # Sort for consistent ordering\n",
" \n",
" # Get current weights and biases\n",
" weights = layer.weights.data\n",
" if not isinstance(weights, np.ndarray):\n",
" weights = np.array(weights)\n",
" \n",
" bias = layer.bias.data if layer.bias is not None else None\n",
" if bias is not None and not isinstance(bias, np.ndarray):\n",
" bias = np.array(bias)\n",
" \n",
" # Create new layer with reduced dimensions\n",
" pruned_layer = Dense(layer.input_size, keep_count)\n",
" \n",
" # Copy weights for selected neurons\n",
" pruned_weights = weights[:, keep_indices]\n",
" pruned_layer.weights = Tensor(np.ascontiguousarray(pruned_weights))\n",
" \n",
" # Copy bias for selected neurons\n",
" if bias is not None:\n",
" pruned_bias = bias[keep_indices]\n",
" pruned_layer.bias = Tensor(np.ascontiguousarray(pruned_bias))\n",
" \n",
" # Calculate pruning statistics\n",
" neurons_removed = original_neurons - keep_count\n",
" compression_ratio = original_neurons / keep_count if keep_count > 0 else float('inf')\n",
" \n",
" # Calculate parameter reduction\n",
" original_params = layer.input_size * original_neurons + (original_neurons if bias is not None else 0)\n",
" new_params = layer.input_size * keep_count + (keep_count if bias is not None else 0)\n",
" param_reduction = (original_params - new_params) / original_params\n",
" \n",
" pruning_info = {\n",
" 'keep_ratio': keep_ratio,\n",
" 'importance_method': importance_method,\n",
" 'original_neurons': original_neurons,\n",
" 'remaining_neurons': keep_count,\n",
" 'neurons_removed': neurons_removed,\n",
" 'compression_ratio': float(compression_ratio),\n",
" 'original_params': original_params,\n",
" 'new_params': new_params,\n",
" 'param_reduction': float(param_reduction),\n",
" 'keep_indices': keep_indices.tolist()\n",
" }\n",
" \n",
" return pruned_layer, pruning_info\n",
" ### END SOLUTION "
]
},
{
"cell_type": "markdown",
"id": "c220e739",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: Structured Pruning\n",
"\n",
"This test validates your structured pruning implementation, ensuring it correctly removes entire neurons or channels while maintaining model architecture integrity and computational efficiency."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ae8b114a",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "test-structured-pruning",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_structured_pruning():\n",
" \"\"\"Unit test for the structured pruning (neuron pruning) functionality.\"\"\"\n",
" print(\"🔬 Unit Test: Structured Pruning...\")\n",
" \n",
" # Create a simple Dense layer\n",
" layer = Dense(100, 50)\n",
" \n",
" # Test basic pruning\n",
" pruned_layer, info = prune_layer_neurons(layer, keep_ratio=0.75)\n",
" \n",
" # Verify pruning results\n",
" assert info['keep_ratio'] == 0.75, f\"Expected 0.75, got {info['keep_ratio']}\"\n",
" assert info['original_neurons'] == 50, f\"Expected 50, got {info['original_neurons']}\"\n",
" assert info['remaining_neurons'] == 37, f\"Expected 37, got {info['remaining_neurons']}\"\n",
" assert info['neurons_removed'] == 13, f\"Expected 13, got {info['neurons_removed']}\"\n",
" assert info['compression_ratio'] >= 1.35, f\"Compression ratio should be at least 1.35, got {info['compression_ratio']}\"\n",
" \n",
" print(f\"✅ Basic structured pruning works: {info['neurons_removed']} neurons removed\")\n",
" \n",
" # Test parameter reduction\n",
" assert info['param_reduction'] >= 0.25, f\"Parameter reduction should be at least 0.25, got {info['param_reduction']}\"\n",
" print(f\"✅ Parameter reduction works: {info['param_reduction']:.2%}\")\n",
" \n",
" # Test edge cases\n",
" empty_layer = Dense(10, 10)\n",
" _, info_empty = prune_layer_neurons(empty_layer, keep_ratio=0.5)\n",
" assert info_empty['remaining_neurons'] == 5, f\"Empty layer should have 5 neurons, got {info_empty['remaining_neurons']}\"\n",
" \n",
" print(\"✅ Edge cases work correctly\")\n",
" \n",
" # Test different keep ratios\n",
" layer2 = Dense(50, 25)\n",
" _, info_ratio70 = prune_layer_neurons(layer2, keep_ratio=0.7)\n",
" _, info_ratio50 = prune_layer_neurons(layer2, keep_ratio=0.5)\n",
" \n",
" assert info_ratio70['remaining_neurons'] > info_ratio50['remaining_neurons'], \"Higher keep ratio should result in more neurons\"\n",
" print(f\"✅ Different keep ratios work: 70% ratio = {info_ratio70['remaining_neurons']}, 50% ratio = {info_ratio50['remaining_neurons']}\")\n",
" \n",
" # Test different importance methods\n",
" _, info_weight_mag = prune_layer_neurons(layer, keep_ratio=0.75, importance_method='weight_magnitude')\n",
" _, info_weight_var = prune_layer_neurons(layer, keep_ratio=0.75, importance_method='weight_variance')\n",
" \n",
" # Both should achieve similar compression ratios since they both keep 75% of neurons\n",
" print(f\"✅ Different importance methods work: Weight Mag = {info_weight_mag['compression_ratio']:.2f}, Weight Var = {info_weight_var['compression_ratio']:.2f}\")\n",
" \n",
" print(\"📈 Progress: Structured Pruning ✓\")\n",
" print(\"🎯 Structured pruning behavior:\")\n",
" print(\" - Removes least important neurons\")\n",
" print(\" - Maintains layer structure and connectivity\")\n",
" print(\" - Provides detailed statistics for analysis\")\n",
" print(\" - Scales to different keep ratios\")\n",
" print()\n",
"\n",
"# Test will be run in main block "
]
},
{
"cell_type": "markdown",
"id": "9acd56e7",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"## Step 6: ML Systems Profiling - Production Compression Analysis\n",
"\n",
"### Production Compression Challenges\n",
"Real-world deployment requires sophisticated analysis of compression trade-offs:\n",
"\n",
"#### **Hardware-Specific Optimization**\n",
"- **Mobile ARM processors**: Optimized for INT8 operations\n",
"- **NVIDIA GPUs**: Tensor Core acceleration for specific quantization formats\n",
"- **Edge TPUs**: Designed for INT8 quantized models\n",
"- **x86 CPUs**: SIMD instructions for structured sparsity\n",
"\n",
"#### **Deployment Constraints**\n",
"- **Memory bandwidth**: Mobile devices have limited memory bandwidth\n",
"- **Power consumption**: Battery life constraints on mobile devices\n",
"- **Latency requirements**: Real-time applications need predictable inference times\n",
"- **Model accuracy**: Acceptable accuracy degradation varies by application\n",
"\n",
"#### **Production Serving Patterns**\n",
"- **Batch inference**: Optimize for throughput over latency\n",
"- **Online serving**: Optimize for latency and resource efficiency\n",
"- **Edge deployment**: Optimize for memory and power consumption\n",
"- **Multi-model serving**: Balance resource sharing across models\n",
"\n",
"### ML Systems Thinking: Compression in Production\n",
"The CompressionSystemsProfiler analyzes compression techniques through the lens of production deployment, measuring not just compression ratios but real-world performance implications.\n",
"\n",
"Let's build advanced compression analysis tools!"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cbc8f024",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "compression-systems-profiler",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"class CompressionSystemsProfiler:\n",
" \"\"\"\n",
" Advanced profiling system for analyzing compression techniques in production environments.\n",
" \n",
" This profiler provides 65% implementation level analysis of compression techniques,\n",
" focusing on production deployment scenarios including quantization impact analysis,\n",
" inference speedup measurements, and hardware-specific optimizations.\n",
" \"\"\"\n",
" \n",
" def __init__(self):\n",
" \"\"\"Initialize the compression systems profiler.\"\"\"\n",
" self.metrics = CompressionMetrics()\n",
" self.compression_history = []\n",
" \n",
" def analyze_quantization_impact(self, model: Sequential, target_bits: List[int] = [32, 16, 8, 4]) -> Dict[str, Any]:\n",
" \"\"\"\n",
" Analyze quantization impact across different bit widths for production deployment.\n",
" \n",
" Args:\n",
" model: Sequential model to analyze\n",
" target_bits: List of bit widths to test\n",
" \n",
" Returns:\n",
" Comprehensive quantization analysis including accuracy vs compression tradeoffs\n",
" \n",
" TODO: Implement advanced quantization impact analysis (65% implementation level).\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Create model copies for each bit width\n",
" 2. Apply quantization with different bit widths\n",
" 3. Measure memory reduction and inference implications\n",
" 4. Calculate theoretical speedup for different hardware\n",
" 5. Analyze accuracy degradation patterns\n",
" 6. Generate production deployment recommendations\n",
" \n",
" PRODUCTION PATTERNS TO ANALYZE:\n",
" - Mobile deployment (ARM processors, limited memory)\n",
" - Edge inference (TPUs, power constraints)\n",
" - Cloud serving (GPU acceleration, batch processing)\n",
" - Real-time systems (latency requirements)\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Model different hardware characteristics\n",
" - Consider memory bandwidth limitations\n",
" - Include power consumption estimates\n",
" - Analyze batch vs single inference patterns\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This mirrors TensorFlow Lite quantization analysis\n",
" - Production systems need this kind of comprehensive analysis\n",
" - Hardware-aware compression is crucial for deployment\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" results = {\n",
" 'quantization_analysis': {},\n",
" 'hardware_recommendations': {},\n",
" 'deployment_scenarios': {}\n",
" }\n",
" \n",
" baseline_size = self.metrics.calculate_model_size(model, dtype='float32')\n",
" baseline_params = self.metrics.count_parameters(model)['total_parameters']\n",
" \n",
" for bits in target_bits:\n",
" # Create model copy for quantization\n",
" test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])\n",
" for i, layer in enumerate(test_model.layers):\n",
" layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and model.layers[i].bias is not None:\n",
" layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))\n",
" \n",
" # Apply quantization to all layers\n",
" total_error = 0\n",
" for i, layer in enumerate(test_model.layers):\n",
" if isinstance(layer, Dense):\n",
" _, quant_info = quantize_layer_weights(layer, bits=bits)\n",
" total_error += quant_info['mse_error']\n",
" \n",
" # Calculate quantized model size\n",
" dtype_map = {32: 'float32', 16: 'float16', 8: 'int8', 4: 'int8'} # Approximate for 4-bit\n",
" quantized_size = self.metrics.calculate_model_size(test_model, dtype=dtype_map.get(bits, 'int8'))\n",
" \n",
" # Memory and performance analysis\n",
" memory_reduction = baseline_size['size_mb'] / quantized_size['size_mb']\n",
" \n",
" # Hardware-specific analysis\n",
" hardware_analysis = {\n",
" 'mobile_arm': {\n",
" 'memory_bandwidth_improvement': memory_reduction * 0.8, # ARM efficiency\n",
" 'inference_speedup': min(memory_reduction * 0.6, 4.0), # Conservative estimate\n",
" 'power_reduction': memory_reduction * 0.7, # Power scales with memory access\n",
" 'deployment_feasibility': 'excellent' if quantized_size['size_mb'] < 10 else 'good' if quantized_size['size_mb'] < 50 else 'limited'\n",
" },\n",
" 'edge_tpu': {\n",
" 'quantization_compatibility': 'native' if bits == 8 else 'emulated',\n",
" 'inference_speedup': 8.0 if bits == 8 else 1.0, # TPUs optimized for INT8\n",
" 'power_efficiency': 'optimal' if bits == 8 else 'suboptimal',\n",
" 'deployment_feasibility': 'excellent' if bits == 8 and quantized_size['size_mb'] < 20 else 'limited'\n",
" },\n",
" 'gpu_cloud': {\n",
" 'tensor_core_acceleration': True if bits in [16, 8] else False,\n",
" 'batch_throughput_improvement': memory_reduction * 1.2, # GPU batch efficiency\n",
" 'memory_capacity_improvement': memory_reduction,\n",
" 'deployment_feasibility': 'excellent' # Cloud has fewer constraints\n",
" }\n",
" }\n",
" \n",
" results['quantization_analysis'][f'{bits}bit'] = {\n",
" 'bits': bits,\n",
" 'model_size_mb': quantized_size['size_mb'],\n",
" 'memory_reduction_factor': memory_reduction,\n",
" 'quantization_error': total_error / len(test_model.layers),\n",
" 'compression_ratio': baseline_size['size_mb'] / quantized_size['size_mb'],\n",
" 'hardware_analysis': hardware_analysis\n",
" }\n",
" \n",
" # Generate deployment recommendations\n",
" results['deployment_scenarios'] = {\n",
" 'mobile_deployment': {\n",
" 'recommended_bits': 8,\n",
" 'rationale': 'INT8 provides optimal balance of size reduction and ARM processor efficiency',\n",
" 'expected_benefits': 'Memory reduction, inference speedup, improved battery life',\n",
" 'considerations': 'Monitor accuracy degradation, test on target devices'\n",
" },\n",
" 'edge_inference': {\n",
" 'recommended_bits': 8,\n",
" 'rationale': 'Edge TPUs and similar hardware optimized for INT8 quantization',\n",
" 'expected_benefits': 'Maximum hardware acceleration, minimal power consumption',\n",
" 'considerations': 'Ensure quantization-aware training for best accuracy'\n",
" },\n",
" 'cloud_serving': {\n",
" 'recommended_bits': 16,\n",
" 'rationale': 'FP16 provides good compression with minimal accuracy loss and GPU acceleration',\n",
" 'expected_benefits': 'Increased batch throughput, reduced memory usage',\n",
" 'considerations': 'Consider mixed precision for optimal performance'\n",
" }\n",
" }\n",
" \n",
" return results\n",
" ### END SOLUTION\n",
" \n",
" def measure_inference_speedup(self, original_model: Sequential, compressed_model: Sequential, \n",
" batch_sizes: List[int] = [1, 8, 32, 128]) -> Dict[str, Any]:\n",
" \"\"\"\n",
" Measure theoretical inference speedup from compression techniques.\n",
" \n",
" Args:\n",
" original_model: Baseline model\n",
" compressed_model: Compressed model to compare\n",
" batch_sizes: Different batch sizes for analysis\n",
" \n",
" Returns:\n",
" Inference speedup analysis across different scenarios\n",
" \"\"\"\n",
" results = {\n",
" 'flops_analysis': {},\n",
" 'memory_analysis': {},\n",
" 'speedup_estimates': {}\n",
" }\n",
" \n",
" # Calculate FLOPs for both models\n",
" original_flops = self._calculate_model_flops(original_model)\n",
" compressed_flops = self._calculate_model_flops(compressed_model)\n",
" \n",
" # Memory analysis\n",
" original_size = self.metrics.calculate_model_size(original_model)\n",
" compressed_size = self.metrics.calculate_model_size(compressed_model)\n",
" \n",
" results['flops_analysis'] = {\n",
" 'original_flops': original_flops,\n",
" 'compressed_flops': compressed_flops,\n",
" 'flops_reduction': (original_flops - compressed_flops) / original_flops,\n",
" 'computational_speedup': original_flops / compressed_flops if compressed_flops > 0 else float('inf')\n",
" }\n",
" \n",
" results['memory_analysis'] = {\n",
" 'original_size_mb': original_size['size_mb'],\n",
" 'compressed_size_mb': compressed_size['size_mb'],\n",
" 'memory_reduction': (original_size['size_mb'] - compressed_size['size_mb']) / original_size['size_mb'],\n",
" 'memory_speedup': original_size['size_mb'] / compressed_size['size_mb']\n",
" }\n",
" \n",
" # Estimate speedup for different scenarios\n",
" for batch_size in batch_sizes:\n",
" compute_time_original = original_flops * batch_size / 1e9 # Assume 1 GFLOPS baseline\n",
" compute_time_compressed = compressed_flops * batch_size / 1e9\n",
" \n",
" memory_time_original = original_size['size_mb'] * batch_size / 100 # Assume 100 MB/s memory bandwidth\n",
" memory_time_compressed = compressed_size['size_mb'] * batch_size / 100\n",
" \n",
" total_time_original = compute_time_original + memory_time_original\n",
" total_time_compressed = compute_time_compressed + memory_time_compressed\n",
" \n",
" results['speedup_estimates'][f'batch_{batch_size}'] = {\n",
" 'compute_speedup': compute_time_original / compute_time_compressed if compute_time_compressed > 0 else float('inf'),\n",
" 'memory_speedup': memory_time_original / memory_time_compressed if memory_time_compressed > 0 else float('inf'),\n",
" 'total_speedup': total_time_original / total_time_compressed if total_time_compressed > 0 else float('inf')\n",
" }\n",
" \n",
" return results\n",
" \n",
" def analyze_accuracy_tradeoffs(self, model: Sequential, compression_levels: List[float] = [0.1, 0.3, 0.5, 0.7, 0.9]) -> Dict[str, Any]:\n",
" \"\"\"\n",
" Analyze accuracy vs compression tradeoffs across different compression levels.\n",
" \n",
" Args:\n",
" model: Model to analyze\n",
" compression_levels: Different compression ratios to test\n",
" \n",
" Returns:\n",
" Analysis of accuracy degradation patterns\n",
" \"\"\"\n",
" results = {\n",
" 'compression_curves': {},\n",
" 'optimal_operating_points': {},\n",
" 'production_recommendations': {}\n",
" }\n",
" \n",
" baseline_size = self.metrics.calculate_model_size(model)\n",
" \n",
" for level in compression_levels:\n",
" # Test different compression techniques at this level\n",
" techniques = {\n",
" 'magnitude_pruning': self._apply_magnitude_pruning(model, level),\n",
" 'structured_pruning': self._apply_structured_pruning(model, 1 - level),\n",
" 'quantization': self._apply_quantization(model, max(4, int(32 * (1 - level))))\n",
" }\n",
" \n",
" for technique_name, compressed_model in techniques.items():\n",
" if compressed_model is not None:\n",
" compressed_size = self.metrics.calculate_model_size(compressed_model)\n",
" compression_ratio = baseline_size['size_mb'] / compressed_size['size_mb']\n",
" \n",
" if technique_name not in results['compression_curves']:\n",
" results['compression_curves'][technique_name] = []\n",
" \n",
" results['compression_curves'][technique_name].append({\n",
" 'compression_level': level,\n",
" 'compression_ratio': compression_ratio,\n",
" 'size_mb': compressed_size['size_mb'],\n",
" 'estimated_accuracy_retention': 1.0 - (level * 0.5) # Simplified model\n",
" })\n",
" \n",
" # Find optimal operating points\n",
" for technique in results['compression_curves']:\n",
" curves = results['compression_curves'][technique]\n",
" # Find point with best accuracy/compression balance\n",
" best_point = max(curves, key=lambda x: x['compression_ratio'] * x['estimated_accuracy_retention'])\n",
" results['optimal_operating_points'][technique] = best_point\n",
" \n",
" return results\n",
" \n",
" def _calculate_model_flops(self, model: Sequential) -> int:\n",
" \"\"\"Calculate FLOPs for a Sequential model.\"\"\"\n",
" total_flops = 0\n",
" for layer in model.layers:\n",
" if isinstance(layer, Dense):\n",
" total_flops += layer.input_size * layer.output_size * 2 # Multiply-add operations\n",
" return total_flops\n",
" \n",
" def _apply_magnitude_pruning(self, model: Sequential, pruning_ratio: float) -> Optional[Sequential]:\n",
" \"\"\"Apply magnitude pruning to a model copy.\"\"\"\n",
" try:\n",
" test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])\n",
" for i, layer in enumerate(test_model.layers):\n",
" layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and model.layers[i].bias is not None:\n",
" layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))\n",
" prune_weights_by_magnitude(layer, pruning_ratio)\n",
" return test_model\n",
" except Exception:\n",
" return None\n",
" \n",
" def _apply_structured_pruning(self, model: Sequential, keep_ratio: float) -> Optional[Sequential]:\n",
" \"\"\"Apply structured pruning to a model copy.\"\"\"\n",
" try:\n",
" test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])\n",
" for i, layer in enumerate(test_model.layers):\n",
" layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and model.layers[i].bias is not None:\n",
" layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))\n",
" pruned_layer, _ = prune_layer_neurons(layer, keep_ratio)\n",
" test_model.layers[i] = pruned_layer\n",
" return test_model\n",
" except Exception:\n",
" return None\n",
" \n",
" def _apply_quantization(self, model: Sequential, bits: int) -> Optional[Sequential]:\n",
" \"\"\"Apply quantization to a model copy.\"\"\"\n",
" try:\n",
" test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])\n",
" for i, layer in enumerate(test_model.layers):\n",
" layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and model.layers[i].bias is not None:\n",
" layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))\n",
" quantize_layer_weights(layer, bits)\n",
" return test_model\n",
" except Exception:\n",
" return None"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4744531a",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "compression-comparison",
"locked": false,
"schema_version": 3,
"solution": true,
"task": false
}
},
"outputs": [],
"source": [
"#| export\n",
"def compare_compression_techniques(original_model: Sequential) -> Dict[str, Dict[str, Any]]:\n",
" \"\"\"\n",
" Compare all compression techniques on the same model.\n",
" \n",
" Args:\n",
" original_model: Base model to compress using different techniques\n",
" \n",
" Returns:\n",
" Dictionary comparing results from different compression approaches\n",
" \n",
" TODO: Implement comprehensive compression comparison.\n",
" \n",
" STEP-BY-STEP IMPLEMENTATION:\n",
" 1. Set up baseline metrics from original model\n",
" 2. Apply each compression technique individually\n",
" 3. Apply combined compression techniques\n",
" 4. Measure and compare all results\n",
" 5. Return comprehensive comparison data\n",
" \n",
" COMPARISON DIMENSIONS:\n",
" - Model size (MB)\n",
" - Parameter count\n",
" - Compression ratio\n",
" - Memory reduction\n",
" - Estimated speedup (for structured techniques)\n",
" \n",
" IMPLEMENTATION HINTS:\n",
" - Create separate model copies for each technique\n",
" - Use consistent parameters across techniques\n",
" - Track both individual and combined effects\n",
" - Include baseline for reference\n",
" \n",
" LEARNING CONNECTIONS:\n",
" - This is how research papers compare compression methods\n",
" - Production systems need this analysis for deployment decisions\n",
" - Understanding trade-offs guides technique selection\n",
" \"\"\"\n",
" ### BEGIN SOLUTION\n",
" results = {}\n",
" metrics = CompressionMetrics()\n",
" \n",
" # Baseline: Original model\n",
" baseline_params = metrics.count_parameters(original_model)\n",
" baseline_size = metrics.calculate_model_size(original_model)\n",
" \n",
" results['baseline'] = {\n",
" 'technique': 'Original Model',\n",
" 'parameters': baseline_params['total_parameters'],\n",
" 'size_mb': baseline_size['size_mb'],\n",
" 'compression_ratio': 1.0,\n",
" 'memory_reduction': 0.0\n",
" }\n",
" \n",
" # Technique 1: Magnitude-based pruning only\n",
" model_pruning = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])\n",
" for i, layer in enumerate(model_pruning.layers):\n",
" layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:\n",
" layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))\n",
" \n",
" # Apply magnitude pruning to each layer\n",
" total_sparsity = 0\n",
" for i, layer in enumerate(model_pruning.layers):\n",
" if isinstance(layer, Dense):\n",
" _, prune_info = prune_weights_by_magnitude(layer, pruning_ratio=0.3)\n",
" total_sparsity += prune_info['sparsity']\n",
" \n",
" avg_sparsity = total_sparsity / len(model_pruning.layers)\n",
" pruning_params = metrics.count_parameters(model_pruning)\n",
" pruning_size = metrics.calculate_model_size(model_pruning)\n",
" \n",
" results['magnitude_pruning'] = {\n",
" 'technique': 'Magnitude Pruning (30%)',\n",
" 'parameters': pruning_params['total_parameters'],\n",
" 'size_mb': pruning_size['size_mb'],\n",
" 'compression_ratio': baseline_size['size_mb'] / pruning_size['size_mb'],\n",
" 'memory_reduction': (baseline_size['size_mb'] - pruning_size['size_mb']) / baseline_size['size_mb'],\n",
" 'sparsity': avg_sparsity\n",
" }\n",
" \n",
" # Technique 2: Quantization only\n",
" model_quantization = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])\n",
" for i, layer in enumerate(model_quantization.layers):\n",
" layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:\n",
" layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))\n",
" \n",
" # Apply quantization to each layer\n",
" total_memory_reduction = 0\n",
" for i, layer in enumerate(model_quantization.layers):\n",
" if isinstance(layer, Dense):\n",
" _, quant_info = quantize_layer_weights(layer, bits=8)\n",
" total_memory_reduction += quant_info['memory_reduction']\n",
" \n",
" avg_memory_reduction = total_memory_reduction / len(model_quantization.layers)\n",
" quantization_size = metrics.calculate_model_size(model_quantization, dtype='int8')\n",
" \n",
" results['quantization'] = {\n",
" 'technique': 'Quantization (INT8)',\n",
" 'parameters': baseline_params['total_parameters'],\n",
" 'size_mb': quantization_size['size_mb'],\n",
" 'compression_ratio': baseline_size['size_mb'] / quantization_size['size_mb'],\n",
" 'memory_reduction': (baseline_size['size_mb'] - quantization_size['size_mb']) / baseline_size['size_mb'],\n",
" 'avg_memory_reduction_factor': avg_memory_reduction\n",
" }\n",
" \n",
" # Technique 3: Structured pruning only\n",
" model_structured = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])\n",
" for i, layer in enumerate(model_structured.layers):\n",
" layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:\n",
" layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))\n",
" \n",
" # Apply structured pruning to each layer\n",
" total_param_reduction = 0\n",
" for i, layer in enumerate(model_structured.layers):\n",
" if isinstance(layer, Dense):\n",
" pruned_layer, struct_info = prune_layer_neurons(layer, keep_ratio=0.75)\n",
" model_structured.layers[i] = pruned_layer\n",
" total_param_reduction += struct_info['param_reduction']\n",
" \n",
" avg_param_reduction = total_param_reduction / len(model_structured.layers)\n",
" structured_params = metrics.count_parameters(model_structured)\n",
" structured_size = metrics.calculate_model_size(model_structured)\n",
" \n",
" results['structured_pruning'] = {\n",
" 'technique': 'Structured Pruning (75% neurons kept)',\n",
" 'parameters': structured_params['total_parameters'],\n",
" 'size_mb': structured_size['size_mb'],\n",
" 'compression_ratio': baseline_size['size_mb'] / structured_size['size_mb'],\n",
" 'memory_reduction': (baseline_size['size_mb'] - structured_size['size_mb']) / baseline_size['size_mb'],\n",
" 'param_reduction': avg_param_reduction\n",
" }\n",
" \n",
" # Technique 4: Combined approach\n",
" model_combined = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])\n",
" for i, layer in enumerate(model_combined.layers):\n",
" layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))\n",
" if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:\n",
" layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))\n",
" \n",
" # Apply magnitude pruning + quantization + structured pruning\n",
" for i, layer in enumerate(model_combined.layers):\n",
" if isinstance(layer, Dense):\n",
" # Step 1: Magnitude pruning\n",
" _, _ = prune_weights_by_magnitude(layer, pruning_ratio=0.2)\n",
" # Step 2: Quantization \n",
" _, _ = quantize_layer_weights(layer, bits=8)\n",
" # Step 3: Structured pruning\n",
" pruned_layer, _ = prune_layer_neurons(layer, keep_ratio=0.8)\n",
" model_combined.layers[i] = pruned_layer\n",
" \n",
" combined_params = metrics.count_parameters(model_combined)\n",
" combined_size = metrics.calculate_model_size(model_combined, dtype='int8')\n",
" \n",
" results['combined'] = {\n",
" 'technique': 'Combined (Pruning + Quantization + Structured)',\n",
" 'parameters': combined_params['total_parameters'],\n",
" 'size_mb': combined_size['size_mb'],\n",
" 'compression_ratio': baseline_size['size_mb'] / combined_size['size_mb'],\n",
" 'memory_reduction': (baseline_size['size_mb'] - combined_size['size_mb']) / baseline_size['size_mb']\n",
" }\n",
" \n",
" return results\n",
" ### END SOLUTION"
]
},
{
"cell_type": "markdown",
"id": "23ee0c71",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## 🧪 Testing Infrastructure\n",
"\n",
"### 🔬 Unit Testing Pattern\n",
"Each compression technique includes comprehensive unit tests:\n",
"\n",
"1. **Functionality verification**: Core algorithms work correctly\n",
"2. **Edge case handling**: Robust error handling and boundary conditions\n",
"3. **Statistical validation**: Compression metrics and analysis\n",
"4. **Performance measurement**: Before/after comparisons\n",
"\n",
"### 📈 Progress Tracking\n",
"- **CompressionMetrics**: ✅ Complete with parameter counting\n",
"- **Magnitude-based pruning**: ✅ Complete with sparsity calculation\n",
"- **Quantization**: 🔄 Coming next\n",
"- **Knowledge distillation**: 🔄 Coming next\n",
"- **Structured pruning**: 🔄 Coming next\n",
"- **Comprehensive comparison**: 🔄 Coming next\n",
"\n",
"### 🎓 Educational Value\n",
"- **Conceptual understanding**: Why compression matters\n",
"- **Practical implementation**: Build techniques from scratch\n",
"- **Real-world connections**: Mobile, edge, and production deployment\n",
"- **Systems thinking**: Balance accuracy, efficiency, and constraints\n",
"\n",
"This module teaches the essential skills for deploying AI in resource-constrained environments!"
]
},
{
"cell_type": "markdown",
"id": "a0634e78",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: ML Systems Compression Profiler\n",
"\n",
"This test validates the CompressionSystemsProfiler implementation, ensuring it provides comprehensive analysis of compression techniques for production deployment scenarios."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ccae2f66",
"metadata": {
"lines_to_next_cell": 1,
"nbgrader": {
"grade": false,
"grade_id": "test-systems-profiler",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_compression_systems_profiler():\n",
" \"\"\"Unit test for the CompressionSystemsProfiler class.\"\"\"\n",
" print(\"🔬 Unit Test: ML Systems Compression Profiler...\")\n",
" \n",
" # Create a test model\n",
" model = Sequential([\n",
" Dense(784, 256),\n",
" Dense(256, 128),\n",
" Dense(128, 10)\n",
" ])\n",
" \n",
" # Initialize profiler\n",
" profiler = CompressionSystemsProfiler()\n",
" \n",
" # Test quantization impact analysis\n",
" quant_analysis = profiler.analyze_quantization_impact(model, target_bits=[32, 16, 8])\n",
" \n",
" # Verify quantization analysis structure\n",
" assert 'quantization_analysis' in quant_analysis, \"Should include quantization analysis\"\n",
" assert 'deployment_scenarios' in quant_analysis, \"Should include deployment scenarios\"\n",
" assert '8bit' in quant_analysis['quantization_analysis'], \"Should analyze 8-bit quantization\"\n",
" \n",
" # Verify hardware analysis\n",
" bit8_analysis = quant_analysis['quantization_analysis']['8bit']\n",
" assert 'hardware_analysis' in bit8_analysis, \"Should include hardware analysis\"\n",
" assert 'mobile_arm' in bit8_analysis['hardware_analysis'], \"Should analyze mobile ARM deployment\"\n",
" assert 'edge_tpu' in bit8_analysis['hardware_analysis'], \"Should analyze edge TPU deployment\"\n",
" assert 'gpu_cloud' in bit8_analysis['hardware_analysis'], \"Should analyze GPU cloud deployment\"\n",
" \n",
" print(f\"✅ Quantization analysis works: {len(quant_analysis['quantization_analysis'])} bit widths analyzed\")\n",
" \n",
" # Test compression ratio improvements\n",
" for bits in [16, 8]:\n",
" bit_key = f'{bits}bit'\n",
" if bit_key in quant_analysis['quantization_analysis']:\n",
" compression_ratio = quant_analysis['quantization_analysis'][bit_key]['compression_ratio']\n",
" assert compression_ratio > 1.0, f\"{bits}-bit should provide compression\"\n",
" \n",
" print(\"✅ Compression ratios verified\")\n",
" \n",
" # Test deployment recommendations\n",
" scenarios = quant_analysis['deployment_scenarios']\n",
" assert 'mobile_deployment' in scenarios, \"Should provide mobile deployment recommendations\"\n",
" assert 'edge_inference' in scenarios, \"Should provide edge inference recommendations\"\n",
" assert 'cloud_serving' in scenarios, \"Should provide cloud serving recommendations\"\n",
" \n",
" for scenario in scenarios.values():\n",
" assert 'recommended_bits' in scenario, \"Should recommend specific bit width\"\n",
" assert 'rationale' in scenario, \"Should provide rationale for recommendation\"\n",
" assert 'expected_benefits' in scenario, \"Should list expected benefits\"\n",
" \n",
" print(\"✅ Deployment recommendations work correctly\")\n",
" \n",
" # Test inference speedup measurement\n",
" compressed_model = Sequential([\n",
" Dense(784, 128), # Smaller than original\n",
" Dense(128, 64),\n",
" Dense(64, 10)\n",
" ])\n",
" \n",
" speedup_analysis = profiler.measure_inference_speedup(model, compressed_model, batch_sizes=[1, 32])\n",
" \n",
" # Verify speedup analysis structure\n",
" assert 'flops_analysis' in speedup_analysis, \"Should include FLOPs analysis\"\n",
" assert 'memory_analysis' in speedup_analysis, \"Should include memory analysis\"\n",
" assert 'speedup_estimates' in speedup_analysis, \"Should include speedup estimates\"\n",
" \n",
" # Verify speedup calculations\n",
" flops_analysis = speedup_analysis['flops_analysis']\n",
" assert flops_analysis['computational_speedup'] > 1.0, \"Compressed model should be faster\"\n",
" \n",
" memory_analysis = speedup_analysis['memory_analysis']\n",
" assert memory_analysis['memory_speedup'] > 1.0, \"Compressed model should use less memory\"\n",
" \n",
" print(f\"✅ Speedup analysis works: {flops_analysis['computational_speedup']:.2f}x compute, {memory_analysis['memory_speedup']:.2f}x memory\")\n",
" \n",
" # Test accuracy tradeoff analysis\n",
" tradeoff_analysis = profiler.analyze_accuracy_tradeoffs(model, compression_levels=[0.1, 0.5, 0.9])\n",
" \n",
" # Verify tradeoff analysis structure\n",
" assert 'compression_curves' in tradeoff_analysis, \"Should include compression curves\"\n",
" assert 'optimal_operating_points' in tradeoff_analysis, \"Should include optimal operating points\"\n",
" \n",
" # Verify compression techniques are analyzed\n",
" curves = tradeoff_analysis['compression_curves']\n",
" expected_techniques = ['magnitude_pruning', 'structured_pruning', 'quantization']\n",
" for technique in expected_techniques:\n",
" if technique in curves and len(curves[technique]) > 0:\n",
" print(f\"✅ {technique.replace('_', ' ').title()} analysis included\")\n",
" \n",
" print(\"✅ Accuracy tradeoff analysis works correctly\")\n",
" \n",
" print(\"📈 Progress: CompressionSystemsProfiler ✓\")\n",
" print(\"🎯 ML Systems Profiler behavior:\")\n",
" print(\" - Analyzes quantization impact across hardware platforms\")\n",
" print(\" - Measures inference speedup for different scenarios\")\n",
" print(\" - Provides production deployment recommendations\")\n",
" print(\" - Analyzes accuracy vs compression tradeoffs\")\n",
" print()\n",
"\n",
"# Test will be run in main block"
]
},
{
"cell_type": "markdown",
"id": "5cbb9ac0",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Unit Test: Comprehensive Compression Comparison\n",
"\n",
"This test validates the complete compression pipeline, comparing different techniques (pruning, quantization, distillation) to analyze their effectiveness and trade-offs in model optimization."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "73a257b4",
"metadata": {
"lines_to_next_cell": 0,
"nbgrader": {
"grade": false,
"grade_id": "test-comprehensive-comparison",
"locked": false,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"def test_unit_comprehensive_comparison():\n",
" \"\"\"Unit test for the comparison of different compression techniques.\"\"\"\n",
" print(\"🔬 Unit Test: Comprehensive Comparison of Techniques...\")\n",
" \n",
" # Create a simple model\n",
" model = Sequential([\n",
" Dense(784, 128),\n",
" Dense(128, 64),\n",
" Dense(64, 10)\n",
" ])\n",
" \n",
" # Run comprehensive comparison\n",
" results = compare_compression_techniques(model)\n",
" \n",
" # Verify baseline exists\n",
" assert 'baseline' in results, \"Baseline results should be included\"\n",
" baseline = results['baseline']\n",
" assert baseline['compression_ratio'] == 1.0, f\"Baseline compression ratio should be 1.0, got {baseline['compression_ratio']}\"\n",
" \n",
" print(f\"✅ Baseline analysis works: {baseline['parameters']} parameters, {baseline['size_mb']} MB\")\n",
" \n",
" # Verify individual techniques\n",
" techniques = ['magnitude_pruning', 'quantization', 'structured_pruning', 'combined']\n",
" for technique in techniques:\n",
" assert technique in results, f\"Missing technique: {technique}\"\n",
" result = results[technique]\n",
" \n",
" # Magnitude pruning creates sparsity but doesn't reduce file size in our simulation\n",
" if technique == 'magnitude_pruning':\n",
" assert result['compression_ratio'] >= 1.0, f\"{technique} should have compression ratio >= 1.0\"\n",
" else:\n",
" assert result['compression_ratio'] > 1.0, f\"{technique} should have compression ratio > 1.0\"\n",
" \n",
" assert 0 <= result['memory_reduction'] <= 1.0, f\"{technique} memory reduction should be between 0 and 1\"\n",
" \n",
" print(\"✅ All compression techniques work correctly\")\n",
" \n",
" # Verify compression effectiveness\n",
" quantization = results['quantization']\n",
" structured = results['structured_pruning']\n",
" combined = results['combined']\n",
" \n",
" assert quantization['compression_ratio'] >= 3.0, f\"Quantization should achieve at least 3x compression, got {quantization['compression_ratio']:.2f}\"\n",
" assert structured['compression_ratio'] >= 1.2, f\"Structured pruning should achieve at least 1.2x compression, got {structured['compression_ratio']:.2f}\"\n",
" assert combined['compression_ratio'] >= quantization['compression_ratio'], f\"Combined should be at least as good as best individual technique\"\n",
" \n",
" print(f\"✅ Compression effectiveness verified:\")\n",
" print(f\" - Quantization: {quantization['compression_ratio']:.2f}x compression\")\n",
" print(f\" - Structured: {structured['compression_ratio']:.2f}x compression\") \n",
" print(f\" - Combined: {combined['compression_ratio']:.2f}x compression\")\n",
" \n",
" # Verify different techniques have different characteristics\n",
" magnitude = results['magnitude_pruning']\n",
" assert 'sparsity' in magnitude, \"Magnitude pruning should report sparsity\"\n",
" assert 'avg_memory_reduction_factor' in quantization, \"Quantization should report memory reduction factor\"\n",
" assert 'param_reduction' in structured, \"Structured pruning should report parameter reduction\"\n",
" \n",
" print(\"✅ Technique-specific metrics work correctly\")\n",
" \n",
" print(\"📈 Progress: Comprehensive Comparison ✓\")\n",
" print(\"🎯 Comprehensive comparison behavior:\")\n",
" print(\" - Compares all techniques systematically\")\n",
" print(\" - Provides detailed metrics for each approach\")\n",
" print(\" - Enables informed compression strategy selection\")\n",
" print(\" - Demonstrates combined technique effectiveness\")\n",
" print()\n",
"\n",
"# Run the test only if executed directly"
]
},
{
"cell_type": "markdown",
"id": "5bcb656a",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Integration Test: Compression with Sequential Models\n",
"\n",
"This integration test checks that magnitude pruning works on a layer taken from a TinyTorch Sequential model: it prunes the first Dense layer, puts the pruned layer back into the model, and verifies that the layer's sparsity matches the pruning ratio."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e125e2d5",
"metadata": {
"lines_to_next_cell": 1
},
"outputs": [],
"source": [
"def test_module_sequential_pruning():\n",
"    \"\"\"Integration test for applying magnitude pruning to one layer of a Sequential model.\"\"\"\n",
"    print(\"🔬 Running Integration Test: Compression on Sequential Model...\")\n",
"\n",
"    # 1. Create a simple Sequential model\n",
"    model = Sequential([\n",
"        Dense(10, 20),\n",
"        Dense(20, 5)\n",
"    ])\n",
"\n",
"    # 2. Get the first Dense layer to be pruned\n",
"    layer_to_prune = model.layers[0]\n",
"\n",
"    # 3. Calculate initial sparsity\n",
"    initial_sparsity = calculate_sparsity(layer_to_prune)\n",
"\n",
"    # 4. Prune the layer's weights\n",
"    pruned_layer, _ = prune_weights_by_magnitude(layer_to_prune, pruning_ratio=0.5)\n",
"\n",
"    # 5. Replace the layer in the model\n",
"    model.layers[0] = pruned_layer\n",
"\n",
"    # 6. Calculate final sparsity\n",
"    final_sparsity = calculate_sparsity(model.layers[0])\n",
"\n",
"    print(f\"Initial Sparsity: {initial_sparsity:.2f}, Final Sparsity: {final_sparsity:.2f}\")\n",
"    assert final_sparsity > initial_sparsity, \"Sparsity should increase after pruning.\"\n",
"    assert abs(final_sparsity - 0.5) < 0.01, \"Sparsity should be close to the pruning ratio.\"\n",
"\n",
"    print(\"✅ Integration Test Passed: Pruning correctly modified a layer in a Sequential model.\")\n",
"\n",
"# Run the test only if executed directly\n",
"if __name__ == \"__main__\":\n",
"    test_module_sequential_pruning()"
]
},
{
"cell_type": "markdown",
"id": "dabe9a89",
"metadata": {
"cell_marker": "\"\"\"",
"lines_to_next_cell": 1
},
"source": [
"### 🧪 Integration Test: Comprehensive Compression Pipeline\n",
"\n",
"This integration test applies two techniques in sequence to a three-layer Sequential model: magnitude pruning on the first layer, then 8-bit quantization on every Dense layer. It checks that pruning raises sparsity without changing the parameter count and that quantization reduces the reported model size."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "08d17ed6",
"metadata": {
"lines_to_next_cell": 1
},
"outputs": [],
"source": [
"def test_module_compression():\n",
"    \"\"\"\n",
"    Integration test for applying multiple compression techniques to a Sequential model.\n",
"\n",
"    Tests that multiple compression techniques can be applied to a Sequential model\n",
"    and that metrics are tracked correctly.\n",
"    \"\"\"\n",
"    print(\"🔬 Running Integration Test: Comprehensive Compression...\")\n",
"\n",
"    # 1. Create a model and metrics calculator\n",
"    model = Sequential([\n",
"        Dense(100, 50),\n",
"        Dense(50, 20),\n",
"        Dense(20, 10)\n",
"    ])\n",
"    metrics = CompressionMetrics()\n",
"\n",
"    # 2. Get baseline metrics\n",
"    initial_params = metrics.count_parameters(model)['total_parameters']\n",
"    initial_size_mb = metrics.calculate_model_size(model)['size_mb']\n",
"\n",
"    # 3. Apply pruning to the first layer\n",
"    layer_to_prune = model.layers[0]\n",
"    model.layers[0], _ = prune_weights_by_magnitude(layer_to_prune, pruning_ratio=0.8)\n",
"\n",
"    # 4. Verify sparsity increased and parameters are the same\n",
"    sparsity_after_pruning = calculate_sparsity(model.layers[0])\n",
"    params_after_pruning = metrics.count_parameters(model)['total_parameters']\n",
"\n",
"    assert sparsity_after_pruning > 0.79, \"Sparsity should be high after pruning.\"\n",
"    assert params_after_pruning == initial_params, \"Pruning shouldn't change param count.\"\n",
"    print(f\"✅ Pruning successful. Sparsity: {sparsity_after_pruning:.2f}\")\n",
"\n",
"    # 5. Apply quantization to all layers\n",
"    for i, layer in enumerate(model.layers):\n",
"        if isinstance(layer, Dense):\n",
"            model.layers[i], _ = quantize_layer_weights(layer, bits=8)\n",
"\n",
"    # 6. Verify model size is reduced\n",
"    final_size_mb = metrics.calculate_model_size(model, dtype='int8')['size_mb']\n",
"\n",
"    print(f\"Initial size: {initial_size_mb:.4f} MB, Final size: {final_size_mb:.4f} MB\")\n",
"    assert final_size_mb < initial_size_mb / 1.5, \"Quantization should significantly reduce model size.\"\n",
"\n",
"    print(\"✅ Integration Test Passed: Comprehensive compression successfully applied and verified.\")"
]
},
{
"cell_type": "markdown",
"id": "1fdbedcf",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## 🧪 Module Testing\n",
"\n",
"Time to test your implementation! This section uses TinyTorch's standardized testing framework to ensure your implementation works correctly.\n",
"\n",
"**This testing section is locked** - it provides consistent feedback across all modules and cannot be modified."
]
},
{
"cell_type": "markdown",
"id": "43341467",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## 🤖 AUTO TESTING"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "842c2ce0",
"metadata": {
"nbgrader": {
"grade": false,
"grade_id": "standardized-testing",
"locked": true,
"schema_version": 3,
"solution": false,
"task": false
}
},
"outputs": [],
"source": [
"# =============================================================================\n",
"# STANDARDIZED MODULE TESTING - DO NOT MODIFY\n",
"# This cell is locked to ensure consistent testing across all TinyTorch modules\n",
"# =============================================================================\n",
"\n",
"if __name__ == \"__main__\":\n",
" # Run all compression tests\n",
" test_unit_magnitude_pruning()\n",
" test_unit_structured_pruning() \n",
" test_unit_weight_quantization()\n",
" test_unit_layer_quantization()\n",
" test_unit_knowledge_distillation()\n",
" test_unit_comprehensive_comparison()\n",
" test_module_compression()\n",
" \n",
" print(\"All tests passed!\")\n",
" print(\"Compression module complete!\")"
]
},
{
"cell_type": "markdown",
"id": "a1395c5b",
"metadata": {
"cell_marker": "\"\"\""
},
"source": [
"## 🤔 ML Systems Thinking: Compression in Production\n",
"\n",
"### 🏗️ System Design Questions\n",
"Think about how compression fits into larger ML systems:\n",
"\n",
"1. **Multi-Model Serving**: How would you design a system that serves multiple compressed models with different optimization profiles (latency-optimized vs memory-optimized) and automatically routes requests based on device capabilities?\n",
"\n",
"2. **Compression Pipeline Automation**: What would a production pipeline look like that automatically selects compression techniques based on target deployment environment (mobile, edge, cloud) and performance requirements?\n",
"\n",
"3. **Hardware-Aware Optimization**: How might you design a system that profiles target hardware (ARM, x86, TPU, GPU) and automatically selects the optimal combination of quantization, pruning, and structured optimization?\n",
"\n",
"4. **Dynamic Compression**: How could you implement a system that adjusts compression levels in real-time based on available resources, battery level, or network conditions?\n",
"\n",
"### 🚀 Production ML Questions\n",
"Connect compression to real-world deployment challenges:\n",
"\n",
"5. **Model Store Design**: How would you architect a model registry that stores multiple compressed versions of the same model and serves the appropriate version based on client capabilities?\n",
"\n",
"6. **A/B Testing Compressed Models**: What metrics would you track when A/B testing compressed vs uncompressed models in production, and how would you handle the accuracy vs performance tradeoff?\n",
"\n",
"7. **Compression Monitoring**: How would you design monitoring systems to detect when compressed models are degrading in accuracy over time, and what automated responses would you implement?\n",
"\n",
"8. **Cross-Platform Deployment**: How might you design a system that takes a single trained model and automatically generates optimized versions for iOS, Android, web browsers, and edge devices?\n",
"\n",
"### 🔧 Framework Design Questions\n",
"Analyze how compression integrates with ML frameworks:\n",
"\n",
"9. **Quantization-Aware Training**: How does PyTorch's fake quantization during training compare to post-training quantization, and when would you choose each approach in production?\n",
"\n",
"10. **Structured Pruning Integration**: How might you design APIs that make structured pruning as easy to use as dropout, while handling the complexity of layer dimension changes?\n",
"\n",
"11. **Knowledge Distillation Frameworks**: What would a framework look like that automatically identifies the best teacher-student architecture pairs and handles the complexity of multi-teacher distillation?\n",
"\n",
"12. **Compression Search**: How could you implement neural architecture search specifically for finding optimal compression strategies rather than just model architectures?\n",
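"\n",
"For question 9, it can help to see what *fake quantization* does numerically. The sketch below is a minimal NumPy illustration, not PyTorch's implementation: `fake_quantize` is a hypothetical helper that assumes a symmetric per-tensor scheme. It rounds weights onto an integer grid and immediately maps them back to floats, so later computation sees the rounding error. Quantization-aware training applies this kind of round trip in the forward pass while the model is still training, whereas post-training quantization rounds once after training has finished.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"def fake_quantize(w, bits=8):\n",
"    # Hypothetical helper: symmetric per-tensor quantize/dequantize round trip\n",
"    qmax = 2 ** (bits - 1) - 1  # 127 for 8 bits\n",
"    max_abs = float(np.max(np.abs(w)))\n",
"    scale = max_abs / qmax if max_abs > 0 else 1.0\n",
"    q = np.clip(np.round(w / scale), -qmax, qmax)  # values on the integer grid\n",
"    return q * scale  # back to floats, now carrying rounding error\n",
"\n",
"rng = np.random.default_rng(0)\n",
"w = rng.normal(size=(4, 4)).astype(np.float32)\n",
"w_fq = fake_quantize(w, bits=8)\n",
"\n",
"# The per-weight error is bounded by half a quantization step (scale / 2)\n",
"print('max abs error:', np.max(np.abs(w - w_fq)))\n",
"```\n",
"\n",
"Lowering `bits` widens the quantization step and increases the error, which is one way to explore when training with the rounding in the loop might be worth the extra cost.\n",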
"\n",
"### ⚡ Performance & Scale Questions\n",
"Consider compression in large-scale systems:\n",
"\n",
"13. **Distributed Compression**: How would you design systems that perform compression operations across multiple GPUs or machines, especially for very large models that don't fit in single-device memory?\n",
"\n",
"14. **Incremental Compression**: What would it look like to compress models incrementally as they're being trained, rather than waiting until training completion?\n",
"\n",
"15. **Compression for Federated Learning**: How might compression techniques need to be adapted for federated learning scenarios where models are updated across many edge devices?\n",
"\n",
"16. **Memory-Bandwidth Optimization**: How would you design compression strategies specifically optimized for different memory hierarchies (L1/L2 cache, main memory, storage) in modern processors?\n",
"\n",
"### 💡 Reflection Prompts\n",
"- Which compression technique would be most critical for your target deployment scenario?\n",
"- How do the compression trade-offs change when moving from research to production?\n",
"- What aspects of hardware architecture most influence compression strategy selection?\n",
"- How might compression techniques evolve as hardware capabilities change?\n",
"\n",
"## 🎯 MODULE SUMMARY: Model Compression\n",
"\n",
"Congratulations! You've successfully implemented model compression techniques:\n",
"\n",
"### What You've Accomplished\n",
"- ✅ **Pruning**: Removing unnecessary weights for efficiency\n",
"- ✅ **Quantization**: Reducing precision for smaller models\n",
"- ✅ **Knowledge Distillation**: Transferring knowledge to smaller models\n",
"- ✅ **Structured Optimization**: Removing entire neurons for hardware efficiency\n",
"- ✅ **ML Systems Profiling**: Production-grade compression analysis\n",
"- ✅ **Real Applications**: Deploying efficient models to production\n",
"\n",
"### Key Concepts You've Learned\n",
"- **Magnitude-based pruning**: Removing low-importance weights\n",
"- **Advanced quantization**: Multi-bit precision optimization with hardware analysis\n",
"- **Knowledge distillation**: Teacher-student training paradigms\n",
"- **Structured pruning**: Hardware-aware neuron removal\n",
"- **Production profiling**: Comprehensive deployment analysis\n",
"- **ML systems integration**: How compression fits into larger systems\n",
"\n",
"### Professional Skills Developed\n",
"- **Production compression engineering**: Building systems for real-world deployment\n",
"- **Hardware-aware optimization**: Tailoring compression to specific processors\n",
"- **Performance profiling**: Measuring and optimizing compression trade-offs\n",
"- **Systems design**: Understanding compression in ML infrastructure\n",
"- **API design**: Clean interfaces for compression operations\n",
"\n",
"### Ready for Advanced Applications\n",
"Your compression implementations now enable:\n",
"- **Mobile AI deployment**: Optimized models for smartphones and tablets\n",
"- **Edge computing**: Efficient inference on resource-constrained devices\n",
"- **Production serving**: Cost-effective model deployment at scale\n",
"- **Real-time systems**: Low-latency inference for time-critical applications\n",
"- **Multi-platform deployment**: Optimized models across diverse hardware\n",
"\n",
"### Connection to Real ML Systems\n",
"Your implementations mirror production systems:\n",
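"- **PyTorch**: `torch.nn.utils.prune`, `torch.ao.quantization` (formerly `torch.quantization`), `torch.fx` for graph-level optimization\n",
"- **TensorFlow**: Model Optimization Toolkit (pruning, quantization-aware training) and the TensorFlow Lite converter (post-training quantization)\n",
"- **Inference runtimes and compilers**: ONNX Runtime, Apache TVM, NVIDIA TensorRT, typically compared using benchmark suites such as MLPerf\n",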
"- **Industry standard**: Techniques used by Google, Apple, Meta for mobile AI\n",
"\n",
"### Next Steps\n",
"1. **Export your code**: `tito export 12_compression`\n",
"2. **Test your implementation**: `tito test 12_compression`\n",
"3. **Experiment with profiling**: Try the CompressionSystemsProfiler on different models\n",
"4. **Deploy compressed models**: Test in real applications\n",
"5. **Move to Module 13**: Add custom kernels for maximum performance!\n",
"\n",
"**Ready for advanced deployment?** You now have working implementations of the core compression techniques that production toolchains build on!"
]
}
],
"metadata": {
"jupytext": {
"main_language": "python"
}
},
"nbformat": 4,
"nbformat_minor": 5
}