Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@
jsc::mark_binding!();
if let Some(ws) = self.outgoing_websocket.take() {
log!("fail ({})", <&'static str>::from(code));
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code);
// Snapshot the unsent backlog before did_abrupt_close(): the JS
// close event fires synchronously inside it, yet the send buffer is
// not freed until cancel() below, so C++ must be told the amount now
// (it cannot query the connection across this &mut self borrow).
let buffered = self.buffered_amount();
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code, buffered);
Comment thread
robobun marked this conversation as resolved.
Outdated
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// the socket/tunnel I/O ref (or by caller's guard).
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -1632,7 +1637,11 @@
};
self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this));
jsc::mark_binding!();
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code);
// Capture the unsent backlog before the call (it may already be 0 if a
// caller such as handle_close() cleared the buffer first) so C++ can
// keep bufferedAmount from resetting to 0 on abrupt close.
let buffered = self.buffered_amount();
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code, buffered);
Comment thread
robobun marked this conversation as resolved.
Outdated
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// caller's ref guard (see cancel/handle_close).
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -2092,6 +2101,26 @@
// This is under-estimated a little, as we don't include usockets context.
cost
}

/// Bytes queued by `send()` that have not yet been written to the socket.
/// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing
/// bytes of buffered frames (the send buffer holds fully framed messages),
/// plus any encrypted bytes the proxy tunnel still holds.
pub fn buffered_amount(&self) -> usize {
let mut buffered = self.send_buffer.readable_length();
if let Some(tunnel) = &self.proxy_tunnel {
// SAFETY: `tunnel` holds a live ref (RefPtr has no `Deref`).
buffered += unsafe { tunnel.as_ref() }.buffered_amount();
}

Check failure on line 2114 in src/http_jsc/websocket_client.rs

View check run for this annotation

Claude / Claude Code Review

Stacked Borrows violation: buffered_amount() forms whole-struct &WebSocketProxyTunnel from inside SSL callback

When `fail()` is reached from inside a tunnel SSL-wrapper callback (`on_data`/`on_close` → `handle_tunnel_data`/… → `terminate` → `fail`), the new `self.buffered_amount()` call does `unsafe { tunnel.as_ref() }.buffered_amount()`, which forms a whole-struct `&WebSocketProxyTunnel` from Box provenance while the driving frame still holds `&mut SslWrapper` over the tunnel's `wrapper` field. WebSocketProxyTunnel.rs's module-level Aliasing Model doc explicitly forbids exactly this ("Callbacks **never*
Comment thread
robobun marked this conversation as resolved.
Outdated
buffered
Comment thread
robobun marked this conversation as resolved.
}

// `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below).
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn get_buffered_amount(this: *const Self) -> usize {
// SAFETY: called from C++ with a valid pointer
unsafe { &*this }.buffered_amount()
}
Comment thread
robobun marked this conversation as resolved.
}

// ──────────────────────────────────────────────────────────────────────────
Expand All @@ -2110,6 +2139,7 @@
cancel = $cancel:ident,
close = $close:ident,
finalize = $finalize:ident,
get_buffered_amount = $get_buffered_amount:ident,
init = $init:ident,
init_with_tunnel = $init_with_tunnel:ident,
memory_cost = $memory_cost:ident,
Expand All @@ -2130,6 +2160,10 @@
WebSocket::<$ssl>::finalize(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize {
WebSocket::<$ssl>::get_buffered_amount(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
Expand Down Expand Up @@ -2200,6 +2234,7 @@
cancel = Bun__WebSocketClient__cancel,
close = Bun__WebSocketClient__close,
finalize = Bun__WebSocketClient__finalize,
get_buffered_amount = Bun__WebSocketClient__getBufferedAmount,
init = Bun__WebSocketClient__init,
init_with_tunnel = Bun__WebSocketClient__initWithTunnel,
memory_cost = Bun__WebSocketClient__memoryCost,
Expand All @@ -2212,6 +2247,7 @@
cancel = Bun__WebSocketClientTLS__cancel,
close = Bun__WebSocketClientTLS__close,
finalize = Bun__WebSocketClientTLS__finalize,
get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount,
init = Bun__WebSocketClientTLS__init,
init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel,
memory_cost = Bun__WebSocketClientTLS__memoryCost,
Expand Down
13 changes: 10 additions & 3 deletions src/http_jsc/websocket_client/CppWebSocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ unsafe extern "C" {
buffered_len: usize,
deflate_params: *const websocket_deflate::Params,
);
safe fn WebSocket__didAbruptClose(websocket_context: &CppWebSocket, reason: ErrorCode);
safe fn WebSocket__didAbruptClose(
websocket_context: &CppWebSocket,
reason: ErrorCode,
buffered_amount: usize,
);
fn WebSocket__didClose(websocket_context: &CppWebSocket, code: u16, reason: *const BunString);
fn WebSocket__didReceiveText(
websocket_context: &CppWebSocket,
Expand All @@ -69,12 +73,15 @@ unsafe extern "C" {
// borrows (often while `&mut WebSocket<SSL>` is also live), so `&mut self`
// would force needless `unsafe { &mut *ptr }` at every site.
impl CppWebSocket {
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode) {
/// `buffered_amount` is the sender's unsent backlog captured *before* this
/// call (the connection's send buffer may be freed during the abrupt-close
/// teardown), so C++ can keep `WebSocket.bufferedAmount` from resetting to 0.
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode, buffered_amount: usize) {
// SAFETY: VirtualMachine::get() returns the live current-thread VM;
// event_loop() yields its raw event-loop pointer (live for VM lifetime).
let event_loop = VirtualMachine::get().event_loop_mut();
event_loop.enter();
WebSocket__didAbruptClose(self, reason);
WebSocket__didAbruptClose(self, reason, buffered_amount);
event_loop.exit();
}

Expand Down
5 changes: 5 additions & 0 deletions src/http_jsc/websocket_client/WebSocketProxyTunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ impl WebSocketProxyTunnel {
pub(crate) fn has_backpressure(&self) -> bool {
self.write_buffer.is_not_empty()
}

/// Encrypted bytes still buffered in the tunnel awaiting a writable socket.
pub(crate) fn buffered_amount(&self) -> usize {
self.write_buffer.size()
}
}

impl Drop for WebSocketProxyTunnel {
Expand Down
3 changes: 2 additions & 1 deletion src/http_jsc/websocket_client/WebSocketUpgradeClient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ impl<const SSL: bool> HTTPClient<SSL> {
// SAFETY: short-lived `&mut` for the field take; ends before the FFI call.
let ws = unsafe { (*this).outgoing_websocket.take() };
if let Some(ws) = ws {
CppWebSocket::opaque_ref(ws).did_abrupt_close(code);
// The upgrade handshake has no send buffer yet, so the backlog is 0.
CppWebSocket::opaque_ref(ws).did_abrupt_close(code, 0);
// SAFETY: `this` carries root provenance; may free `this`.
unsafe { Self::deref(this) };
}
Expand Down
2 changes: 2 additions & 0 deletions src/jsc/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 50 additions & 11 deletions src/jsc/bindings/webcore/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ static unsigned saturateAdd(unsigned a, unsigned b)
return a + b;
}

static unsigned clampToUnsigned(size_t value)
{
return value > std::numeric_limits<unsigned>::max()
? std::numeric_limits<unsigned>::max()
: static_cast<unsigned>(value);
}

ASCIILiteral WebSocket::subprotocolSeparator()
{
return ", "_s;
Expand Down Expand Up @@ -857,8 +864,6 @@ void WebSocket::sendWebSocketData(const char* baseAddress, size_t length, const
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast<const unsigned char*>(baseAddress), length, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -887,8 +892,6 @@ void WebSocket::sendWebSocketString(const String& message, const Opcode op)
case ConnectedWebSocketKind::Client: {
auto zigStr = Zig::toZigString(message);
Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr, static_cast<uint8_t>(op));
// this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
Expand Down Expand Up @@ -991,17 +994,19 @@ ExceptionOr<void> WebSocket::close(std::optional<unsigned short> optionalCode, c
m_state = CLOSING;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
// Snapshot the backlog before the connection (and its send buffer) is
// torn down: per spec bufferedAmount must not reset to 0 on close.
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client));
ZigString reasonZigStr = Zig::toZigString(reason);
Bun__WebSocketClient__close(this->m_connectedWebSocket.client, code, &reasonZigStr);
updateHasPendingActivity();
// this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL));
ZigString reasonZigStr = Zig::toZigString(reason);
Bun__WebSocketClientTLS__close(this->m_connectedWebSocket.clientSSL, code, &reasonZigStr);
updateHasPendingActivity();
// this->m_bufferedAmount = this->m_connectedWebSocket.clientSSL->getBufferedAmount();
break;
}
// case ConnectedWebSocketKind::Server: {
Expand Down Expand Up @@ -1036,11 +1041,15 @@ ExceptionOr<void> WebSocket::terminate()
m_state = CLOSING;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client: {
// Snapshot the backlog before cancel() frees the send buffer, so
// bufferedAmount does not reset to 0 (see bufferedAmount()).
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client));
Bun__WebSocketClient__cancel(this->m_connectedWebSocket.client);
updateHasPendingActivity();
break;
}
case ConnectedWebSocketKind::ClientSSL: {
m_bufferedAmount = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL));
Bun__WebSocketClientTLS__cancel(this->m_connectedWebSocket.clientSSL);
updateHasPendingActivity();
break;
Expand Down Expand Up @@ -1224,7 +1233,24 @@ WebSocket::State WebSocket::readyState() const

unsigned WebSocket::bufferedAmount() const
{
return saturateAdd(m_bufferedAmount, m_bufferedAmountAfterClose);
// While OPEN, query the live send-buffer size from the connection so
// backpressure is observable. Once closed the connection is gone, but the
// spec requires bufferedAmount not to reset to 0 — close()/terminate()
// snapshot the final backlog into m_bufferedAmount, and send() after close
// adds to m_bufferedAmountAfterClose, so the total only ever increases.
unsigned buffered = m_bufferedAmount;
switch (m_connectedWebSocketKind) {
case ConnectedWebSocketKind::Client:
buffered = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client));
break;
case ConnectedWebSocketKind::ClientSSL:
buffered = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL));
break;
case ConnectedWebSocketKind::None:
break;
Comment thread
robobun marked this conversation as resolved.
}

