Skip to content

Commit 43b4618

Browse files
rkannan82claude
andauthored
Add Nexus worker service for server-to-worker commands (#708)
## Summary Defines a Nexus service for server-to-worker communication, starting with activity cancellation support. ## Design Decision We chose a **generic command API** (`ExecuteCommandsRequest` with `oneof` command types) instead of a cancel-specific API. This allows a future optimization to batch multiple commands (cancel, pause, etc) in a single request and deliver to a worker in one RPC. ## Files - `temporal/api/nexusservices/workerservice/v1/request_response.proto` - request response definitions - `nexus-rpc/temporal-proto-models-nexusrpc.yaml` - Nexus service definition ## Related - [Server PR](temporalio/temporal#9233) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9461f7b commit 43b4618

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# yaml-language-server: $schema=https://raw.githubusercontent.com/nexus-rpc/nexus-rpc-gen/main/schemas/nexus-rpc-gen.json
2+
#
3+
# Nexus service definition for server-to-worker communication.
4+
# See request_response.proto for message definitions.
5+
#
6+
# Task queue format: /temporal-sys/worker-commands/{namespace}/{worker_grouping_key}
7+
8+
nexusrpc: 1.0.0
9+
10+
services:
11+
temporal.api.nexusservices.workerservice.v1.WorkerService:
12+
description: >
13+
Internal Nexus service for server-to-worker communication.
14+
Used by the Temporal server to send commands to workers.
15+
operations:
16+
ExecuteCommands:
17+
description: Executes worker commands sent by the server.
18+
input:
19+
$goRef: "go.temporal.io/api/nexusservices/workerservice/v1.ExecuteCommandsRequest"
20+
$javaRef: "io.temporal.api.nexusservices.workerservice.v1.ExecuteCommandsRequest"
21+
$pythonRef: "temporalio.api.nexusservices.workerservice.v1.ExecuteCommandsRequest"
22+
$typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.ExecuteCommandsRequest"
23+
$dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.ExecuteCommandsRequest"
24+
$rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::ExecuteCommandsRequest"
25+
output:
26+
$goRef: "go.temporal.io/api/nexusservices/workerservice/v1.ExecuteCommandsResponse"
27+
$javaRef: "io.temporal.api.nexusservices.workerservice.v1.ExecuteCommandsResponse"
28+
$pythonRef: "temporalio.api.nexusservices.workerservice.v1.ExecuteCommandsResponse"
29+
$typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.ExecuteCommandsResponse"
30+
$dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.ExecuteCommandsResponse"
31+
$rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::ExecuteCommandsResponse"
32+
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
syntax = "proto3";
2+
3+
package temporal.api.nexusservices.workerservice.v1;
4+
5+
option go_package = "go.temporal.io/api/nexusservices/workerservice/v1;workerservice";
6+
option java_package = "io.temporal.api.nexusservices.workerservice.v1";
7+
option java_multiple_files = true;
8+
option java_outer_classname = "RequestResponseProto";
9+
option ruby_package = "Temporalio::Api::Nexusservices::Workerservice::V1";
10+
option csharp_namespace = "Temporalio.Api.Nexusservices.Workerservice.V1";
11+
12+
import "temporal/api/worker/v1/message.proto";
13+
14+
// (--
15+
// Internal Nexus service for server-to-worker communication.
16+
// --)
17+
18+
// Request payload for the "ExecuteCommands" Nexus operation.
19+
message ExecuteCommandsRequest {
20+
repeated temporal.api.worker.v1.WorkerCommand commands = 1;
21+
}
22+
23+
// Response payload for the "ExecuteCommands" Nexus operation.
24+
// The results list must be 1:1 with the commands list in the request (same size and order).
25+
message ExecuteCommandsResponse {
26+
repeated temporal.api.worker.v1.WorkerCommandResult results = 1;
27+
}

temporal/api/worker/v1/message.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,27 @@ message StorageDriverInfo {
193193
// The type of the driver, required.
194194
string type = 1;
195195
}
196+
197+
// A command sent from the server to a worker.
198+
message WorkerCommand {
199+
oneof type {
200+
CancelActivityCommand cancel_activity = 1;
201+
}
202+
}
203+
204+
// Cancel an activity if it is still running. Otherwise, do nothing.
205+
message CancelActivityCommand {
206+
bytes task_token = 1;
207+
}
208+
209+
// The result of executing a WorkerCommand.
210+
message WorkerCommandResult {
211+
oneof type {
212+
CancelActivityResult cancel_activity = 1;
213+
}
214+
}
215+
216+
// Result of a CancelActivityCommand.
217+
// Treat both successful cancellation and no-op (activity is no longer running) as success.
218+
message CancelActivityResult {
219+
}

0 commit comments

Comments
 (0)