mirror of
https://github.com/MLSysBook/TinyTorch.git
synced 2026-04-28 18:00:02 -05:00
Restore TinyGPT implementation files after stash merge
- Move TinyGPT files to correct directory structure - Resolve merge conflicts from stash restoration - TinyGPT now implements attention and transformer models using TinyTorch foundation
This commit is contained in:
352
tinyGPT/tinyGPT/core/attention.py
Normal file
352
tinyGPT/tinyGPT/core/attention.py
Normal file
@@ -0,0 +1,352 @@
|
||||
"""
|
||||
Attention mechanisms for TinyGPT transformer models.
|
||||
|
||||
Implements self-attention and multi-head attention using TinyTorch components.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add TinyTorch to path for reusing components
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
try:
|
||||
from tinytorch.core.tensor import Tensor
|
||||
from tinytorch.core.layers import Dense
|
||||
from tinytorch.core.activations import Softmax
|
||||
except ImportError:
|
||||
print("⚠️ TinyTorch not available. Using mock implementations for development.")
|
||||
# Mock implementations for development
|
||||
class Tensor:
|
||||
def __init__(self, data):
|
||||
self.data = np.array(data)
|
||||
self.shape = self.data.shape
|
||||
|
||||
def __matmul__(self, other):
|
||||
if isinstance(other, Tensor):
|
||||
return Tensor(self.data @ other.data)
|
||||
return Tensor(self.data @ other)
|
||||
|
||||
def transpose(self, axes=None):
|
||||
if axes is None:
|
||||
return Tensor(self.data.T)
|
||||
return Tensor(np.transpose(self.data, axes))
|
||||
|
||||
def softmax(self, axis=-1):
|
||||
exp_data = np.exp(self.data - np.max(self.data, axis=axis, keepdims=True))
|
||||
return Tensor(exp_data / np.sum(exp_data, axis=axis, keepdims=True))
|
||||
|
||||
def __add__(self, other):
|
||||
if isinstance(other, Tensor):
|
||||
return Tensor(self.data + other.data)
|
||||
return Tensor(self.data + other)
|
||||
|
||||
def __mul__(self, other):
|
||||
if isinstance(other, Tensor):
|
||||
return Tensor(self.data * other.data)
|
||||
return Tensor(self.data * other)
|
||||
|
||||
class Dense:
|
||||
def __init__(self, in_features, out_features):
|
||||
self.in_features = in_features
|
||||
self.out_features = out_features
|
||||
self.weight = Tensor(np.random.randn(in_features, out_features) * 0.1)
|
||||
self.bias = Tensor(np.zeros(out_features))
|
||||
|
||||
def forward(self, x):
|
||||
return x @ self.weight + self.bias
|
||||
|
||||
class Softmax:
|
||||
def forward(self, x):
|
||||
return x.softmax()
|
||||
|
||||
|
||||
class MultiHeadAttention:
|
||||
"""Multi-head self-attention mechanism using TinyTorch Dense layers."""
|
||||
|
||||
def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
|
||||
"""Initialize multi-head attention.
|
||||
|
||||
Args:
|
||||
d_model: Model dimension (embedding size)
|
||||
num_heads: Number of attention heads
|
||||
dropout: Dropout rate (not implemented yet)
|
||||
"""
|
||||
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
|
||||
|
||||
self.d_model = d_model
|
||||
self.num_heads = num_heads
|
||||
self.d_k = d_model // num_heads
|
||||
self.dropout = dropout
|
||||
|
||||
# Linear projections for Q, K, V using TinyTorch Dense layers
|
||||
self.w_q = Dense(d_model, d_model)
|
||||
self.w_k = Dense(d_model, d_model)
|
||||
self.w_v = Dense(d_model, d_model)
|
||||
self.w_o = Dense(d_model, d_model) # Output projection
|
||||
|
||||
self.softmax = Softmax()
|
||||
|
||||
def forward(self, query: Tensor, key: Tensor, value: Tensor,
|
||||
mask: Tensor = None) -> Tensor:
|
||||
"""Forward pass of multi-head attention.
|
||||
|
||||
Args:
|
||||
query: Query tensor of shape (batch_size, seq_len, d_model)
|
||||
key: Key tensor of shape (batch_size, seq_len, d_model)
|
||||
value: Value tensor of shape (batch_size, seq_len, d_model)
|
||||
mask: Optional attention mask
|
||||
|
||||
Returns:
|
||||
Attention output of shape (batch_size, seq_len, d_model)
|
||||
"""
|
||||
batch_size, seq_len, d_model = query.shape
|
||||
|
||||
# Reshape for TinyTorch Dense layers (expects 2D)
|
||||
query_2d = Tensor(query.data.reshape(-1, d_model)) # (batch_size * seq_len, d_model)
|
||||
key_2d = Tensor(key.data.reshape(-1, d_model))
|
||||
value_2d = Tensor(value.data.reshape(-1, d_model))
|
||||
|
||||
# Linear projections
|
||||
Q_2d = self.w_q.forward(query_2d) # (batch_size * seq_len, d_model)
|
||||
K_2d = self.w_k.forward(key_2d)
|
||||
V_2d = self.w_v.forward(value_2d)
|
||||
|
||||
# Reshape back to 3D
|
||||
Q = Tensor(Q_2d.data.reshape(batch_size, seq_len, d_model))
|
||||
K = Tensor(K_2d.data.reshape(batch_size, seq_len, d_model))
|
||||
V = Tensor(V_2d.data.reshape(batch_size, seq_len, d_model))
|
||||
|
||||
# Reshape for multi-head attention
|
||||
Q = self._reshape_for_attention(Q) # (batch_size, num_heads, seq_len, d_k)
|
||||
K = self._reshape_for_attention(K) # (batch_size, num_heads, seq_len, d_k)
|
||||
V = self._reshape_for_attention(V) # (batch_size, num_heads, seq_len, d_k)
|
||||
|
||||
# Scaled dot-product attention
|
||||
attention_output = self._scaled_dot_product_attention(Q, K, V, mask)
|
||||
|
||||
# Concatenate heads
|
||||
attention_output = self._combine_heads(attention_output)
|
||||
|
||||
# Final linear projection (reshape for Dense layer)
|
||||
batch_size, seq_len, d_model = attention_output.shape
|
||||
attention_2d = Tensor(attention_output.data.reshape(-1, d_model))
|
||||
output_2d = self.w_o.forward(attention_2d)
|
||||
output = Tensor(output_2d.data.reshape(batch_size, seq_len, d_model))
|
||||
|
||||
return output
|
||||
|
||||
def _reshape_for_attention(self, x: Tensor) -> Tensor:
|
||||
"""Reshape tensor for multi-head attention."""
|
||||
batch_size, seq_len, d_model = x.shape
|
||||
# Reshape to (batch_size, seq_len, num_heads, d_k)
|
||||
reshaped = Tensor(x.data.reshape(batch_size, seq_len, self.num_heads, self.d_k))
|
||||
# Transpose to (batch_size, num_heads, seq_len, d_k)
|
||||
return Tensor(reshaped.data.transpose(0, 2, 1, 3))
|
||||
|
||||
def _combine_heads(self, x: Tensor) -> Tensor:
|
||||
"""Combine attention heads back into single tensor."""
|
||||
batch_size, num_heads, seq_len, d_k = x.shape
|
||||
# Transpose back to (batch_size, seq_len, num_heads, d_k)
|
||||
transposed = Tensor(x.data.transpose(0, 2, 1, 3))
|
||||
# Reshape to (batch_size, seq_len, d_model)
|
||||
return Tensor(transposed.data.reshape(batch_size, seq_len, self.d_model))
|
||||
|
||||
def _scaled_dot_product_attention(self, Q: Tensor, K: Tensor, V: Tensor,
|
||||
mask: Tensor = None) -> Tensor:
|
||||
"""Compute scaled dot-product attention."""
|
||||
# Compute attention scores
|
||||
# Q: (batch_size, num_heads, seq_len, d_k)
|
||||
# K: (batch_size, num_heads, seq_len, d_k)
|
||||
# Scores: (batch_size, num_heads, seq_len, seq_len)
|
||||
|
||||
K_T = K.data.transpose(0, 1, 3, 2) # Transpose K
|
||||
scores = Tensor(np.matmul(Q.data, K_T)) # QK^T using numpy matmul
|
||||
scores = scores * (1.0 / np.sqrt(self.d_k)) # Scale
|
||||
|
||||
# Apply mask if provided (for causal attention)
|
||||
if mask is not None:
|
||||
scores = scores + (mask * -1e9)
|
||||
|
||||
# Apply softmax manually since TinyTorch Tensor doesn't have softmax
|
||||
# Subtract max for numerical stability
|
||||
scores_max = np.max(scores.data, axis=-1, keepdims=True)
|
||||
scores_shifted = scores.data - scores_max
|
||||
exp_scores = np.exp(scores_shifted)
|
||||
softmax_weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
|
||||
attention_weights = Tensor(softmax_weights)
|
||||
|
||||
# Apply attention to values
|
||||
# attention_weights: (batch_size, num_heads, seq_len, seq_len)
|
||||
# V: (batch_size, num_heads, seq_len, d_k)
|
||||
# Output: (batch_size, num_heads, seq_len, d_k)
|
||||
output = Tensor(np.matmul(attention_weights.data, V.data))
|
||||
|
||||
return output
|
||||
|
||||
|
||||
class SelfAttention:
|
||||
"""Simplified self-attention for easier understanding."""
|
||||
|
||||
def __init__(self, d_model: int):
|
||||
"""Initialize self-attention.
|
||||
|
||||
Args:
|
||||
d_model: Model dimension
|
||||
"""
|
||||
self.d_model = d_model
|
||||
self.scale = 1.0 / np.sqrt(d_model)
|
||||
|
||||
# Single-head attention projections
|
||||
self.w_q = Dense(d_model, d_model)
|
||||
self.w_k = Dense(d_model, d_model)
|
||||
self.w_v = Dense(d_model, d_model)
|
||||
|
||||
self.softmax = Softmax()
|
||||
|
||||
def forward(self, x: Tensor, mask: Tensor = None) -> Tensor:
|
||||
"""Forward pass of self-attention.
|
||||
|
||||
Args:
|
||||
x: Input tensor of shape (batch_size, seq_len, d_model)
|
||||
mask: Optional attention mask
|
||||
|
||||
Returns:
|
||||
Attention output of same shape as input
|
||||
"""
|
||||
# Compute Q, K, V
|
||||
Q = self.w_q.forward(x) # (batch_size, seq_len, d_model)
|
||||
K = self.w_k.forward(x) # (batch_size, seq_len, d_model)
|
||||
V = self.w_v.forward(x) # (batch_size, seq_len, d_model)
|
||||
|
||||
# Compute attention scores
|
||||
scores = Q @ K.transpose((0, 2, 1)) # (batch_size, seq_len, seq_len)
|
||||
scores = scores * self.scale
|
||||
|
||||
# Apply mask if provided
|
||||
if mask is not None:
|
||||
scores = scores + (mask * -1e9)
|
||||
|
||||
# Apply softmax
|
||||
attention_weights = scores.softmax(axis=-1)
|
||||
|
||||
# Apply attention to values
|
||||
output = attention_weights @ V # (batch_size, seq_len, d_model)
|
||||
|
||||
return output
|
||||
|
||||
|
||||
def create_causal_mask(seq_len: int) -> Tensor:
|
||||
"""Create causal mask for preventing attention to future tokens.
|
||||
|
||||
Args:
|
||||
seq_len: Sequence length
|
||||
|
||||
Returns:
|
||||
Causal mask of shape (seq_len, seq_len)
|
||||
"""
|
||||
# Create lower triangular matrix (0 = attend, 1 = mask)
|
||||
mask = np.triu(np.ones((seq_len, seq_len)), k=1)
|
||||
return Tensor(mask)
|
||||
|
||||
|
||||
class PositionalEncoding:
|
||||
"""Sinusoidal positional encoding for transformer models."""
|
||||
|
||||
def __init__(self, d_model: int, max_length: int = 5000):
|
||||
"""Initialize positional encoding.
|
||||
|
||||
Args:
|
||||
d_model: Model dimension
|
||||
max_length: Maximum sequence length
|
||||
"""
|
||||
self.d_model = d_model
|
||||
self.max_length = max_length
|
||||
|
||||
# Create positional encoding matrix
|
||||
pe = np.zeros((max_length, d_model))
|
||||
position = np.arange(0, max_length).reshape(-1, 1)
|
||||
|
||||
# Compute div_term for sinusoidal encoding
|
||||
div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
|
||||
|
||||
# Apply sin to even indices
|
||||
pe[:, 0::2] = np.sin(position * div_term)
|
||||
|
||||
# Apply cos to odd indices
|
||||
if d_model % 2 == 0:
|
||||
pe[:, 1::2] = np.cos(position * div_term)
|
||||
else:
|
||||
pe[:, 1::2] = np.cos(position * div_term[:-1])
|
||||
|
||||
self.pe = Tensor(pe)
|
||||
|
||||
def forward(self, x: Tensor) -> Tensor:
|
||||
"""Add positional encoding to input embeddings.
|
||||
|
||||
Args:
|
||||
x: Input embeddings of shape (batch_size, seq_len, d_model)
|
||||
|
||||
Returns:
|
||||
Embeddings with positional encoding added
|
||||
"""
|
||||
batch_size, seq_len, d_model = x.shape
|
||||
|
||||
# Get positional encodings for this sequence length
|
||||
pos_encoding = Tensor(self.pe.data[:seq_len, :])
|
||||
|
||||
# Add to input (broadcasting across batch dimension)
|
||||
return x + pos_encoding
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Test attention mechanisms
|
||||
print("🧪 Testing TinyGPT Attention Mechanisms")
|
||||
print("=" * 50)
|
||||
|
||||
# Test parameters
|
||||
batch_size = 2
|
||||
seq_len = 10
|
||||
d_model = 64
|
||||
num_heads = 8
|
||||
|
||||
# Create sample input
|
||||
x = Tensor(np.random.randn(batch_size, seq_len, d_model))
|
||||
print(f"Input shape: {x.shape}")
|
||||
|
||||
# Test self-attention
|
||||
print("\n🎯 Self-Attention:")
|
||||
self_attn = SelfAttention(d_model)
|
||||
output = self_attn.forward(x)
|
||||
print(f"Output shape: {output.shape}")
|
||||
|
||||
# Test multi-head attention
|
||||
print("\n🔀 Multi-Head Attention:")
|
||||
multi_head_attn = MultiHeadAttention(d_model, num_heads)
|
||||
output = multi_head_attn.forward(x, x, x)
|
||||
print(f"Output shape: {output.shape}")
|
||||
|
||||
# Test causal mask
|
||||
print("\n🎭 Causal Mask:")
|
||||
mask = create_causal_mask(seq_len)
|
||||
print(f"Mask shape: {mask.shape}")
|
||||
print(f"Mask sample:\n{mask.data[:5, :5]}")
|
||||
|
||||
# Test with causal mask
|
||||
masked_output = self_attn.forward(x, mask)
|
||||
print(f"Masked output shape: {masked_output.shape}")
|
||||
|
||||
# Test positional encoding
|
||||
print("\n📍 Positional Encoding:")
|
||||
pos_encoding = PositionalEncoding(d_model, max_length=100)
|
||||
encoded_x = pos_encoding.forward(x)
|
||||
print(f"Encoded shape: {encoded_x.shape}")
|
||||
|
||||
print("\n✅ Attention mechanism tests completed!")
|
||||
print("\n💡 Key insights:")
|
||||
print(" • Self-attention allows tokens to attend to each other")
|
||||
print(" • Multi-head attention captures different types of relationships")
|
||||
print(" • Causal masking prevents attention to future tokens")
|
||||
print(" • Positional encoding adds sequence order information")
|
||||
print(" • All components reuse TinyTorch Dense layers! 🎉")
|
||||
425
tinyGPT/tinyGPT/core/models.py
Normal file
425
tinyGPT/tinyGPT/core/models.py
Normal file
@@ -0,0 +1,425 @@
|
||||
"""
|
||||
TinyGPT transformer models built on TinyTorch components.
|
||||
|
||||
Implements GPT-style autoregressive language models that maximize reuse
|
||||
of TinyTorch layers while adding transformer-specific components.
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add TinyTorch to path
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..'))
|
||||
|
||||
try:
|
||||
from tinytorch.core.tensor import Tensor
|
||||
from tinytorch.core.layers import Dense
|
||||
from tinytorch.core.activations import ReLU, Softmax
|
||||
# Don't import Sequential from TinyTorch - it doesn't handle 3D tensors
|
||||
TINYTORCH_AVAILABLE = True
|
||||
except ImportError:
|
||||
print("⚠️ TinyTorch not available. Using mock implementations.")
|
||||
# Use mock implementations from attention.py
|
||||
from .attention import Tensor, Dense
|
||||
TINYTORCH_AVAILABLE = False
|
||||
|
||||
class ReLU:
|
||||
def forward(self, x):
|
||||
return Tensor(np.maximum(0, x.data))
|
||||
|
||||
class Softmax:
|
||||
def forward(self, x):
|
||||
return x.softmax()
|
||||
|
||||
# Custom Sequential that handles 3D tensors (works with or without TinyTorch)
|
||||
class Sequential:
|
||||
def __init__(self, layers):
|
||||
self.layers = layers
|
||||
|
||||
def forward(self, x):
|
||||
# Handle 3D tensors by reshaping for Dense layers
|
||||
original_shape = x.shape
|
||||
if len(original_shape) == 3:
|
||||
batch_size, seq_len, d_model = original_shape
|
||||
x = Tensor(x.data.reshape(-1, d_model))
|
||||
|
||||
for layer in self.layers:
|
||||
x = layer.forward(x)
|
||||
|
||||
# Reshape back to original dimensions
|
||||
if len(original_shape) == 3:
|
||||
x = Tensor(x.data.reshape(batch_size, seq_len, -1))
|
||||
|
||||
return x
|
||||
|
||||
from .attention import MultiHeadAttention, PositionalEncoding, create_causal_mask
|
||||
|
||||
|
||||
class LayerNorm:
|
||||
"""Layer normalization for transformer models."""
|
||||
|
||||
def __init__(self, d_model: int, eps: float = 1e-6):
|
||||
"""Initialize layer normalization.
|
||||
|
||||
Args:
|
||||
d_model: Model dimension
|
||||
eps: Small constant for numerical stability
|
||||
"""
|
||||
self.d_model = d_model
|
||||
self.eps = eps
|
||||
|
||||
# Learnable parameters (simplified - would need proper gradient handling)
|
||||
self.gamma = Tensor(np.ones(d_model))
|
||||
self.beta = Tensor(np.zeros(d_model))
|
||||
|
||||
def forward(self, x: Tensor) -> Tensor:
|
||||
"""Apply layer normalization.
|
||||
|
||||
Args:
|
||||
x: Input tensor of shape (..., d_model)
|
||||
|
||||
Returns:
|
||||
Normalized tensor of same shape
|
||||
"""
|
||||
# Compute mean and variance along last dimension
|
||||
mean = np.mean(x.data, axis=-1, keepdims=True)
|
||||
var = np.var(x.data, axis=-1, keepdims=True)
|
||||
|
||||
# Normalize
|
||||
normalized = (x.data - mean) / np.sqrt(var + self.eps)
|
||||
|
||||
# Scale and shift
|
||||
output = normalized * self.gamma.data + self.beta.data
|
||||
|
||||
return Tensor(output)
|
||||
|
||||
|
||||
class TransformerBlock:
|
||||
"""Single transformer block with self-attention and feedforward network."""
|
||||
|
||||
def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
|
||||
"""Initialize transformer block.
|
||||
|
||||
Args:
|
||||
d_model: Model dimension
|
||||
num_heads: Number of attention heads
|
||||
d_ff: Feedforward network dimension
|
||||
dropout: Dropout rate (not implemented)
|
||||
"""
|
||||
self.d_model = d_model
|
||||
self.num_heads = num_heads
|
||||
self.d_ff = d_ff
|
||||
self.dropout = dropout
|
||||
|
||||
# Multi-head self-attention
|
||||
self.self_attention = MultiHeadAttention(d_model, num_heads, dropout)
|
||||
|
||||
# Feedforward network using TinyTorch Dense layers
|
||||
self.feedforward = Sequential([
|
||||
Dense(d_model, d_ff),
|
||||
ReLU(),
|
||||
Dense(d_ff, d_model)
|
||||
])
|
||||
|
||||
# Layer normalization
|
||||
self.ln1 = LayerNorm(d_model)
|
||||
self.ln2 = LayerNorm(d_model)
|
||||
|
||||
def forward(self, x: Tensor, mask: Tensor = None) -> Tensor:
|
||||
"""Forward pass of transformer block.
|
||||
|
||||
Args:
|
||||
x: Input tensor of shape (batch_size, seq_len, d_model)
|
||||
mask: Optional attention mask
|
||||
|
||||
Returns:
|
||||
Output tensor of same shape as input
|
||||
"""
|
||||
# Self-attention with residual connection and layer norm
|
||||
attn_output = self.self_attention.forward(x, x, x, mask)
|
||||
x = self.ln1.forward(x + attn_output) # Residual connection
|
||||
|
||||
# Feedforward with residual connection and layer norm
|
||||
ff_output = self.feedforward.forward(x)
|
||||
x = self.ln2.forward(x + ff_output) # Residual connection
|
||||
|
||||
return x
|
||||
|
||||
|
||||
class TinyGPT:
|
||||
"""TinyGPT: GPT-style transformer model using TinyTorch components."""
|
||||
|
||||
def __init__(self, vocab_size: int, d_model: int = 256, num_heads: int = 8,
|
||||
num_layers: int = 6, d_ff: int = None, max_length: int = 1024,
|
||||
dropout: float = 0.1):
|
||||
"""Initialize TinyGPT model.
|
||||
|
||||
Args:
|
||||
vocab_size: Vocabulary size
|
||||
d_model: Model dimension (embedding size)
|
||||
num_heads: Number of attention heads
|
||||
num_layers: Number of transformer layers
|
||||
d_ff: Feedforward dimension (default: 4 * d_model)
|
||||
max_length: Maximum sequence length
|
||||
dropout: Dropout rate
|
||||
"""
|
||||
self.vocab_size = vocab_size
|
||||
self.d_model = d_model
|
||||
self.num_heads = num_heads
|
||||
self.num_layers = num_layers
|
||||
self.d_ff = d_ff or 4 * d_model
|
||||
self.max_length = max_length
|
||||
self.dropout = dropout
|
||||
|
||||
# Token embeddings using TinyTorch Dense layer
|
||||
self.token_embedding = Dense(vocab_size, d_model)
|
||||
|
||||
# Positional encoding
|
||||
self.positional_encoding = PositionalEncoding(d_model, max_length)
|
||||
|
||||
# Transformer blocks
|
||||
self.blocks = [
|
||||
TransformerBlock(d_model, num_heads, self.d_ff, dropout)
|
||||
for _ in range(num_layers)
|
||||
]
|
||||
|
||||
# Final layer norm
|
||||
self.ln_final = LayerNorm(d_model)
|
||||
|
||||
# Output projection to vocabulary using TinyTorch Dense layer
|
||||
self.output_projection = Dense(d_model, vocab_size)
|
||||
|
||||
print(f"🤖 TinyGPT initialized:")
|
||||
print(f" Vocab size: {vocab_size}")
|
||||
print(f" Model dim: {d_model}")
|
||||
print(f" Heads: {num_heads}")
|
||||
print(f" Layers: {num_layers}")
|
||||
print(f" Parameters: ~{self.count_parameters():,}")
|
||||
|
||||
def forward(self, input_ids: Tensor, use_cache: bool = False) -> Tensor:
|
||||
"""Forward pass of TinyGPT.
|
||||
|
||||
Args:
|
||||
input_ids: Token indices of shape (batch_size, seq_len)
|
||||
use_cache: Whether to use caching (not implemented)
|
||||
|
||||
Returns:
|
||||
Logits of shape (batch_size, seq_len, vocab_size)
|
||||
"""
|
||||
batch_size, seq_len = input_ids.shape
|
||||
|
||||
# Convert token indices to one-hot encoding for embedding
|
||||
# This is a simplified approach - in practice, we'd use proper embedding layers
|
||||
one_hot = np.zeros((batch_size, seq_len, self.vocab_size))
|
||||
for b in range(batch_size):
|
||||
for s in range(seq_len):
|
||||
token_id = int(input_ids.data[b, s])
|
||||
if 0 <= token_id < self.vocab_size:
|
||||
one_hot[b, s, token_id] = 1.0
|
||||
|
||||
# Token embeddings (reshape for Dense layer)
|
||||
one_hot_2d = Tensor(one_hot.reshape(-1, self.vocab_size)) # (batch_size * seq_len, vocab_size)
|
||||
x_2d = self.token_embedding.forward(one_hot_2d) # (batch_size * seq_len, d_model)
|
||||
x = Tensor(x_2d.data.reshape(batch_size, seq_len, self.d_model)) # (batch_size, seq_len, d_model)
|
||||
|
||||
# Add positional encoding
|
||||
x = self.positional_encoding.forward(x)
|
||||
|
||||
# Create causal mask
|
||||
mask = create_causal_mask(seq_len)
|
||||
|
||||
# Pass through transformer blocks
|
||||
for block in self.blocks:
|
||||
x = block.forward(x, mask)
|
||||
|
||||
# Final layer norm
|
||||
x = self.ln_final.forward(x)
|
||||
|
||||
# Project to vocabulary (reshape for Dense layer)
|
||||
x_2d = Tensor(x.data.reshape(-1, self.d_model)) # (batch_size * seq_len, d_model)
|
||||
logits_2d = self.output_projection.forward(x_2d) # (batch_size * seq_len, vocab_size)
|
||||
logits = Tensor(logits_2d.data.reshape(batch_size, seq_len, self.vocab_size)) # (batch_size, seq_len, vocab_size)
|
||||
|
||||
return logits
|
||||
|
||||
def generate(self, input_ids: Tensor, max_new_tokens: int = 50,
|
||||
temperature: float = 1.0, do_sample: bool = True) -> Tensor:
|
||||
"""Generate text autoregressively.
|
||||
|
||||
Args:
|
||||
input_ids: Starting token indices of shape (1, seq_len)
|
||||
max_new_tokens: Maximum number of new tokens to generate
|
||||
temperature: Sampling temperature (higher = more random)
|
||||
do_sample: Whether to sample or use greedy decoding
|
||||
|
||||
Returns:
|
||||
Generated token sequence including input
|
||||
"""
|
||||
generated = input_ids.data.copy()
|
||||
|
||||
for _ in range(max_new_tokens):
|
||||
# Forward pass
|
||||
logits = self.forward(Tensor(generated))
|
||||
|
||||
# Get logits for last token
|
||||
next_token_logits = logits.data[0, -1, :] # (vocab_size,)
|
||||
|
||||
# Apply temperature
|
||||
if temperature != 1.0:
|
||||
next_token_logits = next_token_logits / temperature
|
||||
|
||||
# Sample next token
|
||||
if do_sample:
|
||||
# Softmax to get probabilities
|
||||
probs = np.exp(next_token_logits) / np.sum(np.exp(next_token_logits))
|
||||
next_token = np.random.choice(len(probs), p=probs)
|
||||
else:
|
||||
# Greedy decoding
|
||||
next_token = np.argmax(next_token_logits)
|
||||
|
||||
# Append to sequence
|
||||
generated = np.concatenate([
|
||||
generated,
|
||||
np.array([[next_token]])
|
||||
], axis=1)
|
||||
|
||||
# Stop if we hit maximum length
|
||||
if generated.shape[1] >= self.max_length:
|
||||
break
|
||||
|
||||
return Tensor(generated)
|
||||
|
||||
def count_parameters(self) -> int:
|
||||
"""Estimate number of parameters in the model."""
|
||||
params = 0
|
||||
|
||||
# Token embedding: vocab_size * d_model
|
||||
params += self.vocab_size * self.d_model
|
||||
|
||||
# Each transformer block
|
||||
for _ in range(self.num_layers):
|
||||
# Multi-head attention: 4 * d_model * d_model (Q, K, V, O projections)
|
||||
params += 4 * self.d_model * self.d_model
|
||||
|
||||
# Feedforward: d_model * d_ff + d_ff * d_model
|
||||
params += 2 * self.d_model * self.d_ff
|
||||
|
||||
# Layer norms: 2 * 2 * d_model (gamma and beta for each)
|
||||
params += 4 * self.d_model
|
||||
|
||||
# Final layer norm: 2 * d_model
|
||||
params += 2 * self.d_model
|
||||
|
||||
# Output projection: d_model * vocab_size
|
||||
params += self.d_model * self.vocab_size
|
||||
|
||||
return params
|
||||
|
||||
|
||||
class SimpleLM:
|
||||
"""Simplified language model for testing and comparison."""
|
||||
|
||||
def __init__(self, vocab_size: int, d_model: int = 128, d_hidden: int = 256):
|
||||
"""Initialize simple language model.
|
||||
|
||||
Args:
|
||||
vocab_size: Vocabulary size
|
||||
d_model: Embedding dimension
|
||||
d_hidden: Hidden layer dimension
|
||||
"""
|
||||
self.vocab_size = vocab_size
|
||||
self.d_model = d_model
|
||||
self.d_hidden = d_hidden
|
||||
|
||||
# Simple feedforward network using TinyTorch components
|
||||
self.embedding = Dense(vocab_size, d_model)
|
||||
self.hidden = Dense(d_model, d_hidden)
|
||||
self.activation = ReLU()
|
||||
self.output = Dense(d_hidden, vocab_size)
|
||||
|
||||
print(f"🔤 Simple LM initialized: {vocab_size} vocab, {d_model} dim")
|
||||
|
||||
def forward(self, input_ids: Tensor) -> Tensor:
|
||||
"""Forward pass of simple language model."""
|
||||
batch_size, seq_len = input_ids.shape
|
||||
|
||||
# Convert to one-hot
|
||||
one_hot = np.zeros((batch_size, seq_len, self.vocab_size))
|
||||
for b in range(batch_size):
|
||||
for s in range(seq_len):
|
||||
token_id = int(input_ids.data[b, s])
|
||||
if 0 <= token_id < self.vocab_size:
|
||||
one_hot[b, s, token_id] = 1.0
|
||||
|
||||
# Simple feedforward (reshape for Dense layers)
|
||||
one_hot_2d = Tensor(one_hot.reshape(-1, self.vocab_size))
|
||||
x = self.embedding.forward(one_hot_2d)
|
||||
x = self.hidden.forward(x)
|
||||
x = self.activation.forward(x)
|
||||
logits_2d = self.output.forward(x)
|
||||
logits = Tensor(logits_2d.data.reshape(batch_size, seq_len, self.vocab_size))
|
||||
|
||||
return logits
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Test TinyGPT models
|
||||
print("🧪 Testing TinyGPT Models")
|
||||
print("=" * 50)
|
||||
|
||||
# Model parameters
|
||||
vocab_size = 50
|
||||
d_model = 64
|
||||
num_heads = 4
|
||||
num_layers = 2
|
||||
seq_len = 10
|
||||
batch_size = 2
|
||||
|
||||
# Create sample input (token indices)
|
||||
input_ids = Tensor(np.random.randint(0, vocab_size, (batch_size, seq_len)))
|
||||
print(f"Input shape: {input_ids.shape}")
|
||||
print(f"Sample tokens: {input_ids.data[0, :5]}")
|
||||
|
||||
# Test TinyGPT
|
||||
print("\n🤖 TinyGPT:")
|
||||
model = TinyGPT(
|
||||
vocab_size=vocab_size,
|
||||
d_model=d_model,
|
||||
num_heads=num_heads,
|
||||
num_layers=num_layers,
|
||||
max_length=128
|
||||
)
|
||||
|
||||
# Forward pass
|
||||
logits = model.forward(input_ids)
|
||||
print(f"Logits shape: {logits.shape}")
|
||||
print(f"Logits sample: {logits.data[0, 0, :5]}")
|
||||
|
||||
# Test generation
|
||||
print("\n📝 Text Generation:")
|
||||
start_tokens = Tensor(np.array([[1, 2, 3]])) # Start with tokens 1, 2, 3
|
||||
generated = model.generate(start_tokens, max_new_tokens=10, temperature=0.8)
|
||||
print(f"Generated shape: {generated.shape}")
|
||||
print(f"Generated tokens: {generated.data[0]}")
|
||||
|
||||
# Test simple LM for comparison
|
||||
print("\n🔤 Simple LM (for comparison):")
|
||||
simple_model = SimpleLM(vocab_size=vocab_size, d_model=d_model)
|
||||
simple_logits = simple_model.forward(input_ids)
|
||||
print(f"Simple LM logits shape: {simple_logits.shape}")
|
||||
|
||||
# Compare model sizes
|
||||
print("\n📊 Model Comparison:")
|
||||
print(f"TinyGPT parameters: ~{model.count_parameters():,}")
|
||||
simple_params = vocab_size * d_model + d_model * 256 + 256 * vocab_size
|
||||
print(f"Simple LM parameters: ~{simple_params:,}")
|
||||
print(f"TinyGPT is {model.count_parameters() / simple_params:.1f}x larger")
|
||||
|
||||
print("\n✅ Model tests completed!")
|
||||
print("\n💡 Key insights:")
|
||||
print(" • TinyGPT successfully reuses TinyTorch Dense layers")
|
||||
print(" • Transformer architecture much more powerful than simple LM")
|
||||
print(" • Self-attention enables long-range dependencies")
|
||||
print(" • Autoregressive generation works out of the box")
|
||||
print(" • 🎉 Vision and language models share the same foundation!")
|
||||
Reference in New Issue
Block a user