Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 121 additions & 53 deletions dimos/mapping/utils/cli/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from dimos.mapping.loop_closure.pgo import PoseGraph
from dimos.memory2.stream import Stream
from dimos.memory2.type.observation import Observation
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.msgs.sensor_msgs.Image import Image
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2

Expand All @@ -40,6 +41,20 @@
# labels never overlap the boxes.
MARKER_STEM = 1.0

# Conventional world frames tried in order when --frame isn't given.
_WORLD_FRAMES = ("world", "map", "odom")


def _detect_world(tf_buf: Any, cloud_frame: str, ts: float) -> str | None:
    """Pick the first conventional world frame that resolves the cloud frame via tf.

    Args:
        tf_buf: Transform lookup exposing ``get(parent, child, time_point=...)``,
            or ``None`` when no tf source is available.
        cloud_frame: Frame id the clouds are stamped with.
        ts: Timestamp handed to the lookup as ``time_point``.

    Returns:
        ``cloud_frame`` itself when it already is one of ``_WORLD_FRAMES``;
        otherwise the first candidate, in ``_WORLD_FRAMES`` order, for which the
        lookup returns a non-``None`` transform; ``None`` when no candidate
        resolves or ``tf_buf`` is ``None``.
    """
    # Clouds already stamped with a conventional world frame need no lookup.
    if cloud_frame in _WORLD_FRAMES:
        return cloud_frame
    if tf_buf is None:
        return None
    for cand in _WORLD_FRAMES:
        if tf_buf.get(cand, cloud_frame, time_point=ts) is not None:
            return cand
    return None


