mirror of https://github.com/KohakuBlueleaf/KohakuHub.git
synced 2026-04-28 18:38:17 -05:00

Commit: clean up
@@ -563,53 +563,90 @@ header = b'PACK' + struct.pack('>I', 2) + struct.pack('>I', num_objects)
| OFS_DELTA | 6 | Offset delta |
| REF_DELTA | 7 | Reference delta |
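As a quick sanity check of the header construction in the hunk context above (`b'PACK' + struct.pack('>I', 2) + struct.pack('>I', num_objects)`), here is the concrete 12-byte header for a pack that holds three objects:

```python
import struct

# "PACK" magic, format version 2, object count 3, all big-endian
header = b"PACK" + struct.pack(">I", 2) + struct.pack(">I", 3)
assert header.hex() == "5041434b0000000200000003"
```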
### Creating Pack Files with pygit2
### Creating Pack Files (Pure Python)

**KohakuHub uses a pure Python implementation - no native dependencies!**

```python
import pygit2
import tempfile
import hashlib
import struct
import zlib

def create_pack_file(repo: pygit2.Repository, wants: list[str], haves: list[str]) -> bytes:
    """Build pack file using pygit2."""
def create_pack_file(objects: list[tuple[int, bytes]]) -> bytes:
    """Build pack file using pure Python.

    # Get commit OID
    commit_oid = pygit2.Oid(hex=wants[0])
    Args:
        objects: List of (type, object_data_with_header) tuples
            Types: 1=commit, 2=tree, 3=blob

    # Walk commit tree
    walker = repo.walk(commit_oid, pygit2.enums.SortMode.TOPOLOGICAL)
    Returns:
        Complete pack file bytes
    """
    # Pack header
    pack_data = b"PACK"
    pack_data += struct.pack(">I", 2)  # Version 2
    pack_data += struct.pack(">I", len(objects))  # Object count

    # Collect all OIDs to pack
    oids_to_pack = set()
    # Add each object
    for obj_type, obj_data in objects:
        # Extract content (remove "type size\0" header)
        null_pos = obj_data.find(b"\0")
        content = obj_data[null_pos + 1:] if null_pos > 0 else obj_data

    for commit in walker:
        # Stop if client already has this commit
        if str(commit.id) in haves:
            break
        # Encode object header (type + size in variable-length encoding)
        header = encode_pack_object_header(obj_type, len(content))

        # Add commit
        oids_to_pack.add(commit.id)
        # Compress with zlib
        compressed = zlib.compress(content)

        # Add tree and blobs recursively
        collect_tree_objects(repo, commit.tree_id, oids_to_pack)
        # Add to pack
        pack_data += header + compressed

    # Use pygit2 PackBuilder
    with tempfile.TemporaryDirectory() as temp_dir:
        pack_builder = pygit2.PackBuilder(repo)
    # Add pack checksum (SHA-1 of everything)
    checksum = hashlib.sha1(pack_data).digest()
    pack_data += checksum

        for oid in oids_to_pack:
            pack_builder.add(oid)
    return pack_data

        # Write pack to temp directory
        pack_builder.write(temp_dir)

        # Read generated pack file
        pack_files = [f for f in os.listdir(temp_dir) if f.endswith(".pack")]
        pack_path = os.path.join(temp_dir, pack_files[0])
# Complete example - no temp files!
async def build_pack(repo_id, branch):
    # 1. Build blobs (LFS pointers for large files)
    blobs = {}  # path -> (sha1, data_with_header, mode)

        with open(pack_path, "rb") as f:
            return f.read()
    for file in files:
        if is_lfs(file):
            pointer = create_lfs_pointer(file.sha256, file.size)
            sha1, blob_data = create_blob_object(pointer)
            blobs[file.path] = (sha1, blob_data, "100644")
        else:
            content = await download(file.path)
            sha1, blob_data = create_blob_object(content)
            blobs[file.path] = (sha1, blob_data, "100644")

    # 2. Build trees (pure logic)
    flat = [(mode, path, sha1) for path, (sha1, data, mode) in blobs.items()]
    root_tree_sha1, tree_objects = build_nested_trees(flat)

    # 3. Build commit
    commit_sha1, commit_data = create_commit_object(...)

    # 4. Build pack
    pack_objects = [(1, commit_data)]  # Commit
    pack_objects.extend(tree_objects)  # Trees
    for path, (sha1, data, mode) in blobs.items():
        pack_objects.append((3, data))  # Blobs

    return create_pack_file(pack_objects)
```

**Benefits:**
- No native dependencies (easier deployment)
- Full control over memory usage
- No temporary files needed
- Easier debugging
- Better performance with LFS
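
The walkthrough above also leans on `create_blob_object`, defined elsewhere in KohakuHub. For reference, a Git blob object is the header `blob <size>` plus a NUL byte followed by the raw content, and its ID is the SHA-1 of that whole buffer; a sketch matching the call sites above (assuming the helper returns the hex SHA-1 and the header-prefixed bytes) is:

```python
import hashlib


def create_blob_object(content: bytes) -> tuple[str, bytes]:
    """Build a Git blob: header "blob <size>" + NUL + content, SHA-1 over the whole buffer."""
    blob_with_header = b"blob " + str(len(content)).encode() + b"\0" + content
    sha1 = hashlib.sha1(blob_with_header).hexdigest()
    return sha1, blob_with_header
```

As a sanity check, `create_blob_object(b"hello\n")[0]` should be `ce013625030ba8dba906f756967f9e9ca394464a`, the same ID `git hash-object` reports for that content.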

### Empty Pack File

```python
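# NOTE: the body of this example falls outside the diff hunk shown here.
# A minimal sketch, assuming the empty pack mirrors the header layout described
# above: 4-byte magic, version 2, an object count of zero, then the SHA-1
# checksum of those 12 bytes (the name create_empty_pack comes from the
# git_server import that appears later in this commit).
import hashlib
import struct


def create_empty_pack() -> bytes:
    """Return a valid pack file that contains zero objects."""
    data = b"PACK" + struct.pack(">I", 2) + struct.pack(">I", 0)
    return data + hashlib.sha1(data).digest()
```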
@@ -1381,9 +1418,9 @@ async def list_all_objects(repo, ref):

### Libraries

- [pygit2](https://www.pygit2.org/) - Python bindings for libgit2
- [FastAPI](https://fastapi.tiangolo.com/) - Modern web framework
- [httpx](https://www.python-httpx.org/) - Async HTTP client
- Pure Python (stdlib only) - No native dependencies for Git operations

### Tutorials

@@ -1399,9 +1436,18 @@ Building a Git-compatible server involves:
2. **Implementing core handlers**: Parsing requests, generating pack files (see the pkt-line sketch below)
3. **Integrating with storage**: Translating Git operations to your backend (LakeFS)
4. **Adding authentication**: Token validation and permission checks
5. **Optimizing performance**: Caching, streaming, pagination
5. **Optimizing performance**: LFS pointers, concurrent processing, chunking
6. **Pure Python approach**: No native dependencies, full control, better debugging
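
Step 2 above hinges on Git's pkt-line framing for both the ref advertisement and the upload-pack response. As a generic illustration (not KohakuHub-specific code), each pkt-line is a four-hex-digit length that counts itself plus the payload, and `0000` is the flush packet that ends a section:

```python
def pkt_line(payload: bytes) -> bytes:
    """Frame a payload as a Git pkt-line: 4 hex digits of (len(payload) + 4), then the payload."""
    return f"{len(payload) + 4:04x}".encode() + payload


FLUSH_PKT = b"0000"  # zero-length marker that terminates a pkt-line section

# First line of a git-upload-pack service advertisement:
advert = pkt_line(b"# service=git-upload-pack\n") + FLUSH_PKT
# -> b"001e# service=git-upload-pack\n0000"
```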

This guide provides a foundation for implementing the Git Smart HTTP protocol in any FastAPI application. The KohakuHub implementation demonstrates how to bridge Git operations with LakeFS for version control of machine learning models and datasets.

**KohakuHub Implementation Highlights:**
- ✅ **Pure Python** - No pygit2, no libgit2, no native dependencies
- ✅ **In-memory** - No temporary directories or files
- ✅ **LFS integration** - Automatic LFS pointers for large files (>1MB)
- ✅ **Concurrent** - Parallel processing with asyncio.gather
- ✅ **Memory efficient** - Only downloads small files, pointers for large files
- ✅ **Production ready** - Handles repos of any size without OOM

This demonstrates how to build a complete Git server using only Python stdlib + FastAPI, with full Git LFS support for machine learning models and datasets.
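
To make the LFS bullets above concrete: instead of packing a multi-gigabyte blob, the server packs a three-line text pointer that the git-lfs client later resolves against the LFS endpoint. The helper KohakuHub uses for this (shown in full in the bridge module diff further down) essentially does:

```python
def create_lfs_pointer(sha256: str, size: int) -> bytes:
    """Create Git LFS pointer file content: spec version, oid, and size lines."""
    pointer = (
        "version https://git-lfs.github.com/spec/v1\n"
        f"oid sha256:{sha256}\n"
        f"size {size}\n"
    )
    return pointer.encode("utf-8")
```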

---

@@ -23,7 +23,6 @@ dependencies = [
    "peewee",
    "psycopg2-binary",
    "pydantic[email]",
    "pygit2>=1.14.0",
    "python-multipart",
    "pyyaml",
    "questionary",
307 scripts/generate_test_repo.py (Normal file)
@@ -0,0 +1,307 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate test repository with nested folders and mixed LFS/non-LFS files."""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def random_content(size: int) -> bytes:
|
||||
"""Generate random binary content.
|
||||
|
||||
Args:
|
||||
size: Size in bytes
|
||||
|
||||
Returns:
|
||||
Random bytes
|
||||
"""
|
||||
return os.urandom(size)
|
||||
|
||||
|
||||
def random_text_content(size: int) -> bytes:
|
||||
"""Generate random text content.
|
||||
|
||||
Args:
|
||||
size: Size in bytes
|
||||
|
||||
Returns:
|
||||
Random text as bytes
|
||||
"""
|
||||
chars = string.ascii_letters + string.digits + " \n"
|
||||
content = "".join(random.choices(chars, k=size))
|
||||
return content.encode("utf-8")
|
||||
|
||||
|
||||
def create_file(path: Path, size: int, binary: bool = False):
|
||||
"""Create a file with random content.
|
||||
|
||||
Args:
|
||||
path: File path
|
||||
size: File size in bytes
|
||||
binary: If True, use binary content; otherwise text
|
||||
"""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if binary:
|
||||
content = random_content(size)
|
||||
else:
|
||||
content = random_text_content(size)
|
||||
|
||||
path.write_bytes(content)
|
||||
|
||||
# Compute hashes for verification
|
||||
sha256 = hashlib.sha256(content).hexdigest()
|
||||
sha1 = hashlib.sha1(content).hexdigest()
|
||||
|
||||
print(
|
||||
f" {str(path):<60} {size:>10} bytes sha256:{sha256[:8]} {'[LFS]' if size >= 1_000_000 else ''}"
|
||||
)
|
||||
|
||||
return sha256, size
|
||||
|
||||
|
||||
def generate_test_repo(base_path: str = "test_folder"):
|
||||
"""Generate test repository with nested structure.
|
||||
|
||||
Structure:
|
||||
test_folder/
|
||||
README.md (small text)
|
||||
.gitattributes (LFS config)
|
||||
config/
|
||||
settings.json (small JSON)
|
||||
large_config.yaml (LFS - 2MB)
|
||||
models/
|
||||
small_model.txt (small text)
|
||||
large_model.bin (LFS - 10MB)
|
||||
checkpoints/
|
||||
checkpoint_1.safetensors (LFS - 5MB)
|
||||
checkpoint_2.safetensors (LFS - 5MB)
|
||||
metadata.json (small)
|
||||
data/
|
||||
train/
|
||||
samples/
|
||||
image_001.png (LFS - 1.5MB)
|
||||
image_002.png (LFS - 1.5MB)
|
||||
labels.txt (small)
|
||||
dataset.csv (medium - 500KB)
|
||||
test/
|
||||
results.json (small)
|
||||
docs/
|
||||
guide.md (small)
|
||||
images/
|
||||
diagram.png (LFS - 2MB)
|
||||
screenshot.jpg (LFS - 1MB)
|
||||
scripts/
|
||||
train.py (small)
|
||||
evaluate.py (small)
|
||||
"""
|
||||
base = Path(base_path)
|
||||
|
||||
# Clean up if exists
|
||||
if base.exists():
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(base)
|
||||
|
||||
print(f"\n{'='*100}")
|
||||
print(f"Generating test repository: {base_path}")
|
||||
print(f"LFS threshold: 1,000,000 bytes (1 MB)")
|
||||
print(f"{'='*100}\n")
|
||||
|
||||
files_created = []
|
||||
|
||||
# Root level files
|
||||
print("Root level:")
|
||||
files_created.append(create_file(base / "README.md", 5_000, binary=False))
|
||||
files_created.append(
|
||||
create_file(base / ".gitattributes", 200, binary=False)
|
||||
) # Will overwrite with proper content
|
||||
|
||||
# config/
|
||||
print("\nconfig/:")
|
||||
files_created.append(
|
||||
create_file(base / "config" / "settings.json", 1_500, binary=False)
|
||||
)
|
||||
files_created.append(
|
||||
create_file(base / "config" / "large_config.yaml", 2_000_000, binary=False)
|
||||
) # LFS
|
||||
|
||||
# models/
|
||||
print("\nmodels/:")
|
||||
files_created.append(
|
||||
create_file(base / "models" / "small_model.txt", 50_000, binary=False)
|
||||
)
|
||||
files_created.append(
|
||||
create_file(base / "models" / "large_model.bin", 10_000_000, binary=True)
|
||||
) # LFS
|
||||
|
||||
# models/checkpoints/
|
||||
print("\nmodels/checkpoints/:")
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "models" / "checkpoints" / "checkpoint_1.safetensors",
|
||||
5_000_000,
|
||||
binary=True,
|
||||
)
|
||||
) # LFS
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "models" / "checkpoints" / "checkpoint_2.safetensors",
|
||||
5_500_000,
|
||||
binary=True,
|
||||
)
|
||||
) # LFS
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "models" / "checkpoints" / "metadata.json", 800, binary=False
|
||||
)
|
||||
)
|
||||
|
||||
# data/train/samples/
|
||||
print("\ndata/train/samples/:")
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "data" / "train" / "samples" / "image_001.png",
|
||||
1_500_000,
|
||||
binary=True,
|
||||
)
|
||||
) # LFS
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "data" / "train" / "samples" / "image_002.png",
|
||||
1_600_000,
|
||||
binary=True,
|
||||
)
|
||||
) # LFS
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "data" / "train" / "samples" / "image_003.png",
|
||||
1_400_000,
|
||||
binary=True,
|
||||
)
|
||||
) # LFS
|
||||
files_created.append(
|
||||
create_file(
|
||||
base / "data" / "train" / "samples" / "labels.txt", 3_000, binary=False
|
||||
)
|
||||
)
|
||||
|
||||
# data/train/
|
||||
print("\ndata/train/:")
|
||||
files_created.append(
|
||||
create_file(base / "data" / "train" / "dataset.csv", 500_000, binary=False)
|
||||
)
|
||||
|
||||
# data/test/
|
||||
print("\ndata/test/:")
|
||||
files_created.append(
|
||||
create_file(base / "data" / "test" / "results.json", 2_500, binary=False)
|
||||
)
|
||||
|
||||
# docs/
|
||||
print("\ndocs/:")
|
||||
files_created.append(create_file(base / "docs" / "guide.md", 8_000, binary=False))
|
||||
|
||||
# docs/images/
|
||||
print("\ndocs/images/:")
|
||||
files_created.append(
|
||||
create_file(base / "docs" / "images" / "diagram.png", 2_000_000, binary=True)
|
||||
) # LFS
|
||||
files_created.append(
|
||||
create_file(base / "docs" / "images" / "screenshot.jpg", 1_200_000, binary=True)
|
||||
) # LFS
|
||||
|
||||
# scripts/
|
||||
print("\nscripts/:")
|
||||
files_created.append(
|
||||
create_file(base / "scripts" / "train.py", 4_000, binary=False)
|
||||
)
|
||||
files_created.append(
|
||||
create_file(base / "scripts" / "evaluate.py", 3_500, binary=False)
|
||||
)
|
||||
|
||||
# Generate proper .gitattributes
|
||||
print("\nGenerating .gitattributes...")
|
||||
lfs_files = []
|
||||
regular_files = []
|
||||
|
||||
for sha256, size in files_created:
|
||||
if size >= 1_000_000:
|
||||
lfs_files.append((sha256, size))
|
||||
else:
|
||||
regular_files.append((sha256, size))
|
||||
|
||||
gitattributes_lines = ["# Git LFS tracking\n"]
|
||||
gitattributes_lines.append("*.bin filter=lfs diff=lfs merge=lfs -text\n")
|
||||
gitattributes_lines.append("*.safetensors filter=lfs diff=lfs merge=lfs -text\n")
|
||||
gitattributes_lines.append("*.png filter=lfs diff=lfs merge=lfs -text\n")
|
||||
gitattributes_lines.append("*.jpg filter=lfs diff=lfs merge=lfs -text\n")
|
||||
gitattributes_lines.append(
|
||||
"config/large_config.yaml filter=lfs diff=lfs merge=lfs -text\n"
|
||||
)
|
||||
|
||||
(base / ".gitattributes").write_text("".join(gitattributes_lines))
|
||||
|
||||
# Summary
|
||||
print(f"\n{'='*100}")
|
||||
print("Summary:")
|
||||
print(f" Total files: {len(files_created)}")
|
||||
print(f" LFS files (>=1MB): {len(lfs_files)}")
|
||||
print(f" Regular files (<1MB): {len(regular_files)}")
|
||||
|
||||
total_size = sum(size for _, size in files_created)
|
||||
lfs_size = sum(size for _, size in lfs_files)
|
||||
regular_size = sum(size for _, size in regular_files)
|
||||
|
||||
print(f"\n Total size: {total_size / 1024 / 1024:.2f} MB")
|
||||
print(f" LFS size: {lfs_size / 1024 / 1024:.2f} MB")
|
||||
print(f" Regular size: {regular_size / 1024:.2f} KB")
|
||||
|
||||
print(f"\n Directory structure:")
|
||||
print(f" - Root: 2 files")
|
||||
print(f" - config/: 2 files")
|
||||
print(f" - models/: 2 files")
|
||||
print(f" - models/checkpoints/: 3 files")
|
||||
print(f" - data/train/: 1 file")
|
||||
print(f" - data/train/samples/: 4 files")
|
||||
print(f" - data/test/: 1 file")
|
||||
print(f" - docs/: 1 file")
|
||||
print(f" - docs/images/: 2 files")
|
||||
print(f" - scripts/: 2 files")
|
||||
|
||||
print(f"\n Test repository created at: {base.absolute()}")
|
||||
print(f"{'='*100}\n")
|
||||
|
||||
# Create file listing
|
||||
file_list_path = base / "FILE_LIST.txt"
|
||||
with open(file_list_path, "w") as f:
|
||||
f.write("# File listing for test repository\n\n")
|
||||
for root, dirs, files in os.walk(base):
|
||||
level = root.replace(str(base), "").count(os.sep)
|
||||
indent = " " * 2 * level
|
||||
f.write(f"{indent}{os.path.basename(root)}/\n")
|
||||
sub_indent = " " * 2 * (level + 1)
|
||||
for file in files:
|
||||
if file == "FILE_LIST.txt":
|
||||
continue
|
||||
file_path = Path(root) / file
|
||||
size = file_path.stat().st_size
|
||||
lfs_marker = "[LFS]" if size >= 1_000_000 else ""
|
||||
f.write(f"{sub_indent}{file} ({size} bytes) {lfs_marker}\n")
|
||||
|
||||
print(f"File listing saved to: {file_list_path}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
base_path = sys.argv[1] if len(sys.argv) > 1 else "test_folder"
|
||||
generate_test_repo(base_path)
|
||||
print("✅ Test repository generated successfully!")
|
||||
print("\nNext steps:")
|
||||
print(" 1. Upload to KohakuHub via API/CLI")
|
||||
print(" 2. Test git clone")
|
||||
print(" 3. Verify folder structure matches")
|
||||
print(" 4. Test git lfs pull for large files")
|
||||
@@ -10,7 +10,7 @@ from fastapi import APIRouter, Header, HTTPException, Request, Response
from kohakuhub.db import Repository, Token, User
from kohakuhub.db_async import execute_db_query
from kohakuhub.logger import get_logger
from kohakuhub.api.utils.git_lakefs_bridge_pure import GitLakeFSBridgePure
from kohakuhub.api.utils.git_lakefs_bridge import GitLakeFSBridge
from kohakuhub.api.utils.git_server import (
    GitReceivePackHandler,
    GitUploadPackHandler,
@@ -130,7 +130,7 @@ async def git_info_refs(
        raise HTTPException(400, detail=f"Unknown service: {service}")

    # Get refs from LakeFS using the repository's actual type
    bridge = GitLakeFSBridgePure(repo.repo_type, namespace, name)
    bridge = GitLakeFSBridge(repo.repo_type, namespace, name)
    refs = await bridge.get_refs(branch="main")

    # Generate service advertisement
@@ -197,7 +197,7 @@ async def git_upload_pack(
    request_body = await request.body()

    # Create bridge for LakeFS integration using the repository's actual type
    bridge = GitLakeFSBridgePure(repo.repo_type, namespace, name)
    bridge = GitLakeFSBridge(repo.repo_type, namespace, name)

    # Handle upload-pack
    handler = GitUploadPackHandler(repo_id, bridge=bridge)
File diff suppressed because it is too large
@@ -1,488 +0,0 @@
|
||||
"""Pure Python Git-LakeFS bridge - NO pygit2, NO file I/O, pure logic only."""
|
||||
|
||||
import asyncio
|
||||
import fnmatch
|
||||
import hashlib
|
||||
from datetime import datetime
|
||||
|
||||
from kohakuhub.config import cfg
|
||||
from kohakuhub.db import File
|
||||
from kohakuhub.db_async import execute_db_query
|
||||
from kohakuhub.logger import get_logger
|
||||
from kohakuhub.api.utils.git_objects import (
|
||||
build_nested_trees,
|
||||
compute_git_object_sha1,
|
||||
create_blob_object,
|
||||
create_commit_object,
|
||||
create_empty_pack_file,
|
||||
create_pack_file,
|
||||
)
|
||||
from kohakuhub.api.utils.git_server import create_empty_pack
|
||||
from kohakuhub.api.utils.lakefs import get_lakefs_client, lakefs_repo_name
|
||||
|
||||
logger = get_logger("GIT_LAKEFS_PURE")
|
||||
|
||||
|
||||
def create_lfs_pointer(sha256: str, size: int) -> bytes:
|
||||
"""Create Git LFS pointer file content."""
|
||||
pointer = f"""version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:{sha256}
|
||||
size {size}
|
||||
"""
|
||||
return pointer.encode("utf-8")
|
||||
|
||||
|
||||
def generate_lfsconfig(base_url: str, namespace: str, name: str) -> bytes:
|
||||
"""Generate .lfsconfig."""
|
||||
lfs_url = f"{base_url}/{namespace}/{name}.git/info/lfs"
|
||||
config = f"""[lfs]
|
||||
\turl = {lfs_url}
|
||||
"""
|
||||
return config.encode("utf-8")
|
||||
|
||||
|
||||
class GitLakeFSBridgePure:
|
||||
"""Pure Python Git-LakeFS bridge - in-memory only."""
|
||||
|
||||
def __init__(self, repo_type: str, namespace: str, name: str):
|
||||
self.repo_type = repo_type
|
||||
self.namespace = namespace
|
||||
self.name = name
|
||||
self.repo_id = f"{namespace}/{name}"
|
||||
self.lakefs_repo = lakefs_repo_name(repo_type, self.repo_id)
|
||||
self.lakefs_client = get_lakefs_client()
|
||||
|
||||
async def get_refs(self, branch: str = "main") -> dict[str, str]:
|
||||
"""Get Git refs - pure logical, no temp files."""
|
||||
try:
|
||||
# Get branch info
|
||||
branch_info = await self.lakefs_client.get_branch(
|
||||
repository=self.lakefs_repo, branch=branch
|
||||
)
|
||||
|
||||
commit_id = branch_info.get("commit_id")
|
||||
if not commit_id:
|
||||
return {}
|
||||
|
||||
# Build commit SHA-1 in memory
|
||||
commit_sha1 = await self._build_commit_sha1(branch, commit_id)
|
||||
|
||||
if not commit_sha1:
|
||||
return {}
|
||||
|
||||
return {
|
||||
f"refs/heads/{branch}": commit_sha1,
|
||||
"HEAD": commit_sha1,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to get refs for {self.repo_id}", e)
|
||||
return {}
|
||||
|
||||
async def _build_commit_sha1(self, branch: str, commit_id: str) -> str | None:
|
||||
"""Build Git commit SHA-1 purely in memory - no files created."""
|
||||
try:
|
||||
# Step 1: Get all objects from LakeFS
|
||||
all_objects = []
|
||||
after = ""
|
||||
has_more = True
|
||||
|
||||
while has_more:
|
||||
result = await self.lakefs_client.list_objects(
|
||||
repository=self.lakefs_repo,
|
||||
ref=branch,
|
||||
prefix="",
|
||||
after=after,
|
||||
amount=1000,
|
||||
)
|
||||
all_objects.extend(result.get("results", []))
|
||||
pagination = result.get("pagination", {})
|
||||
has_more = pagination.get("has_more", False)
|
||||
if has_more:
|
||||
after = pagination.get("next_offset", "")
|
||||
|
||||
file_objects = [
|
||||
obj for obj in all_objects if obj.get("path_type") == "object"
|
||||
]
|
||||
logger.info(f"Found {len(file_objects)} files in LakeFS")
|
||||
|
||||
if not file_objects:
|
||||
return None
|
||||
|
||||
# Step 2: Build blob objects (in memory, no download for large files!)
|
||||
blob_data = await self._build_blob_sha1s(file_objects, branch)
|
||||
|
||||
if not blob_data:
|
||||
return None
|
||||
|
||||
# Step 3: Build nested tree structure (pure logic, no I/O)
|
||||
flat_entries = [
|
||||
(mode, path, sha1) for path, (sha1, _, mode) in blob_data.items()
|
||||
]
|
||||
root_tree_sha1, tree_objects = build_nested_trees(flat_entries)
|
||||
|
||||
logger.success(
|
||||
f"Built tree structure: {len(tree_objects)} tree objects, root={root_tree_sha1[:8]}"
|
||||
)
|
||||
|
||||
# Step 4: Build commit object (in memory)
|
||||
commit_info = await self.lakefs_client.get_commit(
|
||||
repository=self.lakefs_repo, commit_id=commit_id
|
||||
)
|
||||
|
||||
author_name = commit_info.get("committer", "KohakuHub")
|
||||
message = commit_info.get("message", "Initial commit")
|
||||
timestamp = commit_info.get("creation_date", 0)
|
||||
|
||||
# Parse timestamp
|
||||
if isinstance(timestamp, str):
|
||||
try:
|
||||
dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
|
||||
timestamp = int(dt.timestamp())
|
||||
except:
|
||||
timestamp = 0
|
||||
|
||||
# Create commit object
|
||||
commit_sha1, commit_data = create_commit_object(
|
||||
tree_sha1=root_tree_sha1,
|
||||
parent_sha1s=[], # No parents for now
|
||||
author_name=author_name,
|
||||
author_email="noreply@kohakuhub.local",
|
||||
committer_name=author_name,
|
||||
committer_email="noreply@kohakuhub.local",
|
||||
author_timestamp=timestamp,
|
||||
committer_timestamp=timestamp,
|
||||
timezone="+0000",
|
||||
message=message,
|
||||
)
|
||||
|
||||
logger.success(f"Created commit: {commit_sha1}")
|
||||
|
||||
return commit_sha1
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Failed to build commit SHA-1", e)
|
||||
return None
|
||||
|
||||
async def _build_blob_sha1s(
|
||||
self, file_objects: list[dict], branch: str
|
||||
) -> dict[str, tuple[str, bytes, str]]:
|
||||
"""Build blob SHA-1s AND data for all files (LFS pointers for large files).
|
||||
|
||||
Returns:
|
||||
Dict of path -> (blob_sha1, blob_data, mode)
|
||||
"""
|
||||
|
||||
# Get File table records for LFS tracking
|
||||
def _get_all_files():
|
||||
return {
|
||||
f.path_in_repo: f
|
||||
for f in File.select().where(File.repo_full_id == self.repo_id)
|
||||
}
|
||||
|
||||
file_records = await execute_db_query(_get_all_files)
|
||||
|
||||
# Check for .gitattributes and parse LFS patterns
|
||||
existing_lfs_patterns = set()
|
||||
gitattributes_obj = None
|
||||
gitattributes_content = None
|
||||
|
||||
for obj in file_objects:
|
||||
if obj["path"] == ".gitattributes":
|
||||
gitattributes_obj = obj
|
||||
try:
|
||||
gitattributes_content = await self.lakefs_client.get_object(
|
||||
repository=self.lakefs_repo, ref=branch, path=".gitattributes"
|
||||
)
|
||||
existing_lfs_patterns = self._parse_gitattributes(
|
||||
gitattributes_content.decode("utf-8")
|
||||
)
|
||||
logger.info(
|
||||
f"Found .gitattributes with {len(existing_lfs_patterns)} LFS patterns"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to parse .gitattributes: {e}")
|
||||
break
|
||||
|
||||
# Classify files
|
||||
small_files = []
|
||||
large_files = []
|
||||
|
||||
for obj in file_objects:
|
||||
if obj["path"] == ".gitattributes":
|
||||
continue
|
||||
|
||||
path = obj["path"]
|
||||
size = obj.get("size_bytes", 0)
|
||||
file_record = file_records.get(path)
|
||||
|
||||
# Should be LFS if:
|
||||
# 1. Marked in File table, OR
|
||||
# 2. Size >= threshold, OR
|
||||
# 3. Matches existing LFS pattern
|
||||
should_be_lfs = (
|
||||
(file_record and file_record.lfs)
|
||||
or size >= cfg.app.lfs_threshold_bytes
|
||||
or self._matches_pattern(path, existing_lfs_patterns)
|
||||
)
|
||||
|
||||
if should_be_lfs:
|
||||
large_files.append((obj, file_record))
|
||||
else:
|
||||
small_files.append((obj, file_record))
|
||||
|
||||
logger.info(
|
||||
f"Classified: {len(small_files)} regular files, {len(large_files)} LFS files"
|
||||
)
|
||||
|
||||
# Process files concurrently - return (path, sha1, data, mode)
|
||||
async def process_small(obj, file_record):
|
||||
"""Download and create blob."""
|
||||
path = obj["path"]
|
||||
try:
|
||||
content = await self.lakefs_client.get_object(
|
||||
repository=self.lakefs_repo, ref=branch, path=path
|
||||
)
|
||||
sha1, blob_with_header = create_blob_object(content)
|
||||
logger.debug(f"Blob {path}: {len(content)} bytes → {sha1[:8]}")
|
||||
return path, sha1, blob_with_header, "100644", False
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to download {path}: {e}")
|
||||
return None
|
||||
|
||||
async def process_large(obj, file_record):
|
||||
"""Create LFS pointer blob."""
|
||||
path = obj["path"]
|
||||
try:
|
||||
# Use File table SHA256
|
||||
if file_record and file_record.lfs:
|
||||
sha256 = file_record.sha256
|
||||
size = file_record.size
|
||||
else:
|
||||
# Fallback to LakeFS stat
|
||||
stat = await self.lakefs_client.stat_object(
|
||||
repository=self.lakefs_repo, ref=branch, path=path
|
||||
)
|
||||
size = stat.get("size_bytes", 0)
|
||||
checksum = stat.get("checksum", "")
|
||||
sha256 = (
|
||||
checksum.replace("sha256:", "")
|
||||
if checksum.startswith("sha256:")
|
||||
else ""
|
||||
)
|
||||
|
||||
if not sha256:
|
||||
# Last resort: download
|
||||
content = await self.lakefs_client.get_object(
|
||||
repository=self.lakefs_repo, ref=branch, path=path
|
||||
)
|
||||
sha256 = hashlib.sha256(content).hexdigest()
|
||||
size = len(content)
|
||||
|
||||
# Create LFS pointer
|
||||
pointer = create_lfs_pointer(sha256, size)
|
||||
sha1, blob_with_header = create_blob_object(pointer)
|
||||
logger.debug(f"LFS {path}: {size} bytes → pointer {sha1[:8]}")
|
||||
return path, sha1, blob_with_header, "100644", True
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to create LFS pointer for {path}: {e}")
|
||||
return None
|
||||
|
||||
# Process concurrently
|
||||
small_results = await asyncio.gather(
|
||||
*[process_small(obj, rec) for obj, rec in small_files]
|
||||
)
|
||||
large_results = await asyncio.gather(
|
||||
*[process_large(obj, rec) for obj, rec in large_files]
|
||||
)
|
||||
|
||||
# Collect results
|
||||
blob_data = {} # path -> (sha1, blob_data_with_header, mode)
|
||||
lfs_paths = []
|
||||
|
||||
for result in small_results:
|
||||
if result:
|
||||
path, sha1, data, mode, is_lfs = result
|
||||
blob_data[path] = (sha1, data, mode)
|
||||
|
||||
for result in large_results:
|
||||
if result:
|
||||
path, sha1, data, mode, is_lfs = result
|
||||
blob_data[path] = (sha1, data, mode)
|
||||
lfs_paths.append(path)
|
||||
|
||||
# Add .gitattributes
|
||||
if gitattributes_obj and gitattributes_content:
|
||||
sha1, blob_with_header = create_blob_object(gitattributes_content)
|
||||
blob_data[".gitattributes"] = (sha1, blob_with_header, "100644")
|
||||
elif lfs_paths:
|
||||
gitattributes = self._generate_gitattributes(lfs_paths)
|
||||
sha1, blob_with_header = create_blob_object(gitattributes)
|
||||
blob_data[".gitattributes"] = (sha1, blob_with_header, "100644")
|
||||
|
||||
# Add .lfsconfig
|
||||
if lfs_paths:
|
||||
lfsconfig = generate_lfsconfig(cfg.app.base_url, self.namespace, self.name)
|
||||
sha1, blob_with_header = create_blob_object(lfsconfig)
|
||||
blob_data[".lfsconfig"] = (sha1, blob_with_header, "100644")
|
||||
|
||||
return blob_data
|
||||
|
||||
def _parse_gitattributes(self, content: str) -> set[str]:
|
||||
"""Parse LFS patterns from .gitattributes."""
|
||||
patterns = set()
|
||||
for line in content.splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
if "filter=lfs" in line:
|
||||
parts = line.split()
|
||||
if parts:
|
||||
patterns.add(parts[0])
|
||||
return patterns
|
||||
|
||||
def _matches_pattern(self, path: str, patterns: set[str]) -> bool:
|
||||
"""Check if path matches any LFS pattern."""
|
||||
for pattern in patterns:
|
||||
if fnmatch.fnmatch(path, pattern):
|
||||
return True
|
||||
# Also check basename for *.ext patterns
|
||||
if pattern.startswith("*."):
|
||||
basename = path.split("/")[-1]
|
||||
if fnmatch.fnmatch(basename, pattern):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _generate_gitattributes(self, lfs_paths: list[str]) -> bytes:
|
||||
"""Generate .gitattributes for specific LFS files."""
|
||||
lines = ["# Git LFS tracking (auto-generated)\n\n"]
|
||||
for path in sorted(lfs_paths):
|
||||
lines.append(f"{path} filter=lfs diff=lfs merge=lfs -text\n")
|
||||
return "".join(lines).encode("utf-8")
|
||||
|
||||
async def build_pack_file(
|
||||
self, wants: list[str], haves: list[str], branch: str = "main"
|
||||
) -> bytes:
|
||||
"""Build Git pack file - pure in-memory, no temp dirs.
|
||||
|
||||
Args:
|
||||
wants: Commit SHAs client wants
|
||||
haves: Commit SHAs client has (ignored for now)
|
||||
branch: Branch name
|
||||
|
||||
Returns:
|
||||
Pack file bytes
|
||||
"""
|
||||
try:
|
||||
# Get all objects from LakeFS
|
||||
all_objects = []
|
||||
after = ""
|
||||
has_more = True
|
||||
|
||||
while has_more:
|
||||
result = await self.lakefs_client.list_objects(
|
||||
repository=self.lakefs_repo,
|
||||
ref=branch,
|
||||
prefix="",
|
||||
after=after,
|
||||
amount=1000,
|
||||
)
|
||||
all_objects.extend(result.get("results", []))
|
||||
pagination = result.get("pagination", {})
|
||||
has_more = pagination.get("has_more", False)
|
||||
if has_more:
|
||||
after = pagination.get("next_offset", "")
|
||||
|
||||
file_objects = [
|
||||
obj for obj in all_objects if obj.get("path_type") == "object"
|
||||
]
|
||||
|
||||
if not file_objects:
|
||||
logger.warning("No files found, returning empty pack")
|
||||
return create_empty_pack()
|
||||
|
||||
# Build blob objects with data (in memory, LFS pointers for large files!)
|
||||
blob_data = await self._build_blob_sha1s(file_objects, branch)
|
||||
|
||||
if not blob_data:
|
||||
logger.warning("No blobs created, returning empty pack")
|
||||
return create_empty_pack()
|
||||
|
||||
# Build tree objects (pure logic, no I/O)
|
||||
flat_entries = [
|
||||
(mode, path, sha1) for path, (sha1, _, mode) in blob_data.items()
|
||||
]
|
||||
root_tree_sha1, tree_objects = build_nested_trees(flat_entries)
|
||||
|
||||
logger.info(
|
||||
f"Built {len(tree_objects)} tree objects, root={root_tree_sha1[:8]}"
|
||||
)
|
||||
|
||||
# Get branch info for commit
|
||||
branch_info = await self.lakefs_client.get_branch(
|
||||
repository=self.lakefs_repo, branch=branch
|
||||
)
|
||||
|
||||
commit_id = branch_info.get("commit_id")
|
||||
if not commit_id:
|
||||
logger.warning("No commit_id in branch, using defaults")
|
||||
author_name = "KohakuHub"
|
||||
message = "Initial commit"
|
||||
timestamp = 0
|
||||
else:
|
||||
# Build commit object
|
||||
commit_info = await self.lakefs_client.get_commit(
|
||||
repository=self.lakefs_repo, commit_id=commit_id
|
||||
)
|
||||
|
||||
author_name = commit_info.get("committer", "KohakuHub")
|
||||
message = commit_info.get("message", "Initial commit")
|
||||
timestamp = commit_info.get("creation_date", 0)
|
||||
|
||||
if isinstance(timestamp, str):
|
||||
try:
|
||||
dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
|
||||
timestamp = int(dt.timestamp())
|
||||
except:
|
||||
timestamp = 0
|
||||
|
||||
commit_sha1, commit_with_header = create_commit_object(
|
||||
tree_sha1=root_tree_sha1,
|
||||
parent_sha1s=[],
|
||||
author_name=author_name,
|
||||
author_email="noreply@kohakuhub.local",
|
||||
committer_name=author_name,
|
||||
committer_email="noreply@kohakuhub.local",
|
||||
author_timestamp=timestamp,
|
||||
committer_timestamp=timestamp,
|
||||
timezone="+0000",
|
||||
message=message,
|
||||
)
|
||||
|
||||
# Build pack file (in memory) - ALL objects
|
||||
pack_objects = []
|
||||
|
||||
# Add commit (type 1)
|
||||
pack_objects.append((1, commit_with_header))
|
||||
|
||||
# Add trees (type 2)
|
||||
pack_objects.extend(tree_objects)
|
||||
|
||||
# Add blobs (type 3) - includes LFS pointers!
|
||||
for path, (sha1, blob_with_header, mode) in blob_data.items():
|
||||
pack_objects.append((3, blob_with_header))
|
||||
|
||||
logger.info(
|
||||
f"Pack contains: {len(pack_objects)} objects (1 commit + {len(tree_objects)} trees + {len(blob_data)} blobs)"
|
||||
)
|
||||
|
||||
# Create pack
|
||||
pack_bytes = create_pack_file(pack_objects)
|
||||
|
||||
logger.success(f"Created pack: {len(pack_bytes)} bytes")
|
||||
|
||||
return pack_bytes
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Failed to build pack file", e)
|
||||
return create_empty_pack()
|
||||
@@ -284,14 +284,8 @@ def build_nested_trees(
|
||||
for mode, name, entry_sha1 in dir_contents[dir_path]:
|
||||
entries.append((mode, name, entry_sha1))
|
||||
|
||||
# DEBUG: Log what we're adding to this directory
|
||||
print(
|
||||
f"DEBUG: Building tree for '{dir_path}', files: {len(dir_contents[dir_path])}"
|
||||
)
|
||||
|
||||
# Add subdirectories that have been processed (bottom-up)
|
||||
# Find all direct children of this directory
|
||||
subdirs_added = 0
|
||||
for child_dir_path, child_tree_sha1 in dir_sha1s.items():
|
||||
# Check if child_dir_path is a direct child of dir_path
|
||||
if dir_path == "":
|
||||
@@ -302,10 +296,6 @@ def build_nested_trees(
|
||||
# Check not already added as file
|
||||
if not any(name == dir_name for _, name, _ in entries):
|
||||
entries.append(("40000", dir_name, child_tree_sha1))
|
||||
subdirs_added += 1
|
||||
print(
|
||||
f" DEBUG: Added subdir to root: {dir_name} → {child_tree_sha1[:8]}"
|
||||
)
|
||||
else:
|
||||
# Non-root directory - find direct children
|
||||
prefix = dir_path + "/"
|
||||
@@ -316,27 +306,17 @@ def build_nested_trees(
|
||||
dir_name = remainder
|
||||
if not any(name == dir_name for _, name, _ in entries):
|
||||
entries.append(("40000", dir_name, child_tree_sha1))
|
||||
subdirs_added += 1
|
||||
print(
|
||||
f" DEBUG: Added subdir to {dir_path}: {dir_name} → {child_tree_sha1[:8]}"
|
||||
)
|
||||
|
||||
print(
|
||||
f" DEBUG: Total entries: {len(entries)} ({len(dir_contents[dir_path])} files + {subdirs_added} subdirs)"
|
||||
)
|
||||
|
||||
# Create tree object for this directory
|
||||
if entries:
|
||||
tree_sha1, tree_data = create_tree_object(entries)
|
||||
dir_sha1s[dir_path] = tree_sha1
|
||||
tree_objects.append((2, tree_data)) # Type 2 = tree
|
||||
print(f" DEBUG: Created tree {tree_sha1[:8]} with {len(entries)} entries")
|
||||
else:
|
||||
# Empty directory - create empty tree
|
||||
tree_sha1, tree_data = create_tree_object([])
|
||||
dir_sha1s[dir_path] = tree_sha1
|
||||
tree_objects.append((2, tree_data))
|
||||
print(f" DEBUG: Created empty tree {tree_sha1[:8]}")
|
||||
|
||||
# Return root tree SHA-1
|
||||
root_sha1 = dir_sha1s.get("")
|
||||
|
||||
@@ -7,8 +7,6 @@ import base64
import hashlib
import struct

import pygit2

from kohakuhub.logger import get_logger

logger = get_logger("GIT")