mirror of
https://github.com/KohakuBlueleaf/KohakuHub.git
synced 2026-05-07 20:38:08 -05:00
153 lines
4.4 KiB
Python
Executable File
153 lines
4.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Initialize local LakeFS and persist credentials for the Python backend."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
|
|
def wait_for_lakefs(endpoint: str, timeout_seconds: int) -> None:
    """Block until the LakeFS health endpoint answers with HTTP 200.

    Polls ``<endpoint>/_health`` about once per second, with a two second
    timeout on each request.

    Args:
        endpoint: Base URL of the LakeFS server; trailing slashes are ignored.
        timeout_seconds: Maximum time to keep polling, in seconds.

    Raises:
        TimeoutError: If no HTTP 200 response arrives before the deadline.
            When at least one attempt was made, the message also reports
            the most recent failure.
    """
    health_url = f"{endpoint.rstrip('/')}/_health"
    # Monotonic clock: a wall-clock adjustment while waiting must neither
    # shorten nor stretch the deadline.
    deadline = time.monotonic() + timeout_seconds
    last_failure = ""

    while time.monotonic() < deadline:
        try:
            response = httpx.get(health_url, timeout=2.0)
        except httpx.HTTPError as exc:
            # Refused connections and timeouts are expected while the server
            # is still starting; remember the reason instead of hiding it.
            last_failure = f"{type(exc).__name__}: {exc}"
        else:
            if response.status_code == 200:
                return
            last_failure = f"HTTP {response.status_code}"

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(1.0, remaining))

    message = f"Timed out waiting for LakeFS: {health_url}"
    if last_failure:
        message += f" (last failure: {last_failure})"
    raise TimeoutError(message)
|
|
|
|
|
|
def read_credentials(path: Path) -> dict[str, str]:
    """Parse a ``KEY=VALUE`` credentials file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Every other line is split at the first ``=``, so a value may
    itself contain ``=``. Whitespace around the key and around the value is
    dropped, and entries whose key is empty are ignored.

    Args:
        path: Location of the credentials file.

    Returns:
        Mapping of key to value; empty when the file does not exist.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # Same outcome as a missing file, without a separate exists() check
        # that could race with the file being created or removed.
        return {}

    credentials: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        # Tolerate "KEY = value" so hand-edited files are still recognised.
        credentials[key] = value.strip()
    return credentials
|
|
|
|
|
|
def write_credentials(path: Path, access_key: str, secret_key: str) -> None:
    """Write the LakeFS key pair to ``path`` in ``KEY=VALUE`` form.

    Missing parent directories are created. Because the file holds a secret,
    it is created with owner-only permissions (``0600``) before any content
    is written, and an already existing file is tightened to the same mode.
    Tightening is best-effort: a failure is reported on stderr and the
    credentials are still written.

    Args:
        path: Destination file; overwritten if it already exists.
        access_key: Value stored as ``KOHAKU_HUB_LAKEFS_ACCESS_KEY``.
        secret_key: Value stored as ``KOHAKU_HUB_LAKEFS_SECRET_KEY``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    # Restrict access before the secret lands on disk so there is no window
    # in which the content is readable under a permissive umask.
    path.touch(mode=0o600, exist_ok=True)
    try:
        path.chmod(0o600)
    except OSError as exc:
        # NOTE(review): some filesystems may reject chmod; treated as
        # non-fatal so the script still produces a credentials file.
        print(
            f"Warning: could not restrict permissions on {path}: {exc}",
            file=sys.stderr,
        )

    lines = [
        f"KOHAKU_HUB_LAKEFS_ACCESS_KEY={access_key}",
        f"KOHAKU_HUB_LAKEFS_SECRET_KEY={secret_key}",
        "",  # yields the trailing newline
    ]
    path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
|
|
def is_initialized(client: httpx.Client, endpoint: str) -> bool:
    """Report whether the LakeFS server says setup has already been done.

    Sends ``GET <endpoint>/api/v1/setup_lakefs`` with a five second timeout
    and inspects the ``initialized`` field of the JSON reply.

    Args:
        client: HTTP client used to send the request.
        endpoint: Base URL of the LakeFS server; trailing slashes are ignored.

    Returns:
        ``True`` only when the reply has status 200 and a truthy
        ``initialized`` field. Any other status, or a body that cannot be
        read that way, yields ``False``.
    """
    setup_url = endpoint.rstrip("/") + "/api/v1/setup_lakefs"
    reply = client.get(setup_url, timeout=5.0)

    if reply.status_code == 200:
        try:
            state = reply.json()
            return True if state.get("initialized") else False
        except Exception:
            # An unparseable or unexpectedly shaped body counts as "not set up".
            return False
    return False
