Files
KohakuHub/scripts/dev/init_lakefs.py

153 lines
4.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""Initialize local LakeFS and persist credentials for the Python backend."""
from __future__ import annotations
import argparse
import os
import sys
import time
from pathlib import Path
import httpx
def wait_for_lakefs(endpoint: str, timeout_seconds: int) -> None:
    """Block until the LakeFS health endpoint answers with HTTP 200.

    Polls ``<endpoint>/_health`` about once per second until it returns
    status 200 or the timeout elapses.

    Args:
        endpoint: Base URL of the LakeFS server; trailing slashes are ignored.
        timeout_seconds: Maximum time to keep polling, in seconds. A value
            of zero or less fails immediately without issuing a request.

    Raises:
        TimeoutError: If no HTTP 200 response was observed before the
            deadline. When the final poll failed with an exception, that
            exception is attached as ``__cause__`` to aid debugging.
    """
    health_url = f"{endpoint.rstrip('/')}/_health"
    # A monotonic clock keeps the deadline correct even if the wall clock is
    # adjusted (NTP sync, suspend/resume) while we are waiting.
    deadline = time.monotonic() + timeout_seconds
    last_error = None
    while time.monotonic() < deadline:
        try:
            response = httpx.get(health_url, timeout=2.0)
        except Exception as exc:
            # Deliberately broad: while the server is still starting, any
            # failure simply means "not ready yet". Keep the error so the
            # final TimeoutError can report what went wrong.
            last_error = exc
        else:
            if response.status_code == 200:
                return
            # The server answered, so an earlier error is no longer the
            # reason we are still waiting.
            last_error = None
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(1.0, remaining))
    raise TimeoutError(f"Timed out waiting for LakeFS: {health_url}") from last_error
def read_credentials(path: Path) -> dict[str, str]:
    """Parse a ``KEY=VALUE`` credentials file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Each remaining line is split on the first ``=``; whitespace
    around the key and the value is stripped so that hand-edited lines such
    as ``KEY = value`` are still found under ``KEY``.

    Args:
        path: Location of the credentials file.

    Returns:
        Mapping of keys to values; empty when the file does not exist.
    """
    try:
        # Read directly instead of checking ``exists()`` first, so a file
        # that disappears between the check and the read cannot crash us.
        text = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return {}

    credentials: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            continue
        credentials[key.strip()] = value.strip()
    return credentials
def write_credentials(path: Path, access_key: str, secret_key: str) -> None:
    """Persist the LakeFS key pair as a two-line ``KEY=VALUE`` file.

    Missing parent directories are created, and any existing file at
    ``path`` is overwritten.

    Args:
        path: Destination of the credentials file.
        access_key: Value stored under ``KOHAKU_HUB_LAKEFS_ACCESS_KEY``.
        secret_key: Value stored under ``KOHAKU_HUB_LAKEFS_SECRET_KEY``.
    """
    entries = (
        f"KOHAKU_HUB_LAKEFS_ACCESS_KEY={access_key}",
        f"KOHAKU_HUB_LAKEFS_SECRET_KEY={secret_key}",
    )
    # Every entry is newline-terminated, so the file ends with a newline.
    contents = "".join(f"{entry}\n" for entry in entries)

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(contents, encoding="utf-8")
def is_initialized(client: httpx.Client, endpoint: str) -> bool:
    """Report whether the LakeFS server has already completed first-time setup.

    Sends ``GET <endpoint>/api/v1/setup_lakefs`` and inspects the JSON body.
    Two response shapes are accepted: a truthy ``initialized`` field, or a
    ``state`` field equal to ``"initialized"``.

    NOTE(review): the ``state`` form is what current LakeFS releases appear
    to return; confirm against the LakeFS API reference for the pinned
    server version.

    Args:
        client: HTTP client used to issue the request.
        endpoint: Base URL of the LakeFS server; trailing slashes are ignored.

    Returns:
        True only when the server answers 200 with a JSON object that marks
        setup as done. Non-200 responses, undecodable bodies and non-object
        payloads all yield False.
    """
    response = client.get(f"{endpoint.rstrip('/')}/api/v1/setup_lakefs", timeout=5.0)
    if response.status_code != 200:
        return False
    try:
        payload = response.json()
    except ValueError:
        # Body was not valid JSON (JSONDecodeError is a ValueError).
        return False
    if not isinstance(payload, dict):
        return False
    if payload.get("initialized"):
        return True
    return payload.get("state") == "initialized"
def initialize_lakefs(
    endpoint: str,
    credentials_file: Path,
    admin_user: str,
    timeout_seconds: int,
) -> int:
    """Bootstrap LakeFS once and store the admin key pair it returns.

    The function first waits for the server to become healthy. If the
    credentials file already holds both keys, nothing else is done. If the
    server reports it is already set up but the keys are missing locally, a
    recovery hint is printed to stderr. Otherwise the setup endpoint is
    called and the returned keys are written to ``credentials_file``.

    Args:
        endpoint: Base URL of the LakeFS server.
        credentials_file: Where the key pair is read from and written to.
        admin_user: Username sent in the setup request.
        timeout_seconds: How long to wait for the health check.

    Returns:
        0 when credentials already exist or were freshly written; 1 when
        LakeFS is initialized but no local credentials are available.

    Raises:
        TimeoutError: Propagated from ``wait_for_lakefs``.
        KeyError: If the setup response lacks ``access_key_id`` or
            ``secret_access_key``.
        Exception: Whatever the HTTP client raises, including from
            ``raise_for_status()`` on the setup response.
    """
    wait_for_lakefs(endpoint, timeout_seconds)

    stored = read_credentials(credentials_file)
    required_keys = ("KOHAKU_HUB_LAKEFS_ACCESS_KEY", "KOHAKU_HUB_LAKEFS_SECRET_KEY")
    # Both keys must be present and non-empty to count as usable credentials.
    if all(stored.get(name) for name in required_keys):
        print(f"LakeFS credentials already exist: {credentials_file}")
        return 0

    with httpx.Client() as client:
        if is_initialized(client, endpoint):
            # The secret key is handed out only by the setup call below, so
            # it cannot be fetched again from an initialized server.
            hint = (
                "LakeFS is already initialized but no local credentials file was found.\n"
                f"Expected: {credentials_file}\n"
                "Either restore that file or remove hub-meta/dev/lakefs-data to re-initialize."
            )
            print(hint, file=sys.stderr)
            return 1

        setup_response = client.post(
            f"{endpoint.rstrip('/')}/api/v1/setup_lakefs",
            json={"username": admin_user},
            headers={"accept": "application/json"},
            timeout=10.0,
        )
        setup_response.raise_for_status()
        issued = setup_response.json()
        write_credentials(
            credentials_file,
            issued["access_key_id"],
            issued["secret_access_key"],
        )
        print(f"Initialized LakeFS and wrote credentials to {credentials_file}")
        return 0
def main() -> int:
    """Parse command-line options and run the LakeFS bootstrap.

    Options: ``--endpoint`` (default from ``KOHAKU_HUB_LAKEFS_ENDPOINT``),
    ``--credentials-file``, ``--admin-user`` (default from
    ``LAKEFS_ADMIN_USER``) and ``--timeout-seconds``.

    Returns:
        The exit code from ``initialize_lakefs``, or 1 if it raised; in that
        case the error is printed to stderr.
    """
    default_endpoint = os.environ.get(
        "KOHAKU_HUB_LAKEFS_ENDPOINT", "http://127.0.0.1:28000"
    )
    default_admin = os.environ.get("LAKEFS_ADMIN_USER", "admin")

    cli = argparse.ArgumentParser(
        description="Initialize local LakeFS and persist credentials."
    )
    cli.add_argument("--endpoint", default=default_endpoint, help="LakeFS endpoint")
    cli.add_argument(
        "--credentials-file",
        type=Path,
        default=Path("hub-meta/dev/lakefs/credentials.env"),
        help="Path to the generated credentials.env",
    )
    cli.add_argument(
        "--admin-user",
        default=default_admin,
        help="LakeFS bootstrap admin username",
    )
    cli.add_argument(
        "--timeout-seconds",
        type=int,
        default=60,
        help="How long to wait for LakeFS to become healthy",
    )
    options = cli.parse_args()

    # Top-level boundary: turn any failure into a message and a non-zero exit
    # code instead of a traceback.
    try:
        exit_code = initialize_lakefs(
            endpoint=options.endpoint,
            credentials_file=options.credentials_file,
            admin_user=options.admin_user,
            timeout_seconds=options.timeout_seconds,
        )
    except Exception as exc:
        print(f"Failed to initialize LakeFS: {exc}", file=sys.stderr)
        return 1
    return exit_code
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())