Skip to content

Commit 7489808

Browse files
committed
Use stored WorkerControlTaskQueue for activity cancellation dispatch
Instead of looking up the control queue name from the worker registry, use the WorkerControlTaskQueue that was persisted in ActivityInfo when the activity started. This simplifies the dispatch logic and removes the dependency on the worker registry lookup. Changes: - Get controlQueueName from first valid activity's WorkerControlTaskQueue - Remove registry lookup and hardcoded queue name construction - Update dispatchActivityCancelToWorker signature to accept queue name - Update test to send WorkerControlTaskQueue in poll request
1 parent 5b95af3 commit 7489808

File tree

2 files changed

+19
-20
lines changed

2 files changed

+19
-20
lines changed

service/history/transfer_queue_active_task_executor.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2007,12 +2007,19 @@ func (t *transferQueueActiveTaskExecutor) processCancelActivityNexus(
20072007

20082008
// Build task tokens for all activities that still need cancellation
20092009
var taskTokens [][]byte
2010+
var controlQueueName string
20102011
for _, scheduledEventID := range task.ScheduledEventIDs {
20112012
ai, ok := mutableState.GetActivityInfo(scheduledEventID)
20122013
if !ok || !ai.CancelRequested || ai.StartedEventId == common.EmptyEventID {
20132014
continue
20142015
}
20152016

2017+
// Get control queue name from the first valid activity.
2018+
// All activities in a batch are from the same worker, so they have the same control queue.
2019+
if controlQueueName == "" {
2020+
controlQueueName = ai.WorkerControlTaskQueue
2021+
}
2022+
20162023
taskToken := &tokenspb.Task{
20172024
NamespaceId: task.NamespaceID,
20182025
WorkflowId: task.WorkflowID,
@@ -2030,25 +2037,19 @@ func (t *transferQueueActiveTaskExecutor) processCancelActivityNexus(
20302037
taskTokens = append(taskTokens, taskTokenBytes)
20312038
}
20322039

2033-
if len(taskTokens) == 0 {
2040+
if len(taskTokens) == 0 || controlQueueName == "" {
20342041
return nil
20352042
}
20362043

2037-
return t.dispatchActivityCancelToWorker(ctx, task, taskTokens)
2044+
return t.dispatchActivityCancelToWorker(ctx, task.NamespaceID, controlQueueName, taskTokens)
20382045
}
20392046

20402047
func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20412048
ctx context.Context,
2042-
task *tasks.CancelActivityNexusTask,
2049+
namespaceID string,
2050+
controlQueueName string,
20432051
taskTokens [][]byte,
20442052
) error {
2045-
nsName, err := t.shardContext.GetNamespaceRegistry().GetNamespaceName(namespace.ID(task.NamespaceID))
2046-
if err != nil {
2047-
return err
2048-
}
2049-
// TODO: Fetch control queue name from worker registry.
2050-
controlQueueName := fmt.Sprintf("/temporal-sys/worker-commands/%s/%s-nexus-queue", nsName, task.WorkerInstanceKey)
2051-
20522053
cancelPayload := &workerpb.CancelActivitiesRequestPayload{
20532054
TaskTokens: taskTokens,
20542055
}
@@ -2074,7 +2075,7 @@ func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20742075
}
20752076

20762077
resp, err := t.matchingRawClient.DispatchNexusTask(ctx, &matchingservice.DispatchNexusTaskRequest{
2077-
NamespaceId: task.NamespaceID,
2078+
NamespaceId: namespaceID,
20782079
TaskQueue: &taskqueuepb.TaskQueue{
20792080
Name: controlQueueName,
20802081
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
@@ -2083,17 +2084,14 @@ func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20832084
})
20842085
if err != nil {
20852086
t.logger.Warn("Failed to dispatch activity cancel to worker",
2086-
tag.WorkflowNamespaceID(task.NamespaceID),
2087-
tag.WorkflowID(task.WorkflowID),
2088-
tag.WorkflowRunID(task.RunID),
2087+
tag.NewStringTag("control_queue", controlQueueName),
20892088
tag.Error(err))
20902089
return err
20912090
}
20922091

20932092
if resp.GetRequestTimeout() != nil {
20942093
t.logger.Warn("No worker polling control queue for activity cancel",
2095-
tag.WorkflowNamespaceID(task.NamespaceID),
2096-
tag.WorkflowID(task.WorkflowID))
2094+
tag.NewStringTag("control_queue", controlQueueName))
20972095
return serviceerror.NewUnavailable("no worker polling control queue")
20982096
}
20992097

tests/cancel_running_activities_using_nexus_task_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,11 @@ func (s *CancelRunningActivitiesUsingNexusTaskSuite) TestDispatchCancelToWorker(
8989

9090
// Poll for activity task and start running the activity.
9191
activityPollResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
92-
Namespace: s.Namespace().String(),
93-
TaskQueue: tv.TaskQueue(),
94-
Identity: tv.WorkerIdentity(),
95-
WorkerInstanceKey: tv.WorkerInstanceKey(),
92+
Namespace: s.Namespace().String(),
93+
TaskQueue: tv.TaskQueue(),
94+
Identity: tv.WorkerIdentity(),
95+
WorkerInstanceKey: tv.WorkerInstanceKey(),
96+
WorkerControlTaskQueue: controlQueueName,
9697
})
9798
s.NoError(err)
9899
s.NotNil(activityPollResp)

0 commit comments

Comments
 (0)