Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions go/capture/sqlserver/tests/capture.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package tests

import (
"bytes"
"encoding/json"
"fmt"
"regexp"
"testing"
"time"

"github.com/bradleyjkemp/cupaloy"
"github.com/estuary/connectors/go/capture/blackbox"
"github.com/stretchr/testify/require"
)

Expand All @@ -20,6 +25,7 @@ func TestCapture(t *testing.T, setup testSetupFunc) {
t.Run("PrimaryKeyUpdate", func(t *testing.T) { testPrimaryKeyUpdate(t, setup) })
t.Run("ComputedPrimaryKey", func(t *testing.T) { testComputedPrimaryKey(t, setup) })
t.Run("SourceTag", func(t *testing.T) { testSourceTag(t, setup) })
t.Run("PopulateSourceTsMs", func(t *testing.T) { testPopulateSourceTsMs(t, setup) })
}

func testColumnNameQuoting(t *testing.T, setup testSetupFunc) {
Expand Down Expand Up @@ -197,3 +203,85 @@ func testSourceTag(t *testing.T, setup testSetupFunc) {
tc.Run("Replication", -1)
cupaloy.SnapshotT(t, tc.Transcript.String())
}

// testPopulateSourceTsMs verifies the populate_source_ts_ms advanced option:
// CDC events carry transaction commit time as _meta/source/ts_ms; backfill
// rows and flag-off captures do not.
func testPopulateSourceTsMs(t *testing.T, setup testSetupFunc) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not belong here. The tests in this file / package are shared between source-sqlserver and source-sqlserver-ct, but this test cannot possibly work against source-sqlserver-ct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to source-sqlserver/capture_test.go as TestPopulateSourceTimestamps. Snapshot at source-sqlserver/.snapshots/TestPopulateSourceTimestamps.

var db, tc = setup(t)
db.CreateTable(t, `<NAME>`, `(id INTEGER PRIMARY KEY, data TEXT)`)

// Redact ts_ms values in snapshots so timestamps don't destabilize the
// snapshot across runs; equality and ordering are asserted programmatically
// against the raw capture output below.
tc.DocumentSanitizers = append(tc.DocumentSanitizers, blackbox.JSONSanitizer{
Matcher: regexp.MustCompile(`"ts_ms":\d+`),
Replacement: `"ts_ms":REDACTED`,
})

// Backfill (flag off): no ts_ms expected.
db.Exec(t, `INSERT INTO <NAME> VALUES (0, 'zero'), (1, 'one')`)
tc.Discover("Discover Tables")
var backfillBytes = tc.Run("Initial Backfill (flag off)", -1)
require.Empty(t, extractTsMsValues(t, backfillBytes), "backfill events must not have ts_ms")

// Replication (flag off): no ts_ms expected.
db.Exec(t, `INSERT INTO <NAME> VALUES (2, 'two')`)
var replOffBytes = tc.Run("Replication (flag off)", -1)
require.Empty(t, extractTsMsValues(t, replOffBytes), "flag-off replication events must not have ts_ms")

require.NoError(t, tc.Capture.EditConfig("advanced.populate_source_ts_ms", true))

// Single transaction with multiple rows: all rows must share ts_ms.
db.Exec(t, `INSERT INTO <NAME> VALUES (3, 'three'), (4, 'four'), (5, 'five')`)
var multiTxnBytes = tc.Run("Replication (multi-row transaction, flag on)", -1)
var multiTxnTsMs = extractTsMsValues(t, multiTxnBytes)
require.Len(t, multiTxnTsMs, 3, "expected ts_ms on all three CDC rows")
for _, v := range multiTxnTsMs {
require.NotZero(t, v, "ts_ms should be populated with a real commit time")
require.Equal(t, multiTxnTsMs[0], v, "rows from one transaction must share ts_ms")
}

// Separate transactions: ts_ms must be monotonically non-decreasing.
time.Sleep(50 * time.Millisecond)
db.Exec(t, `INSERT INTO <NAME> VALUES (6, 'six')`)
time.Sleep(50 * time.Millisecond)
db.Exec(t, `INSERT INTO <NAME> VALUES (7, 'seven')`)
var sepTxnBytes = tc.Run("Replication (separate transactions, flag on)", -1)
var sepTxnTsMs = extractTsMsValues(t, sepTxnBytes)
require.Len(t, sepTxnTsMs, 2, "expected ts_ms on both CDC rows")
require.LessOrEqual(t, sepTxnTsMs[0], sepTxnTsMs[1], "ts_ms must be monotonically non-decreasing across transactions")

cupaloy.SnapshotT(t, tc.Transcript.String())
}

