From 42441a6e70aca8809b69582b954218ef4cde3444 Mon Sep 17 00:00:00 2001 From: Jon Chen Date: Wed, 24 Jun 2026 17:50:27 -0700 Subject: [PATCH 01/10] Add smooth PF400 Cartesian route primitive --- pylabrobot/brooks/precise_flex/arm_backend.py | 123 ++++++++++++++- .../precise_flex/tests/arm_backend_tests.py | 144 +++++++++++++++++- todo-documentation-backendparameters.md | 1 + 3 files changed, 265 insertions(+), 3 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 0ea4078f2d1..895a2b7f1d5 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -21,7 +21,7 @@ import logging import warnings from dataclasses import dataclass -from typing import ClassVar, Dict, List, Literal, Optional +from typing import ClassVar, Dict, List, Literal, Optional, Sequence from pylabrobot.capabilities.arms.backend import ( CanFreedrive, @@ -512,6 +512,51 @@ async def _cart_to_joints(self, cart: PreciseFlexCartesianPose) -> JointPose: joints[Axis.RAIL] = cart.rail_position return joints + # [int] + def _plan_cartesian_pose_route( + self, + poses: Sequence[PreciseFlexCartesianPose], + planned_joints: JointPose, + planned_pose: PreciseFlexCartesianPose, + ) -> List[JointPose]: + """Plan a Cartesian pose route into joint targets from an initial state snapshot. + + Unlike :meth:`_cart_to_joints`, this helper does not query the controller for every + waypoint. Omitted pose fields inherit from the *planned* previous pose so IK branch + selection remains continuous across the route. + """ + planned_joints = dict(planned_joints) + targets: List[JointPose] = [] + for pose in poses: + cart = dataclasses.replace( + pose, + orientation=planned_pose.orientation if pose.orientation is None else pose.orientation, + wrist=planned_pose.wrist if pose.wrist is None else pose.wrist, + # PF400 IK expects a shoulder/reference rail position even on rail-less arms. + # Mirror _cart_to_joints(): omitted pose fields inherit from the previous pose. + rail_position=planned_pose.rail_position + if pose.rail_position is None + else pose.rail_position, + ) + ik_joints = _snap_to_current( + kinematics.ik(cart, p=self._kinematics_params), + planned_joints, + cart.wrist, + ) + target = dict(planned_joints) + target[Axis.BASE] = ik_joints[1] + target[Axis.SHOULDER] = ik_joints[2] + target[Axis.ELBOW] = ik_joints[3] + target[Axis.WRIST] = ik_joints[4] + if self._has_rail and cart.rail_position is not None: + target[Axis.RAIL] = cart.rail_position + + self._assert_within_soft_limits(planned_joints, target) + targets.append(target) + planned_joints = target + planned_pose = cart + return targets + # -- speed & motion profiles -------------------------------------------------------------- # [cmd] @@ -1811,6 +1856,22 @@ class MoveToLocationParams(BackendParams): wrist: Optional[Wrist] = None rail_position: Optional[float] = None + # [cmd] + @dataclass + class MoveThroughCartesianPosesParams(BackendParams): + """PreciseFlex arm parameters for blended Cartesian pose routes. + + Args: + speed_pct: Movement speed override as a percentage (0-100). If None, uses the + current speed setting. + blend: When True, temporarily set the active motion profile's ``InRange`` value to + ``-1`` so the controller may blend through intermediate waypoints instead of stopping + at each one. The original profile is restored after the final waypoint is reached. + """ + + speed_pct: Optional[float] = None + blend: bool = True + async def move_to_location( self, location: Coordinate, @@ -1839,6 +1900,66 @@ async def move_to_location( joints = await self._cart_to_joints(coords) await self._move_j(profile_index=self.profile_index, joint_coords=joints) + # [cmd] + async def move_through_cartesian_poses( + self, + poses: Sequence[PreciseFlexCartesianPose], + backend_params: Optional[BackendParams] = None, + ) -> None: + """Move through a sequence of Cartesian poses using one planned IK route. + + The standard Cartesian move path snapshots the current state for each waypoint, + which waits for end-of-motion between moves. For taught air-transit routes, this + method snapshots state once, plans each subsequent IK target from the previous + planned target, queues the joint moves, and waits only after the final waypoint. + + This is a PreciseFlex-specific primitive: intermediate waypoints may be blended + by the controller and should not be used for operations that require an exact + stop, gripper action, or physical contact at every pose. + """ + if not isinstance(backend_params, self.MoveThroughCartesianPosesParams): + backend_params = PreciseFlexArmBackend.MoveThroughCartesianPosesParams() + if not poses: + return + if backend_params.speed_pct is not None: + await self._set_speed(backend_params.speed_pct) + + planned_joints, planned_pose = await self._request_state() + targets = self._plan_cartesian_pose_route(poses, planned_joints, planned_pose) + + profile_index = self.profile_index + original_profile = None + should_restore_profile = False + if backend_params.blend: + original_profile = await self.request_motion_profile_values(profile_index) + should_restore_profile = original_profile[7] != -1 + if should_restore_profile: + await self.set_motion_profile_values( + original_profile[0], + original_profile[1], + original_profile[2], + original_profile[3], + original_profile[4], + original_profile[5], + original_profile[6], + -1, + original_profile[8], + ) + + moves_sent = 0 + waited_for_eom = False + try: + for target in targets: + await self._move_j(profile_index=profile_index, joint_coords=target) + moves_sent += 1 + await self.driver._wait_for_eom() + waited_for_eom = True + finally: + if should_restore_profile and original_profile is not None: + if moves_sent > 0 and not waited_for_eom: + await self.driver._wait_for_eom() + await self.set_motion_profile_values(*original_profile) + # [cmd] async def dest_c(self, arg1: int = 0) -> tuple[float, float, float, float, float, float, int]: """Get the destination or current Cartesian location of the robot. diff --git a/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py b/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py index 90589a4a034..a5155dbfe1b 100644 --- a/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py +++ b/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py @@ -1,8 +1,9 @@ import unittest from typing import Tuple -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch -from pylabrobot.brooks.precise_flex import Axis, PreciseFlexArmBackend +from pylabrobot.brooks.precise_flex import Axis, PreciseFlexArmBackend, PreciseFlexCartesianPose +from pylabrobot.resources import Coordinate, Rotation def _make_backend( @@ -210,3 +211,142 @@ async def test_park_without_position_falls_back_to_movetosafe(self): await self.backend.park() self.driver.send_command.assert_awaited_once_with("movetosafe") self.assertEqual(self._movej_cmds(), []) + + +class TestPreciseFlexSmoothCartesianRoute(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.backend, self.driver = _make_backend() + self.driver._wait_for_eom = AsyncMock() + self.current_joints = { + Axis.BASE: 100.0, + Axis.SHOULDER: 0.0, + Axis.ELBOW: 180.0, + Axis.WRIST: 180.0, + Axis.GRIPPER: 70.0, + } + self.current_pose = PreciseFlexCartesianPose( + location=Coordinate(10.0, 20.0, 100.0), + rotation=Rotation(x=-180.0, y=90.0, z=0.0), + rail_position=123.0, + orientation="right", + wrist="ccw", + ) + self.backend._request_state = AsyncMock(return_value=(self.current_joints, self.current_pose)) + + def _stub_profile_transport(self, profile: str = "1 50 0 100 100 0 0 25 0") -> None: + async def respond(command: str) -> str: + if command == "Profile 1": + return profile + return "" + + self.driver.send_command = AsyncMock(side_effect=respond) + + def _movej_cmds(self) -> list[str]: + return [ + c.args[0] for c in self.driver.send_command.call_args_list if c.args[0].startswith("moveJ") + ] + + def _profile_cmds(self) -> list[str]: + return [ + c.args[0] + for c in self.driver.send_command.call_args_list + if c.args[0].startswith("Profile") + ] + + async def test_move_through_cartesian_poses_plans_from_one_state_snapshot(self): + """A smooth route snapshots state once, fills omitted pose fields from the planned pose, + queues all joint moves, then waits once at the end.""" + self._stub_profile_transport() + poses = [ + PreciseFlexCartesianPose( + location=Coordinate(200.0, 20.0, 110.0), + rotation=Rotation(x=-180.0, y=90.0, z=10.0), + ), + PreciseFlexCartesianPose( + location=Coordinate(210.0, 20.0, 120.0), + rotation=Rotation(x=-180.0, y=90.0, z=20.0), + ), + ] + + with patch( + "pylabrobot.brooks.precise_flex.arm_backend.kinematics.ik", + side_effect=[ + {1: 110.0, 2: 10.0, 3: 20.0, 4: 30.0, 6: 123.0}, + {1: 120.0, 2: 11.0, 3: 21.0, 4: 31.0, 6: 123.0}, + ], + ) as ik: + await self.backend.move_through_cartesian_poses(poses) + + self.backend._request_state.assert_awaited_once() + self.driver._wait_for_eom.assert_awaited_once() + self.assertEqual( + self._movej_cmds(), + [ + "moveJ 1 110.0 10.0 20.0 30.0 70.0", + "moveJ 1 120.0 11.0 21.0 31.0 70.0", + ], + ) + planned_pose_args = [call.args[0] for call in ik.call_args_list] + self.assertEqual([pose.orientation for pose in planned_pose_args], ["right", "right"]) + self.assertEqual([pose.wrist for pose in planned_pose_args], ["ccw", "ccw"]) + # Rail-less PF400 still needs the shoulder/reference rail position for IK. + self.assertEqual([pose.rail_position for pose in planned_pose_args], [123.0, 123.0]) + + async def test_move_through_cartesian_poses_temporarily_enables_blending(self): + self._stub_profile_transport() + pose = PreciseFlexCartesianPose( + location=Coordinate(200.0, 20.0, 110.0), + rotation=Rotation(x=-180.0, y=90.0, z=10.0), + ) + + with patch( + "pylabrobot.brooks.precise_flex.arm_backend.kinematics.ik", + return_value={1: 110.0, 2: 10.0, 3: 20.0, 4: 30.0, 6: 123.0}, + ): + await self.backend.move_through_cartesian_poses([pose]) + + self.assertEqual( + self._profile_cmds(), + [ + "Profile 1", + "Profile 1 50.0 0.0 100.0 100.0 0.0 0.0 -1 0", + "Profile 1 50.0 0.0 100.0 100.0 0.0 0.0 25.0 0", + ], + ) + + async def test_move_through_cartesian_poses_can_skip_profile_blending(self): + pose = PreciseFlexCartesianPose( + location=Coordinate(200.0, 20.0, 110.0), + rotation=Rotation(x=-180.0, y=90.0, z=10.0), + ) + + with patch( + "pylabrobot.brooks.precise_flex.arm_backend.kinematics.ik", + return_value={1: 110.0, 2: 10.0, 3: 20.0, 4: 30.0, 6: 123.0}, + ): + await self.backend.move_through_cartesian_poses( + [pose], + backend_params=PreciseFlexArmBackend.MoveThroughCartesianPosesParams(blend=False), + ) + + self.assertEqual(self._profile_cmds(), []) + self.assertEqual(self._movej_cmds(), ["moveJ 1 110.0 10.0 20.0 30.0 70.0"]) + self.driver._wait_for_eom.assert_awaited_once() + + async def test_move_through_cartesian_poses_blocks_before_motion_on_limit_failure(self): + pose = PreciseFlexCartesianPose( + location=Coordinate(200.0, 20.0, 110.0), + rotation=Rotation(x=-180.0, y=90.0, z=10.0), + ) + self.backend._assert_within_soft_limits = MagicMock(side_effect=ValueError("bad target")) + + with patch( + "pylabrobot.brooks.precise_flex.arm_backend.kinematics.ik", + return_value={1: 110.0, 2: 10.0, 3: 20.0, 4: 30.0, 6: 123.0}, + ): + with self.assertRaisesRegex(ValueError, "bad target"): + await self.backend.move_through_cartesian_poses([pose]) + + self.assertEqual(self._movej_cmds(), []) + self.assertEqual(self._profile_cmds(), []) + self.driver._wait_for_eom.assert_not_awaited() diff --git a/todo-documentation-backendparameters.md b/todo-documentation-backendparameters.md index d9a2d9b3de7..0bc5785b155 100644 --- a/todo-documentation-backendparameters.md +++ b/todo-documentation-backendparameters.md @@ -62,6 +62,7 @@ classes received new or expanded docstrings in this pass: - `PreciseFlexArmBackend.DropParams` -- new docstring - `PreciseFlexArmBackend.MoveToJointPositionParams` -- new docstring - `PreciseFlexArmBackend.MoveToLocationParams` -- new docstring +- `PreciseFlexArmBackend.MoveThroughCartesianPosesParams` -- new docstring ### Already documented (no changes needed) - `MultidropCombiPeristalticDispensingBackend.DispenseParams` From 654b2a90bc8cf4c0e794460768247d8216e7c255 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Fri, 26 Jun 2026 20:58:41 -0700 Subject: [PATCH 02/10] refactor(PF400): extract _apply_ik_arm_axes helper for IK joint merge Replace the repeated four-line ik_joints[1..4] assignments in _cart_to_joints and _plan_cartesian_pose_route with a named helper that merges only the IK-solved arm axes (BASE/SHOULDER/ELBOW/WRIST) onto a full joint pose, keeping gripper and rail from the base. Documents why only these four axes are touched (gripper is independent; rail must not appear on rail-less arms) and drops the magic integer indices. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 895a2b7f1d5..98da0604d07 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -86,6 +86,21 @@ def _snap_to_current(ik_joints: JointPose, current: JointPose, wrist: Optional[W return out +# The arm axes IK solves for. Gripper and rail are not part of the Cartesian solution: the gripper +# opening is independent, and the rail is set explicitly by the caller (`ik` echoes a rail value but +# rail-less arms must not gain a RAIL axis), so a merge must touch only these four. +_IK_ARM_AXES = (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST) + + +def _apply_ik_arm_axes(base: JointPose, ik_joints: JointPose) -> JointPose: + """Return a copy of `base` with the four IK-solved arm axes overwritten from `ik_joints`. + + `base` carries the full pose (including gripper and rail); only BASE/SHOULDER/ELBOW/WRIST come + from IK, so the result stays a complete joint dict usable directly by the joint-move commands. + """ + return {**base, **{axis: ik_joints[axis] for axis in _IK_ARM_AXES}} + + class PreciseFlexArmBackend(OrientableGripperArmBackend, HasJoints, CanFreedrive): """Backend for the PreciseFlex robotic arm. @@ -504,10 +519,7 @@ async def _cart_to_joints(self, cart: PreciseFlexCartesianPose) -> JointPose: rail_position=current.rail_position if cart.rail_position is None else cart.rail_position, ) ik_joints = _snap_to_current(kinematics.ik(cart, p=self._kinematics_params), joints, cart.wrist) - joints[Axis.BASE] = ik_joints[1] - joints[Axis.SHOULDER] = ik_joints[2] - joints[Axis.ELBOW] = ik_joints[3] - joints[Axis.WRIST] = ik_joints[4] + joints = _apply_ik_arm_axes(joints, ik_joints) if cart.rail_position is not None: joints[Axis.RAIL] = cart.rail_position return joints @@ -543,11 +555,7 @@ def _plan_cartesian_pose_route( planned_joints, cart.wrist, ) - target = dict(planned_joints) - target[Axis.BASE] = ik_joints[1] - target[Axis.SHOULDER] = ik_joints[2] - target[Axis.ELBOW] = ik_joints[3] - target[Axis.WRIST] = ik_joints[4] + target = _apply_ik_arm_axes(planned_joints, ik_joints) if self._has_rail and cart.rail_position is not None: target[Axis.RAIL] = cart.rail_position From ff86d6e36c7048f4cc34710bc0aea5f0c25eb395 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Fri, 26 Jun 2026 21:22:49 -0700 Subject: [PATCH 03/10] refactor(PF400): merge IK arm axes with a loop over the Axis enum In _cart_to_joints and _plan_cartesian_pose_route, replace the four explicit ik_joints[1..4] assignments with a short loop over the arm axes (BASE/SHOULDER/ELBOW/WRIST), dropping the magic integer indices. Gripper and rail are untouched: IK doesn't solve the gripper, and the rail is set explicitly so rail-less arms never gain a RAIL axis. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 98da0604d07..fd18d779348 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -86,21 +86,6 @@ def _snap_to_current(ik_joints: JointPose, current: JointPose, wrist: Optional[W return out -# The arm axes IK solves for. Gripper and rail are not part of the Cartesian solution: the gripper -# opening is independent, and the rail is set explicitly by the caller (`ik` echoes a rail value but -# rail-less arms must not gain a RAIL axis), so a merge must touch only these four. -_IK_ARM_AXES = (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST) - - -def _apply_ik_arm_axes(base: JointPose, ik_joints: JointPose) -> JointPose: - """Return a copy of `base` with the four IK-solved arm axes overwritten from `ik_joints`. - - `base` carries the full pose (including gripper and rail); only BASE/SHOULDER/ELBOW/WRIST come - from IK, so the result stays a complete joint dict usable directly by the joint-move commands. - """ - return {**base, **{axis: ik_joints[axis] for axis in _IK_ARM_AXES}} - - class PreciseFlexArmBackend(OrientableGripperArmBackend, HasJoints, CanFreedrive): """Backend for the PreciseFlex robotic arm. @@ -519,7 +504,9 @@ async def _cart_to_joints(self, cart: PreciseFlexCartesianPose) -> JointPose: rail_position=current.rail_position if cart.rail_position is None else cart.rail_position, ) ik_joints = _snap_to_current(kinematics.ik(cart, p=self._kinematics_params), joints, cart.wrist) - joints = _apply_ik_arm_axes(joints, ik_joints) + # IK only solves the arm axes; gripper and rail keep their current values. + for axis in (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST): + joints[axis] = ik_joints[axis] if cart.rail_position is not None: joints[Axis.RAIL] = cart.rail_position return joints @@ -555,7 +542,10 @@ def _plan_cartesian_pose_route( planned_joints, cart.wrist, ) - target = _apply_ik_arm_axes(planned_joints, ik_joints) + # IK only solves the arm axes; gripper and rail keep the planned values. + target = dict(planned_joints) + for axis in (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST): + target[axis] = ik_joints[axis] if self._has_rail and cart.rail_position is not None: target[Axis.RAIL] = cart.rail_position From 1f3e72b71d9a8db8324e439959d75e8561e58177 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:23:41 -0700 Subject: [PATCH 04/10] refactor(PF400): drop [cmd] and [int] method tags Remove the 75 [cmd] and 36 [int] inline tags, keeping only [PLR]. Update the module docstring: a [PLR]-tagged method implements a capability-ABC method PLR core drives; everything else is a public firmware command (no leading underscore) or a private helper (leading underscore), so the remaining two categories are already distinguishable by name. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 117 +----------------- 1 file changed, 3 insertions(+), 114 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index fd18d779348..2f351e96368 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -12,9 +12,9 @@ - L3 - high-level capability API: joint-space and cartesian motion, gripper, rail, pick & place, parking. Within each group methods run reads (``request_``) then writes (``set_``) then actions, with private -helpers last. Each method carries a tag for who calls it: ``[PLR]`` implements a capability-ABC method -that PyLabRobot core drives, ``[cmd]`` is a public PreciseFlex firmware command outside the ABC, and -``[int]`` is a private helper or primitive. +helpers last. A method that implements a capability-ABC method PyLabRobot core drives is tagged +``[PLR]``; the rest are public PreciseFlex firmware commands (no leading underscore) or private +helpers/primitives (leading underscore). """ import dataclasses @@ -215,7 +215,6 @@ async def _on_setup(self, backend_params: Optional[BackendParams] = None) -> Non # -- parsing helpers ---------------------------------------------------------------------- - # [int] def _parse_xyz_response( self, parts: List[str] ) -> tuple[float, float, float, float, float, float]: @@ -230,7 +229,6 @@ def _parse_xyz_response( float(parts[5]), ) - # [int] def _parse_angles_response(self, parts: List[str]) -> JointPose: """Parse angle values from a response string. @@ -259,7 +257,6 @@ def _parse_angles_response(self, parts: List[str]) -> JointPose: # -- raw parameters ----------------------------------------------------------------------- - # [cmd] async def request_parameter( self, data_id: int, @@ -292,7 +289,6 @@ async def request_parameter( response = await self.driver.send_command(f"pd {data_id}") return response - # [cmd] async def set_parameter( self, data_id: int, @@ -329,7 +325,6 @@ async def set_parameter( else: await self.driver.send_command(f"pc {data_id} {value}") - # [cmd] async def set_axis_parameter( self, data_id: int, @@ -359,7 +354,6 @@ async def set_axis_parameter( data_id, value, unit_number=robot_number, sub_unit=0, array_index=int(axis) ) - # [cmd] async def nop(self) -> None: """No operation command. @@ -370,7 +364,6 @@ async def nop(self) -> None: # -- digital I/O -------------------------------------------------------------------------- - # [cmd] async def request_signal(self, signal_number: int) -> int: """Get the value of the specified digital input or output signal. @@ -384,7 +377,6 @@ async def request_signal(self, signal_number: int) -> int: sig_id, sig_val = response.split() return int(sig_val) - # [cmd] async def set_signal(self, signal_number: int, value: int) -> None: """Set the specified digital input or output signal. @@ -400,7 +392,6 @@ async def set_signal(self, signal_number: int, value: int) -> None: # -- motion primitives -------------------------------------------------------------------- - # [int] async def _move_j(self, profile_index: int, joint_coords: JointPose) -> None: """Move the robot using joint coordinates, handling rail configuration.""" if self._has_rail: @@ -422,7 +413,6 @@ async def _move_j(self, profile_index: int, joint_coords: JointPose) -> None: ) await self.driver.send_command(f"moveJ {profile_index} {angles_str}") - # [int] async def _move_one_axis(self, axis: Axis, position: float) -> None: """Move a single axis to an absolute position (firmware ``MoveOneAxis``). @@ -432,7 +422,6 @@ async def _move_one_axis(self, axis: Axis, position: float) -> None: """ await self.driver.send_command(f"MoveOneAxis {int(axis)} {position} {self.profile_index}") - # [int] async def _move_to_stored_location(self, location_index: int, profile_index: int) -> None: """Move to the location specified by the station index using the specified profile. @@ -445,7 +434,6 @@ async def _move_to_stored_location(self, location_index: int, profile_index: int """ await self.driver.send_command(f"move {location_index} {profile_index}") - # [int] async def _move_to_stored_location_appro(self, location_index: int, profile_index: int) -> None: """Approach the location specified by the station index using the specified profile. @@ -460,7 +448,6 @@ async def _move_to_stored_location_appro(self, location_index: int, profile_inde """ await self.driver.send_command(f"moveAppro {location_index} {profile_index}") - # [int] async def _set_joint_angles( self, location_index: int, @@ -487,7 +474,6 @@ async def _set_joint_angles( f"{joint_position[Axis.GRIPPER]}" ) - # [int] async def _cart_to_joints(self, cart: PreciseFlexCartesianPose) -> JointPose: """Convert a Cartesian location into a full joint dict using our IK. @@ -511,7 +497,6 @@ async def _cart_to_joints(self, cart: PreciseFlexCartesianPose) -> JointPose: joints[Axis.RAIL] = cart.rail_position return joints - # [int] def _plan_cartesian_pose_route( self, poses: Sequence[PreciseFlexCartesianPose], @@ -557,7 +542,6 @@ def _plan_cartesian_pose_route( # -- speed & motion profiles -------------------------------------------------------------- - # [cmd] async def request_monitor_speed(self) -> int: """Get the global system (monitor) speed. @@ -567,7 +551,6 @@ async def request_monitor_speed(self) -> int: response = await self.driver.send_command("mspeed") return int(response) - # [cmd] async def set_monitor_speed(self, speed_pct: int) -> None: """Set the global system (monitor) speed. @@ -581,7 +564,6 @@ async def set_monitor_speed(self, speed_pct: int) -> None: raise ValueError(f"speed_pct must be between 0 and 100, got {speed_pct}") await self.driver.send_command(f"mspeed {speed_pct}") - # [cmd] async def request_payload(self) -> int: """Get the payload percent value for the current robot. @@ -591,7 +573,6 @@ async def request_payload(self) -> int: response = await self.driver.send_command("payload") return int(response) - # [cmd] async def set_payload(self, payload_pct: int) -> None: """Set the payload percent of maximum for the currently selected or attached robot. @@ -608,7 +589,6 @@ async def set_payload(self, payload_pct: int) -> None: raise ValueError("Payload percent must be between 0 and 100") await self.driver.send_command(f"payload {payload_pct}") - # [cmd] async def request_profile_speed(self, profile_index: int) -> float: """Get the speed property of the specified profile. @@ -622,7 +602,6 @@ async def request_profile_speed(self, profile_index: int) -> float: profile, speed = response.split() return float(speed) - # [cmd] async def set_profile_speed(self, profile_index: int, speed_pct: float) -> None: """Set the speed property of the specified profile. @@ -637,7 +616,6 @@ async def set_profile_speed(self, profile_index: int, speed_pct: float) -> None: raise ValueError(f"speed_pct must be between 0 and 100, got {speed_pct}") await self.driver.send_command(f"Speed {profile_index} {speed_pct}") - # [cmd] async def request_profile_speed2(self, profile_index: int) -> float: """Get the speed2 property of the specified profile. @@ -651,7 +629,6 @@ async def request_profile_speed2(self, profile_index: int) -> float: profile, speed2 = response.split() return float(speed2) - # [cmd] async def set_profile_speed2(self, profile_index: int, speed2_pct: float) -> None: """Set the speed2 property of the specified profile. @@ -667,7 +644,6 @@ async def set_profile_speed2(self, profile_index: int, speed2_pct: float) -> Non raise ValueError(f"speed2_pct must be between 0 and 100, got {speed2_pct}") await self.driver.send_command(f"Speed2 {profile_index} {speed2_pct}") - # [cmd] async def request_profile_acceleration(self, profile_index: int) -> float: """Get the acceleration property of the specified profile. @@ -681,7 +657,6 @@ async def request_profile_acceleration(self, profile_index: int) -> float: profile, acceleration = response.split() return float(acceleration) - # [cmd] async def set_profile_acceleration(self, profile_index: int, acceleration_pct: float) -> None: """Set the acceleration property of the specified profile. @@ -696,7 +671,6 @@ async def set_profile_acceleration(self, profile_index: int, acceleration_pct: f raise ValueError(f"acceleration_pct must be between 0 and 100, got {acceleration_pct}") await self.driver.send_command(f"Accel {profile_index} {acceleration_pct}") - # [cmd] async def request_profile_acceleration_ramp(self, profile_index: int) -> float: """Get the acceleration ramp property of the specified profile. @@ -710,7 +684,6 @@ async def request_profile_acceleration_ramp(self, profile_index: int) -> float: profile, acceleration_ramp = response.split() return float(acceleration_ramp) - # [cmd] async def set_profile_acceleration_ramp( self, profile_index: int, acceleration_ramp_seconds: float ) -> None: @@ -722,7 +695,6 @@ async def set_profile_acceleration_ramp( """ await self.driver.send_command(f"AccRamp {profile_index} {acceleration_ramp_seconds}") - # [cmd] async def request_profile_deceleration(self, profile_index: int) -> float: """Get the deceleration property of the specified profile. @@ -736,7 +708,6 @@ async def request_profile_deceleration(self, profile_index: int) -> float: profile, deceleration = response.split() return float(deceleration) - # [cmd] async def set_profile_deceleration(self, profile_index: int, deceleration_pct: float) -> None: """Set the deceleration property of the specified profile. @@ -751,7 +722,6 @@ async def set_profile_deceleration(self, profile_index: int, deceleration_pct: f raise ValueError(f"deceleration_pct must be between 0 and 100, got {deceleration_pct}") await self.driver.send_command(f"Decel {profile_index} {deceleration_pct}") - # [cmd] async def request_profile_deceleration_ramp(self, profile_index: int) -> float: """Get the deceleration ramp property of the specified profile. @@ -765,7 +735,6 @@ async def request_profile_deceleration_ramp(self, profile_index: int) -> float: profile, deceleration_ramp = response.split() return float(deceleration_ramp) - # [cmd] async def set_profile_deceleration_ramp( self, profile_index: int, deceleration_ramp_seconds: float ) -> None: @@ -777,7 +746,6 @@ async def set_profile_deceleration_ramp( """ await self.driver.send_command(f"DecRamp {profile_index} {deceleration_ramp_seconds}") - # [cmd] async def request_profile_in_range(self, profile_index: int) -> float: """Get the InRange property of the specified profile. @@ -794,7 +762,6 @@ async def request_profile_in_range(self, profile_index: int) -> float: profile, in_range = response.split() return float(in_range) - # [cmd] async def set_profile_in_range(self, profile_index: int, in_range_value: float) -> None: """Set the InRange property of the specified profile. @@ -812,7 +779,6 @@ async def set_profile_in_range(self, profile_index: int, in_range_value: float) raise ValueError("InRange value must be between -1 and 100") await self.driver.send_command(f"InRange {profile_index} {in_range_value}") - # [cmd] async def request_profile_straight(self, profile_index: int) -> bool: """Get the Straight property of the specified profile. @@ -828,7 +794,6 @@ async def request_profile_straight(self, profile_index: int) -> bool: profile, straight = response.split() return straight == "True" - # [cmd] async def set_profile_straight(self, profile_index: int, straight_mode: bool) -> None: """Set the Straight property of the specified profile. @@ -844,7 +809,6 @@ async def set_profile_straight(self, profile_index: int, straight_mode: bool) -> straight_int = 1 if straight_mode else 0 await self.driver.send_command(f"Straight {profile_index} {straight_int}") - # [cmd] async def request_motion_profile_values( self, profile: int ) -> tuple[int, float, float, float, float, float, float, float, bool]: @@ -882,7 +846,6 @@ async def request_motion_profile_values( int(parts[8]) != 0, ) - # [cmd] async def set_motion_profile_values( self, profile: int, @@ -929,19 +892,16 @@ async def set_motion_profile_values( f"{acceleration_ramp} {deceleration_ramp} {in_range} {straight_int}" ) - # [int] async def _set_speed(self, speed_pct: float): """Set the speed percentage of the arm's movement (0-100).""" await self.set_profile_speed(self.profile_index, speed_pct) - # [int] async def _request_speed(self) -> float: """Get the current speed percentage of the arm's movement.""" return await self.request_profile_speed(self.profile_index) # -- brakes, torque & freedrive ----------------------------------------------------------- - # [cmd] async def release_brake(self, axis: int) -> None: """Release the axis brake. @@ -954,7 +914,6 @@ async def release_brake(self, axis: int) -> None: """ await self.driver.send_command(f"releaseBrake {axis}") - # [cmd] async def set_brake(self, axis: int) -> None: """Set the axis brake. @@ -966,7 +925,6 @@ async def set_brake(self, axis: int) -> None: """ await self.driver.send_command(f"setBrake {axis}") - # [cmd] async def zero_torque(self, enable: bool, axis_mask: int = 1) -> None: """Sets or clears zero torque mode for the selected robot. @@ -1017,7 +975,6 @@ async def halt(self, backend_params: Optional[BackendParams] = None): # -- gripper primitives ------------------------------------------------------------------- - # [cmd] async def change_config(self, grip_mode: int = 0) -> None: """Change Robot configuration from Righty to Lefty or vice versa using customizable locations. @@ -1033,7 +990,6 @@ async def change_config(self, grip_mode: int = 0) -> None: """ await self.driver.send_command(f"ChangeConfig {grip_mode}") - # [cmd] async def change_config2(self, grip_mode: int = 0) -> None: """Change Robot configuration from Righty to Lefty or vice versa using algorithm. @@ -1049,7 +1005,6 @@ async def change_config2(self, grip_mode: int = 0) -> None: """ await self.driver.send_command(f"ChangeConfig2 {grip_mode}") - # [int] async def _request_grip_close_pos(self) -> float: """Get the gripper close position for the servoed gripper. @@ -1059,7 +1014,6 @@ async def _request_grip_close_pos(self) -> float: data = await self.driver.send_command("GripClosePos") return float(data) - # [int] async def _set_grip_close_pos(self, close_position: float) -> None: """Set the gripper close position for the servoed gripper. @@ -1070,7 +1024,6 @@ async def _set_grip_close_pos(self, close_position: float) -> None: """ await self.driver.send_command(f"GripClosePos {close_position}") - # [int] async def _request_grip_open_pos(self) -> float: """Get the gripper open position for the servoed gripper. @@ -1080,7 +1033,6 @@ async def _request_grip_open_pos(self) -> float: data = await self.driver.send_command("GripOpenPos") return float(data) - # [int] async def _set_grip_open_pos(self, open_position: float) -> None: """Set the gripper open position for the servoed gripper. @@ -1089,7 +1041,6 @@ async def _set_grip_open_pos(self, open_position: float) -> None: """ await self.driver.send_command(f"GripOpenPos {open_position}") - # [int] async def _request_grasp_data(self) -> tuple[float, float, float]: """Get the data to be used for the next force-controlled PickPlate command grip operation. @@ -1102,7 +1053,6 @@ async def _request_grasp_data(self) -> tuple[float, float, float]: raise PreciseFlexError(-1, "Unexpected response format from GraspData command.") return (float(parts[0]), float(parts[1]), float(parts[2])) - # [int] async def _set_grasp_data( self, plate_width: float, finger_speed_pct: float, grasp_force: float ) -> None: @@ -1124,12 +1074,10 @@ async def _set_grasp_data( raise ValueError(f"finger_speed_pct must be between 0 and 100, got {finger_speed_pct}") await self.driver.send_command(f"GraspData {plate_width} {finger_speed_pct} {grasp_force}") - # [int] async def _set_grip_detail(self): """Configure a default vertical station type for pick/place operations.""" await self.driver.send_command(f"StationType {self.location_index} 1 0 100 0 10") - # [int] def _mm_to_firmware_units(self, width_mm: float) -> float: """Convert a jaw width (mm) to the firmware's native position unit. @@ -1140,7 +1088,6 @@ def _mm_to_firmware_units(self, width_mm: float) -> float: # -- rail primitives ---------------------------------------------------------------------- - # [int] async def _set_rail_position(self, station_id: int, rail_position: float) -> None: """Set the rail position for the specified station. @@ -1150,7 +1097,6 @@ async def _set_rail_position(self, station_id: int, rail_position: float) -> Non """ await self.driver.send_command(f"Rail {station_id} {rail_position}") - # [int] async def _move_rail(self, station_id: Optional[int] = None, mode: int = 1) -> None: """Move the rail to the position stored at the specified station. @@ -1169,57 +1115,45 @@ async def _move_rail(self, station_id: Optional[int] = None, mode: int = 1) -> N # -- identity & status reads -------------------------------------------------------------- - # [cmd] async def request_manufacturer(self) -> str: return (await self.request_parameter(DataID.MANUFACTURER)).strip() - # [cmd] async def request_controller_model(self) -> str: return (await self.request_parameter(DataID.CONTROLLER_MODEL)).strip() - # [cmd] async def request_hardware_version(self) -> str: return (await self.request_parameter(DataID.HARDWARE_VERSION)).strip() - # [cmd] async def request_gpl_version(self) -> str: """Controller firmware/runtime version (distinct from ``request_version``, the TCS app).""" return (await self.request_parameter(DataID.GPL_VERSION)).strip() - # [cmd] async def request_controller_serial(self) -> str: return (await self.request_parameter(DataID.CONTROLLER_SERIAL)).strip() - # [cmd] async def request_robot_name(self) -> str: return (await self.request_parameter(DataID.ROBOT_NAME)).strip() - # [cmd] async def request_robot_type(self) -> int: """Built-in kinematic model id (PF400 = 12).""" return int(_parse_scalar(await self.request_parameter(DataID.ROBOT_TYPE))) - # [cmd] async def request_axis_count(self) -> int: """Number of servoed axes.""" return int(_parse_scalar(await self.request_parameter(DataID.NUM_AXES))) - # [cmd] async def request_extra_axis_count(self) -> int: """Number of non-servoed (extra) axes.""" return int(_parse_scalar(await self.request_parameter(DataID.EXTRA_AXES))) - # [cmd] async def request_axis_mask(self) -> int: """Capability/option bit field (rail, dual gripper, ...).""" return int(_parse_scalar(await self.request_parameter(DataID.AXIS_MASK))) - # [cmd] async def request_power_state(self) -> int: """Power / auto-execute state word.""" return int(_parse_scalar(await self.request_parameter(DataID.POWER_STATE))) - # [cmd] async def request_version(self) -> str: """Get the current version of TCS and any installed plug-ins. @@ -1230,7 +1164,6 @@ async def request_version(self) -> str: # -- kinematics & reference limits -------------------------------------------------------- - # [cmd] async def request_joint_limits(self, hard: bool = False) -> Dict[Axis, tuple[float, float]]: """Per-axis travel limits as {Axis: (min, max)}. @@ -1243,29 +1176,24 @@ async def request_joint_limits(self, hard: bool = False) -> Dict[Axis, tuple[flo _parse_per_axis(await self.request_parameter(max_id)), ) - # [cmd] async def request_reference_speed(self) -> Dict[Axis, float]: """Per-axis rated speed at 100%; J1/J5 in mm/s, J2-J4 in deg/s.""" return _parse_per_axis(await self.request_parameter(DataID.REFERENCE_SPEED)) - # [cmd] async def request_reference_acceleration(self) -> Dict[Axis, float]: """Per-axis rated acceleration at 100%.""" return _parse_per_axis(await self.request_parameter(DataID.REFERENCE_ACCEL)) - # [cmd] async def request_link_lengths(self) -> tuple[float, float]: """(l1, l2) SCARA link lengths in mm: shoulder->elbow, elbow->wrist.""" per_axis = _parse_per_axis(await self.request_parameter(DataID.LINK_LENGTHS)) return per_axis[Axis.SHOULDER], per_axis[Axis.ELBOW] - # [cmd] async def request_tool_length(self) -> float: """Wrist->TCP distance in mm (z of the tool-offset transform).""" values = [float(v) for v in (await self.request_parameter(DataID.TOOL_OFFSET)).split(",")] return values[2] - # [cmd] async def request_kinematic_parameters(self) -> "kinematics.PF400Params": """Build PF400Params from the controller's stored geometry. @@ -1280,34 +1208,28 @@ async def request_kinematic_parameters(self) -> "kinematics.PF400Params": gripper_length=await self.request_tool_length(), ) - # [cmd] async def request_reference_cartesian_speed(self) -> float: """Rated Cartesian (translational) speed at 100%, in mm/s.""" return _parse_scalar(await self.request_parameter(DataID.REFERENCE_CARTESIAN_SPEED)) - # [cmd] async def request_reference_cartesian_acceleration(self) -> float: """Rated Cartesian (translational) acceleration at 100%, in mm/s^2.""" return _parse_scalar(await self.request_parameter(DataID.REFERENCE_CARTESIAN_ACCEL)) - # [cmd] async def request_max_speed_percent(self) -> float: """Global cap on the speed percentage (one value, applies to all joints).""" return _parse_scalar(await self.request_parameter(DataID.MAX_SPEED_PERCENT)) - # [cmd] async def request_max_acceleration_percent(self) -> float: """Global cap on the acceleration percentage (one value, applies to all joints).""" return _parse_scalar(await self.request_parameter(DataID.MAX_ACCEL_PERCENT)) - # [cmd] async def request_max_deceleration_percent(self) -> float: """Global cap on the deceleration percentage (one value, applies to all joints).""" return _parse_scalar(await self.request_parameter(DataID.MAX_DECEL_PERCENT)) # -- tool & base frame -------------------------------------------------------------------- - # [cmd] async def request_base(self) -> tuple[float, float, float, float]: """Get the robot base offset. @@ -1320,7 +1242,6 @@ async def request_base(self) -> tuple[float, float, float, float]: raise PreciseFlexError(-1, "Unexpected response format from base command.") return (float(parts[0]), float(parts[1]), float(parts[2]), float(parts[3])) - # [cmd] async def set_base( self, x_offset: float, y_offset: float, z_offset: float, z_rotation: float ) -> None: @@ -1338,7 +1259,6 @@ async def set_base( """ await self.driver.send_command(f"base {x_offset} {y_offset} {z_offset} {z_rotation}") - # [cmd] async def request_tool_transformation_values( self, ) -> tuple[float, float, float, float, float, float]: @@ -1356,7 +1276,6 @@ async def request_tool_transformation_values( x, y, z, yaw, pitch, roll = self._parse_xyz_response(parts) return (x, y, z, yaw, pitch, roll) - # [cmd] async def set_tool_transformation_values( self, x: float, y: float, z: float, yaw: float, pitch: float, roll: float ) -> None: @@ -1376,7 +1295,6 @@ async def set_tool_transformation_values( # -- robot selection ---------------------------------------------------------------------- - # [cmd] async def reset(self, robot_number: int) -> None: """Reset the threads associated with the specified robot. @@ -1393,7 +1311,6 @@ async def reset(self, robot_number: int) -> None: raise ValueError("Robot number must be greater than zero") await self.driver.send_command(f"reset {robot_number}") - # [cmd] async def request_selected_robot(self) -> int: """Get the number of the currently selected robot. @@ -1403,7 +1320,6 @@ async def request_selected_robot(self) -> int: response = await self.driver.send_command("selectRobot") return int(response) - # [cmd] async def select_robot(self, robot_number: int) -> None: """Change the robot associated with this communications link. @@ -1418,7 +1334,6 @@ async def select_robot(self, robot_number: int) -> None: # -- configuration discovery & adoption --------------------------------------------------- - # [cmd] @property def configuration(self) -> "PreciseFlexConfiguration": """The device configuration resolved at setup. Raises before setup().""" @@ -1426,7 +1341,6 @@ def configuration(self) -> "PreciseFlexConfiguration": raise RuntimeError("Configuration is not available until setup() has run.") return self._configuration - # [int] async def _request_configuration(self) -> "PreciseFlexConfiguration": """Read the controller's identity, axes, limits, kinematics, and envelope. @@ -1516,7 +1430,6 @@ async def _request_configuration(self) -> "PreciseFlexConfiguration": reach_class=reach_class, ) - # [int] async def _request_state( self, ) -> tuple[JointPose, PreciseFlexCartesianPose]: @@ -1527,7 +1440,6 @@ async def _request_state( pose = dataclasses.replace(pose, rotation=Rotation(x=-180, y=90, z=pose.rotation.yaw)) return joints, pose - # [int] def _adopt_configuration(self, config: "PreciseFlexConfiguration") -> None: """Adopt the discovered configuration as the source of truth for later commands. @@ -1542,7 +1454,6 @@ def _adopt_configuration(self, config: "PreciseFlexConfiguration") -> None: self._has_rail = config.has_rail self._is_dual_gripper = config.is_dual_gripper - # [int] def _assess_configuration(self, config: "PreciseFlexConfiguration") -> None: """Warn about an unsupported model, a missing TCS module, or an untested combo. @@ -1578,7 +1489,6 @@ def _assess_configuration(self, config: "PreciseFlexConfiguration") -> None: suggest_entry(config.robot_type, config.gpl_version, config.tcs_version, config.modules), ) - # [int] def _log_configuration_summary(self, config: "PreciseFlexConfiguration") -> None: """Log a single structured summary of the discovered device: name, connection, firmware, this unit's configuration, and the resulting capabilities.""" @@ -1615,7 +1525,6 @@ def _log_configuration_summary(self, config: "PreciseFlexConfiguration") -> None # -- homing & range recovery -------------------------------------------------------------- - # [cmd] # Axes auto-recovered when parked out of range, in a deliberately safe order: the # gripper jaw first (no arm motion), then the Z column (vertical clearance), then # the rotary links shoulder -> elbow (smallest swept volume last to first). @@ -1685,7 +1594,6 @@ async def recover_axes_within_limits( await self._set_speed(prior_speed) # don't leave the profile at the slow recovery speed return recovered - # [int] async def _is_robot_homed(self) -> bool: """Whether all axes are homed (DataID 2800). @@ -1694,7 +1602,6 @@ async def _is_robot_homed(self) -> bool: """ return _parse_scalar(await self.request_parameter(DataID.ROBOT_HOMED)) == 1.0 - # [int] async def _handle_out_of_range_axes(self) -> None: """Warn about every out-of-range axis, then correct what is recoverable, or raise. @@ -1739,7 +1646,6 @@ def fmt(axes: Dict[Axis, tuple]) -> str: f"its limit).", ) - # [int] def _axes_outside_soft_limits(self, joints: JointPose) -> Dict[Axis, tuple]: """Axes whose value lies outside their soft limit, as ``axis -> (value, (lo, hi))``. @@ -1756,7 +1662,6 @@ def _axes_outside_soft_limits(self, joints: JointPose) -> Dict[Axis, tuple]: outside[axis] = (value, (lo, hi)) return outside - # [int] def _assert_within_soft_limits(self, current: JointPose, target: JointPose) -> None: """Turn the controller's cryptic ``-1012`` into a clear client-side error. @@ -1854,7 +1759,6 @@ class MoveToLocationParams(BackendParams): wrist: Optional[Wrist] = None rail_position: Optional[float] = None - # [cmd] @dataclass class MoveThroughCartesianPosesParams(BackendParams): """PreciseFlex arm parameters for blended Cartesian pose routes. @@ -1898,7 +1802,6 @@ async def move_to_location( joints = await self._cart_to_joints(coords) await self._move_j(profile_index=self.profile_index, joint_coords=joints) - # [cmd] async def move_through_cartesian_poses( self, poses: Sequence[PreciseFlexCartesianPose], @@ -1958,7 +1861,6 @@ async def move_through_cartesian_poses( await self.driver._wait_for_eom() await self.set_motion_profile_values(*original_profile) - # [cmd] async def dest_c(self, arg1: int = 0) -> tuple[float, float, float, float, float, float, int]: """Get the destination or current Cartesian location of the robot. @@ -1983,7 +1885,6 @@ async def dest_c(self, arg1: int = 0) -> tuple[float, float, float, float, float config = int(parts[6]) return (x, y, z, yaw, pitch, roll, config) - # [cmd] async def dest_j(self, arg1: int = 0) -> JointPose: """Get the destination or current joint location of the robot. @@ -2006,7 +1907,6 @@ async def dest_j(self, arg1: int = 0) -> JointPose: raise PreciseFlexError(-1, "Unexpected response format from destJ command.") return self._parse_angles_response(parts) - # [cmd] async def here_j(self, location_index: int) -> None: """Record the current position of the selected robot into the specified Location as angles. @@ -2017,7 +1917,6 @@ async def here_j(self, location_index: int) -> None: """ await self.driver.send_command(f"hereJ {location_index}") - # [cmd] async def here_c(self, location_index: int) -> None: """Record the current position of the selected robot into the specified Location as Cartesian. @@ -2093,7 +1992,6 @@ async def is_gripper_closed(self, backend_params: Optional[BackendParams] = None response = await self.driver.send_command("IsFullyClosed") return int(response) == -1 - # [cmd] async def are_grippers_closed(self) -> tuple[bool, bool]: """(Dual Gripper Only) Tests if each gripper is fully closed by checking the end-of-travel sensors.""" if not self._is_dual_gripper: @@ -2106,7 +2004,6 @@ async def are_grippers_closed(self) -> tuple[bool, bool]: # -- rail --------------------------------------------------------------------------------- - # [cmd] async def move_rail(self, rail_position: float) -> None: """Move the rail to the specified position. @@ -2271,7 +2168,6 @@ async def drop_at_location( ) await self._place_plate_c(cartesian_position=coords) - # [int] async def _pick_plate_j(self, joint_position: JointPose): """Pick a plate from the specified position using joint coordinates.""" await self._set_joint_angles(self.location_index, joint_position) @@ -2283,7 +2179,6 @@ async def _pick_plate_j(self, joint_position: JointPose): if ret_code == "0": raise PreciseFlexError(-1, "the force-controlled gripper detected no plate present.") - # [int] async def _place_plate_j(self, joint_position: JointPose): """Place a plate at the specified position using joint coordinates.""" await self._set_joint_angles(self.location_index, joint_position) @@ -2293,13 +2188,11 @@ async def _place_plate_j(self, joint_position: JointPose): f"placeplate {self.location_index} {horizontal_compliance_int} {self.horizontal_compliance_torque}" ) - # [int] async def _pick_plate_c(self, cartesian_position: PreciseFlexCartesianPose): """Pick a plate at a Cartesian position via IK + joint-space pickplate.""" joints = await self._cart_to_joints(cartesian_position) await self._pick_plate_j(joints) - # [int] async def _place_plate_c(self, cartesian_position: PreciseFlexCartesianPose): """Place a plate at a Cartesian position via IK + joint-space placeplate.""" joints = await self._cart_to_joints(cartesian_position) @@ -2307,7 +2200,6 @@ async def _place_plate_c(self, cartesian_position: PreciseFlexCartesianPose): # -- parking ------------------------------------------------------------------------------ - # [cmd] @property def parking_position(self) -> Optional[JointPose]: """The pose ``park()`` moves to. Assign one of the ``PARKING_POSITION_BACK/RIGHT/FRONT`` class @@ -2316,7 +2208,6 @@ def parking_position(self) -> Optional[JointPose]: ``PARKING_POSITION_RIGHT``. A pose that omits ``Axis.BASE`` has its Z filled at park time.""" return self._parking_position - # [cmd] @parking_position.setter def parking_position(self, position: Optional[JointPose]) -> None: if position is not None: @@ -2339,7 +2230,6 @@ async def park(self, backend_params: Optional[BackendParams] = None) -> None: else: await self.driver.send_command("movetosafe") - # [int] def _validate_parking_position(self, position: JointPose) -> None: """Reject anything that is not a JointPose of in-range axes (limits checked once known).""" if not isinstance(position, dict) or not position: @@ -2356,7 +2246,6 @@ def _validate_parking_position(self, position: JointPose) -> None: f"parking_position[{axis.name}]={value} is outside the soft limits [{lo}, {hi}]" ) - # [int] def _parking_pose_with_default_z(self, position: JointPose) -> JointPose: """Fill the Z column (``Axis.BASE``) at 3/4 of the discovered travel when the pose omits it.""" if Axis.BASE in position or self._configuration is None: From 4341447d788ab00b6ae582725ef68daeb3e337ac Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:29:12 -0700 Subject: [PATCH 05/10] refactor(PF400): move _plan_cartesian_pose_route next to its caller Relocate the helper from the frames/IK group down to directly above move_through_cartesian_poses, its only caller, so the route planning reads in one place. Pure move, no behaviour change. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 2f351e96368..3ab8b2deffe 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -497,49 +497,6 @@ async def _cart_to_joints(self, cart: PreciseFlexCartesianPose) -> JointPose: joints[Axis.RAIL] = cart.rail_position return joints - def _plan_cartesian_pose_route( - self, - poses: Sequence[PreciseFlexCartesianPose], - planned_joints: JointPose, - planned_pose: PreciseFlexCartesianPose, - ) -> List[JointPose]: - """Plan a Cartesian pose route into joint targets from an initial state snapshot. - - Unlike :meth:`_cart_to_joints`, this helper does not query the controller for every - waypoint. Omitted pose fields inherit from the *planned* previous pose so IK branch - selection remains continuous across the route. - """ - planned_joints = dict(planned_joints) - targets: List[JointPose] = [] - for pose in poses: - cart = dataclasses.replace( - pose, - orientation=planned_pose.orientation if pose.orientation is None else pose.orientation, - wrist=planned_pose.wrist if pose.wrist is None else pose.wrist, - # PF400 IK expects a shoulder/reference rail position even on rail-less arms. - # Mirror _cart_to_joints(): omitted pose fields inherit from the previous pose. - rail_position=planned_pose.rail_position - if pose.rail_position is None - else pose.rail_position, - ) - ik_joints = _snap_to_current( - kinematics.ik(cart, p=self._kinematics_params), - planned_joints, - cart.wrist, - ) - # IK only solves the arm axes; gripper and rail keep the planned values. - target = dict(planned_joints) - for axis in (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST): - target[axis] = ik_joints[axis] - if self._has_rail and cart.rail_position is not None: - target[Axis.RAIL] = cart.rail_position - - self._assert_within_soft_limits(planned_joints, target) - targets.append(target) - planned_joints = target - planned_pose = cart - return targets - # -- speed & motion profiles -------------------------------------------------------------- async def request_monitor_speed(self) -> int: @@ -1802,6 +1759,49 @@ async def move_to_location( joints = await self._cart_to_joints(coords) await self._move_j(profile_index=self.profile_index, joint_coords=joints) + def _plan_cartesian_pose_route( + self, + poses: Sequence[PreciseFlexCartesianPose], + planned_joints: JointPose, + planned_pose: PreciseFlexCartesianPose, + ) -> List[JointPose]: + """Plan a Cartesian pose route into joint targets from an initial state snapshot. + + Unlike :meth:`_cart_to_joints`, this helper does not query the controller for every + waypoint. Omitted pose fields inherit from the *planned* previous pose so IK branch + selection remains continuous across the route. + """ + planned_joints = dict(planned_joints) + targets: List[JointPose] = [] + for pose in poses: + cart = dataclasses.replace( + pose, + orientation=planned_pose.orientation if pose.orientation is None else pose.orientation, + wrist=planned_pose.wrist if pose.wrist is None else pose.wrist, + # PF400 IK expects a shoulder/reference rail position even on rail-less arms. + # Mirror _cart_to_joints(): omitted pose fields inherit from the previous pose. + rail_position=planned_pose.rail_position + if pose.rail_position is None + else pose.rail_position, + ) + ik_joints = _snap_to_current( + kinematics.ik(cart, p=self._kinematics_params), + planned_joints, + cart.wrist, + ) + # IK only solves the arm axes; gripper and rail keep the planned values. + target = dict(planned_joints) + for axis in (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST): + target[axis] = ik_joints[axis] + if self._has_rail and cart.rail_position is not None: + target[Axis.RAIL] = cart.rail_position + + self._assert_within_soft_limits(planned_joints, target) + targets.append(target) + planned_joints = target + planned_pose = cart + return targets + async def move_through_cartesian_poses( self, poses: Sequence[PreciseFlexCartesianPose], From 9a93e9d53861e6e8803d79d103048d61a8d1c943 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:31:31 -0700 Subject: [PATCH 06/10] refactor(PF400): move MoveThroughCartesianPosesParams next to its method Relocate the nested params dataclass from the params cluster down to directly above move_through_cartesian_poses, grouping the helper, params, and method together. Pure move, no behaviour change. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 3ab8b2deffe..622da19c3af 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -1716,21 +1716,6 @@ class MoveToLocationParams(BackendParams): wrist: Optional[Wrist] = None rail_position: Optional[float] = None - @dataclass - class MoveThroughCartesianPosesParams(BackendParams): - """PreciseFlex arm parameters for blended Cartesian pose routes. - - Args: - speed_pct: Movement speed override as a percentage (0-100). If None, uses the - current speed setting. - blend: When True, temporarily set the active motion profile's ``InRange`` value to - ``-1`` so the controller may blend through intermediate waypoints instead of stopping - at each one. The original profile is restored after the final waypoint is reached. - """ - - speed_pct: Optional[float] = None - blend: bool = True - async def move_to_location( self, location: Coordinate, @@ -1802,6 +1787,21 @@ def _plan_cartesian_pose_route( planned_pose = cart return targets + @dataclass + class MoveThroughCartesianPosesParams(BackendParams): + """PreciseFlex arm parameters for blended Cartesian pose routes. + + Args: + speed_pct: Movement speed override as a percentage (0-100). If None, uses the + current speed setting. + blend: When True, temporarily set the active motion profile's ``InRange`` value to + ``-1`` so the controller may blend through intermediate waypoints instead of stopping + at each one. The original profile is restored after the final waypoint is reached. + """ + + speed_pct: Optional[float] = None + blend: bool = True + async def move_through_cartesian_poses( self, poses: Sequence[PreciseFlexCartesianPose], From b924d9742348ab0775b15aa2735aefd7ead2d4ba Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:33:21 -0700 Subject: [PATCH 07/10] refactor(PF400): make _plan_cartesian_pose_route self-contained The planned_joints/planned_pose args were always just _request_state()'s output, so move that query inside the helper (now async) and have move_through_cartesian_poses call _plan_cartesian_pose_route(poses) directly. Rename the running planned_* accumulators to prev_* (they track the previous waypoint's result) and drop the redundant defensive copy of the freshly queried joints. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 622da19c3af..0a129a65393 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -1744,47 +1744,45 @@ async def move_to_location( joints = await self._cart_to_joints(coords) await self._move_j(profile_index=self.profile_index, joint_coords=joints) - def _plan_cartesian_pose_route( - self, - poses: Sequence[PreciseFlexCartesianPose], - planned_joints: JointPose, - planned_pose: PreciseFlexCartesianPose, + async def _plan_cartesian_pose_route( + self, poses: Sequence[PreciseFlexCartesianPose] ) -> List[JointPose]: - """Plan a Cartesian pose route into joint targets from an initial state snapshot. + """Plan a Cartesian pose route into joint targets, snapshotting state once. - Unlike :meth:`_cart_to_joints`, this helper does not query the controller for every - waypoint. Omitted pose fields inherit from the *planned* previous pose so IK branch - selection remains continuous across the route. + Unlike :meth:`_cart_to_joints`, this does not query the controller for every waypoint: it + reads the current state once and resolves each waypoint's IK from the previous waypoint's + result. Omitted pose fields inherit from the previous pose so IK branch selection remains + continuous across the route. """ - planned_joints = dict(planned_joints) + prev_joints, prev_pose = await self._request_state() targets: List[JointPose] = [] for pose in poses: cart = dataclasses.replace( pose, - orientation=planned_pose.orientation if pose.orientation is None else pose.orientation, - wrist=planned_pose.wrist if pose.wrist is None else pose.wrist, + orientation=prev_pose.orientation if pose.orientation is None else pose.orientation, + wrist=prev_pose.wrist if pose.wrist is None else pose.wrist, # PF400 IK expects a shoulder/reference rail position even on rail-less arms. # Mirror _cart_to_joints(): omitted pose fields inherit from the previous pose. - rail_position=planned_pose.rail_position + rail_position=prev_pose.rail_position if pose.rail_position is None else pose.rail_position, ) ik_joints = _snap_to_current( kinematics.ik(cart, p=self._kinematics_params), - planned_joints, + prev_joints, cart.wrist, ) - # IK only solves the arm axes; gripper and rail keep the planned values. - target = dict(planned_joints) + # IK only solves the arm axes; gripper and rail keep the previous values. + target = dict(prev_joints) for axis in (Axis.BASE, Axis.SHOULDER, Axis.ELBOW, Axis.WRIST): target[axis] = ik_joints[axis] if self._has_rail and cart.rail_position is not None: target[Axis.RAIL] = cart.rail_position - self._assert_within_soft_limits(planned_joints, target) + self._assert_within_soft_limits(prev_joints, target) targets.append(target) - planned_joints = target - planned_pose = cart + prev_joints = target + prev_pose = cart return targets @dataclass @@ -1825,8 +1823,7 @@ async def move_through_cartesian_poses( if backend_params.speed_pct is not None: await self._set_speed(backend_params.speed_pct) - planned_joints, planned_pose = await self._request_state() - targets = self._plan_cartesian_pose_route(poses, planned_joints, planned_pose) + targets = await self._plan_cartesian_pose_route(poses) profile_index = self.profile_index original_profile = None From b30adb02b16f7274eb7aae9c2cabbdb8a2bcda7c Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:36:14 -0700 Subject: [PATCH 08/10] refactor(PF400): single end-of-motion wait in finally for route move Drop the moves_sent / waited_for_eom bookkeeping. The success-path and profile-restore waits were the same settle, so do it once in finally: wait for end of motion, then restore the profile. Also waits before propagating on the error path now, so control returns with the arm stopped. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 0a129a65393..b69c1bced84 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -1844,19 +1844,17 @@ async def move_through_cartesian_poses( original_profile[8], ) - moves_sent = 0 - waited_for_eom = False try: for target in targets: await self._move_j(profile_index=profile_index, joint_coords=target) - moves_sent += 1 - await self.driver._wait_for_eom() - waited_for_eom = True finally: - if should_restore_profile and original_profile is not None: - if moves_sent > 0 and not waited_for_eom: - await self.driver._wait_for_eom() - await self.set_motion_profile_values(*original_profile) + # Let queued motion settle before returning or restoring the profile - restoring InRange + # mid-move would change the in-flight blend. + try: + await self.driver._wait_for_eom() + finally: + if should_restore_profile and original_profile is not None: + await self.set_motion_profile_values(*original_profile) async def dest_c(self, arg1: int = 0) -> tuple[float, float, float, float, float, float, int]: """Get the destination or current Cartesian location of the robot. From 6e214412d5434f0c5c1dedb3e0a87ff7f7863f76 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:39:45 -0700 Subject: [PATCH 09/10] refactor(PF400): MotionProfile namedtuple, drop magic profile indices request_motion_profile_values now returns a MotionProfile namedtuple instead of a bare 9-tuple. The blend save/restore reads original_profile.in_range != BLEND_IN_RANGE and builds the temporary profile with ._replace(in_range=BLEND_IN_RANGE), replacing the [7] magic index, the -1 sentinel, and the nine positional original_profile[i] args. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 50 +++++++++---------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index b69c1bced84..f3edb430708 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -21,7 +21,7 @@ import logging import warnings from dataclasses import dataclass -from typing import ClassVar, Dict, List, Literal, Optional, Sequence +from typing import ClassVar, Dict, List, Literal, NamedTuple, Optional, Sequence from pylabrobot.capabilities.arms.backend import ( CanFreedrive, @@ -48,6 +48,23 @@ logger = logging.getLogger(__name__) +# InRange sentinel that lets the controller blend through waypoints instead of stopping at each one. +BLEND_IN_RANGE = -1 + + +class MotionProfile(NamedTuple): + """A controller motion profile, as reported by ``Profile `` (field order matches the wire).""" + + profile: int + speed: float + speed2: float + acceleration: float + deceleration: float + acceleration_ramp: float + deceleration_ramp: float + in_range: float # -1 (BLEND_IN_RANGE) to 100; -1 blends, 0 stops, >0 enforces position accuracy + straight: bool # True = straight-line path, False = joint-based path + def _parse_scalar(response: str) -> float: """Parse the first numeric field of a DataID reply. @@ -766,9 +783,7 @@ async def set_profile_straight(self, profile_index: int, straight_mode: bool) -> straight_int = 1 if straight_mode else 0 await self.driver.send_command(f"Straight {profile_index} {straight_int}") - async def request_motion_profile_values( - self, profile: int - ) -> tuple[int, float, float, float, float, float, float, float, bool]: + async def request_motion_profile_values(self, profile: int) -> MotionProfile: """ Get the current motion profile values for the specified profile index on the PreciseFlex robot. @@ -776,22 +791,13 @@ async def request_motion_profile_values( profile: Profile index to get values for. Returns: - A tuple containing (profile, speed, speed2, acceleration, deceleration, acceleration_ramp, deceleration_ramp, in_range, straight) - - profile: Profile index - - speed: Percentage of maximum speed - - speed2: Secondary speed setting - - acceleration: Percentage of maximum acceleration - - deceleration: Percentage of maximum deceleration - - acceleration_ramp: Acceleration ramp time in seconds - - deceleration_ramp: Deceleration ramp time in seconds - - in_range: InRange value (-1 to 100) - - straight: True if straight-line path, False if joint-based path + A :class:`MotionProfile` with the profile's speed, acceleration, ramps, InRange and path mode. """ data = await self.driver.send_command(f"Profile {profile}") parts = data.split(" ") if len(parts) != 9: raise PreciseFlexError(-1, "Unexpected response format from device.") - return ( + return MotionProfile( int(parts[0]), float(parts[1]), float(parts[2]), @@ -1830,19 +1836,9 @@ async def move_through_cartesian_poses( should_restore_profile = False if backend_params.blend: original_profile = await self.request_motion_profile_values(profile_index) - should_restore_profile = original_profile[7] != -1 + should_restore_profile = original_profile.in_range != BLEND_IN_RANGE if should_restore_profile: - await self.set_motion_profile_values( - original_profile[0], - original_profile[1], - original_profile[2], - original_profile[3], - original_profile[4], - original_profile[5], - original_profile[6], - -1, - original_profile[8], - ) + await self.set_motion_profile_values(*original_profile._replace(in_range=BLEND_IN_RANGE)) try: for target in targets: From 229f13e8ab3bed5713003f6ea1efc68294086465 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Sat, 27 Jun 2026 17:51:20 -0700 Subject: [PATCH 10/10] style(PF400): ruff format after merge Collapse two now-short lines ruff format flagged: the rail_position ternary (shortened by the planned_*->prev_* rename) and the _profile_cmds comprehension. Co-Authored-By: Claude Opus 4.8 --- pylabrobot/brooks/precise_flex/arm_backend.py | 4 +--- pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pylabrobot/brooks/precise_flex/arm_backend.py b/pylabrobot/brooks/precise_flex/arm_backend.py index 78793836cc8..ebd7088e56d 100644 --- a/pylabrobot/brooks/precise_flex/arm_backend.py +++ b/pylabrobot/brooks/precise_flex/arm_backend.py @@ -1824,9 +1824,7 @@ async def _plan_cartesian_pose_route( wrist=prev_pose.wrist if pose.wrist is None else pose.wrist, # PF400 IK expects a shoulder/reference rail position even on rail-less arms. # Mirror _cart_to_joints(): omitted pose fields inherit from the previous pose. - rail_position=prev_pose.rail_position - if pose.rail_position is None - else pose.rail_position, + rail_position=prev_pose.rail_position if pose.rail_position is None else pose.rail_position, ) ik_joints = _snap_to_current( kinematics.ik(cart, p=self._kinematics_params), diff --git a/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py b/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py index 72698913a8a..4c75c61fbee 100644 --- a/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py +++ b/pylabrobot/brooks/precise_flex/tests/arm_backend_tests.py @@ -253,9 +253,7 @@ def _movej_cmds(self) -> list[str]: def _profile_cmds(self) -> list[str]: return [ - c.args[0] - for c in self.driver.send_command.call_args_list - if c.args[0].startswith("Profile") + c.args[0] for c in self.driver.send_command.call_args_list if c.args[0].startswith("Profile") ] async def test_move_through_cartesian_poses_plans_from_one_state_snapshot(self):