Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ const {
HTTP2_HEADER_CONTENT_LENGTH,
HTTP2_HEADER_EXPECT,
HTTP2_HEADER_STATUS,
HTTP2_HEADER_PROTOCOL,
NGHTTP2_REFUSED_STREAM,
NGHTTP2_CANCEL
HTTP2_HEADER_PROTOCOL
}
} = http2

Expand Down Expand Up @@ -506,6 +504,60 @@ function writeH2 (client, request) {
if (upgrade || method === 'CONNECT') {
session.ref()

const setupUpgradeStream = (stream) => {
let responseReceived = false

const failUpgradeStream = (err) => {
if (responseReceived || request.aborted || request.completed) {
return
}

abort(err)
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
}

const onUpgradeStreamError = () => {
if (typeof stream.rstCode === 'number' && stream.rstCode !== 0) {
failUpgradeStream(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`))
} else {
failUpgradeStream(new InformationalError('HTTP/2: stream errored before response headers'))
}
}

const onUpgradeStreamEnd = () => {
failUpgradeStream(new InformationalError('HTTP/2: stream half-closed (remote)'))
}

stream.once('response', (headers, _flags) => {
responseReceived = true

const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers

request.onRequestUpgrade(statusCode, parseH2Headers(realHeaders), stream)

if (!request.aborted && !request.completed) {
stream.off('error', onUpgradeStreamError)
stream.off('end', onUpgradeStreamEnd)
}

client[kQueue][client[kRunningIdx]++] = null
})

stream.on('error', onUpgradeStreamError)
stream.once('end', onUpgradeStreamEnd)
stream.once('close', () => {
failUpgradeStream(new InformationalError('HTTP/2: stream closed before response headers'))

session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) session.unref()
})

++session[kOpenStreams]
stream.setTimeout(requestTimeout)
}

if (upgrade === 'websocket') {
// We cannot upgrade to websocket if extended CONNECT protocol is not supported
if (session[kEnableConnectProtocol] === false) {
Expand All @@ -530,31 +582,7 @@ function writeH2 (client, request) {

stream = session.request(headers, { endStream: false, signal })
stream[kHTTP2Stream] = true

stream.once('response', (headers, _flags) => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers

request.onRequestUpgrade(statusCode, parseH2Headers(realHeaders), stream)

++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
})

stream.on('error', () => {
if (stream.rstCode === NGHTTP2_REFUSED_STREAM || stream.rstCode === NGHTTP2_CANCEL) {
// NGHTTP2_REFUSED_STREAM (7) or NGHTTP2_CANCEL (8)
// We do not treat those as errors as the server might
// not support websockets and refuse the stream
abort(new InformationalError(`HTTP/2: "stream error" received - code ${stream.rstCode}`))
}
})

stream.once('close', () => {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) session.unref()
})

stream.setTimeout(requestTimeout)
setupUpgradeStream(stream)
return true
}

Expand All @@ -565,18 +593,7 @@ function writeH2 (client, request) {
// We disabled endStream to allow the user to write to the stream
stream = session.request(headers, { endStream: false, signal })
stream[kHTTP2Stream] = true
stream.on('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers

request.onRequestUpgrade(statusCode, parseH2Headers(realHeaders), stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
})
stream.once('close', () => {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) session.unref()
})
stream.setTimeout(requestTimeout)
setupUpgradeStream(stream)

return true
}
Expand Down
73 changes: 71 additions & 2 deletions test/http2-dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,84 @@ test('Dispatcher#Upgrade', async t => {
},
allowH2: true
})
after(() => client.close().then(() => { server.close() }))

const { socket } = await client.upgrade({ path: '/', protocol: 'websocket' })

t.ok(socket.readable)
t.ok(socket.writable)
t.strictEqual(socket.closed, false)

after(() => socket.end())
await t.completed

socket.on('error', () => {})
socket.end()
await once(socket, 'close')
await client.close()
await new Promise((resolve) => server.close(resolve))
})

test('Dispatcher#Upgrade rejects if stream closes before response headers', async t => {
t = tspl(t, { plan: 2 })

const server = createSecureServer({ ...(await pem.generate({ opts: { keySize: 2048 } })), settings: { enableConnectProtocol: true } })

server.on('stream', (stream) => {
stream.close()
})

await once(server.listen(0), 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true
})
after(() => client.close().then(() => { server.close() }))

const err = await Promise.race([
client.upgrade({ path: '/', protocol: 'websocket' }).then(
() => new Error('upgrade unexpectedly resolved'),
err => err
),
sleep(1_000).then(() => new Error('upgrade hung waiting for response headers'))
])

t.ok(err instanceof errors.InformationalError, err.message)
t.match(err.message, /HTTP\/2:/)

await t.completed
})

test('Dispatcher#Connect rejects if stream closes before response headers', async t => {
t = tspl(t, { plan: 2 })

const server = createSecureServer(await pem.generate({ opts: { keySize: 2048 } }))

server.on('stream', (stream) => {
stream.close()
})

await once(server.listen(0), 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true
})
after(() => client.close().then(() => { server.close() }))

const err = await Promise.race([
client.connect({ path: '/' }).then(
() => new Error('connect unexpectedly resolved'),
err => err
),
sleep(1_000).then(() => new Error('connect hung waiting for response headers'))
])

t.ok(err instanceof errors.InformationalError, err.message)
t.match(err.message, /HTTP\/2:/)

await t.completed
})
Expand Down
20 changes: 16 additions & 4 deletions test/websocket/opening-handshake.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ test('WebSocket on H2', { skip: crypto == null }, async (t) => {

test('WebSocket connecting to server that isn\'t a Websocket server (h2 - supports extended CONNECT protocol)', async (t) => {
const planner = tspl(t, { plan: 6 })
const sessions = new Set()
const h2Server = createSecureServer({ cert, key, settings: { enableConnectProtocol: true } })
.on('session', (session) => {
sessions.add(session)
session.on('close', () => sessions.delete(session))
})
.on('stream', (stream, headers) => {
planner.equal(headers[':method'], 'CONNECT')
planner.equal(headers[':protocol'], 'websocket')
Expand All @@ -145,15 +150,22 @@ test('WebSocket connecting to server that isn\'t a Websocket server (h2 - suppor
})
const ws = new WebSocket(`wss://localhost:${h2Server.address().port}`, { dispatcher, protocols: ['chat'] })
const cleaner = setupListener()
ws.onmessage = ws.onopen = () => planner.fail('should not open')

t.after(() => {
t.after(async () => {
cleaner()
dispatcher.close()
ws.onerror = null
ws.close()
h2Server.close()

for (const session of sessions) {
session.close()
}

await new Promise((resolve) => h2Server.close(resolve))
await dispatcher.close()
})

ws.onmessage = ws.onopen = () => planner.fail('should not open')

await planner.completed

function setupListener () {
Expand Down
Loading