diff --git a/CHANGELOG.md b/CHANGELOG.md index f85eaecf7..553b0ca90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,8 +21,10 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Support Jackson 3.x release line ([#1810](https://github.com/opensearch-project/opensearch-java/pull/1810)) - Added `equals()` and `hashCode()` implementations to `FieldValue` ([#1998](https://github.com/opensearch-project/opensearch-java/pull/1998)) - Add document lifecycle guide and runnable sample ([#2017](https://github.com/opensearch-project/opensearch-java/pull/2017)) +- Add configurable I/O reactor rebuild back-off to `ApacheHttpClient5Transport` ([#1969](https://github.com/opensearch-project/opensearch-java/issues/1969)) ### Fixed +- Recover `ApacheHttpClient5Transport` from an I/O reactor shutdown instead of failing every subsequent request permanently ([#1969](https://github.com/opensearch-project/opensearch-java/issues/1969)) ## [Unreleased 3.x] ### Added diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java index 58644b5b6..0a50392ba 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -41,7 +41,13 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; import 
java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; @@ -78,7 +84,10 @@ import org.apache.hc.core5.http.message.StatusLine; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.net.URIBuilder; +import org.apache.hc.core5.reactor.IOReactorShutdownException; +import org.apache.hc.core5.reactor.IOReactorStatus; import org.apache.hc.core5.util.Args; import org.opensearch.client.json.JsonpDeserializer; import org.opensearch.client.json.JsonpMapper; @@ -107,8 +116,29 @@ public class ApacheHttpClient5Transport implements OpenSearchTransport { private static final Log logger = LogFactory.getLog(ApacheHttpClient5Transport.class); static final ContentType JsonContentType = ContentType.APPLICATION_JSON; + /** + * Default minimum delay between two consecutive attempts to rebuild the underlying client after an I/O reactor + * shutdown. The effective delay grows exponentially (up to {@link #MAX_REBUILD_BACKOFF_NANOS}) while the reactor + * keeps dying, and resets once it has been stable. + */ + static final long DEFAULT_REBUILD_BACKOFF_MILLIS = 1000L; + + /** Upper bound for the exponentially-growing rebuild back-off. */ + private static final long MAX_REBUILD_BACKOFF_NANOS = 30_000L * 1_000_000L; + private final JsonpMapper mapper; - private final CloseableHttpAsyncClient client; + private final AtomicReference clientRef; + @Nullable + private final Supplier clientFactory; + private final ReentrantLock rebuildLock = new ReentrantLock(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final long rebuildBackoffBaseNanos; + // The following three fields are guarded by rebuildLock. + private boolean rebuiltOnce = false; + private long lastRebuildNanos = 0L; + private long currentBackoffNanos = 0L; + // Monotonic nanosecond clock; overridable for deterministic back-off testing. 
+ private volatile LongSupplier nanoClock = System::nanoTime; private final ApacheHttpClient5Options transportOptions; private final ConcurrentMap denylist = new ConcurrentHashMap<>(); private final AtomicInteger lastNodeIndex = new AtomicInteger(0); @@ -133,9 +163,56 @@ public ApacheHttpClient5Transport( final boolean strictDeprecationMode, final boolean compressionEnabled, final boolean chunkedEnabled + ) { + this( + null, + DEFAULT_REBUILD_BACKOFF_MILLIS, + client, + defaultHeaders, + nodes, + mapper, + options, + pathPrefix, + failureListener, + nodeSelector, + strictDeprecationMode, + compressionEnabled, + chunkedEnabled + ); + } + + /** + * Creates a transport that can rebuild its underlying {@link CloseableHttpAsyncClient} if the I/O reactor is shut + * down, so the transport can recover without being recreated by the caller. See + * opensearch-java#1969. + * + * @param clientFactory factory used to (re)build and start the async client, or {@code null} to disable recovery + * (used when the client lifecycle is owned externally) + * @param rebuildBackoffMillis base back-off, in milliseconds, between client rebuilds (grows exponentially while + * the reactor keeps dying); a value {@code <= 0} disables the back-off + * @param client the initial, already-started async client + */ + ApacheHttpClient5Transport( + @Nullable final Supplier clientFactory, + final long rebuildBackoffMillis, + final CloseableHttpAsyncClient client, + final Header[] defaultHeaders, + final List nodes, + final JsonpMapper mapper, + @Nullable TransportOptions options, + final String pathPrefix, + final FailureListener failureListener, + final NodeSelector nodeSelector, + final boolean strictDeprecationMode, + final boolean compressionEnabled, + final boolean chunkedEnabled ) { this.mapper = mapper; - this.client = client; + this.clientFactory = clientFactory; + final long backoffMillis = Math.max(0L, rebuildBackoffMillis); + // Clamp to avoid overflow when converting to nanoseconds for 
absurdly large inputs. + this.rebuildBackoffBaseNanos = Math.min(backoffMillis, Long.MAX_VALUE / 1_000_000L) * 1_000_000L; + this.clientRef = new AtomicReference<>(client); this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); this.pathPrefix = pathPrefix; this.transportOptions = (options == null) ? ApacheHttpClient5Options.initialOptions() : ApacheHttpClient5Options.of(options); @@ -208,7 +285,14 @@ public TransportOptions options() { @Override public void close() throws IOException { - client.close(); + rebuildLock.lock(); + try { + if (closed.compareAndSet(false, true)) { + clientRef.get().close(); + } + } finally { + rebuildLock.unlock(); + } } private void performRequestAsync( @@ -218,61 +302,246 @@ private void performRequestAsync( final WarningsHandler warningsHandler, final CompletableFuture listener ) { - final RequestContext context = createContextForNextAttempt(options, request, nodeTuple.nodes.next(), nodeTuple.authCache); - Future future = client.execute( - context.requestProducer, - context.asyncResponseConsumer, - context.context, - new FutureCallback() { - @Override - public void completed(ClassicHttpResponse httpResponse) { - try { - ResponseOrResponseException responseOrResponseException = convertResponse( - request, - context.node, - httpResponse, - warningsHandler - ); - if (responseOrResponseException.responseException == null) { - listener.complete(responseOrResponseException.response); - } else { - if (nodeTuple.nodes.hasNext()) { - performRequestAsync(nodeTuple, options, request, warningsHandler, listener); - } else { - listener.completeExceptionally(responseOrResponseException.responseException); - } - } - } catch (Exception e) { - listener.completeExceptionally(e); - } - } + final Node node = nodeTuple.nodes.next(); + executeRequestAttempt(nodeTuple, options, request, warningsHandler, listener, node, true); + } - @Override - public void failed(Exception failure) { - try { - onFailure(context.node); + private 
void executeRequestAttempt( + final NodeTuple> nodeTuple, + final ApacheHttpClient5Options options, + final HttpUriRequestBase request, + final WarningsHandler warningsHandler, + final CompletableFuture listener, + final Node node, + final boolean allowRecovery + ) { + final CloseableHttpAsyncClient client; + final RequestContext context; + if (closed.get()) { + listener.completeExceptionally(new IOException("transport is closed")); + return; + } + client = clientRef.get(); + context = createContextForNextAttempt(options, request, node, nodeTuple.authCache); + final AttemptCallback callback = new AttemptCallback() { + @Override + public void completed(ClassicHttpResponse httpResponse) { + markCallbackOrDependencyClaimed(); + try { + ResponseOrResponseException responseOrResponseException = convertResponse(request, node, httpResponse, warningsHandler); + if (responseOrResponseException.responseException == null) { + listener.complete(responseOrResponseException.response); + } else { if (nodeTuple.nodes.hasNext()) { performRequestAsync(nodeTuple, options, request, warningsHandler, listener); } else { - listener.completeExceptionally(failure); + listener.completeExceptionally(responseOrResponseException.responseException); } - } catch (Exception e) { - listener.completeExceptionally(e); } + } catch (Exception e) { + listener.completeExceptionally(e); } + } - @Override - public void cancelled() { - listener.completeExceptionally(new CancellationException("request was cancelled")); + @Override + public void failed(Exception failure) { + markCallbackOrDependencyClaimed(); + try { + if (isRecoverableReactorFailure(failure, client)) { + retryAfterReactorFailure( + nodeTuple, + options, + request, + warningsHandler, + listener, + node, + client, + failure, + allowRecovery + ); + return; + } + onFailure(node); + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(failure); 
+ } + } catch (Exception e) { + listener.completeExceptionally(e); } } - ); - if (future instanceof org.apache.hc.core5.concurrent.Cancellable) { + @Override + public void cancelled() { + markCallbackOrDependencyClaimed(); + listener.completeExceptionally(new CancellationException("request was cancelled")); + } + }; + + Future future; + try { + future = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, callback); + } catch (final RuntimeException runtimeFailure) { + if (isRecoverableReactorFailure(runtimeFailure, client) == false) { + throw runtimeFailure; + } + // The I/O reactor has been shut down. Try to recover by rebuilding the client, then retry this request once + // on the fresh client. + retryAfterReactorFailure(nodeTuple, options, request, warningsHandler, listener, node, client, runtimeFailure, allowRecovery); + return; + } + + if (callback.claimDependency() && future instanceof org.apache.hc.core5.concurrent.Cancellable) { request.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); } } + private void retryAfterReactorFailure( + final NodeTuple> nodeTuple, + final ApacheHttpClient5Options options, + final HttpUriRequestBase request, + final WarningsHandler warningsHandler, + final CompletableFuture listener, + final Node node, + final CloseableHttpAsyncClient deadClient, + final Throwable reactorFailure, + final boolean allowRecovery + ) { + if (closed.get()) { + listener.completeExceptionally(new IOException("transport is closed")); + return; + } + if (!allowRecovery) { + listener.completeExceptionally(new IOException("I/O reactor has been shut down", reactorFailure)); + return; + } + final CloseableHttpAsyncClient recovered = recoverClient(deadClient); + if (recovered == null || recovered == deadClient) { + listener.completeExceptionally( + new IOException("I/O reactor has been shut down and the transport could not be recovered", reactorFailure) + ); + return; + } + executeRequestAttempt(nodeTuple, 
options, request, warningsHandler, listener, node, false); + } + + /** + * Attempts to recover from a shut-down I/O reactor by rebuilding the underlying async client. + *
<p>
+ * Recovery is guarded by a circuit breaker based on {@link #DEFAULT_REBUILD_BACKOFF_MILLIS} so that a reactor which + * keeps dying under a repeated failure condition does not trigger a rebuild storm that would amplify the failure. + * Only one thread rebuilds at a time; concurrent callers that observe the already-rebuilt client reuse it. + * + * @param deadClient the client whose reactor was detected as shut down + * @return a healthy client to retry with, or {@code null} if recovery is disabled, throttled, or itself failed + */ + private CloseableHttpAsyncClient recoverClient(final CloseableHttpAsyncClient deadClient) { + if (clientFactory == null) { + // The client was supplied externally and its lifecycle is not owned by this transport: cannot rebuild. + return null; + } + CloseableHttpAsyncClient rebuilt = null; + rebuildLock.lock(); + try { + if (closed.get()) { + return null; + } + final CloseableHttpAsyncClient current = clientRef.get(); + if (current != deadClient) { + // Another thread has already rebuilt the client; reuse it. + return current; + } + + final long now = nanoClock.getAsLong(); + if (rebuiltOnce) { + final long elapsed = now - lastRebuildNanos; + if (elapsed < currentBackoffNanos) { + // Within the back-off window: fail fast instead of thrashing. + return null; + } + // Grow the back-off while the reactor keeps dying soon after a rebuild; reset once it looks stable. 
+ if (elapsed > currentBackoffNanos * 4) { + currentBackoffNanos = rebuildBackoffBaseNanos; + } else { + currentBackoffNanos = Math.min(currentBackoffNanos * 2, MAX_REBUILD_BACKOFF_NANOS); + } + } else { + currentBackoffNanos = rebuildBackoffBaseNanos; + } + rebuiltOnce = true; + lastRebuildNanos = now; + + rebuilt = clientFactory.get(); + clientRef.set(rebuilt); + logger.warn("Apache HttpClient 5 I/O reactor was shut down; the transport client has been rebuilt to recover"); + } catch (final RuntimeException rebuildFailure) { + logger.error("Failed to rebuild Apache HttpClient 5 transport after I/O reactor shutdown", rebuildFailure); + return null; + } finally { + rebuildLock.unlock(); + } + // Close the dead client outside the lock, with IMMEDIATE mode, so a (potentially slow) graceful shutdown of a + // dead reactor cannot block the request thread or stall other threads waiting on the rebuild lock. + closeQuietly(deadClient); + return rebuilt; + } + + private static void closeQuietly(final CloseableHttpAsyncClient client) { + try { + client.close(CloseMode.IMMEDIATE); + } catch (final Exception e) { + // Best-effort cleanup of the dead client; the reactor is already down. + } + } + + /** + * Overrides the monotonic clock used to time rebuild back-off. For tests only. + * + * @param nanoClock a supplier of monotonically non-decreasing nanosecond timestamps + */ + void setNanoClock(final LongSupplier nanoClock) { + this.nanoClock = nanoClock; + } + + private static boolean isRecoverableReactorFailure(final Throwable failure, final CloseableHttpAsyncClient client) { + return isReactorShutdown(failure) || isClientReactorShutDown(client); + } + + private static boolean isReactorShutdown(final Throwable failure) { + Throwable cause = failure; + // Bounded walk of the cause chain, defensive against pathological or cyclic chains. 
+ for (int depth = 0; cause != null && depth < 32; cause = cause.getCause(), depth++) { + if (cause instanceof IOReactorShutdownException) { + return true; + } + } + return false; + } + + private static boolean isClientReactorShutDown(final CloseableHttpAsyncClient client) { + try { + return client.getStatus() == IOReactorStatus.SHUT_DOWN; + } catch (final RuntimeException statusFailure) { + return false; + } + } + + private abstract static class AttemptCallback implements FutureCallback { + private static final AtomicIntegerFieldUpdater CALLBACK_OR_DEPENDENCY_CLAIMED = AtomicIntegerFieldUpdater + .newUpdater(AttemptCallback.class, "callbackOrDependencyClaimed"); + + private volatile int callbackOrDependencyClaimed; + + final void markCallbackOrDependencyClaimed() { + callbackOrDependencyClaimed = 1; + } + + final boolean claimDependency() { + return CALLBACK_OR_DEPENDENCY_CLAIMED.compareAndSet(this, 0, 1); + } + } + /** * Replaces the nodes with which the client communicates. * diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java index 96bf6edf0..65552089e 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -33,6 +34,7 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.reactor.ssl.TlsDetails; import 
org.apache.hc.core5.util.Timeout; import org.opensearch.client.RestClient; @@ -79,6 +81,7 @@ public class ApacheHttpClient5TransportBuilder { private Optional chunkedEnabled; private JsonpMapper mapper; private TransportOptions options; + private Long reactorRebuildBackoffMillis = null; /** * Creates a new builder instance and sets the hosts that the client will send requests to. @@ -200,6 +203,23 @@ public ApacheHttpClient5TransportBuilder setOptions(TransportOptions options) { return this; } + /** + * Sets the base back-off, in milliseconds, between attempts to rebuild the underlying async client after the + * I/O reactor has been shut down (see + * opensearch-java#1969). The + * effective delay grows exponentially while the reactor keeps dying and resets once it has been stable, acting as + * a circuit breaker against rebuild storms. + *
<p>
+ * Defaults to {@value ApacheHttpClient5Transport#DEFAULT_REBUILD_BACKOFF_MILLIS} ms. A value {@code <= 0} disables + * the back-off (a rebuild is attempted on every detected shutdown). + * + * @param reactorRebuildBackoffMillis the base back-off in milliseconds + */ + public ApacheHttpClient5TransportBuilder setReactorRebuildBackoffMillis(long reactorRebuildBackoffMillis) { + this.reactorRebuildBackoffMillis = reactorRebuildBackoffMillis; + return this; + } + /** * Cleans up the given path prefix to ensure that looks like "/base/path". * @@ -282,30 +302,61 @@ public ApacheHttpClient5Transport build() { if (failureListener == null) { failureListener = new ApacheHttpClient5Transport.FailureListener(); } - CloseableHttpAsyncClient httpClient = AccessController.doPrivileged( - (PrivilegedAction) this::createHttpClient - ); + + final RequestConfigCallback requestConfigCallbackSnapshot = requestConfigCallback; + final ConnectionConfigCallback connectionConfigCallbackSnapshot = connectionConfigCallback; + final HttpClientConfigCallback httpClientConfigCallbackSnapshot = httpClientConfigCallback; + + // Factory that builds AND starts a fresh async client. It is used for the initial client and to rebuild it if + // the I/O reactor is shut down, allowing the transport to recover without being recreated by the caller. + // See https://github.com/opensearch-project/opensearch-java/issues/1969. 
+ final Supplier clientFactory = () -> { + final CloseableHttpAsyncClient client = AccessController.doPrivileged( + (PrivilegedAction) () -> createHttpClient( + requestConfigCallbackSnapshot, + connectionConfigCallbackSnapshot, + httpClientConfigCallbackSnapshot + ) + ); + client.start(); + return client; + }; + + final CloseableHttpAsyncClient httpClient = clientFactory.get(); if (mapper == null) { mapper = new JacksonJsonpMapper(); } - final ApacheHttpClient5Transport transport = new ApacheHttpClient5Transport( - httpClient, - defaultHeaders, - nodes, - mapper, - options, - pathPrefix, - failureListener, - nodeSelector, - strictDeprecationMode, - compressionEnabled, - chunkedEnabled.orElse(false) - ); - - httpClient.start(); - return transport; + final long rebuildBackoffMillis = (reactorRebuildBackoffMillis != null) + ? reactorRebuildBackoffMillis + : ApacheHttpClient5Transport.DEFAULT_REBUILD_BACKOFF_MILLIS; + + try { + return new ApacheHttpClient5Transport( + clientFactory, + rebuildBackoffMillis, + httpClient, + defaultHeaders, + nodes, + mapper, + options, + pathPrefix, + failureListener, + nodeSelector, + strictDeprecationMode, + compressionEnabled, + chunkedEnabled.orElse(false) + ); + } catch (final RuntimeException | Error e) { + // The client has already been started; avoid leaking its reactor threads if construction fails. + try { + httpClient.close(CloseMode.IMMEDIATE); + } catch (final Exception suppressed) { + e.addSuppressed(suppressed); + } + throw e; + } } /** @@ -339,7 +390,11 @@ public static ApacheHttpClient5TransportBuilder builder(HttpHost... 
hosts) { return new ApacheHttpClient5TransportBuilder(nodes); } - private CloseableHttpAsyncClient createHttpClient() { + private CloseableHttpAsyncClient createHttpClient( + RequestConfigCallback requestConfigCallback, + ConnectionConfigCallback connectionConfigCallback, + HttpClientConfigCallback httpClientConfigCallback + ) { // default timeouts are all infinite RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); diff --git a/java-client/src/test/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportRecoveryTest.java b/java-client/src/test/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportRecoveryTest.java new file mode 100644 index 000000000..594e16106 --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportRecoveryTest.java @@ -0,0 +1,470 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.client.transport.httpclient5; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.message.BasicClassicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorShutdownException; +import org.apache.hc.core5.reactor.IOReactorStatus; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; +import org.junit.Test; +import org.opensearch.client.json.jackson3.JacksonJsonpMapper; +import org.opensearch.client.transport.endpoints.BooleanEndpoint; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.transport.httpclient5.internal.Node; + +/** + * Unit tests for the self-healing recovery of {@link ApacheHttpClient5Transport} (see opensearch-java#1969). 
+ */ +public class ApacheHttpClient5TransportRecoveryTest { + + private static final HttpHost HOST_A = new HttpHost("localhost", 9200); + + // --- Reactor recovery ----------------------------------------------------------------------------------------- + + @Test + public void testRecoversByRebuildingClientAfterReactorShutdown() throws IOException { + FakeAsyncClient dead = new FakeAsyncClient(true); + FakeAsyncClient healthy = new FakeAsyncClient(false); + AtomicInteger factoryCalls = new AtomicInteger(); + Supplier factory = () -> { + factoryCalls.incrementAndGet(); + return healthy; + }; + + ApacheHttpClient5Transport transport = newTransport(factory, dead); + + // The dead reactor is detected, the client is rebuilt, and the request is retried on the healthy client. + BooleanResponse response = transport.performRequest(null, headEndpoint(), null); + + assertTrue("request should succeed after recovery", response.value()); + assertEquals("client should be rebuilt exactly once", 1, factoryCalls.get()); + assertTrue("dead client should be closed during recovery", dead.closed); + } + + @Test + public void testRecoversWhenReactorShutdownIsReportedToCallback() throws IOException { + FakeAsyncClient dead = FakeAsyncClient.reactorShutdownViaCallback(); + FakeAsyncClient healthy = new FakeAsyncClient(false); + AtomicInteger factoryCalls = new AtomicInteger(); + Supplier factory = () -> { + factoryCalls.incrementAndGet(); + return healthy; + }; + + ApacheHttpClient5Transport transport = newTransport(factory, dead); + + // The real HC5 client catches IOReactorShutdownException as an IllegalStateException and reports it to the + // FutureCallback. Recovery must therefore happen from failed(...), not only from a synchronous execute() throw. 
+ BooleanResponse response = transport.performRequest(null, headEndpoint(), null); + + assertTrue("request should succeed after callback-delivered recovery", response.value()); + assertEquals("client should be rebuilt exactly once", 1, factoryCalls.get()); + assertEquals("dead client should receive the original request once", 1, dead.executeCount.get()); + assertEquals("healthy client should receive the recovered retry once", 1, healthy.executeCount.get()); + assertTrue("dead client should be closed during recovery", dead.closed); + } + + @Test + public void testRecoversWhenShutdownClientThrowsCancellationException() throws IOException { + FakeAsyncClient dead = FakeAsyncClient.reactorShutdownAsCancellationException(); + FakeAsyncClient healthy = new FakeAsyncClient(false); + AtomicInteger factoryCalls = new AtomicInteger(); + Supplier factory = () -> { + factoryCalls.incrementAndGet(); + return healthy; + }; + + ApacheHttpClient5Transport transport = newTransport(factory, dead); + + BooleanResponse response = transport.performRequest(null, headEndpoint(), null); + + assertTrue("request should succeed after recovering from a stopped client", response.value()); + assertEquals("client should be rebuilt exactly once", 1, factoryCalls.get()); + assertTrue("dead client should be closed during recovery", dead.closed); + } + + @Test + public void testRecoversWhenShutdownClientReportsIllegalStateWithoutCause() throws IOException { + FakeAsyncClient dead = FakeAsyncClient.reactorShutdownAsIllegalStateCallback(); + FakeAsyncClient healthy = new FakeAsyncClient(false); + AtomicInteger factoryCalls = new AtomicInteger(); + Supplier factory = () -> { + factoryCalls.incrementAndGet(); + return healthy; + }; + + ApacheHttpClient5Transport transport = newTransport(factory, dead); + + BooleanResponse response = transport.performRequest(null, headEndpoint(), null); + + assertTrue("request should succeed after recovering from a stopped client callback", response.value()); + 
assertEquals("client should be rebuilt exactly once", 1, factoryCalls.get()); + assertTrue("dead client should be closed during recovery", dead.closed); + } + + @Test + public void testDoesNotRecoverActiveClientCancellation() { + FakeAsyncClient cancelled = FakeAsyncClient.activeCancellation(); + AtomicInteger factoryCalls = new AtomicInteger(); + Supplier factory = () -> { + factoryCalls.incrementAndGet(); + return new FakeAsyncClient(false); + }; + + ApacheHttpClient5Transport transport = newTransport(factory, cancelled); + + assertThrows(RuntimeException.class, () -> transport.performRequest(null, headEndpoint(), null)); + assertEquals("active-client cancellation must not trigger a reactor rebuild", 0, factoryCalls.get()); + assertFalse("active client should not be closed as a dead reactor", cancelled.closed); + } + + @Test + public void testResponseConsumerFactoryRunsOutsideRebuildLock() throws IOException { + AtomicReference transportRef = new AtomicReference<>(); + AtomicReference closeFailure = new AtomicReference<>(); + ApacheHttpClient5Options.Builder options = ApacheHttpClient5Options.DEFAULT.toBuilder(); + options.setHttpAsyncResponseConsumerFactory(() -> { + Thread closer = new Thread(() -> { + try { + transportRef.get().close(); + } catch (IOException e) { + closeFailure.set(e); + } + }, "transport-close-from-response-consumer-factory"); + closer.start(); + try { + closer.join(1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + assertFalse("response consumer factory must not run while the rebuild lock is held", closer.isAlive()); + if (closeFailure.get() != null) { + throw new AssertionError(closeFailure.get()); + } + return HttpAsyncResponseConsumerFactory.DEFAULT.createHttpAsyncResponseConsumer(); + }); + ApacheHttpClient5Transport transport = newTransport( + null, + new FakeAsyncClient(false), + 60_000L, + Collections.singletonList(new Node(HOST_A)), + options.build() + ); + 
transportRef.set(transport); + + BooleanResponse response = transport.performRequest(null, headEndpoint(), null); + + assertTrue("request should still complete in the fake client", response.value()); + } + + @Test + public void testBuilderCreatedTransportRebuildsClosedClient() throws Exception { + ApacheHttpClient5Transport transport = ApacheHttpClient5TransportBuilder.builder(new HttpHost("localhost", 1)) + .setReactorRebuildBackoffMillis(0L) + .setRequestConfigCallback( + requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(Timeout.ofMilliseconds(100L)) + .setResponseTimeout(Timeout.ofMilliseconds(100L)) + ) + .build(); + + try { + AtomicReference clientRef = clientRef(transport); + CloseableHttpAsyncClient initialClient = clientRef.get(); + initialClient.close(CloseMode.IMMEDIATE); + + assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null)); + + assertNotSame("builder-created transport should rebuild the closed async client", initialClient, clientRef.get()); + } finally { + transport.close(); + } + } + + @Test + public void testRebuildIsCircuitBroken() { + FakeAsyncClient dead = new FakeAsyncClient(true); + AtomicInteger factoryCalls = new AtomicInteger(); + // Every rebuilt client is also dead, simulating sustained pressure that keeps killing the reactor. + Supplier factory = () -> { + factoryCalls.incrementAndGet(); + return new FakeAsyncClient(true); + }; + + ApacheHttpClient5Transport transport = newTransport(factory, dead); + + // Two requests in quick succession (well within the back-off window). + assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null)); + assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null)); + + // The breaker must allow only a single rebuild attempt within the back-off window. 
+        assertEquals("circuit breaker should permit only one rebuild within the back-off window", 1, factoryCalls.get());
+    }
+    /**
+     * Verifies that a transport created with a {@code null} client factory fails a request with
+     * {@link IOException} when its client reports a shut-down reactor, and that the supplied client is
+     * left unclosed.
+     */
+    @Test
+    public void testRecoveryDisabledWhenNoFactory() {
+        FakeAsyncClient dead = new FakeAsyncClient(true);
+
+        // No factory => client lifecycle owned externally => recovery disabled, request just fails.
+        ApacheHttpClient5Transport transport = newTransport(null, dead);
+
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertFalse("externally-owned client must not be closed by the transport", dead.closed);
+    }
+    /**
+     * Verifies that a request issued after {@code close()} fails with {@link IOException}, that the
+     * initial client was closed, that its {@code doExecute} was never invoked, and that the factory
+     * was never asked for a replacement client.
+     *
+     * @throws IOException declared because {@code transport.close()} is called directly in the test body
+     */
+    @Test
+    public void testClosedTransportDoesNotRecover() throws IOException {
+        FakeAsyncClient initial = FakeAsyncClient.reactorShutdownViaCallback();
+        AtomicInteger factoryCalls = new AtomicInteger();
+        Supplier factory = () -> {
+            factoryCalls.incrementAndGet();
+            return new FakeAsyncClient(false);
+        };
+
+        ApacheHttpClient5Transport transport = newTransport(factory, initial);
+        transport.close();
+
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertTrue("close should close the current client", initial.closed);
+        assertEquals("closed transport must not execute requests", 0, initial.executeCount.get());
+        assertEquals("closed transport must not rebuild a new client", 0, factoryCalls.get());
+    }
+    /**
+     * Drives the transport with an injected clock and a factory that only ever returns dead clients,
+     * then checks the cumulative number of factory calls after requests at t = 0, 900, 1100, 2100 and
+     * 3200 ms: 1, 1, 2, 2 and 3 respectively. Every request is expected to fail with
+     * {@link IOException}; only the rebuild count is under test.
+     */
+    @Test
+    public void testRebuildBackoffGrowsExponentially() {
+        AtomicLong clock = new AtomicLong(0L);
+        AtomicInteger factoryCalls = new AtomicInteger();
+        Supplier factory = () -> {
+            factoryCalls.incrementAndGet();
+            return new FakeAsyncClient(true); // rebuilt clients are also dead (sustained failure)
+        };
+        // Base back-off 1000 ms.
+        ApacheHttpClient5Transport transport = newTransport(
+            factory,
+            new FakeAsyncClient(true),
+            1000L,
+            Collections.singletonList(new Node(HOST_A))
+        );
+        transport.setNanoClock(clock::get); // the test advances time by hand through clock.set(...)
+
+        final long ms = 1_000_000L; // nanoseconds per millisecond; the clock handed to setNanoClock counts nanoseconds
+
+        // t=0: first detected shutdown -> rebuild #1, back-off = 1000 ms.
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertEquals(1, factoryCalls.get());
+
+        // Still within 1000 ms of the last rebuild -> no rebuild.
+        clock.set(900L * ms);
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertEquals(1, factoryCalls.get());
+
+        // Past 1000 ms -> rebuild #2, back-off doubles to 2000 ms.
+        clock.set(1_100L * ms);
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertEquals(2, factoryCalls.get());
+
+        // Only 1000 ms after the last rebuild (< 2000 ms window) -> no rebuild.
+        clock.set(2_100L * ms);
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertEquals(2, factoryCalls.get());
+
+        // Past the 2000 ms window -> rebuild #3.
+        clock.set(3_200L * ms);
+        assertThrows(IOException.class, () -> transport.performRequest(null, headEndpoint(), null));
+        assertEquals(3, factoryCalls.get());
+    }
+
+    // --- helpers --------------------------------------------------------------------------------------------------
+    /**
+     * Builds a transport over a single node ({@code HOST_A}) with a 60_000 ms rebuild back-off.
+     *
+     * @param factory supplier of replacement clients; tests pass {@code null} to exercise the no-factory path
+     * @param initial client the transport starts with
+     * @return the transport under test
+     */
+    private static ApacheHttpClient5Transport newTransport(Supplier factory, CloseableHttpAsyncClient initial) {
+        return newTransport(factory, initial, 60_000L, Collections.singletonList(new Node(HOST_A)));
+    }
+    /**
+     * Builds a transport with an explicit back-off and node list, passing {@code null} for the options.
+     *
+     * @param factory       supplier of replacement clients, may be {@code null}
+     * @param initial       client the transport starts with
+     * @param backoffMillis rebuild back-off in milliseconds, forwarded unchanged
+     * @param nodes         nodes handed to the transport
+     * @return the transport under test
+     */
+    private static ApacheHttpClient5Transport newTransport(
+        Supplier factory,
+        CloseableHttpAsyncClient initial,
+        long backoffMillis,
+        List nodes
+    ) {
+        return newTransport(factory, initial, backoffMillis, nodes, null);
+    }
+    /**
+     * Builds a transport by calling the 13-argument {@code ApacheHttpClient5Transport} constructor with
+     * no default headers and a fresh {@code JacksonJsonpMapper}.
+     *
+     * NOTE(review): the meaning of the trailing {@code null, null, null, false, false, false}
+     * arguments is not visible in this hunk; check them against the constructor signature.
+     *
+     * @param factory       supplier of replacement clients, may be {@code null}
+     * @param initial       client the transport starts with
+     * @param backoffMillis rebuild back-off in milliseconds
+     * @param nodes         nodes handed to the transport
+     * @param options       transport options, may be {@code null}
+     * @return the transport under test
+     */
+    private static ApacheHttpClient5Transport newTransport(
+        Supplier factory,
+        CloseableHttpAsyncClient initial,
+        long backoffMillis,
+        List nodes,
+        ApacheHttpClient5Options options
+    ) {
+        return new ApacheHttpClient5Transport(
+            factory,
+            backoffMillis,
+            initial,
+            new Header[0],
+            nodes,
+            new JacksonJsonpMapper(),
+            options,
+            null,
+            null,
+            null,
+            false,
+            false,
+            false
+        );
+    }
+    /**
+     * Reads the transport's private {@code clientRef} field through reflection.
+     *
+     * @param transport transport whose field is read
+     * @return the field value cast to {@code AtomicReference}
+     * @throws NoSuchFieldException   if no field named {@code clientRef} is declared on the transport class
+     * @throws IllegalAccessException if the reflective read is refused
+     */
+    @SuppressWarnings("unchecked")
+    private static AtomicReference clientRef(ApacheHttpClient5Transport transport) throws NoSuchFieldException,
+        IllegalAccessException {
+        Field clientRefField = ApacheHttpClient5Transport.class.getDeclaredField("clientRef");
+        clientRefField.setAccessible(true); // the field is looked up with getDeclaredField, so access must be opened first
+        return (AtomicReference) clientRefField.get(transport);
+    }
+    /**
+     * Creates the endpoint used by every request in these tests: method {@code HEAD}, path {@code /},
+     * and two empty maps (presumably query parameters and headers; confirm against
+     * {@code BooleanEndpoint}'s constructor).
+     */
+    private static BooleanEndpoint headEndpoint() {
+        return new BooleanEndpoint<>(v -> "HEAD", v -> "/", v -> Collections.emptyMap(), v -> Collections.emptyMap());
+    }
+
+    /**
+     * Minimal {@link CloseableHttpAsyncClient}. It models the reactor shutdown failure shapes observed around the real
+     * HC5 scheduling path, or completes with an empty {@code 200} response.
+     *
+     * The behaviour is fixed at construction by a {@code FailureMode}. The double also records how
+     * often {@code doExecute} ran ({@code executeCount}) and whether either {@code close} overload
+     * was called ({@code closed}), so tests can assert on both.
+     */
+    private static final class FakeAsyncClient extends CloseableHttpAsyncClient {
+        private enum FailureMode {
+            NONE, // doExecute completes with a 200 response; getStatus reports ACTIVE
+            IO_REACTOR_THROWN, // doExecute throws IOReactorShutdownException
+            IO_REACTOR_CALLBACK, // doExecute passes an IOReactorShutdownException to callback.failed
+            CANCELLATION_THROWN, // doExecute throws CancellationException; getStatus reports SHUT_DOWN
+            ILLEGAL_STATE_CALLBACK, // doExecute passes an IllegalStateException to callback.failed
+            ACTIVE_CANCELLATION // doExecute throws CancellationException; getStatus reports ACTIVE
+        }
+
+        private final FailureMode failureMode;
+        final AtomicInteger executeCount = new AtomicInteger(); // incremented at the start of every doExecute call
+        volatile boolean closed = false; // set to true by both close overloads
+        /**
+         * Creates a client that either throws {@code IOReactorShutdownException} from
+         * {@code doExecute} ({@code reactorDown == true}) or answers normally.
+         */
+        FakeAsyncClient(boolean reactorDown) {
+            this(reactorDown ? FailureMode.IO_REACTOR_THROWN : FailureMode.NONE);
+        }
+        /** Creates a client with an explicit failure mode; reached through the static factories below. */
+        private FakeAsyncClient(FailureMode failureMode) {
+            this.failureMode = failureMode;
+        }
+        /** Client whose {@code doExecute} reports the reactor shutdown through {@code callback.failed}. */
+        static FakeAsyncClient reactorShutdownViaCallback() {
+            return new FakeAsyncClient(FailureMode.IO_REACTOR_CALLBACK);
+        }
+        /** Client whose {@code doExecute} throws {@code CancellationException} while {@code getStatus} reports {@code SHUT_DOWN}. */
+        static FakeAsyncClient reactorShutdownAsCancellationException() {
+            return new FakeAsyncClient(FailureMode.CANCELLATION_THROWN);
+        }
+        /** Client whose {@code doExecute} passes an {@code IllegalStateException} to {@code callback.failed}. */
+        static FakeAsyncClient reactorShutdownAsIllegalStateCallback() {
+            return new FakeAsyncClient(FailureMode.ILLEGAL_STATE_CALLBACK);
+        }
+        /** Client whose {@code doExecute} throws {@code CancellationException} while {@code getStatus} still reports {@code ACTIVE}. */
+        static FakeAsyncClient activeCancellation() {
+            return new FakeAsyncClient(FailureMode.ACTIVE_CANCELLATION);
+        }
+        /**
+         * Counts the invocation, then fails or succeeds according to the configured failure mode.
+         * The two callback modes report the failure through {@code callback.failed} (when a callback
+         * is given) and return a future already completed with {@code null}. Mode {@code NONE}
+         * completes the callback and the returned future with an empty {@code 200} response.
+         * All request arguments other than {@code callback} are ignored.
+         *
+         * NOTE(review): {@code T} is used below but no type-parameter declaration is visible, and
+         * several parameter types appear raw; the generic arguments look stripped from this hunk.
+         * Confirm against the upstream patch.
+         */
+        @Override
+        protected Future doExecute(
+            HttpHost target,
+            AsyncRequestProducer requestProducer,
+            AsyncResponseConsumer responseConsumer,
+            HandlerFactory pushHandlerFactory,
+            HttpContext context,
+            FutureCallback callback
+        ) {
+            executeCount.incrementAndGet();
+            switch (failureMode) {
+                case IO_REACTOR_THROWN:
+                    throw new IOReactorShutdownException("I/O reactor has been shut down");
+                case IO_REACTOR_CALLBACK:
+                    if (callback != null) {
+                        callback.failed(new IOReactorShutdownException("I/O reactor has been shut down"));
+                    }
+                    return CompletableFuture.completedFuture(null);
+                case CANCELLATION_THROWN:
+                case ACTIVE_CANCELLATION: // same exception as CANCELLATION_THROWN; the two modes differ only in getStatus
+                    throw new CancellationException("Request execution cancelled");
+                case ILLEGAL_STATE_CALLBACK:
+                    if (callback != null) {
+                        callback.failed(new IllegalStateException("Request execution cancelled"));
+                    }
+                    return CompletableFuture.completedFuture(null);
+                case NONE:
+                    break; // fall out of the switch to the success path below
+            }
+            @SuppressWarnings("unchecked")
+            T response = (T) new BasicClassicHttpResponse(200);
+            if (callback != null) {
+                callback.completed(response);
+            }
+            return CompletableFuture.completedFuture(response);
+        }
+        /** No-op. */
+        @Override
+        public void start() {}
+        /**
+         * Reports {@code ACTIVE} for modes {@code NONE} and {@code ACTIVE_CANCELLATION}, and
+         * {@code SHUT_DOWN} for every other mode.
+         */
+        @Override
+        public IOReactorStatus getStatus() {
+            return failureMode == FailureMode.NONE || failureMode == FailureMode.ACTIVE_CANCELLATION
+                ? IOReactorStatus.ACTIVE
+                : IOReactorStatus.SHUT_DOWN;
+        }
+        /** No-op; returns immediately regardless of {@code waitTime}. */
+        @Override
+        public void awaitShutdown(TimeValue waitTime) {}
+        /** No-op. */
+        @Override
+        public void initiateShutdown() {}
+        /** No-op; the registration is discarded. */
+        @Override
+        public void register(String hostname, String uriPattern, org.apache.hc.core5.function.Supplier supplier) {}
+        /** Records the close in {@code closed}; {@code closeMode} is ignored. */
+        @Override
+        public void close(CloseMode closeMode) {
+            closed = true;
+        }
+        /** Records the close in {@code closed}. */
+        @Override
+        public void close() {
+            closed = true;
+        }
+    }
+}