// extractTsMsValues returns the _meta/source/ts_ms value for every emitted
// document in the raw capture output, in emission order. Documents without
// ts_ms are skipped.
func extractTsMsValues(t *testing.T, raw []byte) []int64 {
t.Helper()
var values []int64
for line := range bytes.SplitSeq(raw, []byte("\n")) {
if len(line) == 0 {
continue
}
// Each captured document is emitted as ["binding-name", {document}, ...].
var parts []json.RawMessage
require.NoError(t, json.Unmarshal(line, &parts))
if len(parts) < 2 {
continue
}
var doc struct {
Meta struct {
Source struct {
TsMs *int64 `json:"ts_ms"`
} `json:"source"`
} `json:"_meta"`
}
require.NoError(t, json.Unmarshal(parts[1], &doc))
if doc.Meta.Source.TsMs != nil {
values = append(values, *doc.Meta.Source.TsMs)
}
}
return values
}
6 changes: 6 additions & 0 deletions source-sqlserver/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
"title": "Source Tag",
"description": "When set the capture will add this value as the property 'tag' in the source metadata of each document."
},
"populate_source_ts_ms": {
"type": "boolean",
"title": "Populate Source ts_ms",
"description": "When set the connector will populate the '_meta/source/ts_ms' property of CDC events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. Backfill rows do not have a meaningful commit time so ts_ms will not be set on them.",
"default": false
},
"feature_flags": {
"type": "string",
"title": "Feature Flags",
Expand Down
18 changes: 18 additions & 0 deletions source-sqlserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ This is mostly an issue for small-scale testing, as these overheads are more or
less fixed and unrelated to the actual volume of changes. But this makes our
automated test suite runs take 10-20x longer than they do on other databases.

### Transaction commit timestamps (`_meta/source/ts_ms`)

By default the connector does not populate `_meta/source/ts_ms` on CDC events,
because SQL Server CDC change tables do not contain commit timestamps inline.
Setting the advanced option `populate_source_ts_ms` causes the connector to
range-prefetch commit times from `cdc.lsn_time_mapping` over each polling
cycle's LSN range and apply them as `_meta/source/ts_ms` on emitted change
events. The lookup is one indexed range query per polling cycle, paginated
when the range is wide enough to cross the prefetch page size. Backfill rows
do not have a meaningful commit time and will not have `ts_ms` set, matching
the behavior of the Postgres and MySQL connectors.

If the `cdc.lsn_time_mapping` lookup fails, the error propagates from the
polling cycle like any other transient SQL error. The
standard `GRANT SELECT ON SCHEMA :: cdc TO flow_capture` already grants the
necessary permissions on `cdc.lsn_time_mapping`, so no additional setup is
required.

### Developing

Some useful commands for working with a test instance of SQL Server:
Expand Down
1 change: 1 addition & 0 deletions source-sqlserver/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (db *sqlserverDatabase) ScanTableChunk(ctx context.Context, info *sqlcaptur
Operation: sqlcapture.InsertOp,
Source: sqlserverSourceInfoCDC{
SourceCommon: sqlcapture.SourceCommon{
Millis: 0, // Not known.
Schema: schema,
Snapshot: true,
Table: table,
Expand Down
1 change: 1 addition & 0 deletions source-sqlserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type advancedConfig struct {
Filegroup string `json:"filegroup,omitempty" jsonschema:"title=CDC Instance Filegroup,description=When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."`
RoleName string `json:"role_name,omitempty" jsonschema:"title=CDC Instance Access Role,description=When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."`
SourceTag string `json:"source_tag,omitempty" jsonschema:"title=Source Tag,description=When set the capture will add this value as the property 'tag' in the source metadata of each document."`
PopulateSourceTsMs bool `json:"populate_source_ts_ms,omitempty" jsonschema:"title=Populate Source ts_ms,default=false,description=When set the connector will populate the '_meta/source/ts_ms' property of CDC events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. Backfill rows do not have a meaningful commit time so ts_ms will not be set on them."`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would change the description to "Populate Source Timestamps", it feels weird to put the ts_ms field name into the human-readable field description which will render in the UI.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to Populate Source Timestamps; the description no longer references the ts_ms field name.

FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
WatermarksTable string `json:"watermarksTable,omitempty" jsonschema:"default=dbo.flow_watermarks,description=This property is deprecated for new captures as they will no longer use watermark writes by default. The name of the table used for watermark writes during backfills. Must be fully-qualified in '<schema>.<table>' form."`
}
Expand Down
116 changes: 107 additions & 9 deletions source-sqlserver/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,15 +1057,57 @@ type tablePollInfo struct {
}

// pollTable emits all change events for a single capture instance over the
// current polling cycle's LSN range (info.FromLSN, info.ToLSN]. When the
// populate_source_ts_ms option is off this is a single pass over the whole
// range; when on, the range is walked in bounded pages so the commit-time
// prefetch from cdc.lsn_time_mapping cannot grow without limit during a
// wide catch-up.
func (rs *sqlserverReplicationStream) pollTable(ctx context.Context, info *tablePollInfo) error {
	if !rs.cfg.Advanced.PopulateSourceTsMs {
		return rs.pollTableRange(ctx, info, info.FromLSN, info.ToLSN, nil)
	}

	// Walk the polling cycle's LSN range in pages bounded by
	// tranEndTimesPageSize. Each iteration issues one prefetch query against
	// cdc.lsn_time_mapping and one TVF query over the same sub-range, then
	// emits each change row with its prefetched ts_ms.
	var current = info.FromLSN
	for bytes.Compare(current, info.ToLSN) < 0 {
		tranEndTimes, pageEnd, err := rs.lookupTranEndTimesPage(ctx, current, info.ToLSN, tranEndTimesPageSize)
		if err != nil {
			// A commit-time lookup failure propagates like any other
			// transient SQL error rather than silently degrading the
			// emitted documents.
			return err
		}
		log.WithFields(log.Fields{
			"instance": info.InstanceName,
			"fromLSN":  fmt.Sprintf("%X", current),
			"toLSN":    fmt.Sprintf("%X", pageEnd),
			"lsns":     len(tranEndTimes),
		}).Debug("prefetched transaction commit times")
		if err := rs.pollTableRange(ctx, info, current, pageEnd, tranEndTimes); err != nil {
			return err
		}
		current = pageEnd
	}
	return nil
}

// pollTableRange scans the change-table TVF over the half-open LSN range
// (fromLSN, toLSN] and emits each row as a change event. When tranEndTimes is
// non-nil, every emitted event is enriched with its commit time as
// _meta/source/ts_ms; when nil, events stream without ts_ms (the original
// behavior, byte-identical to a flag-off capture).
func (rs *sqlserverReplicationStream) pollTableRange(
ctx context.Context,
info *tablePollInfo,
fromLSN LSN,
toLSN LSN,
tranEndTimes map[string]time.Time,
) error {
var query = fmt.Sprintf(`SELECT * FROM cdc.fn_cdc_get_all_changes_%s(@p1, @p2, N'all');`, info.InstanceName)
rows, err := rs.conn.QueryContext(ctx, query, info.FromLSN, info.ToLSN)
rows, err := rs.conn.QueryContext(ctx, query, fromLSN, toLSN)
if err != nil {
return fmt.Errorf("error requesting changes: %w", err)
}
Expand Down Expand Up @@ -1186,7 +1228,7 @@ func (rs *sqlserverReplicationStream) pollTable(ctx context.Context, info *table
if !ok || changeLSN == nil {
return fmt.Errorf("invalid '__$start_lsn' value (capture user may not have correct permissions): %v", changeLSN)
}
if bytes.Compare(changeLSN, info.FromLSN) <= 0 {
if bytes.Compare(changeLSN, fromLSN) <= 0 {
continue
}

Expand Down Expand Up @@ -1241,14 +1283,70 @@ func (rs *sqlserverReplicationStream) pollTable(ctx context.Context, info *table
RowKey: rowKey,
Values: vals,
}
if tranEndTimes != nil {
if t, ok := tranEndTimes[string(changeLSN)]; ok {
event.Meta.Source.Millis = t.UnixMilli()
}
}
if err := rs.emitEvent(ctx, event); err != nil {
return err
}
}

return rows.Err()
}

// tranEndTimesPageSize bounds the row count of a single
// cdc.lsn_time_mapping prefetch (see lookupTranEndTimesPage), keeping the
// in-memory commit-time map bounded during a wide catch-up. Tunable; lower
// values reduce per-page memory at the cost of more prefetch queries.
const tranEndTimesPageSize = 100000

// lookupTranEndTimesPage returns up to pageSize commit-time entries from
// cdc.lsn_time_mapping for the half-open range (fromLSN, toLSN]. The second
// return value is the upper bound of the returned page: when exactly
// pageSize rows came back it is the last row's start_lsn (so the caller
// advances to it as the next page's fromLSN); otherwise it is toLSN,
// signaling that the entire requested range was consumed.
//
// LSNs absent from cdc.lsn_time_mapping (possible at retention boundaries)
// and rows whose tran_end_time is NULL are omitted from the returned map.
func (rs *sqlserverReplicationStream) lookupTranEndTimesPage(ctx context.Context, fromLSN, toLSN LSN, pageSize int) (map[string]time.Time, LSN, error) {
	const query = `SELECT TOP (@p3) start_lsn, tran_end_time
FROM cdc.lsn_time_mapping
WHERE start_lsn > @p1 AND start_lsn <= @p2
ORDER BY start_lsn`

	rows, err := rs.conn.QueryContext(ctx, query, fromLSN, toLSN, pageSize)
	if err != nil {
		return nil, nil, fmt.Errorf("error prefetching cdc.lsn_time_mapping: %w", err)
	}
	defer rows.Close()

	byLSN := make(map[string]time.Time)
	var highest LSN
	rowsSeen := 0
	for rows.Next() {
		var startLSN LSN
		var endTime sql.NullTime
		if err := rows.Scan(&startLSN, &endTime); err != nil {
			return nil, nil, fmt.Errorf("error scanning cdc.lsn_time_mapping row: %w", err)
		}
		if endTime.Valid {
			byLSN[string(startLSN)] = endTime.Time
		}
		highest = startLSN
		rowsSeen++
	}
	if err := rows.Err(); err != nil {
		return nil, nil, err
	}

	// A full page means the range may extend further, so hand back the last
	// LSN seen for the caller to resume from; a short page means the whole
	// requested range has been covered.
	pageEnd := toLSN
	if rowsSeen >= pageSize {
		pageEnd = highest
	}
	return byLSN, pageEnd, nil
}

func (rs *sqlserverReplicationStream) emitEvent(ctx context.Context, event sqlcapture.DatabaseEvent) error {
select {
case rs.events <- event:
Expand Down
Loading