Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) {
return internalRow.getTimestampNtz(pos, timestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) fieldType;
return internalRow.getTimestampNtz(pos, lzTs.getPrecision());
return internalRow.getTimestampLtz(pos, lzTs.getPrecision());
case FLOAT:
return internalRow.getFloat(pos);
case DOUBLE:
Expand All @@ -188,6 +188,7 @@ public static Object get(InternalRow internalRow, int pos, DataType fieldType) {
return internalRow.getDecimal(
pos, decimalType.getPrecision(), decimalType.getScale());
case BINARY:
case BYTES:
Comment thread
fresh-borzoni marked this conversation as resolved.
return internalRow.getBytes(pos);
default:
throw new UnsupportedOperationException("Unsupported type: " + fieldType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypeRoot;
import org.apache.fluss.types.RowType;

import java.time.Instant;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -345,4 +348,28 @@ public static String convertValueOfType(Object value, DataTypeRoot type) {
}
return stringPartitionKey;
}

/**
 * Derives the row type made up of only the partition key columns of {@code tableInfo}.
 *
 * <p>Every partition key is resolved to its position among the table's field names, and the
 * table row type is projected onto those positions, so the resulting fields follow the order of
 * the partition keys rather than the order of the table schema.
 *
 * @param tableInfo the table whose row type and partition keys are read
 * @return the table row type projected onto the partition key positions
 */
public static RowType partitionRowType(TableInfo tableInfo) {
    RowType tableRowType = tableInfo.getRowType();
    List<String> columnNames = tableRowType.getFieldNames();
    List<String> partitionKeys = tableInfo.getPartitionKeys();
    int[] keyPositions = new int[partitionKeys.size()];
    int next = 0;
    for (String partitionKey : partitionKeys) {
        // Position of the key column inside the full table schema.
        keyPositions[next++] = columnNames.indexOf(partitionKey);
    }
    return tableRowType.project(keyPositions);
}

/**
 * Builds a row of typed partition values by parsing each string with {@link
 * #parseValueOfType(String, DataTypeRoot)} for the column at that ordinal in {@code
 * partitionRowType}.
 *
 * @param partitionValues the string form of each partition value, one per partition column
 * @param partitionRowType the row type describing the partition columns, in the same order as
 *     {@code partitionValues}
 * @return a row holding one parsed value per partition column
 * @throws IllegalArgumentException if the number of values differs from the number of fields in
 *     {@code partitionRowType}
 */
public static GenericRow toPartitionRow(
        List<String> partitionValues, RowType partitionRowType) {
    int valueCount = partitionValues.size();
    int fieldCount = partitionRowType.getFieldNames().size();
    // Fail fast on an arity mismatch: more values than fields would otherwise surface as an
    // opaque index error, and fewer values would silently produce a row that is too short.
    if (valueCount != fieldCount) {
        throw new IllegalArgumentException(
                "Expected "
                        + fieldCount
                        + " partition value(s) for partition row type "
                        + partitionRowType
                        + " but got "
                        + valueCount
                        + ": "
                        + partitionValues);
    }
    GenericRow row = new GenericRow(valueCount);
    for (int i = 0; i < valueCount; i++) {
        DataTypeRoot type = partitionRowType.getTypeAt(i).getTypeRoot();
        row.setField(i, parseValueOfType(partitionValues.get(i), type));
    }
    return row;
}
Comment thread
fresh-borzoni marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate;
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

/** Flink table source to scan Fluss data. */
Expand Down Expand Up @@ -627,8 +626,7 @@ && hasPrimaryKey()
} else {
acceptedFilters.add(filter);
}
// Convert literals in the predicate to partition string
converted.add(stringifyPartitionPredicate(p));
converted.add(p);
} else {
remainingFilters.add(filter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.PartitionUtils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
Expand Down Expand Up @@ -558,9 +557,17 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
return partitionInfos;
} else {
int originalSize = partitionInfos.size();
RowType partitionRowType = PartitionUtils.partitionRowType(tableInfo);
Comment thread
fresh-borzoni marked this conversation as resolved.
List<PartitionInfo> filteredPartitionInfos =
partitionInfos.stream()
.filter(partition -> partitionFilters.test(toInternalRow(partition)))
.filter(
partition ->
partitionFilters.test(
PartitionUtils.toPartitionRow(
partition
.getResolvedPartitionSpec()
.getPartitionValues(),
partitionRowType)))
.collect(Collectors.toList());

int filteredSize = filteredPartitionInfos.size();
Expand All @@ -583,16 +590,6 @@ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionIn
}
}

