Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8db502f
http: couple per-stream h2 WINDOW_UPDATE to JS body consumption
robobun Apr 28, 2026
dc050be
test: move h2 backpressure tests to standalone file
robobun Apr 28, 2026
794ee0e
h2_client: clamp consumed_bytes to outstanding wire bytes
robobun Apr 28, 2026
4484653
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 28, 2026
a566b1b
h2_client: gate consumption on body_consumption_tracked, not response…
robobun Apr 28, 2026
4803eac
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
dylan-conway Apr 28, 2026
fbb3f10
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Apr 29, 2026
c28c8c0
ByteStream: report pre-buffered bytes handed out via drain()
robobun Apr 29, 2026
6aa5782
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 29, 2026
4113c53
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 1, 2026
670012d
Extend response-body backpressure to HTTP/1.1 and HTTP/3
robobun May 2, 2026
7f808f2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
08198eb
ci: retrigger gate (release WebKit now prefetched)
robobun May 2, 2026
f35a1b5
h1: resume the socket when the body completes while paused
robobun May 2, 2026
66aab38
h1: count total_body_received delta, not raw wire bytes
robobun May 2, 2026
77a98f2
test(h3): settle() holds for two consecutive 100ms gaps
robobun May 2, 2026
4b5dcf6
doc: InternalState backpressure fields are h1-only
robobun May 2, 2026
5f2d15d
ci: retrigger gate (WebKit cached in /root/.bun/build-cache)
robobun May 2, 2026
1c38a8f
test(h1): extend cancel-test stall pump to 256 MiB
robobun May 2, 2026
8709b87
h1 backpressure: observe pause/resume from the client; count body_out…
robobun May 2, 2026
e55438d
doc: ByteStream.drain comment is transport-agnostic too
robobun May 2, 2026
530c155
doc: body_consumption_tracked comments transport-agnostic (Signals, i…
robobun May 2, 2026
9b3b4b8
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
3b6d8f1
h1: bump h1_socket_resumes on every receive_paused clear; refresh 'wi…
robobun May 2, 2026
71af8d5
Cover remaining receive_paused edges and pre-buffered drain credits
robobun May 2, 2026
c88346f
HTTPThread: only wakeup on consume append, not coalesce
robobun May 2, 2026
783f56a
Raise receive_body_high_water 1→4 MiB, low_water 256K→1 MiB
robobun May 2, 2026
ceea99e
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
90bd4c9
h1: don't re-arm idle timeout during repeat-recv while paused; test f…
robobun May 2, 2026
ac57e89
test(node-http-backpressure-max): raise timeout 60→120s for darwin-14…
robobun May 2, 2026
425d2e2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 5, 2026
e30543a
Merge remote-tracking branch 'origin/main' into farm/841d7d2d/fix-280…
robobun May 8, 2026
aa454aa
test: fetch().body.pipeThrough() propagates backpressure to socket (#…
robobun May 8, 2026
896feb4
Signals.isEmpty: include body_consumption_tracked in the null check
robobun May 8, 2026
3b45ea1
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 9, 2026
99915ab
test(node-http-backpressure): raise INT_MAX timeouts 30→60s for darwi…
robobun May 10, 2026
3c33ac6
FetchTasklet: rename clearStreamCancelHandler → clearStreamHandlers
robobun May 10, 2026
67640e5
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 22, 2026
338b758
[autofix.ci] apply automated fixes
autofix-ci[bot] May 22, 2026
c35a549
ci: retrigger (darwin x64 build-rust agent terminated mid-build on #5…
robobun May 22, 2026
61e10b4
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 24, 2026
59f65dc
http: fix consume_response_body doc — h2/h3 route to their session ha…
robobun May 24, 2026
bf070b0
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 26, 2026
39bee27
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 4, 2026
574eedb
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 6, 2026
df93330
test: surface child stderr when lineReader hits EOF before the awaite…
robobun Jun 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/bun.js/webcore/Body.zig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub const PendingValue = struct {
onStartStreaming: ?*const fn (ctx: *anyopaque) jsc.WebCore.DrainResult = null,
onReadableStreamAvailable: ?*const fn (ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void = null,
onStreamCancelled: ?*const fn (ctx: ?*anyopaque) void = null,
onStreamConsumed: ?*const fn (ctx: ?*anyopaque, bytes: usize) void = null,
size_hint: Blob.SizeType = 0,

deinit: bool = false,
Expand Down Expand Up @@ -519,6 +520,12 @@ pub const Value = union(Tag) {
reader.cancel_ctx = task;
}
}
if (locked.onStreamConsumed) |onConsumed| {
if (locked.task) |task| {
reader.drain_handler = onConsumed;
reader.drain_ctx = task;
}
}

reader.context.setup();

Expand Down Expand Up @@ -1034,6 +1041,19 @@ pub const Value = union(Tag) {
.globalThis = globalThis,
});

if (locked.onStreamCancelled) |onCancelled| {
if (locked.task) |task| {
reader.cancel_handler = onCancelled;
reader.cancel_ctx = task;
}
}
if (locked.onStreamConsumed) |onConsumed| {
if (locked.task) |task| {
reader.drain_handler = onConsumed;
reader.drain_ctx = task;
}
}

reader.context.setup();

if (drain_result == .estimated_size) {
Expand Down
24 changes: 24 additions & 0 deletions src/bun.js/webcore/ByteStream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ pub fn unpipeWithoutDeref(this: *@This()) void {
this.pipe.onPipe = null;
}

/// Report `n` bytes delivered to the JS consumer (reader fulfilled, pipe
/// target, or buffer-action sink) so the producer can release backpressure.
/// Bytes parked in `this.buffer` awaiting a future `onPull` are *not*
/// reported until that pull happens.
inline fn didDrain(this: *@This(), n: usize) void {
if (n == 0) return;
const source = this.parent();
if (source.drain_handler) |handler| handler(source.drain_ctx, n);
}

pub fn onData(
this: *@This(),
stream: streams.Result,
Expand All @@ -98,13 +108,18 @@ pub fn onData(
this.has_received_last_chunk = stream.isDone();

if (this.pipe.ctx) |ctx| {
this.didDrain(stream.slice().len);
this.pipe.onPipe.?(ctx, stream, allocator);
return;
}

const chunk = stream.slice();

if (this.buffer_action) |*action| {
// Buffer-action consumers (`readableStreamToText` etc.) explicitly
// want the whole body; treat every append as consumed so the
// producer isn't throttled waiting for a pull that never comes.
this.didDrain(chunk.len);
if (stream == .err) {
defer {
this.buffer.clearAndFree();
Expand Down Expand Up @@ -206,6 +221,7 @@ pub fn onData(

log("ByteStream.onData pending.run()", .{});

this.didDrain(to_copy.len);
this.pending.run();

return;
Expand Down Expand Up @@ -306,6 +322,7 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
this.buffer.clearAndFree();
this.done = true;
this.didDrain(to_write);

return .{
.into_array_and_done = .{
Expand All @@ -315,6 +332,7 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
};
}

this.didDrain(to_write);
return .{
.into_array = .{
.value = view,
Expand Down Expand Up @@ -389,6 +407,12 @@ pub fn deinit(this: *@This()) void {

pub fn drain(this: *@This()) bun.ByteList {
if (this.buffer.items.len > 0) {
// Bytes placed here before the JS stream was constructed (e.g.
// the `.owned` drain_result from `onStartStreaming`) are handed
// out via `handle.drain()` without going through `onPull`; report
// them so they don't become a permanent `unacked_bytes` floor in
// the HTTP/2 per-stream window accounting.
Comment thread
robobun marked this conversation as resolved.
Outdated
this.didDrain(this.buffer.items.len);
return bun.ByteList.moveFromList(&this.buffer);
}
return .{};
Expand Down
5 changes: 5 additions & 0 deletions src/bun.js/webcore/ReadableStream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ pub fn NewSource(
close_jsvalue: jsc.Strong.Optional = .empty,
cancel_handler: ?*const fn (?*anyopaque) void = null,
cancel_ctx: ?*anyopaque = null,
/// Invoked whenever `Context` hands bytes to the JS reader (as
/// opposed to parking them in an internal buffer). Used by fetch to
/// couple HTTP/2 per-stream WINDOW_UPDATE to actual body consumption.
drain_handler: ?*const fn (?*anyopaque, usize) void = null,
drain_ctx: ?*anyopaque = null,
globalThis: *JSGlobalObject = undefined,
this_jsvalue: jsc.JSValue = .zero,
is_closed: bool = false,
Expand Down
36 changes: 36 additions & 0 deletions src/bun.js/webcore/fetch/FetchTasklet.zig
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,12 @@

if (this.http) |http_| {
http_.enableResponseBodyStreaming();
// Both Body.toReadableStream and Body.tee wire `drain_handler`
// on the ByteStream they construct after this returns, so
// `scheduleResponseBodyConsumed` will fire for every reader
// pull. Arm the signal so the HTTP/2 client gates per-stream
// WINDOW_UPDATE on those reports instead of on receipt.
this.signal_store.body_consumption_tracked.store(true, .release);

// If the server sent the headers and the response body in two separate socket writes
// and if the server doesn't close the connection by itself
Expand Down Expand Up @@ -925,6 +931,8 @@
const source = readable.ptr.Bytes.parent();
source.cancel_handler = null;
source.cancel_ctx = null;
source.drain_handler = null;
source.drain_ctx = null;
}
}
}
Expand All @@ -935,6 +943,17 @@
this.ignoreRemainingResponseBody();
}

/// ByteStream delivered `bytes` to the JS reader. Forward to the HTTP
/// thread so the HTTP/2 client can release per-stream flow-control
/// window proportional to what JS has actually drained; no-op for
/// HTTP/1.1 sockets.

Check warning on line 949 in src/bun.js/webcore/fetch/FetchTasklet.zig

View check run for this annotation

Claude / Claude Code Review

Stale h2-only doc comments after h1/h3 extension

nit: this doc comment ("no-op for HTTP/1.1 sockets") was accurate for the original h2-only revision, but commit 670012d extended `scheduleResponseBodyConsumed` to also drive HTTP/1.1 socket pause/resume (`HTTPClient.consumeResponseBody`) and HTTP/3 `wantRead` toggling — so it's now factually wrong. The `drain_handler` comment at `src/bun.js/webcore/ReadableStream.zig:435-437` has the same staleness (mentions only "HTTP/2 per-stream WINDOW_UPDATE"). Consider rewording both to something transport-
Comment thread
robobun marked this conversation as resolved.
Outdated
fn onStreamConsumedCallback(ctx: ?*anyopaque, bytes: usize) void {
const this = bun.cast(*FetchTasklet, ctx.?);
if (this.signal_store.aborted.load(.monotonic)) return;
const http_ = this.http orelse return;
bun.http.http_thread.scheduleResponseBodyConsumed(http_.async_http_id, bytes);
}

fn toBodyValue(this: *FetchTasklet) Body.Value {
if (this.getAbortError()) |err| {
return .{ .Error = err };
Expand All @@ -948,6 +967,7 @@
.onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback,
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
.onStreamCancelled = FetchTasklet.onStreamCancelledCallback,
.onStreamConsumed = FetchTasklet.onStreamConsumedCallback,
},
};
return response;
Expand Down Expand Up @@ -998,6 +1018,22 @@
if (this.http) |http_| {
http_.enableResponseBodyStreaming();
}
// `drain_handler` is about to be cleared and incoming chunks will
// be dropped without ever reaching the ByteStream, so no more
// `scheduleResponseBodyConsumed` reports. Disarm the tracking
// signal so the HTTP/2 client falls back to receipt-based
// per-stream WINDOW_UPDATE and the abandoned body can drain
// instead of stalling the stream at the initial window. The
// sentinel consume message both wakes the HTTP thread (so
// `replenishWindow` re-runs — its only other trigger is inbound
// DATA, and a server that has exhausted the window sends none)
// and saturates `consumed_bytes` to `unacked_bytes` so the first
// re-run releases whatever is already outstanding regardless of
// which order the atomic store and the queue drain land in.
this.signal_store.body_consumption_tracked.store(false, .release);
if (this.http) |http_| {
bun.http.http_thread.scheduleResponseBodyConsumed(http_.async_http_id, std.math.maxInt(u32));
}
// we should not keep the process alive if we are ignoring the body
const vm = this.javascript_vm;
this.poll_ref.unref(vm);
Expand Down
10 changes: 10 additions & 0 deletions src/deps/uws/quic/Stream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ pub const Stream = opaque {
us_quic_stream_want_write(s, @intFromBool(want));
}

extern fn us_quic_stream_want_read(s: *Stream, want: c_int) void;
/// Toggle lsquic's `on_read` callback for this stream. When `false`,
/// bytes accumulate inside lsquic's receive buffer and it withholds
/// `MAX_STREAM_DATA` credit — the QUIC-level equivalent of pausing a
/// TCP socket read. Used by the HTTP/3 client to backpressure the
/// server when the JS `ReadableStream` reader has stalled.
pub fn wantRead(s: *Stream, want: bool) void {
us_quic_stream_want_read(s, @intFromBool(want));
}

extern fn us_quic_stream_send_headers(s: *Stream, h: [*]const Header, n: c_uint, end_stream: c_int) c_int;
pub fn sendHeaders(s: *Stream, headers: []const Header, end_stream: bool) c_int {
return us_quic_stream_send_headers(s, headers.ptr, @intCast(headers.len), @intFromBool(end_stream));
Expand Down
80 changes: 80 additions & 0 deletions src/http.zig
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,20 @@ pub fn retryAfterH2Coalesce(this: *HTTPClient) void {
this.start_(true);
}

/// Response-body receive backpressure for streaming consumers
/// (`signals.body_consumption_tracked` set). Once the wire bytes handed
/// to JS but not yet reported drained via `scheduleResponseBodyConsumed`
/// exceed `receive_body_high_water`, the transport applies backpressure:
/// HTTP/1.1 pauses the socket read (`us_socket_pause` → TCP rwnd closes),
/// HTTP/3 stops `lsquic_stream_read` (`wantRead(false)` → lsquic withholds
/// `MAX_STREAM_DATA`). HTTP/2 uses a different mechanism (per-stream
/// WINDOW_UPDATE gated on the same consumption reports — see
/// `h2_client/ClientSession.replenishWindow`) so its limit is governed by
/// `H2.local_initial_window_size` instead. Resumed once consumption
/// reports bring the outstanding count below `receive_body_low_water`.
pub const receive_body_high_water: usize = 1 << 20; // 1 MiB
pub const receive_body_low_water: usize = 256 * 1024;

/// REFUSED_STREAM or graceful GOAWAY past our id: the server promises it
/// did not process the request, so re-dispatch from the top. Only reached
/// for `.bytes` bodies (replayable).
Expand Down Expand Up @@ -2036,6 +2050,12 @@ pub fn onData(
return;
};

// Must run before progressUpdate: a final chunk makes
// onAsyncHTTPCallback destroy the ThreadlocalAsyncHTTP that
// owns *this* HTTPClient inline on this thread, so `this`
// is poisoned by the time progressUpdate returns.
this.maybePauseReceive(is_ssl, socket, incoming_data.len);

if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
return;
Expand All @@ -2050,6 +2070,8 @@ pub fn onData(
return;
};

this.maybePauseReceive(is_ssl, socket, incoming_data.len);
Comment thread
robobun marked this conversation as resolved.
Outdated

if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
return;
Expand Down Expand Up @@ -2204,6 +2226,64 @@ pub fn setTimeout(this: *HTTPClient, socket: anytype, minutes: c_uint) void {
socket.setTimeoutMinutes(minutes);
}

/// Called from `onData` after body bytes have been appended and (if
/// streaming) delivered to JS. Counts `n` wire bytes towards
/// `outstanding_body_bytes` and, if the JS reader is reporting
Comment thread
robobun marked this conversation as resolved.
Outdated
/// consumption and the outstanding total has crossed the high-water
/// mark, pauses the socket read so TCP rwnd backpressures the server.
/// The proxy-tunnel path is excluded: its inner TLS session needs the
/// socket readable to complete handshake/close_notify regardless of the
/// application body, and pausing the carrier socket would wedge that.
fn maybePauseReceive(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, n: usize) void {
if (!this.signals.get(.body_consumption_tracked)) return;
if (this.state.stage == .done or this.state.stage == .fail) return;
if (this.state.flags.receive_paused) {
this.state.outstanding_body_bytes +|= n;
return;
}
this.state.outstanding_body_bytes +|= n;
if (this.state.outstanding_body_bytes < receive_body_high_water) return;
// handleResponseBody just ran: if the body is complete the socket is
// about to be released to the keep-alive pool (or closed) inside
// progressUpdate. Pausing it here would hand the next pooled request
// a socket with LIBUS_SOCKET_READABLE cleared and no one to call
// consumeResponseBody on its behalf.
if (this.state.isDone() or this.state.flags.is_redirect_pending) return;
if (this.proxy_tunnel != null) return;
if (socket.isClosedOrHasError()) return;
this.state.flags.receive_paused = true;
// While paused, the idle timer would otherwise fire with no socket
// activity to re-arm it. The response isn't "stalled" — the reader
// chose not to pull — so drop the timeout until we resume.
socket.setTimeoutMinutes(0);
socket.timeout(0);
_ = socket.pauseStream();
log("maybePauseReceive: paused at {d} bytes outstanding", .{this.state.outstanding_body_bytes});
}

/// HTTP-thread wake-up from `scheduleResponseBodyConsumed`: the JS reader
/// drained `bytes` from the ByteStream. For HTTP/1.1 this may resume a
/// socket read that `maybePauseReceive` paused. HTTP/2 and HTTP/3 have
/// dedicated handlers that reach here via their session dispatch.
pub fn consumeResponseBody(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, bytes: u32) void {
// `bytes` is post-decompression; clamp so a compression surplus
// doesn't leave `outstanding_body_bytes` underflowed — same rule as
// the h2 `consumed_bytes` clamp.
this.state.outstanding_body_bytes -|= @min(@as(usize, bytes), this.state.outstanding_body_bytes);
if (!this.state.flags.receive_paused) return;
// Disarming `body_consumption_tracked` (e.g. reader.cancel()) posts a
// saturating sentinel; treat that as "resume now" regardless of the
// low-water mark so the abandoned body can drain for keep-alive reuse.
const should_resume = this.state.outstanding_body_bytes <= receive_body_low_water or
!this.signals.get(.body_consumption_tracked);
if (!should_resume) return;
this.state.flags.receive_paused = false;
if (socket.isClosedOrHasError()) return;
_ = socket.resumeStream();
this.setTimeout(socket, 5);
log("consumeResponseBody: resumed, {d} bytes outstanding", .{this.state.outstanding_body_bytes});
}

pub fn drainResponseBody(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {

// Find out if we should not send any update.
Expand Down
9 changes: 8 additions & 1 deletion src/http/H3Client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,22 @@ pub const AltSvc = @import("./h3_client/AltSvc.zig");
/// via TestingAPIs.quicLiveCounts so they must be atomic.
pub var live_sessions = std.atomic.Value(u32).init(0);
pub var live_streams = std.atomic.Value(u32).init(0);
/// Cumulative response-body bytes delivered via `onStreamData` across all
/// h3 client streams in this process. Exposed for the backpressure tests:
/// a stalled JS reader should cap this near `receive_body_high_water` once
/// `wantRead(false)` lands, whereas without the gate it tracks whatever
/// the server pushes.
pub var body_bytes_received = std.atomic.Value(u64).init(0);

pub const TestingAPIs = struct {
/// Named distinctly from H2's `liveCounts` because generate-js2native.ts
/// mangles `[^A-Za-z]` to `_`, so `H2Client.zig` and `H3Client.zig` produce
/// the same path prefix and the function name has to differ.
pub fn quicLiveCounts(globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const obj = jsc.JSValue.createEmptyObject(globalThis, 2);
const obj = jsc.JSValue.createEmptyObject(globalThis, 3);
obj.put(globalThis, jsc.ZigString.static("sessions"), .jsNumber(live_sessions.load(.monotonic)));
obj.put(globalThis, jsc.ZigString.static("streams"), .jsNumber(live_streams.load(.monotonic)));
obj.put(globalThis, jsc.ZigString.static("bodyBytesReceived"), .jsNumber(body_bytes_received.load(.monotonic)));
return obj;
}
};
Expand Down
Loading
Loading