Skip to content

Commit eb79c81

Browse files
committed
Use stored WorkerControlTaskQueue for activity cancellation dispatch
Instead of looking up the control queue name from the worker registry, use the WorkerControlTaskQueue that was persisted in ActivityInfo when the activity started. This simplifies the dispatch logic and removes the dependency on the worker registry lookup. Changes: - Get controlQueueName from first valid activity's WorkerControlTaskQueue - Remove registry lookup and hardcoded queue name construction - Update dispatchActivityCancelToWorker signature to accept queue name - Update test to send WorkerControlTaskQueue in poll request
1 parent 520f78c commit eb79c81

File tree

2 files changed

+19
-20
lines changed

2 files changed

+19
-20
lines changed

service/history/transfer_queue_active_task_executor.go

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2004,12 +2004,19 @@ func (t *transferQueueActiveTaskExecutor) processCancelActivityNexus(
20042004

20052005
// Build task tokens for all activities that still need cancellation
20062006
var taskTokens [][]byte
2007+
var controlQueueName string
20072008
for _, scheduledEventID := range task.ScheduledEventIDs {
20082009
ai, ok := mutableState.GetActivityInfo(scheduledEventID)
20092010
if !ok || !ai.CancelRequested || ai.StartedEventId == common.EmptyEventID {
20102011
continue
20112012
}
20122013

2014+
// Get control queue name from the first valid activity.
2015+
// All activities in a batch are from the same worker, so they have the same control queue.
2016+
if controlQueueName == "" {
2017+
controlQueueName = ai.WorkerControlTaskQueue
2018+
}
2019+
20132020
taskToken := &tokenspb.Task{
20142021
NamespaceId: task.NamespaceID,
20152022
WorkflowId: task.WorkflowID,
@@ -2027,25 +2034,19 @@ func (t *transferQueueActiveTaskExecutor) processCancelActivityNexus(
20272034
taskTokens = append(taskTokens, taskTokenBytes)
20282035
}
20292036

2030-
if len(taskTokens) == 0 {
2037+
if len(taskTokens) == 0 || controlQueueName == "" {
20312038
return nil
20322039
}
20332040

2034-
return t.dispatchActivityCancelToWorker(ctx, task, taskTokens)
2041+
return t.dispatchActivityCancelToWorker(ctx, task.NamespaceID, controlQueueName, taskTokens)
20352042
}
20362043

20372044
func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20382045
ctx context.Context,
2039-
task *tasks.CancelActivityNexusTask,
2046+
namespaceID string,
2047+
controlQueueName string,
20402048
taskTokens [][]byte,
20412049
) error {
2042-
nsName, err := t.shardContext.GetNamespaceRegistry().GetNamespaceName(namespace.ID(task.NamespaceID))
2043-
if err != nil {
2044-
return err
2045-
}
2046-
// TODO: Fetch control queue name from worker registry.
2047-
controlQueueName := fmt.Sprintf("/temporal-sys/worker-commands/%s/%s-nexus-queue", nsName, task.WorkerInstanceKey)
2048-
20492050
cancelPayload := &workerpb.CancelActivitiesRequestPayload{
20502051
TaskTokens: taskTokens,
20512052
}
@@ -2071,7 +2072,7 @@ func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20712072
}
20722073

20732074
resp, err := t.matchingRawClient.DispatchNexusTask(ctx, &matchingservice.DispatchNexusTaskRequest{
2074-
NamespaceId: task.NamespaceID,
2075+
NamespaceId: namespaceID,
20752076
TaskQueue: &taskqueuepb.TaskQueue{
20762077
Name: controlQueueName,
20772078
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
@@ -2080,17 +2081,14 @@ func (t *transferQueueActiveTaskExecutor) dispatchActivityCancelToWorker(
20802081
})
20812082
if err != nil {
20822083
t.logger.Warn("Failed to dispatch activity cancel to worker",
2083-
tag.WorkflowNamespaceID(task.NamespaceID),
2084-
tag.WorkflowID(task.WorkflowID),
2085-
tag.WorkflowRunID(task.RunID),
2084+
tag.NewStringTag("control_queue", controlQueueName),
20862085
tag.Error(err))
20872086
return err
20882087
}
20892088

20902089
if resp.GetRequestTimeout() != nil {
20912090
t.logger.Warn("No worker polling control queue for activity cancel",
2092-
tag.WorkflowNamespaceID(task.NamespaceID),
2093-
tag.WorkflowID(task.WorkflowID))
2091+
tag.NewStringTag("control_queue", controlQueueName))
20942092
return serviceerror.NewUnavailable("no worker polling control queue")
20952093
}
20962094

tests/cancel_running_activities_using_nexus_task_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,11 @@ func (s *CancelRunningActivitiesUsingNexusTaskSuite) TestDispatchCancelToWorker(
8989

9090
// Poll for activity task and start running the activity.
9191
activityPollResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{
92-
Namespace: s.Namespace().String(),
93-
TaskQueue: tv.TaskQueue(),
94-
Identity: tv.WorkerIdentity(),
95-
WorkerInstanceKey: tv.WorkerInstanceKey(),
92+
Namespace: s.Namespace().String(),
93+
TaskQueue: tv.TaskQueue(),
94+
Identity: tv.WorkerIdentity(),
95+
WorkerInstanceKey: tv.WorkerInstanceKey(),
96+
WorkerControlTaskQueue: controlQueueName,
9697
})
9798
s.NoError(err)
9899
s.NotNil(activityPollResp)

0 commit comments

Comments
 (0)