@@ -122,6 +122,7 @@ public class ElasticsearchClient
     private final BackpressureRestHighLevelClient client;
     private final int scrollSize;
     private final Duration scrollTimeout;
+    private final Duration requestTimeout;

     private final AtomicReference<Set<ElasticsearchNode>> nodes = new AtomicReference<>(ImmutableSet.of());
     private final ScheduledExecutorService executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("NodeRefresher"));
@@ -146,6 +147,7 @@ public ElasticsearchClient(
         this.ignorePublishAddress = config.isIgnorePublishAddress();
         this.scrollSize = config.getScrollSize();
         this.scrollTimeout = config.getScrollTimeout();
+        this.requestTimeout = config.getRequestTimeout();
         this.refreshInterval = config.getNodeRefreshInterval();
         this.tlsEnabled = config.isTlsEnabled();
     }
@@ -580,7 +582,8 @@ public String executeQuery(String index, String query)
     public SearchResponse beginSearch(String index, int shard, QueryBuilder query, Optional<List<String>> fields, List<String> documentFields, Optional<String> sort, OptionalLong limit)
     {
         SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource()
-                .query(query);
+                .query(query)
+                .timeout(new TimeValue(requestTimeout.toMillis()));

         if (limit.isPresent() && limit.getAsLong() < scrollSize) {
             // Safe to cast it to int because scrollSize is int.
@@ -612,7 +615,11 @@ public SearchResponse beginSearch(String index, int shard, QueryBuilder query, O

         long start = System.nanoTime();
         try {
-            return client.search(request);
+            SearchResponse response = client.search(request);
+            if (response.isTimedOut()) {
Comment on lines 617 to +619
P2: Clear scroll when aborting timed-out initial search

If client.search(request) returns timed_out=true, this branch throws before returning the SearchResponse. In the beginSearch path, that happens before ScanQueryPageSource.SearchHitIterator is constructed, so no caller can read the returned _scroll_id and invoke clearScroll. When timed-out scroll searches return a scroll ID, those contexts are left open until elasticsearch.scroll-timeout expires, which can accumulate under repeated timeouts and waste Elasticsearch resources.


Author:

@codex This is a good remark, but I think it's a separate issue. Whether we want to clear scrolls, and when, is not specific to the timeout case.

If your human masters want me to specifically add a clearScroll call before the throw here, I'll do it, though.


+                throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, "Elasticsearch query timed out");
+            }
+            return response;
         }
         catch (IOException e) {
             throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, e);
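The cleanup the first reviewer suggests, releasing the scroll context before surfacing the timeout, can be sketched in isolation. Note that `SearchClient`, `SearchResult`, and the `scroll-1` ID below are simplified stand-ins for illustration, not the real Elasticsearch client API:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a search response carrying a timeout flag and a scroll ID.
class SearchResult {
    final boolean timedOut;
    final String scrollId;

    SearchResult(boolean timedOut, String scrollId) {
        this.timedOut = timedOut;
        this.scrollId = scrollId;
    }
}

// Stand-in for the client; records which scroll IDs were cleared.
class SearchClient {
    final List<String> clearedScrolls = new ArrayList<>();
    boolean timeOutNext;

    SearchResult search() {
        return new SearchResult(timeOutNext, "scroll-1");
    }

    void clearScroll(String scrollId) {
        clearedScrolls.add(scrollId);
    }
}

public class TimedOutSearchCleanup {
    // On a timed-out response, release the server-side scroll context
    // before throwing, so it does not linger until the scroll timeout expires.
    static SearchResult beginSearch(SearchClient client) {
        SearchResult response = client.search();
        if (response.timedOut) {
            if (response.scrollId != null) {
                client.clearScroll(response.scrollId);
            }
            throw new RuntimeException("search timed out");
        }
        return response;
    }

    public static void main(String[] args) {
        SearchClient client = new SearchClient();
        client.timeOutNext = true;
        try {
            beginSearch(client);
            throw new AssertionError("expected timeout");
        }
        catch (RuntimeException expected) {
            // the timeout is surfaced to the caller
        }
        // The scroll context was cleared even though the call failed.
        if (!client.clearedScrolls.contains("scroll-1")) {
            throw new AssertionError("scroll not cleared");
        }
        System.out.println("ok");
    }
}
```

This keeps the failure visible to the caller while guaranteeing the server-side context is released on the same code path that abandons it.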
@@ -642,7 +649,11 @@ public SearchResponse nextPage(String scrollId)

         long start = System.nanoTime();
         try {
-            return client.searchScroll(request);
+            SearchResponse response = client.searchScroll(request);
+            if (response.isTimedOut()) {
+                throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, "Elasticsearch query timed out");
+            }
Comment on lines +653 to +655
P2: Handle timed-out scroll pages without leaking contexts

Throwing immediately on response.isTimedOut() drops the new scroll ID returned by that page, because SearchHitIterator.reset(...) is only called after nextPage returns (ScanQueryPageSource.computeNext). In clusters where Elasticsearch rotates scroll IDs between pages, close() will clear only the stale prior ID, leaving the timed-out page's scroll context open until elasticsearch.scroll-timeout expires; under repeated timeouts this accumulates server-side resources.


+            return response;
         }
         catch (IOException e) {
             throw new TrinoException(ELASTICSEARCH_CONNECTION_ERROR, e);
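The second reviewer's point is subtler: the timed-out page may carry a freshly rotated scroll ID that the caller never sees, so clearing only the previous ID leaks the new context. A sketch of clearing the ID from the timed-out page itself, again with illustrative stand-in types (`ScrollClient`, `ScrollPage`, and the `scroll-N` IDs are not the real API):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a scroll page carrying a timeout flag and its own scroll ID.
class ScrollPage {
    final boolean timedOut;
    final String scrollId;

    ScrollPage(boolean timedOut, String scrollId) {
        this.timedOut = timedOut;
        this.scrollId = scrollId;
    }
}

// Stand-in client that rotates scroll IDs between pages; the second page times out.
class ScrollClient {
    final List<String> cleared = new ArrayList<>();
    int page;

    ScrollPage searchScroll(String scrollId) {
        page++;
        return new ScrollPage(page >= 2, "scroll-" + page);
    }

    void clearScroll(String scrollId) {
        cleared.add(scrollId);
    }
}

public class ScrollTimeoutCleanup {
    // Clear the scroll ID carried by the timed-out page itself: the server
    // may have rotated IDs between pages, and the caller only knows the
    // previous one, so throwing without this would leak the new context.
    static ScrollPage nextPage(ScrollClient client, String scrollId) {
        ScrollPage response = client.searchScroll(scrollId);
        if (response.timedOut) {
            if (response.scrollId != null) {
                client.clearScroll(response.scrollId);
            }
            throw new RuntimeException("scroll page timed out");
        }
        return response;
    }

    public static void main(String[] args) {
        ScrollClient client = new ScrollClient();
        ScrollPage first = nextPage(client, "scroll-0"); // succeeds, returns scroll-1
        try {
            nextPage(client, first.scrollId);            // times out as scroll-2
            throw new AssertionError("expected timeout");
        }
        catch (RuntimeException expected) {
            // the timeout is surfaced to the caller
        }
        // The rotated ID from the timed-out page was cleared, not the stale one.
        if (!client.cleared.contains("scroll-2")) {
            throw new AssertionError("rotated scroll ID not cleared");
        }
        System.out.println("ok");
    }
}
```

Whether this cleanup belongs in this change or in a follow-up is the open question in the thread above; the pattern itself is independent of where it lands.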