Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,24 @@ impl<const SSL: bool> WebSocket<SSL> {
// This is under-estimated a little, as we don't include usockets context.
cost
}

/// Bytes queued by `send()` that have not yet been written to the socket.
/// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing
/// bytes of buffered frames (the send buffer holds fully framed messages),
/// plus any encrypted bytes the proxy tunnel still holds.
//
// `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below).
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn get_buffered_amount(this: *const Self) -> usize {
// SAFETY: called from C++ with a valid pointer
let this = unsafe { &*this };
let mut buffered = this.send_buffer.readable_length();
if let Some(tunnel) = &this.proxy_tunnel {
// SAFETY: `tunnel` holds a live ref (RefPtr has no `Deref`).
buffered += unsafe { tunnel.as_ref() }.buffered_amount();
}
buffered
Comment thread
robobun marked this conversation as resolved.
}
Comment thread
robobun marked this conversation as resolved.
}

// ──────────────────────────────────────────────────────────────────────────
Expand All @@ -2110,6 +2128,7 @@ macro_rules! export_websocket_client {
cancel = $cancel:ident,
close = $close:ident,
finalize = $finalize:ident,
get_buffered_amount = $get_buffered_amount:ident,
init = $init:ident,
init_with_tunnel = $init_with_tunnel:ident,
memory_cost = $memory_cost:ident,
Expand All @@ -2130,6 +2149,10 @@ macro_rules! export_websocket_client {
WebSocket::<$ssl>::finalize(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize {
WebSocket::<$ssl>::get_buffered_amount(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
Expand Down Expand Up @@ -2200,6 +2223,7 @@ export_websocket_client!(
cancel = Bun__WebSocketClient__cancel,
close = Bun__WebSocketClient__close,
finalize = Bun__WebSocketClient__finalize,
get_buffered_amount = Bun__WebSocketClient__getBufferedAmount,
init = Bun__WebSocketClient__init,
init_with_tunnel = Bun__WebSocketClient__initWithTunnel,
memory_cost = Bun__WebSocketClient__memoryCost,
Expand All @@ -2212,6 +2236,7 @@ export_websocket_client!(
cancel = Bun__WebSocketClientTLS__cancel,
close = Bun__WebSocketClientTLS__close,
finalize = Bun__WebSocketClientTLS__finalize,
get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount,
init = Bun__WebSocketClientTLS__init,
init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel,
memory_cost = Bun__WebSocketClientTLS__memoryCost,
Expand Down
5 changes: 5 additions & 0 deletions src/http_jsc/websocket_client/WebSocketProxyTunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ impl WebSocketProxyTunnel {
pub(crate) fn has_backpressure(&self) -> bool {
self.write_buffer.is_not_empty()
}

/// Encrypted bytes still buffered in the tunnel awaiting a writable socket.
pub(crate) fn buffered_amount(&self) -> usize {
self.write_buffer.size()
}
}

impl Drop for WebSocketProxyTunnel {
Expand Down
2 changes: 2 additions & 0 deletions src/jsc/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions src/jsc/bindings/webcore/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,6 @@
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast<const unsigned char*>(baseAddress), length, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -887,8 +885,6 @@
case ConnectedWebSocketKind::Client: {
auto zigStr = Zig::toZigString(message);
Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -1224,7 +1220,25 @@

unsigned WebSocket::bufferedAmount() const
{
return saturateAdd(m_bufferedAmount, m_bufferedAmountAfterClose);
// Query the live send-buffer size from the connection so backpressure is
// observable while OPEN. After close the connection is gone, so fall back
// to m_bufferedAmount (set by didClose() to the unhandled buffered amount).
size_t buffered = m_bufferedAmount;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client:
buffered = Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client);
break;
case ConnectedWebSocketKind::ClientSSL:
buffered = Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL);
break;
case ConnectedWebSocketKind::None:
break;
Comment thread
robobun marked this conversation as resolved.
}

unsigned clamped = buffered > std::numeric_limits<unsigned>::max()
? std::numeric_limits<unsigned>::max()
: static_cast<unsigned>(buffered);
return saturateAdd(clamped, m_bufferedAmountAfterClose);

Check warning on line 1241 in src/jsc/bindings/webcore/WebSocket.cpp

View check run for this annotation

Claude / Claude Code Review

bufferedAmount resets to 0 after close() instead of retaining backlog

After `ws.close()` (or `terminate()`), `bufferedAmount` drops back to 0 even if a large backlog was queued, because `m_connectedWebSocketKind` is set to `None` and the fallback `m_bufferedAmount` is always 0 (its only writer, `WebSocket__didClose`, hardcodes `0` for `unhandledBufferedAmount`). The WHATWG spec says the number must not reset once the connection closes; consider snapshotting `getBufferedAmount` into `m_bufferedAmount` in `close()`/`terminate()` just before clearing `m_connectedWebS
Comment thread
robobun marked this conversation as resolved.
Outdated
}

String WebSocket::protocol() const
Expand Down
73 changes: 73 additions & 0 deletions test/js/web/websocket/websocket-buffered-amount.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { describe, expect, test } from "bun:test";
import crypto from "node:crypto";
import net from "node:net";

const WS_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Raw TCP server that completes the WebSocket handshake and then stops reading
// from the socket (`pause()`), so the client's outbound frames cannot drain to
// the peer and pile up in the in-process send buffer.
function nonDrainingServer(): Promise<{ port: number; close: () => void }> {
return new Promise((resolve, reject) => {
const server = net.createServer(sock => {
let buf = "";
let upgraded = false;
sock.on("data", d => {
if (upgraded) return;
buf += d.toString("latin1");
if (!buf.includes("\r\n\r\n")) return;
const key = /sec-websocket-key:\s*(.+)\r\n/i.exec(buf)?.[1]?.trim() ?? "";
const accept = crypto.createHash("sha1").update(key + WS_MAGIC).digest("base64");
sock.write(
"HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
`Sec-WebSocket-Accept: ${accept}\r\n\r\n`,
);
upgraded = true;
sock.pause(); // never read the client's frames
});
sock.on("error", () => {});
});
server.on("error", reject);
server.listen(0, "127.0.0.1", () => {
const address = server.address() as net.AddressInfo;
resolve({ port: address.port, close: () => server.close() });
});
});
}

describe("WebSocket.bufferedAmount (client)", () => {
test("reflects the backlog queued to a peer that stopped reading", async () => {
const { port, close } = await nonDrainingServer();
try {
const ws = new WebSocket(`ws://127.0.0.1:${port}/`);
const { promise, resolve, reject } = Promise.withResolvers<{ atOpen: number; max: number }>();
ws.onerror = () => reject(new Error("unexpected error event"));
ws.onopen = () => {
// Nothing queued yet: the baseline must be 0, not a constant.
const atOpen = ws.bufferedAmount;
const chunk = Buffer.alloc(64 * 1024, 0x79).toString();
let max = atOpen;
// 4000 * 64 KiB = ~250 MiB — far more than any socket buffer can accept,
// so the excess must queue in-process.
for (let i = 0; i < 4000; i++) {
ws.send(chunk);
if (ws.bufferedAmount > max) max = ws.bufferedAmount;
}
resolve({ atOpen, max });
};
const { atOpen, max } = await promise;
ws.close();

// Baseline with nothing queued.
expect(atOpen).toBe(0);
// Before the fix, bufferedAmount was hard-wired to 0 for the client
// WebSocket. It must now track the unsent backlog — which is far larger
// than a single 64 KiB frame once the peer stops reading.
expect(max).toBeGreaterThan(64 * 1024);
} finally {
close();
}
});
});
Loading