Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 100 additions & 17 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,22 @@ impl<const SSL: bool> WebSocket<SSL> {
// the destructor's finalize() — does not leak. When reached via
// fail(), outgoing_websocket is already None and this is a no-op.
if had_tunnel {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
}
}

pub fn fail(&mut self, code: ErrorCode) {
jsc::mark_binding!();
if let Some(ws) = self.outgoing_websocket.take() {
log!("fail ({})", <&'static str>::from(code));
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code);
// Snapshot the unsent backlog before did_abrupt_close(): the JS
// close event fires synchronously inside it, yet the send buffer is
// not freed until cancel() below, so C++ must be told the amount now
// (it cannot query the connection across this &mut self borrow).
// SAFETY: `self` is a live `&mut Self`; buffered_amount only does
// short-lived raw-ptr field reads.
let buffered = unsafe { Self::buffered_amount(self) };
CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code, buffered);
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// the socket/tunnel I/O ref (or by caller's guard).
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -338,10 +345,15 @@ impl<const SSL: bool> WebSocket<SSL> {
pub fn handle_close(&mut self, _socket: Socket<SSL>, _code: c_int, _reason: *mut c_void) {
log!("onClose");
jsc::mark_binding!();
// Snapshot the backlog before clear_data() frees it, so the close event
// does not see bufferedAmount reset to 0 (e.g. peer RST with unsent
// frames). SAFETY: `self` is a live `&mut Self`; buffered_amount only
// does short-lived raw-ptr field reads.
let buffered = unsafe { Self::buffered_amount(self) };
self.clear_data();
self.tcp.detach();

self.dispatch_abrupt_close(ErrorCode::Ended);
self.dispatch_abrupt_close(ErrorCode::Ended, Some(buffered));

// For the socket.
// SAFETY: `self: &mut Self` → `*mut Self`; this is the terminal
Expand Down Expand Up @@ -1280,9 +1292,20 @@ impl<const SSL: bool> WebSocket<SSL> {
true
}
Err(true) => {
// `terminate → clear_data` resets `send_buffer`; drop the
// taken fifo without restoring.
drop(buf);
// Restore the backlog before terminating: fail() snapshots
// send_buffer.readable_length() for bufferedAmount, so it must
// still be here. `terminate → cancel → clear_data` frees it
// immediately afterward, so this does not leak.
//
// KNOWN GAP (tunnel only): if the tunnel's SslWrapper::write_data
// hits a fatal SSL error it fires on_close → fail() *synchronously
// inside* the write above — before this restore — so that
// bufferedAmount snapshot reads 0. `self.send_buffer` cannot be
// kept populated across the write without either aliasing UB (the
// slice handed to write is borrowed from it) or an extra per-flush
// copy; the window is a fatal-handshake/close-notify error
// mid-flush, and 0 there is no worse than the pre-feature behavior.
self.send_buffer = buf;
self.terminate(ErrorCode::FailedToWrite);
Comment thread
robobun marked this conversation as resolved.
false
}
Expand All @@ -1295,7 +1318,7 @@ impl<const SSL: bool> WebSocket<SSL> {

fn send_pong(&mut self) -> bool {
if !self.has_tcp() {
self.dispatch_abrupt_close(ErrorCode::Ended);
self.dispatch_abrupt_close(ErrorCode::Ended, None);
return false;
}

Expand Down Expand Up @@ -1355,7 +1378,7 @@ impl<const SSL: bool> WebSocket<SSL> {
let body_len = body_len.min(123);
log!("Sending close with code {}", code);
if !self.has_tcp() {
self.dispatch_abrupt_close(ErrorCode::Ended);
self.dispatch_abrupt_close(ErrorCode::Ended, None);
self.clear_data();
return;
}
Expand Down Expand Up @@ -1407,8 +1430,14 @@ impl<const SSL: bool> WebSocket<SSL> {
let slice = &final_body_bytes[..slice_len];

if self.enqueue_encoded_bytes(slice) {
// Snapshot the unsent backlog before clear_data() frees it, so the
// JS close event does not see bufferedAmount reset to 0 (spec: it
// does not reset once the connection closes).
// SAFETY: `self` is a live `&mut Self`; buffered_amount only does
// short-lived raw-ptr field reads.
let buffered = unsafe { Self::buffered_amount(self) };
self.clear_data();
self.dispatch_close(dispatch_code.unwrap_or(code), &mut reason);
self.dispatch_close(dispatch_code.unwrap_or(code), &mut reason, buffered);
}
}

Expand Down Expand Up @@ -1466,7 +1495,7 @@ impl<const SSL: bool> WebSocket<SSL> {
let this = unsafe { &mut *this_ptr };

if !this.has_tcp() || op > 0xF {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
return;
}

Expand Down Expand Up @@ -1512,7 +1541,7 @@ impl<const SSL: bool> WebSocket<SSL> {
let this = unsafe { &mut *this_ptr };

if !this.has_tcp() || op > 0xF {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
return;
}

Expand Down Expand Up @@ -1555,7 +1584,7 @@ impl<const SSL: bool> WebSocket<SSL> {
let _ = this.send_data(bytes, !this.has_backpressure(), opcode);
} else {
// Invalid blob, close connection
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
}
}

Expand All @@ -1573,7 +1602,7 @@ impl<const SSL: bool> WebSocket<SSL> {
// SAFETY: str_ is a valid pointer from C++
let str = unsafe { &*str_ };
if !this.has_tcp() {
this.dispatch_abrupt_close(ErrorCode::Ended);
this.dispatch_abrupt_close(ErrorCode::Ended, None);
return;
}

Expand Down Expand Up @@ -1626,25 +1655,33 @@ impl<const SSL: bool> WebSocket<SSL> {
);
}

fn dispatch_abrupt_close(&mut self, code: ErrorCode) {
/// `buffered_override` lets a caller that already cleared the send buffer
/// (e.g. `handle_close()` calls `clear_data()` first) pass the backlog it
/// captured beforehand. `None` snapshots the live send buffer here.
fn dispatch_abrupt_close(&mut self, code: ErrorCode, buffered_override: Option<usize>) {
let Some(out) = self.outgoing_websocket.take() else {
return;
};
self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this));
jsc::mark_binding!();
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code);
// Capture the unsent backlog so C++ can keep bufferedAmount from
// resetting to 0 on abrupt close.
// SAFETY: `self` is a live `&mut Self`; buffered_amount only does
// short-lived raw-ptr field reads.
let buffered = buffered_override.unwrap_or_else(|| unsafe { Self::buffered_amount(self) });
CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code, buffered);
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// caller's ref guard (see cancel/handle_close).
unsafe { Self::deref(self) };
}

fn dispatch_close(&mut self, code: u16, reason: &mut bun_core::String) {
fn dispatch_close(&mut self, code: u16, reason: &mut bun_core::String, buffered_amount: usize) {
let Some(out) = self.outgoing_websocket.take() else {
return;
};
self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this));
jsc::mark_binding!();
CppWebSocket::opaque_ref(out.as_ptr()).did_close(code, reason);
CppWebSocket::opaque_ref(out.as_ptr()).did_close(code, reason, buffered_amount);
// SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by
// caller's ref guard.
unsafe { Self::deref(self) };
Expand Down Expand Up @@ -2092,6 +2129,45 @@ impl<const SSL: bool> WebSocket<SSL> {
// This is under-estimated a little, as we don't include usockets context.
cost
}

/// Bytes queued by `send()` that have not yet been written to the socket.
/// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing
/// bytes of buffered frames (the send buffer holds fully framed messages),
/// plus any encrypted bytes the proxy tunnel still holds.
///
/// Takes `*const Self` and projects to `send_buffer`/`proxy_tunnel` via
/// `addr_of!` rather than forming a whole-struct `&Self`: the C++
/// `bufferedAmount` getter can run re-entrantly while a `&mut Self` is live
/// (JS reads `ws.bufferedAmount` inside an `onmessage` handler dispatched
/// from `dispatch_data(&mut self)`), and a whole-struct `&Self` would pop
/// that borrow's Unique tag (UB under Stacked Borrows).
///
/// # Safety
/// `this` must point to a live `WebSocket<SSL>`.
pub unsafe fn buffered_amount(this: *const Self) -> usize {
// SAFETY: `this` is live; short-lived shared borrows of the disjoint
// `send_buffer` and `proxy_tunnel` fields only (never a whole-struct
// `&Self`, which could overlap a live `&mut Self` on a re-entrant call).
let mut buffered = unsafe { (*core::ptr::addr_of!((*this).send_buffer)).readable_length() };
// SAFETY: as above — `proxy_tunnel` is `Copy` (Option<NonNull<_>>).
let tunnel = unsafe { *core::ptr::addr_of!((*this).proxy_tunnel) };
if let Some(tunnel) = tunnel {
// Raw-ptr accessor, not `tunnel.as_ref()`: reachable inside the
// tunnel's SSL-wrapper callbacks on abrupt close, where a
// whole-struct `&WebSocketProxyTunnel` would overlap the live
// `&mut SslWrapper` (see WebSocketProxyTunnel's Aliasing model doc).
// SAFETY: `tunnel` (NonNull) points to a live tunnel.
buffered += unsafe { WebSocketProxyTunnel::buffered_amount(tunnel.as_ptr()) };
}
buffered
}

// `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below).
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn get_buffered_amount(this: *const Self) -> usize {
// SAFETY: called from C++ with a valid pointer.
unsafe { Self::buffered_amount(this) }
}
Comment thread
robobun marked this conversation as resolved.
}

// ──────────────────────────────────────────────────────────────────────────
Expand All @@ -2110,6 +2186,7 @@ macro_rules! export_websocket_client {
cancel = $cancel:ident,
close = $close:ident,
finalize = $finalize:ident,
get_buffered_amount = $get_buffered_amount:ident,
init = $init:ident,
init_with_tunnel = $init_with_tunnel:ident,
memory_cost = $memory_cost:ident,
Expand All @@ -2130,6 +2207,10 @@ macro_rules! export_websocket_client {
WebSocket::<$ssl>::finalize(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize {
WebSocket::<$ssl>::get_buffered_amount(this)
}
#[unsafe(no_mangle)]
pub extern "C" fn $init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
Expand Down Expand Up @@ -2200,6 +2281,7 @@ export_websocket_client!(
cancel = Bun__WebSocketClient__cancel,
close = Bun__WebSocketClient__close,
finalize = Bun__WebSocketClient__finalize,
get_buffered_amount = Bun__WebSocketClient__getBufferedAmount,
init = Bun__WebSocketClient__init,
init_with_tunnel = Bun__WebSocketClient__initWithTunnel,
memory_cost = Bun__WebSocketClient__memoryCost,
Expand All @@ -2212,6 +2294,7 @@ export_websocket_client!(
cancel = Bun__WebSocketClientTLS__cancel,
close = Bun__WebSocketClientTLS__close,
finalize = Bun__WebSocketClientTLS__finalize,
get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount,
init = Bun__WebSocketClientTLS__init,
init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel,
memory_cost = Bun__WebSocketClientTLS__memoryCost,
Expand Down
27 changes: 21 additions & 6 deletions src/http_jsc/websocket_client/CppWebSocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,17 @@ unsafe extern "C" {
buffered_len: usize,
deflate_params: *const websocket_deflate::Params,
);
safe fn WebSocket__didAbruptClose(websocket_context: &CppWebSocket, reason: ErrorCode);
fn WebSocket__didClose(websocket_context: &CppWebSocket, code: u16, reason: *const BunString);
safe fn WebSocket__didAbruptClose(
websocket_context: &CppWebSocket,
reason: ErrorCode,
buffered_amount: usize,
);
fn WebSocket__didClose(
websocket_context: &CppWebSocket,
code: u16,
reason: *const BunString,
buffered_amount: usize,
);
fn WebSocket__didReceiveText(
websocket_context: &CppWebSocket,
clone: bool,
Expand All @@ -69,22 +78,28 @@ unsafe extern "C" {
// borrows (often while `&mut WebSocket<SSL>` is also live), so `&mut self`
// would force needless `unsafe { &mut *ptr }` at every site.
impl CppWebSocket {
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode) {
/// `buffered_amount` is the sender's unsent backlog captured *before* this
/// call (the connection's send buffer may be freed during the abrupt-close
/// teardown), so C++ can keep `WebSocket.bufferedAmount` from resetting to 0.
pub(crate) fn did_abrupt_close(&self, reason: ErrorCode, buffered_amount: usize) {
// SAFETY: VirtualMachine::get() returns the live current-thread VM;
// event_loop() yields its raw event-loop pointer (live for VM lifetime).
let event_loop = VirtualMachine::get().event_loop_mut();
event_loop.enter();
WebSocket__didAbruptClose(self, reason);
WebSocket__didAbruptClose(self, reason, buffered_amount);
event_loop.exit();
}

pub(crate) fn did_close(&self, code: u16, reason: &mut BunString) {
/// `buffered_amount` is the sender's unsent backlog captured *before* this
/// call (the send buffer is freed during close teardown), so C++ can keep
/// `WebSocket.bufferedAmount` from resetting to 0 once closed.
pub(crate) fn did_close(&self, code: u16, reason: &mut BunString, buffered_amount: usize) {
// SAFETY: VirtualMachine::get() returns the live current-thread VM;
// event_loop() yields its raw event-loop pointer (live for VM lifetime).
let event_loop = VirtualMachine::get().event_loop_mut();
event_loop.enter();
// SAFETY: self is a valid C++ WebCore::WebSocket; reason outlives the call.
unsafe { WebSocket__didClose(self, code, reason) };
unsafe { WebSocket__didClose(self, code, reason, buffered_amount) };
event_loop.exit();
}

Expand Down
16 changes: 16 additions & 0 deletions src/http_jsc/websocket_client/WebSocketProxyTunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,22 @@ impl WebSocketProxyTunnel {
pub(crate) fn has_backpressure(&self) -> bool {
self.write_buffer.is_not_empty()
}

/// Encrypted bytes still buffered in the tunnel awaiting a writable socket.
///
/// Takes `*const Self` and projects to `write_buffer` via `addr_of!` rather
/// than forming a whole-struct `&Self`: this is reachable from inside the
/// SSL-wrapper callbacks (abrupt close during the connected phase), which
/// hold a `&mut SslWrapper` over the `wrapper` field — a whole-struct borrow
/// would overlap it (see the module's Aliasing model doc).
///
/// # Safety
/// `this` must point to a live `WebSocketProxyTunnel`.
pub(crate) unsafe fn buffered_amount(this: *const Self) -> usize {
// SAFETY: `this` is live; short-lived shared borrow of the disjoint
// `write_buffer` field only (never touches `wrapper`).
unsafe { (*ptr::addr_of!((*this).write_buffer)).size() }
}
}

impl Drop for WebSocketProxyTunnel {
Expand Down
3 changes: 2 additions & 1 deletion src/http_jsc/websocket_client/WebSocketUpgradeClient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ impl<const SSL: bool> HTTPClient<SSL> {
// SAFETY: short-lived `&mut` for the field take; ends before the FFI call.
let ws = unsafe { (*this).outgoing_websocket.take() };
if let Some(ws) = ws {
CppWebSocket::opaque_ref(ws).did_abrupt_close(code);
// The upgrade handshake has no send buffer yet, so the backlog is 0.
CppWebSocket::opaque_ref(ws).did_abrupt_close(code, 0);
// SAFETY: `this` carries root provenance; may free `this`.
unsafe { Self::deref(this) };
}
Expand Down
2 changes: 2 additions & 0 deletions src/jsc/bindings/headers.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading