Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 101 additions & 17 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,22 @@
// the destructor's finalize() — does not leak. When reached via
// fail(), outgoing_websocket is already None and this is a no-op.
if had_tunnel {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
}
}

pub fn fail(&mut self, code: ErrorCode) {
jsc::mark_binding!();
if let Some(ws) = self.outgoing_websocket.take() {
log!("fail ({})", <&'static str>::from(code));
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code);
// Snapshot the unsent backlog before did_abrupt_close(): the JS
// close event fires synchronously inside it, yet the send buffer is
// not freed until cancel() below, so C++ must be told the amount now
// (it cannot query the connection across this &mut self borrow).
// SAFETY: `self` is a live `&mut Self`; buffered_amount only does
// short-lived raw-ptr field reads.
let buffered = unsafe { Self::buffered_amount(self) };
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code, buffered);
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// the socket/tunnel I/O ref (or by caller's guard).
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -338,10 +345,15 @@
pub fn handle_close(&mut self, _socket: Socket<SSL>, _code: c_int, _reason: *mut c_void) {
log!("onClose");
jsc::mark_binding!();
// Snapshot the backlog before clear_data() frees it, so the close event
// does not see bufferedAmount reset to 0 (e.g. peer RST with unsent
// frames). SAFETY: `self` is a live `&mut Self`; buffered_amount only
// does short-lived raw-ptr field reads.
let buffered = unsafe { Self::buffered_amount(self) };
self.clear_data();
self.tcp.detach();

self.dispatch_abrupt_close(ErrorCode::Ended);
self.dispatch_abrupt_close(ErrorCode::Ended, Some(buffered));

// For the socket.
// SAFETY: `self: &mut Self` → `*mut Self`; this is the terminal
Expand Down Expand Up @@ -1280,9 +1292,21 @@
true
}
Err(true) => {
// `terminate → clear_data` resets `send_buffer`; drop the
// taken fifo without restoring.
drop(buf);
// Restore the backlog before terminating: fail() snapshots
// send_buffer.readable_length() for bufferedAmount, so it must
// still be here. `terminate → cancel → clear_data` frees it
// immediately afterward, so this does not leak.
//
// KNOWN GAP (tunnel only): if the tunnel's SslWrapper::write_data
// hits a fatal SSL error it fires on_close → fail() *synchronously
// inside* the write above — before this restore — so that
// bufferedAmount snapshot reads 0. Reporting the data as
// `self.send_buffer` cannot be kept populated across the write

Check warning on line 1304 in src/http_jsc/websocket_client.rs

View check run for this annotation

Claude / Claude Code Review

Garbled sentence in KNOWN GAP comment

The KNOWN GAP comment has a garbled sentence at lines 1303–1304: "…snapshot reads 0. Reporting the data as `self.send_buffer` cannot be kept populated across the write…". The fragment "Reporting the data as" is an editing artifact that doesn't connect to what follows — it should just read "`self.send_buffer` cannot be kept populated across the write without either…" (matching your own phrasing in the resolved review thread).
Comment thread
robobun marked this conversation as resolved.
Outdated
// without either aliasing UB (the slice handed to write is
// borrowed from it) or an extra per-flush copy; the window is a
// fatal-handshake/close-notify error mid-flush, and 0 there is no
// worse than the pre-feature behavior.
self.send_buffer = buf;
self.terminate(ErrorCode::FailedToWrite);
Comment thread
robobun marked this conversation as resolved.
false
}
Expand All @@ -1295,7 +1319,7 @@

fn send_pong(&mut self) -> bool {
if !self.has_tcp() {
self.dispatch_abrupt_close(ErrorCode::Ended);
self.dispatch_abrupt_close(ErrorCode::Ended, None);
return false;
}

Expand Down Expand Up @@ -1355,7 +1379,7 @@
let body_len = body_len.min(123);
log!("Sending close with code {}", code);
if !self.has_tcp() {
self.dispatch_abrupt_close(ErrorCode::Ended);
self.dispatch_abrupt_close(ErrorCode::Ended, None);
self.clear_data();
return;
}
Expand Down Expand Up @@ -1407,8 +1431,14 @@
let slice = &final_body_bytes[..slice_len];

if self.enqueue_encoded_bytes(slice) {
// Snapshot the unsent backlog before clear_data() frees it, so the
// JS close event does not see bufferedAmount reset to 0 (spec: it
// does not reset once the connection closes).
// SAFETY: `self` is a live `&mut Self`; buffered_amount only does
// short-lived raw-ptr field reads.
let buffered = unsafe { Self::buffered_amount(self) };
self.clear_data();
self.dispatch_close(dispatch_code.unwrap_or(code), &mut reason);
self.dispatch_close(dispatch_code.unwrap_or(code), &mut reason, buffered);
}
}

Expand Down Expand Up @@ -1466,7 +1496,7 @@
let this = unsafe { &mut *this_ptr };

if !this.has_tcp() || op > 0xF {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
return;
}

Expand Down Expand Up @@ -1512,7 +1542,7 @@
let this = unsafe { &mut *this_ptr };

if !this.has_tcp() || op > 0xF {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
return;
}

Expand Down Expand Up @@ -1555,7 +1585,7 @@
let _ = this.send_data(bytes, !this.has_backpressure(), opcode);
} else {
// Invalid blob, close connection
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
}
}

