diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index 8fed2ae3937d..8fe719a3d48a 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -380,4 +380,10 @@
+
+
+
+
+
+
diff --git a/docs/src/main/sphinx/connector.md b/docs/src/main/sphinx/connector.md
index 9c6259eb632f..289b18655df0 100644
--- a/docs/src/main/sphinx/connector.md
+++ b/docs/src/main/sphinx/connector.md
@@ -23,6 +23,7 @@ Hive
Hudi
Iceberg
Ignite
+InfluxDB
JMX
Kafka
Kudu
diff --git a/docs/src/main/sphinx/connector/influxdb.rst b/docs/src/main/sphinx/connector/influxdb.rst
new file mode 100644
index 000000000000..c7ad8e59fe37
--- /dev/null
+++ b/docs/src/main/sphinx/connector/influxdb.rst
@@ -0,0 +1,167 @@
+=======================
+InfluxDB connector
+=======================
+
+The InfluxDB Connector allows access to `InfluxDB `_ data from Trino.
+This document describes how to setup the InfluxDB Connector to run SQL queries against InfluxDB.
+
+Requirements
+------------
+
+To connect to InfluxDB, you need:
+
+* InfluxDB v1.8.0 or higher.(v2.x not currently supported)
+* Network access from the Trino coordinator and workers to InfluxDB.
+ Port 8086 is the default port.
+
+Configuration
+-------------
+
+To configure the InfluxDB connector, create a catalog properties file
+``etc/catalog/example.properties`` with the following contents,
+replacing the properties as appropriate:
+
+.. code-block:: properties
+
+ connector.name=influxdb
+ influx.endpoint=http://localhost:8086
+ influx.username=username
+ influx.password=password
+
+Configuration properties
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. list-table:: The following configuration properties are available
+ :widths: 35, 55, 10
+ :header-rows: 1
+
+ * - Property name
+ - Description
+ - Default
+ * - ``influx.endpoint``
+ - Endpoint of the InfluxDB server to connect to. This property is required.
+ -
+ * - ``influx.username``
+ - User name to use to connect to InfluxDB.
+ -
+ * - ``influx.password``
+ - Password to use to connect to InfluxDB.
+ -
+ * - ``influx.connect-timeout``
+ - The socket connect timeout to InfluxDB server.
+ - ``10s``
+ * - ``influx.write-timeout``
+ - The socket write timeout to InfluxDB server.
+ - ``10s``
+ * - ``influx.read-timeout``
+ - The socket read timeout to InfluxDB server.
+ - ``60s``
+
+.. _influxdb-type-mapping:
+
+Type mapping
+------------
+
+Because Trino and InfluxDB each support types that the other does not, this
+connector :ref:`maps some types ` when reading data.
+
+
+InfluxDB type to Trino type mapping
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The connector maps InfluxDB types to the corresponding Trino types
+according to the following table:
+
+.. list-table:: InfluxDB type to Trino type mapping
+ :widths: 30, 30, 50
+ :header-rows: 1
+
+ * - InfluxDB type
+ - Trino type
+ - Notes
+ * - ``TIMESTAMP``
+ - ``TIMESTAMP``
+ - ``Timestamp`` key ``time`` is unix nanosecond timestamp in influxdb. see `data types `_.
+ * - ``BOOLEAN``
+ - ``BOOLEAN``
+ -
+ * - ``FLOAT``
+ - ``DOUBLE``
+ - ``FLOAT`` is 64-bit floating-point numbers in influxdb. see `data types `_.
+ * - ``INTEGER``
+ - ``BIGINT``
+ - ``INTEGER`` is 64-bit integers in influxdb. see `data types `_.
+ * - ``STRING``
+ - ``VARCHAR``
+ -
+
+No other types are supported.
+
+
+SQL support
+-----------
+
+The connector provides :ref:`globally available ` and
+:ref:`read operation ` statements to access data and
+metadata in the InfluxDB catalog.
+
+Creation and deletion of schemas are supported.
+As InfluxDB does not support creation of empty tables, creation of tables is not supported but deletion of tables is supported.
+
+.. _influxdb-pushdown:
+
+Pushdown support
+^^^^^^^^^^^^^^^^
+
+The connector supports pushdown for a number of operations:
+
+* :ref:`limit-pushdown`
+
+* :ref:`projection-pushdown`
+
+* :ref:`predicate-pushdown`
+
+But there are some special limitations for predicate-pushdown:
+
+To understand easily, think a measurement "student" in influxdb
+
+.. code-block:: text
+
+ SHOW TAG KEYS:
+ tagKey
+ ------
+ grade
+ class
+
+ SHOW FIELD KEYS:
+ fieldKey fieldType
+ -------- ---------
+ name string
+ age integer
+ score float
+
+
+Predicate pushdown of keys of ``STRING``, ``BOOLEAN``, ``INTEGER`` and ``FLOAT`` types are supported:
+
+For ``TIMESTAMP`` key, supports equality predicates ``=``, range predicates, such as ``>``, ``<``.
+
+For keys of ``STRING`` or ``BOOLEAN`` type (both tag set and field keys in InfluxDB), supports equality predicates ``=`` only.
+
+For keys of ``INTEGER`` or ``FLOAT`` type (field keys in InfluxDB), supports
+equality predicates ``=``, inequality predicates ``!=``, and range predicates, such as ``>``, ``<``, or ``BETWEEN``.
+
+.. note::
+ Decimal integer literals ``int_lit = ("1"…"9"){digit}`` pushdown works, hexadecimal and octal literals are not currently supported.
+ Floating-point literals ``float_lit = int_lit"."int_lit`` pushdown works, exponents are not currently supported.
+
+.. code-block:: sql
+
+ -- Not pushed down
+ SELECT * FROM student WHERE name > 'CLOUD';
+ SELECT * FROM student WHERE score <> 1.0E2;
+ -- Pushed down
+ SELECT * FROM student WHERE name = 'CLOUD';
+ SELECT * FROM student WHERE name != 'CLOUD';
+ SELECT * FROM student WHERE age > 12 and age < 14;
+ SELECT * FROM student WHERE score > 80.0 and score != 100;
+ SELECT * FROM student WHERE time >= timestamp '2020-01-01 00:00:00' and time <= timestamp '2020-01-01 23:59:59';
diff --git a/plugin/trino-influxdb/pom.xml b/plugin/trino-influxdb/pom.xml
new file mode 100644
index 000000000000..fb85394bc53d
--- /dev/null
+++ b/plugin/trino-influxdb/pom.xml
@@ -0,0 +1,215 @@
+
+
+ 4.0.0
+
+ io.trino
+ trino-root
+ 471-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-influxdb
+ trino-plugin
+ Trino - InfluxDB Connector
+
+
+ ${project.parent.basedir}
+
+
+
+
+
+ com.google.guava
+ guava
+
+
+ com.google.inject
+ guice
+
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+
+
+ com.squareup.okhttp3
+ okhttp
+
+
+ io.airlift
+ bootstrap
+
+
+ io.airlift
+ configuration
+
+
+ io.airlift
+ json
+
+
+ io.airlift
+ log
+
+
+ io.airlift
+ units
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+ jakarta.validation
+ jakarta.validation-api
+
+
+
+ org.influxdb
+ influxdb-java
+ 2.23
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+
+
+ com.squareup.okhttp3
+ okhttp
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ runtime
+
+
+ io.airlift
+ node
+ runtime
+
+
+ io.airlift
+ http-server
+ test
+
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+ io.airlift
+ testing
+ test
+
+
+ io.trino
+ trino-client
+ test
+
+
+ io.trino
+ trino-main
+ test
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+ io.trino
+ trino-spi
+ test-jar
+ test
+
+
+ io.trino
+ trino-testing
+ test
+
+
+ io.trino
+ trino-testing-services
+ test
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.11.3
+ test
+
+
+ org.testcontainers
+ influxdb
+ test
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testng
+ testng
+ test
+
+
+
+
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxClient.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxClient.java
new file mode 100644
index 000000000000..f68e2054be74
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxClient.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import io.trino.spi.connector.SchemaTableName;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface InfluxClient
+{
+ InfluxRecord query(Query query);
+
+ List getSchemaNames();
+
+ List getSchemaTableNames(String schemaName);
+
+ Optional getTableHandle(String schemaName, String tableName);
+
+ void dropTable(String schemaName, String tableName);
+
+ QueryResult createDatabase(String databaseName);
+
+ void dropDatabase(String databaseName);
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxColumnHandle.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxColumnHandle.java
new file mode 100644
index 000000000000..988455a7c08b
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxColumnHandle.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.type.Type;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static io.trino.plugin.influxdb.InfluxConstant.ColumnKind;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxColumnHandle
+ implements ColumnHandle
+{
+ private final String name;
+ private final Type type;
+ private final ColumnKind kind;
+
+ @JsonCreator
+ public InfluxColumnHandle(
+ @JsonProperty("columnName") String name,
+ @JsonProperty("columnType") Type type,
+ @JsonProperty("columnKind") ColumnKind kind)
+ {
+ this.name = requireNonNull(name, "columnName is null");
+ this.type = requireNonNull(type, "columnType is null");
+ this.kind = requireNonNull(kind, "columnKind is null");
+ }
+
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public Type getType()
+ {
+ return type;
+ }
+
+ @JsonProperty
+ public ColumnKind getKind()
+ {
+ return kind;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, type, kind);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+
+ InfluxColumnHandle other = (InfluxColumnHandle) obj;
+ return Objects.equals(this.name, other.name) &&
+ Objects.equals(this.type, other.type) &&
+ Objects.equals(this.kind, other.kind);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("name", name)
+ .add("type", type)
+ .add("kind", kind)
+ .toString();
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConfig.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConfig.java
new file mode 100644
index 000000000000..8a688c889c13
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import io.airlift.units.Duration;
+import io.airlift.units.MinDuration;
+import jakarta.validation.constraints.AssertFalse;
+import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.Pattern;
+
+import java.util.Optional;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class InfluxConfig
+{
+ private String endpoint;
+ private String username;
+ private String password;
+ private Duration connectTimeOut = new Duration(10, SECONDS);
+ private Duration readTimeOut = new Duration(60, SECONDS);
+
+ @NotNull
+ @Pattern(message = "Invalid endpoint. Expected http:// ", regexp = "^http://.*")
+ public String getEndpoint()
+ {
+ return endpoint;
+ }
+
+ @Config("influx.endpoint")
+ public InfluxConfig setEndpoint(String endpoint)
+ {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ @NotNull
+ public Optional getUsername()
+ {
+ return Optional.ofNullable(username);
+ }
+
+ @Config("influx.username")
+ public InfluxConfig setUsername(String username)
+ {
+ this.username = username;
+ return this;
+ }
+
+ @NotNull
+ public Optional getPassword()
+ {
+ return Optional.ofNullable(password);
+ }
+
+ @ConfigSecuritySensitive
+ @Config("influx.password")
+ public InfluxConfig setPassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+
+ @MinDuration("0s")
+ public Duration getConnectTimeOut()
+ {
+ return connectTimeOut;
+ }
+
+ @Config("influx.connect-timeout")
+ public InfluxConfig setConnectTimeOut(Duration connectTimeOut)
+ {
+ this.connectTimeOut = connectTimeOut;
+ return this;
+ }
+
+ @MinDuration("0s")
+ public Duration getReadTimeOut()
+ {
+ return readTimeOut;
+ }
+
+ @Config("influx.read-timeout")
+ public InfluxConfig setReadTimeOut(Duration readTimeOut)
+ {
+ this.readTimeOut = readTimeOut;
+ return this;
+ }
+
+ @AssertFalse(message = "'influx.username' and 'influx.password' must be both empty or both non-empty.")
+ public boolean isValidPasswordConfig()
+ {
+ return (username == null) ^ (password == null);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnector.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnector.java
new file mode 100644
index 000000000000..8107dc2035da
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.inject.Inject;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.transaction.IsolationLevel;
+
+import static io.trino.plugin.influxdb.InfluxTransactionHandle.INSTANCE;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxConnector
+ implements Connector
+{
+ private final LifeCycleManager lifeCycleManager;
+ private final InfluxMetadata metadata;
+ private final InfluxSplitManager splitManager;
+ private final InfluxRecordSetProvider recordSetProvider;
+
+ @Inject
+ public InfluxConnector(
+ LifeCycleManager lifeCycleManager,
+ InfluxMetadata metadata,
+ InfluxSplitManager splitManager,
+ InfluxRecordSetProvider recordSetProvider)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ return INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorRecordSetProvider getRecordSetProvider()
+ {
+ return recordSetProvider;
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ lifeCycleManager.stop();
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnectorFactory.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnectorFactory.java
new file mode 100644
index 000000000000..72412de9c46d
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConnectorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.trino.plugin.base.TypeDeserializerModule;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+
+import java.util.Map;
+
+import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxConnectorFactory
+ implements ConnectorFactory
+{
+ @Override
+ public String getName()
+ {
+ return "influxdb";
+ }
+
+ @Override
+ public Connector create(String catalogName, Map requiredConfig, ConnectorContext context)
+ {
+ requireNonNull(requiredConfig, "requiredConfig is null");
+ checkStrictSpiVersionMatch(context, this);
+
+ // A plugin is not required to use Guice; it is just very convenient
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new TypeDeserializerModule(context.getTypeManager()),
+ new InfluxModule());
+
+ Injector injector = app
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(requiredConfig)
+ .initialize();
+
+ return injector.getInstance(InfluxConnector.class);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConstant.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConstant.java
new file mode 100644
index 000000000000..43fe3a3629a7
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxConstant.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+public class InfluxConstant
+{
+ public enum ColumnName
+ {
+ TIME("time");
+
+ private final String name;
+
+ ColumnName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+ }
+
+ public enum ColumnKind
+ {
+ TIME,
+ TAG,
+ FIELD,
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxMetadata.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxMetadata.java
new file mode 100644
index 000000000000..f02256ae78f7
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxMetadata.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.Assignment;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableVersion;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.LimitApplicationResult;
+import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.SchemaNotFoundException;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableColumnsMetadata;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.expression.ConnectorExpression;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.EquatableValueSet;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.SortedRangeSet;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.type.TimestampType;
+import jakarta.annotation.Nullable;
+import org.influxdb.dto.QueryResult;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.influxdb.TypeUtils.isPushdownSupportedType;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+public class InfluxMetadata
+ implements ConnectorMetadata
+{
+ private final InfluxClient client;
+
+ @Inject
+ public InfluxMetadata(InfluxClient client)
+ {
+ this.client = requireNonNull(client, "client is null");
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return ImmutableList.copyOf(client.getSchemaNames());
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, Optional optionalSchemaName)
+ {
+ Set schemaNames = optionalSchemaName.map(ImmutableSet::of)
+ .orElseGet(() -> ImmutableSet.copyOf(client.getSchemaNames()));
+
+ return schemaNames.stream()
+ .flatMap(schemaName -> client.getSchemaTableNames(schemaName).stream())
+ .collect(toImmutableList());
+ }
+
+ @Nullable
+ @Override
+ public ConnectorTableHandle getTableHandle(
+ ConnectorSession session,
+ SchemaTableName schemaTableName,
+ Optional startVersion,
+ Optional endVersion)
+ {
+ if (startVersion.isPresent() || endVersion.isPresent()) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
+ }
+
+ return getTableHandle(session, schemaTableName);
+ }
+
+ @Nullable
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ return client.getTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName())
+ .map(_ -> new InfluxTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName())).orElse(null);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ return getTableMetadata(((InfluxTableHandle) tableHandle).toSchemaTableName())
+ .orElseThrow(() -> new RuntimeException("The table handle is invalid " + tableHandle));
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ InfluxTableHandle influxTableHandle = (InfluxTableHandle) tableHandle;
+
+ InfluxTableHandle table = client.getTableHandle(influxTableHandle.getSchemaName(), influxTableHandle.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(influxTableHandle.toSchemaTableName()));
+
+ ImmutableMap.Builder columnHandles =
+ ImmutableMap.builderWithExpectedSize(table.getColumns().size());
+ for (InfluxColumnHandle column : table.getColumns()) {
+ columnHandles.put(column.getName(), column);
+ }
+ return columnHandles.buildOrThrow();
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ InfluxColumnHandle influxColumnHandle = (InfluxColumnHandle) columnHandle;
+ return new ColumnMetadata(influxColumnHandle.getName(), influxColumnHandle.getType());
+ }
+
+ @Override
+ public Iterator streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ requireNonNull(prefix, "prefix is null");
+ ImmutableMap.Builder> columns = ImmutableMap.builder();
+ for (SchemaTableName tableName : listTables(session, prefix.getSchema())) {
+ if (!prefix.matches(tableName)) {
+ continue;
+ }
+ getTableMetadata(tableName).ifPresent(tableMetadata -> columns.put(tableName, tableMetadata.getColumns()));
+ }
+ return columns.buildOrThrow().entrySet().stream()
+ .map(entry -> TableColumnsMetadata.forTable(entry.getKey(), entry.getValue())).iterator();
+ }
+
+ @Override
+ public Optional> applyLimit(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ long limit)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ // InfluxQL Limit 0 is equivalent to setting no limit
+ if (limit == 0) {
+ return Optional.empty();
+ }
+ // InfluxQL doesn't support limit number greater than integer max
+ if (limit > Integer.MAX_VALUE) {
+ return Optional.empty();
+ }
+ if (tableHandle.getLimit().isPresent() && tableHandle.getLimit().getAsInt() <= limit) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new LimitApplicationResult<>(
+ tableHandle.withLimit(OptionalInt.of(toIntExact(limit))),
+ true,
+ false));
+ }
+
+ @Override
+ public void dropTable(ConnectorSession session, ConnectorTableHandle handle)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ if (client.getTableHandle(tableHandle.getSchemaName(), tableHandle.getTableName()).isEmpty()) {
+ throw new TableNotFoundException(tableHandle.toSchemaTableName());
+ }
+
+ client.dropTable(tableHandle.getSchemaName(), tableHandle.getTableName());
+ }
+
+ @Override
+ public void dropSchema(ConnectorSession session, String schemaName, boolean cascade)
+ {
+ if (!client.getSchemaNames().contains(schemaName)) {
+ throw new SchemaNotFoundException(schemaName);
+ }
+
+ if (!cascade) {
+ boolean isEmpty = listTables(session, Optional.of(schemaName)).isEmpty();
+ if (!isEmpty) {
+ throw new TrinoException(SCHEMA_NOT_EMPTY, "Cannot drop non-empty schema '%s'".formatted(schemaName));
+ }
+ }
+ client.dropDatabase(schemaName);
+ }
+
+ @Override
+ public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner)
+ {
+ QueryResult result = client.createDatabase(schemaName);
+ if (!result.getResults().getFirst().hasError()) {
+ return;
+ }
+ throw new TrinoException(NOT_SUPPORTED, "Failed to create schema: " + result.getResults().getFirst().getError());
+ }
+
+ @Override
+ public Optional> applyFilter(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ Constraint constraint)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ TupleDomain oldDomain = tableHandle.getConstraint();
+ TupleDomain newDomain = oldDomain.intersect(constraint.getSummary());
+ TupleDomain remainingFilter;
+ if (newDomain.isNone()) {
+ remainingFilter = TupleDomain.all();
+ }
+ else {
+ Map domains = newDomain.getDomains().orElseThrow();
+ Map supported = new HashMap<>();
+ Map unsupported = new HashMap<>();
+ domains.forEach((key, domain) -> {
+ if (isPushdownSupportedType(((InfluxColumnHandle) key).getType())
+ && isPushdownSupportedDomain(domain)) {
+ supported.put(key, domain);
+ }
+ else {
+ unsupported.put(key, domain);
+ }
+ });
+ newDomain = TupleDomain.withColumnDomains(supported);
+ remainingFilter = TupleDomain.withColumnDomains(unsupported);
+ }
+
+ if (oldDomain.equals(newDomain)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ tableHandle.withConstraint(newDomain),
+ remainingFilter,
+ constraint.getExpression(),
+ false));
+ }
+
+ @Override
+ public Optional> applyProjection(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ List projections,
+ Map assignments)
+ {
+ InfluxTableHandle tableHandle = (InfluxTableHandle) handle;
+ List oldProjections = ((InfluxTableHandle) handle).getProjections();
+ List newProjections = ImmutableList.copyOf(assignments.values());
+ if (oldProjections.equals(newProjections)) {
+ return Optional.empty();
+ }
+
+ List assignmentsList = assignments.entrySet().stream()
+ .map(assignment -> new Assignment(
+ assignment.getKey(),
+ assignment.getValue(),
+ ((InfluxColumnHandle) assignment.getValue()).getType()))
+ .collect(toImmutableList());
+ return Optional.of(new ProjectionApplicationResult<>(
+ tableHandle.withProjections(newProjections),
+ projections,
+ assignmentsList,
+ false));
+ }
+
+ private Optional getTableMetadata(SchemaTableName schemaTableName)
+ {
+ Optional tableHandle = client.getTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+
+ return tableHandle.map(table -> {
+ ImmutableList.Builder columnMetadataBuilder =
+ ImmutableList.builderWithExpectedSize(table.getColumns().size());
+ List columns = table.getColumns();
+ for (InfluxColumnHandle column : columns) {
+ columnMetadataBuilder.add(new ColumnMetadata(column.getName(), column.getType()));
+ }
+ return new ConnectorTableMetadata(schemaTableName, columnMetadataBuilder.build());
+ });
+ }
+
+ private boolean isPushdownSupportedDomain(Domain domain)
+ {
+ if (domain.getValues() instanceof SortedRangeSet rangeSet) {
+ if (rangeSet.getOrderedRanges().isEmpty()) {
+ return false;
+ }
+ List ranges = rangeSet.getOrderedRanges();
+ return isRangeSupportsTimestamp(ranges) ||
+ isRangeSupportsBoolean(ranges) ||
+ isRangeSupportsNumber(ranges) ||
+ isRangeSupportsVarchar(ranges);
+ }
+ else if (domain.getValues() instanceof EquatableValueSet valueSet) {
+ return !valueSet.getDiscreteSet().isEmpty();
+ }
+ return false;
+ }
+
+ private static boolean isRangeSupportsVarchar(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() == VARCHAR) &&
+ ranges.stream().anyMatch(Range::isSingleValue);
+ }
+
+ private static boolean isRangeSupportsBoolean(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() == BOOLEAN) &&
+ ranges.stream().anyMatch(Range::isSingleValue);
+ }
+
+ private static boolean isRangeSupportsNumber(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() == DOUBLE || range.getType() == BIGINT);
+ }
+
+ private static boolean isRangeSupportsTimestamp(List ranges)
+ {
+ return ranges.stream().allMatch(range -> range.getType() instanceof TimestampType);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxModule.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxModule.java
new file mode 100644
index 000000000000..d55bb1fd2ed3
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxModule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+
+import static com.google.inject.Scopes.SINGLETON;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class InfluxModule
+ implements Module
+{
+ @Override
+ public void configure(Binder binder)
+ {
+ configBinder(binder).bindConfig(InfluxConfig.class);
+
+ binder.bind(InfluxClient.class).to(NativeInfluxClient.class).in(SINGLETON);
+ binder.bind(NativeInfluxClient.class).in(SINGLETON);
+ binder.bind(InfluxMetadata.class).in(SINGLETON);
+ binder.bind(InfluxSplitManager.class).in(SINGLETON);
+ binder.bind(InfluxRecordSetProvider.class).in(SINGLETON);
+ binder.bind(InfluxConnector.class).in(SINGLETON);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxPlugin.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxPlugin.java
new file mode 100644
index 000000000000..800c0754389b
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxPlugin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.influxdb;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.ConnectorFactory;
+
+public class InfluxPlugin
+ implements Plugin
+{
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(new InfluxConnectorFactory());
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecord.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecord.java
new file mode 100644
index 000000000000..1bcacf6224ca
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecord.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import java.util.List;
+import java.util.Objects;
+
+public class InfluxRecord
+{
+ private final List columns;
+ private final List> values;
+
+ public InfluxRecord(List columns, List> values)
+ {
+ this.columns = columns;
+ this.values = values;
+ }
+
+ public List getColumns()
+ {
+ return columns;
+ }
+
+ public List> getValues()
+ {
+ return values;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InfluxRecord that = (InfluxRecord) o;
+ return Objects.equals(columns, that.columns) && Objects.equals(values, that.values);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(columns, values);
+ }
+}
diff --git a/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecordCursor.java b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecordCursor.java
new file mode 100644
index 000000000000..9135bb102f16
--- /dev/null
+++ b/plugin/trino-influxdb/src/main/java/io/trino/plugin/influxdb/InfluxRecordCursor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.trino.plugin.influxdb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import io.airlift.slice.Slice;
+import io.trino.spi.connector.RecordCursor;
+import io.trino.spi.type.Type;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.plugin.influxdb.InfluxConstant.ColumnName.TIME;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+public class InfluxRecordCursor
+ implements RecordCursor
+{
+ private long totalBytes;
+ private final List columnHandles;
+ private final Iterator> sourceDataIterator;
+ private List