From a8dd74802152730045f656a3baafb02c59a055f7 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Tue, 12 May 2026 10:12:44 -0700 Subject: [PATCH 1/7] Spark: Backport migrate SparkCopyOnWriteScan to SupportsRuntimeV2Filtering --- .../spark/source/SparkCopyOnWriteScan.java | 64 +++++++++++-------- .../spark/source/SparkCopyOnWriteScan.java | 64 +++++++++++-------- .../spark/source/SparkCopyOnWriteScan.java | 64 +++++++++++-------- 3 files changed, 114 insertions(+), 78 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index dbf5d455b948..69d3c098530c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -32,18 +32,17 @@ import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.metrics.ScanReport; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.expressions.Expressions; +import org.apache.spark.sql.connector.expressions.Literal; import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Statistics; -import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; @@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory; class SparkCopyOnWriteScan extends SparkPartitioningAwareScan - implements SupportsRuntimeFiltering { + implements SupportsRuntimeV2Filtering { private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class); @@ -118,26 +117,11 @@ public NamedReference[] filterAttributes() { } @Override - public void filter(Filter[] filters) { - Preconditions.checkState( - Objects.equals(snapshotId(), currentSnapshotId()), - "Runtime file filtering is not possible: the table has been concurrently modified. " - + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. " - + "If an external process modifies the table, enable table caching in the catalog. " - + "If multiple threads modify the table, use independent Spark sessions in each thread.", - snapshotId(), - currentSnapshotId()); - - for (Filter filter : filters) { - // Spark can only pass In filters at the moment - if (filter instanceof In - && ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) { - In in = (In) filter; - - Set fileLocations = Sets.newHashSet(); - for (Object value : in.values()) { - fileLocations.add((String) value); - } + public void filter(Predicate[] predicates) { + for (Predicate predicate : predicates) { + // Spark can only pass IN predicates at the moment + if (isFilePathInPredicate(predicate)) { + Set fileLocations = extractStringLiterals(predicate); // Spark may call this multiple times for UPDATEs with subqueries // as such cases are rewritten using UNION and the same scan on both sides @@ -159,7 +143,7 @@ public void filter(Filter[] filters) { resetTasks(filteredTasks); } } else { - LOG.warn("Unsupported runtime filter {}", filter); + LOG.warn("Unsupported runtime filter {}", predicate); } } } @@ -228,4 +212,32 @@ private boolean isRowLineageField(StructField field) { || field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()); return hasLineageFieldName && field.metadata().contains("__metadata_col"); } + + private static boolean isFilePathInPredicate(Predicate predicate) { + if (!"IN".equals(predicate.name()) || predicate.children().length < 1) { + return false; + } + + if (!(predicate.children()[0] instanceof NamedReference)) { + return false; + } + + String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames(); + + return fieldNames.length == 1 + && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name()); + } + + private static Set extractStringLiterals(Predicate predicate) { + Set values = Sets.newHashSet(); + for (int i = 1; i < predicate.children().length; i++) { + if (predicate.children()[i] instanceof Literal) { + Object value = ((Literal) predicate.children()[i]).value(); + // V2 string literals come through as UTF8String; toString() materializes the Java String + values.add(value.toString()); + } + } + + return values; + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index dbf5d455b948..69d3c098530c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -32,18 +32,17 @@ import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.metrics.ScanReport; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.expressions.Expressions; +import org.apache.spark.sql.connector.expressions.Literal; import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Statistics; -import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StructField; @@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory; class SparkCopyOnWriteScan extends SparkPartitioningAwareScan - implements SupportsRuntimeFiltering { + implements SupportsRuntimeV2Filtering { private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class); @@ -118,26 +117,11 @@ public NamedReference[] filterAttributes() { } @Override - public void filter(Filter[] filters) { - Preconditions.checkState( - Objects.equals(snapshotId(), currentSnapshotId()), - "Runtime file filtering is not possible: the table has been concurrently modified. " - + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. " - + "If an external process modifies the table, enable table caching in the catalog. " - + "If multiple threads modify the table, use independent Spark sessions in each thread.", - snapshotId(), - currentSnapshotId()); - - for (Filter filter : filters) { - // Spark can only pass In filters at the moment - if (filter instanceof In - && ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) { - In in = (In) filter; - - Set fileLocations = Sets.newHashSet(); - for (Object value : in.values()) { - fileLocations.add((String) value); - } + public void filter(Predicate[] predicates) { + for (Predicate predicate : predicates) { + // Spark can only pass IN predicates at the moment + if (isFilePathInPredicate(predicate)) { + Set fileLocations = extractStringLiterals(predicate); // Spark may call this multiple times for UPDATEs with subqueries // as such cases are rewritten using UNION and the same scan on both sides @@ -159,7 +143,7 @@ public void filter(Filter[] filters) { resetTasks(filteredTasks); } } else { - LOG.warn("Unsupported runtime filter {}", filter); + LOG.warn("Unsupported runtime filter {}", predicate); } } } @@ -228,4 +212,32 @@ private boolean isRowLineageField(StructField field) { || field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()); return hasLineageFieldName && field.metadata().contains("__metadata_col"); } + + private static boolean isFilePathInPredicate(Predicate predicate) { + if (!"IN".equals(predicate.name()) || predicate.children().length < 1) { + return false; + } + + if (!(predicate.children()[0] instanceof NamedReference)) { + return false; + } + + String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames(); + + return fieldNames.length == 1 + && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name()); + } + + private static Set extractStringLiterals(Predicate predicate) { + Set values = Sets.newHashSet(); + for (int i = 1; i < predicate.children().length; i++) { + if (predicate.children()[i] instanceof Literal) { + Object value = ((Literal) predicate.children()[i]).value(); + // V2 string literals come through as UTF8String; toString() materializes the Java String + values.add(value.toString()); + } + } + + return values; + } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index ee4be2461894..28dd2e05612c 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -32,22 +32,21 @@ import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.metrics.ScanReport; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.expressions.Expressions; +import org.apache.spark.sql.connector.expressions.Literal; import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Statistics; -import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.In; +import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class SparkCopyOnWriteScan extends SparkPartitioningAwareScan - implements SupportsRuntimeFiltering { + implements SupportsRuntimeV2Filtering { private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class); @@ -103,26 +102,11 @@ public NamedReference[] filterAttributes() { } @Override - public void filter(Filter[] filters) { - Preconditions.checkState( - Objects.equals(snapshotId(), currentSnapshotId()), - "Runtime file filtering is not possible: the table has been concurrently modified. " - + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. " - + "If an external process modifies the table, enable table caching in the catalog. " - + "If multiple threads modify the table, use independent Spark sessions in each thread.", - snapshotId(), - currentSnapshotId()); - - for (Filter filter : filters) { - // Spark can only pass In filters at the moment - if (filter instanceof In - && ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) { - In in = (In) filter; - - Set fileLocations = Sets.newHashSet(); - for (Object value : in.values()) { - fileLocations.add((String) value); - } + public void filter(Predicate[] predicates) { + for (Predicate predicate : predicates) { + // Spark can only pass IN predicates at the moment + if (isFilePathInPredicate(predicate)) { + Set fileLocations = extractStringLiterals(predicate); // Spark may call this multiple times for UPDATEs with subqueries // as such cases are rewritten using UNION and the same scan on both sides @@ -144,7 +128,7 @@ public void filter(Filter[] filters) { resetTasks(filteredTasks); } } else { - LOG.warn("Unsupported runtime filter {}", filter); + LOG.warn("Unsupported runtime filter {}", predicate); } } } @@ -188,4 +172,32 @@ private Long currentSnapshotId() { Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table(), branch()); return currentSnapshot != null ? currentSnapshot.snapshotId() : null; } + + private static boolean isFilePathInPredicate(Predicate predicate) { + if (!"IN".equals(predicate.name()) || predicate.children().length < 1) { + return false; + } + + if (!(predicate.children()[0] instanceof NamedReference)) { + return false; + } + + String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames(); + + return fieldNames.length == 1 + && fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name()); + } + + private static Set extractStringLiterals(Predicate predicate) { + Set values = Sets.newHashSet(); + for (int i = 1; i < predicate.children().length; i++) { + if (predicate.children()[i] instanceof Literal) { + Object value = ((Literal) predicate.children()[i]).value(); + // V2 string literals come through as UTF8String; toString() materializes the Java String + values.add(value.toString()); + } + } + + return values; + } } From 4b7318e2ac5981c2ce17983ee617877d65b7b296 Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Tue, 12 May 2026 21:18:19 -0700 Subject: [PATCH 2/7] trigger build From 18878abb440fb252f02ccaa529000a00e087f24c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 13 May 2026 18:10:00 -0700 Subject: [PATCH 3/7] Update spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java --- .../iceberg/spark/source/SparkCopyOnWriteScan.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index 69d3c098530c..a94979818f3e 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -118,6 +118,15 @@ public NamedReference[] filterAttributes() { @Override public void filter(Predicate[] predicates) { + Preconditions.checkState( + Objects.equals(snapshotId(), currentSnapshotId()), + "Runtime file filtering is not possible: the table has been concurrently modified. " + + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. " + + "If an external process modifies the table, enable table caching in the catalog. " + + "If multiple threads modify the table, use independent Spark sessions in each thread.", + snapshotId(), + currentSnapshotId()); + for (Predicate predicate : predicates) { // Spark can only pass IN predicates at the moment if (isFilePathInPredicate(predicate)) { From 50c4aa569563ec5f565aec90c34cb0e579040a6b Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 13 May 2026 18:10:34 -0700 Subject: [PATCH 4/7] Update spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java --- .../iceberg/spark/source/SparkCopyOnWriteScan.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index 69d3c098530c..a94979818f3e 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -118,6 +118,15 @@ public NamedReference[] filterAttributes() { @Override public void filter(Predicate[] predicates) { + Preconditions.checkState( + Objects.equals(snapshotId(), currentSnapshotId()), + "Runtime file filtering is not possible: the table has been concurrently modified. " + + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. " + + "If an external process modifies the table, enable table caching in the catalog. " + + "If multiple threads modify the table, use independent Spark sessions in each thread.", + snapshotId(), + currentSnapshotId()); + for (Predicate predicate : predicates) { // Spark can only pass IN predicates at the moment if (isFilePathInPredicate(predicate)) { From 8a60f9c0864eb4cd317c8a53a82cbf2086a6fb89 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 13 May 2026 18:10:52 -0700 Subject: [PATCH 5/7] Update spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java --- .../iceberg/spark/source/SparkCopyOnWriteScan.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index 28dd2e05612c..f216f4a3308c 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -103,6 +103,15 @@ public NamedReference[] filterAttributes() { @Override public void filter(Predicate[] predicates) { + Preconditions.checkState( + Objects.equals(snapshotId(), currentSnapshotId()), + "Runtime file filtering is not possible: the table has been concurrently modified. " + + "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. " + + "If an external process modifies the table, enable table caching in the catalog. " + + "If multiple threads modify the table, use independent Spark sessions in each thread.", + snapshotId(), + currentSnapshotId()); + for (Predicate predicate : predicates) { // Spark can only pass IN predicates at the moment if (isFilePathInPredicate(predicate)) { From 7ed01399848604cf0b6f069dac8af2765d8a79ec Mon Sep 17 00:00:00 2001 From: Evan Wu Date: Wed, 13 May 2026 19:09:17 -0700 Subject: [PATCH 6/7] add back import --- .../org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java | 1 + .../org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java | 1 + .../org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java | 1 + 3 files changed, 3 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index a94979818f3e..9674a7333fa8 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index a94979818f3e..9674a7333fa8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index f216f4a3308c..f957b97d60f5 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.util.SnapshotUtil; From 8789483572aeb47e2611a7cb84fcccf218117d21 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 13 May 2026 22:03:20 -0700 Subject: [PATCH 7/7] spark 3.4: SupportsRuntimeFiltering -> SupportsRuntimeV2Filtering --- .../dynamicpruning/RowLevelCommandDynamicPruning.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala index f8acef9fe355..6766ad338b9f 100644 --- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala +++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans.logical.UpdateIcebergTable import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION import org.apache.spark.sql.catalyst.trees.TreePattern.SORT -import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering +import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits @@ -67,7 +67,7 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) // apply special dynamic filtering only for plans that don't support deltas case RewrittenRowLevelCommand( command: RowLevelCommand, - DataSourceV2ScanRelation(_, scan: SupportsRuntimeFiltering, _, _, _), + DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _), rewritePlan: ReplaceIcebergData) if conf.dynamicPartitionPruningEnabled && isCandidate(command) =>