Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@
jsc::mark_binding!();
if let Some(ws) = self.outgoing_websocket.take() {
log!("fail ({})", <&'static str>::from(code));
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code);
// Snapshot the unsent backlog before did_abrupt_close(): the JS
// close event fires synchronously inside it, yet the send buffer is
// not freed until cancel() below, so C++ must be told the amount now
// (it cannot query the connection across this &mut self borrow).
let buffered = self.buffered_amount();
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code, buffered);

Check warning on line 269 in src/http_jsc/websocket_client.rs

View check run for this annotation

Claude / Claude Code Review

fail() snapshot reads 0 on FailedToWrite: send_buffer_out drops swapped-out backlog before terminate

On the tunnel `FailedToWrite` path, this snapshot reads ~0: `send_buffer_out()` does `mem::replace(&mut self.send_buffer, LinearFifo::init())` (line 1251) and the `Err(true)` arm `drop(buf)`s the swapped-out backlog *before* `terminate(FailedToWrite)` → `fail()`, so `self.send_buffer.readable_length()` here is 0 and `bufferedAmount` resets in `onclose`. The line-1288 comment was correct before `fail()` started reading `send_buffer`; restoring with `self.send_buffer = buf;` instead of `drop(buf)`
Comment thread
robobun marked this conversation as resolved.
Outdated
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// the socket/tunnel I/O ref (or by caller's guard).
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -1632,7 +1637,11 @@
};
self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this));
jsc::mark_binding!();
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code);
// Capture the unsent backlog before the call (it may already be 0 if a
// caller such as handle_close() cleared the buffer first) so C++ can
// keep bufferedAmount from resetting to 0 on abrupt close.
let buffered = self.buffered_amount();
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code, buffered);

Check failure on line 1644 in src/http_jsc/websocket_client.rs

View check run for this annotation

Claude / Claude Code Review

bufferedAmount resets to 0 on server-initiated graceful close (received Close frame)

The fourth close path — a **server-initiated graceful close** (peer sends a Close frame) — still resets `bufferedAmount` to 0: `ReceiveState::Close` → `send_close_with_body()` calls `clear_data()` (dropping the whole `send_buffer` backlog) before `dispatch_close()`, and `WebSocket__didClose` still hardcodes `didClose(0, …)`, so `m_bufferedAmount` stays 0 and `onclose` reads `bufferedAmount === 0`. The "total only ever increases" comment in `bufferedAmount()` is therefore inaccurate for this path
Comment thread
robobun marked this conversation as resolved.
Outdated
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// caller's ref guard (see cancel/handle_close).
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -2092,6 +2101,31 @@
// This is under-estimated a little, as we don't include usockets context.
cost
}

/// Bytes queued by `send()` that have not yet been written to the socket.
/// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing
/// bytes of buffered frames (the send buffer holds fully framed messages),
/// plus any encrypted bytes the proxy tunnel still holds.
pub fn buffered_amount(&self) -> usize {
let mut buffered = self.send_buffer.readable_length();
if let Some(tunnel) = &self.proxy_tunnel {
// Use the raw-ptr accessor, not `tunnel.as_ref()`: this runs inside
// the tunnel's SSL-wrapper callbacks on an abrupt close (fail()),
// where a whole-struct `&WebSocketProxyTunnel` would overlap the
// live `&mut SslWrapper` over its `wrapper` field (UB under Stacked
// Borrows — see WebSocketProxyTunnel's Aliasing model doc).
// SAFETY: `tunnel` (NonNull) points to a live tunnel.
buffered += unsafe { WebSocketProxyTunnel::buffered_amount(tunnel.as_ptr()) };
}
buffered
}

// `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below).
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn get_buffered_amount(this: *const Self) -> usize {
// SAFETY: called from C++ with a valid pointer
unsafe { &*this }.buffered_amount()
}

Check failure on line 2128 in src/http_jsc/websocket_client.rs

View check run for this annotation

Claude / Claude Code Review

get_buffered_amount forms &Self while dispatch_data holds &mut self (re-entrant onmessage)

