issue: synchronous sqlalchemy usage blocks the event-loop, seriously harming responsiveness #4263

Closed
opened 2025-11-11 15:49:52 -06:00 by GiteaMirror · 3 comments
Owner

Originally created by @lattwood on GitHub (Mar 5, 2025).

Check Existing Issues

  • I have searched the existing issues and discussions.

Installation Method

Git Clone

Open WebUI Version

n/a (master)

Ollama Version (if applicable)

No response

Operating System

n/a

Browser (if applicable)

n/a

Confirmation

  • I have read and followed all instructions in README.md.
  • I am using the latest version of both Open WebUI and Ollama.
  • I have checked the browser console logs.
  • I have checked the Docker container logs.
  • I have listed steps to reproduce the bug in detail.

Expected Behavior

Database queries should be executed asynchronously when being run via async request handlers.

Actual Behavior

Database queries in request handlers are executed synchronously from an asynchronous context, blocking the Python async event loop.

Steps to Reproduce

4770285c04/backend/open_webui/models/chats.py (L108)
4770285c04/backend/open_webui/routers/chats.py (L97-L100)

View those links? Do some grepping? :)

Logs & Screenshots

N/A

Additional Information

I am assuming a high-level understanding of event loops as they relate to async Python, but I am happy to explain where needed. Regarding the blocking specifically: FastAPI is unable to accept connections, respond to other requests, etc., all while Python/SQLAlchemy is blocking the event loop waiting for a response from the database.
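To make the impact concrete, here is a minimal, self-contained sketch (stdlib only, illustrative names): a synchronous sleep inside one coroutine stalls an unrelated heartbeat task for the full duration of the sleep, exactly the way a sync database call stalls every other request on the loop.

```python
import asyncio
import time

async def heartbeat(stamps):
    # An unrelated task that should tick every 50 ms.
    for _ in range(3):
        stamps.append(time.monotonic())
        await asyncio.sleep(0.05)

async def blocking_handler():
    # Stand-in for a sync DB call: the whole event loop stalls here.
    time.sleep(0.3)

async def main():
    stamps = []
    await asyncio.gather(heartbeat(stamps), blocking_handler())
    # Worst gap between ticks balloons from ~50 ms to ~300 ms.
    return max(b - a for a, b in zip(stamps, stamps[1:]))

worst_gap = asyncio.run(main())
print(f"worst heartbeat gap: {worst_gap:.2f}s")
```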

I have a lot of empathy for the team & project over this, because I've had to deal with this before on closed-source projects. It sucks there isn't more widespread knowledge about this issue, but at least there are usually three ways to solve it. In this case there are two, but I'll include all three options for the sake of completeness, and I've provided a Python sample at the bottom of this issue that demonstrates the ultimate impact of this problem, as well as how the three options listed can solve it.

  1. Convert all the async handlers to sync, and rely on FastAPI's implicit support for synchronous handlers to not block the event loop by running them in a threadpool. (included for completeness)
    • Pros
      • You're already using this functionality implicitly when FastAPI is doing dependency resolution through Depends blocks. See the get_session_user handler (4770285c04/backend/open_webui/routers/auths.py (L72-L74)) and its dependency, get_current_user (4770285c04/backend/open_webui/utils/auth.py (L152)).
    • Cons
      • Not on the table due to asyncio.gather usage at the very least.
  2. Wrap all the database code in Starlette's run_in_threadpool method. This is what FastAPI does internally when calling synchronous handlers or doing Depends resolution. An example is below.
    • Pros
      • Nowhere near as labour intensive as Option 3.
      • Can also be implemented/created as a decorator (and I'm including the code for that)
    • Cons
      • Bluntly, it's a bandaid
      • Eats into the FastAPI threadpool
# Using run_in_threadpool
from starlette.concurrency import run_in_threadpool

def sync_function(argument1, argument2):
    # logic for the sync function here
    pass

async def example_route():
    result = await run_in_threadpool(sync_function, 'argument1', 'argument2')
  3. Switch to async SQLAlchemy.
    • Pros
      • The correct way to do things
    • Cons
      • Requires duplicating everything that's sync as async, including connection pools & engines. There's create_async_engine vs create_engine.
      • Lots of footguns in using SQLAlchemy async, because there are so many ways to implicitly do network IO via SQLAlchemy's lazy-loading support, which would block the event loop!
      • You still run the risk of someone else doing stuff on the event loop and blocking it.

Here's an example of the decorator solution that creates async versions of all methods on a class and prefixes them with async_. The example also demonstrates what the UX is for handling concurrent HTTP requests on a single FastAPI/Uvicorn worker for the current state of affairs, as well as the three options mentioned above, through the use of asyncio.gather.

#!/usr/bin/env python

import asyncio
import inspect
import time
from functools import wraps
from starlette.concurrency import run_in_threadpool
from typing_extensions import dataclass_transform

@dataclass_transform()
def asyncify(target):
    if inspect.isclass(target):  # If it's a class
        for name, method in target.__dict__.copy().items():
            if callable(method) and not name.startswith("__"):
                # Skip already async methods
                if inspect.iscoroutinefunction(method):
                    continue

                # Wrap synchronous methods
                @wraps(method)
                async def async_wrapper(self, *args, _method=method, **kwargs):
                    return await run_in_threadpool(_method, self, *args, **kwargs)

                setattr(target, "async_" + name, async_wrapper)
        return target
    elif callable(target):  # If it's a function or method
        # Skip if the function is already async
        if inspect.iscoroutinefunction(target):
            return target

        @wraps(target)
        async def async_wrapper(*args, **kwargs):
            return await run_in_threadpool(target, *args, **kwargs)

        return async_wrapper
    else:
        raise TypeError("asyncify can only be applied to classes, functions, or methods.")

@asyncify
class ExampleSyncAsync:
    def __init__(self):
        pass

    async def current_request_handler(self, num):
        # this represents the current state of affairs in the repo
        print(f"Handling Current Request {num}")
        self.fake_sync_database_operation(num)
        print(f"Current Request {num} handled")

    def option1_request_handler(self, num):
        # this represents option 1
        print(f"Handling Sync Request {num}")
        self.fake_sync_database_operation(num)
        print(f"Sync Request {num} handled")

    async def option2_request_handler(self, num):
        # this represents option 2
        print(f"Handling Async Request {num}")
        await self.async_fake_sync_database_operation(num)
        print(f"Async Request {num} handled")

    async def option3_request_handler(self, num):
        # this represents option 3
        print(f"Handling Async Request {num}")
        await self.async_fake_sync_database_operation(num)
        print(f"Async Request {num} handled")

    def fake_sync_database_operation(self, num):
        # this represents using sqlalchemy sync
        print(f"Doing Sync DB OP {num}")
        time.sleep(1)  # this represents sending the request to the database & getting a response back
        print(f"Done Sync DB OP {num}")

    async def fake_async_database_operation(self, num):
        # this represents using sqlalchemy async
        print(f"Doing Async DB OP {num}")
        await asyncio.sleep(1)  # this represents sending the request to the database & getting a response back
        print(f"Done Async DB OP {num}")


async def main():
    print("Starting Demo")
    print()
    esa = ExampleSyncAsync()

    print("Running Requests Current Style")
    print("==============================")
    await asyncio.gather(
            esa.current_request_handler(1),
            esa.current_request_handler(2),
            esa.current_request_handler(3),
            esa.current_request_handler(4),
            esa.current_request_handler(5),
    )
    print("Requests Ran Current Style")
    print()

    print("Running Requests Option 1 Style")
    print("==============================")
    await asyncio.gather( # I'm using the magic async_ methods as that's what FastAPI does under the hood
            esa.async_option1_request_handler(1),
            esa.async_option1_request_handler(2),
            esa.async_option1_request_handler(3),
            esa.async_option1_request_handler(4),
            esa.async_option1_request_handler(5),
    )
    print("Requests Ran Option 1 Style")
    print()

    print("Running Requests Option 2 Style")
    print("==============================")
    await asyncio.gather(
            esa.option2_request_handler(1),
            esa.option2_request_handler(2),
            esa.option2_request_handler(3),
            esa.option2_request_handler(4),
            esa.option2_request_handler(5),
    )
    print("Requests Ran Option 2 Style")
    print()

    print("Running Requests Option 3 Style")
    print("==============================")
    await asyncio.gather(
            esa.option3_request_handler(1),
            esa.option3_request_handler(2),
            esa.option3_request_handler(3),
            esa.option3_request_handler(4),
            esa.option3_request_handler(5),
    )
    print("Requests Ran Option 3 Style")


asyncio.run(main())

And here is the output when I run it locally.

Starting Demo

Running Requests Current Style
==============================
Handling Current Request 1
Doing Sync DB OP 1
Done Sync DB OP 1
Current Request 1 handled
Handling Current Request 2
Doing Sync DB OP 2
Done Sync DB OP 2
Current Request 2 handled
Handling Current Request 3
Doing Sync DB OP 3
Done Sync DB OP 3
Current Request 3 handled
Handling Current Request 4
Doing Sync DB OP 4
Done Sync DB OP 4
Current Request 4 handled
Handling Current Request 5
Doing Sync DB OP 5
Done Sync DB OP 5
Current Request 5 handled
Requests Ran Current Style

Running Requests Option 1 Style
==============================
Handling Sync Request 2
Doing Sync DB OP 2
Handling Sync Request 1
Doing Sync DB OP 1
Handling Sync Request 3
Doing Sync DB OP 3
Handling Sync Request 4
Doing Sync DB OP 4
Handling Sync Request 5
Doing Sync DB OP 5
Done Sync DB OP 1
Sync Request 1 handled
Done Sync DB OP 3
Sync Request 3 handled
Done Sync DB OP 2
Sync Request 2 handled
Done Sync DB OP 5
Sync Request 5 handled
Done Sync DB OP 4
Sync Request 4 handled
Requests Ran Option 1 Style

Running Requests Option 2 Style
==============================
Handling Async Request 1
Handling Async Request 2
Handling Async Request 3
Handling Async Request 4
Handling Async Request 5
Doing Sync DB OP 5
Doing Sync DB OP 2
Doing Sync DB OP 3
Doing Sync DB OP 4
Doing Sync DB OP 1
Done Sync DB OP 3
Done Sync DB OP 2
Done Sync DB OP 1
Done Sync DB OP 4
Async Request 3 handled
Async Request 2 handled
Async Request 1 handled
Async Request 4 handled
Done Sync DB OP 5
Async Request 5 handled
Requests Ran Option 2 Style

Running Requests Option 3 Style
==============================
Handling Async Request 1
Handling Async Request 2
Handling Async Request 3
Handling Async Request 4
Handling Async Request 5
Doing Sync DB OP 5
Doing Sync DB OP 2
Doing Sync DB OP 3
Doing Sync DB OP 4
Doing Sync DB OP 1
Done Sync DB OP 2
Done Sync DB OP 3
Done Sync DB OP 1
Done Sync DB OP 4
Done Sync DB OP 5
Async Request 2 handled
Async Request 3 handled
Async Request 1 handled
Async Request 4 handled
Async Request 5 handled
Requests Ran Option 3 Style
GiteaMirror added the python, enhancement, bug, good first issue, help wanted, core labels 2025-11-11 15:49:53 -06:00
Author
Owner

@tjbck commented on GitHub (Mar 5, 2025):

Second option seems to be the best for now, would appreciate PRs here!

Author
Owner

@gaboe commented on GitHub (Mar 12, 2025):

Implemented Solution: Automatically Generated Async Files with Type Safety

I've implemented a solution to this issue that takes a different approach from the suggestions, specifically addressing both the blocking event loop and the typing challenges that arise when working with async wrappers.

Our Approach: Separate Generated Async Modules

Instead of adding async methods to existing classes or using run_in_threadpool directly, we generate entire separate async modules with properly typed interfaces. Here's why we took this route:

Type Safety Challenges

The approach suggested in the original issue (adding async_ prefixed methods via a decorator) works functionally but creates significant typing problems:

  1. The async methods added at runtime aren't visible to static type checkers
  2. IDE autocompletion doesn't work properly with dynamically added methods
  3. Type-checking tools (mypy, pyright) can't validate these methods
  4. Runtime type inspection can't reliably detect parameter/return types

We initially tried solving this with PYI stub files, but this approach also had problems:

  • It shadowed/overrode the original types
  • Required maintaining duplicated type definitions
  • Created confusion between sync/async interfaces

Our Solution: @asyncify with Module Generation

Our decorator creates entire separate async modules with full typing support:

@asyncify("Channels")  # Name of the generated instance
class ChannelTable:
    def get_channels(self) -> list[ChannelModel]:
        with get_db() as db:
            channels = db.query(Channel).all()
            return [ChannelModel.model_validate(channel) for channel in channels]

This generates a channels_async.py file with:

class ChannelTableAsync:
    def __init__(self):
        self._instance = ChannelTable()

    async def get_channels_async(self):
        """Async version of get_channels"""
        return await run_in_threadpool(self._instance.get_channels)

# Create an instance of the async class
ChannelsAsync = ChannelTableAsync()
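To see the shape of the generated wrapper in isolation, here is a self-contained stand-in (canned data, hypothetical names; asyncio.to_thread is used as the stdlib equivalent of run_in_threadpool so the sketch runs without Starlette):

```python
import asyncio

class ChannelTable:
    # Stand-in for the sync table class; returns canned data.
    def get_channels(self):
        return ["general", "random"]

class ChannelTableAsync:
    # Same shape as the class emitted into channels_async.py.
    def __init__(self):
        self._instance = ChannelTable()

    async def get_channels_async(self):
        """Async version of get_channels"""
        # The generated file uses starlette.concurrency.run_in_threadpool;
        # asyncio.to_thread plays the same role in this sketch.
        return await asyncio.to_thread(self._instance.get_channels)

ChannelsAsync = ChannelTableAsync()
channels = asyncio.run(ChannelsAsync.get_channels_async())
print(channels)  # ['general', 'random']
```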

Key Benefits of This Approach

  1. Complete Type Safety: Both sync and async interfaces have proper typing that IDEs and type checkers can validate.

  2. Clean Import Separation: Code can explicitly import from either the sync or async module based on its needs:

    from models.channels import Channels  # Sync version
    from models.channels_async import ChannelsAsync  # Async version
    
  3. Minimal Runtime Overhead: No decorator overhead in production; all wrappers are generated at development time.

  4. Parameter Preservation: Our generator properly handles method signatures, preserving all parameters.

  5. Automatic Rebuilding: Files are only regenerated when the source changes (tracked via hash).

  6. Clear Documentation: Generated async methods include docstrings that reference their sync counterparts.

  7. No Event Loop Blocking: All database operations run in a thread pool, preventing FastAPI event loop blocking.

Implementation Notes

Unlike the original example, our implementation specifically:

  1. Creates a full class instance rather than calling class methods directly
  2. Preserves and passes all parameters properly
  3. Handles parameter defaults correctly
  4. Maintains the original method's signature for better error reporting

Performance Impact

We've seen significant improvements in concurrent request handling with this approach. The event loop stays responsive even under heavy database load, and requests are properly parallelized across the threadpool.

The code for our asyncify decorator is more sophisticated than the example in the issue, as it handles parameter passing, method signatures, and file generation with proper typing.

This approach strikes a good balance between developer experience (DX) and runtime performance without requiring a major rewrite of the codebase to fully async SQLAlchemy.

asyncify.py

import inspect
import hashlib
from functools import wraps
from pathlib import Path
from starlette.concurrency import run_in_threadpool
from typing import (
    Optional,
)


def _generate_async_module(cls, instance_name: Optional[str] = None) -> str:
    """Generate an async version of a class."""
    try:
        # Calculate hash of the source file
        source_file = inspect.getfile(cls)
        with open(source_file, "rb") as f:
            source_content = f.read()
            source_hash = hashlib.md5(source_content).hexdigest()
    except (TypeError, IOError):
        source_hash = "unknown"

    # Prepare the async class name with Async suffix
    async_cls_name = f"{cls.__name__}Async"

    # Prepare the async instance name
    if instance_name:
        async_instance_name = f"{instance_name}Async"
    else:
        # Default instance name based on class name with first letter lowercase
        base_name = cls.__name__
        async_instance_name = f"{base_name[0].lower()}{base_name[1:]}Async"

    # Start with module imports
    content = f"""# DO NOT EDIT THIS FILE
# This file was automatically generated from {cls.__name__}
# Source hash: {source_hash}

import inspect
from typing import Optional, List, Dict, Any, Union
from starlette.concurrency import run_in_threadpool
# Import the specific class we need for the implementation
from {cls.__module__} import {cls.__name__}
# Import everything from the original module to ensure all types are available
from {cls.__module__} import *
"""

    # Add class declaration with Async suffix
    content += f"\nclass {async_cls_name}:\n"

    # Add class initialization with an instance of the original class
    content += f"""    def __init__(self):
        self._instance = {cls.__name__}()
    
"""

    # Add class methods
    for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
        if not name.startswith("__"):
            # Skip already async methods
            if inspect.iscoroutinefunction(method):
                continue

            # Get the signature of the method to preserve parameters
            sig = inspect.signature(method)
            # Skip 'self' from the signature for the parameter list
            params = list(sig.parameters.values())[
                1:
            ]  # Skip the first parameter (self)
            # Create a parameter string for the method definition
            param_def = ", ".join([p.name for p in params])
            # Create a parameter string for passing parameters to the original method
            param_pass = ", ".join([f"{p.name}={p.name}" for p in params])

            # Add the async method with proper parameters
            if param_def:
                content += f"""    
    async def {name}_async(self, {param_def}):
        \"\"\"Async version of {name}\"\"\"
        return await run_in_threadpool(self._instance.{name}, {param_pass})
"""
            else:
                content += f"""    
    async def {name}_async(self):
        \"\"\"Async version of {name}\"\"\"
        return await run_in_threadpool(self._instance.{name})
"""

    # Add instance creation at the end of the file
    content += f"""
# Create an instance of the async class
{async_instance_name} = {async_cls_name}()
"""

    return content


def _save_async_module(module_file, content):
    """Save the async module to a file."""
    try:
        # Get module path and create the async module file
        module_path = Path(module_file)
        module_dir = module_path.parent
        module_name = module_path.stem
        async_module_file = module_dir / f"{module_name}_async.py"

        # Check if file exists and has the same content already
        needs_update = True
        if async_module_file.exists():
            try:
                with open(async_module_file, "r") as f:
                    existing_content = f.read()
                    # Check if source hash has changed
                    old_hash = None
                    for line in existing_content.splitlines()[:5]:
                        if "Source hash:" in line:
                            old_hash = line.split("Source hash: ")[1].strip()
                            break

                    new_hash = None
                    for line in content.splitlines()[:5]:
                        if "Source hash:" in line:
                            new_hash = line.split("Source hash: ")[1].strip()
                            break

                    if old_hash == new_hash:
                        needs_update = False
            except Exception as e:
                print(f"[_save_async_module] Error checking existing file: {e}")

        if needs_update:
            print(f"[_save_async_module] Generating async module: {async_module_file}")
            with open(async_module_file, "w") as f:
                f.write(content)
    except Exception as e:
        print(f"[_save_async_module] Error saving async module: {e}")


def asyncify(cls_or_instance_name=None):
    """
    Decorator to create an async version of a class or method.

    If applied to a class, it will create a new file with an async version of the class.
    If applied to a method, it will return an async version of the method.

    The async versions will use run_in_threadpool to avoid blocking the event loop.

    Args:
        cls_or_instance_name: Either the class to asyncify or a string with the instance name

    Example:

    # Simple usage with a class
    @asyncify
    class YourClass:
        ...

    # With instance name as a direct argument
    @asyncify("CustomName")
    class YourClass:
        ...
    """
    # Handle the case where the first parameter is an instance name string
    if isinstance(cls_or_instance_name, str):

        def decorator(cls):
            return _apply_asyncify(cls, cls_or_instance_name)

        return decorator

    # Handle the case where the decorator is used with no arguments
    if cls_or_instance_name is None:

        def decorator(cls):
            return _apply_asyncify(cls)

        return decorator

    # Handle the case where the decorator is applied directly to a class
    return _apply_asyncify(cls_or_instance_name)


def _apply_asyncify(target, instance_name: Optional[str] = None):
    """Internal function to apply the asyncify decorator with or without parameters."""
    if inspect.isclass(target):
        module_name = target.__module__
        module_file = inspect.getmodule(target).__file__

        # Create async version of the class
        async_class_content = _generate_async_module(target, instance_name)

        # Save the async module to a file
        _save_async_module(module_file, async_class_content)

        return target
    elif callable(target):
        # If it's a function or method, return an async version
        if inspect.iscoroutinefunction(target):
            return target

        @wraps(target)
        async def async_wrapper(*args, **kwargs):
            return await run_in_threadpool(target, *args, **kwargs)

        return async_wrapper
    else:
        raise TypeError(
            "asyncify can only be applied to classes, functions, or methods."
        )

What do you guys say about that?
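The threadpool-wrapping branch of `_apply_asyncify` above can be exercised with nothing but the standard library. The sketch below is a minimal stand-in, not the posted implementation: `asyncio.to_thread` plays the role of Starlette's `run_in_threadpool`, and `blocking_query` is a hypothetical placeholder for a synchronous SQLAlchemy call:

```python
import asyncio
import time
from functools import wraps


def asyncify_func(fn):
    """Minimal sketch of the callable branch of _apply_asyncify:
    wrap a blocking function so awaiting it runs in a worker thread."""
    if asyncio.iscoroutinefunction(fn):
        return fn

    @wraps(fn)
    async def wrapper(*args, **kwargs):
        # asyncio.to_thread stands in for run_in_threadpool here
        return await asyncio.to_thread(fn, *args, **kwargs)

    return wrapper


def blocking_query(n):
    time.sleep(0.1)  # stands in for a synchronous database call
    return n * 2


async def main():
    wrapped = asyncify_func(blocking_query)
    start = time.perf_counter()
    # Both blocking calls overlap because each runs in its own thread;
    # calling blocking_query directly from the handler would serialize
    # them (and block the event loop) for ~0.2s total.
    results = await asyncio.gather(wrapped(1), wrapped(2))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # [2, 4]
```

With the wrapper, the two 0.1s "queries" complete in roughly 0.1s of wall time instead of 0.2s, and the event loop remains free to service other requests while they run.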


@tjbck commented on GitHub (Mar 14, 2025):

https://asyncer.tiangolo.com/ seems like a great option, PR Welcome!

Reference: github-starred/open-webui#4263