diff --git a/docs/docs/api/BalancedPool.md b/docs/docs/api/BalancedPool.md index df267fe7270..6d635a442d2 100644 --- a/docs/docs/api/BalancedPool.md +++ b/docs/docs/api/BalancedPool.md @@ -20,6 +20,11 @@ Extends: [`PoolOptions`](/docs/docs/api/Pool.md#parameter-pooloptions) * **factory** `(origin: URL, opts: Object) => Dispatcher` - Default: `(origin, opts) => new Pool(origin, opts)` The `PoolOptions` are passed to each of the `Pool` instances being created. + +When an upstream hostname resolves to multiple DNS records, `BalancedPool` +resolves the hostname lazily at connect time and rotates new connections across +the resolved addresses. Requests continue to use the original hostname for +`Host` headers and TLS `servername`. ## Instance Properties ### `BalancedPool.upstreams` diff --git a/lib/dispatcher/balanced-pool.js b/lib/dispatcher/balanced-pool.js index fa0fa97288e..bf10c3832fe 100644 --- a/lib/dispatcher/balanced-pool.js +++ b/lib/dispatcher/balanced-pool.js @@ -1,5 +1,7 @@ 'use strict' +const dns = require('node:dns') +const { isIP } = require('node:net') const { BalancedPoolMissingUpstreamError, InvalidArgumentError @@ -13,6 +15,7 @@ const { kGetDispatcher } = require('./pool-base') const Pool = require('./pool') +const buildConnector = require('../core/connect') const { kUrl } = require('../core/symbols') const util = require('../core/util') const kFactory = Symbol('factory') @@ -48,6 +51,69 @@ function defaultFactory (origin, opts) { return new Pool(origin, opts) } +function buildDnsBalancedConnector (origin, opts) { + const { + connect, + connectTimeout, + tls, + maxCachedSessions, + socketPath, + autoSelectFamily, + autoSelectFamilyAttemptTimeout, + allowH2 + } = opts + + const connector = typeof connect === 'function' + ? connect + : buildConnector({ + ...tls, + maxCachedSessions, + allowH2, + socketPath, + timeout: connectTimeout, + ...(typeof autoSelectFamily === 'boolean' ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined), + ...connect + }) + + let offset = -1 + + return function dnsBalancedConnector (connectOpts, callback) { + dns.lookup(origin.hostname, { all: true, order: 'ipv4first' }, (err, addresses) => { + if (err) { + callback(err) + return + } + + const uniqueAddresses = [] + const seen = new Set() + + for (const address of addresses) { + const key = `${address.address}:${address.family}` + if (seen.has(key)) { + continue + } + + seen.add(key) + uniqueAddresses.push(address) + } + + if (uniqueAddresses.length === 0) { + callback(new Error(`No DNS entries found for ${origin.hostname}`)) + return + } + + offset = (offset + 1) % uniqueAddresses.length + const address = uniqueAddresses[offset] + + connector({ + ...connectOpts, + hostname: address.address, + servername: connectOpts.servername ?? origin.hostname + }, callback) + }) + } +} + class BalancedPool extends PoolBase { constructor (upstreams = [], { factory = defaultFactory, ...opts } = {}) { if (typeof factory !== 'function') { @@ -57,6 +123,7 @@ class BalancedPool extends PoolBase { super() this[kOptions] = { ...util.deepClone(opts) } + this[kOptions].connect = opts.connect this[kOptions].interceptors = opts.interceptors ? { ...opts.interceptors } : undefined @@ -79,7 +146,8 @@ class BalancedPool extends PoolBase { } addUpstream (upstream) { - const upstreamOrigin = util.parseOrigin(upstream).origin + const upstreamUrl = util.parseOrigin(upstream) + const upstreamOrigin = upstreamUrl.origin if (this[kClients].find((pool) => ( pool[kUrl].origin === upstreamOrigin && @@ -88,7 +156,15 @@ class BalancedPool extends PoolBase { ))) { return this } - const pool = this[kFactory](upstreamOrigin, this[kOptions]) + + const poolOptions = isIP(upstreamUrl.hostname) === 0 + ? { + ...this[kOptions], + connect: buildDnsBalancedConnector(upstreamUrl, this[kOptions]) + } + : this[kOptions] + + const pool = this[kFactory](upstreamOrigin, poolOptions) this[kAddClient](pool) pool.on('connect', () => { diff --git a/test/node-test/balanced-pool.js b/test/node-test/balanced-pool.js index 055c6f909b3..d453020bdf3 100644 --- a/test/node-test/balanced-pool.js +++ b/test/node-test/balanced-pool.js @@ -1,11 +1,14 @@ 'use strict' +const dns = require('node:dns') const { describe, test } = require('node:test') const assert = require('node:assert/strict') const { BalancedPool, Pool, Client, errors } = require('../..') +const buildConnector = require('../../lib/core/connect') const { createServer } = require('node:http') const { promisify } = require('node:util') const { tspl } = require('@matteo.collina/tspl') +const { kNeedDrain, kGetDispatcher } = require('../../lib/dispatcher/pool-base') test('throws when factory is not a function', (t) => { const p = tspl(t, { plan: 2 }) @@ -298,6 +301,194 @@ test('getUpstream returns undefined for closed/destroyed upstream', (t) => { p.strictEqual(result, undefined) }) +test('should balance hostname upstreams across resolved dns records', async (t) => { + const p = tspl(t, { plan: 13 }) + + const hostnames = [] + const hosts = [] + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + hosts.push(req.headers.host) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.after(server.close.bind(server)) + + await promisify(server.listen).call(server, 0) + + const originalLookup = dns.lookup + dns.lookup = function lookup (hostname, options, callback) { + if (hostname !== 'service.local') { + return originalLookup.call(this, hostname, options, callback) + } + + queueMicrotask(() => { + callback(null, [ + { address: '127.0.0.1', family: 4 }, + { address: '127.0.0.2', family: 4 } + ]) + }) + } + t.after(() => { + dns.lookup = originalLookup + }) + + const connect = buildConnector({}) + const client = new BalancedPool(`http://service.local:${server.address().port}`, { + connections: 2, + pipelining: 1, + connect (opts, callback) { + hostnames.push(opts.hostname) + connect({ ...opts, hostname: '127.0.0.1' }, callback) + } + }) + t.after(client.destroy.bind(client)) + + const responses = await Promise.all([ + client.request({ path: '/', method: 'GET' }), + client.request({ path: '/', method: 'GET' }) + ]) + + for (const { statusCode, headers, body } of responses) { + p.strictEqual(statusCode, 200) + p.strictEqual(headers['content-type'], 'text/plain') + p.strictEqual(await body.text(), 'hello') + } + + p.deepStrictEqual(client.upstreams, [`http://service.local:${server.address().port}`]) + p.strictEqual(hosts.length, 2) + p.strictEqual(hosts[0], `service.local:${server.address().port}`) + p.strictEqual(hosts[1], `service.local:${server.address().port}`) + p.strictEqual(hostnames.length, 2) + p.ok(hostnames.includes('127.0.0.1')) + p.ok(hostnames.includes('127.0.0.2')) +}) + +test('should propagate dns lookup errors for hostname upstreams', async (t) => { + const p = tspl(t, { plan: 2 }) + + const originalLookup = dns.lookup + dns.lookup = function lookup (hostname, options, callback) { + if (hostname !== 'service.local') { + return originalLookup.call(this, hostname, options, callback) + } + + queueMicrotask(() => { + callback(new Error('lookup failed')) + }) + } + t.after(() => { + dns.lookup = originalLookup + }) + + const client = new BalancedPool('http://service.local:1') + t.after(client.destroy.bind(client)) + + try { + await client.request({ path: '/', method: 'GET' }) + } catch (err) { + p.strictEqual(err.message, 'lookup failed') + p.strictEqual(err.code, undefined) + } +}) + +test('should ignore duplicate dns records for hostname upstreams', async (t) => { + const p = tspl(t, { plan: 7 }) + + const hostnames = [] + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }) + t.after(server.close.bind(server)) + + await promisify(server.listen).call(server, 0) + + const originalLookup = dns.lookup + dns.lookup = function lookup (hostname, options, callback) { + if (hostname !== 'service.local') { + return originalLookup.call(this, hostname, options, callback) + } + + queueMicrotask(() => { + callback(null, [ + { address: '127.0.0.1', family: 4 }, + { address: '127.0.0.1', family: 4 }, + { address: '127.0.0.2', family: 4 } + ]) + }) + } + t.after(() => { + dns.lookup = originalLookup + }) + + const connect = buildConnector({}) + const client = new BalancedPool(`http://service.local:${server.address().port}`, { + connections: 3, + pipelining: 1, + connect (opts, callback) { + hostnames.push(opts.hostname) + connect({ ...opts, hostname: '127.0.0.1' }, callback) + } + }) + t.after(client.destroy.bind(client)) + + const responses = await Promise.all([ + client.request({ path: '/', method: 'GET' }), + client.request({ path: '/', method: 'GET' }), + client.request({ path: '/', method: 'GET' }) + ]) + + for (const response of responses) { + p.strictEqual(await response.body.text(), 'ok') + } + + p.strictEqual(hostnames.length, 3) + p.strictEqual(hostnames[0], '127.0.0.1') + p.strictEqual(hostnames[1], '127.0.0.2') + p.strictEqual(hostnames[2], '127.0.0.1') +}) + +test('should fail when dns lookup returns no records for hostname upstreams', async (t) => { + const p = tspl(t, { plan: 1 }) + + const originalLookup = dns.lookup + dns.lookup = function lookup (hostname, options, callback) { + if (hostname !== 'service.local') { + return originalLookup.call(this, hostname, options, callback) + } + + queueMicrotask(() => { + callback(null, []) + }) + } + t.after(() => { + dns.lookup = originalLookup + }) + + const client = new BalancedPool('http://service.local:1') + t.after(client.destroy.bind(client)) + + await assert.rejects(client.request({ path: '/', method: 'GET' }), { + message: 'No DNS entries found for service.local' + }) + p.ok(true) +}) + +test('should return no dispatcher when all upstreams become busy', () => { + const pool = new BalancedPool() + const upstream = pool.addUpstream('http://localhost:3001').getUpstream('http://localhost:3001') + + let accesses = 0 + Object.defineProperty(upstream, kNeedDrain, { + configurable: true, + get () { + accesses++ + return accesses > 1 + } + }) + + assert.strictEqual(pool[kGetDispatcher](), undefined) +}) + class TestServer { constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) { this.config = {