enh: channels streaming agent

This commit is contained in:
Timothy Jaeryang Baek
2026-05-11 02:50:30 +09:00
parent c951b4f262
commit 0037baeb26
4 changed files with 184 additions and 114 deletions

View File

@@ -1796,7 +1796,7 @@ async def chat_completion(
if metadata.get('chat_id') and user:
chat_id = metadata['chat_id']
if not chat_id.startswith('local:'): # temporary chats are not stored
if not chat_id.startswith('local:') and not chat_id.startswith('channel:'): # temporary/channel chats are not stored
if is_new_chat:
# Build the full history upfront with ALL assistant placeholders
user_message = metadata.get('user_message') or {}
@@ -2012,7 +2012,7 @@ async def chat_completion(
if metadata.get('chat_id') and metadata.get('message_id'):
# Update the chat message with the error
try:
if not metadata['chat_id'].startswith('local:'):
if not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'):
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
@@ -2275,7 +2275,7 @@ async def list_tasks_endpoint(request: Request, user=Depends(get_admin_user)):
@app.get('/api/tasks/chat/{chat_id:path}')
async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
if chat_id.startswith('local:'):
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
socket_id = chat_id[len('local:') :]
owner_id = get_user_id_from_session_pool(socket_id)
if owner_id != user.id and user.role != 'admin':
@@ -2293,7 +2293,7 @@ async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=De
@app.post('/api/tasks/chat/{chat_id:path}/stop')
async def stop_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
if chat_id.startswith('local:'):
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
socket_id = chat_id[len('local:') :]
owner_id = get_user_id_from_session_pool(socket_id)
if owner_id != user.id and user.role != 'admin':

View File

@@ -57,7 +57,7 @@ from open_webui.utils.models import (
get_all_models,
get_filtered_models,
)
from open_webui.utils.chat import generate_chat_completion
from open_webui.utils.auth import get_admin_user, get_verified_user
@@ -979,57 +979,50 @@ async def model_response_handler(request, channel, message, user, db=None):
],
]
# Resolve model config (same helpers automations use)
from open_webui.utils.automations import (
_resolve_model_tool_ids,
_resolve_model_features,
_resolve_model_filter_ids,
)
tool_ids = _resolve_model_tool_ids(request.app, model_id)
features = _resolve_model_features(request.app, model_id)
filter_ids = _resolve_model_filter_ids(request.app, model_id)
# Build full form_data — same shape as frontend POST.
# The channel: prefix routes pipeline events to the
# channel emitter in socket/main.py instead of the
# default chat emitter.
form_data = {
'model': model_id,
'messages': [
system_message,
{'role': 'user', 'content': content},
],
'stream': False,
'stream': True,
'chat_id': f'channel:{channel.id}',
'id': response_message.id,
'session_id': f'channel:{channel.id}',
'background_tasks': {},
}
if tool_ids:
form_data['tool_ids'] = tool_ids
if features:
form_data['features'] = features
if filter_ids:
form_data['filter_ids'] = filter_ids
res = await generate_chat_completion(
request,
form_data=form_data,
user=user,
# Call the full chat completion pipeline — streaming,
# tools, filters, RAG — everything. The pipeline runs as
# an async task; the channel emitter handles progressive
# message updates via socket events.
await request.app.state.CHAT_COMPLETION_HANDLER(
request, form_data, user=user
)
if res:
if res.get('choices', []) and len(res['choices']) > 0:
await update_message_by_id(
request,
channel.id,
response_message.id,
MessageForm(
**{
'content': res['choices'][0]['message']['content'],
'meta': {
'done': True,
},
}
),
user,
db,
)
elif res.get('error', None):
await update_message_by_id(
request,
channel.id,
response_message.id,
MessageForm(
**{
'content': f'Error: {res["error"]}',
'meta': {
'done': True,
},
}
),
user,
db,
)
except Exception as e:
log.info(e)
pass
log.exception(e)
return True

View File

@@ -832,7 +832,80 @@ async def disconnect(sid):
# print(f"Unknown session ID {sid} disconnected")
async def _make_channel_emitter(request_info):
"""Event emitter that routes pipeline output to a channel message.
Translates chat:completion events into channel message:update socket
emissions, throttled to avoid flooding with per-token updates.
"""
channel_id = request_info['chat_id'].removeprefix('channel:')
message_id = request_info['message_id']
state = {'last_emit_at': 0.0}
THROTTLE_INTERVAL = 0.15 # ~6 updates/sec
async def _emit_channel_update(content: str, done: bool = False):
from open_webui.models.messages import Messages, MessageForm
update_form = MessageForm(content=content)
if done:
# Merge done flag into existing meta (preserve model_id etc.)
msg = await Messages.get_message_by_id(message_id)
existing_meta = (msg.meta or {}) if msg else {}
update_form = MessageForm(
content=content,
meta={**existing_meta, 'done': True},
)
await Messages.update_message_by_id(message_id, update_form)
message = await Messages.get_message_by_id(message_id)
if message:
await sio.emit(
'events:channel',
{
'channel_id': channel_id,
'message_id': message_id,
'data': {
'type': 'message:update',
'data': message.model_dump(),
},
},
to=f'channel:{channel_id}',
)
async def __channel_emitter__(event_data):
event_type = event_data.get('type')
if event_type == 'chat:completion':
data = event_data.get('data', {})
content = data.get('content', '')
done = data.get('done', False)
if not content and not done:
return
now = __import__('time').time()
if done or (now - state['last_emit_at']) >= THROTTLE_INTERVAL:
state['last_emit_at'] = now
await _emit_channel_update(content, done)
elif event_type == 'chat:message:error':
error = event_data.get('data', {}).get('error', {})
error_content = (
error.get('content', 'An error occurred')
if isinstance(error, dict)
else str(error)
)
await _emit_channel_update(f'Error: {error_content}', done=True)
return __channel_emitter__
async def get_event_emitter(request_info, update_db=True):
# Channel mode: route pipeline output to channel message updates
if request_info.get('chat_id', '').startswith('channel:'):
return await _make_channel_emitter(request_info)
async def __event_emitter__(event_data):
user_id = request_info['user_id']
chat_id = request_info['chat_id']

View File

@@ -1708,7 +1708,7 @@ async def add_file_context(messages: list, chat_id: str, user) -> list:
"""
Add file URLs to messages for native function calling.
"""
if not chat_id or chat_id.startswith('local:'):
if not chat_id or chat_id.startswith('local:') or chat_id.startswith('channel:'):
return messages
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
@@ -1764,7 +1764,7 @@ async def chat_image_generation_handler(request: Request, form_data: dict, extra
if not chat_id or not isinstance(chat_id, str) or not __event_emitter__:
return form_data
if chat_id.startswith('local:'):
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
message_list = form_data.get('messages', [])
else:
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
@@ -2296,7 +2296,7 @@ async def process_chat_payload(request, form_data, user, metadata, model):
chat_id = metadata.get('chat_id')
user_message_id = metadata.get('user_message_id')
if chat_id and user_message_id and not chat_id.startswith('local:'):
if chat_id and user_message_id and not chat_id.startswith('local:') and not chat_id.startswith('channel:'):
db_messages = await load_messages_from_db(chat_id, user_message_id)
if db_messages:
# Continue: frontend sends assistant_message_id when continuing
@@ -3058,7 +3058,7 @@ async def background_tasks_handler(ctx):
message = None
messages = []
if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:'):
if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'):
messages_map = await Chats.get_messages_map_by_chat_id(metadata['chat_id'])
message = messages_map.get(metadata['message_id']) if messages_map else None
@@ -3138,7 +3138,7 @@ async def background_tasks_handler(ctx):
}
)
if not metadata.get('chat_id', '').startswith('local:'):
if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'):
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
@@ -3150,7 +3150,7 @@ async def background_tasks_handler(ctx):
except Exception as e:
pass
if not metadata.get('chat_id', '').startswith('local:'): # Only update titles and tags for non-temp chats
if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'): # Only update titles and tags for non-temp chats
if TASKS.TITLE_GENERATION in tasks:
user_message = get_last_user_message(messages)
if user_message and len(user_message) > 100:
@@ -3274,7 +3274,7 @@ async def outlet_filter_handler(ctx):
if not chat_id or not message_id:
return
is_temp_chat = chat_id.startswith('local:')
is_temp_chat = chat_id.startswith('local:') or chat_id.startswith('channel:')
try:
messages_map = None
@@ -3416,13 +3416,14 @@ async def non_streaming_chat_response_handler(response, ctx):
log.error('Provider returned error (non-streaming): %s', error)
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'error': {'content': error},
},
)
if not metadata['chat_id'].startswith('channel:'):
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'error': {'content': error},
},
)
if isinstance(error, str) or isinstance(error, dict):
await event_emitter(
{
@@ -3431,7 +3432,7 @@ async def non_streaming_chat_response_handler(response, ctx):
}
)
if 'selected_model_id' in response_data:
if 'selected_model_id' in response_data and not metadata['chat_id'].startswith('channel:'):
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
@@ -3452,7 +3453,7 @@ async def non_streaming_chat_response_handler(response, ctx):
}
)
title = await Chats.get_chat_title_by_id(metadata['chat_id'])
title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else ''
# Use output from backend if provided (OR-compliant backends),
# otherwise generate from response content
@@ -3483,17 +3484,18 @@ async def non_streaming_chat_response_handler(response, ctx):
# Save message in the database
usage = normalize_usage(response_data.get('usage', {}) or {})
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'done': True,
'role': 'assistant',
'content': content,
'output': response_output,
**({'usage': usage} if usage else {}),
},
)
if not metadata['chat_id'].startswith('channel:'):
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'done': True,
'role': 'assistant',
'content': content,
'output': response_output,
**({'usage': usage} if usage else {}),
},
)
# Send a webhook notification if the user is not active
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
@@ -4348,7 +4350,7 @@ async def streaming_chat_response_handler(response, ctx):
if end:
break
if ENABLE_REALTIME_CHAT_SAVE:
if ENABLE_REALTIME_CHAT_SAVE and not metadata['chat_id'].startswith('channel:'):
# Save message in the database
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
@@ -5024,7 +5026,7 @@ async def streaming_chat_response_handler(response, ctx):
if item.get('status') == 'in_progress':
item['status'] = 'completed'
title = await Chats.get_chat_title_by_id(metadata['chat_id'])
title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else ''
data = {
'done': True,
'content': serialize_output(output),
@@ -5033,30 +5035,31 @@ async def streaming_chat_response_handler(response, ctx):
**({'usage': usage} if usage else {}),
}
if not ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'done': True,
'content': serialize_output(output),
'output': output,
**({'usage': usage} if usage else {}),
},
)
elif usage:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{'done': True, 'usage': usage},
)
else:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{'done': True},
)
if not metadata['chat_id'].startswith('channel:'):
if not ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'done': True,
'content': serialize_output(output),
'output': output,
**({'usage': usage} if usage else {}),
},
)
elif usage:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{'done': True, 'usage': usage},
)
else:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{'done': True},
)
# Send a webhook notification if the user is not active
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
@@ -5103,22 +5106,23 @@ async def streaming_chat_response_handler(response, ctx):
async def save_cancelled_state():
await event_emitter({'type': 'chat:tasks:cancel'})
if not ENABLE_REALTIME_CHAT_SAVE:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'done': True,
'content': serialize_output(output),
'output': output,
},
)
else:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{'done': True},
)
if not metadata['chat_id'].startswith('channel:'):
if not ENABLE_REALTIME_CHAT_SAVE:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{
'done': True,
'content': serialize_output(output),
'output': output,
},
)
else:
await Chats.upsert_message_to_chat_by_id_and_message_id(
metadata['chat_id'],
metadata['message_id'],
{'done': True},
)
try:
await asyncio.shield(save_cancelled_state())