Skip to content
10 changes: 10 additions & 0 deletions src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,15 @@ public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandli
/// been signaled.
/// </summary>
public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30);

/// <summary>
/// An optional callback invoked when errors occur in background tasks during an active subscription.
/// Errors during the initial subscription attempt are thrown directly to the caller.
/// If not set, the first background fault per subscribe cycle is cached on the receiver and rethrown
/// on the next call to <c>SubscribeAsync</c>, so the caller can observe it and decide whether to retry.
/// If the handler itself throws, both the original fault and the handler failure are cached and
/// surfaced together as an <see cref="AggregateException"/> on the next subscribe attempt.
/// </summary>
/// <remarks>
/// At most one background fault is reported per subscribe cycle; later faults from sibling background
/// tasks in the same cycle are ignored. A fault made up solely of <see cref="OperationCanceledException"/>
/// instances is treated as a clean cancellation: it is neither passed to this handler nor cached.
/// The handler runs after the receiver has disposed the failed stream and reset its subscription state.
/// </remarks>
public SubscriptionErrorHandler? ErrorHandler { get; init; }
}

148 changes: 137 additions & 11 deletions src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ------------------------------------------------------------------------

using System.Threading.Channels;
using Dapr;
using Dapr.AppCallback.Autogen.Grpc.v1;
using Grpc.Core;
using P = Dapr.Client.Autogen.Grpc.v1;
Expand Down Expand Up @@ -73,6 +74,17 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
/// </summary>
private int hasInitialized;
/// <summary>
/// Dedupes <see cref="HandleTaskCompletion"/> across the three background continuations so a single
/// sidecar failure surfaces exactly once per subscribe cycle. Reset at the start of each new cycle.
/// </summary>
private int hasFaulted;
/// <summary>
/// Stores the first unhandled background fault per subscribe cycle. Rethrown on the next call to
/// <see cref="SubscribeAsync"/> so a caller that did not configure an <see cref="DaprSubscriptionOptions.ErrorHandler"/>
/// still observes the error rather than losing it as an unobserved task exception.
/// </summary>
private Exception? pendingBackgroundFault;
/// <summary>
/// Flag that ensures the instance is only disposed a single time.
/// </summary>
private bool isDisposed;
Expand All @@ -81,6 +93,9 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
internal Task TopicMessagesChannelCompletion
{
    get { return topicMessagesChannel.Reader.Completion; }
}
// Test hook: the completion task of the acknowledgements channel reader.
internal Task AcknowledgementsChannelCompletion
{
    get { return acknowledgementsChannel.Reader.Completion; }
}
// Test hook: true while a background fault is cached and has not yet been
// handed back to a caller by SubscribeAsync.
internal bool HasPendingBackgroundFault
{
    get { return Volatile.Read(ref pendingBackgroundFault) != null; }
}

