Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/Dapr.Workflow.Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ message TimerCreatedEvent {
// If defined, indicates that this task was the starting point of a new
// workflow execution as the result of a rerun operation.
optional RerunParentInstanceInfo rerunParentInstanceInfo = 3;

// Indicates the reason this timer was created.
oneof origin {
TimerOriginCreateTimer originCreateTimer = 4;
TimerOriginExternalEvent originExternalEvent = 5;
TimerOriginActivityRetry originActivityRetry = 6;
TimerOriginChildWorkflowRetry originChildWorkflowRetry = 7;
}
}

message TimerFiredEvent {
Expand Down Expand Up @@ -315,9 +323,36 @@ message CreateSubOrchestrationAction {
optional TaskRouter router = 5;
}

// Timer created explicitly by the workflow via CreateTimer().
message TimerOriginCreateTimer {
}

// Timer created to track the timeout of a WaitForExternalEvent call.
message TimerOriginExternalEvent {
string name = 1;
}

// Timer created to manage the retry delay of an activity.
message TimerOriginActivityRetry {
string taskExecutionId = 1;
}

// Timer created to manage the retry delay of a child workflow.
message TimerOriginChildWorkflowRetry {
string instanceId = 1;
}

message CreateTimerAction {
google.protobuf.Timestamp fireAt = 1;
optional string name = 2;

// Indicates the reason this timer is being created.
oneof origin {
TimerOriginCreateTimer originCreateTimer = 3;
TimerOriginExternalEvent originExternalEvent = 4;
TimerOriginActivityRetry originActivityRetry = 5;
TimerOriginChildWorkflowRetry originChildWorkflowRetry = 6;
}
}

message SendEventAction {
Expand Down
20 changes: 16 additions & 4 deletions src/Dapr.Workflow/Client/RetryInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using Dapr.Workflow;
using Dapr.Workflow.Worker.Internal;

namespace Dapr.Workflow.Client;

Expand All @@ -13,7 +14,16 @@ namespace Dapr.Workflow.Client;
/// <param name="context">The workflow context used for scheduling timers.</param>
/// <param name="retryPolicy">The retry policy to apply.</param>
/// <param name="retryCall">The operation to invoke and retry.</param>
public sealed class RetryInterceptor<T>(IWorkflowContext context, WorkflowRetryPolicy retryPolicy, Func<Task<T>> retryCall)
/// <param name="retryTimerFactory">
/// Creates the retry delay timer with the appropriate origin metadata.
/// Receives the delay <see cref="TimeSpan"/> and returns a <see cref="Task"/> that completes when the timer fires.
/// When <c>null</c>, falls back to a plain <see cref="WorkflowContext.CreateTimer(TimeSpan, CancellationToken)"/> call.
/// </param>
public sealed class RetryInterceptor<T>(
IWorkflowContext context,
WorkflowRetryPolicy retryPolicy,
Func<Task<T>> retryCall,
Func<TimeSpan, Task>? retryTimerFactory = null)
{
/// <summary>
/// Executes the operation and applies the retry policy when failures occur.
Expand Down Expand Up @@ -45,7 +55,11 @@ public sealed class RetryInterceptor<T>(IWorkflowContext context, WorkflowRetryP
if (nextDelay == TimeSpan.Zero)
break;

if (context is WorkflowContext workflowContext)
if (retryTimerFactory != null)
{
await retryTimerFactory(nextDelay);
}
else if (context is WorkflowContext workflowContext)
{
await workflowContext.CreateTimer(nextDelay);
}
Expand Down Expand Up @@ -99,5 +113,3 @@ private TimeSpan ComputeNextDelay(int attempt, DateTime firstAttempt, Exception
return nextDelay;
}
}


83 changes: 83 additions & 0 deletions src/Dapr.Workflow/Worker/Internal/TimerOriginHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.DurableTask.Protobuf;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

namespace Dapr.Workflow.Worker.Internal;

/// <summary>
/// Static helpers for timer origin metadata and optional-timer recognition.
/// Kept separate from <see cref="WorkflowOrchestrationContext"/> to avoid
/// mixing protobuf types into the context's public surface.
/// </summary>
internal static class TimerOriginHelpers
{
/// <summary>
/// Sentinel fireAt value for indefinite external event timers.
/// Must be exactly 9999-12-31T23:59:59.999999999Z.
/// </summary>
internal static readonly Timestamp ExternalEventIndefiniteFireAt =
new()
{
Seconds = 253402300799, // 9999-12-31T23:59:59Z
Nanos = 999999999
};

/// <summary>
/// Sets the appropriate origin field on a <see cref="CreateTimerAction"/> based on the
/// runtime type of the supplied origin message.
/// </summary>
internal static void SetTimerOrigin(CreateTimerAction action, IMessage origin)
{
switch (origin)
{
case TimerOriginCreateTimer createTimer:
action.OriginCreateTimer = createTimer;
break;
case TimerOriginExternalEvent externalEvent:
action.OriginExternalEvent = externalEvent;
break;
case TimerOriginActivityRetry activityRetry:
action.OriginActivityRetry = activityRetry;
break;
case TimerOriginChildWorkflowRetry childWorkflowRetry:
action.OriginChildWorkflowRetry = childWorkflowRetry;
break;
}
}

/// <summary>
/// Determines whether a <see cref="OrchestratorAction"/> is an optional external event timer
/// (sentinel fireAt + ExternalEvent origin).
/// </summary>
internal static bool IsOptionalExternalEventTimerAction(OrchestratorAction action)
{
return action.CreateTimer is { } timer
&& timer.OriginCase == CreateTimerAction.OriginOneofCase.OriginExternalEvent
&& timer.FireAt != null
&& timer.FireAt.Equals(ExternalEventIndefiniteFireAt);
}

/// <summary>
/// Determines whether a <see cref="TimerCreatedEvent"/> is an optional external event timer
/// (sentinel fireAt + ExternalEvent origin).
/// </summary>
internal static bool IsOptionalExternalEventTimerCreatedEvent(TimerCreatedEvent timerCreated)
{
return timerCreated.OriginCase == TimerCreatedEvent.OriginOneofCase.OriginExternalEvent
&& timerCreated.FireAt != null
&& timerCreated.FireAt.Equals(ExternalEventIndefiniteFireAt);
}
}
Loading
Loading