schema =
+ new SchemaManager(fileIO, tablePath, branchName).latest();
+ if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ schema =
+ schema.map(
+ s -> {
+ Options branchOptions = new Options(s.options());
+ branchOptions.set(CoreOptions.BRANCH, branchName);
+ return s.copy(branchOptions.toMap());
+ });
+ }
+ schema.ifPresent(s -> s.options().put(PATH.key(), tablePath.toString()));
+ return schema;
+ }
+
+ protected Path newDatabasePathInFileSystem(String database)
+ {
+ return newDatabasePathInFileSystem(warehouse(), database);
+ }
+
+ protected Path newDatabasePathInFileSystem(String warehouse, String database)
+ {
+ return new Path(warehouse, database + DB_SUFFIX);
+ }
+
+ protected Path getTableLocationInFileSystem(Identifier identifier)
+ {
+ return new Path(newDatabasePathInFileSystem(identifier.getDatabaseName()), identifier.getTableName());
+ }
+
+ @Override
+ public PaimonConfig config()
+ {
+ return config;
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/PaimonCatalogModule.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/PaimonCatalogModule.java
new file mode 100644
index 000000000000..67847bbc6466
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/PaimonCatalogModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.catalog;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.paimon.CatalogType;
+import io.trino.plugin.paimon.PaimonConfig;
+import io.trino.plugin.paimon.catalog.file.PaimonFileSystemCatalogModule;
+import io.trino.plugin.paimon.catalog.hms.PaimonHiveCatalogModule;
+
+import static io.airlift.configuration.ConditionalModule.conditionalModule;
+import static io.trino.plugin.paimon.CatalogType.FILESYSTEM;
+import static io.trino.plugin.paimon.CatalogType.HIVE;
+
+public class PaimonCatalogModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ bindCatalogModule(FILESYSTEM, new PaimonFileSystemCatalogModule());
+ bindCatalogModule(HIVE, new PaimonHiveCatalogModule());
+ }
+
+ private void bindCatalogModule(CatalogType catalogType, Module module)
+ {
+ // TODO: enable more types of catalog
+ install(conditionalModule(
+ PaimonConfig.class,
+ config -> config.getCatalogType() == catalogType,
+ module));
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/TrinoCatalog.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/TrinoCatalog.java
new file mode 100644
index 000000000000..d70e4be0f0e7
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/TrinoCatalog.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.catalog;
+
+import io.trino.plugin.paimon.PaimonConfig;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.SchemaTableName;
+import org.apache.paimon.table.Table;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An interface to allow different Paimon catalog implementations in PaimonMetadata.
+ *
+ * It mimics the Paimon catalog interface, with the following modifications:
+ *
+ * - ConnectorSession is added at the front of each method signature, similar to Iceberg's TrinoCatalog
+ *
+ */
+public interface TrinoCatalog
+{
+ String DB_SUFFIX = ".db";
+
+ boolean databaseExists(ConnectorSession session, String database);
+
+ List<String> listDatabases(ConnectorSession session);
+
+ Table loadTable(ConnectorSession session, SchemaTableName schemaTableName);
+
+ List<String> listTables(ConnectorSession session, Optional<String> namespace);
+
+ String warehouse();
+
+ PaimonConfig config();
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/TrinoCatalogFactory.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/TrinoCatalogFactory.java
new file mode 100644
index 000000000000..40021f82b1b9
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/TrinoCatalogFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.catalog;
+
+import io.trino.spi.security.ConnectorIdentity;
+
+public interface TrinoCatalogFactory
+{
+ TrinoCatalog create(ConnectorIdentity identity);
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/PaimonFileSystemCatalogModule.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/PaimonFileSystemCatalogModule.java
new file mode 100644
index 000000000000..3bfb42d6a03b
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/PaimonFileSystemCatalogModule.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.paimon.catalog.file;
+
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.paimon.catalog.TrinoCatalogFactory;
+
+public class PaimonFileSystemCatalogModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ binder.bind(TrinoCatalogFactory.class).to(TrinoFileSystemCatalogFactory.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/TrinoFileSystemCatalogFactory.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/TrinoFileSystemCatalogFactory.java
new file mode 100644
index 000000000000..4706e8d2cad0
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/TrinoFileSystemCatalogFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.paimon.catalog.file;
+
+import com.google.inject.Inject;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.paimon.PaimonConfig;
+import io.trino.plugin.paimon.catalog.TrinoCatalog;
+import io.trino.plugin.paimon.catalog.TrinoCatalogFactory;
+import io.trino.spi.security.ConnectorIdentity;
+
+import static java.util.Objects.requireNonNull;
+
+public class TrinoFileSystemCatalogFactory
+ implements TrinoCatalogFactory
+{
+ private final PaimonConfig paimonConfig;
+ private final TrinoFileSystemFactory fileSystemFactory;
+
+ @Inject
+ public TrinoFileSystemCatalogFactory(
+ PaimonConfig config,
+ TrinoFileSystemFactory fileSystemFactory)
+ {
+ this.paimonConfig = requireNonNull(config, "config is null");
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ }
+
+ @Override
+ public TrinoCatalog create(ConnectorIdentity identity)
+ {
+ return new TrinoFileSystemPaimonCatalog(
+ paimonConfig,
+ fileSystemFactory);
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/TrinoFileSystemPaimonCatalog.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/TrinoFileSystemPaimonCatalog.java
new file mode 100644
index 000000000000..b13935f2828f
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/file/TrinoFileSystemPaimonCatalog.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.paimon.catalog.file;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.paimon.PaimonConfig;
+import io.trino.plugin.paimon.catalog.AbstractPaimonTrinoCatalog;
+import io.trino.plugin.paimon.fileio.PaimonFileIO;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.TableNotFoundException;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.CoreOptions.PATH;
+
+public class TrinoFileSystemPaimonCatalog
+ extends AbstractPaimonTrinoCatalog
+{
+ public TrinoFileSystemPaimonCatalog(
+ PaimonConfig config,
+ TrinoFileSystemFactory fileSystemFactory)
+ {
+ super(config, fileSystemFactory);
+ }
+
+ @Override
+ public boolean databaseExists(ConnectorSession session, String database)
+ {
+ FileIO fileIO = getFileIO(session);
+ return uncheck(() -> fileIO.exists(newDatabasePathInFileSystem(database)));
+ }
+
+ @Override
+ public List<String> listDatabases(ConnectorSession session)
+ {
+ return listDatabasesInFileSystem(session);
+ }
+
+ @Override
+ public List<String> listTables(ConnectorSession session, Optional<String> namespace)
+ {
+ if (namespace.isPresent()) {
+ return listTablesInFileSystem(session, newDatabasePathInFileSystem(namespace.get()));
+ }
+ else {
+ return listDatabases(session).stream()
+ .flatMap(db -> listTablesInFileSystem(session, newDatabasePathInFileSystem(db)).stream())
+ .toList();
+ }
+ }
+
+ @Override
+ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ Identifier identifier = Identifier.create(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+ Path tablePath = new Path(newDatabasePathInFileSystem(identifier.getDatabaseName()), identifier.getTableName());
+ PaimonFileIO fileIO = new PaimonFileIO(fileSystemFactory.create(session), tablePath);
+ TableSchema schema = loadTableSchema(fileIO, identifier).orElseThrow(() -> new TableNotFoundException(schemaTableName));
+ Path path = new Path(schema.options().get(PATH.key()));
+
+ return FileStoreTableFactory.create(fileIO, path, schema);
+ }
+
+ public Optional<TableSchema> loadTableSchema(FileIO fileIO, Identifier identifier)
+ {
+ Path tablePath = new Path(newDatabasePathInFileSystem(identifier.getDatabaseName()), identifier.getObjectName());
+ Optional<TableSchema> schema = new SchemaManager(fileIO, tablePath).latest();
+ schema.ifPresent(s -> s.options().put(PATH.key(), tablePath.toString()));
+ return schema;
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/PaimonHiveCatalogModule.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/PaimonHiveCatalogModule.java
new file mode 100644
index 000000000000..a4e8132e9c5a
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/PaimonHiveCatalogModule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.paimon.catalog.hms;
+
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.hive.metastore.HiveMetastoreModule;
+import io.trino.plugin.paimon.catalog.TrinoCatalogFactory;
+
+import java.util.Optional;
+
+public class PaimonHiveCatalogModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON);
+
+ install(new HiveMetastoreModule(Optional.empty(), false));
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/TrinoHiveCatalogFactory.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/TrinoHiveCatalogFactory.java
new file mode 100644
index 000000000000..a30cc48556ad
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/TrinoHiveCatalogFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.paimon.catalog.hms;
+
+import com.google.inject.Inject;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.metastore.HiveMetastoreFactory;
+import io.trino.metastore.cache.CachingHiveMetastore;
+import io.trino.plugin.paimon.PaimonConfig;
+import io.trino.plugin.paimon.catalog.TrinoCatalog;
+import io.trino.plugin.paimon.catalog.TrinoCatalogFactory;
+import io.trino.spi.security.ConnectorIdentity;
+
+import java.util.Optional;
+
+import static io.trino.metastore.cache.CachingHiveMetastore.createPerTransactionCache;
+import static java.util.Objects.requireNonNull;
+
+public class TrinoHiveCatalogFactory
+ implements TrinoCatalogFactory
+{
+ private final PaimonConfig paimonConfig;
+ private final HiveMetastoreFactory metastoreFactory;
+ private final TrinoFileSystemFactory fileSystemFactory;
+
+ @Inject
+ public TrinoHiveCatalogFactory(
+ PaimonConfig config,
+ HiveMetastoreFactory metastoreFactory,
+ TrinoFileSystemFactory fileSystemFactory)
+ {
+ this.paimonConfig = requireNonNull(config, "config is null");
+ this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null");
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ }
+
+ @Override
+ public TrinoCatalog create(ConnectorIdentity identity)
+ {
+ CachingHiveMetastore metastore = createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), 1000);
+
+ return new TrinoHivePaimonCatalog(
+ paimonConfig,
+ metastore,
+ fileSystemFactory);
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/TrinoHivePaimonCatalog.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/TrinoHivePaimonCatalog.java
new file mode 100644
index 000000000000..a7df9e72bd18
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/catalog/hms/TrinoHivePaimonCatalog.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.catalog.hms;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.metastore.Database;
+import io.trino.metastore.cache.CachingHiveMetastore;
+import io.trino.plugin.hive.TableType;
+import io.trino.plugin.paimon.PaimonConfig;
+import io.trino.plugin.paimon.catalog.AbstractPaimonTrinoCatalog;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.SchemaNotFoundException;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.TableNotFoundException;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.paimon.CoreOptions.PATH;
+
+public class TrinoHivePaimonCatalog
+ extends AbstractPaimonTrinoCatalog
+{
+ private static final String INPUT_FORMAT_CLASS_NAME =
+ "org.apache.paimon.hive.mapred.PaimonInputFormat";
+ private static final String OUTPUT_FORMAT_CLASS_NAME =
+ "org.apache.paimon.hive.mapred.PaimonOutputFormat";
+
+ private final CachingHiveMetastore metastore;
+
+ public TrinoHivePaimonCatalog(
+ PaimonConfig config,
+ CachingHiveMetastore metastore,
+ TrinoFileSystemFactory fileSystemFactory)
+ {
+ super(
+ config,
+ fileSystemFactory);
+
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ }
+
+ private static boolean isPaimonTable(io.trino.metastore.Table table)
+ {
+ return INPUT_FORMAT_CLASS_NAME.equals(table.getStorage().getStorageFormat().getInputFormat())
+ && OUTPUT_FORMAT_CLASS_NAME.equals(table.getStorage().getStorageFormat().getOutputFormat());
+ }
+
+ @Override
+ public boolean databaseExists(ConnectorSession session, String database)
+ {
+ return metastore.getDatabase(database).isPresent();
+ }
+
+ @Override
+ public List<String> listDatabases(ConnectorSession session)
+ {
+ return metastore.getAllDatabases();
+ }
+
+ @Override
+ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ Identifier identifier = Identifier.create(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+ FileIO fileIO = getFileIO(session);
+ TableMetadata tableMetadata = loadTableMetadata(fileIO, identifier);
+ Path path = new Path(tableMetadata.schema().options().get(PATH.key()));
+ CatalogEnvironment catalogEnv =
+ new CatalogEnvironment(
+ identifier,
+ tableMetadata.uuid(),
+ null,
+ null,
+ null,
+ false);
+ return FileStoreTableFactory.create(fileIO, path, tableMetadata.schema(), catalogEnv);
+ }
+
+ @Override
+ public List<String> listTables(ConnectorSession session, Optional<String> namespace)
+ {
+ if (namespace.isPresent()) {
+ String databaseName = namespace.get();
+ if (!databaseExists(session, databaseName)) {
+ throw new SchemaNotFoundException("Database does not exist: " + databaseName);
+ }
+ return metastore.getTables(databaseName).stream().map(t -> t.tableName().getTableName()).collect(Collectors.toList());
+ }
+
+ return metastore.getAllDatabases().stream().flatMap(name -> metastore.getTables(name).stream())
+ .map(t -> t.tableName().getTableName())
+ .collect(Collectors.toList());
+ }
+
+ protected TableMetadata loadTableMetadata(FileIO fileIO, Identifier identifier)
+ {
+ return loadTableMetadata(fileIO, identifier, getHmsTable(identifier));
+ }
+
+ public io.trino.metastore.Table getHmsTable(Identifier identifier)
+ {
+ return metastore.getTable(identifier.getDatabaseName(), identifier.getTableName()).orElseThrow(() -> new TableNotFoundException(SchemaTableName.schemaTableName(identifier.getDatabaseName(), identifier.getTableName())));
+ }
+
+ private TableMetadata loadTableMetadata(FileIO fileIO, Identifier identifier, io.trino.metastore.Table table)
+ {
+ return new TableMetadata(
+ loadTableSchema(fileIO, identifier, table),
+ isExternalTable(table),
+ identifier.getFullName() + "." + table.getParameters().get("transient_lastDdlTime"));
+ }
+
+ private TableSchema loadTableSchema(FileIO fileIO, Identifier identifier, io.trino.metastore.Table table)
+ {
+ if (isPaimonTable(table)) {
+ return tableSchemaInFileSystem(
+ fileIO,
+ getTableLocation(identifier, table),
+ identifier.getBranchNameOrDefault())
+ .orElseThrow(() -> new TableNotFoundException(SchemaTableName.schemaTableName(identifier.getDatabaseName(), identifier.getTableName())));
+ }
+
+ throw new TableNotFoundException(SchemaTableName.schemaTableName(identifier.getDatabaseName(), identifier.getTableName()));
+ }
+
+ private boolean isExternalTable(io.trino.metastore.Table table)
+ {
+ return table != null && TableType.EXTERNAL_TABLE.name().equals(table.getTableType());
+ }
+
+ private Path getTableLocation(Identifier identifier, @Nullable io.trino.metastore.Table table)
+ {
+ Optional<String> tableLocationString = table.getStorage().getOptionalLocation();
+ if (tableLocationString.isPresent()) {
+ return new Path(tableLocationString.get());
+ }
+ String databaseName = identifier.getDatabaseName();
+ String tableName = identifier.getTableName();
+ Database database = metastore.getDatabase(databaseName).orElseThrow(() -> new SchemaNotFoundException("Database does not exist: " + databaseName));
+ Optional<Path> tableLocationByDatabase = database.getLocation().map(databaseLocation -> new Path(databaseLocation, tableName));
+ return tableLocationByDatabase.orElse(super.getTableLocationInFileSystem(identifier));
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonDirectoryFileStatus.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonDirectoryFileStatus.java
new file mode 100644
index 000000000000..4a173abfbd88
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonDirectoryFileStatus.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.fileio;
+
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+
+import static java.util.Objects.requireNonNull;
+
+public class PaimonDirectoryFileStatus
+ implements FileStatus
+{
+ private final Path path;
+
+ public PaimonDirectoryFileStatus(Path path)
+ {
+ this.path = requireNonNull(path, "path is null");
+ }
+
+ @Override
+ public long getLen()
+ {
+ // can't get len by trino file system
+ return -1;
+ }
+
+ @Override
+ public boolean isDir()
+ {
+ return true;
+ }
+
+ @Override
+ public Path getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public long getModificationTime()
+ {
+ // can't get modification time by trino file system
+ return -1;
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonFileIO.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonFileIO.java
new file mode 100644
index 000000000000..1fceb7c7f353
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonFileIO.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.fileio;
+
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoInputStream;
+import io.trino.filesystem.TrinoOutputFile;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+public class PaimonFileIO
+ implements FileIO
+{
+ private static final ReentrantLock RENAME_LOCK = new ReentrantLock();
+
+ private final boolean objectStore;
+
+ private final TrinoFileSystem fileSystem;
+
+ public PaimonFileIO(
+ TrinoFileSystem fileSystem,
+ @Nullable Path path)
+ {
+ this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
+ this.objectStore = path == null || checkObjectStore(path.toUri().getScheme());
+ }
+
+ private static boolean checkObjectStore(String scheme)
+ {
+ scheme = scheme.toLowerCase(ENGLISH);
+ if (!scheme.startsWith("s3")
+ && !scheme.startsWith("emr")
+ && !scheme.startsWith("oss")
+ && !scheme.startsWith("wasb")) {
+ return scheme.startsWith("http") || scheme.startsWith("ftp");
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isObjectStore()
+ {
+ return objectStore;
+ }
+
+ @Override
+ public void configure(CatalogContext catalogContext) {}
+
+ @Override
+ public SeekableInputStream newInputStream(Path path)
+ throws IOException
+ {
+ return new PaimonInputStreamWrapper(fileSystem.newInputFile(Location.of(path.toString())).newStream());
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
+ throws IOException
+ {
+ TrinoOutputFile outputFile = fileSystem.newOutputFile(Location.of(path.toString()));
+ try {
+ return new PositionOutputStreamWrapper(outputFile.create(), 0);
+ }
+ catch (FileAlreadyExistsException e) {
+ if (overwrite) {
+ fileSystem.deleteFile(Location.of(path.toString()));
+ return new PositionOutputStreamWrapper(outputFile.create(), 0);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path)
+ throws IOException
+ {
+ return status(path);
+ }
+
+ private FileStatus status(Path path)
+ throws IOException
+ {
+ if (fileSystem.directoryExists(Location.of(path.toString())).orElse(false)) {
+ return new PaimonDirectoryFileStatus(path);
+ }
+ TrinoInputFile trinoInputFile = fileSystem.newInputFile(Location.of(path.toString()));
+ return new PaimonFileStatus(trinoInputFile.length(), path, trinoInputFile.lastModified().getEpochSecond());
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path)
+ throws IOException
+ {
+ List<FileStatus> fileStatusList = new ArrayList<>();
+ Location location = Location.of(path.toString());
+ if (fileSystem.directoryExists(location).orElse(false)) {
+ FileIterator fileIterator = fileSystem.listFiles(location);
+ while (fileIterator.hasNext()) {
+ FileEntry fileEntry = fileIterator.next();
+ fileStatusList.add(
+ new PaimonFileStatus(
+ fileEntry.length(),
+ new Path(fileEntry.location().toString()),
+ fileEntry.lastModified().getEpochSecond()));
+ }
+ fileSystem.listDirectories(Location.of(path.toString()))
+ .forEach(directory -> fileStatusList.add(new PaimonDirectoryFileStatus(new Path(directory.toString()))));
+ }
+ return fileStatusList.toArray(new FileStatus[0]);
+ }
+
+ @Override
+ public FileStatus[] listDirectories(Path path)
+ throws IOException
+ {
+ return fileSystem.listDirectories(Location.of(path.toString())).stream()
+ .map(location -> new PaimonDirectoryFileStatus(new Path(location.toString())))
+ .toArray(FileStatus[]::new);
+ }
+
+ @Override
+ public boolean exists(Path path)
+ throws IOException
+ {
+ return fileSystem.directoryExists(Location.of(path.toString())).orElse(false)
+ || existFile(Location.of(path.toString()));
+ }
+
+ private boolean existFile(Location location)
+ throws IOException
+ {
+ try {
+ return fileSystem.newInputFile(location).exists();
+ }
+ catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive)
+ throws IOException
+ {
+ Location location = Location.of(path.toString());
+ if (fileSystem.directoryExists(location).orElse(false)) {
+ if (!recursive) {
+ if (fileSystem.listFiles(location).hasNext()) {
+ throw new IOException("Directory " + location + " is not empty");
+ }
+ }
+ fileSystem.deleteDirectory(location);
+ return true;
+ }
+ else if (existFile(location)) {
+ fileSystem.deleteFile(location);
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean mkdirs(Path path)
+ throws IOException
+ {
+ fileSystem.createDirectory(Location.of(path.toString()));
+ return true;
+ }
+
+ @Override
+ public boolean rename(Path source, Path target)
+ throws IOException
+ {
+ boolean local = "file".equals(source.toUri().getScheme());
+ try {
+ if (local) {
+ RENAME_LOCK.lock();
+ }
+ Location sourceLocation = Location.of(source.toString());
+ Location targetLocation = Location.of(target.toString());
+ if (fileSystem.directoryExists(sourceLocation).orElse(false)) {
+ fileSystem.renameDirectory(sourceLocation, targetLocation);
+ }
+ else {
+ renameFileAnyway(sourceLocation, targetLocation);
+ }
+ }
+ catch (IOException e) {
+ if (e.getMessage().contains("Target location already exists") || e.getMessage().contains("rename failed")) {
+ return false;
+ }
+ throw e;
+ }
+ finally {
+ if (local) {
+ RENAME_LOCK.unlock();
+ }
+ }
+ return true;
+ }
+
+ private void renameFileAnyway(Location source, Location target)
+ throws IOException
+ {
+ try {
+ fileSystem.renameFile(source, target);
+ }
+ catch (IOException e) {
+ try (TrinoInputStream input = fileSystem.newInputFile(source).newStream();
+ OutputStream outputStream = fileSystem.newOutputFile(target).create()) {
+ byte[] bytes = new byte[1024 * 8];
+ int len;
+ while ((len = input.read(bytes)) != -1) {
+ outputStream.write(bytes, 0, len);
+ }
+ }
+ fileSystem.deleteFile(source);
+ }
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonFileStatus.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonFileStatus.java
new file mode 100644
index 000000000000..91c70fbf78fc
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonFileStatus.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.fileio;
+
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+
+import static java.util.Objects.requireNonNull;
+
+public class PaimonFileStatus
+ implements FileStatus
+{
+ private final long len;
+ private final Path path;
+ private final long modificationTime;
+
+ public PaimonFileStatus(long length, Path path, long modificationTime)
+ {
+ this.len = length;
+ this.path = requireNonNull(path, "path is null");
+ this.modificationTime = modificationTime;
+ }
+
+ @Override
+ public long getLen()
+ {
+ return len;
+ }
+
+ @Override
+ public boolean isDir()
+ {
+ return false;
+ }
+
+ @Override
+ public Path getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public long getModificationTime()
+ {
+ return modificationTime;
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonInputStreamWrapper.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonInputStreamWrapper.java
new file mode 100644
index 000000000000..8c3a2e5da08a
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PaimonInputStreamWrapper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.fileio;
+
+import io.trino.filesystem.TrinoInputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static java.util.Objects.requireNonNull;
+
+public class PaimonInputStreamWrapper
+ extends SeekableInputStream
+{
+ private final TrinoInputStream inputStream;
+
+ public PaimonInputStreamWrapper(TrinoInputStream inputStream)
+ {
+ this.inputStream = requireNonNull(inputStream, "inputStream is null");
+ }
+
+ @Override
+ public void seek(long l)
+ throws IOException
+ {
+ inputStream.seek(l);
+ }
+
+ @Override
+ public long getPos()
+ throws IOException
+ {
+ return inputStream.getPosition();
+ }
+
+ @Override
+ public int read()
+ throws IOException
+ {
+ return inputStream.read();
+ }
+
+ @Override
+ public int read(byte[] bytes, int off, int len)
+ throws IOException
+ {
+ return inputStream.read(bytes, off, len);
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ inputStream.close();
+ }
+
+ @Override
+ public byte[] readNBytes(int len)
+ throws IOException
+ {
+ return inputStream.readNBytes(len);
+ }
+
+ @Override
+ public int readNBytes(byte[] b, int off, int len)
+ throws IOException
+ {
+ return inputStream.readNBytes(b, off, len);
+ }
+
+ @Override
+ public int read(byte[] b)
+ throws IOException
+ {
+ return inputStream.read(b);
+ }
+
+ @Override
+ public long skip(long n)
+ throws IOException
+ {
+ return inputStream.skip(n);
+ }
+
+ @Override
+ public void skipNBytes(long n)
+ throws IOException
+ {
+ inputStream.skipNBytes(n);
+ }
+
+ @Override
+ public byte[] readAllBytes()
+ throws IOException
+ {
+ return inputStream.readAllBytes();
+ }
+
+ @Override
+ public int available()
+ throws IOException
+ {
+ return inputStream.available();
+ }
+
+ @Override
+ public void mark(int readlimit)
+ {
+ inputStream.mark(readlimit);
+ }
+
+ @Override
+ public void reset()
+ throws IOException
+ {
+ inputStream.reset();
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return inputStream.markSupported();
+ }
+
+ @Override
+ public long transferTo(OutputStream out)
+ throws IOException
+ {
+ return inputStream.transferTo(out);
+ }
+}
diff --git a/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PositionOutputStreamWrapper.java b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PositionOutputStreamWrapper.java
new file mode 100644
index 000000000000..5ad951ebb65f
--- /dev/null
+++ b/plugin/trino-paimon/src/main/java/io/trino/plugin/paimon/fileio/PositionOutputStreamWrapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.fileio;
+
+import org.apache.paimon.fs.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static java.util.Objects.requireNonNull;
+
+public class PositionOutputStreamWrapper
+ extends PositionOutputStream
+{
+ private final OutputStream outputStream;
+
+ private long position;
+
+ public PositionOutputStreamWrapper(OutputStream outputStream, long startPosition)
+ {
+ this.outputStream = requireNonNull(outputStream, "outputStream is null");
+ this.position = startPosition;
+ }
+
+ @Override
+ public long getPos()
+ {
+ return position;
+ }
+
+ @Override
+ public void write(int b)
+ throws IOException
+ {
+ outputStream.write(b);
+ position++;
+ }
+
+ @Override
+ public void write(byte[] bytes)
+ throws IOException
+ {
+ outputStream.write(bytes);
+ position += bytes.length;
+ }
+
+ @Override
+ public void write(byte[] bytes, int off, int len)
+ throws IOException
+ {
+ outputStream.write(bytes, off, len);
+ position += len;
+ }
+
+ @Override
+ public void flush()
+ throws IOException
+ {
+ outputStream.flush();
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ outputStream.close();
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/PaimonQueryRunner.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/PaimonQueryRunner.java
new file mode 100644
index 000000000000..ad8df693a4d8
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/PaimonQueryRunner.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import io.airlift.log.Logger;
+import io.trino.Session;
+import io.trino.plugin.hive.containers.Hive3MinioDataLake;
+import io.trino.plugin.paimon.testing.PaimonTablesInitializer;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.tpch.TpchTable;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import static io.airlift.testing.Closeables.closeAllSuppress;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_ROOT_PASSWORD;
+import static io.trino.testing.containers.Minio.MINIO_ROOT_USER;
+
+final class PaimonQueryRunner
+{
+ private static final String PAIMON_CATALOG = "paimon";
+ private static final String SCHEMA_NAME = "tpch";
+
+ private PaimonQueryRunner() {}
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ public static Builder builder(Hive3MinioDataLake hiveMinioDataLake)
+ {
+ return new Builder()
+ .setWarehouse("s3://" + hiveMinioDataLake.getBucketName() + "/")
+ .addConnectorProperty("paimon.catalog.type", "hive")
+ .addConnectorProperty("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString())
+ .addConnectorProperty("fs.native-s3.enabled", "true")
+ .addConnectorProperty("s3.aws-access-key", MINIO_ROOT_USER)
+ .addConnectorProperty("s3.aws-secret-key", MINIO_ROOT_PASSWORD)
+ .addConnectorProperty("s3.region", MINIO_REGION)
+ .addConnectorProperty("s3.endpoint", hiveMinioDataLake.getMinio().getMinioAddress())
+ .addConnectorProperty("s3.path-style-access", "true");
+ }
+
+ public static class Builder
+ extends DistributedQueryRunner.Builder<Builder>
+ {
+ private final Map<String, String> connectorProperties = new HashMap<>();
+ private String warehouse;
+ private PaimonTablesInitializer dataLoader;
+
+ protected Builder()
+ {
+ super(testSessionBuilder()
+ .setCatalog("paimon")
+ .setSchema(SCHEMA_NAME)
+ .build());
+ }
+
+ @CanIgnoreReturnValue
+ public Builder setWarehouse(String warehouse)
+ {
+ this.warehouse = warehouse;
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Builder setDataLoader(PaimonTablesInitializer dataLoader)
+ {
+ this.dataLoader = dataLoader;
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Builder addConnectorProperty(String key, String value)
+ {
+ this.connectorProperties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public DistributedQueryRunner build()
+ throws Exception
+ {
+ Session session = testSessionBuilder().setCatalog(PAIMON_CATALOG).setSchema(SCHEMA_NAME).build();
+ DistributedQueryRunner queryRunner = super.build();
+ try {
+ queryRunner.installPlugin(new TestingPaimonPlugin(queryRunner.getCoordinator().getBaseDataDir().resolve("/")));
+ if (warehouse == null) {
+ Path dataDir = queryRunner.getCoordinator().getBaseDataDir().resolve("paimon_data");
+ warehouse = dataDir.toFile().toURI().toString();
+ }
+ connectorProperties.put("paimon.warehouse", warehouse);
+ queryRunner.createCatalog(PAIMON_CATALOG, PAIMON_CATALOG, connectorProperties);
+
+ if (dataLoader != null) { dataLoader.initializeTables(session, queryRunner, SCHEMA_NAME); }
+ return queryRunner;
+ }
+ catch (Throwable e) {
+ closeAllSuppress(e, queryRunner);
+ throw e;
+ }
+ }
+ }
+
+ public static void main(String[] args)
+ throws Exception
+ {
+ //noinspection resource
+ QueryRunner queryRunner = builder()
+ .addCoordinatorProperty("http-server.http.port", "8080")
+ .setDataLoader(new PaimonTablesInitializer(TpchTable.getTables()))
+ .build();
+ Logger log = Logger.get(PaimonQueryRunner.class);
+ log.info("======== SERVER STARTED ========");
+ log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/PaimonTestUtils.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/PaimonTestUtils.java
new file mode 100644
index 000000000000..49d701185001
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/PaimonTestUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+
+final class PaimonTestUtils
+{
+ private PaimonTestUtils() {}
+
+ public static byte[] getSerializedTable()
+ throws Exception
+ {
+ String warehouse = Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+ Path tablePath = new Path(warehouse, "test.db/user");
+ SimpleTableTestHelper testHelper = createTestHelper(tablePath);
+ testHelper.write(GenericRow.of(1, 2L, fromString("1"), fromString("1")));
+ testHelper.write(GenericRow.of(3, 4L, fromString("2"), fromString("2")));
+ testHelper.write(GenericRow.of(5, 6L, fromString("3"), fromString("3")));
+ testHelper.write(GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), fromString("2")));
+ testHelper.commit();
+ Map<String, String> config = new HashMap<>();
+ config.put("warehouse", warehouse);
+ Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(config)));
+ Identifier tablePath2 = new Identifier("test", "user");
+ return InstantiationUtil.serializeObject(catalog.getTable(tablePath2));
+ }
+
+ private static SimpleTableTestHelper createTestHelper(Path tablePath)
+ throws Exception
+ {
+ RowType rowType = new RowType(
+ Arrays.asList(
+ new DataField(0, "a", new IntType()),
+ new DataField(1, "b", new BigIntType()),
+ // test field name has upper case
+ new DataField(2, "aCa", new VarCharType()),
+ new DataField(3, "d", new CharType(1))));
+ return new SimpleTableTestHelper(tablePath, rowType);
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/SimpleTableTestHelper.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/SimpleTableTestHelper.java
new file mode 100644
index 000000000000..694c65c356d5
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/SimpleTableTestHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.InnerTableWrite;
+import org.apache.paimon.types.RowType;
+
+final class SimpleTableTestHelper
+{
+ private static final String USER = "user";
+
+ private final InnerTableWrite writer;
+ private final InnerTableCommit commit;
+ private final FileStoreTable table;
+
+ public SimpleTableTestHelper(Path path, RowType rowType)
+ throws Exception
+ {
+ Schema schema = new Schema(rowType.getFields(), ImmutableList.of(), ImmutableList.of("a"), ImmutableMap.of("bucket", "1"), "");
+ new SchemaManager(LocalFileIO.create(), path).createTable(schema);
+ table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ this.writer = table.newWrite(USER);
+ this.commit = table.newCommit(USER);
+ }
+
+ public void write(InternalRow row)
+ throws Exception
+ {
+ writer.write(row);
+ }
+
+ public void createTag(String name)
+ {
+ table.createTag(name);
+ }
+
+ public void commit()
+ throws Exception
+ {
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonConfig.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonConfig.java
new file mode 100644
index 000000000000..f6051e96e498
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonConfig.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+final class TestPaimonConfig
+{
+ // TODO Add tests
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonConnectorTest.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonConnectorTest.java
new file mode 100644
index 000000000000..736ff39ef990
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonConnectorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import io.trino.plugin.paimon.testing.PaimonTablesInitializer;
+import io.trino.testing.BaseConnectorTest;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+
+final class TestPaimonConnectorTest
+ extends BaseConnectorTest
+{
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ return PaimonQueryRunner.builder()
+ .setDataLoader(new PaimonTablesInitializer(REQUIRED_TPCH_TABLES))
+ .build();
+ }
+
+ @Override
+ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+ {
+ return switch (connectorBehavior) {
+ case SUPPORTS_ADD_COLUMN,
+ SUPPORTS_ARRAY,
+ SUPPORTS_COMMENT_ON_COLUMN,
+ SUPPORTS_COMMENT_ON_TABLE,
+ SUPPORTS_CREATE_MATERIALIZED_VIEW,
+ SUPPORTS_CREATE_SCHEMA,
+ SUPPORTS_CREATE_TABLE,
+ SUPPORTS_CREATE_VIEW,
+ SUPPORTS_LIMIT_PUSHDOWN,
+ SUPPORTS_DELETE,
+ SUPPORTS_DROP_NOT_NULL_CONSTRAINT,
+ SUPPORTS_INSERT,
+ SUPPORTS_MAP_TYPE,
+ SUPPORTS_MERGE,
+ SUPPORTS_RENAME_COLUMN,
+ SUPPORTS_RENAME_SCHEMA,
+ SUPPORTS_RENAME_TABLE,
+ SUPPORTS_ROW_TYPE,
+ SUPPORTS_TOPN_PUSHDOWN,
+ SUPPORTS_UPDATE -> false;
+ default -> super.hasBehavior(connectorBehavior);
+ };
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonITCase.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonITCase.java
new file mode 100644
index 000000000000..47c63bf20c66
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonITCase.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryRunner;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.InnerTableWrite;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static io.airlift.testing.Closeables.closeAllSuppress;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static java.time.ZoneOffset.UTC;
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+// TODO Merge into TestPaimonConnectorTest
+final class TestPaimonITCase
+ extends AbstractTestQueryFramework
+{
+ private static final String CATALOG = "paimon";
+ private static final String DB = "default";
+
+ private static SimpleTableTestHelper createTestHelper(Path tablePath)
+ throws Exception
+ {
+ RowType rowType = new RowType(
+ Arrays.asList(
+ new DataField(0, "a", new IntType()),
+ new DataField(1, "b", new BigIntType()),
+ // test field name has upper case
+ new DataField(2, "aCa", new VarCharType()),
+ new DataField(3, "d", new CharType(1))));
+ return new SimpleTableTestHelper(tablePath, rowType);
+ }
+
+ private static String timestampLiteral(long epochMilliSeconds, int precision)
+ {
+ return DateTimeFormatter.ofPattern("''yyyy-MM-dd HH:mm:ss." + "S".repeat(precision) + " VV''")
+ .format(Instant.ofEpochMilli(epochMilliSeconds).atZone(UTC));
+ }
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ String warehouse = Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+
+ Path tablePath1 = new Path(warehouse, DB + ".db/t1");
+ SimpleTableTestHelper testHelper1 = createTestHelper(tablePath1);
+ testHelper1.write(GenericRow.of(1, 2L, fromString("1"), fromString("1")));
+ testHelper1.write(GenericRow.of(3, 4L, fromString("2"), fromString("2")));
+ testHelper1.write(GenericRow.of(5, 6L, fromString("3"), fromString("3")));
+ testHelper1.write(GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), fromString("2")));
+ testHelper1.commit();
+
+ Path tablePath2 = new Path(warehouse, "default.db/t2");
+ SimpleTableTestHelper testHelper2 = createTestHelper(tablePath2);
+ testHelper2.write(GenericRow.of(1, 2L, fromString("1"), fromString("1")));
+ testHelper2.write(GenericRow.of(3, 4L, fromString("2"), fromString("2")));
+ testHelper2.commit();
+ testHelper2.createTag("1");
+ testHelper2.write(GenericRow.of(5, 6L, fromString("3"), fromString("3")));
+ testHelper2.write(GenericRow.of(7, 8L, fromString("4"), fromString("4")));
+ testHelper2.commit();
+ testHelper2.createTag("tag-2");
+
+ {
+ Path tablePath3 = new Path(warehouse, "default.db/t3");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "pt", DataTypes.STRING()),
+ new DataField(1, "a", new IntType()),
+ new DataField(2, "b", new BigIntType()),
+ new DataField(3, "c", new BigIntType()),
+ new DataField(4, "d", new IntType())));
+ new SchemaManager(LocalFileIO.create(), tablePath3)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
+ InnerTableWrite writer = table.newWrite("user");
+ InnerTableCommit commit = table.newCommit("user");
+ writer.write(GenericRow.of(fromString("1"), 1, 1L, 1L, 1));
+ writer.write(GenericRow.of(fromString("1"), 1, 2L, 2L, 2));
+ writer.write(GenericRow.of(fromString("2"), 3, 3L, 3L, 3));
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+
+ {
+ Path tablePath = new Path(warehouse, "default.db/empty_t");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(1, "a", new IntType()),
+ new DataField(2, "b", new BigIntType())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+ }
+
+ {
+ Path tablePath4 = new Path(warehouse, "default.db/t4");
+ List<DataField> innerRowFields = new ArrayList<>();
+ innerRowFields.add(new DataField(4, "innercol1", new IntType()));
+ innerRowFields.add(
+ new DataField(5, "innercol2", new VarCharType(VarCharType.MAX_LENGTH)));
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "i", new IntType()),
+ new DataField(
+ 1,
+ "map",
+ new MapType(
+ new VarCharType(VarCharType.MAX_LENGTH),
+ new VarCharType(VarCharType.MAX_LENGTH))),
+ new DataField(2, "innerrow", new RowType(true, innerRowFields)),
+ new DataField(3, "array", new ArrayType(new IntType()))));
+ new SchemaManager(LocalFileIO.create(), tablePath4)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("i"),
+ Collections.singletonMap("bucket", "1"),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath4);
+ InnerTableWrite writer = table.newWrite("user");
+ InnerTableCommit commit = table.newCommit("user");
+ writer.write(
+ GenericRow.of(
+ 1,
+ new GenericMap(
+ new HashMap<>(ImmutableMap.of(fromString("1"), fromString("2")))),
+ GenericRow.of(2, fromString("male")),
+ new GenericArray(new int[] {1, 2, 3})));
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+
+ {
+ Path tablePath6 = new Path(warehouse, "default.db/t99");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "boolean", DataTypes.BOOLEAN()),
+ new DataField(1, "tinyint", DataTypes.TINYINT()),
+ new DataField(2, "smallint", DataTypes.SMALLINT()),
+ new DataField(3, "int", DataTypes.INT()),
+ new DataField(4, "bigint", DataTypes.BIGINT()),
+ new DataField(5, "float", DataTypes.FLOAT()),
+ new DataField(6, "double", DataTypes.DOUBLE()),
+ new DataField(7, "char", DataTypes.CHAR(5)),
+ new DataField(8, "varchar", DataTypes.VARCHAR(100)),
+ new DataField(9, "date", DataTypes.DATE()),
+ new DataField(10, "timestamp_0", DataTypes.TIMESTAMP(0)),
+ new DataField(11, "timestamp_3", DataTypes.TIMESTAMP(3)),
+ new DataField(12, "timestamp_6", DataTypes.TIMESTAMP(6)),
+ new DataField(
+ 13,
+ "timestamp_tz",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ new DataField(14, "decimal", DataTypes.DECIMAL(10, 5)),
+ new DataField(15, "varbinary", DataTypes.VARBINARY(10)),
+ new DataField(16, "array", DataTypes.ARRAY(DataTypes.INT())),
+ new DataField(
+ 17,
+ "map",
+ DataTypes.MAP(DataTypes.INT(), DataTypes.INT())),
+ new DataField(
+ 18,
+ "row",
+ DataTypes.ROW(
+ DataTypes.FIELD(100, "q1", DataTypes.INT()),
+ DataTypes.FIELD(101, "q2", DataTypes.INT())))));
+ new SchemaManager(LocalFileIO.create(), tablePath6)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ List.of(
+ "boolean",
+ "tinyint",
+ "smallint",
+ "int",
+ "bigint",
+ "float",
+ "double",
+ "char",
+ "varchar",
+ "date",
+ "timestamp_0",
+ "timestamp_3",
+ "timestamp_6",
+ "timestamp_tz",
+ "decimal"),
+ List.of(
+ "boolean",
+ "tinyint",
+ "smallint",
+ "int",
+ "bigint",
+ "float",
+ "double",
+ "char",
+ "varchar",
+ "date",
+ "timestamp_0",
+ "timestamp_3",
+ "timestamp_6",
+ "timestamp_tz",
+ "decimal",
+ "varbinary"),
+ Collections.singletonMap("bucket", "1"),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath6);
+ InnerTableWrite writer = table.newWrite("user");
+ InnerTableCommit commit = table.newCommit("user");
+ writer.write(
+ GenericRow.of(
+ true,
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0d,
+ BinaryString.fromString("char1"),
+ BinaryString.fromString("varchar1"),
+ 0,
+ Timestamp.fromMicros(1694505288000000L),
+ Timestamp.fromMicros(1694505288001000L),
+ Timestamp.fromMicros(1694505288001001L),
+ Timestamp.fromMicros(1694505288002001L),
+ Decimal.fromUnscaledLong(10000, 10, 5),
+ new byte[] {0x01, 0x02, 0x03},
+ new GenericArray(new int[] {1, 1, 1}),
+ new GenericMap(Map.of(1, 1)),
+ GenericRow.of(1, 1)));
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+
+ {
+ Path tablePath7 = new Path(warehouse, "default.db/t100");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "boolean", DataTypes.BOOLEAN()),
+ new DataField(1, "tinyint", DataTypes.TINYINT()),
+ new DataField(2, "smallint", DataTypes.SMALLINT()),
+ new DataField(3, "int", DataTypes.INT()),
+ new DataField(4, "bigint", DataTypes.BIGINT()),
+ new DataField(5, "float", DataTypes.FLOAT()),
+ new DataField(6, "double", DataTypes.DOUBLE()),
+ new DataField(7, "char", DataTypes.CHAR(5)),
+ new DataField(8, "varchar", DataTypes.VARCHAR(100)),
+ new DataField(9, "date", DataTypes.DATE()),
+ new DataField(10, "timestamp_0", DataTypes.TIMESTAMP(3)),
+ new DataField(11, "timestamp_3", DataTypes.TIMESTAMP(3)),
+ new DataField(12, "timestamp_6", DataTypes.TIMESTAMP(6)),
+ new DataField(13, "decimal", DataTypes.DECIMAL(10, 5)),
+ new DataField(14, "varbinary", DataTypes.VARBINARY(10)),
+ new DataField(15, "array", DataTypes.ARRAY(DataTypes.INT())),
+ new DataField(
+ 16,
+ "map",
+ DataTypes.MAP(DataTypes.INT(), DataTypes.INT())),
+ new DataField(
+ 17,
+ "row",
+ DataTypes.ROW(
+ DataTypes.FIELD(100, "q1", DataTypes.INT()),
+ DataTypes.FIELD(101, "q2", DataTypes.INT())))));
+ new SchemaManager(LocalFileIO.create(), tablePath7)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonMap("bucket", "-1"),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath7);
+ InnerTableWrite writer = table.newWrite("user");
+ InnerTableCommit commit = table.newCommit("user");
+ writer.write(
+ GenericRow.of(
+ true,
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0d,
+ BinaryString.fromString("char1"),
+ BinaryString.fromString("varchar1"),
+ 0,
+ Timestamp.fromMicros(1694505288000000L),
+ Timestamp.fromMicros(1694505288001000L),
+ Timestamp.fromMicros(1694505288001001L),
+ Decimal.fromUnscaledLong(10000, 10, 5),
+ new byte[] {0x01, 0x02, 0x03},
+ new GenericArray(new int[] {1, 1, 1}),
+ new GenericMap(Map.of(1, 1)),
+ GenericRow.of(1, 1)));
+ commit.commit(0, writer.prepareCommit(true, 0));
+
+ new SchemaManager(LocalFileIO.create(), tablePath7)
+ .commitChanges(SchemaChange.dropColumn("smallint"));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath7);
+ writer = table.newWrite("user");
+ commit = table.newCommit("user");
+ writer.write(
+ GenericRow.of(
+ true,
+ (byte) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0d,
+ BinaryString.fromString("char1"),
+ BinaryString.fromString("varchar1"),
+ 0,
+ Timestamp.fromMicros(1694505288000000L),
+ Timestamp.fromMicros(1694505288001000L),
+ Timestamp.fromMicros(1694505288001001L),
+ Decimal.fromUnscaledLong(10000, 10, 5),
+ new byte[] {0x01, 0x02, 0x03},
+ new GenericArray(new int[] {1, 1, 1}),
+ new GenericMap(Map.of(1, 1)),
+ GenericRow.of(1, 1)));
+ commit.commit(1, writer.prepareCommit(true, 1));
+
+ new SchemaManager(LocalFileIO.create(), tablePath7)
+ .commitChanges(SchemaChange.addColumn("smallint", DataTypes.SMALLINT()));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath7);
+ writer = table.newWrite("user");
+ commit = table.newCommit("user");
+ writer.write(
+ GenericRow.of(
+ true,
+ (byte) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0d,
+ BinaryString.fromString("char1"),
+ BinaryString.fromString("varchar1"),
+ 0,
+ Timestamp.fromMicros(1694505288000000L),
+ Timestamp.fromMicros(1694505288001000L),
+ Timestamp.fromMicros(1694505288001001L),
+ Decimal.fromUnscaledLong(10000, 10, 5),
+ new byte[] {0x01, 0x02, 0x03},
+ new GenericArray(new int[] {1, 1, 1}),
+ new GenericMap(Map.of(1, 1)),
+ GenericRow.of(1, 1),
+ (short) 1));
+ commit.commit(1, writer.prepareCommit(true, 1));
+
+ new SchemaManager(LocalFileIO.create(), tablePath7)
+ .commitChanges(SchemaChange.updateColumnType("smallint", DataTypes.STRING()));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath7);
+ writer = table.newWrite("user");
+ commit = table.newCommit("user");
+ writer.write(
+ GenericRow.of(
+ true,
+ (byte) 1,
+ 1,
+ 1L,
+ 1.0f,
+ 1.0d,
+ BinaryString.fromString("char1"),
+ BinaryString.fromString("varchar1"),
+ 0,
+ Timestamp.fromMicros(1694505288000000L),
+ Timestamp.fromMicros(1694505288001000L),
+ Timestamp.fromMicros(1694505288001001L),
+ Decimal.fromUnscaledLong(10000, 10, 5),
+ new byte[] {0x01, 0x02, 0x03},
+ new GenericArray(new int[] {1, 1, 1}),
+ new GenericMap(Map.of(1, 1)),
+ GenericRow.of(1, 1),
+ BinaryString.fromString("10086")));
+ commit.commit(1, writer.prepareCommit(true, 1));
+ }
+
+ {
+ Path tablePath6 = new Path(warehouse, "default.db/t101");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", DataTypes.STRING()),
+ new DataField(1, "b", DataTypes.INT()),
+ new DataField(2, "c", DataTypes.INT())));
+ new SchemaManager(LocalFileIO.create(), tablePath6)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ List.of("a"),
+ new HashMap<>(ImmutableMap.of(CoreOptions.BUCKET.key(), "1", CoreOptions.DELETION_VECTORS_ENABLED.key(), "true")),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath6);
+ InnerTableWrite writer = table.newWrite("user");
+ writer.withIOManager(new IOManagerImpl("/tmp"));
+ InnerTableCommit commit = table.newCommit("user");
+ for (int i = 0; i < 10; i++) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i), i, i));
+ }
+ commit.commit(0, writer.prepareCommit(true, 0));
+
+ writer.write(GenericRow.ofKind(RowKind.DELETE, BinaryString.fromString("a0"), 0, 0));
+ commit.commit(1, writer.prepareCommit(true, 1));
+ }
+
+ {
+ Path tablePath = new Path(warehouse, "default.db/t102");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", DataTypes.STRING()),
+ new DataField(1, "b", DataTypes.INT()),
+ new DataField(2, "c", DataTypes.INT())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(ImmutableMap.of("file-index.bloom-filter.columns", "a,b,c")),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+ InnerTableWrite writer = table.newWrite("user");
+ writer.withIOManager(new IOManagerImpl("/tmp"));
+ InnerTableCommit commit = table.newCommit("user");
+ for (int i = 0; i < 100; i = i + 3) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i), i, i));
+ }
+ commit.commit(0, writer.prepareCommit(true, 0));
+
+ for (int i = 1; i < 100; i = i + 3) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i), i, i));
+ }
+ commit.commit(1, writer.prepareCommit(true, 1));
+
+ for (int i = 2; i < 100; i = i + 3) {
+ writer.write(GenericRow.of(BinaryString.fromString("a" + i), i, i));
+ }
+ commit.commit(2, writer.prepareCommit(true, 2));
+ }
+
+ {
+ Path tablePath = new Path(warehouse, "default.db/fixed_bucket_table_wi_pk");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(ImmutableMap.of("file.format", "orc", "primary-key", "id", "bucket", "2")),
+ ""));
+ }
+
+ {
+ Path tablePath = new Path(warehouse, "default.db/fixed_bucket_table_wo_pk");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(ImmutableMap.of("file.format", "orc", "bucket", "2", "bucket-key", "id")),
+ ""));
+ }
+
+ {
+ Path tablePath = new Path(warehouse, "default.db/unaware_table");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>(ImmutableMap.of("file.format", "orc")),
+ ""));
+ }
+
+ DistributedQueryRunner queryRunner = null;
+ try {
+ queryRunner =
+ DistributedQueryRunner.builder(
+ testSessionBuilder().setCatalog(CATALOG).setSchema(DB).build())
+ .build();
+ queryRunner.installPlugin(new PaimonPlugin());
+ Map options = new HashMap<>();
+ options.put("paimon.warehouse", warehouse);
+ options.put("paimon.catalog.type", "filesystem");
+ options.put("fs.hadoop.enabled", "true");
+ queryRunner.createCatalog(CATALOG, CATALOG, options);
+ return queryRunner;
+ }
+ catch (Throwable e) {
+ closeAllSuppress(e, queryRunner);
+ throw e;
+ }
+ }
+
+    @Test
+    void testComplexTypes()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.t4")) // t4 row carries a map, a nested row and an array, per the expected literal
+                .isEqualTo("[[1, {1=2}, [2, male], [1, 2, 3]]]");
+    }
+
+    @Test
+    void testEmptyTable()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.empty_t")).isEqualTo("[]"); // scanning a table with no rows yields an empty result
+    }
+
+    @Test
+    void testProjection()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.t1")) // full-scan baseline for the projected queries below
+                .isEqualTo("[[1, 2, 1, 1], [5, 6, 3, 3]]");
+        assertThat(execute("SELECT a, aCa FROM paimon.default.t1")).isEqualTo("[[1, 1], [5, 3]]"); // column pruning
+        assertThat(execute("SELECT SUM(b) FROM paimon.default.t1")).isEqualTo("[[8]]"); // aggregate over a single projected column
+    }
+
+    @Test
+    void testLimit()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.t1 LIMIT 1")).isEqualTo("[[1, 2, 1, 1]]"); // plain LIMIT
+        assertThat(execute("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1")) // LIMIT combined with a filter
+                .isEqualTo("[[5, 6, 3, 3]]");
+    }
+
+    @Test
+    void testFilter()
+    {
+        assertThat(execute("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4")) // range predicate on column a
+                .isEqualTo("[[1, 1], [3, 2]]");
+    }
+
+    @Test
+    void testGroupByWithCast()
+    {
+        assertThat(
+                execute(
+                        "SELECT pt, a, SUM(b), SUM(d) FROM paimon.default.t3 GROUP BY pt, a ORDER BY pt, a")) // aggregation grouped on partition column pt plus a
+                .isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
+    }
+
+    @Test
+    void testLimitWithPartition()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.t3 WHERE pt = '1' LIMIT 1")) // partition predicate with LIMIT
+                .isEqualTo("[[1, 1, 1, 1, 1]]");
+
+        assertThat(execute("SELECT * FROM paimon.default.t3 WHERE pt = '1' AND b = 2 LIMIT 1")) // partition predicate plus a non-partition filter
+                .isEqualTo("[[1, 1, 2, 2, 2]]");
+    }
+
+    @Test
+    void testAllType()
+    {
+        assertThat(
+                execute(
+                        "SELECT boolean, tinyint, smallint,int,bigint,float,double,char,varchar, date,timestamp_0, "
+                                + "timestamp_3, timestamp_6, decimal, to_hex(varbinary), array, map, row FROM paimon.default.t99")) // t99 has one column per supported type
+                .isEqualTo(
+                        "[[true, 1, 1, 1, 1, 1.0, 1.0, char1, varchar1, 1970-01-01, "
+                                + "2023-09-12T07:54:48, 2023-09-12T07:54:48.001, 2023-09-12T07:54:48.001001, "
+                                + "0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]]]");
+    }
+
+    @Test
+    void testSchemaEvolution()
+    {
+        assertThat(
+                execute(
+                        "SELECT boolean, tinyint, smallint, int, bigint,float,double,char,varchar, date,timestamp_0, "
+                                + "timestamp_3, timestamp_6, decimal, to_hex(varbinary), array, map, row FROM paimon.default.t100")) // t100 setup drops smallint, re-adds it, then widens it to string
+                .isEqualTo(
+                        "[[true, 1, null, 1, 1, 1.0, 1.0, char1, varchar1, 1970-01-01, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001, 2023-09-12T07:54:48.001001, 0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]], " // written before the drop: re-added column reads null
+                                + "[true, 1, null, 1, 1, 1.0, 1.0, char1, varchar1, 1970-01-01, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001, 2023-09-12T07:54:48.001001, 0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]], " // written while the column was dropped
+                                + "[true, 1, 1, 1, 1, 1.0, 1.0, char1, varchar1, 1970-01-01, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001, 2023-09-12T07:54:48.001001, 0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]], " // written after the re-add
+                                + "[true, 1, 10086, 1, 1, 1.0, 1.0, char1, varchar1, 1970-01-01, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001, 2023-09-12T07:54:48.001001, 0.10000, 010203, [1, 1, 1], {1=1}, [1, 1]]]"); // written after the type change to string
+    }
+
+    @Test
+    void testDeletionFile()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.t101 WHERE b > 0")) // a0 was removed via a RowKind.DELETE write in t101's setup, leaving a1..a9
+                .isEqualTo(
+                        "[[a1, 1, 1], [a2, 2, 2], [a3, 3, 3], [a4, 4, 4], [a5, 5, 5], [a6, 6, 6], [a7, 7, 7], [a8, 8, 8], [a9, 9, 9]]");
+    }
+
+    @Test
+    void testFileIndex()
+    {
+        assertThat(execute("SELECT * FROM paimon.default.t102 where c = 2")).isEqualTo("[[a2, 2, 2]]"); // t102 declares a bloom-filter file index on a, b, c
+    }
+
+    private String execute(String sql)
+    {
+        MaterializedResult result = getQueryRunner().execute(sql);
+        // TODO Use assertThat(query()) instead
+        return result.getMaterializedRows().toString(); // string rendering of the rows, compared literally by the assertions above
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonMinioConnectorSmokeTest.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonMinioConnectorSmokeTest.java
new file mode 100644
index 000000000000..1e5cdb674a87
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonMinioConnectorSmokeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import io.trino.plugin.hive.containers.Hive3MinioDataLake;
+import io.trino.plugin.paimon.testing.PaimonTablesInitializer;
+import io.trino.testing.BaseConnectorSmokeTest;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+
+import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+
+final class TestPaimonMinioConnectorSmokeTest
+        extends BaseConnectorSmokeTest
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        String bucketName = "test-paimon-connector-" + randomNameSuffix(); // unique bucket name so concurrent runs do not collide
+        Hive3MinioDataLake hiveMinioDataLake = closeAfterClass(new Hive3MinioDataLake(bucketName, HIVE3_IMAGE)); // registered for automatic close after the class
+        hiveMinioDataLake.start();
+        hiveMinioDataLake.getMinioClient().ensureBucketExists(bucketName);
+
+        return PaimonQueryRunner.builder(hiveMinioDataLake)
+                .setDataLoader(new PaimonTablesInitializer(REQUIRED_TPCH_TABLES)) // preloads the TPCH tables the smoke test queries
+                .build();
+    }
+
+    @Override
+    protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+    {
+        return switch (connectorBehavior) { // the write/DDL capabilities listed below are declared unsupported
+            case SUPPORTS_ADD_COLUMN,
+                    SUPPORTS_ARRAY,
+                    SUPPORTS_CREATE_MATERIALIZED_VIEW,
+                    SUPPORTS_CREATE_SCHEMA,
+                    SUPPORTS_CREATE_TABLE,
+                    SUPPORTS_CREATE_VIEW,
+                    SUPPORTS_DELETE,
+                    SUPPORTS_INSERT,
+                    SUPPORTS_MAP_TYPE,
+                    SUPPORTS_MERGE,
+                    SUPPORTS_RENAME_COLUMN,
+                    SUPPORTS_RENAME_SCHEMA,
+                    SUPPORTS_RENAME_TABLE,
+                    SUPPORTS_ROW_TYPE,
+                    SUPPORTS_UPDATE -> false;
+            default -> super.hasBehavior(connectorBehavior);
+        };
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonPlugin.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonPlugin.java
new file mode 100644
index 000000000000..6eff7f4e1029
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonPlugin.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorFactory;
+import io.trino.testing.TestingConnectorContext;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.nio.file.Files.createTempDirectory;
+import static org.assertj.core.api.Assertions.assertThat;
+
+final class TestPaimonPlugin
+{
+    @Test
+    void testCreatePaimonConnector()
+            throws IOException
+    {
+        String warehouse = createTempDirectory(UUID.randomUUID().toString()).toUri().toString(); // throwaway warehouse dir for the filesystem catalog
+        Plugin plugin = new PaimonPlugin();
+        ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); // the plugin must expose exactly one factory
+        Connector connector = factory.create(
+                "paimon",
+                ImmutableMap.of("paimon.warehouse", warehouse, "paimon.catalog.type", "filesystem"),
+                new TestingConnectorContext());
+        assertThat(connector).isNotNull();
+        connector.shutdown(); // release resources the factory created
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonRow.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonRow.java
new file mode 100644
index 000000000000..7e0727522294
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonRow.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DecimalType.createDecimalType;
+import static io.trino.spi.type.Decimals.encodeScaledValue;
+import static io.trino.spi.type.Decimals.encodeShortScaledValue;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
+import static io.trino.spi.type.TypeUtils.writeNativeValue;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static io.trino.type.DateTimes.MICROSECONDS_PER_MILLISECOND;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.paimon.data.Timestamp.fromLocalDateTime;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+final class TestPaimonRow
+{
+    @Test
+    void testPaimonRow()
+    {
+        Page singlePage = new Page( // one row; each writeNativeValue below yields one single-position channel
+                1,
+                writeNativeValue(BOOLEAN, null),
+                writeNativeValue(BOOLEAN, false),
+                writeNativeValue(VARBINARY, Slices.wrappedBuffer((byte) 22)),
+                writeNativeValue(SMALLINT, 356L),
+                writeNativeValue(INTEGER, 4L),
+                writeNativeValue(BIGINT, 23567222L),
+                writeNativeValue(REAL, (long) Float.floatToIntBits(1213.33f)),
+                writeNativeValue(DOUBLE, 121.3d),
+                writeNativeValue(VARCHAR, Slices.wrappedBuffer("rfyu".getBytes(UTF_8))),
+                writeNativeValue(createDecimalType(2, 2), encodeShortScaledValue(BigDecimal.valueOf(0.21), 2)),
+                writeNativeValue(createDecimalType(38, 2), encodeScaledValue(BigDecimal.valueOf(65782123123.01), 2)),
+                writeNativeValue(createDecimalType(10, 1), encodeShortScaledValue(BigDecimal.valueOf(62123123.5), 1)),
+                writeNativeValue(TIMESTAMP_MICROS, fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30")).getMillisecond() * MICROSECONDS_PER_MILLISECOND),
+                writeNativeValue(VARBINARY, Slices.wrappedBuffer("varbinary_v".getBytes(UTF_8))));
+
+        RowType rowType = RowType.builder().fields( // NOTE(review): 13 fields here vs 14 page channels above — confirm PaimonRow's channel-to-field alignment
+                DataTypes.BOOLEAN(),
+                DataTypes.BOOLEAN(),
+                DataTypes.SMALLINT(),
+                DataTypes.INT(),
+                DataTypes.BIGINT(),
+                DataTypes.FLOAT(),
+                DataTypes.DOUBLE(),
+                DataTypes.STRING(),
+                DataTypes.DECIMAL(2, 2),
+                DataTypes.DECIMAL(38, 2),
+                DataTypes.DECIMAL(10, 1),
+                DataTypes.TIMESTAMP(6),
+                DataTypes.VARBINARY(Integer.MAX_VALUE)).build();
+
+        PaimonRow paimonRow = new PaimonRow(rowType, singlePage, RowKind.INSERT); // wraps the page as a Paimon row with INSERT kind
+
+        assertThat(paimonRow.getRowKind()).isEqualTo(RowKind.INSERT);
+        assertThat(paimonRow.isNullAt(0)).isEqualTo(true); // channel 0 wrote a null boolean
+        assertThat(paimonRow.getBoolean(1)).isEqualTo(false);
+        assertThat(paimonRow.getBinary(2)[0]).isEqualTo((byte) 22);
+        assertThat(paimonRow.getShort(3)).isEqualTo((short) 356);
+        assertThat(paimonRow.getInt(4)).isEqualTo(4);
+        assertThat(paimonRow.getLong(5)).isEqualTo(23567222L);
+        assertThat(paimonRow.getFloat(6)).isEqualTo(1213.33f);
+        assertThat(paimonRow.getDouble(7)).isEqualTo(121.3d);
+        assertThat(paimonRow.getString(8)).isEqualTo(BinaryString.fromString("rfyu"));
+        assertThat(paimonRow.getDecimal(9, 2, 2))
+                .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(0.21), 2, 2));
+        assertThat(paimonRow.getDecimal(10, 38, 2))
+                .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(65782123123.01), 38, 2));
+        assertThat(paimonRow.getDecimal(11, 10, 1))
+                .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(62123123.5), 10, 1));
+        assertThat(paimonRow.getTimestamp(12, 6))
+                .isEqualTo(fromLocalDateTime(LocalDateTime.parse("2007-12-03T10:15:30"))); // micros value written above round-trips to the same timestamp
+        assertThat(paimonRow.getBinary(13))
+                .isEqualTo("varbinary_v".getBytes(UTF_8));
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonSplit.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonSplit.java
new file mode 100644
index 000000000000..ea28db04dc46
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonSplit.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import io.airlift.json.JsonCodec;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+final class TestPaimonSplit
+{
+    private final JsonCodec<PaimonSplit> codec = JsonCodec.jsonCodec(PaimonSplit.class); // parameterized codec instead of a raw JsonCodec
+
+    @Test
+    void testJsonRoundTrip()
+            throws Exception
+    {
+        byte[] serializedTable = PaimonTestUtils.getSerializedTable();
+        PaimonSplit expected = new PaimonSplit(Arrays.toString(serializedTable), 0.1); // split payload is the serialized bytes rendered as a string
+        String json = codec.toJson(expected);
+        PaimonSplit actual = codec.fromJson(json);
+        assertThat(actual.splitSerialized()).isEqualTo(expected.splitSerialized()); // round-trip must preserve the serialized payload
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonTrinoType.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonTrinoType.java
new file mode 100644
index 000000000000..74620bcdc875
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestPaimonTrinoType.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.CharType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.TimestampWithTimeZoneType;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeOperators;
+import io.trino.spi.type.VarbinaryType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static io.trino.plugin.paimon.PaimonTypeUtils.toPaimonType;
+import static io.trino.plugin.paimon.PaimonTypeUtils.toTrinoType;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+final class TestPaimonTrinoType
+{
+    @Test
+    void testToTrinoType() // Paimon -> Trino type mapping, checked via display names
+    {
+        assertThat(toTrinoType(DataTypes.CHAR(1)).getDisplayName()).isEqualTo("char(1)");
+        assertThat(toTrinoType(DataTypes.VARCHAR(10)).getDisplayName()).isEqualTo("varchar(10)");
+        assertThat(toTrinoType(DataTypes.BOOLEAN()).getDisplayName()).isEqualTo("boolean");
+        assertThat(toTrinoType(DataTypes.BINARY(10)).getDisplayName()).isEqualTo("varbinary"); // fixed BINARY also maps to unbounded varbinary
+        assertThat(toTrinoType(DataTypes.VARBINARY(10)).getDisplayName()).isEqualTo("varbinary");
+        assertThat(toTrinoType(DataTypes.DECIMAL(38, 0)).getDisplayName()).isEqualTo("decimal(38,0)");
+        assertThat(toTrinoType(DataTypes.DECIMAL(2, 2)).getDisplayName()).isEqualTo("decimal(2,2)");
+        assertThat(toTrinoType(DataTypes.TINYINT()).getDisplayName()).isEqualTo("tinyint");
+        assertThat(toTrinoType(DataTypes.SMALLINT()).getDisplayName()).isEqualTo("smallint");
+        assertThat(toTrinoType(DataTypes.INT()).getDisplayName()).isEqualTo("integer");
+        assertThat(toTrinoType(DataTypes.BIGINT()).getDisplayName()).isEqualTo("bigint");
+        assertThat(toTrinoType(DataTypes.FLOAT()).getDisplayName()).isEqualTo("real");
+        assertThat(toTrinoType(DataTypes.DOUBLE()).getDisplayName()).isEqualTo("double");
+        assertThat(toTrinoType(DataTypes.DATE()).getDisplayName()).isEqualTo("date");
+        assertThat(toTrinoType(new TimeType(0)).getDisplayName()).isEqualTo("time(0)"); // time precision is preserved
+        assertThat(toTrinoType(new TimeType(3)).getDisplayName()).isEqualTo("time(3)");
+        assertThat(toTrinoType(new TimeType(6)).getDisplayName()).isEqualTo("time(6)");
+        assertThat(toTrinoType(new TimeType(9)).getDisplayName()).isEqualTo("time(9)");
+        assertThat(toTrinoType(DataTypes.TIMESTAMP()).getDisplayName()).isEqualTo("timestamp(6)"); // default Paimon timestamp precision maps to micros
+        assertThat(toTrinoType(new org.apache.paimon.types.TimestampType(3)).getDisplayName()).isEqualTo("timestamp(3)");
+        assertThat(toTrinoType(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()).getDisplayName()).isEqualTo("timestamp(6) with time zone");
+        assertThat(toTrinoType(DataTypes.ARRAY(DataTypes.STRING())).getDisplayName()).isEqualTo("array(varchar)");
+        assertThat(toTrinoType(DataTypes.MULTISET(DataTypes.STRING())).getDisplayName()).isEqualTo("map(varchar, integer)"); // multiset becomes element -> count map
+        assertThat(toTrinoType(DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING())).getDisplayName()).isEqualTo("map(bigint, varchar)");
+        assertThat(toTrinoType(DataTypes.ROW(
+                new DataField(0, "id", new IntType()),
+                new DataField(1, "name", new VarCharType(Integer.MAX_VALUE)))).getDisplayName())
+                .isEqualTo("row(\"id\" integer, \"name\" varchar)");
+    }
+
+    @Test
+    void testToPaimonType() // Trino -> Paimon type mapping, checked via SQL strings
+    {
+        assertThat(toPaimonType(CharType.createCharType(1)).asSQLString()).isEqualTo("CHAR(1)");
+        assertThat(toPaimonType(VARCHAR).asSQLString()).isEqualTo("VARCHAR(2147483646)");
+        assertThat(toPaimonType(BooleanType.BOOLEAN).asSQLString()).isEqualTo("BOOLEAN");
+        assertThat(toPaimonType(VarbinaryType.VARBINARY).asSQLString()).isEqualTo("BYTES");
+        assertThat(toPaimonType(DecimalType.createDecimalType(2, 2)).asSQLString()).isEqualTo("DECIMAL(2, 2)");
+        assertThat(toPaimonType(TinyintType.TINYINT).asSQLString()).isEqualTo("TINYINT");
+        assertThat(toPaimonType(SmallintType.SMALLINT).asSQLString()).isEqualTo("SMALLINT");
+        assertThat(toPaimonType(IntegerType.INTEGER).asSQLString()).isEqualTo("INT");
+        assertThat(toPaimonType(BigintType.BIGINT).asSQLString()).isEqualTo("BIGINT");
+        assertThat(toPaimonType(RealType.REAL).asSQLString()).isEqualTo("FLOAT");
+        assertThat(toPaimonType(DoubleType.DOUBLE).asSQLString()).isEqualTo("DOUBLE");
+        assertThat(toPaimonType(DateType.DATE).asSQLString()).isEqualTo("DATE");
+        assertThat(toPaimonType(io.trino.spi.type.TimeType.TIME_SECONDS).asSQLString()).isEqualTo("TIME(0)");
+        assertThat(toPaimonType(io.trino.spi.type.TimeType.TIME_MILLIS).asSQLString()).isEqualTo("TIME(3)");
+        assertThat(toPaimonType(TimestampType.TIMESTAMP_SECONDS).asSQLString()).isEqualTo("TIMESTAMP(0)");
+        assertThat(toPaimonType(TimestampType.TIMESTAMP_MILLIS).asSQLString()).isEqualTo("TIMESTAMP(3)");
+        assertThat(toPaimonType(TimestampType.TIMESTAMP_MICROS).asSQLString()).isEqualTo("TIMESTAMP(6)");
+        assertThat(toPaimonType(TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS).asSQLString()).isEqualTo("TIMESTAMP(3) WITH LOCAL TIME ZONE");
+        assertThat(toPaimonType(new ArrayType(IntegerType.INTEGER)).asSQLString()).isEqualTo("ARRAY");
+        assertThat(toPaimonType(new MapType(
+                IntegerType.INTEGER,
+                VARCHAR,
+                new TypeOperators())).asSQLString()).isEqualTo("MAP");
+        List<RowType.Field> fields = new ArrayList<>(); // parameterized list instead of a raw type
+        fields.add(new RowType.Field(Optional.of("id"), IntegerType.INTEGER));
+        fields.add(new RowType.Field(Optional.of("name"), VARCHAR));
+        Type type = RowType.from(fields);
+        DataType rowType = toPaimonType(type);
+        assertThat(rowType.asSQLString()).isEqualTo("ROW<`id` INT, `name` VARCHAR(2147483646)>");
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestingPaimonConnectorFactory.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestingPaimonConnectorFactory.java
new file mode 100644
index 000000000000..eaef1e19950b
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestingPaimonConnectorFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.local.LocalFileSystemFactory;
+import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.inject.multibindings.MapBinder.newMapBinder;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class TestingPaimonConnectorFactory
+        implements ConnectorFactory
+{
+    private final Path localFileSystemRootPath;
+
+    public TestingPaimonConnectorFactory(Path localFileSystemRootPath)
+    {
+        localFileSystemRootPath.toFile().mkdirs(); // ensure the local root exists before any file-system binding uses it
+        this.localFileSystemRootPath = localFileSystemRootPath;
+    }
+
+    @Override
+    public String getName()
+    {
+        return "paimon";
+    }
+
+    @Override
+    public Connector create(String catalogName, Map<String, String> config, ConnectorContext context) // parameterized Map per the ConnectorFactory SPI, not a raw type
+    {
+        ImmutableMap.Builder<String, String> configBuilder = ImmutableMap.<String, String>builder()
+                .putAll(config);
+        return PaimonConnectorFactory.createConnector(catalogName, configBuilder.buildOrThrow(), context, Optional.of(binder -> {
+            newMapBinder(binder, String.class, TrinoFileSystemFactory.class)
+                    .addBinding("file").toInstance(new LocalFileSystemFactory(localFileSystemRootPath)); // route "file" scheme to the local test root
+            configBinder(binder).bindConfigDefaults(FileHiveMetastoreConfig.class, metastoreConfig -> metastoreConfig.setCatalogDirectory("file:///managed/"));
+        }));
+    }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestingPaimonPlugin.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestingPaimonPlugin.java
new file mode 100644
index 000000000000..4a5e60a24482
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/TestingPaimonPlugin.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
+public class TestingPaimonPlugin
+ extends PaimonPlugin
+{
+ private final Path localFileSystemRootPath;
+
+ public TestingPaimonPlugin(Path localFileSystemRootPath)
+ {
+ this.localFileSystemRootPath = requireNonNull(localFileSystemRootPath, "localFileSystemRootPath is null");
+ }
+
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ List connectorFactories = ImmutableList.copyOf(super.getConnectorFactories());
+ verify(connectorFactories.size() == 1, "Unexpected connector factories: %s", connectorFactories);
+
+ return ImmutableList.of(new TestingPaimonConnectorFactory(localFileSystemRootPath));
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/PaimonTablesInitializer.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/PaimonTablesInitializer.java
new file mode 100644
index 000000000000..3cd7f80e62ad
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/PaimonTablesInitializer.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.testing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.log.Logger;
+import io.trino.Session;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.HiveMetastoreFactory;
+import io.trino.plugin.paimon.CatalogType;
+import io.trino.plugin.paimon.PaimonConnector;
+import io.trino.plugin.paimon.PaimonMetadata;
+import io.trino.plugin.paimon.PaimonMetadataFactory;
+import io.trino.plugin.paimon.catalog.TrinoCatalog;
+import io.trino.plugin.paimon.fileio.PaimonFileIO;
+import io.trino.plugin.tpch.TpchMetadata;
+import io.trino.plugin.tpch.TpchPlugin;
+import io.trino.plugin.tpch.TpchTableHandle;
+import io.trino.spi.connector.CatalogSchemaName;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.security.ConnectorIdentity;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.MaterializedRow;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorSession;
+import io.trino.tpch.TpchTable;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.intellij.lang.annotations.Language;
+
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static io.trino.plugin.paimon.testing.PaimonTestingUtils.createPaimonTable;
+import static java.lang.String.format;
+
+public class PaimonTablesInitializer
+{
+ private static final CatalogSchemaName TPCH_TINY = new CatalogSchemaName("tpch", "tiny");
+ private static final Logger log = Logger.get(PaimonTablesInitializer.class);
+
+ private final List> tpchTables;
+
+ public PaimonTablesInitializer(List> tpchTables)
+ {
+ this.tpchTables = ImmutableList.copyOf(tpchTables);
+ }
+
+ private static void createTableWithData(Catalog catalog, String databaseName, TpchTable> tpchTable, QueryRunner queryRunner)
+ throws Exception
+ {
+ @Language("SQL") String sql = generateScanSql(TPCH_TINY, tpchTable);
+ log.info("Executing %s", sql);
+ MaterializedResult result = queryRunner.execute(sql);
+
+ Table paimonTable = catalog.getTable(Identifier.create(databaseName, tpchTable.getTableName()));
+ BatchWriteBuilder builder = paimonTable.newBatchWriteBuilder();
+
+ try (BatchTableWrite write = builder.newWrite();
+ BatchTableCommit commit = builder.newCommit()) {
+ for (MaterializedRow row : result.getMaterializedRows()) {
+ write.write(toPaimonRow(row));
+ }
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ private static InternalRow toPaimonRow(MaterializedRow row)
+ {
+ GenericRow genericRow = new GenericRow(row.getFields().size());
+ for (int i = 0; i < row.getFields().size(); i++) {
+ Object field = row.getField(i);
+ switch (field) {
+ case String string -> genericRow.setField(i, BinaryString.fromString(string));
+ case LocalDate date -> genericRow.setField(i, (int) date.toEpochDay());
+ default -> genericRow.setField(i, field);
+ }
+ }
+ return genericRow;
+ }
+
+ private static String generateScanSql(CatalogSchemaName catalogSchemaName, TpchTable> table)
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT ");
+ String columnList = table.getColumns().stream()
+ .map(column -> quote(column.getSimplifiedColumnName()))
+ .collect(Collectors.joining(", "));
+ builder.append(columnList);
+ String tableName = format("%s.%s", catalogSchemaName.toString(), table.getTableName());
+ builder.append(" FROM ").append(tableName);
+ return builder.toString();
+ }
+
+ private static String quote(String name)
+ {
+ return "\"" + name + "\"";
+ }
+
+ public void initializeTables(Session session, QueryRunner queryRunner, String schemaName)
+ throws Exception
+ {
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog(TPCH_TINY.getCatalogName(), "tpch", ImmutableMap.of());
+ ConnectorIdentity connectorIdentity = ConnectorIdentity.ofUser(session.getUser());
+ ConnectorSession connectorSession = TestingConnectorSession.SESSION;
+
+ PaimonMetadata paimonMetadata = ((PaimonConnector) queryRunner.getCoordinator().getConnector("paimon")).getInjector()
+ .getInstance(PaimonMetadataFactory.class)
+ .create(connectorIdentity);
+
+ TrinoFileSystem fileSystem = ((PaimonConnector) queryRunner.getCoordinator().getConnector("paimon")).getInjector()
+ .getInstance(TrinoFileSystemFactory.class)
+ .create(connectorIdentity);
+
+ PaimonFileIO paimonFileIO = new PaimonFileIO(fileSystem, new Path(paimonMetadata.catalog().warehouse()));
+
+ TrinoCatalog paimonTrinoCatalog = paimonMetadata.catalog();
+
+ Options options = new Options();
+ options.set(CatalogOptions.WAREHOUSE, paimonTrinoCatalog.warehouse());
+ CatalogContext context = CatalogContext.create(options, new FileIOLoader()
+ {
+ @Override
+ public String getScheme()
+ {
+ return "s3";
+ }
+
+ @Override
+ public FileIO load(Path path)
+ {
+ return paimonFileIO;
+ }
+ });
+
+ Catalog paimonCatalog = new FileSystemCatalog(paimonFileIO, new Path(paimonTrinoCatalog.warehouse()), context.options())
+ {
+ @Override
+ public Database getDatabaseImpl(String name)
+ {
+ return Database.of(name);
+ }
+ };
+
+ if (paimonMetadata.catalog().config().getCatalogType() == CatalogType.HIVE) {
+ HiveMetastoreFactory hiveMetastoreFactory = ((PaimonConnector) queryRunner.getCoordinator().getConnector("paimon")).getInjector()
+ .getInstance(HiveMetastoreFactory.class);
+ HiveMetastore hiveMetaStore = hiveMetastoreFactory.createMetastore(Optional.of(connectorIdentity));
+ paimonCatalog = new TrinoHiveCatalog(hiveMetaStore, paimonCatalog);
+ }
+
+ paimonCatalog.createDatabase(schemaName, true);
+ TpchMetadata tpchMetadata = (TpchMetadata) queryRunner.getCoordinator().getConnector("tpch").getMetadata(null, null);
+
+ for (TpchTable> tpchTable : tpchTables) {
+ TpchTableHandle tpchTableHandle = tpchMetadata.getTableHandle(connectorSession, SchemaTableName.schemaTableName(TPCH_TINY.getSchemaName(), tpchTable.getTableName()), Optional.empty(), Optional.empty());
+ ConnectorTableMetadata metadata = tpchMetadata.getTableMetadata(connectorSession, tpchTableHandle);
+ ConnectorTableMetadata paimonTableMeta = new ConnectorTableMetadata(
+ new SchemaTableName(schemaName, metadata.getTable().getTableName()),
+ metadata.getColumns().stream().filter(c -> !c.isHidden()).collect(Collectors.toList()),
+ metadata.getProperties(),
+ metadata.getComment());
+ createPaimonTable(paimonCatalog, paimonTableMeta);
+ createTableWithData(paimonCatalog, session.getSchema().orElse(schemaName), tpchTable, queryRunner);
+ }
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/PaimonTestingUtils.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/PaimonTestingUtils.java
new file mode 100644
index 000000000000..13f6b55ba21a
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/PaimonTestingUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.testing;
+
+import io.trino.plugin.paimon.PaimonTableOptionUtils;
+import io.trino.plugin.paimon.PaimonTableOptions;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static io.trino.plugin.paimon.PaimonTypeUtils.toPaimonType;
+import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_FOUND;
+import static java.lang.String.format;
+
+/**
+ * Utility class for Paimon connector testing.
+ * This class provides helper methods for creating tables in tests.
+ */
+public final class PaimonTestingUtils
+{
+ private PaimonTestingUtils() {}
+
+ /**
+ * Create a Paimon table for testing purposes.
+ * This method is intended for use in tests only.
+ */
+ public static void createPaimonTable(
+ Catalog catalog,
+ ConnectorTableMetadata tableMetadata)
+ {
+ SchemaTableName table = tableMetadata.getTable();
+ Identifier identifier = Identifier.create(table.getSchemaName(), table.getTableName());
+
+ try {
+ catalog.createTable(identifier, prepareSchema(tableMetadata), true);
+ }
+ catch (Catalog.DatabaseNotExistException e) {
+ throw new TrinoException(SCHEMA_NOT_FOUND, format("Schema %s not found", table.getSchemaName()), e);
+ }
+ catch (Catalog.TableAlreadyExistException _) {
+ // Table was already created by a previous test setup step.
+ }
+ }
+
+ private static Schema prepareSchema(ConnectorTableMetadata tableMetadata)
+ {
+ Map properties = new HashMap<>(tableMetadata.getProperties());
+ Schema.Builder builder = Schema.newBuilder()
+ .primaryKey(PaimonTableOptions.getPrimaryKeys(properties))
+ .partitionKeys(PaimonTableOptions.getPartitionedKeys(properties))
+ .comment(tableMetadata.getComment().orElse(null));
+
+ for (ColumnMetadata column : tableMetadata.getColumns()) {
+ builder.column(column.getName(), toPaimonType(column.getType()), column.getComment().orElse(null));
+ }
+
+ PaimonTableOptionUtils.buildOptions(builder, properties);
+
+ return builder.build();
+ }
+}
diff --git a/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/TrinoHiveCatalog.java b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/TrinoHiveCatalog.java
new file mode 100644
index 000000000000..88c1ca8d9b2c
--- /dev/null
+++ b/plugin/trino-paimon/src/test/java/io/trino/plugin/paimon/testing/TrinoHiveCatalog.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.paimon.testing;
+
+import io.trino.metastore.Column;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.HiveType;
+import io.trino.metastore.PrincipalPrivileges;
+import io.trino.metastore.StorageFormat;
+import org.apache.paimon.PagedList;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogLoader;
+import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.function.Function;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.partition.Partition;
+import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Instant;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.types.DataField;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.metastore.type.TypeInfoUtils.getTypeInfoFromTypeString;
+import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
+import static java.util.Objects.requireNonNull;
+
+public class TrinoHiveCatalog
+ implements Catalog
+{
+ private static final String INPUT_FORMAT_CLASS_NAME = "org.apache.paimon.hive.mapred.PaimonInputFormat";
+ private static final String OUTPUT_FORMAT_CLASS_NAME = "org.apache.paimon.hive.mapred.PaimonOutputFormat";
+ private static final String SERDE_CLASS_NAME = "org.apache.paimon.hive.PaimonSerDe";
+
+ public static final StorageFormat PAIMON_METASTORE_STORAGE_FORMAT = StorageFormat.create(
+ SERDE_CLASS_NAME,
+ INPUT_FORMAT_CLASS_NAME,
+ OUTPUT_FORMAT_CLASS_NAME);
+
+ private final HiveMetastore hiveMetastore;
+ private final Catalog baseCatalog;
+
+ public TrinoHiveCatalog(HiveMetastore hiveMetastore, Catalog baseCatalog)
+ {
+ this.hiveMetastore = requireNonNull(hiveMetastore, "hiveMetastore is null");
+ this.baseCatalog = requireNonNull(baseCatalog, "baseCatalog is null");
+ }
+
+ public static List toHiveColumns(List columns)
+ {
+ return columns.stream()
+ .map(column -> new Column(
+ column.name(),
+ HiveType.fromTypeInfo(getTypeInfoFromTypeString(column.type().asSQLString().toLowerCase(Locale.ROOT))),
+ Optional.ofNullable(column.description()),
+ Map.of()))
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public Map options()
+ {
+ return baseCatalog.options();
+ }
+
+ @Override
+ public CatalogLoader catalogLoader()
+ {
+ return baseCatalog.catalogLoader();
+ }
+
+ @Override
+ public boolean caseSensitive()
+ {
+ return baseCatalog.caseSensitive();
+ }
+
+ @Override
+ public List listDatabases()
+ {
+ return hiveMetastore.getAllDatabases();
+ }
+
+ @Override
+ public PagedList listDatabasesPaged(@Nullable Integer integer, @Nullable String s, @Nullable String s1)
+ {
+ return null;
+ }
+
+ @Override
+ public void createDatabase(String name, boolean ignoreIfExists, Map options)
+ throws DatabaseAlreadyExistException
+ {
+ if (databaseExists(name)) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new DatabaseAlreadyExistException(name);
+ }
+
+ hiveMetastore.createDatabase(io.trino.metastore.Database.builder().setDatabaseName(name).setOwnerName(Optional.empty()).setOwnerType(Optional.empty()).setParameters(options).build());
+ baseCatalog.createDatabase(name, ignoreIfExists);
+ }
+
+ private boolean databaseExists(String databaseName)
+ {
+ return hiveMetastore.getDatabase(databaseName).isPresent();
+ }
+
+ @Override
+ public Database getDatabase(String name)
+ throws DatabaseNotExistException
+ {
+ Optional database = hiveMetastore.getDatabase(name);
+ if (database.isEmpty()) {
+ throw new DatabaseNotExistException(name);
+ }
+
+ return new Database.DatabaseImpl(name, database.get().getParameters(), database.get().getComment().orElse(null));
+ }
+
+ @Override
+ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException
+ {
+ if (!databaseExists(databaseName)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new DatabaseNotExistException(databaseName);
+ }
+
+ hiveMetastore.dropDatabase(databaseName, false);
+ baseCatalog.dropDatabase(databaseName, ignoreIfNotExists, cascade);
+ }
+
+ @Override
+ public void alterDatabase(String databaseName, List propertyChanges, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException
+ {
+ if (!databaseExists(databaseName)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new DatabaseNotExistException(databaseName);
+ }
+
+ baseCatalog.alterDatabase(databaseName, propertyChanges, ignoreIfNotExists);
+ }
+
+ @Override
+ public Table getTable(Identifier identifier)
+ throws TableNotExistException
+ {
+ return baseCatalog.getTable(identifier);
+ }
+
+ @Override
+ public List listTables(String databaseName)
+ throws DatabaseNotExistException
+ {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(databaseName);
+ }
+ return hiveMetastore.getTables(databaseName).stream().map(t -> t.tableName().getTableName()).collect(Collectors.toList());
+ }
+
+ @Override
+ public PagedList listTablesPaged(String s, @Nullable Integer integer, @Nullable String s1, @Nullable String s2, @Nullable String s3)
+ throws DatabaseNotExistException
+ {
+ throw new UnsupportedOperationException("Alter table is not supported yet");
+ }
+
+ @Override
+ public PagedList listTableDetailsPaged(String s, @Nullable Integer integer, @Nullable String s1, @Nullable String s2, @Nullable String s3)
+ {
+ throw new UnsupportedOperationException("Alter table is not supported yet");
+ }
+
+ @Override
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+ throws TableNotExistException
+ {
+ hiveMetastore.dropTable(identifier.getDatabaseName(), identifier.getTableName(), false);
+ baseCatalog.dropTable(identifier, ignoreIfNotExists);
+ }
+
+ @Override
+ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException
+ {
+ if (!databaseExists(identifier.getDatabaseName())) {
+ throw new DatabaseNotExistException(identifier.getDatabaseName());
+ }
+
+ if (hiveMetastore.getTable(identifier.getDatabaseName(), identifier.getTableName()).isPresent()) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(identifier);
+ }
+ }
+
+ hiveMetastore.createTable(io.trino.metastore.Table.builder()
+ .setDatabaseName(identifier.getDatabaseName())
+ .setTableName(identifier.getTableName())
+ .setDataColumns(toHiveColumns(schema.fields()))
+ .setOwner(Optional.of("paimon"))
+ .setTableType(EXTERNAL_TABLE.name())
+ .withStorage(storage -> storage.setLocation(baseCatalog.options().get("warehouse") + "/" + identifier.getDatabaseName() + ".db/" + identifier.getTableName()))
+ .withStorage(storage -> storage.setStorageFormat(PAIMON_METASTORE_STORAGE_FORMAT))
+ .setParameter("EXTERNAL", "TRUE")
+ .setParameters(schema.options())
+ .build(), PrincipalPrivileges.NO_PRIVILEGES);
+
+ baseCatalog.createTable(identifier, schema, ignoreIfExists);
+ }
+
+ @Override
+ public void renameTable(Identifier identifier, Identifier newIdentifier, boolean b)
+ throws TableNotExistException, TableAlreadyExistException
+ {
+ hiveMetastore.renameTable(identifier.getDatabaseName(), identifier.getTableName(), newIdentifier.getDatabaseName(), newIdentifier.getTableName());
+ baseCatalog.renameTable(identifier, newIdentifier, b);
+ }
+
+ @Override
+ public void alterTable(Identifier identifier, List list, boolean b)
+ {
+ throw new UnsupportedOperationException("Alter table is not supported yet");
+ }
+
+ @Override
+ public void markDonePartitions(Identifier identifier, List