diff --git a/source-sqlserver/.snapshots/TestPopulateSourceTimestamps b/source-sqlserver/.snapshots/TestPopulateSourceTimestamps new file mode 100644 index 0000000000..19d4397df0 --- /dev/null +++ b/source-sqlserver/.snapshots/TestPopulateSourceTimestamps @@ -0,0 +1,214 @@ +sql> CREATE TABLE dbo.PopulateSourceTimestamps_731765 (id INTEGER PRIMARY KEY, data TEXT) +sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (0, 'zero'), (1, 'one') +=== Discover: Discover Tables === +acmeCo/test_capture/dbo/populatesourcetimestamps_731765 +=== Run: Initial Backfill (flag off) === +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "", + "schema": "dbo", + "seqval": "REDACTED", + "snapshot": true, + "table": "PopulateSourceTimestamps_731765" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "zero", + "id": 0 + } +] +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "", + "schema": "dbo", + "seqval": "REDACTED", + "snapshot": true, + "table": "PopulateSourceTimestamps_731765" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "one", + "id": 1 + } +] +--- Final Checkpoint --- +{ + "bindingStateV1": { + "dbo%2FPopulateSourceTimestamps_731765": { + "backfilled": 2, + "key_columns": [ + "id" + ], + "mode": "Active" + } + }, + "cursor": "REDACTED" +} +sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (2, 'two') +=== Run: Replication (flag off) === +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "REDACTED", + "schema": "dbo", + "seqval": "REDACTED", + "table": "PopulateSourceTimestamps_731765", + "updateMask": "Aw==" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "two", + "id": 2 + } +] +--- Final Checkpoint --- +{ + "bindingStateV1": { + "dbo%2FPopulateSourceTimestamps_731765": { + "backfilled": 2, + 
"key_columns": [ + "id" + ], + "mode": "Active" + } + }, + "cursor": "REDACTED" +} +sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (3, 'three'), (4, 'four'), (5, 'five') +=== Run: Replication (multi-row transaction, flag on) === +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "REDACTED", + "schema": "dbo", + "seqval": "REDACTED", + "table": "PopulateSourceTimestamps_731765", + "ts_ms": 0, + "updateMask": "Aw==" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "three", + "id": 3 + } +] +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "REDACTED", + "schema": "dbo", + "seqval": "REDACTED", + "table": "PopulateSourceTimestamps_731765", + "ts_ms": 0, + "updateMask": "Aw==" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "four", + "id": 4 + } +] +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "REDACTED", + "schema": "dbo", + "seqval": "REDACTED", + "table": "PopulateSourceTimestamps_731765", + "ts_ms": 0, + "updateMask": "Aw==" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "five", + "id": 5 + } +] +--- Final Checkpoint --- +{ + "bindingStateV1": { + "dbo%2FPopulateSourceTimestamps_731765": { + "backfilled": 2, + "key_columns": [ + "id" + ], + "mode": "Active" + } + }, + "cursor": "REDACTED" +} +sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (6, 'six') +sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (7, 'seven') +=== Run: Replication (separate transactions, flag on) === +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "REDACTED", + "schema": "dbo", + "seqval": "REDACTED", + "table": "PopulateSourceTimestamps_731765", + "ts_ms": 0, + "updateMask": "Aw==" + }, + "uuid": 
"DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "six", + "id": 6 + } +] +[ + "acmeCo/test_capture/dbo/populatesourcetimestamps_731765", + { + "_meta": { + "op": "c", + "source": { + "lsn": "REDACTED", + "schema": "dbo", + "seqval": "REDACTED", + "table": "PopulateSourceTimestamps_731765", + "ts_ms": 0, + "updateMask": "Aw==" + }, + "uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef" + }, + "data": "seven", + "id": 7 + } +] +--- Final Checkpoint --- +{ + "bindingStateV1": { + "dbo%2FPopulateSourceTimestamps_731765": { + "backfilled": 2, + "key_columns": [ + "id" + ], + "mode": "Active" + } + }, + "cursor": "REDACTED" +} + diff --git a/source-sqlserver/.snapshots/TestSpec b/source-sqlserver/.snapshots/TestSpec index 24111150a0..f7d1397b90 100644 --- a/source-sqlserver/.snapshots/TestSpec +++ b/source-sqlserver/.snapshots/TestSpec @@ -92,6 +92,12 @@ "title": "Source Tag", "description": "When set the capture will add this value as the property 'tag' in the source metadata of each document." }, + "populate_source_ts_ms": { + "type": "boolean", + "title": "Populate Source Timestamps", + "description": "When set the connector will populate the source timestamp on CDC change events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. Backfill rows do not have a meaningful commit time so the source timestamp will not be set on them.", + "default": false + }, "feature_flags": { "type": "string", "title": "Feature Flags", diff --git a/source-sqlserver/README.md b/source-sqlserver/README.md index 17e10a56ca..78b6e7db8d 100644 --- a/source-sqlserver/README.md +++ b/source-sqlserver/README.md @@ -25,6 +25,20 @@ This is mostly an issue for small-scale testing, as these overheads are more or less fixed and unrelated to the actual volume of changes. But this makes our automated test suite runs take 10-20x longer than they do on other databases. 
+### Transaction commit timestamps (`_meta/source/ts_ms`) + +By default the connector does not populate `_meta/source/ts_ms` on CDC events, +because SQL Server CDC change tables do not contain commit timestamps inline. +Setting the advanced option `populate_source_ts_ms` causes the connector to +range-prefetch commit times from `cdc.lsn_time_mapping` over each polling +cycle's LSN range and apply them as `_meta/source/ts_ms` on emitted change +events. The lookup is one indexed range query per polled table per polling cycle, paginated +when the range is wide enough to cross the prefetch page size. Backfill rows +do not have a meaningful commit time and will not have `ts_ms` set, matching +the behavior of the Postgres and MySQL connectors. The standard +`GRANT SELECT ON SCHEMA :: cdc TO flow_capture` already grants the necessary +permissions on `cdc.lsn_time_mapping`, so no additional setup is required. + ### Developing Some useful commands for working with a test instance of SQL Server: diff --git a/source-sqlserver/backfill.go b/source-sqlserver/backfill.go index 69b598064a..cd15469ad1 100644 --- a/source-sqlserver/backfill.go +++ b/source-sqlserver/backfill.go @@ -180,6 +180,7 @@ func (db *sqlserverDatabase) ScanTableChunk(ctx context.Context, info *sqlcaptur Operation: sqlcapture.InsertOp, Source: sqlserverSourceInfoCDC{ SourceCommon: sqlcapture.SourceCommon{ + Millis: 0, // Not known. 
// TestPopulateSourceTimestamps verifies the populate_source_ts_ms advanced
// option: CDC events carry the transaction commit time as
// _meta/source/ts_ms; backfill rows and flag-off captures do not.
//
// The test runs four capture phases against one table, in order: an initial
// backfill and a replication run with the option off, then (after enabling
// the option through the capture config) a multi-row transaction and two
// separate transactions. Each phase asserts on the ts_ms values extracted
// from that run's output, and the full transcript is snapshotted at the end.
func TestPopulateSourceTimestamps(t *testing.T) {
	var db, tc = blackboxTestSetup(t)
	// NOTE(review): the table-name argument here and the INSERT statements
	// below read as if a table-name placeholder token is missing (the
	// snapshot transcript shows a fully-qualified table name in each
	// statement) — confirm the literals are intact.
	db.CreateTable(t, ``, `(id INTEGER PRIMARY KEY, data TEXT)`)

	// Redact ts_ms values in the snapshot so commit timestamps don't
	// destabilize the snapshot across runs. Equality and ordering are
	// asserted programmatically below against the raw capture output.
	tc.DocumentSanitizers = append(tc.DocumentSanitizers, blackbox.JSONSanitizer{
		Matcher:     regexp.MustCompile(`"ts_ms":\d+`),
		Replacement: `"ts_ms":0`,
	})

	// Backfill (flag off): no ts_ms expected.
	db.Exec(t, `INSERT INTO VALUES (0, 'zero'), (1, 'one')`)
	tc.Discover("Discover Tables")
	require.Empty(t, extractTsMsValues(t, tc.Run("Initial Backfill (flag off)", -1)),
		"backfill events must not have ts_ms")

	// Replication (flag off): no ts_ms expected.
	db.Exec(t, `INSERT INTO VALUES (2, 'two')`)
	require.Empty(t, extractTsMsValues(t, tc.Run("Replication (flag off)", -1)),
		"flag-off replication events must not have ts_ms")

	// Turn the option on for every subsequent run.
	require.NoError(t, tc.Capture.EditConfig("advanced.populate_source_ts_ms", true))

	// Single transaction with multiple rows: all rows must share ts_ms.
	db.Exec(t, `INSERT INTO VALUES (3, 'three'), (4, 'four'), (5, 'five')`)
	multiTxn := extractTsMsValues(t, tc.Run("Replication (multi-row transaction, flag on)", -1))
	require.Len(t, multiTxn, 3)
	for _, v := range multiTxn {
		// A zero value would mean the timestamp was emitted but never populated.
		require.NotZero(t, v)
		require.Equal(t, multiTxn[0], v, "rows from one transaction must share ts_ms")
	}

	// Separate transactions: ts_ms must be monotonically non-decreasing.
	// The short sleeps space the two commits apart in wall-clock time.
	time.Sleep(50 * time.Millisecond)
	db.Exec(t, `INSERT INTO VALUES (6, 'six')`)
	time.Sleep(50 * time.Millisecond)
	db.Exec(t, `INSERT INTO VALUES (7, 'seven')`)
	sepTxn := extractTsMsValues(t, tc.Run("Replication (separate transactions, flag on)", -1))
	require.Len(t, sepTxn, 2)
	require.LessOrEqual(t, sepTxn[0], sepTxn[1],
		"ts_ms must be monotonically non-decreasing across transactions")

	cupaloy.SnapshotT(t, tc.Transcript.String())
}
+func extractTsMsValues(t *testing.T, raw []byte) []int64 { + t.Helper() + var values []int64 + for line := range bytes.SplitSeq(raw, []byte("\n")) { + if len(line) == 0 { + continue + } + var parts []json.RawMessage + require.NoError(t, json.Unmarshal(line, &parts)) + if len(parts) < 2 { + continue + } + var doc struct { + Meta struct { + Source struct { + TsMs *int64 `json:"ts_ms"` + } `json:"source"` + } `json:"_meta"` + } + require.NoError(t, json.Unmarshal(parts[1], &doc)) + if doc.Meta.Source.TsMs != nil { + values = append(values, *doc.Meta.Source.TsMs) + } + } + return values +} diff --git a/source-sqlserver/main.go b/source-sqlserver/main.go index df01880ae1..c4fa527ff5 100644 --- a/source-sqlserver/main.go +++ b/source-sqlserver/main.go @@ -97,6 +97,7 @@ type advancedConfig struct { Filegroup string `json:"filegroup,omitempty" jsonschema:"title=CDC Instance Filegroup,description=When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."` RoleName string `json:"role_name,omitempty" jsonschema:"title=CDC Instance Access Role,description=When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."` SourceTag string `json:"source_tag,omitempty" jsonschema:"title=Source Tag,description=When set the capture will add this value as the property 'tag' in the source metadata of each document."` + PopulateSourceTsMs bool `json:"populate_source_ts_ms,omitempty" jsonschema:"title=Populate Source Timestamps,default=false,description=When set the connector will populate the source timestamp on CDC change events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. 
Backfill rows do not have a meaningful commit time so the source timestamp will not be set on them."` FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` WatermarksTable string `json:"watermarksTable,omitempty" jsonschema:"default=dbo.flow_watermarks,description=This property is deprecated for new captures as they will no longer use watermark writes by default. The name of the table used for watermark writes during backfills. Must be fully-qualified in '.' form."` } diff --git a/source-sqlserver/replication.go b/source-sqlserver/replication.go index eb7ce03ecc..f6eb1c9ac5 100644 --- a/source-sqlserver/replication.go +++ b/source-sqlserver/replication.go @@ -1057,15 +1057,43 @@ type tablePollInfo struct { } func (rs *sqlserverReplicationStream) pollTable(ctx context.Context, info *tablePollInfo) error { - //log.WithFields(log.Fields{ - // "stream": info.StreamID, - // "instance": info.InstanceName, - // "fromLSN": info.FromLSN, - // "toLSN": info.ToLSN, - //}).Trace("polling stream") + if !rs.cfg.Advanced.PopulateSourceTsMs { + return rs.pollTableRange(ctx, info, info.FromLSN, info.ToLSN, nil) + } + + // Walk the polling cycle's LSN range in pages bounded by + // tranEndTimesPageSize so the prefetch never grows unbounded during a + // wide catch-up. Each iteration: one prefetch query against + // cdc.lsn_time_mapping, one TVF query over the same sub-range, emit + // each change row with its prefetched ts_ms. 
+ var current = info.FromLSN + for bytes.Compare(current, info.ToLSN) < 0 { + tranEndTimes, pageEnd, err := rs.lookupTranEndTimesPage(ctx, current, info.ToLSN, tranEndTimesPageSize) + if err != nil { + return fmt.Errorf("error prefetching commit times: %w", err) + } + if err := rs.pollTableRange(ctx, info, current, pageEnd, tranEndTimes); err != nil { + return err + } + current = pageEnd + } + return nil +} +// pollTableRange scans the change-table TVF over the half-open LSN range +// (fromLSN, toLSN] and emits each row as a change event. When tranEndTimes is +// non-nil, every emitted event is enriched with its commit time as +// _meta/source/ts_ms; when nil, events stream without ts_ms (the original +// behavior, byte-identical to a flag-off capture). +func (rs *sqlserverReplicationStream) pollTableRange( + ctx context.Context, + info *tablePollInfo, + fromLSN LSN, + toLSN LSN, + tranEndTimes map[string]time.Time, +) error { var query = fmt.Sprintf(`SELECT * FROM cdc.fn_cdc_get_all_changes_%s(@p1, @p2, N'all');`, info.InstanceName) - rows, err := rs.conn.QueryContext(ctx, query, info.FromLSN, info.ToLSN) + rows, err := rs.conn.QueryContext(ctx, query, fromLSN, toLSN) if err != nil { return fmt.Errorf("error requesting changes: %w", err) } @@ -1186,7 +1214,7 @@ func (rs *sqlserverReplicationStream) pollTable(ctx context.Context, info *table if !ok || changeLSN == nil { return fmt.Errorf("invalid '__$start_lsn' value (capture user may not have correct permissions): %v", changeLSN) } - if bytes.Compare(changeLSN, info.FromLSN) <= 0 { + if bytes.Compare(changeLSN, fromLSN) <= 0 { continue } @@ -1241,14 +1269,70 @@ func (rs *sqlserverReplicationStream) pollTable(ctx context.Context, info *table RowKey: rowKey, Values: vals, } + if tranEndTimes != nil { + if t, ok := tranEndTimes[string(changeLSN)]; ok { + event.Meta.Source.Millis = t.UnixMilli() + } + } if err := rs.emitEvent(ctx, event); err != nil { return err } } - return rows.Err() } +// tranEndTimesPageSize 
bounds the row count of a single +// cdc.lsn_time_mapping prefetch. Tunable; lower values reduce per-page +// memory at the cost of more queries during wide catch-up. +const tranEndTimesPageSize = 100000 + +// lookupTranEndTimesPage returns up to pageSize commit-time entries from +// cdc.lsn_time_mapping for the half-open range (fromLSN, toLSN]. The second +// return value is the upper bound of the returned page: when pageSize rows +// were returned it is the last row's start_lsn (so the caller advances to +// it as the next page's fromLSN); otherwise it is toLSN, signaling that the +// entire requested range was consumed. +// +// LSNs not present in cdc.lsn_time_mapping (possible at retention +// boundaries) and rows with a NULL tran_end_time are absent from the +// returned map. +func (rs *sqlserverReplicationStream) lookupTranEndTimesPage(ctx context.Context, fromLSN, toLSN LSN, pageSize int) (map[string]time.Time, LSN, error) { + const queryFmt = `SELECT TOP (@p3) start_lsn, tran_end_time + FROM cdc.lsn_time_mapping + WHERE start_lsn > @p1 AND start_lsn <= @p2 + ORDER BY start_lsn` + + rows, err := rs.conn.QueryContext(ctx, queryFmt, fromLSN, toLSN, pageSize) + if err != nil { + return nil, nil, fmt.Errorf("error prefetching cdc.lsn_time_mapping: %w", err) + } + defer rows.Close() + + var result = make(map[string]time.Time) + var lastLSN LSN + var rowCount int + for rows.Next() { + var lsn LSN + var tranEndTime sql.NullTime + if err := rows.Scan(&lsn, &tranEndTime); err != nil { + return nil, nil, fmt.Errorf("error scanning cdc.lsn_time_mapping row: %w", err) + } + if tranEndTime.Valid { + result[string(lsn)] = tranEndTime.Time + } + lastLSN = lsn + rowCount++ + } + if err := rows.Err(); err != nil { + return nil, nil, err + } + + if rowCount >= pageSize { + return result, lastLSN, nil + } + return result, toLSN, nil +} + func (rs *sqlserverReplicationStream) emitEvent(ctx context.Context, event sqlcapture.DatabaseEvent) error { select { case rs.events <- event: