Skip to content

Commit 5d17c04

Browse files
rkannan82 and claude committed
Address review feedback: backward compat, standby executor, lock safety
- Add StartedClock nil check in cancel handler (backward compat for activities started before deploy)
- Add WorkerCommandsTask case to standby executor (drop task)
- Use shardCtx.GetConfig() instead of passing config param
- Add lock around Executable.Attempt() for thread safety
- Add replication comment on started_clock proto field
- Add tests for cancel command with/without StartedClock

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8dec8b4 commit 5d17c04

8 files changed

Lines changed: 171 additions & 33 deletions

File tree

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -655,6 +655,9 @@ message ActivityInfo {
655655
// is to store the full serialized task token (~150-300 bytes), which avoids
656656
// reconstruction entirely and is immune to token format changes. We chose the
657657
// clock approach to keep the per-activity memory footprint minimal (~24 bytes).
658+
//
659+
// Replication: This field is part of ActivityInfo and is automatically replicated
660+
// via state-based replication. No special handling is needed.
658661
temporal.server.api.clock.v1.VectorClock started_clock = 52;
659662
}
660663

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 39 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -670,36 +670,47 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
670670
}
671671
handler.activityNotStartedCancelled = true
672672
} else if ai.StartedEventId != common.EmptyEventID && ai.WorkerControlTaskQueue != "" {
673-
// Activity has started and worker supports Nexus control tasks - collect for batched dispatch.
674-
taskToken, err := handler.tokenSerializer.Serialize(tasktoken.NewActivityTaskToken(
675-
handler.mutableState.GetNamespaceEntry().ID().String(),
676-
handler.mutableState.GetWorkflowKey().WorkflowID,
677-
handler.mutableState.GetWorkflowKey().RunID,
678-
ai.ScheduledEventId,
679-
ai.ActivityId,
680-
ai.ActivityType.GetName(),
681-
ai.Attempt,
682-
ai.StartedClock,
683-
ai.Version,
684-
ai.StartVersion,
685-
nil,
686-
))
687-
if err != nil {
688-
return nil, err
689-
}
690-
if handler.pendingWorkerCommandsByControlQueue == nil {
691-
handler.pendingWorkerCommandsByControlQueue = make(map[string][]*workerpb.WorkerCommand)
692-
}
693-
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue] = append(
694-
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue],
695-
&workerpb.WorkerCommand{
696-
Type: &workerpb.WorkerCommand_CancelActivity{
697-
CancelActivity: &workerpb.CancelActivityCommand{
698-
TaskToken: taskToken,
673+
if ai.StartedClock == nil {
674+
// StartedClock may be nil for activities started before this feature was deployed.
675+
// Skip cancel command; the activity will time out normally.
676+
handler.logger.Info("Skipping worker cancel command: activity missing StartedClock (pre-deploy)",
677+
tag.WorkflowNamespaceID(handler.mutableState.GetWorkflowKey().NamespaceID),
678+
tag.WorkflowID(handler.mutableState.GetWorkflowKey().WorkflowID),
679+
tag.WorkflowRunID(handler.mutableState.GetWorkflowKey().RunID),
680+
tag.WorkflowScheduledEventID(ai.ScheduledEventId),
681+
)
682+
} else {
683+
// Activity has started and worker supports Nexus control tasks - collect for batched dispatch.
684+
taskToken, err := handler.tokenSerializer.Serialize(tasktoken.NewActivityTaskToken(
685+
handler.mutableState.GetNamespaceEntry().ID().String(),
686+
handler.mutableState.GetWorkflowKey().WorkflowID,
687+
handler.mutableState.GetWorkflowKey().RunID,
688+
ai.ScheduledEventId,
689+
ai.ActivityId,
690+
ai.ActivityType.GetName(),
691+
ai.Attempt,
692+
ai.StartedClock,
693+
ai.Version,
694+
ai.StartVersion,
695+
nil,
696+
))
697+
if err != nil {
698+
return nil, err
699+
}
700+
if handler.pendingWorkerCommandsByControlQueue == nil {
701+
handler.pendingWorkerCommandsByControlQueue = make(map[string][]*workerpb.WorkerCommand)
702+
}
703+
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue] = append(
704+
handler.pendingWorkerCommandsByControlQueue[ai.WorkerControlTaskQueue],
705+
&workerpb.WorkerCommand{
706+
Type: &workerpb.WorkerCommand_CancelActivity{
707+
CancelActivity: &workerpb.CancelActivityCommand{
708+
TaskToken: taskToken,
709+
},
699710
},
700711
},
701-
},
702-
)
712+
)
713+
}
703714
}
704715
}
705716
return actCancelReqEvent, nil

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go

