Fix storage tracking of deleted files; fix LFS dedup to perform global deduplication

This commit is contained in:
Kohaku-Blueleaf
2025-10-18 03:42:00 +08:00
parent d3557d5e44
commit 1224aa5b64
10 changed files with 655 additions and 101 deletions

View File

@@ -0,0 +1,248 @@
#!/usr/bin/env python3
"""
Migration 011: Add soft delete support for File model.
Changes:
1. Add File.is_deleted column (default FALSE)
2. Change LFSObjectHistory.file FK: on_delete CASCADE → SET NULL
This allows:
- File deletion to mark as deleted instead of removing from database
- LFSObjectHistory to persist for quota tracking even after file deletion
- Only squash/move/delete repo will actually delete File entries
"""
import sys
import os
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))
# Add db_migrations to path (for _migration_utils)
sys.path.insert(0, os.path.dirname(__file__))
from kohakuhub.config import cfg
from kohakuhub.db import db
from _migration_utils import check_column_exists, should_skip_due_to_future_migrations
MIGRATION_NUMBER = 11
def is_applied(db, cfg):
    """Report whether THIS migration has already been applied.

    The migration is considered applied once the File.is_deleted column
    is present in the live schema.

    Args:
        db: Active peewee database handle (passed in by the runner).
        cfg: Application configuration object.

    Returns:
        bool: True when the "is_deleted" column exists on the "file" table.
    """
    column_present = check_column_exists(db, cfg, "file", "is_deleted")
    return column_present
def check_migration_needed():
    """Check if this migration needs to run by probing the live schema.

    Returns:
        bool: True when File.is_deleted is missing (migration needed),
        False when the column already exists.
    """
    cursor = db.cursor()
    if cfg.app.db_backend == "postgres":
        # Check if File.is_deleted exists.
        # Scope the lookup to the active schema: information_schema.columns
        # spans every schema in the database, so an unqualified query could
        # match a "file" table owned by another schema and falsely report
        # this migration as applied.
        cursor.execute(
            """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = current_schema()
              AND table_name = 'file'
              AND column_name = 'is_deleted'
            """
        )
        return cursor.fetchone() is None
    else:
        # SQLite has no information_schema; PRAGMA table_info yields
        # (cid, name, type, notnull, dflt_value, pk) per column.
        cursor.execute("PRAGMA table_info(file)")
        columns = [row[1] for row in cursor.fetchall()]
        return "is_deleted" not in columns
def migrate_sqlite():
    """Migrate SQLite database.

    Adds File.is_deleted and rebuilds lfsobjecthistory so that its file
    FK uses ON DELETE SET NULL instead of CASCADE.

    Note: This function runs inside a transaction (db.atomic()).
    Do NOT call db.commit() or db.rollback() inside this function.
    """
    cursor = db.cursor()
    # Step 1: Add is_deleted column to File table
    try:
        cursor.execute(
            "ALTER TABLE file ADD COLUMN is_deleted INTEGER DEFAULT 0 NOT NULL"
        )
        print(" [OK] Added File.is_deleted column")
    except Exception as e:
        # "duplicate column" means a previous run already added the column;
        # any other error is a genuine failure and must abort the migration.
        if "duplicate column" in str(e).lower():
            print(" - File.is_deleted already exists")
        else:
            raise
    # Step 2: Update LFSObjectHistory FK constraint (CASCADE -> SET NULL)
    # SQLite doesn't support ALTER COLUMN for FK, so we need to:
    # 1. Create new table with correct FK
    # 2. Copy data
    # 3. Drop old table
    # 4. Rename new table
    print(" [INFO] Updating LFSObjectHistory.file FK constraint...")
    # Check if lfsobjecthistory_new exists from a previous failed migration
    # (a crash between CREATE and RENAME would leave it behind).
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='lfsobjecthistory_new'"
    )
    if cursor.fetchone():
        cursor.execute("DROP TABLE lfsobjecthistory_new")
        print(" - Dropped stale lfsobjecthistory_new table")
    # Create new table with SET NULL constraint.
    # file_id is nullable so that SET NULL has a value to assign when the
    # referenced File row is deleted.
    cursor.execute(
        """
        CREATE TABLE lfsobjecthistory_new (
            id INTEGER NOT NULL PRIMARY KEY,
            repository_id INTEGER NOT NULL,
            path_in_repo VARCHAR(255) NOT NULL,
            sha256 VARCHAR(255) NOT NULL,
            size INTEGER NOT NULL,
            commit_id VARCHAR(255) NOT NULL,
            file_id INTEGER,
            created_at DATETIME NOT NULL,
            FOREIGN KEY (repository_id) REFERENCES repository (id) ON DELETE CASCADE,
            FOREIGN KEY (file_id) REFERENCES file (id) ON DELETE SET NULL
        )
        """
    )
    print(" [OK] Created lfsobjecthistory_new table with SET NULL constraint")
    # Copy data (ids preserved so existing FK references stay valid)
    cursor.execute(
        """
        INSERT INTO lfsobjecthistory_new (id, repository_id, path_in_repo, sha256, size, commit_id, file_id, created_at)
        SELECT id, repository_id, path_in_repo, sha256, size, commit_id, file_id, created_at
        FROM lfsobjecthistory
        """
    )
    rows_copied = cursor.rowcount
    print(f" [OK] Copied {rows_copied} rows to new table")
    # Drop old table
    cursor.execute("DROP TABLE lfsobjecthistory")
    print(" [OK] Dropped old lfsobjecthistory table")
    # Rename new table
    cursor.execute("ALTER TABLE lfsobjecthistory_new RENAME TO lfsobjecthistory")
    print(" [OK] Renamed lfsobjecthistory_new -> lfsobjecthistory")
    # Recreate indexes (DROP TABLE above removed the originals)
    cursor.execute(
        "CREATE INDEX lfsobjecthistory_repository_id ON lfsobjecthistory (repository_id)"
    )
    cursor.execute(
        "CREATE INDEX lfsobjecthistory_file_id ON lfsobjecthistory (file_id)"
    )
    cursor.execute(
        "CREATE INDEX lfsobjecthistory_commit_id ON lfsobjecthistory (commit_id)"
    )
    cursor.execute("CREATE INDEX lfsobjecthistory_sha256 ON lfsobjecthistory (sha256)")
    cursor.execute(
        "CREATE INDEX lfsobjecthistory_path_in_repo ON lfsobjecthistory (path_in_repo)"
    )
    cursor.execute(
        "CREATE INDEX lfsobjecthistory_repository_id_path_in_repo ON lfsobjecthistory (repository_id, path_in_repo)"
    )
    print(" [OK] Recreated indexes on lfsobjecthistory")
