Skip to content

Commit e00a33e

Browse files
committed
fix: address PR review — skip retry sleep on shutdown, add tests
- Capture workerThread in startAndBlock() so close() can interrupt the thread even when startAndBlock() is called directly (not via start()), fixing the case where the 5s sleep blocks shutdown. - Add isNormalShutdown guard before the retry sleep so any exception code (UNAVAILABLE, CANCELLED, etc.) exits promptly during shutdown. - Add DurableTaskGrpcWorkerShutdownTest with 3 scenarios: - start() + close() terminates the worker thread promptly - startAndBlock() on a separate thread exits on close() - startAndBlock() exits on thread interrupt Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 307bbfd commit e00a33e

2 files changed

Lines changed: 133 additions & 0 deletions

File tree

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public void close() {
158158
* interrupt signal.</p>
159159
*/
160160
public void startAndBlock() {
161+
this.workerThread = Thread.currentThread();
161162
logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
162163

163164
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
@@ -218,6 +219,10 @@ public void startAndBlock() {
218219
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
219220
}
220221

222+
if (this.isNormalShutdown) {
223+
break;
224+
}
225+
221226
// Retry after 5 seconds
222227
try {
223228
Thread.sleep(5000);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.durabletask;
15+
16+
import io.grpc.ManagedChannel;
17+
import io.grpc.ManagedChannelBuilder;
18+
import org.junit.jupiter.api.Test;
19+
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
23+
import static org.junit.jupiter.api.Assertions.assertFalse;
24+
import static org.junit.jupiter.api.Assertions.assertTrue;
25+
26+
/**
27+
* Unit tests for DurableTaskGrpcWorker shutdown behavior.
28+
*/
29+
public class DurableTaskGrpcWorkerShutdownTest {
30+
31+
/**
32+
* Verifies that calling close() on a worker that was started via start()
33+
* causes the worker thread to terminate promptly (within a bounded time),
34+
* rather than hanging in the retry loop.
35+
*/
36+
@Test
37+
void workerThreadTerminatesPromptlyOnClose() throws Exception {
38+
// Use an arbitrary port where no sidecar is running — the worker will
39+
// enter the retry loop (UNAVAILABLE → sleep 5s → retry).
40+
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
41+
.port(19876)
42+
.build();
43+
44+
worker.start();
45+
46+
// Give the worker thread time to enter the retry loop
47+
Thread.sleep(500);
48+
49+
Instant before = Instant.now();
50+
worker.close();
51+
52+
// Wait for the worker thread to finish — the join is bounded so the
53+
// test doesn't hang if the fix regresses.
54+
Thread workerThread = getWorkerThread(worker);
55+
if (workerThread != null) {
56+
workerThread.join(Duration.ofSeconds(3).toMillis());
57+
assertFalse(workerThread.isAlive(),
58+
"Worker thread should have terminated after close()");
59+
}
60+
61+
Duration elapsed = Duration.between(before, Instant.now());
62+
assertTrue(elapsed.toMillis() < 3000,
63+
"close() should return promptly, but took " + elapsed.toMillis() + "ms");
64+
}
65+
66+
/**
67+
* Verifies that calling close() on a worker that was started via
68+
* startAndBlock() on a separate thread terminates that thread promptly.
69+
*/
70+
@Test
71+
void startAndBlockExitsOnClose() throws Exception {
72+
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
73+
.port(19877)
74+
.build();
75+
76+
Thread blockingThread = new Thread(worker::startAndBlock);
77+
blockingThread.start();
78+
79+
// Give the blocking thread time to enter the retry loop
80+
Thread.sleep(500);
81+
82+
Instant before = Instant.now();
83+
worker.close();
84+
85+
blockingThread.join(Duration.ofSeconds(3).toMillis());
86+
assertFalse(blockingThread.isAlive(),
87+
"startAndBlock() thread should have terminated after close()");
88+
89+
Duration elapsed = Duration.between(before, Instant.now());
90+
assertTrue(elapsed.toMillis() < 3000,
91+
"close() should terminate startAndBlock() promptly, but took " + elapsed.toMillis() + "ms");
92+
}
93+
94+
/**
95+
* Verifies that interrupting the thread running startAndBlock() causes it
96+
* to exit and preserves the interrupt status.
97+
*/
98+
@Test
99+
void startAndBlockExitsOnInterrupt() throws Exception {
100+
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
101+
.port(19878)
102+
.build();
103+
104+
Thread blockingThread = new Thread(worker::startAndBlock);
105+
blockingThread.start();
106+
107+
// Give the blocking thread time to enter the retry loop
108+
Thread.sleep(500);
109+
110+
blockingThread.interrupt();
111+
blockingThread.join(Duration.ofSeconds(3).toMillis());
112+
113+
assertFalse(blockingThread.isAlive(),
114+
"startAndBlock() thread should have exited after interrupt");
115+
116+
worker.close();
117+
}
118+
119+
private Thread getWorkerThread(DurableTaskGrpcWorker worker) {
120+
try {
121+
java.lang.reflect.Field f = DurableTaskGrpcWorker.class.getDeclaredField("workerThread");
122+
f.setAccessible(true);
123+
return (Thread) f.get(worker);
124+
} catch (Exception e) {
125+
return null;
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)