mirror of
https://github.com/MLSysBook/TinyTorch.git
synced 2026-03-09 10:12:08 -05:00
Package exports: - Fix tinytorch/__init__.py to export all required components for milestones - Add Dense as alias for Linear for compatibility - Add loss functions (MSELoss, CrossEntropyLoss, BinaryCrossEntropyLoss) - Export spatial operations, data loaders, and transformer components Test infrastructure: - Create tests/conftest.py to handle path setup - Create tests/test_utils.py with shared test utilities - Rename test_progressive_integration.py files to include module number - Fix syntax errors in test files (spaces in class names) - Remove stale test file referencing non-existent modules Documentation: - Update README.md with correct milestone file names - Fix milestone requirements to match actual module dependencies Export system: - Run tito export --all to regenerate package from source modules - Ensure all 20 modules are properly exported
418 lines
16 KiB
Python
Generated
418 lines
16 KiB
Python
Generated
# ╔═══════════════════════════════════════════════════════════════════════════════╗
|
|
# ║ 🚨 CRITICAL WARNING 🚨 ║
|
|
# ║ AUTOGENERATED! DO NOT EDIT! ║
|
|
# ║ ║
|
|
# ║ This file is AUTOMATICALLY GENERATED from source modules. ║
|
|
# ║ ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported! ║
|
|
# ║ ║
|
|
# ║ ✅ TO EDIT: src/XX_compression/XX_compression.py ║
|
|
# ║ ✅ TO EXPORT: Run 'tito module complete <module_name>' ║
|
|
# ║ ║
|
|
# ║ 🛡️ STUDENT PROTECTION: This file contains optimized implementations. ║
|
|
# ║ Editing it directly may break module functionality and training. ║
|
|
# ║ ║
|
|
# ║ 🎓 LEARNING TIP: Work in src/ (developers) or modules/ (learners) ║
|
|
# ║ The tinytorch/ directory is generated code - edit source files instead! ║
|
|
# ╚═══════════════════════════════════════════════════════════════════════════════╝
|
|
# %% auto 0
|
|
__all__ = ['BYTES_PER_FLOAT32', 'MB_TO_BYTES', 'magnitude_prune', 'structured_prune', 'low_rank_approximate',
|
|
'KnowledgeDistillation', 'CompressionComplete', 'measure_sparsity', 'compress_model']
|
|
|
|
# %% ../../modules/16_compression/16_compression.ipynb 1
|
|
import numpy as np
|
|
import copy
|
|
from typing import List, Dict, Any, Tuple, Optional
|
|
import time
|
|
|
|
# Import from TinyTorch package (previous modules must be completed and exported)
|
|
from ..core.tensor import Tensor
|
|
from ..core.layers import Linear
|
|
from ..core.activations import ReLU
|
|
|
|
# Constants for memory calculations
|
|
BYTES_PER_FLOAT32 = 4 # Standard float32 size in bytes
|
|
MB_TO_BYTES = 1024 * 1024 # Megabytes to bytes conversion
|
|
|
|
# %% ../../modules/16_compression/16_compression.ipynb 12
|
|
def magnitude_prune(model, sparsity=0.9):
    """Globally prune the smallest-magnitude weights to the target sparsity.

    A single threshold is computed over all weight matrices in the model
    (parameters with more than one dimension; 1D biases are skipped), then
    every weight whose magnitude falls below it is zeroed in place.

    Args:
        model: Model exposing ``.parameters()`` yielding Tensor-like objects
            with ``.data`` (numpy array) and ``.shape``.
        sparsity: Target fraction of weights to zero out, in [0, 1].

    Returns:
        The same model object, with small weights zeroed in place.

    Example:
        >>> magnitude_prune(model, sparsity=0.8)  # ~80% of weights become 0
    """
    # Collect only weight matrices; biases (1D parameters) are not pruned.
    weight_params = [p for p in model.parameters() if len(p.shape) > 1]

    # Nothing to prune (e.g. model with only 1D parameters).
    if not weight_params:
        return model

    # Vectorized gather of all magnitudes — avoids building a Python list
    # of boxed floats element-by-element, which is slow on large models.
    magnitudes = np.abs(np.concatenate([p.data.ravel() for p in weight_params]))

    # Global magnitude threshold at the requested sparsity percentile.
    threshold = np.percentile(magnitudes, sparsity * 100)

    # Zero every weight strictly below the threshold (mask keeps >= threshold,
    # matching the percentile so the achieved sparsity tracks the target).
    for param in weight_params:
        param.data = param.data * (np.abs(param.data) >= threshold)

    return model
