source-sqlserver: optionally populate _meta/source/ts_ms (#3553) #4328
jameswinegar wants to merge 10 commits
Conversation
Add an advanced capture option `populate_source_ts_ms` (default false) which causes CDC events to carry the transaction commit time as `_meta/source/ts_ms`. The commit time is looked up from `cdc.lsn_time_mapping` after each polling cycle, deduplicated by LSN, and chunked at 1000 LSNs per query to stay under SQL Server's 2100-parameter limit. The lookup is best-effort: failures emit the batch with ts_ms unset and log a warning. Buffered events are flushed in 4096-event sub-batches to bound memory during catch-up scans. Backfill rows leave ts_ms unset (`Millis: 0, // Not known.`), matching the behavior of source-postgres and source-mysql. Closes estuary#3553.
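For readers skimming the diff, here is a minimal sketch of the chunked IN-list lookup this description refers to. The function and constant names are illustrative rather than the PR's actual identifiers, and the query text is an assumption based on the description above:

```go
// Sketch (not the PR's actual code) of the chunked commit-time lookup:
// dedupe LSNs, query cdc.lsn_time_mapping in chunks of at most 1000
// parameters, and return a start_lsn -> tran_end_time map.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"
)

const lsnChunkSize = 1000 // stays well under SQL Server's 2100-parameter limit

func lookupTranEndTimes(ctx context.Context, db *sql.DB, lsns [][]byte) (map[string]time.Time, error) {
	// Deduplicate first: every row of a multi-row transaction shares one __$start_lsn.
	seen := make(map[string]struct{}, len(lsns))
	var distinct [][]byte
	for _, lsn := range lsns {
		if _, ok := seen[string(lsn)]; !ok {
			seen[string(lsn)] = struct{}{}
			distinct = append(distinct, lsn)
		}
	}

	out := make(map[string]time.Time, len(distinct))
	for start := 0; start < len(distinct); start += lsnChunkSize {
		end := min(start+lsnChunkSize, len(distinct))
		chunk := distinct[start:end]

		// Build "IN (@p1, @p2, ...)" with one placeholder per LSN in this chunk.
		placeholders := make([]string, len(chunk))
		args := make([]any, len(chunk))
		for i, lsn := range chunk {
			placeholders[i] = fmt.Sprintf("@p%d", i+1)
			args[i] = lsn
		}
		query := `SELECT start_lsn, tran_end_time FROM cdc.lsn_time_mapping WHERE start_lsn IN (` +
			strings.Join(placeholders, ", ") + `)`

		rows, err := db.QueryContext(ctx, query, args...)
		if err != nil {
			return nil, fmt.Errorf("querying lsn_time_mapping: %w", err)
		}
		for rows.Next() {
			var lsn []byte
			var tranEndTime time.Time
			if err := rows.Scan(&lsn, &tranEndTime); err != nil {
				rows.Close()
				return nil, err
			}
			out[string(lsn)] = tranEndTime
		}
		if err := rows.Err(); err != nil {
			rows.Close()
			return nil, err
		}
		rows.Close()
	}
	return out, nil
}
```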
Follow-up to consider: TVF + LEFT JOIN as a single query

I investigated whether replacing the prefetch + TVF pair with a single JOINed query would be a worthwhile optimization. Capturing the analysis here so it's not lost.

```sql
SELECT t.[__$start_lsn], m.tran_end_time
FROM cdc.fn_cdc_get_all_changes_X(@p1, @p2, N'all') AS t
LEFT JOIN cdc.lsn_time_mapping AS m ON t.[__$start_lsn] = m.start_lsn
```

I verified the syntax works correctly via a raw query.

Potential wins

Risks worth measuring before adopting

Couldn't benchmark locally
```go
Filegroup string `json:"filegroup,omitempty" jsonschema:"title=CDC Instance Filegroup,description=When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."`
RoleName string `json:"role_name,omitempty" jsonschema:"title=CDC Instance Access Role,description=When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."`
SourceTag string `json:"source_tag,omitempty" jsonschema:"title=Source Tag,description=When set the capture will add this value as the property 'tag' in the source metadata of each document."`
PopulateSourceTsMs bool `json:"populate_source_ts_ms,omitempty" jsonschema:"title=Populate Source ts_ms,default=false,description=When set the connector will populate the '_meta/source/ts_ms' property of CDC events with the transaction commit time. The commit time is looked up from cdc.lsn_time_mapping after each polling cycle. This adds one extra query per change batch and is therefore opt-in. Backfill rows do not have a meaningful commit time so ts_ms will not be set on them."`
```
I would change the description to "Populate Source Timestamps", it feels weird to put the ts_ms field name into the human-readable field description which will render in the UI.
Renamed to Populate Source Timestamps; the description no longer references the ts_ms field name.
```go
// testPopulateSourceTsMs verifies the populate_source_ts_ms advanced option:
// CDC events carry transaction commit time as _meta/source/ts_ms; backfill
// rows and flag-off captures do not.
func testPopulateSourceTsMs(t *testing.T, setup testSetupFunc) {
```
This test does not belong here. The tests in this file / package are shared between source-sqlserver and source-sqlserver-ct, but this test cannot possibly work against source-sqlserver-ct.
Moved to source-sqlserver/capture_test.go as TestPopulateSourceTimestamps. Snapshot at source-sqlserver/.snapshots/TestPopulateSourceTimestamps.
```go
// forces sys.sp_cdc_scan so the test works without the SQL Server Agent,
// and verifies that lookupTranEndTimesPage returns the same
// start_lsn -> tran_end_time mapping that's in cdc.lsn_time_mapping.
func TestLookupTranEndTimesPageIntegration(t *testing.T) {
```
This test appears to be mostly validating SQL Server CDC behavior, and to the extent that it's actually exercising connector functionality it appears to be checking a trivial property that amounts to "is the data we read from this table the same as the data we read from this table?"
```go
}

var ctx = context.Background()
var controlURI = (&Config{
```
This whole chunk of connection-opening code is boilerplate which should be factored out into a helper function. I would probably either use the existing blackboxTestSetup() helper (which does some other setup work which isn't strictly needed here but which is benign), or factor out just the "open control connection" logic from that into another named helper function and use it here. Either way, half of the line count of these tests should not be devoted to boilerplate connection opening code.
The whole file is gone. TestPopulateSourceTimestamps (capture_test.go) uses blackboxTestSetup().
```go
require.NoErrorf(t, err, "executing %s", query)
}

runSQL(fmt.Sprintf(`IF EXISTS (SELECT 1 FROM cdc.change_tables WHERE capture_instance = N'%s') EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'tsms_page_%s', @capture_instance = N'%s'`, captureInstance, probeID, captureInstance))
```
This table creation/cleanup code is duplicating test helpers we already have.
The whole file is gone. Coverage is TestPopulateSourceTimestamps which uses db.CreateTable().
```go
runSQL(fmt.Sprintf(`DECLARE @i INT = 0; WHILE @i < %d BEGIN INSERT INTO %s (id) VALUES (@i); SET @i = @i + 1; END`, numTransactions, tableName))

if _, err := conn.ExecContext(ctx, "EXEC sys.sp_cdc_scan @maxtrans = 5000, @maxscans = 10"); err != nil {
```
It's not entirely clear to me why extra effort was put into running this against SQL Server Edge when our normal test DB setup defined in source-sqlserver/docker-compose.yaml runs a perfectly normal SQL Server instance which doesn't need these accommodations.
The standard docker-compose.yaml is what runs in TestPopulateSourceTimestamps; no Edge-specific code remains. The original docker-compose failure on my end was that I had Docker Desktop set to "Docker VMM" instead of Apple Virtualization Framework — switching to VZ + Rosetta resolved it and SQL Server 2022 starts cleanly.
```go
require.NoError(t, rows.Close())
require.Equalf(t, numTransactions, len(expectedLSNs), "expected %d distinct LSNs in change table; got %d", numTransactions, len(expectedLSNs))

// Walk the range in pages exactly the way pollTable does.
```
Writing a bunch of extra logic to "do something the same way as the real implementation" is a sign that something is being done wrong here. If it's worth having a test for, it's worth finding a way to factor out the real logic so it can be invoked from the test.
The whole file is gone. Coverage is TestPopulateSourceTimestamps. Pagination boundaries are not directly exercised by the integration test workload — if that's a concern, the right shape is making tranEndTimesPageSize a test-tunable var (like cdcCleanupInterval) and lowering it inside the integration test. Happy to add that if you'd like.
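For concreteness, a sketch of that shape, assuming `tranEndTimesPageSize` becomes a package-level var (mirroring the `cdcCleanupInterval` pattern mentioned above); the test name and body here are hypothetical:

```go
package sqlserver

import "testing"

// replication.go (sketch): page size as a package-level var so tests can
// lower it.
var tranEndTimesPageSize = 100_000

// replication_test.go (sketch): a hypothetical test shrinks the page so a
// small workload still forces multi-page pagination.
func TestPaginationBoundary(t *testing.T) {
	prev := tranEndTimesPageSize
	tranEndTimesPageSize = 5
	t.Cleanup(func() { tranEndTimesPageSize = prev })

	// ...insert more than five single-row transactions and run the capture;
	// the commit-time lookup now has to walk multiple pages to cover the range.
}
```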
```go
//
// GODEBUG=x509negativeserial=1 TEST_DATABASE=yes \
// go test -run='^$' -bench=BenchmarkLookupTranEndTimes -benchmem ./source-sqlserver/...
func BenchmarkLookupTranEndTimes(b *testing.B) {
```
While performance impact is a concern with this feature, the performance of doing the map lookups was never the issue and that's basically all this is measuring. The actual concerns are things like "how will an extra database RTT impact overall polling time in production deployments where the connector and database aren't running on the same machine" and "is it worth trying to cache the LSN time mapping information across all tables within a single polling cycle" and this benchmark says nothing useful about any of that. Recommend deleting.
do not have a meaningful commit time and will not have `ts_ms` set, matching
the behavior of the Postgres and MySQL connectors.

The lookup is best-effort: if it fails for any reason, the rest of the
Is there a concrete failure scenario in mind here or is this just typical LLM "let me add fallbacks everywhere just in case" behavior?
No concrete scenario; the fallback is gone. A cdc.lsn_time_mapping query failure now propagates from pollTable like any other transient SQL error.
Per review feedback: don't put the field name 'ts_ms' into the human-readable UI label and description.
…package
Per review feedback: the test in go/capture/sqlserver/tests/capture.go is shared between source-sqlserver and source-sqlserver-ct, but populate_source_ts_ms is a source-sqlserver-only config field. Move TestPopulateSourceTimestamps and the extractTsMsValues helper into the source-sqlserver package where they belong.
Per review feedback: TestLookupTranEndTimesPageIntegration was mostly exercising SQL Server CDC behavior (multi-row transactions share LSNs, distinct transactions get distinct commit times) rather than connector logic, and where it did touch connector code it tautologically asserted that the data we read from cdc.lsn_time_mapping matches the data we read from cdc.lsn_time_mapping. Deleted.
Per review feedback: BenchmarkLookupTranEndTimes only measured the in-process map lookup performance, which was never the actual concern. The real questions for this feature are network round-trip impact in production deployments and whether per-polling-cycle caching across tables would be worthwhile, none of which this benchmark could speak to.
Per review feedback:
- Use blackboxTestSetup() for connection setup instead of duplicating the control-connection boilerplate.
- Use db.CreateTable() / db.QuietExec() for table create/cleanup instead of duplicating sp_cdc_enable_table / sp_cdc_disable_table by hand.
- Drop sys.sp_cdc_scan and other Azure-SQL-Edge-specific accommodations; the standard test docker-compose runs a real SQL Server instance with the Agent enabled.
- Replace the multi-page pagination walk (which mirrored pollTable's loop) with a single-call boundary test that asserts the function's pagination contract directly: a full page returns exactly pageSize entries with pageEnd advanced to the last entry's start_lsn; a non-full page returns all entries with pageEnd == toLSN.
Per review feedback: there was no concrete failure scenario the fallback was defending against. A cdc.lsn_time_mapping query failure is no different from any other transient SQL error in pollTable - propagate it and let the connector's outer retry loop handle it. Removes the swallow-the-error log path and the conditional re-call into pollTableRange.
Followup to review feedback: deleted replication_test.go. The empty-range test exercised a trivial branch and the pagination boundary test, while narrowly useful, was stylistically inconsistent with the rest of the connector test suite (everything else is blackbox integration via flowctl). Feature coverage lives in TestPopulateSourceTimestamps in capture_test.go.
Generated by running the test against the standard docker-compose SQL Server 2022 with the SQL Server Agent enabled. Also sanitize ts_ms to a zero placeholder rather than the literal string 'REDACTED' so the emitted JSON stays valid and the snapshot pretty-prints consistently (numeric field stays numeric).
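A sketch of what that sanitization could look like; the regexp and helper name are illustrative, not necessarily the repo's actual snapshot-sanitizer hook:

```go
package main

import "regexp"

// Hypothetical snapshot sanitizer: replace the numeric ts_ms value with a
// zero placeholder so the emitted JSON stays valid and the numeric field
// stays numeric in the pretty-printed snapshot.
var tsMsSanitizer = regexp.MustCompile(`"ts_ms":\s*\d+`)

func sanitizeTsMs(snapshot string) string {
	return tsMsSanitizer.ReplaceAllString(snapshot, `"ts_ms":0`)
}
```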
Restores the file to match origin/main; the trailing blank line was introduced incidentally when removing testPopulateSourceTsMs.
Closes #3553.
Adds an advanced capture option `populate_source_ts_ms` (default false) which causes the SQL Server connector to populate `_meta/source/ts_ms` on CDC events with the transaction commit time, prefetched once per polling cycle from `cdc.lsn_time_mapping` over the cycle's LSN range. This brings parity with `source-postgres` and `source-mysql`, which already populate `ts_ms` for CDC events.

Design
When the flag is enabled, each `pollTable` invocation runs an indexed range query against `cdc.lsn_time_mapping` over the polling cycle's `(FromLSN, ToLSN]` window. The result is a `map[string]time.Time` consulted in O(1) per change row to set `_meta/source/ts_ms` as events stream through the existing scan-and-emit loop.

- Pagination: pages are capped at `tranEndTimesPageSize` (default 100,000 rows) — each page does its own bounded prefetch and TVF, so memory stays bounded regardless of catch-up size.
- Events go to `rs.events` one at a time as they're scanned. No buffering, no flush, no `clear()` dance.
- The lookup is best-effort: on failure, events are emitted with `ts_ms` unset and a warning is logged. Missing LSNs (possible at retention boundaries) and NULL `tran_end_time` (possible for control-row entries) leave individual events with `ts_ms` unset rather than failing the cycle.
- Backfill rows leave `ts_ms` unset (`Millis: 0, // Not known.`), matching the convention in source-postgres and source-mysql.
- Permissions: `GRANT SELECT ON SCHEMA :: cdc TO flow_capture` (already required) covers `cdc.lsn_time_mapping`.
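A sketch of the per-page range lookup under the design above. The name `lookupTranEndTimesPage` comes from this PR, but the signature and query text here are assumptions, not the merged code:

```go
package sqlserver

import (
	"context"
	"database/sql"
	"time"
)

// lookupTranEndTimesPage fetches up to pageSize commit times in the
// (fromLSN, toLSN] window and reports how far the page got (pageEnd).
func lookupTranEndTimesPage(ctx context.Context, db *sql.DB, fromLSN, toLSN []byte, pageSize int) (map[string]time.Time, []byte, error) {
	// Indexed range scan over (fromLSN, toLSN], capped at pageSize rows.
	const query = `SELECT TOP (@p1) start_lsn, tran_end_time
	    FROM cdc.lsn_time_mapping
	    WHERE start_lsn > @p2 AND start_lsn <= @p3
	    ORDER BY start_lsn`
	rows, err := db.QueryContext(ctx, query, pageSize, fromLSN, toLSN)
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close()

	out := make(map[string]time.Time)
	pageEnd := toLSN
	n := 0
	for rows.Next() {
		var lsn []byte
		var tranEnd sql.NullTime // NULL for control-row entries
		if err := rows.Scan(&lsn, &tranEnd); err != nil {
			return nil, nil, err
		}
		if tranEnd.Valid {
			out[string(lsn)] = tranEnd.Time
		}
		pageEnd = lsn // full pages advance to the last row's start_lsn
		n++
	}
	if err := rows.Err(); err != nil {
		return nil, nil, err
	}
	if n < pageSize {
		// Non-full page: the whole remaining window was covered.
		pageEnd = toLSN
	}
	return out, pageEnd, nil
}
```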
Why range pre-fetch over per-batch IN-list

An earlier iteration of this PR used a per-batch `WHERE start_lsn IN (?, ?, ...)` lookup with chunked-at-1000 buffering. That design works fine for moderate-scale captures but doesn't scale to the kind of high-throughput SQL Server workloads we want to migrate from Debezium: the per-LSN cost stayed constant (~12.5 μs/LSN), so an enterprise workload with 50k LSNs per polling cycle paid ~625 ms of synchronous lookup time per cycle.

The range-prefetch design pulls all commit times for the cycle in a single indexed range scan that amortizes its query overhead across however many LSNs are in the range. Per-LSN cost drops from ~12.5 μs to ~0.66 μs at high scale (≈18-20× faster) and the implementation is simpler — no event buffering, no flush threshold, no dedup map, no chunking loop.
Performance
Microbenchmark — feature off vs feature on
Measured locally against Azure SQL Edge (linux/arm64-native, Apple M4 Max). The `FeatureOff` sub-bench iterates the same buffered events without a lookup; the delta is the per-`pollTable` cost the feature adds.

What this shows:
- `tranEndTimesPageSize = 100,000`; 50k LSNs (a high-throughput cycle) fits in one page = one query.
- Memory is bounded to one page of entries per concurrent `pollTable` worker. Tunable via `tranEndTimesPageSize`.

Comparison with the earlier IN-list iteration
Realistic workload sizing
Cost scales with distinct LSNs, not row count. Multi-row transactions (`INSERT ... VALUES (...), (...), (...)`, bulk operations) share one `start_lsn`, so high-write-amplification workloads dedup heavily and see proportionally less overhead.

Order-of-magnitude estimates (workloads vary; not from production data):
(Costs assume the per-LSN curve from the benchmark above. Real Linux x86_64 SQL Server is likely 2–5× faster than the SQL Edge ARM64 numbers shown.)

`polling_interval` (advanced option, default 500 ms) is tunable but does not reduce this feature's CPU cost — only the dedup ratio does. At 100k tps with a 5 s polling interval, a single page (100k rows) would be needed every cycle.

Catch-up scenarios
A multi-hour catch-up means a wide LSN range that exceeds one page. The connector iterates `lookupTranEndTimesPage`, which returns `pageEnd` set to the last row's `start_lsn` whenever the page hits its row cap, then issues a TVF over the same sub-range. Each iteration: one prefetch query, one TVF query, bounded memory.

A hypothetical 10M-LSN backlog dispatches ~100 prefetch+TVF iterations at ~33 ms prefetch each = ~3.3 s of total added time across the catch-up. The change-row processing dominates by orders of magnitude.
Testing — what was actually run
This is a starting point for validation, not exhaustive. Estuary CI should exercise the full capture pipeline against a SQL Server with the SQL Agent running.
`go build ./...` and `go vet ./...` clean for both `source-sqlserver` and `go/capture/sqlserver/tests`.

Verbatim Go test output (against Azure SQL Edge, linux/arm64-native)
What each test proves:
- `TestSpec` — JSON config schema is generated correctly with the new `populate_source_ts_ms` advanced field.
- `TestLookupTranEndTimesPageIntegration` — runs three transactions (one with two rows, two singletons), captures the FromLSN/ToLSN bounds via `sys.fn_cdc_get_max_lsn()`, forces `sys.sp_cdc_scan`, calls `lookupTranEndTimesPage`, verifies the entire range fits in one page (`pageEnd == toLSN`), multi-row transaction equality, distinct/non-decreasing across transactions, and byte-for-byte agreement with a direct `SELECT tran_end_time FROM cdc.lsn_time_mapping WHERE start_lsn = ?` query.
- `TestLookupTranEndTimesPageEmpty` — zero-width range returns a non-nil empty map and the requested toLSN as pageEnd.
- `TestLookupTranEndTimesPagePagination` — inserts 1,100 single-row transactions in a server-side loop, walks the LSN range with `pageSize=500` (forces ≥ 2 pages), asserts every distinct LSN is consumed exactly once with no gaps or duplicates and that non-final pages are exactly full.

Independent SQL probe against the same database
Issued the connector's exact range query directly through `sqlcmd` to confirm the underlying assumptions about `cdc.lsn_time_mapping`:

This independently confirms: a multi-row `INSERT` produces a single `__$start_lsn` shared by both rows; three separate commits produce three distinct `start_lsn` values with monotonically non-decreasing `tran_end_time`; the connector's range query syntax/semantics match the table schema.

Not yet validated locally
The full snapshot test `testPopulateSourceTsMs` (in `go/capture/sqlserver/tests/capture.go`) which routes through `flowctl preview` → connector binary → SQL Server. Azure SQL Edge has no SQL Server Agent so change tables aren't auto-populated; the `GODEBUG=x509negativeserial=1` flag needed for SQL Edge's older cert doesn't propagate to the connector binary spawned by `flowctl`. This test should run cleanly on Estuary CI which uses real SQL Server, and its snapshot file (`source-sqlserver/.snapshots/TestCapture-PopulateSourceTsMs`) will be auto-generated by the standard `UPDATE_SNAPSHOTS=true` test invocation.

Files changed
- `source-sqlserver/main.go` — `PopulateSourceTsMs` advanced option
- `source-sqlserver/replication.go` — `pollTable` drives pagination, `pollTableRange` is the scan-and-emit loop, `lookupTranEndTimesPage` is the bounded indexed range query
- `source-sqlserver/backfill.go` — explicit `Millis: 0 // Not known.` matching Postgres/MySQL convention
- `source-sqlserver/.snapshots/TestSpec` — updated config schema snapshot
- `source-sqlserver/README.md` — new section explaining the option
- `source-sqlserver/replication_test.go` (new) — `TestLookupTranEndTimesPageIntegration`, `TestLookupTranEndTimesPageEmpty`, `TestLookupTranEndTimesPagePagination`, `BenchmarkLookupTranEndTimes`
- `go/capture/sqlserver/tests/capture.go` — new `testPopulateSourceTsMs` snapshot + programmatic test for the full pipeline

Test plan
- `go test ./source-sqlserver/...` against the docker-compose SQL Server (this exercises `testPopulateSourceTsMs` end-to-end and auto-generates the missing snapshot)
- `BenchmarkLookupTranEndTimes` on Estuary CI hardware against real SQL Server for an apples-to-apples baseline
- Verify `_meta/source/ts_ms` appears on CDC events and is absent on backfill events, observe per-cycle lookup latency and memory usage
- Watch for `failed to prefetch transaction commit times` warnings in production logs after rollout