Switch back to unbounded channel

This commit is contained in:
Gregory Schier
2025-12-28 08:41:56 -08:00
parent e32930034d
commit ba00274045
2 changed files with 14 additions and 26 deletions

View File

@@ -389,8 +389,8 @@ async fn execute_transaction<R: Runtime>(
}
Some(SendableBody::Stream(stream)) => {
// Wrap stream with TeeReader to capture data as it's read
// Bounded channel with buffer size of 10 chunks (~10MB) provides backpressure
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
// Use unbounded channel to ensure all data is captured without blocking the HTTP request
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let tee_reader = TeeReader::new(stream, body_chunk_tx);
let pinned: Pin<Box<dyn AsyncRead + Send + 'static>> = Box::pin(tee_reader);
@@ -584,7 +584,7 @@ async fn write_stream_chunks_to_db<R: Runtime>(
workspace_id: &str,
response_id: &str,
update_source: &UpdateSource,
mut rx: tokio::sync::mpsc::Receiver<Vec<u8>>,
mut rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
) -> Result<()> {
let mut buffer = Vec::with_capacity(REQUEST_BODY_CHUNK_SIZE);
let mut chunk_index = 0;

View File

@@ -6,14 +6,14 @@ use tokio::sync::mpsc;
/// A reader that forwards all read data to a channel while also returning it to the caller.
/// This allows capturing request body data as it's being sent.
/// Uses a bounded channel to provide backpressure if the receiver is slow.
/// Uses an unbounded channel to ensure all data is captured without blocking the request.
pub struct TeeReader<R> {
inner: R,
tx: mpsc::Sender<Vec<u8>>,
tx: mpsc::UnboundedSender<Vec<u8>>,
}
impl<R> TeeReader<R> {
pub fn new(inner: R, tx: mpsc::Sender<Vec<u8>>) -> Self {
pub fn new(inner: R, tx: mpsc::UnboundedSender<Vec<u8>>) -> Self {
Self { inner, tx }
}
}
@@ -32,21 +32,9 @@ impl<R: AsyncRead + Unpin> AsyncRead for TeeReader<R> {
if after_len > before_len {
// Data was read, send a copy to the channel
let data = buf.filled()[before_len..after_len].to_vec();
// Use try_send to avoid blocking. If channel is full, we drop the data
// rather than blocking the HTTP request. This provides backpressure
// by slowing down the reader when the DB writer can't keep up.
match self.tx.try_send(data) {
Ok(_) => {} // Successfully sent
Err(mpsc::error::TrySendError::Full(_)) => {
// Channel is full - apply backpressure by returning Pending
// This will cause the reader to be polled again later
cx.waker().wake_by_ref();
return Poll::Pending;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
// Receiver dropped - continue without capturing
}
}
// Send to unbounded channel - this never blocks
// Ignore error if receiver is closed
let _ = self.tx.send(data);
}
Poll::Ready(Ok(()))
}
@@ -66,7 +54,7 @@ mod tests {
async fn test_tee_reader_captures_all_data() {
let data = b"Hello, World!";
let cursor = Cursor::new(data.to_vec());
let (tx, mut rx) = mpsc::channel(10);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();
@@ -87,7 +75,7 @@ mod tests {
async fn test_tee_reader_with_chunked_reads() {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let cursor = Cursor::new(data.to_vec());
let (tx, mut rx) = mpsc::channel(10);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
@@ -117,7 +105,7 @@ mod tests {
async fn test_tee_reader_empty_data() {
let data: Vec<u8> = vec![];
let cursor = Cursor::new(data.clone());
let (tx, mut rx) = mpsc::channel(10);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();
@@ -134,7 +122,7 @@ mod tests {
async fn test_tee_reader_works_when_receiver_dropped() {
let data = b"Hello, World!";
let cursor = Cursor::new(data.to_vec());
let (tx, rx) = mpsc::channel(10);
let (tx, rx) = mpsc::unbounded_channel();
// Drop the receiver before reading
drop(rx);
@@ -152,7 +140,7 @@ mod tests {
// Test with 1MB of data
let data: Vec<u8> = (0..1024 * 1024).map(|i| (i % 256) as u8).collect();
let cursor = Cursor::new(data.clone());
let (tx, mut rx) = mpsc::channel(100);
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();