diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index 8fcbb0d30943..1da10a350243 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -40,6 +40,12 @@
+
+
+
+
+
+
diff --git a/plugin/trino-couchbase/pom.xml b/plugin/trino-couchbase/pom.xml
new file mode 100644
index 000000000000..3c108bfc4c73
--- /dev/null
+++ b/plugin/trino-couchbase/pom.xml
@@ -0,0 +1,267 @@
+
+
+ 4.0.0
+
+
+ io.trino
+ trino-root
+ 480-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-couchbase
+ trino-plugin
+ ${project.artifactId}
+ Trino - Couchbase Connector
+
+
+ true
+
+
+
+
+
+ com.couchbase.client
+ core-io
+ 3.11.1
+
+
+
+ com.couchbase.client
+ java-client
+ 3.11.1
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+ classes
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ security
+
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+
+ jakarta.inject
+ jakarta.inject-api
+
+
+
+ jakarta.validation
+ jakarta.validation-api
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ org.weakref
+ jmxutils
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api-incubator
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ io.airlift
+ log
+ runtime
+
+
+
+ io.airlift
+ log-manager
+ runtime
+
+
+
+ io.airlift
+ units
+ runtime
+
+
+
+ com.exasol
+ exasol-testcontainers
+ 7.2.2
+ test
+
+
+
+ io.airlift
+ configuration-testing
+ test
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ io.trino
+ trino-base-jdbc
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-main
+ test
+
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-parser
+ test
+
+
+
+ io.trino
+ trino-testing
+ test
+
+
+
+ io.trino
+ trino-testing-services
+ test
+
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testcontainers
+ testcontainers-couchbase
+ 2.0.1
+ test
+
+
+
+
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseClient.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseClient.java
new file mode 100644
index 000000000000..8fd0af09b7e6
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.couchbase.client.core.env.Authenticator;
+import com.couchbase.client.core.env.CertificateAuthenticator;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.ClusterOptions;
+import com.couchbase.client.java.Scope;
+import io.airlift.security.pem.PemReader;
+import jakarta.inject.Inject;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class CouchbaseClient
+{
+ private final CouchbaseConfig config;
+ private final Cluster cluster;
+
+ @Inject
+ public CouchbaseClient(CouchbaseConfig config)
+ {
+ this.config = config;
+ try {
+ if (config.getTlsKey() != null) {
+ PrivateKey key;
+ Optional password = Optional.ofNullable(config.getTlsKeyPassword());
+ List keyCertChain = new ArrayList<>();
+ if (new File(config.getTlsKey()).exists()) {
+ // load from file
+ key = PemReader.loadPrivateKey(new File(config.getTlsKey()), password);
+ }
+ else {
+ // try loading from string
+ key = PemReader.loadPrivateKey(config.getTlsKey(), password);
+ }
+ if (config.getTlsCertificate() != null) {
+ KeyStore tlsKeyStore = PemReader.loadTrustStore(new File(config.getTlsCertificate()));
+ tlsKeyStore.aliases().asIterator().forEachRemaining(alias -> {
+ try {
+ for (Certificate cert : tlsKeyStore.getCertificateChain(alias)) {
+ if (cert instanceof X509Certificate) {
+ keyCertChain.add((X509Certificate) cert);
+ }
+ }
+ }
+ catch (KeyStoreException e) {
+ throw new RuntimeException("Failed to load TLS certificates", e);
+ }
+ });
+ }
+ Authenticator authenticator = CertificateAuthenticator.fromKey(
+ key, password.orElse(""), keyCertChain);
+ cluster = Cluster.connect(
+ config.getCluster(),
+ ClusterOptions.clusterOptions(authenticator)
+ .environment(env -> {
+ env.securityConfig(security -> {
+ if (config.getTlsCertificate() != null) {
+ security.trustCertificate(Path.of(config.getTlsCertificate()));
+ }
+ });
+ env.timeoutConfig(timeout -> {
+ timeout.kvTimeout(config.getTimeouts());
+ timeout.queryTimeout(config.getTimeouts());
+ });
+ }));
+ }
+ else {
+ cluster = Cluster.connect(
+ config.getCluster(),
+ ClusterOptions.clusterOptions(config.getUsername(), config.getPassword())
+ .environment(env -> {
+ env.securityConfig(security -> {
+ if (config.getTlsCertificate() != null) {
+ security.trustCertificate(Path.of(config.getTlsCertificate()));
+ }
+ });
+ env.timeoutConfig(timeout -> {
+ timeout.kvTimeout(config.getTimeouts());
+ timeout.queryTimeout(config.getTimeouts());
+ });
+ }));
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate Couchbase client", e);
+ }
+ }
+
+ public Bucket getBucket()
+ {
+ return cluster.bucket(config.getBucket());
+ }
+
+ public Scope getScope()
+ {
+ return getBucket().scope(config.getScope());
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseColumnHandle.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseColumnHandle.java
new file mode 100644
index 000000000000..52dc76ae6043
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseColumnHandle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.type.Type;
+
+import java.util.List;
+
+public record CouchbaseColumnHandle(List path, List dereferenceNames, Type type,
+ boolean synthetic) implements ColumnHandle
+{
+ public CouchbaseColumnHandle
+ {
+ path = ImmutableList.copyOf(path);
+ }
+
+ public String fullName()
+ {
+ return String.format("`%s`", Joiner.on("`.`").join(path));
+ }
+
+ public String name()
+ {
+ return path.getLast();
+ }
+
+ public boolean isSynthetic()
+ {
+ return synthetic;
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConfig.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConfig.java
new file mode 100644
index 000000000000..e0cfcfeeea7b
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConfig.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import io.trino.spi.function.Description;
+
+import java.time.Duration;
+
+public class CouchbaseConfig
+{
+ private String cluster = "localhost";
+ private String username = "Administrator";
+ private String password = "password";
+ private String tlsKey;
+ private String tlsKeyPassword;
+ private String tlsCertificate;
+ private String schemaFolder = "couchbase-schema";
+ private String bucket = "default";
+ private String scope = "_default";
+ private Duration timeouts = Duration.ofSeconds(60);
+ private Long pageSize = 5000L;
+
+ @Config("couchbase.cluster")
+ @ConfigDescription("Couchbase cluster connection string")
+ public CouchbaseConfig setCluster(String connstring)
+ {
+ this.cluster = connstring;
+ return this;
+ }
+
+ public String getCluster()
+ {
+ return cluster;
+ }
+
+ @Config("couchbase.username")
+ @ConfigDescription("Username for the cluster")
+ public CouchbaseConfig setUsername(String username)
+ {
+ this.username = username;
+ return this;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ @Config("couchbase.password")
+ @ConfigDescription("Password for the cluster")
+ @ConfigSecuritySensitive
+ public CouchbaseConfig setPassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+
+ @Config("couchbase.tls-key")
+ @ConfigDescription("Key file address for mTls")
+ public CouchbaseConfig setTlsKey(String tlsKey)
+ {
+ this.tlsKey = tlsKey;
+ return this;
+ }
+
+ public String getTlsKey()
+ {
+ return tlsKey;
+ }
+
+ @Config("couchbase.tls-key-password")
+ @ConfigDescription("Key password")
+ @ConfigSecuritySensitive
+ public CouchbaseConfig setTlsKeyPassword(String password)
+ {
+ this.tlsKeyPassword = password;
+ return this;
+ }
+
+ public String getTlsKeyPassword()
+ {
+ return tlsKeyPassword;
+ }
+
+ @Config("couchbase.tls-certificate")
+ @ConfigDescription("Cluster root certificate file address")
+ public CouchbaseConfig setTlsCertificate(String certificate)
+ {
+ this.tlsCertificate = certificate;
+ return this;
+ }
+
+ public String getTlsCertificate()
+ {
+ return tlsCertificate;
+ }
+
+ @Config("couchbase.schema-folder")
+ @ConfigDescription("Path for folder with json files containing Trino schema mappings")
+ public CouchbaseConfig setSchemaFolder(String schemaFolder)
+ {
+ this.schemaFolder = schemaFolder;
+ return this;
+ }
+
+ public String getSchemaFolder()
+ {
+ return schemaFolder;
+ }
+
+ @Config("couchbase.bucket")
+ @ConfigDescription("Bucket to connect to")
+ public CouchbaseConfig setBucket(String bucket)
+ {
+ this.bucket = bucket;
+ return this;
+ }
+
+ public String getBucket()
+ {
+ return bucket;
+ }
+
+ @Config("couchbase.scope")
+ @ConfigDescription("Scope to connect to")
+ public CouchbaseConfig setScope(String scope)
+ {
+ this.scope = scope;
+ return this;
+ }
+
+ public String getScope()
+ {
+ return scope;
+ }
+
+ @Config("couchbase.timeouts")
+ @ConfigDescription("Operations timeout in seconds")
+ public CouchbaseConfig setTimeouts(String timeout)
+ {
+ this.timeouts = Duration.ofSeconds(Long.parseLong(timeout));
+ return this;
+ }
+
+ public Duration getTimeouts()
+ {
+ return timeouts;
+ }
+
+ @Config("couchbase.page-size")
+ @Description("Maximum number of rows to be fetched in a single query")
+ public CouchbaseConfig setPageSize(String value)
+ {
+ this.pageSize = Long.valueOf(value);
+ return this;
+ }
+
+ public Long getPageSize()
+ {
+ return pageSize;
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnector.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnector.java
new file mode 100644
index 000000000000..5ab60b614e1d
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.google.inject.Inject;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.transaction.IsolationLevel;
+
+public class CouchbaseConnector
+ implements Connector
+{
+ private final CouchbaseMetadata metadata;
+ private final CouchbaseSplitManager splitManager;
+ private final CouchbasePageSourceProvider pageSourceProvider;
+
+ @Inject
+ public CouchbaseConnector(
+ CouchbaseMetadata metadata,
+ CouchbaseSplitManager splitManager,
+ CouchbasePageSourceProvider pageSourceProvider)
+ {
+ this.metadata = metadata;
+ this.splitManager = splitManager;
+ this.pageSourceProvider = pageSourceProvider;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ return CouchbaseTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnectorFactory.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnectorFactory.java
new file mode 100644
index 000000000000..3bd25a48a4ec
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnectorFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.trino.plugin.base.ConnectorContextModule;
+import io.trino.plugin.base.TypeDeserializerModule;
+import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
+import io.trino.plugin.base.jmx.MBeanServerModule;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+import org.weakref.jmx.guice.MBeanModule;
+
+import java.util.Map;
+
+import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch;
+import static java.util.Objects.requireNonNull;
+
+public class CouchbaseConnectorFactory
+ implements ConnectorFactory
+{
+ @Override
+ public String getName()
+ {
+ return "couchbase";
+ }
+
+ @Override
+ public Connector create(String catalogName, Map config, ConnectorContext context)
+ {
+ requireNonNull(catalogName, "catalogName is null");
+ requireNonNull(config, "config is null");
+ checkStrictSpiVersionMatch(context, this);
+
+ Bootstrap app = new Bootstrap(
+ "io.trino.bootstrap.catalog." + catalogName,
+ new MBeanModule(),
+ new MBeanServerModule(),
+ new ConnectorObjectNameGeneratorModule("io.trino.plugin.couchbase", "trino.plugin.couchbase"),
+ new JsonModule(),
+ new TypeDeserializerModule(),
+ new CouchbaseConnectorModule(),
+ new ConnectorContextModule(catalogName, context));
+
+ Injector injector = app
+ .doNotInitializeLogging()
+ .disableSystemProperties()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
+
+ return injector.getInstance(CouchbaseConnector.class);
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnectorModule.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnectorModule.java
new file mode 100644
index 000000000000..1bd2458c7543
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseConnectorModule.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+public class CouchbaseConnectorModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ binder.bind(CouchbaseConnector.class).in(Scopes.SINGLETON);
+ binder.bind(CouchbaseMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(CouchbaseSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(CouchbasePageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(CouchbaseClient.class).in(Scopes.SINGLETON);
+
+ newExporter(binder).export(CouchbaseClient.class).withGeneratedName();
+
+ configBinder(binder).bindConfig(CouchbaseConfig.class);
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseMetadata.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseMetadata.java
new file mode 100644
index 000000000000..ba558165e536
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseMetadata.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.manager.collection.CollectionManager;
+import com.couchbase.client.protostellar.admin.collection.v1.CollectionAdminServiceGrpc;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.trino.plugin.base.projection.ApplyProjectionUtil;
+import io.trino.plugin.couchbase.translations.TrinoExpressionToCb;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.AggregateFunction;
+import io.trino.spi.connector.AggregationApplicationResult;
+import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.CatalogSchemaTableName;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableVersion;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.JoinApplicationResult;
+import io.trino.spi.connector.JoinStatistics;
+import io.trino.spi.connector.JoinType;
+import io.trino.spi.connector.LimitApplicationResult;
+import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.SampleApplicationResult;
+import io.trino.spi.connector.SampleType;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SortItem;
+import io.trino.spi.connector.TableFunctionApplicationResult;
+import io.trino.spi.connector.TableScanRedirectApplicationResult;
+import io.trino.spi.connector.TopNApplicationResult;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.expression.Constant;
+import io.trino.spi.expression.Variable;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+import io.trino.spi.type.VarcharType;
+import jakarta.annotation.Nullable;
+import jakarta.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static com.couchbase.client.core.deps.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
+import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
+import static io.trino.plugin.couchbase.translations.TrinoToCbType.isPushdownSupportedType;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+
+public class CouchbaseMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger LOG = LoggerFactory.getLogger(CouchbaseMetadata.class);
+ private final TypeManager typeManager;
+ private final CouchbaseClient client;
+ private final CouchbaseConfig config;
+ private final Map mtimeCache = new ConcurrentHashMap<>();
+ private final Map metaCache = new ConcurrentHashMap<>();
+
+ @Inject
+ public CouchbaseMetadata(TypeManager typeManager, CouchbaseClient client, CouchbaseConfig config)
+ {
+ this.typeManager = typeManager;
+ this.client = client;
+ this.config = config;
+ }
+
+ @Override
+ public boolean schemaExists(ConnectorSession session, String schemaName)
+ {
+ return client.getBucket().collections().getAllScopes().stream()
+ .filter(scope -> scope.name().equals(schemaName))
+ .findAny().isPresent();
+ }
+
+ @Nullable
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional startVersion, Optional endVersion)
+ {
+ if (startVersion.isPresent() || endVersion.isPresent()) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
+ }
+
+ CollectionManager cm = client.getBucket().collections();
+ return cm.getAllScopes().stream()
+ .filter(scopeSpec -> scopeSpec.name().equals(tableName.getSchemaName()))
+ .flatMap(scopeSpec -> scopeSpec.collections().stream())
+ .filter(collectionSpec -> collectionSpec.name().equals(tableName.getTableName()))
+ .findFirst().map(collectionSpec -> CouchbaseTableHandle.fromSchemaAndName(tableName.getSchemaName(), tableName.getTableName()))
+ .orElse(null);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ requireNonNull(config.getSchemaFolder(), "Couchbase schema folder is not set");
+ if (!(table instanceof CouchbaseTableHandle)) {
+ throw new RuntimeException("Couchbase table handle is not an instance of CouchbaseTableHandle");
+ }
+
+ CouchbaseTableHandle handle = (CouchbaseTableHandle) table;
+ String tablePath = handle.path();
+ checkMtime(handle);
+ if (metaCache.get(tablePath) == null) {
+ File schemaFile = getSchemaFile(handle);
+ if (!schemaFile.exists()) {
+ throw new RuntimeException(String.format("Couchbase schema file '%s' does not exist", schemaFile.getAbsolutePath()));
+ }
+ List columns = new LinkedList<>();
+ try (Reader schemaReader = Files.newBufferedReader(Path.of(schemaFile.toURI()))) {
+ JsonObject schema = JsonObject.fromJson(schemaReader.readAllAsString());
+ JsonObject properties = schema.getObject("properties");
+
+ ColumnMetadata[] orderedColumns = new ColumnMetadata[properties.size()];
+ boolean unordered = false;
+ boolean ordered = false;
+ for (String propertyName : properties.getNames()) {
+ if (propertyName.equals("~meta")) {
+ // skip the meta column
+ continue;
+ }
+ try {
+ JsonObject property = properties.getObject(propertyName);
+ if (property.containsKey("order")) {
+ if (unordered) {
+ throw new RuntimeException(String.format("unable to mix ordered and unordered properties: %s", tablePath));
+ }
+ int order = property.getInt("order");
+ orderedColumns[order] = new ColumnMetadata(propertyName, deductType(property));
+ }
+ else {
+ if (ordered) {
+ throw new RuntimeException(String.format("unable to mix ordered and unordered properties: %s", tablePath));
+ }
+ unordered = true;
+ columns.add(new ColumnMetadata(propertyName, deductType(property)));
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(String.format("Failed to read schema for column '%s': %s",
+ propertyName, e.getMessage()), e);
+ }
+ }
+
+ if (!unordered) {
+ columns = Arrays.asList(orderedColumns);
+ }
+
+ LOG.debug("Loaded schema for table '{}': {}", tablePath, columns);
+ ConnectorTableMetadata result = new ConnectorTableMetadata(new SchemaTableName(handle.schema(), handle.name()), columns);
+ metaCache.put(tablePath, result);
+ mtimeCache.put(tablePath, Files.getLastModifiedTime(schemaFile.toPath()).toMillis());
+ }
+ catch (Exception e) {
+ LOG.error(String.format("Failed to read schema for table '%s'", tablePath), e);
+ throw new RuntimeException(String.format("Failed to read schema for collection '%s.%s': %s",
+ ((CouchbaseTableHandle) table).schema(), ((CouchbaseTableHandle) table).name(), e.getMessage()), e);
+ }
+ }
+
+ return metaCache.get(tablePath);
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ CouchbaseTableHandle handle = (CouchbaseTableHandle) tableHandle;
+ ConnectorTableMetadata tableMetadata = getTableMetadata(session, tableHandle);
+ return tableMetadata.getColumns().stream()
+ .collect(Collectors.toMap(ColumnMetadata::getName,
+ column -> new CouchbaseColumnHandle(
+ Arrays.asList(handle.schema(), handle.name(), column.getName()),
+ new ArrayList<>(),
+ column.getType(),
+ false)));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ ConnectorTableMetadata tableMeta = getTableMetadata(session, tableHandle);
+ return tableMeta.getColumns().stream()
+ .filter(column -> column.getName().equals(((CouchbaseColumnHandle) columnHandle).name()))
+ .findFirst()
+ .orElseGet(() -> {
+ if (columnHandle instanceof CouchbaseColumnHandle cbHandle) {
+ return new ColumnMetadata(cbHandle.name(), cbHandle.type());
+ }
+ return null;
+ });
+ }
+
+ private File getSchemaFile(CouchbaseTableHandle handle)
+ {
+ return new File(new File(config.getSchemaFolder()), String.format("%s.%s.%s.json", client.getBucket().name(), handle.schema(), handle.name()));
+ }
+
+ private void checkMtime(CouchbaseTableHandle handle)
+ {
+ final String path = handle.path();
+ try {
+ long cached = mtimeCache.getOrDefault(path, 0L);
+ long actual = Files.getLastModifiedTime(getSchemaFile(handle).toPath()).toMillis();
+ if (actual - cached > 1000) {
+ metaCache.remove(path);
+ }
+ }
+ catch (Exception e) {
+ metaCache.remove(path);
+ }
+ }
+
+ @Override
+ public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
+ {
+ if (!takeOrReject(constraint.getAssignments())) {
+ LOG.info("rejecting constraint {} for table {}: unsupported assignments", constraint, handle);
+ return Optional.empty();
+ }
+
+ if (handle instanceof CouchbaseTableHandle cbHandle) {
+ // don't apply where on aggregations or root query
+ if (cbHandle.subQuery().isEmpty() || cbHandle.isAggregated().get() || !cbHandle.groupings().isEmpty()) {
+ cbHandle = cbHandle.wrap();
+ }
+ TupleDomain oldDomain = cbHandle.constraint();
+ TupleDomain newDomain = oldDomain.intersect(constraint.getSummary());
+ TupleDomain remainingFilter;
+ if (newDomain.isNone()) {
+ remainingFilter = TupleDomain.all();
+ }
+ else {
+ Map domains = newDomain.getDomains().orElseThrow();
+
+ Map supported = new HashMap<>();
+ Map unsupported = new HashMap<>();
+
+ for (Map.Entry entry : domains.entrySet()) {
+ CouchbaseColumnHandle columnHandle = (CouchbaseColumnHandle) entry.getKey();
+ Domain domain = entry.getValue();
+ Type columnType = columnHandle.type();
+ if (isPushdownSupportedType(columnType)) {
+ supported.put(entry.getKey(), entry.getValue());
+ }
+ else {
+ unsupported.put(columnHandle, domain);
+ }
+ }
+ newDomain = TupleDomain.withColumnDomains(supported);
+ remainingFilter = TupleDomain.withColumnDomains(unsupported);
+ }
+
+ boolean modified = false;
+ if (!oldDomain.equals(newDomain)) {
+ handle = cbHandle = cbHandle.withConstraint(newDomain);
+ modified = true;
+ }
+
+ if (constraint.getExpression() != Constant.TRUE && !cbHandle.containsConstraint(constraint)) {
+ cbHandle.whereClauses().add(TrinoExpressionToCb.convert(constraint.getExpression(), constraint.getAssignments()));
+ modified = true;
+ }
+
+ if (!modified) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new ConstraintApplicationResult<>(handle, remainingFilter, constraint.getExpression(), false));
+ }
+ return ConnectorMetadata.super.applyFilter(session, handle, constraint);
+ }
+
+ private Type deductType(JsonObject property)
+ {
+ Object type = property.get("type");
+ if (type instanceof String) {
+ type = JsonArray.from(type);
+ }
+
+ JsonArray types = (JsonArray) type;
+ List deductedTypes = new ArrayList<>();
+ for (int i = 0; i < types.size(); i++) {
+ String cbType = types.getString(i);
+ if (cbType.equals("null")) {
+ continue;
+ }
+ else if (cbType.equals("string")) {
+ deductedTypes.add(VarcharType.VARCHAR);
+ }
+ else if (cbType.startsWith("varchar(")) {
+ int size = Integer.valueOf(cbType.replace("varchar(", "").replace(")", ""));
+ deductedTypes.add(VarcharType.createVarcharType(size));
+ }
+ else if (cbType.equals("boolean")) {
+ deductedTypes.add(BooleanType.BOOLEAN);
+ }
+ else if (cbType.equals("number")) {
+ deductedTypes.add(DecimalType.createDecimalType());
+ }
+ else if (cbType.equals("integer")) {
+ deductedTypes.add(IntegerType.INTEGER);
+ }
+ else if (cbType.equals("date")) {
+ deductedTypes.add(DateType.DATE);
+ }
+ else if (cbType.equals("bigint")) {
+ deductedTypes.add(BigintType.BIGINT);
+ }
+ else if (cbType.equals("double")) {
+ deductedTypes.add(DoubleType.DOUBLE);
+ }
+ else if (cbType.equals("array")) {
+ deductedTypes.add(new ArrayType(deductType(property.getObject("items"))));
+ }
+ else if (cbType.equals("object")) {
+ List fields = new ArrayList<>();
+ JsonObject properties = property.getObject("properties");
+ for (String propertyName : properties.getNames()) {
+ JsonObject subProperty = properties.getObject(propertyName);
+ fields.add(new RowType.Field(Optional.of(propertyName), deductType(subProperty)));
+ }
+ deductedTypes.add(RowType.from(fields));
+ }
+ else {
+ throw new RuntimeException("Unsupported couchbase type: " + cbType);
+ }
+ }
+ if (deductedTypes.size() != 1) {
+ throw new RuntimeException("Ambiguous couchbase type: " + deductedTypes);
+ }
+ return deductedTypes.get(0);
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, Optional schemaName)
+ {
+ CollectionManager cm = client.getBucket().collections();
+ return cm.getAllScopes().stream()
+ .filter(scopeSpec -> scopeSpec.name().equals(client.getScope().name()))
+ .flatMap(scopeSpec -> scopeSpec.collections().stream())
+ .map(collectionSpec -> new SchemaTableName(client.getScope().name(), collectionSpec.name()))
+ .toList();
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return Arrays.asList(client.getScope().name());
+ }
+
+ @Override
+ public Optional> applyTopN(ConnectorSession session, ConnectorTableHandle handle, long topNCount, List sortItems, Map assignments)
+ {
+ if (!takeOrReject(assignments)) {
+ LOG.info("Rejecting topN assignments: {}", assignments);
+ return Optional.empty();
+ }
+
+ if (handle instanceof CouchbaseTableHandle cbHandle) {
+ if (cbHandle.topNCount().longValue() != -1 || !cbHandle.orderClauses().isEmpty()) {
+ if (cbHandle.topNCount().longValue() <= topNCount && cbHandle.compareSortItems(sortItems,
+ assignments)) {
+ LOG.info("Rejecting topN: no effect");
+ return Optional.empty();
+ }
+ LOG.info("Wrapping table handle: already got topN or non-matching order");
+ cbHandle = cbHandle.wrap();
+ handle = cbHandle;
+ }
+
+
+
+ cbHandle.setTopNCount(topNCount);
+ cbHandle.addSortItems(sortItems, assignments);
+ }
+ else {
+ LOG.warn("Rejecting topN assignments: handle is not couchbase");
+ return Optional.empty();
+ }
+
+ LOG.info("Accepted topN assignments: {}", handle);
+ return Optional.of(new TopNApplicationResult(handle, true, true));
+ }
+
+ private boolean takeOrReject(Map assignments)
+ {
+ return assignments.keySet().stream()
+ .allMatch(key -> assignments.get(key) instanceof CouchbaseColumnHandle);
+ }
+
+ @Override
+ public Optional> applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit)
+ {
+ if (handle instanceof CouchbaseTableHandle cbHandle) {
+ if (cbHandle.topNCount().longValue() != -1) {
+ if (cbHandle.topNCount().longValue() <= limit) {
+ return Optional.empty();
+ }
+ }
+ cbHandle.setTopNCount(limit);
+ }
+ else {
+ return Optional.empty();
+ }
+
+ return Optional.of(new LimitApplicationResult<>(handle, true, false));
+ }
+
+ @Override
+ public Optional> applyProjection(ConnectorSession session, ConnectorTableHandle handle, List projections, Map assignments)
+ {
+ if (!(handle instanceof CouchbaseTableHandle)) {
+ return Optional.empty();
+ }
+
+ CouchbaseTableHandle cbTable = (CouchbaseTableHandle) handle;
+
+ try {
+ if (cbTable.containsProjections(projections, assignments)) {
+ if (cbTable.selectClauses().size() == projections.size()) {
+ return Optional.empty();
+ }
+ cbTable = cbTable.wrap();
+ }
+ }
+ catch (IllegalArgumentException e) {
+ LOG.warn(String.format("Exception while applying projections to couchbase table: %s", cbTable), e);
+ return Optional.empty();
+ }
+
+ Set projectedExpressions = projections.stream()
+ .flatMap(expression -> extractSupportedProjectedColumns(expression, ex -> true).stream())
+ .collect(toImmutableSet());
+
+ Map columnProjections = projectedExpressions.stream()
+ .collect(toImmutableMap(identity(), ApplyProjectionUtil::createProjectedColumnRepresentation));
+
+ Map newAssignments = new HashMap<>();
+ Map newColumnAssignmentMap = new HashMap<>();
+ ImmutableMap.Builder newVariablesBuilder = ImmutableMap.builder();
+ ImmutableSet.Builder projectedColumnsBuilder = ImmutableSet.builder();
+
+ for (Map.Entry entry : columnProjections.entrySet()) {
+ ConnectorExpression expression = entry.getKey();
+ ApplyProjectionUtil.ProjectedColumnRepresentation projectedColumn = entry.getValue();
+
+ CouchbaseColumnHandle baseColumnHandle = (CouchbaseColumnHandle) assignments.get(projectedColumn.getVariable().getName());
+ CouchbaseColumnHandle projectedColumnHandle = projectColumn(baseColumnHandle, projectedColumn.getDereferenceIndices(), expression.getType());
+ String projectedColumnName = projectedColumnHandle.name();
+
+ Variable projectedColumnVariable = new Variable(projectedColumnName, expression.getType());
+ Assignment newAssignment = new Assignment(projectedColumnName, projectedColumnHandle, expression.getType());
+ newAssignments.putIfAbsent(projectedColumnName, newAssignment);
+ newColumnAssignmentMap.putIfAbsent(projectedColumnName, projectedColumnHandle);
+
+ newVariablesBuilder.put(expression, projectedColumnVariable);
+ projectedColumnsBuilder.add(projectedColumnHandle);
+ }
+
+ Map newVariables = newVariablesBuilder.buildOrThrow();
+ List newProjections = projections.stream()
+ .map(expression -> replaceWithNewVariables(expression, newVariables))
+ .collect(toImmutableList());
+
+ if (cbTable.containsProjections(newProjections, newColumnAssignmentMap)) {
+ if (newProjections.size() == cbTable.selectClauses().size()) {
+ return Optional.empty();
+ }
+ cbTable = cbTable.wrap();
+ }
+
+ List outputAssignments = newAssignments.values().stream().collect(toImmutableList());
+ List projectionAssignments = cbTable.addProjections(newProjections, newColumnAssignmentMap);
+ for (int i = 0; i < newProjections.size(); i++) {
+ String assignedName = projectionAssignments.get(i);
+ if (!newColumnAssignmentMap.containsKey(assignedName)) {
+ ConnectorExpression projection = newProjections.get(i);
+ newColumnAssignmentMap.put(assignedName, new CouchbaseColumnHandle(
+ Arrays.asList(cbTable.schema(), cbTable.name(), assignedName),
+ Collections.emptyList(),
+ projection.getType(),
+ !(projection instanceof Variable)));
+ }
+ }
+
+ return Optional.of(new ProjectionApplicationResult<>(
+ cbTable,
+ newProjections,
+ outputAssignments,
+ false));
+ }
+
+ private static CouchbaseColumnHandle projectColumn(CouchbaseColumnHandle baseColumn, List indices, Type projectedColumnType)
+ {
+ if (indices.isEmpty()) {
+ return baseColumn;
+ }
+ ImmutableList.Builder dereferenceNamesBuilder = ImmutableList.builder();
+ dereferenceNamesBuilder.addAll(baseColumn.dereferenceNames());
+
+ Type type = baseColumn.type();
+ for (int index : indices) {
+ checkArgument(type instanceof RowType, "type should be Row type");
+ RowType rowType = (RowType) type;
+ RowType.Field field = rowType.getFields().get(index);
+ dereferenceNamesBuilder.add(field.getName()
+ .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "ROW type does not have field names declared: " + rowType)));
+ type = field.getType();
+ }
+ return new CouchbaseColumnHandle(
+ baseColumn.path(),
+ dereferenceNamesBuilder.build(),
+ projectedColumnType,
+ baseColumn.isSynthetic());
+ }
+
+ @Override
+ public Optional> applySample(ConnectorSession session, ConnectorTableHandle handle, SampleType sampleType, double sampleRatio)
+ {
+ return ConnectorMetadata.super.applySample(session, handle, sampleType, sampleRatio);
+ }
+
+ @Override
+ public Optional> applyAggregation(ConnectorSession session, ConnectorTableHandle handle, List aggregates, Map assignments, List> groupingSets)
+ {
+ if (handle instanceof CouchbaseTableHandle cbTable) {
+ if (cbTable.containsAllAggregations(aggregates, assignments) && cbTable.containsAllGroupings(groupingSets)) {
+ return Optional.empty();
+ }
+ ImmutableList.Builder newAssignmentsBuilder = ImmutableList.builder();
+ ImmutableList.Builder projectionsBuilder = ImmutableList.builder();
+
+ cbTable = cbTable.wrap();
+
+ for (int i = 0; i < aggregates.size(); i++) {
+ AggregateFunction aggregateFunction = aggregates.get(i);
+ NamedParametrizedString result = cbTable.addAggregateFunction(aggregateFunction, assignments);
+ newAssignmentsBuilder.add(
+ new Assignment(
+ result.name(),
+ new CouchbaseColumnHandle(
+ Arrays.asList(cbTable.schema(), cbTable.name(), result.name()),
+ new ArrayList<>(),
+ aggregateFunction.getOutputType(),
+ true),
+ aggregateFunction.getOutputType()));
+ projectionsBuilder.add(new Variable(result.name(), aggregateFunction.getOutputType()));
+ }
+
+ cbTable.addGroupings(groupingSets);
+
+ return Optional.of(new AggregationApplicationResult<>(
+ cbTable,
+ projectionsBuilder.build(),
+ newAssignmentsBuilder.build(),
+ ImmutableMap.of(),
+ true
+ ));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional> applyJoin(ConnectorSession session, JoinType joinType, ConnectorTableHandle left, ConnectorTableHandle right, ConnectorExpression joinCondition, Map leftAssignments, Map rightAssignments, JoinStatistics statistics)
+ {
+ return ConnectorMetadata.super.applyJoin(session, joinType, left, right, joinCondition, leftAssignments, rightAssignments, statistics);
+ }
+
+ @Override
+ public Optional> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
+ {
+ return ConnectorMetadata.super.applyTableFunction(session, handle);
+ }
+
+ @Override
+ public Optional applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return ConnectorMetadata.super.applyTableScanRedirect(session, tableHandle);
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePageSource.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePageSource.java
new file mode 100644
index 000000000000..67401bc184ea
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePageSource.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.query.QueryOptions;
+import com.couchbase.client.java.query.QueryResult;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.SignedBytes;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.plugin.base.metrics.LongCount;
+import io.trino.spi.Page;
+import io.trino.spi.PageBuilder;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.ArrayBlockBuilder;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.block.MapBlockBuilder;
+import io.trino.spi.block.RowBlockBuilder;
+import io.trino.spi.block.SqlMap;
+import io.trino.spi.block.SqlRow;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.connector.SourcePage;
+import io.trino.spi.metrics.Metrics;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Decimals;
+import io.trino.spi.type.Int128;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarcharType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.couchbase.client.core.cnc.tracing.TracingAttribute.COLLECTION_NAME;
+import static com.google.common.base.Verify.verify;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+
+public final class CouchbasePageSource
+ implements ConnectorPageSource
+{
+ private static final Logger LOG = LoggerFactory.getLogger(CouchbasePageSource.class);
+ private final ConnectorTransactionHandle transaction;
+ private final ConnectorSession session;
+ private final ConnectorSplit split;
+ private final CouchbaseTableHandle table;
+ private final PageBuilder pageBuilder;
+ private final CouchbaseClient client;
+ private final String queryString;
+ private final Long pageSize;
+ private final List types = new LinkedList<>();
+ private final List names = new LinkedList<>();
+ private long offset;
+ private long total, pageCount, duration, errors;
+ private boolean finished;
+
+ public CouchbasePageSource(CouchbaseClient client, CouchbaseTransactionHandle transaction, ConnectorSession session, CouchbaseSplit split, CouchbaseTableHandle table, List columns, DynamicFilter dynamicFilter, Long pageSize)
+ {
+ this.client = client;
+ this.transaction = transaction;
+ this.session = session;
+ this.split = split;
+ this.pageSize = pageSize;
+
+ if (columns != null && !columns.isEmpty()) {
+ CouchbaseTableHandle finalTable = table;
+ columns.forEach(column -> {
+ if (!finalTable.coversColumn(column)) {
+ finalTable.addColumn(column);
+ }
+ });
+// table = table.wrap();
+// table.addColumns(columns);
+ }
+// else if (columns != null) {
+// table.clearSelectElements();
+// }
+
+ this.table = table;
+
+ TupleDomain predicate = dynamicFilter.getCurrentPredicate();
+
+ if (!predicate.isAll()) {
+ table = table.wrap();
+ table.addPredicate(predicate);
+ }
+
+ long limit = table.topNCount().get();
+ if (limit < 0) {
+ limit = pageSize;
+ }
+ columns.forEach(column -> {
+ types.add(column.type());
+ names.add(column.name());
+ });
+ this.pageBuilder = new PageBuilder((int) Math.min(limit, pageSize), types);
+ queryString = String.format("SELECT %s FROM (%s) data OFFSET %%d LIMIT %%d",
+ names.stream().collect(Collectors.joining("`, `", "`", "`")),
+ table.toSql().replaceAll("%", "%%"));
+ }
+
+ @Override
+ public SourcePage getNextSourcePage()
+ {
+ if (finished) {
+ return null;
+ }
+ final long started = System.currentTimeMillis();
+ verify(pageBuilder.isEmpty());
+ JsonArray queryArgs = JsonArray.create();
+ table.getParameters().forEach(queryArgs::add);
+ QueryOptions options = QueryOptions.queryOptions().parameters(queryArgs);
+
+ try {
+ final String query = String.format(queryString, offset, pageSize);
+ QueryResult result = client.getScope().query(query, options);
+ List rows = result.rowsAsObject();
+ LOG.info("Couchbase query ({} result rows): {}; arguments: {}", rows.size(), query, queryArgs);
+
+ for (int j = 0; j < rows.size(); j++) {
+ JsonObject row = rows.get(j);
+ pageBuilder.declarePosition();
+ for (int i = 0; i < names.size(); i++) {
+ Type type = types.get(i);
+ BlockBuilder output = pageBuilder.getBlockBuilder(i);
+ appendValue(output, type, row.get(names.get(i)));
+ }
+ }
+
+ // update metrics
+ offset += pageSize;
+ total += rows.size();
+ pageCount++;
+ duration += System.currentTimeMillis() - started;
+
+ if (rows.size() != pageSize) {
+ finished = true;
+ }
+ else if (rows.isEmpty()) {
+ finished = true;
+ return null;
+ }
+ } catch (Throwable t) {
+ errors++;
+ throw t;
+ }
+
+
+ Page page = pageBuilder.build();
+ pageBuilder.reset();
+ return SourcePage.create(page);
+ }
+
+ private void appendValue(BlockBuilder output, Type type, Object value)
+ {
+ if (value == null) {
+ output.appendNull();
+ return;
+ }
+
+ Class> javaType = type.getJavaType();
+
+ try {
+ if (type == BooleanType.BOOLEAN) {
+ type.writeBoolean(output, Boolean.valueOf(String.valueOf(value)));
+ return;
+ }
+ else if (type == VarcharType.VARCHAR || javaType == Slice.class) {
+ Slice slice = Slices.utf8Slice(String.valueOf(value));
+ type.writeSlice(output, slice);
+ }
+ else if (javaType == long.class) {
+ if (type.equals(BigintType.BIGINT)) {
+ type.writeLong(output, ((Number) value).longValue());
+ }
+ else if (type.equals(IntegerType.INTEGER)) {
+ type.writeLong(output, ((Number) value).intValue());
+ }
+ else if (type.equals(SmallintType.SMALLINT)) {
+ type.writeLong(output, Shorts.checkedCast(((Number) value).longValue()));
+ }
+ else if (type.equals(TinyintType.TINYINT)) {
+ type.writeLong(output, SignedBytes.checkedCast(((Number) value).longValue()));
+ }
+ else if (type.equals(RealType.REAL)) {
+ type.writeLong(output, Float.floatToIntBits(((Number) value).floatValue()));
+ }
+ else if (type instanceof DecimalType decimalType) {
+ LOG.info("test");
+ throw new RuntimeException("test");
+// Decimal128 decimal = (Decimal128) value;
+// if (decimal.compareTo(Decimal128.) == 0) {
+// type.writeLong(output, encodeShortScaledValue(BigDecimal.ZERO, decimalType.getScale()));
+// }
+// else {
+// type.writeLong(output, encodeShortScaledValue(decimal.bigDecimalValue(), decimalType.getScale()));
+// }
+ }
+ else if (type.equals(DateType.DATE)) {
+ type.writeLong(output, Long.valueOf(value.toString()));
+ }
+ else {
+ throw new RuntimeException("Unsupported type: " + type);
+ }
+ }
+ else if (javaType == Int128.class) {
+ DecimalType decimalType = (DecimalType) type;
+ if (value instanceof Integer intValue) {
+ if (intValue == 0) {
+ type.writeObject(output, Decimals.encodeScaledValue(BigDecimal.ZERO, decimalType.getScale()));
+ }
+ else {
+ type.writeObject(output, Decimals.encodeScaledValue(BigDecimal.valueOf(intValue), decimalType.getScale()));
+ }
+ }
+ else if (value instanceof Double doubleValue) {
+ if (doubleValue == 0.0d) {
+ type.writeObject(output, Decimals.encodeScaledValue(BigDecimal.ZERO, decimalType.getScale()));
+ }
+ else {
+ BigDecimal result = new BigDecimal(BigInteger.valueOf(doubleValue.longValue()));
+ type.writeObject(output, Decimals.encodeScaledValue(result, decimalType.getScale()));
+ }
+ }
+ else {
+ throw new RuntimeException("Unsupported type: " + value.getClass());
+ }
+ }
+ else if (javaType == Double.class || javaType == double.class) {
+ type.writeDouble(output, ((Number) value).doubleValue());
+ }
+ else if (javaType == Block.class || javaType == SqlMap.class || javaType == SqlRow.class) {
+ writeBlock(output, type, value);
+ }
+ else {
+ throw new RuntimeException("Unsupported type " + javaType);
+ }
+ return;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(String.format("Failed to append value '%s' of type %s from object type %s", String.valueOf(value), type, value.getClass()), e);
+ }
+ }
+
+ private void writeBlock(BlockBuilder output, Type type, Object valueArg)
+ {
+ final Object value;
+ if (valueArg instanceof JsonObject document) {
+ value = document.toMap();
+ }
+ else if (valueArg instanceof JsonArray arr) {
+ value = arr.toList();
+ } else {
+ value = valueArg;
+ }
+ if (type instanceof ArrayType arrayType) {
+ if (value instanceof List> list) {
+ ((ArrayBlockBuilder) output).buildEntry(elementBuilder -> list.forEach(element -> appendTo(arrayType.getElementType(), element, elementBuilder)));
+ return;
+ }
+ }
+ else if (type instanceof MapType mapType) {
+ if (value instanceof List>) {
+ ((MapBlockBuilder) output).buildEntry((keyBuilder, valueBuilder) -> {
+ for (Object element : (List>) value) {
+ if (!(element instanceof Map, ?> document)) {
+ continue;
+ }
+
+ if (document.containsKey("key") && document.containsKey("value")) {
+ appendTo(mapType.getKeyType(), document.get("key"), keyBuilder);
+ appendTo(mapType.getValueType(), document.get("value"), valueBuilder);
+ }
+ }
+ });
+ return;
+ }
+ if (value instanceof Map, ?> document) {
+ ((MapBlockBuilder) output).buildEntry((keyBuilder, valueBuilder) -> {
+ for (Map.Entry, ?> entry : document.entrySet()) {
+ appendTo(mapType.getKeyType(), entry.getKey(), keyBuilder);
+ appendTo(mapType.getValueType(), entry.getValue(), valueBuilder);
+ }
+ });
+ return;
+ }
+ }
+ else if (type instanceof RowType rowType) {
+ List fields = rowType.getFields();
+ if (value instanceof Map, ?> mapValue) {
+ ((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
+ for (int i = 0; i < fields.size(); i++) {
+ RowType.Field field = fields.get(i);
+ String fieldName = field.getName().orElse("field" + i);
+ appendTo(field.getType(), mapValue.get(fieldName), fieldBuilders.get(i));
+ }
+ });
+ return;
+ }
+ if (value instanceof List> listValue) {
+ ((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
+ for (int index = 0; index < fields.size(); index++) {
+ if (index < listValue.size()) {
+ appendTo(fields.get(index).getType(), listValue.get(index), fieldBuilders.get(index));
+ }
+ else {
+ fieldBuilders.get(index).appendNull();
+ }
+ }
+ });
+ return;
+ }
+ }
+ else {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getDisplayName());
+ }
+
+ // not a convertible value
+ output.appendNull();
+ }
+
+ private void appendTo(Type elementType, Object element, BlockBuilder elementBuilder)
+ {
+ appendValue(elementBuilder, elementType, element);
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return finished;
+ }
+
+ @Override
+ public long getMemoryUsage()
+ {
+ return 0;
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ }
+
+ public ConnectorTransactionHandle transaction()
+ {
+ return transaction;
+ }
+
+ public ConnectorSession session()
+ {
+ return session;
+ }
+
+ public ConnectorSplit split()
+ {
+ return split;
+ }
+
+ public ConnectorTableHandle table()
+ {
+ return table;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || obj.getClass() != this.getClass()) {
+ return false;
+ }
+ var that = (CouchbasePageSource) obj;
+ return Objects.equals(this.transaction, that.transaction) && Objects.equals(this.session, that.session) && Objects.equals(this.split, that.split) && Objects.equals(this.table, that.table);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(transaction, session, split, table);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CouchbasePageSource[" + "transaction=" + transaction + ", " + "session=" + session + ", " + "split=" + split + ", " + "table=" + table + ']';
+ }
+
+ @Override
+ public Metrics getMetrics()
+ {
+ return new Metrics(ImmutableMap.of(
+ "rows", new LongCount(total),
+ "duration", new LongCount(duration),
+ "pages", new LongCount(pageCount),
+ "errors", new LongCount(errors)));
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePageSourceProvider.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePageSourceProvider.java
new file mode 100644
index 000000000000..9763df3ae646
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePageSourceProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorPageSource;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.DynamicFilter;
+import jakarta.inject.Inject;
+
+import java.util.List;
+
+public class CouchbasePageSourceProvider
+ implements ConnectorPageSourceProvider
+{
+ private final CouchbaseClient client;
+ private final CouchbaseConfig config;
+
+ @Inject
+ public CouchbasePageSourceProvider(CouchbaseConfig config, CouchbaseClient client)
+ {
+ this.config = config;
+ this.client = client;
+ }
+
+ @Override
+ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, ConnectorTableHandle table, List columns, DynamicFilter dynamicFilter)
+ {
+ return new CouchbasePageSource(
+ client,
+ (CouchbaseTransactionHandle) transaction,
+ session,
+ (CouchbaseSplit) split,
+ (CouchbaseTableHandle) table,
+ columns.stream().map(CouchbaseColumnHandle.class::cast).toList(),
+ dynamicFilter,
+ config.getPageSize());
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePlugin.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePlugin.java
new file mode 100644
index 000000000000..99204c662c57
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbasePlugin.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.ConnectorFactory;
+
+import static java.util.Objects.requireNonNull;
+
+public class CouchbasePlugin
+ implements Plugin
+{
+ private ConnectorFactory connectorFactory;
+
+ public CouchbasePlugin()
+ {
+ connectorFactory = new CouchbaseConnectorFactory();
+ }
+
+ @VisibleForTesting
+ CouchbasePlugin(ConnectorFactory connectorFactory)
+ {
+ connectorFactory = requireNonNull(connectorFactory, "factory is null");
+ }
+
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(connectorFactory);
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseSplit.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseSplit.java
new file mode 100644
index 000000000000..b36c68027ac0
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseSplit.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import io.trino.spi.connector.ConnectorSplit;
+
+public record CouchbaseSplit(
+) implements ConnectorSplit
+{
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseSplitManager.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseSplitManager.java
new file mode 100644
index 000000000000..bd2fff866acd
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseSplitManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.connector.FixedSplitSource;
+import io.trino.spi.function.table.ConnectorTableFunctionHandle;
+
+public class CouchbaseSplitManager
+ implements ConnectorSplitManager
+{
+ @Override
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, DynamicFilter dynamicFilter, Constraint constraint)
+ {
+ return new FixedSplitSource(new CouchbaseSplit());
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function)
+ {
+ return new FixedSplitSource(new CouchbaseSplit());
+ }
+}
diff --git a/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseTableHandle.java b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseTableHandle.java
new file mode 100644
index 000000000000..69b33a7446d7
--- /dev/null
+++ b/plugin/trino-couchbase/src/main/java/io/trino/plugin/couchbase/CouchbaseTableHandle.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.couchbase;
+
+import com.google.common.collect.Streams;
+import io.airlift.slice.Slice;
+import io.trino.plugin.couchbase.translations.TrinoExpressionToCb;
+import io.trino.plugin.couchbase.translations.TrinoToCbType;
+import io.trino.spi.connector.AggregateFunction;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.SortItem;
+import io.trino.spi.connector.SortOrder;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.expression.Variable;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.SortedRangeSet;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.Type;
+import jakarta.validation.constraints.NotNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
+public record CouchbaseTableHandle(String schema, String name, Optional subQuery,
+ List selectClauses, List selectTypes,
+ List selectNames, List whereClauses,
+ TupleDomain constraint,
+ LinkedHashMap orderClauses,
+ Set groupings,
+ AtomicBoolean isAggregated,
+ AtomicLong topNCount) implements ConnectorTableHandle
+{
+ public String path()
+ {
+ return String.format("`%s`.`%s`", schema, name);
+ }
+
+ public static CouchbaseTableHandle fromSchemaAndName(String schema, String name)
+ {
+ return new CouchbaseTableHandle(
+ schema,
+ name,
+ Optional.empty(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ TupleDomain.all(),
+ new LinkedHashMap<>(),
+ new HashSet<>(),
+ new AtomicBoolean(false),
+ new AtomicLong(-1L));
+ }
+
+ public void addSortItems(List sortItems, Map assignments)
+ {
+ CouchbaseTableHandle sq = subQuery.orElse(null);
+ List pushdown = new ArrayList<>();
+ sortItems.forEach(sortItem -> {
+ CouchbaseColumnHandle sourceColumn = (CouchbaseColumnHandle) assignments.get(sortItem.getName());
+ orderClauses.put(transformSortItem(sortItem, assignments), sourceColumn);
+ });
+ if (!pushdown.isEmpty()) {
+ sq.addSortItems(pushdown, assignments);
+ }
+ }
+
+ protected String transformSortItem(SortItem sortItem, Map assignments)
+ {
+ CouchbaseColumnHandle column = (CouchbaseColumnHandle) assignments.get(sortItem.getName());
+ return String.format("%s %s", column.name(), sortItem.getSortOrder().toString());
+ }
+
+ public boolean compareSortItems(List sortItems, Map assignments)
+ {
+ if (this.orderClauses.size() != sortItems.size()) {
+ return false;
+ }
+ return sortItems.stream().map(si -> transformSortItem(si, assignments)).allMatch(orderClauses.keySet()::contains);
+ }
+
+ public boolean hasVariable(String name)
+ {
+ return selectClauses.stream()
+ .anyMatch(c -> {
+ if (c.name() == null) {
+ String text = c.value().text();
+ return c.value().params().isEmpty() &&
+ text.startsWith(name, 1) &&
+ text.length() == name.length() + 2 &&
+ text.endsWith("`");
+ }
+ return c.name().equals(name);
+ });
+ }
+
+ public void setTopNCount(long topNCount)
+ {
+ this.topNCount.set(topNCount);
+ }
+
+ public List addProjections(List projections, Map assignments)
+ {
+ return projections.stream().map(p -> addProjection(p, assignments)).toList();
+ }
+
+ private String addProjection(ConnectorExpression projection, Map assignments)
+ {
+ NamedParametrizedString compiled = compileProjection(projection, assignments);
+ if (!selectClauses.contains(compiled)) {
+ String otherName = findName(compiled.value()).orElse(null);
+ if (otherName == null) {
+ if (compiled.name() == null) {
+ compiled = new NamedParametrizedString(generateColumnName(), compiled.value());
+ } else if (hasVariable(compiled.name())) {
+ return compiled.name();
+ }
+ selectClauses.add(compiled);
+ selectTypes.add(projection.getType());
+ selectNames.add(compiled.name());
+ }
+ else {
+ return otherName;
+ }
+ }
+ return compiled.name();
+ }
+
+ private Optional findName(ParametrizedString value)
+ {
+ return selectClauses.stream().filter(nps -> nps.value().equals(value)).findFirst().map(nps -> nps.name());
+ }
+
+ private NamedParametrizedString compileProjection(ConnectorExpression projection, Map assignments)
+ {
+ if (projection instanceof Variable variable) {
+ ParametrizedString compiled = TrinoExpressionToCb.convert(projection, assignments);
+ return new NamedParametrizedString(variable.getName(), compiled);
+ }
+ else {
+ ParametrizedString compiled = TrinoExpressionToCb.convert(projection, assignments);
+ return new NamedParametrizedString(null, compiled);
+ }
+ }
+
+ public String toSql()
+ {
+ List fromClause = new ArrayList<>();
+ boolean fromSubQuery = false;
+ if (subQuery.isPresent()) {
+ CouchbaseTableHandle sq = subQuery.get();
+ if (this.topNCount.get() < 0 && this.whereClauses().isEmpty() && this.orderClauses.isEmpty() && sq.selectClauses().containsAll(this.selectClauses) && this.groupings.isEmpty()) {
+ return sq.toSql();
+ }
+ if (sq != this && (sq.schema().equals(schema) && sq.name().equals(name))) {
+ fromClause.add(String.format("(%s) `%s`", sq.toSql(), "data"));
+// selectClauses.add(new NamedParametrizedString("data", ParametrizedString.from(String.format("`%s`.*", "data"))));
+ fromSubQuery = true;
+ }
+ }
+ if (fromClause.isEmpty()) {
+ fromClause.add(String.format("`%s`", name));
+ }
+
+ StringBuilder groupByClause = new StringBuilder();
+ if (!groupings.isEmpty()) {
+ groupByClause.append(groupings.stream()
+ .map(CouchbaseColumnHandle::name)
+ .collect(Collectors.joining("`, `", " GROUP BY `", "`"))
+ );
+ }
+
+ StringBuilder orderByClause = new StringBuilder();
+ if (!orderClauses.isEmpty()) {
+ orderByClause.append(String.format(" ORDER BY %s", String.join(", ", orderClauses.keySet())));
+ if (!fromSubQuery) {
+ orderByClause.append(", META().id");
+ }
+ } else if (!fromSubQuery) {
+ orderByClause.append(" ORDER BY META().id");
+ }
+
+ StringBuilder whereClause = new StringBuilder();
+ if (!whereClauses.isEmpty()) {
+ whereClause.append(String.format(" WHERE %s",
+ whereClauses.stream().map(ParametrizedString::toString).collect(joining(" AND "))));
+ }
+
+
+ String query = String.format("SELECT %s FROM %s%s%s%s",
+ selectClauses.isEmpty() ? String.format("`%s`.*", subQuery.isPresent() ? "data" : name()):
+ selectClauses.stream().map(NamedParametrizedString::toString).collect(joining(", ")),
+ String.join(", ", fromClause),
+ whereClause.toString(),
+ groupByClause.toString(),
+ orderByClause.toString());
+
+ if (topNCount.get() > -1) {
+ query = String.format("%s LIMIT %d", query, topNCount.get());
+ }
+
+ return query;
+ }
+
+ private Stream getParametrizedStrings()
+ {
+ return Streams.concat(
+ selectClauses.stream().map(NamedParametrizedString::value),
+ subQuery.stream().flatMap(CouchbaseTableHandle::getParametrizedStrings),
+ whereClauses.stream());
+ }
+
+ public List