mirror of
https://github.com/MLSysBook/TinyTorch.git
synced 2026-03-12 01:13:34 -05:00
Commit summary:
- Run `tito export --all` to update all exported code
- Fix file permissions (chmod u+w) to allow export writes
- Update 12 modified files with latest module code
- Add 3 new files (tinygpt, acceleration, compression)
- All 21 modules successfully exported
301 lines
12 KiB
Python
Generated
301 lines
12 KiB
Python
Generated
# ╔═══════════════════════════════════════════════════════════════════════════════╗
|
|
# ║ 🚨 CRITICAL WARNING 🚨 ║
|
|
# ║ AUTOGENERATED! DO NOT EDIT! ║
|
|
# ║ ║
|
|
# ║ This file is AUTOMATICALLY GENERATED from source modules. ║
|
|
# ║ ANY CHANGES MADE HERE WILL BE LOST when modules are re-exported! ║
|
|
# ║ ║
|
|
# ║ ✅ TO EDIT: modules/source/XX_compression/compression_dev.py ║
|
|
# ║ ✅ TO EXPORT: Run 'tito module complete <module_name>' ║
|
|
# ║ ║
|
|
# ║ 🛡️ STUDENT PROTECTION: This file contains optimized implementations. ║
|
|
# ║ Editing it directly may break module functionality and training. ║
|
|
# ║ ║
|
|
# ║ 🎓 LEARNING TIP: Work in modules/source/ - that's where real development ║
|
|
# ║ happens! The tinytorch/ directory is just the compiled output. ║
|
|
# ╚═══════════════════════════════════════════════════════════════════════════════╝
|
|
# %% auto 0
|
|
__all__ = ['Sequential', 'KnowledgeDistillation', 'test_unit_knowledge_distillation', 'CompressionComplete', 'measure_sparsity',
|
|
'magnitude_prune', 'structured_prune', 'compress_model']
|
|
|
|
# %% ../../modules/source/17_compression/compression_dev.ipynb 1
|
|
import numpy as np
|
|
import copy
|
|
from typing import List, Dict, Any, Tuple, Optional
|
|
import time
|
|
|
|
# Import from TinyTorch modules
|
|
from ..core.tensor import Tensor
|
|
from ..core.layers import Linear
|
|
|
|
# Sequential container for model compression
|
|
class Sequential:
    """Sequential container for compression (not exported from core layers)."""

    def __init__(self, *layers):
        # Layers are applied in the order they were passed in.
        self.layers = list(layers)

    def forward(self, x):
        """Thread the input through every layer, left to right."""
        out = x
        for layer in self.layers:
            # Prefer an explicit forward() method; otherwise treat the
            # layer as a plain callable.
            out = layer.forward(out) if hasattr(layer, 'forward') else layer(out)
        return out

    def __call__(self, x):
        # Make the container itself callable, mirroring forward().
        return self.forward(x)

    def parameters(self):
        """Collect parameters from every layer that exposes parameters()."""
        collected = []
        for layer in self.layers:
            if hasattr(layer, 'parameters'):
                collected.extend(layer.parameters())
        return collected
