diff --git a/src/js/internal/sql/mysql.ts b/src/js/internal/sql/mysql.ts index 71236467d82..187efe17d0e 100644 --- a/src/js/internal/sql/mysql.ts +++ b/src/js/internal/sql/mysql.ts @@ -1,11 +1,16 @@ import type { MySQLErrorOptions } from "internal/sql/errors"; import type { Query } from "./query"; -import type { ArrayType, DatabaseAdapter, SQLArrayParameter, SQLHelper, SQLResultArray, SSLMode } from "./shared"; -const { SQLHelper, SSLMode, SQLResultArray, buildDefinedColumnsAndQuery } = require("internal/sql/shared"); +import type { ArrayType, DatabaseAdapter, SQLArrayParameter, SQLCommand, SQLResultArray, SSLMode } from "./shared"; +const { + SQLResultArray, + BasePooledConnection, + BaseSQLAdapter, + createPooledConnectionHandle, + getHelperCommandFromDetect, +} = require("internal/sql/shared"); const { - Query, SQLQueryFlags, - symbols: { _strings, _values, _results, _handle }, + symbols: { _results, _handle }, } = require("internal/sql/query"); const { MySQLError } = require("internal/sql/errors"); @@ -114,461 +119,66 @@ export interface MySQLDotZig { ) => $ZigGeneratedClasses.MySQLQuery; } -const enum SQLCommand { - insert = 0, - update = 1, - updateSet = 2, - where = 3, - in = 4, - none = -1, -} -export type { SQLCommand }; - -function commandToString(command: SQLCommand): string { - switch (command) { - case SQLCommand.insert: - return "INSERT"; - case SQLCommand.updateSet: - case SQLCommand.update: - return "UPDATE"; - case SQLCommand.in: - case SQLCommand.where: - return "WHERE"; - default: - return ""; - } -} - -function detectCommand(query: string): SQLCommand { - const text = query.toLowerCase().trim(); - const text_len = text.length; - - let token = ""; - let command = SQLCommand.none; - let quoted = false; - // we need to reverse search so we find the closest command to the parameter - for (let i = text_len - 1; i >= 0; i--) { - const char = text[i]; - switch (char) { - case " ": // Space - case "\n": // Line feed - case "\t": // Tab character - case "\r": // Carriage return - case "\f": // Form feed - case "\v": { - switch (token) { - case "insert": { - return SQLCommand.insert; - } - case "update": { - return SQLCommand.update; - } - case "where": { - return SQLCommand.where; - } - case "set": { - return SQLCommand.updateSet; - } - case "in": { - return SQLCommand.in; - } - default: { - token = ""; - continue; - } - } - } - default: { - // skip quoted commands - if (char === '"') { - quoted = !quoted; - continue; - } - if (!quoted) { - token = char + token; - } - } - } - } - if (token) { - switch (token) { - case "insert": - return SQLCommand.insert; - case "update": - return SQLCommand.update; - case "where": - return SQLCommand.where; - case "set": - return SQLCommand.updateSet; - case "in": - case "any": - case "all": - return SQLCommand.in; - default: - return SQLCommand.none; - } - } - return command; -} -const enum PooledConnectionState { - pending = 0, - connected = 1, - closed = 2, -} - -const enum PooledConnectionFlags { - /// canBeConnected is used to indicate that at least one time we were able to connect to the database - canBeConnected = 1 << 0, - /// reserved is used to indicate that the connection is currently reserved - reserved = 1 << 1, - /// preReserved is used to indicate that the connection will be reserved in the future when queryCount drops to 0 - preReserved = 1 << 2, -} - -function onQueryFinish(this: PooledMySQLConnection, onClose: (err: Error) => void) { - this.queries.delete(onClose); - this.adapter.release(this); -} - -function closeNT(onClose: (err: Error) => void, err: Error | null) { - onClose(err as Error); -} -class PooledMySQLConnection { - private static async createConnection( - options: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions, - onConnected: (err: Error | null, connection: $ZigGeneratedClasses.MySQLConnection) => void, - onClose: (err: Error | null) => void, - ): Promise<$ZigGeneratedClasses.MySQLConnection | null> { - const { - hostname, - port, - username, - tls, - query, - database, - sslMode, - idleTimeout = 0, - connectionTimeout = 30 * 1000, - maxLifetime = 0, - prepare = true, - path, - allowPublicKeyRetrieval = false, - } = options; - - let password: Bun.MaybePromise | string | undefined | (() => Bun.MaybePromise) = options.password; - - try { - if (typeof password === "function") { - password = password(); - } - - if (password && $isPromise(password)) { - password = await password; - } - - return createMySQLConnection( - hostname, - Number(port), - username || "", - password || "", - database || "", - // > The default value for sslmode is prefer. As is shown in the table, this - // makes no sense from a security point of view, and it only promises - // performance overhead if possible. It is only provided as the default for - // backward compatibility, and is not recommended in secure deployments. - sslMode || SSLMode.disable, - tls || null, - query || "", - path || "", - onConnected, - onClose, - idleTimeout, - connectionTimeout, - maxLifetime, - !prepare, - !!allowPublicKeyRetrieval, - ); - } catch (e) { - process.nextTick(closeNT, onClose, e); - return null; +class PooledMySQLConnection extends BasePooledConnection<$ZigGeneratedClasses.MySQLConnection> { + protected handleConnected(err: any, connection?: $ZigGeneratedClasses.MySQLConnection) { + if (!err) { + this.connection = connection!; } + super.handleConnected(err); } - adapter: MySQLAdapter; - connection: $ZigGeneratedClasses.MySQLConnection | null = null; - state: PooledConnectionState = PooledConnectionState.pending; - storedError: Error | null = null; - queries: Set<(err: Error) => void> = new Set(); - onFinish: ((err: Error | null) => void) | null = null; - connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions; - flags: number = 0; - /// queryCount is used to indicate the number of queries using the connection, if a connection is reserved or if its a transaction queryCount will be 1 independently of the number of queries - queryCount: number = 0; - /// when the current connect cycle started; 0 when not connecting. Connect - /// failures (server not yet accepting connections) are retried until - /// connectionTimeout elapses from this point. - connectStartedAt: number = 0; - connectAttempts: number = 0; - retryTimer: ReturnType | null = null; - - #onConnected(err, connection) { - if (err) { - err = wrapError(err); - } else { - this.connection = connection; - } - - const connectionInfo = this.connectionInfo; - try { - // user code; a throw must not abort the pool bookkeeping below - // (the exception keeps propagating after the finally block runs) - if (connectionInfo?.onconnect) { - connectionInfo.onconnect(err); - } - } finally { - this.storedError = err; - if (!err) { - this.connectStartedAt = 0; - this.flags |= PooledConnectionFlags.canBeConnected; - } - this.state = err ? PooledConnectionState.closed : PooledConnectionState.connected; - const onFinish = this.onFinish; - if (onFinish) { - this.queryCount = 0; - this.flags &= ~PooledConnectionFlags.reserved; - this.flags &= ~PooledConnectionFlags.preReserved; - - // pool is closed, lets finish the connection - if (err) { - onFinish(err); - } else { - this.connection?.close(); - } - } else { - this.adapter.release(this, true); - } - } - } - - #onClose(err) { - if (err) { - err = wrapError(err); - } - this.connection = null; - this.storedError = err; - if (this.#shouldRetryConnecting(err)) { - // The server is not accepting connections yet (e.g. still starting - // up). Keep the slot pending and retry with backoff instead of - // failing the queries that are waiting for a connection. The user's - // onclose callback only fires when the slot actually closes. - this.connectAttempts++; - const delay = Math.min(20 * 2 ** this.connectAttempts, 1000); - this.retryTimer = setTimeout(PooledMySQLConnection.#retryTimerFired, delay, this); - return; - } - // this connect cycle is over; a later retry() starts a fresh one - this.connectStartedAt = 0; - this.#finishClose(err); - } - - static #retryTimerFired(self: PooledMySQLConnection) { - self.retryTimer = null; - // conditions may have changed during the backoff (pool closing, waiters - // gone, retry budget elapsed), so re-check before dialing - if (self.#canKeepRetrying()) { - self.#startConnection(); - } else { - self.#finishClose(self.storedError); - } - } - - #finishClose(err) { - const connectionInfo = this.connectionInfo; - try { - // user code; a throw must not abort the pool bookkeeping below - // (the exception keeps propagating after the finally block runs) - if (connectionInfo?.onclose) { - connectionInfo.onclose(err); - } - } finally { - this.state = PooledConnectionState.closed; - this.storedError = err; - - // remove from ready connections if its there - this.adapter.readyConnections.delete(this); - const queries = new Set(this.queries); - this.queries?.clear?.(); - this.queryCount = 0; - this.flags &= ~PooledConnectionFlags.reserved; - - // notify all queries that the connection is closed - for (const onClose of queries) { - onClose(err); - } - const onFinish = this.onFinish; - if (onFinish) { - onFinish(err); - } - - this.adapter.release(this, true); - } - } - - constructor(connectionInfo: Bun.SQL.__internal.DefinedMySQLOptions, adapter: MySQLAdapter) { - this.state = PooledConnectionState.pending; - this.adapter = adapter; - this.connectionInfo = connectionInfo; - this.#startConnection(); - } - - async #startConnection() { - if (this.connectStartedAt === 0) { - this.connectStartedAt = Date.now(); - this.connectAttempts = 0; - } - // store the handle right away (not only in #onConnected) so a forced + protected async startConnection() { + // store the handle right away (not only in handleConnected) so a forced // pool close can tear down a connection whose handshake is in flight - this.connection = await PooledMySQLConnection.createConnection( + this.connection = await createPooledConnectionHandle( + createMySQLConnection, this.connectionInfo, - this.#onConnected.bind(this), - this.#onClose.bind(this), + this.handleConnected.bind(this), + this.handleClose.bind(this), ); - if (this.onFinish !== null) { - // the pool was force-closed while the native handle was being created; - // close it now so onClose fires and onFinish settles - this.connection?.close(); - } } - /// Connect failures (ERR_MYSQL_CONNECTION_FAILED) mean the server - /// accepted the TCP connection but closed it before the handshake - /// completed — typically it is still starting up, or an intermediary - /// (like a container port proxy) is up before the database is. Those are - /// retried until connectionTimeout elapses, as long as queries are - /// waiting on the pool. Refused connections - /// (ERR_MYSQL_CONNECTION_REFUSED) fail fast: nothing is listening, - /// and probes/healthchecks rely on the immediate error. Real server errors (authentication, - /// handshake errors) and closes of established connections are not - /// retried here. - #shouldRetryConnecting(err: Error | null): boolean { - // connect failures come from the native layer as options objects that - // wrapError turned into MySQLError instances with a typed code - if (!(err instanceof MySQLError) || err.code !== "ERR_MYSQL_CONNECTION_FAILED") { - return false; - } - return this.#canKeepRetrying(); + protected wrapError(error: any): Error { + return wrapError(error); } - #canKeepRetrying(): boolean { - if (this.adapter.closed || this.onFinish !== null) { - return false; - } - // only retry while queries are actually waiting for a connection - if (this.adapter.waitingQueue.length === 0 && this.adapter.reservedQueue.length === 0) { - return false; - } - // an explicit connectionTimeout of 0 disables the connect timer, and with - // it the retry budget - const connectionTimeout = this.connectionInfo.connectionTimeout ?? 30 * 1000; - if (connectionTimeout <= 0) { - return false; - } - return this.connectStartedAt !== 0 && Date.now() - this.connectStartedAt < connectionTimeout; - } - - /// Returns true if a scheduled connect retry was cancelled — in that case - /// nothing is in flight and no onClose/onConnected callback will fire. - cancelRetry(): boolean { - if (this.retryTimer !== null) { - clearTimeout(this.retryTimer); - this.retryTimer = null; - return true; + protected isNonRetryableError(code: string | undefined): boolean { + switch (code) { + case "ERR_MYSQL_PASSWORD_REQUIRED": + case "ERR_MYSQL_MISSING_AUTH_DATA": + case "ERR_MYSQL_FAILED_TO_ENCRYPT_PASSWORD": + case "ERR_MYSQL_INVALID_PUBLIC_KEY": + case "ERR_MYSQL_UNSUPPORTED_PROTOCOL_VERSION": + case "ERR_MYSQL_UNSUPPORTED_AUTH_PLUGIN": + case "ERR_MYSQL_AUTHENTICATION_FAILED": + // we can't retry these are authentication errors + return true; + default: + return false; } - return false; - } - - onClose(onClose: (err: Error) => void) { - this.queries.add(onClose); } - bindQuery(query: Query, onClose: (err: Error) => void) { - this.queries.add(onClose); - query.finally(onQueryFinish.bind(this, onClose)); - } - - #doRetry() { - if (this.adapter.closed) { - return; - } - // reset error and state - this.storedError = null; - this.connectStartedAt = 0; - this.state = PooledConnectionState.pending; - // retry connection - this.#startConnection(); - } - close() { - try { - if (this.state === PooledConnectionState.connected) { - this.connection?.close(); - } - } catch {} - } - flush() { - this.connection?.flush(); - } - retry() { - // if pool is closed, we can't retry - if (this.adapter.closed) { - return false; - } - // we need to reconnect - // lets use a retry strategy - - // we can only retry if one day we are able to connect - if (this.flags & PooledConnectionFlags.canBeConnected) { - this.#doRetry(); - } else { - // analyse type of error to see if we can retry - switch (this.storedError?.code) { - case "ERR_MYSQL_PASSWORD_REQUIRED": - case "ERR_MYSQL_MISSING_AUTH_DATA": - case "ERR_MYSQL_FAILED_TO_ENCRYPT_PASSWORD": - case "ERR_MYSQL_INVALID_PUBLIC_KEY": - case "ERR_MYSQL_UNSUPPORTED_PROTOCOL_VERSION": - case "ERR_MYSQL_UNSUPPORTED_AUTH_PLUGIN": - case "ERR_MYSQL_AUTHENTICATION_FAILED": - // we can't retry these are authentication errors - return false; - default: - // we can retry - this.#doRetry(); - } - } - return true; + /// Connect failures (ERR_MYSQL_CONNECTION_FAILED) mean the server accepted + /// the TCP connection but closed it before the handshake completed, + /// typically because it is still starting up or an intermediary (like a + /// container port proxy) is up before the database is. Those are retried + /// until connectionTimeout elapses, as long as queries are waiting on the + /// pool. Refused connections (ERR_MYSQL_CONNECTION_REFUSED) fail fast: + /// nothing is listening, and probes/healthchecks rely on the immediate + /// error. Real server errors (authentication, handshake errors) and closes + /// of established connections are not retried here. + protected isConnectFailureError(err: Error | null): boolean { + return err instanceof MySQLError && (err as any).code === "ERR_MYSQL_CONNECTION_FAILED"; } } class MySQLAdapter + extends BaseSQLAdapter implements DatabaseAdapter { - public readonly connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions; - - public readonly connections: PooledMySQLConnection[]; - public readonly readyConnections: Set = new Set(); - - public waitingQueue: Array<(err: Error | null, result: any) => void> = []; - public reservedQueue: Array<(err: Error | null, result: any) => void> = []; - - public poolStarted: boolean = false; - public closed: boolean = false; - public totalQueries: number = 0; - public onAllQueriesFinished: (() => void) | null = null; - - constructor(connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions) { - this.connectionInfo = connectionInfo; - this.connections = new Array(connectionInfo.max); + protected createPooledConnection(): PooledMySQLConnection { + return new PooledMySQLConnection(this.connectionInfo, this); } escapeIdentifier(str: string) { @@ -595,25 +205,12 @@ class MySQLAdapter code: "ERR_MYSQL_INVALID_TRANSACTION_STATE", }); } - supportsReservedConnections() { - return true; - } - - getConnectionForQuery(pooledConnection: PooledMySQLConnection) { - return pooledConnection.connection; - } - - attachConnectionCloseHandler(connection: PooledMySQLConnection, handler: () => void): void { - if (connection.onClose) { - connection.onClose(handler); - } + unsafeTransactionError() { + return new MySQLError("Only use sql.begin, sql.reserved or max: 1", { + code: "ERR_MYSQL_UNSAFE_TRANSACTION", + }); } - detachConnectionCloseHandler(connection: PooledMySQLConnection, handler: () => void): void { - if (connection.queries) { - connection.queries.delete(handler); - } - } array(_values: any[], _typeNameOrID?: number | ArrayType): SQLArrayParameter { throw new Error("MySQL doesn't support arrays"); } @@ -649,28 +246,6 @@ class MySQLAdapter }; } - validateTransactionOptions(options: string): { valid: boolean; error?: string } { - // The string is interpolated into `START TRANSACTION ${options}`, so refuse anything - // that could terminate the statement or start a new one. - if (!/^[A-Za-z ,]*$/.test(options)) { - return { - valid: false, - error: "Transaction options can only contain letters, spaces, and commas.", - }; - } - return { valid: true }; - } - - validateDistributedTransactionName(name: string): { valid: boolean; error?: string } { - if (name.indexOf("'") !== -1) { - return { - valid: false, - error: "Distributed transaction name cannot contain single quotes.", - }; - } - return { valid: true }; - } - getCommitDistributedSQL(name: string): string { const validation = this.validateDistributedTransactionName(name); if (!validation.valid) { @@ -688,16 +263,7 @@ class MySQLAdapter } createQueryHandle(sql: string, values: unknown[], flags: number) { - if (!(flags & SQLQueryFlags.allowUnsafeTransaction)) { - if (this.connectionInfo.max !== 1) { - const upperCaseSqlString = sql.toUpperCase().trim(); - if (upperCaseSqlString.startsWith("BEGIN") || upperCaseSqlString.startsWith("START TRANSACTION")) { - throw new MySQLError("Only use sql.begin, sql.reserved or max: 1", { - code: "ERR_MYSQL_UNSAFE_TRANSACTION", - }); - } - } - } + this.checkUnsafeTransaction(sql, flags); return createMySQLQuery( sql, @@ -709,571 +275,15 @@ class MySQLAdapter ); } - maxDistribution() { - if (!this.waitingQueue.length) return 0; - const result = Math.ceil((this.waitingQueue.length + this.totalQueries) / this.connections.length); - return result ? result : 1; - } - - flushConcurrentQueries() { - const maxDistribution = this.maxDistribution(); - if (maxDistribution === 0) { - return; - } - - while (true) { - const nonReservedConnections = Array.from(this.readyConnections).filter( - c => !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution, - ); - if (nonReservedConnections.length === 0) { - return; - } - const orderedConnections = nonReservedConnections.sort((a, b) => a.queryCount - b.queryCount); - for (const connection of orderedConnections) { - const pending = this.waitingQueue.shift(); - if (!pending) { - return; - } - connection.queryCount++; - this.totalQueries++; - pending(null, connection); - } - } - } - - release(connection: PooledMySQLConnection, connectingEvent: boolean = false) { - if (!connectingEvent) { - connection.queryCount--; - this.totalQueries--; - } - const currentQueryCount = connection.queryCount; - if (currentQueryCount == 0) { - connection.flags &= ~PooledConnectionFlags.reserved; - connection.flags &= ~PooledConnectionFlags.preReserved; - } - if (this.onAllQueriesFinished) { - // we are waiting for all queries to finish, lets check if we can call it - if (!this.hasPendingQueries()) { - this.onAllQueriesFinished(); - } - } - - if (connection.state !== PooledConnectionState.connected) { - // connection is not ready - if (connection.storedError) { - // this connection got a error but maybe we can wait for another - - if (this.hasConnectionsAvailable()) { - return; - } - - const waitingQueue = this.waitingQueue; - const reservedQueue = this.reservedQueue; - - this.waitingQueue = []; - this.reservedQueue = []; - // we have no connections available so lets fails - for (const pending of waitingQueue) { - pending(connection.storedError, connection); - } - for (const pending of reservedQueue) { - pending(connection.storedError, connection); - } - // draining the queues may have been the last pending work — a - // graceful close() is waiting on this callback - if (this.onAllQueriesFinished && !this.hasPendingQueries()) { - this.onAllQueriesFinished(); - } - } - return; - } - - if (currentQueryCount == 0) { - // ok we can actually bind reserved queries to it - const pendingReserved = this.reservedQueue.shift(); - if (pendingReserved) { - connection.flags |= PooledConnectionFlags.reserved; - connection.queryCount++; - this.totalQueries++; - // we have a connection waiting for a reserved connection lets prioritize it - pendingReserved(connection.storedError, connection); - return; - } - } - this.readyConnections.add(connection); - this.flushConcurrentQueries(); - } - - hasConnectionsAvailable() { - if (this.readyConnections.size > 0) return true; - if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection.state !== PooledConnectionState.closed) { - // some connection is connecting or connected - return true; - } - } - } - return false; - } - - hasPendingQueries() { - if (this.waitingQueue.length > 0 || this.reservedQueue.length > 0) return true; - if (this.poolStarted) { - return this.totalQueries > 0; - } - return false; - } - isConnected() { - if (this.readyConnections.size > 0) { - return true; - } - if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection.state === PooledConnectionState.connected) { - return true; - } - } - } - return false; - } - flush() { - if (this.closed) { - return; - } - if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection.state === PooledConnectionState.connected) { - connection.connection?.flush(); - } - } - } - } - - async #close() { - let pending; - while ((pending = this.waitingQueue.shift())) { - pending(this.connectionClosedError(), null); - } - while (this.reservedQueue.length > 0) { - const pendingReserved = this.reservedQueue.shift(); - if (pendingReserved) { - pendingReserved(this.connectionClosedError(), null); - } - } - - const promises: Array> = []; - - if (this.poolStarted) { - this.poolStarted = false; - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - switch (connection.state) { - case PooledConnectionState.pending: - { - if (connection.cancelRetry()) { - // a connect retry was scheduled; nothing is in flight so - // there is no onClose/onConnected to wait for - connection.state = PooledConnectionState.closed; - break; - } - const { promise, resolve } = Promise.withResolvers(); - connection.onFinish = resolve; - promises.push(promise); - connection.connection?.close(); - } - break; - - case PooledConnectionState.connected: - { - const { promise, resolve } = Promise.withResolvers(); - connection.onFinish = resolve; - promises.push(promise); - connection.connection?.close(); - } - break; - } - // clean connection reference - // @ts-ignore - this.connections[i] = null; - } - } - - this.readyConnections.clear(); - this.waitingQueue.length = 0; - return Promise.all(promises); + getHelperCommand(query: string): SQLCommand { + return getHelperCommandFromDetect(query, true); } - async close(options?: { timeout?: number }) { - if (this.closed) { - return; - } - - let timeout = options?.timeout; - if (timeout) { - timeout = Number(timeout); - if (timeout > 2 ** 31 || timeout < 0 || timeout !== timeout) { - throw $ERR_INVALID_ARG_VALUE("options.timeout", timeout, "must be a non-negative integer less than 2^31"); - } - - this.closed = true; - if (timeout === 0 || !this.hasPendingQueries()) { - // close immediately - await this.#close(); - return; - } - - const { promise, resolve } = Promise.withResolvers(); - const timer = setTimeout(() => { - // timeout is reached, lets close and probably fail some queries - this.#close().finally(resolve); - }, timeout * 1000); - timer.unref(); // dont block the event loop - - this.onAllQueriesFinished = () => { - clearTimeout(timer); - // everything is closed, lets close the pool - this.#close().finally(resolve); - }; - - return promise; - } else { - this.closed = true; - if (!this.hasPendingQueries()) { - // close immediately - await this.#close(); - return; - } - - // gracefully close the pool - const { promise, resolve } = Promise.withResolvers(); - - this.onAllQueriesFinished = () => { - // everything is closed, lets close the pool - this.#close().finally(resolve); - }; - - return promise; - } - } - - /** - * @param {function} onConnected - The callback function to be called when the connection is established. - * @param {boolean} reserved - Whether the connection is reserved, if is reserved the connection will not be released until release is called, if not release will only decrement the queryCount counter - */ - connect(onConnected: (err: Error | null, result: any) => void, reserved: boolean = false) { - if (this.closed) { - return onConnected(this.connectionClosedError(), null); - } - - if (this.readyConnections.size === 0) { - // no connection ready lets make some - let retry_in_progress = false; - let all_closed = true; - let storedError: Error | null = null; - - if (this.poolStarted) { - // we already started the pool - // lets check if some connection is available to retry - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - // we need a new connection and we have some connections that can retry - if (connection.state === PooledConnectionState.closed) { - if (connection.retry()) { - // lets wait for connection to be released - if (!retry_in_progress) { - // avoid adding to the queue twice, we wanna to retry every available pool connection - retry_in_progress = true; - if (reserved) { - // we are not sure what connection will be available so we dont pre reserve - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - } - } - } else { - // we have some error, lets grab it and fail if unable to start a connection - storedError = connection.storedError; - } - } else { - // we have some pending or open connections - all_closed = false; - } - } - if (!all_closed && !retry_in_progress) { - // is possible to connect because we have some working connections, or we are just without network for some reason - // wait for connection to be released or fail - if (reserved) { - // we are not sure what connection will be available so we dont pre reserve - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - } - } else if (!retry_in_progress) { - // impossible to connect or retry - onConnected(storedError ?? this.connectionClosedError(), null); - } - return; - } - // we never started the pool, lets start it - if (reserved) { - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - } - this.poolStarted = true; - const pollSize = this.connections.length; - // pool is always at least 1 connection - const firstConnection = new PooledMySQLConnection(this.connectionInfo, this); - this.connections[0] = firstConnection; - if (reserved) { - firstConnection.flags |= PooledConnectionFlags.preReserved; // lets pre reserve the first connection - } - for (let i = 1; i < pollSize; i++) { - this.connections[i] = new PooledMySQLConnection(this.connectionInfo, this); - } - return; - } - if (reserved) { - let connectionWithLeastQueries: PooledMySQLConnection | null = null; - let leastQueries = Infinity; - for (const connection of this.readyConnections) { - if (connection.flags & PooledConnectionFlags.preReserved || connection.flags & PooledConnectionFlags.reserved) - continue; - const queryCount = connection.queryCount; - if (queryCount > 0) { - if (queryCount < leastQueries) { - leastQueries = queryCount; - connectionWithLeastQueries = connection; - } - continue; - } - connection.flags |= PooledConnectionFlags.reserved; - connection.queryCount++; - this.totalQueries++; - this.readyConnections.delete(connection); - onConnected(null, connection); - return; - } - - if (connectionWithLeastQueries) { - // lets mark the connection with the least queries as preReserved if any - connectionWithLeastQueries.flags |= PooledConnectionFlags.preReserved; - } - - // no connection available to be reserved lets wait for a connection to be released - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - this.flushConcurrentQueries(); - } - } - - normalizeQuery(strings: string | TemplateStringsArray, values: unknown[], binding_idx = 1): [string, unknown[]] { - if (typeof strings === "string") { - // identifier or unsafe query - return [strings, values || []]; - } - - if (!$isArray(strings)) { - // we should not hit this path - throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); - } - - const str_len = strings.length; - if (str_len === 0) { - return ["", []]; - } - - let binding_values: any[] = []; - let query = ""; - - for (let i = 0; i < str_len; i++) { - const string = strings[i]; - - if (typeof string === "string") { - query += string; - - if (values.length > i) { - const value = values[i]; - - if (value instanceof Query) { - const q = value as Query; - const [sub_query, sub_values] = this.normalizeQuery(q[_strings], q[_values], binding_idx); - - query += sub_query; - for (let j = 0; j < sub_values.length; j++) { - binding_values.push(sub_values[j]); - } - binding_idx += sub_values.length; - } else if (value instanceof SQLHelper) { - const command = detectCommand(query); - // only selectIn, insert, update, updateSet are allowed - if (command === SQLCommand.none || command === SQLCommand.where) { - throw new SyntaxError("Helpers are only allowed for INSERT, UPDATE and IN commands"); - } - const { columns, value: items } = value as SQLHelper; - const columnCount = columns.length; - if (columnCount === 0 && command !== SQLCommand.in) { - throw new SyntaxError(`Cannot ${commandToString(command)} with no columns`); - } - const lastColumnIndex = columns.length - 1; - - if (command === SQLCommand.insert) { - // - // insert into users ${sql(users)} or insert into users ${sql(user)} - // - - // Build column list while determining which columns have at least one defined value - const { definedColumns, columnsSql } = buildDefinedColumnsAndQuery( - columns, - items, - this.escapeIdentifier.bind(this), - ); - - const definedColumnCount = definedColumns.length; - if (definedColumnCount === 0) { - throw new SyntaxError("Insert needs to have at least one column with a defined value"); - } - const lastDefinedColumnIndex = definedColumnCount - 1; - - query += columnsSql; - if ($isArray(items)) { - const itemsCount = items.length; - const lastItemIndex = itemsCount - 1; - for (let j = 0; j < itemsCount; j++) { - query += "("; - const item = items[j]; - for (let k = 0; k < definedColumnCount; k++) { - const column = definedColumns[k]; - const columnValue = item[column]; - query += `?${k < lastDefinedColumnIndex ? ", " : ""}`; - // If this item has undefined for a column that other items defined, use null - binding_values.push(typeof columnValue === "undefined" ? null : columnValue); - } - if (j < lastItemIndex) { - query += "),"; - } else { - query += ") "; // the user can add RETURNING * or RETURNING id - } - } - } else { - query += "("; - const item = items; - for (let j = 0; j < definedColumnCount; j++) { - const column = definedColumns[j]; - const columnValue = item[column]; - query += `?${j < lastDefinedColumnIndex ? ", " : ""}`; - binding_values.push(columnValue); - } - query += ") "; // the user can add RETURNING * or RETURNING id - } - } else if (command === SQLCommand.in) { - // SELECT * FROM users WHERE id IN (${sql([1, 2, 3])}) - if (!$isArray(items)) { - throw new SyntaxError("An array of values is required for WHERE IN helper"); - } - const itemsCount = items.length; - const lastItemIndex = itemsCount - 1; - query += "("; - for (let j = 0; j < itemsCount; j++) { - query += `?${j < lastItemIndex ? ", " : ""}`; - if (columnCount > 0) { - // we must use a key from a object - if (columnCount > 1) { - // we should not pass multiple columns here - throw new SyntaxError("Cannot use WHERE IN helper with multiple columns"); - } - // SELECT * FROM users WHERE id IN (${sql(users, "id")}) - const value = items[j]; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - const value_from_key = value[columns[0]]; - - if (typeof value_from_key === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value_from_key); - } - } - } else { - const value = items[j]; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value); - } - } - } - query += ") "; // more conditions can be added after this - } else { - // UPDATE users SET ${sql({ name: "John", age: 31 })} WHERE id = 1 - let item; - if ($isArray(items)) { - if (items.length > 1) { - throw new SyntaxError("Cannot use array of objects for UPDATE"); - } - item = items[0]; - } else { - item = items; - } - // no need to include if is updateSet or upsert - const isUpsert = query.trimEnd().endsWith("ON DUPLICATE KEY UPDATE"); - if (command === SQLCommand.update && !isUpsert) { - query += " SET "; - } - let hasValues = false; - for (let i = 0; i < columnCount; i++) { - const column = columns[i]; - const columnValue = item[column]; - if (typeof columnValue === "undefined") { - // skip undefined values, this is the expected behavior in JS - continue; - } - hasValues = true; - query += `${this.escapeIdentifier(column)} = ?${i < lastColumnIndex ? ", " : ""}`; - binding_values.push(columnValue); - } - if (query.endsWith(", ")) { - // we got an undefined value at the end, lets remove the last comma - query = query.substring(0, query.length - 2); - } - if (!hasValues) { - throw new SyntaxError("Update needs to have at least one column"); - } - query += " "; // the user can add where clause after this - } - } else { - //TODO: handle sql.array parameters - query += `? `; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value); - } - } - } - } else { - throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); - } - } - - return [query, binding_values]; + isUpsertUpdate(query: string): boolean { + return query.trimEnd().endsWith("ON DUPLICATE KEY UPDATE"); } } export default { MySQLAdapter, - commandToString, - detectCommand, - SQLCommand, }; diff --git a/src/js/internal/sql/postgres.ts b/src/js/internal/sql/postgres.ts index 177112734ec..c06ba2d60de 100644 --- a/src/js/internal/sql/postgres.ts +++ b/src/js/internal/sql/postgres.ts @@ -1,17 +1,18 @@ import type { PostgresErrorOptions } from "internal/sql/errors"; import type { Query } from "./query"; -import type { ArrayType, DatabaseAdapter, SQLArrayParameter, SQLHelper, SQLResultArray, SSLMode } from "./shared"; +import type { ArrayType, DatabaseAdapter, SQLArrayParameter, SQLCommand, SQLResultArray, SSLMode } from "./shared"; const { - SQLHelper, - SSLMode, SQLResultArray, SQLArrayParameter, - buildDefinedColumnsAndQuery, + BasePooledConnection, + BaseSQLAdapter, + createPooledConnectionHandle, + getHelperCommandFromDetect, + pushBindParam, } = require("internal/sql/shared"); const { - Query, SQLQueryFlags, - symbols: { _strings, _values, _flags, _results, _handle }, + symbols: { _results, _handle }, } = require("internal/sql/query"); function isTypedArray(value: any) { // Buffer should be treated as a normal object @@ -346,437 +347,57 @@ export interface PostgresDotZig { ) => $ZigGeneratedClasses.PostgresSQLQuery; } -const enum SQLCommand { - insert = 0, - update = 1, - updateSet = 2, - where = 3, - in = 4, - none = -1, -} -export type { SQLCommand }; - -function commandToString(command: SQLCommand): string { - switch (command) { - case SQLCommand.insert: - return "INSERT"; - case SQLCommand.updateSet: - case SQLCommand.update: - return "UPDATE"; - case SQLCommand.in: - case SQLCommand.where: - return "WHERE"; - default: - return ""; - } -} - -function detectCommand(query: string): SQLCommand { - const text = query.toLowerCase().trim(); - const text_len = text.length; - - let token = ""; - let command = SQLCommand.none; - let quoted = false; - // we need to reverse search so we find the closest command to the parameter - for (let i = text_len - 1; i >= 0; i--) { - const char = text[i]; - switch (char) { - case " ": // Space - case "\n": // Line feed - case "\t": // Tab character - case "\r": // Carriage return - case "\f": // Form feed - case "\v": { - switch (token) { - case "insert": { - return SQLCommand.insert; - } - case "update": { - return SQLCommand.update; - } - case "where": { - return SQLCommand.where; - } - case "set": { - return SQLCommand.updateSet; - } - case "in": { - return SQLCommand.in; - } - default: { - token = ""; - continue; - } - } - } - default: { - // skip quoted commands - if (char === '"') { - quoted = !quoted; - continue; - } - if (!quoted) { - token = char + token; - } - } - } - } - if (token) { - switch (token) { - case "insert": - return SQLCommand.insert; - case "update": - return SQLCommand.update; - case "where": - return SQLCommand.where; - case "set": - return SQLCommand.updateSet; - case "in": - return SQLCommand.in; - default: - return SQLCommand.none; - } - } - return command; -} - -const enum PooledConnectionState { - pending = 0, - connected = 1, - closed = 2, -} - -const enum PooledConnectionFlags { - /// canBeConnected is used to indicate that at least one time we were able to connect to the database - canBeConnected = 1 << 0, - /// reserved is used to indicate that the connection is currently reserved - reserved = 1 << 1, - /// preReserved is used to indicate that the connection will be reserved in the future when queryCount drops to 0 - preReserved = 1 << 2, -} - -function onQueryFinish(this: PooledPostgresConnection, onClose: (err: Error) => void) { - this.queries.delete(onClose); - this.adapter.release(this); -} - -function closeNT(onClose: (err: Error) => void, err: Error | null) { - onClose(err as Error); -} - -class PooledPostgresConnection { - private static async createConnection( - options: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions, - onConnected: (err: Error | null, connection: $ZigGeneratedClasses.PostgresSQLConnection) => void, - onClose: (err: Error | null) => void, - ): Promise<$ZigGeneratedClasses.PostgresSQLConnection | null> { - const { - hostname, - port, - username, - tls, - query, - database, - sslMode, - idleTimeout = 0, - connectionTimeout = 30 * 1000, - maxLifetime = 0, - prepare = true, - path, - } = options; - - let password: Bun.MaybePromise | string | undefined | (() => Bun.MaybePromise) = options.password; - - try { - if (typeof password === "function") { - password = password(); - } - - if (password && $isPromise(password)) { - password = await password; - } - - return createPostgresConnection( - hostname, - Number(port), - username || "", - password || "", - database || "", - // > The default value for sslmode is prefer. As is shown in the table, this - // makes no sense from a security point of view, and it only promises - // performance overhead if possible. It is only provided as the default for - // backward compatibility, and is not recommended in secure deployments. - sslMode || SSLMode.disable, - tls || null, - query || "", - path || "", - onConnected, - onClose, - idleTimeout, - connectionTimeout, - maxLifetime, - !prepare, - ); - } catch (e) { - // defer so the callback never runs while the adapter is still filling - // this.connections (it scans that array); mysql.ts does the same - process.nextTick(closeNT, onClose, e); - return null; - } - } - - adapter: PostgresAdapter; - connection: $ZigGeneratedClasses.PostgresSQLConnection | null = null; - state: PooledConnectionState = PooledConnectionState.pending; - storedError: Error | null = null; - queries: Set<(err: Error) => void> = new Set(); - onFinish: ((err: Error | null) => void) | null = null; - connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions; - flags: number = 0; - /// queryCount is used to indicate the number of queries using the connection, if a connection is reserved or if its a transaction queryCount will be 1 independently of the number of queries - queryCount: number = 0; - /// when the current connect cycle started; 0 when not connecting. Connect - /// failures (server not yet accepting connections) are retried until - /// connectionTimeout elapses from this point. - connectStartedAt: number = 0; - connectAttempts: number = 0; - retryTimer: ReturnType | null = null; - - #onConnected(err, _) { - if (err) { - err = wrapPostgresError(err); - } - const connectionInfo = this.connectionInfo; - try { - // user code; a throw must not abort the pool bookkeeping below - // (the exception keeps propagating after the finally block runs) - if (connectionInfo?.onconnect) { - connectionInfo.onconnect(err); - } - } finally { - this.storedError = err; - if (!err) { - this.connectStartedAt = 0; - this.flags |= PooledConnectionFlags.canBeConnected; - } - this.state = err ? PooledConnectionState.closed : PooledConnectionState.connected; - const onFinish = this.onFinish; - if (onFinish) { - this.queryCount = 0; - this.flags &= ~PooledConnectionFlags.reserved; - this.flags &= ~PooledConnectionFlags.preReserved; - - // pool is closed, lets finish the connection - if (err) { - onFinish(err); - } else { - this.connection?.close(); - } - } else { - this.adapter.release(this, true); - } - } - } - - #onClose(err) { - if (err) { - err = wrapPostgresError(err); - } - this.connection = null; - this.storedError = err; - if (this.#shouldRetryConnecting(err)) { - // The server is not accepting connections yet (e.g. still starting - // up). Keep the slot pending and retry with backoff instead of - // failing the queries that are waiting for a connection. The user's - // onclose callback only fires when the slot actually closes. - this.connectAttempts++; - const delay = Math.min(20 * 2 ** this.connectAttempts, 1000); - this.retryTimer = setTimeout(PooledPostgresConnection.#retryTimerFired, delay, this); - return; - } - // this connect cycle is over; a later retry() starts a fresh one - this.connectStartedAt = 0; - this.#finishClose(err); - } - - static #retryTimerFired(self: PooledPostgresConnection) { - self.retryTimer = null; - // conditions may have changed during the backoff (pool closing, waiters - // gone, retry budget elapsed), so re-check before dialing - if (self.#canKeepRetrying()) { - self.#startConnection(); - } else { - self.#finishClose(self.storedError); - } - } - - #finishClose(err) { - const connectionInfo = this.connectionInfo; - try { - // user code; a throw must not abort the pool bookkeeping below - // (the exception keeps propagating after the finally block runs) - if (connectionInfo?.onclose) { - connectionInfo.onclose(err); - } - } finally { - this.state = PooledConnectionState.closed; - this.storedError = err; - - // remove from ready connections if its there - this.adapter.readyConnections?.delete(this); - const queries = new Set(this.queries); - this.queries?.clear?.(); - this.queryCount = 0; - this.flags &= ~PooledConnectionFlags.reserved; - - // notify all queries that the connection is closed - for (const onClose of queries) { - onClose(err); - } - const onFinish = this.onFinish; - if (onFinish) { - onFinish(err); - } - - this.adapter.release(this, true); - } +class PooledPostgresConnection extends BasePooledConnection<$ZigGeneratedClasses.PostgresSQLConnection> { + protected async startConnection() { + this.connection = await createPooledConnectionHandle( + createPostgresConnection, + this.connectionInfo, + this.handleConnected.bind(this), + this.handleClose.bind(this), + ); } - constructor(connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions, adapter: PostgresAdapter) { - this.state = PooledConnectionState.pending; - this.adapter = adapter; - this.connectionInfo = connectionInfo; - this.#startConnection(); + protected wrapError(error: any): Error { + return wrapPostgresError(error); } - async #startConnection() { - if (this.connectStartedAt === 0) { - this.connectStartedAt = Date.now(); - this.connectAttempts = 0; - } - this.connection = await PooledPostgresConnection.createConnection( - this.connectionInfo, - this.#onConnected.bind(this), - this.#onClose.bind(this), - ); - if (this.onFinish !== null) { - // the pool was force-closed while the native handle was being created; - // close it now so onClose fires and onFinish settles - this.connection?.close(); + protected isNonRetryableError(code: string | undefined): boolean { + switch (code) { + case "ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD": + case "ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD": + case "ERR_POSTGRES_TLS_NOT_AVAILABLE": + case "ERR_POSTGRES_TLS_UPGRADE_FAILED": + case "ERR_POSTGRES_INVALID_SERVER_SIGNATURE": + case "ERR_POSTGRES_INVALID_SERVER_KEY": + case "ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2": + // we can't retry these are authentication errors + return true; + default: + return false; } } /// Connect failures (ERR_POSTGRES_CONNECTION_FAILED) mean the server /// accepted the TCP connection but closed it before the handshake - /// completed — typically it is still starting up, or an intermediary + /// completed, typically because it is still starting up or an intermediary /// (like a container port proxy) is up before the database is. Those are - /// retried until connectionTimeout elapses, as long as queries are - /// waiting on the pool. Refused connections - /// (ERR_POSTGRES_CONNECTION_REFUSED) fail fast: nothing is listening, - /// and probes/healthchecks rely on the immediate error. Real server errors (authentication, - /// ErrorResponse during startup) and closes of established connections are - /// not retried here. - #shouldRetryConnecting(err: Error | null): boolean { - // connect failures come from the native layer as options objects that - // wrapPostgresError turned into PostgresError instances with a typed code - if (!(err instanceof PostgresError) || err.code !== "ERR_POSTGRES_CONNECTION_FAILED") { - return false; - } - return this.#canKeepRetrying(); - } - - #canKeepRetrying(): boolean { - if (this.adapter.closed || this.onFinish !== null) { - return false; - } - // only retry while queries are actually waiting for a connection - if (this.adapter.waitingQueue.length === 0 && this.adapter.reservedQueue.length === 0) { - return false; - } - // an explicit connectionTimeout of 0 disables the connect timer, and with - // it the retry budget - const connectionTimeout = this.connectionInfo.connectionTimeout ?? 30 * 1000; - if (connectionTimeout <= 0) { - return false; - } - return this.connectStartedAt !== 0 && Date.now() - this.connectStartedAt < connectionTimeout; - } - - /// Returns true if a scheduled connect retry was cancelled — in that case - /// nothing is in flight and no onClose/onConnected callback will fire. - cancelRetry(): boolean { - if (this.retryTimer !== null) { - clearTimeout(this.retryTimer); - this.retryTimer = null; - return true; - } - return false; - } - - onClose(onClose: (err: Error) => void) { - this.queries.add(onClose); - } - - bindQuery(query: Query, onClose: (err: Error) => void) { - this.queries.add(onClose); - query.finally(onQueryFinish.bind(this, onClose)); - } - - #doRetry() { - if (this.adapter.closed) { - return; - } - // reset error and state - this.storedError = null; - this.connectStartedAt = 0; - this.state = PooledConnectionState.pending; - // retry connection - this.#startConnection(); - } - close() { - try { - if (this.state === PooledConnectionState.connected) { - this.connection?.close(); - } - } catch {} - } - flush() { - this.connection?.flush(); - } - retry() { - // if pool is closed, we can't retry - if (this.adapter.closed) { - return false; - } - // we need to reconnect - // lets use a retry strategy - - // we can only retry if one day we are able to connect - if (this.flags & PooledConnectionFlags.canBeConnected) { - this.#doRetry(); - } else { - // analyse type of error to see if we can retry - switch (this.storedError?.code) { - case "ERR_POSTGRES_UNSUPPORTED_AUTHENTICATION_METHOD": - case "ERR_POSTGRES_UNKNOWN_AUTHENTICATION_METHOD": - case "ERR_POSTGRES_TLS_NOT_AVAILABLE": - case "ERR_POSTGRES_TLS_UPGRADE_FAILED": - case "ERR_POSTGRES_INVALID_SERVER_SIGNATURE": - case "ERR_POSTGRES_INVALID_SERVER_KEY": - case "ERR_POSTGRES_AUTHENTICATION_FAILED_PBKDF2": - // we can't retry these are authentication errors - return false; - default: - // we can retry - this.#doRetry(); - } - } - return true; + /// retried until connectionTimeout elapses, as long as queries are waiting + /// on the pool. Refused connections (ERR_POSTGRES_CONNECTION_REFUSED) fail + /// fast: nothing is listening, and probes/healthchecks rely on the + /// immediate error. Real server errors (authentication, ErrorResponse + /// during startup) and closes of established connections are not retried + /// here. + protected isConnectFailureError(err: Error | null): boolean { + return err instanceof PostgresError && (err as any).code === "ERR_POSTGRES_CONNECTION_FAILED"; } } class PostgresAdapter + extends BaseSQLAdapter< + PooledPostgresConnection, + $ZigGeneratedClasses.PostgresSQLConnection, + $ZigGeneratedClasses.PostgresSQLQuery + > implements DatabaseAdapter< PooledPostgresConnection, @@ -784,23 +405,8 @@ class PostgresAdapter $ZigGeneratedClasses.PostgresSQLQuery > { - public readonly connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions; - - public readonly connections: PooledPostgresConnection[]; - public readonly readyConnections: Set; - - public waitingQueue: Array<(err: Error | null, result: any) => void> = []; - public reservedQueue: Array<(err: Error | null, result: any) => void> = []; - - public poolStarted: boolean = false; - public closed: boolean = false; - public totalQueries: number = 0; - public onAllQueriesFinished: (() => void) | null = null; - - constructor(connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions) { - this.connectionInfo = connectionInfo; - this.connections = new Array(connectionInfo.max); - this.readyConnections = new Set(); + protected createPooledConnection(): PooledPostgresConnection { + return new PooledPostgresConnection(this.connectionInfo, this); } escapeIdentifier(str: string) { @@ -827,26 +433,10 @@ class PostgresAdapter code: "ERR_POSTGRES_INVALID_TRANSACTION_STATE", }); } - supportsReservedConnections() { - return true; - } - - getConnectionForQuery(pooledConnection: PooledPostgresConnection) { - return pooledConnection.connection; - } - - attachConnectionCloseHandler(connection: PooledPostgresConnection, handler: () => void): void { - // PostgreSQL pooled connections support onClose handlers - if (connection.onClose) { - connection.onClose(handler); - } - } - - detachConnectionCloseHandler(connection: PooledPostgresConnection, handler: () => void): void { - // PostgreSQL pooled connections track queries - if (connection.queries) { - connection.queries.delete(handler); - } + unsafeTransactionError() { + return new PostgresError("Only use sql.begin, sql.reserved or max: 1", { + code: "ERR_POSTGRES_UNSAFE_TRANSACTION", + }); } array(values: any[], typeNameOrID?: number | ArrayType): SQLArrayParameter { @@ -886,28 +476,6 @@ class PostgresAdapter }; } - validateTransactionOptions(options: string): { valid: boolean; error?: string } { - // The string is interpolated into `BEGIN ${options}`, so refuse anything that - // could terminate the statement or start a new one. - if (!/^[A-Za-z ,]*$/.test(options)) { - return { - valid: false, - error: "Transaction options can only contain letters, spaces, and commas.", - }; - } - return { valid: true }; - } - - validateDistributedTransactionName(name: string): { valid: boolean; error?: string } { - if (name.indexOf("'") !== -1) { - return { - valid: false, - error: "Distributed transaction name cannot contain single quotes.", - }; - } - return { valid: true }; - } - getCommitDistributedSQL(name: string): string { const validation = this.validateDistributedTransactionName(name); if (!validation.valid) { @@ -925,16 +493,7 @@ class PostgresAdapter } createQueryHandle(sql: string, values: unknown[], flags: number) { - if (!(flags & SQLQueryFlags.allowUnsafeTransaction)) { - if (this.connectionInfo.max !== 1) { - const upperCaseSqlString = sql.toUpperCase().trim(); - if (upperCaseSqlString.startsWith("BEGIN") || upperCaseSqlString.startsWith("START TRANSACTION")) { - throw new PostgresError("Only use sql.begin, sql.reserved or max: 1", { - code: "ERR_POSTGRES_UNSAFE_TRANSACTION", - }); - } - } - } + this.checkUnsafeTransaction(sql, flags); return createPostgresQuery( sql, @@ -946,578 +505,23 @@ class PostgresAdapter ); } - maxDistribution() { - if (!this.waitingQueue.length) return 0; - const result = Math.ceil((this.waitingQueue.length + this.totalQueries) / this.connections.length); - return result ? result : 1; + getHelperCommand(query: string): SQLCommand { + return getHelperCommandFromDetect(query, false); } - flushConcurrentQueries() { - const maxDistribution = this.maxDistribution(); - if (maxDistribution === 0) { - return; - } - - while (true) { - const nonReservedConnections = Array.from(this.readyConnections || []).filter( - c => !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution, - ); - if (nonReservedConnections.length === 0) { - return; - } - const orderedConnections = nonReservedConnections.sort((a, b) => a.queryCount - b.queryCount); - for (const connection of orderedConnections) { - const pending = this.waitingQueue.shift(); - if (!pending) { - return; - } - connection.queryCount++; - this.totalQueries++; - pending(null, connection); - } - } + placeholder(index: number): string { + return "$" + index; } - release(connection: PooledPostgresConnection, connectingEvent: boolean = false) { - if (!connectingEvent) { - connection.queryCount--; - this.totalQueries--; - } - const currentQueryCount = connection.queryCount; - if (currentQueryCount == 0) { - connection.flags &= ~PooledConnectionFlags.reserved; - connection.flags &= ~PooledConnectionFlags.preReserved; - } - if (this.onAllQueriesFinished) { - // we are waiting for all queries to finish, lets check if we can call it - if (!this.hasPendingQueries()) { - this.onAllQueriesFinished(); - } + bindParam(value: unknown, binding_values: unknown[], index: number): string { + if (value instanceof SQLArrayParameter) { + binding_values.push(value.serializedValues); + return `$${index}::${value.arrayType}[] `; } - - if (connection.state !== PooledConnectionState.connected) { - // connection is not ready - if (connection.storedError) { - // this connection got a error but maybe we can wait for another - - if (this.hasConnectionsAvailable()) { - return; - } - - const waitingQueue = this.waitingQueue; - const reservedQueue = this.reservedQueue; - - this.waitingQueue = []; - this.reservedQueue = []; - // we have no connections available so lets fails - for (const pending of waitingQueue) { - pending(connection.storedError, connection); - } - for (const pending of reservedQueue) { - pending(connection.storedError, connection); - } - // draining the queues may have been the last pending work — a - // graceful close() is waiting on this callback - if (this.onAllQueriesFinished && !this.hasPendingQueries()) { - this.onAllQueriesFinished(); - } - } - return; - } - - if (currentQueryCount == 0) { - // ok we can actually bind reserved queries to it - const pendingReserved = this.reservedQueue.shift(); - if (pendingReserved) { - connection.flags |= PooledConnectionFlags.reserved; - connection.queryCount++; - this.totalQueries++; - // we have a connection waiting for a reserved connection lets prioritize it - pendingReserved(connection.storedError, connection); - return; - } - } - this.readyConnections.add(connection); - this.flushConcurrentQueries(); - } - - hasConnectionsAvailable() { - if (this.readyConnections?.size > 0) return true; - if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection && connection.state !== PooledConnectionState.closed) { - // some connection is connecting or connected - return true; - } - } - } - return false; - } - - hasPendingQueries() { - if (this.waitingQueue.length > 0 || this.reservedQueue.length > 0) return true; - if (this.poolStarted) { - return this.totalQueries > 0; - } - return false; - } - isConnected() { - if (this.readyConnections?.size > 0) { - return true; - } - if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection.state === PooledConnectionState.connected) { - return true; - } - } - } - return false; - } - flush() { - if (this.closed) { - return; - } - if (this.poolStarted) { - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - if (connection.state === PooledConnectionState.connected) { - connection.connection?.flush(); - } - } - } - } - - async #close() { - let pending; - while ((pending = this.waitingQueue.shift())) { - pending(this.connectionClosedError(), null); - } - while (this.reservedQueue.length > 0) { - const pendingReserved = this.reservedQueue.shift(); - if (pendingReserved) { - pendingReserved(this.connectionClosedError(), null); - } - } - - const promises: Array> = []; - - if (this.poolStarted) { - this.poolStarted = false; - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - switch (connection.state) { - case PooledConnectionState.pending: - { - if (connection.cancelRetry()) { - // a connect retry was scheduled; nothing is in flight so - // there is no onClose/onConnected to wait for - connection.state = PooledConnectionState.closed; - break; - } - const { promise, resolve } = Promise.withResolvers(); - connection.onFinish = resolve; - promises.push(promise); - connection.connection?.close(); - } - break; - - case PooledConnectionState.connected: - { - const { promise, resolve } = Promise.withResolvers(); - connection.onFinish = resolve; - promises.push(promise); - connection.connection?.close(); - } - break; - } - // clean connection reference - // @ts-ignore - this.connections[i] = null; - } - } - - this.readyConnections.clear(); - this.waitingQueue.length = 0; - return Promise.all(promises); - } - - async close(options?: { timeout?: number }): Promise { - if (this.closed) { - return; - } - - let timeout = options?.timeout; - if (timeout) { - timeout = Number(timeout); - if (timeout > 2 ** 31 || timeout < 0 || timeout !== timeout) { - throw $ERR_INVALID_ARG_VALUE("options.timeout", timeout, "must be a non-negative integer less than 2^31"); - } - - this.closed = true; - if (timeout === 0 || !this.hasPendingQueries()) { - // close immediately - await this.#close(); - return; - } - - const { promise, resolve } = Promise.withResolvers(); - const timer = setTimeout(() => { - // timeout is reached, lets close and probably fail some queries - this.#close().finally(resolve); - }, timeout * 1000); - timer.unref(); // dont block the event loop - - this.onAllQueriesFinished = () => { - clearTimeout(timer); - // everything is closed, lets close the pool - this.#close().finally(resolve); - }; - - return promise; - } else { - this.closed = true; - if (!this.hasPendingQueries()) { - // close immediately - await this.#close(); - return; - } - - // gracefully close the pool - const { promise, resolve } = Promise.withResolvers(); - - this.onAllQueriesFinished = () => { - // everything is closed, lets close the pool - this.#close().finally(resolve); - }; - - return promise; - } - } - - /** - * @param {function} onConnected - The callback function to be called when the connection is established. - * @param {boolean} reserved - Whether the connection is reserved, if is reserved the connection will not be released until release is called, if not release will only decrement the queryCount counter - */ - connect(onConnected: (err: Error | null, result: any) => void, reserved: boolean = false) { - if (this.closed) { - return onConnected(this.connectionClosedError(), null); - } - - if (!this.readyConnections || this.readyConnections.size === 0) { - // no connection ready lets make some - let retry_in_progress = false; - let all_closed = true; - let storedError: Error | null = null; - - if (this.poolStarted) { - // we already started the pool - // lets check if some connection is available to retry - const pollSize = this.connections.length; - for (let i = 0; i < pollSize; i++) { - const connection = this.connections[i]; - // we need a new connection and we have some connections that can retry - if (connection.state === PooledConnectionState.closed) { - if (connection.retry()) { - // lets wait for connection to be released - if (!retry_in_progress) { - // avoid adding to the queue twice, we wanna to retry every available pool connection - retry_in_progress = true; - if (reserved) { - // we are not sure what connection will be available so we dont pre reserve - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - } - } - } else { - // we have some error, lets grab it and fail if unable to start a connection - storedError = connection.storedError; - } - } else { - // we have some pending or open connections - all_closed = false; - } - } - if (!all_closed && !retry_in_progress) { - // is possible to connect because we have some working connections, or we are just without network for some reason - // wait for connection to be released or fail - if (reserved) { - // we are not sure what connection will be available so we dont pre reserve - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - } - } else if (!retry_in_progress) { - // impossible to connect or retry - onConnected(storedError ?? this.connectionClosedError(), null); - } - return; - } - // we never started the pool, lets start it - if (reserved) { - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - } - this.poolStarted = true; - const pollSize = this.connections.length; - // pool is always at least 1 connection - const firstConnection = new PooledPostgresConnection(this.connectionInfo, this); - this.connections[0] = firstConnection; - if (reserved) { - firstConnection.flags |= PooledConnectionFlags.preReserved; // lets pre reserve the first connection - } - for (let i = 1; i < pollSize; i++) { - this.connections[i] = new PooledPostgresConnection(this.connectionInfo, this); - } - return; - } - if (reserved) { - let connectionWithLeastQueries: PooledPostgresConnection | null = null; - let leastQueries = Infinity; - for (const connection of this.readyConnections || []) { - if (connection.flags & PooledConnectionFlags.preReserved || connection.flags & PooledConnectionFlags.reserved) - continue; - const queryCount = connection.queryCount; - if (queryCount > 0) { - if (queryCount < leastQueries) { - leastQueries = queryCount; - connectionWithLeastQueries = connection; - } - continue; - } - connection.flags |= PooledConnectionFlags.reserved; - connection.queryCount++; - this.totalQueries++; - this.readyConnections?.delete(connection); - onConnected(null, connection); - return; - } - - if (connectionWithLeastQueries) { - // lets mark the connection with the least queries as preReserved if any - connectionWithLeastQueries.flags |= PooledConnectionFlags.preReserved; - } - - // no connection available to be reserved lets wait for a connection to be released - this.reservedQueue.push(onConnected); - } else { - this.waitingQueue.push(onConnected); - this.flushConcurrentQueries(); - } - } - - normalizeQuery(strings: string | TemplateStringsArray, values: unknown[], binding_idx = 1): [string, unknown[]] { - // This function handles array values in single fields: - // - JSON/JSONB are the only field types that can be arrays themselves, so we serialize them - // - SQL array field types (e.g., INTEGER[], TEXT[]) require the sql.array() helper - // - All other types are handled natively - - if (typeof strings === "string") { - // identifier or unsafe query - return [strings, values || []]; - } - - if (!$isArray(strings)) { - // we should not hit this path - throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); - } - - const str_len = strings.length; - if (str_len === 0) { - return ["", []]; - } - - let binding_values: any[] = []; - let query = ""; - - for (let i = 0; i < str_len; i++) { - const string = strings[i]; - - if (typeof string === "string") { - query += string; - - if (values.length > i) { - const value = values[i]; - - if (value instanceof Query) { - const q = value as Query; - const [sub_query, sub_values] = this.normalizeQuery(q[_strings], q[_values], binding_idx); - - query += sub_query; - for (let j = 0; j < sub_values.length; j++) { - binding_values.push(sub_values[j]); - } - binding_idx += sub_values.length; - } else if (value instanceof SQLHelper) { - const command = detectCommand(query); - // only selectIn, insert, update, updateSet are allowed - if (command === SQLCommand.none || command === SQLCommand.where) { - throw new SyntaxError("Helpers are only allowed for INSERT, UPDATE and IN commands"); - } - const { columns, value: items } = value as SQLHelper; - const columnCount = columns.length; - if (columnCount === 0 && command !== SQLCommand.in) { - throw new SyntaxError(`Cannot ${commandToString(command)} with no columns`); - } - const lastColumnIndex = columns.length - 1; - - if (command === SQLCommand.insert) { - // - // insert into users ${sql(users)} or insert into users ${sql(user)} - // - - // Build column list while determining which columns have at least one defined value - const { definedColumns, columnsSql } = buildDefinedColumnsAndQuery( - columns, - items, - this.escapeIdentifier.bind(this), - ); - - const definedColumnCount = definedColumns.length; - if (definedColumnCount === 0) { - throw new SyntaxError("Insert needs to have at least one column with a defined value"); - } - const lastDefinedColumnIndex = definedColumnCount - 1; - - query += columnsSql; - if ($isArray(items)) { - const itemsCount = items.length; - const lastItemIndex = itemsCount - 1; - for (let j = 0; j < itemsCount; j++) { - query += "("; - const item = items[j]; - for (let k = 0; k < definedColumnCount; k++) { - const column = definedColumns[k]; - const columnValue = item[column]; - query += `$${binding_idx++}${k < lastDefinedColumnIndex ? ", " : ""}`; - // If this item has undefined for a column that other items defined, use null - binding_values.push(typeof columnValue === "undefined" ? null : columnValue); - } - if (j < lastItemIndex) { - query += "),"; - } else { - query += ") "; // the user can add RETURNING * or RETURNING id - } - } - } else { - query += "("; - const item = items; - for (let j = 0; j < definedColumnCount; j++) { - const column = definedColumns[j]; - const columnValue = item[column]; - query += `$${binding_idx++}${j < lastDefinedColumnIndex ? ", " : ""}`; - binding_values.push(columnValue); - } - query += ") "; // the user can add RETURNING * or RETURNING id - } - } else if (command === SQLCommand.in) { - // SELECT * FROM users WHERE id IN (${sql([1, 2, 3])}) - if (!$isArray(items)) { - throw new SyntaxError("An array of values is required for WHERE IN helper"); - } - const itemsCount = items.length; - const lastItemIndex = itemsCount - 1; - query += "("; - for (let j = 0; j < itemsCount; j++) { - query += `$${binding_idx++}${j < lastItemIndex ? ", " : ""}`; - if (columnCount > 0) { - // we must use a key from a object - if (columnCount > 1) { - // we should not pass multiple columns here - throw new SyntaxError("Cannot use WHERE IN helper with multiple columns"); - } - // SELECT * FROM users WHERE id IN (${sql(users, "id")}) - const value = items[j]; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - const value_from_key = value[columns[0]]; - - if (typeof value_from_key === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value_from_key); - } - } - } else { - const value = items[j]; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value); - } - } - } - query += ") "; // more conditions can be added after this - } else { - // UPDATE users SET ${sql({ name: "John", age: 31 })} WHERE id = 1 - let item; - if ($isArray(items)) { - if (items.length > 1) { - throw new SyntaxError("Cannot use array of objects for UPDATE"); - } - item = items[0]; - } else { - item = items; - } - // no need to include if is updateSet - if (command === SQLCommand.update) { - query += " SET "; - } - let hasValues = false; - for (let i = 0; i < columnCount; i++) { - const column = columns[i]; - const columnValue = item[column]; - if (typeof columnValue === "undefined") { - // skip undefined values, this is the expected behavior in JS - continue; - } - hasValues = true; - query += `${this.escapeIdentifier(column)} = $${binding_idx++}${i < lastColumnIndex ? ", " : ""}`; - binding_values.push(columnValue); - } - if (query.endsWith(", ")) { - // we got an undefined value at the end, lets remove the last comma - query = query.substring(0, query.length - 2); - } - if (!hasValues) { - throw new SyntaxError("Update needs to have at least one column"); - } - // the user can add where clause after this - query += " "; - } - } else if (value instanceof SQLArrayParameter) { - query += `$${binding_idx++}::${value.arrayType}[] `; - binding_values.push(value.serializedValues); - } else { - query += `$${binding_idx++} `; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value); - } - } - } - } else { - throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); - } - } - - return [query, binding_values]; + return pushBindParam(this, value, binding_values, index); } } export default { PostgresAdapter, - SQLCommand, - commandToString, - detectCommand, }; diff --git a/src/js/internal/sql/shared.ts b/src/js/internal/sql/shared.ts index 51301ef8c57..94e82acef85 100644 --- a/src/js/internal/sql/shared.ts +++ b/src/js/internal/sql/shared.ts @@ -1,4 +1,11 @@ +import type { Query as QueryType } from "./query"; + const PublicArray = globalThis.Array; +const { + Query, + SQLQueryFlags, + symbols: { _strings, _values }, +} = require("internal/sql/query"); declare global { interface NumberConstructor { @@ -218,6 +225,1145 @@ function buildDefinedColumnsAndQuery( return { definedColumns, columnsSql }; } +const enum SQLCommand { + insert = 0, + update = 1, + updateSet = 2, + where = 3, + in = 4, + none = -1, +} +export type { SQLCommand }; + +function commandToString(command: SQLCommand): string { + switch (command) { + case SQLCommand.insert: + return "INSERT"; + case SQLCommand.updateSet: + case SQLCommand.update: + return "UPDATE"; + case SQLCommand.in: + case SQLCommand.where: + return "WHERE"; + default: + return ""; + } +} + +function detectCommand(query: string, anyAndAllMeanIn: boolean): SQLCommand { + const text = query.toLowerCase().trim(); + const text_len = text.length; + + let token = ""; + let command = SQLCommand.none; + let quoted = false; + // we need to reverse search so we find the closest command to the parameter + for (let i = text_len - 1; i >= 0; i--) { + const char = text[i]; + switch (char) { + case " ": // Space + case "\n": // Line feed + case "\t": // Tab character + case "\r": // Carriage return + case "\f": // Form feed + case "\v": { + switch (token) { + case "insert": { + return SQLCommand.insert; + } + case "update": { + return SQLCommand.update; + } + case "where": { + return SQLCommand.where; + } + case "set": { + return SQLCommand.updateSet; + } + case "in": { + return SQLCommand.in; + } + default: { + token = ""; + continue; + } + } + } + default: { + // skip quoted commands + if (char === '"') { + quoted = !quoted; + continue; + } + if (!quoted) { + token = char + token; + } + } + } + } + if (token) { + switch (token) { + case "insert": + return SQLCommand.insert; + case "update": + return SQLCommand.update; + case "where": + return SQLCommand.where; + case "set": + return SQLCommand.updateSet; + case "in": + return SQLCommand.in; + case "any": + case "all": + // MySQL treats a leading ANY/ALL token like IN; Postgres does not. + return anyAndAllMeanIn ? SQLCommand.in : SQLCommand.none; + default: + return SQLCommand.none; + } + } + return command; +} + +function getHelperCommandFromDetect(query: string, anyAndAllMeanIn: boolean): SQLCommand { + const command = detectCommand(query, anyAndAllMeanIn); + // only selectIn, insert, update, updateSet are allowed + if (command === SQLCommand.none || command === SQLCommand.where) { + throw new SyntaxError("Helpers are only allowed for INSERT, UPDATE and IN commands"); + } + return command; +} + +/** + * The driver-specific hooks consumed by the shared {@link normalizeQuery}. + * Methods stay on the adapter prototype so per-query cost is a monomorphic + * method call. + */ +interface QueryNormalizationAdapter { + escapeIdentifier(name: string): string; + /** Returns the placeholder for the given 1-based binding index ("?" or "$N"). */ + placeholder(index: number): string; + /** Pushes a plain bound value and returns its SQL fragment (always consumes one binding index). */ + bindParam(value: unknown, binding_values: unknown[], index: number): string; + /** Detects the SQL command preceding a helper, throwing if helpers are not allowed there. */ + getHelperCommand(query: string): SQLCommand; + /** Whether the UPDATE helper should omit the SET keyword (MySQL upsert). */ + isUpsertUpdate(query: string): boolean; + throwIfUpdateEmpty(query: string, hasValues: boolean): void; +} + +function pushBindParam( + adapter: QueryNormalizationAdapter, + value: unknown, + binding_values: unknown[], + index: number, +): string { + if (typeof value === "undefined") { + binding_values.push(null); + } else { + binding_values.push(value); + } + return adapter.placeholder(index) + " "; +} + +// This function handles array values in single fields: +// - JSON/JSONB are the only field types that can be arrays themselves, so we serialize them +// - SQL array field types (e.g., INTEGER[], TEXT[]) require the sql.array() helper +// - All other types are handled natively +function normalizeQuery( + adapter: QueryNormalizationAdapter, + strings: string | TemplateStringsArray, + values: unknown[], + binding_idx = 1, +): [string, unknown[]] { + if (typeof strings === "string") { + // identifier or unsafe query + return [strings, values || []]; + } + + if (!$isArray(strings)) { + // we should not hit this path + throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); + } + + const str_len = strings.length; + if (str_len === 0) { + return ["", []]; + } + + let binding_values: any[] = []; + let query = ""; + + for (let i = 0; i < str_len; i++) { + const string = strings[i]; + + if (typeof string === "string") { + query += string; + + if (values.length > i) { + const value = values[i]; + + if (value instanceof Query) { + const q = value as QueryType; + const [sub_query, sub_values] = normalizeQuery(adapter, q[_strings], q[_values], binding_idx); + + query += sub_query; + for (let j = 0; j < sub_values.length; j++) { + binding_values.push(sub_values[j]); + } + binding_idx += sub_values.length; + } else if (value instanceof SQLHelper) { + const command = adapter.getHelperCommand(query); + const { columns, value: items } = value as SQLHelper; + const columnCount = columns.length; + if (columnCount === 0 && command !== SQLCommand.in) { + throw new SyntaxError(`Cannot ${commandToString(command)} with no columns`); + } + const lastColumnIndex = columns.length - 1; + + if (command === SQLCommand.insert) { + // + // insert into users ${sql(users)} or insert into users ${sql(user)} + // + + // Build column list while determining which columns have at least one defined value + const { definedColumns, columnsSql } = buildDefinedColumnsAndQuery( + columns, + items, + adapter.escapeIdentifier.bind(adapter), + ); + + const definedColumnCount = definedColumns.length; + if (definedColumnCount === 0) { + throw new SyntaxError("Insert needs to have at least one column with a defined value"); + } + const lastDefinedColumnIndex = definedColumnCount - 1; + + query += columnsSql; + if ($isArray(items)) { + const itemsCount = items.length; + const lastItemIndex = itemsCount - 1; + for (let j = 0; j < itemsCount; j++) { + query += "("; + const item = items[j]; + for (let k = 0; k < definedColumnCount; k++) { + const column = definedColumns[k]; + const columnValue = item[column]; + query += `${adapter.placeholder(binding_idx++)}${k < lastDefinedColumnIndex ? ", " : ""}`; + // If this item has undefined for a column that other items defined, use null + binding_values.push(typeof columnValue === "undefined" ? null : columnValue); + } + if (j < lastItemIndex) { + query += "),"; + } else { + query += ") "; // the user can add RETURNING * or RETURNING id + } + } + } else { + query += "("; + const item = items; + for (let j = 0; j < definedColumnCount; j++) { + const column = definedColumns[j]; + const columnValue = item[column]; + query += `${adapter.placeholder(binding_idx++)}${j < lastDefinedColumnIndex ? ", " : ""}`; + binding_values.push(columnValue); + } + query += ") "; // the user can add RETURNING * or RETURNING id + } + } else if (command === SQLCommand.in) { + // SELECT * FROM users WHERE id IN (${sql([1, 2, 3])}) + if (!$isArray(items)) { + throw new SyntaxError("An array of values is required for WHERE IN helper"); + } + const itemsCount = items.length; + const lastItemIndex = itemsCount - 1; + query += "("; + for (let j = 0; j < itemsCount; j++) { + query += `${adapter.placeholder(binding_idx++)}${j < lastItemIndex ? ", " : ""}`; + if (columnCount > 0) { + // we must use a key from a object + if (columnCount > 1) { + // we should not pass multiple columns here + throw new SyntaxError("Cannot use WHERE IN helper with multiple columns"); + } + // SELECT * FROM users WHERE id IN (${sql(users, "id")}) + const value = items[j]; + if (typeof value === "undefined") { + binding_values.push(null); + } else { + const value_from_key = value[columns[0]]; + + if (typeof value_from_key === "undefined") { + binding_values.push(null); + } else { + binding_values.push(value_from_key); + } + } + } else { + const value = items[j]; + if (typeof value === "undefined") { + binding_values.push(null); + } else { + binding_values.push(value); + } + } + } + query += ") "; // more conditions can be added after this + } else { + // UPDATE users SET ${sql({ name: "John", age: 31 })} WHERE id = 1 + let item; + if ($isArray(items)) { + if (items.length > 1) { + throw new SyntaxError("Cannot use array of objects for UPDATE"); + } + item = items[0]; + } else { + item = items; + } + // no need to include SET if is updateSet or upsert + if (command === SQLCommand.update && !adapter.isUpsertUpdate(query)) { + query += " SET "; + } + let hasValues = false; + for (let i = 0; i < columnCount; i++) { + const column = columns[i]; + const columnValue = item[column]; + if (typeof columnValue === "undefined") { + // skip undefined values, this is the expected behavior in JS + continue; + } + hasValues = true; + query += `${adapter.escapeIdentifier(column as string)} = ${adapter.placeholder(binding_idx++)}${i < lastColumnIndex ? ", " : ""}`; + binding_values.push(columnValue); + } + if (query.endsWith(", ")) { + // we got an undefined value at the end, lets remove the last comma + query = query.substring(0, query.length - 2); + } + adapter.throwIfUpdateEmpty(query, hasValues); + // the user can add where clause after this + query += " "; + } + } else { + query += adapter.bindParam(value, binding_values, binding_idx++); + } + } + } else { + throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); + } + } + + return [query, binding_values]; +} + +const enum PooledConnectionState { + pending = 0, + connected = 1, + closed = 2, +} + +const enum PooledConnectionFlags { + /// canBeConnected is used to indicate that at least one time we were able to connect to the database + canBeConnected = 1 << 0, + /// reserved is used to indicate that the connection is currently reserved + reserved = 1 << 1, + /// preReserved is used to indicate that the connection will be reserved in the future when queryCount drops to 0 + preReserved = 1 << 2, +} +export type { PooledConnectionState }; + +function onQueryFinish(this: BasePooledConnection, onClose: (err: Error) => void) { + this.queries.delete(onClose); + this.adapter.release(this); +} + +abstract class BasePooledConnection { + adapter: BaseSQLAdapter; + connection: ConnectionHandle | null = null; + state: PooledConnectionState = PooledConnectionState.pending; + storedError: Error | null = null; + queries: Set<(err: Error) => void> = new Set(); + onFinish: ((err: Error | null) => void) | null = null; + connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions; + flags: number = 0; + /// queryCount is used to indicate the number of queries using the connection, if a connection is reserved or if its a transaction queryCount will be 1 independently of the number of queries + queryCount: number = 0; + /// when the current connect cycle started; 0 when not connecting. Connect + /// failures (server not yet accepting connections) are retried until + /// connectionTimeout elapses from this point. + connectStartedAt: number = 0; + connectAttempts: number = 0; + retryTimer: ReturnType | null = null; + + constructor( + connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions, + adapter: BaseSQLAdapter, + ) { + this.adapter = adapter; + this.connectionInfo = connectionInfo; + this.#beginConnecting(); + } + + /** Starts (or restarts) the driver-specific native connection. */ + protected abstract startConnection(): Promise; + /** Wraps a driver error options object into the driver's Error class. */ + protected abstract wrapError(error: any): Error; + /** Whether the given error code is an authentication-style error that retrying cannot fix. */ + protected abstract isNonRetryableError(code: string | undefined): boolean; + /** + * Whether the error is a connect failure (the server accepted the + * connection but closed it before the handshake completed) that a backoff + * retry can fix. + */ + protected abstract isConnectFailureError(err: Error | null): boolean; + + async #beginConnecting() { + // a fresh connect cycle (not a backoff retry) starts the retry budget + if (this.connectStartedAt === 0) { + this.connectStartedAt = Date.now(); + this.connectAttempts = 0; + } + await this.startConnection(); + if (this.onFinish !== null) { + // the pool was force-closed while the native handle was being created; + // close it now so onClose fires and onFinish settles + this.connection?.close(); + } + } + + protected handleConnected(err: any) { + if (err) { + err = this.wrapError(err); + } + const connectionInfo = this.connectionInfo; + try { + // user code; a throw must not abort the pool bookkeeping below + if (connectionInfo?.onconnect) { + connectionInfo.onconnect(err); + } + } finally { + this.storedError = err; + if (!err) { + this.connectStartedAt = 0; + this.flags |= PooledConnectionFlags.canBeConnected; + } + this.state = err ? PooledConnectionState.closed : PooledConnectionState.connected; + const onFinish = this.onFinish; + if (onFinish) { + this.queryCount = 0; + this.flags &= ~PooledConnectionFlags.reserved; + this.flags &= ~PooledConnectionFlags.preReserved; + + // pool is closed, lets finish the connection + if (err) { + onFinish(err); + } else { + this.connection?.close(); + } + } else { + this.adapter.release(this, true); + } + } + } + + protected handleClose(err: any) { + if (err) { + err = this.wrapError(err); + } + this.connection = null; + this.storedError = err; + if (this.#shouldRetryConnecting(err)) { + // The server is not accepting connections yet (e.g. still starting + // up). Keep the slot pending and retry with backoff instead of + // failing the queries that are waiting for a connection. The user's + // onclose callback only fires when the slot actually closes. + this.connectAttempts++; + const delay = Math.min(20 * 2 ** this.connectAttempts, 1000); + this.retryTimer = setTimeout(BasePooledConnection.#retryTimerFired, delay, this); + return; + } + // this connect cycle is over; a later retry() starts a fresh one + this.connectStartedAt = 0; + this.#finishClose(err); + } + + static #retryTimerFired(self: BasePooledConnection) { + self.retryTimer = null; + // conditions may have changed during the backoff (pool closing, waiters + // gone, retry budget elapsed), so re-check before dialing + if (self.#canKeepRetrying()) { + self.#beginConnecting(); + } else { + self.#finishClose(self.storedError); + } + } + + #shouldRetryConnecting(err: any): boolean { + // connect failures come from the native layer as options objects that + // wrapError turned into the driver's Error class with a typed code + if (!this.isConnectFailureError(err)) { + return false; + } + return this.#canKeepRetrying(); + } + + #canKeepRetrying(): boolean { + if (this.adapter.closed || this.onFinish !== null) { + return false; + } + // only retry while queries are actually waiting for a connection + if (this.adapter.waitingQueue.length === 0 && this.adapter.reservedQueue.length === 0) { + return false; + } + // an explicit connectionTimeout of 0 disables the connect timer, and with + // it the retry budget + const connectionTimeout = this.connectionInfo.connectionTimeout ?? 30 * 1000; + if (connectionTimeout <= 0) { + return false; + } + return this.connectStartedAt !== 0 && Date.now() - this.connectStartedAt < connectionTimeout; + } + + /// Returns true if a scheduled connect retry was cancelled; in that case + /// nothing is in flight and no onClose/onConnected callback will fire. + cancelRetry(): boolean { + if (this.retryTimer !== null) { + clearTimeout(this.retryTimer); + this.retryTimer = null; + return true; + } + return false; + } + + #finishClose(err: any) { + const connectionInfo = this.connectionInfo; + try { + // user code; a throw must not abort the pool bookkeeping below + if (connectionInfo?.onclose) { + connectionInfo.onclose(err); + } + } finally { + this.state = PooledConnectionState.closed; + this.storedError = err; + + // remove from ready connections if its there + this.adapter.readyConnections.delete(this); + const queries = new Set(this.queries); + this.queries?.clear?.(); + this.queryCount = 0; + this.flags &= ~PooledConnectionFlags.reserved; + + // notify all queries that the connection is closed + for (const onClose of queries) { + onClose(err); + } + const onFinish = this.onFinish; + if (onFinish) { + onFinish(err); + } + + this.adapter.release(this, true); + } + } + + onClose(onClose: (err: Error) => void) { + this.queries.add(onClose); + } + + bindQuery(query: QueryType, onClose: (err: Error) => void) { + this.queries.add(onClose); + query.finally(onQueryFinish.bind(this, onClose)); + } + + protected doRetry() { + if (this.adapter.closed) { + return; + } + // reset error and state + this.storedError = null; + this.connectStartedAt = 0; + this.state = PooledConnectionState.pending; + // retry connection + this.#beginConnecting(); + } + close() { + try { + if (this.state === PooledConnectionState.connected) { + this.connection?.close(); + } + } catch {} + } + flush() { + this.connection?.flush(); + } + retry() { + // if pool is closed, we can't retry + if (this.adapter.closed) { + return false; + } + // we need to reconnect + // lets use a retry strategy + + // we can only retry if one day we are able to connect + if (this.flags & PooledConnectionFlags.canBeConnected) { + this.doRetry(); + } else if (this.isNonRetryableError((this.storedError as any)?.code)) { + // we can't retry these are authentication errors + return false; + } else { + // we can retry + this.doRetry(); + } + return true; + } +} + +function closeNT(onClose: (err: Error) => void, err: Error | null) { + onClose(err as Error); +} + +/** + * Resolves the password (which may be a function and/or a promise) and calls + * the driver's native createConnection with the normalized pool options. + * Extra trailing arguments past `useUnnamedPreparedStatements` (MySQL's + * `allowPublicKeyRetrieval`) are ignored by drivers that don't take them. + */ +async function createPooledConnectionHandle( + nativeCreateConnection: (...args: any[]) => ConnectionHandle, + options: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions, + onConnected: (err: Error | null, connection: ConnectionHandle) => void, + onClose: (err: Error | null) => void, +): Promise { + const { + hostname, + port, + username, + tls, + query, + database, + sslMode, + idleTimeout = 0, + connectionTimeout = 30 * 1000, + maxLifetime = 0, + prepare = true, + path, + allowPublicKeyRetrieval = false, + } = options; + + let password: Bun.MaybePromise | string | undefined | (() => Bun.MaybePromise) = options.password; + + try { + if (typeof password === "function") { + password = password(); + } + + if (password && $isPromise(password)) { + password = await password; + } + + return nativeCreateConnection( + hostname, + Number(port), + username || "", + password || "", + database || "", + // > The default value for sslmode is prefer. As is shown in the table, this + // makes no sense from a security point of view, and it only promises + // performance overhead if possible. It is only provided as the default for + // backward compatibility, and is not recommended in secure deployments. + sslMode || SSLMode.disable, + tls || null, + query || "", + path || "", + onConnected, + onClose, + idleTimeout, + connectionTimeout, + maxLifetime, + !prepare, + !!allowPublicKeyRetrieval, + ); + } catch (e) { + // defer so the callback never runs while the adapter is still filling + // this.connections (it scans that array) + process.nextTick(closeNT, onClose, e); + return null; + } +} + +abstract class BaseSQLAdapter + implements DatabaseAdapter +{ + public readonly connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions; + + public readonly connections: PooledConnection[]; + public readonly readyConnections: Set = new Set(); + + public waitingQueue: Array<(err: Error | null, result: any) => void> = []; + public reservedQueue: Array<(err: Error | null, result: any) => void> = []; + + public poolStarted: boolean = false; + public closed: boolean = false; + public totalQueries: number = 0; + public onAllQueriesFinished: (() => void) | null = null; + + constructor(connectionInfo: Bun.SQL.__internal.DefinedPostgresOrMySQLOptions) { + this.connectionInfo = connectionInfo; + this.connections = new Array(connectionInfo.max); + } + + protected abstract createPooledConnection(): PooledConnection; + abstract createQueryHandle(sql: string, values: unknown[], flags: number): QueryHandle; + abstract array(values: any[], typeNameOrID?: number | ArrayType): SQLArrayParameter; + abstract getTransactionCommands(options?: string): TransactionCommands; + abstract getDistributedTransactionCommands(name: string): TransactionCommands | null; + abstract getCommitDistributedSQL(name: string): string; + abstract getRollbackDistributedSQL(name: string): string; + abstract escapeIdentifier(name: string): string; + abstract connectionClosedError(): Error; + abstract notTaggedCallError(): Error; + abstract queryCancelledError(): Error; + abstract invalidTransactionStateError(message: string): Error; + abstract unsafeTransactionError(): Error; + abstract getHelperCommand(query: string): SQLCommand; + + placeholder(_index: number): string { + return "?"; + } + + bindParam(value: unknown, binding_values: unknown[], index: number): string { + return pushBindParam(this, value, binding_values, index); + } + + isUpsertUpdate(_query: string): boolean { + return false; + } + + throwIfUpdateEmpty(_query: string, hasValues: boolean): void { + if (!hasValues) { + throw new SyntaxError("Update needs to have at least one column"); + } + } + + normalizeQuery(strings: string | TemplateStringsArray, values: unknown[], binding_idx = 1): [string, unknown[]] { + return normalizeQuery(this, strings, values, binding_idx); + } + + protected checkUnsafeTransaction(sql: string, flags: number) { + if (!(flags & SQLQueryFlags.allowUnsafeTransaction)) { + if (this.connectionInfo.max !== 1) { + const upperCaseSqlString = sql.toUpperCase().trim(); + if (upperCaseSqlString.startsWith("BEGIN") || upperCaseSqlString.startsWith("START TRANSACTION")) { + throw this.unsafeTransactionError(); + } + } + } + } + + supportsReservedConnections() { + return true; + } + + getConnectionForQuery(pooledConnection: PooledConnection) { + return pooledConnection.connection; + } + + attachConnectionCloseHandler(connection: PooledConnection, handler: () => void): void { + if (connection.onClose) { + connection.onClose(handler); + } + } + + detachConnectionCloseHandler(connection: PooledConnection, handler: () => void): void { + if (connection.queries) { + connection.queries.delete(handler); + } + } + + validateTransactionOptions(options: string): { valid: boolean; error?: string } { + // The string is interpolated into the BEGIN/START TRANSACTION statement, so refuse + // anything that could terminate the statement or start a new one. + if (!/^[A-Za-z ,]*$/.test(options)) { + return { + valid: false, + error: "Transaction options can only contain letters, spaces, and commas.", + }; + } + return { valid: true }; + } + + validateDistributedTransactionName(name: string): { valid: boolean; error?: string } { + if (name.indexOf("'") !== -1) { + return { + valid: false, + error: "Distributed transaction name cannot contain single quotes.", + }; + } + return { valid: true }; + } + + maxDistribution() { + if (!this.waitingQueue.length) return 0; + const result = Math.ceil((this.waitingQueue.length + this.totalQueries) / this.connections.length); + return result ? result : 1; + } + + flushConcurrentQueries() { + const maxDistribution = this.maxDistribution(); + if (maxDistribution === 0) { + return; + } + + while (true) { + const nonReservedConnections = Array.from(this.readyConnections).filter( + c => !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution, + ); + if (nonReservedConnections.length === 0) { + return; + } + const orderedConnections = nonReservedConnections.sort((a, b) => a.queryCount - b.queryCount); + for (const connection of orderedConnections) { + const pending = this.waitingQueue.shift(); + if (!pending) { + return; + } + connection.queryCount++; + this.totalQueries++; + pending(null, connection); + } + } + } + + release(connection: PooledConnection, connectingEvent: boolean = false) { + if (!connectingEvent) { + connection.queryCount--; + this.totalQueries--; + } + const currentQueryCount = connection.queryCount; + if (currentQueryCount == 0) { + connection.flags &= ~PooledConnectionFlags.reserved; + connection.flags &= ~PooledConnectionFlags.preReserved; + } + if (this.onAllQueriesFinished) { + // we are waiting for all queries to finish, lets check if we can call it + if (!this.hasPendingQueries()) { + this.onAllQueriesFinished(); + } + } + + if (connection.state !== PooledConnectionState.connected) { + // connection is not ready + if (connection.storedError) { + // this connection got a error but maybe we can wait for another + + if (this.hasConnectionsAvailable()) { + return; + } + + const waitingQueue = this.waitingQueue; + const reservedQueue = this.reservedQueue; + + this.waitingQueue = []; + this.reservedQueue = []; + // we have no connections available so lets fails + for (const pending of waitingQueue) { + pending(connection.storedError, connection); + } + for (const pending of reservedQueue) { + pending(connection.storedError, connection); + } + // draining the queues may have been the last pending work; a + // graceful close() is waiting on this callback + if (this.onAllQueriesFinished && !this.hasPendingQueries()) { + this.onAllQueriesFinished(); + } + } + return; + } + + if (currentQueryCount == 0) { + // ok we can actually bind reserved queries to it + const pendingReserved = this.reservedQueue.shift(); + if (pendingReserved) { + connection.flags |= PooledConnectionFlags.reserved; + connection.queryCount++; + this.totalQueries++; + // we have a connection waiting for a reserved connection lets prioritize it + pendingReserved(connection.storedError, connection); + return; + } + } + this.readyConnections.add(connection); + this.flushConcurrentQueries(); + } + + hasConnectionsAvailable() { + if (this.readyConnections?.size > 0) return true; + if (this.poolStarted) { + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + const connection = this.connections[i]; + // The slot can still be an unassigned hole while the pool is starting + // and a synchronous creation failure re-enters via release(). + if (connection && connection.state !== PooledConnectionState.closed) { + // some connection is connecting or connected + return true; + } + } + } + return false; + } + + hasPendingQueries() { + if (this.waitingQueue.length > 0 || this.reservedQueue.length > 0) return true; + if (this.poolStarted) { + return this.totalQueries > 0; + } + return false; + } + isConnected() { + if (this.readyConnections.size > 0) { + return true; + } + if (this.poolStarted) { + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + const connection = this.connections[i]; + if (connection.state === PooledConnectionState.connected) { + return true; + } + } + } + return false; + } + flush() { + if (this.closed) { + return; + } + if (this.poolStarted) { + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + const connection = this.connections[i]; + if (connection.state === PooledConnectionState.connected) { + connection.connection?.flush(); + } + } + } + } + + async #close() { + let pending; + while ((pending = this.waitingQueue.shift())) { + pending(this.connectionClosedError(), null); + } + while (this.reservedQueue.length > 0) { + const pendingReserved = this.reservedQueue.shift(); + if (pendingReserved) { + pendingReserved(this.connectionClosedError(), null); + } + } + + const promises: Array> = []; + + if (this.poolStarted) { + this.poolStarted = false; + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + const connection = this.connections[i]; + switch (connection.state) { + case PooledConnectionState.pending: + case PooledConnectionState.connected: { + // cancelRetry only returns true while a connect retry is parked + // in a backoff timer; nothing is in flight then, so there is no + // onClose/onConnected to wait for + if (connection.cancelRetry()) { + connection.state = PooledConnectionState.closed; + break; + } + const { promise, resolve } = Promise.withResolvers(); + connection.onFinish = resolve; + promises.push(promise); + connection.connection?.close(); + break; + } + } + // clean connection reference + // @ts-ignore + this.connections[i] = null; + } + } + + this.readyConnections.clear(); + this.waitingQueue.length = 0; + return Promise.all(promises); + } + + async close(options?: { timeout?: number }): Promise { + if (this.closed) { + return; + } + + let timeout = options?.timeout; + if (timeout) { + timeout = Number(timeout); + if (timeout > 2 ** 31 || timeout < 0 || timeout !== timeout) { + throw $ERR_INVALID_ARG_VALUE("options.timeout", timeout, "must be a non-negative integer less than 2^31"); + } + + this.closed = true; + if (timeout === 0 || !this.hasPendingQueries()) { + // close immediately + await this.#close(); + return; + } + + const { promise, resolve } = Promise.withResolvers(); + const timer = setTimeout(() => { + // timeout is reached, lets close and probably fail some queries + this.#close().finally(resolve); + }, timeout * 1000); + timer.unref(); // dont block the event loop + + this.onAllQueriesFinished = () => { + clearTimeout(timer); + // everything is closed, lets close the pool + this.#close().finally(resolve); + }; + + return promise; + } else { + this.closed = true; + if (!this.hasPendingQueries()) { + // close immediately + await this.#close(); + return; + } + + // gracefully close the pool + const { promise, resolve } = Promise.withResolvers(); + + this.onAllQueriesFinished = () => { + // everything is closed, lets close the pool + this.#close().finally(resolve); + }; + + return promise; + } + } + + /** + * @param {function} onConnected - The callback function to be called when the connection is established. + * @param {boolean} reserved - Whether the connection is reserved, if is reserved the connection will not be released until release is called, if not release will only decrement the queryCount counter + */ + connect(onConnected: (err: Error | null, result: any) => void, reserved: boolean = false) { + if (this.closed) { + return onConnected(this.connectionClosedError(), null); + } + + if (this.readyConnections.size === 0) { + // no connection ready lets make some + let retry_in_progress = false; + let all_closed = true; + let storedError: Error | null = null; + + if (this.poolStarted) { + // we already started the pool + // lets check if some connection is available to retry + const pollSize = this.connections.length; + for (let i = 0; i < pollSize; i++) { + const connection = this.connections[i]; + // we need a new connection and we have some connections that can retry + if (connection.state === PooledConnectionState.closed) { + if (connection.retry()) { + // lets wait for connection to be released + if (!retry_in_progress) { + // avoid adding to the queue twice, we wanna to retry every available pool connection + retry_in_progress = true; + if (reserved) { + // we are not sure what connection will be available so we dont pre reserve + this.reservedQueue.push(onConnected); + } else { + this.waitingQueue.push(onConnected); + } + } + } else { + // we have some error, lets grab it and fail if unable to start a connection + storedError = connection.storedError; + } + } else { + // we have some pending or open connections + all_closed = false; + } + } + if (!all_closed && !retry_in_progress) { + // is possible to connect because we have some working connections, or we are just without network for some reason + // wait for connection to be released or fail + if (reserved) { + // we are not sure what connection will be available so we dont pre reserve + this.reservedQueue.push(onConnected); + } else { + this.waitingQueue.push(onConnected); + } + } else if (!retry_in_progress) { + // impossible to connect or retry + onConnected(storedError ?? this.connectionClosedError(), null); + } + return; + } + // we never started the pool, lets start it + if (reserved) { + this.reservedQueue.push(onConnected); + } else { + this.waitingQueue.push(onConnected); + } + this.poolStarted = true; + const pollSize = this.connections.length; + // pool is always at least 1 connection + const firstConnection = this.createPooledConnection(); + this.connections[0] = firstConnection; + if (reserved) { + firstConnection.flags |= PooledConnectionFlags.preReserved; // lets pre reserve the first connection + } + for (let i = 1; i < pollSize; i++) { + this.connections[i] = this.createPooledConnection(); + } + return; + } + if (reserved) { + let connectionWithLeastQueries: PooledConnection | null = null; + let leastQueries = Infinity; + for (const connection of this.readyConnections) { + if (connection.flags & PooledConnectionFlags.preReserved || connection.flags & PooledConnectionFlags.reserved) + continue; + const queryCount = connection.queryCount; + if (queryCount > 0) { + if (queryCount < leastQueries) { + leastQueries = queryCount; + connectionWithLeastQueries = connection; + } + continue; + } + connection.flags |= PooledConnectionFlags.reserved; + connection.queryCount++; + this.totalQueries++; + this.readyConnections.delete(connection); + onConnected(null, connection); + return; + } + + if (connectionWithLeastQueries) { + // lets mark the connection with the least queries as preReserved if any + connectionWithLeastQueries.flags |= PooledConnectionFlags.preReserved; + } + + // no connection available to be reserved lets wait for a connection to be released + this.reservedQueue.push(onConnected); + } else { + this.waitingQueue.push(onConnected); + this.flushConcurrentQueries(); + } + } +} + const SQLITE_MEMORY = ":memory:"; const SQLITE_MEMORY_VARIANTS: string[] = [":memory:", "sqlite://:memory:", "sqlite:memory"]; @@ -975,6 +2121,12 @@ export default { normalizeSSLMode, SQLResultArray, SQLArrayParameter, + getHelperCommandFromDetect, + pushBindParam, + normalizeQuery, + BasePooledConnection, + BaseSQLAdapter, + createPooledConnectionHandle, // @ts-expect-error we're exporting a const enum which works in our builtins // generator but not in typescript officially SSLMode, diff --git a/src/js/internal/sql/sqlite.ts b/src/js/internal/sql/sqlite.ts index a7061616bd3..4b2c0f14fb5 100644 --- a/src/js/internal/sql/sqlite.ts +++ b/src/js/internal/sql/sqlite.ts @@ -1,13 +1,16 @@ import type * as BunSQLiteModule from "bun:sqlite"; import type { BaseQueryHandle, Query, SQLQueryResultMode } from "./query"; -import type { ArrayType, DatabaseAdapter, OnConnected, SQLArrayParameter, SQLHelper, SQLResultArray } from "./shared"; - -const { SQLHelper, SQLResultArray, buildDefinedColumnsAndQuery } = require("internal/sql/shared"); -const { - Query, - SQLQueryResultMode, - symbols: { _strings, _values }, -} = require("internal/sql/query"); +import type { + ArrayType, + DatabaseAdapter, + OnConnected, + SQLCommand as SharedSQLCommand, + SQLArrayParameter, + SQLResultArray, +} from "./shared"; + +const { SQLResultArray, normalizeQuery, pushBindParam } = require("internal/sql/shared"); +const { SQLQueryResultMode } = require("internal/sql/query"); const { SQLiteError } = require("internal/sql/errors"); let lazySQLiteModule: typeof BunSQLiteModule; @@ -377,204 +380,38 @@ class SQLiteAdapter implements DatabaseAdapter i) { - const value = values[i]; + // only selectIn, insert, update, updateSet are allowed + if (command === SQLCommand.none || command === SQLCommand.where) { + throw new SyntaxError("Helpers are only allowed for INSERT, UPDATE and WHERE IN commands"); + } + // the local SQLCommand enum is numerically identical to the shared one + return command as unknown as SharedSQLCommand; + } - if (value instanceof Query) { - const q = value as Query; - const [sub_query, sub_values] = this.normalizeQuery(q[_strings], q[_values], binding_idx); + isUpsertUpdate(_query: string): boolean { + return false; + } - query += sub_query; - for (let j = 0; j < sub_values.length; j++) { - binding_values.push(sub_values[j]); - } - binding_idx += sub_values.length; - } else if (value instanceof SQLHelper) { - // when partial is true we stop on the first command we find - const { command } = parseSQLQuery(query, true); - - // only selectIn, insert, update, updateSet are allowed - if (command === SQLCommand.none || command === SQLCommand.where) { - throw new SyntaxError("Helpers are only allowed for INSERT, UPDATE and WHERE IN commands"); - } - const { columns, value: items } = value as SQLHelper; - const columnCount = columns.length; - if (columnCount === 0 && command !== SQLCommand.in) { - throw new SyntaxError(`Cannot ${commandToString(command)} with no columns`); - } - const lastColumnIndex = columns.length - 1; - - if (command === SQLCommand.insert) { - // - // insert into users ${sql(users)} or insert into users ${sql(user)} - // - - // Build column list while determining which columns have at least one defined value - const { definedColumns, columnsSql } = buildDefinedColumnsAndQuery( - columns, - items, - this.escapeIdentifier.bind(this), - ); - - const definedColumnCount = definedColumns.length; - if (definedColumnCount === 0) { - throw new SyntaxError("Insert needs to have at least one column with a defined value"); - } - const lastDefinedColumnIndex = definedColumnCount - 1; - - query += columnsSql; - if ($isArray(items)) { - const itemsCount = items.length; - const lastItemIndex = itemsCount - 1; - for (let j = 0; j < itemsCount; j++) { - query += "("; - const item = items[j]; - for (let k = 0; k < definedColumnCount; k++) { - const column = definedColumns[k]; - const columnValue = item[column]; - // SQLite uses ? for placeholders, not $1, $2, etc. - query += `?${k < lastDefinedColumnIndex ? ", " : ""}`; - // If this item has undefined for a column that other items defined, use null - binding_values.push(typeof columnValue === "undefined" ? null : columnValue); - } - if (j < lastItemIndex) { - query += "),"; - } else { - query += ") "; // the user can add RETURNING * or RETURNING id - } - } - } else { - query += "("; - const item = items; - for (let j = 0; j < definedColumnCount; j++) { - const column = definedColumns[j]; - const columnValue = item[column]; - // SQLite uses ? for placeholders - query += `?${j < lastDefinedColumnIndex ? ", " : ""}`; - binding_values.push(columnValue); - } - query += ") "; // the user can add RETURNING * or RETURNING id - } - } else if (command === SQLCommand.in) { - // SELECT * FROM users WHERE id IN (${sql([1, 2, 3])}) - if (!$isArray(items)) { - throw new SyntaxError("An array of values is required for WHERE IN helper"); - } - const itemsCount = items.length; - const lastItemIndex = itemsCount - 1; - query += "("; - for (let j = 0; j < itemsCount; j++) { - // SQLite uses ? for placeholders - query += `?${j < lastItemIndex ? ", " : ""}`; - if (columnCount > 0) { - // we must use a key from a object - if (columnCount > 1) { - // we should not pass multiple columns here - throw new SyntaxError("Cannot use WHERE IN helper with multiple columns"); - } - // SELECT * FROM users WHERE id IN (${sql(users, "id")}) - const value = items[j]; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - const value_from_key = value[columns[0]]; - - if (typeof value_from_key === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value_from_key); - } - } - } else { - const value = items[j]; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value); - } - } - } - query += ") "; // more conditions can be added after this - } else { - // UPDATE users SET ${sql({ name: "John", age: 31 })} WHERE id = 1 - let item; - if ($isArray(items)) { - if (items.length > 1) { - throw new SyntaxError("Cannot use array of objects for UPDATE"); - } - item = items[0]; - } else { - item = items; - } - // no need to include if is updateSet - if (command === SQLCommand.update) { - query += " SET "; - } - for (let i = 0; i < columnCount; i++) { - const column = columns[i]; - const columnValue = item[column]; - if (typeof columnValue === "undefined") { - // skip undefined values, this is the expected behavior in JS - continue; - } - // SQLite uses ? for placeholders - query += `${this.escapeIdentifier(column)} = ?${i < lastColumnIndex ? ", " : ""}`; - if (typeof columnValue === "undefined") { - binding_values.push(null); - } else { - binding_values.push(columnValue); - } - } - if (query.endsWith(", ")) { - // we got an undefined value at the end, lets remove the last comma - query = query.substring(0, query.length - 2); - } - if (query.endsWith("SET ")) { - throw new SyntaxError("Update needs to have at least one column"); - } - // the user can add where clause after this - query += " "; - } - } else { - // SQLite uses ? for placeholders - query += `? `; - if (typeof value === "undefined") { - binding_values.push(null); - } else { - binding_values.push(value); - } - } - } - } else { - throw new SyntaxError("Invalid query: SQL Fragment cannot be executed or was misused"); - } + throwIfUpdateEmpty(query: string, _hasValues: boolean): void { + if (query.endsWith("SET ")) { + throw new SyntaxError("Update needs to have at least one column"); } - - return [query, binding_values]; } connect(onConnected: OnConnected, reserved?: boolean) { diff --git a/test/js/sql/sql-onconnect-onclose-throw.test.ts b/test/js/sql/sql-onconnect-onclose-throw.test.ts index 35d91b09315..43bc4d3b69b 100644 --- a/test/js/sql/sql-onconnect-onclose-throw.test.ts +++ b/test/js/sql/sql-onconnect-onclose-throw.test.ts @@ -196,3 +196,71 @@ process.exit(0); expect(stdout).toBe("reentry ok\nreentry ok\nquery rejected: password error\n"); expect(exitCode).toBe(0); }); + +// The forced-close path (#32095) and the throwing-callback path (#32037) meet +// in the pool connection's close handler: the user's onclose runs first and +// may throw, and the bookkeeping that follows it must still settle the +// promise returned by close(). A server that accepts the TCP connection but +// never answers keeps the connection mid-handshake, and connectionTimeout: 0 +// disables the connect timer, so close() is the only teardown path; if the +// throw skipped the bookkeeping these fixtures would never print "closed". +const neverAnsweringServer = /* ts */ ` +const net = require("net"); +function neverAnsweringServer() { + return new Promise(resolveListening => { + const first = Promise.withResolvers(); + const server = net.createServer(socket => { + socket.unref(); + first.resolve(); + }); + server.unref(); + server.listen(0, "127.0.0.1", () => { + resolveListening({ port: server.address().port, accepted: first.promise }); + }); + }); +} +`; + +function forcedCloseFixture(adapter: "postgres" | "mysql") { + const url = adapter === "postgres" ? "postgres://postgres@127.0.0.1:" : "mysql://root@127.0.0.1:"; + const db = adapter === "postgres" ? "/postgres" : "/db"; + return ( + neverAnsweringServer + + /* ts */ ` +import { SQL } from "bun"; +process.on("uncaughtException", err => console.log("uncaught:", err.message)); +const { port, accepted } = await neverAnsweringServer(); +const sql = new SQL({ + url: "${url}" + port + "${db}", + max: 1, + connectionTimeout: 0, + onclose(err) { + console.log("onclose:", err?.code ?? err); + throw new Error("boom from onclose"); + }, +}); +const queryError = sql.unsafe("SELECT 1").catch(err => err); +await accepted; +await sql.close({ timeout: "0" }); +console.log("closed"); +console.log("query rejected:", (await queryError).code); +process.exit(0); +` + ); +} + +for (const [adapter, closedCode] of [ + ["postgres", "ERR_POSTGRES_CONNECTION_CLOSED"], + ["mysql", "ERR_MYSQL_CONNECTION_CLOSED"], +] as const) { + test.concurrent( + `${adapter}: a throwing onclose does not prevent forced close() from resolving mid-handshake`, + async () => { + const { stdout, exitCode } = await runFixture(forcedCloseFixture(adapter)); + expect(stdout).toBe( + `onclose: ${closedCode}\nuncaught: boom from onclose\nclosed\nquery rejected: ${closedCode}\n`, + ); + expect(exitCode).toBe(0); + }, + ); +}