|
|
|
|
# %% ../../modules/16_compression/16_compression.ipynb 15
|
|
def structured_prune(model, prune_ratio=0.5):
    """Remove entire output channels of Linear layers by L2-norm importance.

    For each Linear layer, channels (columns of the weight matrix) are ranked
    by their L2 norm and the lowest-importance fraction is zeroed out, along
    with the corresponding bias entries. Zeroing whole channels produces block
    sparsity, which is more hardware-friendly than unstructured pruning.

    Args:
        model: Model exposing ``.layers`` (e.g. SimpleModel).
        prune_ratio: Fraction of channels to remove per layer, in [0, 1].

    Returns:
        The same model object, pruned in place.

    Example:
        >>> structured_prune(model, prune_ratio=0.3)  # 30% of channels zeroed
    """
    for layer in model.layers:
        # Only Linear layers have a prunable 2D weight matrix.
        if not isinstance(layer, Linear):
            continue

        weight = layer.weight.data

        # L2 norm of each output channel (column) measures its importance.
        channel_norms = np.linalg.norm(weight, axis=0)

        num_channels = weight.shape[1]
        num_to_prune = int(num_channels * prune_ratio)
        if num_to_prune <= 0:
            continue

        # Indices of the least-important channels. np.argsort handles the
        # edge case prune_ratio == 1.0 (num_to_prune == num_channels), where
        # np.argpartition(norms, num_channels) would raise a ValueError.
        prune_indices = np.argsort(channel_norms)[:num_to_prune]

        # Zero the whole channel, and its bias entry if a bias exists.
        weight[:, prune_indices] = 0
        if layer.bias is not None:
            layer.bias.data[prune_indices] = 0

    return model
|
|
|
|
# %% ../../modules/16_compression/16_compression.ipynb 18
|
|
def low_rank_approximate(weight_matrix, rank_ratio=0.5):
    """Compress a weight matrix via truncated SVD.

    Decomposes W ≈ U_k @ diag(S_k) @ V_k, keeping only the top
    k = max(1, int(rank_ratio * min(m, n))) singular components. Storing the
    factors instead of W saves memory whenever k * (m + n) < m * n.

    Args:
        weight_matrix: 2D numpy array of shape (m, n).
        rank_ratio: Fraction of the full rank to keep, in (0, 1].

    Returns:
        Tuple ``(U_k, S_k, V_k)`` with shapes (m, k), (k,), (k, n).

    Example:
        >>> U, S, V = low_rank_approximate(np.random.randn(100, 50), 0.3)
        >>> # 100*15 + 15 + 15*50 stored values vs 5000 originally
    """
    rows, cols = weight_matrix.shape

    # At least rank 1 is always retained so the result is usable.
    keep = max(1, int(rank_ratio * min(rows, cols)))

    # Thin SVD suffices because only the leading components are kept.
    left, singular, right = np.linalg.svd(weight_matrix, full_matrices=False)

    return left[:, :keep], singular[:keep], right[:keep, :]
