From b50ca84e30c151d7f30c8917a470acbfc7ffa126 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 13 May 2026 22:15:34 +0530 Subject: [PATCH 1/4] core,api,xds: Implement load balancing policy delay plumbing This commit implements the plumbing required to propagate delay reason tokens from load balancing policies up to the transport layer and tracers, as specified in the LB policy delay design. --- .../main/java/io/grpc/ClientStreamTracer.java | 17 +++++++ api/src/main/java/io/grpc/LoadBalancer.java | 31 ++++++++++-- .../grpc/internal/DelayedClientTransport.java | 47 ++++++++++++++++-- .../grpc/internal/PickFirstLoadBalancer.java | 5 +- .../internal/DelayedClientTransportTest.java | 48 +++++++++++++++++++ .../internal/PickFirstLoadBalancerTest.java | 5 +- .../java/io/grpc/rls/CachingRlsLbClient.java | 2 +- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 2 + .../io/grpc/util/RoundRobinLoadBalancer.java | 5 +- .../grpc/util/RoundRobinLoadBalancerTest.java | 2 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 1 + .../io/grpc/xds/PriorityLoadBalancer.java | 12 ++++- .../io/grpc/xds/RingHashLoadBalancer.java | 8 ++-- .../io/grpc/xds/CdsLoadBalancer2Test.java | 22 +++++++++ .../io/grpc/xds/PriorityLoadBalancerTest.java | 30 ++++++++++++ .../io/grpc/xds/RingHashLoadBalancerTest.java | 41 ++++++++++++++++ 16 files changed, 256 insertions(+), 22 deletions(-) diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 42e1fdfebea..07ceb11fa59 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * A delay segment started with a specific reason during load balancing. + * + * @param reasonToken the reason for the delay, e.g., "pick_first:connecting" + * @since 1.82.0 + */ + public void delayStarted(String reasonToken) { + } + + /** + * The current delay segment ended. + * + * @since 1.82.0 + */ + public void delayEnded() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 3187ae8ef1b..d3af8822058 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -549,25 +549,30 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayReasonToken; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayReasonToken) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayReasonToken = delayReasonToken; } /** @@ -727,6 +732,22 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific reason. + * + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayReasonToken) { + Preconditions.checkNotNull(delayReasonToken, "delayReasonToken"); + return new PickResult(null, null, Status.OK, false, null, delayReasonToken); + } + + /** Returns the delay reason token if any. */ + @Nullable + public String getDelayReasonToken() { + return delayReasonToken; + } + /** Returns the authority override if any. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..d979f50a648 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -157,7 +157,8 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String token = pickResult != null ? pickResult.getDelayReasonToken() : null; + return createPendingStream(args, tracers, pickResult, token); } state = newerState; } @@ -173,8 +174,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayReasonToken) { + PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -303,6 +304,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +317,9 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + stream.updateDelayReason(pickResult.getDelayReasonToken()); + } } synchronized (lock) { @@ -361,11 +365,43 @@ private class PendingStream extends DelayedStream { private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.delayReasonToken = initialToken; + if (initialToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(initialToken); + } + } + } + + void updateDelayReason(String newToken) { + if (!java.util.Objects.equals(delayReasonToken, newToken)) { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + } + delayReasonToken = newToken; + if (newToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(newToken); + } + } + } + } + + void endDelay() { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + delayReasonToken = null; + } } /** Runnable may be null. */ @@ -391,6 +427,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..b8e501da561 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,7 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +84,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +136,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(CONNECTING_RESULT); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..5891d26f344 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -772,6 +772,54 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + + InOrder inOrder = inOrder(mockTracer); + inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); + + SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); + when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("rls:lookup_pending")); + + delayedTransport.reprocess(customDelayPicker); + + inOrder.verify(mockTracer).delayEnded(); + inOrder.verify(mockTracer).delayStarted("rls:lookup_pending"); + + delayedTransport.reprocess(mockPicker); + + inOrder.verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cancelled() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayStarted("pick_first:connecting"); + + stream.cancel(Status.CANCELLED); + + verify(mockTracer).delayEnded(); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..5bfabd7ea0e 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..2748a2679f1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + return PickResult.withNoResult("rls:lookup_pending"); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..d5d94c4dd6e 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..33097cce31f 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,9 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); public RoundRobinLoadBalancer(Helper helper) { super(helper); @@ -68,7 +69,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..895cf9b4251 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,7 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker(PickResult.withNoResult("round_robin:connecting")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index f6ee60ab1ef..8bda76a5e68 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -119,6 +119,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 6e4566de76d..d5d000c0dec 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,7 +322,17 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = newPicker.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + }; if (deletionTimer != null && deletionTimer.isPending()) { return; diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..5f15658128b 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("ring_hash:connecting"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff } } } else { @@ -487,7 +489,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..e179446f715 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -343,6 +343,28 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig mockXdsConfig = mock(XdsConfig.class); + when(mockXdsConfig.getClusters()).thenReturn(ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, mockXdsConfig) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index beb568be9ce..56feb08f02b 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -910,6 +910,36 @@ public void noDuplicateOverallBalancingStateUpdate() { verify(helper, times(4)).updateBalancingState(any(), any()); } + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_token")); + + helper0.updateBalancingState(CONNECTING, mockChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayReasonToken()).isEqualTo("priority_p0:child_token"); + } + private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..d7a7892c57f 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,7 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +525,7 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(0); } @@ -546,6 +548,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(1); } @@ -1161,6 +1164,44 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } + @Test + public void ringHashPicker_passesThroughChildToken() throws Exception { + final SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_delay_token")); + + loadBalancer = new RingHashLoadBalancer(helper, random) { + @Override + protected ChildLbState createChildLbState(Object key) { + return new ChildLbState(key, pickFirstLbProvider) { + @Override + public SubchannelPicker getCurrentPicker() { + return mockChildPicker; + } + + @Override + public ConnectivityState getCurrentState() { + return READY; + } + }; + } + }; + + RingHashConfig config = new RingHashConfig(10, 100, ""); + List servers = createWeightedServerAddrs(1); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + + verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + + assertThat(result.getDelayReasonToken()).isEqualTo("child_delay_token"); + } + private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From c38ce1dd8f2df6c00a566db8f6cf7b50ee2d3c51 Mon Sep 17 00:00:00 2001 From: agravator Date: Thu, 14 May 2026 09:55:07 +0530 Subject: [PATCH 2/4] fix: tests --- .../grpc/internal/DelayedClientTransport.java | 3 +- .../ForwardingClientStreamTracer.java | 10 ++++ .../grpc/internal/PickFirstLoadBalancer.java | 3 +- .../internal/DelayedClientTransportTest.java | 3 +- .../util/ForwardingClientStreamTracer.java | 10 ++++ .../io/grpc/util/RoundRobinLoadBalancer.java | 3 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 4 +- .../io/grpc/xds/PriorityLoadBalancer.java | 53 +++++++++++++++---- .../io/grpc/xds/RingHashLoadBalancer.java | 3 +- .../io/grpc/xds/CdsLoadBalancer2Test.java | 16 ++++-- .../io/grpc/xds/PriorityLoadBalancerTest.java | 13 ++--- .../io/grpc/xds/RingHashLoadBalancerTest.java | 37 ------------- 12 files changed, 93 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index d979f50a648..b9269350088 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -367,7 +367,8 @@ private class PendingStream extends DelayedStream { private volatile Status lastPickStatus; @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..e4c2b3b9933 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index b8e501da561..4111700fffe 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,7 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { - private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 5891d26f344..8f34295a701 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -782,7 +782,7 @@ public void streamDelayMetrics() { .thenReturn(PickResult.withNoResult("pick_first:connecting")); delayedTransport.reprocess(connectingPicker); - ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + delayedTransport.newStream(method, headers, callOptions, customTracers); InOrder inOrder = inOrder(mockTracer); inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); @@ -812,6 +812,7 @@ public void streamDelayMetrics_cancelled() { delayedTransport.reprocess(connectingPicker); ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); verify(mockTracer).delayStarted("pick_first:connecting"); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..1bf24b12a19 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 33097cce31f..ab0b2c49c21 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,7 +41,8 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { - private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 8bda76a5e68..8be155ec0f8 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -119,7 +120,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet - helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); + helper.updateBalancingState( + CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index d5d000c0dec..ea26c8cc2bc 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,17 +322,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - PickResult childResult = newPicker.pickSubchannel(args); - if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { - return PickResult.withNoResult( - "priority_" + priority + ":" + childResult.getDelayReasonToken()); - } - return childResult; - } - }; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -367,4 +361,41 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = com.google.common.base.Preconditions.checkNotNull(delegate, "delegate"); + this.priority = com.google.common.base.Preconditions.checkNotNull(priority, "priority"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 5f15658128b..15cd5dba621 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -465,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index e179446f715..51e0d08f223 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -348,19 +348,25 @@ public void discoverDynamicCluster_pending_emitsToken() { String clusterName = "cluster2"; CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); - XdsConfig mockXdsConfig = mock(XdsConfig.class); - when(mockXdsConfig.getClusters()).thenReturn(ImmutableMap.of()); + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(Collections.emptyList()) .setAttributes(Attributes.newBuilder() - .set(XdsAttributes.XDS_CONFIG, mockXdsConfig) - .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) .build()) .setLoadBalancingPolicyConfig(cdsConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); } diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 56feb08f02b..6f0db55a8a7 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -531,7 +531,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -547,7 +548,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -573,7 +574,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -591,7 +592,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -869,7 +870,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -907,7 +908,7 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index d7a7892c57f..931d1f4df8e 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -1164,43 +1164,6 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } - @Test - public void ringHashPicker_passesThroughChildToken() throws Exception { - final SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); - when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("child_delay_token")); - - loadBalancer = new RingHashLoadBalancer(helper, random) { - @Override - protected ChildLbState createChildLbState(Object key) { - return new ChildLbState(key, pickFirstLbProvider) { - @Override - public SubchannelPicker getCurrentPicker() { - return mockChildPicker; - } - - @Override - public ConnectivityState getCurrentState() { - return READY; - } - }; - } - }; - - RingHashConfig config = new RingHashConfig(10, 100, ""); - List servers = createWeightedServerAddrs(1); - - loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); - - verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); - PickResult result = pickerCaptor.getValue().pickSubchannel(args); - - assertThat(result.getDelayReasonToken()).isEqualTo("child_delay_token"); - } private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From a992bdfb45d39d831f6ee19958f13ab750fcedce Mon Sep 17 00:00:00 2001 From: agravator Date: Tue, 19 May 2026 13:40:22 +0530 Subject: [PATCH 3/4] fix: minor changes --- .../main/java/io/grpc/internal/DelayedClientTransport.java | 3 ++- xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java | 6 +++--- xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index b9269350088..bde573c7508 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -37,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -381,7 +382,7 @@ private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, } void updateDelayReason(String newToken) { - if (!java.util.Objects.equals(delayReasonToken, newToken)) { + if (!Objects.equals(delayReasonToken, newToken)) { if (delayReasonToken != null) { for (ClientStreamTracer tracer : tracers) { tracer.delayEnded(); diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index ea26c8cc2bc..ab84c2b96e5 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -367,8 +367,8 @@ private static final class PriorityPicker extends SubchannelPicker { private final String priority; PriorityPicker(SubchannelPicker delegate, String priority) { - this.delegate = com.google.common.base.Preconditions.checkNotNull(delegate, "delegate"); - this.priority = com.google.common.base.Preconditions.checkNotNull(priority, "priority"); + this.delegate = checkNotNull(delegate, "delegate"); + this.priority = checkNotNull(priority, "priority"); } @Override @@ -395,7 +395,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return java.util.Objects.hash(delegate, priority); + return Objects.hash(delegate, priority); } } } diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 931d1f4df8e..387bc525043 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -1164,7 +1164,6 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } - private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From 73e07f02085035ce83a1b5f91b0e2e4007077403 Mon Sep 17 00:00:00 2001 From: agravator Date: Fri, 22 May 2026 21:38:21 +0530 Subject: [PATCH 4/4] Add OpenTelemetry tracing POC to regular interop client and server --- .../csm/observability/CsmObservability.java | 10 +- gradle/libs.versions.toml | 1 + interop-testing/build.gradle | 1 + .../integration/TestServiceClient.java | 18 +++ .../integration/TestServiceServer.java | 11 ++ .../testing/integration/XdsTestClient.java | 124 ++++++++++------ .../testing/integration/XdsTestServer.java | 30 +++- .../OpenTelemetryTracingInteropPocTest.java | 139 ++++++++++++++++++ 8 files changed, 282 insertions(+), 52 deletions(-) create mode 100644 interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java diff --git a/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java b/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java index c345fb35d0a..f26caedc1b3 100644 --- a/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java +++ b/gcp-csm-observability/src/main/java/io/grpc/gcp/csm/observability/CsmObservability.java @@ -74,7 +74,7 @@ public void configureServerBuilder(ServerBuilder serverBuilder) { } @VisibleForTesting - void configureChannelBuilder(ManagedChannelBuilder builder) { + public void configureChannelBuilder(ManagedChannelBuilder builder) { delegate.configureChannelBuilder(builder); } @@ -115,6 +115,14 @@ public Builder sdk(OpenTelemetry sdk) { return this; } + /** + * Enables or disables tracing. + */ + public Builder enableTracing(boolean enable) { + InternalGrpcOpenTelemetry.enableTracing(delegate, enable); + return this; + } + /** * Adds optionalLabelKey to all the metrics that can provide value for the * optionalLabelKey. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 705026a3fe3..df18ce249a1 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -123,6 +123,7 @@ opencensus-exporter-trace-stackdriver = { module = "io.opencensus:opencensus-exp opencensus-impl = { module = "io.opencensus:opencensus-impl", version.ref = "opencensus" } opentelemetry-api = "io.opentelemetry:opentelemetry-api:1.60.1" opentelemetry-exporter-prometheus = "io.opentelemetry:opentelemetry-exporter-prometheus:1.60.1-alpha" +opentelemetry-exporter-otlp = "io.opentelemetry:opentelemetry-exporter-otlp:1.60.1" opentelemetry-gcp-resources = "io.opentelemetry.contrib:opentelemetry-gcp-resources:1.54.0-alpha" opentelemetry-sdk-extension-autoconfigure = "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.60.1" opentelemetry-sdk-testing = "io.opentelemetry:opentelemetry-sdk-testing:1.60.1" diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index 5160759460c..b7b68482e0a 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -42,6 +42,7 @@ dependencies { libraries.netty.tcnative, libraries.netty.tcnative.classes, libraries.opentelemetry.exporter.prometheus, // For xds interop client + libraries.opentelemetry.exporter.otlp, project(':grpc-googleapis'), project(':grpc-grpclb'), project(':grpc-rls') diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 125d876b705..8b4582308ab 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -79,6 +79,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import io.grpc.gcp.csm.observability.CsmObservability; /** * Application that starts a client for the {@link TestServiceGrpc.TestServiceImplBase} and runs @@ -99,6 +100,13 @@ public class TestServiceClient { public static void main(String[] args) throws Exception { final TestServiceClient client = new TestServiceClient(); client.parseArgs(args); + if (client.enableOpentelemetryTracing) { + io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder() + .sdk(otel) + .build(); + gotel.registerGlobal(); + } customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider(); LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider); client.setUp(); @@ -107,6 +115,10 @@ public static void main(String[] args) throws Exception { client.run(); } finally { client.tearDown(); + if (client.enableOpentelemetryTracing) { + System.out.println("Sleeping to flush spans..."); + Thread.sleep(2000); + } } } @@ -136,6 +148,7 @@ public static void main(String[] args) throws Exception { private int soakResponseSize = 314159; private int numThreads = 1; private String additionalMetadata = ""; + private boolean enableOpentelemetryTracing = false; private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider; private Tester tester = new Tester(); @@ -167,6 +180,8 @@ void parseArgs(String[] args) throws Exception { serverHostOverride = value; } else if ("server_port".equals(key)) { serverPort = Integer.parseInt(value); + } else if ("enable_opentelemetry_tracing".equals(key)) { + enableOpentelemetryTracing = Boolean.parseBoolean(value); } else if ("test_case".equals(key)) { testCase = value; } else if ("num_times".equals(key)) { @@ -599,6 +614,9 @@ private class Tester extends AbstractInteropTest { @Override protected ManagedChannelBuilder createChannelBuilder() { boolean useGeneric = false; + if (enableOpentelemetryTracing) { + useGeneric = true; + } ChannelCredentials channelCredentials; if (customCredentialsType != null) { useGeneric = true; // Retain old behavior; avoids erroring if incompatible diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index fc4cdf9178f..9431790ee74 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -17,6 +17,7 @@ package io.grpc.testing.integration; import com.google.common.annotations.VisibleForTesting; +import io.grpc.gcp.csm.observability.CsmObservability; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.BindableService; import io.grpc.Grpc; @@ -46,6 +47,13 @@ public class TestServiceServer { public static void main(String[] args) throws Exception { final TestServiceServer server = new TestServiceServer(); server.parseArgs(args); + if (server.enableOpentelemetryTracing) { + io.opentelemetry.api.OpenTelemetry otel = io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + io.grpc.opentelemetry.GrpcOpenTelemetry gotel = io.grpc.opentelemetry.GrpcOpenTelemetry.newBuilder() + .sdk(otel) + .build(); + gotel.registerGlobal(); + } if (server.useTls) { System.out.println( "\nUsing fake CA for TLS certificate. Test clients should expect host\n" @@ -75,6 +83,7 @@ public void run() { private int port = 8080; private boolean useTls = true; private boolean useAlts = false; + private boolean enableOpentelemetryTracing = false; private ScheduledExecutorService executor; private Server server; @@ -106,6 +115,8 @@ void parseArgs(String[] args) { port = Integer.parseInt(value); } else if ("use_tls".equals(key)) { useTls = Boolean.parseBoolean(value); + } else if ("enable_opentelemetry_tracing".equals(key)) { + enableOpentelemetryTracing = Boolean.parseBoolean(value); } else if ("use_alts".equals(key)) { useAlts = Boolean.parseBoolean(value); } else if ("local_handshaker_port".equals(key)) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index 89519041a79..26a7b187064 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -39,6 +39,7 @@ import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; @@ -60,6 +61,7 @@ import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsChannelCredentials; +import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.util.ArrayList; import java.util.Collections; @@ -104,6 +106,7 @@ public final class XdsTestClient { private long currentRequestId; private ListeningScheduledExecutorService exec; private CsmObservability csmObservability; + private OpenTelemetrySdk openTelemetrySdk; /** * The main application allowing this client to be launched from the command line. @@ -265,14 +268,23 @@ private static RpcType parseRpc(String rpc) { @IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs private void run() { if (enableCsmObservability) { + Map props = new HashMap<>(); + props.put("otel.logs.exporter", "none"); + props.put("otel.metrics.exporter", "otlp"); + String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER"); + if (tracesExporter != null) { + props.put("otel.traces.exporter", tracesExporter); + } else { + props.put("otel.traces.exporter", "none"); + } + + AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesSupplier(() -> props) + .build(); + openTelemetrySdk = autoSdk.getOpenTelemetrySdk(); csmObservability = CsmObservability.newBuilder() - .sdk(AutoConfiguredOpenTelemetrySdk.builder() - .addPropertiesSupplier(() -> ImmutableMap.of( - "otel.logs.exporter", "none", - "otel.metrics.exporter", "prometheus", - "otel.traces.exporter", "none")) - .build() - .getOpenTelemetrySdk()) + .sdk(openTelemetrySdk) + .enableTracing(!"none".equals(props.get("otel.traces.exporter"))) .build(); csmObservability.registerGlobal(); } @@ -289,14 +301,16 @@ private void run() { try { statsServer.start(); for (int i = 0; i < numChannels; i++) { - channels.add( - Grpc.newChannelBuilder( + ManagedChannelBuilder builder = Grpc.newChannelBuilder( server, secureMode ? XdsChannelCredentials.create(InsecureChannelCredentials.create()) : InsecureChannelCredentials.create()) - .enableRetry() - .build()); + .enableRetry(); + if (enableCsmObservability) { + csmObservability.configureChannelBuilder(builder); + } + channels.add(builder.build()); } exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); Payload requestPayload = Payload.newBuilder() @@ -325,6 +339,9 @@ private void stop() throws InterruptedException { if (csmObservability != null) { csmObservability.close(); } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } } @@ -373,6 +390,13 @@ public void start(Listener responseListener, Metadata headers) { @Override public void onHeaders(Metadata headers) { hostnameRef.set(headers.get(XdsTestServer.HOSTNAME_KEY)); + io.opentelemetry.api.trace.Span currentSpan = io.opentelemetry.api.trace.Span.current(); + for (String key : config.metadata.keys()) { + String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + if (value != null) { + currentSpan.setAttribute("custom.metadata." + key, value); + } + } super.onHeaders(headers); } }, @@ -406,44 +430,56 @@ public void onNext(EmptyProtos.Empty response) {} .setPayload(requestPayload) .setResponseSize(responseSize) .build(); - stub.unaryCall( - request, - new StreamObserver() { - @Override - public void onCompleted() { - handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); - } - @Override - public void onError(Throwable t) { - if (printResponse) { - logger.log(Level.WARNING, "Rpc failed", t); + io.opentelemetry.api.baggage.BaggageBuilder baggageBuilder = io.opentelemetry.api.baggage.Baggage.builder(); + for (String key : config.metadata.keys()) { + String value = config.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)); + if (value != null) { + baggageBuilder.put(key, value); + } + } + io.opentelemetry.api.baggage.Baggage baggage = baggageBuilder.build(); + + try (io.opentelemetry.context.Scope scope = io.opentelemetry.context.Context.current().with(baggage).makeCurrent()) { + stub.unaryCall( + request, + new StreamObserver() { + @Override + public void onCompleted() { + handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); } - handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), - savedWatchers); - } - @Override - public void onNext(SimpleResponse response) { - // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC - // service and rely on parsing stdout. - if (printResponse) { - System.out.println( - "Greeting: Hello world, this is " - + response.getHostname() - + ", from " - + clientCallRef - .get() - .getAttributes() - .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + @Override + public void onError(Throwable t) { + if (printResponse) { + logger.log(Level.WARNING, "Rpc failed", t); + } + handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), + savedWatchers); } - // Use the hostname from the response if not present in the metadata. - // TODO(ericgribkoff) Delete when server is deployed that sets metadata value. - if (hostnameRef.get() == null) { - hostnameRef.set(response.getHostname()); + + @Override + public void onNext(SimpleResponse response) { + // TODO(ericgribkoff) Currently some test environments cannot access the stats RPC + // service and rely on parsing stdout. + if (printResponse) { + System.out.println( + "Greeting: Hello world, this is " + + response.getHostname() + + ", from " + + clientCallRef + .get() + .getAttributes() + .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + } + // Use the hostname from the response if not present in the metadata. + // TODO(ericgribkoff) Delete when server is deployed that sets metadata value. + if (hostnameRef.get() == null) { + hostnameRef.set(response.getHostname()); + } } - } - }); + }); + } } else { throw new AssertionError("Unknown RPC type: " + config.rpcType); } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java index 88f1bf468b6..5b48e59f8c2 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java @@ -46,13 +46,16 @@ import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsServerBuilder; import io.grpc.xds.XdsServerCredentials; +import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -92,6 +95,7 @@ public final class XdsTestServer { private String host; private Util.AddressType addressType = Util.AddressType.IPV4_IPV6; private CsmObservability csmObservability; + private OpenTelemetrySdk openTelemetrySdk; /** * The main application allowing this client to be launched from the command line. @@ -197,14 +201,23 @@ void parseArgs(String[] args) { @IgnoreJRERequirement // OpenTelemetry uses Java 8+ APIs void start() throws Exception { if (enableCsmObservability) { + Map props = new HashMap<>(); + props.put("otel.logs.exporter", "none"); + props.put("otel.metrics.exporter", "otlp"); + String tracesExporter = System.getenv("OTEL_TRACES_EXPORTER"); + if (tracesExporter != null) { + props.put("otel.traces.exporter", tracesExporter); + } else { + props.put("otel.traces.exporter", "none"); + } + + AutoConfiguredOpenTelemetrySdk autoSdk = AutoConfiguredOpenTelemetrySdk.builder() + .addPropertiesSupplier(() -> props) + .build(); + openTelemetrySdk = autoSdk.getOpenTelemetrySdk(); csmObservability = CsmObservability.newBuilder() - .sdk(AutoConfiguredOpenTelemetrySdk.builder() - .addPropertiesSupplier(() -> ImmutableMap.of( - "otel.logs.exporter", "none", - "otel.metrics.exporter", "prometheus", - "otel.traces.exporter", "none")) - .build() - .getOpenTelemetrySdk()) + .sdk(openTelemetrySdk) + .enableTracing(!"none".equals(props.get("otel.traces.exporter"))) .build(); csmObservability.registerGlobal(); } @@ -301,6 +314,9 @@ void stop() throws Exception { if (csmObservability != null) { csmObservability.close(); } + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } } private void blockUntilShutdown() throws InterruptedException { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java new file mode 100644 index 00000000000..8232f7a00c0 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryTracingInteropPocTest.java @@ -0,0 +1,139 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertTrue; + +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.grpc.opentelemetry.InternalGrpcOpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OpenTelemetryTracingInteropPocTest extends AbstractInteropTest { + + private TestSpanExporter spanExporter; + private OpenTelemetrySdk openTelemetrySdk; + private GrpcOpenTelemetry grpcOpenTelemetry; + + private static class TestSpanExporter implements SpanExporter { + private final List spans = new ArrayList<>(); + + @Override + public CompletableResultCode export(Collection spans) { + this.spans.addAll(spans); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + + public List getSpans() { + return spans; + } + } + + @Before + @Override + public void setUp() { + spanExporter = new TestSpanExporter(); + openTelemetrySdk = OpenTelemetrySdk.builder() + .setTracerProvider(SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build()) + .build(); + + GrpcOpenTelemetry.Builder grpcOpentelemetryBuilder = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetrySdk); + InternalGrpcOpenTelemetry.enableTracing(grpcOpentelemetryBuilder, true); + grpcOpenTelemetry = grpcOpentelemetryBuilder.build(); + + super.setUp(); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + if (openTelemetrySdk != null) { + openTelemetrySdk.close(); + } + } + + @Override + protected ServerBuilder getServerBuilder() { + NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + grpcOpenTelemetry.configureServerBuilder(builder); + return builder; + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .usePlaintext(); + grpcOpenTelemetry.configureChannelBuilder(builder); + return builder; + } + + @Override + protected boolean metricsExpected() { + return false; + } + + @Test + public void verifySpansGenerated() throws Exception { + blockingStub.emptyCall(io.grpc.testing.integration.EmptyProtos.Empty.getDefaultInstance()); + + // Wait a bit for spans to be exported (SimpleSpanProcessor is synchronous, so they should be there) + Thread.sleep(500); + + List spans = spanExporter.getSpans(); + System.out.println("Captured spans: " + spans.size()); + for (SpanData span : spans) { + System.out.println("Span: " + span.getName()); + } + + assertTrue("Expected at least one span", spans.size() > 0); + } +}