`get_buffered_amount` forms a whole-struct `&*this` on the `WebSocket<SSL>` allocation, but it is reachable re-entrantly from inside `dispatch_data(&mut self)` via `onmessage` → JS reads `ws.bufferedAmount` — the same Stacked Borrows aliasing class you already worked around for `fail()` (the "cannot query the connection across this `&mut self` borrow" comment) and for `WebSocketProxyTunnel::buffered_amount` in f704b190. The same `addr_of!` field-projection fix applies here: read `send_buffer` /
Comment thread
robobun marked this conversation as resolved.
}

// ──────────────────────────────────────────────────────────────────────────
Expand All @@ -2110,6 +2144,7 @@
cancel = $cancel:ident,
close = $close:ident,
finalize = $finalize:ident,
get_buffered_amount = $get_buffered_amount:ident,
init = $init:ident,
init_with_tunnel = $init_with_tunnel:ident,
memory_cost = $memory_cost:ident,
Expand All @@ -2130,6 +2165,10 @@
WebSocket::<$ssl>::finalize(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize {
WebSocket::<$ssl>::get_buffered_amount(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
Expand Down Expand Up @@ -2200,6 +2239,7 @@
cancel = Bun__WebSocketClient__cancel,
close = Bun__WebSocketClient__close,
finalize = Bun__WebSocketClient__finalize,
get_buffered_amount = Bun__WebSocketClient__getBufferedAmount,
init = Bun__WebSocketClient__init,
init_with_tunnel = Bun__WebSocketClient__initWithTunnel,
memory_cost = Bun__WebSocketClient__memoryCost,
Expand All @@ -2212,6 +2252,7 @@
cancel = Bun__WebSocketClientTLS__cancel,
close = Bun__WebSocketClientTLS__close,
finalize = Bun__WebSocketClientTLS__finalize,
get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount,
init = Bun__WebSocketClientTLS__init,
init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel,
memory_cost = Bun__WebSocketClientTLS__memoryCost,
Expand Down
13 changes: 10 additions & 3 deletions src/http_jsc/websocket_client/CppWebSocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ unsafe extern "C" {
buffered_len: usize,
deflate_params: *const websocket_deflate::Params,
);
safe fn WebSocket__didAbruptClose(websocket_context: &CppWebSocket, reason: ErrorCode);
safe fn WebSocket__didAbruptClose(
websocket_context: &CppWebSocket,
reason: ErrorCode,
buffered_amount: usize,
);
fn WebSocket__didClose(websocket_context: &CppWebSocket, code: u16, reason: *const BunString);
fn WebSocket__didReceiveText(
websocket_context: &CppWebSocket,
Expand All @@ -69,12 +73,15 @@ unsafe extern "C" {
// borrows (often while `&mut WebSocket<SSL>` is also live), so `&mut self`
// would force needless `unsafe { &mut *ptr }` at every site.
impl CppWebSocket {
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode) {
/// `buffered_amount` is the sender's unsent backlog captured *before* this
/// call (the connection's send buffer may be freed during the abrupt-close
/// teardown), so C++ can keep `WebSocket.bufferedAmount` from resetting to 0.
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode, buffered_amount: usize) {
// SAFETY: VirtualMachine::get() returns the live current-thread VM;
// event_loop() yields its raw event-loop pointer (live for VM lifetime).
let event_loop = VirtualMachine::get().event_loop_mut();
event_loop.enter();
WebSocket__didAbruptClose(self, reason);
WebSocket__didAbruptClose(self, reason, buffered_amount);
event_loop.exit();
}

Expand Down
16 changes: 16 additions & 0 deletions src/http_jsc/websocket_client/WebSocketProxyTunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,22 @@ impl WebSocketProxyTunnel {
pub(crate) fn has_backpressure(&self) -> bool {
self.write_buffer.is_not_empty()
}

/// Encrypted bytes still buffered in the tunnel awaiting a writable socket.
///
/// Takes `*const Self` and projects to `write_buffer` via `addr_of!` rather
/// than forming a whole-struct `&Self`: this is reachable from inside the
/// SSL-wrapper callbacks (abrupt close during the connected phase), which
/// hold a `&mut SslWrapper` over the `wrapper` field — a whole-struct borrow
/// would overlap it (see the module's Aliasing model doc).
///
/// # Safety
/// `this` must point to a live `WebSocketProxyTunnel`.
pub(crate) unsafe fn buffered_amount(this: *const Self) -> usize {
// SAFETY: `this` is live; short-lived shared borrow of the disjoint
// `write_buffer` field only (never touches `wrapper`).
unsafe { (*ptr::addr_of!((*this).write_buffer)).size() }
}
}

impl Drop for WebSocketProxyTunnel {
Expand Down
3 changes: 2 additions & 1 deletion src/http_jsc/websocket_client/WebSocketUpgradeClient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ impl<const SSL: bool> HTTPClient<SSL> {
// SAFETY: short-lived `&mut` for the field take; ends before the FFI call.
let ws = unsafe { (*this).outgoing_websocket.take() };
if let Some(ws) = ws {
CppWebSocket::opaque_ref(ws).did_abrupt_close(code);
// The upgrade handshake has no send buffer yet, so the backlog is 0.
CppWebSocket::opaque_ref(ws).did_abrupt_close(code, 0);
// SAFETY: `this` carries root provenance; may free `this`.
unsafe { Self::deref(this) };
}
Expand Down
2 changes: 2 additions & 0 deletions src/jsc/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 50 additions & 11 deletions src/jsc/bindings/webcore/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ static unsigned saturateAdd(unsigned a, unsigned b)
return a + b;
}

static unsigned clampToUnsigned(size_t value)
{
return value > std::numeric_limits<unsigned>::max()
? std::numeric_limits<unsigned>::max()
: static_cast<unsigned>(value);
}

ASCIILiteral WebSocket::subprotocolSeparator()
{
return ", "_s;
Expand Down Expand Up @@ -857,8 +864,6 @@ void WebSocket::sendWebSocketData(const char* baseAddress, size_t length, const
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast<const unsigned char*>(baseAddress), length, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -887,8 +892,6 @@ void WebSocket::sendWebSocketString(const String& message, const Opcode op)
case ConnectedWebSocketKind::Client: {
auto zigStr = Zig::toZigString(message);
Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -991,17 +994,19 @@ ExceptionOr<void> WebSocket::close(std::optional<unsigned short> optionalCode, c
m_state = CLOSING;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
// Snapshot the backlog before the connection (and its send buffer) is
// torn down: per spec bufferedAmount must not reset to 0 on close.
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client));
ZigString reasonZigStr = Zig::toZigString(reason);
Bun__WebSocketClient__close(this->m_connectedWebSocket.client, code, &reasonZigStr);
updateHasPendingActivity();
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL));
ZigString reasonZigStr = Zig::toZigString(reason);
Bun__WebSocketClientTLS__close(this->m_connectedWebSocket.clientSSL, code, &reasonZigStr);
updateHasPendingActivity();
// this->m_bufferedAmount = this->m_connectedWebSocket.clientSSL->getBufferedAmount();
break;
}
// case ConnectedWebSocketKind::Server: {
Expand Down Expand Up @@ -1036,11 +1041,15 @@ ExceptionOr<void> WebSocket::terminate()
m_state = CLOSING;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
// Snapshot the backlog before cancel() frees the send buffer, so
// bufferedAmount does not reset to 0 (see bufferedAmount()).
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client));
Bun__WebSocketClient__cancel(this->m_connectedWebSocket.client);
updateHasPendingActivity();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL));
Bun__WebSocketClientTLS__cancel(this->m_connectedWebSocket.clientSSL);
updateHasPendingActivity();
break;
Expand Down Expand Up @@ -1224,7 +1233,24 @@ WebSocket::State WebSocket::readyState() const

unsigned WebSocket::bufferedAmount() const
{
return saturateAdd(m_bufferedAmount, m_bufferedAmountAfterClose);
// While OPEN, query the live send-buffer size from the connection so
// backpressure is observable. Once closed the connection is gone, but the
// spec requires bufferedAmount not to reset to 0 — close()/terminate()
// snapshot the final backlog into m_bufferedAmount, and send() after close
// adds to m_bufferedAmountAfterClose, so the total only ever increases.
unsigned buffered = m_bufferedAmount;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client:
buffered = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client));
break;
case ConnectedWebSocketKind::ClientSSL:
buffered = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL));
break;
case ConnectedWebSocketKind::None:
break;
Comment thread
robobun marked this conversation as resolved.
}

