[GH-ISSUE #7685] Streaming chat/completions behind a gateway with timeout #51417

Closed
opened 2026-04-28 19:57:56 -05:00 by GiteaMirror · 3 comments
Owner

Originally created by @Upabjojr on GitHub (Nov 15, 2024).
Original GitHub issue: https://github.com/ollama/ollama/issues/7685

I am using Ollama on a server behind a gateway that has a 30 second timeout on every forwarded HTTP request. If Ollama takes more than 30 seconds to respond to the HTTP request, the connection will be reset.

So far, enabling streaming on chat/completions has been an effective workaround, as streaming each chunk of generated text takes much less than 30 seconds.

There are, however, some cases that still cause this issue, in particular:

  1. Posting a very long context may take more than 30 seconds to process before the streaming of chunks starts.
  2. If the Ollama server is busy responding to many parallel requests, streaming may take longer than 30 seconds to start.

In order to avoid hitting the timeout threshold that resets the connection to Ollama on my gateway, I was wondering if it is possible to add support to chat/completions for streaming empty strings immediately, even before the LLM text generation has started?
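For context, the streaming workaround works because the server emits one NDJSON chunk per generated token batch, so the gateway sees traffic well within its 30-second window. A minimal sketch of assembling those chunks client-side, using hypothetical sample chunks in the shape Ollama's `/api/chat` endpoint streams (not a live server):

```python
import json

def join_stream(lines):
    """Concatenate the message content of Ollama /api/chat NDJSON chunks."""
    parts = []
    for line in lines:
        chunk = json.loads(line)
        parts.append(chunk.get("message", {}).get("content", ""))
        if chunk.get("done"):  # final chunk carries done=true (plus timing stats)
            break
    return "".join(parts)

# Hypothetical chunks, one JSON object per line, as the server would stream them.
sample = [
    '{"message":{"role":"assistant","content":"Hel"},"done":false}',
    '{"message":{"role":"assistant","content":"lo"},"done":false}',
    '{"message":{"role":"assistant","content":""},"done":true}',
]
print(join_stream(sample))  # Hello
```

Against a live server, the same function can consume `response.iter_lines()` from a `requests.post` to `http://localhost:11434/api/chat` with `"stream": true` in the payload and `stream=True` on the request.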

GiteaMirror added the feature request label 2026-04-28 19:57:56 -05:00
Author
Owner

@rick-github commented on GitHub (Nov 15, 2024):

I think it would be a while, if ever, before something like that would be added to a release, so it might be better to find a way to work around the problem you have. Which gateway service is being used? Can you add a service between the gateway and the ollama server?

Author
Owner

@Upabjojr commented on GitHub (Nov 15, 2024):

Can you add a service between the gateway and the ollama server?

Potentially yes, I could create a bridge service in front of ollama to modify the streaming response. But that would require the development of an ad-hoc proxy server.

I was wondering if someone else has similar issues.

Author
Owner

@rick-github commented on GitHub (Dec 13, 2024):

#!/usr/bin/env python3

import socket
import threading
import logging
import time

logging.basicConfig(level=logging.INFO) #, format="%(threadName)s:%(message)s")
logger = logging.getLogger(__name__)

class StreamingProxy:
    def __init__(self, clienthost='127.0.0.1', clientport=11436, serverhost='127.0.0.1', serverport=11434, keep_alive_timeout=10):
        self.clienthost = clienthost
        self.clientport = clientport
        self.serverhost = serverhost
        self.serverport = serverport
        self.keep_alive_timeout = keep_alive_timeout
        
    def start(self):
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.clienthost, self.clientport))
        server.listen(5)
        
        logger.info(f"Proxy listening on {self.clienthost}:{self.clientport}")
        
        while True:
            client_sock, addr = server.accept()
            logger.info(f"Accepted connection from {addr}")
            client_thread = threading.Thread(target=self.handle_client, args=(client_sock,))
            client_thread.daemon = True
            client_thread.start()

    def send_keep_alive(self, client_socket):
        keep_alive_response = (
            "HTTP/1.1 100 Continue\r\n"
            "Connection: keep-alive\r\n"
            "Keep-Alive: timeout=5\r\n"
            "\r\n"
        )
        try:
            client_socket.sendall(keep_alive_response.encode())
            logger.info("Sent keep-alive response to client")
        except Exception as e:
            logger.error(f"Error sending keep-alive: {str(e)}")
            return False
        return True
            
    def handle_client(self, client_socket):
        try:
            # Read the initial request
            request_data = bytearray()
            while b'\r\n\r\n' not in request_data:  # Read until we find end of headers
                chunk = client_socket.recv(8192)
                if not chunk:
                    return
                request_data.extend(chunk)

            # Connect to target server
            server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server_sock.settimeout(self.keep_alive_timeout)
            server_sock.connect((self.serverhost, self.serverport))
            
            # Forward the original request (sendall ensures the whole buffer is written)
            server_sock.sendall(request_data)
            
            def forward(source, destination, is_server_to_client=False):
                last_data_time = time.time()
                try:
                    while True:
                        try:
                            data = source.recv(8192)
                            current_time = time.time()
                            
                            if not data:
                                break

                            if is_server_to_client:
                                last_data_time = current_time

                            destination.sendall(data)

                        except socket.timeout:
                            # No data from the server within keep_alive_timeout:
                            # send a keep-alive so the gateway doesn't reset the
                            # idle connection, then keep waiting for real data.
                            if is_server_to_client:
                                if not self.send_keep_alive(destination):
                                    break
                                last_data_time = time.time()
                                continue
                            break
                        
                except Exception as e:
                    logger.error(f"Error in forward: {str(e)}")
                finally:
                    try:
                        destination.shutdown(socket.SHUT_WR)
                    except Exception as e:
                        logger.error(f"Error in shutdown: {str(e)}")
            
            client_to_server = threading.Thread(
                target=forward, 
                args=(client_socket, server_sock, False)
            )
            server_to_client = threading.Thread(
                target=forward, 
                args=(server_sock, client_socket, True)
            )
            
            client_to_server.daemon = True
            server_to_client.daemon = True
            
            client_to_server.start()
            server_to_client.start()
            
        except Exception as e:
            logger.error(f"Error handling client: {str(e)}")
            client_socket.close()

if __name__ == "__main__":
    proxy = StreamingProxy(keep_alive_timeout=10)
    try:
        proxy.start()
    except KeyboardInterrupt:
        logger.info("Proxy server stopped")
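Whether this trick survives a given gateway depends on the client tolerating an interim `1xx` response ahead of (or, strictly invalidly, interleaved with) the real response; HTTP/1.1 clients are expected to skip interim responses, but lenience varies. A minimal sketch (a hypothetical helper, not part of the proxy above) of the parsing a tolerant client effectively performs, stripping leading `100 Continue` blocks from a raw response:

```python
def strip_interim(raw: bytes) -> bytes:
    """Drop any leading HTTP/1.1 1xx interim responses, returning the final response."""
    while raw.startswith(b"HTTP/1.1 1"):
        head_end = raw.find(b"\r\n\r\n")
        if head_end == -1:
            break  # incomplete interim block; leave as-is
        raw = raw[head_end + 4:]
    return raw

raw = (b"HTTP/1.1 100 Continue\r\nConnection: keep-alive\r\n\r\n"
       b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")
print(strip_interim(raw)[:15])  # b'HTTP/1.1 200 OK'
```

Note that the proxy above may also inject these bytes after the `200 OK` stream has already begun, which is not valid HTTP; it works only if both the gateway and the client ignore the stray bytes.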

Reference: github-starred/ollama#51417