Guard against late-arriving polls after worker shutdown#9330
Conversation
When CancelOutstandingWorkerPolls is called, the WorkerInstanceKey is cached in a TTL cache (60s default). Any subsequent poll arriving with this key returns empty immediately, preventing task dispatch to a shutting-down worker. This handles the edge case where a poll request was in-flight (already sent by the SDK) when ShutdownWorker was called and arrives at the server after the cancellation logic has completed.

- Add ShutdownWorkerCacheTTL dynamic config (60s default)
- Add shutdownWorkers TTL cache to matchingEngineImpl
- Check cache early in PollWorkflowTaskQueue/PollActivityTaskQueue
- Add unit tests for cache behavior
```go
`ShutdownWorkerCacheTTL is the time to live for entries in the shutdown worker cache. When a worker calls
ShutdownWorker, its WorkerInstanceKey is cached for this duration. Any poll arriving with a cached
WorkerInstanceKey returns empty immediately, preventing task dispatch to a shutting-down worker.
This should be longer than MatchingLongPollExpirationInterval (1 min default) to catch in-flight polls.`,
```
This isn't related to the length of a long poll at all; it's about the interval during which RPCs can get reordered on the network. That is unbounded in theory, but practically something like 10s-30s should be fine. The SDK isn't waiting that long anyway between calling ShutdownWorker and actually exiting, right?
```go
outstandingPollers:    collection.NewSyncMap[string, context.CancelFunc](),
workerInstancePollers: workerPollerTracker{pollers: make(map[string]map[string]context.CancelFunc)},
// 50000 entries ≈ 10MB (each entry ~200 bytes: UUID key + cache overhead)
shutdownWorkers: cache.New(50000, &cache.Options{TTL: config.ShutdownWorkerCacheTTL()}),
```
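As a quick sanity check of the sizing comment above, the arithmetic works out as follows (this is a back-of-envelope estimate, not a measured figure; the ~200 bytes per entry is the comment's own assumption covering the UUID key plus cache overhead):

```go
package main

import "fmt"

func main() {
	// 50,000 entries at roughly 200 bytes each (a 36-byte UUID string key
	// plus map/cache bookkeeping) comes to about 10 MB of worst-case memory.
	const entries = 50000
	const bytesPerEntry = 200
	totalMB := entries * bytesPerEntry / 1_000_000
	fmt.Println(totalMB) // 10
}
```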
If you're going to use dynamic config, why not the cache size too? (I'm fine with neither being dynamic, actually)
```go
`PollerHistoryTTL is the time to live for poller histories in the pollerHistory cache of a physical task queue.
Poller histories are fetched when requiring a list of pollers that polled a given task queue.`,
)
ShutdownWorkerCacheTTL = NewGlobalDurationSetting(
```
Note that this requires a process restart to take effect
```go
// This guards against polls that arrive after CancelOutstandingWorkerPolls completed.
if workerInstanceKey := request.GetWorkerInstanceKey(); workerInstanceKey != "" {
	if e.shutdownWorkers.Get(workerInstanceKey) != nil {
		return emptyPollWorkflowTaskQueueResponse, nil
```
I would think it should return `nil, serviceerror.NewCanceled("worker shutdown")` or something like that.
Already implemented in #9545.
What changed?
When `CancelOutstandingWorkerPolls` is called, the `WorkerInstanceKey` is cached in a TTL cache (70s default). Any subsequent poll arriving with this key returns empty immediately, preventing task dispatch to a shutting-down worker.

Why?
This handles the edge case where a poll request was in-flight (already sent by the SDK) when `ShutdownWorker` was called, arriving at the server after the cancellation logic has completed. Without this guard, such polls could receive tasks that would never be processed.

How did you test it?

Potential risks