|
|
|
|
# %% ../../modules/16_compression/16_compression.ipynb 21
|
|
class KnowledgeDistillation:
    """Model compression via knowledge distillation.

    A small student model is trained to mimic a larger teacher model by
    blending a soft-target loss (student vs. temperature-softened teacher
    outputs) with the ordinary hard-label classification loss.
    """

    def __init__(self, teacher_model, student_model, temperature=3.0, alpha=0.7):
        """Store the model pair and distillation hyperparameters.

        Args:
            teacher_model: Large, pre-trained model providing soft targets.
            student_model: Smaller model being trained.
            temperature: Logit-softening factor (typically 3-5).
            alpha: Weight of the soft-target loss; hard targets get 1 - alpha.

        Example:
            >>> kd = KnowledgeDistillation(teacher, student, temperature=4.0, alpha=0.8)
            >>> print(f"Temperature: {kd.temperature}, Alpha: {kd.alpha}")
            Temperature: 4.0, Alpha: 0.8
        """
        self.teacher_model = teacher_model
        self.student_model = student_model
        self.temperature = temperature
        self.alpha = alpha

    def distillation_loss(self, student_logits, teacher_logits, true_labels):
        """Return the combined distillation loss.

        Computes ``alpha * KL(student_soft || teacher_soft)
        + (1 - alpha) * CE(student, true_labels)`` where the soft
        distributions are softmaxes of logits divided by the temperature.
        """
        # Forward passes always yield Tensors; unwrap to raw numpy arrays.
        student_logits = student_logits.data
        teacher_logits = teacher_logits.data

        # Labels may arrive as either a Tensor or a plain numpy array.
        if isinstance(true_labels, Tensor):
            true_labels = true_labels.data

        # Temperature-softened distributions for the soft-target term.
        temp = self.temperature
        soft_student = self._softmax(student_logits / temp)
        soft_teacher = self._softmax(teacher_logits / temp)
        soft_loss = self._kl_divergence(soft_student, soft_teacher)

        # Unsoftened student probabilities for the hard-target term.
        hard_loss = self._cross_entropy(self._softmax(student_logits), true_labels)

        # Blend: alpha weights the soft targets, the rest goes to hard labels.
        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

    def _softmax(self, logits):
        """Numerically stable softmax along the last axis."""
        # Subtracting the row max prevents overflow in exp().
        shifted = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        return shifted / np.sum(shifted, axis=-1, keepdims=True)

    def _kl_divergence(self, p, q):
        """Summed KL divergence KL(p || q), with epsilons for stability."""
        return np.sum(p * np.log(p / (q + 1e-8) + 1e-8))

    def _cross_entropy(self, predictions, labels):
        """Cross-entropy loss; accepts integer labels (1D) or one-hot (2D)."""
        if labels.ndim == 1:
            # Integer labels: pick the predicted probability of each true class.
            picked = predictions[np.arange(len(labels)), labels]
            return -np.mean(np.log(picked + 1e-8))
        # One-hot labels: standard soft cross-entropy per row.
        return -np.mean(np.sum(labels * np.log(predictions + 1e-8), axis=1))
