mirror of
https://github.com/open-webui/open-webui.git
synced 2026-05-05 10:28:06 -05:00
1552 lines
52 KiB
Python
1552 lines
52 KiB
Python
"""
|
|
Built-in tools for Open WebUI.
|
|
|
|
These tools are automatically available when native function calling is enabled.
|
|
|
|
IMPORTANT: DO NOT IMPORT THIS MODULE DIRECTLY IN OTHER PARTS OF THE CODEBASE.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
import asyncio
|
|
from typing import Optional
|
|
|
|
from fastapi import Request
|
|
|
|
from open_webui.models.users import UserModel
|
|
from open_webui.routers.retrieval import search_web as _search_web
|
|
from open_webui.retrieval.utils import get_content_from_url
|
|
from open_webui.routers.images import (
|
|
image_generations,
|
|
image_edits,
|
|
CreateImageForm,
|
|
EditImageForm,
|
|
)
|
|
from open_webui.routers.memories import (
|
|
query_memory,
|
|
add_memory as _add_memory,
|
|
update_memory_by_id,
|
|
QueryMemoryForm,
|
|
AddMemoryForm,
|
|
MemoryUpdateModel,
|
|
)
|
|
from open_webui.models.notes import Notes
|
|
from open_webui.models.chats import Chats
|
|
from open_webui.models.channels import Channels, ChannelMember, Channel
|
|
from open_webui.models.messages import Messages, Message
|
|
from open_webui.models.groups import Groups
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# =============================================================================
|
|
# TIME UTILITIES
|
|
# =============================================================================
|
|
|
|
|
|
async def get_current_timestamp(
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the current Unix timestamp in seconds.

    :return: JSON with current_timestamp (seconds) and current_iso (ISO format)
    """
    try:
        from datetime import datetime, timezone

        # Read the clock once (timezone-aware, UTC) so both fields describe
        # the same instant.
        utc_now = datetime.now(timezone.utc)
        payload = {
            "current_timestamp": int(utc_now.timestamp()),
            "current_iso": utc_now.isoformat(),
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"get_current_timestamp error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def calculate_timestamp(
    days_ago: int = 0,
    weeks_ago: int = 0,
    months_ago: int = 0,
    years_ago: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the current Unix timestamp, optionally adjusted by days, weeks, months, or years.
    Use this to calculate timestamps for date filtering in search functions.
    Examples: "last week" = weeks_ago=1, "3 days ago" = days_ago=3, "a year ago" = years_ago=1

    :param days_ago: Number of days to subtract from current time (default: 0)
    :param weeks_ago: Number of weeks to subtract from current time (default: 0)
    :param months_ago: Number of months to subtract from current time (default: 0)
    :param years_ago: Number of years to subtract from current time (default: 0)
    :return: JSON with current_timestamp and calculated_timestamp (both in seconds)
    """
    try:
        import datetime

        try:
            from dateutil.relativedelta import relativedelta
        except ImportError:
            # dateutil is optional. Without it, months and years are
            # approximated below as 30 and 365 days.
            relativedelta = None

        now = datetime.datetime.now(datetime.timezone.utc)

        # Days and weeks have a fixed length, so plain timedelta arithmetic
        # is exact for them.
        total_days = days_ago + (weeks_ago * 7)
        if relativedelta is None:
            total_days += (months_ago * 30) + (years_ago * 365)

        adjusted = now - datetime.timedelta(days=total_days)

        # Months and years vary in length, so they go through calendar-aware
        # arithmetic when dateutil is available. Any non-zero value is applied
        # (not only positive ones) so negative offsets behave the same way as
        # negative days/weeks and as the fallback path.
        if relativedelta is not None and (months_ago or years_ago):
            adjusted = adjusted - relativedelta(months=months_ago, years=years_ago)

        return json.dumps(
            {
                "current_timestamp": int(now.timestamp()),
                "current_iso": now.isoformat(),
                "calculated_timestamp": int(adjusted.timestamp()),
                "calculated_iso": adjusted.isoformat(),
            },
            ensure_ascii=False,
        )
    except Exception as e:
        # Both the dateutil and the fallback computation are inside this
        # handler, so every failure is reported as a JSON error object.
        log.exception(f"calculate_timestamp error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# WEB SEARCH TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def search_web(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the public web for information. Best for current events, external references,
    or topics not covered in internal documents. If knowledge base tools are available,
    consider checking those first for internal information.

    :param query: The search query to look up
    :param count: Number of results to return (default: 5)
    :return: JSON with search results containing title, link, and snippet for each result
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        engine = __request__.app.state.config.WEB_SEARCH_ENGINE
        user = UserModel(**__user__) if __user__ else None

        # _search_web is a synchronous call. Run it in a worker thread so this
        # coroutine does not block the event loop while the search is in
        # flight (fetch_url treats its blocking helper the same way).
        results = await asyncio.to_thread(_search_web, __request__, engine, query, user)

        # Limit results
        results = results[:count] if results else []

        return json.dumps(
            [{"title": r.title, "link": r.link, "snippet": r.snippet} for r in results],
            ensure_ascii=False,
        )
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"search_web error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def fetch_url(
    url: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Fetch and extract the main text content from a web page URL.

    :param url: The URL to fetch content from
    :return: The extracted text content from the page
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    # Cap on the number of characters returned (avoid overwhelming context).
    char_limit = 50000

    try:
        # The loader runs in a worker thread; only the first element of its
        # result (the text) is used here.
        page_text, _ = await asyncio.to_thread(get_content_from_url, __request__, url)

        if len(page_text) <= char_limit:
            return page_text
        return page_text[:char_limit] + "\n\n[Content truncated...]"
    except Exception as e:
        # Unlike the success path (plain text), errors come back as JSON.
        log.exception(f"fetch_url error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# IMAGE GENERATION TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def generate_image(
    prompt: str,
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
) -> str:
    """
    Generate an image based on a text prompt.

    :param prompt: A detailed description of the image to generate
    :return: Confirmation that the image was generated, or an error message
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        acting_user = UserModel(**__user__) if __user__ else None

        images = await image_generations(
            request=__request__,
            form_data=CreateImageForm(prompt=prompt),
            user=acting_user,
        )

        # One file entry per generated image; used for both persistence and
        # the UI event below.
        file_entries = []
        for generated in images:
            file_entries.append({"type": "image", "url": generated["url"]})

        # With a chat and message to attach to, store the entries and carry on
        # with whatever the storage call returns.
        if __chat_id__ and __message_id__ and images:
            file_entries = Chats.add_message_files_by_id_and_message_id(
                __chat_id__,
                __message_id__,
                file_entries,
            )

        response = {"status": "success"}

        if __event_emitter__ and file_entries:
            await __event_emitter__(
                {
                    "type": "chat:message:files",
                    "data": {"files": file_entries},
                }
            )
            # The files were pushed to the chat, so the reply tells the model
            # not to embed the image a second time.
            response["message"] = (
                "The image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created."
            )

        response["images"] = images
        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"generate_image error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def edit_image(
    prompt: str,
    image_urls: list[str],
    __request__: Request = None,
    __user__: dict = None,
    __event_emitter__: callable = None,
    __chat_id__: str = None,
    __message_id__: str = None,
) -> str:
    """
    Edit existing images based on a text prompt.

    :param prompt: A description of the changes to make to the images
    :param image_urls: A list of URLs of the images to edit
    :return: Confirmation that the images were edited, or an error message
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        acting_user = UserModel(**__user__) if __user__ else None

        images = await image_edits(
            request=__request__,
            form_data=EditImageForm(prompt=prompt, image=image_urls),
            user=acting_user,
        )

        # One file entry per edited image; used for both persistence and the
        # UI event below.
        file_entries = []
        for edited in images:
            file_entries.append({"type": "image", "url": edited["url"]})

        # With a chat and message to attach to, store the entries and carry on
        # with whatever the storage call returns.
        if __chat_id__ and __message_id__ and images:
            file_entries = Chats.add_message_files_by_id_and_message_id(
                __chat_id__,
                __message_id__,
                file_entries,
            )

        response = {"status": "success"}

        if __event_emitter__ and file_entries:
            await __event_emitter__(
                {
                    "type": "chat:message:files",
                    "data": {"files": file_entries},
                }
            )
            # The files were pushed to the chat, so the reply tells the model
            # not to embed the image a second time.
            response["message"] = (
                "The edited image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created."
            )

        response["images"] = images
        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"edit_image error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# MEMORY TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def search_memories(
    query: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's stored memories for relevant information.

    :param query: The search query to find relevant memories
    :return: JSON with matching memories and their dates
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        acting_user = UserModel(**__user__) if __user__ else None

        results = await query_memory(
            __request__,
            QueryMemoryForm(content=query, k=5),
            acting_user,
        )

        # No usable result object means an empty JSON list.
        if not (results and hasattr(results, "documents") and results.documents):
            return json.dumps([])

        memories = []
        # Only the first entry of documents/ids/metadatas is read; the three
        # lists are indexed in parallel by position.
        for position, document in enumerate(results.documents[0]):
            if results.ids and results.ids[0]:
                memory_id = results.ids[0][position]
            else:
                memory_id = None

            created_raw = results.metadatas and results.metadatas[0][position].get(
                "created_at"
            )
            if created_raw:
                # Rendered as a calendar date in the server's local timezone.
                date_label = time.strftime("%Y-%m-%d", time.localtime(created_raw))
            else:
                date_label = "Unknown"

            memories.append({"id": memory_id, "date": date_label, "content": document})

        return json.dumps(memories, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"search_memories error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def add_memory(
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Store a new memory for the user.

    :param content: The memory content to store
    :return: Confirmation that the memory was stored
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        acting_user = UserModel(**__user__) if __user__ else None
        memory_form = AddMemoryForm(content=content)

        # Delegates to the memories router function imported as _add_memory.
        stored = await _add_memory(__request__, memory_form, acting_user)

        confirmation = {"status": "success", "id": stored.id}
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"add_memory error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def replace_memory_content(
    memory_id: str,
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Update the content of an existing memory by its ID.

    :param memory_id: The ID of the memory to update
    :param content: The new content for the memory
    :return: Confirmation that the memory was updated
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    try:
        acting_user = UserModel(**__user__) if __user__ else None

        updated = await update_memory_by_id(
            memory_id=memory_id,
            request=__request__,
            form_data=MemoryUpdateModel(content=content),
            user=acting_user,
        )

        # Echo back the id and content from the object the update returned.
        confirmation = {
            "status": "success",
            "id": updated.id,
            "content": updated.content,
        }
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"replace_memory_content error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# NOTES TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def search_notes(
    query: str,
    count: int = 5,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's notes by title and content.

    :param query: The search query to find matching notes
    :param count: Maximum number of results to return (default: 5)
    :param start_timestamp: Only include notes updated after this Unix timestamp (seconds)
    :param end_timestamp: Only include notes updated before this Unix timestamp (seconds)
    :return: JSON with matching notes containing id, title, and content snippet
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]

        search_filter = {
            "query": query,
            "user_id": user_id,
            "group_ids": user_group_ids,
            "permission": "read",
        }
        # Over-fetch (3x) because the date filter below may discard hits.
        result = Notes.search_notes(
            user_id=user_id,
            filter=search_filter,
            skip=0,
            limit=count * 3,
        )

        # Bounds arrive in seconds; updated_at is compared in nanoseconds.
        nanos_per_second = 1_000_000_000
        start_ts = start_timestamp * nanos_per_second if start_timestamp else None
        end_ts = end_timestamp * nanos_per_second if end_timestamp else None

        notes = []
        for note in result.items:
            if (start_ts and note.updated_at < start_ts) or (
                end_ts and note.updated_at > end_ts
            ):
                continue

            # Snippet: a window around the first case-insensitive hit of the
            # query in the markdown body, else the first 150 characters.
            md_content = note.data.get("content", {}).get("md") if note.data else None
            if md_content:
                hit_at = md_content.lower().find(query.lower())
                if hit_at == -1:
                    ellipsis = "..." if len(md_content) > 150 else ""
                    content_snippet = md_content[:150] + ellipsis
                else:
                    window_start = max(0, hit_at - 50)
                    window_end = min(len(md_content), hit_at + len(query) + 100)
                    lead = "..." if window_start > 0 else ""
                    trail = "..." if window_end < len(md_content) else ""
                    content_snippet = lead + md_content[window_start:window_end] + trail
            else:
                content_snippet = ""

            notes.append(
                {
                    "id": note.id,
                    "title": note.title,
                    "snippet": content_snippet,
                    "updated_at": note.updated_at,
                }
            )

            if len(notes) >= count:
                break

        return json.dumps(notes, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"search_notes error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def view_note(
    note_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a note by its ID.

    :param note_id: The ID of the note to retrieve
    :return: JSON with the note's id, title, and full markdown content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        note = Notes.get_note_by_id(note_id)
        if not note:
            return json.dumps({"error": "Note not found"})

        user_id = __user__.get("id")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]

        from open_webui.utils.access_control import has_access

        # The owner can always read; anyone else needs "read" access through
        # the note's access_control settings.
        is_owner = note.user_id == user_id
        if not (is_owner or has_access(user_id, "read", note.access_control, user_group_ids)):
            return json.dumps({"error": "Access denied"})

        # A missing or empty markdown body is reported as an empty string.
        markdown = note.data.get("content", {}).get("md") if note.data else None
        content = markdown if markdown else ""

        payload = {
            "id": note.id,
            "title": note.title,
            "content": content,
            "updated_at": note.updated_at,
            "created_at": note.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"view_note error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def write_note(
    title: str,
    content: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Create a new note with the given title and content.

    :param title: The title of the new note
    :param content: The markdown content for the note
    :return: JSON with success status and new note id
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.notes import NoteForm

        owner_id = __user__.get("id")

        # The body is stored as markdown under data.content.md. The empty
        # access_control keeps the note private to its owner by default.
        note_form = NoteForm(
            title=title,
            data={"content": {"md": content}},
            access_control={},
        )

        created = Notes.insert_new_note(owner_id, note_form)
        if not created:
            return json.dumps({"error": "Failed to create note"})

        confirmation = {
            "status": "success",
            "id": created.id,
            "title": created.title,
            "created_at": created.created_at,
        }
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"write_note error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def replace_note_content(
    note_id: str,
    content: str,
    title: Optional[str] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Update the content of a note. Use this to modify task lists, add notes, or update content.

    :param note_id: The ID of the note to update
    :param content: The new markdown content for the note
    :param title: Optional new title for the note
    :return: JSON with success status and updated note info
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.notes import NoteUpdateForm

        note = Notes.get_note_by_id(note_id)
        if not note:
            return json.dumps({"error": "Note not found"})

        user_id = __user__.get("id")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]

        from open_webui.utils.access_control import has_access

        # The owner can always write; anyone else needs "write" access through
        # the note's access_control settings.
        is_owner = note.user_id == user_id
        if not (is_owner or has_access(user_id, "write", note.access_control, user_group_ids)):
            return json.dumps({"error": "Write access denied"})

        # The markdown body is always replaced; the title only when a
        # non-empty one was supplied.
        update_fields = {
            "data": {"content": {"md": content}},
            **({"title": title} if title else {}),
        }

        updated_note = Notes.update_note_by_id(note_id, NoteUpdateForm(**update_fields))
        if not updated_note:
            return json.dumps({"error": "Failed to update note"})

        confirmation = {
            "status": "success",
            "id": updated_note.id,
            "title": updated_note.title,
            "updated_at": updated_note.updated_at,
        }
        return json.dumps(confirmation, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"replace_note_content error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# CHATS TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def search_chats(
    query: str,
    count: int = 5,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's previous chat conversations by title and message content.

    :param query: The search query to find matching chats
    :param count: Maximum number of results to return (default: 5)
    :param start_timestamp: Only include chats updated after this Unix timestamp (seconds)
    :param end_timestamp: Only include chats updated before this Unix timestamp (seconds)
    :return: JSON with matching chats containing id, title, updated_at, and content snippet
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        chats = Chats.get_chats_by_user_id_and_search_text(
            user_id=user_id,
            search_text=query,
            include_archived=False,
            skip=0,
            limit=count * 3,  # Fetch more for filtering
        )

        # The query is the same for every chat and message: lowercase it once.
        lower_query = query.lower()

        results = []
        for chat in chats:
            # Apply date filters (updated_at is in seconds)
            if start_timestamp and chat.updated_at < start_timestamp:
                continue
            if end_timestamp and chat.updated_at > end_timestamp:
                continue

            # Snippet: a window around the first case-insensitive hit in any
            # message with string content.
            snippet = ""
            messages = chat.chat.get("history", {}).get("messages", {})

            for msg in messages.values():
                content = msg.get("content", "")
                if not isinstance(content, str):
                    continue

                # One lowercase + one scan per message; find() both tests for
                # the match and locates it.
                idx = content.lower().find(lower_query)
                if idx == -1:
                    continue

                start = max(0, idx - 50)
                end = min(len(content), idx + len(query) + 100)
                snippet = (
                    ("..." if start > 0 else "")
                    + content[start:end]
                    + ("..." if end < len(content) else "")
                )
                break

            # Fall back to a title match. A missing title is treated as empty
            # so one untitled chat cannot fail the whole search.
            title_text = chat.title or ""
            if not snippet and lower_query in title_text.lower():
                snippet = f"Title match: {title_text}"

            results.append(
                {
                    "id": chat.id,
                    "title": chat.title,
                    "snippet": snippet,
                    "updated_at": chat.updated_at,
                }
            )

            if len(results) >= count:
                break

        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"search_chats error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def view_chat(
    chat_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full conversation history of a chat by its ID.

    :param chat_id: The ID of the chat to retrieve
    :return: JSON with the chat's id, title, and messages
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        # The lookup is keyed on both chat id and user id, so a miss covers
        # "no such chat" as well as "not this user's chat".
        chat = Chats.get_chat_by_id_and_user_id(chat_id, user_id)
        if not chat:
            return json.dumps({"error": "Chat not found or access denied"})

        history = chat.chat.get("history", {})
        by_id = history.get("messages", {})

        # Walk parentId links starting from currentId (newest first). The
        # `seen` set stops the walk if the links ever form a cycle.
        newest_first = []
        seen = set()
        cursor = history.get("currentId")
        while cursor and cursor not in seen:
            seen.add(cursor)
            node = by_id.get(cursor)
            if not node:
                break
            newest_first.append(
                {
                    "role": node.get("role", ""),
                    "content": node.get("content", ""),
                }
            )
            cursor = node.get("parentId")

        # Flip into chronological order for the response.
        messages = newest_first[::-1]

        payload = {
            "id": chat.id,
            "title": chat.title,
            "messages": messages,
            "updated_at": chat.updated_at,
            "created_at": chat.created_at,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"view_chat error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# CHANNELS TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def search_channels(
    query: str,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search for channels by name and description that the user has access to.

    :param query: The search query to find matching channels
    :param count: Maximum number of results to return (default: 5)
    :return: JSON with matching channels containing id, name, description, and type
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        # Candidates are the channels returned for this user; matching is a
        # case-insensitive substring test done here in Python.
        candidates = Channels.get_channels_by_user_id(user_id)
        needle = query.lower()

        matching_channels = []
        for channel in candidates:
            description = channel.description or ""

            name_hit = bool(channel.name) and needle in channel.name.lower()
            description_hit = needle in description.lower()
            if not (name_hit or description_hit):
                continue

            matching_channels.append(
                {
                    "id": channel.id,
                    "name": channel.name,
                    "description": description,
                    "type": channel.type or "public",
                }
            )

            # Stop scanning once enough matches have been collected.
            if len(matching_channels) >= count:
                break

        return json.dumps(matching_channels, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"search_channels error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def search_channel_messages(
    query: str,
    count: int = 10,
    start_timestamp: Optional[int] = None,
    end_timestamp: Optional[int] = None,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search for messages in channels the user is a member of, including thread replies.

    :param query: The search query to find matching messages
    :param count: Maximum number of results to return (default: 10)
    :param start_timestamp: Only include messages created after this Unix timestamp (seconds)
    :param end_timestamp: Only include messages created before this Unix timestamp (seconds)
    :return: JSON with matching messages containing channel info, message content, and thread context
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        # The search is restricted to the channels returned for this user.
        user_channels = Channels.get_channels_by_user_id(user_id)
        channel_ids = [c.id for c in user_channels]
        channel_map = {c.id: c for c in user_channels}

        if not channel_ids:
            return json.dumps([])

        # Bounds arrive in seconds; the message search takes nanoseconds.
        nanos_per_second = 1_000_000_000
        start_ts = start_timestamp * nanos_per_second if start_timestamp else None
        end_ts = end_timestamp * nanos_per_second if end_timestamp else None

        matching_messages = Messages.search_messages_by_channel_ids(
            channel_ids=channel_ids,
            query=query,
            start_timestamp=start_ts,
            end_timestamp=end_ts,
            limit=count,
        )

        results = []
        for msg in matching_messages:
            channel = channel_map.get(msg.channel_id)

            # Snippet: a window around the first case-insensitive hit, else
            # the first 150 characters of the message.
            text = msg.content or ""
            hit_at = text.lower().find(query.lower())
            if hit_at == -1:
                snippet = text[:150] + ("..." if len(text) > 150 else "")
            else:
                window_start = max(0, hit_at - 50)
                window_end = min(len(text), hit_at + len(query) + 100)
                lead = "..." if window_start > 0 else ""
                trail = "..." if window_end < len(text) else ""
                snippet = lead + text[window_start:window_end] + trail

            results.append(
                {
                    "channel_id": msg.channel_id,
                    "channel_name": channel.name if channel else "Unknown",
                    "message_id": msg.id,
                    "content_snippet": snippet,
                    "is_thread_reply": msg.parent_id is not None,
                    "parent_id": msg.parent_id,
                    "created_at": msg.created_at,
                }
            )

        return json.dumps(results, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"search_channel_messages error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def view_channel_message(
    message_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a channel message by its ID, along with its thread
    metadata (parent message ID and reply count). The replies themselves are not
    included; use view_channel_thread to read them.

    :param message_id: The ID of the message to retrieve
    :return: JSON with the message content, channel info, author, timestamps, and thread metadata
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        message = Messages.get_message_by_id(message_id)
        if not message:
            return json.dumps({"error": "Message not found"})

        channel = Channels.get_channel_by_id(message.channel_id)
        if not channel:
            return json.dumps({"error": "Channel not found"})

        # Access check: the message's channel must be one of the channels
        # returned for this user.
        user_channels = Channels.get_channels_by_user_id(user_id)
        if not any(c.id == message.channel_id for c in user_channels):
            return json.dumps({"error": "Access denied"})

        # Thread context is limited to parent_id and reply_count; no reply
        # bodies are loaded here.
        result = {
            "id": message.id,
            "channel_id": message.channel_id,
            "channel_name": channel.name,
            "content": message.content,
            "user_id": message.user_id,
            "is_thread_reply": message.parent_id is not None,
            "parent_id": message.parent_id,
            "reply_count": message.reply_count,
            "created_at": message.created_at,
            "updated_at": message.updated_at,
        }

        # Include user info if available
        if message.user:
            result["user_name"] = message.user.name

        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        # Failures are logged and reported as a JSON error object, not raised.
        log.exception(f"view_channel_message error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def view_channel_thread(
    parent_message_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get all messages in a channel thread, including the parent message and all replies.

    The parent message is always the first entry of "messages" and is flagged with
    is_parent; the replies follow it.

    :param parent_message_id: The ID of the parent message that started the thread
    :return: JSON with the parent message and all thread replies in chronological order
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        user_id = __user__.get("id")

        root = Messages.get_message_by_id(parent_message_id)
        if not root:
            return json.dumps({"error": "Message not found"})

        channel = Channels.get_channel_by_id(root.channel_id)
        if not channel:
            return json.dumps({"error": "Channel not found"})

        # The thread is only readable when its channel is one of the user's channels.
        permitted_channel_ids = [
            user_channel.id
            for user_channel in Channels.get_channels_by_user_id(user_id)
        ]
        if root.channel_id not in permitted_channel_ids:
            return json.dumps({"error": "Access denied"})

        replies = Messages.get_thread_replies_by_message_id(parent_message_id)

        root_entry = {
            "id": root.id,
            "content": root.content,
            "user_id": root.user_id,
            "user_name": root.user.name if root.user else None,
            "is_parent": True,
            "created_at": root.created_at,
        }

        # Replies are emitted in reverse of the order they were fetched in
        # (presumably newest-first from the lookup, so this reads oldest-first).
        reply_entries = [
            {
                "id": reply.id,
                "content": reply.content,
                "user_id": reply.user_id,
                "user_name": reply.user.name if reply.user else None,
                "is_parent": False,
                "reply_to_id": reply.reply_to_id,
                "created_at": reply.created_at,
            }
            for reply in reversed(replies)
        ]

        thread = [root_entry, *reply_entries]
        payload = {
            "channel_id": root.channel_id,
            "channel_name": channel.name,
            "thread_id": parent_message_id,
            "message_count": len(thread),
            "messages": thread,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        # Tool boundary: failures are reported as JSON rather than raised.
        log.exception(f"view_channel_thread error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# =============================================================================
|
|
# KNOWLEDGE BASE TOOLS
|
|
# =============================================================================
|
|
|
|
|
|
async def list_knowledge_bases(
    count: int = 10,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    List the user's accessible knowledge bases.

    Runs the knowledge base search with an empty query, so every knowledge base
    visible to the user (directly or through their groups) is eligible.

    :param count: Maximum number of KBs to return (default: 10)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with KBs containing id, name, description, and file_count
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")
        group_ids = [
            membership.id for membership in Groups.get_groups_by_member_id(user_id)
        ]

        # An empty query string means "no text filter".
        listing_filter = {
            "query": "",
            "user_id": user_id,
            "group_ids": group_ids,
        }
        page = Knowledges.search_knowledge_bases(
            user_id,
            filter=listing_filter,
            skip=skip,
            limit=count,
        )

        summaries = []
        for kb in page.items:
            # One extra lookup per knowledge base, only to count its files.
            attached_files = Knowledges.get_files_by_id(kb.id)
            summaries.append(
                {
                    "id": kb.id,
                    "name": kb.name,
                    "description": kb.description or "",
                    "file_count": len(attached_files) if attached_files else 0,
                    "updated_at": kb.updated_at,
                }
            )

        return json.dumps(summaries, ensure_ascii=False)
    except Exception as e:
        # Tool boundary: failures are reported as JSON rather than raised.
        log.exception(f"list_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def search_knowledge_bases(
    query: str,
    count: int = 5,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search the user's accessible knowledge bases by name and description.

    :param query: The search query to find matching knowledge bases
    :param count: Maximum number of results to return (default: 5)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with matching KBs containing id, name, description, and file_count
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")
        group_ids = [
            membership.id for membership in Groups.get_groups_by_member_id(user_id)
        ]

        def _summarize(kb) -> dict:
            # One extra lookup per knowledge base, only to count its files.
            attached_files = Knowledges.get_files_by_id(kb.id)
            return {
                "id": kb.id,
                "name": kb.name,
                "description": kb.description or "",
                "file_count": len(attached_files) if attached_files else 0,
                "updated_at": kb.updated_at,
            }

        matches = Knowledges.search_knowledge_bases(
            user_id,
            filter={
                "query": query,
                "user_id": user_id,
                "group_ids": group_ids,
            },
            skip=skip,
            limit=count,
        )

        return json.dumps(
            [_summarize(kb) for kb in matches.items],
            ensure_ascii=False,
        )
    except Exception as e:
        # Tool boundary: failures are reported as JSON rather than raised.
        log.exception(f"search_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def search_knowledge_files(
    query: str,
    knowledge_id: Optional[str] = None,
    count: int = 5,
    skip: int = 0,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Search files across knowledge bases the user has access to.

    When a file result carries collection information, the entry additionally
    includes knowledge_id and knowledge_name.

    :param query: The search query to find matching files by filename
    :param knowledge_id: Optional KB id to limit search to a specific knowledge base
    :param count: Maximum number of results to return (default: 5)
    :param skip: Number of results to skip for pagination (default: 0)
    :return: JSON with matching files containing id, filename, and updated_at
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges

        user_id = __user__.get("id")
        group_ids = [
            membership.id for membership in Groups.get_groups_by_member_id(user_id)
        ]

        if not knowledge_id:
            # No specific KB requested: search every knowledge base visible to the user.
            page = Knowledges.search_knowledge_files(
                filter={
                    "query": query,
                    "user_id": user_id,
                    "group_ids": group_ids,
                },
                skip=skip,
                limit=count,
            )
        else:
            # Search is confined to the requested knowledge base.
            page = Knowledges.search_files_by_id(
                knowledge_id=knowledge_id,
                user_id=user_id,
                filter={"query": query},
                skip=skip,
                limit=count,
            )

        def _describe(entry) -> dict:
            info = {
                "id": entry.id,
                "filename": entry.filename,
                "updated_at": entry.updated_at,
            }
            # Not every result type exposes a collection; add KB info only when present.
            collection = getattr(entry, "collection", None)
            if collection:
                info["knowledge_id"] = collection.get("id", "")
                info["knowledge_name"] = collection.get("name", "")
            return info

        return json.dumps(
            [_describe(entry) for entry in page.items],
            ensure_ascii=False,
        )
    except Exception as e:
        # Tool boundary: failures are reported as JSON rather than raised.
        log.exception(f"search_knowledge_files error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def view_knowledge_file(
    file_id: str,
    __request__: Request = None,
    __user__: dict = None,
) -> str:
    """
    Get the full content of a file from a knowledge base.

    Access is granted through the first knowledge base containing the file that
    the user may read; otherwise only the file's owner or an admin may view it.

    :param file_id: The ID of the file to retrieve
    :return: JSON with the file's id, filename, and full text content
    """
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.files import Files
        from open_webui.models.knowledge import Knowledges
        from open_webui.utils.access_control import has_access

        user_id = __user__.get("id")
        user_role = __user__.get("role", "user")
        group_ids = [
            membership.id for membership in Groups.get_groups_by_member_id(user_id)
        ]

        file = Files.get_file_by_id(file_id)
        if not file:
            return json.dumps({"error": "File not found"})

        knowledge_info = None
        for kb in Knowledges.get_knowledges_by_file_id(file_id):
            readable = (
                user_role == "admin"
                or kb.user_id == user_id
                or has_access(user_id, "read", kb.access_control, group_ids)
            )
            if readable:
                # The first readable knowledge base is the one reported back.
                knowledge_info = {"id": kb.id, "name": kb.name}
                break
        else:
            # No knowledge base grants access: fall back to ownership / admin role.
            if file.user_id != user_id and user_role != "admin":
                return json.dumps({"error": "Access denied"})

        content = file.data.get("content", "") if file.data else ""

        result = {
            "id": file.id,
            "filename": file.filename,
            "content": content,
            "updated_at": file.updated_at,
            "created_at": file.created_at,
        }
        if knowledge_info:
            result["knowledge_id"] = knowledge_info["id"]
            result["knowledge_name"] = knowledge_info["name"]

        return json.dumps(result, ensure_ascii=False)
    except Exception as e:
        # Tool boundary: failures are reported as JSON rather than raised.
        log.exception(f"view_knowledge_file error: {e}")
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
async def query_knowledge_bases(
    query: str,
    knowledge_ids: Optional[list[str]] = None,
    count: int = 5,
    __request__: Request = None,
    __user__: dict = None,
    __model_knowledge__: list[dict] = None,
) -> str:
    """
    Search internal knowledge bases using semantic/vector search. This should be your first
    choice for finding information before searching the web. Searches across collections (KBs),
    individual files, and notes that the user has access to.

    :param query: The search query to find semantically relevant content
    :param knowledge_ids: Optional list of KB ids to limit search to specific knowledge bases
    :param count: Maximum number of results to return (default: 5)
    :return: JSON with relevant chunks containing content, source filename, and relevance score
    """
    # Source selection, in order of precedence:
    #   1. __model_knowledge__ (items attached to the model) - knowledge_ids is ignored
    #   2. knowledge_ids supplied by the caller
    #   3. the first 50 knowledge bases accessible to the user
    # Items the user may not read are silently skipped.
    if __request__ is None:
        return json.dumps({"error": "Request context not available"})

    if not __user__:
        return json.dumps({"error": "User context not available"})

    try:
        from open_webui.models.knowledge import Knowledges
        from open_webui.models.files import Files
        from open_webui.models.notes import Notes
        from open_webui.retrieval.utils import query_collection
        from open_webui.utils.access_control import has_access

        user_id = __user__.get("id")
        user_role = __user__.get("role", "user")
        user_group_ids = [group.id for group in Groups.get_groups_by_member_id(user_id)]

        embedding_function = __request__.app.state.EMBEDDING_FUNCTION
        if not embedding_function:
            return json.dumps({"error": "Embedding function not configured"})

        def _can_read(resource) -> bool:
            # Admins and owners always pass; everyone else needs a "read" grant.
            # The already-fetched group ids are passed for every resource type so
            # the check is uniform across knowledge bases and notes.
            return (
                user_role == "admin"
                or resource.user_id == user_id
                or has_access(user_id, "read", resource.access_control, user_group_ids)
            )

        collection_names = []
        note_results = []  # Notes aren't vectorized, handle separately

        if __model_knowledge__:
            for item in __model_knowledge__:
                item_type = item.get("type")
                item_id = item.get("id")

                if item_type == "collection":
                    # Knowledge base - the KB id doubles as the collection name
                    knowledge = Knowledges.get_knowledge_by_id(item_id)
                    if knowledge and _can_read(knowledge):
                        collection_names.append(item_id)

                elif item_type == "file":
                    # Individual file - collection name is file-{id};
                    # only the owner or an admin may query it
                    file = Files.get_file_by_id(item_id)
                    if file and (user_role == "admin" or file.user_id == user_id):
                        collection_names.append(f"file-{item_id}")

                elif item_type == "note":
                    # Note - always return full content as context
                    note = Notes.get_note_by_id(item_id)
                    if note and _can_read(note):
                        # A note without data / content must not abort the whole query.
                        note_content = (note.data or {}).get("content") or {}
                        note_results.append(
                            {
                                "content": note_content.get("md", ""),
                                "source": note.title,
                                "note_id": note.id,
                                "type": "note",
                            }
                        )

        elif knowledge_ids:
            # Caller asked for specific KBs
            for knowledge_id in knowledge_ids:
                knowledge = Knowledges.get_knowledge_by_id(knowledge_id)
                if knowledge and _can_read(knowledge):
                    collection_names.append(knowledge_id)
        else:
            # No model knowledge and no specific IDs - search all accessible KBs
            result = Knowledges.search_knowledge_bases(
                user_id,
                filter={
                    "query": "",
                    "user_id": user_id,
                    "group_ids": user_group_ids,
                },
                skip=0,
                limit=50,
            )
            collection_names = [knowledge_base.id for knowledge_base in result.items]

        # Note results come first, followed by vector search hits.
        chunks = list(note_results)

        if collection_names:
            query_results = await query_collection(
                collection_names=collection_names,
                queries=[query],
                embedding_function=embedding_function,
                k=count,
            )

            if query_results and "documents" in query_results:

                def _first_group(key: str) -> list:
                    # Results are read as one list per query; a single query was
                    # sent, so only the first list matters. Missing, empty or
                    # None entries degrade to an empty list instead of raising.
                    groups = query_results.get(key) or [[]]
                    return groups[0] or []

                documents = _first_group("documents")
                metadatas = _first_group("metadatas")
                distances = _first_group("distances")

                for idx, doc in enumerate(documents):
                    # Metadata may be shorter than documents or hold None entries.
                    metadata = (metadatas[idx] if idx < len(metadatas) else None) or {}
                    chunk_info = {
                        "content": doc,
                        "source": metadata.get("source", metadata.get("name", "Unknown")),
                        "file_id": metadata.get("file_id", ""),
                    }
                    if idx < len(distances):
                        chunk_info["distance"] = distances[idx]
                    chunks.append(chunk_info)

        # Limit to requested count (notes count towards the limit)
        chunks = chunks[:count]

        return json.dumps(chunks, ensure_ascii=False)
    except Exception as e:
        # Tool boundary: failures are reported as JSON rather than raised.
        log.exception(f"query_knowledge_bases error: {e}")
        return json.dumps({"error": str(e)})
|