Allow pipe functions to return AsyncGenerator #1586

Closed
opened 2025-11-11 14:47:52 -06:00 by GiteaMirror · 2 comments

Originally created by @RobinBially on GitHub (Jul 23, 2024).

Is your feature request related to a problem? Please describe.
Currently, a pipe function blocks the event loop when it is called synchronously. If we call it asynchronously instead, it has to return a Starlette StreamingResponse, which complicates the code.

Describe the solution you'd like
Just add this code to apps/webui/main.py, line 400:

            if isinstance(res, AsyncGenerator):
                async for line in res:
                    # Serialize structured items into SSE "data:" lines
                    if isinstance(line, BaseModel):
                        line = f"data: {line.model_dump_json()}"
                    if isinstance(line, dict):
                        line = f"data: {json.dumps(line)}"

                    # Decode raw bytes; str items pass through unchanged
                    if isinstance(line, bytes):
                        line = line.decode("utf-8")

                    if line.startswith("data:"):
                        yield f"{line}\n\n"
                    else:
                        # Wrap plain text in the streaming message template
                        line = stream_message_template(form_data["model"], line)
                        yield f"data: {json.dumps(line)}\n\n"

and

                elif isinstance(res, AsyncGenerator):
                    # Non-streaming: return the first item the generator yields
                    async for line in res:
                        return line

at line 449 (the non-streaming case).
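
To illustrate what this would enable on the pipe author's side, here is a minimal, self-contained sketch. The names `pipe`, `sse_events`, and `collect` are hypothetical and not taken from Open WebUI, and the framing is simplified: it skips the `BaseModel` branch and the `stream_message_template` wrapping from the snippet above.

```python
import asyncio
import json
from collections.abc import AsyncGenerator


async def pipe(body: dict) -> AsyncGenerator[str, None]:
    """Hypothetical pipe: yields chunks without blocking the event loop."""
    for chunk in ("Hello", ",", "world"):
        await asyncio.sleep(0)  # stand-in for awaiting an upstream API
        yield chunk


async def sse_events(res: AsyncGenerator) -> AsyncGenerator[str, None]:
    """Simplified framing: emit each yielded item as one server-sent event."""
    async for item in res:
        if isinstance(item, bytes):
            item = item.decode("utf-8")
        if isinstance(item, dict):
            item = json.dumps(item)
        yield f"data: {item}\n\n"


async def collect(gen: AsyncGenerator) -> list:
    """Drain an async generator into a list (for demonstration only)."""
    return [item async for item in gen]


res = pipe({})
# Calling an async generator function returns an object that passes
# the isinstance check proposed above.
is_async_gen = isinstance(res, AsyncGenerator)
events = asyncio.run(collect(sse_events(res)))
```

With this shape the pipe itself never has to build a `StreamingResponse`; the framework side can detect the async generator and frame the stream.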

Additionally, we should be able to run a non-coroutine pipe function in a separate thread.
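
One possible way to do this, sketched under assumptions: the dispatcher `call_pipe` and both example pipes are hypothetical names, not Open WebUI code. The sketch uses the standard library's `asyncio.to_thread` (Python 3.9+) to move a synchronous pipe off the event loop, and awaits coroutine pipes directly.

```python
import asyncio
import inspect
import time


def blocking_pipe(body: dict) -> str:
    """Hypothetical synchronous pipe that would block the event loop."""
    time.sleep(0.05)  # stand-in for a blocking HTTP call
    return f"echo: {body['prompt']}"


async def async_pipe(body: dict) -> str:
    """Hypothetical coroutine pipe; safe to await directly."""
    await asyncio.sleep(0)
    return f"echo: {body['prompt']}"


async def call_pipe(pipe, body: dict):
    """Await coroutine pipes; run plain functions in a worker thread."""
    if inspect.iscoroutinefunction(pipe):
        return await pipe(body)
    return await asyncio.to_thread(pipe, body)


async def main() -> list:
    # Both calls run concurrently; the blocking one does not stall the loop.
    return await asyncio.gather(
        call_pipe(blocking_pipe, {"prompt": "a"}),
        call_pipe(async_pipe, {"prompt": "b"}),
    )


results = asyncio.run(main())
```

`asyncio.gather` returns results in argument order, so `results` lists the blocking pipe's output first regardless of which call finishes first.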


@justinh-rahb commented on GitHub (Jul 23, 2024):

PR welcomed 👍


@tjbck commented on GitHub (Jul 24, 2024):

Added to dev!


Reference: github-starred/open-webui#1586