Mirror of https://github.com/KohakuBlueleaf/KohakuHub.git
synced 2026-05-06 04:17:46 -05:00
Add HF-compatible tree expand metadata
@@ -451,20 +451,21 @@
<template v-else>
<!-- Header Row (desktop only) -->
<div
class="hidden md:grid md:grid-cols-[auto_1fr_120px_150px] gap-3 py-2 px-2 text-sm font-medium text-gray-600 dark:text-gray-400 border-b"
class="hidden md:grid md:grid-cols-[auto_minmax(0,1.4fr)_minmax(0,2fr)_120px_110px] gap-3 py-2 px-2 text-sm font-medium text-gray-600 dark:text-gray-400 border-b"
>
<div></div>
<!-- Icon column -->
<div>Name</div>
<div>Last Commit</div>
<div class="text-right">Updated</div>
<div class="text-right">Size</div>
<div class="text-right">Last Modified</div>
</div>

<!-- File Rows -->
<div
v-for="file in filteredFiles"
:key="file.path"
class="py-3 grid grid-cols-[auto_1fr_auto] md:grid-cols-[auto_1fr_120px_150px] gap-3 items-center hover:bg-gray-50 dark:hover:bg-gray-700 px-2 cursor-pointer transition-colors"
class="py-3 grid grid-cols-[auto_1fr] md:grid-cols-[auto_minmax(0,1.4fr)_minmax(0,2fr)_120px_110px] gap-3 items-center hover:bg-gray-50 dark:hover:bg-gray-700 px-2 cursor-pointer transition-colors"
@click="handleFileClick(file)"
>
<div
@@ -479,16 +480,34 @@
<div class="font-medium truncate">
{{ getFileName(file.path) }}
</div>
<div
class="mt-1 text-sm text-gray-500 dark:text-gray-400 truncate md:hidden"
>
{{ getEntryCommitTitle(file) }}
</div>
<div
class="mt-1 text-xs text-gray-400 dark:text-gray-500 md:hidden"
>
{{ getEntryUpdatedAt(file) }}
<span v-if="formatEntrySize(file) !== '-'">
· {{ formatEntrySize(file) }}
</span>
</div>
</div>
<div
class="text-sm text-gray-500 dark:text-gray-400 text-right"
class="hidden md:block min-w-0 text-sm text-gray-500 dark:text-gray-400 truncate"
>
{{ formatSize(file.size) }}
{{ getEntryCommitTitle(file) }}
</div>
<div
class="hidden md:block text-sm text-gray-500 dark:text-gray-400 text-right"
>
{{ formatLastModified(file.lastModified) }}
{{ getEntryUpdatedAt(file) }}
</div>
<div
class="hidden md:block text-sm text-gray-500 dark:text-gray-400 text-right"
>
{{ formatEntrySize(file) }}
</div>
</div>

@@ -866,8 +885,10 @@ const isLiked = ref(false);
const likesCount = ref(0);
const likingInProgress = ref(false);
const deletingFolder = ref(false);
const fileTreeRequestId = ref(0);

const baseUrl = window.location.origin;
const PATHS_INFO_BATCH_SIZE = 1000;

// Computed
const activeTab = computed(() => props.tab);
@@ -903,7 +924,6 @@ const pathSegments = computed(() => {
});

const filteredFiles = computed(() => {
// Backend now provides folder stats, so just filter
if (!fileSearchQuery.value) return fileTree.value;

const query = fileSearchQuery.value.toLowerCase();
@@ -1006,8 +1026,17 @@ function formatSize(bytes) {
return (bytes / (1000 * 1000 * 1000)).toFixed(1) + " GB";
}

function formatLastModified(dateString) {
return formatRelativeTime(dateString, "-");
function formatEntrySize(file) {
if (file.type === "directory") return "-";
return formatSize(file.size);
}

function getEntryCommitTitle(file) {
return file.lastCommit?.title || "-";
}

function getEntryUpdatedAt(file) {
return formatRelativeTime(file.lastCommit?.date || file.lastModified, "-");
}

function getFileName(path) {
@@ -1015,6 +1044,22 @@ function getFileName(path) {
return parts[parts.length - 1] || path;
}

function sortFileEntries(entries) {
return [...entries].sort((a, b) => {
if (a.type === "directory" && b.type !== "directory") return -1;
if (a.type !== "directory" && b.type === "directory") return 1;
return a.path.localeCompare(b.path);
});
}

function chunkPaths(paths, size) {
const chunks = [];
for (let index = 0; index < paths.length; index += size) {
chunks.push(paths.slice(index, index + size));
}
return chunks;
}

function navigateToTab(tab) {
switch (tab) {
case "files":
@@ -1164,8 +1209,13 @@ async function toggleLike() {

async function loadFileTree() {
filesLoading.value = true;
const requestId = fileTreeRequestId.value + 1;
fileTreeRequestId.value = requestId;

let sortedEntries = [];

try {
const { data } = await repoAPI.listTree(
const data = await repoAPI.listTreeAll(
props.repoType,
props.namespace,
props.name,
@@ -1174,16 +1224,60 @@ async function loadFileTree() {
{ recursive: false },
);

fileTree.value = data.sort((a, b) => {
if (a.type === "directory" && b.type !== "directory") return -1;
if (a.type !== "directory" && b.type === "directory") return 1;
return a.path.localeCompare(b.path);
});
if (requestId !== fileTreeRequestId.value) return;

sortedEntries = sortFileEntries(data || []);
fileTree.value = sortedEntries;

if (sortedEntries.length === 0) {
return;
}

} catch (err) {
console.error("Failed to load file tree:", err);
fileTree.value = [];
if (requestId === fileTreeRequestId.value) {
fileTree.value = [];
}
} finally {
filesLoading.value = false;
if (requestId === fileTreeRequestId.value) {
filesLoading.value = false;
}
}

if (sortedEntries.length === 0 || requestId !== fileTreeRequestId.value) {
return;
}

try {
const pathInfoByPath = new Map();
const pathBatches = chunkPaths(
sortedEntries.map((file) => file.path),
PATHS_INFO_BATCH_SIZE,
);

for (const pathBatch of pathBatches) {
const { data: expandedEntries } = await repoAPI.getPathsInfo(
props.repoType,
props.namespace,
props.name,
currentBranch.value,
pathBatch,
true,
);

if (requestId !== fileTreeRequestId.value) return;

for (const entry of expandedEntries || []) {
pathInfoByPath.set(entry.path, entry);
}
}

fileTree.value = sortedEntries.map((file) => ({
...file,
...(pathInfoByPath.get(file.path) || {}),
}));
} catch (err) {
console.error("Failed to load expanded path info:", err);
}
}

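The loadFileTree() rewrite above guards every await with a request id so that an older, slower load can never overwrite the results of a newer one. A stand-alone sketch of the same guard pattern in Python (the asyncio framing and all names here are illustrative, not part of the component):

import asyncio

class TreeLoader:
    """Minimal sketch of the request-id guard used by loadFileTree()."""

    def __init__(self):
        self.request_id = 0   # mirrors fileTreeRequestId
        self.entries = []     # mirrors fileTree

    async def load(self, fetch):
        self.request_id += 1
        request_id = self.request_id
        data = await fetch()                # network call of arbitrary latency
        if request_id != self.request_id:   # a newer load started meanwhile
            return                          # drop the stale result
        self.entries = data

async def demo():
    loader = TreeLoader()

    async def slow():
        await asyncio.sleep(0.2)
        return ["stale listing"]

    async def fast():
        await asyncio.sleep(0)
        return ["fresh listing"]

    # The second load wins even though the first finishes later.
    await asyncio.gather(loader.load(slow), loader.load(fast))
    print(loader.entries)  # ['fresh listing']

asyncio.run(demo())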
@@ -39,6 +39,12 @@ api.interceptors.response.use(

export default api;

function getNextLinkFromHeader(linkHeader) {
if (!linkHeader) return null;
const match = linkHeader.match(/<([^>]+)>;\s*rel="next"/i);
return match ? match[1] : null;
}

/**
* Auth API
*/
@@ -140,6 +146,65 @@ export const repoAPI = {
params,
}),

/**
* List all repository file tree entries by following Link pagination
* @param {string} type - Repository type
* @param {string} namespace - Owner namespace
* @param {string} name - Repository name
* @param {string} revision - Branch name or commit hash
* @param {string} path - Path within repository (with leading /)
* @param {Object} params - Query parameters { recursive?, expand? }
* @returns {Promise<Array>} - Array of files and directories across all pages
*/
listTreeAll: async (type, namespace, name, revision, path, params) => {
const entries = [];
let response = await repoAPI.listTree(
type,
namespace,
name,
revision,
path,
params,
);

entries.push(...(response.data || []));

let nextUrl = getNextLinkFromHeader(response.headers?.link);
while (nextUrl) {
response = await api.get(nextUrl);
entries.push(...(response.data || []));
nextUrl = getNextLinkFromHeader(response.headers?.link);
}

return entries;
},

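The pagination contract consumed by listTreeAll is a plain HTTP Link header with rel="next", as parsed by getNextLinkFromHeader above. A minimal stand-alone sketch of the same loop in Python; the requests library and the hub base URL are assumptions for illustration only, the real frontend goes through the axios instance above:

import re
import requests

def list_tree_all(base, repo_type, namespace, name, revision, path="", params=None):
    """Follow Link: <...>; rel="next" headers until the tree listing is exhausted."""
    entries = []
    url = f"{base}/api/{repo_type}s/{namespace}/{name}/tree/{revision}/{path}".rstrip("/")
    while url:
        resp = requests.get(url, params=params)
        resp.raise_for_status()
        entries.extend(resp.json())
        match = re.search(r'<([^>]+)>;\s*rel="next"', resp.headers.get("Link", ""))
        url = match.group(1) if match else None
        params = None  # the next link already carries limit/cursor
    return entries

# Example call (hypothetical deployment URL):
# files = list_tree_all("https://hub.example", "model", "org", "repo", "main")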
/**
* Get repository metadata for specific paths
* @param {string} type - Repository type
* @param {string} namespace - Owner namespace
* @param {string} name - Repository name
* @param {string} revision - Branch name or commit hash
* @param {Array<string>} paths - Repository-relative paths
* @param {boolean} expand - Whether to request expanded metadata
* @returns {Promise} - Array of files and directories
*/
getPathsInfo: (type, namespace, name, revision, paths, expand = false) => {
const formData = new URLSearchParams();
paths.forEach((path) => formData.append("paths", path));
formData.append("expand", expand ? "true" : "false");

return api.post(
`/api/${type}s/${namespace}/${name}/paths-info/${revision}`,
formData,
{
headers: {
"Content-Type": "application/x-www-form-urlencoded",
},
},
);
},

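getPathsInfo sends the paths as repeated application/x-www-form-urlencoded fields plus an expand flag, matching the Form(...) parameters on the backend endpoint. A rough equivalent of the same request in Python; the requests library, the base URL, and the example paths are illustrative assumptions:

import requests

# Hypothetical values for illustration only.
base = "https://hub.example"
url = f"{base}/api/models/org/repo/paths-info/main"

# requests encodes a list value as repeated form fields: paths=...&paths=...&expand=true
form = {
    "paths": ["config.json", "weights/model.safetensors"],
    "expand": "true",
}
resp = requests.post(url, data=form)
resp.raise_for_status()
for entry in resp.json():
    # With expand, entries additionally carry lastCommit (and securityFileStatus for files).
    print(entry["path"], entry["type"], entry.get("lastCommit"))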
/**
* Upload files to repository
* @param {string} type - Repository type

@@ -189,12 +189,20 @@ def with_repo_fallback(operation: OperationType):
case "tree":
revision = kwargs.get("revision", "main")
path = kwargs.get("path", "")
recursive = kwargs.get("recursive", False)
expand = kwargs.get("expand", False)
limit = kwargs.get("limit")
cursor = kwargs.get("cursor")
result = await try_fallback_tree(
repo_type,
namespace,
name,
revision,
path,
recursive=recursive,
expand=expand,
limit=limit,
cursor=cursor,
user_tokens=user_tokens,
)

@@ -207,12 +215,14 @@ def with_repo_fallback(operation: OperationType):
# For paths-info, extract paths and revision from kwargs
revision = kwargs.get("revision", "main")
paths = kwargs.get("paths", [])
expand = kwargs.get("expand", False)
result = await try_fallback_paths_info(
repo_type,
namespace,
name,
revision,
paths,
expand=expand,
user_tokens=user_tokens,
)


@@ -258,8 +258,12 @@ async def try_fallback_tree(
name: str,
revision: str,
path: str = "",
recursive: bool = False,
expand: bool = False,
limit: int | None = None,
cursor: str | None = None,
user_tokens: dict[str, str] | None = None,
) -> Optional[list]:
) -> Optional[Response]:
"""Try to get repository tree from fallback sources.

Args:
@@ -271,7 +275,7 @@ async def try_fallback_tree(
user_tokens: User-provided external tokens (overrides admin tokens)

Returns:
List of file/folder objects or None if not found
JSON response or None if not found
"""
sources = get_enabled_sources(namespace, user_tokens=user_tokens)

@@ -291,15 +295,30 @@
token=source.get("token"),
)

response = await client.get(kohaku_path, repo_type)
params = {
"recursive": recursive,
"expand": expand,
}
if limit is not None:
params["limit"] = limit
if cursor:
params["cursor"] = cursor

response = await client.get(kohaku_path, repo_type, params=params)

if response.status_code == 200:
data = response.json()

logger.info(
f"Fallback tree SUCCESS: {repo_type}/{namespace}/{name}/tree from {source['name']}"
)
return data
headers = {}
if response.headers.get("link"):
headers["Link"] = response.headers["link"]
return Response(
content=response.content,
status_code=response.status_code,
media_type=response.headers.get("content-type"),
headers=headers,
)

elif not should_retry_source(response):
return None
@@ -317,6 +336,7 @@ async def try_fallback_paths_info(
name: str,
revision: str,
paths: list[str],
expand: bool = False,
user_tokens: dict[str, str] | None = None,
) -> Optional[list]:
"""Try to get paths info from fallback sources.
@@ -351,7 +371,7 @@ async def try_fallback_paths_info(

# POST request with form data
response = await client.post(
kohaku_path, repo_type, data={"paths": paths, "expand": False}
kohaku_path, repo_type, data={"paths": paths, "expand": expand}
)

if response.status_code == 200:

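With expand now forwarded to the upstream source, the fallback proxies back the same wire format that the refactored tree and paths-info endpoints below produce. A sketch of one expanded file entry; the field names follow _make_tree_item, _serialize_last_commit, and _build_lfs_payload in the next hunks, while the concrete values (paths, hashes, timestamps) are illustrative:

# Illustrative expanded entry as returned when expand=true.
expanded_file = {
    "type": "file",
    "oid": "9f8e7d...",                       # placeholder checksum
    "size": 134217728,
    "path": "weights/model.safetensors",
    "lastModified": "2025-01-10T12:34:56Z",   # exact format is DATETIME_FORMAT_ISO
    "lfs": {"oid": "9f8e7d...", "size": 134217728, "pointerSize": 134},
    "lastCommit": {
        "id": "a1b2c3...",
        "title": "Upload new checkpoint",
        "date": "2025-01-10T12:34:56Z",
    },
    "securityFileStatus": None,
}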
@@ -1,21 +1,30 @@
|
||||
"""Repository tree listing and path information endpoints - Refactored version."""
|
||||
"""Repository tree listing and path information endpoints."""
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import Literal
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from fastapi import APIRouter, Depends, Form, Request
|
||||
from fastapi import APIRouter, Depends, Form, Query, Request
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from kohakuhub.config import cfg
|
||||
from kohakuhub.constants import DATETIME_FORMAT_ISO
|
||||
from kohakuhub.db import File, Repository, User
|
||||
from kohakuhub.db_operations import get_file, get_repository, should_use_lfs
|
||||
from kohakuhub.logger import get_logger
|
||||
from kohakuhub.auth.dependencies import get_optional_user
|
||||
from kohakuhub.auth.permissions import check_repo_read_permission
|
||||
from kohakuhub.utils.lakefs import get_lakefs_client, lakefs_repo_name
|
||||
from kohakuhub.constants import DATETIME_FORMAT_ISO
|
||||
from kohakuhub.db import File, Repository, User
|
||||
from kohakuhub.db_operations import get_repository, should_use_lfs
|
||||
from kohakuhub.lakefs_rest_client import get_lakefs_rest_client
|
||||
from kohakuhub.logger import get_logger
|
||||
from kohakuhub.utils.lakefs import (
|
||||
get_lakefs_client,
|
||||
lakefs_repo_name,
|
||||
resolve_revision,
|
||||
)
|
||||
from kohakuhub.api.fallback import with_repo_fallback
|
||||
from kohakuhub.api.repo.utils.hf import (
|
||||
hf_bad_request,
|
||||
hf_entry_not_found,
|
||||
hf_repo_not_found,
|
||||
hf_revision_not_found,
|
||||
hf_server_error,
|
||||
@@ -28,196 +37,363 @@ router = APIRouter()
|
||||
|
||||
RepoType = Literal["model", "dataset", "space"]
|
||||
|
||||
|
||||
async def fetch_lakefs_objects(
|
||||
lakefs_repo: str, revision: str, prefix: str, recursive: bool
|
||||
) -> list:
|
||||
"""Fetch all objects from LakeFS with pagination.
|
||||
|
||||
Args:
|
||||
lakefs_repo: LakeFS repository name
|
||||
revision: Branch or commit
|
||||
prefix: Path prefix
|
||||
recursive: Whether to list recursively
|
||||
|
||||
Returns:
|
||||
List of all LakeFS objects
|
||||
|
||||
Raises:
|
||||
Exception: If listing fails
|
||||
"""
|
||||
client = get_lakefs_client()
|
||||
|
||||
all_results = []
|
||||
after = ""
|
||||
has_more = True
|
||||
|
||||
while has_more:
|
||||
result = await client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=prefix,
|
||||
delimiter="" if recursive else "/",
|
||||
amount=1000, # Max per request
|
||||
after=after,
|
||||
)
|
||||
|
||||
all_results.extend(result["results"])
|
||||
|
||||
# Check pagination
|
||||
if result.get("pagination") and result["pagination"].get("has_more"):
|
||||
after = result["pagination"]["next_offset"]
|
||||
has_more = True
|
||||
else:
|
||||
has_more = False
|
||||
|
||||
return all_results
|
||||
TREE_PAGE_SIZE = 1000
|
||||
TREE_EXPAND_PAGE_SIZE = 50
|
||||
TREE_DIFF_PAGE_SIZE = 1000
|
||||
TREE_COMMIT_SCAN_PAGE_SIZE = 100
|
||||
PATHS_INFO_MAX_PATHS = 1000
|
||||
PATHS_INFO_CONCURRENCY = 16
|
||||
|
||||
|
||||
async def calculate_folder_stats(
|
||||
lakefs_repo: str, revision: str, folder_path: str
|
||||
) -> tuple[int, float | None]:
|
||||
"""Calculate folder size and latest modification time.
|
||||
|
||||
Args:
|
||||
lakefs_repo: LakeFS repository name
|
||||
revision: Branch or commit
|
||||
folder_path: Full folder path
|
||||
|
||||
Returns:
|
||||
Tuple of (total_size, latest_mtime)
|
||||
"""
|
||||
folder_size = 0
|
||||
folder_latest_mtime = None
|
||||
|
||||
try:
|
||||
client = get_lakefs_client()
|
||||
|
||||
# Paginate through all objects in folder
|
||||
after = ""
|
||||
has_more = True
|
||||
|
||||
while has_more:
|
||||
folder_contents = await client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=folder_path,
|
||||
delimiter="", # No delimiter = recursive
|
||||
amount=1000,
|
||||
after=after,
|
||||
)
|
||||
|
||||
# Calculate total size and find latest modification
|
||||
for child_obj in folder_contents["results"]:
|
||||
if child_obj["path_type"] == "object":
|
||||
folder_size += child_obj.get("size_bytes") or 0
|
||||
if child_obj.get("mtime"):
|
||||
if (
|
||||
folder_latest_mtime is None
|
||||
or child_obj["mtime"] > folder_latest_mtime
|
||||
):
|
||||
folder_latest_mtime = child_obj["mtime"]
|
||||
|
||||
# Check pagination
|
||||
pagination = folder_contents.get("pagination", {})
|
||||
has_more = pagination.get("has_more", False)
|
||||
if has_more:
|
||||
after = pagination.get("next_offset", "")
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not calculate stats for folder {folder_path}: {str(e)}")
|
||||
|
||||
return folder_size, folder_latest_mtime
|
||||
def _normalize_repo_path(path: str) -> str:
|
||||
"""Normalize a repository-relative path."""
|
||||
return path.lstrip("/").rstrip("/")
|
||||
|
||||
|
||||
async def convert_file_object(obj, repository: Repository) -> dict:
|
||||
"""Convert LakeFS file object to HuggingFace format.
|
||||
def _format_last_modified(mtime: float | int | None) -> str | None:
|
||||
"""Format LakeFS mtime for HF-compatible responses."""
|
||||
if not mtime:
|
||||
return None
|
||||
return datetime.fromtimestamp(mtime, tz=timezone.utc).strftime(DATETIME_FORMAT_ISO)
|
||||
|
||||
Args:
|
||||
obj: LakeFS object dict
|
||||
repository: Repository object (FK)
|
||||
|
||||
Returns:
|
||||
HuggingFace formatted file object
|
||||
"""
|
||||
# Use repo-specific LFS settings
|
||||
is_lfs = should_use_lfs(repository, obj["path"], obj["size_bytes"])
|
||||
|
||||
# Use full path relative to repository root (HuggingFace spec)
|
||||
file_path = obj["path"]
|
||||
|
||||
# Get correct checksum from database using repository FK
|
||||
file_record = get_file(repository, obj["path"])
|
||||
|
||||
checksum = (
|
||||
file_record.sha256 if file_record and file_record.sha256 else obj["checksum"]
|
||||
)
|
||||
|
||||
file_obj = {
|
||||
"type": "file",
|
||||
"oid": checksum, # Git blob SHA1 for non-LFS, SHA256 for LFS
|
||||
"size": obj["size_bytes"],
|
||||
"path": file_path,
|
||||
}
|
||||
|
||||
# Add last modified info if available
|
||||
if obj.get("mtime"):
|
||||
file_obj["lastModified"] = datetime.fromtimestamp(obj["mtime"]).strftime(
|
||||
def _format_commit_date(value) -> str | None:
|
||||
"""Normalize commit dates to the HF datetime wire format."""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, (int, float)):
|
||||
return datetime.fromtimestamp(value, tz=timezone.utc).strftime(
|
||||
DATETIME_FORMAT_ISO
|
||||
)
|
||||
return value
|
||||
|
||||
# Add LFS metadata if it's an LFS file
|
||||
if is_lfs:
|
||||
file_obj["lfs"] = {
|
||||
"oid": checksum, # SHA256 for LFS files
|
||||
"size": obj["size_bytes"],
|
||||
"pointerSize": 134, # Standard Git LFS pointer size
|
||||
|
||||
def _serialize_last_commit(commit: dict) -> dict:
|
||||
"""Convert a LakeFS commit payload to the HF wire format."""
|
||||
return {
|
||||
"id": commit["id"],
|
||||
"title": commit.get("message", ""),
|
||||
"date": _format_commit_date(commit.get("creation_date")),
|
||||
}
|
||||
|
||||
|
||||
def _build_lfs_payload(checksum: str, size_bytes: int) -> dict:
|
||||
"""Build the HF LFS metadata payload."""
|
||||
return {
|
||||
"oid": checksum,
|
||||
"size": size_bytes,
|
||||
"pointerSize": 134,
|
||||
}
|
||||
|
||||
|
||||
def _build_public_link(
|
||||
request: Request,
|
||||
*,
|
||||
limit: int,
|
||||
cursor: str,
|
||||
) -> str:
|
||||
"""Build a public-facing pagination link using the configured base URL."""
|
||||
query_params = dict(request.query_params)
|
||||
query_params["limit"] = str(limit)
|
||||
query_params["cursor"] = cursor
|
||||
return f"{cfg.app.base_url.rstrip('/')}{request.url.path}?{urlencode(query_params)}"
|
||||
|
||||
|
||||
def _build_file_record_map(
|
||||
repository: Repository, paths: list[str]
|
||||
) -> dict[str, File]:
|
||||
"""Fetch file records in one query for the provided paths."""
|
||||
if not paths:
|
||||
return {}
|
||||
|
||||
query = File.select().where(
|
||||
(File.repository == repository)
|
||||
& (File.path_in_repo.in_(paths))
|
||||
& (File.is_deleted == False)
|
||||
)
|
||||
return {file.path_in_repo: file for file in query}
|
||||
|
||||
|
||||
async def fetch_lakefs_objects_page(
|
||||
lakefs_repo: str,
|
||||
revision: str,
|
||||
prefix: str,
|
||||
recursive: bool,
|
||||
amount: int,
|
||||
after: str | None = None,
|
||||
) -> dict:
|
||||
"""Fetch one page of objects from LakeFS."""
|
||||
client = get_lakefs_client()
|
||||
return await client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=prefix,
|
||||
delimiter="" if recursive else "/",
|
||||
amount=amount,
|
||||
after=after or "",
|
||||
)
|
||||
|
||||
|
||||
def _make_tree_item(
|
||||
obj: dict,
|
||||
repository: Repository,
|
||||
file_records: dict[str, File],
|
||||
expand: bool,
|
||||
last_commit: dict | None = None,
|
||||
) -> dict:
|
||||
"""Convert a LakeFS object/common_prefix to the HF tree wire format."""
|
||||
path = _normalize_repo_path(obj["path"])
|
||||
item_type = obj["path_type"]
|
||||
|
||||
if item_type == "object":
|
||||
size_bytes = obj.get("size_bytes") or 0
|
||||
file_record = file_records.get(path)
|
||||
checksum = (
|
||||
file_record.sha256 if file_record and file_record.sha256 else obj.get("checksum", "")
|
||||
)
|
||||
is_lfs = (
|
||||
file_record.lfs
|
||||
if file_record is not None
|
||||
else should_use_lfs(repository, path, size_bytes)
|
||||
)
|
||||
|
||||
file_obj = {
|
||||
"type": "file",
|
||||
"oid": checksum,
|
||||
"size": size_bytes,
|
||||
"path": path,
|
||||
}
|
||||
|
||||
return file_obj
|
||||
last_modified = _format_last_modified(obj.get("mtime"))
|
||||
if last_modified:
|
||||
file_obj["lastModified"] = last_modified
|
||||
|
||||
if is_lfs:
|
||||
file_obj["lfs"] = _build_lfs_payload(checksum, size_bytes)
|
||||
|
||||
async def convert_directory_object(
|
||||
obj, lakefs_repo: str, revision: str
|
||||
) -> dict:
|
||||
"""Convert LakeFS directory object to HuggingFace format.
|
||||
if expand:
|
||||
file_obj["lastCommit"] = last_commit
|
||||
file_obj["securityFileStatus"] = None
|
||||
|
||||
Args:
|
||||
obj: LakeFS common_prefix object dict
|
||||
lakefs_repo: LakeFS repository name
|
||||
revision: Branch or commit
|
||||
|
||||
Returns:
|
||||
HuggingFace formatted directory object
|
||||
"""
|
||||
# Use full path relative to repository root (HuggingFace spec)
|
||||
dir_path = obj["path"]
|
||||
|
||||
# Calculate folder stats
|
||||
folder_size, folder_latest_mtime = await calculate_folder_stats(
|
||||
lakefs_repo, revision, obj["path"]
|
||||
)
|
||||
return file_obj
|
||||
|
||||
dir_obj = {
|
||||
"type": "directory",
|
||||
"oid": obj.get("checksum", ""),
|
||||
"size": folder_size,
|
||||
"path": dir_path.rstrip("/"), # Remove trailing slash
|
||||
"size": 0,
|
||||
"path": path,
|
||||
}
|
||||
|
||||
# Add last modified info
|
||||
if folder_latest_mtime:
|
||||
dir_obj["lastModified"] = datetime.fromtimestamp(folder_latest_mtime).strftime(
|
||||
DATETIME_FORMAT_ISO
|
||||
)
|
||||
elif obj.get("mtime"):
|
||||
dir_obj["lastModified"] = datetime.fromtimestamp(obj["mtime"]).strftime(
|
||||
DATETIME_FORMAT_ISO
|
||||
)
|
||||
last_modified = _format_last_modified(obj.get("mtime"))
|
||||
if last_modified:
|
||||
dir_obj["lastModified"] = last_modified
|
||||
|
||||
if expand:
|
||||
dir_obj["lastCommit"] = last_commit
|
||||
|
||||
return dir_obj
|
||||
|
||||
|
||||
def _apply_changed_path(
|
||||
changed_path: str,
|
||||
unresolved_files: set[str],
|
||||
unresolved_directories: set[str],
|
||||
resolved: dict[str, dict | None],
|
||||
commit_info: dict,
|
||||
) -> None:
|
||||
"""Resolve file and ancestor directory targets touched by a diff path."""
|
||||
normalized_path = _normalize_repo_path(changed_path)
|
||||
if not normalized_path:
|
||||
return
|
||||
|
||||
if normalized_path in unresolved_files:
|
||||
unresolved_files.remove(normalized_path)
|
||||
resolved[normalized_path] = commit_info
|
||||
|
||||
if normalized_path in unresolved_directories:
|
||||
unresolved_directories.remove(normalized_path)
|
||||
resolved[normalized_path] = commit_info
|
||||
|
||||
ancestor = normalized_path
|
||||
while "/" in ancestor and unresolved_directories:
|
||||
ancestor = ancestor.rsplit("/", 1)[0]
|
||||
if ancestor in unresolved_directories:
|
||||
unresolved_directories.remove(ancestor)
|
||||
resolved[ancestor] = commit_info
|
||||
|
||||
|
||||
async def resolve_last_commits_for_paths(
|
||||
lakefs_repo: str,
|
||||
revision: str,
|
||||
targets: list[dict[str, str]],
|
||||
) -> dict[str, dict | None]:
|
||||
"""Resolve the latest commit touching each target path."""
|
||||
unresolved_files = {
|
||||
target["path"] for target in targets if target["type"] == "file" and target["path"]
|
||||
}
|
||||
unresolved_directories = {
|
||||
target["path"]
|
||||
for target in targets
|
||||
if target["type"] == "directory" and target["path"]
|
||||
}
|
||||
if not unresolved_files and not unresolved_directories:
|
||||
return {}
|
||||
|
||||
client = get_lakefs_rest_client()
|
||||
resolved: dict[str, dict | None] = {}
|
||||
commit_cursor: str | None = None
|
||||
|
||||
while unresolved_files or unresolved_directories:
|
||||
log_result = await client.log_commits(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
after=commit_cursor,
|
||||
amount=TREE_COMMIT_SCAN_PAGE_SIZE,
|
||||
)
|
||||
commits = log_result.get("results", [])
|
||||
if not commits:
|
||||
break
|
||||
|
||||
for commit in commits:
|
||||
commit_info = _serialize_last_commit(commit)
|
||||
parent_ids = commit.get("parents") or []
|
||||
parent_id = parent_ids[0] if parent_ids else None
|
||||
|
||||
if not parent_id:
|
||||
for path in unresolved_files:
|
||||
resolved[path] = commit_info
|
||||
for path in unresolved_directories:
|
||||
resolved[path] = commit_info
|
||||
unresolved_files.clear()
|
||||
unresolved_directories.clear()
|
||||
break
|
||||
|
||||
diff_cursor: str | None = None
|
||||
while unresolved_files or unresolved_directories:
|
||||
diff_result = await client.diff_refs(
|
||||
repository=lakefs_repo,
|
||||
left_ref=parent_id,
|
||||
right_ref=commit["id"],
|
||||
after=diff_cursor,
|
||||
amount=TREE_DIFF_PAGE_SIZE,
|
||||
)
|
||||
|
||||
for entry in diff_result.get("results", []):
|
||||
diff_path = entry.get("path")
|
||||
if diff_path:
|
||||
_apply_changed_path(
|
||||
diff_path,
|
||||
unresolved_files,
|
||||
unresolved_directories,
|
||||
resolved,
|
||||
commit_info,
|
||||
)
|
||||
if not unresolved_files and not unresolved_directories:
|
||||
break
|
||||
|
||||
pagination = diff_result.get("pagination") or {}
|
||||
if (
|
||||
not pagination.get("has_more")
|
||||
or (not unresolved_files and not unresolved_directories)
|
||||
):
|
||||
break
|
||||
diff_cursor = pagination.get("next_offset")
|
||||
|
||||
if not unresolved_files and not unresolved_directories:
|
||||
break
|
||||
|
||||
pagination = log_result.get("pagination") or {}
|
||||
if not pagination.get("has_more") or (
|
||||
not unresolved_files and not unresolved_directories
|
||||
):
|
||||
break
|
||||
commit_cursor = pagination.get("next_offset")
|
||||
|
||||
return resolved
|
||||
|
||||
|
||||
async def _process_single_path(
|
||||
lakefs_repo: str,
|
||||
revision: str,
|
||||
repository: Repository,
|
||||
clean_path: str,
|
||||
file_records: dict[str, File],
|
||||
semaphore: asyncio.Semaphore,
|
||||
) -> dict | None:
|
||||
"""Resolve one path to either a file or directory entry."""
|
||||
client = get_lakefs_client()
|
||||
|
||||
async with semaphore:
|
||||
try:
|
||||
obj_stats = await client.stat_object(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
path=clean_path,
|
||||
)
|
||||
|
||||
file_record = file_records.get(clean_path)
|
||||
checksum = (
|
||||
file_record.sha256
|
||||
if file_record and file_record.sha256
|
||||
else obj_stats.get("checksum", "")
|
||||
)
|
||||
size_bytes = obj_stats.get("size_bytes") or 0
|
||||
is_lfs = (
|
||||
file_record.lfs
|
||||
if file_record is not None
|
||||
else should_use_lfs(repository, clean_path, size_bytes)
|
||||
)
|
||||
|
||||
file_info = {
|
||||
"type": "file",
|
||||
"path": clean_path,
|
||||
"size": size_bytes,
|
||||
"oid": checksum,
|
||||
}
|
||||
|
||||
last_modified = _format_last_modified(obj_stats.get("mtime"))
|
||||
if last_modified:
|
||||
file_info["lastModified"] = last_modified
|
||||
|
||||
if is_lfs:
|
||||
file_info["lfs"] = _build_lfs_payload(checksum, size_bytes)
|
||||
|
||||
return file_info
|
||||
except Exception as error:
|
||||
if not is_lakefs_not_found_error(error):
|
||||
logger.debug(f"Failed to stat path {clean_path}: {error}")
|
||||
return None
|
||||
|
||||
try:
|
||||
list_result = await client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=f"{clean_path}/",
|
||||
amount=1,
|
||||
)
|
||||
except Exception as error:
|
||||
if not is_lakefs_not_found_error(error):
|
||||
logger.debug(f"Failed to inspect directory path {clean_path}: {error}")
|
||||
return None
|
||||
|
||||
if not list_result.get("results"):
|
||||
return None
|
||||
|
||||
first_result = list_result["results"][0]
|
||||
dir_info = {
|
||||
"type": "directory",
|
||||
"path": clean_path,
|
||||
"oid": first_result.get("checksum", ""),
|
||||
"size": 0,
|
||||
}
|
||||
|
||||
last_modified = _format_last_modified(first_result.get("mtime"))
|
||||
if last_modified:
|
||||
dir_info["lastModified"] = last_modified
|
||||
|
||||
return dir_info
|
||||
|
||||
|
||||
@router.get("/{repo_type}s/{namespace}/{repo_name}/tree/{revision}{path:path}")
|
||||
@with_repo_fallback("tree")
|
||||
async def list_repo_tree(
|
||||
@@ -229,81 +405,108 @@ async def list_repo_tree(
|
||||
path: str = "",
|
||||
recursive: bool = False,
|
||||
expand: bool = False,
|
||||
limit: int | None = Query(default=None, ge=1),
|
||||
cursor: str | None = None,
|
||||
fallback: bool = True,
|
||||
user: User | None = Depends(get_optional_user),
|
||||
):
|
||||
"""List repository file tree.
|
||||
|
||||
Returns a flat list of files and folders in HuggingFace format.
|
||||
|
||||
Args:
|
||||
repo_type: Type of repository
|
||||
namespace: Repository namespace
|
||||
repo_name: Repository name
|
||||
revision: Branch name or commit hash (default: "main")
|
||||
path: Path within repository (default: root)
|
||||
recursive: List recursively (default: False)
|
||||
expand: Include detailed metadata (default: False)
|
||||
user: Current authenticated user (optional)
|
||||
|
||||
Returns:
|
||||
Flat list of file/folder objects
|
||||
"""
|
||||
# Construct full repo ID
|
||||
"""List repository file tree."""
|
||||
repo_id = f"{namespace}/{repo_name}"
|
||||
|
||||
# Check if repository exists using get_repository
|
||||
repo_row = get_repository(repo_type, namespace, repo_name)
|
||||
|
||||
if not repo_row:
|
||||
return hf_repo_not_found(repo_id, repo_type)
|
||||
|
||||
# Check read permission for private repos
|
||||
check_repo_read_permission(repo_row, user)
|
||||
|
||||
lakefs_repo = lakefs_repo_name(repo_type, repo_id)
|
||||
clean_path = _normalize_repo_path(path)
|
||||
prefix = f"{clean_path}/" if clean_path else ""
|
||||
default_limit = TREE_EXPAND_PAGE_SIZE if expand else TREE_PAGE_SIZE
|
||||
page_size = min(limit or default_limit, default_limit)
|
||||
|
||||
# Clean path - ensure it ends with / if not empty
|
||||
prefix = path.lstrip("/") if path and path != "/" else ""
|
||||
if prefix and not prefix.endswith("/"):
|
||||
prefix += "/"
|
||||
|
||||
# Fetch all objects from LakeFS
|
||||
try:
|
||||
all_results = await fetch_lakefs_objects(
|
||||
lakefs_repo, revision, prefix, recursive
|
||||
resolved_revision, _ = await resolve_revision(
|
||||
get_lakefs_client(), lakefs_repo, revision
|
||||
)
|
||||
except Exception as e:
|
||||
# Check for specific error types
|
||||
if is_lakefs_not_found_error(e):
|
||||
if is_lakefs_revision_error(e):
|
||||
except Exception:
|
||||
return hf_revision_not_found(repo_id, revision)
|
||||
|
||||
try:
|
||||
page = await fetch_lakefs_objects_page(
|
||||
lakefs_repo=lakefs_repo,
|
||||
revision=resolved_revision,
|
||||
prefix=prefix,
|
||||
recursive=recursive,
|
||||
amount=page_size,
|
||||
after=cursor,
|
||||
)
|
||||
except Exception as error:
|
||||
if is_lakefs_not_found_error(error):
|
||||
if is_lakefs_revision_error(error):
|
||||
return hf_revision_not_found(repo_id, revision)
|
||||
else:
|
||||
# Return empty list for non-existent paths
|
||||
return []
|
||||
return hf_entry_not_found(repo_id, clean_path or "/", revision)
|
||||
|
||||
# Other errors are server errors
|
||||
logger.exception(f"Failed to list objects for {repo_id}", e)
|
||||
return hf_server_error(f"Failed to list objects: {str(e)}")
|
||||
logger.exception(f"Failed to list objects for {repo_id}", error)
|
||||
return hf_server_error(f"Failed to list objects: {str(error)}")
|
||||
|
||||
# Convert LakeFS objects to HuggingFace format
|
||||
result_list = []
|
||||
page_results = page.get("results", [])
|
||||
if clean_path and not page_results:
|
||||
return hf_entry_not_found(repo_id, clean_path, revision)
|
||||
|
||||
for obj in all_results:
|
||||
match obj["path_type"]:
|
||||
case "object":
|
||||
# File object - pass Repository FK instead of repo_id
|
||||
file_obj = await convert_file_object(obj, repo_row)
|
||||
result_list.append(file_obj)
|
||||
file_paths = [
|
||||
_normalize_repo_path(obj["path"])
|
||||
for obj in page_results
|
||||
if obj.get("path_type") == "object"
|
||||
]
|
||||
file_records = _build_file_record_map(repo_row, file_paths)
|
||||
|
||||
case "common_prefix":
|
||||
# Directory object
|
||||
dir_obj = await convert_directory_object(
|
||||
obj, lakefs_repo, revision
|
||||
)
|
||||
result_list.append(dir_obj)
|
||||
last_commit_map: dict[str, dict | None] = {}
|
||||
if expand and page_results:
|
||||
targets = [
|
||||
{
|
||||
"path": _normalize_repo_path(obj["path"]),
|
||||
"type": "file"
|
||||
if obj.get("path_type") == "object"
|
||||
else "directory",
|
||||
}
|
||||
for obj in page_results
|
||||
]
|
||||
try:
|
||||
last_commit_map = await resolve_last_commits_for_paths(
|
||||
lakefs_repo=lakefs_repo,
|
||||
revision=resolved_revision,
|
||||
targets=targets,
|
||||
)
|
||||
except Exception as error:
|
||||
if is_lakefs_not_found_error(error) and is_lakefs_revision_error(error):
|
||||
return hf_revision_not_found(repo_id, revision)
|
||||
logger.warning(
|
||||
f"Failed to expand last commit data for {repo_id}/{revision}: {error}"
|
||||
)
|
||||
|
||||
return result_list
|
||||
result_list = [
|
||||
_make_tree_item(
|
||||
obj=obj,
|
||||
repository=repo_row,
|
||||
file_records=file_records,
|
||||
expand=expand,
|
||||
last_commit=last_commit_map.get(_normalize_repo_path(obj["path"])),
|
||||
)
|
||||
for obj in page_results
|
||||
]
|
||||
|
||||
headers = {}
|
||||
pagination = page.get("pagination") or {}
|
||||
if pagination.get("has_more") and pagination.get("next_offset"):
|
||||
next_url = _build_public_link(
|
||||
request=request,
|
||||
limit=page_size,
|
||||
cursor=pagination["next_offset"],
|
||||
)
|
||||
headers["Link"] = f'<{next_url}>; rel="next"'
|
||||
|
||||
return JSONResponse(content=result_list, headers=headers)
|
||||
|
||||
|
||||
@router.post("/{repo_type}s/{namespace}/{repo_name}/paths-info/{revision}")
|
||||
@@ -319,123 +522,77 @@ async def get_paths_info(
|
||||
fallback: bool = True,
|
||||
user: User | None = Depends(get_optional_user),
|
||||
):
|
||||
"""Get information about specific paths in a repository.
|
||||
|
||||
This endpoint matches HuggingFace Hub API format.
|
||||
|
||||
Args:
|
||||
repo_type: Type of repository (model/dataset/space)
|
||||
namespace: Repository namespace
|
||||
repo_name: Repository name
|
||||
revision: Branch name or commit hash
|
||||
paths: List of paths to get information about
|
||||
expand: Whether to fetch extended metadata
|
||||
user: Current authenticated user (optional)
|
||||
|
||||
Returns:
|
||||
List of path information objects (files and folders)
|
||||
"""
|
||||
# Construct full repo ID
|
||||
"""Get information about specific paths in a repository."""
|
||||
repo_id = f"{namespace}/{repo_name}"
|
||||
|
||||
# Check if repository exists using get_repository
|
||||
repo_row = get_repository(repo_type, namespace, repo_name)
|
||||
|
||||
if not repo_row:
|
||||
return hf_repo_not_found(repo_id, repo_type)
|
||||
|
||||
# Check read permission for private repos
|
||||
check_repo_read_permission(repo_row, user)
|
||||
|
||||
normalized_paths = [_normalize_repo_path(path) for path in paths if path]
|
||||
if len(normalized_paths) > PATHS_INFO_MAX_PATHS:
|
||||
return hf_bad_request(
|
||||
f"Too many paths requested. Maximum supported paths per request is {PATHS_INFO_MAX_PATHS}."
|
||||
)
|
||||
|
||||
lakefs_repo = lakefs_repo_name(repo_type, repo_id)
|
||||
try:
|
||||
resolved_revision, _ = await resolve_revision(
|
||||
get_lakefs_client(), lakefs_repo, revision
|
||||
)
|
||||
except Exception:
|
||||
return hf_revision_not_found(repo_id, revision)
|
||||
|
||||
# Helper function to process a single path
|
||||
async def process_path(path: str) -> dict | None:
|
||||
"""Process single path and return its info, or None if path doesn't exist."""
|
||||
clean_path = path.lstrip("/")
|
||||
file_records = _build_file_record_map(repo_row, normalized_paths)
|
||||
semaphore = asyncio.Semaphore(PATHS_INFO_CONCURRENCY)
|
||||
|
||||
try:
|
||||
results = await asyncio.gather(
|
||||
*[
|
||||
_process_single_path(
|
||||
lakefs_repo=lakefs_repo,
|
||||
revision=resolved_revision,
|
||||
repository=repo_row,
|
||||
clean_path=clean_path,
|
||||
file_records=file_records,
|
||||
semaphore=semaphore,
|
||||
)
|
||||
for clean_path in normalized_paths
|
||||
]
|
||||
)
|
||||
except Exception as error:
|
||||
if is_lakefs_not_found_error(error):
|
||||
if is_lakefs_revision_error(error):
|
||||
return hf_revision_not_found(repo_id, revision)
|
||||
return []
|
||||
logger.exception(f"Failed to fetch paths info for {repo_id}", error)
|
||||
return hf_server_error(f"Failed to fetch paths info: {str(error)}")
|
||||
|
||||
existing_entries = [entry for entry in results if entry is not None]
|
||||
|
||||
if expand and existing_entries:
|
||||
targets = [
|
||||
{"path": entry["path"], "type": entry["type"]} for entry in existing_entries
|
||||
]
|
||||
try:
|
||||
# Try to get object stats
|
||||
client = get_lakefs_client()
|
||||
obj_stats = await client.stat_object(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
path=clean_path,
|
||||
last_commit_map = await resolve_last_commits_for_paths(
|
||||
lakefs_repo=lakefs_repo,
|
||||
revision=resolved_revision,
|
||||
targets=targets,
|
||||
)
|
||||
|
||||
# It's a file - use repo-specific LFS settings
|
||||
is_lfs = should_use_lfs(repo_row, clean_path, obj_stats["size_bytes"])
|
||||
|
||||
# Get correct checksum from database using repository FK
|
||||
file_record = get_file(repo_row, clean_path)
|
||||
|
||||
checksum = (
|
||||
file_record.sha256
|
||||
if file_record and file_record.sha256
|
||||
else obj_stats["checksum"]
|
||||
except Exception as error:
|
||||
if is_lakefs_not_found_error(error) and is_lakefs_revision_error(error):
|
||||
return hf_revision_not_found(repo_id, revision)
|
||||
logger.warning(
|
||||
f"Failed to expand paths-info commit data for {repo_id}/{revision}: {error}"
|
||||
)
|
||||
last_commit_map = {}
|
||||
|
||||
file_info = {
|
||||
"type": "file",
|
||||
"path": clean_path,
|
||||
"size": obj_stats["size_bytes"],
|
||||
"oid": checksum, # Git blob SHA1 for non-LFS, SHA256 for LFS
|
||||
"lfs": None,
|
||||
"last_commit": None,
|
||||
"security": None,
|
||||
}
|
||||
for entry in existing_entries:
|
||||
entry["lastCommit"] = last_commit_map.get(entry["path"])
|
||||
if entry["type"] == "file":
|
||||
entry["securityFileStatus"] = None
|
||||
|
||||
# Add LFS metadata if applicable
|
||||
if is_lfs:
|
||||
file_info["lfs"] = {
|
||||
"oid": checksum, # SHA256 for LFS files
|
||||
"size": obj_stats["size_bytes"],
|
||||
"pointerSize": 134,
|
||||
}
|
||||
|
||||
return file_info
|
||||
|
||||
except Exception as e:
|
||||
# Check if it might be a directory by trying to list with prefix
|
||||
if is_lakefs_not_found_error(e):
|
||||
try:
|
||||
# Try to list objects with this path as prefix
|
||||
client = get_lakefs_client()
|
||||
prefix = (
|
||||
clean_path if clean_path.endswith("/") else clean_path + "/"
|
||||
)
|
||||
list_result = await client.list_objects(
|
||||
repository=lakefs_repo,
|
||||
ref=revision,
|
||||
prefix=prefix,
|
||||
amount=1, # Just check if any objects exist
|
||||
)
|
||||
|
||||
# If we get results, it's a directory
|
||||
if list_result["results"]:
|
||||
# Try to get an oid from the first result if available
|
||||
oid = ""
|
||||
if list_result["results"]:
|
||||
oid = list_result["results"][0].get("checksum", "")
|
||||
|
||||
return {
|
||||
"type": "directory",
|
||||
"path": clean_path,
|
||||
"oid": oid,
|
||||
"tree_id": oid,
|
||||
"last_commit": None,
|
||||
}
|
||||
# Path doesn't exist, return None
|
||||
return None
|
||||
except Exception as ex:
|
||||
# Path doesn't exist, skip it (as per HF behavior)
|
||||
logger.debug(f"Path {clean_path} doesn't exist or is invalid")
|
||||
return None
|
||||
# For other errors, also return None
|
||||
return None
|
||||
|
||||
# Process all paths in parallel
|
||||
results = await asyncio.gather(*[process_path(path) for path in paths])
|
||||
|
||||
# Filter out None values (paths that don't exist)
|
||||
return [r for r in results if r is not None]
|
||||
return existing_entries