Skip to content

Commit 4997c93

Browse files
rkannan82claude
andauthored
Store worker attributes needed by server to propagate nexus tasks to worker (#9231)
## What changed? As part of RecordActivityTaskStarted flow, store worker_control_task_queue for an activity in the mutable state (ActivityInfo). Main changes: - executions.proto: Added the new worker_control_task_queue field. - mutable_state_impl.go: Update mutable state. - matching/forwarder.go: Propagate worker_control_task_queue when polls get forwarded. Otherwise, RecordActivityTaskStarted request will not have it set when invoked from a forwarded poll. ## Why? To support activity cancellation without activity heartbeat. Overall flow: - [This PR] Store worker attributes in ActivityInfo as part of RecordActivityTaskStarted call. - [#9232] When user cancels a workflow, create 1 or more tasks. Group all activities belonging to a worker into the task (for efficiency). - [#9233] Lookup the Nexus task queue for each worker, and send a Nexus operation for each transfer task. - [SDK] Worker will receive this cancel task and cancel the running activities. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7183025 commit 4997c93

File tree

16 files changed

+119
-8
lines changed

16 files changed

+119
-8
lines changed

api/persistence/v1/executions.pb.go

Lines changed: 15 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/persistence/v1/executions.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,10 @@ message ActivityInfo {
655655
bool reset_heartbeats = 48;
656656

657657
int64 start_version = 50;
658+
659+
// A dedicated per-worker Nexus task queue on which the server sends control
660+
// tasks (e.g. activity cancellation) to this specific worker instance.
661+
string worker_control_task_queue = 51;
658662
}
659663

660664
// timer_map column

service/history/api/recordactivitytaskstarted/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ func recordActivityTaskStarted(
242242
if _, err := mutableState.AddActivityTaskStartedEvent(
243243
ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(),
244244
versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(),
245+
request.PollRequest.GetWorkerControlTaskQueue(),
245246
); err != nil {
246247
return nil, rejectCodeUndefined, err
247248
}

service/history/api/respondactivitytaskcompleted/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func Invoke(
9999
// TODO (shahab): do we need to do anything with wf redirect in this case or any
100100
// other case where an activity starts?
101101
nil,
102+
"", // workerControlTaskQueue not available for force complete
102103
)
103104
if err != nil {
104105
return nil, err

service/history/api/respondworkflowtaskcompleted/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
393393

394394
workflowTaskHandler := newWorkflowTaskCompletedHandler(
395395
request.GetIdentity(),
396+
request.GetWorkerControlTaskQueue(),
396397
completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler.
397398
ms,
398399
updateRegistry,

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type (
5454

5555
workflowTaskCompletedHandler struct {
5656
identity string
57+
workerControlTaskQueue string
5758
workflowTaskCompletedID int64
5859

5960
// internal state
@@ -106,6 +107,7 @@ type (
106107

107108
func newWorkflowTaskCompletedHandler(
108109
identity string,
110+
workerControlTaskQueue string,
109111
workflowTaskCompletedID int64,
110112
mutableState historyi.MutableState,
111113
updateRegistry update.Registry,
@@ -126,6 +128,7 @@ func newWorkflowTaskCompletedHandler(
126128
) *workflowTaskCompletedHandler {
127129
return &workflowTaskCompletedHandler{
128130
identity: identity,
131+
workerControlTaskQueue: workerControlTaskQueue,
129132
workflowTaskCompletedID: workflowTaskCompletedID,
130133

131134
// internal state
@@ -585,6 +588,7 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi
585588
stamp,
586589
nil,
587590
nil,
591+
handler.workerControlTaskQueue, // Eager: activity runs on the same worker that completed the WFT.
588592
); err != nil {
589593
return nil, err
590594
}

service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func TestCommandProtocolMessage(t *testing.T) {
108108
)
109109
out.handler = newWorkflowTaskCompletedHandler(
110110
t.Name(), // identity
111+
"", // workerControlTaskQueue
111112
123, // workflowTaskCompletedID
112113
out.ms,
113114
out.updates,

service/history/history_engine_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6702,6 +6702,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6
67026702
nil,
67036703
nil,
67046704
nil,
6705+
"",
67056706
)
67066707
return event
67076708
}

service/history/interfaces/mutable_state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type (
5757
*commonpb.WorkerVersionStamp,
5858
*deploymentpb.Deployment,
5959
*taskqueuespb.BuildIdRedirectInfo,
60+
string, // workerControlTaskQueue
6061
) (*historypb.HistoryEvent, error)
6162
AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error)
6263
AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error)

service/history/interfaces/mutable_state_mock.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)