From c8622adcb01f3091b17ca50f8c8e2f20c7b9cd2a Mon Sep 17 00:00:00 2001 From: Tim Baek Date: Wed, 7 Jan 2026 08:58:58 -0500 Subject: [PATCH] feat: builtin kb tools --- backend/open_webui/tools/builtin.py | 366 ++++++++++++++++++++++++++++ backend/open_webui/utils/tools.py | 12 +- 2 files changed, 377 insertions(+), 1 deletion(-) diff --git a/backend/open_webui/tools/builtin.py b/backend/open_webui/tools/builtin.py index 11069ae014..b8cda36012 100644 --- a/backend/open_webui/tools/builtin.py +++ b/backend/open_webui/tools/builtin.py @@ -1148,3 +1148,369 @@ async def view_channel_thread( except Exception as e: log.exception(f"view_channel_thread error: {e}") return json.dumps({"error": str(e)}) + + +# ============================================================================= +# KNOWLEDGE BASE TOOLS +# ============================================================================= + + +async def list_knowledge_bases( + count: int = 10, + skip: int = 0, + __request__: Request = None, + __user__: dict = None, +) -> str: + """ + List the user's accessible knowledge bases. + + :param count: Maximum number of KBs to return (default: 10) + :param skip: Number of results to skip for pagination (default: 0) + :return: JSON with KBs containing id, name, description, and file_count + """ + if __request__ is None: + return json.dumps({"error": "Request context not available"}) + + if not __user__: + return json.dumps({"error": "User context not available"}) + + try: + from open_webui.models.knowledge import Knowledges + + user_id = __user__.get("id") + user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)] + + result = Knowledges.search_knowledge_bases( + user_id, + filter={ + "query": "", # Empty query to get all + "user_id": user_id, + "group_ids": user_group_ids, + }, + skip=skip, + limit=count, + ) + + knowledge_bases = [] + for knowledge_base in result.items: + # Get file count for this KB + files = Knowledges.get_files_by_id(knowledge_base.id) + file_count = len(files) if files else 0 + + knowledge_bases.append( + { + "id": knowledge_base.id, + "name": knowledge_base.name, + "description": knowledge_base.description or "", + "file_count": file_count, + "updated_at": knowledge_base.updated_at, + } + ) + + return json.dumps(knowledge_bases, ensure_ascii=False) + except Exception as e: + log.exception(f"list_knowledge_bases error: {e}") + return json.dumps({"error": str(e)}) + +async def search_knowledge_bases( + query: str, + count: int = 5, + skip: int = 0, + __request__: Request = None, + __user__: dict = None, +) -> str: + """ + Search the user's accessible knowledge bases by name and description. + + :param query: The search query to find matching knowledge bases + :param count: Maximum number of results to return (default: 5) + :param skip: Number of results to skip for pagination (default: 0) + :return: JSON with matching KBs containing id, name, description, and file_count + """ + if __request__ is None: + return json.dumps({"error": "Request context not available"}) + + if not __user__: + return json.dumps({"error": "User context not available"}) + + try: + from open_webui.models.knowledge import Knowledges + + user_id = __user__.get("id") + user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)] + + result = Knowledges.search_knowledge_bases( + user_id, + filter={ + "query": query, + "user_id": user_id, + "group_ids": user_group_ids, + }, + skip=skip, + limit=count, + ) + + knowledge_bases = [] + for knowledge_base in result.items: + # Get file count for this KB + files = Knowledges.get_files_by_id(knowledge_base.id) + file_count = len(files) if files else 0 + + knowledge_bases.append( + { + "id": knowledge_base.id, + "name": knowledge_base.name, + "description": knowledge_base.description or "", + "file_count": file_count, + "updated_at": knowledge_base.updated_at, + } + ) + + return json.dumps(knowledge_bases, ensure_ascii=False) + except Exception as e: + log.exception(f"search_knowledge_bases error: {e}") + return json.dumps({"error": str(e)}) + + +async def search_knowledge_files( + query: str, + knowledge_id: Optional[str] = None, + count: int = 5, + skip: int = 0, + __request__: Request = None, + __user__: dict = None, +) -> str: + """ + Search files across knowledge bases the user has access to. + + :param query: The search query to find matching files by filename + :param knowledge_id: Optional KB id to limit search to a specific knowledge base + :param count: Maximum number of results to return (default: 5) + :param skip: Number of results to skip for pagination (default: 0) + :return: JSON with matching files containing id, filename, knowledge_id, knowledge_name, and updated_at + """ + if __request__ is None: + return json.dumps({"error": "Request context not available"}) + + if not __user__: + return json.dumps({"error": "User context not available"}) + + try: + from open_webui.models.knowledge import Knowledges + + user_id = __user__.get("id") + user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)] + + if knowledge_id: + # Search within a specific KB + result = Knowledges.search_files_by_id( + knowledge_id=knowledge_id, + user_id=user_id, + filter={"query": query}, + skip=skip, + limit=count, + ) + else: + # Search across all accessible KBs + result = Knowledges.search_knowledge_files( + filter={ + "query": query, + "user_id": user_id, + "group_ids": user_group_ids, + }, + skip=skip, + limit=count, + ) + + files = [] + for file in result.items: + file_info = { + "id": file.id, + "filename": file.filename, + "updated_at": file.updated_at, + } + + # Add KB info if available (from search_knowledge_files) + if hasattr(file, "collection") and file.collection: + file_info["knowledge_id"] = file.collection.get("id", "") + file_info["knowledge_name"] = file.collection.get("name", "") + + files.append(file_info) + + return json.dumps(files, ensure_ascii=False) + except Exception as e: + log.exception(f"search_knowledge_files error: {e}") + return json.dumps({"error": str(e)}) + + +async def view_knowledge_file( + file_id: str, + __request__: Request = None, + __user__: dict = None, +) -> str: + """ + Get the full content of a file from a knowledge base. + + :param file_id: The ID of the file to retrieve + :return: JSON with the file's id, filename, and full text content + """ + if __request__ is None: + return json.dumps({"error": "Request context not available"}) + + if not __user__: + return json.dumps({"error": "User context not available"}) + + try: + from open_webui.models.files import Files + from open_webui.models.knowledge import Knowledges + from open_webui.utils.access_control import has_access + + user_id = __user__.get("id") + user_role = __user__.get("role", "user") + user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)] + + # Get the file + file = Files.get_file_by_id(file_id) + if not file: + return json.dumps({"error": "File not found"}) + + # Check if user has access via any KB containing this file + knowledges = Knowledges.get_knowledges_by_file_id(file_id) + + has_knowledge_access = False + knowledge_info = None + for knowledge_base in knowledges: + if ( + user_role == "admin" + or knowledge_base.user_id == user_id + or has_access(user_id, "read", knowledge_base.access_control, user_group_ids) + ): + has_knowledge_access = True + knowledge_info = {"id": knowledge_base.id, "name": knowledge_base.name} + break + + if not has_knowledge_access: + # Also allow if user owns the file directly + if file.user_id != user_id and user_role != "admin": + return json.dumps({"error": "Access denied"}) + + # Get file content (extracted text stored during upload) + content = "" + if file.data: + content = file.data.get("content", "") + + result = { + "id": file.id, + "filename": file.filename, + "content": content, + "updated_at": file.updated_at, + "created_at": file.created_at, + } + + if knowledge_info: + result["knowledge_id"] = knowledge_info["id"] + result["knowledge_name"] = knowledge_info["name"] + + return json.dumps(result, ensure_ascii=False) + except Exception as e: + log.exception(f"view_knowledge_file error: {e}") + return json.dumps({"error": str(e)}) + + +async def query_knowledge_bases( + query: str, + knowledge_ids: Optional[list[str]] = None, + count: int = 5, + __request__: Request = None, + __user__: dict = None, +) -> str: + """ + Search knowledge bases using semantic/vector search to find relevant content chunks. + + :param query: The search query to find semantically relevant content + :param knowledge_ids: Optional list of KB ids to limit search to specific knowledge bases + :param count: Maximum number of results to return (default: 5) + :return: JSON with relevant chunks containing content, source filename, and relevance score + """ + if __request__ is None: + return json.dumps({"error": "Request context not available"}) + + if not __user__: + return json.dumps({"error": "User context not available"}) + + try: + from open_webui.models.knowledge import Knowledges + from open_webui.retrieval.utils import query_collection + from open_webui.utils.access_control import has_access + + user_id = __user__.get("id") + user_role = __user__.get("role", "user") + user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)] + + # Get embedding function from app state + embedding_function = __request__.app.state.EMBEDDING_FUNCTION + if not embedding_function: + return json.dumps({"error": "Embedding function not configured"}) + + # Determine which KB collections to search + collection_names = [] + + if knowledge_ids: + # Search specific KBs - verify access for each + for knowledge_id in knowledge_ids: + knowledge = Knowledges.get_knowledge_by_id(knowledge_id) + if knowledge and ( + user_role == "admin" + or knowledge.user_id == user_id + or has_access(user_id, "read", knowledge.access_control, user_group_ids) + ): + collection_names.append(knowledge_id) + else: + # Search all accessible KBs + result = Knowledges.search_knowledge_bases( + user_id, + filter={ + "query": "", + "user_id": user_id, + "group_ids": user_group_ids, + }, + skip=0, + limit=50, # Get up to 50 accessible KBs + ) + collection_names = [knowledge_base.id for knowledge_base in result.items] + + if not collection_names: + return json.dumps([]) + + # Perform vector search across collections + query_results = await query_collection( + collection_names=collection_names, + queries=[query], + embedding_function=embedding_function, + k=count, + ) + + # Format results + chunks = [] + if query_results and "documents" in query_results: + documents = query_results.get("documents", [[]])[0] + metadatas = query_results.get("metadatas", [[]])[0] + distances = query_results.get("distances", [[]])[0] + + for idx, doc in enumerate(documents): + chunk_info = { + "content": doc, + "source": metadatas[idx].get("source", metadatas[idx].get("name", "Unknown")), + "file_id": metadatas[idx].get("file_id", ""), + } + + # Add relevance score if available + if idx < len(distances): + chunk_info["distance"] = distances[idx] + + chunks.append(chunk_info) + + return json.dumps(chunks, ensure_ascii=False) + except Exception as e: + log.exception(f"query_knowledge_bases error: {e}") + return json.dumps({"error": str(e)}) diff --git a/backend/open_webui/utils/tools.py b/backend/open_webui/utils/tools.py index 8a970e1c56..fdfdbc5fb9 100644 --- a/backend/open_webui/utils/tools.py +++ b/backend/open_webui/utils/tools.py @@ -63,6 +63,11 @@ from open_webui.tools.builtin import ( view_channel_thread, replace_note_content, write_note, + list_knowledge_bases, + search_knowledge_bases, + search_knowledge_files, + view_knowledge_file, + query_knowledge_bases, ) import copy @@ -360,9 +365,14 @@ def get_builtin_tools( # Time utilities - always available for date calculations builtin_functions.extend([get_current_timestamp, calculate_timestamp]) - # Chats tools - search and fetch user's chat history (always available) + # Chats tools - search and fetch user's chat history builtin_functions.extend([search_chats, view_chat]) + # Knowledge base tools - list, search, query, and view user's accessible knowledge bases + builtin_functions.extend( + [list_knowledge_bases, search_knowledge_bases, search_knowledge_files, view_knowledge_file, query_knowledge_bases] + ) + # Add memory tools if enabled for this chat if features.get("memory"): builtin_functions.extend([search_memories, add_memory, replace_memory_content])