diff --git a/src/http/H3Client.rs b/src/http/H3Client.rs index 384647ff5bf..ba62a84a9a1 100644 --- a/src/http/H3Client.rs +++ b/src/http/H3Client.rs @@ -54,6 +54,15 @@ pub static live_sessions: AtomicU32 = AtomicU32::new(0); pub static live_streams: AtomicU32 = AtomicU32::new(0); pub use live_sessions as LIVE_SESSIONS; pub use live_streams as LIVE_STREAMS; +/// Response-body bytes delivered to the JS side across all h3 streams +/// since process start. Surfaced through `fetchH3Internals.liveCounts()` +/// as `bodyBytesReceived` so `fetch-backpressure.test.ts` can observe a +/// plateau from a subprocess when `lsquic_stream_wantread(0)` takes +/// effect — the test can't see per-stream state, and this counter +/// stalls exactly when the h3 read does. +#[allow(non_upper_case_globals)] +pub static body_bytes_received: core::sync::atomic::AtomicU64 = + core::sync::atomic::AtomicU64::new(0); // H3TestingAPIs lives in bun_http_jsc and is accessed via the // extension-trait pattern there. diff --git a/src/http/HTTPThread.rs b/src/http/HTTPThread.rs index ad7ede90683..c38efadcc1a 100644 --- a/src/http/HTTPThread.rs +++ b/src/http/HTTPThread.rs @@ -123,11 +123,13 @@ pub struct HttpThread { pub queued_shutdowns: Vec, pub queued_writes: Vec, pub queued_response_body_drains: Vec, + pub queued_response_body_consumed: Vec, pub queued_cert_check_resumes: Vec, pub queued_shutdowns_lock: Mutex, pub queued_writes_lock: Mutex, pub queued_response_body_drains_lock: Mutex, + pub queued_response_body_consumed_lock: Mutex, pub queued_cert_check_resumes_lock: Mutex, pub queued_threadlocal_proxy_derefs: Vec<*mut ProxyTunnel>, @@ -178,10 +180,12 @@ impl HttpThread { queued_shutdowns: Vec::new(), queued_writes: Vec::new(), queued_response_body_drains: Vec::new(), + queued_response_body_consumed: Vec::new(), queued_cert_check_resumes: Vec::new(), queued_shutdowns_lock: Mutex::new(), queued_writes_lock: Mutex::new(), queued_response_body_drains_lock: Mutex::new(), + queued_response_body_consumed_lock: Mutex::new(), queued_cert_check_resumes_lock: Mutex::new(), queued_threadlocal_proxy_derefs: Vec::new(), has_awoken: AtomicBool::new(false), @@ -271,6 +275,11 @@ pub struct DrainMessage { pub async_http_id: u32, } +pub struct ConsumeMessage { + pub async_http_id: u32, + pub bytes: u32, +} + pub struct ShutdownMessage { pub async_http_id: u32, } @@ -860,9 +869,62 @@ impl HttpThread { } } + fn drain_queued_http_response_body_consumed(&mut self) { + loop { + let queued = { + let _guard = self.queued_response_body_consumed_lock.lock_guard(); + core::mem::take(&mut self.queued_response_body_consumed) + }; + + for msg in &queued { + if let Some(socket_ptr) = abort_tracker().get(&msg.async_http_id) { + match *socket_ptr { + uws::AnySocket::SocketTls(socket) => { + let tagged = HTTPContext::::get_tagged_from_socket(socket); + if let Some(client) = tagged.client_mut() { + // HTTP/1.1: may resume a paused socket read. + client.consume_response_body::(socket, msg.bytes); + } + if let Some(session) = tagged.session_mut() { + // HTTP/2: releases per-stream WINDOW_UPDATE. + session + .consume_response_body_by_http_id(msg.async_http_id, msg.bytes); + } + } + uws::AnySocket::SocketTcp(socket) => { + let tagged = HTTPContext::::get_tagged_from_socket(socket); + if let Some(client) = tagged.client_mut() { + client.consume_response_body::(socket, msg.bytes); + } + if let Some(session) = tagged.session_mut() { + session + .consume_response_body_by_http_id(msg.async_http_id, msg.bytes); + } + } + } + } else { + // HTTP/3: QUIC streams aren't in the TCP-socket tracker; + // dispatch via the session registry. May resume a + // lsquic `want_read(false)` pause. + h3::ClientContext::consume_response_body_by_http_id( + msg.async_http_id, + msg.bytes, + ); + } + } + let len = queued.len(); + drop(queued); + if len == 0 { + break; + } + bun_core::scoped_log!(HTTPThread, "drained {} queued consumes", len); + } + } + pub fn drain_events(&mut self) { // Process any pending writes **before** aborting. self.drain_queued_http_response_body_drains(); + self.drain_queued_http_response_body_consumed(); self.drain_queued_writes(); self.drain_queued_shutdowns(); // After shutdowns: an abort or cert-rejection scheduled in the same JS @@ -973,6 +1035,41 @@ impl HttpThread { self.wakeup(); } + /// JS-thread → HTTP-thread notice that the `ReadableStream` reader for + /// `async_http_id` has drained `bytes` from its buffer. Consecutive + /// messages for the same id are coalesced under the lock so a tight + /// `read()` loop posts one entry per wake instead of one per pull. + pub fn schedule_response_body_consumed(&mut self, async_http_id: u32, bytes: usize) { + let n: u32 = u32::try_from(bytes).unwrap_or(u32::MAX); + if n == 0 { + return; + } + let appended = { + let _guard = self.queued_response_body_consumed_lock.lock_guard(); + match self.queued_response_body_consumed.last_mut() { + Some(last) if last.async_http_id == async_http_id => { + last.bytes = last.bytes.saturating_add(n); + false + } + _ => { + self.queued_response_body_consumed.push(ConsumeMessage { + async_http_id, + bytes: n, + }); + true + } + } + }; + // Only wake on the append path: if we coalesced into an existing + // entry, the HTTP thread was already woken for that entry's + // original append and will see the updated `bytes` when it swaps + // the queue under the same lock. A tight `read()` loop over a + // multi-GiB body otherwise issues one eventfd write per pull. + if appended { + self.wakeup(); + } + } + pub fn schedule_shutdown(&mut self, http: &AsyncHttp) { bun_core::scoped_log!(HTTPThread, "scheduleShutdown {}", http.async_http_id); { diff --git a/src/http/InternalState.rs b/src/http/InternalState.rs index 4daf7442d72..58fdfeb1ec0 100644 --- a/src/http/InternalState.rs +++ b/src/http/InternalState.rs @@ -36,6 +36,17 @@ pub struct InternalState<'a> { pub compressed_body: MutableString, pub content_length: Option, pub total_body_received: usize, + /// Body bytes delivered to the JS thread (counted as `body_out_str` + /// delta, i.e. post-dechunk/post-decompress) that the ByteStream + /// hasn't yet reported drained via `schedule_response_body_consumed`. + /// Feeds `maybe_pause_receive`/`consume_response_body` so an h1 + /// socket read stalls once the JS reader falls behind + /// `RECEIVE_BODY_HIGH_WATER`. Only meaningful while + /// `Signals::BodyConsumptionTracked` is armed. Decrement is + /// saturating (the reader.cancel sentinel is `u32::MAX`, and + /// `ByteStream::drain()` can credit pre-arming bytes that were never + /// counted here). + pub outstanding_body_bytes: usize, // Self-borrow into `original_request_body.bytes`; `RawSlice` carries the // outlives-holder invariant (the backing `original_request_body` is a // sibling field, so it lives exactly as long as this struct). @@ -74,6 +85,14 @@ pub struct InternalStateFlags { /// check passed (and implicitly by `InternalState::reset()` on every /// redirect hop / failure, so each hop re-parks independently). pub is_waiting_for_cert_check: bool, + /// `maybe_pause_receive` called `socket.pause_stream()` and cleared + /// the idle timer. Every true→false transition goes through + /// `H1_SOCKET_RESUMES.fetch_add` (including the done/redirect and + /// keep-alive-release resumes) so the test-visible + /// `h1BackpressureCounts()` stays balanced; the pooled socket's + /// uSockets `is_paused` bit survives `reset()`, so a paused socket + /// handed back to the pool would hang its next borrower. + pub receive_paused: bool, } impl InternalStateFlags { @@ -88,6 +107,7 @@ impl InternalStateFlags { resend_request_body_on_redirect: false, clear_hostname_on_redirect: false, is_waiting_for_cert_check: false, + receive_paused: false, } } } @@ -116,6 +136,7 @@ impl Default for InternalState<'_> { compressed_body: MutableString::init_empty(), content_length: None, total_body_received: 0, + outstanding_body_bytes: 0, request_body: bun_ptr::RawSlice::EMPTY, original_request_body: HTTPRequestBody::Bytes(b""), request_sent_len: 0, diff --git a/src/http/Signals.rs b/src/http/Signals.rs index 7ab91b78927..aa9ec22bc97 100644 --- a/src/http/Signals.rs +++ b/src/http/Signals.rs @@ -7,6 +7,17 @@ pub struct Signals { // PORTING.md); the `Store` outlives every `Signals` derived from it. pub header_progress: Option>, pub response_body_streaming: Option>, + /// Distinct from `response_body_streaming`: set only while a JS + /// consumer is wired to report drained bytes via + /// `schedule_response_body_consumed`. `response_body_streaming` is + /// also set by paths that never report consumption (S3 streaming + /// download, abandoned bodies via `ignore_remaining_response_body`); + /// gating flow-control on that would deadlock those streams. All + /// three transports key receive-side backpressure on this signal — + /// not `response_body_streaming` — to decide whether flow control + /// is consumption-gated or receipt-based (h1 `maybe_pause_receive`, + /// h2 `replenish_window`, h3 `on_stream_data`). + pub body_consumption_tracked: Option>, pub aborted: Option>, pub cert_errors: Option>, pub upgraded: Option>, @@ -16,6 +27,7 @@ impl Signals { pub fn is_empty(&self) -> bool { self.aborted.is_none() && self.response_body_streaming.is_none() + && self.body_consumption_tracked.is_none() && self.header_progress.is_none() && self.cert_errors.is_none() && self.upgraded.is_none() @@ -38,6 +50,7 @@ impl Signals { let ptr: NonNull = match field { Field::HeaderProgress => self.header_progress, Field::ResponseBodyStreaming => self.response_body_streaming, + Field::BodyConsumptionTracked => self.body_consumption_tracked, Field::Aborted => self.aborted, Field::CertErrors => self.cert_errors, Field::Upgraded => self.upgraded, @@ -61,6 +74,7 @@ impl Signals { pub struct Store { pub header_progress: AtomicBool, pub response_body_streaming: AtomicBool, + pub body_consumption_tracked: AtomicBool, pub aborted: AtomicBool, pub cert_errors: AtomicBool, pub upgraded: AtomicBool, @@ -71,6 +85,7 @@ impl Default for Store { Self { header_progress: AtomicBool::new(false), response_body_streaming: AtomicBool::new(false), + body_consumption_tracked: AtomicBool::new(false), aborted: AtomicBool::new(false), cert_errors: AtomicBool::new(false), upgraded: AtomicBool::new(false), @@ -83,6 +98,7 @@ impl Store { Signals { header_progress: Some(NonNull::from(&self.header_progress)), response_body_streaming: Some(NonNull::from(&self.response_body_streaming)), + body_consumption_tracked: Some(NonNull::from(&self.body_consumption_tracked)), aborted: Some(NonNull::from(&self.aborted)), cert_errors: Some(NonNull::from(&self.cert_errors)), upgraded: Some(NonNull::from(&self.upgraded)), @@ -95,6 +111,7 @@ impl Store { pub enum Field { HeaderProgress, ResponseBodyStreaming, + BodyConsumptionTracked, Aborted, CertErrors, Upgraded, diff --git a/src/http/h2_client/ClientSession.rs b/src/http/h2_client/ClientSession.rs index d798f4276a6..88cf2e71bda 100644 --- a/src/http/h2_client/ClientSession.rs +++ b/src/http/h2_client/ClientSession.rs @@ -548,6 +548,40 @@ impl ClientSession { } } + /// HTTP-thread wake-up from `schedule_response_body_consumed`: the JS + /// reader drained `bytes` from the ByteStream. Bump the stream's + /// consumption counter and release any per-stream window credit that has + /// become available. + pub fn consume_response_body_by_http_id(&mut self, async_http_id: u32, bytes: u32) { + let _guard = self.ref_scope(); + let mut found = false; + for &stream in self.streams.values() { + let s = stream_mut(stream); + let Some(client) = s.client_mut() else { + continue; + }; + if client.async_http_id != async_http_id { + continue; + } + // `bytes` is decompressed; clamp the running total to wire bytes + // still outstanding so a compression surplus isn't banked to + // credit later DATA the reader hasn't touched. + s.consumed_bytes = + core::cmp::min(s.consumed_bytes.saturating_add(bytes), s.unacked_bytes); + found = true; + break; + } + if !found { + return; + } + self.replenish_window(); + if self.write_buffer.is_not_empty() { + if let Err(err) = self.flush() { + self.fail_all(err); + } + } + } + /// HTTP-thread wake-up from `scheduleRequestWrite`: new body bytes (or /// end-of-body) are available in the ThreadSafeStreamBuffer. pub fn stream_body_by_http_id(&mut self, async_http_id: u32, ended: bool) { @@ -595,21 +629,44 @@ impl ClientSession { fn replenish_window(&mut self) { let threshold = LOCAL_INITIAL_WINDOW_SIZE / 2; + // Connection-level credit stays receipt-based so one stream whose JS + // reader is stalled doesn't starve siblings of the shared window. if self.conn_unacked_bytes >= threshold { self.write_window_update(0, self.conn_unacked_bytes); self.conn_unacked_bytes = 0; } - // Collect (id, unacked) pairs before mutating self. + // Collect (id, credit) pairs before mutating self. let mut updates: Vec<(u32, u32)> = Vec::new(); for &s in self.streams.values() { let s = stream_mut(s); - if s.unacked_bytes >= threshold && !s.remote_closed() { - updates.push((s.id, s.unacked_bytes)); - s.unacked_bytes = 0; + if s.remote_closed() { + continue; + } + // `body_consumption_tracked` is set only while a JS consumer is + // reporting drained bytes via `schedule_response_body_consumed` + // (fetch `res.body` with a `drain_handler` wired). It is *not* + // set for buffering consumers (`await res.text()`), S3 streaming + // downloads, or once the body is abandoned via + // `ignore_remaining_response_body` — those stay receipt-based so + // the transfer completes. When tracked, credit only what JS has + // actually drained, clamped to wire bytes received so a + // decompressed body can't inflate the window past what was sent. + let tracked = s + .client_mut() + .is_some_and(|c| c.signals.get(signals::Field::BodyConsumptionTracked)); + let avail = if tracked { + core::cmp::min(s.consumed_bytes, s.unacked_bytes) + } else { + s.unacked_bytes + }; + if avail >= threshold { + updates.push((s.id, avail)); + s.unacked_bytes -= avail; + s.consumed_bytes = s.consumed_bytes.saturating_sub(avail); } } - for (id, unacked) in updates { - self.write_window_update(id, unacked); + for (id, avail) in updates { + self.write_window_update(id, avail); } // PERF: could iterate and write in one pass; profile if extra Vec matters. } diff --git a/src/http/h2_client/Stream.rs b/src/http/h2_client/Stream.rs index e9dc4ad1b9b..c241c7ac66b 100644 --- a/src/http/h2_client/Stream.rs +++ b/src/http/h2_client/Stream.rs @@ -52,8 +52,17 @@ pub struct Stream { /// or final status arrives. pub awaiting_continue: bool, pub fatal_error: Option, - /// DATA bytes consumed since the last WINDOW_UPDATE for this stream. + /// DATA bytes received since the last per-stream WINDOW_UPDATE. For + /// consumers without `body_consumption_tracked` set this alone drives + /// the credit; for tracked consumers it is the ceiling on what + /// `consumed_bytes` may release. pub unacked_bytes: u32, + /// Bytes the JS `ReadableStream` reader has actually drained, reported + /// via `schedule_response_body_consumed`. Only consulted when + /// `body_consumption_tracked` is true; `replenish_window` credits + /// `min(consumed_bytes, unacked_bytes)` so a stalled reader withholds + /// the per-stream window and a compressed body can't over-credit. + pub consumed_bytes: u32, /// Σ DATA payload bytes (post-padding) for §8.1.1 Content-Length check — /// `total_body_received` is clamped at content_length so it can't catch /// overshoot. @@ -142,6 +151,7 @@ impl Stream { awaiting_continue: false, fatal_error: None, unacked_bytes: 0, + consumed_bytes: 0, data_bytes_received: 0, send_window, pending_body: bun_ptr::RawSlice::EMPTY, diff --git a/src/http/h3_client/ClientContext.rs b/src/http/h3_client/ClientContext.rs index 0bae72b83ad..08d7d576ae5 100644 --- a/src/http/h3_client/ClientContext.rs +++ b/src/http/h3_client/ClientContext.rs @@ -224,4 +224,21 @@ impl ClientContext { session_mut(s).stream_body_by_http_id(async_http_id, ended); } } + + /// HTTP-thread dispatch for `schedule_response_body_consumed` when the + /// `async_http_id` isn't in the TCP-socket abort tracker — QUIC streams + /// never are. May resume an lsquic `want_read(false)` pause. + pub fn consume_response_body_by_http_id(async_http_id: u32, bytes: u32) { + let Some(this) = Self::get() else { + return; + }; + // See `abort_by_http_id` — `BackRef` over the process-lifetime singleton. + let ctx = bun_ptr::BackRef::from(this); + for &s in ctx.sessions.iter() { + // Registry only holds live sessions — `session_mut` upgrade. + if session_mut(s).consume_response_body_by_http_id(async_http_id, bytes) { + return; + } + } + } } diff --git a/src/http/h3_client/ClientSession.rs b/src/http/h3_client/ClientSession.rs index 2a490435d91..60caec7e5c0 100644 --- a/src/http/h3_client/ClientSession.rs +++ b/src/http/h3_client/ClientSession.rs @@ -143,6 +143,43 @@ impl ClientSession { } } + /// HTTP-thread wake-up from `schedule_response_body_consumed`: the JS + /// reader drained `bytes` from the ByteStream. Decrement the outstanding + /// count and, if the stream's `on_read` was paused for backpressure, + /// re-enable it so lsquic resumes draining the QUIC receive buffer and + /// issuing `MAX_STREAM_DATA` credit. + pub fn consume_response_body_by_http_id(&mut self, async_http_id: u32, bytes: u32) -> bool { + for &stream_ptr in self.pending.iter() { + let stream = stream_mut(stream_ptr); + let Some(client) = stream.client else { + continue; + }; + let client = client_mut(client); + if client.async_http_id != async_http_id { + continue; + } + stream.outstanding_body_bytes = + stream.outstanding_body_bytes.saturating_sub(bytes as usize); + if !stream.read_paused { + return true; + } + let should_resume = stream.outstanding_body_bytes <= crate::RECEIVE_BODY_LOW_WATER + || !client.signals.get(Signal::BodyConsumptionTracked); + if !should_resume { + return true; + } + stream.read_paused = false; + if let Some(qs) = stream.qstream_mut() { + qs.want_read(true); + } + // lsquic's `process_conns` runs from the loop's post-tick hook, + // so the re-enabled `on_read` fires on the very next iteration + // (this handler runs from `drain_events`, before that tick). + return true; + } + false + } + pub(super) fn detach(&mut self, stream: *mut Stream) { let st = stream_mut(stream); if let Some(cl) = st.client { diff --git a/src/http/h3_client/Stream.rs b/src/http/h3_client/Stream.rs index d0b99863331..7873ffc3cd5 100644 --- a/src/http/h3_client/Stream.rs +++ b/src/http/h3_client/Stream.rs @@ -36,6 +36,12 @@ pub struct Stream { pub request_body_done: bool, pub is_streaming_body: bool, pub headers_delivered: bool, + /// Wire bytes delivered to JS via `deliver()` that haven't been reported + /// drained via `schedule_response_body_consumed`. Once over + /// `RECEIVE_BODY_HIGH_WATER` we stop `lsquic_stream_wantread` so lsquic + /// withholds `MAX_STREAM_DATA` and the server backpressures. + pub outstanding_body_bytes: usize, + pub read_paused: bool, } impl Stream { @@ -54,6 +60,8 @@ impl Stream { request_body_done: false, is_streaming_body: false, headers_delivered: false, + outstanding_body_bytes: 0, + read_paused: false, })) } diff --git a/src/http/h3_client/callbacks.rs b/src/http/h3_client/callbacks.rs index 941b4f4d5a0..dcda0178d0f 100644 --- a/src/http/h3_client/callbacks.rs +++ b/src/http/h3_client/callbacks.rs @@ -260,7 +260,42 @@ extern "C" fn on_stream_data(s: *mut quic::Stream, data: *const u8, len: c_uint, // SAFETY: lsquic guarantees `data` points to `len` valid bytes (or `(null,0)`). let slice = unsafe { bun_core::ffi::slice(data, len as usize) }; stream.body_buffer.extend_from_slice(slice); + if len > 0 { + let _ = H3::body_bytes_received + .fetch_add(u64::from(len), core::sync::atomic::Ordering::Relaxed); + } stream.session_mut().deliver(stream, fin != 0); + // `deliver` may have detached (fin/error); re-resolve from the quic + // stream's ext slot before touching the Stream again. + let Some(still) = stream_of(s) else { return }; + if fin != 0 || still.read_paused { + return; + } + let Some(client) = still.client else { return }; + let client = super::client_session::client_mut(client); + // Only count bytes that arrived while a JS reader is wired to + // report consumption. Pre-armed bytes would otherwise become a + // permanent floor under `outstanding_body_bytes` (their `didDrain` + // credit saturates against nothing counted), and if that floor is + // above `RECEIVE_BODY_LOW_WATER` the first want_read(false) pause + // never resumes. Matches h1's `maybe_pause_receive` early-return. + if !client + .signals + .get(crate::signals::Field::BodyConsumptionTracked) + { + return; + } + still.outstanding_body_bytes = still.outstanding_body_bytes.saturating_add(len as usize); + if still.outstanding_body_bytes < crate::RECEIVE_BODY_HIGH_WATER { + return; + } + still.read_paused = true; + s.want_read(false); + bun_core::scoped_log!( + h3_client, + "stream read paused at {} bytes outstanding", + still.outstanding_body_bytes + ); } extern "C" fn on_stream_writable(s: *mut quic::Stream) { diff --git a/src/http/lib.rs b/src/http/lib.rs index 82ba943c9e1..f840a3a327d 100644 --- a/src/http/lib.rs +++ b/src/http/lib.rs @@ -281,6 +281,26 @@ pub static OVERRIDDEN_DEFAULT_USER_AGENT: std::sync::OnceLock<&'static [u8]> = /// [`SocketTimeout::set_timeout`]), so they round up to the next whole minute. pub static IDLE_TIMEOUT_SECONDS: AtomicU32 = AtomicU32::new(300); +/// h1 `maybe_pause_receive` thresholds: once `body_out_str` bytes +/// delivered to the JS thread but not yet reported consumed by +/// `ByteStream::did_drain` reach `HIGH_WATER`, the socket read is +/// paused (TCP rwnd collapses); it resumes below `LOW_WATER` or when +/// `body_consumption_tracked` is disarmed. h3 uses the same marks to +/// gate `lsquic_stream_wantread`. 4 MiB / 1 MiB were chosen to keep +/// the pause/resume overhead on a 2 GiB fast-drain transfer under +/// ~20% (see `node-http-backpressure.test.ts`) while still bounding a +/// stalled stream to single-digit MiB. +pub const RECEIVE_BODY_HIGH_WATER: usize = 4 << 20; +pub const RECEIVE_BODY_LOW_WATER: usize = 1 << 20; + +/// Process-wide pause/resume counts for h1 sockets — incremented at +/// every `receive_paused` transition. Surfaced through +/// `fetchInternals.h1BackpressureCounts()` so the backpressure test can +/// assert pause/resume deterministically from a client subprocess +/// without depending on kernel loopback autotuning. +pub static H1_SOCKET_PAUSES: AtomicU32 = AtomicU32::new(0); +pub static H1_SOCKET_RESUMES: AtomicU32 = AtomicU32::new(0); + /// Safe accessor for [`IDLE_TIMEOUT_SECONDS`]. #[inline] pub fn idle_timeout_seconds() -> c_uint { @@ -3411,8 +3431,18 @@ impl<'a> HTTPClient<'a> { self.handle_on_data_headers::(incoming_data, ctx, socket); } ResponseStage::Body => { - self.set_timeout(&socket); + // Don't re-arm the idle timer while the reader has + // intentionally stalled: maybe_pause_receive cleared it on + // the false→true transition, but uSockets' repeat-recv + // fast path can land more on_data calls in the same + // epoll tick after pause_stream(), and re-arming here + // would time out a reader that stalls past + // `idle_timeout_seconds`. + if !self.state.flags.receive_paused { + self.set_timeout(&socket); + } + let before = self.body_out_str().map_or(0, |b| b.list.len()); let report_progress = match self.handle_response_body(incoming_data, false) { Ok(b) => b, Err(err) => { @@ -3421,14 +3451,39 @@ impl<'a> HTTPClient<'a> { } }; + // Must run before progress_update: a final chunk makes + // on_async_http_callback destroy the ThreadlocalAsyncHTTP + // that owns *this* HTTPClient inline on this thread, so + // `self` is poisoned by the time progress_update returns. + // + // Count the `body_out_str` delta — i.e. exactly what + // progress_update hands to FetchTasklet and onward to + // `ByteStream.onData` → `didDrain`. `incoming_data.len()` + // would include Transfer-Encoding: chunked framing; + // `total_body_received` is pre-decompression wire bytes. + // Either mismatch accumulates as a permanent floor under + // `outstanding_body_bytes` (framing for uncompressed + // chunked SSE; stored-block overhead for gzip on + // incompressible content) and eventually deadlocks a + // long-lived stream on the first pause past the low-water + // mark. For `body_consumption_tracked` consumers + // `report_progress` is true for every non-empty chunk and + // the callback resets `body_out_str`, so `before` is 0 + // and the delta is this chunk's post-decompress length. + let after = self.body_out_str().map_or(0, |b| b.list.len()); + self.maybe_pause_receive::(socket, after.saturating_sub(before)); + if report_progress { self.progress_update::(ctx, socket); return; } } ResponseStage::BodyChunk => { - self.set_timeout(&socket); + if !self.state.flags.receive_paused { + self.set_timeout(&socket); + } + let before = self.body_out_str().map_or(0, |b| b.list.len()); let report_progress = match self.handle_response_body_chunked_encoding(incoming_data) { Ok(b) => b, @@ -3438,6 +3493,9 @@ impl<'a> HTTPClient<'a> { } }; + let after = self.body_out_str().map_or(0, |b| b.list.len()); + self.maybe_pause_receive::(socket, after.saturating_sub(before)); + if report_progress { self.progress_update::(ctx, socket); return; @@ -3591,6 +3649,122 @@ impl<'a> HTTPClient<'a> { socket.set_timeout(idle_timeout_seconds()); } + /// Ran from `on_data`'s `.body`/`.body_chunk` arms right after + /// `handle_response_body*`. Counts newly-decoded body bytes towards + /// `outstanding_body_bytes` — the same currency `didDrain` credits — + /// and, if the JS reader is reporting consumption and the outstanding + /// total has crossed the high-water mark, pauses the socket read so + /// TCP rwnd backpressures the server. The proxy-tunnel path is + /// excluded: its inner TLS session needs the socket readable to + /// complete handshake/close_notify regardless of the application + /// body, and pausing the carrier socket would wedge that. + fn maybe_pause_receive(&mut self, socket: HttpSocket, n: usize) { + if !self.signals.get(signals::Field::BodyConsumptionTracked) { + return; + } + if self.state.stage == Stage::Done || self.state.stage == Stage::Fail { + return; + } + self.state.outstanding_body_bytes = self.state.outstanding_body_bytes.saturating_add(n); + // handle_response_body just ran: if the body is complete (or a + // redirect is pending) the socket is about to be released to the + // keep-alive pool — or closed — inside progress_update. Leaving it + // paused would hand the next pooled request a socket with + // LIBUS_SOCKET_READABLE cleared and no consume_response_body to + // re-enable it. + // + // We can reach here with `receive_paused` already true because + // uSockets' repeat-recv fast path (loop.c) keeps calling recv() in + // the same epoll tick while the buffer comes back full, without + // re-consulting poll flags; a us_socket_pause() issued from a + // previous on_data in that loop doesn't stop the next recv(). The + // final chunk can therefore arrive while receive_paused is set. + if self.state.is_done() || self.state.flags.is_redirect_pending { + if self.state.flags.receive_paused { + self.state.flags.receive_paused = false; + let _ = H1_SOCKET_RESUMES.fetch_add(1, Ordering::Relaxed); + if !socket.is_closed_or_has_error() { + let _ = socket.resume_stream(); + } + } + return; + } + if self.state.flags.receive_paused { + return; + } + if self.state.outstanding_body_bytes < RECEIVE_BODY_HIGH_WATER { + return; + } + if self.proxy_tunnel.is_some() { + return; + } + if socket.is_closed_or_has_error() { + return; + } + self.state.flags.receive_paused = true; + // While paused, the idle timer would otherwise fire with no socket + // activity to re-arm it. The response isn't "stalled" — the reader + // chose not to pull — so drop the timeout until we resume. + // `socket.set_timeout(0)` clears both the short-tick and + // long-minute timers (whichever `idle_timeout_seconds` selected). + socket.set_timeout(0); + let _ = socket.pause_stream(); + let _ = H1_SOCKET_PAUSES.fetch_add(1, Ordering::Relaxed); + bun_core::scoped_log!( + fetch, + "maybePauseReceive: paused at {} bytes outstanding", + self.state.outstanding_body_bytes + ); + } + + /// HTTP-thread wake-up from `schedule_response_body_consumed`: the JS + /// reader drained `bytes` from the ByteStream. For HTTP/1.1 this may + /// resume a socket read that `maybe_pause_receive` paused. HTTP/2 and + /// HTTP/3 never reach this function — the HTTPThread consume drain + /// routes them to their dedicated session-level handlers + /// (`consume_response_body_by_http_id`) instead. + pub fn consume_response_body( + &mut self, + socket: HttpSocket, + bytes: u32, + ) { + // Both sides count the `body_out_str` delta so increment and + // decrement are the same currency; the saturating subtract is + // still needed for the `u32::MAX` sentinel from + // `ignore_remaining_response_body` and for `ByteStream.drain()` + // reporting bytes that arrived before `body_consumption_tracked` + // was armed (those were never counted here because the first + // line of maybe_pause_receive early-returns on the unset signal). + self.state.outstanding_body_bytes = self + .state + .outstanding_body_bytes + .saturating_sub(bytes as usize); + if !self.state.flags.receive_paused { + return; + } + // Disarming `body_consumption_tracked` (e.g. reader.cancel()) posts + // a saturating sentinel; treat that as "resume now" regardless of + // the low-water mark so the abandoned body can drain for keep-alive + // reuse. + let should_resume = self.state.outstanding_body_bytes <= RECEIVE_BODY_LOW_WATER + || !self.signals.get(signals::Field::BodyConsumptionTracked); + if !should_resume { + return; + } + self.state.flags.receive_paused = false; + let _ = H1_SOCKET_RESUMES.fetch_add(1, Ordering::Relaxed); + if socket.is_closed_or_has_error() { + return; + } + let _ = socket.resume_stream(); + self.set_timeout(&socket); + bun_core::scoped_log!( + fetch, + "consumeResponseBody: resumed, {} bytes outstanding", + self.state.outstanding_body_bytes + ); + } + pub fn drain_response_body(&mut self, socket: HttpSocket) { // Find out if we should not send any update. match self.state.stage { @@ -3715,6 +3889,25 @@ impl<'a> HTTPClient<'a> { _ => true, }; + // Defensive: maybe_pause_receive's is_done() branch resumes for + // the normal .body/.body_chunk on_data path, but a + // close-delimited body or an early server reply can reach + // here without another on_data. The uSockets `is_paused` flag + // survives state.reset(), so a paused socket released to the + // pool would hang the next request that adopts it. Lives + // above the keepalive check so `H1_SOCKET_RESUMES` tracks + // `H1_SOCKET_PAUSES` on the close branch too (where + // state.reset() would otherwise clear `receive_paused` + // without a counter bump); resume_stream on a closed socket + // is a no-op in uSockets. + if self.state.flags.receive_paused { + self.state.flags.receive_paused = false; + let _ = H1_SOCKET_RESUMES.fetch_add(1, Ordering::Relaxed); + if !socket.is_closed_or_has_error() { + let _ = socket.resume_stream(); + } + } + if self.is_keep_alive_possible() && !socket.is_closed_or_has_error() && tunnel_poolable diff --git a/src/http_jsc/headers_jsc.rs b/src/http_jsc/headers_jsc.rs index 2e1283c63ce..6c89fa5edfa 100644 --- a/src/http_jsc/headers_jsc.rs +++ b/src/http_jsc/headers_jsc.rs @@ -172,7 +172,7 @@ impl H3TestingAPIs { _frame: &CallFrame, ) -> JsResult { use bun_http::h3_client; - let obj = JSValue::create_empty_object(global, 2); + let obj = JSValue::create_empty_object(global, 3); // h3 atomics are `AtomicU32`; widen to u64 for `js_number_from_uint64`. obj.put( global, @@ -188,13 +188,51 @@ impl H3TestingAPIs { h3_client::live_streams.load(Ordering::Relaxed), )), ); + obj.put( + global, + b"bodyBytesReceived", + JSValue::js_number_from_uint64(h3_client::body_bytes_received.load(Ordering::Relaxed)), + ); + Ok(obj) + } +} + +pub struct HTTPTestingAPIs; + +impl HTTPTestingAPIs { + /// `h1_socket_pauses`/`h1_socket_resumes` surfaced for + /// `fetch-backpressure.test.ts` so the h1 stalled-reader test can + /// observe `maybe_pause_receive` / `consume_response_body` from + /// inside the fetch client subprocess without depending on kernel + /// loopback-buffer autotuning. + pub fn h1_backpressure_counts( + global: &JSGlobalObject, + _frame: &CallFrame, + ) -> JsResult { + let obj = JSValue::create_empty_object(global, 2); + obj.put( + global, + b"pauses", + JSValue::js_number_from_uint64(u64::from( + bun_http::H1_SOCKET_PAUSES.load(Ordering::Relaxed), + )), + ); + obj.put( + global, + b"resumes", + JSValue::js_number_from_uint64(u64::from( + bun_http::H1_SOCKET_RESUMES.load(Ordering::Relaxed), + )), + ); Ok(obj) } } /// Free-fn aliases of [`H2TestingAPIs::live_counts`] / -/// [`H3TestingAPIs::quic_live_counts`] so `bun_runtime::dispatch::js2native` -/// can `pub use` them (associated fns aren't importable items). +/// [`H3TestingAPIs::quic_live_counts`] / +/// [`HTTPTestingAPIs::h1_backpressure_counts`] so +/// `bun_runtime::dispatch::js2native` can `pub use` them (associated fns +/// aren't importable items). #[inline] pub fn h2_live_counts(global: &JSGlobalObject, frame: &CallFrame) -> JsResult { H2TestingAPIs::live_counts(global, frame) @@ -203,3 +241,10 @@ pub fn h2_live_counts(global: &JSGlobalObject, frame: &CallFrame) -> JsResult JsResult { H3TestingAPIs::quic_live_counts(global, frame) } +#[inline] +pub fn http_h1_backpressure_counts( + global: &JSGlobalObject, + frame: &CallFrame, +) -> JsResult { + HTTPTestingAPIs::h1_backpressure_counts(global, frame) +} diff --git a/src/js/internal-for-testing.ts b/src/js/internal-for-testing.ts index 10064f1f8b3..7a800010f05 100644 --- a/src/js/internal-for-testing.ts +++ b/src/js/internal-for-testing.ts @@ -330,6 +330,18 @@ export const fetchH3Internals = { liveCounts: $newZigFunction("http/H3Client.zig", "TestingAPIs.quicLiveCounts", 0) as () => { sessions: number; streams: number; + bodyBytesReceived: number; + }, +}; + +export const fetchInternals = { + /** Process-wide HTTP/1.1 `maybePauseReceive` / `consumeResponseBody` + * transition counters — lets the backpressure tests observe pause/resume + * from inside the fetching subprocess instead of inferring it from a + * server-side `drain` timeout that varies with kernel loopback tuning. */ + h1BackpressureCounts: $newZigFunction("http/http.zig", "TestingAPIs.h1BackpressureCounts", 0) as () => { + pauses: number; + resumes: number; }, }; diff --git a/src/runtime/dispatch_js2native.rs b/src/runtime/dispatch_js2native.rs index 4af6e63fec5..72f6989b7e1 100644 --- a/src/runtime/dispatch_js2native.rs +++ b/src/runtime/dispatch_js2native.rs @@ -62,6 +62,7 @@ pub use bun_sys_jsc::error_jsc::TestingAPIs::translate_uv_error_to_e as sys_sys_ pub use bun_http_jsc::headers_jsc::h2_live_counts as http_h2_client_testing_ap_is_live_counts; pub use bun_http_jsc::headers_jsc::h3_quic_live_counts as http_h3_client_testing_ap_is_quic_live_counts; +pub use bun_http_jsc::headers_jsc::http_h1_backpressure_counts as http_http_testing_ap_is_h1_backpressure_counts; /// Lives here (not in `src/bun.rs`) /// because the flag it reads — `cli::Arguments::Bun__Node__UseSystemCA` — is diff --git a/src/runtime/webcore/Body.rs b/src/runtime/webcore/Body.rs index d255480c039..542ff7ca9b1 100644 --- a/src/runtime/webcore/Body.rs +++ b/src/runtime/webcore/Body.rs @@ -280,6 +280,11 @@ pub struct PendingValue { pub on_readable_stream_available: Option, pub on_stream_cancelled: Option)>, + /// Wired to the ByteStream's `drain_handler` when the Locked body is + /// turned into a ReadableStream (`to_readable_stream` / `tee`): called + /// with the byte count each time the stream hands data to the JS + /// reader, so fetch can release receive-side backpressure. + pub on_stream_consumed: Option, bytes: usize)>, pub size_hint: blob::SizeType, pub deinit: bool, @@ -309,6 +314,7 @@ impl Default for PendingValue { on_start_streaming: None, on_readable_stream_available: None, on_stream_cancelled: None, + on_stream_consumed: None, size_hint: 0, deinit: false, action: Action::None, @@ -882,6 +888,12 @@ impl Value { reader.cancel_ctx.set(Some(task)); } } + if let Some(on_consumed) = locked.on_stream_consumed { + if let Some(task) = locked.task { + reader.drain_handler.set(Some(on_consumed)); + reader.drain_ctx.set(Some(task)); + } + } reader.context.setup(); @@ -1559,6 +1571,22 @@ impl Value { unreachable!() }; + // Same teardown/backpressure contracts as `to_readable_stream`: the + // producer (FetchTasklet) must hear about cancellation of either + // branch and about bytes drained by the shared underlying source. + if let Some(on_cancelled) = locked.on_stream_cancelled { + if let Some(task) = locked.task { + reader.cancel_handler.set(Some(on_cancelled)); + reader.cancel_ctx.set(Some(task)); + } + } + if let Some(on_consumed) = locked.on_stream_consumed { + if let Some(task) = locked.task { + reader.drain_handler.set(Some(on_consumed)); + reader.drain_ctx.set(Some(task)); + } + } + let context_ptr: *mut ByteStream = &raw mut reader.context; locked.readable = webcore::readable_stream::Strong::init( ReadableStream { diff --git a/src/runtime/webcore/ByteStream.rs b/src/runtime/webcore/ByteStream.rs index 792d132a4e5..2b26eef7dc0 100644 --- a/src/runtime/webcore/ByteStream.rs +++ b/src/runtime/webcore/ByteStream.rs @@ -158,6 +158,21 @@ impl ByteStream { }); } + /// Report `n` bytes delivered to the JS consumer (reader fulfilled, pipe + /// target, or buffer-action sink) so the producer can release + /// backpressure. Bytes parked in `self.buffer` awaiting a future + /// `on_pull` are *not* reported until that pull happens. + #[inline] + fn did_drain(&self, n: usize) { + if n == 0 { + return; + } + let source = self.parent_const(); + if let Some(handler) = source.drain_handler.get() { + handler(source.drain_ctx.get(), n); + } + } + pub(crate) fn on_data(&self, stream: streams::Result) -> Result<(), bun_jsc::JsTerminated> { bun_jsc::mark_binding!(); if self.done.get() { @@ -183,11 +198,16 @@ impl ByteStream { (p.ctx, p.on_pipe) }; if let Some(ctx) = pipe_ctx { + self.did_drain(stream.slice().len()); (pipe_fn.unwrap())(ctx, stream); return Ok(()); } if self.buffer_action.get().is_some() { + // Buffer-action consumers (`readableStreamToText` etc.) explicitly + // want the whole body; treat every append as consumed so the + // producer isn't throttled waiting for a pull that never comes. + self.did_drain(stream.slice().len()); if let streams::Result::Err(err) = &stream { // Explicit post-reject cleanup; runs after `action.reject` // (`?` would skip it). @@ -329,6 +349,8 @@ impl ByteStream { bun_output::scoped_log!(ByteStream, "ByteStream.onData pending.run()"); + self.did_drain(to_copy_len); + // R-2: `Pending::run` resolves a JS promise (re-enters JS); the // `with_mut` borrow is `UnsafeCell`-backed so `noalias` is // suppressed on `&self`, which is the load-bearing fix vs the old @@ -434,6 +456,7 @@ impl ByteStream { b.shrink_to_fit(); }); self.done.set(true); + self.did_drain(to_write); return streams::Result::IntoArrayAndDone(IntoArray { value: view, @@ -441,6 +464,7 @@ impl ByteStream { }); } + self.did_drain(to_write); return streams::Result::IntoArray(IntoArray { value: view, len: to_write as blob::SizeType, // @truncate @@ -544,6 +568,13 @@ impl ByteStream { pub(crate) fn drain(&self) -> Vec { if !self.buffer.get().is_empty() { + // Bytes placed here before the JS stream was constructed (e.g. + // the `Owned` drain_result from `onStartStreaming`) are handed + // out via `handle.drain()` without going through `on_pull`; + // report them so they don't become a permanent floor in the + // transport's outstanding-bytes accounting (h2 `unacked_bytes`, + // h1/h3 `outstanding_body_bytes`). + self.did_drain(self.buffer.get().len()); return Vec::::move_from_list(self.buffer.replace(Vec::new())); } Vec::::default() @@ -597,6 +628,16 @@ impl ByteStream { return Ok(blob.to_promise(global_this, action)?); } + // Bytes parked in `self.buffer` (the `Owned` drain_result from + // `onStartStreaming`, plus any `on_data` calls that fell through to + // `append()` before this ran) will be folded into the eventual blob + // by the buffer_action resolution path without going through + // `on_pull` or `drain()`. Credit them now — same rationale as the + // `on_data` buffer_action arm ("wants the whole body; treat every + // append as consumed") and the `drain()` call site, but on the + // `tryUseReadableStreamBufferedFastPath` route that skips both. + self.did_drain(self.buffer.get().len()); + self.buffer_action .set(Some(BufferAction::new(action, global_this))); diff --git a/src/runtime/webcore/ReadableStream.rs b/src/runtime/webcore/ReadableStream.rs index 637b437a23b..e71fc01260b 100644 --- a/src/runtime/webcore/ReadableStream.rs +++ b/src/runtime/webcore/ReadableStream.rs @@ -655,10 +655,18 @@ pub struct NewSource { /// `on_js_close` and leaves this `None` — see [`Self::on_close`]. pub close_ctx: Option>, pub close_jsvalue: bun_jsc::strong::Optional, - /// R-2: cleared via `&self` from `FetchTasklet::clear_stream_cancel_handler` + /// R-2: cleared via `&self` from `FetchTasklet::clear_stream_handlers` /// (through `ByteStream::parent_const`), so interior-mutable. pub cancel_handler: Cell)>>, pub cancel_ctx: Cell>, + /// Invoked whenever `Context` hands bytes to the JS reader (as + /// opposed to parking them in an internal buffer). Used by fetch + /// to release response-body receive backpressure (h2 per-stream + /// WINDOW_UPDATE, h1 socket resume, h3 `want_read`). Cleared from + /// `FetchTasklet::clear_stream_handlers` alongside the cancel pair, + /// so interior-mutable for the same reason. + pub drain_handler: Cell, usize)>>, + pub drain_ctx: Cell>, // JSC_BORROW: process-lifetime VM global. Heap m_ctx field reassigned in // `start()` from a fresh `&JSGlobalObject`; `BackRef` gives a safe `Deref` // projection without propagating a lifetime parameter into FFI codegen. @@ -683,6 +691,8 @@ impl Default for NewSource { close_jsvalue: bun_jsc::strong::Optional::empty(), cancel_handler: Cell::new(None), cancel_ctx: Cell::new(None), + drain_handler: Cell::new(None), + drain_ctx: Cell::new(None), global_this: None, this_jsvalue: JSValue::ZERO, is_closed: Cell::new(false), diff --git a/src/runtime/webcore/fetch/FetchTasklet.rs b/src/runtime/webcore/fetch/FetchTasklet.rs index dbd9ba71d0d..b78d7e695c8 100644 --- a/src/runtime/webcore/fetch/FetchTasklet.rs +++ b/src/runtime/webcore/fetch/FetchTasklet.rs @@ -474,7 +474,7 @@ impl FetchTasklet { Response::unref(response); } - self.clear_stream_cancel_handler(); + self.clear_stream_handlers(); self.readable_stream_ref.deinit(); self.scheduled_response_buffer = MutableString::default(); @@ -700,7 +700,7 @@ impl FetchTasklet { let chunk = self.scheduled_response_buffer.list.as_slice(); bytes.on_data(Self::temporary_chunk(chunk, false))?; } else { - self.clear_stream_cancel_handler(); + self.clear_stream_handlers(); let prev = core::mem::take(&mut self.readable_stream_ref); buffer_reset.set(false); @@ -1515,6 +1515,15 @@ impl FetchTasklet { if let Some(http_) = this.http.as_mut() { http_.enable_response_body_streaming(); + // Both Body::to_readable_stream and Body::tee wire `drain_handler` + // on the ByteStream they construct after this returns, so + // `schedule_response_body_consumed` will fire for every reader + // pull. Arm the signal so the transport gates receive + // flow-control on those reports instead of on receipt (h2 + // per-stream WINDOW_UPDATE, h1 socket pause, h3 want_read). + this.signal_store + .body_consumption_tracked + .store(true, Ordering::Release); // If the server sent the headers and the response body in two separate socket writes // and if the server doesn't close the connection by itself @@ -1551,17 +1560,24 @@ impl FetchTasklet { } } - /// Clear the cancel_handler on the ByteStream.Source to prevent use-after-free. - /// Must be called before releasing readable_stream_ref, while the Strong ref - /// still keeps the ReadableStream (and thus the ByteStream.Source) alive. - fn clear_stream_cancel_handler(&mut self) { + /// Clear every ByteStream.Source callback whose ctx pointer is this + /// FetchTasklet (cancel_handler/cancel_ctx and drain_handler/drain_ctx) + /// to prevent use-after-free. Must be called before releasing + /// readable_stream_ref, while the Strong ref still keeps the + /// ReadableStream (and thus the ByteStream.Source) alive — the stream + /// can outlive the tasklet in JS, and a late `did_drain` or + /// `on_stream_cancelled` firing against a freed FetchTasklet is the + /// UAF this guards. + fn clear_stream_handlers(&mut self) { if let Some(readable) = self.readable_stream_ref.get(&self.global_this) { if let Some(bytes) = readable.ptr.bytes() { - // R-2: project to the parent `NewSource` via `&self`; the two + // R-2: project to the parent `NewSource` via `&self`; the // fields are `Cell`-wrapped for exactly this caller. let source = bytes.parent_const(); source.cancel_handler.set(None); source.cancel_ctx.set(None); + source.drain_handler.set(None); + source.drain_ctx.set(None); } } } @@ -1574,6 +1590,22 @@ impl FetchTasklet { this.ignore_remaining_response_body(); } + /// ByteStream delivered `bytes` to the JS reader. Forward to the + /// HTTP thread so the transport can release response-body receive + /// backpressure: HTTP/2 emits per-stream WINDOW_UPDATE, HTTP/1.1 + /// resumes a paused socket read, HTTP/3 re-enables + /// `lsquic_stream_wantread`. + fn on_stream_consumed_callback(ctx: Option<*mut c_void>, bytes: usize) { + let this = Self::from_ctx(ctx.expect("ctx")); + if this.signal_store.aborted.load(Ordering::Relaxed) { + return; + } + let Some(http_) = this.http.as_ref() else { + return; + }; + http::http_thread().schedule_response_body_consumed(http_.async_http_id, bytes); + } + fn to_body_value(&mut self) -> BodyValue { if let Some(err) = self.get_abort_error() { return BodyValue::Error(err); @@ -1586,6 +1618,7 @@ impl FetchTasklet { Some(FetchTasklet::on_start_streaming_http_response_body_callback); pending.on_readable_stream_available = Some(FetchTasklet::on_readable_stream_available); pending.on_stream_cancelled = Some(FetchTasklet::on_stream_cancelled_callback); + pending.on_stream_consumed = Some(FetchTasklet::on_stream_consumed_callback); return BodyValue::Locked(pending); } @@ -1658,11 +1691,30 @@ impl FetchTasklet { if let Some(http_) = self.http.as_mut() { http_.enable_response_body_streaming(); } + // `drain_handler` is about to be cleared and incoming chunks will + // be dropped without ever reaching the ByteStream, so no more + // `schedule_response_body_consumed` reports. Disarm the tracking + // signal so the transport falls back to receipt-based flow + // control (h2 per-stream WINDOW_UPDATE, h1 socket resume, h3 + // `want_read(true)`) and the abandoned body can drain instead of + // stalling. The u32::MAX sentinel consume both wakes the HTTP + // thread (the only other consume trigger is inbound body data, + // and a window-stalled / socket-paused server sends none) and + // saturates the transport's outstanding counter so the first + // re-run releases whatever is already buffered regardless of + // which order the atomic store and the queue drain land in. + self.signal_store + .body_consumption_tracked + .store(false, Ordering::Release); + if let Some(http_) = self.http.as_ref() { + http::http_thread() + .schedule_response_body_consumed(http_.async_http_id, u32::MAX as usize); + } // we should not keep the process alive if we are ignoring the body let _ = self.javascript_vm; self.poll_ref.unref(bun_io::js_vm_ctx()); // clean any remaining references - self.clear_stream_cancel_handler(); + self.clear_stream_handlers(); self.readable_stream_ref.deinit(); self.response.clear(); diff --git a/src/uws_sys/quic/Stream.rs b/src/uws_sys/quic/Stream.rs index 4157d2c22f8..f985316c446 100644 --- a/src/uws_sys/quic/Stream.rs +++ b/src/uws_sys/quic/Stream.rs @@ -25,6 +25,7 @@ unsafe extern "C" { safe fn us_quic_stream_ext(s: &mut Stream) -> *mut c_void; fn us_quic_stream_write(s: *mut Stream, data: *const u8, len: c_uint) -> c_int; safe fn us_quic_stream_want_write(s: &mut Stream, want: c_int); + safe fn us_quic_stream_want_read(s: &mut Stream, want: c_int); fn us_quic_stream_send_headers( s: *mut Stream, h: *const Header, @@ -88,6 +89,15 @@ impl Stream { us_quic_stream_want_write(self, want as c_int) } + /// Toggle lsquic's `on_read` callback for this stream. When `false`, + /// bytes accumulate inside lsquic's receive buffer and it withholds + /// `MAX_STREAM_DATA` credit — the QUIC-level equivalent of pausing a + /// TCP socket read. Used by the HTTP/3 client to backpressure the + /// server when the JS `ReadableStream` reader has stalled. + pub fn want_read(&mut self, want: bool) { + us_quic_stream_want_read(self, want as c_int) + } + pub fn send_headers(&mut self, headers: &[Header], end_stream: bool) -> c_int { // SAFETY: self is a valid us_quic_stream_t; headers.ptr valid for headers.len() entries. unsafe { diff --git a/test/js/node/http/node-http-backpressure-max.test.ts b/test/js/node/http/node-http-backpressure-max.test.ts index 8f32393fd70..7f2812e1dd2 100644 --- a/test/js/node/http/node-http-backpressure-max.test.ts +++ b/test/js/node/http/node-http-backpressure-max.test.ts @@ -45,6 +45,12 @@ describe("backpressure", () => { expect(totalBytes).toBe(payloadSize); }, - 60_000, + // 60 s was tight on darwin-14-x64 once fetch()'s response-body + // backpressure started pausing the socket whenever the reader + // briefly fell behind (~50 pause/resume cycles over 4 GiB; each + // cycle is a kevent change + a short stall while the consume + // report round-trips). ~8-13% overhead on an already-loaded + // x64 mac mini pushed it past 60 s. + 120_000, ); }); diff --git a/test/js/node/http/node-http-backpressure.test.ts b/test/js/node/http/node-http-backpressure.test.ts index 201523be3bd..0e9c5aa8455 100644 --- a/test/js/node/http/node-http-backpressure.test.ts +++ b/test/js/node/http/node-http-backpressure.test.ts @@ -83,7 +83,14 @@ describe("backpressure", () => { const totalBytes = await countResponseBytes(PORT); expect(totalBytes).toBe(totalSize); - }, 30_000); + // `response.body.getReader()` couples the fetch client's socket + // read to JS consumption: the HTTP thread pauses the socket once + // delivered-but-not-yet-credited bytes reach 4 MiB, and the + // cross-thread credit lands on the next HTTP-thread loop tick. For + // 2 GiB that's ~150 pause/resume cycles (~10–20% overhead in + // release). On the slow darwin-14-x64 CI runner 30s had no + // headroom. + }, 60_000); it("should handle backpressure with more than INT_MAX bytes", async () => { // enough to fill the socket buffer @@ -105,5 +112,5 @@ describe("backpressure", () => { const totalBytes = await countResponseBytes(PORT); expect(totalBytes).toBe(totalSize + smallPayloadSize); - }, 30_000); + }, 60_000); }); diff --git a/test/js/web/fetch/fetch-backpressure.test.ts b/test/js/web/fetch/fetch-backpressure.test.ts new file mode 100644 index 00000000000..2507ae2b977 --- /dev/null +++ b/test/js/web/fetch/fetch-backpressure.test.ts @@ -0,0 +1,807 @@ +// Response-body receive backpressure for the fetch() client across all +// three transports. A `res.body.getReader()` that stalls must stop the +// server from filling memory; one that drains must let it continue; +// `reader.cancel()` / body abandonment must fall back so the transfer +// completes for keep-alive / stream reuse. +// +// - HTTP/2: per-stream WINDOW_UPDATE gated on `scheduleResponseBodyConsumed` +// reports. `local_initial_window_size` = 16 MiB, 8 MiB replenish +// threshold. Connection-level credit stays receipt-based (asserted). +// - HTTP/1.1: `us_socket_pause` once outstanding > `receive_body_high_water` +// (4 MiB), resumed below `receive_body_low_water` (1 MiB). TCP rwnd +// does the rest. +// - HTTP/3: `lsquic_stream_wantread(0)` at the same thresholds; lsquic +// withholds `MAX_STREAM_DATA`. +// +// Kept in its own file because each test pushes several MiB through a +// debug-build subprocess and the existing protocol-specific suites run +// under `describe.concurrent` with tight timeouts. + +import { setSocketOptions } from "bun:internal-for-testing"; +import { describe, expect, test } from "bun:test"; +import { bunEnv, bunExe, tls } from "harness"; +import { once } from "node:events"; +import net from "node:net"; +import nodetls from "node:tls"; + +// --- Raw HTTP/2 frame server ------------------------------------------------ +// Minimal TLS+ALPN(h2) server that speaks the wire format directly so the +// test can observe the exact WINDOW_UPDATE frames the client emits. + +function frame(type: number, flags: number, streamId: number, payload: Uint8Array | Buffer = Buffer.alloc(0)) { + const buf = Buffer.alloc(9 + payload.length); + buf.writeUIntBE(payload.length, 0, 3); + buf[3] = type; + buf[4] = flags; + buf.writeUInt32BE(streamId & 0x7fffffff, 5); + Buffer.from(payload.buffer, payload.byteOffset, payload.byteLength).copy(buf, 9); + return buf; +} + +// HPACK static-table index 8 = `:status: 200`. +const hpackStatus200 = Buffer.from([0x80 | 8]); + +type RawConn = { + socket: nodetls.TLSSocket; + headers(streamId: number, block: Buffer): void; + /** Send a PING and resolve once the matching ACK arrives — a barrier: by + * the time the client ACKs, it has parsed every frame written before. */ + ping(): Promise; +}; + +type RawState = { + windowUpdates: Array<{ id: number; increment: number }>; +}; + +async function withRawH2Server( + onStream: (conn: RawConn, streamId: number) => void, + fn: (url: string, state: RawState) => Promise, +) { + const state: RawState = { windowUpdates: [] }; + const server = nodetls.createServer({ ...tls, ALPNProtocols: ["h2"] }, socket => { + const pingWaiters: Array<() => void> = []; + const conn: RawConn = { + socket, + headers: (id, block) => socket.write(frame(1, 4, id, block)), + ping: () => { + socket.write(frame(6, 0, 0, Buffer.alloc(8))); + return new Promise(resolve => pingWaiters.push(resolve)); + }, + }; + let buf = Buffer.alloc(0); + let prefaceSeen = false; + socket.on("data", chunk => { + buf = Buffer.concat([buf, chunk]); + if (!prefaceSeen) { + if (buf.length < 24) return; + buf = buf.subarray(24); + prefaceSeen = true; + socket.write(frame(4, 0, 0)); // server preface: empty SETTINGS + } + while (buf.length >= 9) { + const len = buf.readUIntBE(0, 3); + if (buf.length < 9 + len) return; + const type = buf[3], + flags = buf[4], + id = buf.readUInt32BE(5) & 0x7fffffff; + const payload = buf.subarray(9, 9 + len); + buf = buf.subarray(9 + len); + if (type === 4 && !(flags & 1)) socket.write(frame(4, 1, 0)); // ack their SETTINGS + if (type === 1) onStream(conn, id); + if (type === 6 && flags & 1) pingWaiters.shift()?.(); + if (type === 8) state.windowUpdates.push({ id, increment: payload.readUInt32BE(0) & 0x7fffffff }); + } + }); + socket.on("error", () => {}); + }); + server.listen(0); + await once(server, "listening"); + const { port } = server.address() as import("node:net").AddressInfo; + try { + await fn(`https://localhost:${port}`, state); + } finally { + server.close(); + } +} + +function spawnFetch(script: string, extraEnv: Record = {}) { + return Bun.spawn({ + cmd: [bunExe(), "--no-warnings", "-e", script], + env: { + ...bunEnv, + BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP2_CLIENT: "1", + NODE_TLS_REJECT_UNAUTHORIZED: "0", + ...extraEnv, + }, + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + }); +} + +// Reads the child's stdout line-by-line. Drains stderr eagerly so that when +// the child dies before printing the awaited line, the error carries the +// actual crash output instead of just "closed stdout". +function lineReader(proc: { stdout: ReadableStream; stderr: ReadableStream }) { + const stderrP = proc.stderr.text(); + const reader = proc.stdout.getReader(); + let acc = ""; + return async function waitFor(prefix: string) { + while (true) { + const nl = acc.indexOf("\n"); + if (nl >= 0) { + const line = acc.slice(0, nl); + acc = acc.slice(nl + 1); + if (line.startsWith(prefix)) return line; + continue; + } + const { value, done } = await reader.read(); + if (done) { + throw new Error( + `subprocess closed stdout without ${JSON.stringify(prefix)}; buffered: ${acc}\nstderr: ${await stderrP}`, + ); + } + acc += Buffer.from(value).toString(); + } + }; +} + +// Push 12 MiB of DATA to `streamId` in 384 KiB batches, round-tripping a +// PING after each so the client has fully parsed (and flushed any +// WINDOW_UPDATE reply) before the next batch lands. Dumping the whole body +// in one burst tickles a pre-existing uSockets-TLS quirk: the h2 client's +// `onData` calls `socket.write()` mid-callback, and +// `us_internal_ssl_socket_write` zeroes `ssl_read_input_length` +// (bun-usockets openssl.c:1815 — the comment there acknowledges it). When +// SSL_read then hits WANT_WRITE with input still queued, `ssl_on_data` +// closes the socket with code 0 (openssl.c:562). Pacing keeps each +// `ssl_on_data` invocation below that threshold. +async function floodData(conn: RawConn, streamId: number) { + const dataFrame = frame(0, 0, streamId, Buffer.alloc(16 * 1024, 0x62)); + const batch = Buffer.concat(Array.from({ length: 24 }, () => dataFrame)); + for (let i = 0; i < 32; i++) { + conn.socket.write(batch); + await conn.ping(); + } +} + +// Both tests send only HEADERS from `onStream`, then wait for the child to +// confirm `getReader()` has run before flooding DATA. That ordering is the +// point: `response_body_streaming` must be true on the HTTP thread before +// any DATA is parsed, otherwise receipt-based crediting would fire and the +// stalled-reader assertion becomes timing-dependent. + +describe("fetch() over HTTP/2 — per-stream receive-window backpressure", () => { + test("stalled getReader() withholds per-stream WINDOW_UPDATE", async () => { + let conn!: RawConn; + const { promise: opened, resolve: markOpened } = Promise.withResolvers(); + await withRawH2Server( + (c, id) => { + conn = c; + c.headers(id, hpackStatus200); + markOpened(); + }, + async (url, state) => { + await using proc = spawnFetch(` + const res = await fetch("${url}", { tls: { rejectUnauthorized: false } }); + const reader = res.body.getReader(); + process.stdout.write("reader\\n"); + await new Promise(() => {}); // hold the reader; test kills us + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + await opened; + // 12 MiB crosses the 8 MiB replenish threshold under receipt-based + // accounting. The final PING in floodData() is the barrier: once + // the client ACKs it, it has parsed every DATA frame and run + // replenishWindow() from onData. + await floodData(conn, 1); + const perStream = state.windowUpdates.filter(w => w.id === 1); + const connLevel = state.windowUpdates.filter(w => w.id === 0); + // Conn-level credit is receipt-based and should have fired + // (12 MiB received >= 8 MiB threshold, plus the preface bump). + expect(connLevel.length).toBeGreaterThan(0); + // Per-stream credit is coupled to JS consumption; reader never + // called read(), so no credit. + expect(perStream).toEqual([]); + conn.socket.destroy(); + proc.kill(); + await proc.exited; + }, + ); + }, 30_000); + + test("getReader() that drains releases per-stream WINDOW_UPDATE", async () => { + let conn!: RawConn; + const { promise: opened, resolve: markOpened } = Promise.withResolvers(); + await withRawH2Server( + (c, id) => { + conn = c; + c.headers(id, hpackStatus200); + markOpened(); + }, + async (url, state) => { + await using proc = spawnFetch(` + const res = await fetch("${url}", { tls: { rejectUnauthorized: false } }); + const reader = res.body.getReader(); + process.stdout.write("reader\\n"); + let total = 0; + while (total < 10 * 1024 * 1024) { + const { value, done } = await reader.read(); + if (done) break; + total += value.byteLength; + } + process.stdout.write("read:" + total + "\\n"); + await new Promise(() => {}); + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + await opened; + // 12 MiB, no END_STREAM: the h2 Stream must stay in the session + // map while the consume messages arrive, otherwise the credit is + // dropped as a lookup miss. + await floodData(conn, 1); + const read = await waitFor("read:"); + expect(Number(read.slice(5))).toBeGreaterThanOrEqual(10 * 1024 * 1024); + // PING barrier *after* JS has posted its consume messages: the + // HTTP thread's drainEvents() processes the consume queue before + // the socket tick that answers this PING, so any remaining + // WINDOW_UPDATE is on the wire by the time the ACK comes back. + await conn.ping(); + const perStream = state.windowUpdates.filter(w => w.id === 1); + expect(perStream.length).toBeGreaterThanOrEqual(1); + const credited = perStream.reduce((a, w) => a + w.increment, 0); + // At least the 8 MiB threshold, and never more than wire bytes received. + expect(credited).toBeGreaterThanOrEqual(8 * 1024 * 1024); + expect(credited).toBeLessThanOrEqual(12 * 1024 * 1024); + conn.socket.destroy(); + proc.kill(); + await proc.exited; + }, + ); + }, 30_000); + + test("reader.cancel() falls back to receipt-based per-stream WINDOW_UPDATE", async () => { + // `ignoreRemainingResponseBody()` (reader.cancel / Response GC) flips + // `response_body_streaming` on so the HTTP thread stops buffering, + // then clears the ByteStream's drain_handler. If the consumption gate + // keyed off `response_body_streaming`, `consumed_bytes` would stay 0 + // forever and the abandoned body would wedge the stream at the + // initial window. It keys off `body_consumption_tracked` instead, + // which `ignoreRemainingResponseBody` disarms — so the per-stream + // credit reverts to receipt-based and the body keeps draining. + let conn!: RawConn; + const { promise: opened, resolve: markOpened } = Promise.withResolvers(); + await withRawH2Server( + (c, id) => { + conn = c; + c.headers(id, hpackStatus200); + markOpened(); + }, + async (url, state) => { + await using proc = spawnFetch(` + const res = await fetch("${url}", { tls: { rejectUnauthorized: false } }); + const reader = res.body.getReader(); + await reader.cancel(); + process.stdout.write("cancelled\\n"); + await new Promise(() => {}); + `); + const waitFor = lineReader(proc); + await waitFor("cancelled"); + await opened; + await floodData(conn, 1); + const perStream = state.windowUpdates.filter(w => w.id === 1); + // Receipt-based: 12 MiB received crosses the 8 MiB threshold. + expect(perStream.length).toBeGreaterThanOrEqual(1); + conn.socket.destroy(); + proc.kill(); + await proc.exited; + }, + ); + }, 30_000); +}); + +// --- HTTP/1.1 ---------------------------------------------------------------- +// The client subprocess reads the process-wide `h1_socket_pauses` / +// `h1_socket_resumes` counters via `fetchInternals.h1BackpressureCounts()` +// so the test observes the `us_socket_pause` / `resumeStream` transitions +// directly — no dependency on the kernel's loopback sndbuf/rcvbuf +// autotuning, which on some CI hosts let 64 MiB land in buffers without +// the server seeing a `drain` stall. The server only needs to push enough +// body bytes past `receive_body_high_water` (4 MiB) for the pause to fire. + +describe("fetch() over HTTP/1.1 — socket-read backpressure", () => { + const h1Prelude = ` + const { fetchInternals } = require("bun:internal-for-testing"); + const counts = () => fetchInternals.h1BackpressureCounts(); + // Poll a counter until it reaches \`target\`, or bail after \`maxMs\`. + async function until(pick, target, maxMs = 10000) { + const deadline = Date.now() + maxMs; + while (pick(counts()) < target) { + if (Date.now() > deadline) return false; + await Bun.sleep(10); + } + return true; + } + `; + + /** Write `cap` bytes in 64 KiB chunks, respecting `drain`, then resolve + * with the total written. Stops early if the socket closes. */ + async function pump(socket: net.Socket, cap: number) { + const chunk = Buffer.alloc(64 * 1024, 0x61); + let written = 0; + const closed = once(socket, "close").then(() => false as const); + while (written < cap && !socket.destroyed) { + if (!socket.write(chunk)) { + const ok = await Promise.race([once(socket, "drain").then(() => true as const), closed]); + if (!ok) break; + } + written += chunk.length; + } + return written; + } + + async function withH1Server(fn: (url: string, onReq: (h: (s: net.Socket) => void) => void) => Promise) { + let handler: ((s: net.Socket) => void) | undefined; + const server = net.createServer(socket => { + // Pin SO_SNDBUF small so the server-side kernel buffer isn't a + // multi-MiB autotuned term in "how much the server can write + // past the client's pause". Not load-bearing for the + // counter-based assertions below — it just keeps the pump + // volume bounded for the draining test. `_handle` is the + // underlying Bun TCPSocket; posix-only. + if ((socket as any)._handle && process.platform !== "win32") { + setSocketOptions((socket as any)._handle, 1, 64 * 1024); + } + socket.once("data", () => { + // Don't parse; just respond. No Content-Length so the client + // reads until close (body_out_str path, not the single-packet + // fast path). + socket.write("HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n"); + handler?.(socket); + }); + socket.on("error", () => {}); + }); + server.listen(0); + await once(server, "listening"); + const { port } = server.address() as net.AddressInfo; + try { + await fn(`http://127.0.0.1:${port}`, h => (handler = h)); + } finally { + server.close(); + } + } + + test("stalled getReader() pauses the socket read", async () => { + await withH1Server(async (url, onReq) => { + const { promise: gotSocket, resolve } = Promise.withResolvers(); + onReq(resolve); + await using proc = spawnFetch(` + ${h1Prelude} + const res = await fetch("${url}"); + const reader = res.body.getReader(); + process.stdout.write("reader\\n"); + // maybePauseReceive fires once outstanding >= receive_body_high_water. + const sawPause = await until(c => c.pauses, 1); + const c = counts(); + process.stdout.write("paused:" + sawPause + ":" + c.pauses + ":" + c.resumes + "\\n"); + await new Promise(() => {}); + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + const socket = await gotSocket; + // Only need to push past receive_body_high_water (4 MiB) for the + // client's maybePauseReceive to fire; 8 MiB gives comfortable + // margin for the headers-then-body split and the first + // progressUpdate to reach JS. The pump runs concurrently with + // the child's `until(pauses, 1)`. + void pump(socket, 8 * 1024 * 1024); + const line = await waitFor("paused:"); + const [, sawPause, pauses, resumes] = line.split(":"); + // Reader never read a byte: pauses > 0, resumes == 0 (the pause + // is still in effect when the child reports). + expect({ sawPause, pauses: Number(pauses), resumes: Number(resumes) }).toEqual({ + sawPause: "true", + pauses: 1, + resumes: 0, + }); + socket.destroy(); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + test("draining getReader() keeps the socket readable", async () => { + await withH1Server(async (url, onReq) => { + const { promise: gotSocket, resolve } = Promise.withResolvers(); + onReq(resolve); + await using proc = spawnFetch(` + ${h1Prelude} + const res = await fetch("${url}"); + const reader = res.body.getReader(); + process.stdout.write("reader\\n"); + let total = 0; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + total += value.byteLength; + } + const c = counts(); + process.stdout.write("read:" + total + ":" + c.pauses + ":" + c.resumes + "\\n"); + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + const socket = await gotSocket; + // 16 MiB gives the HTTP thread room to pull ahead of the + // draining JS reader and cross the 4 MiB high-water mark at + // least once; the reader catches up so every pause is matched + // by a resume and the full body reaches JS. + await pump(socket, 16 * 1024 * 1024); + socket.end(); + const line = await waitFor("read:"); + const [, total, pauses, resumes] = line.split(":").map(Number); + expect(total).toBe(16 * 1024 * 1024); + // Pause/resume may or may not fire depending on relative + // scheduling of the HTTP thread vs the JS reader; the invariant + // is that every transition of `receive_paused` was matched so + // the body completed. `h1_socket_resumes` bumps at every edge + // that clears `receive_paused` (consumeResponseBody, the + // isDone() branch, and the pre-release guard) so this holds + // regardless of which path cleared the last pause. + expect(resumes).toBe(pauses); + socket.destroy(); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + test("reader.cancel() resumes a paused socket", async () => { + await withH1Server(async (url, onReq) => { + const { promise: gotSocket, resolve } = Promise.withResolvers(); + onReq(resolve); + await using proc = spawnFetch(` + ${h1Prelude} + const res = await fetch("${url}"); + const reader = res.body.getReader(); + process.stdout.write("reader\\n"); + // Wait for the pause to fire, then cancel. + const sawPause = await until(c => c.pauses, 1); + await reader.cancel(); + // ignoreRemainingResponseBody disarms body_consumption_tracked + // and posts the maxInt sentinel; consumeResponseBody on the + // HTTP thread sees the signal cleared and resumes regardless + // of the low-water mark. + const sawResume = await until(c => c.resumes, 1); + const c = counts(); + process.stdout.write("done:" + sawPause + ":" + sawResume + ":" + c.pauses + ":" + c.resumes + "\\n"); + await new Promise(() => {}); + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + const socket = await gotSocket; + void pump(socket, 8 * 1024 * 1024); + const line = await waitFor("done:"); + const [, sawPause, sawResume, pauses, resumes] = line.split(":"); + expect({ sawPause, sawResume }).toEqual({ sawPause: "true", sawResume: "true" }); + expect(Number(pauses)).toBeGreaterThanOrEqual(1); + expect(Number(resumes)).toBeGreaterThanOrEqual(1); + socket.destroy(); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + // #28035: `res.body.pipeThrough(TransformStream)` must propagate + // backpressure back to the socket. The pipeTo loop waits on the + // TransformStream writable's readyPromise, so a stalled reader on the + // piped output stops `onPull` → `didDrain` stops firing → + // `outstanding_body_bytes` climbs past `receive_body_high_water` → + // `maybePauseReceive` pauses the socket. Without the fix the + // ByteStream.buffer grows unbounded because the HTTP thread never + // observes the stall. + test("stalled pipeThrough(TransformStream) reader pauses the socket read (#28035)", async () => { + await withH1Server(async (url, onReq) => { + const { promise: gotSocket, resolve } = Promise.withResolvers(); + onReq(resolve); + await using proc = spawnFetch(` + ${h1Prelude} + const res = await fetch("${url}"); + // Identity TransformStream: the pipeTo loop reads res.body and + // writes here; when the output reader stalls the writable's + // readyPromise goes pending and the loop stops pulling. + const reader = res.body + .pipeThrough(new TransformStream()) + .getReader(); + process.stdout.write("reader\\n"); + const first = await reader.read(); + // Sit on the first chunk: maybePauseReceive fires once + // outstanding >= receive_body_high_water. + const sawPause = await until(c => c.pauses, 1); + const c = counts(); + process.stdout.write("paused:" + sawPause + ":" + c.pauses + ":" + c.resumes + ":" + first.value.byteLength + "\\n"); + // Now drain: every read() posts a consume via didDrain and the + // socket resumes below receive_body_low_water. + let total = first.value.byteLength; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + total += value.byteLength; + } + const c2 = counts(); + process.stdout.write("read:" + total + ":" + c2.pauses + ":" + c2.resumes + "\\n"); + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + const socket = await gotSocket; + // Push past receive_body_high_water (4 MiB) while the piped + // reader sits on its first chunk. + const pumped = pump(socket, 16 * 1024 * 1024); + const paused = await waitFor("paused:"); + const [, sawPause, pauses, resumes, firstLen] = paused.split(":"); + expect({ sawPause, pauses: Number(pauses), resumes: Number(resumes) }).toEqual({ + sawPause: "true", + pauses: 1, + resumes: 0, + }); + expect(Number(firstLen)).toBeGreaterThan(0); + // Child is now draining; let the pump finish and close so the + // reader sees done. + await pumped; + socket.end(); + const read = await waitFor("read:"); + const [, total, pauses2, resumes2] = read.split(":").map(Number); + expect(total).toBe(16 * 1024 * 1024); + // Every pause was matched by a resume so the body completed. + expect(resumes2).toBe(pauses2); + socket.destroy(); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + // Same as above but through TextDecoderStream — the shape reported in + // the #28035 comments ("blocking our SSE client"). TextDecoderStream + // is a TransformStream under the hood; the extra hop through + // `createTransformStream` must not change the backpressure behaviour. + test("stalled pipeThrough(TextDecoderStream) reader pauses the socket read (#28035)", async () => { + await withH1Server(async (url, onReq) => { + const { promise: gotSocket, resolve } = Promise.withResolvers(); + onReq(resolve); + await using proc = spawnFetch(` + ${h1Prelude} + const res = await fetch("${url}"); + const reader = res.body + .pipeThrough(new TextDecoderStream()) + .getReader(); + process.stdout.write("reader\\n"); + const first = await reader.read(); + const sawPause = await until(c => c.pauses, 1); + const c = counts(); + process.stdout.write("paused:" + sawPause + ":" + c.pauses + ":" + c.resumes + ":" + first.value.length + "\\n"); + await reader.cancel(); + const sawResume = await until(c => c.resumes, c.pauses); + process.stdout.write("cancelled:" + sawResume + "\\n"); + await new Promise(() => {}); + `); + const waitFor = lineReader(proc); + await waitFor("reader"); + const socket = await gotSocket; + void pump(socket, 8 * 1024 * 1024); + const paused = await waitFor("paused:"); + const [, sawPause, pauses, , firstLen] = paused.split(":"); + expect(sawPause).toBe("true"); + expect(Number(pauses)).toBeGreaterThanOrEqual(1); + expect(Number(firstLen)).toBeGreaterThan(0); + // reader.cancel() on the piped output propagates through + // pipeToErrorsMustBePropagatedBackward → readableStreamCancel on + // res.body → ignoreRemainingResponseBody disarms + // body_consumption_tracked and posts the sentinel → socket + // resumes. + const cancelled = await waitFor("cancelled:"); + expect(cancelled).toBe("cancelled:true"); + socket.destroy(); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + // Regression: uSockets' repeat-recv fast path keeps calling recv() in + // the same epoll tick while the buffer comes back full, so a + // us_socket_pause() issued mid-stream doesn't stop the final chunk + // arriving. The socket was then released to the keep-alive pool with + // `is_paused` still set at the uSockets level; the next request on it + // never saw any data. `res.body; res.arrayBuffer()` is the trigger — + // accessing `.body` arms `body_consumption_tracked` and the + // buffer-action path consumes the body without a reader loop, so the + // pause/resume accounting has to be exact. + test("res.body then arrayBuffer() on a keep-alive socket doesn't wedge the pooled socket", async () => { + await using proc = Bun.spawn({ + cmd: [ + bunExe(), + "-e", + ` + const buf = Buffer.alloc(8 * 1024 * 1024, 0x41); + using server = Bun.serve({ + port: 0, + static: { "/big": new Response(buf) }, + fetch: () => new Response("no"), + }); + const route = server.url.href + "big"; + for (let iter = 0; iter < 10; iter++) { + const batch = []; + for (let i = 0; i < 48; i++) { + batch.push(fetch(route).then(res => { + res.body; + return res.arrayBuffer(); + }).then(ab => { + if (ab.byteLength !== buf.byteLength) throw new Error("short: " + ab.byteLength); + })); + } + await Promise.all(batch); + Bun.gc(); + } + process.stdout.write("ok\\n"); + `, + ], + env: bunEnv, + stdout: "pipe", + stderr: "pipe", + }); + const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]); + expect(stderr).toBe(""); + expect(stdout).toBe("ok\n"); + expect(exitCode).toBe(0); + }, 60_000); +}); + +// --- HTTP/3 ------------------------------------------------------------------ +// `Bun.serve({ http3: true })` runs in the test process; the fetch client +// runs in a subprocess (lsquic's client and server engines can't share +// the same event loop). Bun.serve's response sink buffers ahead of +// QUIC flow control, and `reader.read()` itself triggers a consume +// report that resumes the stream, so neither server-side pull count +// nor a JS-side drain loop can observe the pause directly. Instead the +// client reads the process-wide `onStreamData` byte counter via +// `fetchH3Internals.liveCounts().bodyBytesReceived`: with the +// `wantRead(false)` gate that counter stops near `receive_body_high_water`; +// without it it tracks whatever the server pushed. + +describe("fetch() over HTTP/3 — lsquic wantRead backpressure", () => { + async function withH3Server(bodyBytes: number, fn: (url: string) => Promise) { + const chunk = Buffer.alloc(64 * 1024, 0x62); + await using server = Bun.serve({ + port: 0, + tls, + http3: true, + http1: false, + fetch() { + let sent = 0; + return new Response( + new ReadableStream({ + type: "bytes", + async pull(ctrl) { + if (sent >= bodyBytes) return ctrl.close(); + ctrl.enqueue(chunk.slice()); + sent += chunk.length; + }, + }), + ); + }, + }); + await fn(`https://127.0.0.1:${server.port}`); + } + + const h3Client = (url: string, body: string) => ` + const { fetchH3Internals } = require("bun:internal-for-testing"); + const received = () => fetchH3Internals.liveCounts().bodyBytesReceived; + // Poll the onStreamData counter until it holds steady across two + // consecutive 100 ms gaps — that's the point wantRead(false) took + // effect (or the body finished). Two gaps so a transient QUIC + // scheduling hiccup isn't mistaken for the backpressure plateau. + async function settle() { + let last = received(), stable = 0; + while (stable < 2) { + await Bun.sleep(100); + const cur = received(); + if (cur === last) stable++; else { last = cur; stable = 0; } + } + return last; + } + const res = await fetch("${url}/", { + protocol: "http3", + tls: { rejectUnauthorized: false }, + }); + const reader = res.body.getReader(); + process.stdout.write("reader\\n"); + ${body} + await new Promise(() => {}); + `; + + test("stalled getReader() bounds bytes delivered to the client", async () => { + await withH3Server(32 * 1024 * 1024, async url => { + await using proc = spawnFetch( + h3Client( + url, + ` + process.stdout.write("settled:" + (await settle()) + "\\n"); + `, + ), + { BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT: "1" }, + ); + const waitFor = lineReader(proc); + await waitFor("reader"); + const settled = Number((await waitFor("settled:")).slice(8)); + // ~4 MiB high-water plus whatever lsquic's on_read loop + // delivered in the batch that crossed it (≤ one + // US_QUIC_READ_BUF pass). Without the gate this climbs to the + // full 32 MiB body. + expect(settled).toBeGreaterThan(1024 * 1024); + expect(settled).toBeLessThan(10 * 1024 * 1024); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + test("draining getReader() reads the full body", async () => { + await withH3Server(8 * 1024 * 1024, async url => { + await using proc = spawnFetch( + h3Client( + url, + ` + let total = 0; + while (true) { + const { value, done } = await reader.read(); + if (done) break; + total += value.byteLength; + } + process.stdout.write("read:" + total + ":" + received() + "\\n"); + `, + ), + { BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT: "1" }, + ); + const waitFor = lineReader(proc); + await waitFor("reader"); + const [, read, recv] = (await waitFor("read:")).split(":").map(Number); + // Actively draining: the pause/resume cycle must let the full + // body through, and every byte delivered by onStreamData is + // eventually read by JS. + expect(read).toBe(8 * 1024 * 1024); + expect(recv).toBe(8 * 1024 * 1024); + proc.kill(); + await proc.exited; + }); + }, 30_000); + + test("reader.cancel() resumes a paused lsquic stream", async () => { + await withH3Server(32 * 1024 * 1024, async url => { + await using proc = spawnFetch( + h3Client( + url, + ` + const stalledAt = await settle(); + await reader.cancel(); + // ignoreRemainingResponseBody disarms body_consumption_tracked + // and posts the sentinel consume → consumeResponseBodyByHttpId + // → wantRead(true). onStreamData resumes and the counter + // moves past the stall point. + let moved = false; + for (let i = 0; i < 50 && !moved; i++) { + await Bun.sleep(50); + if (received() > stalledAt) moved = true; + } + process.stdout.write("resumed:" + stalledAt + ":" + (moved ? 1 : 0) + "\\n"); + `, + ), + { BUN_FEATURE_FLAG_EXPERIMENTAL_HTTP3_CLIENT: "1" }, + ); + const waitFor = lineReader(proc); + await waitFor("reader"); + const [, stalledAt, moved] = (await waitFor("resumed:")).split(":").map(Number); + expect(stalledAt).toBeGreaterThan(0); + expect(stalledAt).toBeLessThan(10 * 1024 * 1024); + expect(moved).toBe(1); + proc.kill(); + await proc.exited; + }); + }, 30_000); +});