Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,10 @@ message ActivityInfo {
bool reset_heartbeats = 48;

int64 start_version = 50;

// A dedicated per-worker Nexus task queue on which the server sends control
// tasks (e.g. activity cancellation) to this specific worker instance.
string worker_control_task_queue = 51;
}

// timer_map column
Expand Down
1 change: 1 addition & 0 deletions service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func recordActivityTaskStarted(
if _, err := mutableState.AddActivityTaskStartedEvent(
ai, scheduledEventID, requestID, request.PollRequest.GetIdentity(),
versioningStamp, pollerDeployment, request.GetBuildIdRedirectInfo(),
request.PollRequest.GetWorkerControlTaskQueue(),
); err != nil {
return nil, rejectCodeUndefined, err
}
Expand Down
1 change: 1 addition & 0 deletions service/history/api/respondactivitytaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func Invoke(
// TODO (shahab): do we need to do anything with wf redirect in this case or any
// other case where an activity starts?
nil,
"", // workerControlTaskQueue not available for force complete
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(

workflowTaskHandler := newWorkflowTaskCompletedHandler(
request.GetIdentity(),
request.GetWorkerControlTaskQueue(),
completedEvent.GetEventId(), // If completedEvent is nil, then GetEventId() returns 0 and this value shouldn't be used in workflowTaskHandler.
ms,
updateRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type (

workflowTaskCompletedHandler struct {
identity string
workerControlTaskQueue string
workflowTaskCompletedID int64

// internal state
Expand Down Expand Up @@ -106,6 +107,7 @@ type (

func newWorkflowTaskCompletedHandler(
identity string,
workerControlTaskQueue string,
workflowTaskCompletedID int64,
mutableState historyi.MutableState,
updateRegistry update.Registry,
Expand All @@ -126,6 +128,7 @@ func newWorkflowTaskCompletedHandler(
) *workflowTaskCompletedHandler {
return &workflowTaskCompletedHandler{
identity: identity,
workerControlTaskQueue: workerControlTaskQueue,
workflowTaskCompletedID: workflowTaskCompletedID,

// internal state
Expand Down Expand Up @@ -585,6 +588,7 @@ func (handler *workflowTaskCompletedHandler) handlePostCommandEagerExecuteActivi
stamp,
nil,
nil,
handler.workerControlTaskQueue, // Eager: activity runs on the same worker that completed the WFT.
); err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestCommandProtocolMessage(t *testing.T) {
)
out.handler = newWorkflowTaskCompletedHandler(
t.Name(), // identity
"", // workerControlTaskQueue
123, // workflowTaskCompletedID
out.ms,
out.updates,
Expand Down
1 change: 1 addition & 0 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6702,6 +6702,7 @@ func addActivityTaskStartedEvent(ms historyi.MutableState, scheduledEventID int6
nil,
nil,
nil,
"",
)
return event
}
Expand Down
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
*commonpb.WorkerVersionStamp,
*deploymentpb.Deployment,
*taskqueuespb.BuildIdRedirectInfo,
string, // workerControlTaskQueue
) (*historypb.HistoryEvent, error)
AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error)
AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error)
Expand Down
8 changes: 4 additions & 4 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4093,6 +4093,7 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
versioningStamp *commonpb.WorkerVersionStamp,
deployment *deploymentpb.Deployment,
redirectInfo *taskqueuespb.BuildIdRedirectInfo,
workerControlTaskQueue string,
) (*historypb.HistoryEvent, error) {
opTag := tag.WorkflowActionActivityTaskStarted
err := ms.checkMutability(opTag)
Expand Down Expand Up @@ -4121,6 +4122,8 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
ai.LastDeploymentVersion = worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(deployment)
}

ai.WorkerControlTaskQueue = workerControlTaskQueue

if !ai.HasRetryPolicy {
event := ms.hBuilder.AddActivityTaskStartedEvent(
scheduledEventID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func (s *retryActivitySuite) makeActivityAndPutIntoFailingState() *persistencesp
nil,
nil,
nil,
"",
)
s.NoError(err)

Expand Down
75 changes: 75 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,7 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
nil,
nil,
nil,
"",
)
s.NoError(err)

Expand Down Expand Up @@ -2943,6 +2944,7 @@ func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() {
nil,
nil,
nil,
"",
)
s.NoError(err)

Expand Down Expand Up @@ -6151,6 +6153,79 @@ func (s *mutableStateSuite) TestSetContextMetadata() {
s.Equal(taskQueue, tq)
}

func (s *mutableStateSuite) TestAddActivityTaskStartedEventStoresWorkerControlTaskQueue() {
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

// Setup workflow execution
_, err := s.mutableState.AddWorkflowExecutionStartedEvent(
&commonpb.WorkflowExecution{WorkflowId: tests.WorkflowID, RunId: tests.RunID},
&historyservice.StartWorkflowExecutionRequest{
NamespaceId: tests.NamespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: "workflow-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
WorkflowRunTimeout: durationpb.New(200 * time.Second),
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
},
},
)
s.NoError(err)

di, err := s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL)
s.NoError(err)
_, _, err = s.mutableState.AddWorkflowTaskStartedEvent(
di.ScheduledEventID,
di.RequestID,
di.TaskQueue,
"identity",
nil,
nil,
nil,
false,
nil,
)
s.NoError(err)
_, err = s.mutableState.AddWorkflowTaskCompletedEvent(
di,
&workflowservice.RespondWorkflowTaskCompletedRequest{Identity: "identity"},
workflowTaskCompletionLimits,
)
s.NoError(err)

// Schedule activity
workflowTaskCompletedEventID := int64(4)
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
workflowTaskCompletedEventID,
&commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "test-activity-1",
ActivityType: &commonpb.ActivityType{Name: "test-activity-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-task-queue"},
},
false,
)
s.NoError(err)
s.Empty(activityInfo.WorkerControlTaskQueue, "WorkerControlTaskQueue should be empty before activity starts")

// Start activity with workerControlTaskQueue
expectedWorkerControlTaskQueue := "test-control-queue"
_, err = s.mutableState.AddActivityTaskStartedEvent(
activityInfo,
activityInfo.ScheduledEventId,
uuid.NewString(),
"worker-identity",
nil,
nil,
nil,
expectedWorkerControlTaskQueue,
)
s.NoError(err)

// Verify workerControlTaskQueue is stored
updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
s.True(ok)
s.Equal(expectedWorkerControlTaskQueue, updatedActivityInfo.WorkerControlTaskQueue)
}

func (s *mutableStateSuite) TestCloseTransaction_PrincipalStamped() {
for _, tc := range []struct {
name string
Expand Down
2 changes: 2 additions & 0 deletions service/matching/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: fwdr.partition.RpcName(),
Conditions: pollMetadata.conditions,
Expand All @@ -275,6 +276,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: fwdr.partition.RpcName(),
Conditions: pollMetadata.conditions,
Expand Down
3 changes: 3 additions & 0 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type (
forwardedFrom string
localPollStartTime time.Time
workerInstanceKey string
workerControlTaskQueue string
}

userDataUpdate struct {
Expand Down Expand Up @@ -686,6 +687,7 @@ pollLoop:
forwardedFrom: req.ForwardedSource,
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
workerControlTaskQueue: request.WorkerControlTaskQueue,
}
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
Expand Down Expand Up @@ -958,6 +960,7 @@ pollLoop:
forwardedFrom: req.ForwardedSource,
conditions: req.Conditions,
workerInstanceKey: request.WorkerInstanceKey,
workerControlTaskQueue: request.WorkerControlTaskQueue,
}
task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions service/matching/pri_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func ForwardPollWithTarget(
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: source.RpcName(),
Conditions: pollMetadata.conditions,
Expand All @@ -263,6 +264,7 @@ func ForwardPollWithTarget(
WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities,
DeploymentOptions: pollMetadata.deploymentOptions,
WorkerInstanceKey: pollMetadata.workerInstanceKey,
WorkerControlTaskQueue: pollMetadata.workerControlTaskQueue,
},
ForwardedSource: source.RpcName(),
Conditions: pollMetadata.conditions,
Expand Down
Loading