Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions src/http/h2_client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,31 +734,9 @@ pub(crate) fn strip_padding(payload: &[u8]) -> Option<&[u8]> {
/// hop-by-hop fields. Names from lshpack are already lowercase for table
/// hits but a literal can carry anything.
pub(crate) fn is_malformed_response_field(name: &[u8]) -> bool {
if name.is_empty() {
if name.is_empty() || !name.iter().all(|&c| wire::is_lower_tchar(c)) {
return true;
}
for &c in name {
match c {
b'a'..=b'z'
| b'0'..=b'9'
| b'!'
| b'#'
| b'$'
| b'%'
| b'&'
| b'\''
| b'*'
| b'+'
| b'-'
| b'.'
| b'^'
| b'_'
| b'`'
| b'|'
| b'~' => {}
_ => return true,
}
}
matches!(
name,
b"connection"
Expand All @@ -770,13 +748,7 @@ pub(crate) fn is_malformed_response_field(name: &[u8]) -> bool {
)
}

/// RFC 9113 §8.2.1: a field value MUST NOT contain NUL (0x00), LF (0x0a), or
/// CR (0x0d). HPACK is length-prefixed so these would otherwise pass through
/// verbatim, breaking the no-CR/LF invariant the HTTP/1.1 parser provides and
/// enabling header injection when values are forwarded downstream.
pub fn is_malformed_response_value(value: &[u8]) -> bool {
value.iter().any(|&c| c == 0 || c == b'\r' || c == b'\n')
}
pub use wire::is_malformed_field_value as is_malformed_response_value;

pub fn error_code_for(err: bun_core::Error) -> wire::ErrorCode {
// bun_core::Error is a NonZeroU16 interned tag; `err!()` yields
Expand Down
251 changes: 87 additions & 164 deletions src/http/lib.rs

Large diffs are not rendered by default.

267 changes: 128 additions & 139 deletions src/http_jsc/websocket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1717,16 +1717,19 @@ impl<const SSL: bool> WebSocket<SSL> {
this.send_close_with_body(code, None, None, 0);
}

pub extern "C" fn init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
/// Allocate a `WebSocket` with `ref_count == 1` and initialize deflate
/// if requested. The initial ref is the I/O-layer ref: the adopted-socket
/// ref in `init` (released by `handle_close`) or the tunnel-connection
/// ref in `init_with_tunnel` (released in `clear_data` when
/// `proxy_tunnel` is detached). The C++ ref paired with
/// `m_connectedWebSocket` is taken later in `finish_init`.
fn new_ws(
global_this: &JSGlobalObject,
buffered_data: *mut u8,
buffered_data_len: usize,
outgoing: *mut CppWebSocket,
secure: Option<*mut SslCtx>,
proxy_tunnel: Option<NonNull<WebSocketProxyTunnel>>,
deflate_params: Option<&websocket_deflate::Params>,
secure_ptr: *mut c_void,
) -> *mut c_void {
let tcp = input_socket.cast::<us_socket_t>();
) -> *mut Self {
// outlives this call.
let vm = global_this.bun_vm().as_mut();
let ws = bun_core::heap::into_raw(Box::new(WebSocket::<SSL> {
Expand Down Expand Up @@ -1755,30 +1758,100 @@ impl<const SSL: bool> WebSocket<SSL> {
initial_data_handler: None,
// reshaped for borrowck — `vm.event_loop()` returns a
// `&'static`-tied borrow that would lock `vm` for the rest of the
// fn; re-derive from `global_this` so `vm` stays usable below.
// fn; re-derive from `global_this` so `vm` stays usable for the
// deflate init below.
// SAFETY: bun_vm() never returns null; event_loop ptr is live for VM lifetime.
event_loop: global_this.bun_vm().event_loop_mut(),
deflate: None,
receiving_compressed: false,
message_is_compressed: false,
secure: if secure_ptr.is_null() {
None
} else {
Some(secure_ptr.cast::<SslCtx>())
},
proxy_tunnel: None,
secure,
proxy_tunnel,
}));
bun_core::scoped_log!(alloc, "new({}) = {:p}", Self::ALLOC_TYPE_NAME, ws);
// SAFETY: ws was just allocated via heap::alloc
let ws_ref = unsafe { &mut *ws };

if let Some(params) = deflate_params {
match WebSocketDeflate::init(*params, vm.rare_data()) {
Ok(deflate) => ws_ref.deflate = Some(deflate),
Err(_) => ws_ref.deflate = None,
}
// SAFETY: ws was just allocated via heap::alloc
unsafe { (*ws).deflate = WebSocketDeflate::init(*params, vm.rare_data()).ok() };
}

ws
}

/// Shared tail of `init` / `init_with_tunnel`: preallocate the frame
/// buffers, ref the event loop, queue any buffered handshake data as a
/// microtask, and take the C++-side ref.
///
/// # Safety
/// `ws` must be the live `heap::alloc` allocation returned by `new_ws`,
/// with no other `&`/`&mut` borrow of `*ws` live across this call. If
/// `buffered_data_len > 0`, `buffered_data` must be a mimalloc allocation
/// of that length whose ownership transfers to this call (extern-C
/// contract with the upgrade client).
unsafe fn finish_init(
ws: *mut Self,
global_this: &JSGlobalObject,
buffered: Option<Box<[u8]>>,
) -> *mut c_void {
// SAFETY: caller contract — `ws` is live with no other borrows.
let ws_ref = unsafe { &mut *ws };
bun_core::handle_oom(ws_ref.send_buffer.ensure_total_capacity(2048));
bun_core::handle_oom(ws_ref.receive_buffer.ensure_total_capacity(2048));
ws_ref.poll_ref.r#ref(Self::vm_loop_ctx(global_this));

if let Some(buffered_slice) = buffered {
let initial_data = bun_core::heap::into_raw(Box::new(InitialDataHandler::<SSL> {
adopted: NonNull::new(ws),
slice: buffered_slice,
// We need to ref the outgoing websocket so that it doesn't get
// finalized before the initial data handler is called.
// SAFETY: `outgoing_websocket` (set by `new_ws` from the
// extern-C `outgoing` argument) is a valid CppWebSocket*; it
// outlives the handler — `handle_without_deinit` drops the
// ref before C++ can finalize.
ws: ws_ref
.outgoing_websocket
.map(|p| unsafe { CppWebSocketRef::new(p) }),
}));
// Backref so `handle_data` can drain the buffered slice ahead of
// fresh socket data, and so `deinit()` can detach from the box if
// teardown races ahead of the microtask drain.
ws_ref.initial_data_handler = NonNull::new(initial_data);

// Use a higher-priority callback for the initial onData handler
// `queue_microtask_callback` takes an erased
// `(*mut c_void, unsafe extern "C" fn(*mut c_void))`; cast both.
global_this.queue_microtask_callback(
initial_data.cast::<c_void>(),
InitialDataHandler::<SSL>::handle,
);
}

// And lastly, ref the new websocket since C++ has a reference to it
ws_ref.ref_();

ws.cast::<c_void>()
}

pub extern "C" fn init(
outgoing: *mut CppWebSocket,
input_socket: *mut c_void,
global_this: &JSGlobalObject,
buffered_data: *mut u8,
buffered_data_len: usize,
deflate_params: Option<&websocket_deflate::Params>,
secure_ptr: *mut c_void,
) -> *mut c_void {
let tcp = input_socket.cast::<us_socket_t>();
let secure = if secure_ptr.is_null() {
None
} else {
Some(secure_ptr.cast::<SslCtx>())
};
let ws = Self::new_ws(global_this, outgoing, secure, None, deflate_params);
// outlives this call.
let vm = global_this.bun_vm().as_mut();

// `adopt_group` takes a closure to write the new socket.
let group = {
// reshaped for borrowck — `rare_data()` borrows `vm`
Expand All @@ -1804,8 +1877,8 @@ impl<const SSL: bool> WebSocket<SSL> {
},
ws,
// SAFETY: `owner == ws` is a valid live allocation; raw-ptr field
// write avoids materializing a second `&mut` that would alias
// `ws_ref` above.
// write avoids materializing a `&mut WebSocket` around the
// callback.
|owner, sock| unsafe { core::ptr::addr_of_mut!((*owner).tcp).write(sock) },
) {
// SAFETY: `ws` is the `heap::alloc` allocation just created
Expand All @@ -1814,50 +1887,24 @@ impl<const SSL: bool> WebSocket<SSL> {
return core::ptr::null_mut();
}

bun_core::handle_oom(ws_ref.send_buffer.ensure_total_capacity(2048));
bun_core::handle_oom(ws_ref.receive_buffer.ensure_total_capacity(2048));
ws_ref.poll_ref.r#ref(Self::vm_loop_ctx(global_this));

if buffered_data_len > 0 {
let buffered: Option<Box<[u8]>> = if buffered_data_len > 0 {
// SAFETY: buffered_data/len from C++; caller guarantees validity.
// The upgrade client allocated this buffer via mimalloc
// and transfers ownership to us.
// The global allocator is also mimalloc, so `heap::take`
// adopts the original allocation (no copy) and `Drop` will `mi_free` it.
let buffered_slice: Box<[u8]> = unsafe {
Some(unsafe {
bun_core::heap::take(std::ptr::slice_from_raw_parts_mut(
buffered_data,
buffered_data_len,
))
};
let initial_data = bun_core::heap::into_raw(Box::new(InitialDataHandler::<SSL> {
adopted: NonNull::new(ws),
slice: buffered_slice,
// We need to ref the outgoing websocket so that it doesn't get
// finalized before the initial data handler is called.
// SAFETY: outgoing is a valid CppWebSocket* (extern-C contract);
// it outlives the handler — `handle_without_deinit` drops the
// ref before C++ can finalize.
ws: NonNull::new(outgoing).map(|p| unsafe { CppWebSocketRef::new(p) }),
}));
// Backref so `handle_data` can drain the buffered slice ahead of
// fresh socket data, and so `deinit()` can detach from the box if
// teardown races ahead of the microtask drain.
ws_ref.initial_data_handler = NonNull::new(initial_data);

// Use a higher-priority callback for the initial onData handler
// `queue_microtask_callback` takes an erased
// `(*mut c_void, unsafe extern "C" fn(*mut c_void))`; cast both.
global_this.queue_microtask_callback(
initial_data.cast::<c_void>(),
InitialDataHandler::<SSL>::handle,
);
}

// And lastly, ref the new websocket since C++ has a reference to it
ws_ref.ref_();

ws.cast::<c_void>()
})
} else {
None
};
// SAFETY: `ws` is the live allocation created above with no other
// borrows.
unsafe { Self::finish_init(ws, global_this, buffered) }
}

/// Initialize a WebSocket client that uses a proxy tunnel for I/O.
Expand All @@ -1882,92 +1929,34 @@ impl<const SSL: bool> WebSocket<SSL> {
NonNull::new(p).expect("extern-C contract: tunnel_ptr is non-null")
};

// ref_count starts at 1: this is the I/O-layer ref, owned by the
// tunnel connection (analogous to the adopted-socket ref in init()
// that handle_close() releases). It is released in clear_data() when
// proxy_tunnel is detached. The ws.ref() below adds the C++ ref
// paired with m_connectedWebSocket.
// outlives this call.
let vm = global_this.bun_vm().as_mut();
let ws = bun_core::heap::into_raw(Box::new(WebSocket::<SSL> {
ref_count: Cell::new(1),
tcp: Socket::<SSL>::detached(), // No direct socket - using tunnel
outgoing_websocket: NonNull::new(outgoing),
receive_state: ReceiveState::NeedHeader,
receiving_type: Opcode::ResB,
receiving_is_final: true,
ping_frame_bytes: [0u8; 128 + 6],
ping_len: 0,
ping_received: false,
pong_received: false,
close_received: false,
close_frame_buffering: false,
receive_frame: 0,
receive_body_remain: 0,
receive_pending_chunk_len: 0,
receive_buffer: LinearFifo::<u8, DynamicBuffer<u8>>::init(),
send_buffer: LinearFifo::<u8, DynamicBuffer<u8>>::init(),
global_this: GlobalRef::from(global_this),
poll_ref: KeepAlive::init(),
header_fragment: None,
payload_length_frame_bytes: [0u8; 8],
payload_length_frame_len: 0,
initial_data_handler: None,
// reshaped for borrowck — `vm.event_loop()` returns a
// `&'static`-tied borrow that would lock `vm` for the rest of the
// fn; re-derive from `global_this` so `vm` stays usable below.
// SAFETY: bun_vm() never returns null; event_loop ptr is live for VM lifetime.
event_loop: global_this.bun_vm().event_loop_mut(),
deflate: None,
receiving_compressed: false,
message_is_compressed: false,
secure: None,
proxy_tunnel: Some(tunnel_owned),
}));
bun_core::scoped_log!(alloc, "new({}) = {:p}", Self::ALLOC_TYPE_NAME, ws);
// SAFETY: ws was just allocated via heap::alloc
let ws_ref = unsafe { &mut *ws };

if let Some(params) = deflate_params {
match WebSocketDeflate::init(*params, vm.rare_data()) {
Ok(deflate) => ws_ref.deflate = Some(deflate),
Err(_) => ws_ref.deflate = None,
}
}

bun_core::handle_oom(ws_ref.send_buffer.ensure_total_capacity(2048));
bun_core::handle_oom(ws_ref.receive_buffer.ensure_total_capacity(2048));
ws_ref.poll_ref.r#ref(Self::vm_loop_ctx(global_this));
// No direct socket — `tcp` stays detached and all I/O goes through
// the tunnel.
let ws = Self::new_ws(
global_this,
outgoing,
None,
Some(tunnel_owned),
deflate_params,
);

if buffered_data_len > 0 {
// SAFETY: see `init()` — adopt the C++ mimalloc-owned buffer
// directly so it is freed (not leaked) when the handler drops.
let buffered_slice: Box<[u8]> = unsafe {
let buffered: Option<Box<[u8]>> = if buffered_data_len > 0 {
// SAFETY: buffered_data/len from C++; caller guarantees validity.
// The upgrade client allocated this buffer via mimalloc
// and transfers ownership to us.
// The global allocator is also mimalloc, so `heap::take`
// adopts the original allocation (no copy) and `Drop` will `mi_free` it.
Some(unsafe {
bun_core::heap::take(std::ptr::slice_from_raw_parts_mut(
buffered_data,
buffered_data_len,
))
};
let initial_data = bun_core::heap::into_raw(Box::new(InitialDataHandler::<SSL> {
adopted: NonNull::new(ws),
slice: buffered_slice,
// SAFETY: outgoing is a valid CppWebSocket* (extern-C contract);
// it outlives the handler — `handle_without_deinit` drops the
// ref before C++ can finalize.
ws: NonNull::new(outgoing).map(|p| unsafe { CppWebSocketRef::new(p) }),
}));
ws_ref.initial_data_handler = NonNull::new(initial_data);
// `queue_microtask_callback` takes an erased
// `(*mut c_void, unsafe extern "C" fn(*mut c_void))`; cast both.
global_this.queue_microtask_callback(
initial_data.cast::<c_void>(),
InitialDataHandler::<SSL>::handle,
);
}

ws_ref.ref_();

ws.cast::<c_void>()
})
} else {
None
};
// SAFETY: `ws` is the live allocation created above with no other
// borrows.
unsafe { Self::finish_init(ws, global_this, buffered) }
}

/// Handle data received from the proxy tunnel (already decrypted).
Expand Down
Loading
Loading