def _log_markers(
prefix: str,
Expand Down Expand Up @@ -97,49 +112,35 @@ def _accumulate(
block_count: int,
device: str,
graph: PoseGraph | None = None,
world_frame: bool = True,
register: Callable[[Observation[Any]], Transform | None] | None = None,
carve_columns: bool = False,
progress_cb: Callable[[Observation[Any]], None] | None = None,
) -> PointCloud2 | None:
"""Accumulate a voxel map from `obs_iter`, optionally PGO-correcting each frame.

By default the clouds are assumed already world-registered (the go2/fastlio
path) — only the PGO correction is applied, if any. Set ``world_frame=False``
(the ``--use-tf`` path) when each frame's cloud is in the sensor/body frame
and must be registered into the world via its per-frame pose.
``register`` maps each observation to the transform lifting its cloud into
the world frame; ``None`` means no transform is available and the frame is
skipped. With ``register=None`` all clouds are assumed world-registered.

Returns the final ``PointCloud2`` (or ``None`` if the input was empty).
Disposal of the underlying ``VoxelGrid`` is handled by ``VoxelMapTransformer``.
"""
from dimos.mapping.voxels import VoxelMapTransformer
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.msgs.geometry_msgs.Vector3 import Vector3

def _pose_tf(obs: Observation[Any]) -> Transform:
pose = obs.pose
assert pose is not None
return Transform(
translation=Vector3(pose.position.x, pose.position.y, pose.position.z),
rotation=Quaternion(
pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w
),
)

def prepared() -> Iterable[Observation[PointCloud2]]:
for obs in obs_iter:
if progress_cb is not None:
progress_cb(obs)
if len(obs.data) == 0:
continue
# body->world via the per-frame pose, unless the clouds are already
# world-registered (go2 default). graph adds the PGO correction on top
# (correction ∘ pose), applied after the pose.
# sensor->world via `register`, unless the clouds are already
# world-registered. graph adds the PGO correction on top
# (correction ∘ tf), applied after the registration.
tf: Transform | None = None
if not world_frame:
if obs.pose is None:
if register is not None:
tf = register(obs)
if tf is None:
continue
tf = _pose_tf(obs)
if graph is not None:
if obs.pose_tuple is None:
continue
Expand Down Expand Up @@ -328,11 +329,19 @@ def main(
None, "--out", help="Output .rrd path (default: ./<dataset>.rrd)"
),
no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't launch rerun"),
use_tf: bool = typer.Option(
False,
"--use-tf",
help="Clouds are in the sensor/body frame; register each by its per-frame pose. "
"By default clouds are assumed already world-registered (e.g. go2/fastlio).",
frame: str | None = typer.Option(
None,
"--frame",
help="World frame to register clouds into. Default: auto-detect — the "
"first of 'world', 'map', 'odom' that resolves the cloud frame via the "
"dataset's tf stream. Clouds whose frame_id differs from it are "
"registered via tf; clouds already in it pass through verbatim.",
),
tf_tolerance: float | None = typer.Option(
None,
"--tf-tolerance",
help="Max |Δts| (s) for tf lookups; default unlimited (nearest message), "
"which also serves static/rarely-published transforms",
),
carve: bool = typer.Option(
False,
Expand Down Expand Up @@ -412,44 +421,103 @@ def main(

total = lidar.count()

# Spatial dedup: bucket frames by 3D cell using the raw pose, keep the
# latest per cell. Shared by raw and PGO rebuilds. Doesn't touch obs.data
# so it stays cheap (no pointcloud loading). With pgo_tol<=0 the bucketing
# is disabled and every posed frame is kept (keyed by index).
seen: dict[Any, Observation[Any]] = {}
for i, obs in enumerate(lidar):
# Register clouds into the world frame via the dataset's tf stream. Clouds
# already stamped with the world frame pass through verbatim; sensor-frame
# clouds with no tf lookup are dropped. Stored per-frame poses are never
# used for registration — only as trajectory metadata (dedup/path) when
# the tf stream can't provide a position.
from dimos.memory2.tf import StreamTF

tf_buf = StreamTF.from_store(store)
# Streams are homogeneous: read the cloud frame from the first observation.
first_obs = next(iter(lidar), None)
cloud_frame: str | None = first_obs.data.frame_id if first_obs is not None else None

world = frame
if world is None and first_obs is not None and cloud_frame is not None:
world = _detect_world(tf_buf, cloud_frame, first_obs.ts)
if world is None:
frames = tf_buf.get_frames() if tf_buf is not None else set()
known = ", ".join(sorted(frames)) or "dataset has no tf stream"
raise typer.BadParameter(
f"none of {', '.join(_WORLD_FRAMES)} resolves {cloud_frame!r} clouds; "
f"pass --frame (tf frames: {known})",
param_hint="--frame",
)
if world is None:
world = "world" # empty lidar stream; the frame is moot

# Registration: sensor-frame clouds get a per-frame tf lookup lifting them
# into the world frame (frames with no tf answer are dropped); clouds
# already stamped with the world frame accumulate verbatim (register=None).
register: Callable[[Observation[Any]], Transform | None] | None = None
if first_obs is not None and cloud_frame is not None and cloud_frame != world:
# Fail fast when registration is impossible: probe the first cloud's
# timestamp (unbounded tolerance — "possible at all", not "in range").
probe = (
tf_buf.get(world, cloud_frame, time_point=first_obs.ts) if tf_buf is not None else None
)
if tf_buf is None or probe is None:
frames = tf_buf.get_frames() if tf_buf is not None else set()
known = ", ".join(sorted(frames)) or "dataset has no tf stream"
raise typer.BadParameter(
f"cannot register {cloud_frame!r} clouds into {world!r} (tf frames: {known})",
param_hint="--frame",
)
print(f"registering clouds {world!r} ← {cloud_frame!r} via tf")
buf = tf_buf

def _register(obs: Observation[Any]) -> Transform | None:
return buf.get(world, obs.data.frame_id, time_point=obs.ts, time_tolerance=tf_tolerance)

register = _register
elif cloud_frame is not None:
print(f"clouds already in world frame {world!r}; accumulating verbatim")

def _position(obs: Observation[Any]) -> tuple[float, float, float] | None:
"""Trajectory position for dedup/path: tf lookup, else the stored pose."""
if tf_buf is not None and cloud_frame is not None and cloud_frame != world:
tf = tf_buf.get(world, cloud_frame, time_point=obs.ts, time_tolerance=tf_tolerance)
if tf is not None:
return (tf.translation.x, tf.translation.y, tf.translation.z)
pose = obs.pose
if pose is None:
continue
# Reject placeholder poses: zero translation OR uninitialized rotation.
# Same condition as pgo_keyframes so dedup and PGO see the same frames.
if pose.position.is_zero() or pose.orientation.is_zero():
if pose is not None and not (pose.position.is_zero() or pose.orientation.is_zero()):
return (pose.position.x, pose.position.y, pose.position.z)
return None

# Spatial dedup: bucket frames by 3D cell using the trajectory position,
# keep the latest per cell. Shared by raw and PGO rebuilds. Doesn't touch
# obs.data so it stays cheap (no pointcloud loading). With pgo_tol<=0 the
# bucketing is disabled and every positioned frame is kept (keyed by index).
seen: dict[Any, tuple[Observation[Any], tuple[float, float, float]]] = {}
for i, obs in enumerate(lidar):
pos = _position(obs)
if pos is None:
continue
if pgo_tol > 0:
t = pose.position
# math.floor so negative coords bucket consistently; int() truncates
# toward zero and silently folds -0.5 and 0.5 into the same cell.
key: Any = (
math.floor(t.x / pgo_tol),
math.floor(t.y / pgo_tol),
math.floor(t.z / pgo_tol),
math.floor(pos[0] / pgo_tol),
math.floor(pos[1] / pgo_tol),
math.floor(pos[2] / pgo_tol),
)
else:
key = i
seen[key] = obs
seen[key] = (obs, pos)

n_kept = len(seen)
pct = 100 * n_kept / total if total else 0
if pgo_tol > 0:
print(f"dedup: kept [{n_kept}/{total}] frames ({pct:.1f}%) at tol={pgo_tol}m")
else:
print(f"dedup: disabled, kept all [{n_kept}/{total}] posed frames")
print(f"dedup: disabled, kept all [{n_kept}/{total}] positioned frames")

# Dict insertion order = lidar iteration order = chronological.
# `seen` only contains entries with non-None poses (filtered above).
path: list[tuple[float, float, float]] = [
(p[0], p[1], p[2]) for obs in seen.values() if (p := obs.pose_tuple) is not None
]
kept = [obs for obs, _ in seen.values()]
path: list[tuple[float, float, float]] = [pos for _, pos in seen.values()]

pgo_map = None
pgo_path: list[tuple[float, float, float]] = []
Expand All @@ -465,12 +533,12 @@ def main(
]

pgo_map = _accumulate(
seen.values(),
kept,
voxel=voxel,
block_count=block_count,
device=device,
graph=graph,
world_frame=not use_tf,
register=register,
carve_columns=carve,
progress_cb=progress(n_kept, "pgo pass 2 (rebuilding)"),
)
Expand All @@ -484,18 +552,18 @@ def main(
block_count=block_count,
device=device,
graph=graph,
world_frame=not use_tf,
register=register,
carve_columns=carve,
progress_cb=progress(total, "full pgo (rebuilding)"),
)

# Raw map: same dedup'd frames, no PGO correction.
global_map = _accumulate(
seen.values(),
kept,
voxel=voxel,
block_count=block_count,
device=device,
world_frame=not use_tf,
register=register,
carve_columns=carve,
progress_cb=progress(n_kept, "reconstructing global map"),
)
Expand Down
110 changes: 110 additions & 0 deletions dimos/memory2/tf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""TF service backed by a recorded ``tf`` stream."""

from __future__ import annotations

import math
from typing import TYPE_CHECKING, Any, cast

from dimos.memory2.stream import Stream
from dimos.msgs.tf2_msgs.TFMessage import TFMessage
from dimos.protocol.tf.tf import MultiTBuffer, TFConfig, TFSpec

if TYPE_CHECKING:
from dimos.msgs.geometry_msgs.Transform import Transform
from dimos.protocol.tf.tf import TFLookup


class StreamTFConfig(TFConfig):
    """Configuration for ``StreamTF``."""

    # Recorded tf stream that lookups are served from. Effectively required
    # (``StreamTF.__init__`` raises when it is ``None``), but it needs a
    # default for config inheritance.
    stream: Stream[TFMessage] | None = None
    # Extra span added past the requested upper bound whenever the cache is
    # re-filled (see ``StreamTF._ensure``). Presumably seconds, matching the
    # stream timestamps — TODO confirm.
    cache_span: float = 300.0


class StreamTF(MultiTBuffer, TFSpec):
    """Read-only TF service that answers lookups from a recorded ``tf`` stream.

    Messages are pulled from the stream on demand: each ``get`` first makes
    sure the time window it needs is present in the in-memory buffers (see
    ``_ensure``) and then delegates the actual lookup to the parent class.
    """

    config: StreamTFConfig

    def __init__(self, stream: Stream[TFMessage] | None = None, **kwargs: Any) -> None:
        """Create the service.

        Args:
            stream: Recorded tf stream. When given it is forwarded to the
                config as ``stream``; otherwise the config must already
                provide one.
            **kwargs: Passed through to ``TFSpec.__init__``.

        Raises:
            ValueError: If no stream ends up configured.
        """
        if stream is not None:
            kwargs["stream"] = stream
        TFSpec.__init__(self, **kwargs)
        MultiTBuffer.__init__(self, buffer_size=math.inf)

        if self.config.stream is None:
            raise ValueError("Stream configuration is missing")
        self.stream = self.config.stream

        # ``(lo, hi)`` window currently held in the buffers; ``None`` until
        # the first load.
        self._covered: tuple[float, float] | None = None

    @classmethod
    def from_store(cls, store: Any, stream: str = "tf") -> StreamTF | None:
        """Build a ``StreamTF`` from ``store``, or return ``None`` if the store has no such stream."""
        if stream not in store.list_streams():
            return None
        return cls(store.stream(stream, TFMessage))

    def publish(self, *args: Transform) -> None:
        """Always raises: a replayed stream cannot be published to."""
        raise NotImplementedError("StreamTF is a read-only replay service.")

    def publish_static(self, *args: Transform) -> None:
        """Always raises: a replayed stream cannot be published to."""
        raise NotImplementedError("StreamTF is a read-only replay service.")

    def _load(self, lo: float, hi: float) -> None:
        """Feed the recorded messages for the window ``[lo, hi]`` into the buffers."""
        # The stream is queried with the window's midpoint and half-width.
        for obs in self.stream.at((lo + hi) / 2, (hi - lo) / 2):
            self.receive_transform(*obs.data.transforms)
        # Recorded after the loop so that an empty window still counts as covered.
        self._covered = (lo, hi)

    def _ensure(self, lo: float, hi: float) -> None:
        """Serve ``[lo, hi]`` from the cache, else re-cache ``[lo, hi + cache_span]``."""
        if self._covered is not None:
            clo, chi = self._covered
            if clo <= lo and hi <= chi:
                return
        # NOTE(review): indentation was lost in the text this was restored
        # from; clearing and reloading under one lock hold is assumed so the
        # buffers are never observed empty mid-refresh — confirm against the
        # original file.
        with self._cv:
            self.buffers.clear()
            self._load(lo, hi + self.config.cache_span)

    def get(
        self,
        parent_frame: str,
        child_frame: str,
        time_point: float | None = None,
        time_tolerance: float | None = None,
        *,
        forward_tolerance: float = 0.0,
    ) -> Transform | None:
        """Look up ``parent_frame`` <- ``child_frame``, loading the needed window first.

        Args:
            parent_frame: Frame passed to the parent lookup as the parent.
            child_frame: Frame passed to the parent lookup as the child.
            time_point: Lookup time. ``None`` caches around the timestamp of
                the newest message in the stream (nothing is cached when the
                stream is empty).
            time_tolerance: When given, the cached window is
                ``time_point ± time_tolerance``. When ``None``, the window
                reaches back ``config.buffer_size`` and forward
                ``forward_tolerance``.
            forward_tolerance: Forward reach of the cached window when
                ``time_tolerance`` is ``None``.

        Returns:
            Whatever the parent class lookup returns for these arguments.
        """
        tp = time_point
        if tp is None:
            last = next(iter(self.stream.order_by("ts", desc=True).limit(1)), None)
            tp = last.ts if last is not None else None

        if tp is not None:
            back = time_tolerance if time_tolerance is not None else self.config.buffer_size
            fwd = time_tolerance if time_tolerance is not None else forward_tolerance
            self._ensure(tp - back, tp + fwd)

        # NOTE(review): the caller's ``forward_tolerance`` only widens the
        # cached window above; the delegated lookup is pinned to 0.0. Kept
        # as-is — confirm this is intentional.
        return super().get(
            parent_frame,
            child_frame,
            time_point,
            time_tolerance,
            forward_tolerance=0.0,
        )


if TYPE_CHECKING:
    # mypy conformance check: StreamTF satisfies the read-side tf protocol.
    # Only evaluated by the type checker; nothing is executed at runtime.
    _lookup_impl: TFLookup = cast("StreamTF", None)
Comment thread
leshy marked this conversation as resolved.
Loading
Loading