diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index 97a366b39290..aab7dbfa00d3 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -313,4 +313,10 @@
+
+
+
+
+
+
diff --git a/docs/src/main/sphinx/connector.md b/docs/src/main/sphinx/connector.md
index a954b30cf059..a4ee85d59d17 100644
--- a/docs/src/main/sphinx/connector.md
+++ b/docs/src/main/sphinx/connector.md
@@ -23,6 +23,7 @@ Hive
Hudi
Iceberg
Ignite
+InfluxDB
JMX
Kafka
Lakehouse
diff --git a/docs/src/main/sphinx/connector/influxdb.md b/docs/src/main/sphinx/connector/influxdb.md
new file mode 100644
index 000000000000..f9e592787020
--- /dev/null
+++ b/docs/src/main/sphinx/connector/influxdb.md
@@ -0,0 +1,124 @@
+# InfluxDB connector
+
+The InfluxDB Connector allows access to [InfluxDB](https://www.influxdata.com/) data from Trino.
+This document describes how to setup the InfluxDB Connector to run SQL queries against InfluxDB.
+
+## Requirements
+
+To connect to InfluxDB, you need:
+
+* InfluxDB v1.8.0 or higher.(v2.x not currently supported)
+* Network access from the Trino coordinator and workers to InfluxDB.
+ Port 8086 is the default port.
+
+## Configuration
+
+To configure the InfluxDB connector, create a catalog properties file
+`etc/catalog/example.properties` with the following contents,
+replacing the properties as appropriate:
+
+```properties
+connector.name=influxdb
+influx.endpoint=http://localhost:8086
+influx.username=username
+influx.password=password
+```
+
+### Configuration properties
+
+The following configuration properties are available:
+
+| Property name | Description | Default |
+|---------------|-------------|---------|
+| `influx.endpoint` | Endpoint of the InfluxDB server to connect to. This property is required. | |
+| `influx.username` | User name to use to connect to InfluxDB. | |
+| `influx.password` | Password to use to connect to InfluxDB. | |
+| `influx.connect-timeout` | The socket connect timeout to InfluxDB server. | `10s` |
+| `influx.write-timeout` | The socket write timeout to InfluxDB server. | `10s` |
+| `influx.read-timeout` | The socket read timeout to InfluxDB server. | `60s` |
+
+(influxdb-type-mapping)=
+
+## Type mapping
+
+Because Trino and InfluxDB each support types that the other does not, this
+connector [maps some types](type-mapping-overview) when reading data.
+
+### InfluxDB type to Trino type mapping
+
+The connector maps InfluxDB types to the corresponding Trino types
+according to the following table:
+
+| InfluxDB type | Trino type | Notes |
+|---------------|------------|-------|
+| `TIMESTAMP` | `TIMESTAMP` | `Timestamp` key `time` is unix nanosecond timestamp in influxdb. see [data types](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types). |
+| `BOOLEAN` | `BOOLEAN` | |
+| `FLOAT` | `DOUBLE` | `FLOAT` is 64-bit floating-point numbers in influxdb. see [data types](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types). |
+| `INTEGER` | `BIGINT` | `INTEGER` is 64-bit integers in influxdb. see [data types](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/#data-types). |
+| `STRING` | `VARCHAR` | |
+
+No other types are supported.
+
+## SQL support
+
+The connector provides [globally available](sql-globally-available) and
+[read operation](sql-read-operations) statements to access data and
+metadata in the InfluxDB catalog.
+
+Creation and deletion of schemas are supported.
+As InfluxDB does not support creation of empty tables, creation of tables is not supported but deletion of tables is supported.
+
+(influxdb-pushdown)=
+
+### Pushdown support
+
+The connector supports pushdown for a number of operations:
+
+* [limit-pushdown](limit-pushdown)
+* [projection-pushdown](projection-pushdown)
+* [predicate-pushdown](predicate-pushdown)
+
+But there are some special limitations for predicate-pushdown:
+
+To understand easily, think a measurement "student" in influxdb
+
+```text
+SHOW TAG KEYS:
+tagKey
+------
+grade
+class
+
+SHOW FIELD KEYS:
+fieldKey fieldType
+-------- ---------
+name string
+age integer
+score float
+```
+
+Predicate pushdown of keys of `STRING`, `BOOLEAN`, `INTEGER` and `FLOAT` types are supported:
+
+For `TIMESTAMP` key, supports equality predicates `=`, range predicates, such as `>`, `<`.
+
+For keys of `STRING` or `BOOLEAN` type (both tag set and field keys in InfluxDB), supports equality predicates `=` only.
+
+For keys of `INTEGER` or `FLOAT` type (field keys in InfluxDB), supports
+equality predicates `=`, inequality predicates `!=`, and range predicates, such as `>`, `<`, or `BETWEEN`.
+
+```{note}
+Decimal integer literals `int_lit = ("1"…"9"){digit}` pushdown works, hexadecimal and octal literals are not currently supported.
+Floating-point literals `float_lit = int_lit"."int_lit` pushdown works, exponents are not currently supported.
+```
+
+```sql
+-- Not pushed down
+SELECT * FROM student WHERE name > 'CLOUD';
+SELECT * FROM student WHERE score <> 1.0E2;
+-- Pushed down
+SELECT * FROM student WHERE name = 'CLOUD';
+SELECT * FROM student WHERE name != 'CLOUD';
+SELECT * FROM student WHERE age > 12 and age < 14;
+SELECT * FROM student WHERE score > 80.0 and score != 100;
+SELECT * FROM student WHERE time >= timestamp '2020-01-01 00:00:00' and time <= timestamp '2020-01-01 23:59:59';
+```
diff --git a/plugin/trino-influxdb/pom.xml b/plugin/trino-influxdb/pom.xml
new file mode 100644
index 000000000000..e30c68ff3065
--- /dev/null
+++ b/plugin/trino-influxdb/pom.xml
@@ -0,0 +1,251 @@
+
+
+ 4.0.0
+
+ io.trino
+ trino-root
+ 477-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-influxdb
+ trino-plugin
+ Trino - InfluxDB Connector
+
+
+ true
+ ${project.parent.basedir}
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+ classes
+
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+
+
+
+ com.squareup.okhttp3
+ okhttp-jvm
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ units
+
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+
+ jakarta.validation
+ jakarta.validation-api
+
+
+
+ org.influxdb
+ influxdb-java
+ 2.23
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+
+
+ com.squareup.okhttp3
+ okhttp
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ runtime
+
+
+
+ io.airlift
+ node
+ runtime
+
+
+
+ io.airlift
+ configuration-testing
+ test
+
+
+
+ io.airlift
+ http-server
+ test
+
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ io.trino
+ trino-client
+ test
+
+
+
+ io.trino
+ trino-main
+ test
+
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-spi
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-testing
+ test
+
+
+
+ io.trino
+ trino-testing-services
+ test
+
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
+ org.testcontainers
+ influxdb
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+
+
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxClient.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxClient.java
new file mode 100644
index 000000000000..aa8e0c909697
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxClient.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import io.trino.spi.connector.SchemaTableName;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface InfluxClient
+{
+ InfluxRecord execute(Query query);
+
+ List listSchemaNames();
+
+ List getSchemaTableNames(String schemaName);
+
+ Optional findTableHandle(String schemaName, String tableName);
+
+ void dropTable(String schemaName, String tableName);
+
+ QueryResult createSchema(String schemaName);
+
+ void dropSchema(String schemaName);
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxColumnHandle.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxColumnHandle.java
new file mode 100644
index 000000000000..334ef302b8c0
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxColumnHandle.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.type.Type;
+
+import static io.trino.plugin.influxdb.InfluxConstant.ColumnKind;
+import static java.util.Objects.requireNonNull;
+
+public record InfluxColumnHandle(
+ String columnName,
+ Type columnType,
+ ColumnKind columnKind)
+ implements ColumnHandle
+{
+ public InfluxColumnHandle
+ {
+ requireNonNull(columnName, "columnName is null");
+ requireNonNull(columnType, "columnType is null");
+ requireNonNull(columnKind, "columnKind is null");
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConfig.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConfig.java
new file mode 100644
index 000000000000..89ce17c179c4
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import io.airlift.units.Duration;
+import io.airlift.units.MinDuration;
+import jakarta.validation.constraints.AssertFalse;
+import jakarta.validation.constraints.AssertTrue;
+import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.Pattern;
+
+import java.net.URI;
+import java.util.Optional;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class InfluxConfig
+{
+ private String endpoint;
+ private String username;
+ private String password;
+ private boolean insecureAllowed;
+ private Duration connectTimeOut = new Duration(10, SECONDS);
+ private Duration readTimeOut = new Duration(1, MINUTES);
+
+ @NotNull
+ @Pattern(message = "Invalid endpoint. Expected http:// or https://", regexp = "^https?://.*")
+ public String getEndpoint()
+ {
+ return endpoint;
+ }
+
+ @Config("influx.endpoint")
+ public InfluxConfig setEndpoint(String endpoint)
+ {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ @NotNull
+ public Optional getUsername()
+ {
+ return Optional.ofNullable(username);
+ }
+
+ @Config("influx.username")
+ public InfluxConfig setUsername(String username)
+ {
+ this.username = username;
+ return this;
+ }
+
+ @NotNull
+ public Optional getPassword()
+ {
+ return Optional.ofNullable(password);
+ }
+
+ @ConfigSecuritySensitive
+ @Config("influx.password")
+ public InfluxConfig setPassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+
+ public boolean getInsecureAllowed()
+ {
+ return insecureAllowed;
+ }
+
+ @Config("influx.insecure-allowed")
+ public InfluxConfig setInsecureAllowed(boolean insecureAllowed)
+ {
+ this.insecureAllowed = insecureAllowed;
+ return this;
+ }
+
+ @MinDuration("0s")
+ public Duration getConnectTimeOut()
+ {
+ return connectTimeOut;
+ }
+
+ @Config("influx.connect-timeout")
+ public InfluxConfig setConnectTimeOut(Duration connectTimeOut)
+ {
+ this.connectTimeOut = connectTimeOut;
+ return this;
+ }
+
+ @MinDuration("0s")
+ public Duration getReadTimeOut()
+ {
+ return readTimeOut;
+ }
+
+ @Config("influx.read-timeout")
+ public InfluxConfig setReadTimeOut(Duration readTimeOut)
+ {
+ this.readTimeOut = readTimeOut;
+ return this;
+ }
+
+ @AssertFalse(message = "'influx.username' and 'influx.password' must be both empty or both non-empty.")
+ public boolean isValidPasswordConfig()
+ {
+ return (username == null) ^ (password == null);
+ }
+
+ @AssertTrue(message = "Credentials require HTTPS. Set 'influx.insecure-allowed=true' to permit HTTP.")
+ public boolean isSecureWhenCredentialsConfigured()
+ {
+ boolean hasCreds = (username != null) || (password != null);
+ if (!hasCreds || endpoint == null) {
+ return true;
+ }
+ String scheme = URI.create(endpoint).getScheme();
+ return insecureAllowed || "https".equalsIgnoreCase(scheme);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnector.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnector.java
new file mode 100644
index 000000000000..8107dc2035da
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.inject.Inject;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.transaction.IsolationLevel;
+
+import static io.trino.plugin.influxdb.InfluxTransactionHandle.INSTANCE;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxConnector
+ implements Connector
+{
+ private final LifeCycleManager lifeCycleManager;
+ private final InfluxMetadata metadata;
+ private final InfluxSplitManager splitManager;
+ private final InfluxRecordSetProvider recordSetProvider;
+
+ @Inject
+ public InfluxConnector(
+ LifeCycleManager lifeCycleManager,
+ InfluxMetadata metadata,
+ InfluxSplitManager splitManager,
+ InfluxRecordSetProvider recordSetProvider)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ return INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorRecordSetProvider getRecordSetProvider()
+ {
+ return recordSetProvider;
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ lifeCycleManager.stop();
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnectorFactory.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnectorFactory.java
new file mode 100644
index 000000000000..72412de9c46d
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnectorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.trino.plugin.base.TypeDeserializerModule;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.util.Map;
+
+import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxConnectorFactory
+ implements ConnectorFactory
+{
+ @Override
+ public String getName()
+ {
+ return "influxdb";
+ }
+
+ @Override
+ public Connector create(String catalogName, Map requiredConfig, ConnectorContext context)
+ {
+ requireNonNull(requiredConfig, "requiredConfig is null");
+ checkStrictSpiVersionMatch(context, this);
+
+ // A plugin is not required to use Guice; it is just very convenient
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new TypeDeserializerModule(context.getTypeManager()),
+ new InfluxModule());
+
+ Injector injector = app
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(requiredConfig)
+ .initialize();
+
+ return injector.getInstance(InfluxConnector.class);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConstant.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConstant.java
new file mode 100644
index 000000000000..43fe3a3629a7
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConstant.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+public class InfluxConstant
+{
+ public enum ColumnName
+ {
+ TIME("time");
+
+ private final String name;
+
+ ColumnName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+ }
+
+ public enum ColumnKind
+ {
+ TIME,
+ TAG,
+ FIELD,
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxMetadata.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxMetadata.java
new file mode 100644
index 000000000000..ff4be454d8e5
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxMetadata.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableVersion;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
+import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationColumnsMetadata;
+import io.trino.spi.connector.SchemaNotFoundException;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.EquatableValueSet;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.SortedRangeSet;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.type.TimestampType;
+import jakarta.annotation.Nullable;
+import org.influxdb.dto.QueryResult;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.influxdb.TypeUtils.isPushdownSupportedType;
+import static io.trino.spi.StandardErrorCode.CATALOG_STORE_ERROR;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
+import static io.trino.spi.connector.RelationColumnsMetadata.forTable;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxMetadata
+ implements ConnectorMetadata
+{
+ private final InfluxClient client;
+
+ @Inject
+ public InfluxMetadata(InfluxClient client)
+ {
+ this.client = requireNonNull(client, "client is null");
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return ImmutableList.copyOf(client.listSchemaNames());
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, Optional optionalSchemaName)
+ {
+ Set schemaNames = optionalSchemaName.map(ImmutableSet::of)
+ .orElseGet(() -> ImmutableSet.copyOf(client.listSchemaNames()));
+
+ return schemaNames.stream()
+ .flatMap(schemaName -> client.getSchemaTableNames(schemaName).stream())
+ .collect(toImmutableList());
+ }
+
+ @Nullable
+ @Override
+ public ConnectorTableHandle getTableHandle(
+ ConnectorSession session,
+ SchemaTableName schemaTableName,
+ Optional startVersion,
+ Optional endVersion)
+ {
+ if (startVersion.isPresent() || endVersion.isPresent()) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
+ }
+
+ return getTableHandle(session, schemaTableName);
+ }
+
+ @Nullable
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ return client.findTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName())
+ .map(ignored -> InfluxTableHandle.of(schemaTableName.getSchemaName(), schemaTableName.getTableName())).orElse(null);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return getTableMetadata(((InfluxTableHandle) tableHandle).toSchemaTableName())
+ .orElseThrow(() -> new RuntimeException("The table handle is invalid " + tableHandle));
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ InfluxTableHandle influxTableHandle = (InfluxTableHandle) tableHandle;
+
+ InfluxTableHandle table = client.findTableHandle(influxTableHandle.schemaName(), influxTableHandle.tableName())
+ .orElseThrow(() -> new TableNotFoundException(influxTableHandle.toSchemaTableName()));
+
+ ImmutableMap.Builder columnHandles =
+ ImmutableMap.builderWithExpectedSize(table.columns().size());
+ for (InfluxColumnHandle column : table.columns()) {
+ columnHandles.put(column.columnName(), column);
+ }
+ return columnHandles.buildOrThrow();
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ InfluxColumnHandle influxColumnHandle = (InfluxColumnHandle) columnHandle;
+ return new ColumnMetadata(influxColumnHandle.columnName(), influxColumnHandle.columnType());
+ }
+
+ @Override
+ public Iterator streamRelationColumns(ConnectorSession session, Optional schemaName, UnaryOperator> relationFilter)
+ {
+ Map relationColumns = new HashMap<>();
+
+ SchemaTablePrefix prefix = schemaName.map(SchemaTablePrefix::new)
+ .orElseGet(SchemaTablePrefix::new);
+
+ for (SchemaTableName tableName : listTables(session, prefix.getSchema())) {
+ Optional tableMetadata = getTableMetadata(tableName);
+ if (tableMetadata.isPresent()) {
+ relationColumns.put(tableName, forTable(tableName, tableMetadata.get().getColumns()));
+ }
+ }
+
+ return relationFilter.apply(relationColumns.keySet()).stream()
+ .map(relationColumns::get)
+ .iterator();
+ }
+
+ @Override
+ public Optional> applyLimit(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ long limit)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ // InfluxQL Limit 0 is equivalent to setting no limit
+ if (limit == 0) {
+ return Optional.empty();
+ }
+ // InfluxQL doesn't support limit number greater than integer max
+ if (limit > Integer.MAX_VALUE) {
+ return Optional.empty();
+ }
+ if (tableHandle.limit().isPresent() && tableHandle.limit().getAsInt() <= limit) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new LimitApplicationResult<>(
+ tableHandle.withLimit(OptionalInt.of(toIntExact(limit))),
+ true,
+ false));
+ }
+
+ @Override
+ public void dropTable(ConnectorSession session, ConnectorTableHandle handle)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ if (client.findTableHandle(tableHandle.schemaName(), tableHandle.tableName()).isEmpty()) {
+ throw new TableNotFoundException(tableHandle.toSchemaTableName());
+ }
+
+ client.dropTable(tableHandle.schemaName(), tableHandle.tableName());
+ }
+
+ @Override
+ public void dropSchema(ConnectorSession session, String schemaName, boolean cascade)
+ {
+ if (!client.listSchemaNames().contains(schemaName)) {
+ throw new SchemaNotFoundException(schemaName);
+ }
+
+ if (!cascade) {
+ boolean isEmpty = listTables(session, Optional.of(schemaName)).isEmpty();
+ if (!isEmpty) {
+ throw new TrinoException(SCHEMA_NOT_EMPTY, "Cannot drop non-empty schema '%s'".formatted(schemaName));
+ }
+ }
+ client.dropSchema(schemaName);
+ }
+
+ @Override
+ public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner)
+ {
+ QueryResult result = client.createSchema(schemaName);
+ if (!result.getResults().getFirst().hasError()) {
+ return;
+ }
+ throw new TrinoException(CATALOG_STORE_ERROR, "Failed to create schema: " + result.getResults().getFirst().getError());
+ }
+
+ @Override
+ public Optional> applyFilter(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ Constraint constraint)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ TupleDomain oldDomain = tableHandle.constraint();
+ TupleDomain newDomain = oldDomain.intersect(constraint.getSummary());
+ TupleDomain remainingFilter;
+ if (newDomain.isNone()) {
+ remainingFilter = TupleDomain.all();
+ }
+ else {
+ Map domains = newDomain.getDomains().orElseThrow();
+ Map supported = new HashMap<>();
+ Map unsupported = new HashMap<>();
+ domains.forEach((key, domain) -> {
+ if (isPushdownSupportedType(((InfluxColumnHandle) key).columnType())
+ && isPushdownSupportedDomain(domain)) {
+ supported.put(key, domain);
+ }
+ else {
+ unsupported.put(key, domain);
+ }
+ });
+ newDomain = TupleDomain.withColumnDomains(supported);
+ remainingFilter = TupleDomain.withColumnDomains(unsupported);
+ }
+
+ if (oldDomain.equals(newDomain)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ tableHandle.withConstraint(newDomain),
+ remainingFilter,
+ constraint.getExpression(),
+ false));
+ }
+
+ @Override
+ public Optional> applyProjection(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ List projections,
+ Map assignments)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ List oldProjections = ((InfluxTableHandle) handle).projections();
+ List newProjections = ImmutableList.copyOf(assignments.values());
+ if (oldProjections.equals(newProjections)) {
+ return Optional.empty();
+ }
+
+ List assignmentsList = assignments.entrySet().stream()
+ .map(assignment -> new Assignment(
+ assignment.getKey(),
+ assignment.getValue(),
+ ((InfluxColumnHandle) assignment.getValue()).columnType()))
+ .collect(toImmutableList());
+ return Optional.of(new ProjectionApplicationResult<>(
+ tableHandle.withProjections(newProjections),
+ projections,
+ assignmentsList,
+ false));
+ }
+
+ private Optional getTableMetadata(SchemaTableName schemaTableName)
+ {
+ Optional tableHandle = client.findTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+
+ return tableHandle.map(table -> {
+ ImmutableList.Builder columnMetadataBuilder =
+ ImmutableList.builderWithExpectedSize(table.columns().size());
+ List columns = table.columns();
+ for (InfluxColumnHandle column : columns) {
+ columnMetadataBuilder.add(new ColumnMetadata(column.columnName(), column.columnType()));
+ }
+ return new ConnectorTableMetadata(schemaTableName, columnMetadataBuilder.build());
+ });
+ }
+
+ private static boolean isPushdownSupportedDomain(Domain domain)
+ {
+ if (domain.getValues() instanceof SortedRangeSet rangeSet) {
+ if (rangeSet.getOrderedRanges().isEmpty()) {
+ return false;
+ }
+ List ranges = rangeSet.getOrderedRanges();
+ return isRangeSupportsTimestamp(ranges) ||
+ isRangeSupportsBoolean(ranges) ||
+ isRangeSupportsNumber(ranges) ||
+ isRangeSupportsVarchar(ranges);
+ }
+ if (domain.getValues() instanceof EquatableValueSet valueSet) {
+ return !valueSet.getDiscreteSet().isEmpty();
+ }
+ return false;
+ }
+
+ private static boolean isRangeSupportsVarchar(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() == VARCHAR) &&
+ ranges.stream().anyMatch(Range::isSingleValue);
+ }
+
+ private static boolean isRangeSupportsBoolean(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() == BOOLEAN) &&
+ ranges.stream().anyMatch(Range::isSingleValue);
+ }
+
+ private static boolean isRangeSupportsNumber(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() == DOUBLE || range.getType() == BIGINT);
+ }
+
+ private static boolean isRangeSupportsTimestamp(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() instanceof TimestampType);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxModule.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxModule.java
new file mode 100644
index 000000000000..d55bb1fd2ed3
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxModule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+
+import static com.google.inject.Scopes.SINGLETON;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class InfluxModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ configBinder(binder).bindConfig(InfluxConfig.class);
+
+ binder.bind(InfluxClient.class).to(NativeInfluxClient.class).in(SINGLETON);
+ binder.bind(NativeInfluxClient.class).in(SINGLETON);
+ binder.bind(InfluxMetadata.class).in(SINGLETON);
+ binder.bind(InfluxSplitManager.class).in(SINGLETON);
+ binder.bind(InfluxRecordSetProvider.class).in(SINGLETON);
+ binder.bind(InfluxConnector.class).in(SINGLETON);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxPlugin.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxPlugin.java
new file mode 100644
index 000000000000..800c0754389b
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxPlugin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.influxdb;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.ConnectorFactory;
+
+public class InfluxPlugin
+ implements Plugin
+{
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(new InfluxConnectorFactory());
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecord.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecord.java
new file mode 100644
index 000000000000..6bd6cb61c222
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecord.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import java.util.List;
+
+public record InfluxRecord(List columns, List> values)
+{
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecordCursor.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecordCursor.java
new file mode 100644
index 000000000000..0a06b746969a
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecordCursor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import io.airlift.slice.Slice;
+import io.trino.spi.connector.RecordCursor;
+import io.trino.spi.type.Type;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.influxdb.InfluxConstant.ColumnName.TIME;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+public class InfluxRecordCursor
+ implements RecordCursor
+{
+ private long totalBytes;
+ private final List columnHandles;
+ private final Iterator> sourceDataIterator;
+ private List