Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8db502f
http: couple per-stream h2 WINDOW_UPDATE to JS body consumption
robobun Apr 28, 2026
dc050be
test: move h2 backpressure tests to standalone file
robobun Apr 28, 2026
794ee0e
h2_client: clamp consumed_bytes to outstanding wire bytes
robobun Apr 28, 2026
4484653
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 28, 2026
a566b1b
h2_client: gate consumption on body_consumption_tracked, not response…
robobun Apr 28, 2026
4803eac
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
dylan-conway Apr 28, 2026
fbb3f10
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Apr 29, 2026
c28c8c0
ByteStream: report pre-buffered bytes handed out via drain()
robobun Apr 29, 2026
6aa5782
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner Apr 29, 2026
4113c53
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 1, 2026
670012d
Extend response-body backpressure to HTTP/1.1 and HTTP/3
robobun May 2, 2026
7f808f2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
08198eb
ci: retrigger gate (release WebKit now prefetched)
robobun May 2, 2026
f35a1b5
h1: resume the socket when the body completes while paused
robobun May 2, 2026
66aab38
h1: count total_body_received delta, not raw wire bytes
robobun May 2, 2026
77a98f2
test(h3): settle() holds for two consecutive 100ms gaps
robobun May 2, 2026
4b5dcf6
doc: InternalState backpressure fields are h1-only
robobun May 2, 2026
5f2d15d
ci: retrigger gate (WebKit cached in /root/.bun/build-cache)
robobun May 2, 2026
1c38a8f
test(h1): extend cancel-test stall pump to 256 MiB
robobun May 2, 2026
8709b87
h1 backpressure: observe pause/resume from the client; count body_out…
robobun May 2, 2026
e55438d
doc: ByteStream.drain comment is transport-agnostic too
robobun May 2, 2026
530c155
doc: body_consumption_tracked comments transport-agnostic (Signals, i…
robobun May 2, 2026
9b3b4b8
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
3b6d8f1
h1: bump h1_socket_resumes on every receive_paused clear; refresh 'wi…
robobun May 2, 2026
71af8d5
Cover remaining receive_paused edges and pre-buffered drain credits
robobun May 2, 2026
c88346f
HTTPThread: only wakeup on consume append, not coalesce
robobun May 2, 2026
783f56a
Raise receive_body_high_water 1→4 MiB, low_water 256K→1 MiB
robobun May 2, 2026
ceea99e
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 2, 2026
90bd4c9
h1: don't re-arm idle timeout during repeat-recv while paused; test f…
robobun May 2, 2026
ac57e89
test(node-http-backpressure-max): raise timeout 60→120s for darwin-14…
robobun May 2, 2026
425d2e2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 5, 2026
e30543a
Merge remote-tracking branch 'origin/main' into farm/841d7d2d/fix-280…
robobun May 8, 2026
aa454aa
test: fetch().body.pipeThrough() propagates backpressure to socket (#…
robobun May 8, 2026
896feb4
Signals.isEmpty: include body_consumption_tracked in the null check
robobun May 8, 2026
3b45ea1
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 9, 2026
99915ab
test(node-http-backpressure): raise INT_MAX timeouts 30→60s for darwi…
robobun May 10, 2026
3c33ac6
FetchTasklet: rename clearStreamCancelHandler → clearStreamHandlers
robobun May 10, 2026
67640e5
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 22, 2026
338b758
[autofix.ci] apply automated fixes
autofix-ci[bot] May 22, 2026
c35a549
ci: retrigger (darwin x64 build-rust agent terminated mid-build on #5…
robobun May 22, 2026
61e10b4
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 24, 2026
59f65dc
http: fix consume_response_body doc — h2/h3 route to their session ha…
robobun May 24, 2026
bf070b0
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun May 26, 2026
39bee27
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 4, 2026
574eedb
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun Jun 6, 2026
df93330
test: surface child stderr when lineReader hits EOF before the awaite…
robobun Jun 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/bun.js/webcore/Body.zig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub const PendingValue = struct {
onStartStreaming: ?*const fn (ctx: *anyopaque) jsc.WebCore.DrainResult = null,
onReadableStreamAvailable: ?*const fn (ctx: *anyopaque, globalThis: *jsc.JSGlobalObject, readable: jsc.WebCore.ReadableStream) void = null,
onStreamCancelled: ?*const fn (ctx: ?*anyopaque) void = null,
onStreamConsumed: ?*const fn (ctx: ?*anyopaque, bytes: usize) void = null,
size_hint: Blob.SizeType = 0,

deinit: bool = false,
Expand Down Expand Up @@ -519,6 +520,12 @@ pub const Value = union(Tag) {
reader.cancel_ctx = task;
}
}
if (locked.onStreamConsumed) |onConsumed| {
if (locked.task) |task| {
reader.drain_handler = onConsumed;
reader.drain_ctx = task;
}
}

reader.context.setup();

Expand Down Expand Up @@ -1034,6 +1041,19 @@ pub const Value = union(Tag) {
.globalThis = globalThis,
});

if (locked.onStreamCancelled) |onCancelled| {
if (locked.task) |task| {
reader.cancel_handler = onCancelled;
reader.cancel_ctx = task;
}
}
if (locked.onStreamConsumed) |onConsumed| {
if (locked.task) |task| {
reader.drain_handler = onConsumed;
reader.drain_ctx = task;
}
}

reader.context.setup();

if (drain_result == .estimated_size) {
Expand Down
24 changes: 24 additions & 0 deletions src/bun.js/webcore/ByteStream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ pub fn unpipeWithoutDeref(this: *@This()) void {
this.pipe.onPipe = null;
}

/// Report `n` bytes delivered to the JS consumer (reader fulfilled, pipe
/// target, or buffer-action sink) so the producer can release backpressure.
/// Bytes parked in `this.buffer` awaiting a future `onPull` are *not*
/// reported until that pull happens.
inline fn didDrain(this: *@This(), n: usize) void {
if (n == 0) return;
const source = this.parent();
if (source.drain_handler) |handler| handler(source.drain_ctx, n);
}

pub fn onData(
this: *@This(),
stream: streams.Result,
Expand All @@ -98,13 +108,18 @@ pub fn onData(
this.has_received_last_chunk = stream.isDone();

if (this.pipe.ctx) |ctx| {
this.didDrain(stream.slice().len);
this.pipe.onPipe.?(ctx, stream, allocator);
return;
}

const chunk = stream.slice();

if (this.buffer_action) |*action| {
// Buffer-action consumers (`readableStreamToText` etc.) explicitly
// want the whole body; treat every append as consumed so the
// producer isn't throttled waiting for a pull that never comes.
this.didDrain(chunk.len);
if (stream == .err) {
defer {
this.buffer.clearAndFree();
Expand Down Expand Up @@ -206,6 +221,7 @@ pub fn onData(

log("ByteStream.onData pending.run()", .{});

this.didDrain(to_copy.len);
this.pending.run();

return;
Expand Down Expand Up @@ -306,6 +322,7 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
this.buffer.clearAndFree();
this.done = true;
this.didDrain(to_write);

return .{
.into_array_and_done = .{
Expand All @@ -315,6 +332,7 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
};
}

this.didDrain(to_write);
return .{
.into_array = .{
.value = view,
Expand Down Expand Up @@ -389,6 +407,12 @@ pub fn deinit(this: *@This()) void {

pub fn drain(this: *@This()) bun.ByteList {
if (this.buffer.items.len > 0) {
// Bytes placed here before the JS stream was constructed (e.g.
// the `.owned` drain_result from `onStartStreaming`) are handed
// out via `handle.drain()` without going through `onPull`; report
// them so they don't become a permanent `unacked_bytes` floor in
// the HTTP/2 per-stream window accounting.
Comment thread
robobun marked this conversation as resolved.
Outdated
this.didDrain(this.buffer.items.len);
return bun.ByteList.moveFromList(&this.buffer);
}
return .{};
Expand Down
5 changes: 5 additions & 0 deletions src/bun.js/webcore/ReadableStream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,11 @@ pub fn NewSource(
close_jsvalue: jsc.Strong.Optional = .empty,
cancel_handler: ?*const fn (?*anyopaque) void = null,
cancel_ctx: ?*anyopaque = null,
/// Invoked whenever `Context` hands bytes to the JS reader (as
/// opposed to parking them in an internal buffer). Used by fetch to
/// couple HTTP/2 per-stream WINDOW_UPDATE to actual body consumption.
drain_handler: ?*const fn (?*anyopaque, usize) void = null,
drain_ctx: ?*anyopaque = null,
globalThis: *JSGlobalObject = undefined,
this_jsvalue: jsc.JSValue = .zero,
is_closed: bool = false,
Expand Down
36 changes: 36 additions & 0 deletions src/bun.js/webcore/fetch/FetchTasklet.zig
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,12 @@ pub const FetchTasklet = struct {

if (this.http) |http_| {
http_.enableResponseBodyStreaming();
// Both Body.toReadableStream and Body.tee wire `drain_handler`
// on the ByteStream they construct after this returns, so
// `scheduleResponseBodyConsumed` will fire for every reader
// pull. Arm the signal so the HTTP/2 client gates per-stream
// WINDOW_UPDATE on those reports instead of on receipt.
this.signal_store.body_consumption_tracked.store(true, .release);

// If the server sent the headers and the response body in two separate socket writes
// and if the server doesn't close the connection by itself
Expand Down Expand Up @@ -925,6 +931,8 @@ pub const FetchTasklet = struct {
const source = readable.ptr.Bytes.parent();
source.cancel_handler = null;
source.cancel_ctx = null;
source.drain_handler = null;
source.drain_ctx = null;
}
}
}
Expand All @@ -935,6 +943,17 @@ pub const FetchTasklet = struct {
this.ignoreRemainingResponseBody();
}

/// ByteStream delivered `bytes` to the JS reader. Forward to the HTTP
/// thread so the HTTP/2 client can release per-stream flow-control
/// window proportional to what JS has actually drained; no-op for
/// HTTP/1.1 sockets.
Comment thread
robobun marked this conversation as resolved.
Outdated
fn onStreamConsumedCallback(ctx: ?*anyopaque, bytes: usize) void {
const this = bun.cast(*FetchTasklet, ctx.?);
if (this.signal_store.aborted.load(.monotonic)) return;
const http_ = this.http orelse return;
bun.http.http_thread.scheduleResponseBodyConsumed(http_.async_http_id, bytes);
}

fn toBodyValue(this: *FetchTasklet) Body.Value {
if (this.getAbortError()) |err| {
return .{ .Error = err };
Expand All @@ -948,6 +967,7 @@ pub const FetchTasklet = struct {
.onStartStreaming = FetchTasklet.onStartStreamingHTTPResponseBodyCallback,
.onReadableStreamAvailable = FetchTasklet.onReadableStreamAvailable,
.onStreamCancelled = FetchTasklet.onStreamCancelledCallback,
.onStreamConsumed = FetchTasklet.onStreamConsumedCallback,
},
};
return response;
Expand Down Expand Up @@ -998,6 +1018,22 @@ pub const FetchTasklet = struct {
if (this.http) |http_| {
http_.enableResponseBodyStreaming();
}
// `drain_handler` is about to be cleared and incoming chunks will
// be dropped without ever reaching the ByteStream, so no more
// `scheduleResponseBodyConsumed` reports. Disarm the tracking
// signal so the HTTP/2 client falls back to receipt-based
// per-stream WINDOW_UPDATE and the abandoned body can drain
// instead of stalling the stream at the initial window. The
// sentinel consume message both wakes the HTTP thread (so
// `replenishWindow` re-runs — its only other trigger is inbound
// DATA, and a server that has exhausted the window sends none)
// and saturates `consumed_bytes` to `unacked_bytes` so the first
// re-run releases whatever is already outstanding regardless of
// which order the atomic store and the queue drain land in.
this.signal_store.body_consumption_tracked.store(false, .release);
if (this.http) |http_| {
bun.http.http_thread.scheduleResponseBodyConsumed(http_.async_http_id, std.math.maxInt(u32));
}
// we should not keep the process alive if we are ignoring the body
const vm = this.javascript_vm;
this.poll_ref.unref(vm);
Expand Down
66 changes: 66 additions & 0 deletions src/http/HTTPThread.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ has_pending_queued_abort: bool = false,
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},
queued_response_body_drains: std.ArrayListUnmanaged(DrainMessage) = std.ArrayListUnmanaged(DrainMessage){},
queued_response_body_consumed: std.ArrayListUnmanaged(ConsumeMessage) = std.ArrayListUnmanaged(ConsumeMessage){},

queued_shutdowns_lock: bun.Mutex = .{},
queued_writes_lock: bun.Mutex = .{},
queued_response_body_drains_lock: bun.Mutex = .{},
queued_response_body_consumed_lock: bun.Mutex = .{},

queued_threadlocal_proxy_derefs: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},

Expand Down Expand Up @@ -117,6 +119,10 @@ const WriteMessage = struct {
const DrainMessage = struct {
async_http_id: u32,
};
const ConsumeMessage = struct {
async_http_id: u32,
bytes: u32,
};
const ShutdownMessage = struct {
async_http_id: u32,
};
Expand Down Expand Up @@ -510,9 +516,45 @@ fn drainQueuedHTTPResponseBodyDrains(this: *@This()) void {
}
}

fn drainQueuedHTTPResponseBodyConsumed(this: *@This()) void {
while (true) {
var queued = brk: {
this.queued_response_body_consumed_lock.lock();
defer this.queued_response_body_consumed_lock.unlock();
const q = this.queued_response_body_consumed;
this.queued_response_body_consumed = .{};
break :brk q;
};
defer queued.deinit(bun.default_allocator);

for (queued.items) |msg| {
if (bun.http.socket_async_http_abort_tracker.get(msg.async_http_id)) |socket_ptr| {
switch (socket_ptr) {
inline .SocketTLS, .SocketTCP => |socket, tag| {
const is_tls = tag == .SocketTLS;
const HTTPContext = HTTPThread.NewHTTPContext(comptime is_tls);
const tagged = HTTPContext.getTaggedFromSocket(socket);
// Only HTTP/2 has per-stream receive windows; for
// HTTP/1.1 the tagged ptr is an HTTPClient and the
// message is a no-op.
if (tagged.get(bun.http.H2.ClientSession)) |session| {
session.consumeResponseBodyByHttpId(msg.async_http_id, msg.bytes);
}
},
}
}
}
if (queued.items.len == 0) {
break;
}
threadlog("drained {d} queued consumes", .{queued.items.len});
}
}

fn drainEvents(this: *@This()) void {
// Process any pending writes **before** aborting.
this.drainQueuedHTTPResponseBodyDrains();
this.drainQueuedHTTPResponseBodyConsumed();
this.drainQueuedWrites();
this.drainQueuedShutdowns();
bun.http.H3.PendingConnect.drainResolved();
Expand Down Expand Up @@ -664,6 +706,30 @@ pub fn scheduleResponseBodyDrain(this: *@This(), async_http_id: u32) void {
this.loop.loop.wakeup();
}

/// JS-thread → HTTP-thread notice that the `ReadableStream` reader for
/// `async_http_id` has drained `bytes` from its buffer. Consecutive messages
/// for the same id are coalesced under the lock so a tight `read()` loop
/// posts one entry per wake instead of one per pull.
pub fn scheduleResponseBodyConsumed(this: *@This(), async_http_id: u32, bytes: usize) void {
const n: u32 = @truncate(@min(bytes, @as(usize, std.math.maxInt(u32))));
if (n == 0) return;
{
this.queued_response_body_consumed_lock.lock();
defer this.queued_response_body_consumed_lock.unlock();
const items = this.queued_response_body_consumed.items;
if (items.len > 0 and items[items.len - 1].async_http_id == async_http_id) {
items[items.len - 1].bytes +|= n;
} else {
this.queued_response_body_consumed.append(bun.default_allocator, .{
.async_http_id = async_http_id,
.bytes = n,
}) catch |err| bun.handleOom(err);
}
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}

pub fn scheduleShutdown(this: *@This(), http: *AsyncHTTP) void {
threadlog("scheduleShutdown {d}", .{http.async_http_id});
{
Expand Down
11 changes: 11 additions & 0 deletions src/http/Signals.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ const Signals = @This();

header_progress: ?*std.atomic.Value(bool) = null,
response_body_streaming: ?*std.atomic.Value(bool) = null,
/// Distinct from `response_body_streaming`: set only while a JS consumer
/// is wired to report drained bytes via `scheduleResponseBodyConsumed`.
/// `response_body_streaming` is also set by paths that never report
/// consumption (S3 streaming download, abandoned bodies via
/// `ignoreRemainingResponseBody`); gating flow-control on that would
/// deadlock those streams. The h2 client uses this signal — not
/// `response_body_streaming` — to decide whether per-stream WINDOW_UPDATE
/// should be consumption-gated or receipt-based.
body_consumption_tracked: ?*std.atomic.Value(bool) = null,
Comment thread
robobun marked this conversation as resolved.
Outdated
Comment thread
robobun marked this conversation as resolved.
Outdated
aborted: ?*std.atomic.Value(bool) = null,
cert_errors: ?*std.atomic.Value(bool) = null,
upgraded: ?*std.atomic.Value(bool) = null,
Expand All @@ -12,13 +21,15 @@ pub fn isEmpty(this: *const Signals) bool {
pub const Store = struct {
header_progress: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
response_body_streaming: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
body_consumption_tracked: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
aborted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
cert_errors: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
upgraded: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
pub fn to(this: *Store) Signals {
return .{
.header_progress = &this.header_progress,
.response_body_streaming = &this.response_body_streaming,
.body_consumption_tracked = &this.body_consumption_tracked,
.aborted = &this.aborted,
.cert_errors = &this.cert_errors,
.upgraded = &this.upgraded,
Expand Down
41 changes: 38 additions & 3 deletions src/http/h2_client/ClientSession.zig
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,26 @@ pub fn drainResponseBodyByHttpId(this: *ClientSession, async_http_id: u32) void
}
}

/// HTTP-thread wake-up from `scheduleResponseBodyConsumed`: the JS reader
/// drained `bytes` from the ByteStream. Bump the stream's consumption
/// counter and release any per-stream window credit that has become
/// available.
pub fn consumeResponseBodyByHttpId(this: *ClientSession, async_http_id: u32, bytes: u32) void {
this.ref();
defer this.deref();
for (this.streams.values()) |stream| {
const client = stream.client orelse continue;
if (client.async_http_id != async_http_id) continue;
// `bytes` is decompressed; clamp the running total to wire bytes
// still outstanding so a compression surplus isn't banked to
// credit later DATA the reader hasn't touched.
stream.consumed_bytes = @min(stream.consumed_bytes +| bytes, stream.unacked_bytes);
this.replenishWindow();
if (this.write_buffer.isNotEmpty()) _ = this.flush() catch |err| this.failAll(err);
return;
}
}

/// HTTP-thread wake-up from `scheduleRequestWrite`: new body bytes (or
/// end-of-body) are available in the ThreadSafeStreamBuffer.
pub fn streamBodyByHttpId(this: *ClientSession, async_http_id: u32, ended: bool) void {
Expand All @@ -372,16 +392,31 @@ pub fn writeWindowUpdate(this: *ClientSession, stream_id: u32, increment: u31) v

fn replenishWindow(this: *ClientSession) void {
const threshold = local_initial_window_size / 2;
// Connection-level credit stays receipt-based so one stream whose JS
// reader is stalled doesn't starve siblings of the shared window.
if (this.conn_unacked_bytes >= threshold) {
this.writeWindowUpdate(0, @intCast(this.conn_unacked_bytes));
this.conn_unacked_bytes = 0;
}
var it = this.streams.iterator();
while (it.next()) |e| {
const s = e.value_ptr.*;
if (s.unacked_bytes >= threshold and !s.remoteClosed()) {
this.writeWindowUpdate(s.id, @intCast(s.unacked_bytes));
s.unacked_bytes = 0;
if (s.remoteClosed()) continue;
// `body_consumption_tracked` is set only while a JS consumer is
// reporting drained bytes via `scheduleResponseBodyConsumed`
// (fetch `res.body` with a `drain_handler` wired). It is *not*
// set for buffering consumers (`await res.text()`), S3 streaming
// downloads, or once the body is abandoned via
// `ignoreRemainingResponseBody` — those stay receipt-based so
// the transfer completes. When tracked, credit only what JS has
// actually drained, clamped to wire bytes received so a
// decompressed body can't inflate the window past what was sent.
const tracked = if (s.client) |c| c.signals.get(.body_consumption_tracked) else false;
const avail: u32 = if (tracked) @min(s.consumed_bytes, s.unacked_bytes) else s.unacked_bytes;
if (avail >= threshold) {
this.writeWindowUpdate(s.id, @intCast(avail));
s.unacked_bytes -= avail;
s.consumed_bytes -|= avail;
}
}
}
Expand Down
Loading
Loading