Skip to content
Open
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8db502f
http: couple per-stream h2 WINDOW_UPDATE to JS body consumption
robobun Apr 28, 2026
dc050be
test: move h2 backpressure tests to standalone file
robobun Apr 28, 2026
794ee0e
h2_client: clamp consumed_bytes to outstanding wire bytes
robobun Apr 28, 2026
4484653
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 28, 2026
a566b1b
h2_client: gate consumption on body_consumption_tracked, not response…
robobun Apr 28, 2026
4803eac
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
dylan-conway Apr 28, 2026
fbb3f10
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Apr 29, 2026
c28c8c0
ByteStream: report pre-buffered bytes handed out via drain()
robobun Apr 29, 2026
6aa5782
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 29, 2026
4113c53
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 1, 2026
670012d
Extend response-body backpressure to HTTP/1.1 and HTTP/3
robobun May 2, 2026
7f808f2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
08198eb
ci: retrigger gate (release WebKit now prefetched)
robobun May 2, 2026
f35a1b5
h1: resume the socket when the body completes while paused
robobun May 2, 2026
66aab38
h1: count total_body_received delta, not raw wire bytes
robobun May 2, 2026
77a98f2
test(h3): settle() holds for two consecutive 100ms gaps
robobun May 2, 2026
4b5dcf6
doc: InternalState backpressure fields are h1-only
robobun May 2, 2026
5f2d15d
ci: retrigger gate (WebKit cached in /root/.bun/build-cache)
robobun May 2, 2026
1c38a8f
test(h1): extend cancel-test stall pump to 256 MiB
robobun May 2, 2026
8709b87
h1 backpressure: observe pause/resume from the client; count body_out…
robobun May 2, 2026
e55438d
doc: ByteStream.drain comment is transport-agnostic too
robobun May 2, 2026
530c155
doc: body_consumption_tracked comments transport-agnostic (Signals, i…
robobun May 2, 2026
9b3b4b8
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
3b6d8f1
h1: bump h1_socket_resumes on every receive_paused clear; refresh 'wi…
robobun May 2, 2026
71af8d5
Cover remaining receive_paused edges and pre-buffered drain credits
robobun May 2, 2026
c88346f
HTTPThread: only wakeup on consume append, not coalesce
robobun May 2, 2026
783f56a
Raise receive_body_high_water 1→4 MiB, low_water 256K→1 MiB
robobun May 2, 2026
ceea99e
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
90bd4c9
h1: don't re-arm idle timeout during repeat-recv while paused; test f…
robobun May 2, 2026
ac57e89
test(node-http-backpressure-max): raise timeout 60→120s for darwin-14…
robobun May 2, 2026
425d2e2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 5, 2026
e30543a
Merge remote-tracking branch 'origin/main' into farm/841d7d2d/fix-280…
robobun May 8, 2026
aa454aa
test: fetch().body.pipeThrough() propagates backpressure to socket (#…
robobun May 8, 2026
896feb4
Signals.isEmpty: include body_consumption_tracked in the null check
robobun May 8, 2026
3b45ea1
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 9, 2026
99915ab
test(node-http-backpressure): raise INT_MAX timeouts 30→60s for darwi…
robobun May 10, 2026
3c33ac6
FetchTasklet: rename clearStreamCancelHandler → clearStreamHandlers
robobun May 10, 2026
67640e5
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 22, 2026
338b758
[autofix.ci] apply automated fixes
autofix-ci[bot] May 22, 2026
c35a549
ci: retrigger (darwin x64 build-rust agent terminated mid-build on #5…
robobun May 22, 2026
61e10b4
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 24, 2026
59f65dc
http: fix consume_response_body doc — h2/h3 route to their session ha…
robobun May 24, 2026
bf070b0
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 26, 2026
39bee27
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 4, 2026
574eedb
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 6, 2026
df93330
test: surface child stderr when lineReader hits EOF before the awaite…
robobun Jun 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/http/H3Client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ pub static live_sessions: AtomicU32 = AtomicU32::new(0);
pub static live_streams: AtomicU32 = AtomicU32::new(0);
pub use live_sessions as LIVE_SESSIONS;
pub use live_streams as LIVE_STREAMS;
/// Response-body bytes delivered to the JS side across all h3 streams
/// since process start. Surfaced through `fetchH3Internals.liveCounts()`
/// as `bodyBytesReceived` so `fetch-backpressure.test.ts` can observe a
/// plateau from a subprocess when `lsquic_stream_wantread(0)` takes
/// effect — the test can't see per-stream state, and this counter
/// stalls exactly when the h3 read does.
#[allow(non_upper_case_globals)]
pub static body_bytes_received: core::sync::atomic::AtomicU64 =
core::sync::atomic::AtomicU64::new(0);

// Zig: pub const TestingAPIs = @import("../http_jsc/headers_jsc.zig").H3TestingAPIs;
// Deleted per PORTING.md — *_jsc aliases are dropped; H3TestingAPIs lives in
Expand Down
97 changes: 97 additions & 0 deletions src/http/HTTPThread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ pub struct HttpThread {
pub queued_shutdowns: Vec<ShutdownMessage>,
pub queued_writes: Vec<WriteMessage>,
pub queued_response_body_drains: Vec<DrainMessage>,
pub queued_response_body_consumed: Vec<ConsumeMessage>,

pub queued_shutdowns_lock: Mutex,
pub queued_writes_lock: Mutex,
pub queued_response_body_drains_lock: Mutex,
pub queued_response_body_consumed_lock: Mutex,

pub queued_threadlocal_proxy_derefs: Vec<*mut ProxyTunnel>,

Expand Down Expand Up @@ -179,9 +181,11 @@ impl HttpThread {
queued_shutdowns: Vec::new(),
queued_writes: Vec::new(),
queued_response_body_drains: Vec::new(),
queued_response_body_consumed: Vec::new(),
queued_shutdowns_lock: Mutex::new(),
queued_writes_lock: Mutex::new(),
queued_response_body_drains_lock: Mutex::new(),
queued_response_body_consumed_lock: Mutex::new(),
queued_threadlocal_proxy_derefs: Vec::new(),
has_awoken: AtomicBool::new(false),
timer: Instant::now(),
Expand Down Expand Up @@ -274,6 +278,11 @@ pub struct DrainMessage {
pub async_http_id: u32,
}

pub struct ConsumeMessage {
pub async_http_id: u32,
pub bytes: u32,
}

pub struct ShutdownMessage {
pub async_http_id: u32,
}
Expand Down Expand Up @@ -818,9 +827,62 @@ impl HttpThread {
}
}

fn drain_queued_http_response_body_consumed(&mut self) {
loop {
let queued = {
let _guard = self.queued_response_body_consumed_lock.lock_guard();
core::mem::take(&mut self.queued_response_body_consumed)
};

for msg in &queued {
if let Some(socket_ptr) = abort_tracker().get(&msg.async_http_id) {
match *socket_ptr {
uws::AnySocket::SocketTls(socket) => {
let tagged = HTTPContext::<true>::get_tagged_from_socket(socket);
if let Some(client) = tagged.client_mut() {
// HTTP/1.1: may resume a paused socket read.
client.consume_response_body::<true>(socket, msg.bytes);
}
if let Some(session) = tagged.session_mut() {
// HTTP/2: releases per-stream WINDOW_UPDATE.
session
.consume_response_body_by_http_id(msg.async_http_id, msg.bytes);
}
}
uws::AnySocket::SocketTcp(socket) => {
let tagged = HTTPContext::<false>::get_tagged_from_socket(socket);
if let Some(client) = tagged.client_mut() {
client.consume_response_body::<false>(socket, msg.bytes);
}
if let Some(session) = tagged.session_mut() {
session
.consume_response_body_by_http_id(msg.async_http_id, msg.bytes);
}
}
}
} else {
// HTTP/3: QUIC streams aren't in the TCP-socket tracker;
// dispatch via the session registry. May resume a
// lsquic `want_read(false)` pause.
h3::ClientContext::consume_response_body_by_http_id(
msg.async_http_id,
msg.bytes,
);
}
}
let len = queued.len();
drop(queued);
if len == 0 {
break;
}
bun_core::scoped_log!(HTTPThread, "drained {} queued consumes", len);
}
}

pub fn drain_events(&mut self) {
// Process any pending writes **before** aborting.
self.drain_queued_http_response_body_drains();
self.drain_queued_http_response_body_consumed();
self.drain_queued_writes();
self.drain_queued_shutdowns();
h3::PendingConnect::drain_resolved();
Expand Down Expand Up @@ -927,6 +989,41 @@ impl HttpThread {
self.wakeup();
}

/// JS-thread → HTTP-thread notice that the `ReadableStream` reader for
/// `async_http_id` has drained `bytes` from its buffer. Consecutive
/// messages for the same id are coalesced under the lock so a tight
/// `read()` loop posts one entry per wake instead of one per pull.
pub fn schedule_response_body_consumed(&mut self, async_http_id: u32, bytes: usize) {
let n: u32 = u32::try_from(bytes).unwrap_or(u32::MAX);
if n == 0 {
return;
}
let appended = {
let _guard = self.queued_response_body_consumed_lock.lock_guard();
match self.queued_response_body_consumed.last_mut() {
Some(last) if last.async_http_id == async_http_id => {
last.bytes = last.bytes.saturating_add(n);
false
}
_ => {
self.queued_response_body_consumed.push(ConsumeMessage {
async_http_id,
bytes: n,
});
true
}
}
};
// Only wake on the append path: if we coalesced into an existing
// entry, the HTTP thread was already woken for that entry's
// original append and will see the updated `bytes` when it swaps
// the queue under the same lock. A tight `read()` loop over a
// multi-GiB body otherwise issues one eventfd write per pull.
if appended {
self.wakeup();
}
}

pub fn schedule_shutdown(&mut self, http: &AsyncHttp) {
bun_core::scoped_log!(HTTPThread, "scheduleShutdown {}", http.async_http_id);
{
Expand Down
21 changes: 21 additions & 0 deletions src/http/InternalState.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ pub struct InternalState<'a> {
pub compressed_body: MutableString,
pub content_length: Option<usize>,
pub total_body_received: usize,
/// Body bytes delivered to the JS thread (counted as `body_out_str`
/// delta, i.e. post-dechunk/post-decompress) that the ByteStream
/// hasn't yet reported drained via `schedule_response_body_consumed`.
/// Feeds `maybe_pause_receive`/`consume_response_body` so an h1
/// socket read stalls once the JS reader falls behind
/// `RECEIVE_BODY_HIGH_WATER`. Only meaningful while
/// `Signals::BodyConsumptionTracked` is armed. Decrement is
/// saturating (the reader.cancel sentinel is `u32::MAX`, and
/// `ByteStream::drain()` can credit pre-arming bytes that were never
/// counted here).
pub outstanding_body_bytes: usize,
// Self-borrow into `original_request_body.bytes`; `RawSlice` carries the
// outlives-holder invariant (the backing `original_request_body` is a
// sibling field, so it lives exactly as long as this struct).
Expand Down Expand Up @@ -68,6 +79,14 @@ pub struct InternalStateFlags {
/// pool/close decision — that decision needs `hostname` still set to know
/// the handshake was verified against an override.
pub clear_hostname_on_redirect: bool,
/// `maybe_pause_receive` called `socket.pause_stream()` and cleared
/// the idle timer. Every true→false transition goes through
/// `H1_SOCKET_RESUMES.fetch_add` (including the done/redirect and
/// keep-alive-release resumes) so the test-visible
/// `h1BackpressureCounts()` stays balanced; the pooled socket's
/// uSockets `is_paused` bit survives `reset()`, so a paused socket
/// handed back to the pool would hang its next borrower.
pub receive_paused: bool,
}

impl InternalStateFlags {
Expand All @@ -81,6 +100,7 @@ impl InternalStateFlags {
is_libdeflate_fast_path_disabled: false,
resend_request_body_on_redirect: false,
clear_hostname_on_redirect: false,
receive_paused: false,
}
}
}
Expand Down Expand Up @@ -109,6 +129,7 @@ impl Default for InternalState<'_> {
compressed_body: MutableString::init_empty(),
content_length: None,
total_body_received: 0,
outstanding_body_bytes: 0,
request_body: bun_ptr::RawSlice::EMPTY,
original_request_body: HTTPRequestBody::Bytes(b""),
request_sent_len: 0,
Expand Down
17 changes: 17 additions & 0 deletions src/http/Signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ pub struct Signals {
// LIFETIMES.tsv had no entry; classified as BACKREF (raw) per PORTING.md.
pub header_progress: Option<NonNull<AtomicBool>>,
pub response_body_streaming: Option<NonNull<AtomicBool>>,
/// Distinct from `response_body_streaming`: set only while a JS
/// consumer is wired to report drained bytes via
/// `schedule_response_body_consumed`. `response_body_streaming` is
/// also set by paths that never report consumption (S3 streaming
/// download, abandoned bodies via `ignore_remaining_response_body`);
/// gating flow-control on that would deadlock those streams. All
/// three transports key receive-side backpressure on this signal —
/// not `response_body_streaming` — to decide whether flow control
/// is consumption-gated or receipt-based (h1 `maybe_pause_receive`,
/// h2 `replenish_window`, h3 `on_stream_data`).
pub body_consumption_tracked: Option<NonNull<AtomicBool>>,
pub aborted: Option<NonNull<AtomicBool>>,
pub cert_errors: Option<NonNull<AtomicBool>>,
pub upgraded: Option<NonNull<AtomicBool>>,
Expand All @@ -16,6 +27,7 @@ impl Signals {
pub fn is_empty(&self) -> bool {
self.aborted.is_none()
&& self.response_body_streaming.is_none()
&& self.body_consumption_tracked.is_none()
&& self.header_progress.is_none()
&& self.cert_errors.is_none()
&& self.upgraded.is_none()
Expand All @@ -38,6 +50,7 @@ impl Signals {
let ptr: NonNull<AtomicBool> = match field {
Field::HeaderProgress => self.header_progress,
Field::ResponseBodyStreaming => self.response_body_streaming,
Field::BodyConsumptionTracked => self.body_consumption_tracked,
Field::Aborted => self.aborted,
Field::CertErrors => self.cert_errors,
Field::Upgraded => self.upgraded,
Expand All @@ -64,6 +77,7 @@ impl Signals {
pub struct Store {
pub header_progress: AtomicBool,
pub response_body_streaming: AtomicBool,
pub body_consumption_tracked: AtomicBool,
pub aborted: AtomicBool,
pub cert_errors: AtomicBool,
pub upgraded: AtomicBool,
Expand All @@ -74,6 +88,7 @@ impl Default for Store {
Self {
header_progress: AtomicBool::new(false),
response_body_streaming: AtomicBool::new(false),
body_consumption_tracked: AtomicBool::new(false),
aborted: AtomicBool::new(false),
cert_errors: AtomicBool::new(false),
upgraded: AtomicBool::new(false),
Expand All @@ -86,6 +101,7 @@ impl Store {
Signals {
header_progress: Some(NonNull::from(&self.header_progress)),
response_body_streaming: Some(NonNull::from(&self.response_body_streaming)),
body_consumption_tracked: Some(NonNull::from(&self.body_consumption_tracked)),
aborted: Some(NonNull::from(&self.aborted)),
cert_errors: Some(NonNull::from(&self.cert_errors)),
upgraded: Some(NonNull::from(&self.upgraded)),
Expand All @@ -98,6 +114,7 @@ impl Store {
pub enum Field {
HeaderProgress,
ResponseBodyStreaming,
BodyConsumptionTracked,
Aborted,
CertErrors,
Upgraded,
Expand Down
69 changes: 63 additions & 6 deletions src/http/h2_client/ClientSession.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,40 @@ impl ClientSession {
}
}

/// HTTP-thread wake-up from `schedule_response_body_consumed`: the JS
/// reader drained `bytes` from the ByteStream. Bump the stream's
/// consumption counter and release any per-stream window credit that has
/// become available.
pub fn consume_response_body_by_http_id(&mut self, async_http_id: u32, bytes: u32) {
let _guard = self.ref_scope();
let mut found = false;
for &stream in self.streams.values() {
let s = stream_mut(stream);
let Some(client) = s.client_mut() else {
continue;
};
if client.async_http_id != async_http_id {
continue;
}
// `bytes` is decompressed; clamp the running total to wire bytes
// still outstanding so a compression surplus isn't banked to
// credit later DATA the reader hasn't touched.
s.consumed_bytes =
core::cmp::min(s.consumed_bytes.saturating_add(bytes), s.unacked_bytes);
found = true;
break;
}
if !found {
return;
}
self.replenish_window();
if self.write_buffer.is_not_empty() {
if let Err(err) = self.flush() {
self.fail_all(err);
}
}
}

/// HTTP-thread wake-up from `scheduleRequestWrite`: new body bytes (or
/// end-of-body) are available in the ThreadSafeStreamBuffer.
pub fn stream_body_by_http_id(&mut self, async_http_id: u32, ended: bool) {
Expand Down Expand Up @@ -589,21 +623,44 @@ impl ClientSession {

fn replenish_window(&mut self) {
let threshold = LOCAL_INITIAL_WINDOW_SIZE / 2;
// Connection-level credit stays receipt-based so one stream whose JS
// reader is stalled doesn't starve siblings of the shared window.
if self.conn_unacked_bytes >= threshold {
self.write_window_update(0, self.conn_unacked_bytes);
self.conn_unacked_bytes = 0;
}
// PORT NOTE: reshaped for borrowck — collect (id, unacked) pairs before mutating self.
// PORT NOTE: reshaped for borrowck — collect (id, credit) pairs before mutating self.
let mut updates: Vec<(u32, u32)> = Vec::new();
for &s in self.streams.values() {
let s = stream_mut(s);
if s.unacked_bytes >= threshold && !s.remote_closed() {
updates.push((s.id, s.unacked_bytes));
s.unacked_bytes = 0;
if s.remote_closed() {
continue;
}
// `body_consumption_tracked` is set only while a JS consumer is
// reporting drained bytes via `schedule_response_body_consumed`
// (fetch `res.body` with a `drain_handler` wired). It is *not*
// set for buffering consumers (`await res.text()`), S3 streaming
// downloads, or once the body is abandoned via
// `ignore_remaining_response_body` — those stay receipt-based so
// the transfer completes. When tracked, credit only what JS has
// actually drained, clamped to wire bytes received so a
// decompressed body can't inflate the window past what was sent.
let tracked = s
.client_mut()
.is_some_and(|c| c.signals.get(signals::Field::BodyConsumptionTracked));
let avail = if tracked {
core::cmp::min(s.consumed_bytes, s.unacked_bytes)
} else {
s.unacked_bytes
};
if avail >= threshold {
updates.push((s.id, avail));
s.unacked_bytes -= avail;
s.consumed_bytes = s.consumed_bytes.saturating_sub(avail);
}
}
for (id, unacked) in updates {
self.write_window_update(id, unacked);
for (id, avail) in updates {
self.write_window_update(id, avail);
}
// PERF(port): Zig iterated and wrote in one pass; profile if extra Vec matters.
}
Expand Down
12 changes: 11 additions & 1 deletion src/http/h2_client/Stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,17 @@ pub struct Stream {
/// or final status arrives.
pub awaiting_continue: bool,
pub fatal_error: Option<Error>,
/// DATA bytes consumed since the last WINDOW_UPDATE for this stream.
/// DATA bytes received since the last per-stream WINDOW_UPDATE. For
/// consumers without `body_consumption_tracked` set this alone drives
/// the credit; for tracked consumers it is the ceiling on what
/// `consumed_bytes` may release.
pub unacked_bytes: u32,
/// Bytes the JS `ReadableStream` reader has actually drained, reported
/// via `schedule_response_body_consumed`. Only consulted when
/// `body_consumption_tracked` is true; `replenish_window` credits
/// `min(consumed_bytes, unacked_bytes)` so a stalled reader withholds
/// the per-stream window and a compressed body can't over-credit.
pub consumed_bytes: u32,
/// Σ DATA payload bytes (post-padding) for §8.1.1 Content-Length check —
/// `total_body_received` is clamped at content_length so it can't catch
/// overshoot.
Expand Down Expand Up @@ -144,6 +153,7 @@ impl Stream {
awaiting_continue: false,
fatal_error: None,
unacked_bytes: 0,
consumed_bytes: 0,
data_bytes_received: 0,
send_window,
pending_body: bun_ptr::RawSlice::EMPTY,
Expand Down
Loading
Loading