Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) {
return internalRow.getTimestampNtz(pos, timestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) fieldType;
return internalRow.getTimestampNtz(pos, lzTs.getPrecision());
return internalRow.getTimestampLtz(pos, lzTs.getPrecision());
case FLOAT:
return internalRow.getFloat(pos);
case DOUBLE:
Expand All @@ -188,6 +188,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) {
return internalRow.getDecimal(
pos, decimalType.getPrecision(), decimalType.getScale());
case BINARY:
case BYTES:
Comment thread
fresh-borzoni marked this conversation as resolved.
return internalRow.getBytes(pos);
default:
throw new UnsupportedOperationException("Unsupported type: " + fieldType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypeRoot;
import org.apache.fluss.types.RowType;

import java.time.Instant;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -345,4 +348,28 @@ public static String convertValueOfType(Object value, DataTypeRoot type) {
}
return stringPartitionKey;
}

/** Projects {@code tableInfo}'s row type down to its partition key columns, in key order. */
public static RowType partitionRowType(TableInfo tableInfo) {
RowType schema = tableInfo.getRowType();
List<String> fieldNames = schema.getFieldNames();
int[] indexes =
tableInfo.getPartitionKeys().stream().mapToInt(fieldNames::indexOf).toArray();
return schema.project(indexes);
}

/**
* Builds a row of typed partition values by parsing each string with {@link
* #parseValueOfType(String, DataTypeRoot)} for the column at that ordinal in {@code
* partitionRowType}.
*/
public static GenericRow toPartitionRow(
List<String> partitionValues, RowType partitionRowType) {
GenericRow row = new GenericRow(partitionValues.size());
for (int i = 0; i < partitionValues.size(); i++) {
DataTypeRoot type = partitionRowType.getTypeAt(i).getTypeRoot();
row.setField(i, parseValueOfType(partitionValues.get(i), type));
}
return row;
}
Comment thread
fresh-borzoni marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate;
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

/** Flink table source to scan Fluss data. */
Expand Down Expand Up @@ -627,8 +626,7 @@ && hasPrimaryKey()
} else {
acceptedFilters.add(filter);
}
// Convert literals in the predicate to partition string
converted.add(stringifyPartitionPredicate(p));
converted.add(p);
} else {
remainingFilters.add(filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.PartitionUtils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
Expand Down Expand Up @@ -558,9 +558,13 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
return partitionInfos;
} else {
int originalSize = partitionInfos.size();
RowType partitionRowType = PartitionUtils.partitionRowType(tableInfo);
Comment thread
fresh-borzoni marked this conversation as resolved.
List<PartitionInfo> filteredPartitionInfos =
partitionInfos.stream()
.filter(partition -> partitionFilters.test(toInternalRow(partition)))
.filter(
partition ->
partitionFilters.test(
toInternalRow(partition, partitionRowType)))
.collect(Collectors.toList());

int filteredSize = filteredPartitionInfos.size();
Expand All @@ -583,14 +587,10 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
}
}

private static InternalRow toInternalRow(PartitionInfo partitionInfo) {
List<String> partitionValues =
partitionInfo.getResolvedPartitionSpec().getPartitionValues();
GenericRow genericRow = new GenericRow(partitionValues.size());
for (int i = 0; i < partitionValues.size(); i++) {
genericRow.setField(i, BinaryString.fromString(partitionValues.get(i)));
}
return genericRow;
private static InternalRow toInternalRow(
Comment thread
fresh-borzoni marked this conversation as resolved.
Outdated
PartitionInfo partitionInfo, RowType partitionRowType) {
return PartitionUtils.toPartitionRow(
partitionInfo.getResolvedPartitionSpec().getPartitionValues(), partitionRowType);
}

/** Init the splits for Fluss. */
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,32 @@ private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<Str
return expectedRowValues;
}

@Test
void testReadPartitionPushDownWithIntPartitionRangePredicate() throws Exception {
tEnv.executeSql(
"create table int_partitioned_table"
+ " (a int not null, b varchar, pt int, primary key (a, pt) NOT ENFORCED) partitioned by (pt) ");
TablePath tablePath = TablePath.of(DEFAULT_DB, "int_partitioned_table");
tEnv.executeSql("alter table int_partitioned_table add partition (pt=2)");
tEnv.executeSql("alter table int_partitioned_table add partition (pt=10)");

List<InternalRow> rows = new ArrayList<>();
for (int i = 0; i < 3; i++) {
rows.add(row(i, "v" + i, 2));
rows.add(row(i, "v" + i, 10));
}
writeRows(conn, tablePath, rows, false);
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);

List<String> expected = new ArrayList<>();
for (int i = 0; i < 3; i++) {
expected.add(String.format("+I[%d, v%d, 10]", i, i));
}
CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from int_partitioned_table where pt > 2").collect();
assertResultsIgnoreOrder(rowIter, expected, true);
}

