Skip to content

Commit 17a7449

Browse files
mergify[bot]FieldSwanbjsowa
authored
fix: Prevent client destruction race condition (backport #1183) (#1189)
* fix: Prevent client destruction race condition (#1183) * Client creation/destruction race condition stress test * Fix race condition with client destruction * Use ros isolated pytest (#4) --------- Co-authored-by: Błażej Sowa <bsowa123@gmail.com> (cherry picked from commit 5b20a2e) # Conflicts: # rosbridge_server/CMakeLists.txt * Fix conflicts --------- Co-authored-by: FieldSwan <michael.swan@fieldai.com> Co-authored-by: Błażej Sowa <bsowa123@gmail.com>
1 parent 34d67d5 commit 17a7449

4 files changed

Lines changed: 220 additions & 4 deletions

File tree

rosbridge_server/CMakeLists.txt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cmake_minimum_required(VERSION 3.5)
22
project(rosbridge_server)
33

4-
find_package(ament_cmake REQUIRED)
4+
find_package(ament_cmake_ros REQUIRED)
55
find_package(ament_cmake_python REQUIRED)
66

77
ament_python_install_package(
@@ -22,6 +22,14 @@ install(FILES
2222

2323
if(BUILD_TESTING)
2424
find_package(launch_testing_ament_cmake REQUIRED)
25+
find_package(ament_cmake_pytest REQUIRED)
26+
27+
# Unit tests (pytest)
28+
ament_add_ros_isolated_pytest_test(test_stress_clients
29+
"test/test_stress_clients.py"
30+
TIMEOUT 60
31+
)
32+
2533
add_launch_test(test/websocket/advertise_action.test.py)
2634
add_launch_test(test/websocket/advertise_action_feedback.test.py)
2735
add_launch_test(test/websocket/advertise_service.test.py)

rosbridge_server/package.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<author email="jonathan.c.mace@gmail.com">Jonathan Mace</author>
1414
<maintainer email="blazej@fictionlab.pl">Błażej Sowa</maintainer>
1515

16-
<buildtool_depend>ament_cmake</buildtool_depend>
16+
<buildtool_depend>ament_cmake_ros</buildtool_depend>
1717
<buildtool_depend>ament_cmake_python</buildtool_depend>
1818

1919
<exec_depend>python3-tornado</exec_depend>

rosbridge_server/src/rosbridge_server/websocket_handler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ def run(self) -> None:
120120
msg = self.queue.popleft()
121121

122122
self.protocol.incoming(msg)
123-
124-
self.protocol.finish()
123+
executor = self.protocol.node_handle.executor
124+
if executor is not None:
125+
executor.create_task(self.protocol.finish)
126+
else:
127+
self.protocol.finish()
125128

126129

127130
class RosbridgeWebSocket(WebSocketHandler):
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
#!/usr/bin/env python3
2+
"""Stress tests for concurrent client creation and destruction."""
3+
4+
from __future__ import annotations
5+
6+
import json
7+
import threading
8+
import time
9+
import unittest
10+
import uuid
11+
12+
import rclpy
13+
from rclpy.executors import SingleThreadedExecutor
14+
from rclpy.node import Node
15+
from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
16+
from rosbridge_server.websocket_handler import IncomingQueue
17+
18+
19+
def _subscribe_msg(topic: str) -> str:
20+
return json.dumps({"op": "subscribe", "topic": topic, "type": "std_msgs/msg/String"})
21+
22+
23+
def _advertise_msg(topic: str) -> str:
24+
return json.dumps({"op": "advertise", "topic": topic, "type": "std_msgs/msg/String"})
25+
26+
27+
class TestStressClients(unittest.TestCase):
28+
"""Stress test client lifecycle operations against the executor."""
29+
30+
def setUp(self) -> None:
31+
rclpy.init()
32+
self.executor = SingleThreadedExecutor()
33+
self.node = Node("test_stress_clients")
34+
self.executor.add_node(self.node)
35+
36+
self.executor_errors: list[BaseException] = []
37+
self._stop_spinning = threading.Event()
38+
self.exec_thread = threading.Thread(target=self._spin_executor, daemon=True)
39+
self.exec_thread.start()
40+
41+
self._test_id = uuid.uuid4().hex[:8]
42+
43+
def _spin_executor(self) -> None:
44+
while not self._stop_spinning.is_set():
45+
try:
46+
self.executor.spin_once(timeout_sec=0.05)
47+
except Exception as e: # noqa: PERF203
48+
self.executor_errors.append(e)
49+
50+
def tearDown(self) -> None:
51+
self._stop_spinning.set()
52+
self.exec_thread.join(timeout=10)
53+
self.executor.remove_node(self.node)
54+
self.node.destroy_node()
55+
self.executor.shutdown()
56+
rclpy.shutdown()
57+
58+
def _assert_executor_healthy(self) -> None:
59+
self.assertTrue(self.exec_thread.is_alive(), "Executor thread died")
60+
self.assertEqual(
61+
self.executor_errors,
62+
[],
63+
f"Executor raised {len(self.executor_errors)} error(s): {self.executor_errors[:5]}",
64+
)
65+
66+
def _assert_executor_functional(self, msg: str = "Executor cannot process tasks") -> None:
67+
done = threading.Event()
68+
self.executor.create_task(done.set)
69+
self.assertTrue(done.wait(timeout=5), msg)
70+
71+
def _make_client(self, num_subs: int, num_pubs: int, prefix: str) -> IncomingQueue:
72+
protocol = RosbridgeProtocol(str(uuid.uuid4()), self.node)
73+
queue = IncomingQueue(protocol)
74+
queue.start()
75+
for i in range(num_subs):
76+
queue.push(_subscribe_msg(f"/{prefix}_s{i}"))
77+
for i in range(num_pubs):
78+
queue.push(_advertise_msg(f"/{prefix}_p{i}"))
79+
return queue
80+
81+
def test_barrier_synchronized_mass_disconnect(self) -> None:
82+
"""30 clients disconnect at the exact same instant via a barrier."""
83+
num_clients = 30
84+
entities_per_client = 20
85+
86+
queues = [
87+
self._make_client(
88+
entities_per_client, entities_per_client, f"barrier_{self._test_id}_c{i}"
89+
)
90+
for i in range(num_clients)
91+
]
92+
93+
time.sleep(3.0)
94+
self._assert_executor_healthy()
95+
96+
barrier = threading.Barrier(num_clients)
97+
barrier_errors: list[Exception] = []
98+
99+
def _finish_at_barrier(q: IncomingQueue) -> None:
100+
try:
101+
barrier.wait(timeout=10)
102+
q.finish()
103+
except Exception as e:
104+
barrier_errors.append(e)
105+
106+
threads = [threading.Thread(target=_finish_at_barrier, args=(q,)) for q in queues]
107+
for t in threads:
108+
t.start()
109+
for t in threads:
110+
t.join(timeout=15)
111+
for q in queues:
112+
q.join(timeout=10)
113+
114+
self.assertFalse(barrier_errors, f"Barrier/finish errors: {barrier_errors}")
115+
time.sleep(2.0)
116+
self._assert_executor_healthy()
117+
118+
for probe in range(10):
119+
self._assert_executor_functional(f"Executor dead (probe {probe})")
120+
121+
def test_repeated_rapid_connect_disconnect(self) -> None:
122+
"""100 rapid create/destroy cycles with no settle time between them."""
123+
for iteration in range(100):
124+
q = self._make_client(10, 10, f"rapid_{self._test_id}_{iteration}")
125+
q.finish()
126+
q.join(timeout=5)
127+
self._assert_executor_functional(f"Executor died at iteration {iteration}")
128+
129+
self._assert_executor_healthy()
130+
131+
def test_interleaved_connect_disconnect(self) -> None:
132+
"""New clients connect while old clients disconnect each wave."""
133+
previous_queues: list[IncomingQueue] = []
134+
135+
for wave in range(20):
136+
new_queues = [
137+
self._make_client(10, 0, f"wave_{self._test_id}_w{wave}_c{i}") for i in range(10)
138+
]
139+
140+
for q in previous_queues:
141+
q.finish()
142+
for q in previous_queues:
143+
q.join(timeout=5)
144+
145+
previous_queues = new_queues
146+
self._assert_executor_functional(f"Executor died at wave {wave}")
147+
148+
for q in previous_queues:
149+
q.finish()
150+
q.join(timeout=5)
151+
152+
time.sleep(1.0)
153+
self._assert_executor_healthy()
154+
155+
def test_combined_stress(self) -> None:
156+
"""Three phases: barrier-disconnect batch A while creating B, rapid cycling, then destroy B."""
157+
# Phase 1: barrier-disconnect batch A while creating batch B
158+
batch_a = [self._make_client(15, 15, f"combo_{self._test_id}_a{i}") for i in range(15)]
159+
time.sleep(2.0)
160+
161+
barrier = threading.Barrier(len(batch_a))
162+
barrier_errors: list[Exception] = []
163+
164+
def _finish_at_barrier(q: IncomingQueue) -> None:
165+
try:
166+
barrier.wait(timeout=10)
167+
q.finish()
168+
except Exception as e:
169+
barrier_errors.append(e)
170+
171+
destroy_threads = [threading.Thread(target=_finish_at_barrier, args=(q,)) for q in batch_a]
172+
for t in destroy_threads:
173+
t.start()
174+
175+
batch_b = [self._make_client(15, 15, f"combo_{self._test_id}_b{i}") for i in range(15)]
176+
177+
for t in destroy_threads:
178+
t.join(timeout=15)
179+
for q in batch_a:
180+
q.join(timeout=10)
181+
182+
self.assertFalse(barrier_errors, f"Phase 1 errors: {barrier_errors}")
183+
self._assert_executor_functional("Executor dead after phase 1")
184+
185+
# Phase 2: rapid cycling while batch B is alive
186+
for iteration in range(50):
187+
q = self._make_client(5, 5, f"combo_{self._test_id}_r{iteration}")
188+
q.finish()
189+
q.join(timeout=5)
190+
191+
self._assert_executor_functional("Executor dead after phase 2")
192+
193+
# Phase 3: destroy batch B
194+
for q in batch_b:
195+
q.finish()
196+
for q in batch_b:
197+
q.join(timeout=10)
198+
199+
time.sleep(1.0)
200+
self._assert_executor_healthy()
201+
self._assert_executor_functional("Executor dead after phase 3")
202+
203+
204+
if __name__ == "__main__":
205+
unittest.main()

0 commit comments

Comments
 (0)