/// <summary>
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
Expand Down Expand Up @@ -109,34 +124,63 @@ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubsc
/// Dynamically subscribes to messages on a PubSub topic provided by the Dapr sidecar.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task representing the asynchronous subscribe operation.</returns>
internal async Task SubscribeAsync(CancellationToken cancellationToken = default)
{
    // Surface any unhandled background fault from a prior subscribe cycle so the caller
    // can observe it explicitly before re-subscribing. The exchange also clears the slot,
    // so each cached fault is thrown to exactly one caller.
    var carried = Interlocked.Exchange(ref pendingBackgroundFault, null);
    if (carried is not null)
    {
        throw carried;
    }

    // Prevents the receiver from performing the subscribe operation more than once
    // (as the multiple initialization messages would cancel the stream).
    if (Interlocked.Exchange(ref hasInitialized, 1) == 1)
    {
        return;
    }

    // Reset the per-cycle fault dedupe flag.
    Interlocked.Exchange(ref hasFaulted, 0);

    AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1> stream;
    try
    {
        stream = await GetStreamAsync(cancellationToken);
    }
    catch (RpcException ex)
    {
        // Reset so the caller can retry after the sidecar becomes available
        Interlocked.Exchange(ref hasInitialized, 0);
        throw new DaprException(
            $"Unable to subscribe to topic '{topicName}' on pubsub '{pubSubName}'. The Dapr sidecar may be unavailable.",
            ex);
    }
    catch (Exception)
    {
        // Reset so the caller can retry regardless of the exception type
        Interlocked.Exchange(ref hasInitialized, 0);
        throw;
    }

    // The fault continuations below are registered with CancellationToken.None rather than the
    // caller's token: a continuation registered with a cancelled token is itself cancelled and
    // never runs, which would drop a fault that races with cancellation.

    // Retrieve the messages from the sidecar and write to the messages channel - start without
    // awaiting so this isn't blocking
    _ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
        .ContinueWith(HandleTaskCompletion, null, CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

    // Start background processors for acknowledgements and topic messages
    _ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken)
        .ContinueWith(HandleTaskCompletion, null, CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
    _ = ProcessTopicChannelMessagesAsync(cancellationToken)
        .ContinueWith(HandleTaskCompletion, null, CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
}

/// <summary>
/// Exposed for testing purposes only.
/// </summary>
/// <param name="message">The topic message to write.</param>
internal async Task WriteMessageToChannelAsync(TopicMessage message)
{
await topicMessagesChannel.Writer.WriteAsync(message);
Expand All @@ -148,12 +192,94 @@ internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement ackn
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
}

/// <summary>
/// Handles faulted background tasks by resetting the subscription state and either invoking the
/// configured error handler or caching the fault for the next <see cref="SubscribeAsync"/> call.
/// </summary>
/// <param name="task">The background task whose completion is being observed.</param>
/// <param name="state">Unused; present to match the <c>ContinueWith</c> delegate shape.</param>
/// <returns>A task that completes once the fault has been handled or cached.</returns>
/// <remarks>
/// This method is the terminal continuation for <see cref="FetchDataFromSidecarAsync"/>,
/// <see cref="ProcessAcknowledgementChannelMessagesAsync"/>, and <see cref="ProcessTopicChannelMessagesAsync"/>.
/// It does not rethrow the background fault: doing so would fault an unobserved inner <see cref="Task"/>.
/// Instead, unhandled faults are stored in <see cref="pendingBackgroundFault"/> and surfaced on the
/// caller's next subscribe attempt. It is internal rather than private so tests can call it directly.
/// </remarks>
internal async Task HandleTaskCompletion(Task task, object? state)
{
    var aggregate = task.Exception;
    if (aggregate is null)
    {
        return;
    }

    // Dedupe: the three sibling continuations typically all fault when the sidecar dies.
    // Only the first one is reported; the rest observe the dedupe flag and exit.
    if (Interlocked.CompareExchange(ref hasFaulted, 1, 0) != 0)
    {
        return;
    }

    // Prefer a non-cancellation inner exception for the user-facing error. If there is none,
    // every inner exception is a cancellation, so treat the fault as a clean cancel:
    // reset the subscription state but do not cache or surface an error.
    var cause = aggregate.InnerExceptions.FirstOrDefault(e => e is not OperationCanceledException);
    if (cause is null)
    {
        await ResetStreamStateAsync().ConfigureAwait(false);
        return;
    }

    var daprException = new DaprException(
        $"An error occurred during an active subscription to topic '{topicName}' on pubsub '{pubSubName}'.",
        cause);

    var handler = options.ErrorHandler;
    if (handler is null)
    {
        // No handler configured: cache so the next SubscribeAsync surfaces it. The fault is
        // published BEFORE the state reset; otherwise a concurrent SubscribeAsync could see
        // the reset flag, start a fresh cycle, and only later be handed this stale fault.
        Interlocked.CompareExchange(ref pendingBackgroundFault, daprException, null);
        await ResetStreamStateAsync().ConfigureAwait(false);
        return;
    }

    // Reset before invoking the handler so the handler observes a receiver that is ready
    // to be subscribed again.
    await ResetStreamStateAsync().ConfigureAwait(false);

    try
    {
        await handler(daprException).ConfigureAwait(false);
    }
    catch (Exception handlerEx)
    {
        // User-supplied handler threw. Cache a combined fault so the next SubscribeAsync
        // surfaces both the original error and the handler failure — never silently drop either.
        var combined = new AggregateException(
            $"The SubscriptionErrorHandler for topic '{topicName}' on pubsub '{pubSubName}' threw while handling a prior fault.",
            daprException, handlerEx);
        Interlocked.CompareExchange(ref pendingBackgroundFault, combined, null);
    }
}

/// <summary>
/// Drops and disposes the current client stream and marks the receiver as uninitialized,
/// all while holding <see cref="semaphore"/>.
/// </summary>
/// <remarks>
/// The work is serialized under <see cref="semaphore"/> (the same lock used by
/// <see cref="GetStreamAsync"/>) so that a concurrent <see cref="SubscribeAsync"/> never sees
/// <see cref="hasInitialized"/> cleared while <see cref="clientStream"/> still points at the stale stream.
/// </remarks>
private async Task ResetStreamStateAsync()
{
    await semaphore.WaitAsync().ConfigureAwait(false);
    try
    {
        // Detach the stream from the field first, then clear the flag, then dispose the detached stream.
        var staleStream = clientStream;
        clientStream = null;
        Interlocked.Exchange(ref hasInitialized, 0);

        if (staleStream is not null)
        {
            staleStream.Dispose();
        }
    }
    finally
    {
        semaphore.Release();
    }
}

Expand Down
25 changes: 25 additions & 0 deletions src/Dapr.Messaging/PublishSubscribe/SubscriptionErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// A delegate that handles errors occurring during an active subscription.
/// </summary>
/// <param name="exception">The <see cref="DaprException"/> wrapping the original error.</param>
/// <returns>A task that completes when the handler has finished processing the error.</returns>
/// <remarks>
/// This handler is invoked on a thread pool thread. Implementations should be thread-safe.
/// If the returned task faults, the handler's exception is combined with the original fault and
/// surfaced as an <see cref="AggregateException"/> on the next call to <c>SubscribeAsync</c>.
/// </remarks>
public delegate Task SubscriptionErrorHandler(DaprException exception);
Loading
Loading