Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,24 @@ impl<const SSL: bool> WebSocket<SSL> {
// This is under-estimated a little, as we don't include usockets context.
cost
}

/// Bytes queued by `send()` that have not yet been written to the socket.
/// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing
/// bytes of buffered frames (the send buffer holds fully framed messages),
/// plus any encrypted bytes the proxy tunnel still holds.
//
// `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below).
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn get_buffered_amount(this: *const Self) -> usize {
// SAFETY: called from C++ with a valid pointer
let this = unsafe { &*this };
let mut buffered = this.send_buffer.readable_length();
if let Some(tunnel) = &this.proxy_tunnel {
// SAFETY: `tunnel` holds a live ref (RefPtr has no `Deref`).
buffered += unsafe { tunnel.as_ref() }.buffered_amount();
}
buffered
Comment thread
robobun marked this conversation as resolved.
}
Comment thread
robobun marked this conversation as resolved.
}

// ──────────────────────────────────────────────────────────────────────────
Expand All @@ -2110,6 +2128,7 @@ macro_rules! export_websocket_client {
cancel = $cancel:ident,
close = $close:ident,
finalize = $finalize:ident,
get_buffered_amount = $get_buffered_amount:ident,
init = $init:ident,
init_with_tunnel = $init_with_tunnel:ident,
memory_cost = $memory_cost:ident,
Expand All @@ -2130,6 +2149,10 @@ macro_rules! export_websocket_client {
WebSocket::<$ssl>::finalize(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize {
WebSocket::<$ssl>::get_buffered_amount(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
Expand Down Expand Up @@ -2200,6 +2223,7 @@ export_websocket_client!(
cancel = Bun__WebSocketClient__cancel,
close = Bun__WebSocketClient__close,
finalize = Bun__WebSocketClient__finalize,
get_buffered_amount = Bun__WebSocketClient__getBufferedAmount,
init = Bun__WebSocketClient__init,
init_with_tunnel = Bun__WebSocketClient__initWithTunnel,
memory_cost = Bun__WebSocketClient__memoryCost,
Expand All @@ -2212,6 +2236,7 @@ export_websocket_client!(
cancel = Bun__WebSocketClientTLS__cancel,
close = Bun__WebSocketClientTLS__close,
finalize = Bun__WebSocketClientTLS__finalize,
get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount,
init = Bun__WebSocketClientTLS__init,
init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel,
memory_cost = Bun__WebSocketClientTLS__memoryCost,
Expand Down
5 changes: 5 additions & 0 deletions src/http_jsc/websocket_client/WebSocketProxyTunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ impl WebSocketProxyTunnel {
pub(crate) fn has_backpressure(&self) -> bool {
self.write_buffer.is_not_empty()
}

/// Encrypted bytes still buffered in the tunnel awaiting a writable socket.
pub(crate) fn buffered_amount(&self) -> usize {
self.write_buffer.size()
}
}

impl Drop for WebSocketProxyTunnel {
Expand Down
2 changes: 2 additions & 0 deletions src/jsc/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions src/jsc/bindings/webcore/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,6 @@ void WebSocket::sendWebSocketData(const char* baseAddress, size_t length, const
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast<const unsigned char*>(baseAddress), length, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -887,8 +885,6 @@ void WebSocket::sendWebSocketString(const String& message, const Opcode op)
case ConnectedWebSocketKind::Client: {
auto zigStr = Zig::toZigString(message);
Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -1224,7 +1220,25 @@ WebSocket::State WebSocket::readyState() const

unsigned WebSocket::bufferedAmount() const
{
return saturateAdd(m_bufferedAmount, m_bufferedAmountAfterClose);
// Query the live send-buffer size from the connection so backpressure is
// observable while OPEN. After close the connection is gone, so fall back
// to m_bufferedAmount (set by didClose() to the unhandled buffered amount).
size_t buffered = m_bufferedAmount;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client:
buffered = Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client);
break;
case ConnectedWebSocketKind::ClientSSL:
buffered = Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL);
break;
case ConnectedWebSocketKind::None:
break;
Comment thread
robobun marked this conversation as resolved.
}

unsigned clamped = buffered > std::numeric_limits<unsigned>::max()
? std::numeric_limits<unsigned>::max()
: static_cast<unsigned>(buffered);
return saturateAdd(clamped, m_bufferedAmountAfterClose);
Comment thread
robobun marked this conversation as resolved.
Outdated
}

String WebSocket::protocol() const
Expand Down
76 changes: 76 additions & 0 deletions test/js/web/websocket/websocket-buffered-amount.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { describe, expect, test } from "bun:test";
import crypto from "node:crypto";
import net from "node:net";

const WS_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Raw TCP server that completes the WebSocket handshake and then stops reading
// from the socket (`pause()`), so the client's outbound frames cannot drain to
// the peer and pile up in the in-process send buffer.
function nonDrainingServer(): Promise<{ port: number; close: () => void }> {
return new Promise((resolve, reject) => {
const server = net.createServer(sock => {
let buf = "";
let upgraded = false;
sock.on("data", d => {
if (upgraded) return;
buf += d.toString("latin1");
if (!buf.includes("\r\n\r\n")) return;
const key = /sec-websocket-key:\s*(.+)\r\n/i.exec(buf)?.[1]?.trim() ?? "";
const accept = crypto
.createHash("sha1")
.update(key + WS_MAGIC)
.digest("base64");
sock.write(
"HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
`Sec-WebSocket-Accept: ${accept}\r\n\r\n`,
);
upgraded = true;
sock.pause(); // never read the client's frames
});
sock.on("error", () => {});
});
server.on("error", reject);
server.listen(0, "127.0.0.1", () => {
const address = server.address() as net.AddressInfo;
resolve({ port: address.port, close: () => server.close() });
});
});
}

describe("WebSocket.bufferedAmount (client)", () => {
test("reflects the backlog queued to a peer that stopped reading", async () => {
const { port, close } = await nonDrainingServer();
try {
const ws = new WebSocket(`ws://127.0.0.1:${port}/`);
const { promise, resolve, reject } = Promise.withResolvers<{ atOpen: number; max: number }>();
ws.onerror = () => reject(new Error("unexpected error event"));
ws.onopen = () => {
// Nothing queued yet: the baseline must be 0, not a constant.
const atOpen = ws.bufferedAmount;
const chunk = Buffer.alloc(64 * 1024, 0x79).toString();
let max = atOpen;
// 4000 * 64 KiB = ~250 MiB — far more than any socket buffer can accept,
// so the excess must queue in-process.
for (let i = 0; i < 4000; i++) {
ws.send(chunk);
if (ws.bufferedAmount > max) max = ws.bufferedAmount;
}
resolve({ atOpen, max });
};
const { atOpen, max } = await promise;
ws.close();

// Baseline with nothing queued.
expect(atOpen).toBe(0);
// Before the fix, bufferedAmount was hard-wired to 0 for the client
// WebSocket. It must now track the unsent backlog — which is far larger
// than a single 64 KiB frame once the peer stops reading.
expect(max).toBeGreaterThan(64 * 1024);
} finally {
close();
}
});
});
Loading