def migrate_postgres():
    """Migrate PostgreSQL database.

    Adds File.is_deleted and swaps the LFSObjectHistory.file FK from
    ON DELETE CASCADE to ON DELETE SET NULL.

    Note: This function runs inside a transaction (db.atomic()).
    Do NOT call db.commit() or db.rollback() inside this function.
    """
    cursor = db.cursor()
    # Step 1: Add is_deleted column to File table
    try:
        cursor.execute(
            "ALTER TABLE file ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE NOT NULL"
        )
        print(" [OK] Added File.is_deleted column")
    except Exception as e:
        # "already exists" means a previous run added the column; any other
        # error is a genuine failure and must abort the migration.
        if "already exists" in str(e).lower():
            print(" - File.is_deleted already exists")
        else:
            raise
    # Step 2: Update LFSObjectHistory FK constraint (CASCADE -> SET NULL)
    # Postgres allows replacing the constraint in place: drop, then re-add.
    print(" [INFO] Updating LFSObjectHistory.file FK constraint...")
    try:
        # Drop existing FK constraint (IF EXISTS makes this re-runnable)
        cursor.execute(
            """
            ALTER TABLE lfsobjecthistory
            DROP CONSTRAINT IF EXISTS lfsobjecthistory_file_id_fkey
            """
        )
        print(" [OK] Dropped old FK constraint")
        # Add new FK constraint with SET NULL
        cursor.execute(
            """
            ALTER TABLE lfsobjecthistory
            ADD CONSTRAINT lfsobjecthistory_file_id_fkey
            FOREIGN KEY (file_id) REFERENCES file (id) ON DELETE SET NULL
            """
        )
        print(" [OK] Added new FK constraint with ON DELETE SET NULL")
    except Exception as e:
        if "does not exist" in str(e).lower():
            print(" - FK constraint already correct or doesn't need updating")
        else:
            raise
def run():
    """Execute migration 011 (File soft-delete support).

    IMPORTANT: never close the db connection here (no finally block on
    purpose). The connection is owned by run_migrations.py and must stay
    open across all migrations to avoid stdout/stderr closure issues on
    Windows.

    Returns:
        bool: True on success or no-op, False on failure.
    """
    db.connect(reuse_if_open=True)
    try:
        # Cheap pre-flight checks, deliberately outside the transaction.
        if should_skip_due_to_future_migrations(MIGRATION_NUMBER, db, cfg):
            print("Migration 011: Skipped (superseded by future migration)")
            return True
        if not check_migration_needed():
            print("Migration 011: Already applied (File.is_deleted exists)")
            return True
        print("Migration 011: Adding File soft delete support...")
        # Pick the backend-specific worker; db.atomic() rolls everything
        # back automatically if it raises.
        backend_migration = (
            migrate_postgres if cfg.app.db_backend == "postgres" else migrate_sqlite
        )
        with db.atomic():
            backend_migration()
        print("Migration 011: [DONE] Completed")
        return True
    except Exception as e:
        # Reaching this point means the transaction was already rolled back.
        print(f"Migration 011: [FAILED] {e}")
        print(" All changes have been rolled back")
        import traceback
        traceback.print_exc()
        return False
    # NOTE: no finally block -- the shared db connection must stay open.
if __name__ == "__main__":
    # Exit status 0 on success so shell-based runners can chain on it.
    sys.exit(0 if run() else 1)

View File

@@ -903,21 +903,23 @@ async def get_repository_admin(
# Get owner (using FK)
owner = repo.owner
# Count files (using FK)
file_count = File.select().where(File.repository == repo).count()
# Count active files only (using FK)
file_count = (
File.select()
.where((File.repository == repo) & (File.is_deleted == False))
.count()
)
# Count commits (using FK)
commit_count = Commit.select().where(Commit.repository == repo).count()
# Get total file size (using FK)
# Get total file size for active files only (using FK)
total_size = (
File.select()
.where(File.repository == repo)
.select(File.size)
.scalar(as_tuple=False)
File.select(fn.SUM(File.size).alias("total"))
.where((File.repository == repo) & (File.is_deleted == False))
.scalar()
or 0
)
if total_size is None:
total_size = 0
# Get storage info
storage_info = get_repo_storage_info(repo)
@@ -1336,9 +1338,10 @@ async def get_top_repositories(
)
else: # by size
# Top repos by total file size (using FK)
# Top repos by total file size (active files only, using FK)
top_repos = (
File.select(File.repository, fn.SUM(File.size).alias("total_size"))
.where(File.is_deleted == False)
.group_by(File.repository)
.order_by(fn.SUM(File.size).desc())
.limit(limit)

View File

@@ -120,11 +120,21 @@ async def process_regular_file(
# Check if file unchanged (deduplication)
existing = get_file(repo, path)
if existing and existing.sha256 == git_blob_sha1 and existing.size == len(data):
logger.info(f"Skipping unchanged file: {path}")
return False
if existing.is_deleted:
# File was deleted, now being restored - need to re-upload to LakeFS
logger.info(
f"Restoring deleted non-LFS file: {path} (sha256={git_blob_sha1[:8]}, size={file_size:,})"
)
else:
# File unchanged and active, skip
logger.info(f"Skipping unchanged file: {path}")
return False
# File changed, need to upload
logger.info(f"Uploading regular file: {path} ({file_size} bytes)")
# File changed or needs restoration
if existing and existing.is_deleted:
logger.info(f"Uploading to restore non-LFS file: {path} ({file_size} bytes)")
else:
logger.info(f"Uploading regular file: {path} ({file_size} bytes)")
# Upload to LakeFS
try:
@@ -145,6 +155,7 @@ async def process_regular_file(
size=len(data),
sha256=git_blob_sha1,
lfs=False,
is_deleted=False,
owner=repo.owner,
).on_conflict(
conflict_target=(File.repository, File.path_in_repo),
@@ -152,6 +163,7 @@ async def process_regular_file(
File.sha256: git_blob_sha1,
File.size: len(data),
File.lfs: False, # Explicitly set to False
File.is_deleted: False, # File is active (un-delete if previously deleted)
File.updated_at: datetime.now(timezone.utc),
},
).execute()
@@ -188,8 +200,8 @@ async def process_lfs_file(
if not oid:
raise HTTPException(400, detail={"error": f"Missing OID for LFS file {path}"})
# Check for existing file
existing = get_file(repo, path)
# Check for existing file (including deleted files to detect re-upload)
existing = File.get_or_none((File.repository == repo) & (File.path_in_repo == path))
# Track old LFS object for potential deletion
old_lfs_oid = None
@@ -197,10 +209,86 @@ async def process_lfs_file(
old_lfs_oid = existing.sha256
logger.info(f"File {path} will be replaced: {old_lfs_oid}{oid}")
# Check if file unchanged
if existing and existing.sha256 == oid and existing.size == size:
logger.info(f"Skipping unchanged LFS file: {path}")
return False, None
# Check if same content (including deleted files)
# If same sha256+size, DON'T create new LFSObjectHistory
same_content = existing and existing.sha256 == oid and existing.size == size
if same_content:
if existing.is_deleted:
logger.info(
f"[PROCESS_LFS_FILE] Re-uploading deleted file: {path} (sha256={oid[:8]}, size={size:,}) "
f"- RESTORING in LakeFS (reusing existing LFSObjectHistory)"
)
# File was deleted, now being restored
# Need to link physical address in LakeFS to restore the file
# But DON'T create new LFSObjectHistory (already exists)
# Construct S3 physical address
lfs_key = f"lfs/{oid[:2]}/{oid[2:4]}/{oid}"
physical_address = f"s3://{cfg.s3.bucket}/{lfs_key}"
# Link the physical S3 object to LakeFS to restore
try:
staging_metadata = {
"staging": {
"physical_address": physical_address,
},
"checksum": f"{algo}:{oid}",
"size_bytes": size,
}
client = get_lakefs_client()
await client.link_physical_address(
repository=lakefs_repo,
branch=revision,
path=path,
staging_metadata=staging_metadata,
)
logger.success(
f"Successfully restored LFS file in LakeFS: {path} "
f"(oid: {oid[:8]}, size: {size}, physical: {physical_address})"
)
except Exception as e:
logger.exception(
f"Failed to restore LFS file in LakeFS: {path} "
f"(oid: {oid[:8]}, repo: {lakefs_repo}, branch: {revision})",
e,
)
raise HTTPException(
500,
detail={
"error": f"Failed to restore LFS file {path} in LakeFS: {str(e)}"
},
)
# Update database to mark as not deleted
File.update(is_deleted=False, updated_at=datetime.now(timezone.utc)).where(
File.id == existing.id
).execute()
logger.success(f"Restored deleted file in DB: {path} (unmarked is_deleted)")
# Return tracking info for new commit (reusing existing LFS object)
return True, {
"path": path,
"sha256": oid,
"size": size,
"old_sha256": None, # No old version (same content, just restoring)
}
else:
logger.info(
f"[PROCESS_LFS_FILE] File unchanged: {path} (sha256={oid[:8]}, size={size:,}) "
f"- WILL TRACK in LFSObjectHistory"
)
# File exists and is active - normal case
# Still return tracking info for new commit
return False, {
"path": path,
"sha256": oid,
"size": size,
"old_sha256": None, # No old version (file unchanged)
}
# File changed or new
logger.info(f"Linking LFS file: {path}")
@@ -210,13 +298,30 @@ async def process_lfs_file(
physical_address = f"s3://{cfg.s3.bucket}/{lfs_key}"
# Verify object exists in S3
if not await object_exists(cfg.s3.bucket, lfs_key):
try:
exists = await object_exists(cfg.s3.bucket, lfs_key)
if not exists:
logger.error(
f"LFS object not found in S3: {oid[:8]} "
f"(path: {path}, bucket: {cfg.s3.bucket}, key: {lfs_key})"
)
raise HTTPException(
400,
detail={
"error": f"LFS object {oid} not found in storage. "
f"Upload to S3 may have failed. Path: {lfs_key}"
},
)
except HTTPException:
raise # Re-raise HTTPException as-is
except Exception as e:
logger.exception(
f"Failed to check S3 existence for LFS object {oid[:8]} "
f"(path: {path}, bucket: {cfg.s3.bucket}, key: {lfs_key})",
e,
)
raise HTTPException(
400,
detail={
"error": f"LFS object {oid} not found in storage. "
f"Upload to S3 may have failed. Path: {lfs_key}"
},
500, detail={"error": f"Failed to verify LFS object in S3: {str(e)}"}
)
# Get actual size from S3 to verify
@@ -226,12 +331,19 @@ async def process_lfs_file(
if actual_size != size:
logger.warning(
f"Size mismatch for {path}. Expected: {size}, Got: {actual_size}"
f"Size mismatch for {path}. Expected: {size}, Got: {actual_size} "
f"(oid: {oid[:8]}, key: {lfs_key})"
)
size = actual_size
except Exception as e:
logger.exception(f"Failed to get S3 object metadata", e)
logger.warning(f"Could not verify S3 object metadata: {e}")
logger.exception(
f"Failed to get S3 metadata for LFS object {oid[:8]} "
f"(path: {path}, bucket: {cfg.s3.bucket}, key: {lfs_key})",
e,
)
logger.warning(
f"Could not verify S3 object metadata, continuing without size check"
)
# Link the physical S3 object to LakeFS
try:
@@ -251,10 +363,18 @@ async def process_lfs_file(
staging_metadata=staging_metadata,
)
logger.success(f"Successfully linked LFS file in LakeFS: {path}")
logger.success(
f"Successfully linked LFS file in LakeFS: {path} "
f"(oid: {oid[:8]}, size: {size}, physical: {physical_address})"
)
except Exception as e:
logger.exception(f"Failed to link LFS file in LakeFS: {path}", e)
logger.exception(
f"Failed to link LFS file in LakeFS: {path} "
f"(oid: {oid[:8]}, repo: {lakefs_repo}, branch: {revision}, "
f"physical_address: {physical_address})",
e,
)
raise HTTPException(
500,
detail={"error": f"Failed to link LFS file {path} in LakeFS: {str(e)}"},
@@ -267,6 +387,7 @@ async def process_lfs_file(
size=size,
sha256=oid,
lfs=True,
is_deleted=False,
owner=repo.owner,
).on_conflict(
conflict_target=(File.repository, File.path_in_repo),
@@ -274,6 +395,7 @@ async def process_lfs_file(
File.sha256: oid,
File.size: size,
File.lfs: True,
File.is_deleted: False, # File is active (un-delete if previously deleted)
File.updated_at: datetime.now(timezone.utc),
},
).execute()
@@ -281,19 +403,29 @@ async def process_lfs_file(
logger.success(f"Updated database record for LFS file: {path}")
# Return tracking info for GC
return True, {
tracking_info = {
"path": path,
"sha256": oid,
"size": size,
"old_sha256": old_lfs_oid,
}
logger.info(
f"[PROCESS_LFS_FILE] File changed/new: {path} (sha256={oid[:8]}, size={size:,}) "
f"- WILL TRACK in LFSObjectHistory"
)
return True, tracking_info
async def process_deleted_file(
path: str, repo: Repository, lakefs_repo: str, revision: str
) -> bool:
"""Process file deletion.
Marks file as deleted (soft delete) instead of removing from database.
This preserves LFSObjectHistory FK references for quota tracking.
Args:
path: File path to delete
repo: Repository object
@@ -313,15 +445,15 @@ async def process_deleted_file(
# File might not exist, log warning but continue
logger.warning(f"Failed to delete {path} from LakeFS: {e}")
# Remove from database
deleted_count = (
File.delete()
# Mark as deleted in database (soft delete)
updated_count = (
File.update(is_deleted=True, updated_at=datetime.now(timezone.utc))
.where((File.repository == repo) & (File.path_in_repo == path))
.execute()
)
if deleted_count > 0:
logger.success(f"Removed {path} from database")
if updated_count > 0:
logger.success(f"Marked {path} as deleted in database (soft delete)")
else:
logger.info(f"File {path} was not in database")
@@ -393,17 +525,19 @@ async def process_deleted_folder(
logger.success(f"Deleted {len(deleted_files)} files from folder {folder_path}")
# Remove from database
# Mark as deleted in database (soft delete)
if deleted_files:
deleted_count = (
File.delete()
updated_count = (
File.update(is_deleted=True, updated_at=datetime.now(timezone.utc))
.where(
(File.repository == repo)
& (File.path_in_repo.startswith(folder_path))
)
.execute()
)
logger.success(f"Removed {deleted_count} records from database")
logger.success(
f"Marked {updated_count} file(s) as deleted in database (soft delete)"
)
except Exception as e:
logger.warning(f"Error deleting folder {folder_path}: {e}")
@@ -481,6 +615,7 @@ async def process_copy_file(
size=src_file.size,
sha256=src_file.sha256,
lfs=src_file.lfs,
is_deleted=False,
owner=repo.owner,
).on_conflict(
conflict_target=(File.repository, File.path_in_repo),
@@ -488,6 +623,7 @@ async def process_copy_file(
File.sha256: src_file.sha256,
File.size: src_file.size,
File.lfs: src_file.lfs,
File.is_deleted: False, # File is active
File.updated_at: datetime.now(timezone.utc),
},
).execute()
@@ -501,6 +637,7 @@ async def process_copy_file(
size=src_obj["size_bytes"],
sha256=src_obj["checksum"],
lfs=is_lfs,
is_deleted=False,
owner=repo.owner,
).on_conflict(
conflict_target=(File.repository, File.path_in_repo),
@@ -508,6 +645,7 @@ async def process_copy_file(
File.sha256: src_obj["checksum"],
File.size: src_obj["size_bytes"],
File.lfs: is_lfs,
File.is_deleted: False, # File is active
File.updated_at: datetime.now(timezone.utc),
},
).execute()
@@ -636,7 +774,16 @@ async def commit(
)
files_changed = files_changed or changed
if lfs_info:
logger.debug(
f"[COMMIT_OP] Adding LFS file to tracking queue: {path} "
f"(sha256={lfs_info['sha256'][:8]}, size={lfs_info['size']:,})"
)
pending_lfs_tracking.append(lfs_info)
else:
logger.warning(
f"[COMMIT_OP] process_lfs_file returned NO tracking info for: {path} "
f"(oid={value.get('oid', 'MISSING')[:8]})"
)
case "deletedFile":
# Delete single file
@@ -724,7 +871,15 @@ async def commit(
# Track LFS objects and run GC
if pending_lfs_tracking:
logger.info(
f"[COMMIT_LFS_TRACKING] Processing {len(pending_lfs_tracking)} LFS file(s) "
f"for commit {commit_result['id'][:8]}"
)
for lfs_info in pending_lfs_tracking:
logger.debug(
f" - {lfs_info['path']}: sha256={lfs_info['sha256'][:8]}, size={lfs_info['size']:,}"
)
track_lfs_object(
repo_type=repo_type.value,
namespace=namespace,
@@ -747,6 +902,10 @@ async def commit(
logger.info(
f"GC: Cleaned up {deleted_count} old version(s) of {lfs_info['path']}"
)
else:
logger.warning(
f"[COMMIT_LFS_TRACKING] No LFS files to track for commit {commit_result['id'][:8]}"
)
# Update storage usage for namespace and repository after successful commit
try:

View File

@@ -105,11 +105,25 @@ async def process_upload_object(
"""
lfs_key = get_lfs_key(oid)
# Check if object already exists (deduplication)
existing = get_file_by_sha256(oid)
# Check if object exists in S3 (global dedup)
try:
s3_exists = await object_exists(cfg.s3.bucket, lfs_key)
except Exception as e:
logger.exception(
f"Failed to check S3 existence for {oid[:8]} (key: {lfs_key})", e
)
s3_exists = False
if existing and existing.size == size:
# Object exists, no upload needed
# Check File table (per-repository check)
existing_file = get_file_by_sha256(oid)
if s3_exists or (existing_file and existing_file.size == size):
# Object exists (either in S3 globally or in File table)
# Tell client to skip upload
logger.info(
f"LFS object {oid[:8]} already exists "
f"(s3={s3_exists}, db={existing_file is not None}), skipping upload"
)
return LFSObjectResponse(
oid=oid,
size=size,
@@ -165,6 +179,11 @@ async def process_upload_object(
},
)
except Exception as e:
logger.exception(
f"Failed to generate upload URL for LFS object {oid[:8]} "
f"(size: {size}, key: {lfs_key})",
e,
)
return LFSObjectResponse(
oid=oid,
size=size,

View File

@@ -177,7 +177,10 @@ class GitLakeFSBridge:
return {}
file_records = {
f.path_in_repo: f for f in File.select().where(File.repository == repo)
f.path_in_repo: f
for f in File.select().where(
(File.repository == repo) & (File.is_deleted == False)
)
}
# Check for .gitattributes and parse LFS patterns

View File

@@ -7,7 +7,7 @@ for users and organizations with separate tracking for private and public reposi
import asyncio
from kohakuhub.config import cfg
from kohakuhub.db import LFSObjectHistory, Repository, User
from kohakuhub.db import File, LFSObjectHistory, Repository, User
from kohakuhub.db_operations import get_organization
from kohakuhub.logger import get_logger
from kohakuhub.utils.lakefs import get_lakefs_client, lakefs_repo_name
@@ -18,21 +18,32 @@ logger = get_logger("QUOTA")
async def calculate_repository_storage(repo: Repository) -> dict[str, int]:
"""Calculate total storage usage for a repository.
Storage calculation:
- Non-LFS files: Counted from current branch
- LFS files: Counted from ALL history (all versions, including deleted)
This ensures:
- Deleting LFS files doesn't decrease quota (LFS cache preserved)
- No double counting of LFS files
Args:
repo: Repository model instance
Returns:
Dict with keys:
- total_bytes: Total storage used
- current_branch_bytes: Storage in current branch
- total_bytes: Total storage used (non-LFS current + all LFS history)
- current_branch_bytes: Storage in current branch (all files)
- current_branch_non_lfs_bytes: Non-LFS files in current branch
- lfs_total_bytes: Total LFS storage (all versions)
- lfs_unique_bytes: Unique LFS storage (deduplicated by SHA256)
"""
lakefs_repo = lakefs_repo_name(repo.repo_type, repo.full_id)
client = get_lakefs_client()
# Calculate current branch storage
# Calculate current branch storage (all files)
current_branch_bytes = 0
current_branch_lfs_bytes = 0
try:
# List all objects in main branch
after = ""
@@ -49,7 +60,19 @@ async def calculate_repository_storage(repo: Repository) -> dict[str, int]:
for obj in result["results"]:
if obj["path_type"] == "object":
current_branch_bytes += obj.get("size_bytes") or 0
size = obj.get("size_bytes") or 0
current_branch_bytes += size
# Check if this file is LFS (stored in File table)
path = obj.get("path")
if path:
file_record = File.get_or_none(
(File.repository == repo)
& (File.path_in_repo == path)
& (File.is_deleted == False)
)
if file_record and file_record.lfs:
current_branch_lfs_bytes += size
if result.get("pagination") and result["pagination"].get("has_more"):
after = result["pagination"]["next_offset"]
@@ -62,8 +85,10 @@ async def calculate_repository_storage(repo: Repository) -> dict[str, int]:
f"Failed to calculate current branch storage for {repo.full_id}: {e}"
)
# Calculate LFS storage from history
# Total LFS storage (all versions)
# Calculate non-LFS storage in current branch
current_branch_non_lfs_bytes = current_branch_bytes - current_branch_lfs_bytes
# Calculate LFS storage from history (all versions, including deleted)
lfs_total = (
LFSObjectHistory.select().where(LFSObjectHistory.repository == repo).count()
)
@@ -87,12 +112,15 @@ async def calculate_repository_storage(repo: Repository) -> dict[str, int]:
lfs_unique_bytes = sum(obj.size for obj in unique_lfs)
# Total storage = current branch + all LFS versions
total_bytes = current_branch_bytes + lfs_total_bytes
# Total storage = non-LFS in current branch + unique LFS storage (deduplicated)
# Using lfs_unique_bytes ensures global deduplication works correctly for quota
# This avoids counting the same SHA256 object multiple times across versions
total_bytes = current_branch_non_lfs_bytes + lfs_unique_bytes
return {
"total_bytes": total_bytes,
"current_branch_bytes": current_branch_bytes,
"current_branch_non_lfs_bytes": current_branch_non_lfs_bytes,
"lfs_total_bytes": lfs_total_bytes,
"lfs_unique_bytes": lfs_unique_bytes,
}

View File

@@ -1,8 +1,8 @@
"""Garbage collection utilities for LFS objects."""
import asyncio
from datetime import datetime, timezone
from typing import List, Optional
import asyncio
from kohakuhub.config import cfg
from kohakuhub.db import File, LFSObjectHistory, Repository
@@ -30,6 +30,9 @@ def track_lfs_object(
):
"""Track LFS object usage in a commit.
Always creates a new LFSObjectHistory entry for full commit tracking.
GC will count unique oids to determine what to delete.
Args:
repo_type: Repository type (model/dataset/space)
namespace: Repository namespace
@@ -39,6 +42,11 @@ def track_lfs_object(
size: Object size in bytes
commit_id: LakeFS commit ID
"""
logger.info(
f"[TRACK_LFS_OBJECT_CALLED] repo={repo_type}/{namespace}/{name}, "
f"path={path_in_repo}, sha256={sha256[:8]}, size={size:,}, commit={commit_id[:8]}"
)
# Get repository FK object
repo = get_repository(repo_type, namespace, name)
if not repo:
@@ -50,7 +58,7 @@ def track_lfs_object(
(File.repository == repo) & (File.path_in_repo == path_in_repo)
)
# Create LFS history with FK objects
# Always create new LFS history entry with FK objects
create_lfs_history(
repository=repo,
path_in_repo=path_in_repo,
@@ -59,8 +67,9 @@ def track_lfs_object(
commit_id=commit_id,
file=file_fk, # Optional FK for faster lookups
)
logger.debug(
f"Tracked LFS object {sha256[:8]} for {path_in_repo} in commit {commit_id[:8]}"
logger.success(
f"[TRACK_LFS_OBJECT_DONE] Created LFS history for {path_in_repo} "
f"(sha256={sha256[:8]}, commit={commit_id[:8]})"
)
@@ -71,10 +80,14 @@ def get_old_lfs_versions(
) -> List[str]:
"""Get old LFS object hashes that should be garbage collected.
Counts UNIQUE oids (sha256), not individual history entries.
If the same oid appears in multiple commits, it's counted once.
Keeps the newest K unique oids, deletes all others.
Args:
repo: Repository FK object
path_in_repo: File path
keep_count: Number of versions to keep
keep_count: Number of unique versions to keep
Returns:
List of SHA256 hashes to delete
@@ -91,31 +104,37 @@ def get_old_lfs_versions(
all_versions = list(history)
if len(all_versions) <= keep_count:
# Not enough versions to trigger GC
if not all_versions:
logger.debug(f"No LFS history for {path_in_repo}")
return []
# Extract unique sha256 values in order (newest first)
unique_oids = []
seen_oids = set()
for version in all_versions:
if version.sha256 not in seen_oids:
unique_oids.append(version.sha256)
seen_oids.add(version.sha256)
# Check if we have enough unique versions to trigger GC
if len(unique_oids) <= keep_count:
logger.debug(
f"Only {len(all_versions)} versions of {path_in_repo}, keeping all"
f"Only {len(unique_oids)} unique version(s) of {path_in_repo} "
f"({len(all_versions)} total entries), keeping all"
)
return []
# Keep the newest K versions, delete the rest
versions_to_keep = all_versions[:keep_count]
versions_to_delete = all_versions[keep_count:]
keep_hashes = {v.sha256 for v in versions_to_keep}
delete_hashes = []
for old_version in versions_to_delete:
# Only delete if not in the "keep" set (shouldn't happen, but safety check)
if old_version.sha256 not in keep_hashes:
delete_hashes.append(old_version.sha256)
# Keep the newest K unique oids, delete the rest
keep_oids = set(unique_oids[:keep_count])
delete_oids = unique_oids[keep_count:]
logger.info(
f"GC for {path_in_repo}: keeping {len(versions_to_keep)} versions, "
f"marking {len(delete_hashes)} for deletion"
f"GC for {path_in_repo}: {len(unique_oids)} unique version(s), "
f"keeping {len(keep_oids)}, marking {len(delete_oids)} for deletion"
)
return delete_hashes
return delete_oids
def cleanup_lfs_object(sha256: str, repo: Optional[Repository] = None) -> bool:
@@ -128,8 +147,10 @@ def cleanup_lfs_object(sha256: str, repo: Optional[Repository] = None) -> bool:
Returns:
True if deleted, False if still in use or deletion failed
"""
# Check if this object is still referenced in current files
query = File.select().where((File.sha256 == sha256) & (File.lfs == True))
# Check if this object is still referenced in current files (active files only)
query = File.select().where(
(File.sha256 == sha256) & (File.lfs == True) & (File.is_deleted == False)
)
if repo:
query = query.where(File.repository == repo)
@@ -137,7 +158,7 @@ def cleanup_lfs_object(sha256: str, repo: Optional[Repository] = None) -> bool:
if current_uses > 0:
logger.debug(
f"LFS object {sha256[:8]} still used by {current_uses} file(s), keeping"
f"LFS object {sha256[:8]} still used by {current_uses} active file(s), keeping"
)
return False
@@ -172,14 +193,21 @@ def cleanup_lfs_object(sha256: str, repo: Optional[Repository] = None) -> bool:
)
.execute()
)
logger.warning(
f"[LFS_HISTORY_DELETE] Removed {deleted_count} history record(s) "
f"for sha256={sha256[:8]} in repo={repo.full_id}"
)
else:
deleted_count = (
LFSObjectHistory.delete()
.where(LFSObjectHistory.sha256 == sha256)
.execute()
)
logger.warning(
f"[LFS_HISTORY_DELETE] Removed {deleted_count} history record(s) "
f"for sha256={sha256[:8]} (global cleanup)"
)
logger.info(f"Removed {deleted_count} history records for {sha256[:8]}")
return True
except Exception as e:
@@ -477,6 +505,7 @@ async def sync_file_table_with_commit(
size=size_bytes,
sha256=sha256,
lfs=is_lfs,
is_deleted=False,
owner=repo.owner, # Denormalized owner
).on_conflict(
conflict_target=(File.repository, File.path_in_repo),
@@ -484,6 +513,7 @@ async def sync_file_table_with_commit(
File.sha256: sha256,
File.size: size_bytes,
File.lfs: is_lfs,
File.is_deleted: False, # File is active
File.updated_at: datetime.now(timezone.utc),
},
).execute()
@@ -724,6 +754,7 @@ async def track_commit_lfs_objects(
size=size_bytes,
sha256=sha256,
lfs=is_lfs,
is_deleted=False,
owner=repo.owner, # Denormalized owner
).on_conflict(
conflict_target=(File.repository, File.path_in_repo),
@@ -731,6 +762,7 @@ async def track_commit_lfs_objects(
File.sha256: sha256,
File.size: size_bytes,
File.lfs: is_lfs,
File.is_deleted: False, # File is active
File.updated_at: datetime.now(timezone.utc),
},
).execute()

View File

@@ -177,6 +177,7 @@ class File(BaseModel):
size = IntegerField(default=0)
sha256 = CharField(index=True)
lfs = BooleanField(default=False)
is_deleted = BooleanField(default=False, index=True) # Soft delete flag
owner = ForeignKeyField(
User, backref="owned_files", on_delete="CASCADE", index=True
) # Repository owner (denormalized for convenience)
@@ -274,8 +275,10 @@ class LFSObjectHistory(BaseModel):
size = IntegerField()
commit_id = CharField(index=True) # LakeFS commit ID
# Optional link to File record for faster lookups
# IMPORTANT: on_delete="SET NULL" prevents CASCADE deletion when File is deleted
# LFSObjectHistory must persist for quota tracking even after file deletion
file = ForeignKeyField(
File, backref="lfs_versions", on_delete="CASCADE", null=True, index=True
File, backref="lfs_versions", on_delete="SET NULL", null=True, index=True
)
created_at = DateTimeField(default=partial(datetime.now, tz=timezone.utc))

View File

@@ -329,15 +329,17 @@ def list_organization_members(org: User) -> list[UserOrganization]:
def get_file(repo: Repository, path_in_repo: str) -> File | None:
"""Get file by repository FK and path."""
"""Get file by repository FK and path (only active files)."""
return File.get_or_none(
(File.repository == repo) & (File.path_in_repo == path_in_repo)
(File.repository == repo)
& (File.path_in_repo == path_in_repo)
& (File.is_deleted == False)
)
def get_file_by_sha256(sha256: str) -> File | None:
"""Get file by SHA256 hash."""
return File.get_or_none(File.sha256 == sha256)
"""Get file by SHA256 hash (only active files)."""
return File.get_or_none((File.sha256 == sha256) & (File.is_deleted == False))
def create_file(
@@ -355,6 +357,7 @@ def create_file(
size=size,
sha256=sha256,
lfs=lfs,
is_deleted=False,
owner=owner, # Denormalized from repository.owner for convenience
)
@@ -477,7 +480,11 @@ def create_lfs_history(
file: File | None = None,
) -> LFSObjectHistory:
"""Create LFS object history record with ForeignKeys."""
return LFSObjectHistory.create(
from kohakuhub.logger import get_logger
logger = get_logger("DB")
entry = LFSObjectHistory.create(
repository=repository,
path_in_repo=path_in_repo,
sha256=sha256,
@@ -486,6 +493,14 @@ def create_lfs_history(
file=file, # Optional FK to File for faster lookups
)
logger.success(
f"[LFS_HISTORY_CREATE] repo={repository.full_id}, "
f"path={path_in_repo}, sha256={sha256[:8]}, size={size:,}, "
f"commit={commit_id[:8]}, id={entry.id}"
)
return entry
def list_lfs_history(
repository: Repository, path_in_repo: str, limit: int | None = None
@@ -504,6 +519,43 @@ def list_lfs_history(
return list(query)
def get_lfs_history_entry(
repository: Repository,
path_in_repo: str,
sha256: str,
) -> LFSObjectHistory | None:
"""Get existing LFS history entry for repo/path/sha256.
Args:
repository: Repository FK object
path_in_repo: File path
sha256: LFS object SHA256 hash
Returns:
LFSObjectHistory instance or None
"""
return LFSObjectHistory.get_or_none(
(LFSObjectHistory.repository == repository)
& (LFSObjectHistory.path_in_repo == path_in_repo)
& (LFSObjectHistory.sha256 == sha256)
)
def refresh_lfs_history_timestamp(
lfs_entry: LFSObjectHistory,
commit_id: str,
) -> None:
"""Refresh LFS history timestamp and update commit_id.
Args:
lfs_entry: LFSObjectHistory instance to update
commit_id: New commit ID
"""
lfs_entry.commit_id = commit_id
lfs_entry.created_at = datetime.now(timezone.utc)
lfs_entry.save()
def get_effective_lfs_threshold(repo: Repository) -> int:
"""Get effective LFS threshold for a repository.

View File

@@ -74,22 +74,29 @@ class LakeFSRestClient:
"""
if not response.is_success:
# Include response body in error for debugging
error_detail = response.text if response.text else ""
error_detail = response.text if response.text else "(empty response)"
# Comprehensive logging
logger.error(
f"LakeFS API error: {response.status_code} {response.reason_phrase}\n"
f"URL: {response.url}\n"
f"Response: {error_detail}"
f"LakeFS API request failed:\n"
f" Status: {response.status_code} {response.reason_phrase}\n"
f" Method: {response.request.method}\n"
f" URL: {response.url}\n"
f" Response Body: {error_detail}"
)
# Create comprehensive error message
error_msg = (
f"LakeFS API error {response.status_code} {response.reason_phrase} "
f"for {response.request.method} {response.url}: {error_detail}"
)
# Raise HTTPStatusError with full context
raise httpx.HTTPStatusError(
error_msg,
request=response.request,
response=response,
)
# Raise with enhanced error message
try:
self._check_response(response)
except httpx.HTTPStatusError as e:
# Re-raise with response text included
raise httpx.HTTPStatusError(
f"{e.response.status_code} {e.response.reason_phrase}: {error_detail}",
request=e.request,
response=e.response,
)
async def get_object(
self, repository: str, ref: str, path: str, range_header: str | None = None