Lines changed: 121 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
1112
commandpb "go.temporal.io/api/command/v1"
1213
commonpb "go.temporal.io/api/common/v1"
@@ -17,12 +18,15 @@ import (
1718
"go.temporal.io/api/serviceerror"
1819
updatepb "go.temporal.io/api/update/v1"
1920
workerpb "go.temporal.io/api/worker/v1"
21+
clockspb "go.temporal.io/server/api/clock/v1"
2022
persistencespb "go.temporal.io/server/api/persistence/v1"
23+
"go.temporal.io/server/common"
2124
"go.temporal.io/server/common/backoff"
2225
"go.temporal.io/server/common/collection"
2326
"go.temporal.io/server/common/dynamicconfig"
2427
"go.temporal.io/server/common/effect"
2528
"go.temporal.io/server/common/log"
29+
"go.temporal.io/server/common/tasktoken"
2630
"go.temporal.io/server/common/metrics"
2731
"go.temporal.io/server/common/namespace"
2832
"go.temporal.io/server/common/namespace/nsregistry"
@@ -470,3 +474,120 @@ func TestFlushWorkerCommandsTasks(t *testing.T) {
470474
require.NoError(t, err)
471475
})
472476
}
477+
478+
func TestHandleCommandRequestCancelActivity_WorkerCommands(t *testing.T) {
479+
t.Parallel()
480+
481+
cancelReqEvent := &historypb.HistoryEvent{EventId: 10}
482+
scheduledEventID := int64(5)
483+
controlQueue := "/_sys/worker-commands/test-ns/key1"
484+
485+
t.Run("started activity with clock collects worker command", func(t *testing.T) {
486+
ctrl := gomock.NewController(t)
487+
ms := historyi.NewMockMutableState(ctrl)
488+
489+
ai := &persistencespb.ActivityInfo{
490+
ScheduledEventId: scheduledEventID,
491+
StartedEventId: 7,
492+
ActivityId: "act-1",
493+
WorkerControlTaskQueue: controlQueue,
494+
StartedClock: &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 100},
495+
Version: 1,
496+
StartVersion: 1,
497+
Attempt: 1,
498+
ActivityType: &commonpb.ActivityType{Name: "test-activity"},
499+
}
500+
501+
ms.EXPECT().AddActivityTaskCancelRequestedEvent(int64(123), scheduledEventID, "test-identity").
502+
Return(cancelReqEvent, ai, nil)
503+
ms.EXPECT().GetNamespaceEntry().Return(tests.LocalNamespaceEntry).AnyTimes()
504+
ms.EXPECT().GetWorkflowKey().Return(tests.WorkflowKey).AnyTimes()
505+
506+
handler := &workflowTaskCompletedHandler{
507+
identity: "test-identity",
508+
workflowTaskCompletedID: 123,
509+
mutableState: ms,
510+
tokenSerializer: tasktoken.NewSerializer(),
511+
logger: log.NewNoopLogger(),
512+
}
513+
514+
event, err := handler.handleCommandRequestCancelActivity(
515+
context.Background(),
516+
&commandpb.RequestCancelActivityTaskCommandAttributes{
517+
ScheduledEventId: scheduledEventID,
518+
},
519+
)
520+
require.NoError(t, err)
521+
assert.Equal(t, cancelReqEvent, event)
522+
assert.Len(t, handler.pendingWorkerCommandsByControlQueue[controlQueue], 1)
523+
})
524+
525+
t.Run("started activity without clock skips worker command", func(t *testing.T) {
526+
ctrl := gomock.NewController(t)
527+
ms := historyi.NewMockMutableState(ctrl)
528+
529+
ai := &persistencespb.ActivityInfo{
530+
ScheduledEventId: scheduledEventID,
531+
StartedEventId: 7,
532+
ActivityId: "act-1",
533+
WorkerControlTaskQueue: controlQueue,
534+
StartedClock: nil, // pre-deploy: no clock stored
535+
Version: 1,
536+
StartVersion: 1,
537+
Attempt: 1,
538+
}
539+
540+
ms.EXPECT().AddActivityTaskCancelRequestedEvent(int64(123), scheduledEventID, "test-identity").
541+
Return(cancelReqEvent, ai, nil)
542+
543+
handler := &workflowTaskCompletedHandler{
544+
identity: "test-identity",
545+
workflowTaskCompletedID: 123,
546+
mutableState: ms,
547+
logger: log.NewNoopLogger(),
548+
}
549+
550+
event, err := handler.handleCommandRequestCancelActivity(
551+
context.Background(),
552+
&commandpb.RequestCancelActivityTaskCommandAttributes{
553+
ScheduledEventId: scheduledEventID,
554+
},
555+
)
556+
require.NoError(t, err)
557+
assert.Equal(t, cancelReqEvent, event)
558+
assert.Empty(t, handler.pendingWorkerCommandsByControlQueue, "no worker command should be created when StartedClock is nil")
559+
})
560+
561+
t.Run("started activity without control queue does not collect worker command", func(t *testing.T) {
562+
ctrl := gomock.NewController(t)
563+
ms := historyi.NewMockMutableState(ctrl)
564+
565+
ai := &persistencespb.ActivityInfo{
566+
ScheduledEventId: scheduledEventID,
567+
StartedEventId: 7,
568+
ActivityId: "act-1",
569+
WorkerControlTaskQueue: "", // worker doesn't support control tasks
570+
StartedClock: &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 100},
571+
}
572+
573+
ms.EXPECT().AddActivityTaskCancelRequestedEvent(int64(123), scheduledEventID, "test-identity").
574+
Return(cancelReqEvent, ai, nil)
575+
576+
handler := &workflowTaskCompletedHandler{
577+
identity: "test-identity",
578+
workflowTaskCompletedID: 123,
579+
mutableState: ms,
580+
logger: log.NewNoopLogger(),
581+
}
582+
583+
event, err := handler.handleCommandRequestCancelActivity(
584+
context.Background(),
585+
&commandpb.RequestCancelActivityTaskCommandAttributes{
586+
ScheduledEventId: scheduledEventID,
587+
},
588+
)
589+
require.NoError(t, err)
590+
assert.Equal(t, cancelReqEvent, event)
591+
assert.Empty(t, handler.pendingWorkerCommandsByControlQueue)
592+
})
593+
}

