Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions materialize-postgres/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,46 +664,44 @@ func (d *transactor) Store(it *m.StoreIterator) (_ m.StartCommitFunc, err error)

defuser.Defuse()
return func(ctx context.Context, runtimeCheckpoint *protocol.Checkpoint) (*pf.ConnectorState, m.OpFuture) {
return nil, m.RunAsyncOperation(func() error {
defer txn.Rollback(ctx)
defer txn.Rollback(ctx)

var err error
if d.store.fence.Checkpoint, err = runtimeCheckpoint.Marshal(); err != nil {
return fmt.Errorf("marshalling checkpoint: %w", err)
}
var err error
if d.store.fence.Checkpoint, err = runtimeCheckpoint.Marshal(); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("marshalling checkpoint: %w", err))
}

var fenceUpdate strings.Builder
if err := d.templates.updateFence.Execute(&fenceUpdate, d.store.fence); err != nil {
return fmt.Errorf("evaluating fence template: %w", err)
}
var fenceUpdate strings.Builder
if err = d.templates.updateFence.Execute(&fenceUpdate, d.store.fence); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("evaluating fence template: %w", err))
}

// Add the update to the fence as the last statement in the batch.
batch.Queue(fenceUpdate.String())
// Add the update to the fence as the last statement in the batch.
batch.Queue(fenceUpdate.String())

results := txn.SendBatch(ctx, &batch)
var results = txn.SendBatch(ctx, &batch)

// Execute all remaining doc inserts & updates.
for i := 0; i < batch.Len()-1; i++ {
if _, err := results.Exec(); err != nil {
return fmt.Errorf("store at index %d: %w", i, err)
}
// Execute all remaining doc inserts & updates.
for i := 0; i < batch.Len()-1; i++ {
if _, err = results.Exec(); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("store at index %d: %w", i, err))
}
}

// The fence update is always the last operation in the batch.
if _, err := results.Exec(); err != nil {
return fmt.Errorf("updating flow checkpoint: %w", err)
} else if err = results.Close(); err != nil {
return fmt.Errorf("results.Close(): %w", err)
}
// The fence update is always the last operation in the batch.
if _, err = results.Exec(); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("updating flow checkpoint: %w", err))
} else if err = results.Close(); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("results.Close(): %w", err))
}

commitCtx, cancel := ctxWithQueryTimeout(ctx)
defer cancel()
if err := txn.Commit(commitCtx); err != nil {
return fmt.Errorf("committing Store transaction: %w", err)
}
var commitCtx, cancel = ctxWithQueryTimeout(ctx)
defer cancel()
if err = txn.Commit(commitCtx); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("committing Store transaction: %w", err))
}

return nil
})
return nil, nil
}, nil
}

Expand Down
Loading