diff --git a/materialize-motherduck/.snapshots/TestSQLGeneration b/materialize-motherduck/.snapshots/TestSQLGeneration
index 1461ec36b6..dd635f1f3f 100644
--- a/materialize-motherduck/.snapshots/TestSQLGeneration
+++ b/materialize-motherduck/.snapshots/TestSQLGeneration
@@ -56,6 +56,7 @@ JOIN read_json(
 	['s3://bucket/file1', 's3://bucket/file2'],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	columns={
 		key1: 'BIGINT NOT NULL',
 		key2: 'BOOLEAN NOT NULL',
@@ -97,10 +98,11 @@ json_object(
 	'stringNumber', CAST(l."stringNumber" AS VARCHAR)
 ) as doc
 FROM db."a-schema".key_value AS l
-JOIN read_json(
-	['s3://bucket/file1', 's3://bucket/file2'],
-	format='newline_delimited',
-	compression='gzip',
+JOIN read_json(
+	['s3://bucket/file1', 's3://bucket/file2'],
+	format='newline_delimited',
+	compression='gzip',
+	ignore_errors=false,
 	columns={
 		key1: 'BIGINT NOT NULL',
 		key2: 'BOOLEAN NOT NULL',
@@ -118,6 +120,7 @@ USING read_json(
 	['s3://bucket/file1', 's3://bucket/file2'],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	maximum_object_size=67108864,
 	columns={
 		key1: 'BIGINT NOT NULL',
@@ -160,6 +163,7 @@ SELECT * EXCLUDE (_flow_delete) FROM read_json(
 	['s3://bucket/file1', 's3://bucket/file2'],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	maximum_object_size=67108864,
 	columns={
 		key1: 'BIGINT NOT NULL',
@@ -200,6 +204,7 @@ SELECT * EXCLUDE (_flow_delete) FROM read_json(
 	['s3://bucket/file1', 's3://bucket/file2'],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	maximum_object_size=67108864,
 	columns={
 		"theKey": 'VARCHAR NOT NULL',
diff --git a/materialize-motherduck/driver.go b/materialize-motherduck/driver.go
index 23087921e8..3156784ee1 100644
--- a/materialize-motherduck/driver.go
+++ b/materialize-motherduck/driver.go
@@ -143,7 +143,16 @@ type binding struct {
 	target            sql.Table
 	nullFieldsToStrip []string
 	mustMerge         bool
-	expectedInserts   int
+	// Per-transaction counters of staged rows, classified by the operation
+	// they represent. Used to verify what we asked Motherduck to do against
+	// what it reports back, and to surface the breakdown in logs so missing
+	// records can be traced to a specific stage.
+	//   stagedInserts:     it.Exists=false, !flowDelete (brand new rows)
+	//   stagedUpdates:     it.Exists=true, !flowDelete (existing rows being upserted)
+	//   stagedHardDeletes: flowDelete=true (existing rows being hard-deleted)
+	stagedInserts     int
+	stagedUpdates     int
+	stagedHardDeletes int
 	loadMergeBounds   *sql.MergeBoundsBuilder
 	storeMergeBounds  *sql.MergeBoundsBuilder
 }
@@ -315,8 +324,13 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
 		if it.Exists {
 			b.mustMerge = true
 		}
-		if !flowDelete {
-			b.expectedInserts++
+		switch {
+		case flowDelete:
+			b.stagedHardDeletes++
+		case it.Exists:
+			b.stagedUpdates++
+		default:
+			b.stagedInserts++
 		}
 
 		if converted, err := b.target.ConvertAll(it.Key, it.Values, it.RawJSON); err != nil {
@@ -356,10 +370,13 @@ var retryableDuckdbErrors = []duckdb.ErrorType{
 type bindingCommit struct {
 	path    []string
 	queries []string
-	// expectedInserts is the number of rows we expect the final query
-	// (the INSERT) to affect. Verified after execution to catch silent
-	// row drops in the read_json → INSERT pipeline.
-	expectedInserts int
+	// hasDeleteQuery indicates whether queries[0] is the DELETE that runs
+	// before the INSERT (i.e. the binding has rows that already existed).
+	// The final query is always the INSERT.
+	hasDeleteQuery    bool
+	stagedInserts     int
+	stagedUpdates     int
+	stagedHardDeletes int
 }
 
 func (d *transactor) commit(ctx context.Context, fenceUpdate string) error {
@@ -393,15 +410,29 @@
 		if err := d.templates.storeQuery.Execute(&storeQuery, params); err != nil {
 			return err
 		}
+
+		log.WithFields(log.Fields{
+			"table":             b.target.Path,
+			"stagedInserts":     b.stagedInserts,
+			"stagedUpdates":     b.stagedUpdates,
+			"stagedHardDeletes": b.stagedHardDeletes,
+			"mustMerge":         b.mustMerge,
+		}).Info("motherduck: prepared store commit")
+
 		commits = append(commits, bindingCommit{
-			path:            b.target.Path,
-			queries:         append(queries, storeQuery.String()),
-			expectedInserts: b.expectedInserts,
+			path:              b.target.Path,
+			queries:           append(queries, storeQuery.String()),
+			hasDeleteQuery:    b.mustMerge,
+			stagedInserts:     b.stagedInserts,
+			stagedUpdates:     b.stagedUpdates,
+			stagedHardDeletes: b.stagedHardDeletes,
 		})
 
 		// Reset for next round.
 		b.mustMerge = false
-		b.expectedInserts = 0
+		b.stagedInserts = 0
+		b.stagedUpdates = 0
+		b.stagedHardDeletes = 0
 	}
 
 	for attempt := 1; ; attempt++ {
@@ -443,27 +474,49 @@ func (d *transactor) commitBindings(ctx context.Context, bindings []bindingCommi
 	for _, b := range bindings {
 		d.be.StartedResourceCommit(b.path)
+
+		expectedInserts := int64(b.stagedInserts + b.stagedUpdates)
+		// The DELETE removes one row per existing key, regardless of whether
+		// that key is being upserted or hard-deleted, so the runtime-side
+		// expectation is the sum of both.
+		expectedDeletes := int64(b.stagedUpdates + b.stagedHardDeletes)
+
+		var deleteRowsAffected, insertRowsAffected int64
 
 		for i, query := range b.queries {
 			res, err := txn.ExecContext(txnCtx, query)
 			if err != nil {
 				return fmt.Errorf("executing store query for %s: %w", b.path, err)
 			}
 
-			// The final query is always the INSERT (storeQuery). Verify
-			// it affected exactly the expected number of rows so a silent
-			// drop between the staged JSON file and the INSERT (e.g., a
-			// row filtered by `WHERE NOT _flow_delete` due to a NULL or
-			// missing field) surfaces as an error rather than data loss.
-			if i != len(b.queries)-1 {
-				continue
-			}
 			rows, err := res.RowsAffected()
 			if err != nil {
 				return fmt.Errorf("getting rows affected for %s: %w", b.path, err)
 			}
-			if rows != int64(b.expectedInserts) {
-				return fmt.Errorf("expected %d rows inserted for %s, got %d", b.expectedInserts, b.path, rows)
+			if b.hasDeleteQuery && i == 0 {
+				deleteRowsAffected = rows
+			} else if i == len(b.queries)-1 {
+				insertRowsAffected = rows
 			}
 		}
+
+		log.WithFields(log.Fields{
+			"table":              b.path,
+			"stagedInserts":      b.stagedInserts,
+			"stagedUpdates":      b.stagedUpdates,
+			"stagedHardDeletes":  b.stagedHardDeletes,
+			"expectedInserts":    expectedInserts,
+			"expectedDeletes":    expectedDeletes,
+			"insertRowsAffected": insertRowsAffected,
+			"deleteRowsAffected": deleteRowsAffected,
+			"hasDeleteQuery":     b.hasDeleteQuery,
+		}).Info("motherduck: store commit reported by motherduck")
+
+		// Verify the INSERT affected exactly the expected number of rows so
+		// a silent drop between the staged JSON file and the INSERT (e.g.,
+		// a row filtered by `WHERE NOT _flow_delete` due to a NULL or
+		// missing field) surfaces as an error rather than data loss.
+		if insertRowsAffected != expectedInserts {
+			return fmt.Errorf("expected %d rows inserted for %s, got %d", expectedInserts, b.path, insertRowsAffected)
+		}
 		d.be.FinishedResourceCommit(b.path)
 	}
diff --git a/materialize-motherduck/sqlgen.go b/materialize-motherduck/sqlgen.go
index 4d84f21c72..515eed84bb 100644
--- a/materialize-motherduck/sqlgen.go
+++ b/materialize-motherduck/sqlgen.go
@@ -160,6 +160,7 @@ JOIN read_json(
 	],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	columns={
 		{{- range $ind, $bound := $.Bounds }}
 		{{- if $ind }},{{ end }}
@@ -189,6 +190,7 @@ USING read_json(
 	],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	maximum_object_size=` + strconv.FormatUint(MAX_OBJECT_BYTES, 10) + `,
 	columns={
 		{{- range $ind, $col := $.Columns }}
@@ -214,6 +216,7 @@ SELECT * EXCLUDE (_flow_delete) FROM read_json(
 	],
 	format='newline_delimited',
 	compression='gzip',
+	ignore_errors=false,
 	maximum_object_size=` + strconv.FormatUint(MAX_OBJECT_BYTES, 10) + `,
 	columns={
 		{{- range $ind, $col := $.Columns }}
@@ -254,15 +257,16 @@ json_object(
 	{{- end}}
 ) as doc
 FROM {{ $.Identifier }} AS l
-JOIN read_json(
-	[
-	{{- range $ind, $f := $.Files }}
-	{{- if $ind }}, {{ end }}'{{ $f }}'
-	{{- end -}}
-	],
-	format='newline_delimited',
-	compression='gzip',
-	columns={
+JOIN read_json(
+	[
+	{{- range $ind, $f := $.Files }}
+	{{- if $ind }}, {{ end }}'{{ $f }}'
+	{{- end -}}
+	],
+	format='newline_delimited',
+	compression='gzip',
+	ignore_errors=false,
+	columns={
 		{{- range $ind, $bound := $.Bounds }}
 		{{- if $ind }},{{ end }}
 		{{$bound.Identifier}}: '{{$bound.DDL}}'
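
Note on the row accounting introduced above: the following is a minimal, self-contained sketch of the invariants the new counters enforce. The stagedCounts type and expectations helper are hypothetical names used for illustration only and are not part of the connector; the arithmetic mirrors commitBindings, where the INSERT is expected to affect stagedInserts + stagedUpdates rows and the DELETE stagedUpdates + stagedHardDeletes rows.

package main

import "fmt"

// stagedCounts mirrors the per-binding counters added to `binding` in
// driver.go: rows staged during a transaction, classified by operation.
type stagedCounts struct {
	inserts     int // it.Exists=false, !flowDelete (brand new rows)
	updates     int // it.Exists=true, !flowDelete (existing rows being upserted)
	hardDeletes int // flowDelete=true (existing rows being hard-deleted)
}

// expectations derives the RowsAffected values the commit step checks
// against what MotherDuck reports for the DELETE and INSERT queries.
func expectations(s stagedCounts) (expectedInserts, expectedDeletes int64) {
	// Every non-deleted row lands in the INSERT, whether it is brand new
	// or an upsert of an existing key.
	expectedInserts = int64(s.inserts + s.updates)
	// The DELETE removes one row per pre-existing key, covering both
	// upserts and hard deletes.
	expectedDeletes = int64(s.updates + s.hardDeletes)
	return expectedInserts, expectedDeletes
}

func main() {
	ins, del := expectations(stagedCounts{inserts: 3, updates: 2, hardDeletes: 1})
	fmt.Println(ins, del) // prints: 5 3
}

For the sample counts this prints 5 and 3: the values the connector would require MotherDuck's reported RowsAffected to match for the INSERT, and would log alongside the DELETE count, before committing.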