Skip to content

Commit 36d259c

Browse files
Implement stand-alone callback executions
Add standalone callback execution support as first-class CHASM entities independent of workflows. A standalone callback is a durable Nexus completion delivery mechanism managed through new WorkflowService RPCs.
1 parent f8bbf7c commit 36d259c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+4775
-101
lines changed

chasm/callback.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package chasm
22

33
const (
4-
CallbackLibraryName = "callback"
5-
CallbackComponentName = "callback"
4+
CallbackLibraryName = "callback"
5+
CallbackComponentName = "callback"
6+
CallbackExecutionComponentName = "callback_execution"
67
)
78

89
var (
9-
CallbackComponentID = GenerateTypeID(FullyQualifiedName(CallbackLibraryName, CallbackComponentName))
10+
CallbackComponentID = GenerateTypeID(FullyQualifiedName(CallbackLibraryName, CallbackComponentName))
11+
CallbackExecutionComponentID = GenerateTypeID(FullyQualifiedName(CallbackLibraryName, CallbackExecutionComponentName))
1012
)
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package callback
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/nexus-rpc/sdk-go/nexus"
7+
callbackpb "go.temporal.io/api/callback/v1"
8+
commonpb "go.temporal.io/api/common/v1"
9+
enumspb "go.temporal.io/api/enums/v1"
10+
failurepb "go.temporal.io/api/failure/v1"
11+
"go.temporal.io/api/serviceerror"
12+
"go.temporal.io/server/chasm"
13+
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
14+
commonnexus "go.temporal.io/server/common/nexus"
15+
"go.temporal.io/server/common/nexus/nexusrpc"
16+
"google.golang.org/protobuf/proto"
17+
"google.golang.org/protobuf/types/known/durationpb"
18+
"google.golang.org/protobuf/types/known/timestamppb"
19+
)
20+
21+
var (
22+
executionStatusSearchAttribute = chasm.NewSearchAttributeKeyword(
23+
"ExecutionStatus",
24+
chasm.SearchAttributeFieldLowCardinalityKeyword01,
25+
)
26+
27+
_ chasm.RootComponent = (*CallbackExecution)(nil)
28+
_ CompletionSource = (*CallbackExecution)(nil)
29+
_ chasm.VisibilitySearchAttributesProvider = (*CallbackExecution)(nil)
30+
_ chasm.VisibilityMemoProvider = (*CallbackExecution)(nil)
31+
)
32+
33+
// CallbackExecution is a top-level CHASM entity that manages a standalone callback.
34+
// It owns a child Callback component and implements CompletionSource to provide
35+
// stored Nexus completion data for invocation.
36+
type CallbackExecution struct {
37+
chasm.UnimplementedComponent
38+
39+
// Persisted state
40+
*callbackspb.CallbackExecutionState
41+
42+
// Child callback component
43+
Callback chasm.Field[*Callback]
44+
45+
// Visibility sub-component for search attributes and memo indexing.
46+
Visibility chasm.Field[*chasm.Visibility]
47+
}
48+
49+
// StartCallbackExecutionInput contains validated fields from the start request.
50+
type StartCallbackExecutionInput struct {
51+
CallbackID string
52+
RequestID string
53+
Callback *callbackspb.Callback
54+
SuccessCompletion *commonpb.Payload
55+
FailureCompletion *failurepb.Failure
56+
ScheduleToCloseTimeout *durationpb.Duration //nolint:revive // keeping full type name for clarity
57+
SearchAttributes map[string]*commonpb.Payload
58+
}
59+
60+
// CreateCallbackExecution constructs a new CallbackExecution entity with a child Callback.
61+
// The child Callback is immediately transitioned to SCHEDULED state to begin invocation.
62+
func CreateCallbackExecution(
63+
ctx chasm.MutableContext,
64+
input *StartCallbackExecutionInput,
65+
) (*CallbackExecution, error) {
66+
now := timestamppb.Now()
67+
68+
state := &callbackspb.CallbackExecutionState{
69+
CallbackId: input.CallbackID,
70+
CreateTime: now,
71+
ScheduleToCloseTimeout: input.ScheduleToCloseTimeout,
72+
}
73+
74+
// Store the completion payload.
75+
if input.SuccessCompletion != nil {
76+
state.Completion = &callbackspb.CallbackExecutionState_SuccessCompletion{
77+
SuccessCompletion: input.SuccessCompletion,
78+
}
79+
} else if input.FailureCompletion != nil {
80+
state.Completion = &callbackspb.CallbackExecutionState_FailureCompletion{
81+
FailureCompletion: input.FailureCompletion,
82+
}
83+
}
84+
85+
// Create child Callback component.
86+
cb := NewCallback(
87+
input.RequestID,
88+
now,
89+
&callbackspb.CallbackState{},
90+
input.Callback,
91+
)
92+
93+
exec := &CallbackExecution{
94+
CallbackExecutionState: state,
95+
}
96+
exec.Callback = chasm.NewComponentField(ctx, cb)
97+
98+
visibility := chasm.NewVisibilityWithData(ctx, input.SearchAttributes, nil)
99+
exec.Visibility = chasm.NewComponentField(ctx, visibility)
100+
101+
// Immediately schedule the callback for invocation.
102+
if err := TransitionScheduled.Apply(cb, ctx, EventScheduled{}); err != nil {
103+
return nil, fmt.Errorf("failed to schedule callback: %w", err)
104+
}
105+
106+
// Schedule the timeout task if ScheduleToCloseTimeout is set.
107+
if input.ScheduleToCloseTimeout != nil {
108+
if timeout := input.ScheduleToCloseTimeout.AsDuration(); timeout > 0 {
109+
ctx.AddTask(
110+
cb,
111+
chasm.TaskAttributes{
112+
ScheduledTime: now.AsTime().Add(timeout),
113+
},
114+
&callbackspb.ScheduleToCloseTimeoutTask{},
115+
)
116+
}
117+
}
118+
119+
return exec, nil
120+
}
121+
122+
// LifecycleState delegates to the child Callback's lifecycle state.
123+
func (e *CallbackExecution) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
124+
cb := e.Callback.Get(ctx)
125+
return cb.LifecycleState(ctx)
126+
}
127+
128+
// Terminate forcefully terminates the callback execution.
129+
// If already terminated with the same request ID, this is a no-op.
130+
// If already terminated with a different request ID, returns FailedPrecondition.
131+
func (e *CallbackExecution) Terminate(
132+
ctx chasm.MutableContext,
133+
req chasm.TerminateComponentRequest,
134+
) (chasm.TerminateComponentResponse, error) {
135+
cb := e.Callback.Get(ctx)
136+
if cb.LifecycleState(ctx).IsClosed() {
137+
if e.TerminateRequestId == "" {
138+
// Completed organically (succeeded/failed/timed out), not via Terminate.
139+
return chasm.TerminateComponentResponse{}, serviceerror.NewFailedPreconditionf(
140+
"callback execution already in terminal state %v", cb.Status)
141+
}
142+
if e.TerminateRequestId != req.RequestID {
143+
return chasm.TerminateComponentResponse{}, serviceerror.NewFailedPreconditionf(
144+
"already terminated with request ID %s", e.TerminateRequestId)
145+
}
146+
return chasm.TerminateComponentResponse{}, nil
147+
}
148+
if err := TransitionTerminated.Apply(cb, ctx, EventTerminated{Reason: req.Reason}); err != nil {
149+
return chasm.TerminateComponentResponse{}, fmt.Errorf("failed to terminate callback: %w", err)
150+
}
151+
e.TerminateRequestId = req.RequestID
152+
return chasm.TerminateComponentResponse{}, nil
153+
}
154+
155+
// Describe returns CallbackExecutionInfo for the describe RPC.
156+
func (e *CallbackExecution) Describe(ctx chasm.Context) (*callbackpb.CallbackExecutionInfo, error) {
157+
cb := e.Callback.Get(ctx)
158+
apiCb, err := cb.ToAPICallback()
159+
if err != nil {
160+
return nil, err
161+
}
162+
163+
info := &callbackpb.CallbackExecutionInfo{
164+
CallbackId: e.CallbackId,
165+
RunId: ctx.ExecutionKey().RunID,
166+
Callback: apiCb,
167+
State: callbackStatusToAPIExecutionState(cb.Status),
168+
Attempt: cb.Attempt,
169+
CreateTime: e.CreateTime,
170+
LastAttemptCompleteTime: cb.LastAttemptCompleteTime,
171+
LastAttemptFailure: cb.LastAttemptFailure,
172+
NextAttemptScheduleTime: cb.NextAttemptScheduleTime,
173+
CloseTime: cb.CloseTime,
174+
ScheduleToCloseTimeout: e.ScheduleToCloseTimeout,
175+
StateTransitionCount: ctx.StateTransitionCount(),
176+
}
177+
return info, nil
178+
}
179+
180+
// GetOutcome returns the callback execution outcome if the execution is in a terminal state.
181+
func (e *CallbackExecution) GetOutcome(ctx chasm.Context) (*callbackpb.CallbackExecutionOutcome, error) {
182+
cb := e.Callback.Get(ctx)
183+
switch cb.Status {
184+
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
185+
return &callbackpb.CallbackExecutionOutcome{
186+
Value: &callbackpb.CallbackExecutionOutcome_Success{},
187+
}, nil
188+
case callbackspb.CALLBACK_STATUS_FAILED,
189+
callbackspb.CALLBACK_STATUS_TERMINATED:
190+
return &callbackpb.CallbackExecutionOutcome{
191+
Value: &callbackpb.CallbackExecutionOutcome_Failure{
192+
Failure: cb.LastAttemptFailure,
193+
},
194+
}, nil
195+
default:
196+
return nil, nil
197+
}
198+
}
199+
200+
// callbackStatusToAPIExecutionState maps internal CallbackStatus to public API CallbackExecutionState.
201+
func callbackStatusToAPIExecutionState(status callbackspb.CallbackStatus) enumspb.CallbackExecutionState {
202+
switch status {
203+
case callbackspb.CALLBACK_STATUS_STANDBY,
204+
callbackspb.CALLBACK_STATUS_SCHEDULED,
205+
callbackspb.CALLBACK_STATUS_BACKING_OFF:
206+
return enumspb.CALLBACK_EXECUTION_STATE_RUNNING
207+
case callbackspb.CALLBACK_STATUS_FAILED:
208+
return enumspb.CALLBACK_EXECUTION_STATE_FAILED
209+
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
210+
return enumspb.CALLBACK_EXECUTION_STATE_SUCCEEDED
211+
case callbackspb.CALLBACK_STATUS_TERMINATED:
212+
return enumspb.CALLBACK_EXECUTION_STATE_TERMINATED
213+
default:
214+
return enumspb.CALLBACK_EXECUTION_STATE_UNSPECIFIED
215+
}
216+
}
217+
218+
// SearchAttributes implements chasm.VisibilitySearchAttributesProvider.
219+
func (e *CallbackExecution) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue {
220+
cb := e.Callback.Get(ctx)
221+
return []chasm.SearchAttributeKeyValue{
222+
executionStatusSearchAttribute.Value(callbackStatusToAPIExecutionState(cb.Status).String()),
223+
}
224+
}
225+
226+
// Memo implements chasm.VisibilityMemoProvider. Returns the CallbackExecutionListInfo
227+
// as the memo for visibility queries.
228+
func (e *CallbackExecution) Memo(ctx chasm.Context) proto.Message {
229+
cb := e.Callback.Get(ctx)
230+
return &callbackpb.CallbackExecutionListInfo{
231+
CallbackId: e.CallbackId,
232+
State: callbackStatusToAPIExecutionState(cb.Status),
233+
CreateTime: e.CreateTime,
234+
CloseTime: cb.CloseTime,
235+
}
236+
}
237+
238+
// GetNexusCompletion implements CompletionSource. It converts the stored completion
239+
// payload to nexusrpc.CompleteOperationOptions for use by the Callback invocation logic.
240+
func (e *CallbackExecution) GetNexusCompletion(
241+
ctx chasm.Context,
242+
requestID string,
243+
) (nexusrpc.CompleteOperationOptions, error) {
244+
opts := nexusrpc.CompleteOperationOptions{
245+
StartTime: e.CreateTime.AsTime(),
246+
}
247+
switch c := e.Completion.(type) {
248+
case *callbackspb.CallbackExecutionState_SuccessCompletion:
249+
opts.Result = c.SuccessCompletion
250+
return opts, nil
251+
case *callbackspb.CallbackExecutionState_FailureCompletion:
252+
f, err := commonnexus.TemporalFailureToNexusFailure(c.FailureCompletion)
253+
if err != nil {
254+
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("failed to convert failure: %w", err)
255+
}
256+
opErr := &nexus.OperationError{
257+
State: nexus.OperationStateFailed,
258+
Message: "operation failed",
259+
Cause: &nexus.FailureError{Failure: f},
260+
}
261+
if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
262+
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("failed to mark wrapper error: %w", err)
263+
}
264+
opts.Error = opErr
265+
return opts, nil
266+
default:
267+
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("empty completion payload")
268+
}
269+
}

