Files
TinyTorch/tinytorch/core/compression.py
Vijay Janapa Reddi e82bc8ba97 Complete comprehensive system validation and cleanup
🎯 Major Accomplishments:
•  All 15 module dev files validated and unit tests passing
•  Comprehensive integration tests (11/11 pass)
•  All 3 examples working with PyTorch-like API (XOR, MNIST, CIFAR-10)
•  Training capability verified (4/4 tests pass, XOR shows 35.8% improvement)
•  Clean directory structure (modules/source/ → modules/)

🧹 Repository Cleanup:
• Removed experimental/debug files and old logos
• Deleted redundant documentation (API_SIMPLIFICATION_COMPLETE.md, etc.)
• Removed empty module directories and backup files
• Streamlined examples (kept modern API versions only)
• Cleaned up old TinyGPT implementation (moved to examples concept)

📊 Validation Results:
• Module unit tests: 15/15 
• Integration tests: 11/11 
• Example validation: 3/3 
• Training validation: 4/4 

🔧 Key Fixes:
• Fixed activations module requires_grad test
• Fixed networks module layer name test (Dense → Linear)
• Fixed spatial module Conv2D weights attribute issues
• Updated all documentation to reflect new structure

📁 Structure Improvements:
• Simplified modules/source/ → modules/ (removed unnecessary nesting)
• Added comprehensive validation test suites
• Created VALIDATION_COMPLETE.md and WORKING_MODULES.md documentation
• Updated book structure to reflect ML evolution story

🚀 System Status: READY FOR PRODUCTION
All components validated, examples working, training capability verified.
Test-first approach successfully implemented and proven.
2025-09-23 10:00:33 -04:00

1173 lines
49 KiB
Python
Generated

