[SPARK-56521][SQL] Support PartitionPredicate in runtime filters #55382
szehon-ho wants to merge 8 commits into apache:master
Conversation
Force-pushed from c9c87bd to 939e128
…ters.nonEmpty, simplify partPredicates
Force-pushed from 939e128 to 0ba0445
…panion object
Extract runtime filter pushing logic from filteredPartitions into a companion object method with a pattern match guard, removing the asInstanceOf cast.
Move the runtime filter pushing logic from the BatchScanExec companion object to PushDownUtils, co-locating it with the related partition predicate helpers.
Add a pushedPredicates() API to SupportsRuntimeV2Filtering, mirroring SupportsPushDownV2Filters. Use it in pushRuntimeFilters to exclude already-pushed predicates from the second pass and to determine whether replanning is needed.
…pushedPredicates dedup, and comprehensive tests
- Use pushedPredicates() to avoid deriving PartitionPredicates from runtime filters whose V2 translation was already accepted in the first filter() pass, preventing duplicate pushdown.
- Gate PartitionPredicate candidates on filterAttributes(), consistent with PartitionPruning's planning-time check, using ExprId-based AttributeSet.subsetOf comparison.
- Reorganize test suite into 12 numbered cases (with subcases) covering all combinations of DPP/scalar, translated/untranslatable, accepted/rejected, partition/data column, and filterAttributes.
- Add configurable test table properties (accept-v2-predicates, filter-attributes) for targeted scenario testing.
cloud-fan
left a comment
I think there's a regression here that will affect every existing SupportsRuntimeV2Filtering / SupportsRuntimeFiltering implementation. The PR description says "existing connectors are unaffected", but reading pushRuntimeFilters and BatchScanExec.filteredPartitions together, the new code path decides whether to re-plan partitions based on pushedPredicates().nonEmpty — and pushedPredicates() has a default that returns an empty array. Existing connectors don't override it, so filtered is false, scan.toBatch.planInputPartitions() is never re-called, and inputPartitions (lazily evaluated before filter()) is what BatchScanExec scans. The connector's filter() side-effect is effectively dropped. Details inline on F1.
Existing DPP V2 test suites don't catch this because they check runtimeFilters on BatchScanExec and final query answers — post-scan FilterExec preserves correctness. Partition pruning effectiveness is not asserted for V2. Case 11 in the new suite looks like it covers this, but the assertPushedPartitionPredicates helper falls through to Seq.empty for any scan class other than the new test table, so its == 0 assertion is trivially true (F5).
Other findings inline are smaller: a redundant/weaker runtime filterAttributes re-check (F2), a naming inconsistency with the sibling static interface (F3), a missing contract detail in the class Javadoc (F4), a scan-local conf leak in the test (F7), a Scaladoc imprecision (F6), and a minor double-call (F8).
    filterableScan.pushedPredicates().nonEmpty
[BLOCKING] The old BatchScanExec always re-called scan.toBatch.planInputPartitions() whenever filter() had been invoked; this return value is meant to replace that signal. But pushedPredicates() is a newly added default-returning-empty method. Every existing SupportsRuntimeV2Filtering / SupportsRuntimeFiltering implementation that doesn't override it will have this return false, causing BatchScanExec.filteredPartitions to fall into the else branch and use the pre-filter inputPartitions — the connector's filter() side effect (partitions pruned in its internal state) is then invisible to Spark. This breaks runtime partition pruning for every existing V2 runtime-filter implementation (including the in-repo InMemoryBatchScan and InMemoryV2FilterBatchScan).
The signal should be "was filter() actually called", not "did the scan self-report". Something like:
    var filterCalled = false
    if (filtersToTranslated.nonEmpty) {
      filterableScan.filter(filtersToTranslated.values.toArray)
      filterCalled = true
    }
    if (filterableScan.supportsIterativeFiltering()) {
      // ...
      if (partPredicates.nonEmpty) {
        filterableScan.filter(partPredicates.toArray)
        filterCalled = true
      }
    }
    filterCalled

Please also add a regression test: a scan that doesn't override pushedPredicates() and asserts its filter()-driven partition pruning still takes effect (e.g., via partition count on the resulting batch).
I think this is overstated; there's actually an API 'supportsIterativeFiltering' that defaults to false, so it should not affect existing connectors.
The rule is now: if the connector returns 'true', it needs to maintain pushedPredicates().
That being said, I did add a test for such a connector (one that overrides supportsIterativeFiltering=true but doesn't implement pushedPredicates()); the expected behavior is that it then gets duplicate predicates in the second round.
    val pushed = filterableScan.pushedPredicates().toSet
    val candidates = runtimeFilters.filter { f =>
      !filtersToTranslated.get(f).exists(pushed.contains) &&
        f.references.subsetOf(filterAttrs)
These candidates are already constrained at planning time: DPP filters in PartitionPruning.scala:82-89 require resExp.references.subsetOf(filterAttrs) via V2ExpressionUtils.resolveRefs; scalar subquery filters in DataSourceV2Strategy.scala:168-173 require f.references.subsetOf(relation.runtimeFilterAttrs) — both using the proper resolver. The runtime reconstruction here uses r.fieldNames.head + output.find(resolver), which drops multi-part paths for nested partition fields and is redundant with planning-time filtering. Can we drop this re-check and rely on the planning-time filters? If it's defense-in-depth, please use V2ExpressionUtils.resolveRefs for consistency with the planning-time path.
Done. Extracted V2ExpressionUtils.resolveAttributeRefs and used it in both PartitionPruning and PushDownUtils for consistency.
    *
    * @since 4.2.0
    */
    default boolean supportsIterativeFiltering() {
SupportsPushDownV2Filters already has supportsIterativePushdown() for the same concept. Two names for one capability is a consistency hazard for connectors that implement both interfaces. Worth aligning — supportsIterativeFiltering matches the local filter() verb, supportsIterativePushdown matches the sibling interface. Happy either way, just ideally one name.
Yeah, to add to the naming context: for runtime filters the API is 'filter', so there's no mention of pushdown.
So supportsIterativePushdown may not make sense. But I'm open as well. cc @aokolnychyi
Actually, never mind, it does make sense since the other new method is 'pushedPredicates'; I changed the name to match now.
    * and only one of them should be implemented by the data sources.
    *
    * <p>
    * <b>Iterative filtering:</b> When {@link #supportsIterativeFiltering()} returns true,
The description says filter() "may be called multiple times" but doesn't state the call order. The implementation pushes translated V2 predicates first and PartitionPredicate instances second, and the in-repo InMemoryEnhancedRuntimePartitionFilterBatchScan already relies on that ordering. Worth documenting explicitly so implementations know what to expect in each call.
Done. Updated Javadoc to document the two-pass call order and that the second pass excludes filters already accepted via pushedPredicates().
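For readers following along, the documented order can be sketched as the sequence of calls an opted-in scan observes. This is a minimal illustration, not the Javadoc text or Spark's actual driver code: the concrete predicate is an arbitrary placeholder built with the public Predicate/Expressions APIs, and the method names assume the defaults added by this PR (supportsIterativeFiltering was later renamed to supportsIterativePushdown).

```scala
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering

// Sketch of the two-pass call order described in this thread, from the scan's
// point of view. What Spark actually pushes depends on the query's runtime filters.
def illustrateCallOrder(scan: SupportsRuntimeV2Filtering): Unit = {
  // Pass 1: runtime filters that could be translated to V2 predicates.
  val translated = new Predicate("IS_NOT_NULL", Array[Expression](Expressions.column("part")))
  scan.filter(Array(translated))

  // Spark reads pushedPredicates() to learn which pass-1 predicates the scan
  // accepted; accepted ones are excluded from PartitionPredicate derivation.
  val accepted = scan.pushedPredicates().toSet

  // Pass 2: PartitionPredicate instances derived from the remaining runtime
  // filters (gated by filterAttributes()). Not constructed here, since the
  // PartitionPredicate API itself is not shown in this thread.
  println(s"predicates accepted after pass 1: ${accepted.size}")
}
```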
    checkAnswer(df, Row(3, 3))

    assertHasRuntimeFilters(df)
    assertPushedPartitionPredicates(df, 0)
This assertion is effectively a no-op here. getPushedPartitionPredicates pattern-matches specifically on InMemoryEnhancedRuntimePartitionFilterBatchScan; for any other scan class (case 11 uses InMemoryV2FilterBatchScan) it returns Seq.empty. So assertPushedPartitionPredicates(df, 0) is trivially true regardless of what actually happened — it would pass even if a PartitionPredicate did get pushed. More reliable options: track raw filter() argument types on the test scan and assert no PartitionPredicate was seen, or add a variant of the new test table that returns supportsIterativeFiltering=false so the existing helper can still inspect it.
Done. Case 11 now uses the enhanced catalog with supports-iterative-filtering=false table property, so the assertPushedPartitionPredicates helper can properly inspect the scan.
    * Only runtime filters that were not already translated are used to derive PartitionPredicates
    * in the second pass, avoiding duplicate pushdown.
The code filter is !filtersToTranslated.get(f).exists(pushed.contains) — i.e., exclude filters whose translation was already accepted (present in pushedPredicates()). Translated-but-rejected filters are still candidates. The inline comment on line 161-163 already states this correctly.
Suggested change:

    - * Only runtime filters that were not already translated are used to derive PartitionPredicates
    - * in the second pass, avoiding duplicate pushdown.
    + * Only runtime filters whose translated form was not already accepted by the data source in
    + * the first pass are used to derive PartitionPredicates in the second pass, avoiding duplicate
    + * pushdown.
Done. Updated scaladoc to clarify "accepted" vs "rejected".
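To make the accepted/rejected/untranslatable distinction concrete, here is a toy restatement of the quoted candidate rule. Strings stand in for the real expressions and predicates, and the map and set are stand-ins for filtersToTranslated and pushedPredicates().

```scala
// Toy model of the candidate rule quoted above: keys are runtime filter
// expressions, values are their V2 translations.
val filtersToTranslated = Map("f_accepted" -> "p1", "f_rejected" -> "p2")
val pushed = Set("p1") // only p1 was accepted by the source in the first pass

def isCandidate(f: String): Boolean =
  !filtersToTranslated.get(f).exists(pushed.contains)

assert(isCandidate("f_untranslatable")) // never translated        -> candidate for PartitionPredicate
assert(isCandidate("f_rejected"))       // translated but rejected -> candidate for PartitionPredicate
assert(!isCandidate("f_accepted"))      // translated and accepted -> excluded, no duplicate pushdown
```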
    test("case 11: supportsIterativeFiltering is false -> no PartitionPredicate") {
      val baseCatalog = "testv2filterNoIterative"
      spark.conf.set(s"spark.sql.catalog.$baseCatalog",
This conf isn't unset; only spark.sessionState.catalogManager.reset() runs in after. Consider wrapping with withSQLConf (or unsetting at the end) so the conf is cleaned up between tests.
Done. Wrapped with withSQLConf.
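For reference, the cleanup pattern looks roughly like the following inside a suite that mixes in Spark's SQL test utilities. The catalog name follows the snippet above; the catalog implementation class here is a placeholder rather than the enhanced catalog used by the suite.

```scala
// Sketch only: withSQLConf (from SQLTestUtils/SQLHelper) sets the conf for the
// duration of the block and restores the previous value afterwards, even if the
// body throws, so the scan-local catalog conf no longer leaks between tests.
val baseCatalog = "testv2filterNoIterative"
withSQLConf(
    s"spark.sql.catalog.$baseCatalog" ->
      classOf[org.apache.spark.sql.connector.catalog.InMemoryTableCatalog].getName) {
  // ... create the table with the supports-iterative-filtering=false property and
  // run the case 11 assertions here ...
}
```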
    if (filterableScan.supportsIterativeFiltering()) {
      val filterAttrs = AttributeSet(filterableScan.filterAttributes()
        .flatMap(r => output.find(a => SQLConf.get.resolver(a.name, r.fieldNames.head))))
      val pushed = filterableScan.pushedPredicates().toSet
Minor — pushedPredicates() is invoked twice (here and again on line 180 for the return signal). For a connector with a non-trivial implementation, one call would do:
    val before = filterableScan.pushedPredicates().toSet
    // ... second-pass logic uses `before` ...
    // at the end, reuse `filterCalled` (from the F1 suggestion) instead of re-querying pushedPredicates
Done. Refactored to val approach — pushedPredicates() is now called only once, and the return value uses translatedFiltersPushed || partPredicatesPushed.
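Putting the two changes together (the filter-called return signal from F1 and the single pushedPredicates() read from F8), the refactored method has roughly the shape below. This is a sketch, not the actual PushDownUtils code: the PartitionPredicate derivation is abstracted into a function parameter, and supportsIterativeFiltering() is the name used at this point in the review (later renamed to supportsIterativePushdown).

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering

// Sketch of the refactored pushRuntimeFilters shape discussed in this thread.
// pushedPredicates() is read once, and the return value reflects whether filter()
// was actually invoked rather than what the scan self-reports.
def pushRuntimeFiltersSketch(
    scan: SupportsRuntimeV2Filtering,
    runtimeFilters: Seq[Expression],
    filtersToTranslated: Map[Expression, Predicate],
    filterAttrs: AttributeSet,
    derivePartitionPredicate: Expression => Option[Predicate]): Boolean = {
  val translatedFiltersPushed = filtersToTranslated.nonEmpty
  if (translatedFiltersPushed) {
    scan.filter(filtersToTranslated.values.toArray) // first pass: translated V2 predicates
  }

  var partPredicatesPushed = false
  if (scan.supportsIterativeFiltering()) {
    val pushed = scan.pushedPredicates().toSet // single read, reused for the candidate filter
    val partPredicates = runtimeFilters
      .filter(f => !filtersToTranslated.get(f).exists(pushed.contains) &&
        f.references.subsetOf(filterAttrs))
      .flatMap(derivePartitionPredicate)
    if (partPredicates.nonEmpty) {
      scan.filter(partPredicates.toArray) // second pass: PartitionPredicate instances
      partPredicatesPushed = true
    }
  }

  // "filter() was actually called" -> BatchScanExec re-plans partitions even for
  // connectors that keep the default (empty) pushedPredicates().
  translatedFiltersPushed || partPredicatesPushed
}
```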
…test improvements
- Return translatedFiltersPushed || partPredicatesPushed instead of pushedPredicates().nonEmpty, so filter() side effects are visible even if the connector does not override pushedPredicates().
- Extract V2ExpressionUtils.resolveAttributeRefs to share resolution logic between PartitionPruning and PushDownUtils.
- Clarify SupportsRuntimeV2Filtering javadoc: document two-pass call order and that the second pass excludes already-accepted filters.
- Refactor case 11 to use the enhanced catalog with supports-iterative-filtering=false property and withSQLConf.
- Add regression test for buggy connector that omits first-pass filters from pushedPredicates().
- Code tidying in InMemoryEnhancedRuntimePartitionFilterTable.
cloud-fan
left a comment
Re-review of 6b07f10: 7 addressed, 1 remaining, 0 new.
Addressed:
- F1 (BLOCKING): pushRuntimeFilters now returns translatedFiltersPushed || partPredicatesPushed — a direct signal of whether filter() was invoked, independent of pushedPredicates(). Added a regression test (pushedPredicates() omits first-pass filters -> second round still prunes) that covers the buggy-connector case.
- F2: the runtime filterAttributes re-check now uses the new shared V2ExpressionUtils.resolveAttributeRefs helper, so PushDownUtils and PartitionPruning use the same resolver.
- F4: class-level Javadoc on SupportsRuntimeV2Filtering now documents the two-pass call order (translated V2 predicates first, PartitionPredicate second).
- F5: case 11 now uses the enhanced catalog with a new supports-iterative-filtering=false table property, so assertPushedPartitionPredicates actually inspects the scan instead of trivially matching Seq.empty.
- F6: scaladoc on pushRuntimeFilters now distinguishes "not already accepted" from "not already translated".
- F7: case 11 now wraps its scan-local conf in withSQLConf for proper cleanup.
- F8: pushedPredicates() is invoked once per pushRuntimeFilters call (via a val that is reused for both the candidate filter and the return signal).

Remaining:
- F3: supportsIterativeFiltering vs. the sibling SupportsPushDownV2Filters#supportsIterativePushdown naming is still open pending @aokolnychyi's input (#55382 (comment)). No blocker from my side either way — just one name.

New: none.
…tivePushdown
Align with SupportsPushDownV2Filters.supportsIterativePushdown() naming.
cloud-fan
left a comment
Re-review of f426fbc: 1 addressed, 0 remaining, 0 new — F3 is now resolved.
Addressed this round:
- F3: supportsIterativeFiltering renamed to supportsIterativePushdown, aligning with SupportsPushDownV2Filters#supportsIterativePushdown(). Test property key and test case names updated consistently; no stale references remain.
All eight findings from the original review are now resolved. No new concerns on my side.
thanks. merging to master!
What changes were proposed in this pull request?
This PR introduces PartitionPredicate support in runtime filters for DataSource V2 scans. Currently, PartitionPredicate is only used in static filter pushdown and metadata-only delete paths. This extends the same mechanism to runtime filters (Dynamic Partition Pruning and scalar subqueries).

Changes:
- SupportsRuntimeV2Filtering: Added supportsIterativeFiltering() and pushedPredicates() default methods. When a scan returns true from supportsIterativeFiltering(), Spark may call filter() multiple times — first with translated V2 predicates, then with PartitionPredicate instances derived from runtime filters. The pushedPredicates() method (mirroring SupportsPushDownV2Filters) allows Spark to determine which predicates were already accepted in the first pass, avoiding duplicate pushdown.
- BatchScanExec: After the existing runtime filter pushdown, if the scan supports iterative filtering, derives PartitionPredicate instances from DPP expressions and literalized scalar subqueries and pushes them in a second filter() call.
- PushDownUtils: Refactored pushRuntimeFilters() to track which runtime filter expressions were translated to V2 predicates. Uses pushedPredicates() to exclude filters already accepted in the first pass from PartitionPredicate derivation. Candidates are further gated by filterAttributes() — only runtime filters whose referenced columns are declared in filterAttributes() are eligible for PartitionPredicate derivation, consistent with PartitionPruning's planning-time check.

Why are the changes needed?
Runtime filters (DPP and scalar subqueries) currently push V2 predicates to connectors, but connectors have no way to receive partition-level predicates with evaluable functions. PartitionPredicate wraps a Catalyst expression that connectors can evaluate directly against partition keys, enabling more efficient partition pruning at runtime without needing to translate expressions into the connector's native predicate format.

The pushedPredicates() method is needed to prevent the same logical filter from being pushed twice — once as a translated V2 predicate and again as a PartitionPredicate. The filterAttributes() gate ensures that only filters on declared filterable columns are considered, aligning runtime behavior with the static planning-time checks in PartitionPruning.

This is a sub-task of the DSV2 Enhanced Partition Stats Filtering umbrella (SPARK-55596).
Does this PR introduce any user-facing change?
Yes. Connectors implementing SupportsRuntimeV2Filtering can now:
- Override supportsIterativeFiltering() to return true and receive PartitionPredicate instances via filter() during runtime filtering.
- Override pushedPredicates() to report which predicates were accepted, so Spark avoids redundant pushdown.

This is an opt-in API addition; existing connectors are unaffected.
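To illustrate the opt-in from a connector's side, a minimal sketch might look like the following. It assumes the new default methods described in this PR (using the supportsIterativeFiltering name from the description; the final code renames it to supportsIterativePushdown); the class name, column names, and the accept-everything policy are made up, and a real connector would prune its partition metadata inside filter().

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical connector scan opting into the two-pass runtime filtering protocol.
class IterativeRuntimeFilterScan extends SupportsRuntimeV2Filtering {
  private var accepted: Array[Predicate] = Array.empty

  override def readSchema(): StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("part", IntegerType)))

  // Runtime filters referencing only these columns are eligible for pushdown.
  override def filterAttributes(): Array[NamedReference] =
    Array(Expressions.column("part"))

  // Opt in to the second, PartitionPredicate-carrying filter() call.
  override def supportsIterativeFiltering(): Boolean = true

  override def filter(predicates: Array[Predicate]): Unit = {
    // Called first with translated V2 predicates, then with PartitionPredicate
    // instances derived from runtime filters not accepted in the first call.
    // Prune partition metadata here; this sketch just records what was applied.
    accepted ++= predicates
  }

  // Lets Spark skip deriving PartitionPredicates for filters already applied above;
  // keeping the default (empty) implementation would mean duplicate predicates in
  // the second pass, as discussed earlier in this review.
  override def pushedPredicates(): Array[Predicate] = accepted
}
```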
How was this patch tested?
Added DataSourceV2EnhancedRuntimePartitionFilterSuite with 12 numbered test cases (and subcases) covering all combinations of: DPP vs. scalar subquery, translated vs. untranslatable, accepted vs. rejected, partition vs. data column, and filterAttributes().

Supporting test infrastructure: InMemoryEnhancedRuntimePartitionFilterTable (with configurable accept-v2-predicates and filter-attributes table properties) and InMemoryTableEnhancedRuntimePartitionFilterCatalog.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor (Claude Opus 4)