diff --git a/src-tauri/src/http_request.rs b/src-tauri/src/http_request.rs index c465e2e4..64230cbf 100644 --- a/src-tauri/src/http_request.rs +++ b/src-tauri/src/http_request.rs @@ -389,8 +389,8 @@ async fn execute_transaction( } Some(SendableBody::Stream(stream)) => { // Wrap stream with TeeReader to capture data as it's read - // Bounded channel with buffer size of 10 chunks (~10MB) provides backpressure - let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::channel::>(10); + // Use unbounded channel to ensure all data is captured without blocking the HTTP request + let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::>(); let tee_reader = TeeReader::new(stream, body_chunk_tx); let pinned: Pin> = Box::pin(tee_reader); @@ -584,7 +584,7 @@ async fn write_stream_chunks_to_db( workspace_id: &str, response_id: &str, update_source: &UpdateSource, - mut rx: tokio::sync::mpsc::Receiver>, + mut rx: tokio::sync::mpsc::UnboundedReceiver>, ) -> Result<()> { let mut buffer = Vec::with_capacity(REQUEST_BODY_CHUNK_SIZE); let mut chunk_index = 0; diff --git a/src-tauri/yaak-http/src/tee_reader.rs b/src-tauri/yaak-http/src/tee_reader.rs index b70372a2..2ee70088 100644 --- a/src-tauri/yaak-http/src/tee_reader.rs +++ b/src-tauri/yaak-http/src/tee_reader.rs @@ -6,14 +6,14 @@ use tokio::sync::mpsc; /// A reader that forwards all read data to a channel while also returning it to the caller. /// This allows capturing request body data as it's being sent. -/// Uses a bounded channel to provide backpressure if the receiver is slow. +/// Uses an unbounded channel to ensure all data is captured without blocking the request. pub struct TeeReader { inner: R, - tx: mpsc::Sender>, + tx: mpsc::UnboundedSender>, } impl TeeReader { - pub fn new(inner: R, tx: mpsc::Sender>) -> Self { + pub fn new(inner: R, tx: mpsc::UnboundedSender>) -> Self { Self { inner, tx } } } @@ -32,21 +32,9 @@ impl AsyncRead for TeeReader { if after_len > before_len { // Data was read, send a copy to the channel let data = buf.filled()[before_len..after_len].to_vec(); - // Use try_send to avoid blocking. If channel is full, we drop the data - // rather than blocking the HTTP request. This provides backpressure - // by slowing down the reader when the DB writer can't keep up. - match self.tx.try_send(data) { - Ok(_) => {} // Successfully sent - Err(mpsc::error::TrySendError::Full(_)) => { - // Channel is full - apply backpressure by returning Pending - // This will cause the reader to be polled again later - cx.waker().wake_by_ref(); - return Poll::Pending; - } - Err(mpsc::error::TrySendError::Closed(_)) => { - // Receiver dropped - continue without capturing - } - } + // Send to unbounded channel - this never blocks + // Ignore error if receiver is closed + let _ = self.tx.send(data); } Poll::Ready(Ok(())) } @@ -66,7 +54,7 @@ mod tests { async fn test_tee_reader_captures_all_data() { let data = b"Hello, World!"; let cursor = Cursor::new(data.to_vec()); - let (tx, mut rx) = mpsc::channel(10); + let (tx, mut rx) = mpsc::unbounded_channel(); let mut tee = TeeReader::new(cursor, tx); let mut output = Vec::new(); @@ -87,7 +75,7 @@ mod tests { async fn test_tee_reader_with_chunked_reads() { let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"; let cursor = Cursor::new(data.to_vec()); - let (tx, mut rx) = mpsc::channel(10); + let (tx, mut rx) = mpsc::unbounded_channel(); let mut tee = TeeReader::new(cursor, tx); @@ -117,7 +105,7 @@ mod tests { async fn test_tee_reader_empty_data() { let data: Vec = vec![]; let cursor = Cursor::new(data.clone()); - let (tx, mut rx) = mpsc::channel(10); + let (tx, mut rx) = mpsc::unbounded_channel(); let mut tee = TeeReader::new(cursor, tx); let mut output = Vec::new(); @@ -134,7 +122,7 @@ mod tests { async fn test_tee_reader_works_when_receiver_dropped() { let data = b"Hello, World!"; let cursor = Cursor::new(data.to_vec()); - let (tx, rx) = mpsc::channel(10); + let (tx, rx) = mpsc::unbounded_channel(); // Drop the receiver before reading drop(rx); @@ -152,7 +140,7 @@ mod tests { // Test with 1MB of data let data: Vec = (0..1024 * 1024).map(|i| (i % 256) as u8).collect(); let cursor = Cursor::new(data.clone()); - let (tx, mut rx) = mpsc::channel(100); + let (tx, mut rx) = mpsc::unbounded_channel(); let mut tee = TeeReader::new(cursor, tx); let mut output = Vec::new();