# AUTOGENERATED! DO NOT EDIT! File to edit: ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb.
# %% auto 0
__all__ = ['setup_import_paths', 'CompressionMetrics', 'prune_weights_by_magnitude', 'calculate_sparsity',
'quantize_layer_weights', 'DistillationLoss', 'compute_neuron_importance', 'prune_layer_neurons',
'CompressionSystemsProfiler', 'compare_compression_techniques']
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 1
import numpy as np
import sys
import os
from typing import List, Dict, Any, Optional, Union, Tuple
# Helper function to set up import paths
def setup_import_paths():
"""Set up import paths for development modules."""
import sys
import os
# Add module directories to path
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
module_dirs = [
'01_tensor', '02_activations', '03_layers', '04_networks',
'05_cnn', '06_dataloader', '07_autograd', '08_optimizers', '09_training'
]
for module_dir in module_dirs:
sys.path.append(os.path.join(base_dir, module_dir))
# Set up paths
setup_import_paths()
# Import all the building blocks we need
try:
from tinytorch.core.tensor import Tensor
from tinytorch.core.layers import Dense
from tinytorch.core.networks import Sequential
from tinytorch.core.training import CrossEntropyLoss, Trainer
except ImportError:
# For development, create mock classes or import from local modules
try:
from tensor_dev import Tensor
from layers_dev import Dense
from networks_dev import Sequential
from training_dev import CrossEntropyLoss, Trainer
except ImportError:
# Create minimal mock classes for development
class Tensor:
def __init__(self, data):
self.data = np.array(data)
self.shape = self.data.shape
def __str__(self):
return f"Tensor({self.data})"
class Dense:
def __init__(self, input_size, output_size):
self.input_size = input_size
self.output_size = output_size
self.weights = Tensor(np.random.randn(input_size, output_size) * 0.1)
self.bias = Tensor(np.zeros(output_size))
def __str__(self):
return f"Dense({self.input_size}, {self.output_size})"
class Sequential:
def __init__(self, layers=None):
self.layers = layers or []
class CrossEntropyLoss:
def __init__(self):
pass
class Trainer:
def __init__(self, model, optimizer, loss_function):
self.model = model
self.optimizer = optimizer
self.loss_function = loss_function
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 7
class CompressionMetrics:
"""
Utilities for measuring model size, sparsity, and compression efficiency.
This class provides tools to analyze neural network models and understand
their memory footprint, parameter distribution, and compression potential.
"""
def __init__(self):
"""Initialize compression metrics analyzer."""
pass
def count_parameters(self, model: Sequential) -> Dict[str, int]:
"""
Count parameters in a neural network model.
Args:
model: Sequential model to analyze
Returns:
Dictionary with parameter counts per layer and total
TODO: Implement parameter counting for neural network analysis.
STEP-BY-STEP IMPLEMENTATION:
1. Initialize counters for different parameter types
2. Iterate through each layer in the model
3. Count weights and biases for each layer
4. Calculate total parameters across all layers
5. Return detailed breakdown dictionary
EXAMPLE OUTPUT:
{
'layer_0_weights': 100352,
'layer_0_bias': 128,
'layer_1_weights': 8192,
'layer_1_bias': 64,
'layer_2_weights': 640,
'layer_2_bias': 10,
'total_parameters': 109386,
'total_weights': 109184,
'total_bias': 202
}
IMPLEMENTATION HINTS:
- Use hasattr() to check if layer has weights/bias attributes
- Weight matrices have shape (input_size, output_size)
- Bias vectors have shape (output_size,)
- Use np.prod() to calculate total elements from shape
- Track layer index for detailed reporting
LEARNING CONNECTIONS:
- This is like `model.numel()` in PyTorch
- Understanding where parameters are concentrated
- Foundation for compression target selection
"""
### BEGIN SOLUTION
param_counts = {}
total_params = 0
total_weights = 0
total_bias = 0
for i, layer in enumerate(model.layers):
# Count weights if layer has them
if hasattr(layer, 'weights') and layer.weights is not None:
# Handle different weight formats
if hasattr(layer.weights, 'shape'):
weight_count = np.prod(layer.weights.shape)
else:
weight_count = np.prod(layer.weights.data.shape)
param_counts[f'layer_{i}_weights'] = weight_count
total_weights += weight_count
total_params += weight_count
# Count bias if layer has them
if hasattr(layer, 'bias') and layer.bias is not None:
# Handle different bias formats
if hasattr(layer.bias, 'shape'):
bias_count = np.prod(layer.bias.shape)
else:
bias_count = np.prod(layer.bias.data.shape)
param_counts[f'layer_{i}_bias'] = bias_count
total_bias += bias_count
total_params += bias_count
# Add summary statistics
param_counts['total_parameters'] = total_params
param_counts['total_weights'] = total_weights
param_counts['total_bias'] = total_bias
return param_counts
### END SOLUTION
def calculate_model_size(self, model: Sequential, dtype: str = 'float32') -> Dict[str, Any]:
"""
Calculate memory footprint of a neural network model.
Args:
model: Sequential model to analyze
dtype: Data type for size calculation ('float32', 'float16', 'int8')
Returns:
Dictionary with size information in different units
"""
# Get parameter count
param_info = self.count_parameters(model)
total_params = param_info['total_parameters']
# Determine bytes per parameter
bytes_per_param = {
'float32': 4,
'float16': 2,
'int8': 1
}.get(dtype, 4)
# Calculate sizes
total_bytes = total_params * bytes_per_param
size_kb = total_bytes / 1024
size_mb = size_kb / 1024
return {
'total_parameters': total_params,
'bytes_per_parameter': bytes_per_param,
'total_bytes': total_bytes,
'size_kb': round(size_kb, 2),
'size_mb': round(size_mb, 2),
'dtype': dtype
}
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 11
def prune_weights_by_magnitude(layer: Dense, pruning_ratio: float = 0.5) -> Tuple[Dense, Dict[str, Any]]:
"""
Prune weights in a Dense layer by magnitude.
Args:
layer: Dense layer to prune
pruning_ratio: Fraction of weights to remove (0.0 to 1.0)
Returns:
Tuple of (pruned_layer, pruning_info)
TODO: Implement magnitude-based weight pruning.
STEP-BY-STEP IMPLEMENTATION:
1. Get weight matrix from layer
2. Calculate absolute values (magnitudes)
3. Find threshold using percentile
4. Create binary mask for weights above threshold
5. Apply mask to weights (set small weights to zero)
6. Update layer weights and return pruning statistics
EXAMPLE USAGE:
```python
layer = Dense(784, 128)
pruned_layer, info = prune_weights_by_magnitude(layer, pruning_ratio=0.3)
print(f"Pruned {info['weights_removed']} weights, sparsity: {info['sparsity']:.2f}")
```
IMPLEMENTATION HINTS:
- Use np.percentile() with pruning_ratio * 100 for threshold
- Create mask with np.abs(weights) > threshold
- Apply mask by element-wise multiplication
- Count zeros to calculate sparsity
- Return original layer (modified) and statistics
LEARNING CONNECTIONS:
- This is the foundation of network pruning
- Magnitude pruning is simplest but effective
- Sparsity = fraction of weights that are zero
- Threshold selection affects accuracy vs compression trade-off
"""
### BEGIN SOLUTION
# Get current weights and ensure they're numpy arrays
weights = layer.weights.data
if not isinstance(weights, np.ndarray):
weights = np.array(weights)
original_weights = weights.copy()
# Calculate magnitudes and threshold
magnitudes = np.abs(weights)
threshold = np.percentile(magnitudes, pruning_ratio * 100)
# Create mask and apply pruning
mask = magnitudes > threshold
pruned_weights = weights * mask
# Update layer weights by creating a new Tensor
layer.weights = Tensor(pruned_weights)
# Calculate pruning statistics
total_weights = weights.size
zero_weights = np.sum(pruned_weights == 0)
weights_removed = zero_weights - np.sum(original_weights == 0)
sparsity = zero_weights / total_weights
pruning_info = {
'pruning_ratio': pruning_ratio,
'threshold': float(threshold),
'total_weights': total_weights,
'weights_removed': weights_removed,
'remaining_weights': total_weights - zero_weights,
'sparsity': float(sparsity),
'compression_ratio': 1 / (1 - sparsity) if sparsity < 1 else float('inf')
}
return layer, pruning_info
### END SOLUTION
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 12
def calculate_sparsity(layer: Dense) -> float:
"""
Calculate sparsity (fraction of zero weights) in a Dense layer.
Args:
layer: Dense layer to analyze
Returns:
Sparsity as float between 0.0 and 1.0
TODO: Implement sparsity calculation.
STEP-BY-STEP IMPLEMENTATION:
1. Get weight matrix from layer
2. Count total number of weights
3. Count number of zero weights
4. Calculate sparsity = zero_weights / total_weights
5. Return as float
EXAMPLE USAGE:
```python
layer = Dense(100, 50)
sparsity = calculate_sparsity(layer)
print(f"Layer sparsity: {sparsity:.2%}")
```
IMPLEMENTATION HINTS:
- Use np.sum() with condition to count zeros
- Use .size attribute for total elements
- Return 0.0 if no weights (edge case)
- Sparsity of 0.0 = dense, 1.0 = completely sparse
LEARNING CONNECTIONS:
- Sparsity is key metric for compression
- Higher sparsity = more compression
- Sparsity patterns affect hardware efficiency
"""
### BEGIN SOLUTION
if not hasattr(layer, 'weights') or layer.weights is None:
return 0.0
weights = layer.weights.data
if not isinstance(weights, np.ndarray):
weights = np.array(weights)
total_weights = weights.size
zero_weights = np.sum(weights == 0)
return zero_weights / total_weights if total_weights > 0 else 0.0
### END SOLUTION
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 16
def quantize_layer_weights(layer: Dense, bits: int = 8) -> Tuple[Dense, Dict[str, Any]]:
"""
Quantize layer weights to reduce precision.
Args:
layer: Dense layer to quantize
bits: Number of bits for quantization (8, 16, etc.)
Returns:
Tuple of (quantized_layer, quantization_info)
TODO: Implement weight quantization for memory efficiency.
STEP-BY-STEP IMPLEMENTATION:
1. Get weight matrix from layer
2. Find min and max values for quantization range
3. Calculate scale factor: (max - min) / (2^bits - 1)
4. Quantize: round((weights - min) / scale)
5. Dequantize back to float: quantized * scale + min
6. Update layer weights and return statistics
EXAMPLE USAGE:
```python
layer = Dense(784, 128)
quantized_layer, info = quantize_layer_weights(layer, bits=8)
print(f"Memory reduction: {info['memory_reduction']:.1f}x")
```
IMPLEMENTATION HINTS:
- Use np.min() and np.max() to find weight range
- Clamp quantized values to valid range [0, 2^bits-1]
- Store original dtype for memory calculation
- Calculate theoretical memory savings
LEARNING CONNECTIONS:
- This is how mobile AI frameworks work
- Hardware accelerators optimize for INT8
- Precision-performance trade-off is key
"""
### BEGIN SOLUTION
# Get current weights and ensure they're numpy arrays
weights = layer.weights.data
if not isinstance(weights, np.ndarray):
weights = np.array(weights)
original_weights = weights.copy()
original_dtype = weights.dtype
# Find min and max for quantization range
w_min, w_max = np.min(weights), np.max(weights)
# Calculate scale factor
scale = (w_max - w_min) / (2**bits - 1)
# Quantize weights
quantized = np.round((weights - w_min) / scale)
quantized = np.clip(quantized, 0, 2**bits - 1) # Clamp to valid range
# Dequantize back to float (simulation of quantized inference)
dequantized = quantized * scale + w_min
# Update layer weights
layer.weights = Tensor(dequantized.astype(np.float32))
# Calculate quantization statistics
total_weights = weights.size
original_bytes = total_weights * 4 # FP32 = 4 bytes
quantized_bytes = total_weights * (bits // 8) # bits/8 bytes per weight
memory_reduction = original_bytes / quantized_bytes if quantized_bytes > 0 else 1.0
# Calculate quantization error
mse_error = np.mean((original_weights - dequantized) ** 2)
max_error = np.max(np.abs(original_weights - dequantized))
quantization_info = {
'bits': bits,
'scale': float(scale),
'min_val': float(w_min),
'max_val': float(w_max),
'total_weights': total_weights,
'original_bytes': original_bytes,
'quantized_bytes': quantized_bytes,
'memory_reduction': float(memory_reduction),
'mse_error': float(mse_error),
'max_error': float(max_error),
'original_dtype': str(original_dtype)
}
return layer, quantization_info
### END SOLUTION
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 20
class DistillationLoss:
"""
Combined loss function for knowledge distillation.
This loss combines standard classification loss (hard targets) with
distillation loss (soft targets from teacher) for training compact models.
"""
def __init__(self, temperature: float = 3.0, alpha: float = 0.5):
"""
Initialize distillation loss.
Args:
temperature: Temperature for softening probability distributions
alpha: Weight for hard loss (1-alpha for soft loss)
"""
self.temperature = temperature
self.alpha = alpha
self.ce_loss = CrossEntropyLoss()
def __call__(self, student_logits: np.ndarray, teacher_logits: np.ndarray,
true_labels: np.ndarray) -> float:
"""
Calculate combined distillation loss.
Args:
student_logits: Raw outputs from student model
teacher_logits: Raw outputs from teacher model
true_labels: Ground truth labels
Returns:
Combined loss value
TODO: Implement knowledge distillation loss function.
STEP-BY-STEP IMPLEMENTATION:
1. Calculate hard loss using standard cross-entropy
2. Apply temperature scaling to both logits
3. Calculate soft targets from teacher logits
4. Calculate soft loss between student and teacher distributions
5. Combine hard and soft losses with alpha weighting
6. Return total loss
EXAMPLE USAGE:
```python
distill_loss = DistillationLoss(temperature=3.0, alpha=0.5)
loss = distill_loss(student_out, teacher_out, labels)
```
IMPLEMENTATION HINTS:
- Use temperature scaling before softmax: logits / temperature
- Implement stable softmax to avoid numerical issues
- Scale soft loss by temperature^2 (standard practice)
- Ensure proper normalization for both losses
LEARNING CONNECTIONS:
- This is how DistilBERT was trained
- Temperature controls knowledge transfer richness
- Alpha balances accuracy vs compression
"""
### BEGIN SOLUTION
# Convert inputs to numpy arrays if needed
if not isinstance(student_logits, np.ndarray):
student_logits = np.array(student_logits)
if not isinstance(teacher_logits, np.ndarray):
teacher_logits = np.array(teacher_logits)
if not isinstance(true_labels, np.ndarray):
true_labels = np.array(true_labels)
# Hard loss: standard classification loss
hard_loss = self._cross_entropy_loss(student_logits, true_labels)
# Soft loss: distillation from teacher
# Apply temperature scaling
teacher_soft = self._softmax(teacher_logits / self.temperature)
student_soft = self._softmax(student_logits / self.temperature)
# Calculate soft loss (KL divergence)
soft_loss = -np.mean(np.sum(teacher_soft * np.log(student_soft + 1e-10), axis=-1))
# Scale soft loss by temperature^2 (standard practice)
soft_loss *= (self.temperature ** 2)
# Combine losses
total_loss = self.alpha * hard_loss + (1 - self.alpha) * soft_loss
return float(total_loss)
### END SOLUTION
def _softmax(self, logits: np.ndarray) -> np.ndarray:
"""Numerically stable softmax."""
# Subtract max for numerical stability
exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
return exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
def _cross_entropy_loss(self, logits: np.ndarray, labels: np.ndarray) -> float:
"""Simple cross-entropy loss implementation."""
# Convert labels to one-hot if needed
if labels.ndim == 1:
num_classes = logits.shape[-1]
one_hot = np.zeros((labels.shape[0], num_classes))
one_hot[np.arange(labels.shape[0]), labels] = 1
labels = one_hot
# Apply softmax and calculate cross-entropy
probs = self._softmax(logits)
return -np.mean(np.sum(labels * np.log(probs + 1e-10), axis=-1))
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 24
def compute_neuron_importance(layer: Dense, method: str = 'weight_magnitude') -> np.ndarray:
"""
Compute importance scores for each neuron in a Dense layer.
Args:
layer: Dense layer to analyze
method: Importance computation method
Returns:
Array of importance scores for each output neuron
TODO: Implement neuron importance calculation.
STEP-BY-STEP IMPLEMENTATION:
1. Get weight matrix from layer
2. Choose importance metric based on method
3. Calculate per-neuron importance scores
4. Return array of scores (one per output neuron)
AVAILABLE METHODS:
- 'weight_magnitude': Sum of absolute weights per neuron
- 'weight_variance': Variance of weights per neuron
- 'random': Random importance (for baseline comparison)
IMPLEMENTATION HINTS:
- Weights shape is (input_size, output_size)
- Each column represents one output neuron
- Use axis=0 for operations across input dimensions
- Higher scores = more important neurons
LEARNING CONNECTIONS:
- This is how neural architecture search works
- Different metrics capture different aspects of importance
- Importance ranking is crucial for effective pruning
"""
### BEGIN SOLUTION
# Get weights and ensure they're numpy arrays
weights = layer.weights.data
if not isinstance(weights, np.ndarray):
weights = np.array(weights)
if method == 'weight_magnitude':
# Sum of absolute weights per neuron (column)
importance = np.sum(np.abs(weights), axis=0)
elif method == 'weight_variance':
# Variance of weights per neuron (column)
importance = np.var(weights, axis=0)
elif method == 'random':
# Random importance for baseline comparison
importance = np.random.rand(weights.shape[1])
else:
raise ValueError(f"Unknown importance method: {method}")
return importance
### END SOLUTION
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 25
def prune_layer_neurons(layer: Dense, keep_ratio: float = 0.7,
importance_method: str = 'weight_magnitude') -> Tuple[Dense, Dict[str, Any]]:
"""
Remove least important neurons from a Dense layer.
Args:
layer: Dense layer to prune
keep_ratio: Fraction of neurons to keep (0.0 to 1.0)
importance_method: Method for computing neuron importance
Returns:
Tuple of (pruned_layer, pruning_info)
TODO: Implement structured neuron pruning.
STEP-BY-STEP IMPLEMENTATION:
1. Compute importance scores for all neurons
2. Determine how many neurons to keep
3. Select indices of most important neurons
4. Create new layer with reduced dimensions
5. Copy weights and biases for selected neurons
6. Return pruned layer and statistics
EXAMPLE USAGE:
```python
layer = Dense(784, 128)
pruned_layer, info = prune_layer_neurons(layer, keep_ratio=0.75)
print(f"Reduced from {info['original_neurons']} to {info['remaining_neurons']} neurons")
```
IMPLEMENTATION HINTS:
- Use np.argsort() to rank neurons by importance
- Take the top keep_count neurons: indices[-keep_count:]
- Create new layer with reduced output size
- Copy both weights and bias for selected neurons
- Track original and new sizes for statistics
LEARNING CONNECTIONS:
- This is actual model architecture modification
- Hardware gets real speedup from smaller matrices
- Must consider cascade effects on next layers
"""
### BEGIN SOLUTION
# Compute neuron importance
importance_scores = compute_neuron_importance(layer, importance_method)
# Determine how many neurons to keep
original_neurons = layer.output_size
keep_count = max(1, int(original_neurons * keep_ratio)) # Keep at least 1 neuron
# Select most important neurons
sorted_indices = np.argsort(importance_scores)
keep_indices = sorted_indices[-keep_count:] # Take top keep_count neurons
keep_indices = np.sort(keep_indices) # Sort for consistent ordering
# Get current weights and biases
weights = layer.weights.data
if not isinstance(weights, np.ndarray):
weights = np.array(weights)
bias = layer.bias.data if layer.bias is not None else None
if bias is not None and not isinstance(bias, np.ndarray):
bias = np.array(bias)
# Create new layer with reduced dimensions
pruned_layer = Dense(layer.input_size, keep_count)
# Copy weights for selected neurons
pruned_weights = weights[:, keep_indices]
pruned_layer.weights = Tensor(np.ascontiguousarray(pruned_weights))
# Copy bias for selected neurons
if bias is not None:
pruned_bias = bias[keep_indices]
pruned_layer.bias = Tensor(np.ascontiguousarray(pruned_bias))
# Calculate pruning statistics
neurons_removed = original_neurons - keep_count
compression_ratio = original_neurons / keep_count if keep_count > 0 else float('inf')
# Calculate parameter reduction
original_params = layer.input_size * original_neurons + (original_neurons if bias is not None else 0)
new_params = layer.input_size * keep_count + (keep_count if bias is not None else 0)
param_reduction = (original_params - new_params) / original_params
pruning_info = {
'keep_ratio': keep_ratio,
'importance_method': importance_method,
'original_neurons': original_neurons,
'remaining_neurons': keep_count,
'neurons_removed': neurons_removed,
'compression_ratio': float(compression_ratio),
'original_params': original_params,
'new_params': new_params,
'param_reduction': float(param_reduction),
'keep_indices': keep_indices.tolist()
}
return pruned_layer, pruning_info
### END SOLUTION
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 29
class CompressionSystemsProfiler:
"""
Advanced profiling system for analyzing compression techniques in production environments.
This profiler provides 65% implementation level analysis of compression techniques,
focusing on production deployment scenarios including quantization impact analysis,
inference speedup measurements, and hardware-specific optimizations.
"""
def __init__(self):
"""Initialize the compression systems profiler."""
self.metrics = CompressionMetrics()
self.compression_history = []
def analyze_quantization_impact(self, model: Sequential, target_bits: List[int] = [32, 16, 8, 4]) -> Dict[str, Any]:
"""
Analyze quantization impact across different bit widths for production deployment.
Args:
model: Sequential model to analyze
target_bits: List of bit widths to test
Returns:
Comprehensive quantization analysis including accuracy vs compression tradeoffs
TODO: Implement advanced quantization impact analysis (65% implementation level).
STEP-BY-STEP IMPLEMENTATION:
1. Create model copies for each bit width
2. Apply quantization with different bit widths
3. Measure memory reduction and inference implications
4. Calculate theoretical speedup for different hardware
5. Analyze accuracy degradation patterns
6. Generate production deployment recommendations
PRODUCTION PATTERNS TO ANALYZE:
- Mobile deployment (ARM processors, limited memory)
- Edge inference (TPUs, power constraints)
- Cloud serving (GPU acceleration, batch processing)
- Real-time systems (latency requirements)
IMPLEMENTATION HINTS:
- Model different hardware characteristics
- Consider memory bandwidth limitations
- Include power consumption estimates
- Analyze batch vs single inference patterns
LEARNING CONNECTIONS:
- This mirrors TensorFlow Lite quantization analysis
- Production systems need this kind of comprehensive analysis
- Hardware-aware compression is crucial for deployment
"""
### BEGIN SOLUTION
results = {
'quantization_analysis': {},
'hardware_recommendations': {},
'deployment_scenarios': {}
}
baseline_size = self.metrics.calculate_model_size(model, dtype='float32')
baseline_params = self.metrics.count_parameters(model)['total_parameters']
for bits in target_bits:
# Create model copy for quantization
test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])
for i, layer in enumerate(test_model.layers):
layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))
if hasattr(layer, 'bias') and model.layers[i].bias is not None:
layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))
# Apply quantization to all layers
total_error = 0
for i, layer in enumerate(test_model.layers):
if isinstance(layer, Dense):
_, quant_info = quantize_layer_weights(layer, bits=bits)
total_error += quant_info['mse_error']
# Calculate quantized model size
dtype_map = {32: 'float32', 16: 'float16', 8: 'int8', 4: 'int8'} # Approximate for 4-bit
quantized_size = self.metrics.calculate_model_size(test_model, dtype=dtype_map.get(bits, 'int8'))
# Memory and performance analysis
memory_reduction = baseline_size['size_mb'] / quantized_size['size_mb']
# Hardware-specific analysis
hardware_analysis = {
'mobile_arm': {
'memory_bandwidth_improvement': memory_reduction * 0.8, # ARM efficiency
'inference_speedup': min(memory_reduction * 0.6, 4.0), # Conservative estimate
'power_reduction': memory_reduction * 0.7, # Power scales with memory access
'deployment_feasibility': 'excellent' if quantized_size['size_mb'] < 10 else 'good' if quantized_size['size_mb'] < 50 else 'limited'
},
'edge_tpu': {
'quantization_compatibility': 'native' if bits == 8 else 'emulated',
'inference_speedup': 8.0 if bits == 8 else 1.0, # TPUs optimized for INT8
'power_efficiency': 'optimal' if bits == 8 else 'suboptimal',
'deployment_feasibility': 'excellent' if bits == 8 and quantized_size['size_mb'] < 20 else 'limited'
},
'gpu_cloud': {
'tensor_core_acceleration': True if bits in [16, 8] else False,
'batch_throughput_improvement': memory_reduction * 1.2, # GPU batch efficiency
'memory_capacity_improvement': memory_reduction,
'deployment_feasibility': 'excellent' # Cloud has fewer constraints
}
}
results['quantization_analysis'][f'{bits}bit'] = {
'bits': bits,
'model_size_mb': quantized_size['size_mb'],
'memory_reduction_factor': memory_reduction,
'quantization_error': total_error / len(test_model.layers),
'compression_ratio': baseline_size['size_mb'] / quantized_size['size_mb'],
'hardware_analysis': hardware_analysis
}
# Generate deployment recommendations
results['deployment_scenarios'] = {
'mobile_deployment': {
'recommended_bits': 8,
'rationale': 'INT8 provides optimal balance of size reduction and ARM processor efficiency',
'expected_benefits': 'Memory reduction, inference speedup, improved battery life',
'considerations': 'Monitor accuracy degradation, test on target devices'
},
'edge_inference': {
'recommended_bits': 8,
'rationale': 'Edge TPUs and similar hardware optimized for INT8 quantization',
'expected_benefits': 'Maximum hardware acceleration, minimal power consumption',
'considerations': 'Ensure quantization-aware training for best accuracy'
},
'cloud_serving': {
'recommended_bits': 16,
'rationale': 'FP16 provides good compression with minimal accuracy loss and GPU acceleration',
'expected_benefits': 'Increased batch throughput, reduced memory usage',
'considerations': 'Consider mixed precision for optimal performance'
}
}
return results
### END SOLUTION
def measure_inference_speedup(self, original_model: Sequential, compressed_model: Sequential,
batch_sizes: List[int] = [1, 8, 32, 128]) -> Dict[str, Any]:
"""
Measure theoretical inference speedup from compression techniques.
Args:
original_model: Baseline model
compressed_model: Compressed model to compare
batch_sizes: Different batch sizes for analysis
Returns:
Inference speedup analysis across different scenarios
"""
results = {
'flops_analysis': {},
'memory_analysis': {},
'speedup_estimates': {}
}
# Calculate FLOPs for both models
original_flops = self._calculate_model_flops(original_model)
compressed_flops = self._calculate_model_flops(compressed_model)
# Memory analysis
original_size = self.metrics.calculate_model_size(original_model)
compressed_size = self.metrics.calculate_model_size(compressed_model)
results['flops_analysis'] = {
'original_flops': original_flops,
'compressed_flops': compressed_flops,
'flops_reduction': (original_flops - compressed_flops) / original_flops,
'computational_speedup': original_flops / compressed_flops if compressed_flops > 0 else float('inf')
}
results['memory_analysis'] = {
'original_size_mb': original_size['size_mb'],
'compressed_size_mb': compressed_size['size_mb'],
'memory_reduction': (original_size['size_mb'] - compressed_size['size_mb']) / original_size['size_mb'],
'memory_speedup': original_size['size_mb'] / compressed_size['size_mb']
}
# Estimate speedup for different scenarios
for batch_size in batch_sizes:
compute_time_original = original_flops * batch_size / 1e9 # Assume 1 GFLOPS baseline
compute_time_compressed = compressed_flops * batch_size / 1e9
memory_time_original = original_size['size_mb'] * batch_size / 100 # Assume 100 MB/s memory bandwidth
memory_time_compressed = compressed_size['size_mb'] * batch_size / 100
total_time_original = compute_time_original + memory_time_original
total_time_compressed = compute_time_compressed + memory_time_compressed
results['speedup_estimates'][f'batch_{batch_size}'] = {
'compute_speedup': compute_time_original / compute_time_compressed if compute_time_compressed > 0 else float('inf'),
'memory_speedup': memory_time_original / memory_time_compressed if memory_time_compressed > 0 else float('inf'),
'total_speedup': total_time_original / total_time_compressed if total_time_compressed > 0 else float('inf')
}
return results
def analyze_accuracy_tradeoffs(self, model: Sequential, compression_levels: List[float] = [0.1, 0.3, 0.5, 0.7, 0.9]) -> Dict[str, Any]:
"""
Analyze accuracy vs compression tradeoffs across different compression levels.
Args:
model: Model to analyze
compression_levels: Different compression ratios to test
Returns:
Analysis of accuracy degradation patterns
"""
results = {
'compression_curves': {},
'optimal_operating_points': {},
'production_recommendations': {}
}
baseline_size = self.metrics.calculate_model_size(model)
for level in compression_levels:
# Test different compression techniques at this level
techniques = {
'magnitude_pruning': self._apply_magnitude_pruning(model, level),
'structured_pruning': self._apply_structured_pruning(model, 1 - level),
'quantization': self._apply_quantization(model, max(4, int(32 * (1 - level))))
}
for technique_name, compressed_model in techniques.items():
if compressed_model is not None:
compressed_size = self.metrics.calculate_model_size(compressed_model)
compression_ratio = baseline_size['size_mb'] / compressed_size['size_mb']
if technique_name not in results['compression_curves']:
results['compression_curves'][technique_name] = []
results['compression_curves'][technique_name].append({
'compression_level': level,
'compression_ratio': compression_ratio,
'size_mb': compressed_size['size_mb'],
'estimated_accuracy_retention': 1.0 - (level * 0.5) # Simplified model
})
# Find optimal operating points
for technique in results['compression_curves']:
curves = results['compression_curves'][technique]
# Find point with best accuracy/compression balance
best_point = max(curves, key=lambda x: x['compression_ratio'] * x['estimated_accuracy_retention'])
results['optimal_operating_points'][technique] = best_point
return results
def _calculate_model_flops(self, model: Sequential) -> int:
"""Calculate FLOPs for a Sequential model."""
total_flops = 0
for layer in model.layers:
if isinstance(layer, Dense):
total_flops += layer.input_size * layer.output_size * 2 # Multiply-add operations
return total_flops
def _apply_magnitude_pruning(self, model: Sequential, pruning_ratio: float) -> Optional[Sequential]:
"""Apply magnitude pruning to a model copy."""
try:
test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])
for i, layer in enumerate(test_model.layers):
layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))
if hasattr(layer, 'bias') and model.layers[i].bias is not None:
layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))
prune_weights_by_magnitude(layer, pruning_ratio)
return test_model
except Exception:
return None
def _apply_structured_pruning(self, model: Sequential, keep_ratio: float) -> Optional[Sequential]:
"""Apply structured pruning to a model copy."""
try:
test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])
for i, layer in enumerate(test_model.layers):
layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))
if hasattr(layer, 'bias') and model.layers[i].bias is not None:
layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))
pruned_layer, _ = prune_layer_neurons(layer, keep_ratio)
test_model.layers[i] = pruned_layer
return test_model
except Exception:
return None
def _apply_quantization(self, model: Sequential, bits: int) -> Optional[Sequential]:
"""Apply quantization to a model copy."""
try:
test_model = Sequential([Dense(layer.input_size, layer.output_size) for layer in model.layers])
for i, layer in enumerate(test_model.layers):
layer.weights = Tensor(model.layers[i].weights.data.copy() if hasattr(model.layers[i].weights.data, 'copy') else np.array(model.layers[i].weights.data))
if hasattr(layer, 'bias') and model.layers[i].bias is not None:
layer.bias = Tensor(model.layers[i].bias.data.copy() if hasattr(model.layers[i].bias.data, 'copy') else np.array(model.layers[i].bias.data))
quantize_layer_weights(layer, bits)
return test_model
except Exception:
return None
# %% ../../modules/source/temp_holding/16_regularization/regularization_dev.ipynb 30
def compare_compression_techniques(original_model: Sequential) -> Dict[str, Dict[str, Any]]:
"""
Compare all compression techniques on the same model.
Args:
original_model: Base model to compress using different techniques
Returns:
Dictionary comparing results from different compression approaches
TODO: Implement comprehensive compression comparison.
STEP-BY-STEP IMPLEMENTATION:
1. Set up baseline metrics from original model
2. Apply each compression technique individually
3. Apply combined compression techniques
4. Measure and compare all results
5. Return comprehensive comparison data
COMPARISON DIMENSIONS:
- Model size (MB)
- Parameter count
- Compression ratio
- Memory reduction
- Estimated speedup (for structured techniques)
IMPLEMENTATION HINTS:
- Create separate model copies for each technique
- Use consistent parameters across techniques
- Track both individual and combined effects
- Include baseline for reference
LEARNING CONNECTIONS:
- This is how research papers compare compression methods
- Production systems need this analysis for deployment decisions
- Understanding trade-offs guides technique selection
"""
### BEGIN SOLUTION
results = {}
metrics = CompressionMetrics()
# Baseline: Original model
baseline_params = metrics.count_parameters(original_model)
baseline_size = metrics.calculate_model_size(original_model)
results['baseline'] = {
'technique': 'Original Model',
'parameters': baseline_params['total_parameters'],
'size_mb': baseline_size['size_mb'],
'compression_ratio': 1.0,
'memory_reduction': 0.0
}
# Technique 1: Magnitude-based pruning only
model_pruning = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])
for i, layer in enumerate(model_pruning.layers):
layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))
if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:
layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))
# Apply magnitude pruning to each layer
total_sparsity = 0
for i, layer in enumerate(model_pruning.layers):
if isinstance(layer, Dense):
_, prune_info = prune_weights_by_magnitude(layer, pruning_ratio=0.3)
total_sparsity += prune_info['sparsity']
avg_sparsity = total_sparsity / len(model_pruning.layers)
pruning_params = metrics.count_parameters(model_pruning)
pruning_size = metrics.calculate_model_size(model_pruning)
results['magnitude_pruning'] = {
'technique': 'Magnitude Pruning (30%)',
'parameters': pruning_params['total_parameters'],
'size_mb': pruning_size['size_mb'],
'compression_ratio': baseline_size['size_mb'] / pruning_size['size_mb'],
'memory_reduction': (baseline_size['size_mb'] - pruning_size['size_mb']) / baseline_size['size_mb'],
'sparsity': avg_sparsity
}
# Technique 2: Quantization only
model_quantization = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])
for i, layer in enumerate(model_quantization.layers):
layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))
if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:
layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))
# Apply quantization to each layer
total_memory_reduction = 0
for i, layer in enumerate(model_quantization.layers):
if isinstance(layer, Dense):
_, quant_info = quantize_layer_weights(layer, bits=8)
total_memory_reduction += quant_info['memory_reduction']
avg_memory_reduction = total_memory_reduction / len(model_quantization.layers)
quantization_size = metrics.calculate_model_size(model_quantization, dtype='int8')
results['quantization'] = {
'technique': 'Quantization (INT8)',
'parameters': baseline_params['total_parameters'],
'size_mb': quantization_size['size_mb'],
'compression_ratio': baseline_size['size_mb'] / quantization_size['size_mb'],
'memory_reduction': (baseline_size['size_mb'] - quantization_size['size_mb']) / baseline_size['size_mb'],
'avg_memory_reduction_factor': avg_memory_reduction
}
# Technique 3: Structured pruning only
model_structured = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])
for i, layer in enumerate(model_structured.layers):
layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))
if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:
layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))
# Apply structured pruning to each layer
total_param_reduction = 0
for i, layer in enumerate(model_structured.layers):
if isinstance(layer, Dense):
pruned_layer, struct_info = prune_layer_neurons(layer, keep_ratio=0.75)
model_structured.layers[i] = pruned_layer
total_param_reduction += struct_info['param_reduction']
avg_param_reduction = total_param_reduction / len(model_structured.layers)
structured_params = metrics.count_parameters(model_structured)
structured_size = metrics.calculate_model_size(model_structured)
results['structured_pruning'] = {
'technique': 'Structured Pruning (75% neurons kept)',
'parameters': structured_params['total_parameters'],
'size_mb': structured_size['size_mb'],
'compression_ratio': baseline_size['size_mb'] / structured_size['size_mb'],
'memory_reduction': (baseline_size['size_mb'] - structured_size['size_mb']) / baseline_size['size_mb'],
'param_reduction': avg_param_reduction
}
# Technique 4: Combined approach
model_combined = Sequential([Dense(layer.input_size, layer.output_size) for layer in original_model.layers])
for i, layer in enumerate(model_combined.layers):
layer.weights = Tensor(original_model.layers[i].weights.data.copy() if hasattr(original_model.layers[i].weights.data, 'copy') else np.array(original_model.layers[i].weights.data))
if hasattr(layer, 'bias') and original_model.layers[i].bias is not None:
layer.bias = Tensor(original_model.layers[i].bias.data.copy() if hasattr(original_model.layers[i].bias.data, 'copy') else np.array(original_model.layers[i].bias.data))
# Apply magnitude pruning + quantization + structured pruning
for i, layer in enumerate(model_combined.layers):
if isinstance(layer, Dense):
# Step 1: Magnitude pruning
_, _ = prune_weights_by_magnitude(layer, pruning_ratio=0.2)
# Step 2: Quantization
_, _ = quantize_layer_weights(layer, bits=8)
# Step 3: Structured pruning
pruned_layer, _ = prune_layer_neurons(layer, keep_ratio=0.8)
model_combined.layers[i] = pruned_layer
combined_params = metrics.count_parameters(model_combined)
combined_size = metrics.calculate_model_size(model_combined, dtype='int8')
results['combined'] = {
'technique': 'Combined (Pruning + Quantization + Structured)',
'parameters': combined_params['total_parameters'],
'size_mb': combined_size['size_mb'],
'compression_ratio': baseline_size['size_mb'] / combined_size['size_mb'],
'memory_reduction': (baseline_size['size_mb'] - combined_size['size_mb']) / baseline_size['size_mb']
}
return results
### END SOLUTION