diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/ElasticsearchClient.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/ElasticsearchClient.java index 53cc99ea5440..9f5c14216eac 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/ElasticsearchClient.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/client/ElasticsearchClient.java @@ -122,6 +122,7 @@ public class ElasticsearchClient private final BackpressureRestHighLevelClient client; private final int scrollSize; private final Duration scrollTimeout; + private final Duration requestTimeout; private final AtomicReference> nodes = new AtomicReference<>(ImmutableSet.of()); private final ScheduledExecutorService executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("NodeRefresher")); @@ -146,6 +147,7 @@ public ElasticsearchClient( this.ignorePublishAddress = config.isIgnorePublishAddress(); this.scrollSize = config.getScrollSize(); this.scrollTimeout = config.getScrollTimeout(); + this.requestTimeout = config.getRequestTimeout(); this.refreshInterval = config.getNodeRefreshInterval(); this.tlsEnabled = config.isTlsEnabled(); } @@ -580,7 +582,8 @@ public String executeQuery(String index, String query) public SearchResponse beginSearch(String index, int shard, QueryBuilder query, Optional> fields, List documentFields, Optional sort, OptionalLong limit) { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource() - .query(query); + .query(query) + .timeout(new TimeValue(requestTimeout.toMillis())); if (limit.isPresent() && limit.getAsLong() < scrollSize) { // Safe to cast it to int because scrollSize is int. @@ -612,7 +615,11 @@ public SearchResponse beginSearch(String index, int shard, QueryBuilder query, O long start = System.nanoTime(); try { - return client.search(request); + SearchResponse response = client.search(request); + if (response.isTimedOut()) { + throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, "Elasticsearch query timed out"); + } + return response; } catch (IOException e) { throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, e); @@ -642,7 +649,11 @@ public SearchResponse nextPage(String scrollId) long start = System.nanoTime(); try { - return client.searchScroll(request); + SearchResponse response = client.searchScroll(request); + if (response.isTimedOut()) { + throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, "Elasticsearch query timed out"); + } + return response; } catch (IOException e) { throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, e);