|
|
|
|
# %% ../../modules/source/17_compression/compression_dev.ipynb 15
|
|
class KnowledgeDistillation:
|
|
"""
|
|
Knowledge distillation for model compression.
|
|
|
|
Train a smaller student model to mimic a larger teacher model.
|
|
"""
|
|
|
|
def __init__(self, teacher_model, student_model, temperature=3.0, alpha=0.7):
|
|
"""
|
|
Initialize knowledge distillation.
|
|
|
|
TODO: Set up teacher and student models with distillation parameters
|
|
|
|
APPROACH:
|
|
1. Store teacher and student models
|
|
2. Set temperature for softening probability distributions
|
|
3. Set alpha for balancing hard vs soft targets
|
|
|
|
EXAMPLE:
|
|
>>> teacher = Sequential(Linear(100, 200), Linear(200, 50))
|
|
>>> student = Sequential(Linear(100, 50))
|
|
>>> kd = KnowledgeDistillation(teacher, student, temperature=4.0, alpha=0.8)
|
|
>>> print(f"Temperature: {kd.temperature}, Alpha: {kd.alpha}")
|
|
Temperature: 4.0, Alpha: 0.8
|
|
|
|
HINTS:
|
|
- Simply assign the parameters to instance variables
|
|
- Temperature typically ranges from 3-5 for effective softening
|
|
- Alpha of 0.7 means 70% soft targets, 30% hard targets
|
|
|
|
Args:
|
|
teacher_model: Large, pre-trained model
|
|
student_model: Smaller model to train
|
|
temperature: Softening parameter for distributions
|
|
alpha: Weight for soft target loss (1-alpha for hard targets)
|
|
"""
|
|
### BEGIN SOLUTION
|
|
self.teacher_model = teacher_model
|
|
self.student_model = student_model
|
|
self.temperature = temperature
|
|
self.alpha = alpha
|
|
### END SOLUTION
|
|
|
|
def distillation_loss(self, student_logits, teacher_logits, true_labels):
|
|
"""
|
|
Calculate combined distillation loss.
|
|
|
|
TODO: Implement knowledge distillation loss function
|
|
|
|
APPROACH:
|
|
1. Calculate hard target loss (student vs true labels)
|
|
2. Calculate soft target loss (student vs teacher, with temperature)
|
|
3. Combine losses: alpha * soft_loss + (1-alpha) * hard_loss
|
|
|
|
EXAMPLE:
|
|
>>> kd = KnowledgeDistillation(teacher, student)
|
|
>>> loss = kd.distillation_loss(student_out, teacher_out, labels)
|
|
>>> print(f"Distillation loss: {loss:.4f}")
|
|
|
|
HINTS:
|
|
- Use temperature to soften distributions: logits/temperature
|
|
- Soft targets use KL divergence or cross-entropy
|
|
- Hard targets use standard classification loss
|
|
"""
|
|
### BEGIN SOLUTION
|
|
# Convert to numpy for this implementation
|
|
if hasattr(student_logits, 'data'):
|
|
student_logits = student_logits.data
|
|
if hasattr(teacher_logits, 'data'):
|
|
teacher_logits = teacher_logits.data
|
|
if hasattr(true_labels, 'data'):
|
|
true_labels = true_labels.data
|
|
|
|
# Soften distributions with temperature
|
|
student_soft = self._softmax(student_logits / self.temperature)
|
|
teacher_soft = self._softmax(teacher_logits / self.temperature)
|
|
|
|
# Soft target loss (KL divergence)
|
|
soft_loss = self._kl_divergence(student_soft, teacher_soft)
|
|
|
|
# Hard target loss (cross-entropy)
|
|
student_hard = self._softmax(student_logits)
|
|
hard_loss = self._cross_entropy(student_hard, true_labels)
|
|
|
|
# Combined loss
|
|
total_loss = self.alpha * soft_loss + (1 - self.alpha) * hard_loss
|
|
|
|
return total_loss
|
|
### END SOLUTION
|
|
|
|
def _softmax(self, logits):
|
|
"""Compute softmax with numerical stability."""
|
|
exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
|
|
return exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
|
|
|
|
def _kl_divergence(self, p, q):
|
|
"""Compute KL divergence between distributions."""
|
|
return np.sum(p * np.log(p / (q + 1e-8) + 1e-8))
|
|
|
|
def _cross_entropy(self, predictions, labels):
|
|
"""Compute cross-entropy loss."""
|
|
# Simple implementation for integer labels
|
|
if labels.ndim == 1:
|
|
return -np.mean(np.log(predictions[np.arange(len(labels)), labels] + 1e-8))
|
|
else:
|
|
return -np.mean(np.sum(labels * np.log(predictions + 1e-8), axis=1))
|
|
|
|
def test_unit_knowledge_distillation():
    """🔬 Test knowledge distillation functionality."""
    print("🔬 Unit Test: Knowledge Distillation...")

    # Teacher is wider than the student; both map 10 features to 5 classes.
    teacher = Sequential(Linear(10, 20), Linear(20, 5))
    student = Sequential(Linear(10, 5))  # Smaller model

    kd = KnowledgeDistillation(teacher, student, temperature=3.0, alpha=0.7)

    # Dummy batch of 8 examples with integer class labels in [0, 5).
    input_data = Tensor(np.random.randn(8, 10))
    true_labels = np.array([0, 1, 2, 3, 4, 0, 1, 2])

    # Run both models, then score the student against teacher + labels.
    teacher_output = teacher.forward(input_data)
    student_output = student.forward(input_data)
    loss = kd.distillation_loss(student_output, teacher_output, true_labels)

    # Sanity checks: loss must be a finite, positive scalar.
    assert isinstance(loss, (float, np.floating)), f"Loss should be float, got {type(loss)}"
    assert loss > 0, f"Loss should be positive, got {loss}"
    assert not np.isnan(loss), "Loss should not be NaN"

    print("✅ knowledge_distillation works correctly!")

