diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs index 73838b605..cf1194ced 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs @@ -40,5 +40,15 @@ public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandli /// been signaled. /// public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30); + + /// + /// An optional callback invoked when errors occur in background tasks during an active subscription. + /// Errors during the initial subscription attempt are thrown directly to the caller. + /// If not set, the first background fault per subscribe cycle is cached on the receiver and rethrown + /// on the next call to SubscribeAsync, so the caller can observe it and decide whether to retry. + /// If the handler itself throws, both the original fault and the handler failure are cached and + /// surfaced together as an on the next subscribe attempt. + /// + public SubscriptionErrorHandler? ErrorHandler { get; init; } } diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 4b0d608ff..602966c6b 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -12,6 +12,7 @@ // ------------------------------------------------------------------------ using System.Threading.Channels; +using Dapr; using Dapr.AppCallback.Autogen.Grpc.v1; using Grpc.Core; using P = Dapr.Client.Autogen.Grpc.v1; @@ -73,6 +74,17 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable /// private int hasInitialized; /// + /// Dedupes across the three background continuations so a single + /// sidecar failure surfaces exactly once per subscribe cycle. Reset at the start of each new cycle. + /// + private int hasFaulted; + /// + /// Stores the first unhandled background fault per subscribe cycle. Rethrown on the next call to + /// so a caller that did not configure an + /// still observes the error rather than losing it as an unobserved task exception. + /// + private Exception? pendingBackgroundFault; + /// /// Flag that ensures the instance is only disposed a single time. /// private bool isDisposed; @@ -81,6 +93,9 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion; // Internal property for testing purposes internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion; + // Internal property for testing purposes — exposes whether a background fault has been cached + // and is waiting to be surfaced on the next SubscribeAsync call. + internal bool HasPendingBackgroundFault => Volatile.Read(ref pendingBackgroundFault) is not null; /// /// Constructs a new instance of a instance. @@ -109,34 +124,63 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc /// Dynamically subscribes to messages on a PubSub topic provided by the Dapr sidecar. /// /// Cancellation token. - /// An containing messages provided by the sidecar. + /// A task representing the asynchronous subscribe operation. internal async Task SubscribeAsync(CancellationToken cancellationToken = default) { + // Surface any unhandled background fault from a prior subscribe cycle so the caller + // can observe it explicitly before re-subscribing. + var carried = Interlocked.Exchange(ref pendingBackgroundFault, null); + if (carried is not null) + { + throw carried; + } + //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream). if (Interlocked.Exchange(ref hasInitialized, 1) == 1) { return; } - var stream = await GetStreamAsync(cancellationToken); + // Reset the per-cycle fault dedupe flag. + Interlocked.Exchange(ref hasFaulted, 0); + + AsyncDuplexStreamingCall stream; + try + { + stream = await GetStreamAsync(cancellationToken); + } + catch (RpcException ex) + { + // Reset so the caller can retry after the sidecar becomes available + Interlocked.Exchange(ref hasInitialized, 0); + throw new DaprException( + $"Unable to subscribe to topic '{topicName}' on pubsub '{pubSubName}'. The Dapr sidecar may be unavailable.", + ex); + } + catch (Exception) + { + // Reset so the caller can retry regardless of the exception type + Interlocked.Exchange(ref hasInitialized, 0); + throw; + } //Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking _ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken) - .ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, + .ContinueWith(HandleTaskCompletion, null, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); - //Process the messages as they're written to either channel + // Start background processors for acknowledgements and topic messages _ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion, - null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); + null, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); _ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null, - cancellationToken, + CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); } /// /// Exposed for testing purposes only. /// - /// The test message to write. + /// The topic message to write. internal async Task WriteMessageToChannelAsync(TopicMessage message) { await topicMessagesChannel.Writer.WriteAsync(message); @@ -148,12 +192,94 @@ internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement ackn await acknowledgementsChannel.Writer.WriteAsync(acknowledgement); } - //Exposed for testing purposes only - internal static void HandleTaskCompletion(Task task, object? state) + /// + /// Handles faulted background tasks by resetting the subscription state and either invoking the + /// configured error handler or caching the fault for the next call. + /// + /// + /// This method is the terminal continuation for , + /// , and . + /// It never re-throws: doing so would fault an unobserved inner (the original bug + /// this class was meant to fix). Instead, unhandled faults are stored in + /// and surfaced on the caller's next subscribe attempt. + /// + internal async Task HandleTaskCompletion(Task task, object? state) { - if (task.Exception != null) + if (task.Exception is null) + { + return; + } + + // Dedupe: the three sibling continuations typically all fault when the sidecar dies. + // Only the first one is reported; the rest observe the dedupe flag and exit. + if (Interlocked.CompareExchange(ref hasFaulted, 1, 0) != 0) + { + return; + } + + var innerExceptions = task.Exception.InnerExceptions; + + // If every inner exception is a cancellation, treat the fault as a clean cancel: + // reset the subscription state but do not cache or surface an error. + if (innerExceptions.All(e => e is OperationCanceledException)) { - throw task.Exception; + await ResetStreamStateAsync().ConfigureAwait(false); + return; + } + + // Prefer a non-cancellation inner exception for the user-facing message. + var innerException = innerExceptions.FirstOrDefault(e => e is not OperationCanceledException) + ?? task.Exception; + + await ResetStreamStateAsync().ConfigureAwait(false); + + var daprException = new DaprException( + $"An error occurred during an active subscription to topic '{topicName}' on pubsub '{pubSubName}'.", + innerException); + + if (options.ErrorHandler is null) + { + // No handler configured: cache so the next SubscribeAsync surfaces it. + Interlocked.CompareExchange(ref pendingBackgroundFault, daprException, null); + return; + } + + try + { + await options.ErrorHandler.Invoke(daprException).ConfigureAwait(false); + } + catch (Exception handlerEx) + { + // User-supplied handler threw. Cache a combined fault so the next SubscribeAsync + // surfaces both the original error and the handler failure — never silently drop either. + var combined = new AggregateException( + $"The SubscriptionErrorHandler for topic '{topicName}' on pubsub '{pubSubName}' threw while handling a prior fault.", + daprException, handlerEx); + Interlocked.CompareExchange(ref pendingBackgroundFault, combined, null); + } + } + + /// + /// Atomically tears down the active stream and resets the initialization flag. + /// + /// + /// Serialized under (the same lock used by ) + /// so a concurrent cannot observe reset + /// while still holds the stale reference. + /// + private async Task ResetStreamStateAsync() + { + await semaphore.WaitAsync().ConfigureAwait(false); + try + { + var old = clientStream; + clientStream = null; + Interlocked.Exchange(ref hasInitialized, 0); + old?.Dispose(); + } + finally + { + semaphore.Release(); } } diff --git a/src/Dapr.Messaging/PublishSubscribe/SubscriptionErrorHandler.cs b/src/Dapr.Messaging/PublishSubscribe/SubscriptionErrorHandler.cs new file mode 100644 index 000000000..5e245a38c --- /dev/null +++ b/src/Dapr.Messaging/PublishSubscribe/SubscriptionErrorHandler.cs @@ -0,0 +1,25 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Messaging.PublishSubscribe; + +/// +/// A delegate that handles errors occurring during an active subscription. +/// +/// The wrapping the original error. +/// +/// This handler is invoked on a thread pool thread. Implementations should be thread-safe. +/// If the returned task faults, the handler's exception is combined with the original fault and +/// surfaced as an on the next call to SubscribeAsync. +/// +public delegate Task SubscriptionErrorHandler(DaprException exception); diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs index d266ff0cb..352d1ae5f 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -12,6 +12,7 @@ // ------------------------------------------------------------------------ using System.Threading.Channels; +using Dapr; using Dapr.AppCallback.Autogen.Grpc.v1; using Dapr.Messaging.PublishSubscribe; using Grpc.Core; @@ -193,23 +194,68 @@ public async Task DisposeAsync_ShouldCompleteChannels() } [Fact] - public void HandleTaskCompletion_ShouldThrowException_WhenTaskHasException() + public async Task HandleTaskCompletion_ShouldInvokeErrorHandler_WhenTaskHasException() { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + DaprException? receivedException = null; + var options = + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + ErrorHandler = ex => { receivedException = ex; return Task.CompletedTask; } + }; + + var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success)); + var mockDaprClient = new Mock(); + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object); + var task = Task.FromException(new InvalidOperationException("Test exception")); - var exception = Assert.Throws(() => - PublishSubscribeReceiver.HandleTaskCompletion(task, null)); + await receiver.HandleTaskCompletion(task, null); + + Assert.NotNull(receivedException); + Assert.IsType(receivedException.InnerException); + Assert.Equal("Test exception", receivedException.InnerException.Message); + Assert.Contains("testTopic", receivedException.Message); + Assert.Contains("testPubSub", receivedException.Message); + } + + [Fact] + public async Task HandleTaskCompletion_ShouldCacheForNextSubscribe_WhenNoErrorHandler() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)); + + var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success)); + var mockDaprClient = new Mock(); + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object); + var task = Task.FromException(new InvalidOperationException("Test exception")); + + // With no ErrorHandler, HandleTaskCompletion must NOT throw synchronously (doing so would + // fault an unobserved continuation Task — the original silent-failure bug). Instead it caches + // the fault and the next SubscribeAsync call rethrows it. + await receiver.HandleTaskCompletion(task, null); + + var exception = await Assert.ThrowsAsync(() => receiver.SubscribeAsync(TestContext.Current.CancellationToken)); Assert.IsType(exception.InnerException); - Assert.Equal("Test exception", exception.InnerException.Message); + Assert.Equal("Test exception", exception.InnerException!.Message); } [Fact] - public void HandleTaskCompletion_SuccessfulTask_DoesNotThrow() + public async Task HandleTaskCompletion_SuccessfulTask_DoesNotThrow() { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)); + var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success)); + var mockDaprClient = new Mock(); + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object); var task = Task.CompletedTask; // Should not throw for a completed (non-faulted) task - PublishSubscribeReceiver.HandleTaskCompletion(task, null); + await receiver.HandleTaskCompletion(task, null); } [Fact] @@ -614,10 +660,11 @@ public async Task ProcessTopicChannelMessages_SuccessAction_WritesSuccessAcknowl /// /// When the message handler returns an unrecognised TopicResponseAction, AcknowledgeMessageAsync /// throws InvalidOperationException, which causes the ProcessTopicChannelMessagesAsync background - /// task to fault and HandleTaskCompletion to re-throw the exception. + /// task to fault. The fault must be cached by HandleTaskCompletion and rethrown on the next + /// SubscribeAsync call. /// [Fact] - public async Task AcknowledgeMessageAsync_UnrecognisedAction_FaultsProcessingTask() + public async Task AcknowledgeMessageAsync_UnrecognisedAction_FaultIsCachedForNextSubscribe() { const string pubSubName = "testPubSub"; const string topicName = "testTopic"; @@ -631,6 +678,8 @@ public async Task AcknowledgeMessageAsync_UnrecognisedAction_FaultsProcessingTas mockRequestStream .Setup(s => s.WriteAsync(It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); + // Keep the fetch loop parked so it doesn't race the ack-processing fault. + mockResponseStream.Setup(s => s.MoveNext(It.IsAny())).ReturnsAsync(false); var mockCall = new AsyncDuplexStreamingCall( mockRequestStream.Object, mockResponseStream.Object, @@ -638,10 +687,6 @@ public async Task AcknowledgeMessageAsync_UnrecognisedAction_FaultsProcessingTas mockDaprClient.Setup(c => c.SubscribeTopicEventsAlpha1(null, null, It.IsAny())) .Returns(mockCall); - // Capture the faulted continuation task so we can observe the exception. -#pragma warning disable CS0219 // Variable is assigned but its value is never used - Task? faultedTask = null; -#pragma warning restore CS0219 // Variable is assigned but its value is never used var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, (_, _) => Task.FromResult((TopicResponseAction)99), mockDaprClient.Object); @@ -651,13 +696,11 @@ public async Task AcknowledgeMessageAsync_UnrecognisedAction_FaultsProcessingTas var msg = new TopicMessage("bad-action-id", "src", "type", "1.0", "text/plain", topicName, pubSubName); await receiver.WriteMessageToChannelAsync(msg); - // Allow the background task time to fault. - await Task.Delay(300, TestContext.Current.CancellationToken); + // Wait for the background fault to be observed and cached by HandleTaskCompletion. + await WaitForPendingBackgroundFaultAsync(receiver); - // Verify HandleTaskCompletion correctly re-throws when given the faulted task. - var faultedStub = Task.FromException(new InvalidOperationException("Unrecognized topic acknowledgement action: 99")); - var ex = Assert.Throws(() => - PublishSubscribeReceiver.HandleTaskCompletion(faultedStub, null)); + // The next SubscribeAsync must surface the cached fault. + var ex = await Assert.ThrowsAsync(() => receiver.SubscribeAsync(TestContext.Current.CancellationToken)); Assert.IsType(ex.InnerException); Assert.Contains("99", ex.InnerException!.Message); @@ -831,4 +874,276 @@ await receiver.WriteAcknowledgementToChannelAsync( Assert.True(receiver.TopicMessagesChannelCompletion.IsCompleted); Assert.True(receiver.AcknowledgementsChannelCompletion.IsCompleted); } + + // ------------------------------------------------------------------------- + // Background-fault surfacing — repro tests that drive the real ContinueWith wiring. + // ------------------------------------------------------------------------- + + /// + /// Polls until the receiver has cached a background fault or the timeout elapses. Used by tests + /// that need to wait for HandleTaskCompletion to run through the real continuation chain. + /// + private static async Task WaitForPendingBackgroundFaultAsync(PublishSubscribeReceiver receiver, int timeoutMs = 2000) + { + var deadline = Environment.TickCount + timeoutMs; + while (!receiver.HasPendingBackgroundFault) + { + if (Environment.TickCount >= deadline) + { + throw new Xunit.Sdk.XunitException( + $"Background fault was not cached within {timeoutMs} ms — HandleTaskCompletion did not run or did not store the fault."); + } + await Task.Delay(10, TestContext.Current.CancellationToken); + } + } + + /// + /// Builds a mock that returns a DaprClient whose response stream faults on MoveNext with the + /// supplied exception. Drives FetchDataFromSidecarAsync into a faulted state, exercising the + /// real ContinueWith -> HandleTaskCompletion chain. + /// + private static Mock CreateMockDaprClientWithFaultingResponseStream(Exception faultWith, int? callCount = null) + { + var mockDaprClient = new Mock(); + var mockRequestStream = new Mock>(); + mockRequestStream + .Setup(s => s.WriteAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var mockResponseStream = new Mock>(); + mockResponseStream + .Setup(s => s.MoveNext(It.IsAny())) + .Returns(Task.FromException(faultWith)); + + var mockCall = new AsyncDuplexStreamingCall( + mockRequestStream.Object, mockResponseStream.Object, + Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { }); + mockDaprClient.Setup(c => c.SubscribeTopicEventsAlpha1(null, null, It.IsAny())) + .Returns(mockCall); + return mockDaprClient; + } + + /// + /// Repro for the original bug: when the response stream faults in the background (sidecar became + /// unavailable mid-subscription) AND no ErrorHandler is configured, the fault must not be lost. + /// It must be cached on the receiver and rethrown on the next SubscribeAsync call. + /// + [Fact] + public async Task SubscribeAsync_WhenBackgroundFetchFaults_WithoutHandler_NextSubscribeRethrows() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = new DaprSubscriptionOptions( + new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; + + var rpcFault = new RpcException(new Status(StatusCode.Unavailable, "sidecar went away")); + var mockDaprClient = CreateMockDaprClientWithFaultingResponseStream(rpcFault); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, + (_, _) => Task.FromResult(TopicResponseAction.Success), mockDaprClient.Object); + + // First subscribe succeeds — the gRPC call itself is established. The fault happens in the + // background FetchDataFromSidecarAsync loop when it tries to read the first message. + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + + await WaitForPendingBackgroundFaultAsync(receiver); + + var ex = await Assert.ThrowsAsync(() => receiver.SubscribeAsync(TestContext.Current.CancellationToken)); + Assert.IsType(ex.InnerException); + Assert.Contains("testTopic", ex.Message); + Assert.Contains("testPubSub", ex.Message); + + await receiver.DisposeAsync(); + } + + /// + /// When an ErrorHandler IS configured, a single sidecar failure must invoke it exactly once — + /// not 2–3 times, even though three background continuations all route to HandleTaskCompletion. + /// + [Fact] + public async Task SubscribeAsync_WhenBackgroundFetchFaults_InvokesErrorHandlerExactlyOnce() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + + var handlerInvoked = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var invocationCount = 0; + var options = new DaprSubscriptionOptions( + new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + MaximumCleanupTimeout = TimeSpan.FromSeconds(1), + ErrorHandler = ex => + { + Interlocked.Increment(ref invocationCount); + handlerInvoked.TrySetResult(ex); + return Task.CompletedTask; + } + }; + + var rpcFault = new RpcException(new Status(StatusCode.Unavailable, "sidecar went away")); + var mockDaprClient = CreateMockDaprClientWithFaultingResponseStream(rpcFault); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, + (_, _) => Task.FromResult(TopicResponseAction.Success), mockDaprClient.Object); + + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + + var received = await handlerInvoked.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); + Assert.IsType(received.InnerException); + + // Grace period for any sibling continuations to (incorrectly) invoke the handler. + await Task.Delay(150, TestContext.Current.CancellationToken); + + Assert.Equal(1, Volatile.Read(ref invocationCount)); + + await receiver.DisposeAsync(); + } + + /// + /// After a background fault with no handler, the caller can re-subscribe to recover. The second + /// subscribe call must rethrow the cached fault; a third call must then succeed and establish a + /// fresh stream (verifying that the previous stream was properly reset). + /// + [Fact] + public async Task SubscribeAsync_AfterBackgroundFault_NextSubscribeRecreatesStream() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = new DaprSubscriptionOptions( + new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; + + // First call: response stream faults on MoveNext. Second call: a healthy stream that parks. + var mockDaprClient = new Mock(); + + var faultyRequestStream = new Mock>(); + faultyRequestStream + .Setup(s => s.WriteAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var faultyResponseStream = new Mock>(); + faultyResponseStream + .Setup(s => s.MoveNext(It.IsAny())) + .Returns(Task.FromException(new RpcException(new Status(StatusCode.Unavailable, "boom")))); + var faultyCall = new AsyncDuplexStreamingCall( + faultyRequestStream.Object, faultyResponseStream.Object, + Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { }); + + var healthyRequestStream = new Mock>(); + healthyRequestStream + .Setup(s => s.WriteAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var healthyResponseStream = new Mock>(); + healthyResponseStream + .Setup(s => s.MoveNext(It.IsAny())) + .ReturnsAsync(false); // no messages; stream completes cleanly + var healthyCall = new AsyncDuplexStreamingCall( + healthyRequestStream.Object, healthyResponseStream.Object, + Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { }); + + mockDaprClient.SetupSequence(c => c.SubscribeTopicEventsAlpha1(null, null, It.IsAny())) + .Returns(faultyCall) + .Returns(healthyCall); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, + (_, _) => Task.FromResult(TopicResponseAction.Success), mockDaprClient.Object); + + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + await WaitForPendingBackgroundFaultAsync(receiver); + + // Second call: caller observes the cached fault. + await Assert.ThrowsAsync(() => receiver.SubscribeAsync(TestContext.Current.CancellationToken)); + + // Third call: the fault has been drained; a fresh stream should be obtained from the client. + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + + mockDaprClient.Verify( + c => c.SubscribeTopicEventsAlpha1(null, null, It.IsAny()), + Times.Exactly(2)); + + await receiver.DisposeAsync(); + } + + /// + /// A background task that completes via OperationCanceledException must not be treated as an + /// error: no fault should be cached and the next SubscribeAsync should not rethrow. + /// + [Fact] + public async Task SubscribeAsync_WhenBackgroundFetchCancelled_DoesNotCachePendingFault() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = new DaprSubscriptionOptions( + new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; + + var mockDaprClient = CreateMockDaprClientWithFaultingResponseStream(new OperationCanceledException("cancelled")); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, + (_, _) => Task.FromResult(TopicResponseAction.Success), mockDaprClient.Object); + + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + + // Give HandleTaskCompletion a chance to run and reset state. Since this is a clean cancellation, + // no fault should be cached. + await Task.Delay(200, TestContext.Current.CancellationToken); + Assert.False(receiver.HasPendingBackgroundFault); + + // A subsequent SubscribeAsync must not rethrow anything cancellation-related — it should try + // to re-establish the subscription (which will fault again here, but that second fault path + // is orthogonal to this assertion). We only verify that the pending-fault rethrow does NOT fire. + try + { + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + } + catch (DaprException) + { + // A secondary fault from the new subscribe cycle is acceptable — we only wanted to prove + // that the cancellation did not get cached as a pending fault to rethrow at the top of + // SubscribeAsync (which would have been an OperationCanceledException, not DaprException). + } + + await receiver.DisposeAsync(); + } + + /// + /// When the user-supplied ErrorHandler itself throws, the original fault plus the handler fault + /// must both be cached and surfaced together on the next SubscribeAsync as an AggregateException. + /// + [Fact] + public async Task SubscribeAsync_WhenErrorHandlerThrows_CachesCombinedFaultForNextSubscribe() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + + var handlerInvoked = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var options = new DaprSubscriptionOptions( + new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + MaximumCleanupTimeout = TimeSpan.FromSeconds(1), + ErrorHandler = _ => + { + handlerInvoked.TrySetResult(); + throw new InvalidOperationException("handler bug"); + } + }; + + var rpcFault = new RpcException(new Status(StatusCode.Unavailable, "sidecar down")); + var mockDaprClient = CreateMockDaprClientWithFaultingResponseStream(rpcFault); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, + (_, _) => Task.FromResult(TopicResponseAction.Success), mockDaprClient.Object); + + await receiver.SubscribeAsync(TestContext.Current.CancellationToken); + await handlerInvoked.Task.WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); + await WaitForPendingBackgroundFaultAsync(receiver); + + var ex = await Assert.ThrowsAsync(() => + receiver.SubscribeAsync(TestContext.Current.CancellationToken)); + + Assert.Contains(ex.InnerExceptions, e => e is DaprException); + Assert.Contains(ex.InnerExceptions, e => e is InvalidOperationException { Message: "handler bug" }); + + await receiver.DisposeAsync(); + } }