mirror of
https://github.com/open-webui/open-webui.git
synced 2026-06-19 18:19:02 -05:00
enh: channels streaming agent
This commit is contained in:
@@ -1796,7 +1796,7 @@ async def chat_completion(
|
||||
|
||||
if metadata.get('chat_id') and user:
|
||||
chat_id = metadata['chat_id']
|
||||
if not chat_id.startswith('local:'): # temporary chats are not stored
|
||||
if not chat_id.startswith('local:') and not chat_id.startswith('channel:'): # temporary/channel chats are not stored
|
||||
if is_new_chat:
|
||||
# Build the full history upfront with ALL assistant placeholders
|
||||
user_message = metadata.get('user_message') or {}
|
||||
@@ -2012,7 +2012,7 @@ async def chat_completion(
|
||||
if metadata.get('chat_id') and metadata.get('message_id'):
|
||||
# Update the chat message with the error
|
||||
try:
|
||||
if not metadata['chat_id'].startswith('local:'):
|
||||
if not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'):
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
@@ -2275,7 +2275,7 @@ async def list_tasks_endpoint(request: Request, user=Depends(get_admin_user)):
|
||||
|
||||
@app.get('/api/tasks/chat/{chat_id:path}')
|
||||
async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
|
||||
if chat_id.startswith('local:'):
|
||||
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||
socket_id = chat_id[len('local:') :]
|
||||
owner_id = get_user_id_from_session_pool(socket_id)
|
||||
if owner_id != user.id and user.role != 'admin':
|
||||
@@ -2293,7 +2293,7 @@ async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=De
|
||||
|
||||
@app.post('/api/tasks/chat/{chat_id:path}/stop')
|
||||
async def stop_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
|
||||
if chat_id.startswith('local:'):
|
||||
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||
socket_id = chat_id[len('local:') :]
|
||||
owner_id = get_user_id_from_session_pool(socket_id)
|
||||
if owner_id != user.id and user.role != 'admin':
|
||||
|
||||
@@ -57,7 +57,7 @@ from open_webui.utils.models import (
|
||||
get_all_models,
|
||||
get_filtered_models,
|
||||
)
|
||||
from open_webui.utils.chat import generate_chat_completion
|
||||
|
||||
|
||||
|
||||
from open_webui.utils.auth import get_admin_user, get_verified_user
|
||||
@@ -979,57 +979,50 @@ async def model_response_handler(request, channel, message, user, db=None):
|
||||
],
|
||||
]
|
||||
|
||||
# Resolve model config (same helpers automations use)
|
||||
from open_webui.utils.automations import (
|
||||
_resolve_model_tool_ids,
|
||||
_resolve_model_features,
|
||||
_resolve_model_filter_ids,
|
||||
)
|
||||
|
||||
tool_ids = _resolve_model_tool_ids(request.app, model_id)
|
||||
features = _resolve_model_features(request.app, model_id)
|
||||
filter_ids = _resolve_model_filter_ids(request.app, model_id)
|
||||
|
||||
# Build full form_data — same shape as frontend POST.
|
||||
# The channel: prefix routes pipeline events to the
|
||||
# channel emitter in socket/main.py instead of the
|
||||
# default chat emitter.
|
||||
form_data = {
|
||||
'model': model_id,
|
||||
'messages': [
|
||||
system_message,
|
||||
{'role': 'user', 'content': content},
|
||||
],
|
||||
'stream': False,
|
||||
'stream': True,
|
||||
'chat_id': f'channel:{channel.id}',
|
||||
'id': response_message.id,
|
||||
'session_id': f'channel:{channel.id}',
|
||||
'background_tasks': {},
|
||||
}
|
||||
if tool_ids:
|
||||
form_data['tool_ids'] = tool_ids
|
||||
if features:
|
||||
form_data['features'] = features
|
||||
if filter_ids:
|
||||
form_data['filter_ids'] = filter_ids
|
||||
|
||||
res = await generate_chat_completion(
|
||||
request,
|
||||
form_data=form_data,
|
||||
user=user,
|
||||
# Call the full chat completion pipeline — streaming,
|
||||
# tools, filters, RAG — everything. The pipeline runs as
|
||||
# an async task; the channel emitter handles progressive
|
||||
# message updates via socket events.
|
||||
await request.app.state.CHAT_COMPLETION_HANDLER(
|
||||
request, form_data, user=user
|
||||
)
|
||||
|
||||
if res:
|
||||
if res.get('choices', []) and len(res['choices']) > 0:
|
||||
await update_message_by_id(
|
||||
request,
|
||||
channel.id,
|
||||
response_message.id,
|
||||
MessageForm(
|
||||
**{
|
||||
'content': res['choices'][0]['message']['content'],
|
||||
'meta': {
|
||||
'done': True,
|
||||
},
|
||||
}
|
||||
),
|
||||
user,
|
||||
db,
|
||||
)
|
||||
elif res.get('error', None):
|
||||
await update_message_by_id(
|
||||
request,
|
||||
channel.id,
|
||||
response_message.id,
|
||||
MessageForm(
|
||||
**{
|
||||
'content': f'Error: {res["error"]}',
|
||||
'meta': {
|
||||
'done': True,
|
||||
},
|
||||
}
|
||||
),
|
||||
user,
|
||||
db,
|
||||
)
|
||||
except Exception as e:
|
||||
log.info(e)
|
||||
pass
|
||||
log.exception(e)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -832,7 +832,80 @@ async def disconnect(sid):
|
||||
# print(f"Unknown session ID {sid} disconnected")
|
||||
|
||||
|
||||
async def _make_channel_emitter(request_info):
|
||||
"""Event emitter that routes pipeline output to a channel message.
|
||||
|
||||
Translates chat:completion events into channel message:update socket
|
||||
emissions, throttled to avoid flooding with per-token updates.
|
||||
"""
|
||||
channel_id = request_info['chat_id'].removeprefix('channel:')
|
||||
message_id = request_info['message_id']
|
||||
|
||||
state = {'last_emit_at': 0.0}
|
||||
THROTTLE_INTERVAL = 0.15 # ~6 updates/sec
|
||||
|
||||
async def _emit_channel_update(content: str, done: bool = False):
|
||||
from open_webui.models.messages import Messages, MessageForm
|
||||
|
||||
update_form = MessageForm(content=content)
|
||||
if done:
|
||||
# Merge done flag into existing meta (preserve model_id etc.)
|
||||
msg = await Messages.get_message_by_id(message_id)
|
||||
existing_meta = (msg.meta or {}) if msg else {}
|
||||
update_form = MessageForm(
|
||||
content=content,
|
||||
meta={**existing_meta, 'done': True},
|
||||
)
|
||||
|
||||
await Messages.update_message_by_id(message_id, update_form)
|
||||
message = await Messages.get_message_by_id(message_id)
|
||||
if message:
|
||||
await sio.emit(
|
||||
'events:channel',
|
||||
{
|
||||
'channel_id': channel_id,
|
||||
'message_id': message_id,
|
||||
'data': {
|
||||
'type': 'message:update',
|
||||
'data': message.model_dump(),
|
||||
},
|
||||
},
|
||||
to=f'channel:{channel_id}',
|
||||
)
|
||||
|
||||
async def __channel_emitter__(event_data):
|
||||
event_type = event_data.get('type')
|
||||
|
||||
if event_type == 'chat:completion':
|
||||
data = event_data.get('data', {})
|
||||
content = data.get('content', '')
|
||||
done = data.get('done', False)
|
||||
|
||||
if not content and not done:
|
||||
return
|
||||
|
||||
now = __import__('time').time()
|
||||
if done or (now - state['last_emit_at']) >= THROTTLE_INTERVAL:
|
||||
state['last_emit_at'] = now
|
||||
await _emit_channel_update(content, done)
|
||||
|
||||
elif event_type == 'chat:message:error':
|
||||
error = event_data.get('data', {}).get('error', {})
|
||||
error_content = (
|
||||
error.get('content', 'An error occurred')
|
||||
if isinstance(error, dict)
|
||||
else str(error)
|
||||
)
|
||||
await _emit_channel_update(f'Error: {error_content}', done=True)
|
||||
|
||||
return __channel_emitter__
|
||||
|
||||
|
||||
async def get_event_emitter(request_info, update_db=True):
|
||||
# Channel mode: route pipeline output to channel message updates
|
||||
if request_info.get('chat_id', '').startswith('channel:'):
|
||||
return await _make_channel_emitter(request_info)
|
||||
|
||||
async def __event_emitter__(event_data):
|
||||
user_id = request_info['user_id']
|
||||
chat_id = request_info['chat_id']
|
||||
|
||||
@@ -1708,7 +1708,7 @@ async def add_file_context(messages: list, chat_id: str, user) -> list:
|
||||
"""
|
||||
Add file URLs to messages for native function calling.
|
||||
"""
|
||||
if not chat_id or chat_id.startswith('local:'):
|
||||
if not chat_id or chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||
return messages
|
||||
|
||||
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
|
||||
@@ -1764,7 +1764,7 @@ async def chat_image_generation_handler(request: Request, form_data: dict, extra
|
||||
if not chat_id or not isinstance(chat_id, str) or not __event_emitter__:
|
||||
return form_data
|
||||
|
||||
if chat_id.startswith('local:'):
|
||||
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||
message_list = form_data.get('messages', [])
|
||||
else:
|
||||
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
|
||||
@@ -2296,7 +2296,7 @@ async def process_chat_payload(request, form_data, user, metadata, model):
|
||||
chat_id = metadata.get('chat_id')
|
||||
user_message_id = metadata.get('user_message_id')
|
||||
|
||||
if chat_id and user_message_id and not chat_id.startswith('local:'):
|
||||
if chat_id and user_message_id and not chat_id.startswith('local:') and not chat_id.startswith('channel:'):
|
||||
db_messages = await load_messages_from_db(chat_id, user_message_id)
|
||||
if db_messages:
|
||||
# Continue: frontend sends assistant_message_id when continuing
|
||||
@@ -3058,7 +3058,7 @@ async def background_tasks_handler(ctx):
|
||||
message = None
|
||||
messages = []
|
||||
|
||||
if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:'):
|
||||
if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'):
|
||||
messages_map = await Chats.get_messages_map_by_chat_id(metadata['chat_id'])
|
||||
message = messages_map.get(metadata['message_id']) if messages_map else None
|
||||
|
||||
@@ -3138,7 +3138,7 @@ async def background_tasks_handler(ctx):
|
||||
}
|
||||
)
|
||||
|
||||
if not metadata.get('chat_id', '').startswith('local:'):
|
||||
if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'):
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
@@ -3150,7 +3150,7 @@ async def background_tasks_handler(ctx):
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
if not metadata.get('chat_id', '').startswith('local:'): # Only update titles and tags for non-temp chats
|
||||
if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'): # Only update titles and tags for non-temp chats
|
||||
if TASKS.TITLE_GENERATION in tasks:
|
||||
user_message = get_last_user_message(messages)
|
||||
if user_message and len(user_message) > 100:
|
||||
@@ -3274,7 +3274,7 @@ async def outlet_filter_handler(ctx):
|
||||
if not chat_id or not message_id:
|
||||
return
|
||||
|
||||
is_temp_chat = chat_id.startswith('local:')
|
||||
is_temp_chat = chat_id.startswith('local:') or chat_id.startswith('channel:')
|
||||
|
||||
try:
|
||||
messages_map = None
|
||||
@@ -3416,13 +3416,14 @@ async def non_streaming_chat_response_handler(response, ctx):
|
||||
|
||||
log.error('Provider returned error (non-streaming): %s', error)
|
||||
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'error': {'content': error},
|
||||
},
|
||||
)
|
||||
if not metadata['chat_id'].startswith('channel:'):
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'error': {'content': error},
|
||||
},
|
||||
)
|
||||
if isinstance(error, str) or isinstance(error, dict):
|
||||
await event_emitter(
|
||||
{
|
||||
@@ -3431,7 +3432,7 @@ async def non_streaming_chat_response_handler(response, ctx):
|
||||
}
|
||||
)
|
||||
|
||||
if 'selected_model_id' in response_data:
|
||||
if 'selected_model_id' in response_data and not metadata['chat_id'].startswith('channel:'):
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
@@ -3452,7 +3453,7 @@ async def non_streaming_chat_response_handler(response, ctx):
|
||||
}
|
||||
)
|
||||
|
||||
title = await Chats.get_chat_title_by_id(metadata['chat_id'])
|
||||
title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else ''
|
||||
|
||||
# Use output from backend if provided (OR-compliant backends),
|
||||
# otherwise generate from response content
|
||||
@@ -3483,17 +3484,18 @@ async def non_streaming_chat_response_handler(response, ctx):
|
||||
# Save message in the database
|
||||
usage = normalize_usage(response_data.get('usage', {}) or {})
|
||||
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'done': True,
|
||||
'role': 'assistant',
|
||||
'content': content,
|
||||
'output': response_output,
|
||||
**({'usage': usage} if usage else {}),
|
||||
},
|
||||
)
|
||||
if not metadata['chat_id'].startswith('channel:'):
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'done': True,
|
||||
'role': 'assistant',
|
||||
'content': content,
|
||||
'output': response_output,
|
||||
**({'usage': usage} if usage else {}),
|
||||
},
|
||||
)
|
||||
|
||||
# Send a webhook notification if the user is not active
|
||||
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
|
||||
@@ -4348,7 +4350,7 @@ async def streaming_chat_response_handler(response, ctx):
|
||||
if end:
|
||||
break
|
||||
|
||||
if ENABLE_REALTIME_CHAT_SAVE:
|
||||
if ENABLE_REALTIME_CHAT_SAVE and not metadata['chat_id'].startswith('channel:'):
|
||||
# Save message in the database
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
@@ -5024,7 +5026,7 @@ async def streaming_chat_response_handler(response, ctx):
|
||||
if item.get('status') == 'in_progress':
|
||||
item['status'] = 'completed'
|
||||
|
||||
title = await Chats.get_chat_title_by_id(metadata['chat_id'])
|
||||
title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else ''
|
||||
data = {
|
||||
'done': True,
|
||||
'content': serialize_output(output),
|
||||
@@ -5033,30 +5035,31 @@ async def streaming_chat_response_handler(response, ctx):
|
||||
**({'usage': usage} if usage else {}),
|
||||
}
|
||||
|
||||
if not ENABLE_REALTIME_CHAT_SAVE:
|
||||
# Save message in the database
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'done': True,
|
||||
'content': serialize_output(output),
|
||||
'output': output,
|
||||
**({'usage': usage} if usage else {}),
|
||||
},
|
||||
)
|
||||
elif usage:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{'done': True, 'usage': usage},
|
||||
)
|
||||
else:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{'done': True},
|
||||
)
|
||||
if not metadata['chat_id'].startswith('channel:'):
|
||||
if not ENABLE_REALTIME_CHAT_SAVE:
|
||||
# Save message in the database
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'done': True,
|
||||
'content': serialize_output(output),
|
||||
'output': output,
|
||||
**({'usage': usage} if usage else {}),
|
||||
},
|
||||
)
|
||||
elif usage:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{'done': True, 'usage': usage},
|
||||
)
|
||||
else:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{'done': True},
|
||||
)
|
||||
|
||||
# Send a webhook notification if the user is not active
|
||||
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
|
||||
@@ -5103,22 +5106,23 @@ async def streaming_chat_response_handler(response, ctx):
|
||||
|
||||
async def save_cancelled_state():
|
||||
await event_emitter({'type': 'chat:tasks:cancel'})
|
||||
if not ENABLE_REALTIME_CHAT_SAVE:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'done': True,
|
||||
'content': serialize_output(output),
|
||||
'output': output,
|
||||
},
|
||||
)
|
||||
else:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{'done': True},
|
||||
)
|
||||
if not metadata['chat_id'].startswith('channel:'):
|
||||
if not ENABLE_REALTIME_CHAT_SAVE:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{
|
||||
'done': True,
|
||||
'content': serialize_output(output),
|
||||
'output': output,
|
||||
},
|
||||
)
|
||||
else:
|
||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||
metadata['chat_id'],
|
||||
metadata['message_id'],
|
||||
{'done': True},
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.shield(save_cancelled_state())
|
||||
|
||||
Reference in New Issue
Block a user