mirror of
https://github.com/open-webui/open-webui.git
synced 2026-06-19 18:19:02 -05:00
enh: channels streaming agent
This commit is contained in:
@@ -1796,7 +1796,7 @@ async def chat_completion(
|
|||||||
|
|
||||||
if metadata.get('chat_id') and user:
|
if metadata.get('chat_id') and user:
|
||||||
chat_id = metadata['chat_id']
|
chat_id = metadata['chat_id']
|
||||||
if not chat_id.startswith('local:'): # temporary chats are not stored
|
if not chat_id.startswith('local:') and not chat_id.startswith('channel:'): # temporary/channel chats are not stored
|
||||||
if is_new_chat:
|
if is_new_chat:
|
||||||
# Build the full history upfront with ALL assistant placeholders
|
# Build the full history upfront with ALL assistant placeholders
|
||||||
user_message = metadata.get('user_message') or {}
|
user_message = metadata.get('user_message') or {}
|
||||||
@@ -2012,7 +2012,7 @@ async def chat_completion(
|
|||||||
if metadata.get('chat_id') and metadata.get('message_id'):
|
if metadata.get('chat_id') and metadata.get('message_id'):
|
||||||
# Update the chat message with the error
|
# Update the chat message with the error
|
||||||
try:
|
try:
|
||||||
if not metadata['chat_id'].startswith('local:'):
|
if not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'):
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['chat_id'],
|
metadata['chat_id'],
|
||||||
metadata['message_id'],
|
metadata['message_id'],
|
||||||
@@ -2275,7 +2275,7 @@ async def list_tasks_endpoint(request: Request, user=Depends(get_admin_user)):
|
|||||||
|
|
||||||
@app.get('/api/tasks/chat/{chat_id:path}')
|
@app.get('/api/tasks/chat/{chat_id:path}')
|
||||||
async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
|
async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
|
||||||
if chat_id.startswith('local:'):
|
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||||
socket_id = chat_id[len('local:') :]
|
socket_id = chat_id[len('local:') :]
|
||||||
owner_id = get_user_id_from_session_pool(socket_id)
|
owner_id = get_user_id_from_session_pool(socket_id)
|
||||||
if owner_id != user.id and user.role != 'admin':
|
if owner_id != user.id and user.role != 'admin':
|
||||||
@@ -2293,7 +2293,7 @@ async def list_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=De
|
|||||||
|
|
||||||
@app.post('/api/tasks/chat/{chat_id:path}/stop')
|
@app.post('/api/tasks/chat/{chat_id:path}/stop')
|
||||||
async def stop_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
|
async def stop_tasks_by_chat_id_endpoint(request: Request, chat_id: str, user=Depends(get_verified_user)):
|
||||||
if chat_id.startswith('local:'):
|
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||||
socket_id = chat_id[len('local:') :]
|
socket_id = chat_id[len('local:') :]
|
||||||
owner_id = get_user_id_from_session_pool(socket_id)
|
owner_id = get_user_id_from_session_pool(socket_id)
|
||||||
if owner_id != user.id and user.role != 'admin':
|
if owner_id != user.id and user.role != 'admin':
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ from open_webui.utils.models import (
|
|||||||
get_all_models,
|
get_all_models,
|
||||||
get_filtered_models,
|
get_filtered_models,
|
||||||
)
|
)
|
||||||
from open_webui.utils.chat import generate_chat_completion
|
|
||||||
|
|
||||||
|
|
||||||
from open_webui.utils.auth import get_admin_user, get_verified_user
|
from open_webui.utils.auth import get_admin_user, get_verified_user
|
||||||
@@ -979,57 +979,50 @@ async def model_response_handler(request, channel, message, user, db=None):
|
|||||||
],
|
],
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Resolve model config (same helpers automations use)
|
||||||
|
from open_webui.utils.automations import (
|
||||||
|
_resolve_model_tool_ids,
|
||||||
|
_resolve_model_features,
|
||||||
|
_resolve_model_filter_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
tool_ids = _resolve_model_tool_ids(request.app, model_id)
|
||||||
|
features = _resolve_model_features(request.app, model_id)
|
||||||
|
filter_ids = _resolve_model_filter_ids(request.app, model_id)
|
||||||
|
|
||||||
|
# Build full form_data — same shape as frontend POST.
|
||||||
|
# The channel: prefix routes pipeline events to the
|
||||||
|
# channel emitter in socket/main.py instead of the
|
||||||
|
# default chat emitter.
|
||||||
form_data = {
|
form_data = {
|
||||||
'model': model_id,
|
'model': model_id,
|
||||||
'messages': [
|
'messages': [
|
||||||
system_message,
|
system_message,
|
||||||
{'role': 'user', 'content': content},
|
{'role': 'user', 'content': content},
|
||||||
],
|
],
|
||||||
'stream': False,
|
'stream': True,
|
||||||
|
'chat_id': f'channel:{channel.id}',
|
||||||
|
'id': response_message.id,
|
||||||
|
'session_id': f'channel:{channel.id}',
|
||||||
|
'background_tasks': {},
|
||||||
}
|
}
|
||||||
|
if tool_ids:
|
||||||
|
form_data['tool_ids'] = tool_ids
|
||||||
|
if features:
|
||||||
|
form_data['features'] = features
|
||||||
|
if filter_ids:
|
||||||
|
form_data['filter_ids'] = filter_ids
|
||||||
|
|
||||||
res = await generate_chat_completion(
|
# Call the full chat completion pipeline — streaming,
|
||||||
request,
|
# tools, filters, RAG — everything. The pipeline runs as
|
||||||
form_data=form_data,
|
# an async task; the channel emitter handles progressive
|
||||||
user=user,
|
# message updates via socket events.
|
||||||
|
await request.app.state.CHAT_COMPLETION_HANDLER(
|
||||||
|
request, form_data, user=user
|
||||||
)
|
)
|
||||||
|
|
||||||
if res:
|
|
||||||
if res.get('choices', []) and len(res['choices']) > 0:
|
|
||||||
await update_message_by_id(
|
|
||||||
request,
|
|
||||||
channel.id,
|
|
||||||
response_message.id,
|
|
||||||
MessageForm(
|
|
||||||
**{
|
|
||||||
'content': res['choices'][0]['message']['content'],
|
|
||||||
'meta': {
|
|
||||||
'done': True,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
),
|
|
||||||
user,
|
|
||||||
db,
|
|
||||||
)
|
|
||||||
elif res.get('error', None):
|
|
||||||
await update_message_by_id(
|
|
||||||
request,
|
|
||||||
channel.id,
|
|
||||||
response_message.id,
|
|
||||||
MessageForm(
|
|
||||||
**{
|
|
||||||
'content': f'Error: {res["error"]}',
|
|
||||||
'meta': {
|
|
||||||
'done': True,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
),
|
|
||||||
user,
|
|
||||||
db,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.info(e)
|
log.exception(e)
|
||||||
pass
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|||||||
@@ -832,7 +832,80 @@ async def disconnect(sid):
|
|||||||
# print(f"Unknown session ID {sid} disconnected")
|
# print(f"Unknown session ID {sid} disconnected")
|
||||||
|
|
||||||
|
|
||||||
|
async def _make_channel_emitter(request_info):
|
||||||
|
"""Event emitter that routes pipeline output to a channel message.
|
||||||
|
|
||||||
|
Translates chat:completion events into channel message:update socket
|
||||||
|
emissions, throttled to avoid flooding with per-token updates.
|
||||||
|
"""
|
||||||
|
channel_id = request_info['chat_id'].removeprefix('channel:')
|
||||||
|
message_id = request_info['message_id']
|
||||||
|
|
||||||
|
state = {'last_emit_at': 0.0}
|
||||||
|
THROTTLE_INTERVAL = 0.15 # ~6 updates/sec
|
||||||
|
|
||||||
|
async def _emit_channel_update(content: str, done: bool = False):
|
||||||
|
from open_webui.models.messages import Messages, MessageForm
|
||||||
|
|
||||||
|
update_form = MessageForm(content=content)
|
||||||
|
if done:
|
||||||
|
# Merge done flag into existing meta (preserve model_id etc.)
|
||||||
|
msg = await Messages.get_message_by_id(message_id)
|
||||||
|
existing_meta = (msg.meta or {}) if msg else {}
|
||||||
|
update_form = MessageForm(
|
||||||
|
content=content,
|
||||||
|
meta={**existing_meta, 'done': True},
|
||||||
|
)
|
||||||
|
|
||||||
|
await Messages.update_message_by_id(message_id, update_form)
|
||||||
|
message = await Messages.get_message_by_id(message_id)
|
||||||
|
if message:
|
||||||
|
await sio.emit(
|
||||||
|
'events:channel',
|
||||||
|
{
|
||||||
|
'channel_id': channel_id,
|
||||||
|
'message_id': message_id,
|
||||||
|
'data': {
|
||||||
|
'type': 'message:update',
|
||||||
|
'data': message.model_dump(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
to=f'channel:{channel_id}',
|
||||||
|
)
|
||||||
|
|
||||||
|
async def __channel_emitter__(event_data):
|
||||||
|
event_type = event_data.get('type')
|
||||||
|
|
||||||
|
if event_type == 'chat:completion':
|
||||||
|
data = event_data.get('data', {})
|
||||||
|
content = data.get('content', '')
|
||||||
|
done = data.get('done', False)
|
||||||
|
|
||||||
|
if not content and not done:
|
||||||
|
return
|
||||||
|
|
||||||
|
now = __import__('time').time()
|
||||||
|
if done or (now - state['last_emit_at']) >= THROTTLE_INTERVAL:
|
||||||
|
state['last_emit_at'] = now
|
||||||
|
await _emit_channel_update(content, done)
|
||||||
|
|
||||||
|
elif event_type == 'chat:message:error':
|
||||||
|
error = event_data.get('data', {}).get('error', {})
|
||||||
|
error_content = (
|
||||||
|
error.get('content', 'An error occurred')
|
||||||
|
if isinstance(error, dict)
|
||||||
|
else str(error)
|
||||||
|
)
|
||||||
|
await _emit_channel_update(f'Error: {error_content}', done=True)
|
||||||
|
|
||||||
|
return __channel_emitter__
|
||||||
|
|
||||||
|
|
||||||
async def get_event_emitter(request_info, update_db=True):
|
async def get_event_emitter(request_info, update_db=True):
|
||||||
|
# Channel mode: route pipeline output to channel message updates
|
||||||
|
if request_info.get('chat_id', '').startswith('channel:'):
|
||||||
|
return await _make_channel_emitter(request_info)
|
||||||
|
|
||||||
async def __event_emitter__(event_data):
|
async def __event_emitter__(event_data):
|
||||||
user_id = request_info['user_id']
|
user_id = request_info['user_id']
|
||||||
chat_id = request_info['chat_id']
|
chat_id = request_info['chat_id']
|
||||||
|
|||||||
@@ -1708,7 +1708,7 @@ async def add_file_context(messages: list, chat_id: str, user) -> list:
|
|||||||
"""
|
"""
|
||||||
Add file URLs to messages for native function calling.
|
Add file URLs to messages for native function calling.
|
||||||
"""
|
"""
|
||||||
if not chat_id or chat_id.startswith('local:'):
|
if not chat_id or chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
|
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
|
||||||
@@ -1764,7 +1764,7 @@ async def chat_image_generation_handler(request: Request, form_data: dict, extra
|
|||||||
if not chat_id or not isinstance(chat_id, str) or not __event_emitter__:
|
if not chat_id or not isinstance(chat_id, str) or not __event_emitter__:
|
||||||
return form_data
|
return form_data
|
||||||
|
|
||||||
if chat_id.startswith('local:'):
|
if chat_id.startswith('local:') or chat_id.startswith('channel:'):
|
||||||
message_list = form_data.get('messages', [])
|
message_list = form_data.get('messages', [])
|
||||||
else:
|
else:
|
||||||
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
|
chat = await Chats.get_chat_by_id_and_user_id(chat_id, user.id)
|
||||||
@@ -2296,7 +2296,7 @@ async def process_chat_payload(request, form_data, user, metadata, model):
|
|||||||
chat_id = metadata.get('chat_id')
|
chat_id = metadata.get('chat_id')
|
||||||
user_message_id = metadata.get('user_message_id')
|
user_message_id = metadata.get('user_message_id')
|
||||||
|
|
||||||
if chat_id and user_message_id and not chat_id.startswith('local:'):
|
if chat_id and user_message_id and not chat_id.startswith('local:') and not chat_id.startswith('channel:'):
|
||||||
db_messages = await load_messages_from_db(chat_id, user_message_id)
|
db_messages = await load_messages_from_db(chat_id, user_message_id)
|
||||||
if db_messages:
|
if db_messages:
|
||||||
# Continue: frontend sends assistant_message_id when continuing
|
# Continue: frontend sends assistant_message_id when continuing
|
||||||
@@ -3058,7 +3058,7 @@ async def background_tasks_handler(ctx):
|
|||||||
message = None
|
message = None
|
||||||
messages = []
|
messages = []
|
||||||
|
|
||||||
if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:'):
|
if 'chat_id' in metadata and not metadata['chat_id'].startswith('local:') and not metadata['chat_id'].startswith('channel:'):
|
||||||
messages_map = await Chats.get_messages_map_by_chat_id(metadata['chat_id'])
|
messages_map = await Chats.get_messages_map_by_chat_id(metadata['chat_id'])
|
||||||
message = messages_map.get(metadata['message_id']) if messages_map else None
|
message = messages_map.get(metadata['message_id']) if messages_map else None
|
||||||
|
|
||||||
@@ -3138,7 +3138,7 @@ async def background_tasks_handler(ctx):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if not metadata.get('chat_id', '').startswith('local:'):
|
if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'):
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['chat_id'],
|
metadata['chat_id'],
|
||||||
metadata['message_id'],
|
metadata['message_id'],
|
||||||
@@ -3150,7 +3150,7 @@ async def background_tasks_handler(ctx):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if not metadata.get('chat_id', '').startswith('local:'): # Only update titles and tags for non-temp chats
|
if not metadata.get('chat_id', '').startswith('local:') and not metadata.get('chat_id', '').startswith('channel:'): # Only update titles and tags for non-temp chats
|
||||||
if TASKS.TITLE_GENERATION in tasks:
|
if TASKS.TITLE_GENERATION in tasks:
|
||||||
user_message = get_last_user_message(messages)
|
user_message = get_last_user_message(messages)
|
||||||
if user_message and len(user_message) > 100:
|
if user_message and len(user_message) > 100:
|
||||||
@@ -3274,7 +3274,7 @@ async def outlet_filter_handler(ctx):
|
|||||||
if not chat_id or not message_id:
|
if not chat_id or not message_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
is_temp_chat = chat_id.startswith('local:')
|
is_temp_chat = chat_id.startswith('local:') or chat_id.startswith('channel:')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
messages_map = None
|
messages_map = None
|
||||||
@@ -3416,13 +3416,14 @@ async def non_streaming_chat_response_handler(response, ctx):
|
|||||||
|
|
||||||
log.error('Provider returned error (non-streaming): %s', error)
|
log.error('Provider returned error (non-streaming): %s', error)
|
||||||
|
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
if not metadata['chat_id'].startswith('channel:'):
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{
|
metadata['message_id'],
|
||||||
'error': {'content': error},
|
{
|
||||||
},
|
'error': {'content': error},
|
||||||
)
|
},
|
||||||
|
)
|
||||||
if isinstance(error, str) or isinstance(error, dict):
|
if isinstance(error, str) or isinstance(error, dict):
|
||||||
await event_emitter(
|
await event_emitter(
|
||||||
{
|
{
|
||||||
@@ -3431,7 +3432,7 @@ async def non_streaming_chat_response_handler(response, ctx):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if 'selected_model_id' in response_data:
|
if 'selected_model_id' in response_data and not metadata['chat_id'].startswith('channel:'):
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['chat_id'],
|
metadata['chat_id'],
|
||||||
metadata['message_id'],
|
metadata['message_id'],
|
||||||
@@ -3452,7 +3453,7 @@ async def non_streaming_chat_response_handler(response, ctx):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
title = await Chats.get_chat_title_by_id(metadata['chat_id'])
|
title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else ''
|
||||||
|
|
||||||
# Use output from backend if provided (OR-compliant backends),
|
# Use output from backend if provided (OR-compliant backends),
|
||||||
# otherwise generate from response content
|
# otherwise generate from response content
|
||||||
@@ -3483,17 +3484,18 @@ async def non_streaming_chat_response_handler(response, ctx):
|
|||||||
# Save message in the database
|
# Save message in the database
|
||||||
usage = normalize_usage(response_data.get('usage', {}) or {})
|
usage = normalize_usage(response_data.get('usage', {}) or {})
|
||||||
|
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
if not metadata['chat_id'].startswith('channel:'):
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{
|
metadata['message_id'],
|
||||||
'done': True,
|
{
|
||||||
'role': 'assistant',
|
'done': True,
|
||||||
'content': content,
|
'role': 'assistant',
|
||||||
'output': response_output,
|
'content': content,
|
||||||
**({'usage': usage} if usage else {}),
|
'output': response_output,
|
||||||
},
|
**({'usage': usage} if usage else {}),
|
||||||
)
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# Send a webhook notification if the user is not active
|
# Send a webhook notification if the user is not active
|
||||||
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
|
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
|
||||||
@@ -4348,7 +4350,7 @@ async def streaming_chat_response_handler(response, ctx):
|
|||||||
if end:
|
if end:
|
||||||
break
|
break
|
||||||
|
|
||||||
if ENABLE_REALTIME_CHAT_SAVE:
|
if ENABLE_REALTIME_CHAT_SAVE and not metadata['chat_id'].startswith('channel:'):
|
||||||
# Save message in the database
|
# Save message in the database
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['chat_id'],
|
metadata['chat_id'],
|
||||||
@@ -5024,7 +5026,7 @@ async def streaming_chat_response_handler(response, ctx):
|
|||||||
if item.get('status') == 'in_progress':
|
if item.get('status') == 'in_progress':
|
||||||
item['status'] = 'completed'
|
item['status'] = 'completed'
|
||||||
|
|
||||||
title = await Chats.get_chat_title_by_id(metadata['chat_id'])
|
title = await Chats.get_chat_title_by_id(metadata['chat_id']) if not metadata['chat_id'].startswith('channel:') else ''
|
||||||
data = {
|
data = {
|
||||||
'done': True,
|
'done': True,
|
||||||
'content': serialize_output(output),
|
'content': serialize_output(output),
|
||||||
@@ -5033,30 +5035,31 @@ async def streaming_chat_response_handler(response, ctx):
|
|||||||
**({'usage': usage} if usage else {}),
|
**({'usage': usage} if usage else {}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if not ENABLE_REALTIME_CHAT_SAVE:
|
if not metadata['chat_id'].startswith('channel:'):
|
||||||
# Save message in the database
|
if not ENABLE_REALTIME_CHAT_SAVE:
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
# Save message in the database
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{
|
metadata['message_id'],
|
||||||
'done': True,
|
{
|
||||||
'content': serialize_output(output),
|
'done': True,
|
||||||
'output': output,
|
'content': serialize_output(output),
|
||||||
**({'usage': usage} if usage else {}),
|
'output': output,
|
||||||
},
|
**({'usage': usage} if usage else {}),
|
||||||
)
|
},
|
||||||
elif usage:
|
)
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
elif usage:
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{'done': True, 'usage': usage},
|
metadata['message_id'],
|
||||||
)
|
{'done': True, 'usage': usage},
|
||||||
else:
|
)
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
else:
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{'done': True},
|
metadata['message_id'],
|
||||||
)
|
{'done': True},
|
||||||
|
)
|
||||||
|
|
||||||
# Send a webhook notification if the user is not active
|
# Send a webhook notification if the user is not active
|
||||||
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
|
if request.app.state.config.ENABLE_USER_WEBHOOKS and not await Users.is_user_active(user.id):
|
||||||
@@ -5103,22 +5106,23 @@ async def streaming_chat_response_handler(response, ctx):
|
|||||||
|
|
||||||
async def save_cancelled_state():
|
async def save_cancelled_state():
|
||||||
await event_emitter({'type': 'chat:tasks:cancel'})
|
await event_emitter({'type': 'chat:tasks:cancel'})
|
||||||
if not ENABLE_REALTIME_CHAT_SAVE:
|
if not metadata['chat_id'].startswith('channel:'):
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
if not ENABLE_REALTIME_CHAT_SAVE:
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{
|
metadata['message_id'],
|
||||||
'done': True,
|
{
|
||||||
'content': serialize_output(output),
|
'done': True,
|
||||||
'output': output,
|
'content': serialize_output(output),
|
||||||
},
|
'output': output,
|
||||||
)
|
},
|
||||||
else:
|
)
|
||||||
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
else:
|
||||||
metadata['chat_id'],
|
await Chats.upsert_message_to_chat_by_id_and_message_id(
|
||||||
metadata['message_id'],
|
metadata['chat_id'],
|
||||||
{'done': True},
|
metadata['message_id'],
|
||||||
)
|
{'done': True},
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await asyncio.shield(save_cancelled_state())
|
await asyncio.shield(save_cancelled_state())
|
||||||
|
|||||||
Reference in New Issue
Block a user