diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ee19108103f9..71c10c8829a4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -350,6 +350,7 @@ jobs: !:trino-ignite, !:trino-jdbc, !:trino-kafka, + !:trino-lakehouse, !:trino-main, !:trino-mariadb, !:trino-memory, @@ -470,6 +471,7 @@ jobs: - { modules: plugin/trino-iceberg, profile: minio-and-avro } - { modules: plugin/trino-ignite } - { modules: plugin/trino-kafka } + - { modules: plugin/trino-lakehouse } - { modules: plugin/trino-mariadb } - { modules: plugin/trino-mongodb } - { modules: plugin/trino-mysql } diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index 039862e382fd..97a366b39290 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -148,6 +148,15 @@ + <artifactSet to="plugin/lakehouse"> + <artifact id="${project.groupId}:trino-lakehouse:zip:${project.version}"> + <unpack /> + </artifact> + </artifactSet> + diff --git a/docs/src/main/sphinx/connector.md b/docs/src/main/sphinx/connector.md index 2d9781a69616..a954b30cf059 100644 --- a/docs/src/main/sphinx/connector.md +++ b/docs/src/main/sphinx/connector.md @@ -25,6 +25,7 @@ Iceberg Ignite JMX Kafka +Lakehouse Loki MariaDB Memory diff --git a/docs/src/main/sphinx/connector/lakehouse.md b/docs/src/main/sphinx/connector/lakehouse.md new file mode 100644 index 000000000000..1c65e0f33c3f --- /dev/null +++ b/docs/src/main/sphinx/connector/lakehouse.md @@ -0,0 +1,105 @@ +# Lakehouse connector + +The Lakehouse connector provides a unified way to interact with data stored +in various table formats across different storage systems and metastore services. +With this single connector you can query and write data regardless of whether it +is stored in the Iceberg, Delta Lake, or Hudi table format, or in traditional Hive tables. + +This connector offers flexible connectivity to popular metastore services including +AWS Glue and Hive Metastore. For data storage, it supports a wide range of options +including cloud storage services such as AWS S3, S3-compatible storage, +Google Cloud Storage (GCS), and Azure Blob Storage, as well as HDFS installations. + +The connector combines the features of the +[Hive](/connector/hive), [Iceberg](/connector/iceberg), +[Delta Lake](/connector/delta-lake), and [Hudi](/connector/hudi) +connectors into a single connector. The configuration properties, +session properties, table properties, and behavior come from the underlying +connectors. Refer to the documentation of the underlying connectors +for the table formats that you are using. + +## General configuration + +To configure the Lakehouse connector, create a catalog properties file +`etc/catalog/example.properties` with the following content, replacing the +properties as appropriate: + +```text +connector.name=lakehouse +``` + +You must configure an [AWS Glue or Hive metastore](/object-storage/metastores). +The `hive.metastore` property also configures the Iceberg catalog; +do not specify `iceberg.catalog.type`. + +You must select and configure one of the +[supported file systems](lakehouse-file-system-configuration). + +## Configuration properties + +The following configuration properties are available: + +:::{list-table} +:widths: 30, 58, 12 +:header-rows: 1 + +* - Property name + - Description + - Default +* - `lakehouse.table-type` + - The default table type for newly created tables when the `type` + table property is not specified. Possible values: + * `HIVE` + * `ICEBERG` + * `DELTA` + - `ICEBERG` +:::
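+ +For example, a catalog file that pairs a Hive Metastore with S3 object storage +might look like the following sketch, where the metastore URI and region are +placeholders for your environment: + +```text +connector.name=lakehouse +hive.metastore.uri=thrift://metastore.example.com:9083 +fs.native-s3.enabled=true +s3.region=us-east-1 +```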
+ +(lakehouse-file-system-configuration)= +## File system access configuration + +The connector supports accessing the following file systems: + +* [](/object-storage/file-system-azure) +* [](/object-storage/file-system-gcs) +* [](/object-storage/file-system-s3) +* [](/object-storage/file-system-hdfs) + +You must enable and configure the specific file system access. + +## Examples + +Create an Iceberg table: + +```sql +CREATE TABLE iceberg_table ( + c1 INTEGER, + c2 DATE, + c3 DOUBLE +) +WITH ( + type = 'ICEBERG', + format = 'PARQUET', + partitioning = ARRAY['c1', 'c2'], + sorted_by = ARRAY['c3'] +); +``` + +Create a Hive table: + +```sql +CREATE TABLE hive_page_views ( + view_time TIMESTAMP, + user_id BIGINT, + page_url VARCHAR, + ds DATE, + country VARCHAR +) +WITH ( + type = 'HIVE', + format = 'ORC', + partitioned_by = ARRAY['ds', 'country'], + bucketed_by = ARRAY['user_id'], + bucket_count = 50 +); +```
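+ +Create a Delta Lake table. This example is a sketch: the table and column names +are illustrative, and the available `WITH` properties come from the +[Delta Lake connector](/connector/delta-lake): + +```sql +CREATE TABLE delta_events ( + event_time TIMESTAMP(3) WITH TIME ZONE, + event_name VARCHAR, + region VARCHAR +) +WITH ( + type = 'DELTA', + partitioned_by = ARRAY['region'] +); +```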
diff --git a/plugin/trino-lakehouse/pom.xml b/plugin/trino-lakehouse/pom.xml new file mode 100644 index 000000000000..c838e5c5fd4c --- /dev/null +++ b/plugin/trino-lakehouse/pom.xml @@ -0,0 +1,274 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>io.trino</groupId> + <artifactId>trino-root</artifactId> + <version>477-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>trino-lakehouse</artifactId> + <packaging>trino-plugin</packaging> + <description>Trino - Lakehouse connector</description> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <exclusions> + <exclusion> + <groupId>org.jspecify</groupId> + <artifactId>jspecify</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.inject</groupId> + <artifactId>guice</artifactId> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>bootstrap</artifactId> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>configuration</artifactId> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>json</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-delta-lake</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-filesystem-manager</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-hive</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-hudi</artifactId> + <exclusions> + <exclusion> + <groupId>org.jspecify</groupId> + <artifactId>jspecify</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-iceberg</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-metastore</artifactId> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-plugin-toolkit</artifactId> + </dependency> + + <dependency> + <groupId>jakarta.validation</groupId> + <artifactId>jakarta.validation-api</artifactId> + </dependency> + + <dependency> + <groupId>org.weakref</groupId> + <artifactId>jmxutils</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>slice</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-api</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-api-incubator</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-context</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-spi</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.openjdk.jol</groupId> + <artifactId>jol-core</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>log-manager</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>configuration-testing</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>http-server</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>junit-extensions</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.airlift</groupId> + <artifactId>testing</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-hdfs</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-hive</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-main</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-main</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-spi</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-testing</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-testing-containers</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-testing-services</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-tpcds</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-tpch</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.basepom.maven</groupId> + <artifactId>duplicate-finder-maven-plugin</artifactId> + <configuration> + <ignoredResourcePatterns> + <ignoredResourcePattern>iceberg-build.properties</ignoredResourcePattern> + <ignoredResourcePattern>mozilla/public-suffix-list.txt</ignoredResourcePattern> + <ignoredResourcePattern>mime.types</ignoredResourcePattern> + </ignoredResourcePatterns> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConfig.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConfig.java new file mode 100644 index 000000000000..4d8bc3e1221b --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConfig.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import io.airlift.configuration.Config; +import jakarta.validation.constraints.NotNull; + +import static io.trino.plugin.lakehouse.TableType.ICEBERG; + +public class LakehouseConfig +{ + private TableType tableType = ICEBERG; + + @NotNull + public TableType getTableType() + { + return tableType; + } + + @Config("lakehouse.table-type") + public LakehouseConfig setTableType(TableType tableType) + { + this.tableType = tableType; + return this; + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java new file mode 100644 index 000000000000..8ee552c98930 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java @@ -0,0 +1,162 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.airlift.bootstrap.LifeCycleManager; +import io.trino.plugin.hive.HiveSchemaProperties; +import io.trino.plugin.iceberg.IcebergMaterializedViewProperties; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorCapabilities; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorNodePartitioningProvider; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.transaction.IsolationLevel; + +import java.util.List; +import java.util.Set; + +import static com.google.common.collect.Sets.immutableEnumSet; +import static io.trino.spi.connector.ConnectorCapabilities.MATERIALIZED_VIEW_GRACE_PERIOD; +import static io.trino.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT; +import static io.trino.spi.transaction.IsolationLevel.READ_UNCOMMITTED; +import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; + +public class LakehouseConnector + implements Connector +{ + private final LifeCycleManager lifeCycleManager; + private final LakehouseTransactionManager transactionManager; + private final LakehouseSplitManager splitManager; + private final LakehousePageSourceProviderFactory pageSourceProviderFactory; + private final LakehousePageSinkProvider pageSinkProvider; + private final LakehouseNodePartitioningProvider nodePartitioningProvider; + private final LakehouseSessionProperties sessionProperties; + private final LakehouseTableProperties tableProperties; + private final IcebergMaterializedViewProperties materializedViewProperties; + + @Inject + public LakehouseConnector( + LifeCycleManager lifeCycleManager, + LakehouseTransactionManager transactionManager, + LakehouseSplitManager splitManager, + LakehousePageSourceProviderFactory pageSourceProviderFactory, + LakehousePageSinkProvider pageSinkProvider, + LakehouseNodePartitioningProvider nodePartitioningProvider, + LakehouseSessionProperties sessionProperties, + LakehouseTableProperties tableProperties, + IcebergMaterializedViewProperties materializedViewProperties) + { + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null"); + this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); + this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); + this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null"); + this.tableProperties = requireNonNull(tableProperties, "tableProperties is null"); + this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null"); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit) + { + checkConnectorSupports(READ_UNCOMMITTED, isolationLevel); + return 
transactionManager.begin(); + } + + @Override + public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle) + { + return transactionManager.get(transactionHandle, session.getIdentity()); + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorPageSourceProviderFactory getPageSourceProviderFactory() + { + return pageSourceProviderFactory; + } + + @Override + public ConnectorPageSinkProvider getPageSinkProvider() + { + return pageSinkProvider; + } + + @Override + public ConnectorNodePartitioningProvider getNodePartitioningProvider() + { + return nodePartitioningProvider; + } + + @Override + public void commit(ConnectorTransactionHandle transactionHandle) + { + transactionManager.commit(transactionHandle); + } + + @Override + public void rollback(ConnectorTransactionHandle transactionHandle) + { + transactionManager.rollback(transactionHandle); + } + + @Override + public List<PropertyMetadata<?>> getSessionProperties() + { + return sessionProperties.getSessionProperties(); + } + + @Override + public List<PropertyMetadata<?>> getSchemaProperties() + { + return HiveSchemaProperties.SCHEMA_PROPERTIES; + } + + @Override + public List<PropertyMetadata<?>> getTableProperties() + { + return tableProperties.getTableProperties(); + } + + @Override + public List<PropertyMetadata<?>> getMaterializedViewProperties() + { + return materializedViewProperties.getMaterializedViewProperties(); + } + + @Override + public void shutdown() + { + lifeCycleManager.stop(); + } + + @Override + public Set<ConnectorCapabilities> getCapabilities() + { + return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT, MATERIALIZED_VIEW_GRACE_PERIOD); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnectorFactory.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnectorFactory.java new file mode 100644 index 000000000000..790757481b96 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnectorFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.json.JsonModule; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.trino.plugin.base.TypeDeserializerModule; +import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule; +import io.trino.plugin.base.jmx.MBeanServerModule; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.security.HiveSecurityModule; +import io.trino.spi.NodeManager; +import io.trino.spi.PageIndexerFactory; +import io.trino.spi.PageSorter; +import io.trino.spi.VersionEmbedder; +import io.trino.spi.catalog.CatalogName; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.CatalogHandle; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorContext; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.spi.connector.MetadataProvider; +import org.weakref.jmx.guice.MBeanModule; + +import java.util.Map; + +import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch; + +public class LakehouseConnectorFactory + implements ConnectorFactory +{ + @Override + public String getName() + { + return "lakehouse"; + } + + @Override + public Connector create(String catalogName, Map<String, String> config, ConnectorContext context) + { + checkStrictSpiVersionMatch(context, this); + try (var _ = new ThreadContextClassLoader(getClass().getClassLoader())) { + Bootstrap app = new Bootstrap( + new MBeanModule(), + new MBeanServerModule(), + new ConnectorObjectNameGeneratorModule("io.trino.plugin", "trino.plugin"), + new JsonModule(), + new TypeDeserializerModule(context.getTypeManager()), + new LakehouseModule(), + new LakehouseHiveModule(), + new LakehouseIcebergModule(), + new LakehouseDeltaModule(), + new LakehouseHudiModule(), + new HiveSecurityModule(), + new LakehouseFileSystemModule(catalogName, context), + binder -> { + binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); + binder.bind(Tracer.class).toInstance(context.getTracer()); + binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion())); + binder.bind(NodeManager.class).toInstance(context.getNodeManager()); + binder.bind(VersionEmbedder.class).toInstance(context.getVersionEmbedder()); + binder.bind(MetadataProvider.class).toInstance(context.getMetadataProvider()); + binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); + binder.bind(CatalogHandle.class).toInstance(context.getCatalogHandle()); + binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); + binder.bind(PageSorter.class).toInstance(context.getPageSorter()); + }); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .initialize(); + + return injector.getInstance(LakehouseConnector.class); + } + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java new file mode 100644 index 000000000000..cd30d99e515a --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java @@ -0,0 +1,111 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.deltalake.DataFileInfo; +import io.trino.plugin.deltalake.DeltaLakeConfig; +import io.trino.plugin.deltalake.DeltaLakeExecutorModule; +import io.trino.plugin.deltalake.DeltaLakeMergeResult; +import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; +import io.trino.plugin.deltalake.DeltaLakeNodePartitioningProvider; +import io.trino.plugin.deltalake.DeltaLakePageSinkProvider; +import io.trino.plugin.deltalake.DeltaLakePageSourceProvider; +import io.trino.plugin.deltalake.DeltaLakeSessionProperties; +import io.trino.plugin.deltalake.DeltaLakeSplitManager; +import io.trino.plugin.deltalake.DeltaLakeSynchronizerModule; +import io.trino.plugin.deltalake.DeltaLakeTableProperties; +import io.trino.plugin.deltalake.DeltaLakeTransactionManager; +import io.trino.plugin.deltalake.DeltaLakeWriterStats; +import io.trino.plugin.deltalake.metastore.DeltaLakeMetastoreModule; +import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler; +import io.trino.plugin.deltalake.metastore.file.DeltaLakeFileMetastoreModule; +import io.trino.plugin.deltalake.metastore.glue.DeltaLakeGlueMetastoreModule; +import io.trino.plugin.deltalake.metastore.thrift.DeltaLakeThriftMetastoreModule; +import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; +import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess.ForCachingExtendedStatisticsAccess; +import io.trino.plugin.deltalake.statistics.ExtendedStatistics; +import io.trino.plugin.deltalake.statistics.ExtendedStatisticsAccess; +import io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess; +import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; +import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; +import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager; +import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; +import io.trino.plugin.deltalake.transactionlog.reader.FileSystemTransactionLogReaderFactory; +import io.trino.plugin.deltalake.transactionlog.reader.TransactionLogReaderFactory; +import io.trino.plugin.deltalake.transactionlog.writer.FileSystemTransactionLogWriterFactory; +import io.trino.plugin.deltalake.transactionlog.writer.NoIsolationSynchronizer; +import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizerManager; +import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory; +import io.trino.plugin.hive.metastore.MetastoreTypeConfig; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class LakehouseDeltaModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + install(new DeltaLakeSynchronizerModule()); + install(new DeltaLakeMetastoreModule()); + + 
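// The bindings below mirror the standalone Delta Lake connector module, so the delta.* configuration, session, and table properties behave the same here as in a dedicated Delta Lake catalog. +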
configBinder(binder).bindConfig(DeltaLakeConfig.class); + + binder.bind(DeltaLakeNodePartitioningProvider.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakePageSinkProvider.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakePageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakeSessionProperties.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakeSplitManager.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakeTableProperties.class).in(Scopes.SINGLETON); + + binder.bind(DeltaLakeTransactionManager.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakeMetadataFactory.class).in(Scopes.SINGLETON); + binder.bind(DeltaLakeWriterStats.class).in(Scopes.SINGLETON); + binder.bind(CheckpointSchemaManager.class).in(Scopes.SINGLETON); + binder.bind(CheckpointWriterManager.class).in(Scopes.SINGLETON); + binder.bind(TransactionLogReaderFactory.class).to(FileSystemTransactionLogReaderFactory.class).in(Scopes.SINGLETON); + binder.bind(TransactionLogWriterFactory.class).to(FileSystemTransactionLogWriterFactory.class).in(Scopes.SINGLETON); + binder.bind(TransactionLogSynchronizerManager.class).in(Scopes.SINGLETON); + binder.bind(NoIsolationSynchronizer.class).in(Scopes.SINGLETON); + + binder.bind(CachingExtendedStatisticsAccess.class).in(Scopes.SINGLETON); + binder.bind(ExtendedStatisticsAccess.class).to(CachingExtendedStatisticsAccess.class).in(Scopes.SINGLETON); + binder.bind(ExtendedStatisticsAccess.class).annotatedWith(ForCachingExtendedStatisticsAccess.class).to(MetaDirStatisticsAccess.class).in(Scopes.SINGLETON); + + binder.bind(TransactionLogAccess.class).in(Scopes.SINGLETON); + newExporter(binder).export(TransactionLogAccess.class).withGeneratedName(); + + binder.bind(DeltaLakeTableMetadataScheduler.class).in(Scopes.SINGLETON); + newExporter(binder).export(DeltaLakeTableMetadataScheduler.class).withGeneratedName(); + + jsonCodecBinder(binder).bindJsonCodec(DataFileInfo.class); + jsonCodecBinder(binder).bindJsonCodec(DeltaLakeMergeResult.class); + jsonCodecBinder(binder).bindJsonCodec(ExtendedStatistics.class); + jsonCodecBinder(binder).bindJsonCodec(LastCheckpoint.class); + + install(switch (buildConfigObject(MetastoreTypeConfig.class).getMetastoreType()) { + case THRIFT -> new DeltaLakeThriftMetastoreModule(); + case FILE -> new DeltaLakeFileMetastoreModule(); + case GLUE -> new DeltaLakeGlueMetastoreModule(); + }); + + binder.install(new DeltaLakeExecutorModule()); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFileSystemModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFileSystemModule.java new file mode 100644 index 000000000000..ad151824f0ce --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFileSystemModule.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Binder; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.opentelemetry.api.OpenTelemetry; +import io.trino.filesystem.manager.FileSystemModule; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.spi.NodeManager; +import io.trino.spi.connector.ConnectorContext; + +import static java.util.Objects.requireNonNull; + +class LakehouseFileSystemModule + extends AbstractConfigurationAwareModule +{ + private final String catalogName; + private final NodeManager nodeManager; + private final OpenTelemetry openTelemetry; + + public LakehouseFileSystemModule(String catalogName, ConnectorContext context) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.nodeManager = context.getNodeManager(); + this.openTelemetry = context.getOpenTelemetry(); + } + + @Override + protected void setup(Binder binder) + { + boolean metadataCacheEnabled = buildConfigObject(IcebergConfig.class).isMetadataCacheEnabled(); + install(new FileSystemModule(catalogName, nodeManager, openTelemetry, metadataCacheEnabled)); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java new file mode 100644 index 000000000000..2892a81d0116 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HiveExecutorModule; +import io.trino.plugin.hive.HiveFileWriterFactory; +import io.trino.plugin.hive.HiveLocationService; +import io.trino.plugin.hive.HiveMetadataFactory; +import io.trino.plugin.hive.HiveNodePartitioningProvider; +import io.trino.plugin.hive.HivePageSinkProvider; +import io.trino.plugin.hive.HivePageSourceFactory; +import io.trino.plugin.hive.HivePageSourceProvider; +import io.trino.plugin.hive.HivePartitionManager; +import io.trino.plugin.hive.HiveSessionProperties; +import io.trino.plugin.hive.HiveSplitManager; +import io.trino.plugin.hive.HiveTableProperties; +import io.trino.plugin.hive.HiveTransactionManager; +import io.trino.plugin.hive.HiveWriterStats; +import io.trino.plugin.hive.LocationService; +import io.trino.plugin.hive.PartitionUpdate; +import io.trino.plugin.hive.PartitionsSystemTableProvider; +import io.trino.plugin.hive.PropertiesSystemTableProvider; +import io.trino.plugin.hive.RcFileFileWriterFactory; +import io.trino.plugin.hive.SystemTableProvider; +import io.trino.plugin.hive.TransactionalMetadataFactory; +import io.trino.plugin.hive.avro.AvroFileWriterFactory; +import io.trino.plugin.hive.avro.AvroPageSourceFactory; +import io.trino.plugin.hive.fs.CachingDirectoryLister; +import io.trino.plugin.hive.fs.DirectoryLister; +import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory; +import io.trino.plugin.hive.line.CsvFileWriterFactory; +import io.trino.plugin.hive.line.CsvPageSourceFactory; +import io.trino.plugin.hive.line.JsonFileWriterFactory; +import io.trino.plugin.hive.line.JsonPageSourceFactory; +import io.trino.plugin.hive.line.OpenXJsonFileWriterFactory; +import io.trino.plugin.hive.line.OpenXJsonPageSourceFactory; +import io.trino.plugin.hive.line.RegexFileWriterFactory; +import io.trino.plugin.hive.line.RegexPageSourceFactory; +import io.trino.plugin.hive.line.SimpleSequenceFilePageSourceFactory; +import io.trino.plugin.hive.line.SimpleSequenceFileWriterFactory; +import io.trino.plugin.hive.line.SimpleTextFilePageSourceFactory; +import io.trino.plugin.hive.line.SimpleTextFileWriterFactory; +import io.trino.plugin.hive.metastore.HiveMetastoreConfig; +import io.trino.plugin.hive.metastore.HiveMetastoreModule; +import io.trino.plugin.hive.orc.OrcFileWriterFactory; +import io.trino.plugin.hive.orc.OrcPageSourceFactory; +import io.trino.plugin.hive.parquet.ParquetFileWriterFactory; +import io.trino.plugin.hive.parquet.ParquetPageSourceFactory; +import io.trino.plugin.hive.rcfile.RcFilePageSourceFactory; + +import java.util.Optional; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +class LakehouseHiveModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + install(new HiveMetastoreModule(Optional.empty())); + + configBinder(binder).bindConfig(HiveConfig.class); + configBinder(binder).bindConfig(HiveMetastoreConfig.class); + + binder.bind(HiveNodePartitioningProvider.class).in(Scopes.SINGLETON); + binder.bind(HivePageSinkProvider.class).in(Scopes.SINGLETON); + 
binder.bind(HivePageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON); + binder.bind(HiveSplitManager.class).in(Scopes.SINGLETON); + binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON); + + binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON); + binder.bind(HiveMetadataFactory.class).in(Scopes.SINGLETON); + binder.bind(TransactionalMetadataFactory.class).to(HiveMetadataFactory.class).in(Scopes.SINGLETON); + binder.bind(HivePartitionManager.class).in(Scopes.SINGLETON); + binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON); + binder.bind(TransactionScopeCachingDirectoryListerFactory.class).in(Scopes.SINGLETON); + + jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class); + + binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(HiveWriterStats.class).withGeneratedName(); + + binder.bind(CachingDirectoryLister.class).in(Scopes.SINGLETON); + newExporter(binder).export(CachingDirectoryLister.class).withGeneratedName(); + binder.bind(DirectoryLister.class).to(CachingDirectoryLister.class).in(Scopes.SINGLETON); + + binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON); + newExporter(binder).export(OrcFileWriterFactory.class).withGeneratedName(); + + var systemTableProviders = newSetBinder(binder, SystemTableProvider.class); + systemTableProviders.addBinding().to(PartitionsSystemTableProvider.class).in(Scopes.SINGLETON); + systemTableProviders.addBinding().to(PropertiesSystemTableProvider.class).in(Scopes.SINGLETON); + + var pageSourceFactoryBinder = newSetBinder(binder, HivePageSourceFactory.class); + pageSourceFactoryBinder.addBinding().to(CsvPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(JsonPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(OpenXJsonPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(RegexPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(SimpleTextFilePageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(SimpleSequenceFilePageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(AvroPageSourceFactory.class).in(Scopes.SINGLETON); + + var fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class); + fileWriterFactoryBinder.addBinding().to(CsvFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(JsonFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(RegexFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(OpenXJsonFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(SimpleTextFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(SimpleSequenceFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON); + 
fileWriterFactoryBinder.addBinding().to(AvroFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON); + + binder.install(new HiveExecutorModule()); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHudiModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHudiModule.java new file mode 100644 index 000000000000..8861c369885b --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHudiModule.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.hudi.HudiConfig; +import io.trino.plugin.hudi.HudiExecutorModule; +import io.trino.plugin.hudi.HudiMetadataFactory; +import io.trino.plugin.hudi.HudiPageSourceProvider; +import io.trino.plugin.hudi.HudiSessionProperties; +import io.trino.plugin.hudi.HudiSplitManager; +import io.trino.plugin.hudi.HudiTableProperties; +import io.trino.plugin.hudi.HudiTransactionManager; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class LakehouseHudiModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(HudiConfig.class); + + binder.bind(HudiPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(HudiSessionProperties.class).in(Scopes.SINGLETON); + binder.bind(HudiSplitManager.class).in(Scopes.SINGLETON); + binder.bind(HudiTableProperties.class).in(Scopes.SINGLETON); + + binder.bind(HudiTransactionManager.class).in(Scopes.SINGLETON); + binder.bind(HudiMetadataFactory.class).in(Scopes.SINGLETON); + + binder.install(new HudiExecutorModule()); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java new file mode 100644 index 000000000000..ffaffb8fb1fd --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.metastore.HiveMetastoreFactory; +import io.trino.metastore.RawHiveMetastoreFactory; +import io.trino.plugin.hive.metastore.MetastoreTypeConfig; +import io.trino.plugin.iceberg.CommitTaskData; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergExecutorModule; +import io.trino.plugin.iceberg.IcebergFileSystemFactory; +import io.trino.plugin.iceberg.IcebergFileWriterFactory; +import io.trino.plugin.iceberg.IcebergMaterializedViewProperties; +import io.trino.plugin.iceberg.IcebergMetadataFactory; +import io.trino.plugin.iceberg.IcebergNodePartitioningProvider; +import io.trino.plugin.iceberg.IcebergPageSinkProvider; +import io.trino.plugin.iceberg.IcebergPageSourceProviderFactory; +import io.trino.plugin.iceberg.IcebergSessionProperties; +import io.trino.plugin.iceberg.IcebergSplitManager; +import io.trino.plugin.iceberg.IcebergTableProperties; +import io.trino.plugin.iceberg.IcebergTransactionManager; +import io.trino.plugin.iceberg.TableStatisticsWriter; +import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule; +import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; +import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; +import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory; + +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; + +public class LakehouseIcebergModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(IcebergConfig.class); + + binder.bind(IcebergNodePartitioningProvider.class).in(Scopes.SINGLETON); + binder.bind(IcebergPageSinkProvider.class).in(Scopes.SINGLETON); + binder.bind(IcebergPageSourceProviderFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON); + binder.bind(IcebergSplitManager.class).in(Scopes.SINGLETON); + binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON); + binder.bind(IcebergMaterializedViewProperties.class).in(Scopes.SINGLETON); + + binder.bind(IcebergTransactionManager.class).in(Scopes.SINGLETON); + binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergFileWriterFactory.class).in(Scopes.SINGLETON); + binder.bind(TableStatisticsWriter.class).in(Scopes.SINGLETON); + binder.bind(IcebergFileSystemFactory.class).to(DefaultIcebergFileSystemFactory.class).in(Scopes.SINGLETON); + + newOptionalBinder(binder, Key.get(HiveMetastoreFactory.class, RawHiveMetastoreFactory.class)); + + jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class); + + install(switch (buildConfigObject(MetastoreTypeConfig.class).getMetastoreType()) { + case THRIFT -> new IcebergHiveMetastoreCatalogModule(); + case FILE -> new IcebergFileMetastoreCatalogModule(); + case GLUE -> new IcebergGlueCatalogModule(); + }); + + binder.install(new IcebergExecutorModule()); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java new file mode 100644 index 000000000000..99cb65cff0e3 --- /dev/null +++ 
b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java @@ -0,0 +1,1030 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.common.collect.Iterators; +import io.airlift.slice.Slice; +import io.trino.metastore.Table; +import io.trino.plugin.deltalake.DeltaLakeInsertTableHandle; +import io.trino.plugin.deltalake.DeltaLakeMergeTableHandle; +import io.trino.plugin.deltalake.DeltaLakeMetadata; +import io.trino.plugin.deltalake.DeltaLakeOutputTableHandle; +import io.trino.plugin.deltalake.DeltaLakePartitioningHandle; +import io.trino.plugin.deltalake.DeltaLakeTableHandle; +import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle; +import io.trino.plugin.hive.HiveInsertTableHandle; +import io.trino.plugin.hive.HiveMergeTableHandle; +import io.trino.plugin.hive.HiveOutputTableHandle; +import io.trino.plugin.hive.HivePartitioningHandle; +import io.trino.plugin.hive.HiveTableExecuteHandle; +import io.trino.plugin.hive.HiveTableHandle; +import io.trino.plugin.hive.TransactionalMetadata; +import io.trino.plugin.hudi.HudiMetadata; +import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.iceberg.IcebergMergeTableHandle; +import io.trino.plugin.iceberg.IcebergMetadata; +import io.trino.plugin.iceberg.IcebergPartitioningHandle; +import io.trino.plugin.iceberg.IcebergTableHandle; +import io.trino.plugin.iceberg.IcebergWritableTableHandle; +import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; +import io.trino.spi.RefreshType; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.AggregationApplicationResult; +import io.trino.spi.connector.BeginTableExecuteResult; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ColumnPosition; +import io.trino.spi.connector.ConnectorAccessControl; +import io.trino.spi.connector.ConnectorAnalyzeMetadata; +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorMergeTableHandle; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorOutputMetadata; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.connector.ConnectorPartitioningHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableLayout; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableSchema; +import io.trino.spi.connector.ConnectorTableVersion; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.ConstraintApplicationResult; +import 
io.trino.spi.connector.LimitApplicationResult; +import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.RelationColumnsMetadata; +import io.trino.spi.connector.RelationCommentMetadata; +import io.trino.spi.connector.RelationType; +import io.trino.spi.connector.RetryMode; +import io.trino.spi.connector.RowChangeParadigm; +import io.trino.spi.connector.SampleApplicationResult; +import io.trino.spi.connector.SampleType; +import io.trino.spi.connector.SaveMode; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SchemaTablePrefix; +import io.trino.spi.connector.SortItem; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableColumnsMetadata; +import io.trino.spi.connector.TopNApplicationResult; +import io.trino.spi.connector.WriterScalingOptions; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.expression.Constant; +import io.trino.spi.function.LanguageFunction; +import io.trino.spi.function.SchemaFunctionName; +import io.trino.spi.security.GrantInfo; +import io.trino.spi.security.Privilege; +import io.trino.spi.security.RoleGrant; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.statistics.ComputedStatistics; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.statistics.TableStatisticsMetadata; +import io.trino.spi.type.Type; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.function.UnaryOperator; + +import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable; +import static io.trino.plugin.hive.util.HiveUtil.isHudiTable; +import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; +import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName; +import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; +import static io.trino.plugin.lakehouse.LakehouseTableProperties.getTableType; +import static java.util.Objects.requireNonNull; + +public class LakehouseMetadata + implements ConnectorMetadata +{ + private final LakehouseTableProperties tableProperties; + private final TransactionalMetadata hiveMetadata; + private final IcebergMetadata icebergMetadata; + private final DeltaLakeMetadata deltaMetadata; + private final HudiMetadata hudiMetadata; + + public LakehouseMetadata( + LakehouseTableProperties tableProperties, + TransactionalMetadata hiveMetadata, + IcebergMetadata icebergMetadata, + DeltaLakeMetadata deltaMetadata, + HudiMetadata hudiMetadata) + { + this.tableProperties = requireNonNull(tableProperties, "tableProperties is null"); + this.hiveMetadata = requireNonNull(hiveMetadata, "hiveMetadata is null"); + this.icebergMetadata = requireNonNull(icebergMetadata, "icebergMetadata is null"); + this.deltaMetadata = requireNonNull(deltaMetadata, "deltaMetadata is null"); + this.hudiMetadata = requireNonNull(hudiMetadata, "hudiMetadata is null"); + } + + @Override + public boolean schemaExists(ConnectorSession session, String schemaName) + { + return hiveMetadata.schemaExists(session, schemaName); + } + + @Override + public List<String> listSchemaNames(ConnectorSession session) + { + return hiveMetadata.listSchemaNames(session); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion) + { + if (isIcebergTableName(tableName.getTableName()) && isMaterializedViewStorage(tableName.getTableName())) { + return icebergMetadata.getTableHandle(session, tableName, startVersion, endVersion); + } + + Table table = hiveMetadata.getMetastore() + .getTable(tableName.getSchemaName(), tableName.getTableName()) + .orElse(null); + if (table == null) { + return null; + } + if (isIcebergTable(table)) { + return icebergMetadata.getTableHandle(session, tableName, startVersion, endVersion); + } + if (isDeltaLakeTable(table)) { + return deltaMetadata.getTableHandle(session, tableName, startVersion, endVersion); + } + if (isHudiTable(table)) { + return hudiMetadata.getTableHandle(session, tableName, startVersion, endVersion); + } + return hiveMetadata.getTableHandle(session, tableName, startVersion, endVersion); + }
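+ + // getTableHandle above recognizes Iceberg, Delta Lake, and Hudi tables by their metastore table properties; the remaining operations dispatch on the resulting handle type through the forHandle helper defined later in this file.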
+ + @Override + public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorSession session, ConnectorAccessControl accessControl, ConnectorTableHandle tableHandle, String procedureName, Map<String, Object> executeProperties, RetryMode retryMode) + { + return forHandle(tableHandle).getTableHandleForExecute(session, accessControl, tableHandle, procedureName, executeProperties, retryMode); + } + + @Override + public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + return forHandle(tableExecuteHandle).getLayoutForTableExecute(session, tableExecuteHandle); + } + + @Override + public BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle> beginTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, ConnectorTableHandle updatedSourceTableHandle) + { + return forHandle(tableExecuteHandle).beginTableExecute(session, tableExecuteHandle, updatedSourceTableHandle); + } + + @Override + public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState) + { + forHandle(tableExecuteHandle).finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); + } + + @Override + public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + forHandle(tableExecuteHandle).executeTableExecute(session, tableExecuteHandle); + } + + @Override + public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName) + { + return hiveMetadata.getSystemTable(session, tableName) + .or(() -> icebergMetadata.getSystemTable(session, tableName)) + .or(() -> deltaMetadata.getSystemTable(session, tableName)) + .or(() -> hudiMetadata.getSystemTable(session, tableName)); + } + + @Override + public Optional<ConnectorTableHandle> applyPartitioning(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorPartitioningHandle> partitioningHandle, List<ColumnHandle> columns) + { + return forHandle(tableHandle).applyPartitioning(session, tableHandle, partitioningHandle, columns); + } + + @Override + public Optional<ConnectorPartitioningHandle> getCommonPartitioningHandle(ConnectorSession session, ConnectorPartitioningHandle left, ConnectorPartitioningHandle right) + { + if (left.getClass() != right.getClass()) { + return Optional.empty(); + } + return forHandle(left).getCommonPartitioningHandle(session, left, right); + } + + @Override + public SchemaTableName getTableName(ConnectorSession session, ConnectorTableHandle table) + { + return forHandle(table).getTableName(session, table); + } + + @Override + public ConnectorTableSchema getTableSchema(ConnectorSession session, ConnectorTableHandle table) + { + return forHandle(table).getTableSchema(session, table); + }
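+ + // getTableMetadata below decorates the underlying connector's metadata with the lakehouse table type (see wrapTableMetadata and tableTypeForHandle later in this file).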
+ + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + return wrapTableMetadata(tableTypeForHandle(table), forHandle(table).getTableMetadata(session, table)); + } + + @Override + public Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle table) + { + return forHandle(table).getInfo(session, table); + } + + @Override + public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) + { + return hiveMetadata.listTables(session, schemaName); + } + + @Override + public Map<SchemaTableName, RelationType> getRelationTypes(ConnectorSession session, Optional<String> schemaName) + { + return hiveMetadata.getRelationTypes(session, schemaName); + } + + @Override + public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getColumnHandles(session, tableHandle); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + return forHandle(tableHandle).getColumnMetadata(session, tableHandle, columnHandle); + } + + @Override + public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + throw new UnsupportedOperationException("The deprecated listTableColumns is not supported because streamTableColumns is implemented instead"); + } + + @Override + public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + throw new UnsupportedOperationException("The deprecated streamTableColumns is not supported because streamRelationColumns is implemented instead"); + } + + @Override + public Iterator<RelationColumnsMetadata> streamRelationColumns(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter) + { + return Iterators.concat( + hiveMetadata.streamRelationColumns(session, schemaName, relationFilter), + icebergMetadata.streamRelationColumns(session, schemaName, relationFilter), + deltaMetadata.streamRelationColumns(session, schemaName, relationFilter), + hudiMetadata.streamRelationColumns(session, schemaName, relationFilter)); + } + + @Override + public Iterator<RelationCommentMetadata> streamRelationComments(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter) + { + return Iterators.concat( + hiveMetadata.streamRelationComments(session, schemaName, relationFilter), + icebergMetadata.streamRelationComments(session, schemaName, relationFilter), + deltaMetadata.streamRelationComments(session, schemaName, relationFilter), + hudiMetadata.streamRelationComments(session, schemaName, relationFilter)); + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getTableStatistics(session, tableHandle); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName, Map<String, Object> properties, TrinoPrincipal owner) + { + hiveMetadata.createSchema(session, schemaName, properties, owner); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName, boolean cascade) + { + // use Iceberg to allow dropping materialized views + icebergMetadata.dropSchema(session, schemaName, cascade); + } + + @Override + public void renameSchema(ConnectorSession session, String source, String target) + { + hiveMetadata.renameSchema(session, source, target); + } + + @Override + public void setSchemaAuthorization(ConnectorSession session, String schemaName, TrinoPrincipal principal) + { + hiveMetadata.setSchemaAuthorization(session, schemaName, principal); + }
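+ + // DDL that creates a new relation has no existing handle to dispatch on, so the write paths below route on the 'type' table property through the forProperties helper.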
+ + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) + { + forProperties(tableMetadata.getProperties()).createTable(session, unwrapTableMetadata(tableMetadata), saveMode); + } + + @Override + public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) + { + forHandle(tableHandle).dropTable(session, tableHandle); + } + + @Override + public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle) + { + forHandle(tableHandle).truncateTable(session, tableHandle); + } + + @Override + public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName) + { + forHandle(tableHandle).renameTable(session, tableHandle, newTableName); + } + + @Override + public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties) + { + forHandle(tableHandle).setTableProperties(session, tableHandle, properties); + } + + @Override + public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment) + { + forHandle(tableHandle).setTableComment(session, tableHandle, comment); + } + + @Override + public void setViewComment(ConnectorSession session, SchemaTableName viewName, Optional<String> comment) + { + hiveMetadata.setViewComment(session, viewName, comment); + } + + @Override + public void setViewColumnComment(ConnectorSession session, SchemaTableName viewName, String columnName, Optional<String> comment) + { + hiveMetadata.setViewColumnComment(session, viewName, columnName, comment); + } + + @Override + public void setMaterializedViewColumnComment(ConnectorSession session, SchemaTableName viewName, String columnName, Optional<String> comment) + { + icebergMetadata.setMaterializedViewColumnComment(session, viewName, columnName, comment); + } + + @Override + public void setColumnComment(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column, Optional<String> comment) + { + forHandle(tableHandle).setColumnComment(session, tableHandle, column, comment); + } + + @Override + public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column, ColumnPosition position) + { + forHandle(tableHandle).addColumn(session, tableHandle, column, position); + } + + @Override + public void addField(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> parentPath, String fieldName, Type type, boolean ignoreExisting) + { + forHandle(tableHandle).addField(session, tableHandle, parentPath, fieldName, type, ignoreExisting); + } + + @Override + public void setColumnType(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column, Type type) + { + forHandle(tableHandle).setColumnType(session, tableHandle, column, type); + } + + @Override + public void setFieldType(ConnectorSession session, ConnectorTableHandle tableHandle, List<String> fieldPath, Type type) + { + forHandle(tableHandle).setFieldType(session, tableHandle, fieldPath, type); + } + + @Override + public void dropNotNullConstraint(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) + { + forHandle(tableHandle).dropNotNullConstraint(session, tableHandle, column); + } + + @Override + public void setTableAuthorization(ConnectorSession session, SchemaTableName tableName, TrinoPrincipal principal) + { + hiveMetadata.setTableAuthorization(session, tableName, principal); + }
String target) + { + forHandle(tableHandle).renameColumn(session, tableHandle, source, target); + } + + @Override + public void renameField(ConnectorSession session, ConnectorTableHandle tableHandle, List fieldPath, String target) + { + forHandle(tableHandle).renameField(session, tableHandle, fieldPath, target); + } + + @Override + public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) + { + forHandle(tableHandle).dropColumn(session, tableHandle, column); + } + + @Override + public void dropField(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column, List fieldPath) + { + forHandle(tableHandle).dropField(session, tableHandle, column, fieldPath); + } + + @Override + public Optional getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + return forProperties(tableMetadata.getProperties()).getNewTableLayout(session, unwrapTableMetadata(tableMetadata)); + } + + @Override + public Optional getSupportedType(ConnectorSession session, Map tableProperties, Type type) + { + return forProperties(tableProperties).getSupportedType(session, tableProperties, type); + } + + @Override + public Optional getInsertLayout(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getInsertLayout(session, tableHandle); + } + + @Override + public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + return forProperties(tableMetadata.getProperties()).getStatisticsCollectionMetadataForWrite(session, unwrapTableMetadata(tableMetadata)); + } + + @Override + public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, Map analyzeProperties) + { + return forHandle(tableHandle).getStatisticsCollectionMetadata(session, tableHandle, analyzeProperties); + } + + @Override + public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).beginStatisticsCollection(session, tableHandle); + } + + @Override + public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection computedStatistics) + { + forHandle(tableHandle).finishStatisticsCollection(session, tableHandle, computedStatistics); + } + + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace) + { + return forProperties(tableMetadata.getProperties()).beginCreateTable(session, unwrapTableMetadata(tableMetadata), layout, retryMode, replace); + } + + @Override + public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + return forHandle(tableHandle).finishCreateTable(session, tableHandle, fragments, computedStatistics); + } + + @Override + public void beginQuery(ConnectorSession session) + { + hiveMetadata.beginQuery(session); + } + + @Override + public void cleanupQuery(ConnectorSession session) + { + hiveMetadata.cleanupQuery(session); + } + + @Override + public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode) + { + return forHandle(tableHandle).beginInsert(session, tableHandle, columns, retryMode); + } + + @Override + public Optional 
finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, List sourceTableHandles, Collection fragments, Collection computedStatistics) + { + return forHandle(insertHandle).finishInsert(session, insertHandle, sourceTableHandles, fragments, computedStatistics); + } + + @Override + public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession session, SchemaTableName viewName) + { + return icebergMetadata.delegateMaterializedViewRefreshToConnector(session, viewName); + } + + @Override + public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List sourceTableHandles, RetryMode retryMode, RefreshType refreshType) + { + return icebergMetadata.beginRefreshMaterializedView(session, tableHandle, sourceTableHandles, retryMode, refreshType); + } + + @Override + public Optional finishRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics, List sourceTableHandles, List sourceTableFunctions) + { + return icebergMetadata.finishRefreshMaterializedView(session, tableHandle, insertHandle, fragments, computedStatistics, sourceTableHandles, sourceTableFunctions); + } + + @Override + public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getRowChangeParadigm(session, tableHandle); + } + + @Override + public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getMergeRowIdColumnHandle(session, tableHandle); + } + + @Override + public Optional getUpdateLayout(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getUpdateLayout(session, tableHandle); + } + + @Override + public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map> updateCaseColumns, RetryMode retryMode) + { + return forHandle(tableHandle).beginMerge(session, tableHandle, updateCaseColumns, retryMode); + } + + @Override + public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, List sourceTableHandles, Collection fragments, Collection computedStatistics) + { + forHandle(mergeTableHandle).finishMerge(session, mergeTableHandle, sourceTableHandles, fragments, computedStatistics); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName viewName, ConnectorViewDefinition definition, Map viewProperties, boolean replace) + { + hiveMetadata.createView(session, viewName, definition, viewProperties, replace); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + hiveMetadata.renameView(session, source, target); + } + + @Override + public void setViewAuthorization(ConnectorSession session, SchemaTableName viewName, TrinoPrincipal principal) + { + hiveMetadata.setViewAuthorization(session, viewName, principal); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName viewName) + { + hiveMetadata.dropView(session, viewName); + } + + @Override + public List listViews(ConnectorSession session, Optional schemaName) + { + return hiveMetadata.listViews(session, schemaName); + } + + @Override + public Map getViews(ConnectorSession session, Optional schemaName) + { + return hiveMetadata.getViews(session, schemaName); + } + + @Override 
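+    // SQL views are stored as plain Hive views in the metastore, so view lookups delegate to the Hive metadata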
+ public Optional getView(ConnectorSession session, SchemaTableName viewName) + { + return hiveMetadata.getView(session, viewName); + } + + @Override + public boolean isView(ConnectorSession session, SchemaTableName viewName) + { + return hiveMetadata.isView(session, viewName); + } + + @Override + public Map getViewProperties(ConnectorSession session, SchemaTableName viewName) + { + return hiveMetadata.getViewProperties(session, viewName); + } + + @Override + public Map getSchemaProperties(ConnectorSession session, String schemaName) + { + return hiveMetadata.getSchemaProperties(session, schemaName); + } + + @Override + public Optional getSchemaOwner(ConnectorSession session, String schemaName) + { + return hiveMetadata.getSchemaOwner(session, schemaName); + } + + @Override + public Optional applyUpdate(ConnectorSession session, ConnectorTableHandle handle, Map assignments) + { + return forHandle(handle).applyUpdate(session, handle, assignments); + } + + @Override + public OptionalLong executeUpdate(ConnectorSession session, ConnectorTableHandle handle) + { + return forHandle(handle).executeUpdate(session, handle); + } + + @Override + public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) + { + return forHandle(handle).applyDelete(session, handle); + } + + @Override + public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle) + { + return forHandle(handle).executeDelete(session, handle); + } + + @Override + public Collection listLanguageFunctions(ConnectorSession session, String schemaName) + { + return hiveMetadata.listLanguageFunctions(session, schemaName); + } + + @Override + public Collection getLanguageFunctions(ConnectorSession session, SchemaFunctionName name) + { + return hiveMetadata.getLanguageFunctions(session, name); + } + + @Override + public boolean languageFunctionExists(ConnectorSession session, SchemaFunctionName name, String signatureToken) + { + return hiveMetadata.languageFunctionExists(session, name, signatureToken); + } + + @Override + public void createLanguageFunction(ConnectorSession session, SchemaFunctionName name, LanguageFunction function, boolean replace) + { + hiveMetadata.createLanguageFunction(session, name, function, replace); + } + + @Override + public void dropLanguageFunction(ConnectorSession session, SchemaFunctionName name, String signatureToken) + { + hiveMetadata.dropLanguageFunction(session, name, signatureToken); + } + + @Override + public boolean roleExists(ConnectorSession session, String role) + { + return hiveMetadata.roleExists(session, role); + } + + @Override + public void createRole(ConnectorSession session, String role, Optional grantor) + { + hiveMetadata.createRole(session, role, grantor); + } + + @Override + public void dropRole(ConnectorSession session, String role) + { + hiveMetadata.dropRole(session, role); + } + + @Override + public Set listRoles(ConnectorSession session) + { + return hiveMetadata.listRoles(session); + } + + @Override + public Set listRoleGrants(ConnectorSession session, TrinoPrincipal principal) + { + return hiveMetadata.listRoleGrants(session, principal); + } + + @Override + public void grantRoles(ConnectorSession connectorSession, Set roles, Set grantees, boolean adminOption, Optional grantor) + { + hiveMetadata.grantRoles(connectorSession, roles, grantees, adminOption, grantor); + } + + @Override + public void revokeRoles(ConnectorSession connectorSession, Set roles, Set grantees, boolean adminOption, Optional grantor) + { + 
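+        // Role and privilege management always goes through the Hive metastore, independent of table format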
hiveMetadata.revokeRoles(connectorSession, roles, grantees, adminOption, grantor); + } + + @Override + public Set listApplicableRoles(ConnectorSession session, TrinoPrincipal principal) + { + return hiveMetadata.listApplicableRoles(session, principal); + } + + @Override + public Set listEnabledRoles(ConnectorSession session) + { + return hiveMetadata.listEnabledRoles(session); + } + + @Override + public void grantSchemaPrivileges(ConnectorSession session, String schemaName, Set privileges, TrinoPrincipal grantee, boolean grantOption) + { + hiveMetadata.grantSchemaPrivileges(session, schemaName, privileges, grantee, grantOption); + } + + @Override + public void denySchemaPrivileges(ConnectorSession session, String schemaName, Set privileges, TrinoPrincipal grantee) + { + hiveMetadata.denySchemaPrivileges(session, schemaName, privileges, grantee); + } + + @Override + public void revokeSchemaPrivileges(ConnectorSession session, String schemaName, Set privileges, TrinoPrincipal grantee, boolean grantOption) + { + hiveMetadata.revokeSchemaPrivileges(session, schemaName, privileges, grantee, grantOption); + } + + @Override + public void grantTablePrivileges(ConnectorSession session, SchemaTableName tableName, Set privileges, TrinoPrincipal grantee, boolean grantOption) + { + hiveMetadata.grantTablePrivileges(session, tableName, privileges, grantee, grantOption); + } + + @Override + public void denyTablePrivileges(ConnectorSession session, SchemaTableName tableName, Set privileges, TrinoPrincipal grantee) + { + hiveMetadata.denyTablePrivileges(session, tableName, privileges, grantee); + } + + @Override + public void revokeTablePrivileges(ConnectorSession session, SchemaTableName tableName, Set privileges, TrinoPrincipal grantee, boolean grantOption) + { + hiveMetadata.revokeTablePrivileges(session, tableName, privileges, grantee, grantOption); + } + + @Override + public List listTablePrivileges(ConnectorSession session, SchemaTablePrefix prefix) + { + return hiveMetadata.listTablePrivileges(session, prefix); + } + + @Override + public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) + { + return forHandle(table).getTableProperties(session, table); + } + + @Override + public Optional> applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit) + { + return forHandle(handle).applyLimit(session, handle, limit); + } + + @Override + public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) + { + return forHandle(handle).applyFilter(session, handle, constraint); + } + + @Override + public Optional> applyProjection(ConnectorSession session, ConnectorTableHandle handle, List projections, Map assignments) + { + return forHandle(handle).applyProjection(session, handle, projections, assignments); + } + + @Override + public Optional> applySample(ConnectorSession session, ConnectorTableHandle handle, SampleType sampleType, double sampleRatio) + { + return forHandle(handle).applySample(session, handle, sampleType, sampleRatio); + } + + @Override + public Optional> applyAggregation(ConnectorSession session, ConnectorTableHandle handle, List aggregates, Map assignments, List> groupingSets) + { + return forHandle(handle).applyAggregation(session, handle, aggregates, assignments, groupingSets); + } + + @Override + public Optional> applyTopN(ConnectorSession session, ConnectorTableHandle handle, long topNCount, List sortItems, Map assignments) + { + return forHandle(handle).applyTopN(session, handle, 
topNCount, sortItems, assignments); + } + + @Override + public void validateScan(ConnectorSession session, ConnectorTableHandle handle) + { + forHandle(handle).validateScan(session, handle); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition, Map properties, boolean replace, boolean ignoreExisting) + { + icebergMetadata.createMaterializedView(session, viewName, definition, properties, replace, ignoreExisting); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + icebergMetadata.dropMaterializedView(session, viewName); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional schemaName) + { + return icebergMetadata.listMaterializedViews(session, schemaName); + } + + @Override + public void setMaterializedViewAuthorization(ConnectorSession session, SchemaTableName viewName, TrinoPrincipal principal) + { + icebergMetadata.setMaterializedViewAuthorization(session, viewName, principal); + } + + @Override + public Map getMaterializedViews(ConnectorSession session, Optional schemaName) + { + return icebergMetadata.getMaterializedViews(session, schemaName); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + return icebergMetadata.getMaterializedView(session, viewName); + } + + @Override + public Map getMaterializedViewProperties(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition materializedViewDefinition) + { + return icebergMetadata.getMaterializedViewProperties(session, viewName, materializedViewDefinition); + } + + @Override + public MaterializedViewFreshness getMaterializedViewFreshness(ConnectorSession session, SchemaTableName name) + { + return icebergMetadata.getMaterializedViewFreshness(session, name); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + icebergMetadata.renameMaterializedView(session, source, target); + } + + @Override + public void setMaterializedViewProperties(ConnectorSession session, SchemaTableName viewName, Map> properties) + { + icebergMetadata.setMaterializedViewProperties(session, viewName, properties); + } + + @Override + public boolean allowSplittingReadIntoMultipleSubQueries(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).allowSplittingReadIntoMultipleSubQueries(session, tableHandle); + } + + @Override + public WriterScalingOptions getNewTableWriterScalingOptions(ConnectorSession session, SchemaTableName tableName, Map tableProperties) + { + return forProperties(tableProperties).getNewTableWriterScalingOptions(session, tableName, tableProperties); + } + + @Override + public WriterScalingOptions getInsertWriterScalingOptions(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return forHandle(tableHandle).getInsertWriterScalingOptions(session, tableHandle); + } + + private ConnectorMetadata forHandle(ConnectorTableHandle handle) + { + return switch (handle) { + case HiveTableHandle _ -> hiveMetadata; + case IcebergTableHandle _ -> icebergMetadata; + case DeltaLakeTableHandle _ -> deltaMetadata; + case HudiTableHandle _ -> hudiMetadata; + default -> throw new IllegalArgumentException("Unsupported table handle: " + handle.getClass().getName()); + }; + } + + private ConnectorMetadata forHandle(ConnectorTableExecuteHandle handle) + { + 
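+        // No Hudi case here: the Hudi connector is read-only, so it never produces table execute handles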
return switch (handle) { + case HiveTableExecuteHandle _ -> hiveMetadata; + case IcebergTableExecuteHandle _ -> icebergMetadata; + case DeltaLakeTableExecuteHandle _ -> deltaMetadata; + default -> throw new IllegalArgumentException("Unsupported execute handle: " + handle.getClass().getName()); + }; + } + + private ConnectorMetadata forHandle(ConnectorPartitioningHandle handle) + { + return switch (handle) { + case HivePartitioningHandle _ -> hiveMetadata; + case IcebergPartitioningHandle _ -> icebergMetadata; + case DeltaLakePartitioningHandle _ -> deltaMetadata; + default -> throw new IllegalArgumentException("Unsupported partitioning handle: " + handle.getClass().getName()); + }; + } + + private ConnectorMetadata forHandle(ConnectorInsertTableHandle handle) + { + return switch (handle) { + case HiveInsertTableHandle _ -> hiveMetadata; + case IcebergWritableTableHandle _ -> icebergMetadata; + case DeltaLakeInsertTableHandle _ -> deltaMetadata; + default -> throw new IllegalArgumentException("Unsupported insert handle: " + handle.getClass().getName()); + }; + } + + private ConnectorMetadata forHandle(ConnectorOutputTableHandle handle) + { + return switch (handle) { + case HiveOutputTableHandle _ -> hiveMetadata; + case IcebergWritableTableHandle _ -> icebergMetadata; + case DeltaLakeOutputTableHandle _ -> deltaMetadata; + default -> throw new IllegalArgumentException("Unsupported output handle: " + handle.getClass().getName()); + }; + } + + private ConnectorMetadata forHandle(ConnectorMergeTableHandle handle) + { + return switch (handle) { + case HiveMergeTableHandle _ -> hiveMetadata; + case IcebergMergeTableHandle _ -> icebergMetadata; + case DeltaLakeMergeTableHandle _ -> deltaMetadata; + default -> throw new IllegalArgumentException("Unsupported merge handle: " + handle.getClass().getName()); + }; + } + + private ConnectorMetadata forProperties(Map properties) + { + return switch (getTableType(properties)) { + case HIVE -> hiveMetadata; + case ICEBERG -> icebergMetadata; + case DELTA -> deltaMetadata; + case HUDI -> hudiMetadata; + }; + } + + private ConnectorTableMetadata unwrapTableMetadata(ConnectorTableMetadata metadata) + { + return withProperties(metadata, tableProperties.unwrapProperties(metadata.getProperties())); + } + + private ConnectorTableMetadata wrapTableMetadata(TableType tableType, ConnectorTableMetadata metadata) + { + return withProperties(metadata, tableProperties.wrapProperties(tableType, metadata.getProperties())); + } + + private static ConnectorTableMetadata withProperties(ConnectorTableMetadata metadata, Map properties) + { + return new ConnectorTableMetadata( + metadata.getTable(), + metadata.getColumns(), + properties, + metadata.getComment(), + metadata.getCheckConstraints()); + } + + private static TableType tableTypeForHandle(ConnectorTableHandle handle) + { + return switch (handle) { + case HiveTableHandle _ -> TableType.HIVE; + case IcebergTableHandle _ -> TableType.ICEBERG; + case DeltaLakeTableHandle _ -> TableType.DELTA; + case HudiTableHandle _ -> TableType.HUDI; + default -> throw new IllegalArgumentException("Unsupported table handle: " + handle.getClass().getName()); + }; + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java new file mode 100644 index 000000000000..6050c8fa54f0 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java @@ -0,0 +1,58 @@ +/* + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.base.metrics.FileFormatDataSourceStats; +import io.trino.plugin.hive.HideDeltaLakeTables; +import io.trino.plugin.hive.SortingFileWriterConfig; +import io.trino.plugin.hive.orc.OrcReaderConfig; +import io.trino.plugin.hive.orc.OrcWriterConfig; +import io.trino.plugin.hive.parquet.ParquetReaderConfig; +import io.trino.plugin.hive.parquet.ParquetWriterConfig; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +class LakehouseModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(LakehouseConfig.class); + configBinder(binder).bindConfig(OrcReaderConfig.class); + configBinder(binder).bindConfig(OrcWriterConfig.class); + configBinder(binder).bindConfig(ParquetReaderConfig.class); + configBinder(binder).bindConfig(ParquetWriterConfig.class); + configBinder(binder).bindConfig(SortingFileWriterConfig.class, "lakehouse"); + + binder.bind(LakehouseConnector.class).in(Scopes.SINGLETON); + binder.bind(LakehouseNodePartitioningProvider.class).in(Scopes.SINGLETON); + binder.bind(LakehousePageSinkProvider.class).in(Scopes.SINGLETON); + binder.bind(LakehousePageSourceProviderFactory.class).in(Scopes.SINGLETON); + binder.bind(LakehouseSessionProperties.class).in(Scopes.SINGLETON); + binder.bind(LakehouseSplitManager.class).in(Scopes.SINGLETON); + binder.bind(LakehouseTableProperties.class).in(Scopes.SINGLETON); + binder.bind(LakehouseTransactionManager.class).in(Scopes.SINGLETON); + + binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName(); + + binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseNodePartitioningProvider.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseNodePartitioningProvider.java new file mode 100644 index 000000000000..10f67a359708 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseNodePartitioningProvider.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakeNodePartitioningProvider; +import io.trino.plugin.deltalake.DeltaLakePartitioningHandle; +import io.trino.plugin.deltalake.DeltaLakeUpdateHandle; +import io.trino.plugin.hive.HiveNodePartitioningProvider; +import io.trino.plugin.hive.HivePartitioningHandle; +import io.trino.plugin.iceberg.IcebergNodePartitioningProvider; +import io.trino.plugin.iceberg.IcebergPartitioningHandle; +import io.trino.spi.connector.BucketFunction; +import io.trino.spi.connector.ConnectorBucketNodeMap; +import io.trino.spi.connector.ConnectorNodePartitioningProvider; +import io.trino.spi.connector.ConnectorPartitioningHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.type.Type; + +import java.util.List; +import java.util.Optional; +import java.util.function.ToIntFunction; + +import static java.util.Objects.requireNonNull; + +public class LakehouseNodePartitioningProvider + implements ConnectorNodePartitioningProvider +{ + private final HiveNodePartitioningProvider hiveNodePartitioningProvider; + private final IcebergNodePartitioningProvider icebergNodePartitioningProvider; + private final DeltaLakeNodePartitioningProvider deltaNodePartitioningProvider; + + @Inject + public LakehouseNodePartitioningProvider( + HiveNodePartitioningProvider hiveNodePartitioningProvider, + IcebergNodePartitioningProvider icebergNodePartitioningProvider, + DeltaLakeNodePartitioningProvider deltaNodePartitioningProvider) + { + this.hiveNodePartitioningProvider = requireNonNull(hiveNodePartitioningProvider, "hiveNodePartitioningProvider is null"); + this.icebergNodePartitioningProvider = requireNonNull(icebergNodePartitioningProvider, "icebergNodePartitioningProvider is null"); + this.deltaNodePartitioningProvider = requireNonNull(deltaNodePartitioningProvider, "deltaNodePartitioningProvider is null"); + } + + @Override + public Optional getBucketNodeMapping(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle) + { + return forHandle(partitioningHandle).getBucketNodeMapping(transactionHandle, session, partitioningHandle); + } + + @Override + public ToIntFunction getSplitBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, int bucketCount) + { + return forHandle(partitioningHandle).getSplitBucketFunction(transactionHandle, session, partitioningHandle, bucketCount); + } + + @Override + public BucketFunction getBucketFunction(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List partitionChannelTypes, int bucketCount) + { + return forHandle(partitioningHandle).getBucketFunction(transactionHandle, session, partitioningHandle, partitionChannelTypes, bucketCount); + } + + private ConnectorNodePartitioningProvider forHandle(ConnectorPartitioningHandle handle) + { + return switch (handle) { + case HivePartitioningHandle _ -> hiveNodePartitioningProvider; + case IcebergPartitioningHandle _ -> icebergNodePartitioningProvider; + case DeltaLakePartitioningHandle _, DeltaLakeUpdateHandle _ -> deltaNodePartitioningProvider; + default -> throw new 
UnsupportedOperationException("Unsupported partitioning handle " + handle.getClass()); + }; + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSinkProvider.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSinkProvider.java new file mode 100644 index 000000000000..73bfe157e69e --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSinkProvider.java @@ -0,0 +1,125 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakeInsertTableHandle; +import io.trino.plugin.deltalake.DeltaLakeMergeTableHandle; +import io.trino.plugin.deltalake.DeltaLakeOutputTableHandle; +import io.trino.plugin.deltalake.DeltaLakePageSinkProvider; +import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle; +import io.trino.plugin.hive.HiveInsertTableHandle; +import io.trino.plugin.hive.HiveMergeTableHandle; +import io.trino.plugin.hive.HiveOutputTableHandle; +import io.trino.plugin.hive.HivePageSinkProvider; +import io.trino.plugin.hive.HiveTableExecuteHandle; +import io.trino.plugin.iceberg.IcebergMergeTableHandle; +import io.trino.plugin.iceberg.IcebergPageSinkProvider; +import io.trino.plugin.iceberg.IcebergWritableTableHandle; +import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorMergeSink; +import io.trino.spi.connector.ConnectorMergeTableHandle; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.connector.ConnectorPageSinkId; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; +import io.trino.spi.connector.ConnectorTransactionHandle; + +import static java.util.Objects.requireNonNull; + +public class LakehousePageSinkProvider + implements ConnectorPageSinkProvider +{ + private final HivePageSinkProvider hivePageSinkProvider; + private final IcebergPageSinkProvider icebergPageSinkProvider; + private final DeltaLakePageSinkProvider deltaPageSinkProvider; + + @Inject + public LakehousePageSinkProvider( + HivePageSinkProvider hivePageSinkProvider, + IcebergPageSinkProvider icebergPageSinkProvider, + DeltaLakePageSinkProvider deltaPageSinkProvider) + { + this.hivePageSinkProvider = requireNonNull(hivePageSinkProvider, "hivePageSinkProvider is null"); + this.icebergPageSinkProvider = requireNonNull(icebergPageSinkProvider, "icebergPageSinkProvider is null"); + this.deltaPageSinkProvider = requireNonNull(deltaPageSinkProvider, "deltaPageSinkProvider is null"); + } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId) + 
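+    // Write paths (CTAS, INSERT, execute, MERGE) cover Hive, Iceberg, and Delta Lake only; Hudi tables are read-only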
{ + return forHandle(outputTableHandle).createPageSink(transactionHandle, session, outputTableHandle, pageSinkId); + } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, ConnectorPageSinkId pageSinkId) + { + return forHandle(insertTableHandle).createPageSink(transactionHandle, session, insertTableHandle, pageSinkId); + } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, ConnectorPageSinkId pageSinkId) + { + return forHandle(tableExecuteHandle).createPageSink(transactionHandle, session, tableExecuteHandle, pageSinkId); + } + + @Override + public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorMergeTableHandle mergeHandle, ConnectorPageSinkId pageSinkId) + { + return forHandle(mergeHandle).createMergeSink(transactionHandle, session, mergeHandle, pageSinkId); + } + + private ConnectorPageSinkProvider forHandle(ConnectorOutputTableHandle handle) + { + return switch (handle) { + case HiveOutputTableHandle _ -> hivePageSinkProvider; + case IcebergWritableTableHandle _ -> icebergPageSinkProvider; + case DeltaLakeOutputTableHandle _ -> deltaPageSinkProvider; + default -> throw new UnsupportedOperationException("Unsupported output handle " + handle.getClass()); + }; + } + + private ConnectorPageSinkProvider forHandle(ConnectorInsertTableHandle handle) + { + return switch (handle) { + case HiveInsertTableHandle _ -> hivePageSinkProvider; + case IcebergWritableTableHandle _ -> icebergPageSinkProvider; + case DeltaLakeInsertTableHandle _ -> deltaPageSinkProvider; + default -> throw new UnsupportedOperationException("Unsupported insert handle " + handle.getClass()); + }; + } + + private ConnectorPageSinkProvider forHandle(ConnectorTableExecuteHandle handle) + { + return switch (handle) { + case HiveTableExecuteHandle _ -> hivePageSinkProvider; + case IcebergTableExecuteHandle _ -> icebergPageSinkProvider; + case DeltaLakeTableExecuteHandle _ -> deltaPageSinkProvider; + default -> throw new UnsupportedOperationException("Unsupported execute handle " + handle.getClass()); + }; + } + + private ConnectorPageSinkProvider forHandle(ConnectorMergeTableHandle handle) + { + return switch (handle) { + case HiveMergeTableHandle _ -> hivePageSinkProvider; + case IcebergMergeTableHandle _ -> icebergPageSinkProvider; + case DeltaLakeMergeTableHandle _ -> deltaPageSinkProvider; + default -> throw new UnsupportedOperationException("Unsupported merge handle " + handle.getClass()); + }; + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSourceProviderFactory.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSourceProviderFactory.java new file mode 100644 index 000000000000..016cfb28aea1 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePageSourceProviderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakePageSourceProvider; +import io.trino.plugin.deltalake.DeltaLakeTableHandle; +import io.trino.plugin.hive.HivePageSourceProvider; +import io.trino.plugin.hive.HiveTableHandle; +import io.trino.plugin.hudi.HudiPageSourceProvider; +import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.iceberg.IcebergPageSourceProviderFactory; +import io.trino.plugin.iceberg.IcebergTableHandle; +import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; +import io.trino.spi.connector.ConnectorTableHandle; + +import static java.util.Objects.requireNonNull; + +public class LakehousePageSourceProviderFactory + implements ConnectorPageSourceProviderFactory +{ + private final HivePageSourceProvider hivePageSourceProvider; + private final IcebergPageSourceProviderFactory icebergPageSourceProviderFactory; + private final DeltaLakePageSourceProvider deltaLakePageSourceProvider; + private final HudiPageSourceProvider hudiPageSourceProvider; + + @Inject + public LakehousePageSourceProviderFactory( + HivePageSourceProvider hivePageSourceProvider, + IcebergPageSourceProviderFactory icebergPageSourceProviderFactory, + DeltaLakePageSourceProvider deltaLakePageSourceProvider, + HudiPageSourceProvider hudiPageSourceProvider) + { + this.hivePageSourceProvider = requireNonNull(hivePageSourceProvider, "hivePageSourceProvider is null"); + this.icebergPageSourceProviderFactory = requireNonNull(icebergPageSourceProviderFactory, "icebergPageSourceProviderFactory is null"); + this.deltaLakePageSourceProvider = requireNonNull(deltaLakePageSourceProvider, "deltaLakePageSourceProvider is null"); + this.hudiPageSourceProvider = requireNonNull(hudiPageSourceProvider, "hudiPageSourceProvider is null"); + } + + @Override + public ConnectorPageSourceProvider createPageSourceProvider() + { + return (transaction, session, split, table, columns, dynamicFilter) -> + forHandle(table).createPageSource(transaction, session, split, table, columns, dynamicFilter); + } + + private ConnectorPageSourceProvider forHandle(ConnectorTableHandle handle) + { + return switch (handle) { + case HiveTableHandle _ -> hivePageSourceProvider; + case IcebergTableHandle _ -> icebergPageSourceProviderFactory.createPageSourceProvider(); + case DeltaLakeTableHandle _ -> deltaLakePageSourceProvider; + case HudiTableHandle _ -> hudiPageSourceProvider; + default -> throw new UnsupportedOperationException("Unsupported table handle " + handle.getClass()); + }; + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePlugin.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePlugin.java new file mode 100644 index 000000000000..37dbd2a216a4 --- /dev/null +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehousePlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.ConnectorFactory;
+
+public class LakehousePlugin
+        implements Plugin
+{
+    @Override
+    public Iterable<ConnectorFactory> getConnectorFactories()
+    {
+        return ImmutableList.of(new LakehouseConnectorFactory());
+    }
+}
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseSessionProperties.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseSessionProperties.java
new file mode 100644
index 000000000000..7aaff0624c48
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseSessionProperties.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Streams;
+import com.google.inject.Inject;
+import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
+import io.trino.plugin.hive.HiveSessionProperties;
+import io.trino.plugin.hudi.HudiSessionProperties;
+import io.trino.plugin.iceberg.IcebergSessionProperties;
+import io.trino.spi.session.PropertyMetadata;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class LakehouseSessionProperties
+{
+    private static final Set<String> IGNORED_DESCRIPTIONS = ImmutableSet.<String>builder()
+            .add("compression_codec")
+            .add("ignore_absent_partitions")
+            .add("minimum_assigned_split_weight")
+            .add("query_partition_filter_required")
+            .add("size_based_split_weights_enabled")
+            .add("sorted_writing_enabled")
+            .add("timestamp_precision")
+            .build();
+
+    private static final Set<String> IGNORED_DEFAULT_VALUES = ImmutableSet.<String>builder()
+            .add("compression_codec")
+            .add("dynamic_filtering_wait_timeout")
+            .add("max_split_size")
+            .build();
+
+    private final List<PropertyMetadata<?>> sessionProperties;
+
+    @Inject
+    public LakehouseSessionProperties(
+            HiveSessionProperties hiveSessionProperties,
+            IcebergSessionProperties icebergSessionProperties,
+            DeltaLakeSessionProperties deltaSessionProperties,
+            HudiSessionProperties hudiSessionProperties)
+    {
+        List<PropertyMetadata<?>> allProperties = Streams.concat(
+                hiveSessionProperties.getSessionProperties().stream(),
+                icebergSessionProperties.getSessionProperties().stream(),
+                deltaSessionProperties.getSessionProperties().stream(),
+                hudiSessionProperties.getSessionProperties().stream()).toList();
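+        // Merge the session properties of all four underlying connectors, failing fast
+        // when two connectors expose the same property name with incompatible definitions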
+        Map<String, PropertyMetadata<?>> properties = new HashMap<>();
+
+        for (PropertyMetadata<?> property : allProperties) {
+            PropertyMetadata<?> existing = properties.putIfAbsent(property.getName(), property);
+            if (existing == null) {
+                continue;
+            }
+            if (!existing.getDescription().equals(property.getDescription()) && !IGNORED_DESCRIPTIONS.contains(property.getName())) {
+                throw new VerifyException("Conflicting session property '%s' with different descriptions: %s <> %s"
+                        .formatted(property.getName(), existing.getDescription(), property.getDescription()));
+            }
+            if (!existing.getJavaType().equals(property.getJavaType())) {
+                throw new VerifyException("Conflicting session property '%s' with different Java types: %s <> %s"
+                        .formatted(property.getName(), existing.getJavaType(), property.getJavaType()));
+            }
+            if (!existing.getSqlType().equals(property.getSqlType())) {
+                throw new VerifyException("Conflicting session property '%s' with different SQL types: %s <> %s"
+                        .formatted(property.getName(), existing.getSqlType(), property.getSqlType()));
+            }
+            if (!Objects.equals(existing.getDefaultValue(), property.getDefaultValue()) && !IGNORED_DEFAULT_VALUES.contains(property.getName())) {
+                throw new VerifyException("Conflicting session property '%s' with different default values: %s <> %s"
+                        .formatted(property.getName(), existing.getDefaultValue(), property.getDefaultValue()));
+            }
+            if (existing.isHidden() != property.isHidden() && !property.getName().equals("timestamp_precision")) {
+                throw new VerifyException("Conflicting session property '%s' with different hidden flags: %s <> %s"
+                        .formatted(property.getName(), existing.isHidden(), property.isHidden()));
+            }
+        }
+
+        this.sessionProperties = ImmutableList.copyOf(properties.values());
+    }
+
+    public List<PropertyMetadata<?>> getSessionProperties()
+    {
+        return sessionProperties;
+    }
+}
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseSplitManager.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseSplitManager.java
new file mode 100644
index 000000000000..6d4635ab660f
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseSplitManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakeSplitManager; +import io.trino.plugin.deltalake.DeltaLakeTableHandle; +import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle; +import io.trino.plugin.hive.HiveSplitManager; +import io.trino.plugin.hive.HiveTableHandle; +import io.trino.plugin.hudi.HudiSplitManager; +import io.trino.plugin.hudi.HudiTableHandle; +import io.trino.plugin.iceberg.IcebergSplitManager; +import io.trino.plugin.iceberg.IcebergTableHandle; +import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.Constraint; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.function.table.ConnectorTableFunctionHandle; + +import static java.util.Objects.requireNonNull; + +public class LakehouseSplitManager + implements ConnectorSplitManager +{ + private final HiveSplitManager hiveSplitManager; + private final IcebergSplitManager icebergSplitManager; + private final DeltaLakeSplitManager deltaSplitManager; + private final HudiSplitManager hudiSplitManager; + + @Inject + public LakehouseSplitManager( + HiveSplitManager hiveSplitManager, + IcebergSplitManager icebergSplitManager, + DeltaLakeSplitManager deltaSplitManager, + HudiSplitManager hudiSplitManager) + { + this.hiveSplitManager = requireNonNull(hiveSplitManager, "hiveSplitManager is null"); + this.icebergSplitManager = requireNonNull(icebergSplitManager, "icebergSplitManager is null"); + this.deltaSplitManager = requireNonNull(deltaSplitManager, "deltaSplitManager is null"); + this.hudiSplitManager = requireNonNull(hudiSplitManager, "hudiSplitManager is null"); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, DynamicFilter dynamicFilter, Constraint constraint) + { + return forHandle(table).getSplits(transaction, session, table, dynamicFilter, constraint); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function) + { + return forHandle(function).getSplits(transaction, session, function); + } + + private ConnectorSplitManager forHandle(ConnectorTableHandle handle) + { + return switch (handle) { + case HiveTableHandle _ -> hiveSplitManager; + case IcebergTableHandle _ -> icebergSplitManager; + case DeltaLakeTableHandle _ -> deltaSplitManager; + case HudiTableHandle _ -> hudiSplitManager; + default -> throw new IllegalArgumentException("Unsupported table handle: " + handle.getClass().getName()); + }; + } + + private ConnectorSplitManager forHandle(ConnectorTableFunctionHandle handle) + { + return switch (handle) { + case TableChangesFunctionHandle _ -> icebergSplitManager; + case TableChangesTableFunctionHandle _ -> deltaSplitManager; + default -> throw new IllegalArgumentException("Unsupported table function handle: " + handle.getClass().getName()); + }; + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseTableProperties.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseTableProperties.java new file mode 100644 index 
000000000000..0f2b33667588
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseTableProperties.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.common.base.VerifyException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
+import com.google.inject.Inject;
+import io.trino.plugin.deltalake.DeltaLakeTableProperties;
+import io.trino.plugin.hive.HiveTableProperties;
+import io.trino.plugin.hudi.HudiTableProperties;
+import io.trino.plugin.iceberg.IcebergTableProperties;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.type.ArrayType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.Strings.emptyToNull;
+import static com.google.common.collect.MoreCollectors.onlyElement;
+import static io.trino.spi.session.PropertyMetadata.enumProperty;
+import static io.trino.spi.session.PropertyMetadata.stringProperty;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+
+public class LakehouseTableProperties
+{
+    public static final String TABLE_TYPE_PROPERTY = "type";
+
+    private final List<PropertyMetadata<?>> tableProperties;
+    private final PropertyMetadata<?> hiveFormatProperty;
+    private final PropertyMetadata<?> icebergFormatProperty;
+
+    @Inject
+    public LakehouseTableProperties(
+            HiveTableProperties hiveTableProperties,
+            IcebergTableProperties icebergTableProperties,
+            DeltaLakeTableProperties deltaTableProperties,
+            HudiTableProperties hudiTableProperties,
+            LakehouseConfig config)
+    {
+        Map<String, PropertyMetadata<?>> tableProperties = new HashMap<>();
+
+        tableProperties.put("type", enumProperty(
+                "type",
+                "Table type",
+                TableType.class,
+                config.getTableType(),
+                false));
+
+        tableProperties.put("format", stringProperty(
+                "format",
+                "File format for the table",
+                "",
+                false));
+
+        tableProperties.put("sorted_by", new PropertyMetadata<>(
+                "sorted_by",
+                "Sorted columns",
+                new ArrayType(VARCHAR),
+                List.class,
+                ImmutableList.of(),
+                false,
+                value -> (List<?>) value,
+                value -> value));
+
+        List<PropertyMetadata<?>> allProperties = Streams.concat(
+                hiveTableProperties.getTableProperties().stream(),
+                icebergTableProperties.getTableProperties().stream(),
+                deltaTableProperties.getTableProperties().stream(),
+                hudiTableProperties.getTableProperties().stream())
+                .filter(property -> !property.getName().equals("format"))
+                .filter(property -> !property.getName().equals("sorted_by"))
+                .toList();
+
+        for (PropertyMetadata<?> property : allProperties) {
+            PropertyMetadata<?> existing = tableProperties.putIfAbsent(property.getName(), property);
+            if (existing == null) {
+                continue;
+            }
+            if (!existing.getDescription().equals(property.getDescription())) {
+                throw new VerifyException("Conflicting table property '%s' with different descriptions: %s <> %s"
+                        .formatted(property.getName(), existing.getDescription(), property.getDescription()));
+            }
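+            // Duplicated property names are tolerated only when both definitions agree
+            // on Java type, SQL type, default value, and visibility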
+            if (!existing.getJavaType().equals(property.getJavaType())) {
+                throw new VerifyException("Conflicting table property '%s' with different Java types: %s <> %s"
+                        .formatted(property.getName(), existing.getJavaType(), property.getJavaType()));
+            }
+            if (!existing.getSqlType().equals(property.getSqlType())) {
+                throw new VerifyException("Conflicting table property '%s' with different SQL types: %s <> %s"
+                        .formatted(property.getName(), existing.getSqlType(), property.getSqlType()));
+            }
+            if (!Objects.equals(existing.getDefaultValue(), property.getDefaultValue())) {
+                throw new VerifyException("Conflicting table property '%s' with different default values: %s <> %s"
+                        .formatted(property.getName(), existing.getDefaultValue(), property.getDefaultValue()));
+            }
+            if (existing.isHidden() != property.isHidden()) {
+                throw new VerifyException("Conflicting table property '%s' with different hidden flags: %s <> %s"
+                        .formatted(property.getName(), existing.isHidden(), property.isHidden()));
+            }
+        }
+
+        this.tableProperties = ImmutableList.copyOf(tableProperties.values());
+
+        this.hiveFormatProperty = hiveTableProperties.getTableProperties().stream()
+                .filter(property -> property.getName().equals("format"))
+                .collect(onlyElement());
+
+        this.icebergFormatProperty = icebergTableProperties.getTableProperties().stream()
+                .filter(property -> property.getName().equals("format"))
+                .collect(onlyElement());
+    }
+
+    public List<PropertyMetadata<?>> getTableProperties()
+    {
+        return tableProperties;
+    }
+
+    public Map<String, Object> unwrapProperties(Map<String, Object> wrappedProperties)
+    {
+        Map<String, Object> properties = new HashMap<>(wrappedProperties);
+        properties.computeIfPresent("format", (_, value) -> switch (getTableType(properties)) {
+            case HIVE -> decodeProperty(hiveFormatProperty, value);
+            case ICEBERG -> decodeProperty(icebergFormatProperty, value);
+            case DELTA, HUDI -> value;
+        });
+        return properties;
+    }
+
+    public Map<String, Object> wrapProperties(TableType tableType, Map<String, Object> unwrappedProperties)
+    {
+        Map<String, Object> properties = new HashMap<>(unwrappedProperties);
+        properties.computeIfPresent("format", (_, value) -> switch (tableType) {
+            case HIVE -> encodeProperty(hiveFormatProperty, value);
+            case ICEBERG -> encodeProperty(icebergFormatProperty, value);
+            case DELTA, HUDI -> value;
+        });
+        properties.put("type", tableType);
+        return properties;
+    }
+
+    public static TableType getTableType(Map<String, Object> tableProperties)
+    {
+        return (TableType) tableProperties.get(TABLE_TYPE_PROPERTY);
+    }
+
+    private static <T> Object decodeProperty(PropertyMetadata<T> property, Object value)
+    {
+        return Optional.ofNullable(emptyToNull((String) value))
+                .map(property::decode)
+                .orElseGet(property::getDefaultValue);
+    }
+
+    private static <T> Object encodeProperty(PropertyMetadata<T> property, Object value)
+    {
+        return property.encode(property.getJavaType().cast(value));
+    }
+}
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseTransactionManager.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseTransactionManager.java
new file mode 100644
index 000000000000..d8865ed4d68b
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseTransactionManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.lakehouse; + +import com.google.inject.Inject; +import io.trino.plugin.deltalake.DeltaLakeTransactionManager; +import io.trino.plugin.hive.HiveTransactionHandle; +import io.trino.plugin.hive.HiveTransactionManager; +import io.trino.plugin.hudi.HudiTransactionManager; +import io.trino.plugin.iceberg.IcebergTransactionManager; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.security.ConnectorIdentity; + +import static java.util.Objects.requireNonNull; + +public final class LakehouseTransactionManager +{ + private final LakehouseTableProperties lakehouseTableProperties; + private final HiveTransactionManager hiveTransactionManager; + private final IcebergTransactionManager icebergTransactionManager; + private final DeltaLakeTransactionManager deltaTransactionManager; + private final HudiTransactionManager hudiTransactionManager; + + @Inject + public LakehouseTransactionManager( + LakehouseTableProperties lakehouseTableProperties, + HiveTransactionManager hiveTransactionManager, + IcebergTransactionManager icebergTransactionManager, + DeltaLakeTransactionManager deltaTransactionManager, + HudiTransactionManager hudiTransactionManager) + { + this.lakehouseTableProperties = requireNonNull(lakehouseTableProperties, "lakehouseTableProperties is null"); + this.hiveTransactionManager = requireNonNull(hiveTransactionManager, "hiveTransactionManager is null"); + this.icebergTransactionManager = requireNonNull(icebergTransactionManager, "icebergTransactionManager is null"); + this.deltaTransactionManager = requireNonNull(deltaTransactionManager, "deltaTransactionManager is null"); + this.hudiTransactionManager = requireNonNull(hudiTransactionManager, "hudiTransactionManager is null"); + } + + public ConnectorTransactionHandle begin() + { + ConnectorTransactionHandle handle = new HiveTransactionHandle(true); + hiveTransactionManager.begin(handle); + icebergTransactionManager.begin(handle); + deltaTransactionManager.begin(handle); + hudiTransactionManager.put(handle); + return handle; + } + + public ConnectorMetadata get(ConnectorTransactionHandle transaction, ConnectorIdentity identity) + { + return new LakehouseMetadata( + lakehouseTableProperties, + hiveTransactionManager.get(transaction, identity), + icebergTransactionManager.get(transaction, identity), + deltaTransactionManager.get(transaction, identity), + hudiTransactionManager.get(transaction, identity)); + } + + public void commit(ConnectorTransactionHandle transaction) + { + hiveTransactionManager.commit(transaction); + icebergTransactionManager.commit(transaction); + deltaTransactionManager.commit(transaction); + hudiTransactionManager.commit(transaction); + } + + public void rollback(ConnectorTransactionHandle transaction) + { + hiveTransactionManager.rollback(transaction); + icebergTransactionManager.rollback(transaction); + deltaTransactionManager.rollback(transaction); + hudiTransactionManager.rollback(transaction); + } +} diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/TableType.java 
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/TableType.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/TableType.java
new file mode 100644
index 000000000000..8c8420932f0a
--- /dev/null
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/TableType.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+public enum TableType
+{
+    HIVE, ICEBERG, DELTA, HUDI
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java
new file mode 100644
index 000000000000..db3d4421f0d2
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import io.trino.plugin.hive.containers.Hive3MinioDataLake;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.testing.BaseConnectorSmokeTest;
+import io.trino.testing.QueryRunner;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.util.Objects.requireNonNull;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@TestInstance(PER_CLASS)
+public abstract class BaseLakehouseConnectorSmokeTest
+        extends BaseConnectorSmokeTest
+{
+    protected final String bucketName = "test-bucket-" + randomNameSuffix();
+    protected final TableType tableType;
+
+    protected BaseLakehouseConnectorSmokeTest(TableType tableType)
+    {
+        this.tableType = requireNonNull(tableType, "tableType is null");
+    }
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        HiveMinioDataLake hiveMinio = closeAfterClass(new Hive3MinioDataLake(bucketName));
+        hiveMinio.start();
+
+        return LakehouseQueryRunner.builder()
+                .addLakehouseProperty("lakehouse.table-type", tableType.name())
+                .addLakehouseProperty("hive.metastore", "thrift")
+                .addLakehouseProperty("hive.metastore.uri", hiveMinio.getHiveMetastoreEndpoint().toString())
+                .addLakehouseProperty("fs.hadoop.enabled", "true")
+                .addLakehouseProperty("fs.native-s3.enabled", "true")
+                .addLakehouseProperty("s3.aws-access-key", MINIO_ACCESS_KEY)
+                .addLakehouseProperty("s3.aws-secret-key", MINIO_SECRET_KEY)
+                .addLakehouseProperty("s3.region", MINIO_REGION)
+                .addLakehouseProperty("s3.endpoint", hiveMinio.getMinio().getMinioAddress())
+                .addLakehouseProperty("s3.path-style-access", "true")
+                .addLakehouseProperty("s3.streaming.part-size", "5MB")
+                .build();
+    }
+
+    @BeforeAll
+    public void setUp()
+    {
+        computeActual("CREATE SCHEMA lakehouse.tpch WITH (location='s3://%s/tpch')".formatted(bucketName));
+        copyTpchTables(getQueryRunner(), "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES);
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        assertQueryFails(
+                "ALTER SCHEMA tpch RENAME TO tpch_" + randomNameSuffix(),
+                "Hive metastore does not support renaming schemas");
+    }
+
+    @Test
+    public void testCreateHiveTable()
+    {
+        computeActual(
+                """
+                CREATE TABLE create_hive
+                WITH (
+                    format = 'RCBINARY',
+                    type = 'HIVE'
+                )
+                AS SELECT * FROM tpch.tiny.region
+                """);
+
+        assertThat((String) computeScalar("SHOW CREATE TABLE create_hive")).isEqualTo(
+                """
+                CREATE TABLE lakehouse.tpch.create_hive (
+                   regionkey bigint,
+                   name varchar(25),
+                   comment varchar(152)
+                )
+                WITH (
+                   format = 'RCBINARY',
+                   type = 'HIVE'
+                )""");
+
+        assertUpdate("DROP TABLE create_hive");
+    }
+
+    @Test
+    public void testCreateIcebergTable()
+    {
+        computeActual(
+                """
+                CREATE TABLE create_iceberg
+                WITH (
+                    format = 'ORC',
+                    type = 'ICEBERG'
+                )
+                AS SELECT * FROM tpch.tiny.region
+                """);
+
+        assertThat((String) computeScalar("SHOW CREATE TABLE create_iceberg")).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.create_iceberg (
+                   regionkey bigint,
+                   name varchar,
+                   comment varchar
+                )
+                WITH (
+                   format = 'ORC',
+                   format_version = 2,
+                   location = \\E's3://test-bucket-.*/tpch/create_iceberg-.*'\\Q,
+                   max_commit_retry = 4,
+                   type = 'ICEBERG'
+                )\\E""");
+
+        assertUpdate("DROP TABLE create_iceberg");
+    }
+
+    @Test
+    public void testCreateDeltaTable()
+    {
+        computeActual(
+                """
+                CREATE TABLE create_delta
+                WITH (
+                    type = 'DELTA'
+                )
+                AS SELECT * FROM tpch.tiny.region
+                """);
+
+        assertThat((String) computeScalar("SHOW CREATE TABLE create_delta")).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.create_delta (
+                   regionkey bigint,
+                   name varchar,
+                   comment varchar
+                )
+                WITH (
+                   location = \\E's3://test-bucket-.*/tpch/create_delta-.*'\\Q,
+                   type = 'DELTA'
+                )\\E""");
+
+        assertUpdate("DROP TABLE create_delta");
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/LakehouseQueryRunner.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/LakehouseQueryRunner.java
new file mode 100644
index 000000000000..8d2bda098bfa
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/LakehouseQueryRunner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.log.Logging;
+import io.trino.plugin.iceberg.IcebergPlugin;
+import io.trino.plugin.tpcds.TpcdsPlugin;
+import io.trino.plugin.tpch.TpchPlugin;
+import io.trino.testing.DistributedQueryRunner;
+
+import static io.airlift.testing.Closeables.closeAllSuppress;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+
+public final class LakehouseQueryRunner
+{
+    static {
+        Logging.initialize();
+    }
+
+    private LakehouseQueryRunner() {}
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static class Builder
+            extends DistributedQueryRunner.Builder<Builder>
+    {
+        private final ImmutableMap.Builder<String, String> lakehouseProperties = ImmutableMap.builder();
+
+        protected Builder()
+        {
+            super(testSessionBuilder()
+                    .setCatalog("lakehouse")
+                    .setSchema("tpch")
+                    .build());
+        }
+
+        public Builder addLakehouseProperty(String key, String value)
+        {
+            lakehouseProperties.put(key, value);
+            return self();
+        }
+
+        @Override
+        public DistributedQueryRunner build()
+                throws Exception
+        {
+            DistributedQueryRunner queryRunner = super.build();
+            try {
+                // needed for $iceberg_theta_stat function
+                queryRunner.installPlugin(new IcebergPlugin());
+
+                queryRunner.installPlugin(new TpchPlugin());
+                queryRunner.createCatalog("tpch", "tpch");
+
+                queryRunner.installPlugin(new TpcdsPlugin());
+                queryRunner.createCatalog("tpcds", "tpcds");
+
+                queryRunner.installPlugin(new LakehousePlugin());
+                queryRunner.createCatalog("lakehouse", "lakehouse", lakehouseProperties.buildOrThrow());
+
+                return queryRunner;
+            }
+            catch (Exception e) {
+                closeAllSuppress(e, queryRunner);
+                throw e;
+            }
+        }
+    }
+}
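Aside (not part of the diff): Trino query-runner classes like the one above are conventionally paired with a `main` entry point for ad-hoc local testing against a running metastore. A hedged sketch of what such a launcher could look like; the coordinator port, metastore URI, and class name below are placeholder assumptions, not part of this change.

```java
import io.airlift.log.Logger;
import io.trino.testing.DistributedQueryRunner;

public final class LakehouseQueryRunnerMain
{
    private LakehouseQueryRunnerMain() {}

    public static void main(String[] args)
            throws Exception
    {
        // Placeholder endpoints: point these at a local metastore and file system
        DistributedQueryRunner queryRunner = LakehouseQueryRunner.builder()
                .addCoordinatorProperty("http-server.http.port", "8080")
                .addLakehouseProperty("hive.metastore.uri", "thrift://localhost:9083")
                .addLakehouseProperty("fs.hadoop.enabled", "true")
                .build();

        Logger log = Logger.get(LakehouseQueryRunnerMain.class);
        log.info("======== SERVER STARTED ========");
        log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
    }
}
```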
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConfig.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConfig.java
new file mode 100644
index 000000000000..649d6eee43f7
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static io.trino.plugin.lakehouse.TableType.HIVE;
+import static io.trino.plugin.lakehouse.TableType.ICEBERG;
+
+public class TestLakehouseConfig
+{
+    @Test
+    public void testDefaults()
+    {
+        assertRecordedDefaults(recordDefaults(LakehouseConfig.class)
+                .setTableType(ICEBERG));
+    }
+
+    @Test
+    public void testExplicitPropertyMappings()
+    {
+        Map<String, String> properties = ImmutableMap.of("lakehouse.table-type", "hive");
+
+        LakehouseConfig expected = new LakehouseConfig()
+                .setTableType(HIVE);
+
+        assertFullMapping(properties, expected);
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java
new file mode 100644
index 000000000000..444acf98be42
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import io.trino.Session;
+import io.trino.plugin.hive.containers.Hive3MinioDataLake;
+import io.trino.testing.BaseConnectorTest;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.testing.sql.TestTable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static io.trino.testing.MaterializedResult.resultBuilder;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.abort;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@TestInstance(PER_CLASS)
+public class TestLakehouseConnectorTest
+        extends BaseConnectorTest
+{
+    protected final String bucketName = "test-bucket-" + randomNameSuffix();
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        Hive3MinioDataLake hiveMinio = closeAfterClass(new Hive3MinioDataLake(bucketName));
+        hiveMinio.start();
+
+        return LakehouseQueryRunner.builder()
+                .addExtraProperty("sql.path", "lakehouse.functions")
+                .addExtraProperty("sql.default-function-catalog", "lakehouse")
+                .addExtraProperty("sql.default-function-schema", "functions")
+                .addLakehouseProperty("hive.metastore.uri", hiveMinio.getHiveMetastoreEndpoint().toString())
+                .addLakehouseProperty("fs.hadoop.enabled", "true")
+                .addLakehouseProperty("fs.native-s3.enabled", "true")
+                .addLakehouseProperty("s3.aws-access-key", MINIO_ACCESS_KEY)
+                .addLakehouseProperty("s3.aws-secret-key", MINIO_SECRET_KEY)
+                .addLakehouseProperty("s3.region", MINIO_REGION)
+                .addLakehouseProperty("s3.endpoint", hiveMinio.getMinio().getMinioAddress())
+                .addLakehouseProperty("s3.path-style-access", "true")
+                .addLakehouseProperty("s3.streaming.part-size", "5MB")
+                .addLakehouseProperty("hive.metastore-cache-ttl", "1d")
+                .addLakehouseProperty("hive.metastore-refresh-interval", "1d")
+                .addLakehouseProperty("hive.partition-projection-enabled", "true")
+                .build();
+    }
+
+    @BeforeAll
+    public void setUp()
+    {
+        computeActual(createSchemaSql("tpch"));
+        computeActual(createSchemaSql("functions"));
+        copyTpchTables(getQueryRunner(), "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES);
+    }
+
+    @Override
+    protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+    {
+        return switch (connectorBehavior) {
+            case SUPPORTS_CREATE_OR_REPLACE_TABLE,
+                 SUPPORTS_CREATE_FUNCTION,
+                 SUPPORTS_REPORTING_WRITTEN_BYTES -> true;
+            case SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT,
+                 SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS,
+                 SUPPORTS_TOPN_PUSHDOWN -> false;
+            default -> super.hasBehavior(connectorBehavior);
+        };
+    }
+
+    @Override
+    protected OptionalInt maxSchemaNameLength()
+    {
+        return OptionalInt.of(128);
+    }
+
+    @Override
+    protected OptionalInt maxTableNameLength()
+    {
+        return OptionalInt.of(128);
+    }
+
+    @Override
+    protected String createSchemaSql(String schemaName)
+    {
+        return "CREATE SCHEMA %s WITH (location = 's3://%s/%s')".formatted(schemaName, bucketName, schemaName);
+    }
+
+    @Override
+    protected MaterializedResult getDescribeOrdersResult()
+    {
+        return resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
+                .row("orderkey", "bigint", "", "")
+                .row("custkey", "bigint", "", "")
+                .row("orderstatus", "varchar", "", "")
+                .row("totalprice", "double", "", "")
+                .row("orderdate", "date", "", "")
+                .row("orderpriority", "varchar", "", "")
+                .row("clerk", "varchar", "", "")
+                .row("shippriority", "integer", "", "")
+                .row("comment", "varchar", "", "")
+                .build();
+    }
+
+    @Override
+    protected String errorMessageForInsertIntoNotNullColumn(String columnName)
+    {
+        return "NULL value not allowed for NOT NULL column: " + columnName;
+    }
+
+    @Override
+    protected Session withoutSmallFileThreshold(Session session)
+    {
+        return Session.builder(session)
+                .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "parquet_small_file_threshold", "0B")
+                .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "orc_tiny_stripe_threshold", "0B")
+                .build();
+    }
+
+    @Override
+    protected TestTable createTableWithDefaultColumns()
+    {
+        return abort("Iceberg connector does not support column default values");
+    }
+
+    @Override
+    protected void verifyConcurrentUpdateFailurePermissible(Exception e)
+    {
+        assertThat(e).hasMessageMatching("Failed to commit( the transaction)? during write.*");
+    }
+
+    @Override
+    protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
+    {
+        assertThat(e).hasMessageMatching(
+                "Failed to add column: Metadata location \\[.*] is not same as table metadata location \\[.*] for tpch.test_add_column.*");
+    }
+
+    @Override
+    protected void verifySetColumnTypeFailurePermissible(Throwable e)
+    {
+        assertThat(e).hasMessageMatching(".*(Failed to set column type: Cannot change (column type:|type from .* to )"
+                + "|Time(stamp)? precision \\(3\\) not supported for Iceberg. Use \"time(stamp)?\\(6\\)\" instead"
+                + "|Type not supported for Iceberg: smallint|char\\(20\\)).*");
+    }
+
+    @Override
+    protected void verifySetFieldTypeFailurePermissible(Throwable e)
+    {
+        assertThat(e).hasMessageMatching(".*(Failed to set field type: Cannot change (column type:|type from .* to )"
+                + "|Time(stamp)? precision \\(3\\) not supported for Iceberg. Use \"time(stamp)?\\(6\\)\" instead"
+                + "|Type not supported for Iceberg: smallint|char\\(20\\)"
+                + "|Iceberg doesn't support changing field type (from|to) non-primitive types).*");
+    }
+
+    @Override
+    protected void verifyVersionedQueryFailurePermissible(Exception e)
+    {
+        assertThat(e).hasMessageMatching("Version pointer type is not supported: .*|"
+                + "Unsupported type for temporal table version: .*|"
+                + "Unsupported type for table version: .*|"
+                + "No version history table tpch.nation at or before .*|"
+                + "Iceberg snapshot ID does not exists: .*|"
+                + "Cannot find snapshot with reference name: .*");
+    }
+
+    @Override
+    protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup)
+    {
+        String typeName = dataMappingTestSetup.getTrinoTypeName();
+        if (typeName.equals("char(3)")) {
+            return Optional.of(new DataMappingTestSetup(typeName, "'ab '", dataMappingTestSetup.getHighValueLiteral()));
+        }
+        return Optional.of(dataMappingTestSetup);
+    }
+
+    @Override
+    protected Optional<SetColumnTypeSetup> filterSetColumnTypesDataProvider(SetColumnTypeSetup setup)
+    {
+        if (setup.sourceColumnType().equals("timestamp(3) with time zone")) {
+            return Optional.of(setup.withNewValueLiteral("TIMESTAMP '2020-02-12 14:03:00.123000 +00:00'"));
+        }
+        switch ("%s -> %s".formatted(setup.sourceColumnType(), setup.newColumnType())) {
+            case "row(x integer) -> row(y integer)":
+                return Optional.of(setup.withNewValueLiteral("NULL"));
+            case "tinyint -> smallint":
+            case "bigint -> integer":
+            case "decimal(5,3) -> decimal(5,2)":
+            case "varchar -> char(20)":
+            case "time(6) -> time(3)":
+            case "timestamp(6) -> timestamp(3)":
+            case "array(integer) -> array(bigint)":
+                return Optional.of(setup.asUnsupported());
+            case "varchar(100) -> varchar(50)":
+                return Optional.empty();
+        }
+        return Optional.of(setup);
+    }
+
+    @Override
+    protected Optional<SetColumnTypeSetup> filterSetFieldTypesDataProvider(SetColumnTypeSetup setup)
+    {
+        switch ("%s -> %s".formatted(setup.sourceColumnType(), setup.newColumnType())) {
+            case "tinyint -> smallint":
+            case "bigint -> integer":
+            case "decimal(5,3) -> decimal(5,2)":
+            case "varchar -> char(20)":
+            case "time(6) -> time(3)":
+            case "timestamp(6) -> timestamp(3)":
+            case "array(integer) -> array(bigint)":
+            case "row(x integer) -> row(x bigint)":
+            case "row(x integer) -> row(y integer)":
+            case "row(x integer, y integer) -> row(x integer, z integer)":
+            case "row(x integer) -> row(x integer, y integer)":
+            case "row(x integer, y integer) -> row(x integer)":
+            case "row(x integer, y integer) -> row(y integer, x integer)":
+            case "row(x integer, y integer) -> row(z integer, y integer, x integer)":
+            case "row(x row(nested integer)) -> row(x row(nested bigint))":
+            case "row(x row(a integer, b integer)) -> row(x row(b integer, a integer))":
+                return Optional.of(setup.asUnsupported());
+            case "varchar(100) -> varchar(50)":
+                return Optional.empty();
+        }
+        return Optional.of(setup);
+    }
+
+    @Disabled("Long names cause metastore timeouts")
+    @Test
+    @Override
+    public void testCreateTableWithLongTableName() {}
+
+    @Disabled("Long names cause metastore timeouts")
+    @Test
+    @Override
+    public void testRenameTableToLongTableName() {}
+
+    @Disabled("Long names cause metastore timeouts")
+    @Test
+    @Override
+    public void testCreateSchemaWithLongName() {}
+
+    @Disabled("Long names cause metastore timeouts")
+    @Test
+    @Override
+    public void testRenameSchemaToLongName() {}
+
+    @Test
+    @Override
+    public void testDropRowFieldWhenDuplicates()
+    {
+        assertThatThrownBy(super::testDropRowFieldWhenDuplicates)
+                .hasMessage("Field name 'a' specified more than once");
+    }
+
+    @Test
+    @Override
+    public void testDropAmbiguousRowFieldCaseSensitivity()
+    {
+        assertThatThrownBy(super::testDropAmbiguousRowFieldCaseSensitivity)
+                .hasMessage("Field name 'some_field' specified more than once");
+    }
+
+    @Test
+    @Override
+    public void testSetFieldMapKeyType()
+    {
+        assertThatThrownBy(super::testSetFieldMapKeyType)
+                .hasMessageContaining("Failed to set field type: Cannot alter map keys");
+    }
+
+    @Test
+    @Override
+    public void testSetNestedFieldMapKeyType()
+    {
+        assertThatThrownBy(super::testSetNestedFieldMapKeyType)
+                .hasMessageContaining("Failed to set field type: Cannot alter map keys");
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        assertQueryFails("ALTER SCHEMA tpch RENAME TO tpch_renamed", "Hive metastore does not support renaming schemas");
+    }
+
+    @Test
+    @Override
+    public void testCharVarcharComparison()
+    {
+        try (TestTable table = newTrinoTable(
+                "test_char_varchar",
+                "(k, v) AS VALUES"
+                        + " (-1, CAST(NULL AS CHAR(3))), "
+                        + " (3, CAST(' ' AS CHAR(3))),"
+                        + " (6, CAST('x ' AS CHAR(3)))")) {
+            assertThat(query("SELECT k, v FROM " + table.getName() + " WHERE v = CAST(' ' AS varchar(2))")).returnsEmptyResult();
+            assertThat(query("SELECT k, v FROM " + table.getName() + " WHERE v = CAST(' ' AS varchar(4))")).returnsEmptyResult();
+            assertThat(query("SELECT k, v FROM " + table.getName() + " WHERE v = CAST('x ' AS varchar(2))")).returnsEmptyResult();
+            assertQuery("SELECT k, v FROM " + table.getName() + " WHERE v = CAST(' ' AS varchar(3))", "VALUES (3, ' ')");
+        }
+    }
+
+    @Test
+    @Override
+    public void testShowCreateSchema()
+    {
+        assertThat(computeActual("SHOW CREATE SCHEMA tpch").getOnlyValue().toString()).matches(
+                """
+                \\QCREATE SCHEMA lakehouse.tpch
+                WITH (
+                   location = \\E's3://test-bucket-.*/tpch'\\Q
+                )\\E""");
+    }
+
+    @Override
+    @Test
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeActual("SHOW CREATE TABLE orders").getOnlyValue()).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.orders (
+                   orderkey bigint,
+                   custkey bigint,
+                   orderstatus varchar,
+                   totalprice double,
+                   orderdate date,
+                   orderpriority varchar,
+                   clerk varchar,
+                   shippriority integer,
+                   comment varchar
+                )
+                WITH (
+                   format = 'PARQUET',
+                   format_version = 2,
+                   location = \\E's3://test-bucket-.*/tpch/orders-.*'\\Q,
+                   max_commit_retry = 4,
+                   type = 'ICEBERG'
+                )\\E""");
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
new file mode 100644
index 000000000000..955bdd2da1ee
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import org.junit.jupiter.api.Test;
+
+import static io.trino.plugin.lakehouse.TableType.DELTA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestLakehouseDeltaConnectorSmokeTest
+        extends BaseLakehouseConnectorSmokeTest
+{
+    protected TestLakehouseDeltaConnectorSmokeTest()
+    {
+        super(DELTA);
+    }
+
+    @Test
+    @Override
+    public void testRenameTable()
+    {
+        assertThatThrownBy(super::testRenameTable)
+                .hasMessage("Renaming managed tables is not allowed with current metastore configuration");
+    }
+
+    @Test
+    @Override
+    public void testRenameTableAcrossSchemas()
+    {
+        assertThatThrownBy(super::testRenameTableAcrossSchemas)
+                .hasMessage("Renaming managed tables is not allowed with current metastore configuration");
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeScalar("SHOW CREATE TABLE region")).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.region (
+                   regionkey bigint,
+                   name varchar,
+                   comment varchar
+                )
+                WITH (
+                   location = \\E's3://test-bucket-.*/tpch/region.*'\\Q,
+                   type = 'DELTA'
+                )\\E""");
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseFileConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseFileConnectorSmokeTest.java
new file mode 100644
index 000000000000..75bcc6e25e29
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseFileConnectorSmokeTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import io.trino.testing.BaseConnectorSmokeTest;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.containers.MotoContainer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.containers.MotoContainer.MOTO_ACCESS_KEY;
+import static io.trino.testing.containers.MotoContainer.MOTO_REGION;
+import static io.trino.testing.containers.MotoContainer.MOTO_SECRET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@TestInstance(PER_CLASS)
+public class TestLakehouseFileConnectorSmokeTest
+        extends BaseConnectorSmokeTest
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        MotoContainer moto = closeAfterClass(new MotoContainer());
+        moto.start();
+        moto.createBucket("test-bucket");
+
+        return LakehouseQueryRunner.builder()
+                .addLakehouseProperty("hive.metastore", "file")
+                .addLakehouseProperty("hive.metastore.catalog.dir", "s3://test-bucket/")
+                .addLakehouseProperty("fs.native-s3.enabled", "true")
+                .addLakehouseProperty("s3.region", MOTO_REGION)
+                .addLakehouseProperty("s3.endpoint", moto.getEndpoint().toString())
+                .addLakehouseProperty("s3.aws-access-key", MOTO_ACCESS_KEY)
+                .addLakehouseProperty("s3.aws-secret-key", MOTO_SECRET_KEY)
+                .addLakehouseProperty("s3.path-style-access", "true")
+                .build();
+    }
+
+    @BeforeAll
+    public void setUp()
+    {
+        computeActual("CREATE SCHEMA lakehouse.tpch WITH (location='s3://test-bucket/tpch')");
+        copyTpchTables(getQueryRunner(), "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES);
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        assertThatThrownBy(super::testRenameSchema)
+                .hasMessage("Could not rename database schema")
+                .hasRootCauseMessage("S3 does not support renames");
+    }
+
+    @Test
+    @Override
+    public void testRenameTable()
+    {
+        assertThatThrownBy(super::testRenameTable)
+                .hasMessageMatching("S3 does not support( directory)? renames");
+    }
+
+    @Test
+    @Override
+    public void testRenameTableAcrossSchemas()
+    {
+        assertThatThrownBy(super::testRenameTableAcrossSchemas)
+                .hasMessageMatching("S3 does not support( directory)? renames");
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeScalar("SHOW CREATE TABLE region")).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.region (
+                   regionkey bigint,
+                   name varchar,
+                   comment varchar
+                )
+                WITH (
+                   format = 'PARQUET',
+                   format_version = 2,
+                   location = 's3://test-bucket/tpch/region-\\E.*\\Q',
+                   max_commit_retry = 4,
+                   type = 'ICEBERG'
+                )\\E""");
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
new file mode 100644
index 000000000000..1745312c5c41
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.testing.sql.TestTable;
+import org.junit.jupiter.api.Test;
+
+import static io.trino.plugin.lakehouse.TableType.HIVE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestLakehouseHiveConnectorSmokeTest
+        extends BaseLakehouseConnectorSmokeTest
+{
+    protected TestLakehouseHiveConnectorSmokeTest()
+    {
+        super(HIVE);
+    }
+
+    @SuppressWarnings("SwitchStatementWithTooFewBranches")
+    @Override
+    protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+    {
+        return switch (connectorBehavior) {
+            case SUPPORTS_TRUNCATE -> false;
+            default -> super.hasBehavior(connectorBehavior);
+        };
+    }
+
+    @Override
+    protected TestTable newTrinoTable(String namePrefix, String tableDefinition)
+    {
+        if (tableDefinition.startsWith("(")) {
+            tableDefinition += " WITH (transactional = true)";
+        }
+        else {
+            tableDefinition = "WITH (transactional = true) " + tableDefinition;
+        }
+        return super.newTrinoTable(namePrefix, tableDefinition);
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeScalar("SHOW CREATE TABLE region")).isEqualTo(
+                """
+                CREATE TABLE lakehouse.tpch.region (
+                   regionkey bigint,
+                   name varchar(25),
+                   comment varchar(152)
+                )
+                WITH (
+                   format = 'ORC',
+                   type = 'HIVE'
+                )""");
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
new file mode 100644
index 000000000000..eb63da366585
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import org.junit.jupiter.api.Test;
+
+import static io.trino.plugin.lakehouse.TableType.ICEBERG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestLakehouseIcebergConnectorSmokeTest
+        extends BaseLakehouseConnectorSmokeTest
+{
+    protected TestLakehouseIcebergConnectorSmokeTest()
+    {
+        super(ICEBERG);
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeScalar("SHOW CREATE TABLE region")).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.region (
+                   regionkey bigint,
+                   name varchar,
+                   comment varchar
+                )
+                WITH (
+                   format = 'PARQUET',
+                   format_version = 2,
+                   location = \\E's3://test-bucket-.*/tpch/region-.*'\\Q,
+                   max_commit_retry = 4,
+                   type = 'ICEBERG'
+                )\\E""");
+    }
+}
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseMetadata.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseMetadata.java
new file mode 100644
index 000000000000..c36072a890de
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseMetadata.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import com.google.common.collect.ImmutableSet;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.JoinStatistics;
+import io.trino.spi.connector.JoinType;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.function.BoundSignature;
+import io.trino.spi.function.FunctionId;
+import io.trino.spi.function.SchemaFunctionName;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.predicate.TupleDomain;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden;
+
+public class TestLakehouseMetadata
+{
+    private static final Set<Method> EXCLUSIONS;
+
+    static {
+        try {
+            EXCLUSIONS = ImmutableSet.<Method>builder()
+                    .add(ConnectorMetadata.class.getMethod("supportsMissingColumnsOnInsert"))
+                    .add(ConnectorMetadata.class.getMethod("refreshMaterializedView", ConnectorSession.class, SchemaTableName.class))
+                    .add(ConnectorMetadata.class.getMethod("resolveIndex", ConnectorSession.class, ConnectorTableHandle.class, Set.class, Set.class, TupleDomain.class))
+                    .add(ConnectorMetadata.class.getMethod("listFunctions", ConnectorSession.class, String.class))
+                    .add(ConnectorMetadata.class.getMethod("getFunctions", ConnectorSession.class, SchemaFunctionName.class))
+                    .add(ConnectorMetadata.class.getMethod("getFunctionMetadata", ConnectorSession.class, FunctionId.class))
+                    .add(ConnectorMetadata.class.getMethod("getAggregationFunctionMetadata", ConnectorSession.class, FunctionId.class))
+                    .add(ConnectorMetadata.class.getMethod("getFunctionDependencies", ConnectorSession.class, FunctionId.class, BoundSignature.class))
+                    .add(ConnectorMetadata.class.getMethod("applyJoin", ConnectorSession.class, JoinType.class, ConnectorTableHandle.class, ConnectorTableHandle.class, ConnectorExpression.class, Map.class, Map.class, JoinStatistics.class))
+                    .add(ConnectorMetadata.class.getMethod("applyJoin", ConnectorSession.class, JoinType.class, ConnectorTableHandle.class, ConnectorTableHandle.class, List.class, Map.class, Map.class, JoinStatistics.class))
+                    .add(ConnectorMetadata.class.getMethod("applyTableFunction", ConnectorSession.class, ConnectorTableFunctionHandle.class))
+                    .add(ConnectorMetadata.class.getMethod("applyTableScanRedirect", ConnectorSession.class, ConnectorTableHandle.class))
+                    .add(ConnectorMetadata.class.getMethod("redirectTable", ConnectorSession.class, SchemaTableName.class))
+                    .add(ConnectorMetadata.class.getMethod("getMaxWriterTasks", ConnectorSession.class))
+                    .build();
+        }
+        catch (NoSuchMethodException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    @Test
+    void testMethodsImplemented()
+    {
+        assertAllMethodsOverridden(ConnectorMetadata.class, LakehouseMetadata.class, EXCLUSIONS);
+    }
+}
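Aside (not part of the diff): `LakehouseMetadata` itself is outside this excerpt; the test above only verifies that it overrides every `ConnectorMetadata` method except the listed exclusions. As a rough illustration of the delegation implied by its constructor arguments (see `LakehouseTransactionManager.get` earlier in this diff), here is a hedged sketch of routing by table type. The class and method names are hypothetical, not the real implementation.

```java
import io.trino.spi.connector.ConnectorMetadata;

// Hypothetical dispatch helper; LakehouseMetadata's real routing logic is not shown in this diff.
public class LakehouseMetadataDispatchSketch
{
    private final ConnectorMetadata hiveMetadata;
    private final ConnectorMetadata icebergMetadata;
    private final ConnectorMetadata deltaMetadata;
    private final ConnectorMetadata hudiMetadata;

    public LakehouseMetadataDispatchSketch(
            ConnectorMetadata hiveMetadata,
            ConnectorMetadata icebergMetadata,
            ConnectorMetadata deltaMetadata,
            ConnectorMetadata hudiMetadata)
    {
        this.hiveMetadata = hiveMetadata;
        this.icebergMetadata = icebergMetadata;
        this.deltaMetadata = deltaMetadata;
        this.hudiMetadata = hudiMetadata;
    }

    // Each ConnectorMetadata call would be forwarded to the delegate that owns the table
    public ConnectorMetadata delegateFor(TableType type)
    {
        return switch (type) {
            case HIVE -> hiveMetadata;
            case ICEBERG -> icebergMetadata;
            case DELTA -> deltaMetadata;
            case HUDI -> hudiMetadata;
        };
    }
}
```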
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseMotoConnectorSmokeTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseMotoConnectorSmokeTest.java
new file mode 100644
index 000000000000..01a94498121e
--- /dev/null
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseMotoConnectorSmokeTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.lakehouse;
+
+import io.trino.testing.BaseConnectorSmokeTest;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.containers.MotoContainer;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.containers.MotoContainer.MOTO_ACCESS_KEY;
+import static io.trino.testing.containers.MotoContainer.MOTO_REGION;
+import static io.trino.testing.containers.MotoContainer.MOTO_SECRET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@TestInstance(PER_CLASS)
+public class TestLakehouseMotoConnectorSmokeTest
+        extends BaseConnectorSmokeTest
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        MotoContainer moto = closeAfterClass(new MotoContainer());
+        moto.start();
+        moto.createBucket("test-bucket");
+
+        return LakehouseQueryRunner.builder()
+                .addLakehouseProperty("hive.metastore", "glue")
+                .addLakehouseProperty("hive.metastore.glue.region", MOTO_REGION)
+                .addLakehouseProperty("hive.metastore.glue.endpoint-url", moto.getEndpoint().toString())
+                .addLakehouseProperty("hive.metastore.glue.aws-access-key", MOTO_ACCESS_KEY)
+                .addLakehouseProperty("hive.metastore.glue.aws-secret-key", MOTO_SECRET_KEY)
+                .addLakehouseProperty("hive.metastore.glue.default-warehouse-dir", "s3://test-bucket/")
+                .addLakehouseProperty("fs.native-s3.enabled", "true")
+                .addLakehouseProperty("s3.region", MOTO_REGION)
+                .addLakehouseProperty("s3.endpoint", moto.getEndpoint().toString())
+                .addLakehouseProperty("s3.aws-access-key", MOTO_ACCESS_KEY)
+                .addLakehouseProperty("s3.aws-secret-key", MOTO_SECRET_KEY)
+                .addLakehouseProperty("s3.path-style-access", "true")
+                .build();
+    }
+
+    @BeforeAll
+    public void setUp()
+    {
+        computeActual("CREATE SCHEMA lakehouse.tpch WITH (location='s3://test-bucket/tpch')");
+        copyTpchTables(getQueryRunner(), "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES);
+    }
+
+    @Test
+    @Override
+    public void testShowCreateTable()
+    {
+        assertThat((String) computeScalar("SHOW CREATE TABLE region")).matches(
+                """
+                \\QCREATE TABLE lakehouse.tpch.region (
+                   regionkey bigint,
+                   name varchar,
+                   comment varchar
+                )
+                WITH (
+                   format = 'PARQUET',
+                   format_version = 2,
+                   location = 's3://test-bucket/tpch/region-\\E.*\\Q',
+                   max_commit_retry = 4,
+                   type = 'ICEBERG'
+                )\\E""");
+    }
+}
diff --git a/pom.xml b/pom.xml
index 5441c379497b..ab52b8608a9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
         <module>plugin/trino-jmx</module>
         <module>plugin/trino-kafka</module>
         <module>plugin/trino-kafka-event-listener</module>
+        <module>plugin/trino-lakehouse</module>
         <module>plugin/trino-ldap-group-provider</module>
         <module>plugin/trino-loki</module>
         <module>plugin/trino-mariadb</module>