Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";

package temporal.api.nexusservices.workerservice.v1;

option go_package = "go.temporal.io/api/nexusservices/workerservice/v1;workerservice";
option java_package = "io.temporal.api.nexusservices.workerservice.v1";
option java_multiple_files = true;
option java_outer_classname = "RequestResponseProto";
option ruby_package = "Temporalio::Api::Nexusservices::Workerservice::V1";
option csharp_namespace = "Temporalio.Api.Nexusservices.Workerservice.V1";

import "temporal/api/worker/v1/message.proto";

// (--
// Internal Nexus service for server-to-worker communication.
// See service.yaml for the service definition.
// --)

// Request payload for the "ExecuteCommands" Nexus operation.
message ExecuteCommandsRequest {
repeated temporal.api.worker.v1.WorkerCommand commands = 1;
}

// Response payload for the "ExecuteCommands" Nexus operation.
// The results list must be 1:1 with the commands list in the request (same size and order).
message ExecuteCommandsResponse {
repeated temporal.api.worker.v1.WorkerCommandResult results = 1;
}
32 changes: 32 additions & 0 deletions temporal/api/nexusservices/workerservice/v1/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# yaml-language-server: $schema=https://raw.githubusercontent.com/nexus-rpc/nexus-rpc-gen/main/schemas/nexus-rpc-gen.json
#
# Nexus service definition for server-to-worker communication.
# See request_response.proto for message definitions.
#
# Task queue format: /temporal-sys/worker-commands/{namespace}/{worker_grouping_key}

nexusrpc: 1.0.0

services:
temporal.api.nexusservices.workerservice.v1.WorkerService:
description: >
Internal Nexus service for server-to-worker communication.
Used by the Temporal server to send commands to workers.
operations:
ExecuteCommands:
description: Executes worker commands sent by the server.
input:
$goRef: "go.temporal.io/api/nexusservices/workerservice/v1.ExecuteCommandsRequest"
$javaRef: "io.temporal.api.nexusservices.workerservice.v1.ExecuteCommandsRequest"
$pythonRef: "temporalio.api.nexusservices.workerservice.v1.ExecuteCommandsRequest"
$typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.ExecuteCommandsRequest"
$dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.ExecuteCommandsRequest"
$rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::ExecuteCommandsRequest"
output:
$goRef: "go.temporal.io/api/nexusservices/workerservice/v1.ExecuteCommandsResponse"
$javaRef: "io.temporal.api.nexusservices.workerservice.v1.ExecuteCommandsResponse"
$pythonRef: "temporalio.api.nexusservices.workerservice.v1.ExecuteCommandsResponse"
$typescriptRef: "@temporalio/api/nexusservices/workerservice/v1.ExecuteCommandsResponse"
$dotnetRef: "Temporalio.Api.Nexusservices.Workerservice.V1.ExecuteCommandsResponse"
$rubyRef: "Temporalio::Api::Nexusservices::Workerservice::V1::ExecuteCommandsResponse"

24 changes: 24 additions & 0 deletions temporal/api/worker/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,27 @@ message StorageDriverInfo {
// The type of the driver, required.
string type = 1;
}

// A command sent from the server to a worker.
message WorkerCommand {
oneof type {
CancelActivityCommand cancel_activity = 1;
}
}

// Cancel an activity if it is still running. Otherwise, do nothing.
message CancelActivityCommand {
bytes task_token = 1;
}

// The result of executing a WorkerCommand.
message WorkerCommandResult {
oneof type {
CancelActivityResult cancel_activity = 1;
}
}

// Result of a CancelActivityCommand.
// Treat both successful cancellation and no-op (activity is no longer running) as success.
message CancelActivityResult {
}
18 changes: 18 additions & 0 deletions temporal/api/workflowservice/v1/request_response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ message PollWorkflowTaskQueueRequest {
// A unique key for this worker instance, used for tracking worker lifecycle.
// This is guaranteed to be unique, whereas identity is not guaranteed to be unique.
string worker_instance_key = 8;

// The task queue on which the server will send control tasks to this worker.
// Example tasks: Cancel activity, update config, etc.
string worker_control_task_queue = 9;

// Deprecated. Use deployment_options instead.
// Each worker process should provide an ID unique to the specific set of code it is running
// "checksum" in this field name isn't very accurate, it should be though of as an id.
Expand Down Expand Up @@ -381,6 +386,14 @@ message RespondWorkflowTaskCompletedRequest {
// Worker deployment options that user has set in the worker.
temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 17;

// A unique key for this worker instance, used for tracking worker lifecycle.
// This is guaranteed to be unique, whereas identity is not guaranteed to be unique.
string worker_instance_key = 18;

// The task queue on which the server will send control tasks to this worker.
// Example tasks: Cancel activity, update config, etc.
string worker_control_task_queue = 19;

// SDK capability details.
message Capabilities {
// True if the SDK can handle speculative workflow task with command events. If true, the
Expand Down Expand Up @@ -444,6 +457,11 @@ message PollActivityTaskQueueRequest {
// A unique key for this worker instance, used for tracking worker lifecycle.
// This is guaranteed to be unique, whereas identity is not guaranteed to be unique.
string worker_instance_key = 8;

// The task queue on which the server will send control tasks to this worker.
// Example tasks: Cancel activity, update config, etc.
string worker_control_task_queue = 9;

temporal.api.taskqueue.v1.TaskQueueMetadata task_queue_metadata = 4;
// Information about this worker's build identifier and if it is choosing to use the versioning
// feature. See the `WorkerVersionCapabilities` docstring for more.
Expand Down
Loading