fix(durabletask): detect shutdown and break gRPC worker loop#1727
fix(durabletask): detect shutdown and break gRPC worker loop#1727javier-aliaga wants to merge 3 commits intodapr:masterfrom
Conversation
60ba79d to
5634b8d
Compare
The worker loop ran `while (true)` and only exited if an InterruptedException happened during the 5-second retry sleep. If `close()` was called while the gRPC stream was blocking, the CANCELLED exception was logged but the loop kept retrying. - Replace `while (true)` with a check on `isNormalShutdown` and the thread interrupt flag so the loop exits promptly. - Break out of the retry path on CANCELLED when `isNormalShutdown` is set, avoiding a misleading 5-second sleep after `close()`. - Re-set the interrupt flag before breaking on InterruptedException to preserve the interrupt contract for callers higher up. Signed-off-by: Javier Aliaga <javier@diagrid.io>
5634b8d to
307bbfd
Compare
There was a problem hiding this comment.
Pull request overview
Fixes DurableTaskGrpcWorker.startAndBlock() not terminating promptly during shutdown when the gRPC getWorkItems() stream is blocking, aligning behavior with issue #1728 expectations.
Changes:
- Replaces the infinite worker loop with a loop that checks
isNormalShutdownand thread interrupt status. - Exits the retry path on
CANCELLEDduring normal shutdown and preserves interrupt status when sleep is interrupted.
Comments suppressed due to low confidence (1)
durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java:179
- The outer loop now checks
isNormalShutdown, but the worker can still remain inside the innerwhile (workItemStream.hasNext())and keep pulling/dispatching work items after shutdown has been requested. This is especially problematic becauseclose()shuts downworkerPoolbefore the gRPC channel is closed, soworkerPool.submit(...)can start throwingRejectedExecutionException(uncaught) during shutdown. Consider checkingisNormalShutdown/Thread.interrupted()inside the work-item loop and breaking out (or short-circuiting submissions) as soon as shutdown is requested, to avoid dispatching new work during shutdown and to exit promptly.
while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) {
try {
OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest
.newBuilder().build();
Iterator<OrchestratorService.WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Show resolved
Hide resolved
durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Show resolved
Hide resolved
durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Show resolved
Hide resolved
- Capture workerThread in startAndBlock() so close() can interrupt the thread even when startAndBlock() is called directly (not via start()), fixing the case where the 5s sleep blocks shutdown. - Add isNormalShutdown guard before the retry sleep so any exception code (UNAVAILABLE, CANCELLED, etc.) exits promptly during shutdown. - Add DurableTaskGrpcWorkerShutdownTest with 3 scenarios: - start() + close() terminates the worker thread promptly - startAndBlock() on a separate thread exits on close() - startAndBlock() exits on thread interrupt Signed-off-by: Javier Aliaga <javier@diagrid.io>
31a1d32 to
e00a33e
Compare
There was a problem hiding this comment.
Pull request overview
This PR fixes DurableTaskGrpcWorker.startAndBlock() not exiting promptly during shutdown by making the worker loop aware of normal shutdown and thread interrupts, and by avoiding retries when gRPC disconnects due to shutdown.
Changes:
- Update the worker loop condition to stop on normal shutdown and on thread interrupt.
- Break out of the retry path when gRPC returns
CANCELLEDduring a normal shutdown, and preserve interrupt status when catchingInterruptedException. - Add unit tests intended to validate shutdown/interrupt behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java | Adjusts the worker loop to exit on shutdown/interrupt and handles CANCELLED during shutdown. |
| durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java | Adds tests for prompt termination on close() and on interrupt. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
Show resolved
Hide resolved
durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
Outdated
Show resolved
Hide resolved
durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Show resolved
Hide resolved
durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
Show resolved
Hide resolved
durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
Outdated
Show resolved
Hide resolved
durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
Show resolved
Hide resolved
durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
Outdated
Show resolved
Hide resolved
durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
Show resolved
Hide resolved
- Mark workerThread as volatile for cross-thread visibility - Remove unused imports (ManagedChannel, ManagedChannelBuilder) - Fail test explicitly when reflection fails instead of silently returning null - Assert interrupt status is preserved in startAndBlockExitsOnInterrupt Signed-off-by: Javier Aliaga <javier@diagrid.io>
3578c26 to
944bf24
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #1727 +/- ##
=========================================
Coverage 72.90% 72.90%
Complexity 2257 2257
=========================================
Files 242 242
Lines 7415 7415
Branches 738 738
=========================================
Hits 5406 5406
Misses 1648 1648
Partials 361 361 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Description
Summary
DurableTaskGrpcWorker.startAndBlock()loop not exiting whenclose()/stop()is called while the gRPC stream is blockingwhile (true)loop only broke onInterruptedExceptionduring the 5s retry sleep — aCANCELLEDfrom channel shutdown was logged but retried indefinitelyChanges
durabletask-client —
DurableTaskGrpcWorkerwhile (true)withwhile (!isNormalShutdown && !Thread.currentThread().isInterrupted())to check shutdown signals each iterationCANCELLEDis received during a normal shutdown, avoiding a stale 5s sleep cycleInterruptedExceptionto preserve the interrupt contract for upstream callersIssue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
Please reference the issue this PR will close: #1728
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: