Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/js/internal/sql/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,9 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con

while (true) {
const nonReservedConnections = Array.from(this.readyConnections).filter(
c => !(c.flags & PooledConnectionFlags.preReserved) && c.queryCount < maxDistribution,
c =>
!(c.flags & (PooledConnectionFlags.preReserved | PooledConnectionFlags.reserved)) &&
c.queryCount < maxDistribution,
);
if (nonReservedConnections.length === 0) {
return;
Expand Down Expand Up @@ -1087,6 +1089,9 @@ abstract class BaseSQLAdapter<PooledConnection extends BasePooledConnection, Con
connection.flags |= PooledConnectionFlags.reserved;
connection.queryCount++;
this.totalQueries++;
// the connection now belongs to the reserved waiter; it must not stay
// visible to flushConcurrentQueries (same as the reserved path in connect())
this.readyConnections.delete(connection);
// we have a connection waiting for a reserved connection lets prioritize it
pendingReserved(connection.storedError, connection);
return;
Expand Down
34 changes: 23 additions & 11 deletions src/sql_jsc/postgres/PostgresSQLConnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1800,16 +1800,27 @@ impl PostgresSQLConnection {
&& self.pipelined_requests.get() == 0
}

/// Process pending requests and flush. Called from the enqueue path when
/// unnamed prepared statements with params skip writeQuery+Sync and need
/// advance() to send everything atomically on an idle connection.
/// Process pending requests and flush. Called from the enqueue path so
/// newly queued requests are written as soon as ordering allows: prepared
/// statements pipeline behind in-flight pipelined requests, unnamed
/// prepared statements with params get Parse+Bind+Execute sent atomically
/// on an idle connection, and requests that must wait (simple queries,
/// statements that still need preparing) stay queued until advance()
/// reaches them in FIFO order.
///
/// Gated on WAITING_TO_PREPARE because advance() skips over requests whose
/// statement is still Parsing; entering it with a Parse outstanding could
/// write a later request ahead of an earlier unwritten one.
pub fn advance_and_flush(&self) {
let flags = self.flags.get();
if !flags.contains(ConnectionFlags::HAS_BACKPRESSURE)
&& flags.contains(ConnectionFlags::IS_READY_FOR_QUERY)
&& !flags.contains(ConnectionFlags::WAITING_TO_PREPARE)
{
self.advance();
self.flush_data();
// defer the flush, so many queries enqueued in the same tick
// coalesce into one socket write (same batching as
// flush_data_and_reset_timeout)
self.register_auto_flusher();
}
}

Expand Down Expand Up @@ -2276,12 +2287,13 @@ impl PostgresSQLConnection {
}

QueryStatus::Running | QueryStatus::Binding | QueryStatus::PartialResponse => {
if self
.flags
.get()
.contains(ConnectionFlags::WAITING_TO_PREPARE)
|| self.nonpipelinable_requests.get() > 0
{
// can_pipeline() covers WAITING_TO_PREPARE and
// nonpipelinable_requests, and additionally stops us from
// writing past in-flight requests when pipelining is off
// (BUN_FEATURE_FLAG_DISABLE_SQL_AUTO_PIPELINING, unnamed
// prepared statements) or the write buffer is already at
// MAX_PIPELINE_SIZE.
if !self.can_pipeline() {
defer_cleanup!(self);
return;
}
Expand Down
55 changes: 12 additions & 43 deletions src/sql_jsc/postgres/PostgresSQLQuery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,50 +657,19 @@ impl PostgresSQLQuery {
unsafe { Self::deref(this_ptr) };
return Err(global_object.throw_value(error_response));
}
StatementStatus::Prepared => {
if !connection.has_query_running() || connection.can_pipeline() {
this.update_flags(|f| f.binary = !stmt.fields.is_empty());
bun_core::scoped_log!(Postgres, "bindAndExecute");

// bindAndExecute will bind + execute, it will change to running after binding is complete
if let Err(err) = PostgresRequest::bind_and_execute(
global_object,
stmt,
binding_value,
columns_value,
writer,
) {
// fail to run do cleanup — drop the ref we took above.
this.release_statement();
// SAFETY: undoes the speculative `this.ref_()` above; count was ≥2, never frees here.
unsafe { Self::deref(this_ptr) };

if !global_object.has_exception() {
return Err(global_object.throw_value(
postgres_error_to_js(
global_object,
Some(b"failed to bind and execute query"),
err,
),
));
}
return Err(JsError::Thrown);
}
{
let mut f = connection.flags.get();
f.set(ConnectionFlags::IS_READY_FOR_QUERY, false);
connection.flags.set(f);
}
this.status.set(Status::Binding);
this.update_flags(|f| f.pipelined = true);
connection
.pipelined_requests
.set(connection.pipelined_requests.get() + 1);

did_write = true;
}
StatementStatus::Prepared
| StatementStatus::Parsing
| StatementStatus::Pending => {
// Leave the request Pending; advance() (reached via
// advance_and_flush() after the enqueue below) writes it only
// once every earlier queued request has been written, so wire
// order always matches queue order. Writing Bind+Execute here
// could bypass a queued-but-unwritten request (e.g. a
// simple-protocol COMMIT waiting for the pipeline to drain);
// responses are matched to requests in FIFO queue order, so
// the bypassed request would steal this query's result and
// permanently desync the connection.
}
StatementStatus::Parsing | StatementStatus::Pending => {}
}
Comment thread
robobun marked this conversation as resolved.

break 'enqueue;
Expand Down
56 changes: 56 additions & 0 deletions test/js/sql/postgres-pool-pipeline-flag-fixture.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions test/js/sql/postgres-pool-transaction-stall-fixture.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading