Skip to content

Commit 7040cec

Browse files
rkannan82 and claude committed
Mark WorkerCommandsTask as low priority
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b8b1352 commit 7040cec

File tree

2 files changed

+200
-1
lines changed

2 files changed

+200
-1
lines changed

service/history/queues/priority_assigner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ func (a *priorityAssignerImpl) Assign(executable Executable) tasks.Priority {
4343
case enumsspb.TASK_TYPE_ACTIVITY_TIMEOUT,
4444
enumsspb.TASK_TYPE_WORKFLOW_TASK_TIMEOUT,
4545
enumsspb.TASK_TYPE_WORKFLOW_RUN_TIMEOUT,
46-
enumsspb.TASK_TYPE_WORKFLOW_EXECUTION_TIMEOUT:
46+
enumsspb.TASK_TYPE_WORKFLOW_EXECUTION_TIMEOUT,
47+
enumsspb.TASK_TYPE_WORKER_COMMANDS:
4748
return tasks.PriorityLow
4849
case enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT,
4950
enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION,
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package history
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/nexus-rpc/sdk-go/nexus"
10+
commonpb "go.temporal.io/api/common/v1"
11+
enumspb "go.temporal.io/api/enums/v1"
12+
nexuspb "go.temporal.io/api/nexus/v1"
13+
workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1"
14+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
15+
"go.temporal.io/server/api/matchingservice/v1"
16+
"go.temporal.io/server/common/debug"
17+
"go.temporal.io/server/common/log"
18+
"go.temporal.io/server/common/log/tag"
19+
"go.temporal.io/server/common/metrics"
20+
"go.temporal.io/server/common/resource"
21+
"go.temporal.io/server/service/history/configs"
22+
"go.temporal.io/server/service/history/tasks"
23+
"google.golang.org/protobuf/proto"
24+
)
25+
26+
const (
	// workerCommandsTaskTimeout bounds a single dispatch attempt, covering the
	// time matching may block waiting for a worker to poll the control queue.
	// Scaled by debug.TimeoutMultiplier so debug builds get proportionally
	// longer deadlines.
	workerCommandsTaskTimeout = time.Second * 10 * debug.TimeoutMultiplier
	// workerCommandsMaxTaskAttempt caps in-memory retries of a retryable
	// dispatch failure; beyond this the task is dropped (delivery is
	// best-effort -- see workerCommandsTaskDispatcher).
	workerCommandsMaxTaskAttempt = 3
)
30+
31+
// workerCommandsTaskDispatcher dispatches worker commands to workers via Nexus.
//
// Failure scenarios:
// - No worker polling: matching returns RequestTimeout -> *nexus.HandlerError{Type: UpstreamTimeout}.
// Retryable -- worker may come up later.
// - Worker crashes after receiving the task: matching blocks waiting for a response until
// context deadline, then returns RequestTimeout. Indistinguishable from "no worker polling".
// Safe to retry because commands are idempotent (e.g., cancelling a missing activity is a
// no-op success per the worker contract).
// - Transport/RPC failure: *nexus.HandlerError. Retryable.
// - Operation failure (worker explicitly returns error): *nexus.OperationError. Permanent --
// the worker contract requires success for all defined commands, so this indicates a bug
// or version incompatibility.
//
// Retryable errors are capped at workerCommandsMaxTaskAttempt attempts (in-memory). These
// commands are best-effort — the activity will eventually time out anyway — so excessive
// retries waste resources. The counter resets on shard movement, which is acceptable.
type workerCommandsTaskDispatcher struct {
	// matchingRawClient delivers the Nexus task to the control queue;
	// NOTE(review): the raw (as opposed to redirecting) client is used here —
	// presumably to target the task's destination queue directly; confirm.
	matchingRawClient resource.MatchingRawClient
	// config gates dispatch via EnableCancelActivityWorkerCommand.
	config *configs.Config
	// metricsHandler records WorkerCommandsSent with a per-outcome tag.
	metricsHandler metrics.Handler
	logger         log.Logger
}
54+
55+
func newWorkerCommandsTaskDispatcher(
56+
matchingRawClient resource.MatchingRawClient,
57+
config *configs.Config,
58+
metricsHandler metrics.Handler,
59+
logger log.Logger,
60+
) *workerCommandsTaskDispatcher {
61+
return &workerCommandsTaskDispatcher{
62+
matchingRawClient: matchingRawClient,
63+
config: config,
64+
metricsHandler: metricsHandler,
65+
logger: logger,
66+
}
67+
}
68+
69+
func (d *workerCommandsTaskDispatcher) execute(
70+
ctx context.Context,
71+
task *tasks.WorkerCommandsTask,
72+
attempt int,
73+
) error {
74+
if attempt > workerCommandsMaxTaskAttempt {
75+
d.logger.Info("Worker commands task exceeded max attempts, dropping",
76+
tag.WorkflowID(task.WorkflowID),
77+
tag.WorkflowRunID(task.RunID),
78+
tag.NewStringTag("control_queue", task.Destination),
79+
tag.Attempt(int32(attempt)),
80+
)
81+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("max_attempts_exceeded"))
82+
return nil
83+
}
84+
85+
if !d.config.EnableCancelActivityWorkerCommand() {
86+
d.logger.Info("Worker commands feature disabled, dropping task",
87+
tag.WorkflowID(task.WorkflowID),
88+
tag.WorkflowRunID(task.RunID),
89+
tag.NewStringTag("control_queue", task.Destination),
90+
tag.NewInt("command_count", len(task.Commands)),
91+
)
92+
return nil
93+
}
94+
95+
if len(task.Commands) == 0 {
96+
return nil
97+
}
98+
99+
ctx, cancel := context.WithTimeout(ctx, workerCommandsTaskTimeout)
100+
defer cancel()
101+
102+
return d.dispatchToWorker(ctx, task)
103+
}
104+
105+
func (d *workerCommandsTaskDispatcher) dispatchToWorker(
106+
ctx context.Context,
107+
task *tasks.WorkerCommandsTask,
108+
) error {
109+
request := &workerservicepb.ExecuteCommandsRequest{
110+
Commands: task.Commands,
111+
}
112+
// Encode as binary/protobuf using the standard Temporal payload format.
113+
// Worker commands are handled directly by SDK Core (not by lang-SDK Nexus handlers),
114+
// so we use binary/protobuf which Core can decode natively via prost. The standard
115+
// payload.Encode() uses json/protobuf encoding, which Core does not support because
116+
// it normally delegates Nexus payload deserialization to the lang SDK.
117+
requestData, err := proto.Marshal(request)
118+
if err != nil {
119+
return fmt.Errorf("failed to encode worker commands request: %w", err)
120+
}
121+
requestPayload := &commonpb.Payload{
122+
Metadata: map[string][]byte{
123+
"encoding": []byte("binary/protobuf"),
124+
},
125+
Data: requestData,
126+
}
127+
128+
nexusRequest := &nexuspb.Request{
129+
Header: map[string]string{},
130+
Variant: &nexuspb.Request_StartOperation{
131+
StartOperation: &nexuspb.StartOperationRequest{
132+
Service: workerservicepb.WorkerService.ServiceName,
133+
Operation: workerservicepb.WorkerService.ExecuteCommands.Name(),
134+
Payload: requestPayload,
135+
},
136+
},
137+
}
138+
139+
resp, err := d.matchingRawClient.DispatchNexusTask(ctx, &matchingservice.DispatchNexusTaskRequest{
140+
NamespaceId: task.NamespaceID,
141+
TaskQueue: &taskqueuepb.TaskQueue{
142+
Name: task.Destination,
143+
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
144+
},
145+
Request: nexusRequest,
146+
})
147+
if err != nil {
148+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("rpc_error"))
149+
return fmt.Errorf("failed to dispatch worker commands to control queue %s: %w", task.Destination, err)
150+
}
151+
152+
nexusErr := dispatchResponseToError(resp)
153+
if nexusErr == nil {
154+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("success"))
155+
return nil
156+
}
157+
158+
return d.handleError(nexusErr, task)
159+
}
160+
161+
func (d *workerCommandsTaskDispatcher) handleError(nexusErr error, task *tasks.WorkerCommandsTask) error {
162+
var opErr *nexus.OperationError
163+
if errors.As(nexusErr, &opErr) {
164+
// Operation-level failure: the worker received and processed the request but returned
165+
// an error. Permanent -- the worker contract requires success for all defined commands,
166+
// so this indicates a bug or version incompatibility. Retrying won't help.
167+
d.logger.Error("Worker returned operation failure for worker commands",
168+
tag.WorkflowID(task.WorkflowID),
169+
tag.WorkflowRunID(task.RunID),
170+
tag.NewStringTag("control_queue", task.Destination),
171+
tag.NewInt("command_count", len(task.Commands)),
172+
tag.Error(nexusErr))
173+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("operation_error"))
174+
return nil
175+
}
176+
177+
var handlerErr *nexus.HandlerError
178+
if errors.As(nexusErr, &handlerErr) {
179+
if handlerErr.Type == nexus.HandlerErrorTypeUpstreamTimeout {
180+
d.logger.Warn("No worker polling control queue",
181+
tag.NewStringTag("control_queue", task.Destination))
182+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("no_poller"))
183+
return nexusErr
184+
}
185+
186+
d.logger.Warn("Worker commands transport failure",
187+
tag.NewStringTag("control_queue", task.Destination),
188+
tag.Error(nexusErr))
189+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("transport_error"))
190+
return nexusErr
191+
}
192+
193+
d.logger.Warn("Worker commands unexpected error",
194+
tag.NewStringTag("control_queue", task.Destination),
195+
tag.Error(nexusErr))
196+
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("unexpected_error"))
197+
return nexusErr
198+
}

0 commit comments

Comments
 (0)