diff --git a/materialize-postgres/driver.go b/materialize-postgres/driver.go index b2af00631f..bc27a4e40c 100644 --- a/materialize-postgres/driver.go +++ b/materialize-postgres/driver.go @@ -664,46 +664,44 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error) defuser.Defuse() return func(ctx context.Context, runtimeCheckpoint *protocol.Checkpoint) (*pf.ConnectorState, m.OpFuture) { - return nil, m.RunAsyncOperation(func() error { - defer txn.Rollback(ctx) + defer txn.Rollback(ctx) - var err error - if d.store.fence.Checkpoint, err = runtimeCheckpoint.Marshal(); err != nil { - return fmt.Errorf("marshalling checkpoint: %w", err) - } + var err error + if d.store.fence.Checkpoint, err = runtimeCheckpoint.Marshal(); err != nil { + return nil, m.FinishedOperation(fmt.Errorf("marshalling checkpoint: %w", err)) + } - var fenceUpdate strings.Builder - if err := d.templates.updateFence.Execute(&fenceUpdate, d.store.fence); err != nil { - return fmt.Errorf("evaluating fence template: %w", err) - } + var fenceUpdate strings.Builder + if err = d.templates.updateFence.Execute(&fenceUpdate, d.store.fence); err != nil { + return nil, m.FinishedOperation(fmt.Errorf("evaluating fence template: %w", err)) + } - // Add the update to the fence as the last statement in the batch. - batch.Queue(fenceUpdate.String()) + // Add the update to the fence as the last statement in the batch. + batch.Queue(fenceUpdate.String()) - results := txn.SendBatch(ctx, &batch) + var results = txn.SendBatch(ctx, &batch) - // Execute all remaining doc inserts & updates. - for i := 0; i < batch.Len()-1; i++ { - if _, err := results.Exec(); err != nil { - return fmt.Errorf("store at index %d: %w", i, err) - } + // Execute all remaining doc inserts & updates. + for i := 0; i < batch.Len()-1; i++ { + if _, err = results.Exec(); err != nil { + return nil, m.FinishedOperation(fmt.Errorf("store at index %d: %w", i, err)) } + } - // The fence update is always the last operation in the batch. - if _, err := results.Exec(); err != nil { - return fmt.Errorf("updating flow checkpoint: %w", err) - } else if err = results.Close(); err != nil { - return fmt.Errorf("results.Close(): %w", err) - } + // The fence update is always the last operation in the batch. + if _, err = results.Exec(); err != nil { + return nil, m.FinishedOperation(fmt.Errorf("updating flow checkpoint: %w", err)) + } else if err = results.Close(); err != nil { + return nil, m.FinishedOperation(fmt.Errorf("results.Close(): %w", err)) + } - commitCtx, cancel := ctxWithQueryTimeout(ctx) - defer cancel() - if err := txn.Commit(commitCtx); err != nil { - return fmt.Errorf("committing Store transaction: %w", err) - } + var commitCtx, cancel = ctxWithQueryTimeout(ctx) + defer cancel() + if err = txn.Commit(commitCtx); err != nil { + return nil, m.FinishedOperation(fmt.Errorf("committing Store transaction: %w", err)) + } - return nil - }) + return nil, nil }, nil }