|
|
|
|
|
|
def initialize_lakefs(
    endpoint: str,
    credentials_file: Path,
    admin_user: str,
    timeout_seconds: int,
) -> int:
    """Run LakeFS first-time setup and store the returned key pair.

    Steps, in order: wait for the server to become healthy; stop early if the
    credentials file already holds both keys; refuse to continue if LakeFS
    reports that it is already initialized; otherwise POST the setup request
    and write the returned keys to ``credentials_file``.

    Args:
        endpoint: Base URL of the LakeFS server.
        credentials_file: File the access and secret key are read from and
            written to.
        admin_user: Username sent in the setup request.
        timeout_seconds: How long to wait for the health check.

    Returns:
        ``0`` when credentials already exist or setup succeeded, ``1`` when
        LakeFS is already initialized but the local credentials are missing.

    Raises:
        Exception: Errors from the health wait, from the HTTP calls
            (including ``raise_for_status()``) and from a setup response
            lacking the expected fields propagate to the caller.
    """
    wait_for_lakefs(endpoint, timeout_seconds)

    required_names = (
        "KOHAKU_HUB_LAKEFS_ACCESS_KEY",
        "KOHAKU_HUB_LAKEFS_SECRET_KEY",
    )
    stored = read_credentials(credentials_file)
    if all(stored.get(name) for name in required_names):
        print(f"LakeFS credentials already exist: {credentials_file}")
        return 0

    setup_url = f"{endpoint.rstrip('/')}/api/v1/setup_lakefs"
    with httpx.Client() as client:
        if is_initialized(client, endpoint):
            # The server holds keys this script can no longer retrieve.
            complaint = "\n".join(
                (
                    "LakeFS is already initialized but no local credentials file was found.",
                    f"Expected: {credentials_file}",
                    "Either restore that file or remove hub-meta/dev/lakefs-data to re-initialize.",
                )
            )
            print(complaint, file=sys.stderr)
            return 1

        setup_reply = client.post(
            setup_url,
            json={"username": admin_user},
            headers={"accept": "application/json"},
            timeout=10.0,
        )
        setup_reply.raise_for_status()
        payload = setup_reply.json()

    write_credentials(
        credentials_file,
        payload["access_key_id"],
        payload["secret_access_key"],
    )
    print(f"Initialized LakeFS and wrote credentials to {credentials_file}")
    return 0
|
|
|
|
|
|
def main() -> int:
    """Parse command-line options and run the LakeFS initialization.

    The endpoint default comes from ``KOHAKU_HUB_LAKEFS_ENDPOINT`` and the
    admin username default from ``LAKEFS_ADMIN_USER`` when those environment
    variables are set.

    Returns:
        The exit code from ``initialize_lakefs``, or ``1`` if it raised; the
        error is then printed to stderr.
    """
    # One (flag, add_argument keyword arguments) pair per option.
    option_table = (
        (
            "--endpoint",
            {
                "default": os.environ.get(
                    "KOHAKU_HUB_LAKEFS_ENDPOINT", "http://127.0.0.1:28000"
                ),
                "help": "LakeFS endpoint",
            },
        ),
        (
            "--credentials-file",
            {
                "type": Path,
                "default": Path("hub-meta/dev/lakefs/credentials.env"),
                "help": "Path to the generated credentials.env",
            },
        ),
        (
            "--admin-user",
            {
                "default": os.environ.get("LAKEFS_ADMIN_USER", "admin"),
                "help": "LakeFS bootstrap admin username",
            },
        ),
        (
            "--timeout-seconds",
            {
                "type": int,
                "default": 60,
                "help": "How long to wait for LakeFS to become healthy",
            },
        ),
    )

    parser = argparse.ArgumentParser(
        description="Initialize local LakeFS and persist credentials."
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    options = parser.parse_args()

    try:
        exit_code = initialize_lakefs(
            endpoint=options.endpoint,
            credentials_file=options.credentials_file,
            admin_user=options.admin_user,
            timeout_seconds=options.timeout_seconds,
        )
    except Exception as exc:
        # Top-level boundary: report the failure and turn it into an exit code.
        print(f"Failed to initialize LakeFS: {exc}", file=sys.stderr)
        exit_code = 1
    return exit_code
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|