diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index 4adece3825..fb075886a2 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -1796,7 +1796,7 @@ async def chat_completion( if metadata.get('chat_id') and user: chat_id = metadata['chat_id'] - if not chat_id.startswith('local:'): # temporary chats are not stored + if not chat_id.startswith('local:') and not chat_id.startswith('channel:'): # temporary/channel chats are not stored if is_new_chat: # Build the full history upfront with ALL assistant placeholders user_message = metadata.get('user_message') or {} @@ -2012,7 +2012,7 @@ async def chat_completion( if metadata.get('chat_id') and metadata.get('message_id'): # Update the chat message with the error try: - if not metadata['chat_id'].startswith('local:'): + if not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'): await Chats.upsert_message_to_chat_by_id_and_message_id( metadata['chat_id'], metadata['message_id'], @@ -2275,7 +2275,7 @@ async def list_tasks_endpoint(request: Request, user=Depends(get_admin_user)): @app.get('/api/tasks/chat/{chat_id:path}') async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)): - if chat_id.startswith('local:'): + if chat_id.startswith('local:') or chat_id.startswith('channel:'): socket_id = chat_id[len('local:') :] owner_id = get_user_id_from_session_pool(socket_id) if owner_id != user.id and user.role != 'admin': @@ -2293,7 +2293,7 @@ async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=De @app.post('/api/tasks/chat/{chat_id:path}/stop') async def stop_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)): - if chat_id.startswith('local:'): + if chat_id.startswith('local:') or chat_id.startswith('channel:'): socket_id = chat_id[len('local:') :] owner_id = get_user_id_from_session_pool(socket_id) if owner_id != user.id and user.role 
!= 'admin': diff --git a/backend/open_webui/routers/channels.py b/backend/open_webui/routers/channels.py index 3c7fef8773..757f999c25 100644 --- a/backend/open_webui/routers/channels.py +++ b/backend/open_webui/routers/channels.py @@ -57,7 +57,7 @@ from open_webui.utils.models import ( get_all_models, get_filtered_models, ) -from open_webui.utils.chat import generate_chat_completion + from open_webui.utils.auth import get_admin_user, get_verified_user @@ -979,57 +979,50 @@ async def model_response_handler(request, channel, message, user, db=None): ], ] + # Resolve model config (same helpers automations use) + from open_webui.utils.automations import ( + _resolve_model_tool_ids, + _resolve_model_features, + _resolve_model_filter_ids, + ) + + tool_ids = _resolve_model_tool_ids(request.app, model_id) + features = _resolve_model_features(request.app, model_id) + filter_ids = _resolve_model_filter_ids(request.app, model_id) + + # Build full form_data — same shape as frontend POST. + # The channel: prefix routes pipeline events to the + # channel emitter in socket/main.py instead of the + # default chat emitter. form_data = { 'model': model_id, 'messages': [ system_message, {'role': 'user', 'content': content}, ], - 'stream': False, + 'stream': True, + 'chat_id': f'channel:{channel.id}', + 'id': response_message.id, + 'session_id': f'channel:{channel.id}', + 'background_tasks': {}, } + if tool_ids: + form_data['tool_ids'] = tool_ids + if features: + form_data['features'] = features + if filter_ids: + form_data['filter_ids'] = filter_ids - res = await generate_chat_completion( - request, - form_data=form_data, - user=user, + # Call the full chat completion pipeline — streaming, + # tools, filters, RAG — everything. The pipeline runs as + # an async task; the channel emitter handles progressive + # message updates via socket events. 
async def _make_channel_emitter(request_info):
    """Build an event emitter that routes pipeline output to a channel message.

    The returned coroutine function translates ``chat:completion`` and
    ``chat:message:error`` pipeline events into updates of the channel
    message named by ``request_info`` and broadcasts each update as an
    ``events:channel`` socket event (payload type ``message:update``) to the
    ``channel:<channel_id>`` room. Every other event type is ignored.

    Non-final completion updates are throttled to at most one per
    ``THROTTLE_INTERVAL`` seconds; updates that arrive inside the window are
    dropped, not queued. Final (``done``) updates and error updates are
    always written and emitted.
    NOTE(review): dropping is only safe if each completion event carries the
    full content so far rather than a delta -- confirm against the pipeline.

    Args:
        request_info: Mapping that must contain ``chat_id`` (expected in the
            form ``channel:<channel_id>``) and ``message_id``.

    Returns:
        An async callable accepting a single event dict (``type`` / ``data``).

    Raises:
        KeyError: If ``chat_id`` or ``message_id`` is missing.
    """
    # Function-scope import keeps this block self-contained.
    import time

    channel_id = request_info['chat_id'].removeprefix('channel:')
    message_id = request_info['message_id']

    THROTTLE_INTERVAL = 0.15  # seconds between progressive updates (~6/sec)

    # Start at -inf so the very first update is never throttled, whatever
    # origin the monotonic clock happens to have on this platform.
    state = {'last_emit_at': float('-inf')}

    async def _emit_channel_update(content: str, done: bool = False):
        """Persist ``content`` on the channel message and broadcast the result."""
        # Imported lazily, as in the original code (presumably to avoid a
        # circular import between the socket and models modules).
        from open_webui.models.messages import Messages, MessageForm

        update_form = MessageForm(content=content)
        if done:
            # Merge done flag into existing meta (preserve model_id etc.)
            msg = await Messages.get_message_by_id(message_id)
            existing_meta = (msg.meta or {}) if msg else {}
            update_form = MessageForm(
                content=content,
                meta={**existing_meta, 'done': True},
            )

        await Messages.update_message_by_id(message_id, update_form)

        # Re-read so the broadcast payload reflects the stored row.
        message = await Messages.get_message_by_id(message_id)
        if message:
            await sio.emit(
                'events:channel',
                {
                    'channel_id': channel_id,
                    'message_id': message_id,
                    'data': {
                        'type': 'message:update',
                        'data': message.model_dump(),
                    },
                },
                to=f'channel:{channel_id}',
            )

    async def __channel_emitter__(event_data):
        """Dispatch one pipeline event to the channel message."""
        event_type = event_data.get('type')
        # ``data`` may be present but explicitly None; treat that as empty
        # instead of failing on ``None.get``.
        data = event_data.get('data') or {}

        if event_type == 'chat:completion':
            content = data.get('content', '')
            done = data.get('done', False)

            # Nothing to show yet and not finished: skip the DB round-trip.
            if not content and not done:
                return

            # A monotonic clock is immune to wall-clock adjustments, which
            # could otherwise stall or burst the throttle.
            now = time.monotonic()
            if done or (now - state['last_emit_at']) >= THROTTLE_INTERVAL:
                state['last_emit_at'] = now
                await _emit_channel_update(content, done)

        elif event_type == 'chat:message:error':
            error = data.get('error', {})
            error_content = (
                error.get('content', 'An error occurred')
                if isinstance(error, dict)
                else str(error)
            )
            await _emit_channel_update(f'Error: {error_content}', done=True)

    return __channel_emitter__