test_unit_knowledge_distillation()
|
|
|
|
# %% ../../modules/source/17_compression/compression_dev.ipynb 29
|
|
class CompressionComplete:
    """
    Complete compression system for milestone use.

    Provides pruning, distillation, and low-rank approximation techniques.
    """

    @staticmethod
    def measure_sparsity(model) -> float:
        """Measure the sparsity of a model (fraction of zero weights)."""
        total = 0
        zeros = 0
        if hasattr(model, 'parameters'):
            for param in model.parameters():
                total += param.size
                zeros += np.sum(param.data == 0)
        # Guard against models with no parameters at all.
        return zeros / total if total > 0 else 0.0

    @staticmethod
    def magnitude_prune(model, sparsity=0.5):
        """
        Prune model weights by magnitude (smallest weights set to zero).

        The cutoff is computed per parameter tensor, so each tensor is
        pruned to roughly the requested sparsity independently.

        Args:
            model: Model with parameters() method
            sparsity: Fraction of weights to prune (0-1)
        """
        if hasattr(model, 'parameters'):
            for param in model.parameters():
                magnitudes = np.abs(param.data)
                cutoff = np.percentile(magnitudes, sparsity * 100)
                # Zero everything strictly below the cutoff magnitude.
                param.data[magnitudes < cutoff] = 0
        return model

    @staticmethod
    def structured_prune(model, prune_ratio=0.5):
        """
        Prune entire neurons/channels (structured pruning).

        Only the first parameter tensor is considered, and only when it is
        a 2-D (Linear-style) weight matrix; whole output columns whose L2
        norm falls below the percentile cutoff are zeroed.

        Args:
            model: Model to prune
            prune_ratio: Fraction of structures to prune (0-1)
        """
        if hasattr(model, 'parameters'):
            params = list(model.parameters())
            if params and hasattr(params[0], 'data'):
                weight = params[0]
                if len(weight.shape) == 2:  # Linear layer
                    # Score each output neuron by the norm of its column.
                    norms = np.linalg.norm(weight.data, axis=0)
                    cutoff = np.percentile(norms, prune_ratio * 100)
                    keep = norms >= cutoff
                    weight.data[:, ~keep] = 0
        return model

    @staticmethod
    def compress_model(model, compression_config: Dict[str, Any]):
        """
        Apply complete compression pipeline to a model.

        Args:
            model: Model to compress
            compression_config: Dictionary with compression settings
                - 'magnitude_sparsity': float (0-1)
                - 'structured_prune_ratio': float (0-1)

        Returns:
            Tuple of (compressed model, stats dict with sparsity before and
            after pruning plus the implied compression ratio).
        """
        stats = {
            'original_sparsity': CompressionComplete.measure_sparsity(model)
        }

        # Unstructured magnitude pruning first, if requested.
        if 'magnitude_sparsity' in compression_config:
            model = CompressionComplete.magnitude_prune(
                model, compression_config['magnitude_sparsity']
            )

        # Then structured (whole-neuron) pruning, if requested.
        if 'structured_prune_ratio' in compression_config:
            model = CompressionComplete.structured_prune(
                model, compression_config['structured_prune_ratio']
            )

        final = CompressionComplete.measure_sparsity(model)
        stats['final_sparsity'] = final
        # Ratio of dense size to remaining nonzero weights; inf when fully sparse.
        stats['compression_ratio'] = 1.0 / (1.0 - final) if final < 1.0 else float('inf')

        return model, stats
|
|
|
|
# Convenience functions for backward compatibility
|
|
def measure_sparsity(model) -> float:
    """Measure model sparsity."""
    # Backward-compatible alias for CompressionComplete.measure_sparsity.
    sparsity = CompressionComplete.measure_sparsity(model)
    return sparsity
|
|
|
|
def magnitude_prune(model, sparsity=0.5):
    """Apply magnitude-based pruning."""
    # Backward-compatible alias for CompressionComplete.magnitude_prune.
    pruned = CompressionComplete.magnitude_prune(model, sparsity)
    return pruned
|
|
|
|
def structured_prune(model, prune_ratio=0.5):
    """Apply structured pruning."""
    # Backward-compatible alias for CompressionComplete.structured_prune.
    pruned = CompressionComplete.structured_prune(model, prune_ratio)
    return pruned
|
|
|
|
def compress_model(model, compression_config: Dict[str, Any]):
    """Apply complete compression pipeline."""
    # Backward-compatible alias for CompressionComplete.compress_model.
    result = CompressionComplete.compress_model(model, compression_config)
    return result
|