service/history/outbound_queue_active_task_executor.go

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@ import (
1111
"go.temporal.io/server/common/log"
1212
"go.temporal.io/server/common/metrics"
1313
"go.temporal.io/server/common/resource"
14-
"go.temporal.io/server/service/history/configs"
1514
"go.temporal.io/server/service/history/consts"
1615
historyi "go.temporal.io/server/service/history/interfaces"
1716
"go.temporal.io/server/service/history/queues"
@@ -39,7 +38,6 @@ func newOutboundQueueActiveTaskExecutor(
3938
metricsHandler metrics.Handler,
4039
chasmEngine chasm.Engine,
4140
matchingRawClient resource.MatchingRawClient,
42-
config *configs.Config,
4341
) *outboundQueueActiveTaskExecutor {
4442
scopedMetricsHandler := metricsHandler.WithTags(
4543
metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope),
@@ -54,7 +52,7 @@ func newOutboundQueueActiveTaskExecutor(
5452
chasmEngine: chasmEngine,
5553
workerCommandsTaskDispatcher: newWorkerCommandsTaskDispatcher(
5654
matchingRawClient,
57-
config,
55+
shardCtx.GetConfig(),
5856
scopedMetricsHandler,
5957
logger,
6058
),

service/history/outbound_queue_active_task_executor_test.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -116,7 +116,6 @@ func (s *outboundQueueActiveTaskExecutorSuite) SetupTest() {
116116
s.metricsHandler,
117117
s.mockChasmEngine,
118118
nil, // matchingRawClient - not used in these tests
119-
nil, // config - not used in these tests
120119
)
121120
}
122121

service/history/outbound_queue_factory.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -230,7 +230,6 @@ func (f *outboundQueueFactory) CreateQueue(
230230
metricsHandler,
231231
f.ChasmEngine,
232232
f.MatchingRawClient,
233-
shardContext.GetConfig(),
234233
)
235234

236235
standbyExecutor := newOutboundQueueStandbyTaskExecutor(

service/history/outbound_queue_standby_task_executor.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,10 @@ func (e *outboundQueueStandbyTaskExecutor) Execute(
9999
return respond(e.executeStateMachineTask(ctx, task, nsName))
100100
case *tasks.ChasmTask:
101101
return respond(e.executeChasmSideEffectTask(ctx, task))
102+
case *tasks.WorkerCommandsTask:
103+
// Worker commands are best-effort and only executed on the active cluster.
104+
// On standby, simply drop the task.
105+
return respond(nil)
102106
}
103107

104108
return respond(queueserrors.NewUnprocessableTaskError(fmt.Sprintf("unknown task type '%T'", task)))

service/history/queues/executable.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -716,6 +716,9 @@ func (e *executableImpl) State() ctasks.State {
716716
}
717717

718718
func (e *executableImpl) Attempt() int {
719+
e.Lock()
720+
defer e.Unlock()
721+
719722
return e.attempt
720723
}
721724

0 commit comments

Comments
 (0)