-
Notifications
You must be signed in to change notification settings - Fork 4.7k
http: couple fetch() receive backpressure to JS body consumption (h1/h2/h3) #29831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
robobun
wants to merge
46
commits into
main
Choose a base branch
from
farm/0a9cea98/h2-window-update-backpressure
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 16 commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
8db502f
http: couple per-stream h2 WINDOW_UPDATE to JS body consumption
robobun dc050be
test: move h2 backpressure tests to standalone file
robobun 794ee0e
h2_client: clamp consumed_bytes to outstanding wire bytes
robobun 4484653
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner a566b1b
h2_client: gate consumption on body_consumption_tracked, not response…
robobun 4803eac
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
dylan-conway fbb3f10
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun c28c8c0
ByteStream: report pre-buffered bytes handed out via drain()
robobun 6aa5782
Merge branch 'main' into farm/0a9cea98/h2-window-update-backpressure
Jarred-Sumner 4113c53
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 670012d
Extend response-body backpressure to HTTP/1.1 and HTTP/3
robobun 7f808f2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 08198eb
ci: retrigger gate (release WebKit now prefetched)
robobun f35a1b5
h1: resume the socket when the body completes while paused
robobun 66aab38
h1: count total_body_received delta, not raw wire bytes
robobun 77a98f2
test(h3): settle() holds for two consecutive 100ms gaps
robobun 4b5dcf6
doc: InternalState backpressure fields are h1-only
robobun 5f2d15d
ci: retrigger gate (WebKit cached in /root/.bun/build-cache)
robobun 1c38a8f
test(h1): extend cancel-test stall pump to 256 MiB
robobun 8709b87
h1 backpressure: observe pause/resume from the client; count body_out…
robobun e55438d
doc: ByteStream.drain comment is transport-agnostic too
robobun 530c155
doc: body_consumption_tracked comments transport-agnostic (Signals, i…
robobun 9b3b4b8
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 3b6d8f1
h1: bump h1_socket_resumes on every receive_paused clear; refresh 'wi…
robobun 71af8d5
Cover remaining receive_paused edges and pre-buffered drain credits
robobun c88346f
HTTPThread: only wakeup on consume append, not coalesce
robobun 783f56a
Raise receive_body_high_water 1→4 MiB, low_water 256K→1 MiB
robobun ceea99e
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 90bd4c9
h1: don't re-arm idle timeout during repeat-recv while paused; test f…
robobun ac57e89
test(node-http-backpressure-max): raise timeout 60→120s for darwin-14…
robobun 425d2e2
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun e30543a
Merge remote-tracking branch 'origin/main' into farm/841d7d2d/fix-280…
robobun aa454aa
test: fetch().body.pipeThrough() propagates backpressure to socket (#…
robobun 896feb4
Signals.isEmpty: include body_consumption_tracked in the null check
robobun 3b45ea1
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 99915ab
test(node-http-backpressure): raise INT_MAX timeouts 30→60s for darwi…
robobun 3c33ac6
FetchTasklet: rename clearStreamCancelHandler → clearStreamHandlers
robobun 67640e5
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 338b758
[autofix.ci] apply automated fixes
autofix-ci[bot] c35a549
ci: retrigger (darwin x64 build-rust agent terminated mid-build on #5…
robobun 61e10b4
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 59f65dc
http: fix consume_response_body doc — h2/h3 route to their session ha…
robobun bf070b0
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 39bee27
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun 574eedb
Merge remote-tracking branch 'origin/main' into farm/0a9cea98/h2-wind…
robobun df93330
test: surface child stderr when lineReader hits EOF before the awaite…
robobun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -301,6 +301,20 @@ | |
| this.start_(true); | ||
| } | ||
|
|
||
| /// Response-body receive backpressure for streaming consumers | ||
| /// (`signals.body_consumption_tracked` set). Once the wire bytes handed | ||
| /// to JS but not yet reported drained via `scheduleResponseBodyConsumed` | ||
| /// exceed `receive_body_high_water`, the transport applies backpressure: | ||
| /// HTTP/1.1 pauses the socket read (`us_socket_pause` → TCP rwnd closes), | ||
| /// HTTP/3 stops `lsquic_stream_read` (`wantRead(false)` → lsquic withholds | ||
| /// `MAX_STREAM_DATA`). HTTP/2 uses a different mechanism (per-stream | ||
| /// WINDOW_UPDATE gated on the same consumption reports — see | ||
| /// `h2_client/ClientSession.replenishWindow`) so its limit is governed by | ||
| /// `H2.local_initial_window_size` instead. Resumed once consumption | ||
| /// reports bring the outstanding count below `receive_body_low_water`. | ||
| pub const receive_body_high_water: usize = 1 << 20; // 1 MiB | ||
| pub const receive_body_low_water: usize = 256 * 1024; | ||
|
|
||
| /// REFUSED_STREAM or graceful GOAWAY past our id: the server promises it | ||
| /// did not process the request, so re-dispatch from the top. Only reached | ||
| /// for `.bytes` bodies (replayable). | ||
|
|
@@ -2031,11 +2045,29 @@ | |
| .body => { | ||
| this.setTimeout(socket, 5); | ||
|
|
||
| const before = this.state.total_body_received; | ||
| const report_progress = this.handleResponseBody(incoming_data, false) catch |err| { | ||
| this.closeAndFail(err, is_ssl, socket); | ||
| return; | ||
| }; | ||
|
|
||
| // Must run before progressUpdate: a final chunk makes | ||
| // onAsyncHTTPCallback destroy the ThreadlocalAsyncHTTP that | ||
| // owns *this* HTTPClient inline on this thread, so `this` | ||
| // is poisoned by the time progressUpdate returns. | ||
| // | ||
| // Count the `total_body_received` delta, not | ||
| // `incoming_data.len`: for Transfer-Encoding: chunked the | ||
| // latter includes per-chunk framing (hex size + CRLFs) that | ||
| // never reaches JS, so `didDrain` can't credit it and it | ||
| // would accumulate as a permanent floor under | ||
| // `outstanding_body_bytes`. Once that floor passes the | ||
| // low-water mark, a long-lived SSE stream deadlocks on the | ||
| // first pause. The .body arm uses the same delta for | ||
| // symmetry (and to discard any trailing bytes past | ||
| // Content-Length). | ||
| this.maybePauseReceive(is_ssl, socket, this.state.total_body_received -| before); | ||
|
|
||
| if (report_progress) { | ||
| this.progressUpdate(is_ssl, ctx, socket); | ||
| return; | ||
|
|
@@ -2045,11 +2077,14 @@ | |
| .body_chunk => { | ||
| this.setTimeout(socket, 5); | ||
|
|
||
| const before = this.state.total_body_received; | ||
| const report_progress = this.handleResponseBodyChunkedEncoding(incoming_data) catch |err| { | ||
| this.closeAndFail(err, is_ssl, socket); | ||
| return; | ||
| }; | ||
|
|
||
| this.maybePauseReceive(is_ssl, socket, this.state.total_body_received -| before); | ||
|
|
||
| if (report_progress) { | ||
| this.progressUpdate(is_ssl, ctx, socket); | ||
| return; | ||
|
|
@@ -2204,6 +2239,74 @@ | |
| socket.setTimeoutMinutes(minutes); | ||
| } | ||
|
|
||
| /// Called from `onData` after body bytes have been appended and (if | ||
| /// streaming) delivered to JS. Counts `n` wire bytes towards | ||
| /// `outstanding_body_bytes` and, if the JS reader is reporting | ||
|
robobun marked this conversation as resolved.
Outdated
|
||
| /// consumption and the outstanding total has crossed the high-water | ||
| /// mark, pauses the socket read so TCP rwnd backpressures the server. | ||
| /// The proxy-tunnel path is excluded: its inner TLS session needs the | ||
| /// socket readable to complete handshake/close_notify regardless of the | ||
| /// application body, and pausing the carrier socket would wedge that. | ||
| fn maybePauseReceive(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, n: usize) void { | ||
| if (!this.signals.get(.body_consumption_tracked)) return; | ||
| if (this.state.stage == .done or this.state.stage == .fail) return; | ||
| this.state.outstanding_body_bytes +|= n; | ||
| // handleResponseBody just ran: if the body is complete (or a redirect | ||
| // is pending) the socket is about to be released to the keep-alive | ||
| // pool — or closed — inside progressUpdate. Leaving it paused would | ||
| // hand the next pooled request a socket with LIBUS_SOCKET_READABLE | ||
| // cleared and no consumeResponseBody to re-enable it. | ||
| // | ||
| // We can reach here with `receive_paused` already true because | ||
| // uSockets' repeat-recv fast path (loop.c) keeps calling recv() in | ||
| // the same epoll tick while the buffer comes back full, without | ||
| // re-consulting poll flags; a us_socket_pause() issued from a | ||
| // previous on_data in that loop doesn't stop the next recv(). The | ||
| // final chunk can therefore arrive while receive_paused is set. | ||
| if (this.state.isDone() or this.state.flags.is_redirect_pending) { | ||
| if (this.state.flags.receive_paused) { | ||
| this.state.flags.receive_paused = false; | ||
| if (!socket.isClosedOrHasError()) _ = socket.resumeStream(); | ||
| } | ||
| return; | ||
| } | ||
| if (this.state.flags.receive_paused) return; | ||
| if (this.state.outstanding_body_bytes < receive_body_high_water) return; | ||
| if (this.proxy_tunnel != null) return; | ||
| if (socket.isClosedOrHasError()) return; | ||
| this.state.flags.receive_paused = true; | ||
| // While paused, the idle timer would otherwise fire with no socket | ||
| // activity to re-arm it. The response isn't "stalled" — the reader | ||
| // chose not to pull — so drop the timeout until we resume. | ||
| socket.setTimeoutMinutes(0); | ||
| socket.timeout(0); | ||
| _ = socket.pauseStream(); | ||
| log("maybePauseReceive: paused at {d} bytes outstanding", .{this.state.outstanding_body_bytes}); | ||
| } | ||
|
|
||
| /// HTTP-thread wake-up from `scheduleResponseBodyConsumed`: the JS reader | ||
| /// drained `bytes` from the ByteStream. For HTTP/1.1 this may resume a | ||
| /// socket read that `maybePauseReceive` paused. HTTP/2 and HTTP/3 have | ||
| /// dedicated handlers that reach here via their session dispatch. | ||
| pub fn consumeResponseBody(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, bytes: u32) void { | ||
| // `bytes` is post-decompression; clamp so a compression surplus | ||
| // doesn't leave `outstanding_body_bytes` underflowed — same rule as | ||
| // the h2 `consumed_bytes` clamp. | ||
| this.state.outstanding_body_bytes -|= @min(@as(usize, bytes), this.state.outstanding_body_bytes); | ||
| if (!this.state.flags.receive_paused) return; | ||
| // Disarming `body_consumption_tracked` (e.g. reader.cancel()) posts a | ||
| // saturating sentinel; treat that as "resume now" regardless of the | ||
| // low-water mark so the abandoned body can drain for keep-alive reuse. | ||
| const should_resume = this.state.outstanding_body_bytes <= receive_body_low_water or | ||
| !this.signals.get(.body_consumption_tracked); | ||
| if (!should_resume) return; | ||
| this.state.flags.receive_paused = false; | ||
| if (socket.isClosedOrHasError()) return; | ||
| _ = socket.resumeStream(); | ||
| this.setTimeout(socket, 5); | ||
| log("consumeResponseBody: resumed, {d} bytes outstanding", .{this.state.outstanding_body_bytes}); | ||
| } | ||
|
Check failure on line 2308 in src/http.zig
|
||
|
|
||
| pub fn drainResponseBody(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void { | ||
|
|
||
| // Find out if we should not send any update. | ||
|
|
@@ -2267,6 +2370,16 @@ | |
|
|
||
| if (this.isKeepAlivePossible() and !socket.isClosedOrHasError() and tunnel_poolable) { | ||
| log("release socket", .{}); | ||
| // Defensive: maybePauseReceive's isDone() branch resumes for | ||
| // the normal .body/.body_chunk onData path, but a | ||
| // close-delimited body or an early server reply can reach | ||
| // here without another onData. The uSockets `is_paused` | ||
| // flag survives state.reset(), so a paused socket released | ||
| // to the pool would hang the next request that adopts it. | ||
| if (this.state.flags.receive_paused) { | ||
| this.state.flags.receive_paused = false; | ||
| _ = socket.resumeStream(); | ||
| } | ||
| const tunnel = this.proxy_tunnel; | ||
| this.proxy_tunnel = null; | ||
| if (tunnel) |t| t.detachOwner(this); | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.