Skip to content

Commit 6f44221

Browse files
committed
Make upsert robust to large inputs
1 parent bd88ee3 commit 6f44221

1 file changed

Lines changed: 92 additions & 21 deletions

File tree

hotshot-query-service/src/data_source/storage/sql/transaction.rs

Lines changed: 92 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,12 @@ where
392392

393393
/// Low-level, general database queries and mutation.
394394
impl Transaction<Write> {
395+
/// Maximum number of parameters allowed in a single query.
396+
///
397+
/// This should be safely under the hard limit imposed by the Postgres client, which is either
398+
/// 32,767 or 65,535.
399+
const STATEMENT_MAX_PARAMETERS: usize = 30_000;
400+
395401
pub async fn upsert<'p, const N: usize, R>(
396402
&mut self,
397403
table: &str,
@@ -420,27 +426,40 @@ impl Transaction<Write> {
420426
return Ok(());
421427
}
422428

423-
let mut query_builder =
424-
QueryBuilder::new(format!("INSERT INTO \"{table}\" ({columns_str}) "));
425-
query_builder.push_values(rows, |mut b, row| {
426-
row.bind(&mut b);
427-
});
428-
query_builder.push(format!(" ON CONFLICT ({pk}) DO UPDATE SET {set_columns}"));
429-
430-
let query = query_builder.build();
431-
let statement = query.sql();
432-
433-
let res = self.execute(query).await.inspect_err(|err| {
434-
tracing::error!(statement, "error in statement execution: {err:#}");
435-
})?;
436-
let rows_modified = res.rows_affected() as usize;
437-
if rows_modified != num_rows {
438-
let error = format!(
439-
"unexpected number of rows modified: expected {num_rows}, got {rows_modified}. \
440-
query: {statement}"
441-
);
442-
tracing::error!(error);
443-
bail!(error);
429+
// For very large upserts, we might need to proceed in chunks to avoid exceeding the maximum
430+
// number of parameters in a single statement.
431+
let rows_per_chunk = Self::STATEMENT_MAX_PARAMETERS / N;
432+
let mut rows = rows.into_iter();
433+
loop {
434+
let chunk = rows.by_ref().take(rows_per_chunk).collect::<Vec<_>>();
435+
if chunk.is_empty() {
436+
break;
437+
}
438+
let num_rows = chunk.len();
439+
tracing::debug!(num_rows, "upsert chunk");
440+
441+
let mut query_builder =
442+
QueryBuilder::new(format!("INSERT INTO \"{table}\" ({columns_str}) "));
443+
query_builder.push_values(chunk, |mut b, row| {
444+
row.bind(&mut b);
445+
});
446+
query_builder.push(format!(" ON CONFLICT ({pk}) DO UPDATE SET {set_columns}"));
447+
448+
let query = query_builder.build();
449+
let statement = query.sql();
450+
451+
let res = self.execute(query).await.inspect_err(|err| {
452+
tracing::error!(statement, "error in statement execution: {err:#}");
453+
})?;
454+
let rows_modified = res.rows_affected() as usize;
455+
if rows_modified != num_rows {
456+
let error = format!(
457+
"unexpected number of rows modified: expected {num_rows}, got \
458+
{rows_modified}. query: {statement}"
459+
);
460+
tracing::error!(error);
461+
bail!(error);
462+
}
444463
}
445464
Ok(())
446465
}
@@ -907,3 +926,55 @@ impl PoolMetrics {
907926
}
908927
}
909928
}
929+
930+
#[cfg(test)]
931+
mod test {
932+
use super::*;
933+
use crate::data_source::{
934+
Transaction as _, VersionedDataSource,
935+
sql::testing::TmpDb,
936+
storage::{SqlStorage, StorageConnectionType},
937+
};
938+
939+
#[tokio::test]
940+
#[test_log::test]
941+
async fn test_upsert_many_rows() {
942+
let db = TmpDb::init().await;
943+
let storage = SqlStorage::connect(db.config(), StorageConnectionType::Sequencer)
944+
.await
945+
.unwrap();
946+
947+
let mut tx = storage.write().await.unwrap();
948+
query(
949+
"CREATE TABLE test (
950+
a INT PRIMARY KEY,
951+
b INT,
952+
c INT
953+
)",
954+
)
955+
.execute(tx.as_mut())
956+
.await
957+
.unwrap();
958+
tx.commit().await.unwrap();
959+
960+
// use a non-integer number of chunks.
961+
let n = (2 * Transaction::STATEMENT_MAX_PARAMETERS
962+
+ (Transaction::STATEMENT_MAX_PARAMETERS / 2)) as i32;
963+
let rows = (0..n).map(|i| (i, i, i)).collect::<Vec<_>>();
964+
965+
let mut tx = storage.write().await.unwrap();
966+
tx.upsert("test", ["a", "b", "c"], ["a"], rows.clone())
967+
.await
968+
.unwrap();
969+
tx.commit().await.unwrap();
970+
971+
let mut tx = storage.read().await.unwrap();
972+
assert_eq!(
973+
rows,
974+
query_as("SELECT * FROM test ORDER BY a")
975+
.fetch_all(tx.as_mut())
976+
.await
977+
.unwrap()
978+
);
979+
}
980+
}

0 commit comments

Comments
 (0)