Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions chasm/callback.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package chasm

const (
CallbackLibraryName = "callback"
CallbackComponentName = "callback"
CallbackLibraryName = "callback"
CallbackComponentName = "callback"
CallbackExecutionComponentName = "callback_execution"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There shouldn't be a separate component for the execution. We can reuse the same component.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It all could be one component, I don't think that is a good idea

)

var (
CallbackComponentID = GenerateTypeID(FullyQualifiedName(CallbackLibraryName, CallbackComponentName))
CallbackComponentID = GenerateTypeID(FullyQualifiedName(CallbackLibraryName, CallbackComponentName))
CallbackExecutionComponentID = GenerateTypeID(FullyQualifiedName(CallbackLibraryName, CallbackExecutionComponentName))
)
290 changes: 290 additions & 0 deletions chasm/lib/callback/callback_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
package callback

import (
"fmt"

"github.com/nexus-rpc/sdk-go/nexus"
callbackpb "go.temporal.io/api/callback/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/chasm"
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/nexus/nexusrpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
executionStatusSearchAttribute = chasm.NewSearchAttributeKeyword(
"ExecutionStatus",
chasm.SearchAttributeFieldLowCardinalityKeyword01,
)

_ chasm.RootComponent = (*CallbackExecution)(nil)
_ CompletionSource = (*CallbackExecution)(nil)
_ chasm.VisibilitySearchAttributesProvider = (*CallbackExecution)(nil)
_ chasm.VisibilityMemoProvider = (*CallbackExecution)(nil)
)

// CallbackExecution is a top-level CHASM entity that manages a standalone callback.
// It owns a child Callback component and implements CompletionSource to provide
// stored Nexus completion data for invocation.
type CallbackExecution struct {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other components that we support both standalone and embdded are structured differently. We don't create these wrapper components for them and instead make the embedded component multi-purpose. Not that what you did is incorrect, it's just different than what we have done in the past and IMHO best to keep the consistent approach.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a bad approach personally, the only precedent we have is for stand alone activities no? or is there other components that did that?

chasm.UnimplementedComponent

// Persisted state
*callbackspb.CallbackExecutionState

// Child callback component
Callback chasm.Field[*Callback]

// Visibility sub-component for search attributes and memo indexing.
Visibility chasm.Field[*chasm.Visibility]
}

// StartCallbackExecutionInput contains validated fields from the start request.
type StartCallbackExecutionInput struct {
CallbackID string
RequestID string
Callback *callbackspb.Callback
SuccessCompletion *commonpb.Payload
FailureCompletion *failurepb.Failure
ScheduleToCloseTimeout *durationpb.Duration //nolint:revive // keeping full type name for clarity
SearchAttributes map[string]*commonpb.Payload
}

// CreateCallbackExecution constructs a new CallbackExecution entity with a child Callback.
// The child Callback is immediately transitioned to SCHEDULED state to begin invocation.
func CreateCallbackExecution(
ctx chasm.MutableContext,
input *StartCallbackExecutionInput,
) (*CallbackExecution, error) {
now := timestamppb.Now()

state := &callbackspb.CallbackExecutionState{
CallbackId: input.CallbackID,
CreateTime: now,
ScheduleToCloseTimeout: input.ScheduleToCloseTimeout,
}

// Store the completion payload.
if input.SuccessCompletion != nil {
state.Completion = &callbackspb.CallbackExecutionState_SuccessCompletion{
SuccessCompletion: input.SuccessCompletion,
}
} else if input.FailureCompletion != nil {
state.Completion = &callbackspb.CallbackExecutionState_FailureCompletion{
FailureCompletion: input.FailureCompletion,
}
}
Comment on lines +75 to +83
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you rewrite to reuse the callback component, make the completion an embedded field because it can save some memory for transitions and APIs that don't require this potentially large piece of data.


// Create child Callback component.
cb := NewCallback(
input.RequestID,
now,
&callbackspb.CallbackState{},
input.Callback,
)

exec := &CallbackExecution{
CallbackExecutionState: state,
}
exec.Callback = chasm.NewComponentField(ctx, cb)

visibility := chasm.NewVisibilityWithData(ctx, input.SearchAttributes, nil)
exec.Visibility = chasm.NewComponentField(ctx, visibility)

// Immediately schedule the callback for invocation.
if err := TransitionScheduled.Apply(cb, ctx, EventScheduled{}); err != nil {
return nil, fmt.Errorf("failed to schedule callback: %w", err)
}

// Schedule the timeout task if ScheduleToCloseTimeout is set.
if input.ScheduleToCloseTimeout != nil {
if timeout := input.ScheduleToCloseTimeout.AsDuration(); timeout > 0 {
ctx.AddTask(
cb,
chasm.TaskAttributes{
ScheduledTime: now.AsTime().Add(timeout),
},
&callbackspb.ScheduleToCloseTimeoutTask{},
)
}
}

return exec, nil
}

// LifecycleState delegates to the child Callback's lifecycle state.
func (e *CallbackExecution) LifecycleState(ctx chasm.Context) chasm.LifecycleState {
cb := e.Callback.Get(ctx)
return cb.LifecycleState(ctx)
}

// Terminate forcefully terminates the callback execution.
// If already terminated with the same request ID, this is a no-op.
// If already terminated with a different request ID, returns FailedPrecondition.
func (e *CallbackExecution) Terminate(
ctx chasm.MutableContext,
req chasm.TerminateComponentRequest,
) (chasm.TerminateComponentResponse, error) {
cb := e.Callback.Get(ctx)
if cb.LifecycleState(ctx).IsClosed() {
if e.TerminateRequestId == "" {
// Completed organically (succeeded/failed/timed out), not via Terminate.
return chasm.TerminateComponentResponse{}, serviceerror.NewFailedPreconditionf(
"callback execution already in terminal state %v", cb.Status)
}
if e.TerminateRequestId != req.RequestID {
return chasm.TerminateComponentResponse{}, serviceerror.NewFailedPreconditionf(
"already terminated with request ID %s", e.TerminateRequestId)
}
return chasm.TerminateComponentResponse{}, nil
}
if err := TransitionTerminated.Apply(cb, ctx, EventTerminated{Reason: req.Reason}); err != nil {
return chasm.TerminateComponentResponse{}, fmt.Errorf("failed to terminate callback: %w", err)
}
e.TerminateRequestId = req.RequestID
return chasm.TerminateComponentResponse{}, nil
}

// Describe returns CallbackExecutionInfo for the describe RPC.
func (e *CallbackExecution) Describe(ctx chasm.Context) (*callbackpb.CallbackExecutionInfo, error) {
cb := e.Callback.Get(ctx)
apiCb, err := cb.ToAPICallback()
if err != nil {
return nil, err
}

info := &callbackpb.CallbackExecutionInfo{
CallbackId: e.CallbackId,
RunId: ctx.ExecutionKey().RunID,
Callback: apiCb,
Status: callbackStatusToAPIExecutionStatus(cb.Status),
State: callbackStatusToAPIState(cb.Status),
Attempt: cb.Attempt,
CreateTime: e.CreateTime,
LastAttemptCompleteTime: cb.LastAttemptCompleteTime,
LastAttemptFailure: cb.LastAttemptFailure,
NextAttemptScheduleTime: cb.NextAttemptScheduleTime,
CloseTime: cb.CloseTime,
ScheduleToCloseTimeout: e.ScheduleToCloseTimeout,
StateTransitionCount: ctx.StateTransitionCount(),
}
return info, nil
}

// GetOutcome returns the callback execution outcome if the execution is in a terminal state.
func (e *CallbackExecution) GetOutcome(ctx chasm.Context) (*callbackpb.CallbackExecutionOutcome, error) {
Comment thread
Quinn-With-Two-Ns marked this conversation as resolved.
cb := e.Callback.Get(ctx)
switch cb.Status {
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return &callbackpb.CallbackExecutionOutcome{
Value: &callbackpb.CallbackExecutionOutcome_Success{},
}, nil
case callbackspb.CALLBACK_STATUS_FAILED,
callbackspb.CALLBACK_STATUS_TERMINATED:
return &callbackpb.CallbackExecutionOutcome{
Value: &callbackpb.CallbackExecutionOutcome_Failure{
Failure: cb.LastAttemptFailure,
},
}, nil
default:
return nil, nil
}
}

// callbackStatusToAPIExecutionStatus maps internal CallbackStatus to public API CallbackExecutionStatus.
func callbackStatusToAPIExecutionStatus(status callbackspb.CallbackStatus) enumspb.CallbackExecutionStatus {
switch status {
case callbackspb.CALLBACK_STATUS_STANDBY,
callbackspb.CALLBACK_STATUS_SCHEDULED,
callbackspb.CALLBACK_STATUS_BACKING_OFF:
return enumspb.CALLBACK_EXECUTION_STATUS_RUNNING
case callbackspb.CALLBACK_STATUS_FAILED:
return enumspb.CALLBACK_EXECUTION_STATUS_FAILED
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return enumspb.CALLBACK_EXECUTION_STATUS_SUCCEEDED
case callbackspb.CALLBACK_STATUS_TERMINATED:
return enumspb.CALLBACK_EXECUTION_STATUS_TERMINATED
default:
return enumspb.CALLBACK_EXECUTION_STATUS_UNSPECIFIED
}
}

// callbackStatusToAPIState maps internal CallbackStatus to public API CallbackState.
func callbackStatusToAPIState(status callbackspb.CallbackStatus) enumspb.CallbackState {
switch status {
case callbackspb.CALLBACK_STATUS_STANDBY:
return enumspb.CALLBACK_STATE_STANDBY
case callbackspb.CALLBACK_STATUS_SCHEDULED:
return enumspb.CALLBACK_STATE_SCHEDULED
case callbackspb.CALLBACK_STATUS_BACKING_OFF:
return enumspb.CALLBACK_STATE_BACKING_OFF
case callbackspb.CALLBACK_STATUS_FAILED:
return enumspb.CALLBACK_STATE_FAILED
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return enumspb.CALLBACK_STATE_SUCCEEDED
case callbackspb.CALLBACK_STATUS_TERMINATED:
return enumspb.CALLBACK_STATE_TERMINATED
default:
return enumspb.CALLBACK_STATE_UNSPECIFIED
}
}

