From e83a8393770934074bbaa53680b6e1dfa8dac0cd Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 15 May 2026 03:02:39 +0100 Subject: [PATCH 1/3] [flink][spark] Fix partition pruning for non-string partition keys Partition predicate pushdown stringified literals and partition values before evaluation, so range comparisons fell back to string lexicographic order. An INT partition column with values 2 and 10 under WHERE pt > 2 lex-compared "10" < "2" and incorrectly dropped partition 10. Add PartitionUtils.toPartitionRow and PartitionUtils.partitionRowType in fluss-common. Use them from SparkPartitionPredicate and FlinkSourceEnumerator; drop the stringify step in FlinkTableSource and delete StringifyPredicateVisitor. The stringifier was also hiding two latent gaps in LeafPredicate.get: BYTES had no case (UnsupportedOperationException) and TIMESTAMP_WITH_LOCAL_TIME_ZONE used getTimestampNtz instead of getTimestampLtz (ClassCastException). Both exercised by testStreamingReadAllPartitionTypePushDown; fix in the same file. Regression test for the partition pruning bug added with an INT partition column and a range predicate in SparkLogTableReadTest and FlinkTableSourceITCase. Closes #3292. --- .../apache/fluss/predicate/LeafPredicate.java | 3 +- .../apache/fluss/utils/PartitionUtils.java | 27 +++++++ .../fluss/flink/source/FlinkTableSource.java | 4 +- .../enumerator/FlinkSourceEnumerator.java | 22 +++--- .../utils/StringifyPredicateVisitor.java | 72 ------------------- .../flink/source/FlinkTableSourceITCase.java | 26 +++++++ .../apache/fluss/spark/read/FlussBatch.scala | 10 ++- .../spark/utils/SparkPartitionPredicate.scala | 60 +++------------- .../fluss/spark/SparkLogTableReadTest.scala | 13 ++++ 9 files changed, 98 insertions(+), 139 deletions(-) delete mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java index c008bd80dd..8918efbf76 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java @@ -175,7 +175,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) { return internalRow.getTimestampNtz(pos, timestampType.getPrecision()); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType lzTs = (LocalZonedTimestampType) fieldType; - return internalRow.getTimestampNtz(pos, lzTs.getPrecision()); + return internalRow.getTimestampLtz(pos, lzTs.getPrecision()); case FLOAT: return internalRow.getFloat(pos); case DOUBLE: @@ -188,6 +188,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) { return internalRow.getDecimal( pos, decimalType.getPrecision(), decimalType.getScale()); case BINARY: + case BYTES: return internalRow.getBytes(pos); default: throw new UnsupportedOperationException("Unsupported type: " + fieldType); diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java index 6474d2ce9f..c8f67955ae 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java @@ -22,11 +22,14 @@ import org.apache.fluss.exception.InvalidPartitionException; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.ResolvedPartitionSpec; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.types.RowType; import java.time.Instant; import java.time.ZonedDateTime; @@ -345,4 +348,28 @@ public static String convertValueOfType(Object value, DataTypeRoot type) { } return stringPartitionKey; } + + /** Projects {@code tableInfo}'s row type down to its partition key columns, in key order. */ + public static RowType partitionRowType(TableInfo tableInfo) { + RowType schema = tableInfo.getRowType(); + List fieldNames = schema.getFieldNames(); + int[] indexes = + tableInfo.getPartitionKeys().stream().mapToInt(fieldNames::indexOf).toArray(); + return schema.project(indexes); + } + + /** + * Builds a row of typed partition values by parsing each string with {@link + * #parseValueOfType(String, DataTypeRoot)} for the column at that ordinal in {@code + * partitionRowType}. + */ + public static GenericRow toPartitionRow( + List partitionValues, RowType partitionRowType) { + GenericRow row = new GenericRow(partitionValues.size()); + for (int i = 0; i < partitionValues.size(); i++) { + DataTypeRoot type = partitionRowType.getTypeAt(i).getTypeRoot(); + row.setField(i, parseValueOfType(partitionValues.get(i), type)); + } + return row; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 859ef7cfae..ee4d1ba0dd 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -104,7 +104,6 @@ import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate; import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE; import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals; -import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** Flink table source to scan Fluss data. */ @@ -627,8 +626,7 @@ && hasPrimaryKey() } else { acceptedFilters.add(filter); } - // Convert literals in the predicate to partition string - converted.add(stringifyPartitionPredicate(p)); + converted.add(p); } else { remainingFilters.add(filter); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index e04989cbb2..d012ffbcc7 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -50,11 +50,11 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.predicate.Predicate; -import org.apache.fluss.row.BinaryString; -import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists; +import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ExceptionUtils; +import org.apache.fluss.utils.PartitionUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; @@ -558,9 +558,13 @@ private List applyPartitionFilter(List partitionIn return partitionInfos; } else { int originalSize = partitionInfos.size(); + RowType partitionRowType = PartitionUtils.partitionRowType(tableInfo); List filteredPartitionInfos = partitionInfos.stream() - .filter(partition -> partitionFilters.test(toInternalRow(partition))) + .filter( + partition -> + partitionFilters.test( + toInternalRow(partition, partitionRowType))) .collect(Collectors.toList()); int filteredSize = filteredPartitionInfos.size(); @@ -583,14 +587,10 @@ private List applyPartitionFilter(List partitionIn } } - private static InternalRow toInternalRow(PartitionInfo partitionInfo) { - List partitionValues = - partitionInfo.getResolvedPartitionSpec().getPartitionValues(); - GenericRow genericRow = new GenericRow(partitionValues.size()); - for (int i = 0; i < partitionValues.size(); i++) { - genericRow.setField(i, BinaryString.fromString(partitionValues.get(i))); - } - return genericRow; + private static InternalRow toInternalRow( + PartitionInfo partitionInfo, RowType partitionRowType) { + return PartitionUtils.toPartitionRow( + partitionInfo.getResolvedPartitionSpec().getPartitionValues(), partitionRowType); } /** Init the splits for Fluss. */ diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java deleted file mode 100644 index 1d0f394572..0000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.utils; - -import org.apache.fluss.predicate.CompoundPredicate; -import org.apache.fluss.predicate.LeafPredicate; -import org.apache.fluss.predicate.Predicate; -import org.apache.fluss.predicate.PredicateVisitor; -import org.apache.fluss.row.BinaryString; -import org.apache.fluss.types.DataTypeRoot; -import org.apache.fluss.types.DataTypes; -import org.apache.fluss.utils.PartitionUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * A {@link PredicateVisitor} that converts all literals in {@link LeafPredicate} to string type in - * the string format of {@link PartitionUtils#convertValueOfType(Object, DataTypeRoot)}. This is - * necessary because partition metadata is stored as string. - */ -public class StringifyPredicateVisitor implements PredicateVisitor { - - public static Predicate stringifyPartitionPredicate(Predicate predicate) { - StringifyPredicateVisitor visitor = new StringifyPredicateVisitor(); - return predicate.visit(visitor); - } - - @Override - public Predicate visit(LeafPredicate predicate) { - List convertedLiterals = new ArrayList<>(); - for (Object literal : predicate.literals()) { - if (literal != null) { - String stringValue = - PartitionUtils.convertValueOfType(literal, predicate.type().getTypeRoot()); - convertedLiterals.add(BinaryString.fromString(stringValue)); - } else { - convertedLiterals.add(null); - } - } - return new LeafPredicate( - predicate.function(), - DataTypes.STRING(), - predicate.index(), - predicate.fieldName(), - convertedLiterals); - } - - @Override - public Predicate visit(CompoundPredicate predicate) { - List newChildren = new ArrayList<>(); - for (Predicate child : predicate.children()) { - newChildren.add(child.visit(this)); - } - return new CompoundPredicate(predicate.function(), newChildren); - } -} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 6cebccb9bb..5fccb6d821 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -1756,6 +1756,32 @@ private List writeRowsToTwoPartition(TablePath tablePath, Collection rows = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + rows.add(row(i, "v" + i, 2)); + rows.add(row(i, "v" + i, 10)); + } + writeRows(conn, tablePath, rows, false); + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); + + List expected = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + expected.add(String.format("+I[%d, v%d, 10]", i, i)); + } + CloseableIterator rowIter = + tEnv.executeSql("select * from int_partitioned_table where pt > 2").collect(); + assertResultsIgnoreOrder(rowIter, expected, true); + } + @Test void testStreamingReadPartitionPushDownWithInExpr() throws Exception { diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala index 128094a04c..be4b1fa21c 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala @@ -119,7 +119,10 @@ class FlussAppendBatch( if (tableInfo.isPartitioned) { val matching = - SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate) + SparkPartitionPredicate.filterPartitions( + tableInfo, + partitionInfos.asScala.toSeq, + partitionPredicate) matching .map { partitionInfo => @@ -240,7 +243,10 @@ class FlussUpsertBatch( if (tableInfo.isPartitioned) { val matching = - SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate) + SparkPartitionPredicate.filterPartitions( + tableInfo, + partitionInfos.asScala.toSeq, + partitionPredicate) matching.flatMap { partitionInfo => val partitionName = partitionInfo.getPartitionName diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPartitionPredicate.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPartitionPredicate.scala index cf300892a9..353334c6e4 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPartitionPredicate.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPartitionPredicate.scala @@ -18,9 +18,7 @@ package org.apache.fluss.spark.utils import org.apache.fluss.metadata.{PartitionInfo, TableInfo} -import org.apache.fluss.predicate.{CompoundPredicate, LeafPredicate, PartitionPredicateVisitor, Predicate => FlussPredicate, PredicateBuilder, PredicateVisitor} -import org.apache.fluss.row.{BinaryString, GenericRow} -import org.apache.fluss.types.{DataTypes, RowType} +import org.apache.fluss.predicate.{PartitionPredicateVisitor, Predicate => FlussPredicate, PredicateBuilder} import org.apache.fluss.utils.PartitionUtils import org.apache.spark.sql.connector.expressions.filter.Predicate @@ -34,7 +32,7 @@ object SparkPartitionPredicate { val partitionKeys = tableInfo.getPartitionKeys if (partitionKeys.isEmpty) return None - val rowType = partitionRowType(tableInfo) + val rowType = PartitionUtils.partitionRowType(tableInfo) val onlyPartitionKeys = new PartitionPredicateVisitor(partitionKeys) val converted = predicates.flatMap { @@ -42,7 +40,6 @@ object SparkPartitionPredicate { SparkPredicateConverter .convert(rowType, sparkPredicate) .filter(_.visit(onlyPartitionKeys)) - .map(stringifyLiterals) } converted match { @@ -53,54 +50,17 @@ object SparkPartitionPredicate { } def filterPartitions( + tableInfo: TableInfo, partitionInfos: Seq[PartitionInfo], partitionPredicate: Option[FlussPredicate]): Seq[PartitionInfo] = partitionPredicate match { case None => partitionInfos - case Some(predicate) => partitionInfos.filter(p => predicate.test(toPartitionRow(p))) + case Some(predicate) => + val rowType = PartitionUtils.partitionRowType(tableInfo) + partitionInfos.filter { + p => + predicate.test( + PartitionUtils.toPartitionRow(p.getResolvedPartitionSpec.getPartitionValues, rowType)) + } } - - private def partitionRowType(tableInfo: TableInfo): RowType = { - val schemaRowType = tableInfo.getRowType - val fieldNames = schemaRowType.getFieldNames - val partitionFieldIndexes = tableInfo.getPartitionKeys.asScala.map(fieldNames.indexOf).toArray - schemaRowType.project(partitionFieldIndexes) - } - - private def toPartitionRow(partitionInfo: PartitionInfo): GenericRow = { - val values = partitionInfo.getResolvedPartitionSpec.getPartitionValues - val row = new GenericRow(values.size) - var i = 0 - while (i < values.size) { - row.setField(i, BinaryString.fromString(values.get(i))) - i += 1 - } - row - } - - // Partition values are stored as strings; literals must be coerced before evaluation. - private val stringifier: PredicateVisitor[FlussPredicate] = new PredicateVisitor[FlussPredicate] { - override def visit(leaf: LeafPredicate): FlussPredicate = { - val converted: Seq[Object] = leaf.literals.asScala.toSeq.map { - case null => null - case literal => - BinaryString.fromString( - PartitionUtils.convertValueOfType(literal, leaf.`type`.getTypeRoot)) - } - new LeafPredicate( - leaf.function, - DataTypes.STRING, - leaf.index, - leaf.fieldName, - converted.asJava) - } - - override def visit(compound: CompoundPredicate): FlussPredicate = { - val children = compound.children.asScala.map(_.visit(this)).asJava - new CompoundPredicate(compound.function, children) - } - } - - private def stringifyLiterals(predicate: FlussPredicate): FlussPredicate = - predicate.visit(stringifier) } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 42b0aa62d0..462d9032f6 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -498,6 +498,19 @@ class SparkLogTableReadTest extends FlussSparkTestBase { } } + test("Spark Read: partition pushdown — INT partition with range predicate") { + withTable("t") { + sql(s""" + |CREATE TABLE $DEFAULT_DATABASE.t (id BIGINT, pt INT) + |PARTITIONED BY (pt)""".stripMargin) + sql(s"INSERT INTO $DEFAULT_DATABASE.t VALUES (1, 2), (2, 10)") + + val query = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE pt > 2 ORDER BY id") + checkAnswer(query, Row(2L, 10) :: Nil) + assert(partitionPredicate(query).isDefined) + } + } + test("Spark Read: scan description surfaces partition filter when pushed") { withPartitionedTable { val withPart = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01'") From d373e0db672102bf476854a1cddd5d5f156fe82d Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 15 May 2026 13:02:51 +0100 Subject: [PATCH 2/3] fix wrapper --- .../source/enumerator/FlinkSourceEnumerator.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index d012ffbcc7..dac81698e1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -50,7 +50,6 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.predicate.Predicate; -import org.apache.fluss.row.InternalRow; import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ExceptionUtils; @@ -564,7 +563,11 @@ private List applyPartitionFilter(List partitionIn .filter( partition -> partitionFilters.test( - toInternalRow(partition, partitionRowType))) + PartitionUtils.toPartitionRow( + partition + .getResolvedPartitionSpec() + .getPartitionValues(), + partitionRowType))) .collect(Collectors.toList()); int filteredSize = filteredPartitionInfos.size(); @@ -587,12 +590,6 @@ private List applyPartitionFilter(List partitionIn } } - private static InternalRow toInternalRow( - PartitionInfo partitionInfo, RowType partitionRowType) { - return PartitionUtils.toPartitionRow( - partitionInfo.getResolvedPartitionSpec().getPartitionValues(), partitionRowType); - } - /** Init the splits for Fluss. */ private void checkPartitionChanges(Set partitionInfos, Throwable t) { if (closed) { From abeeff45f82fba55c8e3f898c91881b34cf124e0 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 16 May 2026 02:59:08 +0100 Subject: [PATCH 3/3] address comments --- .../flink/source/FlinkTableSourceITCase.java | 30 ++-------------- .../fluss/rpc/util/PredicateMessageUtils.java | 10 +++--- .../rpc/util/PredicateMessageUtilsTest.java | 26 +++++++++----- .../fluss/spark/row/FlussAsSparkRow.scala | 8 ++--- .../spark/utils/SparkPredicateConverter.scala | 4 +-- .../fluss/spark/SparkLogTableReadTest.scala | 35 +++++++++++++++---- .../utils/SparkPredicateConverterTest.scala | 2 +- 7 files changed, 58 insertions(+), 57 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 5fccb6d821..564a3e29b6 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -1228,7 +1228,7 @@ void testStreamingReadAllPartitionTypePushDown() throws Exception { String query = "select * from all_type_partitioned_table where " - + "p_bool is false and p_int not in (12) and p_bigint in (99999) and p_bytes=CAST('Hi' AS VARBINARY) " + + "p_bool is false and p_int < 11 and p_bigint in (99999) and p_bytes=CAST('Hi' AS VARBINARY) " + "and p_string='hello' and p_float=CAST(12.5 AS FLOAT) and p_double=7.88 and p_date=DATE '2025-10-12' " + "and p_time=TIME '12:55:00' and p_ts_ntz=TIMESTAMP '2025-10-12 12:55:00.001' " + "and p_ts_ltz=TO_TIMESTAMP_LTZ(4001, 3)"; @@ -1236,7 +1236,7 @@ void testStreamingReadAllPartitionTypePushDown() throws Exception { assertThat(plan) .contains( "TableSourceScan(table=[[testcatalog, defaultdb, all_type_partitioned_table, " - + "filter=[and(and(and(and(and(and(and(and(and(and(<>(p_int, 12), " + + "filter=[and(and(and(and(and(and(and(and(and(and(<(p_int, 11), " + "=(p_bigint, 99999:BIGINT)), =(p_bytes, X'4869':VARBINARY(2147483647))), " + "=(p_string, _UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), " + "=(p_float, 1.25E1:FLOAT)), =(p_double, 7.88E0:DOUBLE)), =(p_date, 2025-10-12)), " @@ -1756,32 +1756,6 @@ private List writeRowsToTwoPartition(TablePath tablePath, Collection rows = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - rows.add(row(i, "v" + i, 2)); - rows.add(row(i, "v" + i, 10)); - } - writeRows(conn, tablePath, rows, false); - FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); - - List expected = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - expected.add(String.format("+I[%d, v%d, 10]", i, i)); - } - CloseableIterator rowIter = - tEnv.executeSql("select * from int_partitioned_table where pt > 2").collect(); - assertResultsIgnoreOrder(rowIter, expected, true); - } - @Test void testStreamingReadPartitionPushDownWithInExpr() throws Exception { diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java index 3eb138e6f4..f492e3957d 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/PredicateMessageUtils.java @@ -51,8 +51,6 @@ import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.RowType; -import java.time.LocalDate; -import java.time.LocalTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -145,9 +143,9 @@ private static Object toLiteralValue(PbLiteralValue pbLiteral, DataType fieldTyp decimalType.getScale()); } case DATE: - return LocalDate.ofEpochDay(pbLiteral.getBigintValue()); + return (int) pbLiteral.getBigintValue(); case TIME_WITHOUT_TIME_ZONE: - return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() * 1_000_000L); + return pbLiteral.getIntValue(); case TIMESTAMP_WITHOUT_TIME_ZONE: return TimestampNtz.fromMillis( pbLiteral.getTimestampMillisValue(), @@ -282,10 +280,10 @@ private static PbLiteralValue toPbLiteralValue(DataType type, Object literal) { pbLiteral.setIntValue((Integer) literal); break; case DATE: - pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay()); + pbLiteral.setBigintValue(((Integer) literal).longValue()); break; case TIME_WITHOUT_TIME_ZONE: - pbLiteral.setIntValue((int) (((LocalTime) literal).toNanoOfDay() / 1_000_000L)); + pbLiteral.setIntValue((Integer) literal); break; case TIMESTAMP_WITHOUT_TIME_ZONE: pbLiteral.setTimestampMillisValue(((TimestampNtz) literal).getMillisecond()); diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java index 8e1e66f76d..76b926d974 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/util/PredicateMessageUtilsTest.java @@ -56,7 +56,6 @@ import java.lang.reflect.Field; import java.math.BigDecimal; -import java.time.LocalDate; import java.time.LocalTime; import java.util.Arrays; import java.util.Collections; @@ -94,6 +93,20 @@ private static RowType buildRowType(LeafPredicate... predicates) { return buildRowType(Arrays.asList(predicates)); } + @Test + public void testLeafPredicateTimeIntegerLiteral() { + int timeMillis = 45_000_000; + DataType type = new TimeType(false, 3); + LeafPredicate predicate = + new LeafPredicate( + Equal.INSTANCE, type, 0, "f_time", Collections.singletonList(timeMillis)); + RowType rowType = buildRowType(predicate); + PbPredicate pb = PredicateMessageUtils.toPbPredicate(predicate, rowType); + Predicate result = PredicateMessageUtils.toPredicate(pb, rowType); + LeafPredicate lp = (LeafPredicate) result; + assertThat(lp.literals().get(0)).isEqualTo(timeMillis); + } + @Test public void testLeafPredicateIntEqual() { DataType type = new IntType(false); @@ -317,22 +330,17 @@ public void testPbLiteralSerde() { DataType dateType = new DateType(false); LeafPredicate datePredicate = new LeafPredicate( - Equal.INSTANCE, - dateType, - 8, - "f_date", - Collections.singletonList( - LocalDate.ofEpochDay(19000L))); // days since epoch + Equal.INSTANCE, dateType, 8, "f_date", Collections.singletonList(19000)); // time DataType timeType = new TimeType(false, 3); - LocalTime time = LocalTime.of(12, 30); + int timeMillis = (int) (LocalTime.of(12, 30).toNanoOfDay() / 1_000_000L); LeafPredicate timePredicate = new LeafPredicate( Equal.INSTANCE, timeType, 9, "f_time", - Collections.singletonList(time)); // millis of day + Collections.singletonList(timeMillis)); // timestamp DataType tsType = new TimestampType(false, 3); TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3); diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala index 4744264704..5b356e033a 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala @@ -18,7 +18,7 @@ package org.apache.fluss.spark.row import org.apache.fluss.row.{InternalRow => FlussInternalRow} -import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType} +import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, BytesType => FlussBytesType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType} import org.apache.fluss.utils.InternalRowUtils import org.apache.spark.sql.catalyst.{InternalRow => SparkInteralRow} @@ -84,9 +84,9 @@ class FlussAsSparkRow(rowType: RowType) extends SparkInteralRow { DataConverter.toSparkUTF8String(row.getString(ordinal)) } - override def getBinary(ordinal: Int): Array[Byte] = { - val binaryType = rowType.getTypeAt(ordinal).asInstanceOf[FlussBinaryType] - row.getBinary(ordinal, binaryType.getLength) + override def getBinary(ordinal: Int): Array[Byte] = rowType.getTypeAt(ordinal) match { + case b: FlussBinaryType => row.getBinary(ordinal, b.getLength) + case _: FlussBytesType => row.getBytes(ordinal) } override def getInterval(ordinal: Int): CalendarInterval = diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala index 4ea5fd75f7..ff103bb189 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/SparkPredicateConverter.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.types.{Decimal => SparkDecimal} import org.apache.spark.unsafe.types.UTF8String import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} -import java.time.LocalDate import scala.jdk.CollectionConverters._ @@ -277,9 +276,8 @@ object SparkPredicateConverter { } case DataTypeRoot.DATE => - // RPC serialization (PredicateMessageUtils) expects LocalDate. value match { - case d: Integer => LocalDate.ofEpochDay(d.longValue()) + case d: Integer => d case _ => throw new UnsupportedExpression } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala index 462d9032f6..3276decb77 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala @@ -498,15 +498,38 @@ class SparkLogTableReadTest extends FlussSparkTestBase { } } - test("Spark Read: partition pushdown — INT partition with range predicate") { + test("Spark Read: partition pushdown — all supported types") { withTable("t") { sql(s""" - |CREATE TABLE $DEFAULT_DATABASE.t (id BIGINT, pt INT) - |PARTITIONED BY (pt)""".stripMargin) - sql(s"INSERT INTO $DEFAULT_DATABASE.t VALUES (1, 2), (2, 10)") + |CREATE TABLE $DEFAULT_DATABASE.t ( + | id INT, + | p_bool BOOLEAN, p_int_eq INT, p_int_range INT, + | p_bigint BIGINT, p_float FLOAT, p_double DOUBLE, + | p_string STRING, p_binary BINARY, + | p_date DATE, p_ts TIMESTAMP, p_ts_ntz TIMESTAMP_NTZ) + |PARTITIONED BY ( + | p_bool, p_int_eq, p_int_range, p_bigint, p_float, p_double, + | p_string, p_binary, p_date, p_ts, p_ts_ntz)""".stripMargin) + sql(s"""INSERT INTO $DEFAULT_DATABASE.t VALUES + |(1, false, 10, 10, 99999L, CAST(12.5 AS FLOAT), 7.88, 'hello', + | CAST('Hi' AS BINARY), DATE '2026-01-01', + | TIMESTAMP '2026-01-01 12:00:00', TIMESTAMP_NTZ '2026-01-01 12:00:00'), + |(2, true, 11, 2, 99998L, CAST(13.5 AS FLOAT), 8.88, 'world', + | CAST('Bye' AS BINARY), DATE '2026-01-02', + | TIMESTAMP '2026-01-02 12:00:00', TIMESTAMP_NTZ '2026-01-02 12:00:00') + |""".stripMargin) - val query = sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE pt > 2 ORDER BY id") - checkAnswer(query, Row(2L, 10) :: Nil) + val query = sql(s""" + |SELECT id FROM $DEFAULT_DATABASE.t WHERE + | p_bool = false AND p_int_eq = 10 AND p_int_range > 2 + | AND p_bigint = 99999L + | AND p_float = CAST(12.5 AS FLOAT) AND p_double = 7.88 + | AND p_string = 'hello' AND p_binary = CAST('Hi' AS BINARY) + | AND p_date = DATE '2026-01-01' + | AND p_ts = TIMESTAMP '2026-01-01 12:00:00' + | AND p_ts_ntz = TIMESTAMP_NTZ '2026-01-01 12:00:00' + |""".stripMargin) + checkAnswer(query, Row(1) :: Nil) assert(partitionPredicate(query).isDefined) } } diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala index 07fca20810..154a2f9382 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/utils/SparkPredicateConverterTest.scala @@ -195,7 +195,7 @@ class SparkPredicateConverterTest extends AnyFunSuite { test("Date literal from epoch days") { val days = Integer.valueOf(LocalDate.of(2025, 1, 15).toEpochDay.toInt) val predicate = convert(pred("=", ref("dt"), lit(days, DateType))) - val expected = new PredicateBuilder(rowType).equal(6, LocalDate.of(2025, 1, 15)) + val expected = new PredicateBuilder(rowType).equal(6, days) assertThat(predicate).isEqualTo(expected) }