|
8 | 8 | from typing import Any |
9 | 9 |
|
10 | 10 | import rclpy |
| 11 | +from geometry_msgs.msg import PoseStamped |
11 | 12 | from rclpy.executors import SingleThreadedExecutor |
12 | 13 | from rclpy.node import Node |
13 | 14 | from rclpy.qos import DurabilityPolicy, QoSProfile |
|
17 | 18 | MissingArgumentException, |
18 | 19 | ) |
19 | 20 | from rosbridge_library.protocol import Protocol |
| 21 | +from sensor_msgs.msg import TimeReference |
20 | 22 | from std_msgs.msg import String |
21 | 23 |
|
22 | 24 |
|
@@ -74,6 +76,143 @@ def cb(msg: String) -> None: |
74 | 76 | time.sleep(0.5) |
75 | 77 | self.assertEqual(received["msg"].data, msg["data"]) |
76 | 78 |
|
| 79 | + def test_publish_header_stamp_auto_populated_when_stamp_omitted(self) -> None: |
| 80 | + proto = Protocol("hello", self.node) |
| 81 | + pub = Publish(proto) |
| 82 | + topic = "/test_header_stamp_auto_populated" |
| 83 | + |
| 84 | + received: dict[str, Any] = {"msg": None} |
| 85 | + |
| 86 | + def cb(msg: PoseStamped) -> None: |
| 87 | + received["msg"] = msg |
| 88 | + |
| 89 | + subscriber_qos = QoSProfile( |
| 90 | + depth=10, |
| 91 | + durability=DurabilityPolicy.TRANSIENT_LOCAL, |
| 92 | + ) |
| 93 | + self.node.create_subscription(PoseStamped, topic, cb, subscriber_qos) |
| 94 | + |
| 95 | + # Publish with header present but stamp omitted |
| 96 | + pub_msg = { |
| 97 | + "op": "publish", |
| 98 | + "topic": topic, |
| 99 | + "type": "geometry_msgs/msg/PoseStamped", |
| 100 | + "msg": {"header": {}}, |
| 101 | + } |
| 102 | + pub.publish(pub_msg) |
| 103 | + time.sleep(0.5) |
| 104 | + |
| 105 | + self.assertIsNotNone(received["msg"]) |
| 106 | + stamp = received["msg"].header.stamp |
| 107 | + self.assertGreater( |
| 108 | + stamp.sec + stamp.nanosec, |
| 109 | + 0, |
| 110 | + "Expected server to auto-populate header.stamp with current ROS time, but stamp is zero", |
| 111 | + ) |
| 112 | + |
| 113 | + def test_publish_header_stamp_preserved_when_provided_by_client(self) -> None: |
| 114 | + proto = Protocol("hello", self.node) |
| 115 | + pub = Publish(proto) |
| 116 | + topic = "/test_header_stamp_preserved" |
| 117 | + |
| 118 | + received: dict[str, Any] = {"msg": None} |
| 119 | + |
| 120 | + def cb(msg: PoseStamped) -> None: |
| 121 | + received["msg"] = msg |
| 122 | + |
| 123 | + subscriber_qos = QoSProfile( |
| 124 | + depth=10, |
| 125 | + durability=DurabilityPolicy.TRANSIENT_LOCAL, |
| 126 | + ) |
| 127 | + self.node.create_subscription(PoseStamped, topic, cb, subscriber_qos) |
| 128 | + |
| 129 | + expected_sec = 12345 |
| 130 | + expected_nanosec = 67890 |
| 131 | + pub_msg = { |
| 132 | + "op": "publish", |
| 133 | + "topic": topic, |
| 134 | + "type": "geometry_msgs/msg/PoseStamped", |
| 135 | + "msg": { |
| 136 | + "header": { |
| 137 | + "stamp": {"sec": expected_sec, "nanosec": expected_nanosec}, |
| 138 | + } |
| 139 | + }, |
| 140 | + } |
| 141 | + pub.publish(pub_msg) |
| 142 | + time.sleep(0.5) |
| 143 | + |
| 144 | + self.assertIsNotNone(received["msg"]) |
| 145 | + stamp = received["msg"].header.stamp |
| 146 | + self.assertEqual(stamp.sec, expected_sec) |
| 147 | + self.assertEqual(stamp.nanosec, expected_nanosec) |
| 148 | + |
| 149 | + def test_publish_header_stamp_auto_populated_when_header_omitted(self) -> None: |
| 150 | + proto = Protocol("hello", self.node) |
| 151 | + pub = Publish(proto) |
| 152 | + topic = "/test_header_stamp_auto_populated_no_header" |
| 153 | + |
| 154 | + received: dict[str, Any] = {"msg": None} |
| 155 | + |
| 156 | + def cb(msg: PoseStamped) -> None: |
| 157 | + received["msg"] = msg |
| 158 | + |
| 159 | + subscriber_qos = QoSProfile( |
| 160 | + depth=10, |
| 161 | + durability=DurabilityPolicy.TRANSIENT_LOCAL, |
| 162 | + ) |
| 163 | + self.node.create_subscription(PoseStamped, topic, cb, subscriber_qos) |
| 164 | + |
| 165 | + # Publish with no header at all (the entire header field is omitted) |
| 166 | + pub_msg = { |
| 167 | + "op": "publish", |
| 168 | + "topic": topic, |
| 169 | + "type": "geometry_msgs/msg/PoseStamped", |
| 170 | + "msg": {}, |
| 171 | + } |
| 172 | + pub.publish(pub_msg) |
| 173 | + time.sleep(0.5) |
| 174 | + |
| 175 | + self.assertIsNotNone(received["msg"]) |
| 176 | + stamp = received["msg"].header.stamp |
| 177 | + self.assertGreater( |
| 178 | + stamp.sec + stamp.nanosec, |
| 179 | + 0, |
| 180 | + "Expected server to auto-populate header.stamp even when header is entirely omitted", |
| 181 | + ) |
| 182 | + |
| 183 | + def test_publish_time_field_now_string_populated(self) -> None: |
| 184 | + proto = Protocol("hello", self.node) |
| 185 | + pub = Publish(proto) |
| 186 | + topic = "/test_time_field_now" |
| 187 | + |
| 188 | + received: dict[str, Any] = {"msg": None} |
| 189 | + |
| 190 | + def cb(msg: TimeReference) -> None: |
| 191 | + received["msg"] = msg |
| 192 | + |
| 193 | + subscriber_qos = QoSProfile( |
| 194 | + depth=10, |
| 195 | + durability=DurabilityPolicy.TRANSIENT_LOCAL, |
| 196 | + ) |
| 197 | + self.node.create_subscription(TimeReference, topic, cb, subscriber_qos) |
| 198 | + |
| 199 | + pub_msg = { |
| 200 | + "op": "publish", |
| 201 | + "topic": topic, |
| 202 | + "type": "sensor_msgs/msg/TimeReference", |
| 203 | + "msg": {"time_ref": "now"}, |
| 204 | + } |
| 205 | + pub.publish(pub_msg) |
| 206 | + time.sleep(0.5) |
| 207 | + |
| 208 | + self.assertIsNotNone(received["msg"]) |
| 209 | + time_ref = received["msg"].time_ref |
| 210 | + self.assertGreater( |
| 211 | + time_ref.sec + time_ref.nanosec, |
| 212 | + 0, |
| 213 | + 'Expected server to populate time_ref with current ROS time when set to "now"', |
| 214 | + ) |
| 215 | + |
77 | 216 |
|
78 | 217 | if __name__ == "__main__": |
79 | 218 | unittest.main() |
0 commit comments