mirror of
https://github.com/KohakuBlueleaf/KohakuHub.git
synced 2026-03-11 17:34:08 -05:00
358 lines
11 KiB
Python
358 lines
11 KiB
Python
"""File operation tests.

Tests file upload, download, deletion, and listing.
Covers both small files (inline) and large files (LFS).
"""
|
|
|
|
import hashlib
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
|
|
class TestFileOperations:
    """Test file upload, download, and deletion operations.

    Each test unpacks the ``temp_repo`` fixture into
    ``(repo_id, repo_type, hf_client)``. Local scratch files are written
    inside ``tempfile.TemporaryDirectory`` blocks so they are removed even
    when an upload or an assertion fails.
    """

    @staticmethod
    def _upload_bytes(
        hf_client, repo_id, repo_type, path_in_repo, content, **upload_kwargs
    ):
        """Write ``content`` to a scratch file and upload it as ``path_in_repo``.

        Args:
            hf_client: Client object exposing ``upload_file``.
            repo_id: Target repository id.
            repo_type: Target repository type.
            path_in_repo: Destination path inside the repository.
            content: Raw bytes to upload.
            **upload_kwargs: Extra keyword arguments forwarded unchanged to
                ``hf_client.upload_file`` (e.g. ``commit_message``).
        """
        import tempfile

        # The context manager deletes the scratch directory on any exit path,
        # including a failing upload.
        with tempfile.TemporaryDirectory() as scratch_dir:
            local_file = Path(scratch_dir) / Path(path_in_repo).name
            local_file.write_bytes(content)
            hf_client.upload_file(
                path_or_fileobj=str(local_file),
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type=repo_type,
                **upload_kwargs,
            )

    @staticmethod
    def _upload_tree(
        hf_client, repo_id, repo_type, files, path_in_repo, **upload_kwargs
    ):
        """Materialize ``files`` in a scratch directory and upload the folder.

        Args:
            hf_client: Client object exposing ``upload_folder``.
            repo_id: Target repository id.
            repo_type: Target repository type.
            files: Mapping of relative POSIX-style path -> file bytes.
            path_in_repo: Destination prefix inside the repository.
            **upload_kwargs: Extra keyword arguments forwarded unchanged to
                ``hf_client.upload_folder`` (e.g. ``commit_message``).
        """
        import tempfile

        with tempfile.TemporaryDirectory() as scratch_dir:
            root = Path(scratch_dir)
            for relative_path, content in files.items():
                target = root / relative_path
                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_bytes(content)

            hf_client.upload_folder(
                folder_path=str(root),
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type=repo_type,
                **upload_kwargs,
            )

    def test_upload_small_file_hf_client(self, temp_repo):
        """Test uploading small file (<10MB) using HuggingFace Hub client."""
        repo_id, repo_type, hf_client = temp_repo
        test_content = b"Hello, KohakuHub! This is a small test file."

        # Upload file
        self._upload_bytes(
            hf_client,
            repo_id,
            repo_type,
            "test_small.txt",
            test_content,
            commit_message="Add small test file",
        )

        # Download and verify
        downloaded = Path(
            hf_client.download_file(
                repo_id=repo_id, filename="test_small.txt", repo_type=repo_type
            )
        )
        assert downloaded.exists()
        assert downloaded.read_bytes() == test_content

    def test_upload_folder_hf_client(self, temp_repo):
        """Test uploading folder using HuggingFace Hub client."""
        repo_id, repo_type, hf_client = temp_repo

        # Upload a folder containing two top-level files and one nested file
        self._upload_tree(
            hf_client,
            repo_id,
            repo_type,
            {
                "file1.txt": b"File 1 content",
                "file2.txt": b"File 2 content",
                "subdir/file3.txt": b"File 3 content",
            },
            "uploaded_folder/",
            commit_message="Upload folder",
        )

        # Verify files exist
        files = hf_client.list_repo_files(repo_id=repo_id, repo_type=repo_type)
        assert "uploaded_folder/file1.txt" in files
        assert "uploaded_folder/file2.txt" in files
        assert "uploaded_folder/subdir/file3.txt" in files

    def test_download_file_hf_client(self, temp_repo):
        """Test downloading file using HuggingFace Hub client."""
        repo_id, repo_type, hf_client = temp_repo
        test_content = b"Download test content"

        # Upload a file first
        self._upload_bytes(
            hf_client, repo_id, repo_type, "test_download.txt", test_content
        )

        # Download file
        downloaded = Path(
            hf_client.download_file(
                repo_id=repo_id, filename="test_download.txt", repo_type=repo_type
            )
        )

        # Verify content
        assert downloaded.exists()
        assert downloaded.read_bytes() == test_content

    def test_delete_file_hf_client(self, temp_repo):
        """Test deleting file using HuggingFace Hub client."""
        repo_id, repo_type, hf_client = temp_repo

        # Upload a file first
        self._upload_bytes(
            hf_client, repo_id, repo_type, "test_delete.txt", b"File to be deleted"
        )

        # Verify file exists
        files = hf_client.list_repo_files(repo_id=repo_id, repo_type=repo_type)
        assert "test_delete.txt" in files

        # Delete file
        hf_client.delete_file(
            path_in_repo="test_delete.txt",
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message="Delete test file",
        )

        # Verify file is deleted
        files = hf_client.list_repo_files(repo_id=repo_id, repo_type=repo_type)
        assert "test_delete.txt" not in files

    def test_list_repo_files(self, temp_repo):
        """Test listing repository files."""
        repo_id, repo_type, hf_client = temp_repo

        # Upload multiple files at the repository root
        self._upload_tree(
            hf_client,
            repo_id,
            repo_type,
            {
                "file1.txt": b"Content 1",
                "file2.txt": b"Content 2",
                "subdir/file3.txt": b"Content 3",
            },
            "",
        )

        # List files
        files = hf_client.list_repo_files(repo_id=repo_id, repo_type=repo_type)
        assert isinstance(files, list)
        assert "file1.txt" in files
        assert "file2.txt" in files
        assert "subdir/file3.txt" in files

    def test_file_metadata_head_request(self, random_user, temp_repo):
        """Test getting file metadata via HEAD request."""
        # Only the token from the random_user fixture is needed here.
        _, token, _ = random_user
        repo_id, repo_type, hf_client = temp_repo
        test_content = b"Metadata test content"

        # Upload a file
        self._upload_bytes(
            hf_client, repo_id, repo_type, "test_metadata.txt", test_content
        )

        # HEAD request to get metadata using repo owner's token
        from tests.base import HTTPClient

        user_http_client = HTTPClient(token=token)

        namespace, repo_name = repo_id.split("/")
        resp = user_http_client.head(
            f"/{repo_type}s/{namespace}/{repo_name}/resolve/main/test_metadata.txt"
        )
        assert resp.status_code == 200

        # Check headers: at least one of the revision markers must be present;
        # the length is only validated when the server reports it.
        assert "X-Repo-Commit" in resp.headers or "ETag" in resp.headers
        if "Content-Length" in resp.headers:
            assert int(resp.headers["Content-Length"]) == len(test_content)

    def test_upload_with_commit_message(self, temp_repo):
        """Test uploading file with custom commit message."""
        repo_id, repo_type, hf_client = temp_repo

        # Upload file with custom message
        commit_message = "Custom commit message for testing"
        self._upload_bytes(
            hf_client,
            repo_id,
            repo_type,
            "test_commit.txt",
            b"Commit message test",
            commit_message=commit_message,
        )

        # Note: Verifying commit message would require commit history API
        # Just verify file was uploaded
        files = hf_client.list_repo_files(repo_id=repo_id, repo_type=repo_type)
        assert "test_commit.txt" in files

    def test_file_content_integrity(self, temp_repo):
        """Test that file content integrity is preserved through upload/download."""
        repo_id, repo_type, hf_client = temp_repo

        # Create random content and remember its digest
        test_content = os.urandom(100 * 1000)  # 100KB random data
        original_hash = hashlib.sha256(test_content).hexdigest()

        # Upload
        self._upload_bytes(
            hf_client, repo_id, repo_type, "test_integrity.bin", test_content
        )

        # Download
        downloaded = hf_client.download_file(
            repo_id=repo_id, filename="test_integrity.bin", repo_type=repo_type
        )

        # Verify integrity
        downloaded_hash = hashlib.sha256(Path(downloaded).read_bytes()).hexdigest()
        assert (
            downloaded_hash == original_hash
        ), "File content corrupted during upload/download"

    def test_nonexistent_file_download(self, temp_repo):
        """Test downloading non-existent file."""
        repo_id, repo_type, hf_client = temp_repo

        # Try to download non-existent file
        with pytest.raises(Exception) as exc_info:
            hf_client.download_file(
                repo_id=repo_id, filename="nonexistent.txt", repo_type=repo_type
            )

        # Should be an error (404 or file not found)
        error_msg = str(exc_info.value).lower()
        assert (
            "404" in error_msg or "not found" in error_msg or "cannot find" in error_msg
        )

    def test_tree_endpoint(self, random_user, temp_repo):
        """Test tree endpoint for listing files."""
        # Only the token from the random_user fixture is needed here.
        _, token, _ = random_user
        repo_id, repo_type, hf_client = temp_repo

        # Upload some files
        self._upload_tree(
            hf_client,
            repo_id,
            repo_type,
            {
                "file1.txt": b"Content 1",
                "dir1/file2.txt": b"Content 2",
            },
            "",
        )

        # Query tree endpoint using repo owner's token
        from tests.base import HTTPClient

        user_http_client = HTTPClient(token=token)

        namespace, repo_name = repo_id.split("/")
        resp = user_http_client.get(
            f"/api/{repo_type}s/{namespace}/{repo_name}/tree/main/"
        )
        assert resp.status_code == 200
        tree_data = resp.json()
        assert isinstance(tree_data, list)

        # Check files are in tree
        paths = [item["path"] for item in tree_data]
        assert "file1.txt" in paths
|