diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index b06a21e695a..dc39383e1f5 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -62,9 +62,7 @@ const { HTTP2_HEADER_CONTENT_LENGTH, HTTP2_HEADER_EXPECT, HTTP2_HEADER_STATUS, - HTTP2_HEADER_PROTOCOL, - NGHTTP2_REFUSED_STREAM, - NGHTTP2_CANCEL + HTTP2_HEADER_PROTOCOL } } = http2 @@ -506,6 +504,60 @@ function writeH2 (client, request) { if (upgrade || method === 'CONNECT') { session.ref() + const setupUpgradeStream = (stream) => { + let responseReceived = false + + const failUpgradeStream = (err) => { + if (responseReceived || request.aborted || request.completed) { + return + } + + abort(err) + client[kQueue][client[kRunningIdx]++] = null + client[kPendingIdx] = client[kRunningIdx] + client[kResume]() + } + + const onUpgradeStreamError = () => { + if (typeof stream.rstCode === 'number' && stream.rstCode !== 0) { + failUpgradeStream(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`)) + } else { + failUpgradeStream(new InformationalError('HTTP/2: stream errored before response headers')) + } + } + + const onUpgradeStreamEnd = () => { + failUpgradeStream(new InformationalError('HTTP/2: stream half-closed (remote)')) + } + + stream.once('response', (headers, _flags) => { + responseReceived = true + + const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers + + request.onRequestUpgrade(statusCode, parseH2Headers(realHeaders), stream) + + if (!request.aborted && !request.completed) { + stream.off('error', onUpgradeStreamError) + stream.off('end', onUpgradeStreamEnd) + } + + client[kQueue][client[kRunningIdx]++] = null + }) + + stream.on('error', onUpgradeStreamError) + stream.once('end', onUpgradeStreamEnd) + stream.once('close', () => { + failUpgradeStream(new InformationalError('HTTP/2: stream closed before response headers')) + + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) session.unref() + }) + + ++session[kOpenStreams] + stream.setTimeout(requestTimeout) + } + if (upgrade === 'websocket') { // We cannot upgrade to websocket if extended CONNECT protocol is not supported if (session[kEnableConnectProtocol] === false) { @@ -530,31 +582,7 @@ function writeH2 (client, request) { stream = session.request(headers, { endStream: false, signal }) stream[kHTTP2Stream] = true - - stream.once('response', (headers, _flags) => { - const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers - - request.onRequestUpgrade(statusCode, parseH2Headers(realHeaders), stream) - - ++session[kOpenStreams] - client[kQueue][client[kRunningIdx]++] = null - }) - - stream.on('error', () => { - if (stream.rstCode === NGHTTP2_REFUSED_STREAM || stream.rstCode === NGHTTP2_CANCEL) { - // NGHTTP2_REFUSED_STREAM (7) or NGHTTP2_CANCEL (8) - // We do not treat those as errors as the server might - // not support websockets and refuse the stream - abort(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`)) - } - }) - - stream.once('close', () => { - session[kOpenStreams] -= 1 - if (session[kOpenStreams] === 0) session.unref() - }) - - stream.setTimeout(requestTimeout) + setupUpgradeStream(stream) return true } @@ -565,18 +593,7 @@ function writeH2 (client, request) { // We disabled endStream to allow the user to write to the stream stream = session.request(headers, { endStream: false, signal }) stream[kHTTP2Stream] = true - stream.on('response', headers => { - const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers - - request.onRequestUpgrade(statusCode, parseH2Headers(realHeaders), stream) - ++session[kOpenStreams] - client[kQueue][client[kRunningIdx]++] = null - }) - stream.once('close', () => { - session[kOpenStreams] -= 1 - if (session[kOpenStreams] === 0) session.unref() - }) - stream.setTimeout(requestTimeout) + setupUpgradeStream(stream) return true } diff --git a/test/http2-dispatcher.js b/test/http2-dispatcher.js index 3244119ad0c..22a30eb2dd1 100644 --- a/test/http2-dispatcher.js +++ b/test/http2-dispatcher.js @@ -278,7 +278,6 @@ test('Dispatcher#Upgrade', async t => { }, allowH2: true }) - after(() => client.close().then(() => { server.close() })) const { socket } = await client.upgrade({ path: '/', protocol: 'websocket' }) @@ -286,7 +285,77 @@ test('Dispatcher#Upgrade', async t => { t.ok(socket.writable) t.strictEqual(socket.closed, false) - after(() => socket.end()) + await t.completed + + socket.on('error', () => {}) + socket.end() + await once(socket, 'close') + await client.close() + await new Promise((resolve) => server.close(resolve)) +}) + +test('Dispatcher#Upgrade rejects if stream closes before response headers', async t => { + t = tspl(t, { plan: 2 }) + + const server = createSecureServer({ ...(await pem.generate({ opts: { keySize: 2048 } })), settings: { enableConnectProtocol: true } }) + + server.on('stream', (stream) => { + stream.close() + }) + + await once(server.listen(0), 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + after(() => client.close().then(() => { server.close() })) + + const err = await Promise.race([ + client.upgrade({ path: '/', protocol: 'websocket' }).then( + () => new Error('upgrade unexpectedly resolved'), + err => err + ), + sleep(1_000).then(() => new Error('upgrade hung waiting for response headers')) + ]) + + t.ok(err instanceof errors.InformationalError, err.message) + t.match(err.message, /HTTP\/2:/) + + await t.completed +}) + +test('Dispatcher#Connect rejects if stream closes before response headers', async t => { + t = tspl(t, { plan: 2 }) + + const server = createSecureServer(await pem.generate({ opts: { keySize: 2048 } })) + + server.on('stream', (stream) => { + stream.close() + }) + + await once(server.listen(0), 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + after(() => client.close().then(() => { server.close() })) + + const err = await Promise.race([ + client.connect({ path: '/' }).then( + () => new Error('connect unexpectedly resolved'), + err => err + ), + sleep(1_000).then(() => new Error('connect hung waiting for response headers')) + ]) + + t.ok(err instanceof errors.InformationalError, err.message) + t.match(err.message, /HTTP\/2:/) await t.completed }) diff --git a/test/websocket/opening-handshake.js b/test/websocket/opening-handshake.js index e62f054fd4e..26fbdb90448 100644 --- a/test/websocket/opening-handshake.js +++ b/test/websocket/opening-handshake.js @@ -122,7 +122,12 @@ test('WebSocket on H2', { skip: crypto == null }, async (t) => { test('WebSocket connecting to server that isn\'t a Websocket server (h2 - supports extended CONNECT protocol)', async (t) => { const planner = tspl(t, { plan: 6 }) + const sessions = new Set() const h2Server = createSecureServer({ cert, key, settings: { enableConnectProtocol: true } }) + .on('session', (session) => { + sessions.add(session) + session.on('close', () => sessions.delete(session)) + }) .on('stream', (stream, headers) => { planner.equal(headers[':method'], 'CONNECT') planner.equal(headers[':protocol'], 'websocket') @@ -145,15 +150,22 @@ test('WebSocket connecting to server that isn\'t a Websocket server (h2 - suppor }) const ws = new WebSocket(`wss://localhost:${h2Server.address().port}`, { dispatcher, protocols: ['chat'] }) const cleaner = setupListener() - ws.onmessage = ws.onopen = () => planner.fail('should not open') - t.after(() => { + t.after(async () => { cleaner() - dispatcher.close() + ws.onerror = null ws.close() - h2Server.close() + + for (const session of sessions) { + session.close() + } + + await new Promise((resolve) => h2Server.close(resolve)) + await dispatcher.close() }) + ws.onmessage = ws.onopen = () => planner.fail('should not open') + await planner.completed function setupListener () {