return saturateAdd(buffered, m_bufferedAmountAfterClose);
}

String WebSocket::protocol() const
Expand Down Expand Up @@ -1588,7 +1614,11 @@ void WebSocket::didClose(unsigned unhandledBufferedAmount, unsigned short code,

bool wasClean = m_state == CLOSING && !unhandledBufferedAmount && code != 0; // WebSocketChannel::CloseEventCodeAbnormalClosure;
m_state = CLOSED;
m_bufferedAmount = unhandledBufferedAmount;
// Don't reset the backlog: close()/terminate() already snapshotted the
// unsent bytes into m_bufferedAmount, and the spec requires bufferedAmount
// not to drop to 0 once closed. Keep whichever is larger.
if (unhandledBufferedAmount > m_bufferedAmount)
m_bufferedAmount = unhandledBufferedAmount;
ASSERT(scriptExecutionContext());
this->m_connectedWebSocketKind = ConnectedWebSocketKind::None;
this->m_upgradeClient = nullptr;
Expand Down Expand Up @@ -1653,13 +1683,22 @@ void WebSocket::didConnect(us_socket_t* socket, char* bufferedData, size_t buffe
this->didConnect();
}

void WebSocket::didFailWithErrorCode(Bun::WebSocketErrorCode code)
void WebSocket::didFailWithErrorCode(Bun::WebSocketErrorCode code, size_t bufferedAmount)
{
// from new WebSocket() -> connect()

if (m_state == CLOSED)
return;

// Keep the backlog reported before this abrupt close (captured on the Rust
// side, since the connection's send buffer is freed during teardown) so
// bufferedAmount does not reset to 0 — see bufferedAmount(). Keep the larger
// value; this also makes the socket-close path (buffer already cleared → 0)
// a no-op.
unsigned clamped = clampToUnsigned(bufferedAmount);
if (clamped > m_bufferedAmount)
m_bufferedAmount = clamped;

this->m_upgradeClient = nullptr;
if (this->m_connectedWebSocketKind == ConnectedWebSocketKind::ClientSSL) {
this->m_connectedWebSocket.clientSSL = nullptr;
Expand Down Expand Up @@ -1886,9 +1925,9 @@ extern "C" void WebSocket__didConnectWithTunnel(WebCore::WebSocket* webSocket, v
webSocket->didConnectWithTunnel(tunnel, bufferedData, len, deflate_params);
}

extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, Bun::WebSocketErrorCode errorCode)
extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, Bun::WebSocketErrorCode errorCode, size_t bufferedAmount)
{
webSocket->didFailWithErrorCode(errorCode);
webSocket->didFailWithErrorCode(errorCode, bufferedAmount);
}
extern "C" void WebSocket__didClose(WebCore::WebSocket* webSocket, uint16_t errorCode, BunString* reason)
{
Expand Down
2 changes: 1 addition & 1 deletion src/jsc/bindings/webcore/WebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class WebSocket final : public RefCounted<WebSocket>, public EventTargetWithInli
void didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason);
void didConnect(us_socket_t* socket, char* bufferedData, size_t bufferedDataSize, const PerMessageDeflateParams* deflate_params, void* customSSLCtx);
void didConnectWithTunnel(void* tunnel, char* bufferedData, size_t bufferedDataSize, const PerMessageDeflateParams* deflate_params);
void didFailWithErrorCode(Bun::WebSocketErrorCode code);
void didFailWithErrorCode(Bun::WebSocketErrorCode code, size_t bufferedAmount = 0);

void didReceiveMessage(String&& message);
void didReceiveData(const char* data, size_t length);
Expand Down
Loading
Loading