feat: builtin kb tools

This commit is contained in:
Tim Baek
2026-01-07 08:58:58 -05:00
parent 2789f6a24d
commit c8622adcb0
2 changed files with 377 additions and 1 deletions

View File

@@ -1148,3 +1148,369 @@ async def view_channel_thread(
except Exception as e:
log.exception(f"view_channel_thread error: {e}")
return json.dumps({"error": str(e)})
# =============================================================================
# KNOWLEDGE BASE TOOLS
# =============================================================================
async def list_knowledge_bases(
count: int = 10,
skip: int = 0,
__request__: Request = None,
__user__: dict = None,
) -> str:
"""
List the user's accessible knowledge bases.
:param count: Maximum number of KBs to return (default: 10)
:param skip: Number of results to skip for pagination (default: 0)
:return: JSON with KBs containing id, name, description, and file_count
"""
if __request__ is None:
return json.dumps({"error": "Request context not available"})
if not __user__:
return json.dumps({"error": "User context not available"})
try:
from open_webui.models.knowledge import Knowledges
user_id = __user__.get("id")
user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
result = Knowledges.search_knowledge_bases(
user_id,
filter={
"query": "", # Empty query to get all
"user_id": user_id,
"group_ids": user_group_ids,
},
skip=skip,
limit=count,
)
knowledge_bases = []
for knowledge_base in result.items:
# Get file count for this KB
files = Knowledges.get_files_by_id(knowledge_base.id)
file_count = len(files) if files else 0
knowledge_bases.append(
{
"id": knowledge_base.id,
"name": knowledge_base.name,
"description": knowledge_base.description or "",
"file_count": file_count,
"updated_at": knowledge_base.updated_at,
}
)
return json.dumps(knowledge_bases, ensure_ascii=False)
except Exception as e:
log.exception(f"list_knowledge_bases error: {e}")
return json.dumps({"error": str(e)})
async def search_knowledge_bases(
query: str,
count: int = 5,
skip: int = 0,
__request__: Request = None,
__user__: dict = None,
) -> str:
"""
Search the user's accessible knowledge bases by name and description.
:param query: The search query to find matching knowledge bases
:param count: Maximum number of results to return (default: 5)
:param skip: Number of results to skip for pagination (default: 0)
:return: JSON with matching KBs containing id, name, description, and file_count
"""
if __request__ is None:
return json.dumps({"error": "Request context not available"})
if not __user__:
return json.dumps({"error": "User context not available"})
try:
from open_webui.models.knowledge import Knowledges
user_id = __user__.get("id")
user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
result = Knowledges.search_knowledge_bases(
user_id,
filter={
"query": query,
"user_id": user_id,
"group_ids": user_group_ids,
},
skip=skip,
limit=count,
)
knowledge_bases = []
for knowledge_base in result.items:
# Get file count for this KB
files = Knowledges.get_files_by_id(knowledge_base.id)
file_count = len(files) if files else 0
knowledge_bases.append(
{
"id": knowledge_base.id,
"name": knowledge_base.name,
"description": knowledge_base.description or "",
"file_count": file_count,
"updated_at": knowledge_base.updated_at,
}
)
return json.dumps(knowledge_bases, ensure_ascii=False)
except Exception as e:
log.exception(f"search_knowledge_bases error: {e}")
return json.dumps({"error": str(e)})
async def search_knowledge_files(
query: str,
knowledge_id: Optional[str] = None,
count: int = 5,
skip: int = 0,
__request__: Request = None,
__user__: dict = None,
) -> str:
"""
Search files across knowledge bases the user has access to.
:param query: The search query to find matching files by filename
:param knowledge_id: Optional KB id to limit search to a specific knowledge base
:param count: Maximum number of results to return (default: 5)
:param skip: Number of results to skip for pagination (default: 0)
:return: JSON with matching files containing id, filename, knowledge_id, knowledge_name, and updated_at
"""
if __request__ is None:
return json.dumps({"error": "Request context not available"})
if not __user__:
return json.dumps({"error": "User context not available"})
try:
from open_webui.models.knowledge import Knowledges
user_id = __user__.get("id")
user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
if knowledge_id:
# Search within a specific KB
result = Knowledges.search_files_by_id(
knowledge_id=knowledge_id,
user_id=user_id,
filter={"query": query},
skip=skip,
limit=count,
)
else:
# Search across all accessible KBs
result = Knowledges.search_knowledge_files(
filter={
"query": query,
"user_id": user_id,
"group_ids": user_group_ids,
},
skip=skip,
limit=count,
)
files = []
for file in result.items:
file_info = {
"id": file.id,
"filename": file.filename,
"updated_at": file.updated_at,
}
# Add KB info if available (from search_knowledge_files)
if hasattr(file, "collection") and file.collection:
file_info["knowledge_id"] = file.collection.get("id", "")
file_info["knowledge_name"] = file.collection.get("name", "")
files.append(file_info)
return json.dumps(files, ensure_ascii=False)
except Exception as e:
log.exception(f"search_knowledge_files error: {e}")
return json.dumps({"error": str(e)})
async def view_knowledge_file(
file_id: str,
__request__: Request = None,
__user__: dict = None,
) -> str:
"""
Get the full content of a file from a knowledge base.
:param file_id: The ID of the file to retrieve
:return: JSON with the file's id, filename, and full text content
"""
if __request__ is None:
return json.dumps({"error": "Request context not available"})
if not __user__:
return json.dumps({"error": "User context not available"})
try:
from open_webui.models.files import Files
from open_webui.models.knowledge import Knowledges
from open_webui.utils.access_control import has_access
user_id = __user__.get("id")
user_role = __user__.get("role", "user")
user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
# Get the file
file = Files.get_file_by_id(file_id)
if not file:
return json.dumps({"error": "File not found"})
# Check if user has access via any KB containing this file
knowledges = Knowledges.get_knowledges_by_file_id(file_id)
has_knowledge_access = False
knowledge_info = None
for knowledge_base in knowledges:
if (
user_role == "admin"
or knowledge_base.user_id == user_id
or has_access(user_id, "read", knowledge_base.access_control, user_group_ids)
):
has_knowledge_access = True
knowledge_info = {"id": knowledge_base.id, "name": knowledge_base.name}
break
if not has_knowledge_access:
# Also allow if user owns the file directly
if file.user_id != user_id and user_role != "admin":
return json.dumps({"error": "Access denied"})
# Get file content (extracted text stored during upload)
content = ""
if file.data:
content = file.data.get("content", "")
result = {
"id": file.id,
"filename": file.filename,
"content": content,
"updated_at": file.updated_at,
"created_at": file.created_at,
}
if knowledge_info:
result["knowledge_id"] = knowledge_info["id"]
result["knowledge_name"] = knowledge_info["name"]
return json.dumps(result, ensure_ascii=False)
except Exception as e:
log.exception(f"view_knowledge_file error: {e}")
return json.dumps({"error": str(e)})
async def query_knowledge_bases(
query: str,
knowledge_ids: Optional[list[str]] = None,
count: int = 5,
__request__: Request = None,
__user__: dict = None,
) -> str:
"""
Search knowledge bases using semantic/vector search to find relevant content chunks.
:param query: The search query to find semantically relevant content
:param knowledge_ids: Optional list of KB ids to limit search to specific knowledge bases
:param count: Maximum number of results to return (default: 5)
:return: JSON with relevant chunks containing content, source filename, and relevance score
"""
if __request__ is None:
return json.dumps({"error": "Request context not available"})
if not __user__:
return json.dumps({"error": "User context not available"})
try:
from open_webui.models.knowledge import Knowledges
from open_webui.retrieval.utils import query_collection
from open_webui.utils.access_control import has_access
user_id = __user__.get("id")
user_role = __user__.get("role", "user")
user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]
# Get embedding function from app state
embedding_function = __request__.app.state.EMBEDDING_FUNCTION
if not embedding_function:
return json.dumps({"error": "Embedding function not configured"})
# Determine which KB collections to search
collection_names = []
if knowledge_ids:
# Search specific KBs - verify access for each
for knowledge_id in knowledge_ids:
knowledge = Knowledges.get_knowledge_by_id(knowledge_id)
if knowledge and (
user_role == "admin"
or knowledge.user_id == user_id
or has_access(user_id, "read", knowledge.access_control, user_group_ids)
):
collection_names.append(knowledge_id)
else:
# Search all accessible KBs
result = Knowledges.search_knowledge_bases(
user_id,
filter={
"query": "",
"user_id": user_id,
"group_ids": user_group_ids,
},
skip=0,
limit=50, # Get up to 50 accessible KBs
)
collection_names = [knowledge_base.id for knowledge_base in result.items]
if not collection_names:
return json.dumps([])
# Perform vector search across collections
query_results = await query_collection(
collection_names=collection_names,
queries=[query],
embedding_function=embedding_function,
k=count,
)
# Format results
chunks = []
if query_results and "documents" in query_results:
documents = query_results.get("documents", [[]])[0]
metadatas = query_results.get("metadatas", [[]])[0]
distances = query_results.get("distances", [[]])[0]
for idx, doc in enumerate(documents):
chunk_info = {
"content": doc,
"source": metadatas[idx].get("source", metadatas[idx].get("name", "Unknown")),
"file_id": metadatas[idx].get("file_id", ""),
}
# Add relevance score if available
if idx < len(distances):
chunk_info["distance"] = distances[idx]
chunks.append(chunk_info)
return json.dumps(chunks, ensure_ascii=False)
except Exception as e:
log.exception(f"query_knowledge_bases error: {e}")
return json.dumps({"error": str(e)})

View File

@@ -63,6 +63,11 @@ from open_webui.tools.builtin import (
view_channel_thread,
replace_note_content,
write_note,
list_knowledge_bases,
search_knowledge_bases,
search_knowledge_files,
view_knowledge_file,
query_knowledge_bases,
)
import copy
@@ -360,9 +365,14 @@ def get_builtin_tools(
# Time utilities - always available for date calculations
builtin_functions.extend([get_current_timestamp, calculate_timestamp])
# Chats tools - search and fetch user's chat history (always available)
# Chats tools - search and fetch user's chat history
builtin_functions.extend([search_chats, view_chat])
# Knowledge base tools - list, search, query, and view user's accessible knowledge bases
builtin_functions.extend(
[list_knowledge_bases, search_knowledge_bases, search_knowledge_files, view_knowledge_file, query_knowledge_bases]
)
# Add memory tools if enabled for this chat
if features.get("memory"):
builtin_functions.extend([search_memories, add_memory, replace_memory_content])