|
|
|
|
# %% ../../modules/16_compression/16_compression.ipynb 37
|
|
class CompressionComplete:
    """Complete compression system for milestone use.

    Bundles sparsity measurement, magnitude pruning, structured pruning, and
    a config-driven compression pipeline as static methods.
    """

    @staticmethod
    def measure_sparsity(model) -> float:
        """Return the fraction of zero-valued parameters in the model.

        Expects a SimpleModel-style object: ``model.layers`` iterable, each
        layer providing a ``parameters()`` method.
        """
        total = 0
        zeros = 0
        for layer in model.layers:
            for param in layer.parameters():
                total += param.size
                zeros += np.sum(param.data == 0)
        return zeros / total if total > 0 else 0.0

    @staticmethod
    def magnitude_prune(model, sparsity=0.5):
        """Zero the smallest-magnitude entries of every parameter in place.

        The percentile threshold is computed independently per parameter
        (biases included), unlike a global-threshold scheme.

        Args:
            model: SimpleModel with ``.layers`` attribute.
            sparsity: Fraction of each parameter's entries to prune (0-1).

        Returns:
            The same model, pruned in place.
        """
        for layer in model.layers:
            for param in layer.parameters():
                cutoff = np.percentile(np.abs(param.data), sparsity * 100)
                param.data[np.abs(param.data) < cutoff] = 0
        return model

    @staticmethod
    def structured_prune(model, prune_ratio=0.5):
        """Zero whole output columns of Linear layers by L2-norm importance.

        Args:
            model: SimpleModel with ``.layers`` attribute.
            prune_ratio: Fraction of columns to prune per layer (0-1).

        Returns:
            The same model, pruned in place.
        """
        for layer in model.layers:
            # Only Linear layers carry a prunable weight matrix.
            if not isinstance(layer, Linear):
                continue
            weight = layer.weight
            if len(weight.shape) != 2:
                continue
            # Column L2 norms rank output-neuron importance.
            norms = np.linalg.norm(weight.data, axis=0)
            cutoff = np.percentile(norms, prune_ratio * 100)
            keep = norms >= cutoff
            weight.data[:, ~keep] = 0
        return model

    @staticmethod
    def compress_model(model, compression_config: Dict[str, Any]):
        """Apply the full compression pipeline described by a config dict.

        Args:
            model: Model to compress.
            compression_config: Optional keys:
                - 'magnitude_sparsity': float in (0, 1)
                - 'structured_prune_ratio': float in (0, 1)

        Returns:
            Tuple ``(model, stats)`` where stats holds 'original_sparsity',
            'final_sparsity', and 'compression_ratio'.
        """
        cfg = compression_config
        stats = {'original_sparsity': CompressionComplete.measure_sparsity(model)}

        if 'magnitude_sparsity' in cfg:
            model = CompressionComplete.magnitude_prune(model, cfg['magnitude_sparsity'])

        if 'structured_prune_ratio' in cfg:
            model = CompressionComplete.structured_prune(model, cfg['structured_prune_ratio'])

        final = CompressionComplete.measure_sparsity(model)
        stats['final_sparsity'] = final
        # 1/(1 - sparsity) diverges at full sparsity, so guard with inf.
        stats['compression_ratio'] = 1.0 / (1.0 - final) if final < 1.0 else float('inf')

        return model, stats
|
|
|
|
# Convenience functions for backward compatibility
|
|
def measure_sparsity(model) -> float:
    """Measure model sparsity.

    Backward-compatibility wrapper delegating to
    CompressionComplete.measure_sparsity; returns the fraction of
    zero-valued parameters (0.0-1.0) across all layers.
    """
    return CompressionComplete.measure_sparsity(model)
|
|
|
|
def magnitude_prune(model, sparsity: float = 0.5):
    """Apply magnitude-based pruning.

    Backward-compatibility wrapper delegating to
    CompressionComplete.magnitude_prune (per-parameter threshold, in-place).
    NOTE(review): this redefinition shadows the earlier module-level
    magnitude_prune, which used a global threshold over weight matrices only
    and a different default sparsity (0.9) — confirm which behavior callers
    expect.
    """
    return CompressionComplete.magnitude_prune(model, sparsity)
|
|
|
|
def structured_prune(model, prune_ratio: float = 0.5):
    """Apply structured pruning.

    Backward-compatibility wrapper delegating to
    CompressionComplete.structured_prune (percentile-based column pruning of
    Linear layers, in-place).
    NOTE(review): this redefinition shadows the earlier module-level
    structured_prune, which selected a fixed count of channels via
    argpartition rather than a percentile cutoff — confirm which behavior
    callers expect.
    """
    return CompressionComplete.structured_prune(model, prune_ratio)
|
|
|
|
def compress_model(model, compression_config: Dict[str, Any]):
    """Apply complete compression pipeline.

    Backward-compatibility wrapper delegating to
    CompressionComplete.compress_model; returns a ``(model, stats)`` tuple.
    """
    return CompressionComplete.compress_model(model, compression_config)
|