Expand All @@ -1573,7 +1603,7 @@
// SAFETY: str_ is a valid pointer from C++
let str = unsafe { &*str_ };
if !this.has_tcp() {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
return;
}

Expand Down Expand Up @@ -1626,25 +1656,33 @@
);
}

fn dispatch_abrupt_close(&mut self, code: ErrorCode) {
/// `buffered_override` lets a caller that already cleared the send buffer
/// (e.g. `handle_close()` calls `clear_data()` first) pass the backlog it
/// captured beforehand. `None` snapshots the live send buffer here.
fn dispatch_abrupt_close(&mut self, code: ErrorCode, buffered_override: Option<usize>) {
let Some(out) = self.outgoing_websocket.take() else {
return;
};
self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this));
jsc::mark_binding!();
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code);
// Capture the unsent backlog so C++ can keep bufferedAmount from
// resetting to 0 on abrupt close.
// SAFETY: `self` is a live `&mut Self`; buffered_amount only does
// short-lived raw-ptr field reads.
let buffered = buffered_override.unwrap_or_else(|| unsafe { Self::buffered_amount(self) });
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code, buffered);
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// caller's ref guard (see cancel/handle_close).
unsafe { Self::deref(self) };
}

fn dispatch_close(&mut self, code: u16, reason: &mut bun_core::String) {
fn dispatch_close(&mut self, code: u16, reason: &mut bun_core::String, buffered_amount: usize) {
let Some(out) = self.outgoing_websocket.take() else {
return;
};
self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this));
jsc::mark_binding!();
CppWebSocket::opaque_ref(out.as_ptr()).did_close(code, reason);
CppWebSocket::opaque_ref(out.as_ptr()).did_close(code, reason, buffered_amount);
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// caller's ref guard.
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -2092,6 +2130,45 @@
// This is under-estimated a little, as we don't include usockets context.
cost
}

/// Bytes queued by `send()` that have not yet been written to the socket.
/// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing
/// bytes of buffered frames (the send buffer holds fully framed messages),
/// plus any encrypted bytes the proxy tunnel still holds.
///
/// Takes `*const Self` and projects to `send_buffer`/`proxy_tunnel` via
/// `addr_of!` rather than forming a whole-struct `&Self`: the C++
/// `bufferedAmount` getter can run re-entrantly while a `&mut Self` is live
/// (JS reads `ws.bufferedAmount` inside an `onmessage` handler dispatched
/// from `dispatch_data(&mut self)`), and a whole-struct `&Self` would pop
/// that borrow's Unique tag (UB under Stacked Borrows).
///
/// # Safety
/// `this` must point to a live `WebSocket<SSL>`.
pub unsafe fn buffered_amount(this: *const Self) -> usize {
// SAFETY: `this` is live; short-lived shared borrows of the disjoint
// `send_buffer` and `proxy_tunnel` fields only (never a whole-struct
// `&Self`, which could overlap a live `&mut Self` on a re-entrant call).
let mut buffered = unsafe { (*core::ptr::addr_of!((*this).send_buffer)).readable_length() };
// SAFETY: as above — `proxy_tunnel` is `Copy` (Option<NonNull<_>>).
let tunnel = unsafe { *core::ptr::addr_of!((*this).proxy_tunnel) };
if let Some(tunnel) = tunnel {
// Raw-ptr accessor, not `tunnel.as_ref()`: reachable inside the
// tunnel's SSL-wrapper callbacks on abrupt close, where a
// whole-struct `&WebSocketProxyTunnel` would overlap the live
// `&mut SslWrapper` (see WebSocketProxyTunnel's Aliasing model doc).
// SAFETY: `tunnel` (NonNull) points to a live tunnel.
buffered += unsafe { WebSocketProxyTunnel::buffered_amount(tunnel.as_ptr()) };
}
buffered
}

// `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below).
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn get_buffered_amount(this: *const Self) -> usize {
// SAFETY: called from C++ with a valid pointer.
unsafe { Self::buffered_amount(this) }
}
Comment thread
robobun marked this conversation as resolved.
}

