diff --git a/docs/docs/api/DiagnosticsChannel.md b/docs/docs/api/DiagnosticsChannel.md index acf25e08218..a18173fd309 100644 --- a/docs/docs/api/DiagnosticsChannel.md +++ b/docs/docs/api/DiagnosticsChannel.md @@ -201,6 +201,32 @@ The `handshakeResponse` object contains the HTTP response that upgraded the conn This information is particularly useful for debugging and monitoring WebSocket connections, as it provides access to the initial HTTP handshake response that established the WebSocket connection. +## `undici:websocket:created` + +This message is published when a `WebSocket` instance is created, before the opening handshake is sent. + +```js +import diagnosticsChannel from 'diagnostics_channel' + +diagnosticsChannel.channel('undici:websocket:created').subscribe(({ websocket, url }) => { + console.log(websocket) // the WebSocket instance + console.log(url) // serialized websocket URL +}) +``` + +## `undici:websocket:handshakeRequest` + +This message is published when the HTTP upgrade request is about to be sent. + +```js +import diagnosticsChannel from 'diagnostics_channel' + +diagnosticsChannel.channel('undici:websocket:handshakeRequest').subscribe(({ websocket, request }) => { + console.log(websocket) // the WebSocket instance + console.log(request.headers) // handshake request headers assembled so far +}) +``` + ## `undici:websocket:close` This message is published after the connection has closed. @@ -227,6 +253,47 @@ diagnosticsChannel.channel('undici:websocket:socket_error').subscribe((error) => }) ``` +## `undici:websocket:frameSent` + +This message is published after a WebSocket frame is written to the socket. + +```js +import diagnosticsChannel from 'diagnostics_channel' + +diagnosticsChannel.channel('undici:websocket:frameSent').subscribe(({ websocket, opcode, payloadData }) => { + console.log(websocket) // the WebSocket instance + console.log(opcode) // RFC 6455 opcode + console.log(payloadData) // unmasked payload bytes +}) +``` + +## `undici:websocket:frameReceived` + +This message is published after a WebSocket frame is parsed from the socket. + +```js +import diagnosticsChannel from 'diagnostics_channel' + +diagnosticsChannel.channel('undici:websocket:frameReceived').subscribe(({ websocket, opcode, payloadData }) => { + console.log(websocket) // the WebSocket instance + console.log(opcode) // RFC 6455 opcode + console.log(payloadData) // payload bytes as received +}) +``` + +## `undici:websocket:frameError` + +This message is published when Undici rejects an invalid incoming WebSocket frame. + +```js +import diagnosticsChannel from 'diagnostics_channel' + +diagnosticsChannel.channel('undici:websocket:frameError').subscribe(({ websocket, error }) => { + console.log(websocket) // the WebSocket instance + console.log(error.message) +}) +``` + ## `undici:websocket:ping` This message is published after the client receives a ping frame, if the connection is not closing. diff --git a/lib/core/diagnostics.js b/lib/core/diagnostics.js index ccd6870ca6d..f63828077e4 100644 --- a/lib/core/diagnostics.js +++ b/lib/core/diagnostics.js @@ -22,8 +22,13 @@ const channels = { trailers: diagnosticsChannel.channel('undici:request:trailers'), error: diagnosticsChannel.channel('undici:request:error'), // WebSocket + created: diagnosticsChannel.channel('undici:websocket:created'), + handshakeRequest: diagnosticsChannel.channel('undici:websocket:handshakeRequest'), open: diagnosticsChannel.channel('undici:websocket:open'), close: diagnosticsChannel.channel('undici:websocket:close'), + frameSent: diagnosticsChannel.channel('undici:websocket:frameSent'), + frameReceived: diagnosticsChannel.channel('undici:websocket:frameReceived'), + frameError: diagnosticsChannel.channel('undici:websocket:frameError'), socketError: diagnosticsChannel.channel('undici:websocket:socket_error'), ping: diagnosticsChannel.channel('undici:websocket:ping'), pong: diagnosticsChannel.channel('undici:websocket:pong'), @@ -166,15 +171,27 @@ function trackWebSocketEvents (debugLog = websocketDebuglog) { // Check if any of the channels already have subscribers to prevent duplicate subscriptions // This can happen when both Node.js built-in undici and undici as a dependency are present - if (channels.open.hasSubscribers || channels.close.hasSubscribers || - channels.socketError.hasSubscribers || channels.ping.hasSubscribers || - channels.pong.hasSubscribers) { + if (channels.created.hasSubscribers || channels.handshakeRequest.hasSubscribers || + channels.open.hasSubscribers || channels.close.hasSubscribers || + channels.frameSent.hasSubscribers || channels.frameReceived.hasSubscribers || + channels.frameError.hasSubscribers || channels.socketError.hasSubscribers || + channels.ping.hasSubscribers || channels.pong.hasSubscribers) { isTrackingWebSocketEvents = true return } isTrackingWebSocketEvents = true + diagnosticsChannel.subscribe('undici:websocket:created', + evt => { + debugLog('created websocket for %s', evt.url) + }) + + diagnosticsChannel.subscribe('undici:websocket:handshakeRequest', + evt => { + debugLog('sending opening handshake for %s', evt.websocket?.url ?? '') + }) + diagnosticsChannel.subscribe('undici:websocket:open', evt => { const { @@ -194,6 +211,21 @@ function trackWebSocketEvents (debugLog = websocketDebuglog) { ) }) + diagnosticsChannel.subscribe('undici:websocket:frameSent', + evt => { + debugLog('frame sent opcode=%d bytes=%d', evt.opcode, evt.payloadData.length) + }) + + diagnosticsChannel.subscribe('undici:websocket:frameReceived', + evt => { + debugLog('frame received opcode=%d bytes=%d', evt.opcode, evt.payloadData.length) + }) + + diagnosticsChannel.subscribe('undici:websocket:frameError', + evt => { + debugLog('frame errored for %s - %s', evt.websocket?.url ?? '', evt.error.message) + }) + diagnosticsChannel.subscribe('undici:websocket:socket_error', err => { debugLog('connection errored - %s', err.message) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 4ecc8a195fc..107b5caffe2 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -9,6 +9,7 @@ const { getDecodeSplit } = require('../fetch/util') const { WebsocketFrameSend } = require('./frame') const assert = require('node:assert') const { runtimeFeatures } = require('../../util/runtime-features') +const { channels } = require('../../core/diagnostics') const crypto = runtimeFeatures.has('crypto') ? require('node:crypto') @@ -89,6 +90,15 @@ function establishWebSocketConnection (url, protocols, client, handler, options) // 11. Fetch request with useParallelQueue set to true, and // processResponse given response being these steps: + if (channels.handshakeRequest.hasSubscribers) { + channels.handshakeRequest.publish({ + websocket: handler.websocket, + request: { + headers: request.headersList.entries + } + }) + } + const controller = fetching({ request, useParallelQueue: true, diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 13ad8b48201..997c1d969d0 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -16,6 +16,7 @@ const { failWebsocketConnection } = require('./connection') const { WebsocketFrameSend } = require('./frame') const { PerMessageDeflate } = require('./permessage-deflate') const { MessageSizeExceededError } = require('../../core/errors') +const { channels } = require('../../core/diagnostics') // This code was influenced by ws released under the MIT license. // Copyright (c) 2011 Einar Otto Stangvik @@ -97,12 +98,12 @@ class ByteParser extends Writable { const rsv3 = buffer[0] & 0x10 if (!isValidOpcode(opcode)) { - failWebsocketConnection(this.#handler, 1002, 'Invalid opcode received') + this.failWebsocketConnection(1002, 'Invalid opcode received') return callback() } if (masked) { - failWebsocketConnection(this.#handler, 1002, 'Frame cannot be masked') + this.failWebsocketConnection(1002, 'Frame cannot be masked') return callback() } @@ -116,43 +117,43 @@ class ByteParser extends Writable { // WebSocket connection where a PMCE is in use, this bit indicates // whether a message is compressed or not. if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) { - failWebsocketConnection(this.#handler, 1002, 'Expected RSV1 to be clear.') + this.failWebsocketConnection(1002, 'Expected RSV1 to be clear.') return } if (rsv2 !== 0 || rsv3 !== 0) { - failWebsocketConnection(this.#handler, 1002, 'RSV1, RSV2, RSV3 must be clear') + this.failWebsocketConnection(1002, 'RSV1, RSV2, RSV3 must be clear') return } if (fragmented && !isTextBinaryFrame(opcode)) { // Only text and binary frames can be fragmented - failWebsocketConnection(this.#handler, 1002, 'Invalid frame type was fragmented.') + this.failWebsocketConnection(1002, 'Invalid frame type was fragmented.') return } // If we are already parsing a text/binary frame and do not receive either // a continuation frame or close frame, fail the connection. if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) { - failWebsocketConnection(this.#handler, 1002, 'Expected continuation frame') + this.failWebsocketConnection(1002, 'Expected continuation frame') return } if (this.#info.fragmented && fragmented) { // A fragmented frame can't be fragmented itself - failWebsocketConnection(this.#handler, 1002, 'Fragmented frame exceeded 125 bytes.') + this.failWebsocketConnection(1002, 'Fragmented frame exceeded 125 bytes.') return } // "All control frames MUST have a payload length of 125 bytes or less // and MUST NOT be fragmented." if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) { - failWebsocketConnection(this.#handler, 1002, 'Control frame either too large or fragmented') + this.failWebsocketConnection(1002, 'Control frame either too large or fragmented') return } if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) { - failWebsocketConnection(this.#handler, 1002, 'Unexpected continuation frame') + this.failWebsocketConnection(1002, 'Unexpected continuation frame') return } @@ -199,7 +200,7 @@ class ByteParser extends Writable { // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275 // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e if (upper !== 0 || lower > 2 ** 31 - 1) { - failWebsocketConnection(this.#handler, 1009, 'Received payload length > 2^31 bytes.') + this.failWebsocketConnection(1009, 'Received payload length > 2^31 bytes.') return } @@ -212,6 +213,14 @@ class ByteParser extends Writable { const body = this.consume(this.#info.payloadLength) + if (channels.frameReceived.hasSubscribers) { + channels.frameReceived.publish({ + websocket: this.#handler.websocket, + opcode: this.#info.opcode, + payloadData: Buffer.from(body) + }) + } + if (isControlFrame(this.#info.opcode)) { this.#loop = this.parseControlFrame(body) this.#state = parserStates.INFO @@ -231,9 +240,8 @@ class ByteParser extends Writable { } else { this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => { if (error) { - // Use 1009 (Message Too Big) for decompression size limit errors const code = error instanceof MessageSizeExceededError ? 1009 : 1007 - failWebsocketConnection(this.#handler, code, error.message) + this.failWebsocketConnection(code, error.message, error) return } @@ -384,7 +392,7 @@ class ByteParser extends Writable { if (opcode === opcodes.CLOSE) { if (payloadLength === 1) { - failWebsocketConnection(this.#handler, 1002, 'Received close frame with a 1-byte body.') + this.failWebsocketConnection(1002, 'Received close frame with a 1-byte body.') return false } @@ -393,7 +401,7 @@ class ByteParser extends Writable { if (this.#info.closeInfo.error) { const { code, reason } = this.#info.closeInfo - failWebsocketConnection(this.#handler, code, reason) + this.failWebsocketConnection(code, reason) return false } @@ -448,6 +456,22 @@ class ByteParser extends Writable { get closingInfo () { return this.#info.closeInfo } + + publishFrameError (error) { + if (!channels.frameError.hasSubscribers) { + return + } + + channels.frameError.publish({ + websocket: this.#handler.websocket, + error + }) + } + + failWebsocketConnection (code, reason, error = new Error(reason)) { + this.publishFrameError(error) + failWebsocketConnection(this.#handler, code, reason) + } } module.exports = { diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index c647bf629d7..8fd00eeabf1 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -3,12 +3,14 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') const FixedQueue = require('../../dispatcher/fixed-queue') +const { channels } = require('../../core/diagnostics') /** * @typedef {object} SendQueueNode * @property {Promise | null} promise * @property {((...args: any[]) => any)} callback * @property {Buffer | null} frame + * @property {{ data: Buffer | ArrayBuffer | ArrayBufferView | null, hint: number }} diagnosticInfo */ class SendQueue { @@ -25,8 +27,11 @@ class SendQueue { /** @type {import('node:net').Socket} */ #socket - constructor (socket) { + #websocket + + constructor (socket, websocket) { this.#socket = socket + this.#websocket = websocket } add (item, cb, hint) { @@ -35,6 +40,7 @@ class SendQueue { // TODO(@tsctx): support fast-path for string on running if (hint === sendHints.text) { // special fast-path for string + publishFrame(this.#websocket, opcodes.TEXT, item, hint) const { 0: head, 1: body } = WebsocketFrameSend.createFastTextFrame(item) this.#socket.cork() this.#socket.write(head) @@ -42,6 +48,7 @@ class SendQueue { this.#socket.uncork() } else { // direct writing + publishFrame(this.#websocket, opcodes.BINARY, item, hint) this.#socket.write(createFrame(item, hint), cb) } } else { @@ -49,7 +56,11 @@ class SendQueue { const node = { promise: null, callback: cb, - frame: createFrame(item, hint) + frame: createFrame(item, hint), + diagnosticInfo: { + data: item, + hint + } } this.#queue.push(node) } @@ -60,10 +71,15 @@ class SendQueue { const node = { promise: item.arrayBuffer().then((ab) => { node.promise = null + node.diagnosticInfo.data = ab node.frame = createFrame(ab, hint) }), callback: cb, - frame: null + frame: null, + diagnosticInfo: { + data: item, + hint + } } this.#queue.push(node) @@ -83,6 +99,9 @@ class SendQueue { await node.promise } // write + if (node.frame !== null) { + publishQueuedFrame(this.#websocket, node.frame, node.diagnosticInfo) + } this.#socket.write(node.frame, node.callback) // cleanup node.callback = node.frame = null @@ -106,4 +125,28 @@ function toBuffer (data, hint) { } } +function publishFrame (websocket, opcode, data, hint) { + if (!channels.frameSent.hasSubscribers) { + return + } + + channels.frameSent.publish({ + websocket, + opcode, + payloadData: Buffer.from(toBuffer(data, hint)) + }) +} + +function publishQueuedFrame (websocket, frame, diagnosticInfo) { + if (!channels.frameSent.hasSubscribers) { + return + } + + channels.frameSent.publish({ + websocket, + opcode: frame[0] & 0x0F, + payloadData: Buffer.from(toBuffer(diagnosticInfo.data, diagnosticInfo.hint)) + }) +} + module.exports = { SendQueue } diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index 676b20164df..e7c0d31933a 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -36,6 +36,7 @@ const { channels } = require('../../core/diagnostics') * @property {() => void} onSocketClose * @property {(body: Buffer) => void} onPing * @property {(body: Buffer) => void} onPong + * @property {WebSocket} [websocket] * * @property {number} readyState * @property {import('stream').Duplex} socket @@ -155,6 +156,14 @@ class WebSocket extends EventTarget { // 5. Set this's url to urlRecord. this.#url = new URL(urlRecord.href) + this.#handler.websocket = this + + if (channels.created.hasSubscribers) { + channels.created.publish({ + websocket: this, + url: URLSerializer(this.#url) + }) + } // Store options for later use (e.g., maxDecompressedMessageSize) this.#options = { @@ -468,7 +477,7 @@ class WebSocket extends EventTarget { parser.on('error', (err) => this.#handler.onParserError(err)) this.#parser = parser - this.#sendQueue = new SendQueue(response.socket) + this.#sendQueue = new SendQueue(response.socket, this) // 1. Change the ready state to OPEN (1). this.#handler.readyState = states.OPEN diff --git a/test/node-test/debug.js b/test/node-test/debug.js index 2e047175307..38ec31936ec 100644 --- a/test/node-test/debug.js +++ b/test/node-test/debug.js @@ -12,7 +12,7 @@ const isNode23Plus = process.versions.node.split('.')[0] >= 23 const isCITGM = !!process.env.CITGM test('debug#websocket', { skip: !process.versions.icu || isCITGM || isNode23Plus }, async t => { - const assert = tspl(t, { plan: 6 }) + const assert = tspl(t, { plan: 10 }) const child = spawn( process.execPath, [ @@ -26,10 +26,13 @@ test('debug#websocket', { skip: !process.versions.icu || isCITGM || isNode23Plus ) const chunks = [] const assertions = [ + /(WEBSOCKET [0-9]+:) (created websocket for)/, + /(WEBSOCKET [0-9]+:) (sending opening handshake for)/, /(WEBSOCKET [0-9]+:) (connecting to)/, /(WEBSOCKET [0-9]+:) (connected to)/, /(WEBSOCKET [0-9]+:) (sending request)/, /(WEBSOCKET [0-9]+:) (connection opened)/, + /(WEBSOCKET [0-9]+:) (frame received opcode=8 bytes=9)/, /(WEBSOCKET [0-9]+:) (closed connection to)/, /^$/ ] @@ -41,7 +44,7 @@ test('debug#websocket', { skip: !process.versions.icu || isCITGM || isNode23Plus child.stderr.on('end', () => { const lines = extractLines(chunks) assert.strictEqual(lines.length, assertions.length) - for (let i = 1; i < lines.length; i++) { + for (let i = 0; i < lines.length; i++) { assert.match(lines[i], assertions[i]) } }) diff --git a/test/types/diagnostics-channel.test-d.ts b/test/types/diagnostics-channel.test-d.ts index 842788c6b93..6d84276d74d 100644 --- a/test/types/diagnostics-channel.test-d.ts +++ b/test/types/diagnostics-channel.test-d.ts @@ -1,6 +1,6 @@ import { Socket } from 'node:net' import { expectAssignable } from 'tsd' -import { DiagnosticsChannel, buildConnector } from '../..' +import { DiagnosticsChannel, WebSocket, buildConnector } from '../..' const request = { origin: '', @@ -27,6 +27,8 @@ const connectParams = { servername: '' } +const websocket = {} as InstanceType + expectAssignable({ request }) expectAssignable({ request, chunk: Buffer.from('') }) expectAssignable({ request, chunk: '' }) @@ -73,3 +75,43 @@ expectAssignable({ callback: buildConnector.Callback ) => new Socket() }) +expectAssignable({ + websocket, + url: 'ws://localhost:3000' +}) +expectAssignable({ + websocket, + request: { + headers: {} + } +}) +expectAssignable({ + address: { + address: '127.0.0.1', + family: 'IPv4', + port: 3000 + }, + protocol: '', + extensions: '', + websocket, + handshakeResponse: { + status: 101, + statusText: 'Switching Protocols', + headers: {} + } +}) +expectAssignable({ + websocket, + code: 1000, + reason: '' +}) +expectAssignable({ + websocket, + opcode: 1, + mask: true, + payloadData: Buffer.from('') +}) +expectAssignable({ + websocket, + error: new Error('Error') +}) diff --git a/test/websocket/diagnostics-channel-created-handshake-request.js b/test/websocket/diagnostics-channel-created-handshake-request.js new file mode 100644 index 00000000000..fa9e2c415ac --- /dev/null +++ b/test/websocket/diagnostics-channel-created-handshake-request.js @@ -0,0 +1,64 @@ +'use strict' + +const { test } = require('node:test') +const { once } = require('node:events') +const { createServer } = require('node:http') +const dc = require('node:diagnostics_channel') +const { WebSocketServer } = require('ws') +const { WebSocket } = require('../..') + +test('diagnostics channel - undici:websocket:[created/handshakeRequest]', async (t) => { + t.plan(10) + + const server = createServer() + const wss = new WebSocketServer({ noServer: true }) + + server.on('upgrade', (request, socket, head) => { + wss.handleUpgrade(request, socket, head, (ws) => { + ws.close(1000, 'done') + }) + }) + + server.listen(0) + await once(server, 'listening') + + const url = `ws://localhost:${server.address().port}` + const events = [] + let createdWebSocket + let handshakeWebSocket + + const createdListener = ({ websocket, url: createdUrl }) => { + events.push('created') + createdWebSocket = websocket + t.assert.strictEqual(createdUrl, `${url}/`) + } + + const handshakeRequestListener = ({ websocket, request }) => { + events.push('handshakeRequest') + handshakeWebSocket = websocket + t.assert.strictEqual(typeof request, 'object') + t.assert.strictEqual(typeof request.headers, 'object') + t.assert.strictEqual(request.headers['sec-websocket-version'], '13') + t.assert.strictEqual(request.headers['sec-websocket-extensions'], 'permessage-deflate; client_max_window_bits') + t.assert.strictEqual(typeof request.headers['sec-websocket-key'], 'string') + } + + dc.channel('undici:websocket:created').subscribe(createdListener) + dc.channel('undici:websocket:handshakeRequest').subscribe(handshakeRequestListener) + + const ws = new WebSocket(url) + + t.after(() => { + dc.channel('undici:websocket:created').unsubscribe(createdListener) + dc.channel('undici:websocket:handshakeRequest').unsubscribe(handshakeRequestListener) + wss.close() + server.close() + }) + + await once(ws, 'close') + + t.assert.deepStrictEqual(events, ['created', 'handshakeRequest']) + t.assert.strictEqual(createdWebSocket, ws) + t.assert.strictEqual(handshakeWebSocket, ws) + t.assert.strictEqual(ws.url, `${url}/`) +}) diff --git a/test/websocket/diagnostics-channel-frame-error.js b/test/websocket/diagnostics-channel-frame-error.js new file mode 100644 index 00000000000..4305c43550d --- /dev/null +++ b/test/websocket/diagnostics-channel-frame-error.js @@ -0,0 +1,42 @@ +'use strict' + +const { test } = require('node:test') +const { once } = require('node:events') +const dc = require('node:diagnostics_channel') +const { WebSocketServer } = require('ws') +const { WebSocket } = require('../..') +const { WebsocketFrameSend } = require('../../lib/web/websocket/frame') + +test('diagnostics channel - undici:websocket:frameError', async (t) => { + t.plan(3) + + const body = Buffer.allocUnsafe(2) + body.writeUInt16BE(1006, 0) + + const frame = new WebsocketFrameSend(body) + const buffer = frame.createFrame(0x8) + + const server = new WebSocketServer({ port: 0 }) + + server.on('connection', (socket) => { + socket._socket.write(buffer, () => socket.close()) + }) + + const ws = new WebSocket(`ws://localhost:${server.address().port}`) + + const frameErrorListener = ({ websocket, error }) => { + t.assert.strictEqual(websocket, ws) + t.assert.strictEqual(error.message, 'Frame cannot be masked') + } + + dc.channel('undici:websocket:frameError').subscribe(frameErrorListener) + + t.after(() => { + server.close() + ws.close() + dc.channel('undici:websocket:frameError').unsubscribe(frameErrorListener) + }) + + await once(ws, 'close') + t.assert.strictEqual(ws.readyState, WebSocket.CLOSED) +}) diff --git a/test/websocket/diagnostics-channel-frames.js b/test/websocket/diagnostics-channel-frames.js new file mode 100644 index 00000000000..fab953b5382 --- /dev/null +++ b/test/websocket/diagnostics-channel-frames.js @@ -0,0 +1,55 @@ +'use strict' + +const { test } = require('node:test') +const { once } = require('node:events') +const dc = require('node:diagnostics_channel') +const { WebSocketServer } = require('ws') +const { WebSocket } = require('../..') +const { opcodes } = require('../../lib/web/websocket/constants') + +test('diagnostics channel - undici:websocket:[frameSent/frameReceived]', async (t) => { + t.plan(6) + + const server = new WebSocketServer({ port: 0 }) + const ws = new WebSocket(`ws://localhost:${server.address().port}`) + + server.on('connection', (socket) => { + socket.on('message', (payload) => { + socket.send(payload.toString()) + }) + }) + + const frameSentListener = ({ websocket, opcode, mask, payloadData }) => { + if (opcode !== opcodes.TEXT || payloadData.toString() !== 'hello') { + return + } + + t.assert.strictEqual(websocket, ws) + t.assert.strictEqual(opcode, opcodes.TEXT) + t.assert.strictEqual(payloadData.toString(), 'hello') + } + + const frameReceivedListener = ({ websocket, opcode, mask, payloadData }) => { + if (opcode !== opcodes.TEXT || payloadData.toString() !== 'hello') { + return + } + + t.assert.strictEqual(websocket, ws) + t.assert.strictEqual(opcode, opcodes.TEXT) + t.assert.strictEqual(payloadData.toString(), 'hello') + } + + dc.channel('undici:websocket:frameSent').subscribe(frameSentListener) + dc.channel('undici:websocket:frameReceived').subscribe(frameReceivedListener) + + t.after(() => { + server.close() + ws.close() + dc.channel('undici:websocket:frameSent').unsubscribe(frameSentListener) + dc.channel('undici:websocket:frameReceived').unsubscribe(frameReceivedListener) + }) + + await once(ws, 'open') + ws.send('hello') + await once(ws, 'message') +}) diff --git a/types/diagnostics-channel.d.ts b/types/diagnostics-channel.d.ts index 3c6a5299d38..687c79f372d 100644 --- a/types/diagnostics-channel.d.ts +++ b/types/diagnostics-channel.d.ts @@ -4,6 +4,7 @@ import buildConnector from './connector' import Dispatcher from './dispatcher' declare namespace DiagnosticsChannel { + type WebSocket = InstanceType interface Request { origin?: string | URL; completed: boolean; @@ -71,4 +72,44 @@ declare namespace DiagnosticsChannel { connectParams: ConnectParams; connector: Connector; } + export interface WebsocketCreatedMessage { + websocket: WebSocket; + url: string; + } + export interface WebsocketHandshakeRequestMessage { + websocket: WebSocket; + request: { + headers: Record; + }; + } + export interface WebsocketOpenMessage { + address: { + address: string; + family: string; + port: number; + }; + protocol: string; + extensions: string; + websocket: WebSocket; + handshakeResponse: { + status: number; + statusText: string; + headers: Record; + }; + } + export interface WebsocketCloseMessage { + websocket: WebSocket; + code: number; + reason: string; + } + export interface WebsocketFrameMessage { + websocket: WebSocket; + opcode: number; + mask: boolean; + payloadData: Buffer; + } + export interface WebsocketFrameErrorMessage { + websocket: WebSocket; + error: Error; + } }