Skip to content
Open
72 changes: 71 additions & 1 deletion docs/docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ The `handshakeResponse` object contains the HTTP response that upgraded the conn

This information is particularly useful for debugging and monitoring WebSocket connections, as it provides access to the initial HTTP handshake response that established the WebSocket connection.

## `undici:websocket:created`

This message is published when a `WebSocket` instance is created, before the opening handshake is sent.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:created').subscribe(({ websocket, url }) => {
console.log(websocket) // the WebSocket instance
console.log(url) // serialized websocket URL
})
```

## `undici:websocket:handshakeRequest`

This message is published when the HTTP upgrade request is about to be sent.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:handshakeRequest').subscribe(({ websocket, request }) => {
console.log(websocket) // the WebSocket instance
console.log(request.headers) // handshake request headers assembled so far
})
```

## `undici:websocket:close`

This message is published after the connection has closed.
Expand All @@ -222,8 +248,52 @@ This message is published if the socket experiences an error.
```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:socket_error').subscribe((error) => {
diagnosticsChannel.channel('undici:websocket:socket_error').subscribe(({ error, websocket }) => {
console.log(error)
console.log(websocket) // the WebSocket instance, if available
})
```

## `undici:websocket:frameSent`

This message is published after a WebSocket frame is written to the socket.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:frameSent').subscribe(({ websocket, opcode, mask, payloadData }) => {
console.log(websocket) // the WebSocket instance
console.log(opcode) // RFC 6455 opcode
console.log(mask) // true for client-sent frames
console.log(payloadData) // unmasked payload bytes
})
```

## `undici:websocket:frameReceived`

This message is published after a WebSocket frame is parsed from the socket.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:frameReceived').subscribe(({ websocket, opcode, mask, payloadData }) => {
console.log(websocket) // the WebSocket instance
console.log(opcode) // RFC 6455 opcode
console.log(mask) // false for server-sent frames
console.log(payloadData) // payload bytes as received
})
```

## `undici:websocket:frameError`

