diff --git a/src/js/internal/sql/shared.ts b/src/js/internal/sql/shared.ts index 94e82acef85..eeaee00bc93 100644 --- a/src/js/internal/sql/shared.ts +++ b/src/js/internal/sql/shared.ts @@ -1015,7 +1015,9 @@ abstract class BaseSQLAdapter !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution, + c => + !(c.flags & (PooledConnectionFlags.preReserved | PooledConnectionFlags.reserved)) && + c.queryCount < maxDistribution, ); if (nonReservedConnections.length === 0) { return; @@ -1087,6 +1089,9 @@ abstract class BaseSQLAdapter { - if self - .flags - .get() - .contains(ConnectionFlags::WAITING_TO_PREPARE) - || self.nonpipelinable_requests.get() > 0 - { + // can_pipeline() covers WAITING_TO_PREPARE and + // nonpipelinable_requests, and additionally stops us from + // writing past in-flight requests when pipelining is off + // (BUN_FEATURE_FLAG_DISABLE_SQL_AUTO_PIPELINING, unnamed + // prepared statements) or the write buffer is already at + // MAX_PIPELINE_SIZE. + if !self.can_pipeline() { defer_cleanup!(self); return; } diff --git a/src/sql_jsc/postgres/PostgresSQLQuery.rs b/src/sql_jsc/postgres/PostgresSQLQuery.rs index c2c3b45768b..8e1232b6b48 100644 --- a/src/sql_jsc/postgres/PostgresSQLQuery.rs +++ b/src/sql_jsc/postgres/PostgresSQLQuery.rs @@ -657,50 +657,19 @@ impl PostgresSQLQuery { unsafe { Self::deref(this_ptr) }; return Err(global_object.throw_value(error_response)); } - StatementStatus::Prepared => { - if !connection.has_query_running() || connection.can_pipeline() { - this.update_flags(|f| f.binary = !stmt.fields.is_empty()); - bun_core::scoped_log!(Postgres, "bindAndExecute"); - - // bindAndExecute will bind + execute, it will change to running after binding is complete - if let Err(err) = PostgresRequest::bind_and_execute( - global_object, - stmt, - binding_value, - columns_value, - writer, - ) { - // fail to run do cleanup — drop the ref we took above. - this.release_statement(); - // SAFETY: undoes the speculative `this.ref_()` above; count was ≥2, never frees here. - unsafe { Self::deref(this_ptr) }; - - if !global_object.has_exception() { - return Err(global_object.throw_value( - postgres_error_to_js( - global_object, - Some(b"failed to bind and execute query"), - err, - ), - )); - } - return Err(JsError::Thrown); - } - { - let mut f = connection.flags.get(); - f.set(ConnectionFlags::IS_READY_FOR_QUERY, false); - connection.flags.set(f); - } - this.status.set(Status::Binding); - this.update_flags(|f| f.pipelined = true); - connection - .pipelined_requests - .set(connection.pipelined_requests.get() + 1); - - did_write = true; - } + StatementStatus::Prepared + | StatementStatus::Parsing + | StatementStatus::Pending => { + // Leave the request Pending; advance() (reached via + // advance_and_flush() after the enqueue below) writes it only + // once every earlier queued request has been written, so wire + // order always matches queue order. Writing Bind+Execute here + // could bypass a queued-but-unwritten request (e.g. a + // simple-protocol COMMIT waiting for the pipeline to drain); + // responses are matched to requests in FIFO queue order, so + // the bypassed request would steal this query's result and + // permanently desync the connection. } - StatementStatus::Parsing | StatementStatus::Pending => {} } break 'enqueue; diff --git a/test/js/sql/postgres-pool-pipeline-flag-fixture.ts b/test/js/sql/postgres-pool-pipeline-flag-fixture.ts new file mode 100644 index 00000000000..6c7230fca8d --- /dev/null +++ b/test/js/sql/postgres-pool-pipeline-flag-fixture.ts @@ -0,0 +1,56 @@ +// Fixture for postgres-pool-transaction-stall.test.ts ("pipelining feature +// flag keeps one query in flight per connection"). Runs with +// BUN_FEATURE_FLAG_DISABLE_SQL_AUTO_PIPELINING=1: a second prepared query +// fired while another is in flight must not be written to the wire until the +// first one's response arrives. +import { SQL } from "bun"; + +const url = process.env.DATABASE_URL!; +const sql = new SQL({ url, max: 1 }); +const ctl = new SQL({ url, max: 1 }); + +// Hang guard: turn "hangs forever" into a loud nonzero exit. +const watchdog = setTimeout(() => { + console.error("WATCHDOG: queries never completed"); + process.exit(1); +}, 15_000); + +function step(name: string) { + console.log(`STEP ${name}`); +} + +// Warm the statement so later executions take the already-prepared path. +await sql`select ${1}::int as hold_me`; +step("prepared"); + +// From here on, the mock server holds the response to hold_me executions +// until the release control query arrives. +await ctl.unsafe("/* ctl:arm_hold */ select 1"); +step("armed"); + +// q1 is held by the mock. q2 is fired while q1 is in flight; with pipelining +// disabled it must stay queued client-side until q1 completes. +const q1 = sql`select ${2}::int as hold_me`; +q1.execute(); +const q2 = sql`select ${3}::int as hold_me`; +q2.execute(); + +// Let the connection's deferred flush run so anything the client (wrongly) +// decided to write for q2 is on the socket before the control query below. +await new Promise(resolve => setImmediate(resolve)); + +// Release the held response. The mock snapshots how many hold_me Binds have +// arrived when it handles this query; the parent test asserts q2's was not +// among them. +await ctl.unsafe("/* ctl:release_slow */ select 1"); +step("released"); + +await q1; +step("q1 done"); +await q2; +step("q2 done"); + +clearTimeout(watchdog); +await sql.close(); +await ctl.close(); +console.log("DONE"); diff --git a/test/js/sql/postgres-pool-transaction-stall-fixture.ts b/test/js/sql/postgres-pool-transaction-stall-fixture.ts new file mode 100644 index 00000000000..d9a47bf1afe --- /dev/null +++ b/test/js/sql/postgres-pool-transaction-stall-fixture.ts @@ -0,0 +1,89 @@ +// Fixture for postgres-pool-transaction-stall.test.ts. Drives the connection +// pool into the state from https://github.com/oven-sh/bun/issues/32004: +// a transaction acquires its connection through the release() -> reservedQueue +// handoff while concurrent pooled prepared-statement queries are running. +import { SQL } from "bun"; + +const url = process.env.DATABASE_URL!; +const sql = new SQL({ url, max: 1 }); +const ctl = new SQL({ url, max: 1 }); + +// Hang guard: the bug wedges the pool forever. Turn "hangs forever" into a +// loud nonzero exit so the parent test fails fast with a useful message. +const watchdog = setTimeout(() => { + console.error("WATCHDOG: pool wedged, queries never completed"); + process.exit(1); +}, 20_000); + +function step(name: string) { + console.log(`STEP ${name}`); +} + +// Prepare all three statements up-front so later executions of the same query +// text take the already-prepared pipelining path in the native queue. +await sql`select ${1}::int as hold_me`; +await sql`select ${1}::int as fast_q`; +await sql`select ${1}::int as warmup_q`; +step("prepared"); + +// From here on, the mock server holds the response to hold_me executions +// until the release control query arrives. +await ctl.unsafe("/* ctl:arm_hold */ select 1"); +step("armed"); + +// A pooled query that is still in flight when sql.begin() is called, so the +// transaction cannot take the direct reserved path in connect() and instead +// waits in reservedQueue for the release() handoff. +const p0 = sql`select ${2}::int as warmup_q`; +p0.execute(); + +const bodyGate = Promise.withResolvers(); +let slowQ: Promise; +let victimQ: Promise; + +const txP = sql.begin(async tx => { + // Pooled query (NOT tx): with the pool bug it is distributed onto this + // transaction's connection and written to the wire immediately. The mock + // server holds its response until the control query arrives. + slowQ = sql`select ${3}::int as hold_me`; + (slowQ as any).execute(); + // Simple-protocol query on the transaction connection: it is queued + // UNWRITTEN behind slowQ until the pipeline drains. + victimQ = tx.unsafe("select 641 as victim_q"); + (victimQ as any).execute(); + bodyGate.resolve(); + await victimQ; + step("victim resolved"); +}); + +await p0; +step("p0 done"); +await bodyGate.promise; +step("body gate"); + +// Another pooled prepared query while victim_q is queued unwritten. The bug: +// run()'s pipelining fast path writes its Bind+Execute to the wire ahead of +// the queued victim_q, so the server's response to fast_q is attributed to +// victim_q (FIFO queue order) and the connection desyncs permanently. +const fastQ = sql`select ${4}::int as fast_q`; +fastQ.execute(); + +// Release the held response for slowQ. +await ctl.unsafe("/* ctl:release_slow */ select 1"); +step("released"); + +await fastQ; +step("fast resolved"); +await slowQ!; +step("slow resolved"); +await txP; +step("tx resolved"); + +// The pool must still be usable afterwards. +await sql`select ${5}::int as warmup_q`; +step("pool alive"); + +clearTimeout(watchdog); +await sql.close(); +await ctl.close(); +console.log("DONE"); diff --git a/test/js/sql/postgres-pool-transaction-stall.test.ts b/test/js/sql/postgres-pool-transaction-stall.test.ts new file mode 100644 index 00000000000..4589b158d05 --- /dev/null +++ b/test/js/sql/postgres-pool-transaction-stall.test.ts @@ -0,0 +1,386 @@ +// https://github.com/oven-sh/bun/issues/32004 +// +// Under concurrency, a Bun.SQL postgres pool could permanently stall when +// sql.begin() transactions ran alongside pooled prepared-statement queries: +// +// 1. release() handed an idle connection to a waiting sql.begin() but did not +// remove it from readyConnections, and flushConcurrentQueries() did not +// filter reserved connections, so pooled queries kept getting distributed +// onto the transaction's connection. +// 2. The native queue's pipelining fast path could then write a prepared +// query's Bind+Execute to the wire while an earlier queued simple-protocol +// request (e.g. the transaction's COMMIT) was still unwritten. Responses +// are matched to requests in FIFO queue order, so the unwritten request +// stole the pipelined query's result: the transaction "committed" without +// COMMIT ever reaching the server (left "idle in transaction"), the +// nonpipelinable request counter underflowed, and the connection wedged +// forever while the stolen-from query waited for a connection that never +// came back. +// +// The tests run a scripted mock postgres server so both sides of the race are +// deterministic: the mock holds one query's response until a control query on +// a second connection arrives, which forces the exact interleaving. +import { expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; +import net from "node:net"; +import path from "node:path"; + +function pkt(type: string, body: Buffer = Buffer.alloc(0)): Buffer { + const header = Buffer.alloc(5); + header.write(type, 0); + header.writeInt32BE(body.length + 4, 1); + return Buffer.concat([header, body]); +} + +function int16(n: number): Buffer { + const b = Buffer.alloc(2); + b.writeInt16BE(n, 0); + return b; +} + +function int32(n: number): Buffer { + const b = Buffer.alloc(4); + b.writeInt32BE(n, 0); + return b; +} + +function cstr(s: string): Buffer { + return Buffer.concat([Buffer.from(s), Buffer.from([0])]); +} + +const authenticationOk = pkt("R", int32(0)); +const readyForQuery = pkt("Z", Buffer.from("I")); +const parseComplete = pkt("1"); +const bindComplete = pkt("2"); +// zero result columns for every statement: no DataRow needed +const rowDescription = pkt("T", int16(0)); +const commandComplete = (tag: string) => pkt("C", cstr(tag)); +const parameterDescription = (oids: number[]) => pkt("t", Buffer.concat([int16(oids.length), ...oids.map(int32)])); + +interface Frame { + type: string; + body: Buffer; +} + +interface Conn { + socket: net.Socket; + buf: Buffer; + sawStartup: boolean; + frames: Frame[]; + busy: boolean; + // prepared statement name -> { query text, declared param oids } + statements: Map; + // arrival-order log of P/B/E/Q frames for assertions, recorded when the + // frame is parsed off the socket (holds only delay responses, not logging) + log: string[]; +} + +function readCStr(buf: Buffer, offset: number): [string, number] { + const end = buf.indexOf(0, offset); + return [buf.toString("utf8", offset, end), end + 1]; +} + +// Scripted mini postgres server. Responds to everything immediately, except +// that once a "/* ctl:arm_hold */" control query has been seen, a Bind to a +// statement whose text contains "hold_me" blocks that connection's responses +// (like a slow query) until a "/* ctl:release_slow */" control query arrives +// on any connection. +function startMockServer() { + const conns: Conn[] = []; + const release = Promise.withResolvers(); + let armed = false; + let released = false; + // frame handler failures; asserted empty by the tests so protocol bugs in + // the mock surface as failures instead of fixture hangs + const errors: unknown[] = []; + // count of hold_me Binds that had arrived (across connections) when the + // release control query was handled + let holdMeBindsAtRelease = -1; + + const countHoldMeBinds = () => + conns.reduce((n, c) => n + c.log.filter(entry => entry.startsWith("B:") && entry.includes("hold_me")).length, 0); + + function logFrame(conn: Conn, frame: Frame) { + const { type, body } = frame; + switch (type) { + case "P": { + // Parse: name, query, nParams, oids + const [name, afterName] = readCStr(body, 0); + const [query, afterQuery] = readCStr(body, afterName); + const nParams = body.readInt16BE(afterQuery); + const oids: number[] = []; + for (let i = 0; i < nParams; i++) { + oids.push(body.readInt32BE(afterQuery + 2 + i * 4)); + } + conn.statements.set(name, { query, oids }); + conn.log.push(`P:${query}`); + break; + } + case "B": { + // Bind: portal, statement name + const [, afterPortal] = readCStr(body, 0); + const [name] = readCStr(body, afterPortal); + const stmt = conn.statements.get(name); + conn.log.push(`B:${stmt ? stmt.query : ""}`); + break; + } + case "E": { + conn.log.push("E"); + break; + } + case "Q": { + const [query] = readCStr(body, 0); + conn.log.push(`Q:${query}`); + break; + } + default: + break; + } + } + + async function handleFrame(conn: Conn, frame: Frame) { + const { type, body } = frame; + switch (type) { + case "P": { + conn.socket.write(parseComplete); + break; + } + case "D": { + // Describe statement: echo the Parse-declared param oids, zero columns + const [name] = readCStr(body, 1); + const stmt = conn.statements.get(name); + conn.socket.write(Buffer.concat([parameterDescription(stmt ? stmt.oids : []), rowDescription])); + break; + } + case "B": { + const [, afterPortal] = readCStr(body, 0); + const [name] = readCStr(body, afterPortal); + const stmt = conn.statements.get(name); + const query = stmt ? stmt.query : ""; + if (query.includes("hold_me") && armed && !released) { + // act like a slow query: block this connection's responses (and + // everything queued after it) until the control query arrives + await release.promise; + } + conn.socket.write(bindComplete); + break; + } + case "E": { + conn.socket.write(commandComplete("SELECT 0")); + break; + } + case "S": { + conn.socket.write(readyForQuery); + break; + } + case "Q": { + const [query] = readCStr(body, 0); + if (query.includes("ctl:arm_hold")) { + armed = true; + } + if (query.includes("ctl:release_slow")) { + released = true; + holdMeBindsAtRelease = countHoldMeBinds(); + release.resolve(); + } + let tag = "SELECT 1"; + const first = query.trimStart().slice(0, 8).toUpperCase(); + if (first.startsWith("BEGIN")) tag = "BEGIN"; + else if (first.startsWith("COMMIT")) tag = "COMMIT"; + else if (first.startsWith("ROLLBACK")) tag = "ROLLBACK"; + conn.socket.write(Buffer.concat([commandComplete(tag), readyForQuery])); + break; + } + case "H": // Flush: no response + case "X": // Terminate + break; + default: + break; + } + } + + async function pump(conn: Conn) { + if (conn.busy) return; + conn.busy = true; + try { + while (conn.frames.length > 0) { + await handleFrame(conn, conn.frames.shift()!); + } + } finally { + conn.busy = false; + } + } + + const server = net.createServer(socket => { + const conn: Conn = { + socket, + buf: Buffer.alloc(0), + sawStartup: false, + frames: [], + busy: false, + statements: new Map(), + log: [], + }; + conns.push(conn); + socket.on("error", () => {}); + socket.on("data", data => { + conn.buf = Buffer.concat([conn.buf, data]); + while (true) { + if (!conn.sawStartup) { + if (conn.buf.length < 4) break; + const len = conn.buf.readInt32BE(0); + if (conn.buf.length < len) break; + conn.buf = conn.buf.subarray(len); + conn.sawStartup = true; + socket.write(Buffer.concat([authenticationOk, readyForQuery])); + continue; + } + if (conn.buf.length < 5) break; + const len = conn.buf.readInt32BE(1); + if (conn.buf.length < len + 1) break; + const frame: Frame = { + type: conn.buf.toString("utf8", 0, 1), + body: conn.buf.subarray(5, len + 1), + }; + conn.buf = conn.buf.subarray(len + 1); + logFrame(conn, frame); + conn.frames.push(frame); + } + pump(conn).catch(err => { + errors.push(err); + // kill the connection so the fixture fails fast instead of hanging + socket.destroy(err instanceof Error ? err : new Error(String(err))); + }); + }); + }); + + return { + server, + conns, + errors, + forceRelease: () => release.resolve(), + holdMeBindsAtRelease: () => holdMeBindsAtRelease, + listen: () => + new Promise(resolve => + server.listen(0, "127.0.0.1", () => resolve((server.address() as net.AddressInfo).port)), + ), + }; +} + +async function runFixture(fixture: string, port: number, extraEnv: Record = {}) { + await using proc = Bun.spawn({ + cmd: [bunExe(), path.join(import.meta.dir, fixture)], + env: { + ...bunEnv, + ...extraEnv, + DATABASE_URL: `postgres://bun:bun@127.0.0.1:${port}/bun?sslmode=disable`, + }, + stdout: "pipe", + stderr: "pipe", + }); + + const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]); + return { stdout, stderr, exitCode }; +} + +test("pool does not stall when sql.begin() runs concurrently with pooled prepared queries", async () => { + const mock = startMockServer(); + const port = await mock.listen(); + + try { + const { stdout, stderr, exitCode } = await runFixture("postgres-pool-transaction-stall-fixture.ts", port); + + // every stage of the fixture must have completed, in order; only the + // relative order of "released" and "victim resolved" legitimately depends + // on scheduling, so normalize that adjacent pair before the exact + // sequence comparison + const steps = stdout.split(/\r?\n/).filter(line => line.startsWith("STEP ") || line === "DONE"); + const released = steps.indexOf("STEP released"); + const victim = steps.indexOf("STEP victim resolved"); + if (released !== -1 && victim === released - 1) { + steps[victim] = "STEP released"; + steps[released] = "STEP victim resolved"; + } + expect({ + steps: steps.join("\n"), + stderr: stderr.includes("WATCHDOG") ? "WATCHDOG" : "", + exitCode, + mockErrors: mock.errors, + }).toEqual({ + steps: [ + "STEP prepared", + "STEP armed", + "STEP p0 done", + "STEP body gate", + "STEP released", + "STEP victim resolved", + "STEP fast resolved", + "STEP slow resolved", + "STEP tx resolved", + "STEP pool alive", + "DONE", + ].join("\n"), + stderr: "", + exitCode: 0, + mockErrors: [], + }); + + // wire-order assertions on the transaction's connection: while the + // transaction owns the connection, no pooled query may be written to it, + // and COMMIT must actually reach the server + const txConn = mock.conns.find(c => c.log.some(entry => entry === "Q:BEGIN")); + expect(txConn).toBeDefined(); + const log = txConn!.log; + const beginIndex = log.indexOf("Q:BEGIN"); + const commitIndex = log.indexOf("Q:COMMIT"); + expect(commitIndex).toBeGreaterThan(beginIndex); + expect(log.slice(beginIndex + 1, commitIndex)).toEqual(["Q:select 641 as victim_q"]); + } finally { + mock.forceRelease(); + mock.server.close(); + } +}); + +// BUN_FEATURE_FLAG_DISABLE_SQL_AUTO_PIPELINING must keep at most one query in +// flight per connection: a second prepared query fired while another is in +// flight may not be written to the wire until the first one's response +// arrives. +test("pipelining feature flag keeps one query in flight per connection", async () => { + const mock = startMockServer(); + const port = await mock.listen(); + + try { + const { stdout, stderr, exitCode } = await runFixture("postgres-pool-pipeline-flag-fixture.ts", port, { + BUN_FEATURE_FLAG_DISABLE_SQL_AUTO_PIPELINING: "1", + }); + + expect({ + steps: stdout + .split(/\r?\n/) + .filter(line => line.startsWith("STEP ") || line === "DONE") + .join("\n"), + stderr: stderr.includes("WATCHDOG") ? "WATCHDOG" : "", + exitCode, + mockErrors: mock.errors, + }).toEqual({ + steps: ["STEP prepared", "STEP armed", "STEP released", "STEP q1 done", "STEP q2 done", "DONE"].join("\n"), + stderr: "", + exitCode: 0, + mockErrors: [], + }); + + // By the time the release control query was handled, only the held + // query's Bind may have reached the server. The warmup execution happens + // before arm_hold, so the expected count is exactly 2 (warmup + held q1); + // q2's Bind must arrive only after the release. + expect(mock.holdMeBindsAtRelease()).toBe(2); + const total = mock.conns.reduce( + (n, c) => n + c.log.filter(entry => entry.startsWith("B:") && entry.includes("hold_me")).length, + 0, + ); + expect(total).toBe(3); + } finally { + mock.forceRelease(); + mock.server.close(); + } +});