Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 104 additions & 14 deletions src/js/internal/sql/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,16 @@ abstract class BasePooledConnection<ConnectionHandle extends { close(): void; fl
}
return true;
}
/// Bypasses `retry()`'s auth-code gate and forces a fresh handshake on
/// this slot. Only meaningful when `connectionInfo.password` is a
/// function — the re-evaluation is what lets a rotated IAM token / Vault
/// lease actually take effect on a slot whose previous handshake failed
/// with a non-retryable auth error.
forceRetry(): boolean {
if (this.adapter.closed) return false;
this.doRetry();
return true;
}
}

function closeNT(onClose: (err: Error) => void, err: Error | null) {
Expand Down Expand Up @@ -895,7 +905,13 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con
{
public readonly connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions;

/// Live pool entries. Grown lazily on demand up to `maxPoolSize` — slots
/// are appended by #tryGrowPool and only nulled out during shutdown
/// (#close), so every index < connections.length is a valid connection.
public readonly connections: PooledConnection[];
/// Hard cap on the number of connections this pool is allowed to open.
/// `connections.length` is the CURRENT size and grows from 0 up to this.
public readonly maxPoolSize: number;
public readonly readyConnections: Set<PooledConnection> = new Set();

public waitingQueue: Array<(err: Error | null, result: any) => void> = [];
Expand All @@ -908,7 +924,8 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con

constructor(connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions) {
this.connectionInfo = connectionInfo;
this.connections = new Array(connectionInfo.max);
this.connections = [];
this.maxPoolSize = connectionInfo.max;
}

protected abstract createPooledConnection(): PooledConnection;
Expand Down Expand Up @@ -1003,10 +1020,38 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con

maxDistribution() {
if (!this.waitingQueue.length) return 0;
const result = Math.ceil((this.waitingQueue.length + this.totalQueries) / this.connections.length);
// Target distribution against the pool ceiling, not the current size.
// With lazy pool growth `connections.length` starts small and would
// collapse every queued query onto the first connection.
const result = Math.ceil((this.waitingQueue.length + this.totalQueries) / this.maxPoolSize);
return result ? result : 1;
}

/// Open a new connection and append it to the pool if we have room. The
/// new connection starts connecting immediately and will enter
/// `readyConnections` via `release()` once its TCP/auth handshake finishes.
/// Returns the new connection, or null if we're already at `maxPoolSize`.
#tryGrowPool(): PooledConnection | null {
if (this.closed) return null;
if (this.connections.length >= this.maxPoolSize) return null;
const connection = this.createPooledConnection();
this.connections.push(connection);
return connection;
}

/// Count connections that are still completing their handshake. A pending
/// connection will soon join `readyConnections`, so we don't need to grow
/// the pool further just because no connection is ready *right now*.
#pendingConnectionsCount(): number {
let count = 0;
const len = this.connections.length;
for (let i = 0; i < len; i++) {
const c = this.connections[i];
if (c && c.state === PooledConnectionState.pending) count++;
}
return count;
}

flushConcurrentQueries() {
const maxDistribution = this.maxDistribution();
if (maxDistribution === 0) {
Expand All @@ -1018,6 +1063,21 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con
c => !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution,
);
if (nonReservedConnections.length === 0) {
// No idle connection can take another query. Grow the pool only if
// the number of still-handshaking connections is less than the
// backlog — otherwise those pending connections will drain the
// queue on their own once they become ready.
//
// `release()` hands freshly-connected slots to `reservedQueue`
// first (and returns early, never feeding `waitingQueue`), so up
// to `reservedQueue.length` pending sockets are already spoken for
// and don't count as capacity for `waitingQueue`.
const pending = this.#pendingConnectionsCount();
const pendingForWaiting = Math.max(0, pending - this.reservedQueue.length);
const unservedWaiters = this.waitingQueue.length - pendingForWaiting;
if (unservedWaiters > 0 && this.connections.length < this.maxPoolSize) {
this.#tryGrowPool();
}
return;
}
const orderedConnections = nonReservedConnections.sort((a, b) => a.queryCount - b.queryCount);
Expand Down Expand Up @@ -1303,29 +1363,57 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con
} else {
this.waitingQueue.push(onConnected);
}
// Caller is waiting behind in-flight connections. If the pool is
// not at max yet, open another connection in parallel rather than
// letting everyone pile up behind the first socket.
this.#tryGrowPool();
} else if (!retry_in_progress) {
// impossible to connect or retry
onConnected(storedError ?? this.connectionClosedError(), null);
// Every existing slot is closed and `retry()` refused all of
// them. That's the non-retryable class of errors (bad password,
// unknown auth method, TLS failures). Opening another connection
// with the SAME `connectionInfo` will hit the identical failure
// and just burn a TCP+auth round-trip per waiter — UNLESS the
// password is supplied as a function (dynamic credential, e.g.
// IAM token), in which case a fresh attempt may genuinely pick
// up a new secret.
if (typeof this.connectionInfo.password === "function") {
if (reserved) {
this.reservedQueue.push(onConnected);
} else {
this.waitingQueue.push(onConnected);
}
// Prefer growing the pool when there's room; otherwise reuse
// one of the existing closed slots via `forceRetry()` so the
// `max: 1` case (and any already-at-max pool) can still
// recover after credentials rotate.
if (!this.#tryGrowPool()) {
for (let i = 0; i < pollSize; i++) {
const c = this.connections[i];
if (c.state === PooledConnectionState.closed && c.forceRetry()) {
break;
}
}
}
} else {
// impossible to connect or retry
onConnected(storedError ?? this.connectionClosedError(), null);
}
}
return;
}
// we never started the pool, lets start it
// We never started the pool. Only open one connection for the first
// query — further connections are opened lazily by
// `flushConcurrentQueries()` as demand grows, up to `maxPoolSize`.
if (reserved) {
this.reservedQueue.push(onConnected);
} else {
this.waitingQueue.push(onConnected);
}
this.poolStarted = true;
const pollSize = this.connections.length;
// pool is always at least 1 connection
const firstConnection = this.createPooledConnection();
this.connections[0] = firstConnection;
if (reserved) {
const firstConnection = this.#tryGrowPool();
if (reserved && firstConnection) {
firstConnection.flags |= PooledConnectionFlags.preReserved; // lets pre reserve the first connection
}
for (let i = 1; i < pollSize; i++) {
this.connections[i] = this.createPooledConnection();
}
return;
}
if (reserved) {
Expand Down Expand Up @@ -1355,8 +1443,10 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con
connectionWithLeastQueries.flags |= PooledConnectionFlags.preReserved;
}

