Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions source-sqlserver/.snapshots/TestPopulateSourceTimestamps
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
sql> CREATE TABLE dbo.PopulateSourceTimestamps_731765 (id INTEGER PRIMARY KEY, data TEXT)
sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (0, 'zero'), (1, 'one')
=== Discover: Discover Tables ===
acmeCo/test_capture/dbo/populatesourcetimestamps_731765
=== Run: Initial Backfill (flag off) ===
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "",
"schema": "dbo",
"seqval": "REDACTED",
"snapshot": true,
"table": "PopulateSourceTimestamps_731765"
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "zero",
"id": 0
}
]
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "",
"schema": "dbo",
"seqval": "REDACTED",
"snapshot": true,
"table": "PopulateSourceTimestamps_731765"
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "one",
"id": 1
}
]
--- Final Checkpoint ---
{
"bindingStateV1": {
"dbo%2FPopulateSourceTimestamps_731765": {
"backfilled": 2,
"key_columns": [
"id"
],
"mode": "Active"
}
},
"cursor": "REDACTED"
}
sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (2, 'two')
=== Run: Replication (flag off) ===
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "REDACTED",
"schema": "dbo",
"seqval": "REDACTED",
"table": "PopulateSourceTimestamps_731765",
"updateMask": "Aw=="
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "two",
"id": 2
}
]
--- Final Checkpoint ---
{
"bindingStateV1": {
"dbo%2FPopulateSourceTimestamps_731765": {
"backfilled": 2,
"key_columns": [
"id"
],
"mode": "Active"
}
},
"cursor": "REDACTED"
}
sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (3, 'three'), (4, 'four'), (5, 'five')
=== Run: Replication (multi-row transaction, flag on) ===
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "REDACTED",
"schema": "dbo",
"seqval": "REDACTED",
"table": "PopulateSourceTimestamps_731765",
"ts_ms": 0,
"updateMask": "Aw=="
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "three",
"id": 3
}
]
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "REDACTED",
"schema": "dbo",
"seqval": "REDACTED",
"table": "PopulateSourceTimestamps_731765",
"ts_ms": 0,
"updateMask": "Aw=="
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "four",
"id": 4
}
]
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "REDACTED",
"schema": "dbo",
"seqval": "REDACTED",
"table": "PopulateSourceTimestamps_731765",
"ts_ms": 0,
"updateMask": "Aw=="
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "five",
"id": 5
}
]
--- Final Checkpoint ---
{
"bindingStateV1": {
"dbo%2FPopulateSourceTimestamps_731765": {
"backfilled": 2,
"key_columns": [
"id"
],
"mode": "Active"
}
},
"cursor": "REDACTED"
}
sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (6, 'six')
sql> INSERT INTO dbo.PopulateSourceTimestamps_731765 VALUES (7, 'seven')
=== Run: Replication (separate transactions, flag on) ===
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "REDACTED",
"schema": "dbo",
"seqval": "REDACTED",
"table": "PopulateSourceTimestamps_731765",
"ts_ms": 0,
"updateMask": "Aw=="
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "six",
"id": 6
}
]
[
"acmeCo/test_capture/dbo/populatesourcetimestamps_731765",
{
"_meta": {
"op": "c",
"source": {
"lsn": "REDACTED",
"schema": "dbo",
"seqval": "REDACTED",
"table": "PopulateSourceTimestamps_731765",
"ts_ms": 0,
"updateMask": "Aw=="
},
"uuid": "DocUUIDPlaceholder-329Bb50aa48EAa9ef"
},
"data": "seven",
"id": 7
}
]
--- Final Checkpoint ---
{
"bindingStateV1": {
"dbo%2FPopulateSourceTimestamps_731765": {
"backfilled": 2,
"key_columns": [
"id"
],
"mode": "Active"
}
},
"cursor": "REDACTED"
}

