mirror of
https://github.com/KohakuBlueleaf/KohakuHub.git
synced 2026-05-04 19:37:53 -05:00
Fix minor bugs, add commit listing api, better logging
This commit is contained in:
@@ -25,7 +25,9 @@ from .hf_utils import (
|
||||
is_lakefs_revision_error,
|
||||
)
|
||||
from .lakefs_utils import get_lakefs_client, lakefs_repo_name
|
||||
from ..logger import get_logger
|
||||
|
||||
logger = get_logger("REPO")
|
||||
router = APIRouter()
|
||||
init_db()
|
||||
|
||||
@@ -53,6 +55,9 @@ def create_repo(payload: CreateRepoPayload, user: User = Depends(get_current_use
|
||||
Returns:
|
||||
Created repository information
|
||||
"""
|
||||
logger.info(
|
||||
f"Creating repository: {payload.organization or user.username}/{payload.name}"
|
||||
)
|
||||
namespace = payload.organization or user.username
|
||||
|
||||
# Check if user has permission to use this namespace
|
||||
@@ -83,6 +88,7 @@ def create_repo(payload: CreateRepoPayload, user: User = Depends(get_current_use
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(f"LakeFS repository creation failed for {full_id}", e)
|
||||
return hf_server_error(f"LakeFS repository creation failed: {str(e)}")
|
||||
|
||||
# Store in database for listing/metadata
|
||||
@@ -155,13 +161,14 @@ async def delete_repo(
|
||||
try:
|
||||
# Note: Deleting a LakeFS repo is generally fast as it only deletes metadata
|
||||
client.repositories.delete_repository(repository=lakefs_repo)
|
||||
print(f"Successfully deleted LakeFS repository: {lakefs_repo}")
|
||||
logger.success(f"Successfully deleted LakeFS repository: {lakefs_repo}")
|
||||
except Exception as e:
|
||||
# LakeFS returns 404 if repo doesn't exist, which is fine
|
||||
if not is_lakefs_not_found_error(e):
|
||||
# If LakeFS deletion fails for other reasons, fail the whole operation
|
||||
logger.exception(f"LakeFS repository deletion failed for {lakefs_repo}", e)
|
||||
return hf_server_error(f"LakeFS repository deletion failed: {str(e)}")
|
||||
print(f"LakeFS repository {lakefs_repo} not found/already deleted (OK)")
|
||||
logger.info(f"LakeFS repository {lakefs_repo} not found/already deleted (OK)")
|
||||
|
||||
# 3. Delete related metadata from database (manual cascade)
|
||||
try:
|
||||
@@ -169,8 +176,9 @@ async def delete_repo(
|
||||
File.delete().where(File.repo_full_id == full_id).execute()
|
||||
StagingUpload.delete().where(StagingUpload.repo_full_id == full_id).execute()
|
||||
repo_row.delete_instance()
|
||||
print(f"Successfully deleted database records for: {full_id}")
|
||||
logger.success(f"Successfully deleted database records for: {full_id}")
|
||||
except Exception as e:
|
||||
logger.exception(f"Database deletion failed for {full_id}", e)
|
||||
return hf_server_error(f"Database deletion failed for {full_id}: {str(e)}")
|
||||
|
||||
# 4. Return success response (200 OK with a simple message)
|
||||
@@ -262,11 +270,11 @@ async def get_repo_info(
|
||||
last_modified = datetime.fromtimestamp(
|
||||
commit_info.creation_date
|
||||
).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as ex:
|
||||
logger.debug(f"Could not get commit info: {str(ex)}")
|
||||
except Exception as e:
|
||||
# Log warning but continue - repo exists even if LakeFS has issues
|
||||
print(f"Warning: Could not get branch info for {lakefs_repo}/main: {e}")
|
||||
logger.warning(f"Could not get branch info for {lakefs_repo}/main: {str(e)}")
|
||||
|
||||
# Format created_at
|
||||
created_at = format_hf_datetime(repo_row.created_at)
|
||||
@@ -362,16 +370,35 @@ async def list_repo_tree(
|
||||
prefix += "/"
|
||||
|
||||
try:
|
||||
# List objects from LakeFS
|
||||
# List objects from LakeFS with pagination support
|
||||
from ..async_utils import get_async_lakefs_client
|
||||
|
||||
async_client = get_async_lakefs_client()
|
||||
result = await async_client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=prefix,
|
||||
delimiter="" if recursive else "/",
|
||||
)
|
||||
|
||||
# Collect all results with pagination
|
||||
all_results = []
|
||||
after = ""
|
||||
has_more = True
|
||||
|
||||
while has_more:
|
||||
result = await async_client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=prefix,
|
||||
delimiter="" if recursive else "/",
|
||||
amount=1000, # Max per request
|
||||
after=after,
|
||||
)
|
||||
|
||||
all_results.extend(result.results)
|
||||
|
||||
# Check pagination
|
||||
if result.pagination and result.pagination.has_more:
|
||||
after = result.pagination.next_offset
|
||||
has_more = True
|
||||
else:
|
||||
has_more = False
|
||||
|
||||
except Exception as e:
|
||||
# Check for specific error types
|
||||
if is_lakefs_not_found_error(e):
|
||||
@@ -384,13 +411,14 @@ async def list_repo_tree(
|
||||
return []
|
||||
|
||||
# Other errors are server errors
|
||||
logger.exception(f"Failed to list objects for {repo_id}", e)
|
||||
return hf_server_error(f"Failed to list objects: {str(e)}")
|
||||
|
||||
# Convert LakeFS objects to HuggingFace format (flat list)
|
||||
result_list = []
|
||||
prefix_len = len(prefix)
|
||||
|
||||
for obj in result.results:
|
||||
for obj in all_results:
|
||||
if obj.path_type == "object":
|
||||
# File object
|
||||
is_lfs = obj.size_bytes > cfg.app.lfs_threshold_bytes
|
||||
@@ -417,6 +445,14 @@ async def list_repo_tree(
|
||||
"path": relative_path,
|
||||
}
|
||||
|
||||
# Add last modified info if available from LakeFS
|
||||
if hasattr(obj, "mtime") and obj.mtime:
|
||||
from datetime import datetime
|
||||
|
||||
file_obj["lastModified"] = datetime.fromtimestamp(obj.mtime).strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
)
|
||||
|
||||
# Add LFS metadata if it's an LFS file
|
||||
if is_lfs:
|
||||
file_obj["lfs"] = {
|
||||
@@ -432,18 +468,24 @@ async def list_repo_tree(
|
||||
# Remove prefix from path to get relative path
|
||||
relative_path = obj.path[prefix_len:] if prefix else obj.path
|
||||
|
||||
result_list.append(
|
||||
{
|
||||
"type": "directory",
|
||||
"oid": (
|
||||
obj.checksum
|
||||
if hasattr(obj, "checksum") and obj.checksum
|
||||
else ""
|
||||
),
|
||||
"size": 0,
|
||||
"path": relative_path.rstrip("/"), # Remove trailing slash
|
||||
}
|
||||
)
|
||||
dir_obj = {
|
||||
"type": "directory",
|
||||
"oid": (
|
||||
obj.checksum if hasattr(obj, "checksum") and obj.checksum else ""
|
||||
),
|
||||
"size": 0,
|
||||
"path": relative_path.rstrip("/"), # Remove trailing slash
|
||||
}
|
||||
|
||||
# Add last modified info if available
|
||||
if hasattr(obj, "mtime") and obj.mtime:
|
||||
from datetime import datetime
|
||||
|
||||
dir_obj["lastModified"] = datetime.fromtimestamp(obj.mtime).strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
)
|
||||
|
||||
result_list.append(dir_obj)
|
||||
|
||||
return result_list
|
||||
|
||||
@@ -583,9 +625,9 @@ async def get_paths_info(
|
||||
}
|
||||
)
|
||||
# else: path doesn't exist, skip it (as per HF behavior)
|
||||
except Exception:
|
||||
except Exception as ex:
|
||||
# Path doesn't exist, skip it (as per HF behavior)
|
||||
pass
|
||||
logger.debug(f"Path {clean_path} doesn't exist or is invalid")
|
||||
# For other errors, also skip the path
|
||||
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user