Skip to content
75 changes: 50 additions & 25 deletions src/js/bun/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@ interface TransactionState {
queries: Set<Query<any, any>>;
}

// closing the connection fires its close handlers; for a reservation,
// onReservedConnectionClosed returns the pool slot
function closeReservedConnection(state: TransactionState, pooledConnection) {
if (state.connectionState & ReservedConnectionState.closed) {
return;
}
state.connectionState |= ReservedConnectionState.closed;
for (const query of state.queries) {
query.cancel();
}
pooledConnection.close();
}

function reservedCloseTimeoutFired(state: TransactionState, pooledConnection, resolve: () => void) {
closeReservedConnection(state, pooledConnection);
resolve();
}

function reservedCloseDrained(timer: Timer, state: TransactionState, pooledConnection, resolve: () => void) {
clearTimeout(timer);
closeReservedConnection(state, pooledConnection);
resolve();
}

function adapterFromOptions(options: Bun.SQL.__internal.DefinedOptions) {
switch (options.adapter) {
case "postgres":
Expand Down Expand Up @@ -237,6 +261,16 @@ const SQL: typeof Bun.SQL = function SQL(
}
}

function onReservedConnectionClosed(this: TransactionState, pooledConnection, err: Error) {
onTransactionDisconnected.$call(this, err);
// The reservation holds one pool slot (queryCount/totalQueries) that is
// normally returned by reserved_sql.release(). When the underlying
// connection closes first (reserved_sql.close() or an unexpected
// disconnect), release() can no longer run, so return the slot here;
// otherwise a graceful sql.close() waits on it forever.
pool.release(pooledConnection);
}

function onReserveConnected(this: Query<any, any>, err: Error | null, pooledConnection) {
const { resolve, reject } = this;

Expand All @@ -253,7 +287,7 @@ const SQL: typeof Bun.SQL = function SQL(
queries: new Set(),
};

const onClose = onTransactionDisconnected.bind(state);
const onClose = onReservedConnectionClosed.bind(state, pooledConnection);
if (pooledConnection.onClose) {
pooledConnection.onClose(onClose);
}
Expand Down Expand Up @@ -379,49 +413,40 @@ const SQL: typeof Bun.SQL = function SQL(
return pool.flush();
};
reserved_sql.close = async (options?: { timeout?: number }) => {
const reserveQueries = state.queries;
if (
state.connectionState & ReservedConnectionState.closed ||
!(state.connectionState & ReservedConnectionState.acceptQueries)
) {
return Promise.$resolve(undefined);
}
state.connectionState &= ~ReservedConnectionState.acceptQueries;
// validate before mutating any state: throwing after clearing
// acceptQueries would strand the reservation in a state where neither
// close() nor release() can return its pool slot
let timeout = options?.timeout;
if (timeout) {
timeout = Number(timeout);
if (timeout > 2 ** 31 || timeout < 0 || timeout !== timeout) {
throw $ERR_INVALID_ARG_VALUE("options.timeout", timeout, "must be a non-negative integer less than 2^31");
}
Comment thread
claude[bot] marked this conversation as resolved.
if (timeout > 0 && (reserveQueries.size > 0 || reservedTransaction.size > 0)) {
}
state.connectionState &= ~ReservedConnectionState.acceptQueries;
if (timeout) {
if (timeout > 0 && (state.queries.size > 0 || reservedTransaction.size > 0)) {
const { promise, resolve } = Promise.withResolvers();
// race all queries vs timeout
const pending_queries = Array.from(reserveQueries);
const pending_queries = Array.from(state.queries);
const pending_transactions = Array.from(reservedTransaction);
const timer = setTimeout(() => {
state.connectionState |= ReservedConnectionState.closed;
for (const query of reserveQueries) {
(query as Query<any, any>).cancel();
}
state.connectionState |= ReservedConnectionState.closed;
pooledConnection.close();

resolve();
}, timeout * 1000);
const timer = setTimeout(reservedCloseTimeoutFired, timeout * 1000, state, pooledConnection, resolve);
timer.unref(); // dont block the event loop
Promise.all([Promise.all(pending_queries), Promise.all(pending_transactions)]).finally(() => {
clearTimeout(timer);
resolve();
});
// wait for every tracked operation to settle: one failing query must
// not cut the grace period short for the rest, and the queries' own
// consumers observe their rejections
const drained = reservedCloseDrained.bind(null, timer, state, pooledConnection, resolve);
Promise.allSettled([...pending_queries, ...pending_transactions]).then(drained);
return promise;
}
}
state.connectionState |= ReservedConnectionState.closed;
for (const query of reserveQueries) {
(query as Query<any, any>).cancel();
}

pooledConnection.close();
closeReservedConnection(state, pooledConnection);

return Promise.$resolve(undefined);
};
Expand Down
10 changes: 9 additions & 1 deletion src/js/internal/sql/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,15 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con

release(connection: PooledConnection, connectingEvent: boolean = false) {
if (!connectingEvent) {
connection.queryCount--;
// A slot can be returned after the connection already closed (a query
// settling, or the reserved wrapper's close handler). #finishClose
// zeroed queryCount at that point, so only decrement what is still
// checked out; a negative count corrupts the reserved-flag handling
// below once the slot reconnects. The pool-level counter always drops:
// every release pairs with exactly one checkout that incremented it.
if (connection.queryCount > 0) {
connection.queryCount--;
}
this.totalQueries--;
}
const currentQueryCount = connection.queryCount;
Expand Down
Loading
Loading