13 changes: 9 additions & 4 deletions materialize-motherduck/.snapshots/TestSQLGeneration
@@ -56,6 +56,7 @@ JOIN read_json(
['s3://bucket/file1', 's3://bucket/file2'],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
columns={
key1: 'BIGINT NOT NULL',
key2: 'BOOLEAN NOT NULL',
@@ -97,10 +98,11 @@ json_object(
'stringNumber', CAST(l."stringNumber" AS VARCHAR)
) as doc
FROM db."a-schema".key_value AS l
-JOIN read_json(
-['s3://bucket/file1', 's3://bucket/file2'],
-format='newline_delimited',
-compression='gzip',
+JOIN read_json(
+['s3://bucket/file1', 's3://bucket/file2'],
+format='newline_delimited',
+compression='gzip',
+ignore_errors=false,
columns={
key1: 'BIGINT NOT NULL',
key2: 'BOOLEAN NOT NULL',
@@ -118,6 +120,7 @@ USING read_json(
['s3://bucket/file1', 's3://bucket/file2'],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
maximum_object_size=67108864,
columns={
key1: 'BIGINT NOT NULL',
@@ -160,6 +163,7 @@ SELECT * EXCLUDE (_flow_delete) FROM read_json(
['s3://bucket/file1', 's3://bucket/file2'],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
maximum_object_size=67108864,
columns={
key1: 'BIGINT NOT NULL',
@@ -200,6 +204,7 @@ SELECT * EXCLUDE (_flow_delete) FROM read_json(
['s3://bucket/file1', 's3://bucket/file2'],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
maximum_object_size=67108864,
columns={
"theKey": 'VARCHAR NOT NULL',
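The snapshot changes above all come from one template change: every read_json call now pins ignore_errors=false. DuckDB's read_json accepts an ignore_errors flag, and setting it to false makes explicit that a staged line that fails to parse must fail the whole query rather than be silently skipped. Below is a minimal, hypothetical Go sketch of the failure mode being guarded against; the file path and schema are invented, and the driver is assumed to be github.com/marcboeker/go-duckdb (consistent with the duckdb.ErrorType usage in driver.go):

package main

import (
	"database/sql"
	"fmt"
	"log"
	"os"

	_ "github.com/marcboeker/go-duckdb"
)

func main() {
	// One well-formed line followed by one malformed line.
	if err := os.WriteFile("staged.json", []byte("{\"id\": 1}\nnot-json\n"), 0o644); err != nil {
		log.Fatal(err)
	}

	db, err := sql.Open("duckdb", "") // in-memory database
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With ignore_errors=false the malformed line is a hard error; with
	// ignore_errors=true it would be skipped and the row silently lost.
	_, err = db.Exec(`SELECT * FROM read_json('staged.json',
		format='newline_delimited',
		ignore_errors=false,
		columns={id: 'BIGINT NOT NULL'})`)
	fmt.Println("read_json:", err)
}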
95 changes: 74 additions & 21 deletions materialize-motherduck/driver.go
@@ -143,7 +143,16 @@ type binding struct {
target sql.Table
nullFieldsToStrip []string
mustMerge bool
-expectedInserts int
+// Per-transaction counters of staged rows, classified by the operation
+// they represent. Used to verify what we asked MotherDuck to do against
+// what it reports back, and to surface the breakdown in logs so missing
+// records can be traced to a specific stage.
+// stagedInserts: it.Exists=false, !flowDelete (brand new rows)
+// stagedUpdates: it.Exists=true, !flowDelete (existing rows being upserted)
+// stagedHardDeletes: flowDelete=true (existing rows being hard-deleted)
+stagedInserts int
+stagedUpdates int
+stagedHardDeletes int
loadMergeBounds *sql.MergeBoundsBuilder
storeMergeBounds *sql.MergeBoundsBuilder
}
@@ -315,8 +324,13 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
if it.Exists {
b.mustMerge = true
}
-if !flowDelete {
-b.expectedInserts++
+switch {
+case flowDelete:
+b.stagedHardDeletes++
+case it.Exists:
+b.stagedUpdates++
+default:
+b.stagedInserts++
}

if converted, err := b.target.ConvertAll(it.Key, it.Values, it.RawJSON); err != nil {
@@ -356,10 +370,13 @@ var retryableDuckdbErrors = []duckdb.ErrorType{
type bindingCommit struct {
path []string
queries []string
-// expectedInserts is the number of rows we expect the final query
-// (the INSERT) to affect. Verified after execution to catch silent
-// row drops in the read_json → INSERT pipeline.
-expectedInserts int
+// hasDeleteQuery indicates whether queries[0] is the DELETE that runs
+// before the INSERT (i.e. the binding has rows that already existed).
+// The final query is always the INSERT.
+hasDeleteQuery bool
+stagedInserts int
+stagedUpdates int
+stagedHardDeletes int
}

func (d *transactor) commit(ctx context.Context, fenceUpdate string) error {
@@ -393,15 +410,29 @@ func (d *transactor) commit(ctx context.Context, fenceUpdate string) error {
if err := d.templates.storeQuery.Execute(&storeQuery, params); err != nil {
return err
}

+log.WithFields(log.Fields{
+"table": b.target.Path,
+"stagedInserts": b.stagedInserts,
+"stagedUpdates": b.stagedUpdates,
+"stagedHardDeletes": b.stagedHardDeletes,
+"mustMerge": b.mustMerge,
+}).Info("motherduck: prepared store commit")

commits = append(commits, bindingCommit{
-path: b.target.Path,
-queries: append(queries, storeQuery.String()),
-expectedInserts: b.expectedInserts,
+path: b.target.Path,
+queries: append(queries, storeQuery.String()),
+hasDeleteQuery: b.mustMerge,
+stagedInserts: b.stagedInserts,
+stagedUpdates: b.stagedUpdates,
+stagedHardDeletes: b.stagedHardDeletes,
})

// Reset for next round.
b.mustMerge = false
-b.expectedInserts = 0
+b.stagedInserts = 0
+b.stagedUpdates = 0
+b.stagedHardDeletes = 0
}

for attempt := 1; ; attempt++ {
@@ -443,27 +474,49 @@ func (d *transactor) commitBindings(ctx context.Context, bindings []bindingCommit

for _, b := range bindings {
d.be.StartedResourceCommit(b.path)

+expectedInserts := int64(b.stagedInserts + b.stagedUpdates)
+// The DELETE removes one row per existing key, regardless of whether
+// that key is being upserted or hard-deleted, so the runtime-side
+// expectation is the sum of both.
+expectedDeletes := int64(b.stagedUpdates + b.stagedHardDeletes)

+var deleteRowsAffected, insertRowsAffected int64
for i, query := range b.queries {
res, err := txn.ExecContext(txnCtx, query)
if err != nil {
return fmt.Errorf("executing store query for %s: %w", b.path, err)
}
-// The final query is always the INSERT (storeQuery). Verify
-// it affected exactly the expected number of rows so a silent
-// drop between the staged JSON file and the INSERT (e.g., a
-// row filtered by `WHERE NOT _flow_delete` due to a NULL or
-// missing field) surfaces as an error rather than data loss.
-if i != len(b.queries)-1 {
-continue
-}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("getting rows affected for %s: %w", b.path, err)
}
-if rows != int64(b.expectedInserts) {
-return fmt.Errorf("expected %d rows inserted for %s, got %d", b.expectedInserts, b.path, rows)
+if b.hasDeleteQuery && i == 0 {
+deleteRowsAffected = rows
+} else if i == len(b.queries)-1 {
+insertRowsAffected = rows
}
}

+log.WithFields(log.Fields{
+"table": b.path,
+"stagedInserts": b.stagedInserts,
+"stagedUpdates": b.stagedUpdates,
+"stagedHardDeletes": b.stagedHardDeletes,
+"expectedInserts": expectedInserts,
+"expectedDeletes": expectedDeletes,
+"insertRowsAffected": insertRowsAffected,
+"deleteRowsAffected": deleteRowsAffected,
+"hasDeleteQuery": b.hasDeleteQuery,
+}).Info("motherduck: store commit reported by motherduck")

+// Verify the INSERT affected exactly the expected number of rows so
+// a silent drop between the staged JSON file and the INSERT (e.g.,
+// a row filtered by `WHERE NOT _flow_delete` due to a NULL or
+// missing field) surfaces as an error rather than data loss.
+if insertRowsAffected != expectedInserts {
+return fmt.Errorf("expected %d rows inserted for %s, got %d", expectedInserts, b.path, insertRowsAffected)
+}
d.be.FinishedResourceCommit(b.path)
}

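The staged counters turn the commit check into simple arithmetic: the INSERT should affect stagedInserts + stagedUpdates rows, while the DELETE (when present) should affect stagedUpdates + stagedHardDeletes. A hypothetical test-style sketch of that arithmetic, with invented counts:

package motherduck_test

import "testing"

// Staging 3 brand-new rows, 2 updates of existing rows, and 1 hard delete
// means the DELETE should remove 2+1=3 pre-existing rows and the trailing
// INSERT should add 3+2=5 rows (hard-deleted keys never reach the INSERT).
func TestExpectedStoreCounts(t *testing.T) {
	stagedInserts, stagedUpdates, stagedHardDeletes := 3, 2, 1

	expectedInserts := int64(stagedInserts + stagedUpdates)
	expectedDeletes := int64(stagedUpdates + stagedHardDeletes)

	if expectedInserts != 5 {
		t.Errorf("expectedInserts = %d, want 5", expectedInserts)
	}
	if expectedDeletes != 3 {
		t.Errorf("expectedDeletes = %d, want 3", expectedDeletes)
	}
}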
22 changes: 13 additions & 9 deletions materialize-motherduck/sqlgen.go
@@ -160,6 +160,7 @@ JOIN read_json(
],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
columns={
{{- range $ind, $bound := $.Bounds }}
{{- if $ind }},{{ end }}
@@ -189,6 +190,7 @@ USING read_json(
],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
maximum_object_size=` + strconv.FormatUint(MAX_OBJECT_BYTES, 10) + `,
columns={
{{- range $ind, $col := $.Columns }}
@@ -214,6 +216,7 @@ SELECT * EXCLUDE (_flow_delete) FROM read_json(
],
format='newline_delimited',
compression='gzip',
+ignore_errors=false,
maximum_object_size=` + strconv.FormatUint(MAX_OBJECT_BYTES, 10) + `,
columns={
{{- range $ind, $col := $.Columns }}
@@ -254,15 +257,16 @@
{{- end}}
) as doc
FROM {{ $.Identifier }} AS l
-JOIN read_json(
-[
-{{- range $ind, $f := $.Files }}
-{{- if $ind }}, {{ end }}'{{ $f }}'
-{{- end -}}
-],
-format='newline_delimited',
-compression='gzip',
-columns={
+JOIN read_json(
+[
+{{- range $ind, $f := $.Files }}
+{{- if $ind }}, {{ end }}'{{ $f }}'
+{{- end -}}
+],
+format='newline_delimited',
+compression='gzip',
+ignore_errors=false,
+columns={
{{- range $ind, $bound := $.Bounds }}
{{- if $ind }},{{ end }}
{{$bound.Identifier}}: '{{$bound.DDL}}'
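For context on how these fragments are consumed: sqlgen.go builds its SQL through Go's text/template, so the ignore_errors=false literal lands in every rendered query. A simplified, self-contained stand-in follows (not the connector's actual template; only the Files field and the read_json shape are taken from the fragments above):

package main

import (
	"os"
	"text/template"
)

// frag mimics the file-list portion of the templates above.
var frag = template.Must(template.New("readJSON").Parse(`read_json(
	[
	{{- range $ind, $f := .Files }}
	{{- if $ind }}, {{ end }}'{{ $f }}'
	{{- end -}}
	],
	format='newline_delimited',
	compression='gzip',
	ignore_errors=false
)`))

func main() {
	data := struct{ Files []string }{
		Files: []string{"s3://bucket/file1", "s3://bucket/file2"},
	}
	// Prints a read_json call matching the snapshot output above.
	if err := frag.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
}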