-
Notifications
You must be signed in to change notification settings - Fork 4.7k
Propagate backpressure through readStreamIntoSink for HTTP responses #28570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
16bcfbe
d13c704
dee0c45
a574f8a
f0a73cf
884ee76
194c21e
82fb7f3
87c28bd
c73a1b8
e0cc4df
5044451
d2e4c3e
a18b15c
f7f2bca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -829,11 +829,27 @@ pub fn HTTPServerWritable(comptime ssl: bool) type { | |
| this.finalize(); | ||
| return false; | ||
| } | ||
|
|
||
| // Buffer is empty — data was sent to uWS directly via the | ||
| // fast path in write(). Resolve any pending flush promise | ||
| // so the JS consumer can resume reading. | ||
| if (this.buffer.len == 0) { | ||
| this.flushPromise() catch {}; | ||
| if (this.done) { | ||
| if (this.res) |res| { | ||
| res.clearOnWritable(); | ||
| } | ||
| this.signal.close(null); | ||
| this.finalize(); | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
Comment on lines
+836
to
+847
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟣 This is a pre-existing issue: in Extended reasoning...What the bug is and how it manifests The The specific code path that triggers it The trigger sequence: (1) A response is buffered via the Why existing code does not prevent it
Why this is pre-existing and addressing the refutation This defect existed before this PR in the Impact On HTTP/1.1 keep-alive connections, after How to fix it In the if (chunk.len == 0) {
if (this.done) {
if (this.res) |res| { res.clearOnWritable(); } // ADD
this.signal.close(null);
this.flushPromise() catch {};
this.finalize();
return false; // CHANGE true -> false
}
}Step-by-step proof
Comment on lines
+833
to
+847
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The comment on the Extended reasoning...What the comment says vs. what the code actually doesThe guard at lines 833-847 of The guard is reachable - but via a different pathThe refutation verifier makes a valid point that the Why the misleading comment mattersThe comment implies the fast-write path registers Step-by-step proof of the actual reachable scenario
How to fixReplace the comment at lines 833-835 with an accurate description: the empty-buffer case occurs when a second drain fires after the tryEnd path has fully consumed all buffered data. The guard logic itself is correct and necessary (it prevents a u52 underflow in the |
||
|
|
||
| var total_written: u64 = 0; | ||
|
|
||
| // do not write more than available | ||
| // if we do, it will cause this to be delayed until the next call, each time | ||
| // TODO: should we break it in smaller chunks? | ||
| const to_write = @min(@as(Blob.SizeType, @truncate(write_offset)), @as(Blob.SizeType, this.buffer.len - 1)); | ||
| const chunk = this.readableSlice()[to_write..]; | ||
| // if we have nothing to write, we are done | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| import { expect, test } from "bun:test"; | ||
|
|
||
| // Verify streaming through fetch().body.pipeThrough(TransformStream) | ||
| // delivers all data correctly with the backpressure-aware readStreamIntoSink. | ||
| // Without the fix, readStreamIntoSink consumed upstream data in a tight loop | ||
| // without checking sink.write() return values, causing OOM with slow consumers. | ||
| // The actual backpressure behavior requires a slow remote consumer to trigger | ||
| // TCP-level backpressure, which cannot be reliably tested on localhost. | ||
|
Comment on lines
+3
to
+8
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 The file-level comment at lines 3–8 claims the fix makes Extended reasoning...The test file opens with a 6-line comment (lines 3–8) that makes two specific implementation claims: (1) What the comment says vs. what the code actually does. A grep of the entire Addressing the refutation. One verifier argued that "backpressure-aware" refers to the system as a whole via the What the test actually verifies. The remaining test confirms end-to-end byte delivery: three concurrent fetch requests through a TransformStream proxy each receive exactly Why this matters. A developer reading this comment to understand whether issue #28035 (OOM with slow consumers) is fixed would conclude that it is, and would not open a follow-up issue or look for the missing implementation. The comment essentially hides a known gap in the fix by attributing correctness to code that was reverted. How to fix. Replace lines 3–8 with an accurate description: the test verifies that streaming through |
||
| // https://github.com/oven-sh/bun/issues/28035 | ||
| test("TransformStream proxy delivers all data", async () => { | ||
| const TOTAL_CHUNKS = 500; | ||
|
|
||
| await using upstream = Bun.serve({ | ||
| port: 0, | ||
| idleTimeout: 255, | ||
| fetch() { | ||
| let i = 0; | ||
| return new Response( | ||
| new ReadableStream({ | ||
| pull(controller) { | ||
| if (i >= TOTAL_CHUNKS) { | ||
| controller.close(); | ||
| return; | ||
| } | ||
| const chunk = Buffer.alloc(25000, 65); | ||
| chunk.writeUInt32BE(i, 0); | ||
| controller.enqueue(chunk); | ||
| i++; | ||
| }, | ||
| }), | ||
| ); | ||
| }, | ||
| }); | ||
|
|
||
| await using proxy = Bun.serve({ | ||
| port: 0, | ||
| idleTimeout: 255, | ||
| async fetch() { | ||
| const res = await fetch(`http://localhost:${upstream.port}/`); | ||
| const transform = new TransformStream({ | ||
| transform(chunk, ctrl) { | ||
| ctrl.enqueue(chunk); | ||
| }, | ||
| }); | ||
| return new Response(res.body!.pipeThrough(transform)); | ||
| }, | ||
| }); | ||
|
|
||
| // Make multiple concurrent requests to stress the pipeline | ||
| const responses = await Promise.all([ | ||
| fetch(`http://localhost:${proxy.port}/`), | ||
| fetch(`http://localhost:${proxy.port}/`), | ||
| fetch(`http://localhost:${proxy.port}/`), | ||
| ]); | ||
|
|
||
| for (const response of responses) { | ||
| const body = await response.bytes(); | ||
| expect(body.length).toBe(TOTAL_CHUNKS * 25000); | ||
|
|
||
| // Verify chunk ordering | ||
| const view = new DataView(body.buffer, body.byteOffset, body.byteLength); | ||
| expect(view.getUint32(0)).toBe(0); | ||
| expect(view.getUint32((TOTAL_CHUNKS - 1) * 25000)).toBe(TOTAL_CHUNKS - 1); | ||
| } | ||
| }); | ||
Uh oh!
There was an error while loading. Please reload this page.