This commit is contained in:
Timothy Jaeryang Baek
2026-02-23 16:01:03 -06:00
parent 3d99de6771
commit 176f9a7816
7 changed files with 125 additions and 261 deletions

View File

@@ -29,8 +29,8 @@ from open_webui.storage.provider import Storage
from open_webui.constants import ERROR_MESSAGES
from open_webui.utils.auth import get_verified_user, get_admin_user
from open_webui.utils.access_control import has_permission
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants
from open_webui.utils.access_control import has_permission, filter_allowed_access_grants
from open_webui.models.access_grants import AccessGrants
from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL
@@ -251,7 +251,7 @@ async def create_new_knowledge(
user=Depends(get_verified_user),
):
# NOTE: We intentionally do NOT use Depends(get_session) here.
# Database operations (has_permission, insert_new_knowledge) manage their own sessions.
# Database operations (has_permission, filter_allowed_access_grants, insert_new_knowledge) manage their own sessions.
# This prevents holding a connection during embed_knowledge_base_metadata()
# which makes external embedding API calls (1-5+ seconds).
if user.role != "admin" and not has_permission(
@@ -262,29 +262,13 @@ async def create_new_knowledge(
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
# Check if user can share publicly
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_knowledge",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = []
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_knowledge",
)
knowledge = Knowledges.insert_new_knowledge(user.id, form_data)
@@ -494,29 +478,13 @@ async def update_knowledge_by_id(
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Check if user can share publicly
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_knowledge",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = []
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_knowledge",
)
knowledge = Knowledges.update_knowledge_by_id(id=id, form_data=form_data)
if knowledge:
@@ -578,36 +546,13 @@ async def update_knowledge_access_by_id(
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Strip public sharing if user lacks permission
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_knowledge",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = [
grant
for grant in form_data.access_grants
if not (
grant.get("principal_type") == "user"
and grant.get("principal_id") == "*"
)
]
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_knowledge"
)
AccessGrants.set_access_grants("knowledge", id, form_data.access_grants, db=db)

View File

@@ -17,7 +17,7 @@ from open_webui.models.models import (
ModelAccessResponse,
Models,
)
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants
from open_webui.models.access_grants import AccessGrants
from pydantic import BaseModel
from open_webui.constants import ERROR_MESSAGES
@@ -33,7 +33,7 @@ from fastapi.responses import FileResponse, StreamingResponse
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_permission
from open_webui.utils.access_control import has_permission, filter_allowed_access_grants
from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL, STATIC_DIR
from open_webui.internal.db import get_session
from sqlalchemy.orm import Session
@@ -565,36 +565,13 @@ async def update_model_access_by_id(
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Strip public sharing if user lacks permission
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_models",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = [
grant
for grant in form_data.access_grants
if not (
grant.get("principal_type") == "user"
and grant.get("principal_id") == "*"
)
]
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_models"
)
AccessGrants.set_access_grants(
"model", form_data.id, form_data.access_grants, db=db

View File

@@ -27,8 +27,8 @@ from open_webui.constants import ERROR_MESSAGES
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_permission
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants
from open_webui.utils.access_control import has_permission, filter_allowed_access_grants
from open_webui.models.access_grants import AccessGrants
from open_webui.internal.db import get_session
from sqlalchemy.orm import Session
@@ -283,30 +283,14 @@ async def update_note_by_id(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
# Check if user can share publicly
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_notes",
request.app.state.config.USER_PERMISSIONS,
db=db,
)
):
form_data.access_grants = []
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_notes",
db=db,
)
try:
note = Notes.update_note_by_id(id, form_data, db=db)
@@ -369,36 +353,13 @@ async def update_note_access_by_id(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
)
# Strip public sharing if user lacks permission
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_notes",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = [
grant
for grant in form_data.access_grants
if not (
grant.get("principal_type") == "user"
and grant.get("principal_id") == "*"
)
]
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_notes"
)
AccessGrants.set_access_grants("note", id, form_data.access_grants, db=db)

View File

@@ -9,7 +9,7 @@ from open_webui.models.prompts import (
PromptModel,
Prompts,
)
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants
from open_webui.models.access_grants import AccessGrants
from open_webui.models.groups import Groups
from open_webui.models.prompt_history import (
PromptHistories,
@@ -18,7 +18,7 @@ from open_webui.models.prompt_history import (
)
from open_webui.constants import ERROR_MESSAGES
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_permission
from open_webui.utils.access_control import has_permission, filter_allowed_access_grants
from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL
from open_webui.internal.db import get_session
from sqlalchemy.orm import Session
@@ -473,36 +473,13 @@ async def update_prompt_access_by_id(
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# Strip public sharing if user lacks permission
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_prompts",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = [
grant
for grant in form_data.access_grants
if not (
grant.get("principal_type") == "user"
and grant.get("principal_id") == "*"
)
]
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_prompts"
)
AccessGrants.set_access_grants("prompt", prompt_id, form_data.access_grants, db=db)

View File

@@ -17,7 +17,7 @@ from open_webui.models.skills import (
SkillAccessListResponse,
Skills,
)
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants
from open_webui.models.access_grants import AccessGrants
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_access, has_permission
@@ -341,36 +341,13 @@ async def update_skill_access_by_id(
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
# Strip public sharing if user lacks permission
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_skills",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = [
grant
for grant in form_data.access_grants
if not (
grant.get("principal_type") == "user"
and grant.get("principal_id") == "*"
)
]
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_skills"
)
AccessGrants.set_access_grants("skill", id, form_data.access_grants, db=db)

View File

@@ -21,7 +21,7 @@ from open_webui.models.tools import (
ToolAccessResponse,
Tools,
)
from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants
from open_webui.models.access_grants import AccessGrants
from open_webui.utils.plugin import (
load_tool_module_by_id,
replace_imports,
@@ -576,36 +576,13 @@ async def update_tool_access_by_id(
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
# Strip public sharing if user lacks permission
if (
user.role != "admin"
and has_public_read_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"sharing.public_tools",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = [
grant
for grant in form_data.access_grants
if not (
grant.get("principal_type") == "user"
and grant.get("principal_id") == "*"
)
]
# Strip individual user sharing if user lacks permission
if (
user.role != "admin"
and has_user_access_grant(form_data.access_grants)
and not has_permission(
user.id,
"access_grants.allow_users",
request.app.state.config.USER_PERMISSIONS,
)
):
form_data.access_grants = strip_user_access_grants(form_data.access_grants)
form_data.access_grants = filter_allowed_access_grants(
request.app.state.config.USER_PERMISSIONS,
user.id,
user.role,
form_data.access_grants,
"sharing.public_tools"
)
AccessGrants.set_access_grants("tool", id, form_data.access_grants, db=db)

View File

@@ -194,3 +194,53 @@ def migrate_access_control(
data[grants_key] = grants
data.pop(ac_key, None)
from open_webui.models.access_grants import (
has_public_read_access_grant,
has_user_access_grant,
strip_user_access_grants,
)
def filter_allowed_access_grants(
default_permissions: Dict[str, Any],
user_id: str,
user_role: str,
access_grants: list,
public_permission_key: str,
db: Optional[Any] = None,
) -> list:
"""
Checks if the user has the required permissions to grant access to a resource.
Returns the filtered list of access grants if permissions are missing.
"""
if user_role == "admin" or not access_grants:
return access_grants
# Check if user can share publicly
if has_public_read_access_grant(access_grants) and not has_permission(
user_id,
public_permission_key,
default_permissions,
db=db,
):
access_grants = [
grant
for grant in access_grants
if not (
(grant.get("principal_type") if isinstance(grant, dict) else getattr(grant, "principal_type", None)) == "user"
and (grant.get("principal_id") if isinstance(grant, dict) else getattr(grant, "principal_id", None)) == "*"
)
]
# Strip individual user sharing if user lacks permission
if has_user_access_grant(access_grants) and not has_permission(
user_id,
"access_grants.allow_users",
default_permissions,
db=db,
):
access_grants = strip_user_access_grants(access_grants)
return access_grants