chasm/lib/callback/chasm_invocation.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,14 @@ func (c chasmInvocation) Invoke(
5656
task *callbackspb.InvocationTask,
5757
taskAttr chasm.TaskAttributes,
5858
) invocationResult {
59-
header := nexus.Header(c.nexus.GetHeader())
60-
if header == nil {
61-
header = nexus.Header{}
59+
// Get the token from the dedicated Token field, falling back to the header for backward compat.
60+
encodedRef := c.nexus.GetToken()
61+
if encodedRef == "" {
62+
header := nexus.Header(c.nexus.GetHeader())
63+
if header != nil {
64+
encodedRef = header.Get(commonnexus.CallbackTokenHeader)
65+
}
6266
}
63-
64-
// Get back the base64-encoded ComponentRef from the header.
65-
encodedRef := header.Get(commonnexus.CallbackTokenHeader)
6667
if encodedRef == "" {
6768
return invocationResultFail{logInternalError(h.logger, "callback missing token", nil)}
6869
}

chasm/lib/callback/component.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@ func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState {
5252
switch c.Status {
5353
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
5454
return chasm.LifecycleStateCompleted
55-
case callbackspb.CALLBACK_STATUS_FAILED:
55+
case callbackspb.CALLBACK_STATUS_FAILED,
56+
callbackspb.CALLBACK_STATUS_TERMINATED:
57+
// TODO: Use chasm.LifecycleStateTerminated when it's available (currently commented out
58+
// in chasm/component.go:70). For now, LifecycleStateFailed is functionally correct
59+
// as IsClosed() returns true for all states >= LifecycleStateCompleted.
5660
return chasm.LifecycleStateFailed
5761
default:
5862
return chasm.LifecycleStateRunning
@@ -117,6 +121,11 @@ func (c *Callback) saveResult(
117121
ctx chasm.MutableContext,
118122
input saveResultInput,
119123
) (chasm.NoValue, error) {
124+
// If the callback was terminated while the invocation was in-flight,
125+
// the result is no longer relevant — drop it silently.
126+
if c.LifecycleState(ctx).IsClosed() {
127+
return nil, nil
128+
}
120129
switch r := input.result.(type) {
121130
case invocationResultOK:
122131
err := TransitionSucceeded.Apply(c, ctx, EventSucceeded{Time: ctx.Now(c)})

0 commit comments

Comments
 (0)