/**
 * Converts the partition values of {@code partitionInfo} into a row whose fields are the values
 * wrapped as {@link BinaryString}, one field per value and in the same order.
 */
private static InternalRow toInternalRow(PartitionInfo partitionInfo) {
    List<String> values = partitionInfo.getResolvedPartitionSpec().getPartitionValues();
    GenericRow row = new GenericRow(values.size());
    int position = 0;
    for (String value : values) {
        row.setField(position++, BinaryString.fromString(value));
    }
    return row;
}

/** Init the splits for Fluss. */
private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable t) {
if (closed) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1228,15 +1228,15 @@ void testStreamingReadAllPartitionTypePushDown() throws Exception {

String query =
"select * from all_type_partitioned_table where "
+ "p_bool is false and p_int not in (12) and p_bigint in (99999) and p_bytes=CAST('Hi' AS VARBINARY) "
+ "p_bool is false and p_int < 11 and p_bigint in (99999) and p_bytes=CAST('Hi' AS VARBINARY) "
+ "and p_string='hello' and p_float=CAST(12.5 AS FLOAT) and p_double=7.88 and p_date=DATE '2025-10-12' "
+ "and p_time=TIME '12:55:00' and p_ts_ntz=TIMESTAMP '2025-10-12 12:55:00.001' "
+ "and p_ts_ltz=TO_TIMESTAMP_LTZ(4001, 3)";
String plan = tEnv.explainSql(query);
assertThat(plan)
.contains(
"TableSourceScan(table=[[testcatalog, defaultdb, all_type_partitioned_table, "
+ "filter=[and(and(and(and(and(and(and(and(and(and(<>(p_int, 12), "
+ "filter=[and(and(and(and(and(and(and(and(and(and(<(p_int, 11), "
+ "=(p_bigint, 99999:BIGINT)), =(p_bytes, X'4869':VARBINARY(2147483647))), "
+ "=(p_string, _UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), "
+ "=(p_float, 1.25E1:FLOAT)), =(p_double, 7.88E0:DOUBLE)), =(p_date, 2025-10-12)), "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import org.apache.fluss.types.DecimalType;
import org.apache.fluss.types.RowType;

import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -145,9 +143,9 @@ private static Object toLiteralValue(PbLiteralValue pbLiteral, DataType fieldTyp
decimalType.getScale());
}
case DATE:
return LocalDate.ofEpochDay(pbLiteral.getBigintValue());
return (int) pbLiteral.getBigintValue();
case TIME_WITHOUT_TIME_ZONE:
return LocalTime.ofNanoOfDay(pbLiteral.getIntValue() * 1_000_000L);
return pbLiteral.getIntValue();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampNtz.fromMillis(
pbLiteral.getTimestampMillisValue(),
Expand Down Expand Up @@ -282,10 +280,10 @@ private static PbLiteralValue toPbLiteralValue(DataType type, Object literal) {
pbLiteral.setIntValue((Integer) literal);
break;
case DATE:
pbLiteral.setBigintValue(((LocalDate) literal).toEpochDay());
pbLiteral.setBigintValue(((Integer) literal).longValue());
break;
case TIME_WITHOUT_TIME_ZONE:
pbLiteral.setIntValue((int) (((LocalTime) literal).toNanoOfDay() / 1_000_000L));
pbLiteral.setIntValue((Integer) literal);
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
pbLiteral.setTimestampMillisValue(((TimestampNtz) literal).getMillisecond());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@

import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -94,6 +93,20 @@ private static RowType buildRowType(LeafPredicate... predicates) {
return buildRowType(Arrays.asList(predicates));
}

/**
 * Verifies that an equality predicate on a TIME column whose literal is a plain integer keeps
 * that integer unchanged after being converted to its protobuf form and back.
 */
@Test
public void testLeafPredicateTimeIntegerLiteral() {
    final int expectedLiteral = 45_000_000;
    DataType timeType = new TimeType(false, 3);
    LeafPredicate original =
            new LeafPredicate(
                    Equal.INSTANCE,
                    timeType,
                    0,
                    "f_time",
                    Collections.singletonList(expectedLiteral));
    RowType schema = buildRowType(original);

    // Round trip: predicate -> protobuf message -> predicate.
    PbPredicate serialized = PredicateMessageUtils.toPbPredicate(original, schema);
    LeafPredicate restored =
            (LeafPredicate) PredicateMessageUtils.toPredicate(serialized, schema);

    assertThat(restored.literals().get(0)).isEqualTo(expectedLiteral);
}

@Test
public void testLeafPredicateIntEqual() {
DataType type = new IntType(false);
Expand Down Expand Up @@ -317,22 +330,17 @@ public void testPbLiteralSerde() {
DataType dateType = new DateType(false);
LeafPredicate datePredicate =
new LeafPredicate(
Equal.INSTANCE,
dateType,
8,
"f_date",
Collections.singletonList(
LocalDate.ofEpochDay(19000L))); // days since epoch
Equal.INSTANCE, dateType, 8, "f_date", Collections.singletonList(19000));
// time
DataType timeType = new TimeType(false, 3);
LocalTime time = LocalTime.of(12, 30);
int timeMillis = (int) (LocalTime.of(12, 30).toNanoOfDay() / 1_000_000L);
LeafPredicate timePredicate =
new LeafPredicate(
Equal.INSTANCE,
timeType,
9,
"f_time",
Collections.singletonList(time)); // millis of day
Collections.singletonList(timeMillis));
// timestamp
DataType tsType = new TimestampType(false, 3);
TimestampNtz ts = TimestampNtz.fromMillis(1680000000000L, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ class FlussAppendBatch(

if (tableInfo.isPartitioned) {
val matching =
SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate)
SparkPartitionPredicate.filterPartitions(
tableInfo,
partitionInfos.asScala.toSeq,
partitionPredicate)
matching
.map {
partitionInfo =>
Expand Down Expand Up @@ -240,7 +243,10 @@ class FlussUpsertBatch(

if (tableInfo.isPartitioned) {
val matching =
SparkPartitionPredicate.filterPartitions(partitionInfos.asScala.toSeq, partitionPredicate)
SparkPartitionPredicate.filterPartitions(
tableInfo,
partitionInfos.asScala.toSeq,
partitionPredicate)
matching.flatMap {
partitionInfo =>
val partitionName = partitionInfo.getPartitionName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.fluss.spark.row

import org.apache.fluss.row.{InternalRow => FlussInternalRow}
import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType}
import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, BytesType => FlussBytesType, LocalZonedTimestampType, MapType => FlussMapType, RowType, TimestampType}
import org.apache.fluss.utils.InternalRowUtils

import org.apache.spark.sql.catalyst.{InternalRow => SparkInteralRow}
Expand Down Expand Up @@ -84,9 +84,9 @@ class FlussAsSparkRow(rowType: RowType) extends SparkInteralRow {
DataConverter.toSparkUTF8String(row.getString(ordinal))
}

/**
 * Reads the value at `ordinal` as a byte array, using the length declared by the column's
 * Fluss binary type. The column type is cast to `FlussBinaryType`, so any other column type
 * fails the cast.
 */
override def getBinary(ordinal: Int): Array[Byte] = {
  val declaredLength = rowType.getTypeAt(ordinal).asInstanceOf[FlussBinaryType].getLength
  row.getBinary(ordinal, declaredLength)
}
/**
 * Reads the value at `ordinal` as a byte array.
 *
 * A `FlussBinaryType` column is read with the length declared by its type; a `FlussBytesType`
 * column is read as a byte array without a length argument.
 *
 * @param ordinal position of the column in `rowType`
 * @return the bytes stored at that position
 * @throws UnsupportedOperationException if the column is neither a binary nor a bytes type
 */
override def getBinary(ordinal: Int): Array[Byte] = rowType.getTypeAt(ordinal) match {
  case b: FlussBinaryType => row.getBinary(ordinal, b.getLength)
  case _: FlussBytesType => row.getBytes(ordinal)
  case other =>
    // Explicit fallback: a descriptive error instead of a bare scala.MatchError.
    throw new UnsupportedOperationException(
      s"Unsupported type for getBinary at ordinal $ordinal: $other")
}

override def getInterval(ordinal: Int): CalendarInterval =
Expand Down
Loading