// no connection available to be reserved lets wait for a connection to be released
// No ready connection can be reserved. Grow the pool if we still can,
// so the caller isn't stuck waiting behind the currently-open sockets.
this.reservedQueue.push(onConnected);
this.#tryGrowPool();
} else {
this.waitingQueue.push(onConnected);
this.flushConcurrentQueries();
Expand Down
5 changes: 4 additions & 1 deletion test/js/sql/sql-onconnect-onclose-throw.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ test.concurrent(
// close) threw a TypeError on the holes when called from inside the callback.
// The callback is now deferred until the pool is fully constructed. Nothing
// is dialed: password() throws before the connection is created.
//
// With lazy pool growth (#30632) a single query only opens one slot even at
// max: 2, so exactly one onclose fires.
test.concurrent("postgres: pool calls from onclose are safe when connecting fails synchronously", async () => {
const fixture = /* ts */ `
import { SQL } from "bun";
Expand Down Expand Up @@ -194,7 +197,7 @@ try {
process.exit(0);
`;
const { stdout, exitCode } = await runFixture(fixture);
expect(stdout).toBe("reentry ok\nreentry ok\nquery rejected: password error\n");
expect(stdout).toBe("reentry ok\nquery rejected: password error\n");
expect(exitCode).toBe(0);
});

Expand Down
175 changes: 175 additions & 0 deletions test/js/sql/sql-pool-lazy-growth.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Issue #30632: `new Bun.SQL({ max: N })` must grow the pool lazily on demand,
// not open all N connections up-front. Uses a bare TCP listener as a drop-in
// sink so we can count the opened sockets without needing Docker or a real
// Postgres / MySQL server.
import { SQL } from "bun";
import { describe, expect, test } from "bun:test";

type Adapter = "postgres" | "mysql";

function makeSink() {
let opened = 0;
const server = Bun.listen({
hostname: "127.0.0.1",
port: 0,
socket: {
open() {
opened++;
},
data() {},
close() {},
error() {},
},
});
return {
port: server.port,
[Symbol.dispose]() {
server.stop();
},
get opened() {
return opened;
},
};
}

describe.each(["postgres", "mysql"] as Adapter[])("%s connection pool grows lazily (#30632)", adapter => {
test("a single query only opens one TCP connection, not `max`", async () => {
using sink = makeSink();
await using sql = new SQL({
adapter,
host: "127.0.0.1",
port: sink.port,
username: "x",
database: "x",
max: 50,
connectionTimeout: 1,
});

// Query fails (nothing is speaking the DB protocol on the other end);
// we only care about how many sockets Bun opened.
await sql`SELECT 1`.catch(() => {});
expect(sink.opened).toBe(1);
});
});

// Followup from #30632 review (@claude-bot / @Lillious): when a connection
// fails with a non-retryable auth error (unsupported auth method, bad
// password, TLS refused, etc.), subsequent queries must fail fast with the
// cached error — not keep opening new sockets to hit the same auth wall.
// Uses a minimal fake server that answers the startup message with an
// AuthenticationRequest carrying an unsupported auth code, which Bun rejects
// as `ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD`.
//
// Returns the listener + a counter of opened sockets. Every client write
// (the StartupMessage) gets an AuthenticationRequest with auth code 9
// (SSPI), which Bun treats as an unsupported method.
// Wire: 'R' (1 byte) + int32 length (4) + int32 auth code (4).
function makeUnsupportedAuthPgServer() {
let opened = 0;
const server = Bun.listen({
hostname: "127.0.0.1",
port: 0,
socket: {
open() {
opened++;
},
data(socket) {
const buf = Buffer.alloc(9);
buf.write("R", 0);
buf.writeInt32BE(8, 1);
buf.writeInt32BE(9, 5);
socket.write(buf);
},
close() {},
error() {},
},
});
return {
port: server.port,
[Symbol.dispose]() {
server.stop();
},
get opened() {
return opened;
},
};
}

describe("postgres pool fast-fails on non-retryable auth errors (#30632)", () => {
test("repeated queries with a static password do not open more sockets after an auth failure", async () => {
using server = makeUnsupportedAuthPgServer();
await using sql = new SQL({
adapter: "postgres",
host: "127.0.0.1",
port: server.port,
username: "x",
database: "x",
max: 20,
connectionTimeout: 1,
});

// Fire 5 sequential queries. The first one opens a connection, the
// auth handshake fails, and the remaining 4 should reject immediately
// with the cached auth error — no extra sockets.
for (let i = 0; i < 5; i++) {
await sql`SELECT ${i}`.catch(() => {});
}
expect(server.opened).toBe(1);
});

test("function password retries auth on each new query (rotatable credentials)", async () => {
// When `password` is a function, Bun re-invokes it every time it opens
// a new TCP connection, so a rotated IAM token / Vault lease can take
// effect. Verify that after an initial auth failure, subsequent
// queries actually try again — even at `max: 1` where there's no room
// to grow the pool, which forces reuse of the existing closed slot.
using server = makeUnsupportedAuthPgServer();
await using sql = new SQL({
adapter: "postgres",
host: "127.0.0.1",
port: server.port,
username: "x",
database: "x",
max: 1,
connectionTimeout: 1,
password: () => "rotating-token",
});

for (let i = 0; i < 3; i++) {
await sql`SELECT ${i}`.catch(() => {});
}
// 3 attempts, each dialing fresh TCP on the same slot.
expect(server.opened).toBe(3);
});

test("synchronous `password()` throw does not hang subsequent queries", async () => {
// `createConnection` in postgres.ts catches a thrown `password()`
// and invokes `onClose` synchronously, so `release()` drains the
// queue on the same tick `connect()` enqueues onto it. If we push
// AFTER triggering the retry path, the waiter is lost and the query
// hangs forever. Guard against that: both queries must resolve with
// the thrown error. The runner's default per-test timeout fails the
// test if anything hangs, which is the failure mode we're guarding.
await using sql = new SQL({
adapter: "postgres",
host: "127.0.0.1",
port: 1,
username: "x",
database: "x",
max: 1,
password: () => {
throw new Error("boom");
},
});

for (let i = 0; i < 2; i++) {
let err: any;
try {
await sql`SELECT ${i}`;
} catch (e) {
err = e;
}
expect(err?.message).toBe("boom");
}
});
});
Loading