// ──────────────────────────────────────────────────────────────────────────
Expand All @@ -2110,6 +2187,7 @@
cancel = $cancel:ident,
close = $close:ident,
finalize = $finalize:ident,
get_buffered_amount = $get_buffered_amount:ident,
init = $init:ident,
init_with_tunnel = $init_with_tunnel:ident,
memory_cost = $memory_cost:ident,
Expand All @@ -2130,6 +2208,10 @@
WebSocket::<$ssl>::finalize(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize {
WebSocket::<$ssl>::get_buffered_amount(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
Expand Down Expand Up @@ -2200,6 +2282,7 @@
cancel = Bun__WebSocketClient__cancel,
close = Bun__WebSocketClient__close,
finalize = Bun__WebSocketClient__finalize,
get_buffered_amount = Bun__WebSocketClient__getBufferedAmount,
init = Bun__WebSocketClient__init,
init_with_tunnel = Bun__WebSocketClient__initWithTunnel,
memory_cost = Bun__WebSocketClient__memoryCost,
Expand All @@ -2212,6 +2295,7 @@
cancel = Bun__WebSocketClientTLS__cancel,
close = Bun__WebSocketClientTLS__close,
finalize = Bun__WebSocketClientTLS__finalize,
get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount,
init = Bun__WebSocketClientTLS__init,
init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel,
memory_cost = Bun__WebSocketClientTLS__memoryCost,
Expand Down
27 changes: 21 additions & 6 deletions src/http_jsc/websocket_client/CppWebSocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,17 @@ unsafe extern "C" {
buffered_len: usize,
deflate_params: *const websocket_deflate::Params,
);
safe fn WebSocket__didAbruptClose(websocket_context: &CppWebSocket, reason: ErrorCode);
fn WebSocket__didClose(websocket_context: &CppWebSocket, code: u16, reason: *const BunString);
safe fn WebSocket__didAbruptClose(
websocket_context: &CppWebSocket,
reason: ErrorCode,
buffered_amount: usize,
);
fn WebSocket__didClose(
websocket_context: &CppWebSocket,
code: u16,
reason: *const BunString,
buffered_amount: usize,
);
fn WebSocket__didReceiveText(
websocket_context: &CppWebSocket,
clone: bool,
Expand All @@ -69,22 +78,28 @@ unsafe extern "C" {
// borrows (often while `&mut WebSocket<SSL>` is also live), so `&mut self`
// would force needless `unsafe { &mut *ptr }` at every site.
impl CppWebSocket {
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode) {
/// `buffered_amount` is the sender's unsent backlog captured *before* this
/// call (the connection's send buffer may be freed during the abrupt-close
/// teardown), so C++ can keep `WebSocket.bufferedAmount` from resetting to 0.
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode, buffered_amount: usize) {
// SAFETY: VirtualMachine::get() returns the live current-thread VM;
// event_loop() yields its raw event-loop pointer (live for VM lifetime).
let event_loop = VirtualMachine::get().event_loop_mut();
event_loop.enter();
WebSocket__didAbruptClose(self, reason);
WebSocket__didAbruptClose(self, reason, buffered_amount);
event_loop.exit();
}

pub(crate) fn did_close(&self, code: u16, reason: &mut BunString) {
/// `buffered_amount` is the sender's unsent backlog captured *before* this
/// call (the send buffer is freed during close teardown), so C++ can keep
/// `WebSocket.bufferedAmount` from resetting to 0 once closed.
pub(crate) fn did_close(&self, code: u16, reason: &mut BunString, buffered_amount: usize) {
// SAFETY: VirtualMachine::get() returns the live current-thread VM;
// event_loop() yields its raw event-loop pointer (live for VM lifetime).
let event_loop = VirtualMachine::get().event_loop_mut();
event_loop.enter();
// SAFETY: self is a valid C++ WebCore::WebSocket; reason outlives the call.
unsafe { WebSocket__didClose(self, code, reason) };
unsafe { WebSocket__didClose(self, code, reason, buffered_amount) };
event_loop.exit();
}

Expand Down
16 changes: 16 additions & 0 deletions src/http_jsc/websocket_client/WebSocketProxyTunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,22 @@ impl WebSocketProxyTunnel {
pub(crate) fn has_backpressure(&self) -> bool {
self.write_buffer.is_not_empty()
}

/// Encrypted bytes still buffered in the tunnel awaiting a writable socket.
///
/// Takes `*const Self` and projects to `write_buffer` via `addr_of!` rather
/// than forming a whole-struct `&Self`: this is reachable from inside the
/// SSL-wrapper callbacks (abrupt close during the connected phase), which
/// hold a `&mut SslWrapper` over the `wrapper` field — a whole-struct borrow
/// would overlap it (see the module's Aliasing model doc).
///
/// # Safety
/// `this` must point to a live `WebSocketProxyTunnel`.
pub(crate) unsafe fn buffered_amount(this: *const Self) -> usize {
// SAFETY: `this` is live; short-lived shared borrow of the disjoint
// `write_buffer` field only (never touches `wrapper`).
unsafe { (*ptr::addr_of!((*this).write_buffer)).size() }
}
}

impl Drop for WebSocketProxyTunnel {
Expand Down
3 changes: 2 additions & 1 deletion src/http_jsc/websocket_client/WebSocketUpgradeClient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ impl<const SSL: bool> HTTPClient<SSL> {
// SAFETY: short-lived `&mut` for the field take; ends before the FFI call.
let ws = unsafe { (*this).outgoing_websocket.take() };
if let Some(ws) = ws {
CppWebSocket::opaque_ref(ws).did_abrupt_close(code);
// The upgrade handshake has no send buffer yet, so the backlog is 0.
CppWebSocket::opaque_ref(ws).did_abrupt_close(code, 0);
// SAFETY: `this` carries root provenance; may free `this`.
unsafe { Self::deref(this) };
}
Expand Down
2 changes: 2 additions & 0 deletions src/jsc/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading