Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/bun.js/webcore/Sink.zig
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ pub fn JSSink(comptime SinkType: type, comptime abi_name: []const u8) type {
_ = this;
}

inline fn backpressureCheck(sink: *SinkType, result: jsc.JSValue) jsc.JSValue {
if (comptime @hasField(SinkType, "has_backpressure")) {
if (sink.has_backpressure and result.isNumber()) {
return .false;
}
}
return result;
}

pub fn write(globalThis: *JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
jsc.markBinding(@src());
const this = try getThis(globalThis, callframe);
Expand Down Expand Up @@ -394,9 +403,10 @@ pub fn JSSink(comptime SinkType: type, comptime abi_name: []const u8) type {
return jsc.JSValue.jsNumber(0);
}

return this.sink.writeBytes(
const result = this.sink.writeBytes(
.{ .temporary = bun.ByteList.fromBorrowedSliceDangerous(slice) },
).toJS(globalThis);
return backpressureCheck(&this.sink, result);
}

if (!arg.isString()) {
Expand All @@ -413,14 +423,16 @@ pub fn JSSink(comptime SinkType: type, comptime abi_name: []const u8) type {

defer str.ensureStillAlive();
if (view.is16Bit()) {
return this.sink.writeUTF16(.{ .temporary = bun.ByteList.fromBorrowedSliceDangerous(
const result = this.sink.writeUTF16(.{ .temporary = bun.ByteList.fromBorrowedSliceDangerous(
std.mem.sliceAsBytes(view.utf16SliceAligned()),
) }).toJS(globalThis);
return backpressureCheck(&this.sink, result);
}

return this.sink.writeLatin1(
const result = this.sink.writeLatin1(
.{ .temporary = bun.ByteList.fromBorrowedSliceDangerous(view.slice()) },
).toJS(globalThis);
return backpressureCheck(&this.sink, result);
}

pub fn writeUTF8(globalThis: *JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
Expand Down
6 changes: 6 additions & 0 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,12 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.has_backpressure = false;
} else {
this.has_backpressure = res.write(buf) == .backpressure;
if (!this.has_backpressure) {
this.has_backpressure = res.getBufferedAmount() > 1024 * 1024;
}
if (this.has_backpressure) {
res.onWritable(*@This(), onWritable, this);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
Comment thread
robobun marked this conversation as resolved.
Outdated
}
Comment thread
robobun marked this conversation as resolved.
this.handleWrote(buf.len);
return true;
Expand Down
8 changes: 6 additions & 2 deletions src/js/builtins/ReadableStreamInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,9 @@ export async function readStreamIntoSink(stream: ReadableStream, sink, isNative)
}

for (var i = 0, values = many.value, length = many.value.length; i < length; i++) {
sink.write(values[i]);
if (sink.write(values[i]) === false) {
await sink.flush(true);
}
}

var streamState = $getByIdDirectPrivate(stream, "state");
Expand All @@ -928,7 +930,9 @@ export async function readStreamIntoSink(stream: ReadableStream, sink, isNative)
return sink.end();
}

sink.write(value);
if (sink.write(value) === false) {
await sink.flush(true);
}
}
Comment thread
robobun marked this conversation as resolved.
} catch (e) {
didThrow = true;
Expand Down
157 changes: 157 additions & 0 deletions test/regression/issue/28035.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe, tempDir } from "harness";

// Verify that backpressure propagates through fetch().body.pipeThrough(TransformStream)
// https://github.com/oven-sh/bun/issues/28035
test("fetch body piped through TransformStream propagates backpressure", async () => {
using dir = tempDir("28035", {
"test.ts": `
const TOTAL_CHUNKS = 3000;
let chunksProduced = 0;

const upstream = Bun.serve({
port: 0,
idleTimeout: 255,
fetch() {
chunksProduced = 0;
return new Response(
new ReadableStream({
pull(controller) {
if (chunksProduced >= TOTAL_CHUNKS) { controller.close(); return; }
controller.enqueue(Buffer.alloc(32000, 65));
chunksProduced++;
},
}),
);
},
});

const proxy = Bun.serve({
port: 0,
idleTimeout: 255,
async fetch() {
const res = await fetch("http://localhost:" + upstream.port + "/");
const transform = new TransformStream({
transform(chunk, ctrl) { ctrl.enqueue(chunk); },
});
return new Response(res.body!.pipeThrough(transform));
},
});

// Connect and immediately pause reading to create TCP backpressure.
// With the socket paused, kernel send/receive buffers fill up,
// causing uWS to report backpressure.
const { promise: done, resolve: finish } = Promise.withResolvers<void>();

const conn = await Bun.connect({
hostname: "localhost",
port: proxy.port,
socket: {
open(socket) {
socket.write("GET / HTTP/1.1\\r\\nHost: localhost\\r\\nConnection: close\\r\\n\\r\\n");
socket.pause();
},
data() {},
close() { finish(); },
error() { finish(); },
connectError() { finish(); },
},
});

// Poll until production stabilizes (backpressure stalls it) or
// all chunks are consumed (no backpressure). Awaiting a
// condition instead of sleeping a fixed duration.
let stableCount = 0;
let lastProduced = 0;
while (chunksProduced < TOTAL_CHUNKS && stableCount < 5) {
await Bun.sleep(200);
if (chunksProduced === lastProduced) {
stableCount++;
} else {
stableCount = 0;
lastProduced = chunksProduced;
}
}
const chunksWhilePaused = chunksProduced;

// Resume reading so the connection can close cleanly
conn.resume();
await done;
proxy.stop(true);
upstream.stop(true);
console.log(JSON.stringify({
chunksWhilePaused,
TOTAL_CHUNKS,
backpressureObserved: chunksWhilePaused < TOTAL_CHUNKS,
}));
`,
});

await using proc = Bun.spawn({
cmd: [bunExe(), "run", "test.ts"],
cwd: String(dir),
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

const lines = stdout.trim().split("\n");
const jsonLine = lines.find(l => l.startsWith("{"));
if (!jsonLine) {
console.log("stdout:", stdout.slice(0, 500));
console.log("stderr:", stderr.slice(0, 2000));
}
expect(jsonLine).toBeDefined();
const result = JSON.parse(jsonLine!);
expect(result.chunksWhilePaused).toBeGreaterThan(0);

// With backpressure: production stalls before all chunks are consumed
// Without backpressure: all chunks consumed eagerly
expect(result.backpressureObserved).toBe(true);
expect(exitCode).toBe(0);
});

// Verify basic streaming through TransformStream delivers all data correctly
test("TransformStream proxy delivers all data", async () => {
const TOTAL_CHUNKS = 500;

await using upstream = Bun.serve({
port: 0,
idleTimeout: 255,
fetch() {
let i = 0;
return new Response(
new ReadableStream({
pull(controller) {
if (i >= TOTAL_CHUNKS) {
controller.close();
return;
}
controller.enqueue(Buffer.alloc(25000, 65));
i++;
},
}),
);
},
});

await using proxy = Bun.serve({
port: 0,
idleTimeout: 255,
async fetch() {
const res = await fetch(`http://localhost:${upstream.port}/`);
const transform = new TransformStream({
transform(chunk, ctrl) {
ctrl.enqueue(chunk);
},
});
return new Response(res.body!.pipeThrough(transform));
},
});

const response = await fetch(`http://localhost:${proxy.port}/`);
const body = await response.bytes();
expect(body.length).toBe(TOTAL_CHUNKS * 25000);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
});
Loading