Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
Expand All @@ -52,7 +51,7 @@
import org.slf4j.LoggerFactory;

class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
implements SupportsRuntimeFiltering {
implements SupportsRuntimeV2Filtering {

private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class);

Expand Down Expand Up @@ -118,26 +117,11 @@ public NamedReference[] filterAttributes() {
}

@Override
public void filter(Filter[] filters) {
Preconditions.checkState(
Objects.equals(snapshotId(), currentSnapshotId()),
"Runtime file filtering is not possible: the table has been concurrently modified. "
+ "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. "
+ "If an external process modifies the table, enable table caching in the catalog. "
+ "If multiple threads modify the table, use independent Spark sessions in each thread.",
snapshotId(),
currentSnapshotId());

Comment on lines -122 to -130
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's removed in https://github.com/apache/iceberg/pull/15240/changes#diff-417c91abd0a86688c3f4f45fba4e80aac9e221bc8e7aa1a59e74ce27ce2222c5. We need to keep import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

for (Filter filter : filters) {
// Spark can only pass In filters at the moment
if (filter instanceof In
&& ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
In in = (In) filter;

Set<String> fileLocations = Sets.newHashSet();
for (Object value : in.values()) {
fileLocations.add((String) value);
}
public void filter(Predicate[] predicates) {
for (Predicate predicate : predicates) {
Comment thread
kevinjqliu marked this conversation as resolved.
// Spark can only pass IN predicates at the moment
if (isFilePathInPredicate(predicate)) {
Set<String> fileLocations = extractStringLiterals(predicate);

// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both sides
Expand All @@ -159,7 +143,7 @@ public void filter(Filter[] filters) {
resetTasks(filteredTasks);
}
} else {
LOG.warn("Unsupported runtime filter {}", filter);
LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
Expand Down Expand Up @@ -228,4 +212,32 @@ private boolean isRowLineageField(StructField field) {
|| field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
return hasLineageFieldName && field.metadata().contains("__metadata_col");
}

private static boolean isFilePathInPredicate(Predicate predicate) {
if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
return false;
}

if (!(predicate.children()[0] instanceof NamedReference)) {
return false;
}

String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames();

return fieldNames.length == 1
&& fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
}

private static Set<String> extractStringLiterals(Predicate predicate) {
Set<String> values = Sets.newHashSet();
for (int i = 1; i < predicate.children().length; i++) {
if (predicate.children()[i] instanceof Literal) {
Object value = ((Literal<?>) predicate.children()[i]).value();
// V2 string literals come through as UTF8String; toString() materializes the Java String
values.add(value.toString());
}
}

return values;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
Expand All @@ -52,7 +51,7 @@
import org.slf4j.LoggerFactory;

class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
implements SupportsRuntimeFiltering {
implements SupportsRuntimeV2Filtering {

private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class);

Expand Down Expand Up @@ -118,26 +117,11 @@ public NamedReference[] filterAttributes() {
}

@Override
public void filter(Filter[] filters) {
Preconditions.checkState(
Objects.equals(snapshotId(), currentSnapshotId()),
"Runtime file filtering is not possible: the table has been concurrently modified. "
+ "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. "
+ "If an external process modifies the table, enable table caching in the catalog. "
+ "If multiple threads modify the table, use independent Spark sessions in each thread.",
snapshotId(),
currentSnapshotId());

for (Filter filter : filters) {
// Spark can only pass In filters at the moment
if (filter instanceof In
&& ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
In in = (In) filter;

Set<String> fileLocations = Sets.newHashSet();
for (Object value : in.values()) {
fileLocations.add((String) value);
}
public void filter(Predicate[] predicates) {
for (Predicate predicate : predicates) {
Comment thread
kevinjqliu marked this conversation as resolved.
// Spark can only pass IN predicates at the moment
if (isFilePathInPredicate(predicate)) {
Set<String> fileLocations = extractStringLiterals(predicate);

// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both sides
Expand All @@ -159,7 +143,7 @@ public void filter(Filter[] filters) {
resetTasks(filteredTasks);
}
} else {
LOG.warn("Unsupported runtime filter {}", filter);
LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
Expand Down Expand Up @@ -228,4 +212,32 @@ private boolean isRowLineageField(StructField field) {
|| field.name().equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
return hasLineageFieldName && field.metadata().contains("__metadata_col");
}

private static boolean isFilePathInPredicate(Predicate predicate) {
if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
return false;
}

if (!(predicate.children()[0] instanceof NamedReference)) {
return false;
}

String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames();

return fieldNames.length == 1
&& fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
}

private static Set<String> extractStringLiterals(Predicate predicate) {
Set<String> values = Sets.newHashSet();
for (int i = 1; i < predicate.children().length; i++) {
if (predicate.children()[i] instanceof Literal) {
Object value = ((Literal<?>) predicate.children()[i]).value();
// V2 string literals come through as UTF8String; toString() materializes the Java String
values.add(value.toString());
}
}

return values;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,21 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
implements SupportsRuntimeFiltering {
implements SupportsRuntimeV2Filtering {

private static final Logger LOG = LoggerFactory.getLogger(SparkCopyOnWriteScan.class);

Expand Down Expand Up @@ -103,26 +102,11 @@ public NamedReference[] filterAttributes() {
}

@Override
public void filter(Filter[] filters) {
Preconditions.checkState(
Objects.equals(snapshotId(), currentSnapshotId()),
"Runtime file filtering is not possible: the table has been concurrently modified. "
+ "Row-level operation scan snapshot ID: %s, current table snapshot ID: %s. "
+ "If an external process modifies the table, enable table caching in the catalog. "
+ "If multiple threads modify the table, use independent Spark sessions in each thread.",
snapshotId(),
currentSnapshotId());

for (Filter filter : filters) {
// Spark can only pass In filters at the moment
if (filter instanceof In
&& ((In) filter).attribute().equalsIgnoreCase(MetadataColumns.FILE_PATH.name())) {
In in = (In) filter;

Set<String> fileLocations = Sets.newHashSet();
for (Object value : in.values()) {
fileLocations.add((String) value);
}
public void filter(Predicate[] predicates) {
for (Predicate predicate : predicates) {
Comment thread
kevinjqliu marked this conversation as resolved.
// Spark can only pass IN predicates at the moment
if (isFilePathInPredicate(predicate)) {
Set<String> fileLocations = extractStringLiterals(predicate);

// Spark may call this multiple times for UPDATEs with subqueries
// as such cases are rewritten using UNION and the same scan on both sides
Expand All @@ -144,7 +128,7 @@ public void filter(Filter[] filters) {
resetTasks(filteredTasks);
}
} else {
LOG.warn("Unsupported runtime filter {}", filter);
LOG.warn("Unsupported runtime filter {}", predicate);
}
}
}
Expand Down Expand Up @@ -188,4 +172,32 @@ private Long currentSnapshotId() {
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table(), branch());
return currentSnapshot != null ? currentSnapshot.snapshotId() : null;
}

private static boolean isFilePathInPredicate(Predicate predicate) {
if (!"IN".equals(predicate.name()) || predicate.children().length < 1) {
return false;
}

if (!(predicate.children()[0] instanceof NamedReference)) {
return false;
}

String[] fieldNames = ((NamedReference) predicate.children()[0]).fieldNames();

return fieldNames.length == 1
&& fieldNames[0].equalsIgnoreCase(MetadataColumns.FILE_PATH.name());
}

private static Set<String> extractStringLiterals(Predicate predicate) {
Set<String> values = Sets.newHashSet();
for (int i = 1; i < predicate.children().length; i++) {
if (predicate.children()[i] instanceof Literal) {
Object value = ((Literal<?>) predicate.children()[i]).value();
// V2 string literals come through as UTF8String; toString() materializes the Java String
values.add(value.toString());
}
}

return values;
}
}
Loading