This commit is contained in:
Timothy Jaeryang Baek
2026-02-11 02:06:43 -06:00
parent 30f72672fa
commit 3e56261c5e
8 changed files with 115 additions and 209 deletions

View File

@@ -107,71 +107,79 @@ def has_permission(
return get_permission(default_permissions, permission_hierarchy)
def get_permitted_group_and_user_ids(
type: str = "write", access_control: Optional[dict] = None
) -> Union[Dict[str, List[str]], None]:
if access_control is None:
return None
permission_access = access_control.get(type, {})
permitted_group_ids = permission_access.get("group_ids", [])
permitted_user_ids = permission_access.get("user_ids", [])
return {
"group_ids": permitted_group_ids,
"user_ids": permitted_user_ids,
}
def has_access(
user_id: str,
type: str = "write",
access_control: Optional[dict] = None,
permission: str = "read",
access_grants: Optional[list] = None,
user_group_ids: Optional[Set[str]] = None,
strict: bool = True,
db: Optional[Any] = None,
) -> bool:
if access_control is None:
if strict:
return type == "read"
else:
return True
"""
Check if a user has the specified permission using an in-memory access_grants list.
Used for config-driven resources (arena models, tool servers) that store
access control as JSON in PersistentConfig rather than in the access_grant DB table.
Semantics:
- None or [] → private (owner-only, deny all)
- [{"principal_type": "user", "principal_id": "*", "permission": "read"}] → public read
- Specific grants → check user/group membership
"""
if not access_grants:
return False
if user_group_ids is None:
user_groups = Groups.get_groups_by_member_id(user_id, db=db)
user_group_ids = {group.id for group in user_groups}
permitted_ids = get_permitted_group_and_user_ids(type, access_control)
if permitted_ids is None:
return False
permitted_group_ids = permitted_ids.get("group_ids", [])
permitted_user_ids = permitted_ids.get("user_ids", [])
return user_id in permitted_user_ids or any(
group_id in permitted_group_ids for group_id in user_group_ids
)
for grant in access_grants:
if not isinstance(grant, dict):
continue
if grant.get("permission") != permission:
continue
principal_type = grant.get("principal_type")
principal_id = grant.get("principal_id")
if principal_type == "user" and (principal_id == "*" or principal_id == user_id):
return True
if principal_type == "group" and user_group_ids and principal_id in user_group_ids:
return True
# Get all users with access to a resource
def get_users_with_access(
type: str = "write", access_control: Optional[dict] = None, db: Optional[Any] = None
) -> list[UserModel]:
if access_control is None:
result = Users.get_users(filter={"roles": ["!pending"]}, db=db)
return result.get("users", [])
return False
permitted_ids = get_permitted_group_and_user_ids(type, access_control)
if permitted_ids is None:
return []
permitted_group_ids = permitted_ids.get("group_ids", [])
permitted_user_ids = permitted_ids.get("user_ids", [])
def migrate_access_control(data: dict, ac_key: str = "access_control", grants_key: str = "access_grants") -> None:
"""
Auto-migrate a config dict in-place from legacy access_control dict to access_grants list.
user_ids_with_access = set(permitted_user_ids)
If `grants_key` already exists, does nothing.
If `ac_key` exists (old format), converts it and stores as `grants_key`, then removes `ac_key`.
"""
if grants_key in data:
return
group_user_ids_map = Groups.get_group_user_ids_by_ids(permitted_group_ids, db=db)
for user_ids in group_user_ids_map.values():
user_ids_with_access.update(user_ids)
access_control = data.get(ac_key)
if access_control is None and ac_key not in data:
return
return Users.get_users_by_user_ids(list(user_ids_with_access), db=db)
grants: List[Dict[str, str]] = []
if access_control and isinstance(access_control, dict):
for perm in ["read", "write"]:
perm_data = access_control.get(perm, {})
if not perm_data:
continue
for group_id in perm_data.get("group_ids", []):
grants.append({
"principal_type": "group",
"principal_id": group_id,
"permission": perm,
})
for uid in perm_data.get("user_ids", []):
grants.append({
"principal_type": "user",
"principal_id": uid,
"permission": perm,
})
data[grants_key] = grants
data.pop(ac_key, None)