diff --git a/docs/src/main/sphinx/connector/hudi.md b/docs/src/main/sphinx/connector/hudi.md index 2262c8e8c17a..1edd410d4bd6 100644 --- a/docs/src/main/sphinx/connector/hudi.md +++ b/docs/src/main/sphinx/connector/hudi.md @@ -96,10 +96,10 @@ Additionally, following configuration properties can be set depending on the use or `CAST(part_key AS INTEGER) % 2 = 0` are not recognized as partition filters, and queries using such expressions fail if the property is set to `true`. - `false` -* - `hudi.ignore-absent-partitions` - - Ignore partitions when the file system location does not exist rather than - failing the query. This skips data that may be expected to be part of the - table. +* - `hudi.metadata-enabled` + - Fetch the list of file names and sizes from Hudi metadata table. Improves + the files listing performance by avoiding direct storage calls while + building splits. - `false` ::: diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileUtils.java index 37d897590576..19f222e5dc03 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileUtils.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/avro/AvroHiveFileUtils.java @@ -136,7 +136,7 @@ private static Schema constructSchemaFromParts(List columnNames, List i ? columnComments.get(i) : null; - Schema fieldSchema = recordIncrementingUtil.avroSchemaForHiveType(columnTypes.get(i)); + Schema fieldSchema = recordIncrementingUtil.avroSchemaForHiveTypeInternal(columnTypes.get(i)); fieldBuilder = fieldBuilder .name(columnNames.get(i)) .doc(comment) @@ -146,13 +146,19 @@ private static Schema constructSchemaFromParts(List columnNames, List createAvroPrimitive(hiveType); case LIST -> { ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType.getTypeInfo(); - yield Schema.createArray(avroSchemaForHiveType(HiveType.fromTypeInfo(listTypeInfo.getListElementTypeInfo()))); + yield Schema.createArray(avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(listTypeInfo.getListElementTypeInfo()))); } case MAP -> { MapTypeInfo mapTypeInfo = ((MapTypeInfo) hiveType.getTypeInfo()); @@ -162,13 +168,13 @@ private Schema avroSchemaForHiveType(HiveType hiveType) throw new UnsupportedOperationException("Key of Map must be a String"); } TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo(); - yield Schema.createMap(avroSchemaForHiveType(HiveType.fromTypeInfo(valueTypeInfo))); + yield Schema.createMap(avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(valueTypeInfo))); } case STRUCT -> createAvroRecord(hiveType); case UNION -> { List childSchemas = new ArrayList<>(); for (TypeInfo childTypeInfo : ((UnionTypeInfo) hiveType.getTypeInfo()).getAllUnionObjectTypeInfos()) { - final Schema childSchema = avroSchemaForHiveType(HiveType.fromTypeInfo(childTypeInfo)); + final Schema childSchema = avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(childTypeInfo)); if (childSchema.getType() == Schema.Type.UNION) { childSchemas.addAll(childSchema.getTypes()); } @@ -243,7 +249,7 @@ private Schema createAvroRecord(HiveType hiveType) for (int i = 0; i < allStructFieldNames.size(); ++i) { final TypeInfo childTypeInfo = allStructFieldTypeInfo.get(i); - final Schema fieldSchema = avroSchemaForHiveType(HiveType.fromTypeInfo(childTypeInfo)); + final Schema fieldSchema = avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(childTypeInfo)); fieldAssembler = fieldAssembler .name(allStructFieldNames.get(i)) .doc(childTypeInfo.toString()) diff --git a/plugin/trino-hudi/pom.xml b/plugin/trino-hudi/pom.xml index ed94afe1bfde..6a1c205f99e1 100644 --- a/plugin/trino-hudi/pom.xml +++ b/plugin/trino-hudi/pom.xml @@ -19,6 +19,12 @@ + + com.esotericsoftware + kryo + 4.0.2 + + com.google.errorprone error_prone_annotations @@ -215,13 +221,6 @@ runtime - - com.esotericsoftware - kryo-shaded - 4.0.3 - test - - io.airlift configuration-testing @@ -401,6 +400,15 @@ + + org.apache.maven.plugins + maven-dependency-plugin + + + com.esotericsoftware:kryo + + + org.basepom.maven duplicate-finder-maven-plugin diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java index da087737f4b6..a6c27f34e493 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java @@ -31,8 +31,7 @@ @DefunctConfig({ "hudi.min-partition-batch-size", - "hudi.max-partition-batch-size", - "hudi.metadata-enabled", + "hudi.max-partition-batch-size" }) public class HudiConfig { @@ -47,7 +46,7 @@ public class HudiConfig private int splitGeneratorParallelism = 4; private long perTransactionMetastoreCacheMaximumSize = 2000; private boolean queryPartitionFilterRequired; - private boolean ignoreAbsentPartitions; + private boolean metadataEnabled; public List getColumnsToHide() { @@ -205,15 +204,16 @@ public boolean isQueryPartitionFilterRequired() return queryPartitionFilterRequired; } - @Config("hudi.ignore-absent-partitions") - public HudiConfig setIgnoreAbsentPartitions(boolean ignoreAbsentPartitions) + @Config("hudi.metadata-enabled") + @ConfigDescription("Fetch the list of file names and sizes from Hudi metadata table rather than storage") + public HudiConfig setMetadataEnabled(boolean metadataEnabled) { - this.ignoreAbsentPartitions = ignoreAbsentPartitions; + this.metadataEnabled = metadataEnabled; return this; } - public boolean isIgnoreAbsentPartitions() + public boolean isMetadataEnabled() { - return ignoreAbsentPartitions; + return this.metadataEnabled; } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java index 405017236503..013235e844da 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java @@ -31,7 +31,8 @@ public enum HudiErrorCode HUDI_CURSOR_ERROR(6, EXTERNAL), HUDI_FILESYSTEM_ERROR(7, EXTERNAL), HUDI_PARTITION_NOT_FOUND(8, EXTERNAL), - // HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL), // Unused. Could be mistaken with HUDI_UNKNOWN_TABLE_TYPE. + HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL), + HUDI_NO_VALID_COMMIT(10, EXTERNAL) /**/; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index b98cb5355663..a089da3ab678 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.metastore.Column; import io.trino.metastore.HiveMetastore; @@ -23,6 +24,8 @@ import io.trino.metastore.TableInfo; import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable; import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hudi.storage.TrinoHudiStorage; +import io.trino.plugin.hudi.storage.TrinoStorageConfiguration; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; @@ -40,6 +43,9 @@ import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.storage.StoragePath; import java.util.Collection; import java.util.Collections; @@ -68,6 +74,7 @@ import static io.trino.plugin.hudi.HudiSessionProperties.isQueryPartitionFilterRequired; import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY; +import static io.trino.plugin.hudi.HudiUtil.fromInputFormat; import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; @@ -77,7 +84,6 @@ import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; public class HudiMetadata implements ConnectorMetadata @@ -119,15 +125,21 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName)); } Location location = Location.of(table.get().getStorage().getLocation()); - if (!hudiMetadataExists(fileSystemFactory.create(session), location)) { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + if (!hudiMetadataExists(fileSystem, location)) { throw new TrinoException(HUDI_BAD_DATA, "Location of table %s does not contain Hudi table metadata: %s".formatted(tableName, location)); } + StoragePath metaLocation = new StoragePath(table.get().getStorage().getLocation(), HoodieTableMetaClient.METAFOLDER_NAME); + HoodieTableConfig tableConfig = new HoodieTableConfig(new TrinoHudiStorage(fileSystem, new TrinoStorageConfiguration()), metaLocation, null, null, null); + String preCombineField = tableConfig.getPreCombineField(); + String inputFormat = table.get().getStorage().getStorageFormat().getInputFormat(); return new HudiTableHandle( tableName.getSchemaName(), tableName.getTableName(), table.get().getStorage().getLocation(), - COPY_ON_WRITE, + fromInputFormat(inputFormat), + preCombineField, getPartitionKeyColumnHandles(table.get(), typeManager), TupleDomain.all(), TupleDomain.all()); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java new file mode 100644 index 000000000000..902d142a0eb8 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java @@ -0,0 +1,139 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi; + +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hudi.util.HudiAvroSerializer; +import io.trino.plugin.hudi.util.SynthesizedColumnHandler; +import io.trino.spi.Page; +import io.trino.spi.PageBuilder; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.SourcePage; +import io.trino.spi.metrics.Metrics; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; + +import java.io.IOException; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class HudiPageSource + implements ConnectorPageSource +{ + private final HoodieFileGroupReader fileGroupReader; + private final ConnectorPageSource pageSource; + private final PageBuilder pageBuilder; + private final HudiAvroSerializer avroSerializer; + + public HudiPageSource( + ConnectorPageSource pageSource, + HoodieFileGroupReader fileGroupReader, + List columnHandles, + SynthesizedColumnHandler synthesizedColumnHandler) + { + this.pageSource = requireNonNull(pageSource, "pageSource is null"); + this.fileGroupReader = requireNonNull(fileGroupReader, "fileGroupReader is null"); + this.initFileGroupReader(); + this.pageBuilder = new PageBuilder(columnHandles.stream().map(HiveColumnHandle::getType).collect(toImmutableList())); + this.avroSerializer = new HudiAvroSerializer(columnHandles, synthesizedColumnHandler); + } + + @Override + public long getCompletedBytes() + { + return pageSource.getCompletedBytes(); + } + + @Override + public OptionalLong getCompletedPositions() + { + return pageSource.getCompletedPositions(); + } + + @Override + public long getReadTimeNanos() + { + return pageSource.getReadTimeNanos(); + } + + @Override + public boolean isFinished() + { + try { + return !fileGroupReader.hasNext(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public SourcePage getNextSourcePage() + { + checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page"); + try { + while (fileGroupReader.hasNext()) { + avroSerializer.buildRecordInPage(pageBuilder, fileGroupReader.next()); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + + Page newPage = pageBuilder.build(); + pageBuilder.reset(); + return SourcePage.create(newPage); + } + + @Override + public long getMemoryUsage() + { + return pageSource.getMemoryUsage(); + } + + @Override + public void close() + throws IOException + { + fileGroupReader.close(); + pageSource.close(); + } + + @Override + public CompletableFuture isBlocked() + { + return pageSource.isBlocked(); + } + + @Override + public Metrics getMetrics() + { + return pageSource.getMetrics(); + } + + protected void initFileGroupReader() + { + try { + this.fileGroupReader.initRecordIterators(); + } + catch (IOException e) { + throw new RuntimeException("Failed to initialize file group reader!", e); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index de65955fabd5..496cb966d014 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -32,11 +32,13 @@ import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; -import io.trino.plugin.hive.HivePartitionKey; -import io.trino.plugin.hive.TransformConnectorPageSource; import io.trino.plugin.hive.parquet.ParquetReaderConfig; +import io.trino.plugin.hudi.file.HudiBaseFile; +import io.trino.plugin.hudi.reader.TrinoHudiReaderContext; +import io.trino.plugin.hudi.storage.TrinoHudiStorage; +import io.trino.plugin.hudi.storage.TrinoStorageConfiguration; +import io.trino.plugin.hudi.util.SynthesizedColumnHandler; import io.trino.spi.TrinoException; -import io.trino.spi.block.Block; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorPageSourceProvider; @@ -45,43 +47,33 @@ import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.type.Decimals; -import io.trino.spi.type.TypeSignature; -import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.storage.StoragePath; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.schema.MessageType; import org.joda.time.DateTimeZone; import java.io.IOException; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeParseException; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.TimeZone; +import java.util.stream.Collectors; -import static com.google.common.base.Preconditions.checkArgument; -import static io.airlift.slice.Slices.utf8Slice; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; -import static io.trino.metastore.Partitions.makePartName; import static io.trino.parquet.ParquetTypeUtils.getColumnIO; import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.getFilteredRowGroups; -import static io.trino.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME; -import static io.trino.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_TYPE_SIGNATURE; -import static io.trino.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME; -import static io.trino.plugin.hive.HiveColumnHandle.FILE_SIZE_TYPE_SIGNATURE; -import static io.trino.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME; -import static io.trino.plugin.hive.HiveColumnHandle.PARTITION_TYPE_SIGNATURE; -import static io.trino.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME; -import static io.trino.plugin.hive.HiveColumnHandle.PATH_TYPE; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.ParquetReaderProvider; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetPageSource; @@ -90,37 +82,16 @@ import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CURSOR_ERROR; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_INVALID_PARTITION_VALUE; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT; import static io.trino.plugin.hudi.HudiSessionProperties.getParquetSmallFileThreshold; import static io.trino.plugin.hudi.HudiSessionProperties.isParquetVectorizedDecodingEnabled; import static io.trino.plugin.hudi.HudiSessionProperties.shouldUseParquetColumnNames; -import static io.trino.plugin.hudi.HudiUtil.getHudiFileFormat; -import static io.trino.spi.predicate.Utils.nativeValueToBlock; -import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; -import static io.trino.spi.type.StandardTypes.BIGINT; -import static io.trino.spi.type.StandardTypes.BOOLEAN; -import static io.trino.spi.type.StandardTypes.DATE; -import static io.trino.spi.type.StandardTypes.DECIMAL; -import static io.trino.spi.type.StandardTypes.DOUBLE; -import static io.trino.spi.type.StandardTypes.INTEGER; -import static io.trino.spi.type.StandardTypes.REAL; -import static io.trino.spi.type.StandardTypes.SMALLINT; -import static io.trino.spi.type.StandardTypes.TIMESTAMP; -import static io.trino.spi.type.StandardTypes.TINYINT; -import static io.trino.spi.type.StandardTypes.VARBINARY; -import static io.trino.spi.type.StandardTypes.VARCHAR; -import static io.trino.spi.type.TimeZoneKey.UTC_KEY; -import static java.lang.Double.parseDouble; -import static java.lang.Float.floatToRawIntBits; -import static java.lang.Float.parseFloat; -import static java.lang.Long.parseLong; +import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient; +import static io.trino.plugin.hudi.HudiUtil.constructSchema; +import static io.trino.plugin.hudi.HudiUtil.convertToFileSlice; +import static io.trino.plugin.hudi.HudiUtil.prependHudiMetaColumns; import static java.lang.String.format; -import static java.util.Objects.isNull; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; -import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; public class HudiPageSourceProvider implements ConnectorPageSourceProvider @@ -152,28 +123,61 @@ public ConnectorPageSource createPageSource( List columns, DynamicFilter dynamicFilter) { - HudiSplit split = (HudiSplit) connectorSplit; - String path = split.location(); - HoodieFileFormat hudiFileFormat = getHudiFileFormat(path); - if (PARQUET != hudiFileFormat) { - throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, format("File format %s not supported", hudiFileFormat)); + HudiTableHandle hudiTableHandle = (HudiTableHandle) connectorTable; + HudiSplit hudiSplit = (HudiSplit) connectorSplit; + Optional hudiBaseFileOpt = hudiSplit.baseFile(); + + String dataFilePath = hudiBaseFileOpt.isPresent() + ? hudiBaseFileOpt.get().getPath() + : hudiSplit.logFiles().getFirst().getPath(); + // Filter out metadata table splits + if (dataFilePath.contains(new StoragePath( + ((HudiTableHandle) connectorTable).getBasePath()).toUri().getPath() + "/.hoodie/metadata")) { + return new EmptyPageSource(); + } + // Handle MERGE_ON_READ tables to be read in read_optimized mode + // IMPORTANT: These tables will have a COPY_ON_WRITE table type due to how `HudiTableTypeUtils#fromInputFormat` + if (hudiTableHandle.getTableType().equals(COPY_ON_WRITE) && !hudiSplit.logFiles().isEmpty()) { + if (hudiBaseFileOpt.isEmpty()) { + // Handle hasLogFiles=true, hasBaseFile = false + // Ignoring log files without base files, no data required to be read + return new EmptyPageSource(); + } } + long start = 0; + long length = 10; + if (hudiBaseFileOpt.isPresent()) { + start = hudiBaseFileOpt.get().getStart(); + length = hudiBaseFileOpt.get().getLength(); + } + HoodieTableMetaClient metaClient = buildTableMetaClient(fileSystemFactory.create(session), hudiTableHandle.getBasePath()); + String latestCommitTime = metaClient.getCommitsTimeline().lastInstant().get().requestedTime(); + Schema dataSchema; + try { + dataSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(latestCommitTime); + } + catch (Exception e) { + throw new RuntimeException(e); + } List hiveColumns = columns.stream() .map(HiveColumnHandle.class::cast) - .collect(toList()); - // just send regular columns to create parquet page source - // for partition columns, separate blocks will be created + .toList(); + List regularColumns = hiveColumns.stream() .filter(columnHandle -> !columnHandle.isPartitionKey() && !columnHandle.isHidden()) - .collect(toList()); + .collect(Collectors.toList()); + List columnHandles = prependHudiMetaColumns(regularColumns); + + Schema requestedSchema = constructSchema(columnHandles.stream().map(HiveColumnHandle::getName).toList(), + columnHandles.stream().map(HiveColumnHandle::getHiveType).toList(), false); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); - TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path), split.fileSize()); - ConnectorPageSource pageSource = createPageSource( + ConnectorPageSource dataPageSource = createPageSource( session, - regularColumns, - split, - inputFile, + columnHandles, + hudiSplit, + fileSystem.newInputFile(Location.of(hudiBaseFileOpt.get().getPath()), hudiBaseFileOpt.get().getFileSize()), dataSourceStats, ParquetReaderOptions.builder(options) .withSmallFileThreshold(getParquetSmallFileThreshold(session)) @@ -181,34 +185,34 @@ public ConnectorPageSource createPageSource( .build(), timeZone); - Map partitionBlocks = convertPartitionValues(hiveColumns, split.partitionKeys()); + SynthesizedColumnHandler synthesizedColumnHandler = SynthesizedColumnHandler.create(hudiSplit); - TransformConnectorPageSource.Builder transforms = TransformConnectorPageSource.builder(); - int delegateIndex = 0; - for (HiveColumnHandle column : hiveColumns) { - if (partitionBlocks.containsKey(column.getName())) { - transforms.constantValue(partitionBlocks.get(column.getName())); - } - else if (column.getName().equals(PARTITION_COLUMN_NAME)) { - transforms.constantValue(nativeValueToBlock(PARTITION_TYPE_SIGNATURE, utf8Slice(toPartitionName(split.partitionKeys())))); - } - else if (column.getName().equals(PATH_COLUMN_NAME)) { - transforms.constantValue(nativeValueToBlock(PATH_TYPE, utf8Slice(path))); - } - else if (column.getName().equals(FILE_SIZE_COLUMN_NAME)) { - transforms.constantValue(nativeValueToBlock(FILE_SIZE_TYPE_SIGNATURE, split.fileSize())); - } - else if (column.getName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)) { - long packedTimestamp = packDateTimeWithZone(split.fileModifiedTime(), UTC_KEY); - transforms.constantValue(nativeValueToBlock(FILE_MODIFIED_TIME_TYPE_SIGNATURE, packedTimestamp)); - } - else { - transforms.column(delegateIndex); - delegateIndex++; - } - } + TrinoHudiReaderContext readerContext = new TrinoHudiReaderContext( + dataPageSource, + columnHandles, + synthesizedColumnHandler); - return transforms.build(pageSource); + HoodieFileGroupReader fileGroupReader = + new HoodieFileGroupReader<>( + readerContext, + new TrinoHudiStorage(fileSystemFactory.create(session), new TrinoStorageConfiguration()), + hudiTableHandle.getBasePath(), + latestCommitTime, + convertToFileSlice(hudiSplit, hudiTableHandle.getBasePath()), + dataSchema, + requestedSchema, + Option.empty(), + metaClient, + metaClient.getTableConfig().getProps(), + start, + length, + false); + + return new HudiPageSource( + dataPageSource, + fileGroupReader, + hiveColumns, + synthesizedColumnHandler); } private static ConnectorPageSource createPageSource( @@ -222,12 +226,13 @@ private static ConnectorPageSource createPageSource( { ParquetDataSource dataSource = null; boolean useColumnNames = shouldUseParquetColumnNames(session); - String path = hudiSplit.location(); - long start = hudiSplit.start(); - long length = hudiSplit.length(); + HudiBaseFile baseFile = hudiSplit.baseFile().get(); + String path = baseFile.getPath(); + long start = baseFile.getStart(); + long length = baseFile.getLength(); try { AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); - dataSource = createDataSource(inputFile, OptionalLong.of(hudiSplit.fileSize()), options, memoryContext, dataSourceStats); + dataSource = createDataSource(inputFile, OptionalLong.of(baseFile.getFileSize()), options, memoryContext, dataSourceStats); ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); @@ -301,69 +306,4 @@ private static TrinoException handleException(ParquetDataSourceId dataSourceId, } return new TrinoException(HUDI_CURSOR_ERROR, format("Failed to read Parquet file: %s", dataSourceId), exception); } - - private static Map convertPartitionValues( - List allColumns, - List partitionKeys) - { - return allColumns.stream() - .filter(HiveColumnHandle::isPartitionKey) - .collect(toMap( - HiveColumnHandle::getName, - columnHandle -> nativeValueToBlock( - columnHandle.getType(), - partitionToNativeValue( - columnHandle.getName(), - partitionKeys, - columnHandle.getType().getTypeSignature()).orElse(null)))); - } - - private static Optional partitionToNativeValue( - String partitionColumnName, - List partitionKeys, - TypeSignature partitionDataType) - { - HivePartitionKey partitionKey = partitionKeys.stream().filter(key -> key.name().equalsIgnoreCase(partitionColumnName)).findFirst().orElse(null); - if (isNull(partitionKey)) { - return Optional.empty(); - } - - String partitionValue = partitionKey.value(); - String baseType = partitionDataType.getBase(); - try { - return switch (baseType) { - case TINYINT, SMALLINT, INTEGER, BIGINT -> Optional.of(parseLong(partitionValue)); - case REAL -> Optional.of((long) floatToRawIntBits(parseFloat(partitionValue))); - case DOUBLE -> Optional.of(parseDouble(partitionValue)); - case VARCHAR, VARBINARY -> Optional.of(utf8Slice(partitionValue)); - case DATE -> Optional.of(LocalDate.parse(partitionValue, DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay()); - case TIMESTAMP -> Optional.of(Timestamp.valueOf(partitionValue).toLocalDateTime().toEpochSecond(ZoneOffset.UTC) * 1_000); - case BOOLEAN -> { - checkArgument(partitionValue.equalsIgnoreCase("true") || partitionValue.equalsIgnoreCase("false")); - yield Optional.of(Boolean.valueOf(partitionValue)); - } - case DECIMAL -> Optional.of(Decimals.parse(partitionValue).getObject()); - default -> throw new TrinoException( - HUDI_INVALID_PARTITION_VALUE, - format("Unsupported data type '%s' for partition column %s", partitionDataType, partitionColumnName)); - }; - } - catch (IllegalArgumentException | DateTimeParseException e) { - throw new TrinoException( - HUDI_INVALID_PARTITION_VALUE, - format("Can not parse partition value '%s' of type '%s' for partition column '%s'", partitionValue, partitionDataType, partitionColumnName), - e); - } - } - - private static String toPartitionName(List partitions) - { - ImmutableList.Builder partitionNames = ImmutableList.builderWithExpectedSize(partitions.size()); - ImmutableList.Builder partitionValues = ImmutableList.builderWithExpectedSize(partitions.size()); - for (HivePartitionKey partition : partitions) { - partitionNames.add(partition.name()); - partitionValues.add(partition.value()); - } - return makePartName(partitionNames.build(), partitionValues.build()); - } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java index e3a9f98c8707..c6c100f27c15 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java @@ -52,7 +52,7 @@ public class HudiSessionProperties private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits"; private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism"; private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; - private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions"; + private static final String METADATA_TABLE_ENABLED = "metadata_enabled"; private final List> sessionProperties; @@ -128,9 +128,9 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR hudiConfig.isQueryPartitionFilterRequired(), false), booleanProperty( - IGNORE_ABSENT_PARTITIONS, - "Ignore absent partitions", - hudiConfig.isIgnoreAbsentPartitions(), + METADATA_TABLE_ENABLED, + "For Hudi tables prefer to fetch the list of files from its metadata table", + hudiConfig.isMetadataEnabled(), false)); } @@ -196,8 +196,8 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session) return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); } - public static boolean isIgnoreAbsentPartitions(ConnectorSession session) + public static boolean isHudiMetadataTableEnabled(ConnectorSession session) { - return session.getProperty(IGNORE_ABSENT_PARTITIONS, Boolean.class); + return session.getProperty(METADATA_TABLE_ENABLED, Boolean.class); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java index 5899ef8936b8..8721c684aa5c 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java @@ -16,25 +16,25 @@ import com.google.common.collect.ImmutableList; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; +import io.trino.plugin.hudi.file.HudiBaseFile; +import io.trino.plugin.hudi.file.HudiLogFile; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.predicate.TupleDomain; import java.util.List; +import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.slice.SizeOf.instanceSize; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; public record HudiSplit( - String location, - long start, - long length, - long fileSize, - long fileModifiedTime, + Optional baseFile, + List logFiles, + String commitTime, TupleDomain predicate, List partitionKeys, SplitWeight splitWeight) @@ -44,11 +44,9 @@ public record HudiSplit( public HudiSplit { - checkArgument(start >= 0, "start must be positive"); - checkArgument(length >= 0, "length must be positive"); - checkArgument(start + length <= fileSize, "fileSize must be at least start + length"); - - requireNonNull(location, "location is null"); + requireNonNull(baseFile, "baseFile is null"); + logFiles = ImmutableList.copyOf(logFiles); + requireNonNull(commitTime, "commitTime is null"); requireNonNull(predicate, "predicate is null"); partitionKeys = ImmutableList.copyOf(partitionKeys); requireNonNull(splitWeight, "splitWeight is null"); @@ -58,7 +56,9 @@ public record HudiSplit( public long getRetainedSizeInBytes() { return INSTANCE_SIZE - + estimatedSizeOf(location) + + 10 + + 10 + + estimatedSizeOf(commitTime) + splitWeight.getRetainedSizeInBytes() + predicate.getRetainedSizeInBytes(HiveColumnHandle::getRetainedSizeInBytes) + estimatedSizeOf(partitionKeys, HivePartitionKey::estimatedSizeInBytes); @@ -68,11 +68,9 @@ public long getRetainedSizeInBytes() public String toString() { return toStringHelper(this) - .addValue(location) - .addValue(start) - .addValue(length) - .addValue(fileSize) - .addValue(fileModifiedTime) + .addValue(baseFile) + .addValue(logFiles) + .addValue(commitTime) .toString(); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java index ae2902ae58e4..cd350eb224f5 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; +import io.airlift.log.Logger; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.metastore.HiveMetastore; import io.trino.metastore.Table; @@ -49,6 +50,8 @@ public class HudiSplitManager implements ConnectorSplitManager { + private static final Logger log = Logger.get(HudiSplitManager.class); + private final TypeManager typeManager; private final HudiTransactionManager transactionManager; private final TrinoFileSystemFactory fileSystemFactory; @@ -87,7 +90,7 @@ public ConnectorSplitSource getSplits( Map partitionColumnHandles = partitionColumns.stream() .collect(toImmutableMap(HiveColumnHandle::getName, identity())); List partitions = getPartitions(metastore, hudiTableHandle, partitionColumns); - + log.debug("Pruned partition count: %d", partitions.size()); HudiSplitSource splitSource = new HudiSplitSource( session, metastore, diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index ee2efa976db8..bf4c060d9bc8 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -23,7 +23,7 @@ import io.trino.plugin.hive.util.AsyncQueue; import io.trino.plugin.hive.util.ThrottledAsyncQueue; import io.trino.plugin.hudi.query.HudiDirectoryLister; -import io.trino.plugin.hudi.query.HudiReadOptimizedDirectoryLister; +import io.trino.plugin.hudi.query.HudiSnapshotDirectoryLister; import io.trino.plugin.hudi.split.HudiBackgroundSplitLoader; import io.trino.plugin.hudi.split.HudiSplitWeightProvider; import io.trino.plugin.hudi.split.SizeBasedSplitWeightProvider; @@ -32,6 +32,7 @@ import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import java.util.List; import java.util.Map; @@ -48,9 +49,10 @@ import static io.trino.plugin.hudi.HudiSessionProperties.getMinimumAssignedSplitWeight; import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism; import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize; -import static io.trino.plugin.hudi.HudiSessionProperties.isIgnoreAbsentPartitions; +import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataTableEnabled; import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled; import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient; +import static java.lang.String.format; import static java.util.stream.Collectors.toList; public class HudiSplitSource @@ -73,18 +75,25 @@ public HudiSplitSource( int maxOutstandingSplits, List partitions) { + boolean enableMetadataTable = isHudiMetadataTableEnabled(session); HoodieTableMetaClient metaClient = buildTableMetaClient(fileSystemFactory.create(session), tableHandle.getBasePath()); + String latestCommitTime = metaClient.getActiveTimeline() + .getCommitsTimeline() + .filterCompletedInstants() + .lastInstant() + .map(HoodieInstant::requestedTime) + .orElseThrow(() -> new TrinoException(HudiErrorCode.HUDI_NO_VALID_COMMIT, format("Table %s has no valid commits", tableHandle.getSchemaTableName()))); List partitionColumnHandles = table.getPartitionColumns().stream() .map(column -> partitionColumnHandleMap.get(column.getName())).collect(toList()); - HudiDirectoryLister hudiDirectoryLister = new HudiReadOptimizedDirectoryLister( + HudiDirectoryLister hudiDirectoryLister = new HudiSnapshotDirectoryLister( tableHandle, metaClient, + enableMetadataTable, metastore, table, partitionColumnHandles, - partitions, - !tableHandle.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session)); + partitions); this.queue = new ThrottledAsyncQueue<>(maxSplitsPerSecond, maxOutstandingSplits, executor); HudiBackgroundSplitLoader splitLoader = new HudiBackgroundSplitLoader( @@ -99,7 +108,8 @@ public HudiSplitSource( trinoException.compareAndSet(null, new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Failed to generate splits for " + table.getSchemaTableName(), throwable)); queue.finish(); - }); + }, + latestCommitTime); this.splitLoaderFuture = splitLoaderExecutorService.schedule(splitLoader, 0, TimeUnit.MILLISECONDS); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index f423ff1c7c48..e570ed9e2382 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType; import java.util.List; +import java.util.Optional; import java.util.Set; import static io.trino.spi.connector.SchemaTableName.schemaTableName; @@ -36,6 +37,7 @@ public class HudiTableHandle private final String tableName; private final String basePath; private final HoodieTableType tableType; + private final Optional preCombineField; private final List partitionColumns; // Used only for validation when config property hudi.query-partition-filter-required is enabled private final Set constraintColumns; @@ -48,11 +50,12 @@ public HudiTableHandle( @JsonProperty("tableName") String tableName, @JsonProperty("basePath") String basePath, @JsonProperty("tableType") HoodieTableType tableType, + @JsonProperty("preCombineField") String preCombineField, @JsonProperty("partitionColumns") List partitionColumns, @JsonProperty("partitionPredicates") TupleDomain partitionPredicates, @JsonProperty("regularPredicates") TupleDomain regularPredicates) { - this(schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(), partitionPredicates, regularPredicates); + this(schemaName, tableName, basePath, tableType, Optional.ofNullable(preCombineField), partitionColumns, ImmutableSet.of(), partitionPredicates, regularPredicates); } public HudiTableHandle( @@ -60,6 +63,7 @@ public HudiTableHandle( String tableName, String basePath, HoodieTableType tableType, + Optional preCombineField, List partitionColumns, Set constraintColumns, TupleDomain partitionPredicates, @@ -69,6 +73,7 @@ public HudiTableHandle( this.tableName = requireNonNull(tableName, "tableName is null"); this.basePath = requireNonNull(basePath, "basePath is null"); this.tableType = requireNonNull(tableType, "tableType is null"); + this.preCombineField = requireNonNull(preCombineField, "preCombineField is null"); this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null"); this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null"); this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null"); @@ -99,6 +104,12 @@ public HoodieTableType getTableType() return tableType; } + @JsonProperty + public Optional getPreCombineField() + { + return preCombineField; + } + @JsonProperty public TupleDomain getPartitionPredicates() { @@ -139,6 +150,7 @@ HudiTableHandle applyPredicates( tableName, basePath, tableType, + preCombineField, partitionColumns, constraintColumns, partitionPredicates.intersect(partitionTupleDomain), diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java index e92742aaaeb8..3065501de8f4 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -20,6 +20,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.metastore.Column; import io.trino.metastore.HivePartition; +import io.trino.metastore.HiveType; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hive.HivePartitionManager; @@ -30,51 +31,48 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; -import org.apache.hudi.common.model.HoodieFileFormat; +import io.trino.spi.type.VarcharType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.storage.StoragePath; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; +import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; +import static io.trino.plugin.hive.avro.AvroHiveFileUtils.avroSchemaFromHiveType; import static io.trino.plugin.hive.util.HiveUtil.checkCondition; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; -import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT; -import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; -import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; -import static org.apache.hudi.common.model.HoodieFileFormat.ORC; -import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_TABLE_TYPE; +import static java.util.Objects.requireNonNull; +import static org.apache.hudi.avro.HoodieAvroUtils.METADATA_FIELD_SCHEMA; import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; public final class HudiUtil { - private HudiUtil() {} - - public static HoodieFileFormat getHudiFileFormat(String path) - { - String extension = getFileExtension(path); - if (extension.equals(PARQUET.getFileExtension())) { - return PARQUET; - } - if (extension.equals(HOODIE_LOG.getFileExtension())) { - return HOODIE_LOG; - } - if (extension.equals(ORC.getFileExtension())) { - return ORC; - } - if (extension.equals(HFILE.getFileExtension())) { - return HFILE; - } - throw new TrinoException(HUDI_UNSUPPORTED_FILE_FORMAT, "Hoodie InputFormat not implemented for base file of type " + extension); - } + private static final List HOODIE_META_COLUMNS = ImmutableList.of( + "_hoodie_commit_time", + "_hoodie_commit_seqno", + "_hoodie_record_key", + "_hoodie_partition_path", + "_hoodie_file_name"); + private static final String HUDI_PARQUET_INPUT_FORMAT = "org.apache.hudi.hadoop.HoodieParquetInputFormat"; + private static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT = "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"; + private static final String HUDI_INPUT_FORMAT = "com.uber.hoodie.hadoop.HoodieInputFormat"; + private static final String HUDI_REALTIME_INPUT_FORMAT = "com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat"; - private static String getFileExtension(String fullName) - { - String fileName = Location.of(fullName).fileName(); - int dotIndex = fileName.lastIndexOf('.'); - return dotIndex == -1 ? "" : fileName.substring(dotIndex); - } + private HudiUtil() {} public static boolean hudiMetadataExists(TrinoFileSystem trinoFileSystem, Location baseLocation) { @@ -140,4 +138,79 @@ public static HoodieTableMetaClient buildTableMetaClient( .setBasePath(basePath) .build(); } + + public static Schema constructSchema(List columnNames, List columnTypes, boolean withMetaColumns) + { + requireNonNull(columnNames, "columnNames is null"); + requireNonNull(columnTypes, "columnTypes is null"); + checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes must be the same size"); + SchemaBuilder.RecordBuilder schemaBuilder = SchemaBuilder.record("baseRecord"); + SchemaBuilder.FieldAssembler fieldBuilder = schemaBuilder.fields(); + + if (withMetaColumns) { + for (String metaFieldName : HOODIE_META_COLUMNS) { + fieldBuilder = fieldBuilder + .name(metaFieldName) + .type(METADATA_FIELD_SCHEMA) + .withDefault(null); + } + } + + for (int i = 0; i < columnNames.size(); ++i) { + Schema fieldSchema = avroSchemaFromHiveType(columnTypes.get(i)); + fieldBuilder = fieldBuilder + .name(columnNames.get(i)) + .type(fieldSchema) + .withDefault(null); + } + return fieldBuilder.endRecord(); + } + + public static List prependHudiMetaColumns(List dataColumns) + { + List columns = new ArrayList<>(); + for (int i = 0; i < HOODIE_META_COLUMNS.size(); i++) { + String metaColumnName = HOODIE_META_COLUMNS.get(i); + boolean alreadyPresent = dataColumns.stream() + .anyMatch(handle -> handle.getName().equals(metaColumnName)); + if (!alreadyPresent) { + columns.add(new HiveColumnHandle( + metaColumnName, + i, + HiveType.HIVE_STRING, + VarcharType.VARCHAR, + Optional.empty(), + HiveColumnHandle.ColumnType.REGULAR, + Optional.empty())); + } + } + columns.addAll(dataColumns); + return columns; + } + + public static FileSlice convertToFileSlice(HudiSplit split, String basePath) + { + String dataFilePath = split.baseFile().isPresent() + ? split.baseFile().get().getPath() + : split.logFiles().getFirst().getPath(); + String fileId = FSUtils.getFileIdFromFileName(new StoragePath(dataFilePath).getName()); + HoodieBaseFile baseFile = split.baseFile().isPresent() + ? new HoodieBaseFile(dataFilePath, fileId, split.commitTime(), null) + : null; + + return new FileSlice( + new HoodieFileGroupId(FSUtils.getRelativePartitionPath(new StoragePath(basePath), new StoragePath(dataFilePath)), fileId), + split.commitTime(), + baseFile, + split.logFiles().stream().map(lf -> new HoodieLogFile(lf.getPath())).toList()); + } + + public static HoodieTableType fromInputFormat(String inputFormat) + { + return switch (inputFormat) { + case HUDI_PARQUET_INPUT_FORMAT, HUDI_INPUT_FORMAT -> HoodieTableType.COPY_ON_WRITE; + case HUDI_PARQUET_REALTIME_INPUT_FORMAT, HUDI_REALTIME_INPUT_FORMAT -> HoodieTableType.MERGE_ON_READ; + default -> throw new TrinoException(HUDI_UNSUPPORTED_TABLE_TYPE, "Table has an unsupported input format: " + inputFormat); + }; + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiBaseFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiBaseFile.java new file mode 100644 index 000000000000..de1b2685a20d --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiBaseFile.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.file; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hudi.common.model.HoodieBaseFile; + +import static com.google.common.base.Preconditions.checkArgument; + +public class HudiBaseFile + implements HudiFile +{ + private final String path; + private final long fileSize; + private final long modificationTime; + private final long start; + private final long length; + + public static HudiBaseFile of(HoodieBaseFile baseFile) + { + return of(baseFile, 0, baseFile.getFileSize()); + } + + public static HudiBaseFile of(HoodieBaseFile baseFile, long start, long length) + { + return new HudiBaseFile(baseFile, start, length); + } + + @JsonCreator + public HudiBaseFile( + @JsonProperty("path") String path, + @JsonProperty("fileSize") long fileSize, + @JsonProperty("modificationTime") long modificationTime, + @JsonProperty("start") long start, + @JsonProperty("length") long length) + { + this.path = path; + this.fileSize = fileSize; + this.modificationTime = modificationTime; + this.start = start; + this.length = length; + } + + private HudiBaseFile(HoodieBaseFile baseFile, long start, long length) + { + checkArgument(baseFile != null, "baseFile is null"); + checkArgument(start >= 0, "start must be positive"); + checkArgument(length >= 0, "length must be positive"); + checkArgument(start + length <= baseFile.getFileSize(), "fileSize must be at least start + length"); + this.path = baseFile.getPath(); + this.fileSize = baseFile.getFileSize(); + this.modificationTime = baseFile.getPathInfo().getModificationTime(); + this.start = start; + this.length = length; + } + + @JsonProperty + @Override + public String getPath() + { + return path; + } + + @JsonProperty + @Override + public long getFileSize() + { + return fileSize; + } + + @JsonProperty + @Override + public long getModificationTime() + { + return modificationTime; + } + + @JsonProperty + @Override + public long getStart() + { + return start; + } + + @JsonProperty + @Override + public long getLength() + { + return length; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiFile.java new file mode 100644 index 000000000000..6fbb0a400a94 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiFile.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.file; + +public interface HudiFile +{ + String getPath(); + + long getFileSize(); + + long getModificationTime(); + + long getStart(); + + long getLength(); +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiLogFile.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiLogFile.java new file mode 100644 index 000000000000..1f9c4b9effd8 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/file/HudiLogFile.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.file; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hudi.common.model.HoodieLogFile; + +import static com.google.common.base.Preconditions.checkArgument; + +public class HudiLogFile + implements HudiFile +{ + private final String path; + private final long fileSize; + private final long modificationTime; + private final long start; + private final long length; + + public static HudiLogFile of(HoodieLogFile logFile) + { + return of(logFile, 0, logFile.getFileSize()); + } + + public static HudiLogFile of(HoodieLogFile logFile, long start, long length) + { + return new HudiLogFile(logFile, start, length); + } + + @JsonCreator + public HudiLogFile( + @JsonProperty("path") String path, + @JsonProperty("fileSize") long fileSize, + @JsonProperty("modificationTime") long modificationTime, + @JsonProperty("start") long start, + @JsonProperty("length") long length) + { + this.path = path; + this.fileSize = fileSize; + this.modificationTime = modificationTime; + this.start = start; + this.length = length; + } + + private HudiLogFile(HoodieLogFile logFile, long start, long length) + { + checkArgument(logFile != null, "logFile is null"); + checkArgument(start >= 0, "start must be positive"); + checkArgument(length >= 0, "length must be positive"); + checkArgument(start + length <= logFile.getFileSize(), "fileSize must be at least start + length"); + this.path = logFile.getPath().toString(); + this.fileSize = logFile.getFileSize(); + this.modificationTime = logFile.getPathInfo().getModificationTime(); + this.start = start; + this.length = length; + } + + @JsonProperty + @Override + public String getPath() + { + return path; + } + + @JsonProperty + @Override + public long getFileSize() + { + return fileSize; + } + + @JsonProperty + @Override + public long getModificationTime() + { + return modificationTime; + } + + @JsonProperty + @Override + public long getStart() + { + return start; + } + + @JsonProperty + @Override + public long getLength() + { + return length; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java index bed1c2b33f67..2c242176a627 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java @@ -16,16 +16,17 @@ import io.airlift.concurrent.MoreFutures; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hive.util.AsyncQueue; -import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.query.HudiDirectoryLister; import io.trino.plugin.hudi.split.HudiSplitFactory; import io.trino.spi.connector.ConnectorSplit; +import org.apache.hudi.common.model.FileSlice; import java.util.Deque; import java.util.List; import java.util.Optional; import static io.trino.plugin.hudi.partition.HiveHudiPartitionInfo.NON_PARTITION; +import static java.util.Objects.requireNonNull; public class HudiPartitionInfoLoader implements Runnable @@ -34,6 +35,7 @@ public class HudiPartitionInfoLoader private final HudiSplitFactory hudiSplitFactory; private final AsyncQueue asyncQueue; private final Deque partitionQueue; + private final String commitTime; private boolean isRunning; @@ -41,13 +43,15 @@ public HudiPartitionInfoLoader( HudiDirectoryLister hudiDirectoryLister, HudiSplitFactory hudiSplitFactory, AsyncQueue asyncQueue, - Deque partitionQueue) + Deque partitionQueue, + String commitTime) { - this.hudiDirectoryLister = hudiDirectoryLister; - this.hudiSplitFactory = hudiSplitFactory; - this.asyncQueue = asyncQueue; - this.partitionQueue = partitionQueue; + this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); + this.hudiSplitFactory = requireNonNull(hudiSplitFactory, "hudiSplitFactory is null"); + this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); + this.partitionQueue = requireNonNull(partitionQueue, "partitionQueue is null"); this.isRunning = true; + this.commitTime = requireNonNull(commitTime, "commitTime is null"); } @Override @@ -68,9 +72,9 @@ private void generateSplitsFromPartition(String partitionName) partitionInfo.ifPresent(hudiPartitionInfo -> { if (hudiPartitionInfo.doesMatchPredicates() || partitionName.equals(NON_PARTITION)) { List partitionKeys = hudiPartitionInfo.getHivePartitionKeys(); - List partitionFiles = hudiDirectoryLister.listStatus(hudiPartitionInfo); - partitionFiles.stream() - .flatMap(fileStatus -> hudiSplitFactory.createSplits(partitionKeys, fileStatus).stream()) + List partitionFileSlices = hudiDirectoryLister.listStatus(hudiPartitionInfo, commitTime); + partitionFileSlices.stream() + .flatMap(slice -> hudiSplitFactory.createSplits(partitionKeys, slice, commitTime).stream()) .map(asyncQueue::offer) .forEachOrdered(MoreFutures::getFutureValue); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java index 710dfc44916c..ea9d6fb928d6 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiDirectoryLister.java @@ -13,8 +13,8 @@ */ package io.trino.plugin.hudi.query; -import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.partition.HudiPartitionInfo; +import org.apache.hudi.common.model.FileSlice; import java.io.Closeable; import java.util.List; @@ -23,7 +23,7 @@ public interface HudiDirectoryLister extends Closeable { - List listStatus(HudiPartitionInfo partitionInfo); + List listStatus(HudiPartitionInfo partitionInfo, String commitTime); Optional getPartitionInfo(String partition); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiSnapshotDirectoryLister.java similarity index 57% rename from plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java rename to plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiSnapshotDirectoryLister.java index 051b9d89bd21..d23dc0628b3e 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiReadOptimizedDirectoryLister.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/query/HudiSnapshotDirectoryLister.java @@ -13,22 +13,20 @@ */ package io.trino.plugin.hudi.query; -import io.airlift.log.Logger; -import io.airlift.units.DataSize; -import io.trino.filesystem.Location; import io.trino.metastore.Column; import io.trino.metastore.HiveMetastore; import io.trino.metastore.Table; import io.trino.plugin.hive.HiveColumnHandle; -import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.HudiTableHandle; import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo; import io.trino.plugin.hudi.partition.HudiPartitionInfo; +import io.trino.plugin.hudi.storage.TrinoStorageConfiguration; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; -import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; -import org.apache.hudi.storage.StoragePathInfo; import java.util.List; import java.util.Map; @@ -37,34 +35,28 @@ import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.airlift.units.DataSize.Unit.MEGABYTE; -import static java.lang.Math.max; -import static java.lang.Math.min; -import static org.apache.hudi.common.table.view.HoodieTableFileSystemView.fileListingBasedFileSystemView; -public class HudiReadOptimizedDirectoryLister +public class HudiSnapshotDirectoryLister implements HudiDirectoryLister { - private static final Logger LOG = Logger.get(HudiReadOptimizedDirectoryLister.class); - private static final long MIN_BLOCK_SIZE = DataSize.of(32, MEGABYTE).toBytes(); - private final HoodieTableFileSystemView fileSystemView; private final List partitionColumns; private final Map allPartitionInfoMap; - public HudiReadOptimizedDirectoryLister( + public HudiSnapshotDirectoryLister( HudiTableHandle tableHandle, HoodieTableMetaClient metaClient, + boolean enableMetadataTable, HiveMetastore hiveMetastore, Table hiveTable, List partitionColumnHandles, - List hivePartitionNames, - boolean ignoreAbsentPartitions) + List hivePartitionNames) { - this.fileSystemView = fileListingBasedFileSystemView( - new HoodieLocalEngineContext(metaClient.getStorageConf()), - metaClient, - metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() + .enable(enableMetadataTable) + .build(); + this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView( + new HoodieLocalEngineContext(new TrinoStorageConfiguration()), metaClient, metadataConfig); this.partitionColumns = hiveTable.getPartitionColumns(); this.allPartitionInfoMap = hivePartitionNames.stream() .collect(Collectors.toMap( @@ -79,17 +71,9 @@ public HudiReadOptimizedDirectoryLister( } @Override - public List listStatus(HudiPartitionInfo partitionInfo) + public List listStatus(HudiPartitionInfo partitionInfo, String commitTime) { - LOG.debug("List partition: partitionInfo=%s", partitionInfo); - return fileSystemView.getLatestBaseFiles(partitionInfo.getRelativePartitionPath()) - .map(HudiReadOptimizedDirectoryLister::getStoragePathInfo) - .map(fileEntry -> new HudiFileStatus( - Location.of(fileEntry.getPath().toString()), - false, - fileEntry.getLength(), - fileEntry.getModificationTime(), - max(fileEntry.getBlockSize(), min(fileEntry.getLength(), MIN_BLOCK_SIZE)))) + return fileSystemView.getLatestFileSlicesBeforeOrOn(partitionInfo.getRelativePartitionPath(), commitTime, false) .collect(toImmutableList()); } @@ -106,12 +90,4 @@ public void close() fileSystemView.close(); } } - - private static StoragePathInfo getStoragePathInfo(HoodieBaseFile baseFile) - { - if (baseFile.getBootstrapBaseFile().isPresent()) { - return baseFile.getBootstrapBaseFile().get().getPathInfo(); - } - return baseFile.getPathInfo(); - } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/reader/TrinoHudiReaderContext.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/reader/TrinoHudiReaderContext.java new file mode 100644 index 000000000000..7b1e121cb265 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/reader/TrinoHudiReaderContext.java @@ -0,0 +1,216 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.reader; + +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hudi.util.HudiAvroSerializer; +import io.trino.plugin.hudi.util.SynthesizedColumnHandler; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.SourcePage; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.config.RecordMergeMode; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieAvroRecordMerger; +import org.apache.hudi.common.model.HoodieEmptyRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.UnaryOperator; + +public class TrinoHudiReaderContext + extends HoodieReaderContext +{ + private final ConnectorPageSource pageSource; + private final HudiAvroSerializer avroSerializer; + private final Map columnToPosition; + + public TrinoHudiReaderContext( + ConnectorPageSource pageSource, + List columnHandles, + SynthesizedColumnHandler synthesizedColumnHandler) + { + this.pageSource = pageSource; + this.avroSerializer = new HudiAvroSerializer(columnHandles, synthesizedColumnHandler); + this.columnToPosition = new HashMap<>(); + for (int i = 0; i < columnHandles.size(); i++) { + HiveColumnHandle handle = columnHandles.get(i); + columnToPosition.put(handle.getBaseColumnName(), i); + } + } + + @Override + public ClosableIterator getFileRecordIterator( + StoragePath storagePath, + long start, + long length, + Schema dataSchema, + Schema requiredSchema, + HoodieStorage storage) + { + return new ClosableIterator<>() + { + private SourcePage currentPage; + private int currentPosition; + + @Override + public void close() + { + try { + pageSource.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() + { + // If all records in the current page are consume, try to get next page + if (currentPage == null || currentPosition >= currentPage.getPositionCount()) { + if (pageSource.isFinished()) { + return false; + } + + // Get next page and reset currentPosition + currentPage = pageSource.getNextSourcePage(); + currentPosition = 0; + + // If no more pages are available + return currentPage != null; + } + + return true; + } + + @Override + public IndexedRecord next() + { + if (!hasNext()) { + throw new RuntimeException("No more records in the iterator"); + } + + IndexedRecord record = avroSerializer.serialize(currentPage, currentPosition); + currentPosition++; + return record; + } + }; + } + + @Override + public IndexedRecord convertAvroRecord(IndexedRecord record) + { + return record; + } + + @Override + public GenericRecord convertToAvroRecord(IndexedRecord record, Schema schema) + { + GenericRecord ret = new GenericData.Record(schema); + for (Schema.Field field : schema.getFields()) { + ret.put(field.name(), record.get(field.pos())); + } + return ret; + } + + @Override + public Option getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) + { + return Option.of(HoodieAvroRecordMerger.INSTANCE); + } + + @Override + public Object getValue(IndexedRecord record, Schema schema, String fieldName) + { + if (columnToPosition.containsKey(fieldName)) { + return record.get(columnToPosition.get(fieldName)); + } + // record doesn't have the queried field, return null + return null; + } + + @Override + public IndexedRecord seal(IndexedRecord record) + { + Schema schema = record.getSchema(); + IndexedRecord newRecord = new Record(schema); + List fields = schema.getFields(); + for (Schema.Field field : fields) { + int pos = schema.getField(field.name()).pos(); + newRecord.put(pos, record.get(pos)); + } + return newRecord; + } + + @Override + public ClosableIterator mergeBootstrapReaders( + ClosableIterator closableIterator, Schema schema, + ClosableIterator closableIterator1, Schema schema1) + { + return null; + } + + @Override + public UnaryOperator projectRecord( + Schema from, + Schema to, + Map renamedColumns) + { + List toFields = to.getFields(); + int[] projection = new int[toFields.size()]; + for (int i = 0; i < projection.length; i++) { + projection[i] = from.getField(toFields.get(i).name()).pos(); + } + + return fromRecord -> { + IndexedRecord toRecord = new Record(to); + for (int i = 0; i < projection.length; i++) { + toRecord.put(i, fromRecord.get(projection[i])); + } + return toRecord; + }; + } + + @Override + public HoodieRecord constructHoodieRecord(BufferedRecord bufferedRecord) + { + if (bufferedRecord.isDelete()) { + return new HoodieEmptyRecord<>( + new HoodieKey(bufferedRecord.getRecordKey(), null), + HoodieRecord.HoodieRecordType.AVRO); + } + return new HoodieAvroIndexedRecord(bufferedRecord.getRecord()); + } + + @Override + public IndexedRecord toBinaryRow(Schema schema, IndexedRecord indexedRecord) + { + return indexedRecord; + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java index 81447f1f7a78..3c308f3d9545 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java @@ -47,6 +47,7 @@ public class HudiBackgroundSplitLoader private final HudiSplitFactory hudiSplitFactory; private final List partitions; private final Consumer errorListener; + private final String commitTime; public HudiBackgroundSplitLoader( ConnectorSession session, @@ -56,7 +57,8 @@ public HudiBackgroundSplitLoader( Executor splitGeneratorExecutor, HudiSplitWeightProvider hudiSplitWeightProvider, List partitions, - Consumer errorListener) + Consumer errorListener, + String commitTime) { this.hudiDirectoryLister = requireNonNull(hudiDirectoryLister, "hudiDirectoryLister is null"); this.asyncQueue = requireNonNull(asyncQueue, "asyncQueue is null"); @@ -65,6 +67,7 @@ public HudiBackgroundSplitLoader( this.hudiSplitFactory = new HudiSplitFactory(tableHandle, hudiSplitWeightProvider); this.partitions = requireNonNull(partitions, "partitions is null"); this.errorListener = requireNonNull(errorListener, "errorListener is null"); + this.commitTime = requireNonNull(commitTime, "commitTime is null"); } @Override @@ -76,7 +79,7 @@ public void run() // Start a number of partition split generators to generate the splits in parallel for (int i = 0; i < splitGeneratorNumThreads; i++) { - HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, hudiSplitFactory, asyncQueue, partitionQueue); + HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, hudiSplitFactory, asyncQueue, partitionQueue, commitTime); splitGeneratorList.add(generator); ListenableFuture future = Futures.submit(generator, splitGeneratorExecutor); addExceptionCallback(future, errorListener); diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java index ccae0b5a38f8..bcb4cdc76ab1 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java @@ -14,21 +14,32 @@ package io.trino.plugin.hudi.split; import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; import io.trino.plugin.hive.HivePartitionKey; -import io.trino.plugin.hudi.HudiFileStatus; import io.trino.plugin.hudi.HudiSplit; import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.hudi.file.HudiBaseFile; +import io.trino.plugin.hudi.file.HudiLogFile; import io.trino.spi.TrinoException; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.util.Option; +import java.util.Collections; import java.util.List; +import java.util.Optional; +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; public class HudiSplitFactory { private static final double SPLIT_SLOP = 1.1; // 10% slop/overflow allowed in bytes per split while generating splits + private static final long MIN_BLOCK_SIZE = DataSize.of(32, MEGABYTE).toBytes(); private final HudiTableHandle hudiTableHandle; private final HudiSplitWeightProvider hudiSplitWeightProvider; @@ -41,53 +52,67 @@ public HudiSplitFactory( this.hudiSplitWeightProvider = requireNonNull(hudiSplitWeightProvider, "hudiSplitWeightProvider is null"); } - public List createSplits(List partitionKeys, HudiFileStatus fileStatus) + public List createSplits(List partitionKeys, FileSlice fileSlice, String commitTime) { - if (fileStatus.isDirectory()) { - throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Not a valid location: %s", fileStatus.location())); + if (fileSlice.isEmpty()) { + throw new TrinoException(HUDI_FILESYSTEM_ERROR, format("Not a valid file slice: %s", fileSlice)); } - long fileSize = fileStatus.length(); + // Handle MERGE_ON_READ tables to be read in read_optimized mode + // IMPORTANT: These tables will have a COPY_ON_WRITE table type due to how `HudiTableTypeUtils#fromInputFormat` + if (fileSlice.getLogFiles().findAny().isEmpty() + || hudiTableHandle.getTableType().equals(COPY_ON_WRITE)) { + // Base file only + checkArgument(fileSlice.getBaseFile().isPresent(), + "Hudi base file must exist if there is no log file in the file slice"); + HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); + long fileSize = baseFile.getFileSize(); - if (fileSize == 0) { - return ImmutableList.of(new HudiSplit( - fileStatus.location().toString(), - 0, - fileSize, - fileSize, - fileStatus.modificationTime(), - hudiTableHandle.getRegularPredicates(), - partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(fileSize))); - } + if (fileSize == 0) { + return ImmutableList.of(new HudiSplit( + Optional.of(HudiBaseFile.of(baseFile)), + Collections.emptyList(), + commitTime, + hudiTableHandle.getRegularPredicates(), + partitionKeys, + hudiSplitWeightProvider.calculateSplitWeight(fileSize))); + } - ImmutableList.Builder splits = ImmutableList.builder(); - long splitSize = fileStatus.blockSize(); + ImmutableList.Builder splits = ImmutableList.builder(); + long splitSize = Math.max(MIN_BLOCK_SIZE, baseFile.getPathInfo().getBlockSize()); - long bytesRemaining = fileSize; - while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { - splits.add(new HudiSplit( - fileStatus.location().toString(), - fileSize - bytesRemaining, - splitSize, - fileSize, - fileStatus.modificationTime(), - hudiTableHandle.getRegularPredicates(), - partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(splitSize))); - bytesRemaining -= splitSize; - } - if (bytesRemaining > 0) { - splits.add(new HudiSplit( - fileStatus.location().toString(), - fileSize - bytesRemaining, - bytesRemaining, - fileSize, - fileStatus.modificationTime(), - hudiTableHandle.getRegularPredicates(), - partitionKeys, - hudiSplitWeightProvider.calculateSplitWeight(bytesRemaining))); + long bytesRemaining = fileSize; + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + splits.add(new HudiSplit( + Optional.of(HudiBaseFile.of(baseFile, fileSize - bytesRemaining, splitSize)), + Collections.emptyList(), + commitTime, + hudiTableHandle.getRegularPredicates(), + partitionKeys, + hudiSplitWeightProvider.calculateSplitWeight(splitSize))); + bytesRemaining -= splitSize; + } + if (bytesRemaining > 0) { + splits.add(new HudiSplit( + Optional.of(HudiBaseFile.of(baseFile, fileSize - bytesRemaining, bytesRemaining)), + Collections.emptyList(), + commitTime, + hudiTableHandle.getRegularPredicates(), + partitionKeys, + hudiSplitWeightProvider.calculateSplitWeight(bytesRemaining))); + } + return splits.build(); } - return splits.build(); + + // Base and log files + Option baseFileOption = fileSlice.getBaseFile(); + return Collections.singletonList( + new HudiSplit( + baseFileOption.isPresent() ? Optional.of(HudiBaseFile.of(baseFileOption.get())) : Optional.empty(), + fileSlice.getLogFiles().map(HudiLogFile::of).toList(), + commitTime, + hudiTableHandle.getRegularPredicates(), + partitionKeys, + hudiSplitWeightProvider.calculateSplitWeight(fileSlice.getTotalFileSize()))); } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/HudiAvroSerializer.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/HudiAvroSerializer.java new file mode 100644 index 000000000000..0be3a8241689 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/HudiAvroSerializer.java @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.util; + +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.spi.PageBuilder; +import io.trino.spi.TrinoException; +import io.trino.spi.block.ArrayBlockBuilder; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.block.MapBlockBuilder; +import io.trino.spi.block.RowBlockBuilder; +import io.trino.spi.connector.SourcePage; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.Int128; +import io.trino.spi.type.LongTimestamp; +import io.trino.spi.type.LongTimestampWithTimeZone; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.SqlDate; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; +import org.apache.avro.Conversions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.util.Utf8; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Verify.verify; +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.hudi.HudiUtil.constructSchema; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.Decimals.encodeShortScaledValue; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.LongTimestampWithTimeZone.fromEpochMillisAndFraction; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; +import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND; +import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND; +import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; +import static java.lang.Integer.parseInt; +import static java.lang.Math.floorDiv; +import static java.lang.Math.floorMod; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; +import static java.time.ZoneOffset.UTC; + +public class HudiAvroSerializer +{ + private static final int[] NANO_FACTOR = { + -1, // 0, no need to multiply + 100_000_000, // 1 digit after the dot + 10_000_000, // 2 digits after the dot + 1_000_000, // 3 digits after the dot + 100_000, // 4 digits after the dot + 10_000, // 5 digits after the dot + 1000, // 6 digits after the dot + 100, // 7 digits after the dot + 10, // 8 digits after the dot + 1, // 9 digits after the dot + }; + + private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter(); + + private final List columnHandles; + private final List columnTypes; + private final Schema schema; + private final SynthesizedColumnHandler synthesizedColumnHandler; + + public HudiAvroSerializer(List columnHandles, SynthesizedColumnHandler synthesizedColumnHandler) + { + this.columnHandles = ImmutableList.copyOf(columnHandles); + this.columnTypes = columnHandles.stream().map(HiveColumnHandle::getType).toList(); + // Fetches projected schema + this.schema = constructSchema( + columnHandles.stream().filter(column -> !column.isHidden()).map(HiveColumnHandle::getName).toList(), + columnHandles.stream().filter(column -> !column.isHidden()).map(HiveColumnHandle::getHiveType).toList(), + false); + this.synthesizedColumnHandler = synthesizedColumnHandler; + } + + public IndexedRecord serialize(SourcePage sourcePage, int position) + { + IndexedRecord record = new GenericData.Record(schema); + for (int i = 0; i < columnTypes.size(); i++) { + Object value = getValue(sourcePage, i, position); + record.put(i, value); + } + return record; + } + + public Object getValue(SourcePage sourcePage, int channel, int position) + { + return columnTypes.get(channel).getObjectValue(sourcePage.getBlock(channel), position); + } + + public void buildRecordInPage(PageBuilder pageBuilder, IndexedRecord record) + { + pageBuilder.declarePosition(); + int blockSeq = 0; + for (int channel = 0; channel < columnTypes.size(); channel++, blockSeq++) { + BlockBuilder output = pageBuilder.getBlockBuilder(blockSeq); + HiveColumnHandle columnHandle = columnHandles.get(channel); + if (synthesizedColumnHandler.isSynthesizedColumn(columnHandle)) { + synthesizedColumnHandler.getColumnStrategy(columnHandle).appendToBlock(output); + } + else { + // Record may not be projected, get index from it + int fieldPosInSchema = record.getSchema().getField(columnHandle.getName()).pos(); + appendTo(columnTypes.get(channel), record.get(fieldPosInSchema), output); + } + } + } + + public void appendTo(Type type, Object value, BlockBuilder output) + { + if (value == null) { + output.appendNull(); + return; + } + + Class javaType = type.getJavaType(); + try { + if (javaType == boolean.class) { + type.writeBoolean(output, (Boolean) value); + } + else if (javaType == long.class) { + if (type.equals(BIGINT)) { + type.writeLong(output, ((Number) value).longValue()); + } + else if (type.equals(INTEGER)) { + type.writeLong(output, ((Number) value).intValue()); + } + else if (type instanceof DecimalType decimalType) { + verify(decimalType.isShort(), "The type should be short decimal"); + BigDecimal decimal = DECIMAL_CONVERTER.convert(decimalType.getPrecision(), decimalType.getScale(), value); + type.writeLong(output, encodeShortScaledValue(decimal, decimalType.getScale())); + } + else if (type.equals(DATE)) { + type.writeLong(output, ((SqlDate) value).getDays()); + } + else if (type.equals(TIMESTAMP_MICROS)) { + type.writeLong(output, toTrinoTimestamp(((Utf8) value).toString())); + } + else if (type.equals(TIME_MICROS)) { + type.writeLong(output, (long) value * PICOSECONDS_PER_MICROSECOND); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); + } + } + else if (javaType == double.class) { + type.writeDouble(output, ((Number) value).doubleValue()); + } + else if (type.getJavaType() == Int128.class) { + writeObject(output, type, value); + } + else if (javaType == Slice.class) { + writeSlice(output, type, value); + } + else if (javaType == LongTimestamp.class) { + type.writeObject(output, value); + } + else if (javaType == LongTimestampWithTimeZone.class) { + verify(type.equals(TIMESTAMP_TZ_MICROS)); + long epochMicros = (long) value; + int picosOfMillis = toIntExact(floorMod(epochMicros, MICROSECONDS_PER_MILLISECOND)) * PICOSECONDS_PER_MICROSECOND; + type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY)); + } + else if (type instanceof ArrayType arrayType) { + writeArray((ArrayBlockBuilder) output, (List) value, arrayType); + } + else if (type instanceof RowType rowType) { + writeRow((RowBlockBuilder) output, (GenericRecord) value, rowType); + } + else if (type instanceof MapType mapType) { + writeMap((MapBlockBuilder) output, (Map) value, mapType); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type)); + } + } + catch (ClassCastException ignore) { + // returns null instead of raising exception + output.appendNull(); + } + } + + public static LocalDateTime toLocalDateTime(String datetime) + { + int dotPosition = datetime.indexOf('.'); + if (dotPosition == -1) { + // no sub-second element + return LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(datetime)); + } + LocalDateTime result = LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(datetime.substring(0, dotPosition))); + // has sub-second element, so convert to nanosecond + String nanosStr = datetime.substring(dotPosition + 1); + int nanoOfSecond = parseInt(nanosStr) * NANO_FACTOR[nanosStr.length()]; + return result.withNano(nanoOfSecond); + } + + public static long toTrinoTimestamp(String datetime) + { + Instant instant = toLocalDateTime(datetime).toInstant(UTC); + return (instant.getEpochSecond() * MICROSECONDS_PER_SECOND) + (instant.getNano() / NANOSECONDS_PER_MICROSECOND); + } + + private static void writeSlice(BlockBuilder output, Type type, Object value) + { + if (type instanceof VarcharType) { + if (value instanceof Utf8 utf8) { + type.writeSlice(output, utf8Slice(utf8.toString())); + } + else if (value instanceof String str) { + type.writeSlice(output, utf8Slice(str)); + } + else { + type.writeSlice(output, utf8Slice(value.toString())); + } + } + else if (type instanceof VarbinaryType) { + if (value instanceof ByteBuffer byteBuffer) { + type.writeSlice(output, Slices.wrappedHeapBuffer(byteBuffer)); + } + else { + output.appendNull(); + } + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature()); + } + } + + private static void writeObject(BlockBuilder output, Type type, Object value) + { + if (type instanceof DecimalType decimalType) { + verify(!decimalType.isShort(), "The type should be long decimal"); + BigDecimal decimal = DECIMAL_CONVERTER.convert(decimalType.getPrecision(), decimalType.getScale(), value); + type.writeObject(output, Decimals.encodeScaledValue(decimal, decimalType.getScale())); + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Object: " + type.getTypeSignature()); + } + } + + private void writeArray(ArrayBlockBuilder output, List value, ArrayType arrayType) + { + Type elementType = arrayType.getElementType(); + output.buildEntry(elementBuilder -> { + for (Object element : value) { + appendTo(elementType, element, elementBuilder); + } + }); + } + + private void writeRow(RowBlockBuilder output, GenericRecord record, RowType rowType) + { + List fields = rowType.getFields(); + output.buildEntry(fieldBuilders -> { + for (int index = 0; index < fields.size(); index++) { + RowType.Field field = fields.get(index); + appendTo(field.getType(), record.get(field.getName().orElse("field" + index)), fieldBuilders.get(index)); + } + }); + } + + private void writeMap(MapBlockBuilder output, Map value, MapType mapType) + { + Type keyType = mapType.getKeyType(); + Type valueType = mapType.getValueType(); + output.buildEntry((keyBuilder, valueBuilder) -> { + for (Map.Entry entry : value.entrySet()) { + appendTo(keyType, entry.getKey(), keyBuilder); + appendTo(valueType, entry.getValue(), valueBuilder); + } + }); + } + + static class AvroDecimalConverter + { + private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = new Conversions.DecimalConversion(); + + BigDecimal convert(int precision, int scale, Object value) + { + Schema schema = new Schema.Parser().parse(format("{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":%d,\"scale\":%d}", precision, scale)); + return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) value, schema, schema.getLogicalType()); + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/SynthesizedColumnHandler.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/SynthesizedColumnHandler.java new file mode 100644 index 000000000000..7c85ec9b6552 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/SynthesizedColumnHandler.java @@ -0,0 +1,158 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.util; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HivePartitionKey; +import io.trino.plugin.hudi.HudiSplit; +import io.trino.plugin.hudi.file.HudiFile; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.metastore.Partitions.makePartName; +import static io.trino.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME; +import static io.trino.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME; +import static io.trino.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME; +import static io.trino.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; +import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; +import static io.trino.spi.type.VarcharType.VARCHAR; + +/** + * Handles synthesized (virtual) columns in Hudi tables, such as partition columns and metadata (not hudi metadata) + * columns. + */ +public class SynthesizedColumnHandler +{ + private final Map strategies; + private final SplitMetadata splitMetadata; + + public static SynthesizedColumnHandler create(HudiSplit hudiSplit) + { + return new SynthesizedColumnHandler(hudiSplit); + } + + public SynthesizedColumnHandler(HudiSplit hudiSplit) + { + this.splitMetadata = SplitMetadata.of(hudiSplit); + ImmutableMap.Builder builder = ImmutableMap.builder(); + initSynthesizedColStrategies(builder); + initPartitionKeyStrategies(builder, hudiSplit); + strategies = builder.buildOrThrow(); + } + + private void initSynthesizedColStrategies(ImmutableMap.Builder builder) + { + builder.put(PARTITION_COLUMN_NAME, blockBuilder -> + VARCHAR.writeSlice(blockBuilder, + utf8Slice(toPartitionName(splitMetadata.getPartitionKeyVals())))); + + builder.put(PATH_COLUMN_NAME, blockBuilder -> + VARCHAR.writeSlice(blockBuilder, utf8Slice(splitMetadata.getFilePath()))); + + builder.put(FILE_SIZE_COLUMN_NAME, blockBuilder -> + BIGINT.writeLong(blockBuilder, splitMetadata.getFileSize())); + + builder.put(FILE_MODIFIED_TIME_COLUMN_NAME, blockBuilder -> { + long packedTimestamp = packDateTimeWithZone( + splitMetadata.getFileModificationTime(), UTC_KEY); + TIMESTAMP_TZ_MILLIS.writeLong(blockBuilder, packedTimestamp); + }); + } + + private static void initPartitionKeyStrategies(ImmutableMap.Builder builder, + HudiSplit hudiSplit) + { + for (HivePartitionKey partitionKey : hudiSplit.partitionKeys()) { + builder.put(partitionKey.name(), (blockBuilder) -> + VARCHAR.writeSlice(blockBuilder, utf8Slice(partitionKey.value()))); + } + } + + public boolean isSynthesizedColumn(String columnName) + { + return strategies.containsKey(columnName); + } + + public boolean isSynthesizedColumn(HiveColumnHandle columnHandle) + { + return isSynthesizedColumn(columnHandle.getName()); + } + + public SynthesizedColumnStrategy getColumnStrategy(HiveColumnHandle columnHandle) + { + return strategies.get(columnHandle.getName()); + } + + private static String toPartitionName(Map partitionKeyVals) + { + return makePartName(List.copyOf(partitionKeyVals.keySet()), List.copyOf(partitionKeyVals.values())); + } + + /** + * Represents metadata about split being processed. + * Splits are assumed to be in the same partition. + */ + public static class SplitMetadata + { + private final Map partitionKeyVals; + private final String filePath; + private final long fileSize; + private final long modifiedTime; + + public static SplitMetadata of(HudiSplit hudiSplit) + { + return new SplitMetadata(hudiSplit); + } + + public SplitMetadata(HudiSplit hudiSplit) + { + this.partitionKeyVals = hudiSplit.partitionKeys().stream() + .collect(Collectors.toMap(HivePartitionKey::name, HivePartitionKey::value)); + // Parquet files will be prioritised over log files + HudiFile hudiFile = hudiSplit.baseFile().isPresent() + ? hudiSplit.baseFile().get() + : hudiSplit.logFiles().getFirst(); + this.filePath = hudiFile.getPath(); + this.fileSize = hudiFile.getFileSize(); + this.modifiedTime = hudiFile.getModificationTime(); + } + + public Map getPartitionKeyVals() + { + return partitionKeyVals; + } + + public String getFilePath() + { + return filePath; + } + + public long getFileSize() + { + return fileSize; + } + + public long getFileModificationTime() + { + return modifiedTime; + } + } +} diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/SynthesizedColumnStrategy.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/SynthesizedColumnStrategy.java new file mode 100644 index 000000000000..754961eeadd5 --- /dev/null +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/util/SynthesizedColumnStrategy.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hudi.util; + +import io.trino.spi.block.BlockBuilder; + +public interface SynthesizedColumnStrategy +{ + void appendToBlock(BlockBuilder blockBuilder); +} diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java index 746b3c05f713..39e24640ebc6 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java @@ -42,7 +42,7 @@ public void testDefaults() .setSplitGeneratorParallelism(4) .setPerTransactionMetastoreCacheMaximumSize(2000) .setQueryPartitionFilterRequired(false) - .setIgnoreAbsentPartitions(false)); + .setMetadataEnabled(false)); } @Test @@ -60,7 +60,7 @@ public void testExplicitPropertyMappings() .put("hudi.split-generator-parallelism", "32") .put("hudi.per-transaction-metastore-cache-maximum-size", "1000") .put("hudi.query-partition-filter-required", "true") - .put("hudi.ignore-absent-partitions", "true") + .put("hudi.metadata-enabled", "true") .buildOrThrow(); HudiConfig expected = new HudiConfig() @@ -75,7 +75,7 @@ public void testExplicitPropertyMappings() .setSplitGeneratorParallelism(32) .setPerTransactionMetastoreCacheMaximumSize(1000) .setQueryPartitionFilterRequired(true) - .setIgnoreAbsentPartitions(true); + .setMetadataEnabled(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index 24abd8b5509c..89b1cfa3c929 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -28,6 +28,7 @@ import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_COW_PT_TBL; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_NON_PART_COW; +import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_STOCK_TICKS_MOR; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.STOCK_TICKS_COW; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.STOCK_TICKS_MOR; import static org.assertj.core.api.Assertions.assertThat; @@ -134,12 +135,25 @@ public void testMetaColumns() "VALUES ('719c3273-2805-4124-b1ac-e980dada85bf-0_0-27-1215_20220906063435640.parquet'), ('4a3fcb9b-65eb-4f6e-acf9-7b0764bb4dd1-0_0-70-2444_20220906063456550.parquet')"); } + @Test + public void testReadPartitionedMORTableWithMetadata() + { + Session session = withMetadataEnabled(getSession()); + assertThat(query(session, "SELECT symbol, max(ts) FROM " + HUDI_STOCK_TICKS_MOR + " GROUP BY symbol HAVING symbol = 'GOOG'")) + .matches("VALUES (VARCHAR 'GOOG', VARCHAR '2018-08-31 10:59:00')"); + + assertThat(query(session, "SELECT date, count(1) FROM " + HUDI_STOCK_TICKS_MOR + " GROUP BY date")) + .matches("VALUES (VARCHAR '2018-08-31', BIGINT '99')"); + } + @Test public void testPathColumn() throws Exception { String path = (String) computeScalar("SELECT \"$path\" FROM " + HUDI_COW_PT_TBL + " WHERE id = 1"); assertThat(toInputFile(path).exists()).isTrue(); + path = (String) computeScalar("SELECT \"$path\" FROM " + HUDI_STOCK_TICKS_MOR + " WHERE volume = 6794"); + assertThat(toInputFile(path).exists()).isTrue(); } @Test @@ -366,6 +380,13 @@ private static Session withPartitionFilterRequired(Session session) .build(); } + private static Session withMetadataEnabled(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "metadata_enabled", "true") + .build(); + } + private TrinoInputFile toInputFile(String path) { return ((HudiConnector) getDistributedQueryRunner().getCoordinator().getConnector("hudi")).getInjector() diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java index 55d7c78b22fb..f1f0f11532fa 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/testing/ResourceHudiTablesInitializer.java @@ -167,6 +167,7 @@ public enum TestingTable HUDI_COW_PT_TBL(multiPartitionRegularColumns(), multiPartitionColumns(), multiPartitions()), STOCK_TICKS_COW(stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()), STOCK_TICKS_MOR(stockTicksRegularColumns(), stockTicksPartitionColumns(), stockTicksPartitions()), + HUDI_STOCK_TICKS_MOR(metadataStockTicksRegularColumns(), metadataStockTicksPartitionColumns(), metadataStockTicksPartitions()), /**/; private static final List HUDI_META_COLUMNS = ImmutableList.of( @@ -176,6 +177,32 @@ public enum TestingTable new Column("_hoodie_partition_path", HIVE_STRING, Optional.empty(), Map.of()), new Column("_hoodie_file_name", HIVE_STRING, Optional.empty(), Map.of())); + private static List metadataStockTicksRegularColumns() + { + return ImmutableList.of( + column("volume", HIVE_LONG), + column("ts", HIVE_STRING), + column("symbol", HIVE_STRING), + column("year", HIVE_INT), + column("month", HIVE_STRING), + column("high", HIVE_DOUBLE), + column("low", HIVE_DOUBLE), + column("key", HIVE_STRING), + column("close", HIVE_DOUBLE), + column("open", HIVE_DOUBLE), + column("day", HIVE_STRING)); + } + + private static List metadataStockTicksPartitionColumns() + { + return ImmutableList.of(column("date", HIVE_STRING)); + } + + private static Map metadataStockTicksPartitions() + { + return ImmutableMap.of("date=2018-08-31", "2018/08/31"); + } + private final List regularColumns; private final List partitionColumns; private final Map partitions; diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/.hoodie.properties.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/.hoodie.properties.crc new file mode 100644 index 000000000000..845332a748c8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/.hoodie.properties.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/hoodie.properties b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/hoodie.properties new file mode 100644 index 000000000000..a72d7e43fa73 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/hoodie.properties @@ -0,0 +1,29 @@ +#Updated at 2025-01-06T01:29:42.850Z +#Sun Jan 05 17:29:42 PST 2025 +hoodie.table.timeline.timezone=LOCAL +hoodie.table.version=8 +hoodie.database.name= +hoodie.table.initial.version=8 +hoodie.datasource.write.hive_style_partitioning=false +hoodie.table.metadata.partitions.inflight= +hoodie.table.checksum=1806126834 +hoodie.table.keygenerator.type=SIMPLE +hoodie.partition.metafile.use.base.format=false +hoodie.archivelog.folder=history +hoodie.table.cdc.enabled=false +hoodie.table.name=hudi_stock_ticks_mor +hoodie.record.merge.strategy.id=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5 +hoodie.timeline.history.path=history +hoodie.compaction.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload +hoodie.populate.meta.fields=true +hoodie.table.type=MERGE_ON_READ +hoodie.datasource.write.partitionpath.urlencode=false +hoodie.table.base.file.format=PARQUET +hoodie.datasource.write.drop.partition.columns=false +hoodie.table.metadata.partitions=files +hoodie.timeline.layout.version=2 +hoodie.table.multiple.base.file.formats.enable=false +hoodie.record.merge.mode=EVENT_TIME_ORDERING +hoodie.table.recordkey.fields=key +hoodie.table.partition.fields=date +hoodie.timeline.path=timeline diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/.hoodie.properties.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/.hoodie.properties.crc new file mode 100644 index 000000000000..dd38489911cc Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/.hoodie.properties.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/hoodie.properties b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/hoodie.properties new file mode 100644 index 000000000000..3aaf794ae22b --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/hoodie.properties @@ -0,0 +1,19 @@ +#Properties saved on 2025-01-06T01:29:41.549Z +#Sun Jan 05 17:29:41 PST 2025 +hoodie.table.initial.version=8 +hoodie.datasource.write.drop.partition.columns=false +hoodie.table.type=MERGE_ON_READ +hoodie.archivelog.folder=history +hoodie.populate.meta.fields=false +hoodie.compaction.payload.class=org.apache.hudi.metadata.HoodieMetadataPayload +hoodie.timeline.path=timeline +hoodie.timeline.layout.version=2 +hoodie.table.version=8 +hoodie.record.merge.strategy.id=00000000-0000-0000-0000-000000000000 +hoodie.record.merge.mode=CUSTOM +hoodie.table.base.file.format=HFILE +hoodie.table.recordkey.fields=key +hoodie.table.keygenerator.type=HOODIE_TABLE_METADATA +hoodie.table.name=hudi_stock_ticks_mor_metadata +hoodie.timeline.history.path=history +hoodie.table.checksum=100423886 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000.deltacommit.inflight.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000.deltacommit.inflight.crc new file mode 100644 index 000000000000..3b7b044936a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000.deltacommit.inflight.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000.deltacommit.requested.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000.deltacommit.requested.crc new file mode 100644 index 000000000000..3b7b044936a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000.deltacommit.requested.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000_20250105172942716.deltacommit.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000_20250105172942716.deltacommit.crc new file mode 100644 index 000000000000..6d5284ede9a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.00000000000000000_20250105172942716.deltacommit.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176.deltacommit.inflight.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176.deltacommit.inflight.crc new file mode 100644 index 000000000000..beda4fbc18c6 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176.deltacommit.inflight.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176.deltacommit.requested.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176.deltacommit.requested.crc new file mode 100644 index 000000000000..3b7b044936a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176.deltacommit.requested.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176_20250105172945438.deltacommit.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176_20250105172945438.deltacommit.crc new file mode 100644 index 000000000000..3039bdcb6325 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172941176_20250105172945438.deltacommit.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774.deltacommit.inflight.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774.deltacommit.inflight.crc new file mode 100644 index 000000000000..d7db73e7e534 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774.deltacommit.inflight.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774.deltacommit.requested.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774.deltacommit.requested.crc new file mode 100644 index 000000000000..3b7b044936a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774.deltacommit.requested.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774_20250105172946721.deltacommit.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774_20250105172946721.deltacommit.crc new file mode 100644 index 000000000000..a963e0a0c7c8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/.20250105172945774_20250105172946721.deltacommit.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000.deltacommit.inflight new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250105172942716.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250105172942716.deltacommit new file mode 100644 index 000000000000..ab347748ac86 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/00000000000000000_20250105172942716.deltacommit differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176.deltacommit.inflight new file mode 100644 index 000000000000..0a6daacf665e Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176.deltacommit.inflight differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176_20250105172945438.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176_20250105172945438.deltacommit new file mode 100644 index 000000000000..d22103c8f0aa Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172941176_20250105172945438.deltacommit differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774.deltacommit.inflight new file mode 100644 index 000000000000..32900f146ddc Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774.deltacommit.inflight differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774_20250105172946721.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774_20250105172946721.deltacommit new file mode 100644 index 000000000000..36f22b424ece Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/.hoodie/timeline/20250105172945774_20250105172946721.deltacommit differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_00000000000000000.log.1_0-0-0.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_00000000000000000.log.1_0-0-0.crc new file mode 100644 index 000000000000..9ac537d8863c Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_00000000000000000.log.1_0-0-0.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_20250105172941176.log.1_0-34-46.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_20250105172941176.log.1_0-34-46.crc new file mode 100644 index 000000000000..b31a9111e5dd Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_20250105172941176.log.1_0-34-46.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_20250105172945774.log.1_0-62-88.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_20250105172945774.log.1_0-62-88.crc new file mode 100644 index 000000000000..4d8ff61d9e03 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..files-0000-0_20250105172945774.log.1_0-62-88.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..hoodie_partition_metadata.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..hoodie_partition_metadata.crc new file mode 100644 index 000000000000..a58114fea269 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/..hoodie_partition_metadata.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_0-9-8_00000000000000000.hfile.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_0-9-8_00000000000000000.hfile.crc new file mode 100644 index 000000000000..4cffecd35f5b Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_0-9-8_00000000000000000.hfile.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0 new file mode 100644 index 000000000000..8eb3d43d1ccf Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_00000000000000000.log.1_0-0-0 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_20250105172941176.log.1_0-34-46 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_20250105172941176.log.1_0-34-46 new file mode 100644 index 000000000000..e4199945629d Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_20250105172941176.log.1_0-34-46 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_20250105172945774.log.1_0-62-88 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_20250105172945774.log.1_0-62-88 new file mode 100644 index 000000000000..d6acd63aa410 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.files-0000-0_20250105172945774.log.1_0-62-88 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.hoodie_partition_metadata b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.hoodie_partition_metadata new file mode 100644 index 000000000000..0c224b0b75e2 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/.hoodie_partition_metadata @@ -0,0 +1,4 @@ +#partition metadata +#Sun Jan 05 17:29:42 PST 2025 +commitTime=00000000000000000 +partitionDepth=1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/files-0000-0_0-9-8_00000000000000000.hfile b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/files-0000-0_0-9-8_00000000000000000.hfile new file mode 100644 index 000000000000..0ad8e6a3b795 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/metadata/files/files-0000-0_0-9-8_00000000000000000.hfile differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176.deltacommit.inflight.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176.deltacommit.inflight.crc new file mode 100644 index 000000000000..673919178699 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176.deltacommit.inflight.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176.deltacommit.requested.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176.deltacommit.requested.crc new file mode 100644 index 000000000000..3b7b044936a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176.deltacommit.requested.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176_20250105172945475.deltacommit.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176_20250105172945475.deltacommit.crc new file mode 100644 index 000000000000..ee8c2c36e4b3 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172941176_20250105172945475.deltacommit.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774.deltacommit.inflight.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774.deltacommit.inflight.crc new file mode 100644 index 000000000000..7b2527074d22 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774.deltacommit.inflight.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774.deltacommit.requested.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774.deltacommit.requested.crc new file mode 100644 index 000000000000..3b7b044936a8 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774.deltacommit.requested.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774_20250105172946758.deltacommit.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774_20250105172946758.deltacommit.crc new file mode 100644 index 000000000000..1a5be7745f6a Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/.20250105172945774_20250105172946758.deltacommit.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176.deltacommit.inflight new file mode 100644 index 000000000000..6062ab4aa67e Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176.deltacommit.inflight differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176_20250105172945475.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176_20250105172945475.deltacommit new file mode 100644 index 000000000000..c4578257a367 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172941176_20250105172945475.deltacommit differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774.deltacommit.inflight b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774.deltacommit.inflight new file mode 100644 index 000000000000..2d9c734fca1a Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774.deltacommit.inflight differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774.deltacommit.requested b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774.deltacommit.requested new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774_20250105172946758.deltacommit b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774_20250105172946758.deltacommit new file mode 100644 index 000000000000..03f90369929c Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/.hoodie/timeline/20250105172945774_20250105172946758.deltacommit differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/..bf7392c4-e8a6-4e6a-8088-6744221940ca-0_20250105172945774.log.1_0-53-80.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/..bf7392c4-e8a6-4e6a-8088-6744221940ca-0_20250105172945774.log.1_0-53-80.crc new file mode 100644 index 000000000000..f4b90ef3247c Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/..bf7392c4-e8a6-4e6a-8088-6744221940ca-0_20250105172945774.log.1_0-53-80.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/..hoodie_partition_metadata.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/..hoodie_partition_metadata.crc new file mode 100644 index 000000000000..211f976ac24d Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/..hoodie_partition_metadata.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.bf7392c4-e8a6-4e6a-8088-6744221940ca-0_0-25-38_20250105172941176.parquet.crc b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.bf7392c4-e8a6-4e6a-8088-6744221940ca-0_0-25-38_20250105172941176.parquet.crc new file mode 100644 index 000000000000..254fe7f6927a Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.bf7392c4-e8a6-4e6a-8088-6744221940ca-0_0-25-38_20250105172941176.parquet.crc differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.bf7392c4-e8a6-4e6a-8088-6744221940ca-0_20250105172945774.log.1_0-53-80 b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.bf7392c4-e8a6-4e6a-8088-6744221940ca-0_20250105172945774.log.1_0-53-80 new file mode 100644 index 000000000000..cd8bf7ba0f16 Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.bf7392c4-e8a6-4e6a-8088-6744221940ca-0_20250105172945774.log.1_0-53-80 differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.hoodie_partition_metadata b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.hoodie_partition_metadata new file mode 100644 index 000000000000..2f0577b32913 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/.hoodie_partition_metadata @@ -0,0 +1,4 @@ +#partition metadata +#Sun Jan 05 17:29:43 PST 2025 +commitTime=20250105172941176 +partitionDepth=3 diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/bf7392c4-e8a6-4e6a-8088-6744221940ca-0_0-25-38_20250105172941176.parquet b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/bf7392c4-e8a6-4e6a-8088-6744221940ca-0_0-25-38_20250105172941176.parquet new file mode 100644 index 000000000000..5498077b0e9f Binary files /dev/null and b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/2018/08/31/bf7392c4-e8a6-4e6a-8088-6744221940ca-0_0-25-38_20250105172941176.parquet differ diff --git a/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/README.md b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/README.md new file mode 100644 index 000000000000..7fdded06de19 --- /dev/null +++ b/plugin/trino-hudi/src/test/resources/hudi-testing-data/hudi_stock_ticks_mor/README.md @@ -0,0 +1,27 @@ +# Generating `hudi_stock_ticks_mor` table + +`hudi_stock_ticks_mor` is a partitioned Merge-on-Read (MOR) table. The table +data is generated following the steps for +Hudi's [docker demo](https://hudi.apache.org/docs/docker_demo) using Hudi +version 1.0.1. + +The table has the following data columns: + +| Name | Type | +|--------|---------| +| volume | bigint | +| ts | varchar | +| symbol | varchar | +| year | integer | +| month | varchar | +| high | double | +| low | double | +| key | varchar | +| date | varchar | +| close | double | +| open | double | +| day | varchar | + +The table is partitioned by `date`, and has a single partition at the +subdirectory `2018/08/31` with multiple file slices (an insert followed by +update).