Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/src/main/sphinx/connector/hudi.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ Additionally, following configuration properties can be set depending on the use
or `CAST(part_key AS INTEGER) % 2 = 0` are not recognized as partition filters,
and queries using such expressions fail if the property is set to `true`.
- `false`
* - `hudi.ignore-absent-partitions`
- Ignore partitions when the file system location does not exist rather than
failing the query. This skips data that may be expected to be part of the
table.
* - `hudi.metadata-enabled`
- Fetch the list of file names and sizes from the Hudi metadata table. Improves
file listing performance by avoiding direct storage calls while
building splits.
- `false`
Comment thread
codope marked this conversation as resolved.
Outdated

:::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private static Schema constructSchemaFromParts(List<String> columnNames, List<Hi

for (int i = 0; i < columnNames.size(); ++i) {
String comment = columnComments.size() > i ? columnComments.get(i) : null;
Schema fieldSchema = recordIncrementingUtil.avroSchemaForHiveType(columnTypes.get(i));
Schema fieldSchema = recordIncrementingUtil.avroSchemaForHiveTypeInternal(columnTypes.get(i));
fieldBuilder = fieldBuilder
.name(columnNames.get(i))
.doc(comment)
Expand All @@ -146,13 +146,19 @@ private static Schema constructSchemaFromParts(List<String> columnNames, List<Hi
return fieldBuilder.endRecord();
}

private Schema avroSchemaForHiveType(HiveType hiveType)
public static Schema avroSchemaFromHiveType(HiveType hiveType)
{
AvroHiveFileUtils utils = new AvroHiveFileUtils();
return utils.avroSchemaForHiveTypeInternal(hiveType);
}

private Schema avroSchemaForHiveTypeInternal(HiveType hiveType)
{
Schema schema = switch (hiveType.getCategory()) {
case PRIMITIVE -> createAvroPrimitive(hiveType);
case LIST -> {
ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType.getTypeInfo();
yield Schema.createArray(avroSchemaForHiveType(HiveType.fromTypeInfo(listTypeInfo.getListElementTypeInfo())));
yield Schema.createArray(avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(listTypeInfo.getListElementTypeInfo())));
}
case MAP -> {
MapTypeInfo mapTypeInfo = ((MapTypeInfo) hiveType.getTypeInfo());
Expand All @@ -162,13 +168,13 @@ private Schema avroSchemaForHiveType(HiveType hiveType)
throw new UnsupportedOperationException("Key of Map must be a String");
}
TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
yield Schema.createMap(avroSchemaForHiveType(HiveType.fromTypeInfo(valueTypeInfo)));
yield Schema.createMap(avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(valueTypeInfo)));
}
case STRUCT -> createAvroRecord(hiveType);
case UNION -> {
List<Schema> childSchemas = new ArrayList<>();
for (TypeInfo childTypeInfo : ((UnionTypeInfo) hiveType.getTypeInfo()).getAllUnionObjectTypeInfos()) {
final Schema childSchema = avroSchemaForHiveType(HiveType.fromTypeInfo(childTypeInfo));
final Schema childSchema = avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(childTypeInfo));
if (childSchema.getType() == Schema.Type.UNION) {
childSchemas.addAll(childSchema.getTypes());
}
Expand Down Expand Up @@ -243,7 +249,7 @@ private Schema createAvroRecord(HiveType hiveType)

for (int i = 0; i < allStructFieldNames.size(); ++i) {
final TypeInfo childTypeInfo = allStructFieldTypeInfo.get(i);
final Schema fieldSchema = avroSchemaForHiveType(HiveType.fromTypeInfo(childTypeInfo));
final Schema fieldSchema = avroSchemaForHiveTypeInternal(HiveType.fromTypeInfo(childTypeInfo));
fieldAssembler = fieldAssembler
.name(allStructFieldNames.get(i))
.doc(childTypeInfo.toString())
Expand Down
22 changes: 15 additions & 7 deletions plugin/trino-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
</properties>

<dependencies>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
Expand Down Expand Up @@ -215,13 +221,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>4.0.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration-testing</artifactId>
Expand Down Expand Up @@ -401,6 +400,15 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<!-- suppress "unused compile dependency" warning for Kryo -->
<ignoredUnusedDeclaredDependencies>com.esotericsoftware:kryo</ignoredUnusedDeclaredDependencies>
</configuration>
</plugin>

<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@

@DefunctConfig({
"hudi.min-partition-batch-size",
"hudi.max-partition-batch-size",
"hudi.metadata-enabled",
"hudi.max-partition-batch-size"
})
public class HudiConfig
{
Expand All @@ -47,7 +46,7 @@ public class HudiConfig
private int splitGeneratorParallelism = 4;
private long perTransactionMetastoreCacheMaximumSize = 2000;
private boolean queryPartitionFilterRequired;
private boolean ignoreAbsentPartitions;
private boolean metadataEnabled;

public List<String> getColumnsToHide()
{
Expand Down Expand Up @@ -205,15 +204,16 @@ public boolean isQueryPartitionFilterRequired()
return queryPartitionFilterRequired;
}

@Config("hudi.ignore-absent-partitions")
public HudiConfig setIgnoreAbsentPartitions(boolean ignoreAbsentPartitions)
@Config("hudi.metadata-enabled")
@ConfigDescription("Fetch the list of file names and sizes from Hudi metadata table rather than storage")
public HudiConfig setMetadataEnabled(boolean metadataEnabled)
{
this.ignoreAbsentPartitions = ignoreAbsentPartitions;
this.metadataEnabled = metadataEnabled;
return this;
}

public boolean isIgnoreAbsentPartitions()
public boolean isMetadataEnabled()
{
return ignoreAbsentPartitions;
return this.metadataEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public enum HudiErrorCode
HUDI_CURSOR_ERROR(6, EXTERNAL),
HUDI_FILESYSTEM_ERROR(7, EXTERNAL),
HUDI_PARTITION_NOT_FOUND(8, EXTERNAL),
// HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL), // Unused. Could be mistaken with HUDI_UNKNOWN_TABLE_TYPE.
HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL),
HUDI_NO_VALID_COMMIT(10, EXTERNAL)

/**/;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.metastore.Column;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.Table;
import io.trino.metastore.TableInfo;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hudi.storage.TrinoHudiStorage;
import io.trino.plugin.hudi.storage.TrinoStorageConfiguration;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
Expand All @@ -40,6 +43,9 @@
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.storage.StoragePath;

import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -68,6 +74,7 @@
import static io.trino.plugin.hudi.HudiSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hudi.HudiUtil.fromInputFormat;
import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
Expand All @@ -77,7 +84,6 @@
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;

public class HudiMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -119,15 +125,21 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
}
Location location = Location.of(table.get().getStorage().getLocation());
if (!hudiMetadataExists(fileSystemFactory.create(session), location)) {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
if (!hudiMetadataExists(fileSystem, location)) {
throw new TrinoException(HUDI_BAD_DATA, "Location of table %s does not contain Hudi table metadata: %s".formatted(tableName, location));
}
StoragePath metaLocation = new StoragePath(table.get().getStorage().getLocation(), HoodieTableMetaClient.METAFOLDER_NAME);
HoodieTableConfig tableConfig = new HoodieTableConfig(new TrinoHudiStorage(fileSystem, new TrinoStorageConfiguration()), metaLocation, null, null, null);
String preCombineField = tableConfig.getPreCombineField();
String inputFormat = table.get().getStorage().getStorageFormat().getInputFormat();

return new HudiTableHandle(
tableName.getSchemaName(),
tableName.getTableName(),
table.get().getStorage().getLocation(),
COPY_ON_WRITE,
fromInputFormat(inputFormat),
preCombineField,
getPartitionKeyColumnHandles(table.get(), typeManager),
TupleDomain.all(),
TupleDomain.all());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hudi;

import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hudi.util.HudiAvroSerializer;
import io.trino.plugin.hudi.util.SynthesizedColumnHandler;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.metrics.Metrics;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
 * A {@link ConnectorPageSource} that produces pages by draining a Hudi
 * {@link HoodieFileGroupReader} (which merges the base file with its log files)
 * and serializing each Avro {@link IndexedRecord} into Trino blocks.
 *
 * <p>The wrapped {@code pageSource} is not read from directly; it is retained for
 * accounting (completed bytes/positions, read time, memory, metrics, blocking)
 * and must be closed together with the file group reader.
 */
public class HudiPageSource
        implements ConnectorPageSource
{
    // Merged record iterator over the file group (base + log files); yields Avro records.
    private final HoodieFileGroupReader<IndexedRecord> fileGroupReader;
    // Underlying page source, used only for accounting delegation and closing.
    private final ConnectorPageSource pageSource;
    // Accumulates serialized records until a page is emitted.
    private final PageBuilder pageBuilder;
    // Converts Avro records (plus synthesized columns) into page builder positions.
    private final HudiAvroSerializer avroSerializer;

    /**
     * @param pageSource underlying page source used for accounting and closing
     * @param fileGroupReader Hudi reader producing merged records for one file group
     * @param columnHandles output columns, in page channel order
     * @param synthesizedColumnHandler supplies values for synthesized (non-data) columns
     * @throws UncheckedIOException if the file group reader fails to initialize
     */
    public HudiPageSource(
            ConnectorPageSource pageSource,
            HoodieFileGroupReader<IndexedRecord> fileGroupReader,
            List<HiveColumnHandle> columnHandles,
            SynthesizedColumnHandler synthesizedColumnHandler)
    {
        this.pageSource = requireNonNull(pageSource, "pageSource is null");
        this.fileGroupReader = requireNonNull(fileGroupReader, "fileGroupReader is null");
        // NOTE(review): calling an overridable (protected) method from the constructor
        // is fragile if this class is ever subclassed — consider making it private or final.
        this.initFileGroupReader();
        this.pageBuilder = new PageBuilder(columnHandles.stream().map(HiveColumnHandle::getType).collect(toImmutableList()));
        this.avroSerializer = new HudiAvroSerializer(columnHandles, synthesizedColumnHandler);
    }

    @Override
    public long getCompletedBytes()
    {
        return pageSource.getCompletedBytes();
    }

    @Override
    public OptionalLong getCompletedPositions()
    {
        return pageSource.getCompletedPositions();
    }

    @Override
    public long getReadTimeNanos()
    {
        return pageSource.getReadTimeNanos();
    }

    @Override
    public boolean isFinished()
    {
        try {
            // Finished exactly when the merged reader has no more records.
            return !fileGroupReader.hasNext();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to check for remaining records in Hudi file group reader", e);
        }
    }

    @Override
    public SourcePage getNextSourcePage()
    {
        checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page");
        try {
            // Stop once the builder reaches its size limit so a large file group is
            // emitted as a sequence of bounded pages instead of one unbounded page;
            // remaining records are picked up on the next call.
            while (!pageBuilder.isFull() && fileGroupReader.hasNext()) {
                avroSerializer.buildRecordInPage(pageBuilder, fileGroupReader.next());
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to read records from Hudi file group reader", e);
        }

        Page newPage = pageBuilder.build();
        pageBuilder.reset();
        return SourcePage.create(newPage);
    }

    @Override
    public long getMemoryUsage()
    {
        return pageSource.getMemoryUsage();
    }

    @Override
    public void close()
            throws IOException
    {
        // Close the reader first; the underlying page source is closed regardless
        // only if the reader close succeeds — NOTE(review): consider closing both
        // even when the first close throws.
        fileGroupReader.close();
        pageSource.close();
    }

    @Override
    public CompletableFuture<?> isBlocked()
    {
        return pageSource.isBlocked();
    }

    @Override
    public Metrics getMetrics()
    {
        return pageSource.getMetrics();
    }

    /**
     * Initializes the file group reader's record iterators; must be called before
     * {@link #isFinished()} or {@link #getNextSourcePage()}.
     *
     * @throws UncheckedIOException if initialization fails
     */
    protected void initFileGroupReader()
    {
        try {
            this.fileGroupReader.initRecordIterators();
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to initialize file group reader!", e);
        }
    }
}
Loading
Loading