Skip to content

Add a queue to shmrpc#2717

Open
Dreamsorcerer wants to merge 2 commits into
mainfrom
sam/fix-shmrpc-concurrency
Open

Add a queue to shmrpc#2717
Dreamsorcerer wants to merge 2 commits into
mainfrom
sam/fix-shmrpc-concurrency

Conversation

@Dreamsorcerer

Copy link
Copy Markdown
Collaborator

I'm not entirely sure if it might be more appropriate to use this everywhere for SHM or not. But, for RPC atleast, this fixes the concurrency tests.

@greptile-apps

greptile-apps Bot commented Jul 3, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds a queue-backed shared-memory path for RPC. The main changes are:

  • New CpuShmQueue ring buffer for SHM message delivery.
  • ShmRPC now uses the queue with RPC-sized frames and a fixed slot count.
  • SHM topic names now include the channel layout.
  • Tests now cover SHM async and concurrent RPC behavior.

Confidence Score: 5/5

This looks safe to merge.

  • No blocking issues found in the changed code.

Important Files Changed

Filename Overview
dimos/protocol/pubsub/shm/ipc_factory.py Adds the queue-backed SHM channel and its ownership, read, write, attach, and close paths.
dimos/protocol/pubsub/impl/shmpubsub.py Allows SHM pubsub subclasses to choose a channel implementation and includes layout details in topic names.
dimos/protocol/rpc/pubsubrpc.py Configures ShmRPC to use the queue-backed channel with RPC-specific capacity settings.
dimos/protocol/pubsub/shm/test_ipc_factory.py Adds unit tests for queue delivery, overflow handling, named segments, and descriptor attachment.
dimos/protocol/rpc/test_spec.py Extends RPC tests so SHM runs through async and concurrent call coverage.

Reviews (2): Last reviewed commit: "Minor fixes" | Re-trigger Greptile

Comment thread dimos/protocol/pubsub/shm/ipc_factory.py
Comment thread dimos/protocol/pubsub/impl/shmpubsub.py
Comment thread dimos/protocol/pubsub/shm/ipc_factory.py Outdated
Comment thread dimos/protocol/pubsub/shm/ipc_factory.py
@codecov

codecov Bot commented Jul 3, 2026

Copy link
Copy Markdown

❌ 2 Tests Failed:

Tests completed Failed Passed Skipped
2561 2 2559 69
View the top 2 failed test(s) by shortest run time
dimos.protocol.pubsub.test_spec::test_high_volume_messages[ros_context-topic1-values1]
Stack Traces | 3.27s run time
pubsub_context = <function ros_context at 0x7ce15e9b4e00>
topic = RawROSTopic(topic='/test_ros_topic', ros_type=<class 'geometry_msgs.msg._vector3.Vector3'>, qos=<rclpy.qos.QoSProfile object at 0x7ce15e9beb20>)
values = [geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=4.0, y=5.0, z=6.0), geometry_msgs.msg.Vector3(x=7.0, y=8.0, z=9.0)]

    @pytest.mark.self_hosted
    @pytest.mark.skipif_macos_bug
    @pytest.mark.parametrize("pubsub_context, topic, values", testdata)
    def test_high_volume_messages(
        pubsub_context: Callable[[], Any], topic: Any, values: list[Any]
    ) -> None:
        """Test that all 5k messages are received correctly.
        Limited to 5k because ros transport cannot handle more.
        Might want to have separate expectations per transport later
        """
        with pubsub_context() as x:
            # Create a list to capture received messages
            received_messages: list[Any] = []
            last_message_time = [time.time()]  # Use list to allow modification in callback
    
            # Define callback function
            def callback(message: Any, topic: Any) -> None:
                received_messages.append(message)
                last_message_time[0] = time.time()
    
            # Subscribe to the topic
            x.subscribe(topic, callback)
    
            # Publish 5000 messages
            num_messages = 5000
            for _ in range(num_messages):
                x.publish(topic, values[0])
    
            # Wait until no messages received for 0.5 seconds
            timeout = 2.0  # Maximum time to wait
            stable_duration = 0.1  # Time without new messages to consider done
            start_time = time.time()
    
            while time.time() - start_time < timeout:
                if time.time() - last_message_time[0] >= stable_duration:
                    break
                time.sleep(0.1)
    
            # Capture count and clear list to avoid printing huge list on failure
            received_len = len(received_messages)
            received_messages.clear()
>           assert received_len == num_messages, f"Expected {num_messages} messages, got {received_len}"
E           AssertionError: Expected 5000 messages, got 4280
E           assert 4280 == 5000

_          = 4999
callback   = <function test_high_volume_messages.<locals>.callback at 0x7ce11fceccc0>
last_message_time = [1783101382.0500698]
num_messages = 5000
pubsub_context = <function ros_context at 0x7ce15e9b4e00>
received_len = 4280
received_messages = []
stable_duration = 0.1
start_time = 1783101380.0339353
timeout    = 2.0
topic      = RawROSTopic(topic='/test_ros_topic', ros_type=<class 'geometry_msgs.msg._vector3.Vector3'>, qos=<rclpy.qos.QoSProfile object at 0x7ce15e9beb20>)
values     = [geometry_msgs.msg.Vector3(x=1.0, y=2.0, z=3.0), geometry_msgs.msg.Vector3(x=4.0, y=5.0, z=6.0), geometry_msgs.msg.Vector3(x=7.0, y=8.0, z=9.0)]
x          = <dimos.protocol.pubsub.impl.rospubsub.RawROS object at 0x7ce15edbf200>

