From b121331e0022915821226edc94f5fe031c10aec1 Mon Sep 17 00:00:00 2001 From: robobun Date: Wed, 10 Jun 2026 00:07:28 +0000 Subject: [PATCH 1/3] sql: make close({ timeout: 0 }) force-close immediately with queries in flight The pool close() in the postgres and mysql adapters gated the timeout option on truthiness, so timeout: 0 fell into the graceful-drain branch and waited for pending queries indefinitely, even though the code below the gate explicitly treats timeout === 0 as an immediate close. Gate on presence (!= null) instead, so 0 flows through validation and hits the immediate-close branch. undefined and null still mean graceful drain with no timer. --- src/js/internal/sql/shared.ts | 4 +- test/js/sql/sql-close-timeout-zero.test.ts | 248 +++++++++++++++++++++ 2 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 test/js/sql/sql-close-timeout-zero.test.ts diff --git a/src/js/internal/sql/shared.ts b/src/js/internal/sql/shared.ts index 94e82acef85..b8c4fd39789 100644 --- a/src/js/internal/sql/shared.ts +++ b/src/js/internal/sql/shared.ts @@ -1203,7 +1203,9 @@ abstract class BaseSQLAdapter 2 ** 31 || timeout < 0 || timeout !== timeout) { throw $ERR_INVALID_ARG_VALUE("options.timeout", timeout, "must be a non-negative integer less than 2^31"); diff --git a/test/js/sql/sql-close-timeout-zero.test.ts b/test/js/sql/sql-close-timeout-zero.test.ts new file mode 100644 index 00000000000..ad98c830f2c --- /dev/null +++ b/test/js/sql/sql-close-timeout-zero.test.ts @@ -0,0 +1,248 @@ +// close({ timeout: 0 }) must force-close the pool immediately even when +// queries are in flight. It used to be gated on truthiness, so 0 fell into +// the graceful-drain branch and close() waited for pending queries forever. +// https://github.com/oven-sh/bun/issues/32038 +// +// Mock servers complete the handshake and then never answer the query, so +// the query stays in flight until the pool force-closes the connection. +import { SQL } from "bun"; +import { expect, test } from "bun:test"; +import net from "net"; + +// --- Postgres wire helpers (mirrors postgres-multi-statement-fields.test.ts) --- + +function pkt(type: string, body: Buffer): Buffer { + const header = Buffer.alloc(5); + header.write(type, 0); + header.writeInt32BE(body.length + 4, 1); + return Buffer.concat([header, body]); +} + +function int16(n: number): Buffer { + const b = Buffer.alloc(2); + b.writeInt16BE(n, 0); + return b; +} + +function int32(n: number): Buffer { + const b = Buffer.alloc(4); + b.writeInt32BE(n, 0); + return b; +} + +function cstr(s: string): Buffer { + return Buffer.concat([Buffer.from(s), Buffer.from([0])]); +} + +const authenticationOk = pkt("R", int32(0)); +const readyForQuery = pkt("Z", Buffer.from("I")); + +function rowDescription(names: string[]): Buffer { + const fields = Buffer.concat( + names.map(name => + Buffer.concat([ + cstr(name), // column name + int32(0), // table oid + int16(0), // column attr number + int32(25), // type oid: text + int16(-1), // type size + int32(-1), // type modifier + int16(0), // format: text + ]), + ), + ); + return pkt("T", Buffer.concat([int16(names.length), fields])); +} + +function dataRow(values: string[]): Buffer { + const cols = Buffer.concat( + values.map(v => { + const bytes = Buffer.from(v); + return Buffer.concat([int32(bytes.length), bytes]); + }), + ); + return pkt("D", Buffer.concat([int16(values.length), cols])); +} + +interface PostgresMock { + port: number; + server: net.Server; + sockets: Set; + queryReceived: Promise; +} + +// Completes the startup handshake, then hands every post-startup chunk to +// onQuery (default: swallow it, leaving the query in flight forever). +async function postgresMock(onQuery?: (socket: net.Socket, data: Buffer) => void): Promise { + const queryReceived = Promise.withResolvers(); + const sockets = new Set(); + const server = net.createServer(socket => { + sockets.add(socket); + let startup = true; + socket.on("data", data => { + if (startup) { + startup = false; + socket.write(Buffer.concat([authenticationOk, readyForQuery])); + return; + } + onQuery?.(socket, data); + queryReceived.resolve(); + }); + }); + await new Promise(r => server.listen(0, "127.0.0.1", () => r())); + const port = (server.address() as net.AddressInfo).port; + return { port, server, sockets, queryReceived: queryReceived.promise }; +} + +// --- MySQL wire helpers (mirrors sql-mysql-datetime-text-mock-fixture.ts) --- + +function u16le(n: number): Buffer { + return Buffer.from([n & 0xff, (n >> 8) & 0xff]); +} +function u24le(n: number): Buffer { + return Buffer.from([n & 0xff, (n >> 8) & 0xff, (n >> 16) & 0xff]); +} +function u32le(n: number): Buffer { + return Buffer.from([n & 0xff, (n >> 8) & 0xff, (n >> 16) & 0xff, (n >>> 24) & 0xff]); +} +function mysqlPacket(seq: number, payload: Buffer): Buffer { + return Buffer.concat([u24le(payload.length), Buffer.from([seq]), payload]); +} + +const CLIENT_PROTOCOL_41 = 1 << 9; +const CLIENT_SECURE_CONNECTION = 1 << 15; +const CLIENT_PLUGIN_AUTH = 1 << 19; +const CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 1 << 21; +const CLIENT_DEPRECATE_EOF = 1 << 24; +const SERVER_CAPS = + CLIENT_PROTOCOL_41 | + CLIENT_SECURE_CONNECTION | + CLIENT_PLUGIN_AUTH | + CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | + CLIENT_DEPRECATE_EOF; + +function handshakeV10(): Buffer { + const authData1 = Buffer.alloc(8, 0x61); + const authData2 = Buffer.alloc(13, 0x62); + authData2[12] = 0; + const payload = Buffer.concat([ + Buffer.from([10]), // protocol version + Buffer.from("mock-5.7.0\0"), + u32le(1), // connection id + authData1, + Buffer.from([0]), // filler + u16le(SERVER_CAPS & 0xffff), + Buffer.from([0x2d]), // utf8mb4_general_ci + u16le(0x0002), // SERVER_STATUS_AUTOCOMMIT + u16le((SERVER_CAPS >>> 16) & 0xffff), + Buffer.from([21]), // length of auth-plugin-data + Buffer.alloc(10, 0), // reserved + authData2, + Buffer.from("mysql_native_password\0"), + ]); + return mysqlPacket(0, payload); +} + +function okPacket(seq: number): Buffer { + return mysqlPacket(seq, Buffer.from([0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00])); +} + +test("postgres: close({ timeout: 0 }) settles immediately with a query in flight", async () => { + const mock = await postgresMock(); + const sql = new SQL({ url: `postgres://u@127.0.0.1:${mock.port}/db`, max: 1 }); + + try { + // .catch() starts execution; the mock never answers, so the query stays + // in flight until the pool force-closes the connection. + const pending = sql`select 1`.catch(e => e.code); + await mock.queryReceived; + + // Without the fix this waits for the in-flight query forever and the + // test times out. + await sql.close({ timeout: 0 }); + + expect(await pending).toBe("ERR_POSTGRES_CONNECTION_CLOSED"); + } finally { + for (const socket of mock.sockets) socket.destroy(); + mock.server.close(); + } +}); + +test("postgres: close({ timeout: null }) still drains gracefully", async () => { + // Guards the presence check: null (like undefined) means "no timeout", + // not "timeout of 0", so pending queries finish instead of being killed. + let respond: (() => void) | undefined; + const mock = await postgresMock((socket, data) => { + if (data[0] !== 0x51 /* 'Q' */) return; + respond = () => { + socket.write( + Buffer.concat([rowDescription(["x"]), dataRow(["1"]), pkt("C", cstr("SELECT 1")), readyForQuery]), + ); + }; + }); + const sql = new SQL({ url: `postgres://u@127.0.0.1:${mock.port}/db`, max: 1 }); + + try { + const query = sql`select 1 as x`.simple(); + const result = query.then(r => r); + await mock.queryReceived; + + // Enters the graceful-drain branch synchronously (the query is still in + // flight), then the server releases the response. + const closing = sql.close({ timeout: null }); + respond!(); + + expect(await result).toEqual([{ x: "1" }]); + await closing; + } finally { + for (const socket of mock.sockets) socket.destroy(); + mock.server.close(); + } +}); + +test("mysql: close({ timeout: 0 }) settles immediately with a query in flight", async () => { + const queryReceived = Promise.withResolvers(); + const sockets = new Set(); + const server = net.createServer(socket => { + sockets.add(socket); + let buffered = Buffer.alloc(0); + let authed = false; + + socket.write(handshakeV10()); + + socket.on("data", chunk => { + buffered = Buffer.concat([buffered, chunk]); + while (buffered.length >= 4) { + const len = buffered[0] | (buffered[1] << 8) | (buffered[2] << 16); + if (buffered.length < 4 + len) break; + const seq = buffered[3]; + buffered = buffered.subarray(4 + len); + + if (!authed) { + authed = true; + socket.write(okPacket(seq + 1)); + continue; + } + + // Swallow the query so it stays in flight forever. + queryReceived.resolve(); + } + }); + }); + await new Promise(r => server.listen(0, "127.0.0.1", () => r())); + const { port } = server.address() as net.AddressInfo; + + const sql = new SQL({ url: `mysql://root@127.0.0.1:${port}/db`, max: 1 }); + + try { + const pending = sql`select 1`.catch(e => e.code); + await queryReceived.promise; + + await sql.close({ timeout: 0 }); + + expect(await pending).toBe("ERR_MYSQL_CONNECTION_CLOSED"); + } finally { + for (const socket of sockets) socket.destroy(); + server.close(); + } +}); From 8dd9f5e415f08d1e228c40968118b0bfcf315549 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 10 Jun 2026 00:09:34 +0000 Subject: [PATCH 2/3] [autofix.ci] apply automated fixes --- test/js/sql/sql-close-timeout-zero.test.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/js/sql/sql-close-timeout-zero.test.ts b/test/js/sql/sql-close-timeout-zero.test.ts index ad98c830f2c..cf4c29e1dc6 100644 --- a/test/js/sql/sql-close-timeout-zero.test.ts +++ b/test/js/sql/sql-close-timeout-zero.test.ts @@ -175,9 +175,7 @@ test("postgres: close({ timeout: null }) still drains gracefully", async () => { const mock = await postgresMock((socket, data) => { if (data[0] !== 0x51 /* 'Q' */) return; respond = () => { - socket.write( - Buffer.concat([rowDescription(["x"]), dataRow(["1"]), pkt("C", cstr("SELECT 1")), readyForQuery]), - ); + socket.write(Buffer.concat([rowDescription(["x"]), dataRow(["1"]), pkt("C", cstr("SELECT 1")), readyForQuery])); }; }); const sql = new SQL({ url: `postgres://u@127.0.0.1:${mock.port}/db`, max: 1 }); From 83ce87fb137ac19bd1616908dcdf5b52654d4455 Mon Sep 17 00:00:00 2001 From: robobun Date: Wed, 10 Jun 2026 00:17:35 +0000 Subject: [PATCH 3/3] test: reject listen failures, cover mysql graceful drain with timeout: null --- test/js/sql/sql-close-timeout-zero.test.ts | 154 ++++++++++++++++----- 1 file changed, 117 insertions(+), 37 deletions(-) diff --git a/test/js/sql/sql-close-timeout-zero.test.ts b/test/js/sql/sql-close-timeout-zero.test.ts index cf4c29e1dc6..10c59c26cc2 100644 --- a/test/js/sql/sql-close-timeout-zero.test.ts +++ b/test/js/sql/sql-close-timeout-zero.test.ts @@ -9,6 +9,16 @@ import { SQL } from "bun"; import { expect, test } from "bun:test"; import net from "net"; +function listen(server: net.Server): Promise { + const { promise, resolve, reject } = Promise.withResolvers(); + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + server.removeListener("error", reject); + resolve(); + }); + return promise; +} + // --- Postgres wire helpers (mirrors postgres-multi-statement-fields.test.ts) --- function pkt(type: string, body: Buffer): Buffer { @@ -64,7 +74,7 @@ function dataRow(values: string[]): Buffer { return pkt("D", Buffer.concat([int16(values.length), cols])); } -interface PostgresMock { +interface Mock { port: number; server: net.Server; sockets: Set; @@ -73,7 +83,7 @@ interface PostgresMock { // Completes the startup handshake, then hands every post-startup chunk to // onQuery (default: swallow it, leaving the query in flight forever). -async function postgresMock(onQuery?: (socket: net.Socket, data: Buffer) => void): Promise { +async function postgresMock(onQuery?: (socket: net.Socket, data: Buffer) => void): Promise { const queryReceived = Promise.withResolvers(); const sockets = new Set(); const server = net.createServer(socket => { @@ -89,8 +99,8 @@ async function postgresMock(onQuery?: (socket: net.Socket, data: Buffer) => void queryReceived.resolve(); }); }); - await new Promise(r => server.listen(0, "127.0.0.1", () => r())); - const port = (server.address() as net.AddressInfo).port; + await listen(server); + const { port } = server.address() as net.AddressInfo; return { port, server, sockets, queryReceived: queryReceived.promise }; } @@ -108,6 +118,11 @@ function u32le(n: number): Buffer { function mysqlPacket(seq: number, payload: Buffer): Buffer { return Buffer.concat([u24le(payload.length), Buffer.from([seq]), payload]); } +function lenencStr(s: string): Buffer { + const buf = Buffer.from(s, "utf-8"); + if (buf.length >= 0xfb) throw new Error("lenencStr: only the 1-byte form is needed here"); + return Buffer.concat([Buffer.from([buf.length]), buf]); +} const CLIENT_PROTOCOL_41 = 1 << 9; const CLIENT_SECURE_CONNECTION = 1 << 15; @@ -147,6 +162,74 @@ function okPacket(seq: number): Buffer { return mysqlPacket(seq, Buffer.from([0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00])); } +function mysqlColumnDefinition(name: string): Buffer { + return Buffer.concat([ + lenencStr("def"), + lenencStr(""), + lenencStr("t"), + lenencStr("t"), + lenencStr(name), + lenencStr(name), + Buffer.from([0x0c]), // fixed-length-fields length = 12 + u16le(33), // utf8_general_ci + u32le(32), // column_length (display width) + Buffer.from([0xfd]), // MYSQL_TYPE_VAR_STRING + u16le(0), // flags + Buffer.from([0]), // decimals + Buffer.from([0, 0]), // reserved + ]); +} + +// Text-protocol result set: one column, one row. +function mysqlTextResultSet(startSeq: number, column: string, value: string): Buffer { + let seq = startSeq; + return Buffer.concat([ + mysqlPacket(seq++, Buffer.from([1])), // column count + mysqlPacket(seq++, mysqlColumnDefinition(column)), + mysqlPacket(seq++, lenencStr(value)), // row + // OK packet closing the result set (CLIENT_DEPRECATE_EOF, header 0xfe). + mysqlPacket(seq++, Buffer.from([0xfe, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00])), + ]); +} + +// Sends the handshake, OKs the auth response, then hands every post-auth +// packet to onCommand (default: swallow it, leaving the query in flight +// forever). +async function mysqlMock(onCommand?: (socket: net.Socket, seq: number, payload: Buffer) => void): Promise { + const queryReceived = Promise.withResolvers(); + const sockets = new Set(); + const server = net.createServer(socket => { + sockets.add(socket); + let buffered = Buffer.alloc(0); + let authed = false; + + socket.write(handshakeV10()); + + socket.on("data", chunk => { + buffered = Buffer.concat([buffered, chunk]); + while (buffered.length >= 4) { + const len = buffered[0] | (buffered[1] << 8) | (buffered[2] << 16); + if (buffered.length < 4 + len) break; + const seq = buffered[3]; + const payload = buffered.subarray(4, 4 + len); + buffered = buffered.subarray(4 + len); + + if (!authed) { + authed = true; + socket.write(okPacket(seq + 1)); + continue; + } + + onCommand?.(socket, seq, payload); + queryReceived.resolve(); + } + }); + }); + await listen(server); + const { port } = server.address() as net.AddressInfo; + return { port, server, sockets, queryReceived: queryReceived.promise }; +} + test("postgres: close({ timeout: 0 }) settles immediately with a query in flight", async () => { const mock = await postgresMock(); const sql = new SQL({ url: `postgres://u@127.0.0.1:${mock.port}/db`, max: 1 }); @@ -199,48 +282,45 @@ test("postgres: close({ timeout: null }) still drains gracefully", async () => { }); test("mysql: close({ timeout: 0 }) settles immediately with a query in flight", async () => { - const queryReceived = Promise.withResolvers(); - const sockets = new Set(); - const server = net.createServer(socket => { - sockets.add(socket); - let buffered = Buffer.alloc(0); - let authed = false; + const mock = await mysqlMock(); + const sql = new SQL({ url: `mysql://root@127.0.0.1:${mock.port}/db`, max: 1 }); - socket.write(handshakeV10()); + try { + const pending = sql`select 1`.catch(e => e.code); + await mock.queryReceived; - socket.on("data", chunk => { - buffered = Buffer.concat([buffered, chunk]); - while (buffered.length >= 4) { - const len = buffered[0] | (buffered[1] << 8) | (buffered[2] << 16); - if (buffered.length < 4 + len) break; - const seq = buffered[3]; - buffered = buffered.subarray(4 + len); + await sql.close({ timeout: 0 }); - if (!authed) { - authed = true; - socket.write(okPacket(seq + 1)); - continue; - } + expect(await pending).toBe("ERR_MYSQL_CONNECTION_CLOSED"); + } finally { + for (const socket of mock.sockets) socket.destroy(); + mock.server.close(); + } +}); - // Swallow the query so it stays in flight forever. - queryReceived.resolve(); - } - }); +test("mysql: close({ timeout: null }) still drains gracefully", async () => { + let respond: (() => void) | undefined; + const mock = await mysqlMock((socket, seq, payload) => { + if (payload[0] !== 0x03 /* COM_QUERY */) return; + respond = () => { + socket.write(mysqlTextResultSet(seq + 1, "x", "1")); + }; }); - await new Promise(r => server.listen(0, "127.0.0.1", () => r())); - const { port } = server.address() as net.AddressInfo; - - const sql = new SQL({ url: `mysql://root@127.0.0.1:${port}/db`, max: 1 }); + const sql = new SQL({ url: `mysql://root@127.0.0.1:${mock.port}/db`, max: 1 }); try { - const pending = sql`select 1`.catch(e => e.code); - await queryReceived.promise; + // .simple() forces the text protocol (COM_QUERY). + const query = sql`select 1 as x`.simple(); + const result = query.then(r => r); + await mock.queryReceived; - await sql.close({ timeout: 0 }); + const closing = sql.close({ timeout: null }); + respond!(); - expect(await pending).toBe("ERR_MYSQL_CONNECTION_CLOSED"); + expect(await result).toEqual([{ x: "1" }]); + await closing; } finally { - for (const socket of sockets) socket.destroy(); - server.close(); + for (const socket of mock.sockets) socket.destroy(); + mock.server.close(); } });