6 changes: 6 additions & 0 deletions source-sqlserver/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
"title": "Source Tag",
"description": "When set the capture will add this value as the property 'tag' in the source metadata of each document."
},
"populate_source_ts_ms": {
"type": "boolean",
"title": "Populate Source Timestamps",
"description": "When set the connector will populate the source timestamp on CDC change events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. Backfill rows do not have a meaningful commit time so the source timestamp will not be set on them.",
"default": false
},
"feature_flags": {
"type": "string",
"title": "Feature Flags",
Expand Down
14 changes: 14 additions & 0 deletions source-sqlserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ This is mostly an issue for small-scale testing, as these overheads are more or
less fixed and unrelated to the actual volume of changes. But this makes our
automated test suite runs take 10-20x longer than they do on other databases.

### Transaction commit timestamps (`_meta/source/ts_ms`)

By default the connector does not populate `_meta/source/ts_ms` on CDC events,
because SQL Server CDC change tables do not contain commit timestamps inline.
Setting the advanced option `populate_source_ts_ms` causes the connector to
range-prefetch commit times from `cdc.lsn_time_mapping` over each polling
cycle's LSN range and apply them as `_meta/source/ts_ms` on emitted change
events. The lookup is one indexed range query per polling cycle, paginated
when the range is wide enough to cross the prefetch page size. Backfill rows
do not have a meaningful commit time and will not have `ts_ms` set, matching
the behavior of the Postgres and MySQL connectors. The standard
`GRANT SELECT ON SCHEMA :: cdc TO flow_capture` already grants the necessary
permissions on `cdc.lsn_time_mapping`, so no additional setup is required.

### Developing

Some useful commands for working with a test instance of SQL Server:
Expand Down
1 change: 1 addition & 0 deletions source-sqlserver/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (db *sqlserverDatabase) ScanTableChunk(ctx context.Context, info *sqlcaptur
Operation: sqlcapture.InsertOp,
Source: sqlserverSourceInfoCDC{
SourceCommon: sqlcapture.SourceCommon{
Millis: 0, // Not known.
Schema: schema,
Snapshot: true,
Table: table,
Expand Down
83 changes: 83 additions & 0 deletions source-sqlserver/capture_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/bradleyjkemp/cupaloy"
"github.com/estuary/connectors/go/capture/blackbox"
"github.com/estuary/connectors/sqlcapture"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -375,3 +379,82 @@ func TestCatalogPrimaryKey(t *testing.T) {
tc.Run("Replication", -1)
cupaloy.SnapshotT(t, tc.Transcript.String())
}

// TestPopulateSourceTimestamps verifies the populate_source_ts_ms advanced
// option: CDC events carry the transaction commit time as
// _meta/source/ts_ms; backfill rows and flag-off captures do not.
// TestPopulateSourceTimestamps verifies the populate_source_ts_ms advanced
// option: CDC events carry the transaction commit time as
// _meta/source/ts_ms; backfill rows and flag-off captures do not.
//
// The test runs four capture phases against one table:
//  1. backfill with the flag off          -> no ts_ms anywhere
//  2. replication with the flag off       -> no ts_ms anywhere
//  3. replication, flag on, one multi-row transaction -> all rows share one nonzero ts_ms
//  4. replication, flag on, two separate transactions -> ts_ms non-decreasing
// The cupaloy snapshot at the end pins the overall transcript shape, while
// the timestamp equality/ordering properties are asserted programmatically
// since the actual commit times vary from run to run.
func TestPopulateSourceTimestamps(t *testing.T) {
	var db, tc = blackboxTestSetup(t)
	db.CreateTable(t, `<NAME>`, `(id INTEGER PRIMARY KEY, data TEXT)`)

	// Redact ts_ms values in the snapshot so commit timestamps don't
	// destabilize the snapshot across runs. Equality and ordering are
	// asserted programmatically below against the raw capture output
	// (the sanitizer only rewrites the transcript, not tc.Run's return).
	tc.DocumentSanitizers = append(tc.DocumentSanitizers, blackbox.JSONSanitizer{
		Matcher:     regexp.MustCompile(`"ts_ms":\d+`),
		Replacement: `"ts_ms":0`,
	})

	// Backfill (flag off): no ts_ms expected.
	db.Exec(t, `INSERT INTO <NAME> VALUES (0, 'zero'), (1, 'one')`)
	tc.Discover("Discover Tables")
	require.Empty(t, extractTsMsValues(t, tc.Run("Initial Backfill (flag off)", -1)),
		"backfill events must not have ts_ms")

	// Replication (flag off): no ts_ms expected.
	db.Exec(t, `INSERT INTO <NAME> VALUES (2, 'two')`)
	require.Empty(t, extractTsMsValues(t, tc.Run("Replication (flag off)", -1)),
		"flag-off replication events must not have ts_ms")

	// Flip the advanced option on for the remaining phases.
	require.NoError(t, tc.Capture.EditConfig("advanced.populate_source_ts_ms", true))

	// Single transaction with multiple rows: all rows must share ts_ms,
	// since they all committed at the same instant.
	db.Exec(t, `INSERT INTO <NAME> VALUES (3, 'three'), (4, 'four'), (5, 'five')`)
	multiTxn := extractTsMsValues(t, tc.Run("Replication (multi-row transaction, flag on)", -1))
	require.Len(t, multiTxn, 3)
	for _, v := range multiTxn {
		require.NotZero(t, v)
		require.Equal(t, multiTxn[0], v, "rows from one transaction must share ts_ms")
	}

	// Separate transactions: ts_ms must be monotonically non-decreasing.
	// The sleeps space out the two commits so they plausibly land on
	// distinct lsn_time_mapping entries; only ordering (not strict
	// inequality) is asserted since commit-time granularity isn't
	// guaranteed to separate them.
	time.Sleep(50 * time.Millisecond)
	db.Exec(t, `INSERT INTO <NAME> VALUES (6, 'six')`)
	time.Sleep(50 * time.Millisecond)
	db.Exec(t, `INSERT INTO <NAME> VALUES (7, 'seven')`)
	sepTxn := extractTsMsValues(t, tc.Run("Replication (separate transactions, flag on)", -1))
	require.Len(t, sepTxn, 2)
	require.LessOrEqual(t, sepTxn[0], sepTxn[1],
		"ts_ms must be monotonically non-decreasing across transactions")

	cupaloy.SnapshotT(t, tc.Transcript.String())
}

// extractTsMsValues returns the _meta/source/ts_ms value of every captured
// document, in emission order. Documents without ts_ms are skipped.
func extractTsMsValues(t *testing.T, raw []byte) []int64 {
t.Helper()
var values []int64
for line := range bytes.SplitSeq(raw, []byte("\n")) {
if len(line) == 0 {
continue
}
var parts []json.RawMessage
require.NoError(t, json.Unmarshal(line, &parts))
if len(parts) < 2 {
continue
}
var doc struct {
Meta struct {
Source struct {
TsMs *int64 `json:"ts_ms"`
} `json:"source"`
} `json:"_meta"`
}
require.NoError(t, json.Unmarshal(parts[1], &doc))
if doc.Meta.Source.TsMs != nil {
values = append(values, *doc.Meta.Source.TsMs)
}
}
return values
}
1 change: 1 addition & 0 deletions source-sqlserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type advancedConfig struct {
Filegroup string `json:"filegroup,omitempty" jsonschema:"title=CDC Instance Filegroup,description=When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."`
RoleName string `json:"role_name,omitempty" jsonschema:"title=CDC Instance Access Role,description=When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."`
SourceTag string `json:"source_tag,omitempty" jsonschema:"title=Source Tag,description=When set the capture will add this value as the property 'tag' in the source metadata of each document."`
PopulateSourceTsMs bool `json:"populate_source_ts_ms,omitempty" jsonschema:"title=Populate Source Timestamps,default=false,description=When set the connector will populate the source timestamp on CDC change events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. Backfill rows do not have a meaningful commit time so the source timestamp will not be set on them."`
FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
WatermarksTable string `json:"watermarksTable,omitempty" jsonschema:"default=dbo.flow_watermarks,description=This property is deprecated for new captures as they will no longer use watermark writes by default. The name of the table used for watermark writes during backfills. Must be fully-qualified in '<schema>.<table>' form."`
}
Expand Down
Loading