// SearchAttributes implements chasm.VisibilitySearchAttributesProvider.
func (e *CallbackExecution) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue {
cb := e.Callback.Get(ctx)
return []chasm.SearchAttributeKeyValue{
executionStatusSearchAttribute.Value(callbackStatusToAPIExecutionStatus(cb.Status).String()),
}
}

// Memo implements chasm.VisibilityMemoProvider. Returns the CallbackExecutionListInfo
// as the memo for visibility queries.
func (e *CallbackExecution) Memo(ctx chasm.Context) proto.Message {
cb := e.Callback.Get(ctx)
return &callbackpb.CallbackExecutionListInfo{
CallbackId: e.CallbackId,
Status: callbackStatusToAPIExecutionStatus(cb.Status),
CreateTime: e.CreateTime,
CloseTime: cb.CloseTime,
}
}
Comment on lines +249 to +257
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these fields should be available to you. You shouldn't need to store any of this in visibility.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't understand are you saying they shouldn't be on the list output?


// GetNexusCompletion implements CompletionSource. It converts the stored completion
// payload to nexusrpc.CompleteOperationOptions for use by the Callback invocation logic.
func (e *CallbackExecution) GetNexusCompletion(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't worry, I'm going to change this interface to not call the method Get... :)

ctx chasm.Context,
requestID string,
) (nexusrpc.CompleteOperationOptions, error) {
opts := nexusrpc.CompleteOperationOptions{
StartTime: e.CreateTime.AsTime(),
}
switch c := e.Completion.(type) {
case *callbackspb.CallbackExecutionState_SuccessCompletion:
opts.Result = c.SuccessCompletion
return opts, nil
case *callbackspb.CallbackExecutionState_FailureCompletion:
f, err := commonnexus.TemporalFailureToNexusFailure(c.FailureCompletion)
if err != nil {
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("failed to convert failure: %w", err)
}
opErr := &nexus.OperationError{
State: nexus.OperationStateFailed,
Message: "operation failed",
Cause: &nexus.FailureError{Failure: f},
}
if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil {
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("failed to mark wrapper error: %w", err)
}
opts.Error = opErr
return opts, nil
default:
return nexusrpc.CompleteOperationOptions{}, fmt.Errorf("empty completion payload")
}
}
13 changes: 7 additions & 6 deletions chasm/lib/callback/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ func (c chasmInvocation) Invoke(
task *callbackspb.InvocationTask,
taskAttr chasm.TaskAttributes,
) invocationResult {
header := nexus.Header(c.nexus.GetHeader())
if header == nil {
header = nexus.Header{}
// Get the token from the dedicated Token field, falling back to the header for backward compat.
encodedRef := c.nexus.GetToken()
if encodedRef == "" {
header := nexus.Header(c.nexus.GetHeader())
if header != nil {
encodedRef = header.Get(commonnexus.CallbackTokenHeader)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would ideally want to make the callback token header something generic that we can put in the Nexus SPEC.md. I was hoping we would do that as part of this change. It's just a bit tricky since you'll have to send both headers for a while for compatibility but on the handler side, we can remove the duplication if we can read from the new header.

}
}

// Get back the base64-encoded ComponentRef from the header.
encodedRef := header.Get(commonnexus.CallbackTokenHeader)
if encodedRef == "" {
return invocationResultFail{logInternalError(h.logger, "callback missing token", nil)}
}
Expand Down
11 changes: 10 additions & 1 deletion chasm/lib/callback/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch c.Status {
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return chasm.LifecycleStateCompleted
case callbackspb.CALLBACK_STATUS_FAILED:
case callbackspb.CALLBACK_STATUS_FAILED,
callbackspb.CALLBACK_STATUS_TERMINATED:
// TODO: Use chasm.LifecycleStateTerminated when it's available (currently commented out
// in chasm/component.go:70). For now, LifecycleStateFailed is functionally correct
// as IsClosed() returns true for all states >= LifecycleStateCompleted.
return chasm.LifecycleStateFailed
default:
return chasm.LifecycleStateRunning
Expand Down Expand Up @@ -117,6 +121,11 @@ func (c *Callback) saveResult(
ctx chasm.MutableContext,
input saveResultInput,
) (chasm.NoValue, error) {
// If the callback was terminated while the invocation was in-flight,
// the result is no longer relevant — drop it silently.
if c.LifecycleState(ctx).IsClosed() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transitions should already validate that the callback is in a correct state and the task will fail with a warning in the log (IIRC). It's an edge case that I am happy to ignore since the error would be benign.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we want benign warning logs in production? Why not just not process it and skip the log?

return nil, nil
}
switch r := input.result.(type) {
case invocationResultOK:
err := TransitionSucceeded.Apply(c, ctx, EventSucceeded{Time: ctx.Now(c)})
Expand Down
Loading