Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8db502f
http: couple per-stream h2 WINDOW_UPDATE to JS body consumption
robobun Apr 28, 2026
dc050be
test: move h2 backpressure tests to standalone file
robobun Apr 28, 2026
794ee0e
h2_client: clamp consumed_bytes to outstanding wire bytes
robobun Apr 28, 2026
4484653
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 28, 2026
a566b1b
h2_client: gate consumption on body_consumption_tracked, not response…
robobun Apr 28, 2026
4803eac
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
dylan-conway Apr 28, 2026
fbb3f10
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Apr 29, 2026
c28c8c0
ByteStream: report pre-buffered bytes handed out via drain()
robobun Apr 29, 2026
6aa5782
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 29, 2026
4113c53
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 1, 2026
670012d
Extend response-body backpressure to HTTP/1.1 and HTTP/3
robobun May 2, 2026
7f808f2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
08198eb
ci: retrigger gate (release WebKit now prefetched)
robobun May 2, 2026
f35a1b5
h1: resume the socket when the body completes while paused
robobun May 2, 2026
66aab38
h1: count total_body_received delta, not raw wire bytes
robobun May 2, 2026
77a98f2
test(h3): settle() holds for two consecutive 100ms gaps
robobun May 2, 2026
4b5dcf6
doc: InternalState backpressure fields are h1-only
robobun May 2, 2026
5f2d15d
ci: retrigger gate (WebKit cached in /root/.bun/build-cache)
robobun May 2, 2026
1c38a8f
test(h1): extend cancel-test stall pump to 256 MiB
robobun May 2, 2026
8709b87
h1 backpressure: observe pause/resume from the client; count body_out…
robobun May 2, 2026
e55438d
doc: ByteStream.drain comment is transport-agnostic too
robobun May 2, 2026
530c155
doc: body_consumption_tracked comments transport-agnostic (Signals, i…
robobun May 2, 2026
9b3b4b8
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
3b6d8f1
h1: bump h1_socket_resumes on every receive_paused clear; refresh 'wi…
robobun May 2, 2026
71af8d5
Cover remaining receive_paused edges and pre-buffered drain credits
robobun May 2, 2026
c88346f
HTTPThread: only wakeup on consume append, not coalesce
robobun May 2, 2026
783f56a
Raise receive_body_high_water 1→4 MiB, low_water 256K→1 MiB
robobun May 2, 2026
ceea99e
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
90bd4c9
h1: don't re-arm idle timeout during repeat-recv while paused; test f…
robobun May 2, 2026
ac57e89
test(node-http-backpressure-max): raise timeout 60→120s for darwin-14…
robobun May 2, 2026
425d2e2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 5, 2026
e30543a
Merge remote-tracking branch 'origin/main' into farm/841d7d2d/fix-280…
robobun May 8, 2026
aa454aa
test: fetch().body.pipeThrough() propagates backpressure to socket (#…
robobun May 8, 2026
896feb4
Signals.isEmpty: include body_consumption_tracked in the null check
robobun May 8, 2026
3b45ea1
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 9, 2026
99915ab
test(node-http-backpressure): raise INT_MAX timeouts 30→60s for darwi…
robobun May 10, 2026
3c33ac6
FetchTasklet: rename clearStreamCancelHandler → clearStreamHandlers
robobun May 10, 2026
67640e5
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 22, 2026
338b758
[autofix.ci] apply automated fixes
autofix-ci[bot] May 22, 2026
c35a549
ci: retrigger (darwin x64 build-rust agent terminated mid-build on #5…
robobun May 22, 2026
61e10b4
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 24, 2026
59f65dc
http: fix consume_response_body doc — h2/h3 route to their session ha…
robobun May 24, 2026
bf070b0
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 26, 2026
39bee27
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 4, 2026
574eedb
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 6, 2026
df93330
test: surface child stderr when lineReader hits EOF before the awaite…
robobun Jun 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/bun.js/webcore/Body.zig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
onStartStreaming: ?*const fn (ctx: *anyopaque) jsc.WebCore.DrainResult = null,
onReadableStreamAvailable: ?*const fn (ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void = null,
onStreamCancelled: ?*const fn (ctx: ?*anyopaque) void = null,
onStreamConsumed: ?*const fn (ctx: ?*anyopaque, bytes: usize) void = null,
size_hint: Blob.SizeType = 0,

deinit: bool = false,
Expand Down Expand Up @@ -519,6 +520,12 @@
reader.cancel_ctx = task;
}
}
if (locked.onStreamConsumed) |onConsumed| {
if (locked.task) |task| {
reader.drain_handler = onConsumed;
reader.drain_ctx = task;
}
}

Check failure on line 528 in src/bun.js/webcore/Body.zig

View check run for this annotation

Claude / Claude Code Review

Response.clone() over h2 stalls at 16 MiB: tee() enables response_body_streaming without wiring drain_handler

`Body.Value.tee()` (reached via `res.clone()` on a `.Locked` fetch body) calls `onStartStreaming` — flipping `response_body_streaming` true — and then constructs a `ByteStream.Source` at Body.zig:1039-1044 without setting `reader.drain_handler`, unlike the `toReadableStream` path patched here. With the new `replenishWindow` gate, `avail = @min(consumed_bytes=0, unacked_bytes) = 0` forever, so no per-stream WINDOW_UPDATE is ever sent and `res.clone()` over HTTP/2 permanently stalls once the serve

reader.context.setup();

Expand Down
18 changes: 18 additions & 0 deletions src/bun.js/webcore/ByteStream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ pub fn unpipeWithoutDeref(this: *@This()) void {
this.pipe.onPipe = null;
}

/// Report `n` bytes delivered to the JS consumer (reader fulfilled, pipe
/// target, or buffer-action sink) so the producer can release backpressure.
/// Bytes parked in `this.buffer` awaiting a future `onPull` are *not*
/// reported until that pull happens.
inline fn didDrain(this: *@This(), n: usize) void {
if (n == 0) return;
const source = this.parent();
if (source.drain_handler) |handler| handler(source.drain_ctx, n);
}

pub fn onData(
this: *@This(),
stream: streams.Result,
Expand All @@ -98,13 +108,18 @@ pub fn onData(
this.has_received_last_chunk = stream.isDone();

if (this.pipe.ctx) |ctx| {
this.didDrain(stream.slice().len);
this.pipe.onPipe.?(ctx, stream, allocator);
return;
}

const chunk = stream.slice();

if (this.buffer_action) |*action| {
// Buffer-action consumers (`readableStreamToText` etc.) explicitly
// want the whole body; treat every append as consumed so the
// producer isn't throttled waiting for a pull that never comes.
this.didDrain(chunk.len);
if (stream == .err) {
defer {
this.buffer.clearAndFree();
Expand Down Expand Up @@ -206,6 +221,7 @@ pub fn onData(

log("ByteStream.onData pending.run()", .{});

this.didDrain(to_copy.len);
this.pending.run();

return;
Expand Down Expand Up @@ -306,6 +322,7 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
this.buffer.clearAndFree();
this.done = true;
this.didDrain(to_write);

return .{
.into_array_and_done = .{
Expand All @@ -315,6 +332,7 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
};
}

this.didDrain(to_write);
return .{
.into_array = .{
.value = view,
Expand Down
5 changes: 5 additions & 0 deletions src/bun.js/webcore/ReadableStream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ pub fn NewSource(
close_jsvalue: jsc.Strong.Optional = .empty,
cancel_handler: ?*const fn (?*anyopaque) void = null,
cancel_ctx: ?*anyopaque = null,
/// Invoked whenever `Context` hands bytes to the JS reader (as
/// opposed to parking them in an internal buffer). Used by fetch to
/// couple HTTP/2 per-stream WINDOW_UPDATE to actual body consumption.
drain_handler: ?*const fn (?*anyopaque, usize) void = null,
drain_ctx: ?*anyopaque = null,
globalThis: *JSGlobalObject = undefined,
this_jsvalue: jsc.JSValue = .zero,
is_closed: bool = false,
Expand Down
14 changes: 14 additions & 0 deletions src/bun.js/webcore/fetch/FetchTasklet.zig
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,8 @@ pub const FetchTasklet = struct {
const source = readable.ptr.Bytes.parent();
source.cancel_handler = null;
source.cancel_ctx = null;
source.drain_handler = null;
source.drain_ctx = null;
}
}
}
Expand All @@ -935,6 +937,17 @@ pub const FetchTasklet = struct {
this.ignoreRemainingResponseBody();
}

/// ByteStream delivered `bytes` to the JS reader. Forward to the HTTP
/// thread so the HTTP/2 client can release per-stream flow-control
/// window proportional to what JS has actually drained; no-op for
/// HTTP/1.1 sockets.
Comment thread
robobun marked this conversation as resolved.
Outdated
fn onStreamConsumedCallback(ctx: ?*anyopaque, bytes: usize) void {
const this = bun.cast(*FetchTasklet, ctx.?);
if (this.signal_store.aborted.load(.monotonic)) return;
const http_ = this.http orelse return;
bun.http.http_thread.scheduleResponseBodyConsumed(http_.async_http_id, bytes);
}

fn toBodyValue(this: *FetchTasklet) Body.Value {
if (this.getAbortError()) |err| {
return .{ .Error = err };
Expand All @@ -948,6 +961,7 @@ pub const FetchTasklet = struct {
.onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback,
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
.onStreamCancelled = FetchTasklet.onStreamCancelledCallback,
.onStreamConsumed = FetchTasklet.onStreamConsumedCallback,
},
};
return response;
Expand Down
66 changes: 66 additions & 0 deletions src/http/HTTPThread.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ has_pending_queued_abort: bool = false,
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},
queued_response_body_drains: std.ArrayListUnmanaged(DrainMessage) = std.ArrayListUnmanaged(DrainMessage){},
queued_response_body_consumed: std.ArrayListUnmanaged(ConsumeMessage) = std.ArrayListUnmanaged(ConsumeMessage){},

queued_shutdowns_lock: bun.Mutex = .{},
queued_writes_lock: bun.Mutex = .{},
queued_response_body_drains_lock: bun.Mutex = .{},
queued_response_body_consumed_lock: bun.Mutex = .{},

queued_threadlocal_proxy_derefs: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},

Expand Down Expand Up @@ -117,6 +119,10 @@ const WriteMessage = struct {
const DrainMessage = struct {
async_http_id: u32,
};
const ConsumeMessage = struct {
async_http_id: u32,
bytes: u32,
};
const ShutdownMessage = struct {
async_http_id: u32,
};
Expand Down Expand Up @@ -510,9 +516,45 @@ fn drainQueuedHTTPResponseBodyDrains(this: *@This()) void {
}
}

fn drainQueuedHTTPResponseBodyConsumed(this: *@This()) void {
while (true) {
var queued = brk: {
this.queued_response_body_consumed_lock.lock();
defer this.queued_response_body_consumed_lock.unlock();
const q = this.queued_response_body_consumed;
this.queued_response_body_consumed = .{};
break :brk q;
};
defer queued.deinit(bun.default_allocator);

for (queued.items) |msg| {
if (bun.http.socket_async_http_abort_tracker.get(msg.async_http_id)) |socket_ptr| {
switch (socket_ptr) {
inline .SocketTLS, .SocketTCP => |socket, tag| {
const is_tls = tag == .SocketTLS;
const HTTPContext = HTTPThread.NewHTTPContext(comptime is_tls);
const tagged = HTTPContext.getTaggedFromSocket(socket);
// Only HTTP/2 has per-stream receive windows; for
// HTTP/1.1 the tagged ptr is an HTTPClient and the
// message is a no-op.
if (tagged.get(bun.http.H2.ClientSession)) |session| {
session.consumeResponseBodyByHttpId(msg.async_http_id, msg.bytes);
}
},
}
}
}
if (queued.items.len == 0) {
break;
}
threadlog("drained {d} queued consumes", .{queued.items.len});
}
}

fn drainEvents(this: *@This()) void {
// Process any pending writes **before** aborting.
this.drainQueuedHTTPResponseBodyDrains();
this.drainQueuedHTTPResponseBodyConsumed();
this.drainQueuedWrites();
this.drainQueuedShutdowns();
bun.http.H3.PendingConnect.drainResolved();
Expand Down Expand Up @@ -664,6 +706,30 @@ pub fn scheduleResponseBodyDrain(this: *@This(), async_http_id: u32) void {
this.loop.loop.wakeup();
}

/// JS-thread → HTTP-thread notice that the `ReadableStream` reader for
/// `async_http_id` has drained `bytes` from its buffer. Consecutive messages
/// for the same id are coalesced under the lock so a tight `read()` loop
/// posts one entry per wake instead of one per pull.
pub fn scheduleResponseBodyConsumed(this: *@This(), async_http_id: u32, bytes: usize) void {
const n: u32 = @truncate(@min(bytes, @as(usize, std.math.maxInt(u32))));
if (n == 0) return;
{
this.queued_response_body_consumed_lock.lock();
defer this.queued_response_body_consumed_lock.unlock();
const items = this.queued_response_body_consumed.items;
if (items.len > 0 and items[items.len - 1].async_http_id == async_http_id) {
items[items.len - 1].bytes +|= n;
} else {
this.queued_response_body_consumed.append(bun.default_allocator, .{
.async_http_id = async_http_id,
.bytes = n,
}) catch |err| bun.handleOom(err);
}
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}

pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
threadlog("scheduleShutdown {d}", .{http.async_http_id});
{
Expand Down
38 changes: 35 additions & 3 deletions src/http/h2_client/ClientSession.zig
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,26 @@
}
}

/// HTTP-thread wake-up from `scheduleResponseBodyConsumed`: the JS reader
/// drained `bytes` from the ByteStream. Bump the stream's consumption
/// counter and release any per-stream window credit that has become
/// available.
pub fn consumeResponseBodyByHttpId(this: *ClientSession, async_http_id: u32, bytes: u32) void {
this.ref();
defer this.deref();
for (this.streams.values()) |stream| {
const client = stream.client orelse continue;
if (client.async_http_id != async_http_id) continue;
// `bytes` is decompressed; clamp the running total to wire bytes
// still outstanding so a compression surplus isn't banked to
// credit later DATA the reader hasn't touched.
stream.consumed_bytes = @min(stream.consumed_bytes +| bytes, stream.unacked_bytes);
this.replenishWindow();
if (this.write_buffer.isNotEmpty()) _ = this.flush() catch |err| this.failAll(err);
return;
}
}

/// HTTP-thread wake-up from `scheduleRequestWrite`: new body bytes (or
/// end-of-body) are available in the ThreadSafeStreamBuffer.
pub fn streamBodyByHttpId(this: *ClientSession, async_http_id: u32, ended: bool) void {
Expand All @@ -372,17 +392,29 @@

fn replenishWindow(this: *ClientSession) void {
const threshold = local_initial_window_size / 2;
// Connection-level credit stays receipt-based so one stream whose JS
// reader is stalled doesn't starve siblings of the shared window.
if (this.conn_unacked_bytes >= threshold) {
this.writeWindowUpdate(0, @intCast(this.conn_unacked_bytes));
this.conn_unacked_bytes = 0;
}
var it = this.streams.iterator();
while (it.next()) |e| {
const s = e.value_ptr.*;
if (s.unacked_bytes >= threshold and !s.remoteClosed()) {
this.writeWindowUpdate(s.id, @intCast(s.unacked_bytes));
s.unacked_bytes = 0;
if (s.remoteClosed()) continue;
// Streaming consumer (`res.body.getReader()`): credit only what JS
// has actually drained, clamped to wire bytes received so a
// decompressed body can't inflate the window past what was sent.
// Buffering consumers (`await res.text()` etc.) keep receipt-based
// crediting — the whole body is going into memory regardless, so
// withholding the window just slows the transfer.
const streaming = if (s.client) |c| c.signals.get(.response_body_streaming) else false;
const avail: u32 = if (streaming) @min(s.consumed_bytes, s.unacked_bytes) else s.unacked_bytes;
if (avail >= threshold) {
this.writeWindowUpdate(s.id, @intCast(avail));
s.unacked_bytes -= avail;
s.consumed_bytes -|= avail;
}

Check failure on line 417 in src/http/h2_client/ClientSession.zig

View check run for this annotation

Claude / Claude Code Review

Abandoned/cancelled h2 bodies and S3 streaming downloads stall at 16 MiB under the new streaming gate

The new `response_body_streaming` gate assumes every path that sets that signal also drives `consumed_bytes`, but several don't: `ignoreRemainingResponseBody()` (reached via `reader.cancel()` and `Response` GC) calls `enableResponseBodyStreaming()` then nulls `drain_handler`, and `S3HttpDownloadStreamingTask` (s3/client.zig:615) sets the signal at creation but never posts `scheduleResponseBodyConsumed`. In both cases `avail = @min(0, unacked_bytes) = 0` forever, so the server stalls at the 16 Mi
Comment thread
claude[bot] marked this conversation as resolved.
Outdated
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/http/h2_client/Stream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,17 @@ headers_end_stream: bool = false,
/// or final status arrives.
awaiting_continue: bool = false,
fatal_error: ?anyerror = null,
/// DATA bytes consumed since the last WINDOW_UPDATE for this stream.
/// DATA bytes received since the last per-stream WINDOW_UPDATE. For
/// buffering consumers (`response_body_streaming` false) this alone drives
/// the credit; for streaming consumers it is the ceiling on what
/// `consumed_bytes` may release.
unacked_bytes: u32 = 0,
/// Bytes the JS `ReadableStream` reader has actually drained, reported via
/// `scheduleResponseBodyConsumed`. Only consulted when
/// `response_body_streaming` is true; `replenishWindow` credits
/// `min(consumed_bytes, unacked_bytes)` so a stalled reader withholds the
/// per-stream window and a compressed body can't over-credit.
consumed_bytes: u32 = 0,
/// Σ DATA payload bytes (post-padding) for §8.1.1 Content-Length check —
/// `total_body_received` is clamped at content_length so it can't catch
/// overshoot.
Expand Down
Loading
Loading