-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy pathcallback_execution.go
More file actions
269 lines (242 loc) · 9.52 KB
/
callback_execution.go
File metadata and controls
269 lines (242 loc) · 9.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
package callback
import (
"fmt"
"github.com/nexus-rpc/sdk-go/nexus"
callbackpb "go.temporal.io/api/callback/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/chasm"
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/nexus/nexusrpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
var (
executionStatusSearchAttribute = chasm.NewSearchAttributeKeyword(
"ExecutionStatus",
chasm.SearchAttributeFieldLowCardinalityKeyword01,
)
_ chasm.RootComponent = (*CallbackExecution)(nil)
_ CompletionSource = (*CallbackExecution)(nil)
_ chasm.VisibilitySearchAttributesProvider = (*CallbackExecution)(nil)
_ chasm.VisibilityMemoProvider = (*CallbackExecution)(nil)
)
// CallbackExecution is a top-level CHASM entity that manages a standalone callback.
// It owns a child Callback component and implements CompletionSource to provide
// stored Nexus completion data for invocation.
type CallbackExecution struct {
chasm.UnimplementedComponent
// Persisted state
*callbackspb.CallbackExecutionState
// Child callback component
Callback chasm.Field[*Callback]
// Visibility sub-component for search attributes and memo indexing.
Visibility chasm.Field[*chasm.Visibility]
}
// StartCallbackExecutionInput contains validated fields from the start request.
type StartCallbackExecutionInput struct {
CallbackID string
RequestID string
Callback *callbackspb.Callback
SuccessCompletion *commonpb.Payload
FailureCompletion *failurepb.Failure
ScheduleToCloseTimeout *durationpb.Duration //nolint:revive // keeping full type name for clarity
SearchAttributes map[string]*commonpb.Payload
}
// CreateCallbackExecution constructs a new CallbackExecution entity with a child Callback.
// The child Callback is immediately transitioned to SCHEDULED state to begin invocation.
func CreateCallbackExecution(
ctx chasm.MutableContext,
input *StartCallbackExecutionInput,
) (*CallbackExecution, error) {
now := timestamppb.Now()
state := &callbackspb.CallbackExecutionState{
CallbackId: input.CallbackID,
CreateTime: now,
ScheduleToCloseTimeout: input.ScheduleToCloseTimeout,
}
// Store the completion payload.
if input.SuccessCompletion != nil {
state.Completion = &callbackspb.CallbackExecutionState_SuccessCompletion{
SuccessCompletion: input.SuccessCompletion,
}
} else if input.FailureCompletion != nil {
state.Completion = &callbackspb.CallbackExecutionState_FailureCompletion{
FailureCompletion: input.FailureCompletion,
}
}
// Create child Callback component.
cb := NewCallback(
input.RequestID,
now,
&callbackspb.CallbackState{},
input.Callback,
)
exec := &CallbackExecution{
CallbackExecutionState: state,
}
exec.Callback = chasm.NewComponentField(ctx, cb)
visibility := chasm.NewVisibilityWithData(ctx, input.SearchAttributes, nil)
exec.Visibility = chasm.NewComponentField(ctx, visibility)
// Immediately schedule the callback for invocation.
if err := TransitionScheduled.Apply(cb, ctx, EventScheduled{}); err != nil {
return nil, fmt.Errorf("failed to schedule callback: %w", err)
}
// Schedule the timeout task if ScheduleToCloseTimeout is set.
if input.ScheduleToCloseTimeout != nil {
if timeout := input.ScheduleToCloseTimeout.AsDuration(); timeout > 0 {
ctx.AddTask(
cb,
chasm.TaskAttributes{
ScheduledTime: now.AsTime().Add(timeout),
},
&callbackspb.ScheduleToCloseTimeoutTask{},
)
}
}
return exec, nil
}
// LifecycleState delegates to the child Callback's lifecycle state.
func (e *CallbackExecution) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
cb := e.Callback.Get(ctx)
return cb.LifecycleState(ctx)
}
// Terminate forcefully terminates the callback execution.
// If already terminated with the same request ID, this is a no-op.
// If already terminated with a different request ID, returns FailedPrecondition.
func (e *CallbackExecution) Terminate(
ctx chasm.MutableContext,
req chasm.TerminateComponentRequest,
) (chasm.TerminateComponentResponse, error) {
cb := e.Callback.Get(ctx)
if cb.LifecycleState(ctx).IsClosed() {
if e.TerminateRequestId == "" {
// Completed organically (succeeded/failed/timed out), not via Terminate.
return chasm.TerminateComponentResponse{}, serviceerror.NewFailedPreconditionf(
"callback execution already in terminal state %v", cb.Status)
}
if e.TerminateRequestId != req.RequestID {
return chasm.TerminateComponentResponse{}, serviceerror.NewFailedPreconditionf(
"already terminated with request ID %s", e.TerminateRequestId)
}
return chasm.TerminateComponentResponse{}, nil
}
if err := TransitionTerminated.Apply(cb, ctx, EventTerminated{Reason: req.Reason}); err != nil {
return chasm.TerminateComponentResponse{}, fmt.Errorf("failed to terminate callback: %w", err)
}
e.TerminateRequestId = req.RequestID
return chasm.TerminateComponentResponse{}, nil
}
// Describe returns CallbackExecutionInfo for the describe RPC.
func (e *CallbackExecution) Describe(ctx chasm.Context) (*callbackpb.CallbackExecutionInfo, error) {
cb := e.Callback.Get(ctx)
apiCb, err := cb.ToAPICallback()
if err != nil {
return nil, err
}
info := &callbackpb.CallbackExecutionInfo{
CallbackId: e.CallbackId,
RunId: ctx.ExecutionKey().RunID,
Callback: apiCb,
State: callbackStatusToAPIExecutionState(cb.Status),
Attempt: cb.Attempt,
CreateTime: e.CreateTime,
LastAttemptCompleteTime: cb.LastAttemptCompleteTime,
LastAttemptFailure: cb.LastAttemptFailure,
NextAttemptScheduleTime: cb.NextAttemptScheduleTime,
CloseTime: cb.CloseTime,
ScheduleToCloseTimeout: e.ScheduleToCloseTimeout,
StateTransitionCount: ctx.StateTransitionCount(),
}
return info, nil
}
// GetOutcome returns the callback execution outcome if the execution is in a terminal state.
func (e *CallbackExecution) GetOutcome(ctx chasm.Context) (*callbackpb.CallbackExecutionOutcome, error) {
cb := e.Callback.Get(ctx)
switch cb.Status {
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return &callbackpb.CallbackExecutionOutcome{
Value: &callbackpb.CallbackExecutionOutcome_Success{},
}, nil
case callbackspb.CALLBACK_STATUS_FAILED,
callbackspb.CALLBACK_STATUS_TERMINATED:
return &callbackpb.CallbackExecutionOutcome{
Value: &callbackpb.CallbackExecutionOutcome_Failure{
Failure: cb.LastAttemptFailure,
},
}, nil
default:
return nil, nil
}
}
// callbackStatusToAPIExecutionState maps internal CallbackStatus to public API CallbackExecutionState.
func callbackStatusToAPIExecutionState(status callbackspb.CallbackStatus) enumspb.CallbackExecutionState {
switch status {
case callbackspb.CALLBACK_STATUS_STANDBY,
callbackspb.CALLBACK_STATUS_SCHEDULED,
callbackspb.CALLBACK_STATUS_BACKING_OFF:
return enumspb.CALLBACK_EXECUTION_STATE_RUNNING
case callbackspb.CALLBACK_STATUS_FAILED:
return enumspb.CALLBACK_EXECUTION_STATE_FAILED
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return enumspb.CALLBACK_EXECUTION_STATE_SUCCEEDED
case callbackspb.CALLBACK_STATUS_TERMINATED:
return enumspb.CALLBACK_EXECUTION_STATE_TERMINATED
default:
return enumspb.CALLBACK_EXECUTION_STATE_UNSPECIFIED
}
}
// SearchAttributes implements chasm.VisibilitySearchAttributesProvider.
func (e *CallbackExecution) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue {
cb := e.Callback.Get(ctx)
return []chasm.SearchAttributeKeyValue{
executionStatusSearchAttribute.Value(callbackStatusToAPIExecutionState(cb.Status).String()),
}
}
// Memo implements chasm.VisibilityMemoProvider. Returns the CallbackExecutionListInfo
// as the memo for visibility queries.
func (e *CallbackExecution) Memo(ctx chasm.Context) proto.Message {
cb := e.Callback.Get(ctx)
return &callbackpb.CallbackExecutionListInfo{
CallbackId: e.CallbackId,
State: callbackStatusToAPIExecutionState(cb.Status),
CreateTime: e.CreateTime,
CloseTime: cb.CloseTime,
}
}
// GetNexusCompletion implements CompletionSource. It converts the stored completion
// payload to nexusrpc.CompleteOperationOptions for use by the Callback invocation logic.
func (e *CallbackExecution) GetNexusCompletion(
ctx chasm.Context,
requestID string,
) (nexusrpc.CompleteOperationOptions, error) {
opts := nexusrpc.CompleteOperationOptions{
StartTime: e.CreateTime.AsTime(),
}
switch c := e.Completion.(type) {
case *callbackspb.CallbackExecutionState_SuccessCompletion:
opts.Result = c.SuccessCompletion
return opts, nil
case *callbackspb.CallbackExecutionState_FailureCompletion:
f, err := commonnexus.TemporalFailureToNexusFailure(c.FailureCompletion)
if err != nil {
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("failed to convert failure: %w", err)
}
opErr := &nexus.OperationError{
State: nexus.OperationStateFailed,
Message: "operation failed",
Cause: &nexus.FailureError{Failure: f},
}
if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("failed to mark wrapper error: %w", err)
}
opts.Error = opErr
return opts, nil
default:
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("empty completion payload")
}
}