From 176f9a781619d836be003d28d53904639cad4128 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Mon, 23 Feb 2026 16:01:03 -0600 Subject: [PATCH] refac --- backend/open_webui/routers/knowledge.py | 103 +++++---------------- backend/open_webui/routers/models.py | 41 ++------ backend/open_webui/routers/notes.py | 73 ++++----------- backend/open_webui/routers/prompts.py | 41 ++------ backend/open_webui/routers/skills.py | 39 ++------ backend/open_webui/routers/tools.py | 39 ++------ backend/open_webui/utils/access_control.py | 50 ++++++++++ 7 files changed, 125 insertions(+), 261 deletions(-) diff --git a/backend/open_webui/routers/knowledge.py b/backend/open_webui/routers/knowledge.py index b5cecb6fac..2531f8b92a 100644 --- a/backend/open_webui/routers/knowledge.py +++ b/backend/open_webui/routers/knowledge.py @@ -29,8 +29,8 @@ from open_webui.storage.provider import Storage from open_webui.constants import ERROR_MESSAGES from open_webui.utils.auth import get_verified_user, get_admin_user -from open_webui.utils.access_control import has_permission -from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants +from open_webui.utils.access_control import has_permission, filter_allowed_access_grants +from open_webui.models.access_grants import AccessGrants from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL @@ -251,7 +251,7 @@ async def create_new_knowledge( user=Depends(get_verified_user), ): # NOTE: We intentionally do NOT use Depends(get_session) here. - # Database operations (has_permission, insert_new_knowledge) manage their own sessions. + # Database operations (has_permission, filter_allowed_access_grants, insert_new_knowledge) manage their own sessions. # This prevents holding a connection during embed_knowledge_base_metadata() # which makes external embedding API calls (1-5+ seconds). if user.role != "admin" and not has_permission( @@ -262,29 +262,13 @@ async def create_new_knowledge( detail=ERROR_MESSAGES.UNAUTHORIZED, ) - # Check if user can share publicly - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_knowledge", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_knowledge", + ) knowledge = Knowledges.insert_new_knowledge(user.id, form_data) @@ -494,29 +478,13 @@ async def update_knowledge_by_id( detail=ERROR_MESSAGES.ACCESS_PROHIBITED, ) - # Check if user can share publicly - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_knowledge", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_knowledge", + ) knowledge = Knowledges.update_knowledge_by_id(id=id, form_data=form_data) if knowledge: @@ -578,36 +546,13 @@ async def update_knowledge_access_by_id( detail=ERROR_MESSAGES.ACCESS_PROHIBITED, ) - # Strip public sharing if user lacks permission - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_knowledge", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [ - grant - for grant in form_data.access_grants - if not ( - grant.get("principal_type") == "user" - and grant.get("principal_id") == "*" - ) - ] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_knowledge" + ) AccessGrants.set_access_grants("knowledge", id, form_data.access_grants, db=db) diff --git a/backend/open_webui/routers/models.py b/backend/open_webui/routers/models.py index 9feb0e8548..ce89eb5af8 100644 --- a/backend/open_webui/routers/models.py +++ b/backend/open_webui/routers/models.py @@ -17,7 +17,7 @@ from open_webui.models.models import ( ModelAccessResponse, Models, ) -from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants +from open_webui.models.access_grants import AccessGrants from pydantic import BaseModel from open_webui.constants import ERROR_MESSAGES @@ -33,7 +33,7 @@ from fastapi.responses import FileResponse, StreamingResponse from open_webui.utils.auth import get_admin_user, get_verified_user -from open_webui.utils.access_control import has_permission +from open_webui.utils.access_control import has_permission, filter_allowed_access_grants from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL, STATIC_DIR from open_webui.internal.db import get_session from sqlalchemy.orm import Session @@ -565,36 +565,13 @@ async def update_model_access_by_id( detail=ERROR_MESSAGES.ACCESS_PROHIBITED, ) - # Strip public sharing if user lacks permission - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_models", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [ - grant - for grant in form_data.access_grants - if not ( - grant.get("principal_type") == "user" - and grant.get("principal_id") == "*" - ) - ] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_models" + ) AccessGrants.set_access_grants( "model", form_data.id, form_data.access_grants, db=db diff --git a/backend/open_webui/routers/notes.py b/backend/open_webui/routers/notes.py index 7c971fa312..f25a5dcfd0 100644 --- a/backend/open_webui/routers/notes.py +++ b/backend/open_webui/routers/notes.py @@ -27,8 +27,8 @@ from open_webui.constants import ERROR_MESSAGES from open_webui.utils.auth import get_admin_user, get_verified_user -from open_webui.utils.access_control import has_permission -from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants +from open_webui.utils.access_control import has_permission, filter_allowed_access_grants +from open_webui.models.access_grants import AccessGrants from open_webui.internal.db import get_session from sqlalchemy.orm import Session @@ -283,30 +283,14 @@ async def update_note_by_id( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() ) - # Check if user can share publicly - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_notes", - request.app.state.config.USER_PERMISSIONS, - db=db, - ) - ): - form_data.access_grants = [] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_notes", + db=db, + ) try: note = Notes.update_note_by_id(id, form_data, db=db) @@ -369,36 +353,13 @@ async def update_note_access_by_id( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() ) - # Strip public sharing if user lacks permission - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_notes", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [ - grant - for grant in form_data.access_grants - if not ( - grant.get("principal_type") == "user" - and grant.get("principal_id") == "*" - ) - ] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_notes" + ) AccessGrants.set_access_grants("note", id, form_data.access_grants, db=db) diff --git a/backend/open_webui/routers/prompts.py b/backend/open_webui/routers/prompts.py index d79fdbbd6e..0e2799d7b4 100644 --- a/backend/open_webui/routers/prompts.py +++ b/backend/open_webui/routers/prompts.py @@ -9,7 +9,7 @@ from open_webui.models.prompts import ( PromptModel, Prompts, ) -from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants +from open_webui.models.access_grants import AccessGrants from open_webui.models.groups import Groups from open_webui.models.prompt_history import ( PromptHistories, @@ -18,7 +18,7 @@ from open_webui.models.prompt_history import ( ) from open_webui.constants import ERROR_MESSAGES from open_webui.utils.auth import get_admin_user, get_verified_user -from open_webui.utils.access_control import has_permission +from open_webui.utils.access_control import has_permission, filter_allowed_access_grants from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL from open_webui.internal.db import get_session from sqlalchemy.orm import Session @@ -473,36 +473,13 @@ async def update_prompt_access_by_id( detail=ERROR_MESSAGES.ACCESS_PROHIBITED, ) - # Strip public sharing if user lacks permission - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_prompts", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [ - grant - for grant in form_data.access_grants - if not ( - grant.get("principal_type") == "user" - and grant.get("principal_id") == "*" - ) - ] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_prompts" + ) AccessGrants.set_access_grants("prompt", prompt_id, form_data.access_grants, db=db) diff --git a/backend/open_webui/routers/skills.py b/backend/open_webui/routers/skills.py index 6532b6ee04..c91dbc5b79 100644 --- a/backend/open_webui/routers/skills.py +++ b/backend/open_webui/routers/skills.py @@ -17,7 +17,7 @@ from open_webui.models.skills import ( SkillAccessListResponse, Skills, ) -from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants +from open_webui.models.access_grants import AccessGrants from open_webui.utils.auth import get_admin_user, get_verified_user from open_webui.utils.access_control import has_access, has_permission @@ -341,36 +341,13 @@ async def update_skill_access_by_id( detail=ERROR_MESSAGES.UNAUTHORIZED, ) - # Strip public sharing if user lacks permission - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_skills", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [ - grant - for grant in form_data.access_grants - if not ( - grant.get("principal_type") == "user" - and grant.get("principal_id") == "*" - ) - ] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_skills" + ) AccessGrants.set_access_grants("skill", id, form_data.access_grants, db=db) diff --git a/backend/open_webui/routers/tools.py b/backend/open_webui/routers/tools.py index d60bcfb01b..7032b1b4b1 100644 --- a/backend/open_webui/routers/tools.py +++ b/backend/open_webui/routers/tools.py @@ -21,7 +21,7 @@ from open_webui.models.tools import ( ToolAccessResponse, Tools, ) -from open_webui.models.access_grants import AccessGrants, has_public_read_access_grant, has_user_access_grant, strip_user_access_grants +from open_webui.models.access_grants import AccessGrants from open_webui.utils.plugin import ( load_tool_module_by_id, replace_imports, @@ -576,36 +576,13 @@ async def update_tool_access_by_id( detail=ERROR_MESSAGES.UNAUTHORIZED, ) - # Strip public sharing if user lacks permission - if ( - user.role != "admin" - and has_public_read_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "sharing.public_tools", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = [ - grant - for grant in form_data.access_grants - if not ( - grant.get("principal_type") == "user" - and grant.get("principal_id") == "*" - ) - ] - - # Strip individual user sharing if user lacks permission - if ( - user.role != "admin" - and has_user_access_grant(form_data.access_grants) - and not has_permission( - user.id, - "access_grants.allow_users", - request.app.state.config.USER_PERMISSIONS, - ) - ): - form_data.access_grants = strip_user_access_grants(form_data.access_grants) + form_data.access_grants = filter_allowed_access_grants( + request.app.state.config.USER_PERMISSIONS, + user.id, + user.role, + form_data.access_grants, + "sharing.public_tools" + ) AccessGrants.set_access_grants("tool", id, form_data.access_grants, db=db) diff --git a/backend/open_webui/utils/access_control.py b/backend/open_webui/utils/access_control.py index b7ea9830db..63fa8b26ca 100644 --- a/backend/open_webui/utils/access_control.py +++ b/backend/open_webui/utils/access_control.py @@ -194,3 +194,53 @@ def migrate_access_control( data[grants_key] = grants data.pop(ac_key, None) + +from open_webui.models.access_grants import ( + has_public_read_access_grant, + has_user_access_grant, + strip_user_access_grants, +) + + +def filter_allowed_access_grants( + default_permissions: Dict[str, Any], + user_id: str, + user_role: str, + access_grants: list, + public_permission_key: str, + db: Optional[Any] = None, +) -> list: + """ + Checks if the user has the required permissions to grant access to a resource. + Returns the filtered list of access grants if permissions are missing. + """ + if user_role == "admin" or not access_grants: + return access_grants + + # Check if user can share publicly + if has_public_read_access_grant(access_grants) and not has_permission( + user_id, + public_permission_key, + default_permissions, + db=db, + ): + access_grants = [ + grant + for grant in access_grants + if not ( + (grant.get("principal_type") if isinstance(grant, dict) else getattr(grant, "principal_type", None)) == "user" + and (grant.get("principal_id") if isinstance(grant, dict) else getattr(grant, "principal_id", None)) == "*" + ) + ] + + # Strip individual user sharing if user lacks permission + if has_user_access_grant(access_grants) and not has_permission( + user_id, + "access_grants.allow_users", + default_permissions, + db=db, + ): + access_grants = strip_user_access_grants(access_grants) + + return access_grants +