return saturateAdd(buffered, m_bufferedAmountAfterClose);
}

String WebSocket::protocol() const
Expand Down Expand Up @@ -1588,7 +1614,11 @@ void WebSocket::didClose(unsigned unhandledBufferedAmount, unsigned short code,

bool wasClean = m_state == CLOSING && !unhandledBufferedAmount && code != 0; // WebSocketChannel::CloseEventCodeAbnormalClosure;
m_state = CLOSED;
m_bufferedAmount = unhandledBufferedAmount;
// Don't reset the backlog: close()/terminate() already snapshotted the
// unsent bytes into m_bufferedAmount, and the spec requires bufferedAmount
// not to drop to 0 once closed. Keep whichever is larger.
if (unhandledBufferedAmount > m_bufferedAmount)
m_bufferedAmount = unhandledBufferedAmount;
ASSERT(scriptExecutionContext());
this->m_connectedWebSocketKind = ConnectedWebSocketKind::None;
this->m_upgradeClient = nullptr;
Expand Down Expand Up @@ -1653,13 +1683,22 @@ void WebSocket::didConnect(us_socket_t* socket, char* bufferedData, size_t buffe
this->didConnect();
}

void WebSocket::didFailWithErrorCode(Bun::WebSocketErrorCode code)
void WebSocket::didFailWithErrorCode(Bun::WebSocketErrorCode code, size_t bufferedAmount)
{
// from new WebSocket() -> connect()

if (m_state == CLOSED)
return;

// Keep the backlog reported before this abrupt close (captured on the Rust
// side, since the connection's send buffer is freed during teardown) so
// bufferedAmount does not reset to 0 — see bufferedAmount(). Keep the larger
// value; this also makes the socket-close path (buffer already cleared → 0)
// a no-op.
unsigned clamped = clampToUnsigned(bufferedAmount);
if (clamped > m_bufferedAmount)
m_bufferedAmount = clamped;

this->m_upgradeClient = nullptr;
if (this->m_connectedWebSocketKind == ConnectedWebSocketKind::ClientSSL) {
this->m_connectedWebSocket.clientSSL = nullptr;
Expand Down Expand Up @@ -1886,9 +1925,9 @@ extern "C" void WebSocket__didConnectWithTunnel(WebCore::WebSocket* webSocket, v
webSocket->didConnectWithTunnel(tunnel, bufferedData, len, deflate_params);
}

extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, Bun::WebSocketErrorCode errorCode)
extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, Bun::WebSocketErrorCode errorCode, size_t bufferedAmount)
{
webSocket->didFailWithErrorCode(errorCode);
webSocket->didFailWithErrorCode(errorCode, bufferedAmount);
}
extern "C" void WebSocket__didClose(WebCore::WebSocket* webSocket, uint16_t errorCode, BunString* reason)
{
Expand Down
2 changes: 1 addition & 1 deletion src/jsc/bindings/webcore/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class WebSocket final : public RefCounted<WebSocket>, public EventTargetWithInli
void didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason);
void didConnect(us_socket_t* socket, char* bufferedData, size_t bufferedDataSize, const PerMessageDeflateParams* deflate_params, void* customSSLCtx);
void didConnectWithTunnel(void* tunnel, char* bufferedData, size_t bufferedDataSize, const PerMessageDeflateParams* deflate_params);
void didFailWithErrorCode(Bun::WebSocketErrorCode code);
void didFailWithErrorCode(Bun::WebSocketErrorCode code, size_t bufferedAmount = 0);

void didReceiveMessage(String&& message);
void didReceiveData(const char* data, size_t length);
Expand Down
Loading
Loading