@Test
void testStreamingReadPartitionPushDownWithInExpr() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ class FlussAppendBatch(

if (tableInfo.isPartitioned) {
val matching =
SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate)
SparkPartitionPredicate.filterPartitions(
tableInfo,
partitionInfos.asScala.toSeq,
partitionPredicate)
matching
.map {
partitionInfo =>
Expand Down Expand Up @@ -240,7 +243,10 @@ class FlussUpsertBatch(

if (tableInfo.isPartitioned) {
val matching =
SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate)
SparkPartitionPredicate.filterPartitions(
tableInfo,
partitionInfos.asScala.toSeq,
partitionPredicate)
matching.flatMap {
partitionInfo =>
val partitionName = partitionInfo.getPartitionName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.fluss.spark.utils

import org.apache.fluss.metadata.{PartitionInfo, TableInfo}
import org.apache.fluss.predicate.{CompoundPredicate, LeafPredicate, PartitionPredicateVisitor, Predicate => FlussPredicate, PredicateBuilder, PredicateVisitor}
import org.apache.fluss.row.{BinaryString, GenericRow}
import org.apache.fluss.types.{DataTypes, RowType}
import org.apache.fluss.predicate.{PartitionPredicateVisitor, Predicate => FlussPredicate, PredicateBuilder}
import org.apache.fluss.utils.PartitionUtils

import org.apache.spark.sql.connector.expressions.filter.Predicate
Expand All @@ -34,15 +32,14 @@ object SparkPartitionPredicate {
val partitionKeys = tableInfo.getPartitionKeys
if (partitionKeys.isEmpty) return None

val rowType = partitionRowType(tableInfo)
val rowType = PartitionUtils.partitionRowType(tableInfo)
val onlyPartitionKeys = new PartitionPredicateVisitor(partitionKeys)

val converted = predicates.flatMap {
sparkPredicate =>
SparkPredicateConverter
.convert(rowType, sparkPredicate)
.filter(_.visit(onlyPartitionKeys))
.map(stringifyLiterals)
}

converted match {
Expand All @@ -53,54 +50,17 @@ object SparkPartitionPredicate {
}

def filterPartitions(
tableInfo: TableInfo,
partitionInfos: Seq[PartitionInfo],
partitionPredicate: Option[FlussPredicate]): Seq[PartitionInfo] =
partitionPredicate match {
case None => partitionInfos
case Some(predicate) => partitionInfos.filter(p => predicate.test(toPartitionRow(p)))
case Some(predicate) =>
val rowType = PartitionUtils.partitionRowType(tableInfo)
partitionInfos.filter {
p =>
predicate.test(
Comment thread
fresh-borzoni marked this conversation as resolved.
PartitionUtils.toPartitionRow(p.getResolvedPartitionSpec.getPartitionValues, rowType))
Comment thread
fresh-borzoni marked this conversation as resolved.
}
}

private def partitionRowType(tableInfo: TableInfo): RowType = {
val schemaRowType = tableInfo.getRowType
val fieldNames = schemaRowType.getFieldNames
val partitionFieldIndexes = tableInfo.getPartitionKeys.asScala.map(fieldNames.indexOf).toArray
schemaRowType.project(partitionFieldIndexes)
}

private def toPartitionRow(partitionInfo: PartitionInfo): GenericRow = {
val values = partitionInfo.getResolvedPartitionSpec.getPartitionValues
val row = new GenericRow(values.size)
var i = 0
while (i < values.size) {
row.setField(i, BinaryString.fromString(values.get(i)))
i += 1
}
row
}

// Partition values are stored as strings; literals must be coerced before evaluation.
private val stringifier: PredicateVisitor[FlussPredicate] = new PredicateVisitor[FlussPredicate] {
override def visit(leaf: LeafPredicate): FlussPredicate = {
val converted: Seq[Object] = leaf.literals.asScala.toSeq.map {
case null => null
case literal =>
BinaryString.fromString(
PartitionUtils.convertValueOfType(literal, leaf.`type`.getTypeRoot))
}
new LeafPredicate(
leaf.function,
DataTypes.STRING,
leaf.index,
leaf.fieldName,
converted.asJava)
}

override def visit(compound: CompoundPredicate): FlussPredicate = {
val children = compound.children.asScala.map(_.visit(this)).asJava
new CompoundPredicate(compound.function, children)
}
}

private def stringifyLiterals(predicate: FlussPredicate): FlussPredicate =
predicate.visit(stringifier)
}
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,19 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
}
}

test("Spark Read: partition pushdown — INT partition with range predicate") {
withTable("t") {
sql(s"""
|CREATE TABLE $DEFAULT_DATABASE.t (id BIGINT, pt INT)
|PARTITIONED BY (pt)""".stripMargin)
sql(s"INSERT INTO $DEFAULT_DATABASE.t VALUES (1, 2), (2, 10)")

val query = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE pt > 2 ORDER BY id")
checkAnswer(query, Row(2L, 10) :: Nil)
assert(partitionPredicate(query).isDefined)
}
}

test("Spark Read: scan description surfaces partition filter when pushed") {
withPartitionedTable {
val withPart = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01'")
Expand Down