Pass dynamic filter predicate to ConnectorSplitSource#getNextBatch#29206
Pass dynamic filter predicate to ConnectorSplitSource#getNextBatch#29206chenjian2664 wants to merge 11 commits into
ConnectorSplitSource#getNextBatch#29206Conversation
6805608 to
da416b1
Compare
da416b1 to
4c7cf8a
Compare
6f38f96 to
0648ae0
Compare
07fc49b to
0648ae0
Compare
|
Could you rebase on master to resolve conflicts? |
0648ae0 to
8c6f835
Compare
5a075fe to
642d95f
Compare
cb8f353 to
cf05e58
Compare
|
Updated based on the review feedback:
CI running now. The previous run's |
d5266ab to
33e75bb
Compare
|
@losipiuk Could you please take a look at Update engine and classloader wrappers to use new split APIs this one touches FTE relate code paths , I'd appreciate having your eyes on it |
This looks fine, but it does not seem very much related to FTE, at least not directly - did I miss sth obvious? |
|
@losipiuk Thanks for reviewing. I noticed the FTE tests were already failing when I started adding changes to this commit — mainly the tests related to |
|
There may be some timing issues. Can you point me to example failures? |
|
@losipiuk https://github.com/trinodb/trino/actions/runs/25434596125/job/74609407569 and https://github.com/trinodb/trino/actions/runs/25430147008/job/74594178701 |
7b64305 to
6eb0808
Compare
83923f9 to
5e632e5
Compare
Description
Move dynamic filter waiting logic from individual connectors into the engine layer.
SPI changes:
ConnectorDynamicFilter(TupleDomain<ColumnHandle> currentPredicate, boolean isComplete)record as a per-batch snapshot of the engine's DF state.ConnectorSplitSource#getNextBatch(int, ConnectorDynamicFilter)as the new primary entry point (default delegates to the deprecatedgetNextBatch(int)). The engine waits up toConnectorSplitSource#getRequestedDynamicFilterWaitTimeoutMillis()before the first call; subsequent calls pass the current predicate without additional waiting.ConnectorSplitSource#getRequestedDynamicFilterWaitTimeoutMillis()(default0) so each connector continues to control its own DF wait timeout — no new engine-level config needed.ConnectorSplitManager#getSplitsto acceptSet<ColumnHandle> dynamicFilterColumns(static scan-wide info) instead of a liveDynamicFilter. The deprecatedDynamicFilteroverload remains for backward compatibility.Engine changes:
ConnectorAwareSplitSourcenow owns the DF wait loop: readsgetRequestedDynamicFilterWaitTimeoutMillis()once at construction, waits onDynamicFilter.isBlocked()if awaitable and time remains, then passesnew ConnectorDynamicFilter(predicate, isComplete)on everygetNextBatchcall.SplitManagercalls the newgetSplits(..., dynamicFilter.getColumnsCovered(), constraint)overload.Connector changes:
IcebergSplitSourceoverridesgetNextBatch(int, ConnectorDynamicFilter)andgetRequestedDynamicFilterWaitTimeoutMillis(); removes the internalDynamicFilter/Stopwatchfields and wait loop; removesiceberg.dynamic-filtering.wait-timeoutconfig property (@DefunctConfig) and thedynamic_filtering_wait_timeoutconnector session property.HiveSplitSourceoverrides the newgetNextBatchandgetRequestedDynamicFilterWaitTimeoutMillis(). The partition loader receives the covered-columns set at construction; the per-batch predicate snapshot is shared via aCompletableFutureupdated on eachgetNextBatchcall.JdbcDynamicFilteringSplitSource/DynamicFilteringJdbcSplitSourceoverride the new method and timeout.ConnectorSplitManager#getSplitswithSet<ColumnHandle>.getNextBatch(int)continue to work via the default bridge — no changes required.Additional context and related issues