This message is published when Undici rejects an invalid incoming WebSocket frame.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:frameError').subscribe(({ websocket, error }) => {
console.log(websocket) // the WebSocket instance
console.log(error.message)
})
```

Expand Down
42 changes: 37 additions & 5 deletions lib/core/diagnostics.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ const channels = {
trailers: diagnosticsChannel.channel('undici:request:trailers'),
error: diagnosticsChannel.channel('undici:request:error'),
// WebSocket
created: diagnosticsChannel.channel('undici:websocket:created'),
handshakeRequest: diagnosticsChannel.channel('undici:websocket:handshakeRequest'),
open: diagnosticsChannel.channel('undici:websocket:open'),
close: diagnosticsChannel.channel('undici:websocket:close'),
frameSent: diagnosticsChannel.channel('undici:websocket:frameSent'),
frameReceived: diagnosticsChannel.channel('undici:websocket:frameReceived'),
frameError: diagnosticsChannel.channel('undici:websocket:frameError'),
socketError: diagnosticsChannel.channel('undici:websocket:socket_error'),
ping: diagnosticsChannel.channel('undici:websocket:ping'),
pong: diagnosticsChannel.channel('undici:websocket:pong'),
Expand Down Expand Up @@ -166,15 +171,27 @@ function trackWebSocketEvents (debugLog = websocketDebuglog) {

// Check if any of the channels already have subscribers to prevent duplicate subscriptions
// This can happen when both Node.js built-in undici and undici as a dependency are present
if (channels.open.hasSubscribers || channels.close.hasSubscribers ||
channels.socketError.hasSubscribers || channels.ping.hasSubscribers ||
channels.pong.hasSubscribers) {
if (channels.created.hasSubscribers || channels.handshakeRequest.hasSubscribers ||
channels.open.hasSubscribers || channels.close.hasSubscribers ||
channels.frameSent.hasSubscribers || channels.frameReceived.hasSubscribers ||
channels.frameError.hasSubscribers || channels.socketError.hasSubscribers ||
channels.ping.hasSubscribers || channels.pong.hasSubscribers) {
isTrackingWebSocketEvents = true
return
}

isTrackingWebSocketEvents = true

diagnosticsChannel.subscribe('undici:websocket:created',
evt => {
debugLog('created websocket for %s', evt.url)
})

diagnosticsChannel.subscribe('undici:websocket:handshakeRequest',
evt => {
debugLog('sending opening handshake for %s', evt.websocket?.url ?? '<unknown>')
})

diagnosticsChannel.subscribe('undici:websocket:open',
evt => {
const {
Expand All @@ -194,9 +211,24 @@ function trackWebSocketEvents (debugLog = websocketDebuglog) {
)
})

diagnosticsChannel.subscribe('undici:websocket:frameSent',
evt => {
debugLog('frame sent opcode=%d bytes=%d', evt.opcode, evt.payloadData.length)
})

diagnosticsChannel.subscribe('undici:websocket:frameReceived',
evt => {
debugLog('frame received opcode=%d bytes=%d', evt.opcode, evt.payloadData.length)
})

diagnosticsChannel.subscribe('undici:websocket:frameError',
evt => {
debugLog('frame errored for %s - %s', evt.websocket?.url ?? '<unknown>', evt.error.message)
})

diagnosticsChannel.subscribe('undici:websocket:socket_error',
err => {
debugLog('connection errored - %s', err.message)
evt => {
debugLog('connection errored for %s - %s', evt.websocket?.url ?? '<unknown>', evt.error.message)
})

diagnosticsChannel.subscribe('undici:websocket:ping',
Expand Down
10 changes: 10 additions & 0 deletions lib/web/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const { getDecodeSplit } = require('../fetch/util')
const { WebsocketFrameSend } = require('./frame')
const assert = require('node:assert')
const { runtimeFeatures } = require('../../util/runtime-features')
const { channels } = require('../../core/diagnostics')

const crypto = runtimeFeatures.has('crypto')
? require('node:crypto')
Expand Down Expand Up @@ -89,6 +90,15 @@ function establishWebSocketConnection (url, protocols, client, handler, options)

// 11. Fetch request with useParallelQueue set to true, and
// processResponse given response being these steps:
if (channels.handshakeRequest.hasSubscribers) {
channels.handshakeRequest.publish({
websocket: handler.websocket,
request: {
headers: request.headersList.entries
}
})
}

const controller = fetching({
request,
useParallelQueue: true,
Expand Down
35 changes: 34 additions & 1 deletion lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const { failWebsocketConnection } = require('./connection')
const { WebsocketFrameSend } = require('./frame')
const { PerMessageDeflate } = require('./permessage-deflate')
const { MessageSizeExceededError } = require('../../core/errors')
const { channels } = require('../../core/diagnostics')

// This code was influenced by ws released under the MIT license.
// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
Expand Down Expand Up @@ -97,11 +98,13 @@ class ByteParser extends Writable {
const rsv3 = buffer[0] & 0x10

if (!isValidOpcode(opcode)) {
this.publishFrameError(new Error('Invalid opcode received'))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a cleaner way to handle this? Repeating these same lines before every failWebsocketConnection call seems like it could lead to maintenance issues.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made the function common.

failWebsocketConnection(this.#handler, 1002, 'Invalid opcode received')
return callback()
}

if (masked) {
this.publishFrameError(new Error('Frame cannot be masked'))
failWebsocketConnection(this.#handler, 1002, 'Frame cannot be masked')
return callback()
}
Expand All @@ -116,42 +119,49 @@ class ByteParser extends Writable {
// WebSocket connection where a PMCE is in use, this bit indicates
// whether a message is compressed or not.
if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) {
this.publishFrameError(new Error('Expected RSV1 to be clear.'))
failWebsocketConnection(this.#handler, 1002, 'Expected RSV1 to be clear.')
return
}

if (rsv2 !== 0 || rsv3 !== 0) {
this.publishFrameError(new Error('RSV1, RSV2, RSV3 must be clear'))
failWebsocketConnection(this.#handler, 1002, 'RSV1, RSV2, RSV3 must be clear')
return
}

if (fragmented && !isTextBinaryFrame(opcode)) {
// Only text and binary frames can be fragmented
this.publishFrameError(new Error('Invalid frame type was fragmented.'))
failWebsocketConnection(this.#handler, 1002, 'Invalid frame type was fragmented.')
return
}

// If we are already parsing a text/binary frame and do not receive either
// a continuation frame or close frame, fail the connection.
if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
this.publishFrameError(new Error('Expected continuation frame'))
failWebsocketConnection(this.#handler, 1002, 'Expected continuation frame')
return
}

if (this.#info.fragmented && fragmented) {
// A fragmented frame can't be fragmented itself
this.publishFrameError(new Error('Fragmented frame exceeded 125 bytes.'))
failWebsocketConnection(this.#handler, 1002, 'Fragmented frame exceeded 125 bytes.')
return
}

// "All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented."
if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
this.publishFrameError(new Error('Control frame either too large or fragmented'))
failWebsocketConnection(this.#handler, 1002, 'Control frame either too large or fragmented')
return
}

if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) {
this.publishFrameError(new Error('Unexpected continuation frame'))
failWebsocketConnection(this.#handler, 1002, 'Unexpected continuation frame')
return
}
Expand Down Expand Up @@ -199,6 +209,7 @@ class ByteParser extends Writable {
// https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
// https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
if (upper !== 0 || lower > 2 ** 31 - 1) {
this.publishFrameError(new Error('Received payload length > 2^31 bytes.'))
failWebsocketConnection(this.#handler, 1009, 'Received payload length > 2^31 bytes.')
return
}
Expand All @@ -212,6 +223,15 @@ class ByteParser extends Writable {

const body = this.consume(this.#info.payloadLength)

if (channels.frameReceived.hasSubscribers) {
channels.frameReceived.publish({
websocket: this.#handler.websocket,
opcode: this.#info.opcode,
mask: this.#info.masked,
payloadData: Buffer.from(body)
})
}

if (isControlFrame(this.#info.opcode)) {
this.#loop = this.parseControlFrame(body)
this.#state = parserStates.INFO
Expand All @@ -231,7 +251,7 @@ class ByteParser extends Writable {
} else {
this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => {
if (error) {
// Use 1009 (Message Too Big) for decompression size limit errors
this.publishFrameError(error)
const code = error instanceof MessageSizeExceededError ? 1009 : 1007
failWebsocketConnection(this.#handler, code, error.message)
return
Expand Down Expand Up @@ -384,6 +404,7 @@ class ByteParser extends Writable {

if (opcode === opcodes.CLOSE) {
if (payloadLength === 1) {
this.publishFrameError(new Error('Received close frame with a 1-byte body.'))
failWebsocketConnection(this.#handler, 1002, 'Received close frame with a 1-byte body.')
return false
}
Expand All @@ -393,6 +414,7 @@ class ByteParser extends Writable {
if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

this.publishFrameError(new Error(reason))
failWebsocketConnection(this.#handler, code, reason)
return false
}
Expand Down Expand Up @@ -448,6 +470,17 @@ class ByteParser extends Writable {
get closingInfo () {
return this.#info.closeInfo
}

publishFrameError (error) {
if (!channels.frameError.hasSubscribers) {
return
}

channels.frameError.publish({
websocket: this.#handler.websocket,
error
})
}
}

module.exports = {
Expand Down
Loading
Loading