for native function calling. """ - if not chat_id or chat_id.startswith('local:'): + if not chat_id or chat_id.startswith('local:') or chat_id.startswith('channel:'): return messages chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id) @@ -1764,7 +1764,7 @@ async def chat_image_generation_handler(request: Request, form_data: dict, extra if not chat_id or not isinstance(chat_id, str) or not __event_emitter__: return form_data - if chat_id.startswith('local:'): + if chat_id.startswith('local:') or chat_id.startswith('channel:'): message_list = form_data.get('messages', []) else: chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id) @@ -2296,7 +2296,7 @@ async def process_chat_payload(request, form_data, user, metadata, model): chat_id = metadata.get('chat_id') user_message_id = metadata.get('user_message_id') - if chat_id and user_message_id and not chat_id.startswith('local:'): + if chat_id and user_message_id and not chat_id.startswith('local:') and not chat_id.startswith('channel:'): db_messages = await load_messages_from_db(chat_id, user_message_id) if db_messages: # Continue: frontend sends assistant_message_id when continuing @@ -3058,7 +3058,7 @@ async def background_tasks_handler(ctx): message = None messages = [] - if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:'): + if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'): messages_map = await Chats.get_messages_map_by_chat_id(metadata['chat_id']) message = messages_map.get(metadata['message_id']) if messages_map else None @@ -3138,7 +3138,7 @@ async def background_tasks_handler(ctx): } ) - if not metadata.get('chat_id', '').startswith('local:'): + if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'): await Chats.upsert_message_to_chat_by_id_and_message_id( metadata['chat_id'], metadata['message_id'], @@ -3150,7 +3150,7 @@ async def 
background_tasks_handler(ctx): except Exception as e: pass - if not metadata.get('chat_id', '').startswith('local:'): # Only update titles and tags for non-temp chats + if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'): # Only update titles and tags for non-temp chats if TASKS.TITLE_GENERATION in tasks: user_message = get_last_user_message(messages) if user_message and len(user_message) > 100: @@ -3274,7 +3274,7 @@ async def outlet_filter_handler(ctx): if not chat_id or not message_id: return - is_temp_chat = chat_id.startswith('local:') + is_temp_chat = chat_id.startswith('local:') or chat_id.startswith('channel:') try: messages_map = None @@ -3416,13 +3416,14 @@ async def non_streaming_chat_response_handler(response, ctx): log.error('Provider returned error (non-streaming): %s', error) - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - { - 'error': {'content': error}, - }, - ) + if not metadata['chat_id'].startswith('channel:'): + await Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + { + 'error': {'content': error}, + }, + ) if isinstance(error, str) or isinstance(error, dict): await event_emitter( { @@ -3431,7 +3432,7 @@ async def non_streaming_chat_response_handler(response, ctx): } ) - if 'selected_model_id' in response_data: + if 'selected_model_id' in response_data and not metadata['chat_id'].startswith('channel:'): await Chats.upsert_message_to_chat_by_id_and_message_id( metadata['chat_id'], metadata['message_id'], @@ -3452,7 +3453,7 @@ async def non_streaming_chat_response_handler(response, ctx): } ) - title = await Chats.get_chat_title_by_id(metadata['chat_id']) + title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else '' # Use output from backend if provided (OR-compliant backends), # otherwise generate from response 
content @@ -3483,17 +3484,18 @@ async def non_streaming_chat_response_handler(response, ctx): # Save message in the database usage = normalize_usage(response_data.get('usage', {}) or {}) - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - { - 'done': True, - 'role': 'assistant', - 'content': content, - 'output': response_output, - **({'usage': usage} if usage else {}), - }, - ) + if not metadata['chat_id'].startswith('channel:'): + await Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + { + 'done': True, + 'role': 'assistant', + 'content': content, + 'output': response_output, + **({'usage': usage} if usage else {}), + }, + ) # Send a webhook notification if the user is not active if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id): @@ -4348,7 +4350,7 @@ async def streaming_chat_response_handler(response, ctx): if end: break - if ENABLE_REALTIME_CHAT_SAVE: + if ENABLE_REALTIME_CHAT_SAVE and not metadata['chat_id'].startswith('channel:'): # Save message in the database await Chats.upsert_message_to_chat_by_id_and_message_id( metadata['chat_id'], @@ -5024,7 +5026,7 @@ async def streaming_chat_response_handler(response, ctx): if item.get('status') == 'in_progress': item['status'] = 'completed' - title = await Chats.get_chat_title_by_id(metadata['chat_id']) + title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else '' data = { 'done': True, 'content': serialize_output(output), @@ -5033,30 +5035,31 @@ async def streaming_chat_response_handler(response, ctx): **({'usage': usage} if usage else {}), } - if not ENABLE_REALTIME_CHAT_SAVE: - # Save message in the database - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - { - 'done': True, - 'content': serialize_output(output), - 'output': output, - 
**({'usage': usage} if usage else {}), - }, - ) - elif usage: - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - {'done': True, 'usage': usage}, - ) - else: - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - {'done': True}, - ) + if not metadata['chat_id'].startswith('channel:'): + if not ENABLE_REALTIME_CHAT_SAVE: + # Save message in the database + await Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + { + 'done': True, + 'content': serialize_output(output), + 'output': output, + **({'usage': usage} if usage else {}), + }, + ) + elif usage: + await Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + {'done': True, 'usage': usage}, + ) + else: + await Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + {'done': True}, + ) # Send a webhook notification if the user is not active if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id): @@ -5103,22 +5106,23 @@ async def streaming_chat_response_handler(response, ctx): async def save_cancelled_state(): await event_emitter({'type': 'chat:tasks:cancel'}) - if not ENABLE_REALTIME_CHAT_SAVE: - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - { - 'done': True, - 'content': serialize_output(output), - 'output': output, - }, - ) - else: - await Chats.upsert_message_to_chat_by_id_and_message_id( - metadata['chat_id'], - metadata['message_id'], - {'done': True}, - ) + if not metadata['chat_id'].startswith('channel:'): + if not ENABLE_REALTIME_CHAT_SAVE: + await Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + { + 'done': True, + 'content': serialize_output(output), + 'output': output, + }, + ) + else: + await 
Chats.upsert_message_to_chat_by_id_and_message_id( + metadata['chat_id'], + metadata['message_id'], + {'done': True}, + ) try: await asyncio.shield(save_cancelled_state())