memory2 tf service#2707

Merged
leshy merged 16 commits into main from feat/ivan/memtf on Jul 3, 2026

@leshy leshy commented Jul 3, 2026

We converted the recordings to sensor frame, so mem now needs a tf service for global mapping.

leshy added 4 commits July 3, 2026 00:32
Unify replay tf with the live service: StreamTF(MultiTBuffer, TFSpec)
mirrors PubSubTF, pulling windows from a recorded tf stream on demand
instead of receiving pushed messages. Lookups span buffer_size backward
(or an explicit time_tolerance) plus forward_tolerance ahead; a cache
miss prefetches cache_span past the query window and evicts everything
first, so chronological replay costs one db query per cache_span.

- TFLookup protocol (read side) + mypy conformance checks
- get_pose hoisted from PubSubTF to TFSpec
- MultiTBuffer: None tolerance resolves to buffer_size explicitly
- map global: registration via tf stream (never Observation poses),
  --frame auto-detects world/map/odom via probe lookups, fail-fast
  when the cloud frame can't be resolved
- tf lookup tests consolidated into a grid: live MultiTBuffer vs
  StreamTF over memory/sqlite stores
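
The caching behaviour described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the dimos implementation: `WindowedCache`, its fields, and the sorted timestamp list standing in for the recorded tf stream are all hypothetical, and only the parameter names `buffer_size`, `forward_tolerance`, and `cache_span` come from the text above.

```python
from bisect import bisect_left, bisect_right
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class WindowedCache:
    """Hypothetical stand-in for a StreamTF-style cache (assumption, not dimos code)."""

    timestamps: List[float]         # sorted timestamps; stands in for the recorded stream
    buffer_size: float = 10.0       # how far back a lookup may reach
    forward_tolerance: float = 0.0  # how far ahead a lookup may reach
    cache_span: float = 60.0        # how far past the query window a miss prefetches
    queries: int = 0                # number of simulated db queries
    _covered: Optional[Tuple[float, float]] = None
    _cached: List[float] = field(default_factory=list)

    def _ensure(self, lo: float, hi: float) -> None:
        covered = self._covered
        if covered is not None and covered[0] <= lo and hi <= covered[1]:
            return  # cache hit: the query window is already loaded
        # Cache miss: evict everything first, then load the window plus cache_span.
        self._cached = []
        self._covered = None
        end = hi + self.cache_span
        i = bisect_left(self.timestamps, lo)
        j = bisect_right(self.timestamps, end)
        self._cached = self.timestamps[i:j]
        self.queries += 1
        self._covered = (lo, end)

    def lookup(self, ts: float) -> Optional[float]:
        """Return the cached timestamp closest to ts inside the lookup window."""
        lo, hi = ts - self.buffer_size, ts + self.forward_tolerance
        self._ensure(lo, hi)
        candidates = [t for t in self._cached if lo <= t <= hi]
        return min(candidates, key=lambda t: abs(t - ts), default=None)
```

Under these assumptions, replaying lookups in chronological order only triggers a reload when the query window runs past the prefetched range, which is where the "one db query per cache_span" cost comes from. A backward jump in time would miss and reload.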

mintlify Bot commented Jul 3, 2026

Preview deployment for your docs. Learn more about Mintlify Previews.

| Project | Status | Preview | Updated (UTC) |
| --- | --- | --- | --- |
| dimensional | 🟢 Ready | View Preview | Jul 3, 2026, 12:34 AM |

codecov Bot commented Jul 3, 2026

❌ 1 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --- | --- | --- | --- |
| 2551 | 1 | 2550 | 70 |
View the top 1 failed test(s) by shortest run time
dimos.e2e_tests.test_dimsim_path_replaning::test_path_replanning
Stack Traces | 230s run time
lcm_spy = <dimos.e2e_tests.lcm_spy.LcmSpy object at 0x7ef275f51d90>
start_blueprint = <function start_blueprint.<locals>.set_name_and_start at 0x7ef273915300>
dim_sim = <dimos.e2e_tests.dim_sim_client.DimSimClient object at 0x7ef27631b4d0>
direct_cmd_vel_explorer = <dimos.simulation.mujoco.direct_cmd_vel_explorer.DirectCmdVelExplorer object at 0x7ef2778ef6b0>
spawn_wall_on_pose = <function spawn_wall_on_pose.<locals>.spawn at 0x7ef2739160c0>

    @pytest.mark.self_hosted_large
    def test_path_replanning(
        lcm_spy, start_blueprint, dim_sim, direct_cmd_vel_explorer, spawn_wall_on_pose
    ) -> None:
        start_blueprint(
            "--dimsim-scene=empty",
            "run",
            "unitree-go2-agentic",
            simulator="dimsim",
        )
        lcm_spy.save_topic(".../McpClient/on_system_modules/res")
        lcm_spy.wait_for_saved_topic(".../McpClient/on_system_modules/res", timeout=1200.0)
    
        # robot spawns at (3, 2)
    
        # side wall
        dim_sim.add_wall(2, -2.5, 12, -2.5)
        # other side wall
        dim_sim.add_wall(2, 3.5, 12, 3.5)
        # back wall (behind robot)
        dim_sim.add_wall(2, -2.5, 2, 3.5)
        # forward wall (far end)
        dim_sim.add_wall(12, -2.5, 12, 3.5)
        # dividing wall at x=7 with doors at y=[-1.5,-0.5] and y=[1.5,2.5]
        dim_sim.add_wall(7, -2.5, 7, -1.5)
        dim_sim.add_wall(7, -0.5, 7, 1.5)
        dim_sim.add_wall(7, 2.5, 7, 3.5)
    
        direct_cmd_vel_explorer.linear_speed = 0.8
        direct_cmd_vel_explorer.follow_points([(10, 2), (2.5, 2), (3, 2)])
    
        # When the robot comes within 1.5 m of the left door's centre, drop a wall
        # in the opening so the planner has to bail out and route through the
        # right door at y=-1 instead.
        spawn_wall_on_pose(
            point=(7, 2),
            threshold=1.5,
            wall=(7, 1.5, 7, 2.5),
        )
    
        dim_sim.publish_goal(10.913, 0.588)
    
>       lcm_spy.wait_until_odom_position(10.913, 0.588, threshold=1, timeout=120)

dim_sim    = <dimos.e2e_tests.dim_sim_client.DimSimClient object at 0x7ef27631b4d0>
direct_cmd_vel_explorer = <dimos.simulation.mujoco.direct_cmd_vel_explorer.DirectCmdVelExplorer object at 0x7ef2778ef6b0>
lcm_spy    = <dimos.e2e_tests.lcm_spy.LcmSpy object at 0x7ef275f51d90>
spawn_wall_on_pose = <function spawn_wall_on_pose.<locals>.spawn at 0x7ef2739160c0>
start_blueprint = <function start_blueprint.<locals>.set_name_and_start at 0x7ef273915300>

dimos/e2e_tests/test_dimsim_path_replaning.py:60: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dimos/e2e_tests/lcm_spy.py:182: in wait_until_odom_position
    self.wait_for_message_result(
        predicate  = <function LcmSpy.wait_until_odom_position.<locals>.predicate at 0x7ef273916a20>
        self       = <dimos.e2e_tests.lcm_spy.LcmSpy object at 0x7ef275f51d90>
        threshold  = 1
        timeout    = 120
        x          = 10.913
        y          = 0.588
dimos/e2e_tests/lcm_spy.py:168: in wait_for_message_result
    self.wait_until(
        event      = <threading.Event at 0x7ef275eb3b30: unset>
        fail_message = 'Failed to get to position x=10.913, y=0.588'
        listener   = <function LcmSpy.wait_for_message_result.<locals>.listener at 0x7ef273916ac0>
        predicate  = <function LcmSpy.wait_until_odom_position.<locals>.predicate at 0x7ef273916a20>
        self       = <dimos.e2e_tests.lcm_spy.LcmSpy object at 0x7ef275f51d90>
        timeout    = 120
        topic      = '/odom#geometry_msgs.PoseStamped'
        type       = <class 'dimos.msgs.geometry_msgs.PoseStamped.PoseStamped'>
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dimos.e2e_tests.lcm_spy.LcmSpy object at 0x7ef275f51d90>

    def wait_until(
        self,
        *,
        condition: Callable[[], bool],
        timeout: float,
        error_message: str,
        poll_interval: float = 0.1,
    ) -> None:
        start_time = time.time()
        while time.time() - start_time < timeout:
            if condition():
                return
            time.sleep(poll_interval)
>       raise TimeoutError(error_message)
E       TimeoutError: Failed to get to position x=10.913, y=0.588

condition  = <bound method Event.is_set of <threading.Event at 0x7ef275eb3b30: unset>>
error_message = 'Failed to get to position x=10.913, y=0.588'
poll_interval = 0.1
self       = <dimos.e2e_tests.lcm_spy.LcmSpy object at 0x7ef275f51d90>
start_time = 1783078207.7808752
timeout    = 120

dimos/e2e_tests/lcm_spy.py:105: TimeoutError


@leshy leshy marked this pull request as ready for review July 3, 2026 00:52
@leshy leshy changed the title from "Feat/ivan/memtf" to "memory2 tf service" Jul 3, 2026
greptile-apps Bot commented Jul 3, 2026

Greptile Summary

This PR introduces StreamTF, a read-only TF service backed by a recorded tf stream, so the mapping CLI can register sensor-frame point clouds into a world frame during offline replay. It also replaces the --use-tf boolean with automatic world-frame detection (_detect_world) and a new --frame / --tf-tolerance pair, moves get_pose up to TFSpec, and adds a TFLookup protocol for type-safe loose coupling.

  • dimos/memory2/tf.py: new StreamTF class that demand-loads a sliding window of TF data from the store into an in-memory MultiTBuffer cache; eviction is manual on window miss.
  • dimos/mapping/utils/cli/map.py: world-frame auto-detection probes world/map/odom via the dataset's tf stream; sensor-frame clouds are registered per-frame, world-frame clouds pass through verbatim; dedup and trajectory now use the TF-derived position instead of the stored obs.pose.
  • dimos/protocol/tf/tf.py: TFLookup protocol added; get_pose promoted from PubSubTF to TFSpec; None tolerance bug in MultiTBuffer.get_transform fixed by defaulting to buffer_size.
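
As a rough illustration of what a read-side protocol with a static conformance check can look like, here is a minimal sketch using `typing.Protocol`. The names `TFLookupSketch` and `StaticTF` and the `get` signature are assumptions made for the example; the real TFLookup definition in dimos/protocol/tf/tf.py may differ.

```python
from typing import Dict, Optional, Protocol, Tuple, Type, runtime_checkable

Translation = Tuple[float, float, float]


@runtime_checkable
class TFLookupSketch(Protocol):
    """Hypothetical read-only lookup interface (assumed signature)."""

    def get(
        self, parent: str, child: str, ts: Optional[float] = None
    ) -> Optional[Translation]: ...


class StaticTF:
    """Toy backend that satisfies the protocol structurally, without inheriting from it."""

    def __init__(self, table: Dict[Tuple[str, str], Translation]) -> None:
        self._table = table

    def get(
        self, parent: str, child: str, ts: Optional[float] = None
    ) -> Optional[Translation]:
        # This toy backend ignores ts and returns a fixed translation, if any.
        return self._table.get((parent, child))


# Static conformance check: a type checker such as mypy flags this assignment
# if StaticTF.get stops matching the protocol's signature.
_conforms: Type[TFLookupSketch] = StaticTF
```

Callers can then annotate parameters with the protocol instead of a concrete class, so a live buffer and a replay-backed one are interchangeable as long as both expose the read method.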

Confidence Score: 5/5

Safe to merge; all changed paths are well-tested and the two observations raised are optimization notes without correctness impact.

The core caching and eviction logic in StreamTF is correct for the single-threaded CLI use case, the world-frame detection handles all edge cases (null tf_buf, user-provided --frame, cloud already in a world frame), and the test suite now parametrizes over live, memory-backed, and sqlite-backed backends covering chain composition, time tolerance, eviction, and protocol conformance. The two open points — I/O under lock and redundant register calls — are non-blocking quality concerns.
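
The world-frame detection described above might look roughly like the sketch below. It is an illustration under assumptions, not the code in map.py: `detect_world_frame`, its parameters, and the exact handling of a missing tf buffer are hypothetical, and only the candidate order world/map/odom, the explicit `--frame` override, the pass-through for clouds already in a world frame, and the fail-fast behaviour come from this PR's description.

```python
from typing import Callable, Optional, Sequence

# A probe returns a transform-like object, or None when no transform exists.
Probe = Callable[[str, str, float], Optional[object]]

WORLD_CANDIDATES = ("world", "map", "odom")


def detect_world_frame(
    probe: Optional[Probe],
    cloud_frame: str,
    ts: float,
    requested: Optional[str] = None,
    candidates: Sequence[str] = WORLD_CANDIDATES,
) -> str:
    """Pick the frame to register clouds into (hypothetical helper)."""
    if requested is not None:
        return requested  # an explicit --frame value wins over detection
    if cloud_frame in candidates:
        return cloud_frame  # cloud is already in a world frame: pass through
    if probe is not None:
        for frame in candidates:
            # Probe lookup: does a transform from this candidate to the cloud frame exist?
            if probe(frame, cloud_frame, ts) is not None:
                return frame
    # Fail fast instead of silently producing an unregistered map.
    raise LookupError(f"cannot resolve cloud frame {cloud_frame!r} into any of {tuple(candidates)}")
```

Failing fast here means a dataset without a usable tf stream is rejected at startup rather than after a long accumulation pass.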

No files require special attention for correctness; dimos/memory2/tf.py and dimos/mapping/utils/cli/map.py carry the non-blocking quality notes.

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/memory2/tf.py | New StreamTF class wrapping a recorded tf stream for offline replay; caching and eviction logic is correct for single-threaded use, with minor concerns around holding the lock during I/O and the 10 s lookback fallback for time_tolerance=None. |
| dimos/mapping/utils/cli/map.py | Replaces the --use-tf boolean with automatic world-frame detection and per-frame tf registration; logic is correct but each kept frame incurs a redundant second register() call during accumulation. |
| dimos/protocol/tf/tf.py | Adds TFLookup protocol and moves get_pose from PubSubTF to TFSpec; also fixes the None tolerance passed to TBuffer.get by defaulting to buffer_size instead. |
| dimos/protocol/tf/test_tf.py | Tests refactored into a parametrized grid fixture covering live, memory-backed, and sqlite-backed StreamTF; good coverage of caching, eviction, chain composition, and the TFLookup protocol conformance check. |
| docs/usage/transforms.md | Minor doc fix: corrects the relative path to tf.py in the Markdown link. |
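
The None-tolerance fix mentioned for dimos/protocol/tf/tf.py can be illustrated with a small sketch. `TimeBuffer` below is a hypothetical stand-in, not TBuffer or MultiTBuffer; it only shows the idea of resolving a missing tolerance to `buffer_size` before comparing.

```python
from typing import List, Optional


class TimeBuffer:
    """Hypothetical timestamp buffer (assumption, not the dimos TBuffer)."""

    def __init__(self, buffer_size: float = 10.0) -> None:
        self.buffer_size = buffer_size
        self._ts: List[float] = []

    def add(self, ts: float) -> None:
        self._ts.append(ts)

    def get(self, ts: float, time_tolerance: Optional[float] = None) -> Optional[float]:
        # Resolve None explicitly; "time_tolerance or buffer_size" would be wrong
        # because it would also replace a legitimate tolerance of 0.0.
        tolerance = self.buffer_size if time_tolerance is None else time_tolerance
        nearest = min(self._ts, key=lambda t: abs(t - ts), default=None)
        if nearest is None or abs(nearest - ts) > tolerance:
            return None
        return nearest
```

The `is None` test is the design point: a caller asking for an exact match with a tolerance of zero keeps that behaviour, while a caller that passes nothing gets the buffer's full span.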

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant CLI as map.py CLI
    participant STF as StreamTF
    participant Store as SqliteStore / Stream
    participant MTB as MultiTBuffer (cache)

    CLI->>Store: stream("tf", TFMessage)
    CLI->>STF: StreamTF.from_store(store)
    CLI->>STF: "get("world", cloud_frame, ts=first_obs.ts)"
    STF->>STF: _ensure(ts-10, ts+0)
    STF->>Store: stream.at(center, radius)
    Store-->>STF: TFMessage observations
    STF->>MTB: receive_transform(...)
    STF->>MTB: super().get(...)
    MTB-->>STF: "Transform | None"
    STF-->>CLI: world frame detected

    loop for each obs in lidar (dedup)
        CLI->>STF: register(obs) via _position()
        STF->>STF: _ensure(obs.ts-10, obs.ts) — cache hit or evict+reload
        STF->>MTB: super().get(world, frame_id, obs.ts)
        MTB-->>STF: "Transform | None"
        STF-->>CLI: "position tuple | None"
    end

    loop for each kept obs in _accumulate()
        CLI->>STF: register(obs)
        STF->>MTB: super().get(...) — typically cache hit
        MTB-->>STF: Transform
        STF-->>CLI: Transform (used to register cloud into world frame)
    end

Reviews (4): Last reviewed commit: "Merge branch 'feat/ivan/memtf' of github..."

Comment thread dimos/memory2/tf.py Outdated
Comment thread dimos/memory2/tf.py
@leshy leshy added the PlzReview label Jul 3, 2026
Comment thread dimos/memory2/tf.py
Comment thread dimos/memory2/tf.py
Comment thread dimos/memory2/tf.py
Comment thread dimos/protocol/tf/tf.py
paul-nechifor
paul-nechifor previously approved these changes Jul 3, 2026
@github-actions github-actions Bot added the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
@github-actions github-actions Bot removed the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
@github-actions github-actions Bot added the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
leshy added 4 commits July 3, 2026 14:15
Covers the covered-range check, eviction, and reload atomically so a
concurrent reader can't see a stale _covered against cleared buffers;
also resets _covered before reloading so a failed _load can't leave
the cache claiming coverage it evicted.
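
A minimal sketch of the locking pattern this commit message describes, assuming a single `threading.Lock` around the check, eviction, and reload. `LockedWindow` and its `load` callback are hypothetical names for illustration; the actual StreamTF code is not shown in this conversation.

```python
import threading
from typing import Callable, List, Optional, Tuple


class LockedWindow:
    """Hypothetical cache whose coverage check, eviction, and reload share one lock."""

    def __init__(self, load: Callable[[float, float], List[float]]) -> None:
        self._load = load
        self._lock = threading.Lock()
        self._covered: Optional[Tuple[float, float]] = None
        self._items: List[float] = []

    def ensure(self, lo: float, hi: float) -> List[float]:
        with self._lock:
            covered = self._covered
            if covered is not None and covered[0] <= lo and hi <= covered[1]:
                return list(self._items)  # hit: readers never see a half-evicted cache
            # Evict and reset coverage BEFORE loading, so that a load which raises
            # cannot leave the cache claiming a range it no longer holds.
            self._items = []
            self._covered = None
            items = self._load(lo, hi)  # may raise; the lock is released by "with"
            self._items = items
            self._covered = (lo, hi)
            return list(self._items)
```

Note that this sketch performs the load while holding the lock, which is the trade-off the review summary flags as a non-blocking concern: it keeps the three steps atomic at the cost of blocking other readers during I/O.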
@github-actions github-actions Bot removed the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
@leshy leshy enabled auto-merge (squash) July 3, 2026 11:40
@github-actions github-actions Bot added the ready-to-merge Required CI checks have passed on this PR label Jul 3, 2026
@leshy leshy merged commit b01251e into main Jul 3, 2026
31 of 32 checks passed
@leshy leshy deleted the feat/ivan/memtf branch July 3, 2026 16:46
@mintlify mintlify Bot mentioned this pull request Jul 3, 2026
Labels

PlzReview ready-to-merge Required CI checks have passed on this PR

2 participants