.../protocol/pubsub/test_spec.py:325: AssertionError
dimos.navigation.cmu_nav.modules.far_planner.test_far_planner_rosbag.TestFarPlannerRosbag::test_waypoint_accuracy
Stack Traces | 72.3s run time
self = <dimos.navigation.cmu_nav.modules.far_planner.test_far_planner_rosbag.TestFarPlannerRosbag object at 0x7ce15eebb800>

    def test_waypoint_accuracy(self) -> None:
        """Feed identical inputs at original timing and compare waypoint output."""
        if not FAR_PLANNER_BIN.exists():
            pytest.skip(f"FAR planner binary not found: {FAR_PLANNER_BIN}")
    
        window = load_rosbag_window()
        ref_wp = window.way_point
        assert len(ref_wp) > 0, "No reference waypoints in fixture"
    
        lcm = lcmlib.LCM()
        wp_collector = LcmCollector(topic=WAYPOINT_OUT_LCM, msg_type=PointStamped)
        wp_collector.start(lcm)
    
        stop_event = threading.Event()
        handle_thread = threading.Thread(
            target=lcm_handle_loop, args=(lcm, stop_event), daemon=True
        )
        handle_thread.start()
    
        runner = NativeProcessRunner(binary_path=str(FAR_PLANNER_BIN), args=_far_planner_args())
    
        try:
            runner.start()
            assert runner.is_running, "FAR planner binary failed to start"
            time.sleep(_PROCESS_STARTUP_SEC)
    
            # Feed at original timing (1:1 with rosbag)
            feed_at_original_timing(
                lcm,
                window,
                topic_map={
                    "odom": ODOM_LCM,
                    "scan": SCAN_LCM,
                    "terrain": TERRAIN_LCM,
                    "terrain_ext": TERRAIN_EXT_LCM,
                    "goal": GOAL_LCM,
                },
            )
    
            time.sleep(_POST_FEED_DRAIN_SEC)
    
        finally:
            runner.stop()
            stop_event.set()
            handle_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
            wp_collector.stop(lcm)
    
        our_wps = [(msg.x, msg.y) for msg in wp_collector.messages]
    
        # Compute deviation score
        score = _compute_waypoint_deviation(our_wps, ref_wp)
    
        # Log score for visibility
        logger.info(f"\n{'=' * 60}")
        logger.info("FAR PLANNER DEVIATION SCORE")
        logger.info(f"  Our waypoints:     {len(our_wps)}")
        logger.info(f"  Reference:         {len(ref_wp)}")
        logger.info(f"  Count ratio:       {score['count_ratio']:.3f}")
        logger.info(f"  Mean error:        {score['mean_error_m']:.3f} m")
        logger.info(f"  Max error:         {score['max_error_m']:.3f} m")
        logger.info(f"  Mean X diff:       {score['mean_x_diff']:.3f} m")
        logger.info(f"  Mean Y diff:       {score['mean_y_diff']:.3f} m")
        logger.info(f"{'=' * 60}\n")
    
        # Assertions — generous thresholds, the point is to measure
        assert len(our_wps) > 0, "FAR planner produced no waypoints"
>       assert score["mean_error_m"] < 5.0, (
            f"Mean waypoint error {score['mean_error_m']:.2f}m exceeds 5m threshold"
        )
E       AssertionError: Mean waypoint error 5.43m exceeds 5m threshold
E       assert 5.426592027836722 < 5.0

handle_thread = <Thread(Thread-155 (lcm_handle_loop), stopped daemon 137306568636096)>
lcm        = <LCM object at 0x7ce1584becd0>
our_wps    = [(6.291114330291748, -3.876110076904297), (6.291114330291748, -3.876110076904297), (6.291114330291748, -3.876110076904...4330291748, -3.876110076904297), (6.291114330291748, -3.876110076904297), (6.291114330291748, -3.876110076904297), ...]
ref_wp     = array([[     5.5327,        -0.3,         2.5,        0.75],
       [     5.7342,        -0.3,         2.5,        0.7...,      7.8589,      1.4869,    0.095978],
       [     65.337,      7.8589,      1.4869,    0.095978]], shape=(300, 4))
runner     = NativeProcessRunner(binary_path='.../result/bin/far_planner_...hred', '4', '--new_point_counter', '5', '--obs_inflate_size', '1', '--visualize_ratio', '0.4', '--wp_churn_dist', '0'])
score      = {'count_ratio': 0.31666666666666665, 'max_error_m': 9.170472615133974, 'mean_error_m': 5.426592027836722, 'mean_x_diff': 3.969529392384646, ...}
self       = <dimos.navigation.cmu_nav.modules.far_planner.test_far_planner_rosbag.TestFarPlannerRosbag object at 0x7ce15eebb800>
stop_event = <threading.Event at 0x7ce158276240: set>
window     = RosbagWindow(odom=array([[     5.3674,           0,           0, ...,           0,           0,           1],
       [...,           1],
       [     0.2975,           0,           0,           0,           0,           0,           1]]))])
wp_collector = LcmCollector(topic='/rbfp_wp#geometry_msgs.PointStamped', msg_type=<class 'dimos.msgs.geometry_msgs.PointStamped.Point...383, 5558145.749739975, 5558145.950500847, 5558146.147794054, 5558146.349164861, 5558146.547530573, 5558146.754177557])

.../modules/far_planner/test_far_planner_rosbag.py:276: AssertionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@github-actions github-actions Bot added the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
@github-actions github-actions Bot removed the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant