diff --git a/backend/open_webui/routers/ollama.py b/backend/open_webui/routers/ollama.py index 93745440c4..728d09e524 100644 --- a/backend/open_webui/routers/ollama.py +++ b/backend/open_webui/routers/ollama.py @@ -47,6 +47,7 @@ from open_webui.internal.db import get_session from open_webui.models.models import Models from open_webui.models.access_grants import AccessGrants from open_webui.models.groups import Groups +from open_webui.utils.access_control import check_model_access from open_webui.utils.misc import ( calculate_sha256, cleanup_response, @@ -1097,29 +1098,9 @@ async def generate_chat_completion( if not bypass_system_prompt: payload = apply_system_prompt_to_body(system, payload, metadata, user) - # Check if user has access to the model - if not bypass_filter and user.role == 'user': - user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user.id)} - if not ( - user.id == model_info.user_id - or AccessGrants.has_access( - user_id=user.id, - resource_type='model', - resource_id=model_info.id, - permission='read', - user_group_ids=user_group_ids, - ) - ): - raise HTTPException( - status_code=403, - detail='Model not found', - ) - elif not bypass_filter: - if user.role != 'admin': - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, model_info, bypass_filter) + else: + check_model_access(user, None, bypass_filter) url, url_idx = await get_ollama_url(request, payload['model'], url_idx) api_config = request.app.state.config.OLLAMA_API_CONFIGS.get( @@ -1206,29 +1187,9 @@ async def generate_openai_completion( if params: payload = apply_model_params_to_body_openai(params, payload) - # Check if user has access to the model - if user.role == 'user': - user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user.id)} - if not ( - user.id == model_info.user_id - or AccessGrants.has_access( - user_id=user.id, - resource_type='model', - resource_id=model_info.id, - permission='read', - user_group_ids=user_group_ids, - ) - ): - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, model_info) else: - if user.role != 'admin': - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, None) url, url_idx = await get_ollama_url(request, payload['model'], url_idx) api_config = request.app.state.config.OLLAMA_API_CONFIGS.get( @@ -1292,29 +1253,9 @@ async def generate_openai_chat_completion( payload = apply_model_params_to_body_openai(params, payload) payload = apply_system_prompt_to_body(system, payload, metadata, user) - # Check if user has access to the model - if user.role == 'user': - user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user.id)} - if not ( - user.id == model_info.user_id - or AccessGrants.has_access( - user_id=user.id, - resource_type='model', - resource_id=model_info.id, - permission='read', - user_group_ids=user_group_ids, - ) - ): - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, model_info) else: - if user.role != 'admin': - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, None) url, url_idx = await get_ollama_url(request, payload['model'], url_idx) api_config = request.app.state.config.OLLAMA_API_CONFIGS.get( @@ -1364,29 +1305,9 @@ async def generate_anthropic_messages( if model_info.base_model_id: payload['model'] = model_info.base_model_id - # Check if user has access to the model - if user.role == 'user': - user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user.id)} - if not ( - user.id == model_info.user_id - or AccessGrants.has_access( - user_id=user.id, - resource_type='model', - resource_id=model_info.id, - permission='read', - user_group_ids=user_group_ids, - ) - ): - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, model_info) else: - if user.role != 'admin': - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, None) url, url_idx = await get_ollama_url(request, payload['model'], url_idx) api_config = request.app.state.config.OLLAMA_API_CONFIGS.get( diff --git a/backend/open_webui/routers/openai.py b/backend/open_webui/routers/openai.py index 836517df9d..83a537f913 100644 --- a/backend/open_webui/routers/openai.py +++ b/backend/open_webui/routers/openai.py @@ -28,6 +28,7 @@ from open_webui.internal.db import get_session from open_webui.models.models import Models from open_webui.models.access_grants import AccessGrants from open_webui.models.groups import Groups +from open_webui.utils.access_control import has_connection_access, check_model_access from open_webui.config import ( CACHE_DIR, ) @@ -1044,29 +1045,9 @@ async def generate_chat_completion( if not bypass_system_prompt: payload = apply_system_prompt_to_body(system, payload, metadata, user) - # Check if user has access to the model - if not bypass_filter and user.role == 'user': - user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user.id)} - if not ( - user.id == model_info.user_id - or AccessGrants.has_access( - user_id=user.id, - resource_type='model', - resource_id=model_info.id, - permission='read', - user_group_ids=user_group_ids, - ) - ): - raise HTTPException( - status_code=403, - detail='Model not found', - ) - elif not bypass_filter: - if user.role != 'admin': - raise HTTPException( - status_code=403, - detail='Model not found', - ) + check_model_access(user, model_info, bypass_filter) + else: + check_model_access(user, None, bypass_filter) # Check if model is already in app state cache to avoid expensive get_all_models() call models = request.app.state.OPENAI_MODELS @@ -1340,10 +1321,15 @@ async def responses( Routes to the correct upstream backend based on the model field. """ payload = form_data.model_dump(exclude_none=True) - body = json.dumps(payload) idx = 0 model_id = form_data.model + + # Enforce per-model access control + check_model_access(user, Models.get_model_by_id(model_id), BYPASS_MODEL_ACCESS_CONTROL) + + body = json.dumps(payload) + if model_id: models = request.app.state.OPENAI_MODELS if not models or model_id not in models: diff --git a/backend/open_webui/utils/access_control/__init__.py b/backend/open_webui/utils/access_control/__init__.py index f31c59e158..9c91371384 100644 --- a/backend/open_webui/utils/access_control/__init__.py +++ b/backend/open_webui/utils/access_control/__init__.py @@ -255,3 +255,47 @@ def filter_allowed_access_grants( access_grants = strip_user_access_grants(access_grants) return access_grants + + +def check_model_access( + user: UserModel, + model_info, + bypass_filter: bool = False, +) -> None: + """ + Enforce per-model read access for the given user. + + Raises HTTPException(403) if the user is not authorized. + Does nothing if bypass_filter is True. + + Args: + user: The authenticated user. + model_info: The model record from Models.get_model_by_id(), + or None if the model is not registered. + bypass_filter: If True, skip all access checks (used by + internal callers and BYPASS_MODEL_ACCESS_CONTROL). + """ + from fastapi import HTTPException + + if bypass_filter: + return + + if model_info: + if user.role == 'user': + from open_webui.models.access_grants import AccessGrants + + user_group_ids = {group.id for group in Groups.get_groups_by_member_id(user.id)} + if not ( + user.id == model_info.user_id + or AccessGrants.has_access( + user_id=user.id, + resource_type='model', + resource_id=model_info.id, + permission='read', + user_group_ids=user_group_ids, + ) + ): + raise HTTPException(status_code=403, detail='Model not found') + else: + if user.role != 'admin': + raise HTTPException(status_code=403, detail='Model not found')