From 9544cf36765b7f18782f0fb1471e691f26712dea Mon Sep 17 00:00:00 2001 From: Claudio Date: Thu, 25 Jun 2026 16:28:53 +0200 Subject: [PATCH 1/3] `Stacker`: sequential ("stacking access") storage capability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `Stacker` capability for sequential/LIFO plate storage, the code counterpart to the "Stacking Access (Sequential)" category already in the storage docs. Per the design discussion in #1113, this is a composable capability (wrapping a `StackerBackend`) rather than a split frontend hierarchy, and it does not touch the existing random-access `Incubator`. - `Stacker(Machine, Resource)`: holds one or more single-ended LIFO `ResourceStack` stacks plus a transfer position (the "loading tray", borrowing the incubator's term). `downstack` moves the accessible (top) plate onto the tray; `upstack` moves a plate from the tray onto a stack. Stack height/positions come from `Plate.stacking_z_height` via ResourceStack nesting. - `StackerBackend`: minimal device interface (`set_stacks`, `downstack`, `upstack`) — deliberately no door/temperature/shaking. - `StackerChatterboxBackend` for tests/demos. Intended for the Agilent BenchCel and HighRes MicroServe; migrating the BenchCel onto this capability is a follow-up. Serialization round-trip is left for a follow-up (needs ResourceStack serialization support). Refs #1113. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 1 + pylabrobot/storage/__init__.py | 3 + pylabrobot/storage/stacker.py | 167 +++++++++++++++++++++++ pylabrobot/storage/stacker_backend.py | 42 ++++++ pylabrobot/storage/stacker_chatterbox.py | 19 +++ pylabrobot/storage/stacker_tests.py | 119 ++++++++++++++++ 6 files changed, 351 insertions(+) create mode 100644 pylabrobot/storage/stacker.py create mode 100644 pylabrobot/storage/stacker_backend.py create mode 100644 pylabrobot/storage/stacker_chatterbox.py create mode 100644 pylabrobot/storage/stacker_tests.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b0c4231645a..ad229bf2c2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - In-process `MicroSpinMockServer` (`pylabrobot.centrifuge.highres.mock_server`) that faithfully emulates the MicroSpin's wire protocol -- including the firmware's "`status` blocks until the spindle has stopped" semantics and the low-G spin-down-detection hang -- usable as a Python async context manager or runnable as a script (`python -m pylabrobot.centrifuge.highres.mock_server`) for `nc`/`telnet` debugging. - `MicroSpinBackend.reset()` recovery helper that issues `abort` -> `clearbuttonabort` -> `status`, using the last as the gate that genuinely confirms the rotor has stopped. - User guide notebook for the MicroSpin (`docs/user_guide/01_material-handling/centrifuge/highres_microspin.ipynb`). +- `Stacker` capability (`pylabrobot.storage.Stacker`) for sequential ("stacking access") plate storage: one or more single-ended LIFO `ResourceStack` stacks plus a transfer position ("loading tray"), with `downstack`/`upstack` operations and a `StackerBackend` interface (plus `StackerChatterboxBackend`). Intended for devices like the Agilent BenchCel and HighRes MicroServe (#1113). ### Fixed diff --git a/pylabrobot/storage/__init__.py b/pylabrobot/storage/__init__.py index 3ccfc9cd4de..6fbef36430a 100644 --- a/pylabrobot/storage/__init__.py +++ b/pylabrobot/storage/__init__.py @@ -4,3 +4,6 @@ from .incubator import Incubator from .inheco.scila import SCILABackend from .liconic import ExperimentalLiconicBackend +from .stacker import EmptyStackError, LoadingTrayOccupiedError, Stacker +from .stacker_backend import StackerBackend +from .stacker_chatterbox import StackerChatterboxBackend diff --git a/pylabrobot/storage/stacker.py b/pylabrobot/storage/stacker.py new file mode 100644 index 00000000000..5da71044343 --- /dev/null +++ b/pylabrobot/storage/stacker.py @@ -0,0 +1,167 @@ +from typing import List, Optional, Union + +from pylabrobot.machines import Machine +from pylabrobot.resources import ( + Coordinate, + Plate, + PlateHolder, + Resource, + ResourceNotFoundError, + Rotation, +) +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.serializer import serialize + +from .stacker_backend import StackerBackend + + +class EmptyStackError(Exception): + """Raised when downstacking from a stack that has no plates.""" + + +class LoadingTrayOccupiedError(Exception): + """Raised when a transfer would collide with a plate already on the loading tray.""" + + +class Stacker(Machine, Resource): + """Sequential ("stacking access") plate-storage capability. + + Models one or more single-ended LIFO stacks of (nesting) plates plus a single transfer + position -- the "loading tray", borrowing the incubator's term. Each stack is a + :class:`~pylabrobot.resources.resource_stack.ResourceStack` (``direction="z"``), which enforces + LIFO access (only the top plate can be removed) and computes the stack height from each plate's + ``stacking_z_height``. + + This is a *capability*: it is meant to be composed onto a machine (e.g. + ``self.stacker = Stacker(backend=...)``) rather than subclassed into a device-specific frontend. + Devices that are stackers include the Agilent BenchCel and the HighRes MicroServe. + """ + + def __init__( + self, + backend: StackerBackend, + name: str, + size_x: float, + size_y: float, + size_z: float, + stacks: List[ResourceStack], + loading_tray_location: Coordinate, + rotation: Optional[Rotation] = None, + category: Optional[str] = None, + model: Optional[str] = None, + ): + Machine.__init__(self, backend=backend) + self.backend: StackerBackend = backend # fix type + Resource.__init__( + self, + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + rotation=rotation, + category=category, + model=model, + ) + + self.loading_tray = PlateHolder( + name=self.name + "_tray", size_x=127.76, size_y=85.48, size_z=0, pedestal_size_z=0 + ) + self.assign_child_resource(self.loading_tray, location=loading_tray_location) + + self._stacks = stacks + for stack in self._stacks: + self.assign_child_resource(stack, location=None) + + @property + def stacks(self) -> List[ResourceStack]: + return self._stacks + + async def setup(self, **backend_kwargs): + await super().setup(**backend_kwargs) + await self.backend.set_stacks(self._stacks) + + def _resolve_stack(self, stack: Union[ResourceStack, int]) -> ResourceStack: + if isinstance(stack, int): + return self._stacks[stack] + if stack not in self._stacks: + raise ValueError(f"Stack {stack.name!r} is not part of stacker '{self.name}'") + return stack + + def get_accessible_plate(self, stack: Union[ResourceStack, int]) -> Optional[Plate]: + """The only plate that can be downstacked without moving others (the top of the stack).""" + stack = self._resolve_stack(stack) + if len(stack.children) == 0: + return None + top = stack.get_top_item() + return top if isinstance(top, Plate) else None + + def get_stack_by_plate_name(self, plate_name: str) -> ResourceStack: + for stack in self._stacks: + for child in stack.children: + if child.name == plate_name: + return stack + raise ResourceNotFoundError(f"Plate {plate_name} not found in stacker '{self.name}'") + + async def downstack(self, stack: Union[ResourceStack, int], **backend_kwargs) -> Plate: + """Move the accessible (top) plate of ``stack`` onto the loading tray and return it.""" + stack = self._resolve_stack(stack) + plate = self.get_accessible_plate(stack) + if plate is None: + raise EmptyStackError(f"Stack {stack.name!r} of stacker '{self.name}' is empty") + if self.loading_tray.resource is not None: + raise LoadingTrayOccupiedError( + f"Loading tray of stacker '{self.name}' already holds '{self.loading_tray.resource.name}'" + ) + await self.backend.downstack(stack, **backend_kwargs) + plate.unassign() + self.loading_tray.assign_child_resource(plate) + return plate + + async def upstack( + self, + stack: Union[ResourceStack, int], + plate: Optional[Plate] = None, + **backend_kwargs, + ) -> None: + """Move a plate from the loading tray onto ``stack`` (its new accessible plate). + + ``plate`` defaults to whatever is on the loading tray. + """ + stack = self._resolve_stack(stack) + if plate is None: + tray_resource = self.loading_tray.resource + if not isinstance(tray_resource, Plate): + raise ResourceNotFoundError(f"No plate on the loading tray of stacker '{self.name}'") + plate = tray_resource + await self.backend.upstack(stack, plate, **backend_kwargs) + plate.unassign() + stack.assign_child_resource(plate) + + def summary(self) -> str: + lines = [f"Stacker '{self.name}' ({len(self._stacks)} stacks)"] + for i, stack in enumerate(self._stacks): + # bottom -> top; the accessible plate is last. + contents = [child.name for child in stack.children] or [""] + lines.append(f" stack {i}: " + " -> ".join(contents) + " (top = accessible)") + tray = self.loading_tray.resource + lines.append(f" loading tray: {tray.name if tray is not None else ''}") + return "\n".join(lines) + + def serialize(self) -> dict: + return { + **Machine.serialize(self), + **Resource.serialize(self), + "backend": self.backend.serialize(), + "stacks": [stack.serialize() for stack in self._stacks], + "loading_tray_location": serialize(self.loading_tray.location), + } + + @classmethod + def deserialize(cls, data: dict, allow_marshal: bool = False) -> "Stacker": + # Deserialization is not supported yet: it needs ResourceStack serialization support + # (ResourceStack.__init__ takes ``direction`` rather than ``size_*``, so it does not round-trip + # through the generic Resource.(de)serialize path). Tracked as a follow-up. This override also + # resolves the otherwise-ambiguous ``deserialize`` inherited from both Machine and Resource. + raise NotImplementedError( + "Stacker.deserialize is not implemented yet (pending ResourceStack serialization support)." + ) diff --git a/pylabrobot/storage/stacker_backend.py b/pylabrobot/storage/stacker_backend.py new file mode 100644 index 00000000000..b7eb7913fd0 --- /dev/null +++ b/pylabrobot/storage/stacker_backend.py @@ -0,0 +1,42 @@ +from abc import ABCMeta, abstractmethod +from typing import List, Optional + +from pylabrobot.machines.backend import MachineBackend +from pylabrobot.resources.plate import Plate +from pylabrobot.resources.resource_stack import ResourceStack + + +class StackerBackend(MachineBackend, metaclass=ABCMeta): + """Backend interface for the sequential ("stacking access") :class:`Stacker` capability. + + A stacker stores plates in one or more single-ended LIFO stacks. Unlike an + incubator's random-access racks, only the accessible (top) plate of each stack can be moved + without first moving the plates above it. The device exposes two primitive transfers between a + stack and the stacker's transfer position (the "loading tray"): ``downstack`` (stack -> + transfer position) and ``upstack`` (transfer position -> stack). + + Backends are not incubators: there is deliberately no door/temperature/shaking here. A device + that both stores sequentially and, say, controls temperature would compose this capability with + a separate temperature-control capability. + """ + + def __init__(self) -> None: + super().__init__() + self._stacks: Optional[List[ResourceStack]] = None + + @property + def stacks(self) -> List[ResourceStack]: + assert self._stacks is not None, "Backend not set up?" + return self._stacks + + async def set_stacks(self, stacks: List[ResourceStack]) -> None: + """Configure the stacks the device manages. Called by :meth:`Stacker.setup`.""" + self._stacks = stacks + + @abstractmethod + async def downstack(self, stack: ResourceStack, **backend_kwargs) -> None: + """Move the accessible plate from ``stack`` to the stacker's transfer position.""" + + @abstractmethod + async def upstack(self, stack: ResourceStack, plate: Plate, **backend_kwargs) -> None: + """Move ``plate`` from the stacker's transfer position onto ``stack``.""" diff --git a/pylabrobot/storage/stacker_chatterbox.py b/pylabrobot/storage/stacker_chatterbox.py new file mode 100644 index 00000000000..ac516cb4cb2 --- /dev/null +++ b/pylabrobot/storage/stacker_chatterbox.py @@ -0,0 +1,19 @@ +from pylabrobot.resources.plate import Plate +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.storage.stacker_backend import StackerBackend + + +class StackerChatterboxBackend(StackerBackend): + """A no-op :class:`StackerBackend` that prints each operation; for tests and demos.""" + + async def setup(self): + print("Setting up stacker backend") + + async def stop(self): + print("Stopping stacker backend") + + async def downstack(self, stack: ResourceStack, **backend_kwargs): + print(f"Downstacking accessible plate from stack '{stack.name}'") + + async def upstack(self, stack: ResourceStack, plate: Plate, **backend_kwargs): + print(f"Upstacking plate '{plate.name}' onto stack '{stack.name}'") diff --git a/pylabrobot/storage/stacker_tests.py b/pylabrobot/storage/stacker_tests.py new file mode 100644 index 00000000000..3686e176e6b --- /dev/null +++ b/pylabrobot/storage/stacker_tests.py @@ -0,0 +1,119 @@ +"""Tests for the sequential Stacker capability.""" + +import unittest + +from pylabrobot.resources import Coordinate, Plate, ResourceNotFoundError +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.resources.utils import create_ordered_items_2d +from pylabrobot.resources.well import Well + +from .stacker import EmptyStackError, LoadingTrayOccupiedError, Stacker +from .stacker_chatterbox import StackerChatterboxBackend + + +def _plate(name: str, stacking_z_height=None) -> Plate: + return Plate( + name, + size_x=127.76, + size_y=85.48, + size_z=14.0, + ordered_items=create_ordered_items_2d( + Well, + num_items_x=1, + num_items_y=1, + dx=0, + dy=0, + dz=0, + item_dx=9, + item_dy=9, + size_x=9, + size_y=9, + size_z=10, + ), + stacking_z_height=stacking_z_height, + ) + + +def _make_stacker(num_stacks: int = 2) -> Stacker: + stacks = [ResourceStack(f"stack_{i}", "z") for i in range(num_stacks)] + return Stacker( + backend=StackerChatterboxBackend(), + name="stacker", + size_x=200, + size_y=200, + size_z=300, + stacks=stacks, + loading_tray_location=Coordinate(0, 0, 0), + ) + + +class StackerTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.stacker = _make_stacker() + await self.stacker.setup() + + async def test_setup_configures_backend_stacks(self): + self.assertEqual(self.stacker.backend.stacks, self.stacker.stacks) + + async def test_upstack_moves_plate_from_tray_to_stack(self): + plate = _plate("p1") + self.stacker.loading_tray.assign_child_resource(plate) + await self.stacker.upstack(0) + self.assertIsNone(self.stacker.loading_tray.resource) + self.assertIs(self.stacker.stacks[0].get_top_item(), plate) + self.assertIs(self.stacker.get_accessible_plate(0), plate) + + async def test_downstack_moves_accessible_plate_to_tray(self): + plate = _plate("p1") + self.stacker.loading_tray.assign_child_resource(plate) + await self.stacker.upstack(0) + returned = await self.stacker.downstack(0) + self.assertIs(returned, plate) + self.assertIs(self.stacker.loading_tray.resource, plate) + self.assertEqual(len(self.stacker.stacks[0].children), 0) + + async def test_lifo_order(self): + # upstack A then B; the accessible plate is B (last in), and downstack returns B first. + for name in ("A", "B"): + self.stacker.loading_tray.assign_child_resource(_plate(name)) + await self.stacker.upstack(0) + self.assertEqual(self.stacker.get_accessible_plate(0).name, "B") + first_out = await self.stacker.downstack(0) + self.assertEqual(first_out.name, "B") + + async def test_nesting_height_uses_stacking_z_height(self): + for name in ("A", "B", "C"): + self.stacker.loading_tray.assign_child_resource(_plate(name, stacking_z_height=10.0)) + await self.stacker.upstack(0) + # height = size_z + (N-1) * stacking_z_height = 14 + 2*10 + self.assertEqual(self.stacker.stacks[0].get_size_z(), 34.0) + # top plate (C) base sits at 2 * 10 + self.assertEqual(self.stacker.stacks[0].get_top_item().location, Coordinate(0, 0, 20.0)) + + async def test_downstack_empty_raises(self): + with self.assertRaises(EmptyStackError): + await self.stacker.downstack(0) + + async def test_downstack_with_occupied_tray_raises(self): + # put one plate in stack 0, and leave a different plate on the tray + self.stacker.loading_tray.assign_child_resource(_plate("in_stack")) + await self.stacker.upstack(0) + self.stacker.loading_tray.assign_child_resource(_plate("on_tray")) + with self.assertRaises(LoadingTrayOccupiedError): + await self.stacker.downstack(0) + + async def test_upstack_without_plate_raises(self): + with self.assertRaises(ResourceNotFoundError): + await self.stacker.upstack(0) + + async def test_get_stack_by_plate_name(self): + self.stacker.loading_tray.assign_child_resource(_plate("findme")) + await self.stacker.upstack(1) + self.assertIs(self.stacker.get_stack_by_plate_name("findme"), self.stacker.stacks[1]) + with self.assertRaises(ResourceNotFoundError): + self.stacker.get_stack_by_plate_name("nope") + + async def test_resolve_stack_rejects_foreign_stack(self): + foreign = ResourceStack("foreign", "z") + with self.assertRaises(ValueError): + await self.stacker.upstack(foreign) From c8d083729747adc4017ea4d8d39abc257912f07f Mon Sep 17 00:00:00 2001 From: Claudio Date: Thu, 25 Jun 2026 16:36:42 +0200 Subject: [PATCH 2/3] docs: document the `Stacker` capability in the storage guide Maps the documented "Random access" / "Stacking access (sequential)" retrieval patterns to the `Incubator` and new `Stacker` capabilities, and adds a `Stacker` section (loading tray, LIFO `downstack`/`upstack`, ResourceStack nesting) with a runnable `StackerChatterboxBackend` example. --- .../01_material-handling/storage/storage.rst | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/docs/user_guide/01_material-handling/storage/storage.rst b/docs/user_guide/01_material-handling/storage/storage.rst index 698d153b308..10022ce9663 100644 --- a/docs/user_guide/01_material-handling/storage/storage.rst +++ b/docs/user_guide/01_material-handling/storage/storage.rst @@ -133,6 +133,62 @@ Combined Retrieval & Access Summary LiCONiC STX Series +------------------------------------------ + +In PyLabRobot, these two retrieval patterns map to two capabilities: + +* **Random access** -> the ``Incubator`` frontend, which holds addressable + ``PlateHolder`` sites in ``PlateCarrier`` racks (any plate is directly + reachable). +* **Stacking access (sequential)** -> the ``Stacker`` capability + (``pylabrobot.storage.Stacker``), described below. + +The ``Stacker`` capability +-------------------------------------------------- + +A ``Stacker`` models one or more single-ended LIFO stacks -- each a +``ResourceStack`` with ``direction="z"`` -- plus a single transfer position, the +*loading tray* (the same term incubators use). Only the **accessible** (top) +plate of a stack can be moved without first moving the plates above it, and +plates nest by their ``stacking_z_height`` so the stack height is computed +correctly. + +Two primitives move plates between a stack and the loading tray: + +* ``downstack(stack)`` -- move the accessible plate of ``stack`` onto the loading + tray (and return it). +* ``upstack(stack, plate=None)`` -- move a plate from the loading tray onto + ``stack`` (defaults to whatever is currently on the tray). + +``Stacker`` is a *capability*, not a device-specific frontend: a machine that is +a stacker (e.g. the Agilent BenchCel or HighRes MicroServe) composes it and +provides a ``StackerBackend`` that implements the device-specific +``downstack``/``upstack`` transfers. ``StackerChatterboxBackend`` is a no-op +backend useful for trying the API out without hardware: + +.. code-block:: python + + from pylabrobot.resources import Coordinate + from pylabrobot.resources.resource_stack import ResourceStack + from pylabrobot.storage import Stacker, StackerChatterboxBackend + + stacker = Stacker( + backend=StackerChatterboxBackend(), + name="stacker", + size_x=200, + size_y=200, + size_z=300, + stacks=[ResourceStack(f"stack_{i}", "z") for i in range(4)], + loading_tray_location=Coordinate(0, 0, 0), + ) + await stacker.setup() + + # Move the accessible plate of stack 0 onto the loading tray: + plate = await stacker.downstack(0) + # ... hand it to a robot arm / reader, then return a plate from the tray: + await stacker.upstack(1) + + ------------------------------------------ .. toctree:: From a6d44e5f355f8dbe2bf8be750cc7b92d2de5911a Mon Sep 17 00:00:00 2001 From: Claudio Date: Tue, 23 Jun 2026 21:41:55 +0200 Subject: [PATCH 3/3] Add Agilent BenchCel 4R storage backend Adds an integration for the Agilent BenchCel 4R microplate handler, modeled as a PLR storage machine, plus an in-process mock TCP server that emulates the device's binary wire protocol for hardware-free development. - BenchCel4RBackend speaks the reverse-engineered framed binary protocol over TCP/7612 (reverse-engineered from VWorks captures + live tests). - BenchCel4R(...) factory returning a plain Incubator (it takes a name like all PLR device factories, since the returned Resource needs a unique name for the resource tree/serialization/lookups). - Regular motion/transfer APIs (home, move, pick/place, downstack/upstack, jog, gripper moves, load/unload, save_teachpoint, fetch/take_in_plate, ...) are unprefixed. Only opening the stacker clamps is exposed as dangerously_open_stacker_grippers, because that specific diagnostic can release/drop a plate stack. - Calculation-based labware integration: derive BenchCel geometry (StackingThickness, RobotGripperOffset, StackerGripperOffset, SensorOffset) from PLR Plate dimensions instead of bundling per-catalog XML profiles. - set_labware(...) pushes labware geometry to the device using the decoded 0x7d settings frame + 0x9f commit handshake. The 0x7d payload layout was reverse-engineered from VWorks packet captures and reproduces the captured frames byte-for-byte for standard flat microplates (StackingThickness, robot/stacker/sensor gripper offsets, full plate height at offset 37, notch flags, thresholds, and PlatePresenceThreshold at offset 75). Invalid geometry is rejected by the device with a 0x02 "labware gripper positions are too close" error, which is surfaced as BenchCelDeviceError. - BenchCelMockServer + tests (stub-based and real-TCP integration), including the 0x7d/0x9f settings handshake and its rejection path. - User guide + API docs. No files are written on the host running PLR: the device cannot read teachpoints back, but PLR does not persist any local teachpoint registry/files. Keep the numeric teachpoints you write in your own protocol/config if you need them. Known limitations / not yet covered: - Lidded and sealed plate configurations are NOT fully supported. In every captured VWorks 0x7d frame for standard flat microplates, the lidded/sealed sub-fields (lidded stacking thickness / plate thickness / resting height / gripper offset / gripper position / departure height, and sealed stacking thickness / plate thickness) plus ErrorCorrectionOffset were always zero, so their exact wire offsets could not be confirmed. They are therefore encoded as zero by to_device_payload() and are not decoded by from_device_payload(). - As a result, set_labware() should not be used for lidded or sealed labware yet, and there are no tests covering lidded/sealed geometry round-trips. Pinning these down needs captures where a lidded plate (e.g. a tip box with lid) and the sealed Seahorse plate are applied in VWorks with non-zero lidded/sealed values. - The robot/stacker/sensor gripper offsets calculated from PLR Plate dimensions are heuristics validated against standard SBS microplates; they are not part of the PLR labware definition and should be verified/overridden for unusual labware before pushing to hardware. --- CHANGELOG.md | 1 + docs/api/pylabrobot.rst | 1 + docs/api/pylabrobot.storage.rst | 71 + .../storage/agilent_benchcel.rst | 323 +++++ .../01_material-handling/storage/storage.rst | 7 +- pylabrobot/storage/__init__.py | 6 + pylabrobot/storage/agilent/__init__.py | 35 + pylabrobot/storage/agilent/benchcel.py | 90 ++ .../storage/agilent/benchcel_backend.py | 1283 +++++++++++++++++ .../storage/agilent/benchcel_labware.py | 602 ++++++++ .../storage/agilent/benchcel_mock_server.py | 437 ++++++ .../agilent/benchcel_mock_server_tests.py | 166 +++ pylabrobot/storage/agilent/benchcel_tests.py | 500 +++++++ pylabrobot/storage/agilent/stacks.py | 20 + pylabrobot/storage/cytomat/cytomat.py | 1 + 15 files changed, 3542 insertions(+), 1 deletion(-) create mode 100644 docs/api/pylabrobot.storage.rst create mode 100644 docs/user_guide/01_material-handling/storage/agilent_benchcel.rst create mode 100644 pylabrobot/storage/agilent/__init__.py create mode 100644 pylabrobot/storage/agilent/benchcel.py create mode 100644 pylabrobot/storage/agilent/benchcel_backend.py create mode 100644 pylabrobot/storage/agilent/benchcel_labware.py create mode 100644 pylabrobot/storage/agilent/benchcel_mock_server.py create mode 100644 pylabrobot/storage/agilent/benchcel_mock_server_tests.py create mode 100644 pylabrobot/storage/agilent/benchcel_tests.py create mode 100644 pylabrobot/storage/agilent/stacks.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ad229bf2c2e..43cb33a370c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Added +- Agilent BenchCel 4R storage backend (`pylabrobot.storage.agilent.BenchCel4RBackend`) speaking the reverse-engineered binary TCP/7612 protocol, plus a `BenchCel4R(...)` factory. The BenchCel is modelled with the sequential `Stacker` capability (its four stackers are LIFO `ResourceStack`s) and the backend implements `downstack`/`upstack`. Includes calculation-based BenchCel labware helpers (preferring a plate's `stacking_z_height` for the device `StackingThickness`), a `set_labware(...)` command that pushes labware geometry to the device via the decoded `0x7d`/`0x9f` settings protocol, an in-process mock server, and a user guide. Opening the stacker clamps (which can drop a plate stack) is exposed as `dangerously_open_stacker_grippers`. - HighRes Biosolutions MicroSpin centrifuge backend (`pylabrobot.centrifuge.highres.MicroSpinBackend`) speaking the device's ASCII command/response protocol over TCP/1000, plus a `MicroSpin(...)` factory. - In-process `MicroSpinMockServer` (`pylabrobot.centrifuge.highres.mock_server`) that faithfully emulates the MicroSpin's wire protocol -- including the firmware's "`status` blocks until the spindle has stopped" semantics and the low-G spin-down-detection hang -- usable as a Python async context manager or runnable as a script (`python -m pylabrobot.centrifuge.highres.mock_server`) for `nc`/`telnet` debugging. - `MicroSpinBackend.reset()` recovery helper that issues `abort` -> `clearbuttonabort` -> `status`, using the last as the gate that genuinely confirms the rotor has stopped. diff --git a/docs/api/pylabrobot.rst b/docs/api/pylabrobot.rst index 051a61e8bd1..e6e4d16584b 100644 --- a/docs/api/pylabrobot.rst +++ b/docs/api/pylabrobot.rst @@ -19,6 +19,7 @@ Subpackages pylabrobot.only_fans pylabrobot.resources pylabrobot.scales + pylabrobot.storage pylabrobot.io.sila pylabrobot.shaking pylabrobot.temperature_controlling diff --git a/docs/api/pylabrobot.storage.rst b/docs/api/pylabrobot.storage.rst new file mode 100644 index 00000000000..831f7077cf1 --- /dev/null +++ b/docs/api/pylabrobot.storage.rst @@ -0,0 +1,71 @@ +.. currentmodule:: pylabrobot.storage + +pylabrobot.storage package +========================== + +This package contains APIs for automated storage devices and incubators. + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + incubator.Incubator + stacker.Stacker + agilent.benchcel.BenchCel4R + + +Backends +-------- + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + backend.IncubatorBackend + stacker_backend.StackerBackend + stacker_chatterbox.StackerChatterboxBackend + agilent.benchcel_backend.BenchCel4RBackend + cytomat.cytomat.CytomatBackend + liconic.liconic_backend.ExperimentalLiconicBackend + + +Agilent BenchCel support classes +-------------------------------- + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + agilent.benchcel_backend.Frame + agilent.benchcel_backend.SensorStatus + agilent.benchcel_backend.ArmStatus + agilent.benchcel_backend.GeneralStatus + agilent.benchcel_backend.Teachpoint + agilent.benchcel_backend.AxisBoundsResponse + agilent.benchcel_backend.CurrentPositionResponse + agilent.benchcel_labware.BenchCelLabwareSettings + agilent.benchcel_labware.PlateNotchSettings + agilent.benchcel_labware.apply_benchcel_labware_settings + agilent.benchcel_labware.calculate_benchcel_labware_settings + agilent.benchcel_labware.calculate_robot_gripper_offset + agilent.benchcel_labware.calculate_sensor_offset + agilent.benchcel_labware.calculate_stacker_gripper_offset + agilent.benchcel_labware.calculate_stacking_thickness + agilent.benchcel_mock_server.BenchCelMockServer + agilent.stacks.benchcel_4r_stacks + + +Errors +------ + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + agilent.benchcel_backend.BenchCelDeviceError + agilent.benchcel_backend.BenchCelProtocolError + agilent.benchcel_backend.BenchCelTimeoutError diff --git a/docs/user_guide/01_material-handling/storage/agilent_benchcel.rst b/docs/user_guide/01_material-handling/storage/agilent_benchcel.rst new file mode 100644 index 00000000000..a54b71160a4 --- /dev/null +++ b/docs/user_guide/01_material-handling/storage/agilent_benchcel.rst @@ -0,0 +1,323 @@ +Agilent BenchCel 4R +=================== + +The Agilent BenchCel Microplate Handler is an open, sequential stacker system +for moving SBS/ANSI-format plates between vertical stackers and one or more +taught robot-accessible positions. PyLabRobot supports the four-stacker +configuration with +:class:`~pylabrobot.storage.agilent.benchcel_backend.BenchCel4RBackend` and the +:func:`~pylabrobot.storage.agilent.benchcel.BenchCel4R` factory. + +.. warning:: + + The BenchCel protocol implemented here is reverse-engineered from Agilent + VWorks packet captures and live tests, not vendor Ethernet documentation. + Keep the robot/stacker area clear, make sure E-stop/power-off is available, + and ensure VWorks or any other control client is disconnected before issuing + motion commands. + +Manual safety notes +------------------- + +The Agilent BenchCel Microplate Handler R-Series Quick Guide (G5400-90003A) +contains several operational details that matter when automating the device: + +* The pendant has a red robot-disable button. Pressing it cuts power to the + motors and stops motion. +* Compressed air drives the stacker-head mechanisms. Power and compressed air + must be on for normal operation and rack install/removal workflows. +* The stacker clamps (also called stacker grippers) hold or release the bottom + plate at the base of a rack. They normally open/close automatically during + loading, unloading, downstacking, and stacking. Manual open/close is a + diagnostic/recovery action and can drop plates if the stack is unsupported. +* The stacker shelves temporarily support and level plates during downstacking + and upstacking. Retracting shelves can drop plates. PyLabRobot does not expose + a shelf command yet because the captured shelf-related command is not mapped + with enough confidence. +* The BenchCel is designed for ANSI/SBS-compatible labware. It typically grips + plates 5-10 mm above the bottom, between the plate top and skirt. Deep lids or + flexible skirts can cause unreliable gripping or accidental lid removal. + +Connection +---------- + +The BenchCel uses a framed binary protocol over TCP. The default IP address in +our captures was ``192.168.0.100`` and the observed TCP port was ``7612``. + +.. code-block:: python + + from pylabrobot.storage.agilent import BenchCel4RBackend + + backend = BenchCel4RBackend( + host="192.168.0.100", + port=7612, + # Optional: bind to the BenchCel-facing network interface on multi-NIC hosts. + source_ip="192.168.0.200", + ) + await backend.setup() + +Labware profiles and PLR plate dimensions +------------------------------------------ + +BenchCel/VWorks labware XML uses device-specific dimensions that are not exactly +identical to the dimensions PLR needs: + +* ``StackingThickness`` is the vertical pitch between nested plates in a stack. + It is usually smaller than the full plate height. +* PLR plate ``size_z`` should be the full outside plate height. +* ``RobotGripperOffset`` is the robot gripper contact height from the bottom of + the plate. In PLR this maps to ``plate.preferred_pickup_location.z`` and to + ``pickup_distance_from_top = plate.size_z - RobotGripperOffset``. + +PyLabRobot calculates BenchCel labware geometry from a PLR plate resource rather +than bundling per-catalog XML profiles. The default calculation is: + +* ``StackingThickness = plate.size_z - 1.5 mm``. Override + ``nesting_overlap`` if a plate family nests differently. +* ``RobotGripperOffset`` is kept between 5 and 8 mm from the bottom while + preserving at least about 5.4 mm above the grip point where possible. +* ``StackerGripperOffset`` is estimated as 4 mm for low-profile plates, 5 mm for + standard plates, and 6 mm for tall/deep plates. +* ``SensorOffset`` is estimated as 7 mm for low-profile plates, 8 mm for standard + plates, and near the top of tall/deep plates. + +The supplied example XML/dimension pairs were used to choose the defaults, but +optical thresholds and exact nesting behavior cannot be perfectly inferred from +outside dimensions. Pass explicit overrides when your measured/VWorks values +are known. + +.. code-block:: python + + from pylabrobot.resources.plate import Plate + from pylabrobot.storage.agilent import ( + BenchCel4R, + apply_benchcel_labware_settings, + calculate_benchcel_labware_settings, + ) + + plate = Plate("p1", size_x=127.76, size_y=85.48, size_z=10.4, ordered_items={}) + settings = apply_benchcel_labware_settings(plate) + assert settings.robot_gripper_offset == 5.0 + assert plate.preferred_pickup_location.z == 5.0 + + benchcel = BenchCel4R( + name="benchcel", + host="192.168.0.100", + labware=settings, + ) + + # If you know the exact nesting overlap for a plate family, override it. + settings = calculate_benchcel_labware_settings(plate, nesting_overlap=1.3) + +You can also parse user-supplied VWorks XML labware files with +:meth:`~pylabrobot.storage.agilent.benchcel_labware.BenchCelLabwareSettings.from_xml_file` +and supply the full measured/manufacturer plate dimensions. This is useful for +comparing calculated values against the current VWorks settings on the BenchCel +laptop, but the integration does not bundle those XML profiles. + +.. note:: + + If validation says a PLR plate resource has the wrong height, do not use + BenchCel stacker motion until the PLR resource and the VWorks labware profile + agree. For example, ``StackingThickness`` is not an acceptable substitute for + PLR ``size_z``. + +Pushing labware settings to the device +-------------------------------------- + +VWorks pushes the active labware geometry to the BenchCel over TCP whenever you +apply labware settings. This was confirmed from packet captures: the laptop +sends a 77-byte ``0x7d`` settings frame followed by an empty ``0x9f`` commit that +the device echoes back. Invalid geometry (for example, gripper hold positions +that are not separated) is rejected with a ``0x02`` error such as +``"The labware gripper positions are too close"``. + +The backend can push the same settings directly, so you do not need VWorks to +configure the active labware. The payload encoder is byte-for-byte compatible +with the captured VWorks frames for standard flat microplates and includes the +full plate height (offset 37), gripper offsets, sensor thresholds, notch options, +and ``PlatePresenceThreshold`` (offset 75). + +.. code-block:: python + + from pylabrobot.resources.plate import Plate + from pylabrobot.storage.agilent import BenchCel4RBackend + + backend = BenchCel4RBackend(host="192.168.0.100") + await backend.setup() + + plate = Plate("p1", size_x=127.76, size_y=85.48, size_z=14.4, ordered_items={}) + # Calculates geometry from the plate, encodes 0x7d, sends 0x7d + 0x9f, and + # raises BenchCelDeviceError if the device rejects the geometry. + settings = await backend.set_labware(plate) + + # Or push an explicit settings object / serialized dict. + await backend.set_labware(settings) + +.. warning:: + + The lidded/sealed sub-fields and ``ErrorCorrectionOffset`` were always zero + in the captures for standard flat microplates and are not yet mapped, so they + are sent as zero. Pushing settings for lidded or sealed labware is therefore + not fully supported yet. + +Status and sensors +------------------ + +VWorks continuously polls each stacker's sensors and a general arm-status frame. +The backend exposes decoded helpers for both. + +.. code-block:: python + + sensors = await backend.request_all_stacker_sensors() + for sensor in sensors: + print(sensor.stacker, sensor.air_pressure, sensor.plate_presence) + + arm = await backend.request_arm_status() + print(arm.theta, arm.x, arm.z, arm.gripper) + + bounds = await backend.request_axis_bounds() + print(bounds.x_min, bounds.x_max) + +Stacker and teachpoint moves +---------------------------- + +Stackers are addressed as human numbers 1 through 4 in the high-level API. The +wire protocol uses zero-based target IDs internally. + +.. code-block:: python + + await backend.home() + await backend.move_to_stacker(3) + await backend.fully_open_grippers() + await backend.downstack_plate(3) + await backend.upstack_plate(4) + +VWorks packet captures confirm that ``downstack_plate`` / ``upstack_plate`` are +exactly what the VWorks "Downstack" / "Upstack" buttons emit (a single ``0x62`` +/ ``0x63`` robot pick/place at the stacker target), and that ``load_stacker`` / +``unload_stacker`` are the distinct ``0x60`` / ``0x61`` stacker-mechanism commands +behind the "Load" / "Unload" buttons. + +If a stacker mechanism is in a bad state, the real device may request an unload +then load recovery cycle, or report errors such as ``Stack not loaded``, +``Stacker shelf position error``, ``Stacker shelf not retracted``, or +``Stacker gripper extended``. The stacker load/unload methods operate the +stacker mechanism, not the robot grippers. + +.. code-block:: python + + await backend.unload_stacker(3) + await backend.load_stacker(3) + +The backend also exposes the diagnostic stacker clamp command observed as +``0x67``. Opening the stacker clamps can release/drop a plate stack, so that one +operation keeps a ``dangerously_`` prefix; use it only for recovery/diagnostics +and only when the plate stack is physically supported. + +.. code-block:: python + + await backend.dangerously_open_stacker_grippers(1) # can drop plates + await backend.close_stacker_grippers(1) + +Teachpoints +----------- + +BenchCel transfer points are numeric teachpoint slots. The VWorks "right" +teachpoint observed in captures used target ID ``0x1e``. Live tests confirmed +that teachpoints can be written standalone with command ``0x73``; an undefined +teachpoint slot may move the arm to a home-like pose instead of to the desired +location. + +.. code-block:: python + + from pylabrobot.storage.agilent import Teachpoint + + await backend.save_teachpoint( + Teachpoint( + teachpoint_id=0x1e, + theta=0.0, + x=350.0, + z=0.0, + approach_height=20.0, + cavity_depth=0.0, + gripper_open_limit=-1.5, + respect_approach_height_when_not_holding_plate=True, + something_above_this_point=False, + name="right-transfer", # metadata only; not sent to the BenchCel + ), + ) + await backend.move_to_teachpoint(0x1e, approach_height=20.0) + +The BenchCel does not provide a known command to read saved teachpoints back, +and PyLabRobot does not write any files on the host. If you need to keep the +exact numeric teachpoints you wrote, persist them in your own protocol/config. + +Using with Stacker resource state +--------------------------------- + +A BenchCel stacker is a single-ended **LIFO stack of nesting plates**, so PyLabRobot +models the BenchCel with the :class:`~pylabrobot.storage.Stacker` capability (the +sequential/"stacking access" counterpart of the random-access +:class:`~pylabrobot.storage.incubator.Incubator`). The convenience factory returns a +``Stacker`` whose four stacks are :class:`~pylabrobot.resources.resource_stack.ResourceStack` +objects (``direction="z"``); each stack's height follows the plates' ``stacking_z_height``, and +only the **stacker identity** (1-4) is sent to the device. Two transfers move plates between a +stack and the loading tray: + +* ``downstack(stack)`` -- move the accessible (top) plate of ``stack`` onto the loading tray. +* ``upstack(stack, plate=None)`` -- move a plate from the loading tray onto ``stack``. + +.. important:: + + The BenchCel has **no fixed loading/unloading position** -- the transfer + point is a teachpoint you taught in VWorks (or with ``save_teachpoint``). + ``downstack`` and ``upstack`` therefore require a teachpoint, either via + ``loading_tray_teachpoint_id=`` on the factory/backend or ``teachpoint_id=`` + per call. They raise if none is configured, because an unset/wrong teachpoint + can send the arm to a home-like pose. + + The ``Stacker.loading_tray`` resource and its ``loading_tray_location`` + ``Coordinate`` are **cosmetic** -- they are only used for the PLR resource tree + and visualization and do not drive any motion. The physical transfer position + is determined entirely by the teachpoint ID on the device. PLR also models a + single loading tray, so it cannot represent transfers to several different + teachpoints distinctly. + +.. code-block:: python + + from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + from pylabrobot.storage.agilent import BenchCel4R, apply_benchcel_labware_settings + + plate = Cor_96_wellplate_360ul_Fb(name="plate_1") + settings = apply_benchcel_labware_settings(plate) + + benchcel = BenchCel4R( + name="benchcel", + host="192.168.0.100", + loading_tray_teachpoint_id=0x1e, + labware=settings, + # Provide your instrument's calibrated deck footprint for visualization. + size_x=0, + size_y=0, + size_z=0, + ) + + benchcel.stacks[2].assign_child_resource(plate) # PLR state: plate on stacker 3 + + await benchcel.setup() + # Downstack the accessible plate of stacker 3 onto the loading tray: + fetched = await benchcel.downstack(2, teachpoint_id=0x1e) + # Upstack it onto stacker 4: + await benchcel.upstack(3, fetched, teachpoint_id=0x1e) + +Mock server +----------- + +The implementation includes an in-process mock server for tests and debugging. +It speaks the same framed protocol and can also be run as a small standalone TCP +server. + +.. code-block:: bash + + python -m pylabrobot.storage.agilent.benchcel_mock_server --port 7612 diff --git a/docs/user_guide/01_material-handling/storage/storage.rst b/docs/user_guide/01_material-handling/storage/storage.rst index 10022ce9663..20e22e309e9 100644 --- a/docs/user_guide/01_material-handling/storage/storage.rst +++ b/docs/user_guide/01_material-handling/storage/storage.rst @@ -7,6 +7,7 @@ A storage machine is defined as a **machine whose primary feature is** Examples of this simplest form of a storage machine include: +- `Agilent BenchCel Microplate Handler `_ - open sequential stacker storage with robotic plate handling - `Agilent Labware MiniHub `_ - open storage of labware with rotation feature - `Lab Services PlateCarousel `_ - open storage of labware with rotation feature @@ -83,6 +84,7 @@ Retrieval Pattern: Stacking (Sequential) vs. Random Access - More flexible but mechanically complex. * - **Examples:** + - Agilent BenchCel 4R - Agilent Labware MiniHub - Lab Services PlateCarousel - **Examples:** @@ -108,6 +110,7 @@ Accessibility: Open vs. Closed Storage * - No protection from contamination or temperature drift. - Ideal for incubators, cold storage, and sterile handling. * - **Examples:** + - Agilent BenchCel 4R - Agilent Labware MiniHub - Manual stackers - **Examples:** @@ -124,7 +127,8 @@ Combined Retrieval & Access Summary - **Open Storage** - **Closed Storage** * - **Stacking Access (Sequential)** - - Agilent Labware MiniHub + - Agilent BenchCel 4R + Agilent Labware MiniHub Lab Services PlateCarousel - STX incubators with drawer-based shelves * - **Random Access** @@ -195,6 +199,7 @@ backend useful for trying the API out without hardware: :maxdepth: 1 :hidden: + agilent_benchcel cytomat inheco/incubator_shaker inheco/scila diff --git a/pylabrobot/storage/__init__.py b/pylabrobot/storage/__init__.py index 6fbef36430a..57a46aa8990 100644 --- a/pylabrobot/storage/__init__.py +++ b/pylabrobot/storage/__init__.py @@ -1,3 +1,9 @@ +from .agilent import ( + BenchCel4R, + BenchCel4RBackend, + BenchCelBackend, + BenchCelLabwareSettings, +) from .backend import IncubatorBackend from .chatterbox import IncubatorChatterboxBackend from .cytomat import CytomatBackend diff --git a/pylabrobot/storage/agilent/__init__.py b/pylabrobot/storage/agilent/__init__.py new file mode 100644 index 00000000000..7daaee28ae1 --- /dev/null +++ b/pylabrobot/storage/agilent/__init__.py @@ -0,0 +1,35 @@ +from .benchcel import BenchCel4R +from .benchcel_backend import ( + AXIS_GRIPPER, + AXIS_NAMES, + AXIS_THETA, + AXIS_X, + AXIS_Z, + RIGHT_TEACHPOINT_ID, + TEST_LEFT_TEACHPOINT, + TEST_LEFT_TEACHPOINT_ID, + AxisBoundsResponse, + BenchCel4RBackend, + BenchCelBackend, + BenchCelDeviceError, + BenchCelProtocolError, + BenchCelTimeoutError, + CurrentPositionResponse, + Frame, + GeneralStatus, + SensorStatus, + Teachpoint, +) +from .benchcel_labware import ( + DEVICE_PAYLOAD_LENGTH, + BenchCelLabwareSettings, + PlateNotchSettings, + apply_benchcel_labware_settings, + benchcel_labware_summary_row, + calculate_benchcel_labware_settings, + calculate_robot_gripper_offset, + calculate_sensor_offset, + calculate_stacker_gripper_offset, + calculate_stacking_thickness, +) +from .stacks import benchcel_4r_stacks diff --git a/pylabrobot/storage/agilent/benchcel.py b/pylabrobot/storage/agilent/benchcel.py new file mode 100644 index 00000000000..e651293cc02 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel.py @@ -0,0 +1,90 @@ +"""Factory for the Agilent BenchCel 4R storage device.""" + +from __future__ import annotations + +from typing import List, Optional, Union + +from pylabrobot.resources import Coordinate, Plate +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.storage.stacker import Stacker + +from .benchcel_backend import BenchCel4RBackend +from .benchcel_labware import BenchCelLabwareSettings, resolve_benchcel_labware_settings +from .stacks import benchcel_4r_stacks + + +def BenchCel4R( + name: str, + host: str, + *, + port: int = BenchCel4RBackend.DEFAULT_PORT, + timeout: float = 30.0, + read_poll_timeout: float = 0.25, + loading_tray_teachpoint_id: Optional[int] = None, + source_ip: Optional[str] = None, + backend: Optional[BenchCel4RBackend] = None, + stacks: Optional[List[ResourceStack]] = None, + labware: Optional[Union[Plate, BenchCelLabwareSettings, dict]] = None, + loading_tray_location: Optional[Coordinate] = None, + size_x: float = 0.0, + size_y: float = 0.0, + size_z: float = 0.0, +) -> Stacker: + """Construct an Agilent BenchCel 4R as a PLR :class:`~pylabrobot.storage.Stacker`. + + The BenchCel is a sequential ("stacking access") storage device: each of its four stackers is a + single-ended LIFO stack of plates. It is therefore modelled with the ``Stacker`` capability + rather than the random-access ``Incubator``. The generated ``ResourceStack`` stacks track the + expected plate order/content of each stacker; their height follows each plate's + ``stacking_z_height``. + + Args: + name: Resource name for the returned :class:`~pylabrobot.storage.Stacker`. Like all PLR + resources, the BenchCel needs a unique name for the resource tree, serialization, and + lookups (consistent with other device factories such as ``MicroSpin(...)``). + host: IP address or DNS name of the BenchCel Ethernet interface. + loading_tray_teachpoint_id: Teachpoint target ID used as the transfer point by + ``downstack``/``upstack``. The BenchCel has no fixed loading position; this must be a + teachpoint taught on the device. Transfers raise unless it is set here or passed per call. + There is no default because an unset/wrong teachpoint can send the arm to a home-like pose. + stacks: Optionally provide custom ``ResourceStack`` stacks; defaults to four generic stacks. + loading_tray_location: Cosmetic only. The ``Coordinate`` of the ``Stacker.loading_tray`` + resource used for the resource tree and visualization. It does NOT drive any motion -- the + real transfer position is determined entirely by ``loading_tray_teachpoint_id`` on the + device. + """ + labware_settings = resolve_benchcel_labware_settings(labware) if labware is not None else None + + if backend is None: + backend = BenchCel4RBackend( + host=host, + port=port, + timeout=timeout, + read_poll_timeout=read_poll_timeout, + loading_tray_teachpoint_id=loading_tray_teachpoint_id, + source_ip=source_ip, + labware=labware_settings, + ) + elif labware_settings is not None: + existing_labware = backend.labware_settings + if existing_labware is not None and existing_labware != labware_settings: + raise ValueError( + "BenchCel4R backend labware " + f"{existing_labware.name!r} does not match factory labware " + f"{labware_settings.name!r}" + ) + backend.labware_settings = labware_settings + + if stacks is None: + stacks = benchcel_4r_stacks() + + return Stacker( + backend=backend, + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + stacks=stacks, + loading_tray_location=loading_tray_location or Coordinate.zero(), + model="Agilent BenchCel 4R", + ) diff --git a/pylabrobot/storage/agilent/benchcel_backend.py b/pylabrobot/storage/agilent/benchcel_backend.py new file mode 100644 index 00000000000..18047ac890f --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_backend.py @@ -0,0 +1,1283 @@ +"""Backend for the Agilent BenchCel 4R microplate handler. + +The BenchCel exposes a binary TCP protocol on port 7612 by default. This module +implements the command set reverse-engineered from Agilent VWorks packet +captures and live tests against a BenchCel / 4-stacker system running firmware +3.2.20.0. + +This is NOT vendor protocol documentation. The public Agilent Quick Guide +(G5400-90003) documents operation, safety, stacker clamps/shelves, labware +requirements, and diagnostic workflows, but not the Ethernet wire protocol. + +Protocol summary +---------------- +Every application frame observed so far has the shape:: + + [1 byte command_id][2 byte little-endian payload_length][payload] + +Host -> BenchCel commands implemented here: + +* ``0x47`` home motors. Live tests showed this drops the TCP session while the + device homes, then accepts connections again when homing is complete. +* ``0x48`` home. +* ``0x60`` / ``0x61`` stacker load / unload. These operate the stacker + mechanism, not the robot grippers, and are what the VWorks "Load"/"Unload" + buttons emit (confirmed from captures). ``0x60`` payload is ``01 ``; + ``0x61`` payload is ``01 00 00 00 00``. Their ``0x69`` ACK echoes the + command id and the stacker index, e.g. ``60 `` / ``61 ``. +* ``0x62`` / ``0x63`` pick/downstack and place/upstack for stackers or + teachpoints. Stackers are target IDs ``0x00``..``0x03``. Captures confirm the + VWorks "Downstack" button emits a single ``0x62`` and "Upstack" a single + ``0x63``, each with payload ``01 00 01`` (so ``0x62``/``0x63`` ARE + the VWorks downstack/upstack tasks when the target is a stacker; with a + teachpoint target they are a plain pick/place at that taught position). +* ``0x65`` move to stacker/teachpoint. +* ``0x66`` relative jog: axis 0 theta, 1 X, 2 Z, 3 robot gripper. +* ``0x67`` open/close pneumatic stacker grippers/clamps. These are diagnostics + and can drop plates if used when a stack is unsupported. +* ``0x6a`` full-open/full-close robot grippers. +* ``0x73`` save teachpoint. Captures did not show a command-specific ACK. +* ``0x7e`` stacker sensor query. +* ``0x85`` current-position read. Live tests showed the selector is ignored; + this is not a stored teachpoint reader. +* ``0x87`` general/arm status query. The decoded float32 fields are theta, X, + Z, and robot gripper position at offsets 4, 12, 20, and 28. +* ``0x99`` axis bounds query. Live tests decoded this as theta/X/Z/gripper + min/max limits, not as stored teachpoint data. + +Device errors are returned as ``0x02`` frames containing an ASCII message, for +example ``"X position out of bounds"``. Successful motion commands return a +``0x69`` ACK after motion is complete. Plate load/unload ACKs include the +stacker index in addition to the command ID. + +Safety +------ +Keep the robot and stacker area clear, make sure E-stop/power-off is available, +and ensure VWorks or any other control client is disconnected before using this +backend. The BenchCel appears to allow only one effective control client at a +time; if another client owns the session, connections may be accepted and then +immediately closed. + +Manual notes from Agilent G5400-90003A that matter for automation: + +* The pendant has a red robot-disable button that cuts power to the motors. +* Compressed air drives the stacker-head mechanisms; air must be on for normal + operation and for rack install/removal workflows. +* Stacker clamps/grippers hold/release the bottom plate at the rack base. Opening + clamps can release/drop a plate stack. The manual says clamps normally open and + close automatically during loading, unloading, downstacking, and stacking; use + manual open/close only for diagnostics/recovery. +* Stacker shelves temporarily support/level the stack during downstack/upstack. + Retracting shelves can drop plates. We have not exposed a shelf command because + the captured command is not yet confidently mapped. +* Labware should be ANSI/SBS-compatible. The BenchCel typically grips plates + about 5-10 mm above the bottom, between the top of the plate and the skirt. + Deep lids/flexible skirts can be problematic. +""" + +from __future__ import annotations + +import asyncio +import dataclasses +import logging +import struct +from typing import Callable, List, Optional, Tuple, Union + +from pylabrobot.io.socket import Socket +from pylabrobot.resources import Plate +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.storage.stacker_backend import StackerBackend + +from .benchcel_labware import BenchCelLabwareSettings, resolve_benchcel_labware_settings + +logger = logging.getLogger(__name__) + +# Stackers use target IDs 0x00..0x03 (same as zero-based stacker index). The +# right teachpoint captured from VWorks used 0x1e. Live tests confirmed that +# teachpoint slots can be written standalone with command 0x73; an undefined +# teachpoint slot may instead send the arm to a home-like position. +RIGHT_TEACHPOINT_ID = 0x1E +TEST_LEFT_TEACHPOINT_ID = 0x1F + + +class BenchCelProtocolError(RuntimeError): + """Raised when the BenchCel sends malformed or unexpected protocol data.""" + + +class BenchCelTimeoutError(TimeoutError): + """Raised when an expected BenchCel frame is not received in time.""" + + +class BenchCelDeviceError(RuntimeError): + """Raised when the BenchCel returns a ``0x02`` error frame. + + Attributes: + message: The decoded ASCII error string returned by the BenchCel. + frame: The raw error frame. + """ + + def __init__(self, message: str, frame: "Frame") -> None: + super().__init__(message) + self.message = message + self.frame = frame + + +@dataclasses.dataclass(frozen=True) +class Frame: + """One application-level BenchCel protocol frame.""" + + command_id: int + payload: bytes = b"" + + @property + def length(self) -> int: + return len(self.payload) + + def to_bytes(self) -> bytes: + """Serialize frame as ``[cmd][uint16le length][payload]``.""" + return make_frame(self.command_id, self.payload) + + def hex(self) -> str: + """Return full serialized frame bytes as lowercase hex.""" + return self.to_bytes().hex() + + def __str__(self) -> str: + return f"Frame(cmd=0x{self.command_id:02x}, len={self.length}, payload={self.payload.hex()})" + + +@dataclasses.dataclass(frozen=True) +class SensorStatus: + """Decoded ``0x7e`` stacker sensor/status response. + + Fields are named according to the reverse-engineered interpretation. The four + notch sensor names A-D are arbitrary labels until the physical sensor positions + are mapped. ``plate_presence`` is analog-ish; observed empty stackers were + around 0-1 and stackers with plates around 116-129. + """ + + stacker: int + stacker_index: int + constant_08: int + air_pressure: int + notch_sensor_a: int + notch_sensor_b: int + unknown_a: int + plate_presence: int + unknown_b: int + notch_sensor_c: int + notch_sensor_d: int + raw_payload: bytes + + def plate_present(self, threshold: int = 50) -> bool: + """Return a rough plate-present boolean from the analog presence value.""" + return self.plate_presence >= threshold + + def notch_values(self) -> Tuple[int, int, int, int]: + """Return the four binary notch sensor fields as currently mapped.""" + return ( + self.notch_sensor_a, + self.notch_sensor_b, + self.notch_sensor_c, + self.notch_sensor_d, + ) + + +@dataclasses.dataclass(frozen=True) +class ArmStatus: + """Partially decoded ``0x87`` general status response. + + The remaining bytes are still unknown and preserved in ``raw_payload``. + """ + + theta: float + x: float + z: float + gripper: float + raw_payload: bytes + + +@dataclasses.dataclass(frozen=True) +class GeneralStatus: + """``0x87`` general status response.""" + + raw_payload: bytes + arm_status: Optional[ArmStatus] = None + + +@dataclasses.dataclass(frozen=True) +class Teachpoint: + """Numeric teachpoint data for command ``0x73``. + + The human-readable name is metadata only and is not serialized. It did not + appear in the VWorks packet captures. + """ + + theta: float + x: float + z: float + approach_height: float + cavity_depth: float + gripper_open_limit: float + respect_approach_height_when_not_holding_plate: bool + something_above_this_point: bool + teachpoint_id: int = TEST_LEFT_TEACHPOINT_ID + name: Optional[str] = None + + +TEST_LEFT_TEACHPOINT = Teachpoint( + theta=89.99874114990234, + x=-360.8802795410156, + z=-10.0, + approach_height=20.0, + cavity_depth=0.0, + gripper_open_limit=-1.5, + respect_approach_height_when_not_holding_plate=True, + something_above_this_point=False, + teachpoint_id=TEST_LEFT_TEACHPOINT_ID, + name="test-left", +) + + +@dataclasses.dataclass(frozen=True) +class AxisBoundsResponse: + """Decoded response to command ``0x99`` (axis min/max travel limits).""" + + theta_min: float + x_min: float + z_min: float + gripper_min: float + theta_max: float + x_max: float + z_max: float + gripper_max: float + raw_payload: bytes + float_values: Tuple[float, ...] + + +@dataclasses.dataclass(frozen=True) +class CurrentPositionResponse: + """Response to command ``0x85``. + + Live tests showed the selector byte is ignored and the response is the current + arm position/config payload, not a stored teachpoint reader. Raw bytes are + preserved because only the status ``0x87`` layout is currently decoded. + """ + + selector: int + raw_payload: bytes + + +# --------------------------------------------------------------------------- +# Low-level protocol helpers +# --------------------------------------------------------------------------- + + +def make_frame(command_id: int, payload: bytes = b"") -> bytes: + """Build a BenchCel protocol frame.""" + if not 0 <= command_id <= 0xFF: + raise ValueError(f"command_id must fit in one byte, got {command_id!r}") + if len(payload) > 0xFFFF: + raise ValueError(f"payload too large: {len(payload)} bytes") + return bytes([command_id]) + len(payload).to_bytes(2, "little") + payload + + +def parse_frame_from_buffer(buffer: bytearray) -> Optional[Frame]: + """Parse one complete frame from the front of ``buffer``, if available. + + TCP packet boundaries are not protocol boundaries. This function removes one + full frame from ``buffer`` only when the complete payload is available. + """ + if len(buffer) < 3: + return None + command_id = buffer[0] + length = int.from_bytes(buffer[1:3], "little") + total = 3 + length + if len(buffer) < total: + return None + payload = bytes(buffer[3:total]) + del buffer[:total] + return Frame(command_id, payload) + + +def split_frames(data: bytes) -> List[Frame]: + """Split a byte string containing one or more complete frames.""" + buffer = bytearray(data) + frames: List[Frame] = [] + while buffer: + frame = parse_frame_from_buffer(buffer) + if frame is None: + raise BenchCelProtocolError(f"partial/truncated frame data: {bytes(buffer).hex()}") + frames.append(frame) + return frames + + +def _u16le(payload: bytes, offset: int) -> int: + return int.from_bytes(payload[offset : offset + 2], "little") + + +def _f32le(payload: bytes, offset: int) -> float: + return float(struct.unpack(" int: + """Validate human 1-based stacker number and return zero-based protocol index.""" + if stacker not in (1, 2, 3, 4): + raise ValueError(f"stacker must be 1, 2, 3, or 4; got {stacker!r}") + return stacker - 1 + + +def _target_id(target_id: int) -> int: + if not 0 <= target_id <= 0xFF: + raise ValueError(f"target_id must fit in one byte, got {target_id!r}") + return target_id + + +# Command IDs from VWorks captures/live tests. The backend methods construct +# frames directly, using private payload helpers for nontrivial binary layouts. +CMD_ERROR = 0x02 +CMD_HOME_MOTORS = 0x47 +CMD_HOME = 0x48 +CMD_LOAD_PLATE = 0x60 +CMD_UNLOAD_PLATE = 0x61 +CMD_PICK = 0x62 +CMD_PLACE = 0x63 +CMD_MOVE_TO_TARGET = 0x65 +CMD_JOG = 0x66 +CMD_STACKER_GRIPPER = 0x67 +CMD_ROBOT_GRIPPER = 0x6A +CMD_ACK = 0x69 +CMD_SAVE_TEACHPOINT = 0x73 +CMD_SET_LABWARE = 0x7D +CMD_SENSOR_STATUS = 0x7E +CMD_CURRENT_POSITION = 0x85 +CMD_GENERAL_STATUS = 0x87 +CMD_SETTINGS_COMMIT = 0x9F +CMD_AXIS_BOUNDS = 0x99 + + +def _target_payload(target_id: int) -> bytes: + """Shared payload for pick/place target commands.""" + return bytes([0x01, _target_id(target_id), 0x00, 0x01]) + + +def _move_to_target_payload(target_id: int, approach_height: float) -> bytes: + """Payload for command ``0x65`` (move to stacker/teachpoint target).""" + return struct.pack(" bytes: + """Payload for command ``0x73`` (save teachpoint).""" + if not 0 <= teachpoint.teachpoint_id <= 0xFF: + raise ValueError(f"teachpoint_id must fit in one byte, got {teachpoint.teachpoint_id!r}") + return struct.pack( + " str: + """Parse a ``0x02`` device error frame and return the ASCII message.""" + if frame.command_id != CMD_ERROR: + raise BenchCelProtocolError(f"not an error frame: {frame}") + return frame.payload.decode("ascii", errors="replace") + + +def parse_ack_frame(frame: Frame) -> int: + """Parse a standard ``0x69`` command completion ACK and return the command id.""" + if frame.command_id != CMD_ACK or len(frame.payload) < 1: + raise BenchCelProtocolError(f"not an ACK frame: {frame}") + return frame.payload[0] + + +def parse_sensor_response(frame: Frame) -> SensorStatus: + """Parse a ``0x7e`` / 18-byte stacker sensor response.""" + if frame.command_id != CMD_SENSOR_STATUS: + raise BenchCelProtocolError(f"expected 0x7e sensor frame, got {frame}") + if len(frame.payload) != 18: + raise BenchCelProtocolError( + f"expected 18-byte sensor payload, got {len(frame.payload)}: {frame}" + ) + + p = frame.payload + stacker_index = p[0] + if stacker_index not in (0, 1, 2, 3): + raise BenchCelProtocolError(f"unexpected stacker index in sensor payload: {stacker_index}") + + return SensorStatus( + stacker=stacker_index + 1, + stacker_index=stacker_index, + constant_08=p[1], + air_pressure=_u16le(p, 2), + notch_sensor_a=_u16le(p, 4), + notch_sensor_b=_u16le(p, 6), + unknown_a=_u16le(p, 8), + plate_presence=_u16le(p, 10), + unknown_b=_u16le(p, 12), + notch_sensor_c=_u16le(p, 14), + notch_sensor_d=_u16le(p, 16), + raw_payload=p, + ) + + +def parse_arm_status_from_87_payload(payload: bytes) -> ArmStatus: + """Decode known arm-position fields from a 66-byte ``0x87`` payload.""" + if len(payload) != 66: + raise BenchCelProtocolError(f"expected 66-byte 0x87 payload, got {len(payload)} bytes") + return ArmStatus( + theta=_f32le(payload, 4), + x=_f32le(payload, 12), + z=_f32le(payload, 20), + gripper=_f32le(payload, 28), + raw_payload=payload, + ) + + +def parse_general_status_response(frame: Frame) -> GeneralStatus: + """Parse the ``0x87`` general status response.""" + if frame.command_id != CMD_GENERAL_STATUS: + raise BenchCelProtocolError(f"expected 0x87 general status frame, got {frame}") + arm_status = parse_arm_status_from_87_payload(frame.payload) if len(frame.payload) == 66 else None + return GeneralStatus(raw_payload=frame.payload, arm_status=arm_status) + + +def parse_axis_bounds_response(frame: Frame) -> AxisBoundsResponse: + """Parse ``0x99`` response into per-axis min/max travel limits.""" + if frame.command_id != CMD_AXIS_BOUNDS: + raise BenchCelProtocolError(f"expected 0x99 axis bounds response, got {frame}") + if len(frame.payload) != 32: + raise BenchCelProtocolError(f"expected 32-byte 0x99 payload, got {len(frame.payload)}") + f = struct.unpack("<8f", frame.payload) + return AxisBoundsResponse( + theta_min=f[0], + x_min=f[1], + z_min=f[2], + gripper_min=f[3], + theta_max=f[4], + x_max=f[5], + z_max=f[6], + gripper_max=f[7], + raw_payload=frame.payload, + float_values=f, + ) + + +def parse_current_position_response(frame: Frame, *, selector: int = 1) -> CurrentPositionResponse: + """Preserve the observed ``0x85`` response as raw bytes.""" + if frame.command_id != CMD_CURRENT_POSITION: + raise BenchCelProtocolError(f"expected 0x85 current-position response, got {frame}") + return CurrentPositionResponse(selector=selector, raw_payload=frame.payload) + + +# --------------------------------------------------------------------------- +# Backend +# --------------------------------------------------------------------------- + + +class _BenchCelSocket(Socket): + """Socket variant that can bind to a specific local/source IP.""" + + def __init__(self, *args, source_ip: Optional[str] = None, **kwargs): + super().__init__(*args, **kwargs) + self._source_ip = source_ip + + async def _connect(self): + local_addr = (self._source_ip, 0) if self._source_ip is not None else None + self._reader, self._writer = await asyncio.open_connection( + host=self._host, + port=self._port, + ssl=self._ssl_context, + server_hostname=self._server_hostname, + local_addr=local_addr, + ) + + +class BenchCel4RBackend(StackerBackend): + """Asynchronous backend for an Agilent BenchCel 4R microplate handler. + + The BenchCel is a sequential ("stacking access") storage device: each of its four stackers is a + single-ended LIFO stack of plates. It is therefore modelled with the :class:`Stacker` + capability (not the random-access :class:`~pylabrobot.storage.Incubator`), and this backend + implements the :class:`~pylabrobot.storage.stacker_backend.StackerBackend` transfers + ``downstack``/``upstack`` on top of the device's robot pick/place primitives. + """ + + DEFAULT_PORT = 7612 + NUM_STACKERS = 4 + + def __init__( + self, + host: str, + port: int = DEFAULT_PORT, + timeout: float = 30.0, + read_poll_timeout: float = 0.25, + loading_tray_teachpoint_id: Optional[int] = None, + source_ip: Optional[str] = None, + labware: Optional[Union[Plate, BenchCelLabwareSettings, dict]] = None, + ): + """ + Args: + host: IP address or DNS name of the BenchCel Ethernet interface. + port: TCP port. Defaults to 7612, as observed in VWorks captures. + timeout: Default command timeout in seconds. + read_poll_timeout: Per-read timeout used while assembling framed replies. + loading_tray_teachpoint_id: Teachpoint target ID used as the transfer + (loading/unloading) point by :meth:`downstack` and :meth:`upstack`. + There is no fixed loading position on the + BenchCel: the transfer point is a teachpoint you taught in VWorks (or + with :meth:`save_teachpoint`). This is intentionally not defaulted -- + transfers raise unless a teachpoint is configured here or passed per + call via ``teachpoint_id``, because an unset/wrong teachpoint can send + the arm to a home-like pose. The captured VWorks right teachpoint was + ``0x1e``, but do not rely on that without verifying it on your device. + source_ip: Optional local/source IP to bind, useful on hosts with multiple + network interfaces connected to different subnets. + labware: Optional PLR plate, calculated settings object, or serialized + settings dict. The device must still be configured with matching VWorks + labware settings; this value is used for PLR metadata, serialization, + and validation. + """ + super().__init__() + self.host = host + self.port = port + self.timeout = timeout + self.read_poll_timeout = read_poll_timeout + self.loading_tray_teachpoint_id = loading_tray_teachpoint_id + self.source_ip = source_ip + self.labware_settings = ( + resolve_benchcel_labware_settings(labware) if labware is not None else None + ) + self.io = _BenchCelSocket( + human_readable_device_name="Agilent BenchCel 4R", + host=host, + port=port, + read_timeout=read_poll_timeout, + write_timeout=timeout, + source_ip=source_ip, + ) + self._lock = asyncio.Lock() + self._rx_buffer = bytearray() + + async def setup(self) -> None: + """Open the TCP connection to the BenchCel.""" + logger.debug("[benchcel] connecting to %s:%d", self.host, self.port) + await asyncio.wait_for(self.io.setup(), timeout=self.timeout) + + async def stop(self) -> None: + """Close the TCP connection. Safe to call even if never set up.""" + await self.io.stop() + self._rx_buffer.clear() + + def serialize(self) -> dict: + """Return a JSON-serialisable view of this backend's construction args.""" + return { + **super().serialize(), + "host": self.host, + "port": self.port, + "timeout": self.timeout, + "read_poll_timeout": self.read_poll_timeout, + "loading_tray_teachpoint_id": self.loading_tray_teachpoint_id, + "source_ip": self.source_ip, + "labware": self.labware_settings.to_dict() if self.labware_settings is not None else None, + } + + # ------------------------------------------------------------------ wire IO + + async def _write_frame(self, frame: Frame, *, timeout: Optional[float] = None) -> None: + data = frame.to_bytes() + logger.debug("[benchcel] >>> %s raw=%s", frame, data.hex()) + await self.io.write(data, timeout=self.timeout if timeout is None else timeout) + + async def _read_frame(self, *, timeout: Optional[float] = None) -> Frame: + effective_timeout = self.timeout if timeout is None else timeout + loop = asyncio.get_running_loop() + deadline = loop.time() + effective_timeout + + while True: + frame = parse_frame_from_buffer(self._rx_buffer) + if frame is not None: + logger.debug("[benchcel] <<< %s raw=%s", frame, frame.hex()) + return frame + + remaining = deadline - loop.time() + if remaining <= 0: + raise BenchCelTimeoutError(f"timed out after {effective_timeout}s waiting for frame") + + try: + chunk = await self.io.read(4096, timeout=min(self.read_poll_timeout, remaining)) + except TimeoutError: + continue + if not chunk: + raise BenchCelProtocolError("socket closed while waiting for frame") + logger.debug("[benchcel] <<< chunk %d bytes: %s", len(chunk), chunk.hex()) + self._rx_buffer.extend(chunk) + + async def _read_until( + self, + predicate: Callable[[Frame], bool], + *, + timeout: Optional[float] = None, + ) -> Frame: + effective_timeout = self.timeout if timeout is None else timeout + loop = asyncio.get_running_loop() + deadline = loop.time() + effective_timeout + while True: + remaining = deadline - loop.time() + if remaining <= 0: + raise BenchCelTimeoutError( + f"timed out after {effective_timeout}s waiting for matching frame" + ) + frame = await self._read_frame(timeout=remaining) + if frame.command_id == CMD_ERROR: + raise BenchCelDeviceError(parse_error_frame(frame), frame) + if predicate(frame): + return frame + + async def _wait_for_ack_payload( + self, + ack_payload: bytes, + *, + timeout: Optional[float] = None, + ) -> Frame: + """Wait for an exact ``0x69`` ACK payload or raise on device error.""" + return await self._read_until( + lambda f: f.command_id == CMD_ACK and f.payload == ack_payload, + timeout=timeout, + ) + + async def _wait_for_command_ack( + self, + command_id: int, + *, + timeout: Optional[float] = None, + ) -> Frame: + return await self._wait_for_ack_payload(bytes([command_id]), timeout=timeout) + + async def _send_frame_expect_ack_no_lock( + self, + frame: Frame, + *, + ack_payload: Optional[bytes] = None, + timeout: Optional[float] = None, + ) -> Frame: + await self._write_frame(frame, timeout=timeout) + if ack_payload is None: + return await self._wait_for_command_ack(frame.command_id, timeout=timeout) + return await self._wait_for_ack_payload(ack_payload, timeout=timeout) + + async def send_frame( + self, + frame: Frame, + *, + ack_payload: Optional[bytes] = None, + timeout: Optional[float] = None, + ) -> Frame: + """Send one frame and wait for its completion ACK. + + Most motion commands ACK with ``69 01 00 ``. Plate load/unload + ACKs include the stacker index too; pass that exact payload via + ``ack_payload`` for those commands. + """ + async with self._lock: + return await self._send_frame_expect_ack_no_lock( + frame, + ack_payload=ack_payload, + timeout=timeout, + ) + + # --------------------------------------------------------------- movements + + async def home_motors(self, *, timeout: float = 90.0, reconnect: bool = True) -> bool: + """Send VWorks ``home motors`` (``0x47``) and wait for the device to recover. + + Live testing showed this command drops the TCP control session while the + motors home. If ``reconnect=True`` (default), this method reconnects and + polls ``0x87`` status until the device responds again. + """ + async with self._lock: + cmd = Frame(CMD_HOME_MOTORS, b"\x01") + try: + await self._write_frame(cmd, timeout=min(self.timeout, 5.0)) + except OSError: + pass + + try: + await self._wait_for_command_ack(cmd.command_id, timeout=5.0) + if not reconnect: + return True + except (BenchCelProtocolError, BenchCelTimeoutError, OSError, ConnectionError): + pass + + if not reconnect: + return True + + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + try: + await self.io.stop() + except Exception: # pragma: no cover - defensive cleanup + pass + self._rx_buffer.clear() + try: + await asyncio.wait_for(self.io.setup(), timeout=min(self.timeout, 5.0)) + await self._write_frame(Frame(CMD_GENERAL_STATUS), timeout=2.0) + await self._read_until(lambda f: f.command_id == CMD_GENERAL_STATUS, timeout=2.0) + return True + except ( + BenchCelProtocolError, + BenchCelTimeoutError, + TimeoutError, + OSError, + ConnectionError, + asyncio.TimeoutError, + ): + await asyncio.sleep(2.0) + raise BenchCelTimeoutError(f"home-motors: device not responsive within {timeout}s") + + async def home(self, *, timeout: float = 15.0) -> Frame: + """Send the home command and wait for completion.""" + return await self.send_frame(Frame(CMD_HOME, b"\x01"), timeout=timeout) + + async def move_to_stacker(self, stacker: int, *, timeout: float = 20.0) -> Frame: + """Move the arm to stacker 1, 2, 3, or 4.""" + return await self.move_to_target(_stacker_index(stacker), timeout=timeout) + + async def move_to_target( + self, + target_id: int, + *, + approach_height: float = 0.0, + timeout: float = 20.0, + ) -> Frame: + """Move to a one-byte BenchCel target id using command ``0x65``.""" + return await self.send_frame( + Frame(CMD_MOVE_TO_TARGET, _move_to_target_payload(target_id, approach_height)), + timeout=timeout, + ) + + async def move_to_teachpoint( + self, + teachpoint_id: int, + *, + approach_height: float = 20.0, + timeout: float = 20.0, + ) -> Frame: + """Move to a teachpoint target id using command ``0x65``.""" + return await self.move_to_target( + teachpoint_id, + approach_height=approach_height, + timeout=timeout, + ) + + async def move_to_right_teachpoint( + self, + *, + approach_height: float = 20.0, + timeout: float = 20.0, + ) -> Frame: + """Move to the captured right teachpoint target id ``0x1e``.""" + return await self.move_to_teachpoint( + RIGHT_TEACHPOINT_ID, + approach_height=approach_height, + timeout=timeout, + ) + + async def pick_plate_from_target( + self, + target_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Pick/downstack a plate from a target id using command ``0x62``.""" + return await self.send_frame( + Frame(CMD_PICK, _target_payload(target_id)), + timeout=timeout, + ) + + async def place_plate_at_target( + self, + target_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Place/upstack a plate at a target id using command ``0x63``.""" + return await self.send_frame( + Frame(CMD_PLACE, _target_payload(target_id)), + timeout=timeout, + ) + + async def pick_plate_from_teachpoint( + self, + teachpoint_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Pick a plate from a teachpoint target id using command ``0x62``.""" + return await self.pick_plate_from_target(teachpoint_id, timeout=timeout) + + async def place_plate_at_teachpoint( + self, + teachpoint_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Place a plate at a teachpoint target id using command ``0x63``.""" + return await self.place_plate_at_target(teachpoint_id, timeout=timeout) + + async def pick_plate_from_right_teachpoint( + self, + *, + timeout: float = 30.0, + ) -> Frame: + """Pick a plate from the captured right teachpoint target id ``0x1e``.""" + return await self.pick_plate_from_teachpoint( + RIGHT_TEACHPOINT_ID, + timeout=timeout, + ) + + async def place_plate_at_right_teachpoint( + self, + *, + timeout: float = 30.0, + ) -> Frame: + """Place a plate at the captured right teachpoint target id ``0x1e``.""" + return await self.place_plate_at_teachpoint( + RIGHT_TEACHPOINT_ID, + timeout=timeout, + ) + + async def load_stacker(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Send the ``0x60`` stacker load command for stacker 1-4. + + Confirmed from VWorks captures: pressing "Load" emits a single ``0x60`` with + payload ``01 `` and the device replies ``0x69`` ``60 ``. + This operates the whole-stacker mechanism, not the robot grippers, and is + distinct from :meth:`downstack_plate`/:meth:`upstack_plate` (the + ``0x62``/``0x63`` per-plate robot pick/place). + """ + stacker_index = _stacker_index(stacker) + cmd = Frame(CMD_LOAD_PLATE, bytes([0x01, stacker_index])) + return await self.send_frame( + cmd, + ack_payload=bytes([cmd.command_id, stacker_index]), + timeout=timeout, + ) + + async def unload_stacker(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Send the ``0x61`` stacker unload command for stacker 1-4. + + Confirmed from VWorks captures: pressing "Unload" emits a single ``0x61`` + with payload ``01 00 00 00 00`` and the device replies + ``0x69`` ``61 ``. Like :meth:`load_stacker` this drives the + whole-stacker mechanism, not the robot grippers. + """ + stacker_index = _stacker_index(stacker) + cmd = Frame(CMD_UNLOAD_PLATE, bytes([0x01, stacker_index]) + b"\x00\x00\x00\x00") + return await self.send_frame( + cmd, + ack_payload=bytes([cmd.command_id, stacker_index]), + timeout=timeout, + ) + + async def dangerously_open_stacker_grippers( + self, + stacker: int, + *, + timeout: float = 15.0, + ) -> Frame: + """Open pneumatic stacker grippers/clamps using command ``0x67``. + + Caution: this diagnostic command can release/drop a plate stack if it is not + physically supported. These are stacker clamps, not the robot grippers. + """ + payload = bytes([_stacker_index(stacker), 0x01]) + return await self.send_frame( + Frame(CMD_STACKER_GRIPPER, payload), + timeout=timeout, + ) + + async def close_stacker_grippers( + self, + stacker: int, + *, + timeout: float = 15.0, + ) -> Frame: + """Close pneumatic stacker grippers/clamps using command ``0x67``.""" + payload = bytes([_stacker_index(stacker), 0x00]) + return await self.send_frame( + Frame(CMD_STACKER_GRIPPER, payload), + timeout=timeout, + ) + + async def downstack_plate(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Pick/downstack one plate from stacker 1-4. + + Equivalent to the VWorks "Downstack" task: confirmed from captures to emit a + single ``0x62`` with payload ``01 00 01``. + """ + return await self.pick_plate_from_target(_stacker_index(stacker), timeout=timeout) + + async def upstack_plate(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Place/upstack one plate to stacker 1-4. + + Equivalent to the VWorks "Upstack" task: confirmed from captures to emit a + single ``0x63`` with payload ``01 00 01``. + """ + return await self.place_plate_at_target(_stacker_index(stacker), timeout=timeout) + + async def jog(self, axis: int, delta: float, *, timeout: float = 10.0) -> Frame: + """Send a relative jog command on one axis and wait for ACK/error.""" + if axis not in AXIS_NAMES: + raise ValueError(f"axis must be one of {sorted(AXIS_NAMES)}, got {axis!r}") + return await self.send_frame( + Frame(CMD_JOG, struct.pack(" Frame: + """Relative theta jog. Positive is CCW/left in observed tests.""" + return await self.jog(AXIS_THETA, delta_degrees, timeout=timeout) + + async def move_x(self, delta_mm: float, *, timeout: float = 10.0) -> Frame: + """Relative X jog. Positive is right in observed tests.""" + return await self.jog(AXIS_X, delta_mm, timeout=timeout) + + async def move_z(self, delta_mm: float, *, timeout: float = 10.0) -> Frame: + """Relative Z jog. Positive is up in observed tests.""" + return await self.jog(AXIS_Z, delta_mm, timeout=timeout) + + async def move_gripper_relative( + self, + delta: float, + *, + timeout: float = 10.0, + ) -> Frame: + """Relative robot-gripper jog in internal units. Positive closes grippers.""" + return await self.jog(AXIS_GRIPPER, delta, timeout=timeout) + + async def fully_close_grippers(self, *, timeout: float = 10.0) -> Frame: + """Fully close robot grippers using command ``0x6a``.""" + return await self.send_frame(Frame(CMD_ROBOT_GRIPPER, b"\x00"), timeout=timeout) + + async def fully_open_grippers(self, *, timeout: float = 10.0) -> Frame: + """Fully open robot grippers using command ``0x6a``.""" + return await self.send_frame(Frame(CMD_ROBOT_GRIPPER, b"\x01"), timeout=timeout) + + async def save_teachpoint( + self, + teachpoint: Teachpoint, + *, + expect_ack: bool = False, + timeout: float = 5.0, + ) -> Frame: + """Send ``0x73`` save-teachpoint. + + Captures did not show a command-specific ACK after ``0x73``, so + ``expect_ack`` defaults to ``False`` and the sent frame is returned after + writing. The device cannot read teachpoints back; keep a record of the + numeric teachpoints you write in your own protocol/config if you need them. + """ + cmd = Frame(CMD_SAVE_TEACHPOINT, _teachpoint_payload(teachpoint)) + async with self._lock: + await self._write_frame(cmd, timeout=timeout) + if expect_ack: + return await self._wait_for_command_ack(cmd.command_id, timeout=timeout) + return cmd + + async def save_test_left_teachpoint( + self, + *, + expect_ack: bool = False, + timeout: float = 5.0, + ) -> Frame: + """Send exactly the captured numeric payload for teachpoint ``test-left``.""" + return await self.save_teachpoint( + TEST_LEFT_TEACHPOINT, + expect_ack=expect_ack, + timeout=timeout, + ) + + async def move_plate_between_stackers( + self, + source_stacker: int, + destination_stacker: int, + *, + open_grippers_first: bool = True, + timeout: float = 30.0, + ) -> None: + """Move one plate from the source stacker to the destination stacker.""" + async with self._lock: + if open_grippers_first: + await self._send_frame_expect_ack_no_lock( + Frame(CMD_ROBOT_GRIPPER, b"\x01"), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PICK, _target_payload(_stacker_index(source_stacker))), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PLACE, _target_payload(_stacker_index(destination_stacker))), + timeout=timeout, + ) + + # --------------------------------------------------------------- labware config + + async def set_labware( + self, + labware: Union[Plate, BenchCelLabwareSettings, dict], + *, + timeout: float = 10.0, + ) -> BenchCelLabwareSettings: + """Push labware geometry to the BenchCel using the ``0x7d`` settings command. + + VWorks sends the labware settings as a 77-byte ``0x7d`` frame followed by an + empty ``0x9f`` commit, which the device echoes back. Invalid geometry (for + example, gripper hold positions that are too close) is rejected with a + ``0x02`` error. ``labware`` may be a PLR :class:`~pylabrobot.resources.Plate` + (settings are calculated from its dimensions), a + :class:`~pylabrobot.storage.agilent.benchcel_labware.BenchCelLabwareSettings` + object, or a serialized settings dict. + + On success, the resolved settings are stored on ``self.labware_settings`` and + returned. + """ + settings = resolve_benchcel_labware_settings(labware) + payload = settings.to_device_payload() + async with self._lock: + await self._write_frame(Frame(CMD_SET_LABWARE, payload), timeout=timeout) + await self._write_frame(Frame(CMD_SETTINGS_COMMIT), timeout=timeout) + # The device replies with a 0x9f commit echo. On invalid geometry it first + # sends a 0x02 error, then still echoes 0x9f; consume through the echo so + # the stream is not left out of sync, and raise the error afterwards. + error: Optional[str] = None + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while True: + remaining = deadline - loop.time() + if remaining <= 0: + raise BenchCelTimeoutError(f"timed out after {timeout}s waiting for 0x9f settings commit") + frame = await self._read_frame(timeout=remaining) + if frame.command_id == CMD_ERROR: + error = parse_error_frame(frame) + elif frame.command_id == CMD_SETTINGS_COMMIT: + break + if error is not None: + raise BenchCelDeviceError(error, Frame(CMD_ERROR, error.encode("ascii", "replace"))) + self.labware_settings = settings + return settings + + # --------------------------------------------------------------- status APIs + + async def request_stacker_sensors(self, stacker: int, *, timeout: float = 5.0) -> SensorStatus: + """Query and decode one stacker's sensor/status frame.""" + expected_index = _stacker_index(stacker) + query = Frame(CMD_SENSOR_STATUS, bytes([expected_index])) + + def is_matching_sensor_response(frame: Frame) -> bool: + return ( + frame.command_id == CMD_SENSOR_STATUS + and len(frame.payload) == 18 + and frame.payload[0] == expected_index + ) + + async with self._lock: + await self._write_frame(query, timeout=timeout) + frame = await self._read_until(is_matching_sensor_response, timeout=timeout) + return parse_sensor_response(frame) + + async def request_all_stacker_sensors( + self, + *, + timeout_per_stacker: float = 5.0, + ) -> List[SensorStatus]: + """Query and decode all four stacker sensor/status frames.""" + sensors: List[SensorStatus] = [] + for stacker in (1, 2, 3, 4): + sensors.append(await self.request_stacker_sensors(stacker, timeout=timeout_per_stacker)) + return sensors + + async def request_general_status(self, *, timeout: float = 5.0) -> GeneralStatus: + """Send ``87 00 00`` and return decoded/raw general status.""" + async with self._lock: + await self._write_frame(Frame(CMD_GENERAL_STATUS), timeout=timeout) + frame = await self._read_until( + lambda f: f.command_id == CMD_GENERAL_STATUS, + timeout=timeout, + ) + return parse_general_status_response(frame) + + async def request_arm_status(self, *, timeout: float = 5.0) -> ArmStatus: + """Send ``87 00 00`` and return decoded theta/X/Z/gripper fields.""" + status = await self.request_general_status(timeout=timeout) + if status.arm_status is None: + raise BenchCelProtocolError( + f"0x87 response did not contain decoded 66-byte arm status: len={len(status.raw_payload)}" + ) + return status.arm_status + + async def request_axis_bounds(self, *, timeout: float = 5.0) -> AxisBoundsResponse: + """Send ``0x99`` query and parse per-axis min/max travel limits.""" + async with self._lock: + await self._write_frame(Frame(CMD_AXIS_BOUNDS), timeout=timeout) + frame = await self._read_until(lambda f: f.command_id == CMD_AXIS_BOUNDS, timeout=timeout) + return parse_axis_bounds_response(frame) + + async def request_current_position( + self, + selector: int = 1, + *, + timeout: float = 5.0, + ) -> CurrentPositionResponse: + """Send ``0x85`` query and return the raw response. + + The selector is preserved for diagnostics, but live tests showed it is + ignored by the device. + """ + if not 0 <= selector <= 0xFF: + raise ValueError(f"selector must fit in one byte, got {selector!r}") + async with self._lock: + await self._write_frame(Frame(CMD_CURRENT_POSITION, bytes([selector])), timeout=timeout) + frame = await self._read_until( + lambda f: f.command_id == CMD_CURRENT_POSITION, + timeout=timeout, + ) + return parse_current_position_response(frame, selector=selector) + + async def vworks_style_idle_poll_once( + self, + *, + timeout_per_response: float = 5.0, + ) -> Tuple[List[SensorStatus], GeneralStatus]: + """Perform one VWorks-like idle polling cycle.""" + sensors = await self.request_all_stacker_sensors(timeout_per_stacker=timeout_per_response) + general = await self.request_general_status(timeout=timeout_per_response) + return sensors, general + + # ---------------------------------------------------------- StackerBackend API + + def _stack_to_stacker(self, stack: ResourceStack) -> int: + """Map a configured ``ResourceStack`` to its human stacker number (1-4).""" + stack_names = [s.name for s in self.stacks] + try: + return stack_names.index(stack.name) + 1 + except ValueError as exc: + raise ValueError(f"Stack {stack.name!r} is not configured on this BenchCel") from exc + + def _resolve_loading_tray_target(self, teachpoint_id: Optional[int]) -> int: + """Return the teachpoint target for a transfer, or raise if none configured.""" + target = self.loading_tray_teachpoint_id if teachpoint_id is None else teachpoint_id + if target is None: + raise ValueError( + "No BenchCel loading/transfer teachpoint configured. The BenchCel has no " + "fixed loading position; set loading_tray_teachpoint_id on the backend/factory " + "or pass teachpoint_id=... to this call. Make sure the teachpoint is taught on " + "the device (VWorks or save_teachpoint) first." + ) + if not 0 <= target <= 0xFF: + raise ValueError(f"teachpoint_id must fit in one byte, got {target!r}") + return target + + async def downstack( + self, + stack: ResourceStack, + *, + teachpoint_id: Optional[int] = None, + open_grippers_first: bool = True, + timeout: float = 30.0, + **backend_kwargs, + ) -> None: + """Move the accessible plate of ``stack`` to the loading teachpoint. + + The BenchCel firmware addresses whole stackers; ``stack`` selects which configured stacker + (1-4) to downstack from. This is the device half of :meth:`Stacker.downstack`: a robot pick + from the stacker (``0x62``) followed by a place at the transfer teachpoint (``0x63``). + """ + _ = backend_kwargs + source_stacker = self._stack_to_stacker(stack) + destination_target = self._resolve_loading_tray_target(teachpoint_id) + async with self._lock: + if open_grippers_first: + await self._send_frame_expect_ack_no_lock( + Frame(CMD_ROBOT_GRIPPER, b"\x01"), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PICK, _target_payload(_stacker_index(source_stacker))), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PLACE, _target_payload(destination_target)), + timeout=timeout, + ) + + async def upstack( + self, + stack: ResourceStack, + plate: Plate, + *, + teachpoint_id: Optional[int] = None, + open_grippers_first: bool = True, + timeout: float = 30.0, + **backend_kwargs, + ) -> None: + """Move a plate from the loading teachpoint onto ``stack``. + + The device half of :meth:`Stacker.upstack`: a robot pick from the transfer teachpoint + (``0x62``) followed by a place onto the selected stacker (``0x63``). ``plate`` is accepted for + interface symmetry and PLR state; the firmware only needs the destination stacker. + """ + _ = (plate, backend_kwargs) + source_target = self._resolve_loading_tray_target(teachpoint_id) + destination_stacker = self._stack_to_stacker(stack) + async with self._lock: + if open_grippers_first: + await self._send_frame_expect_ack_no_lock( + Frame(CMD_ROBOT_GRIPPER, b"\x01"), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PICK, _target_payload(source_target)), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PLACE, _target_payload(_stacker_index(destination_stacker))), + timeout=timeout, + ) + + +# Short alias for users who do not need the configuration-specific name. +BenchCelBackend = BenchCel4RBackend diff --git a/pylabrobot/storage/agilent/benchcel_labware.py b/pylabrobot/storage/agilent/benchcel_labware.py new file mode 100644 index 00000000000..985590b6ee3 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_labware.py @@ -0,0 +1,602 @@ +"""BenchCel labware calculation and PyLabRobot plate integration helpers. + +BenchCel/VWorks labware settings separate three geometry concepts that are easy +to conflate: + +* ``StackingThickness`` is the vertical pitch of plates in a nested stack. +* PLR ``Plate.size_z`` is the full outside height of one plate. +* ``RobotGripperOffset`` is the BenchCel robot gripper contact height measured + from the bottom of the plate. + +This module does not bundle per-catalog BenchCel XML profiles. Instead, it +calculates a conservative BenchCel geometry profile from a PLR plate resource and +allows explicit overrides for values that cannot be inferred from dimensions +alone, such as optical sensor thresholds. +""" + +from __future__ import annotations + +import dataclasses +import struct +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Optional, Union + +from pylabrobot.resources import Coordinate, Plate + +DEVICE_PAYLOAD_LENGTH = 77 +DEFAULT_NESTING_OVERLAP = 1.5 +DEFAULT_MIN_ROBOT_GRIPPER_OFFSET = 5.0 +DEFAULT_MAX_ROBOT_GRIPPER_OFFSET = 8.0 +DEFAULT_MIN_PICKUP_DISTANCE_FROM_TOP = 5.4 +DEFAULT_LOW_PROFILE_HEIGHT_CUTOFF = 11.5 +DEFAULT_TALL_PLATE_HEIGHT_CUTOFF = 30.0 +DEFAULT_ADDITIONAL_RELEASE_HEIGHT = 2.0 + + +@dataclasses.dataclass(frozen=True) +class PlateNotchSettings: + """Orientation-notch settings from BenchCel/VWorks labware XML or overrides.""" + + check_orientation: bool = True + a1_notch: bool = True + top_right_notch: bool = False + bottom_left_notch: bool = True + bottom_right_notch: bool = False + + +@dataclasses.dataclass(frozen=True) +class BenchCelLabwareSettings: + """BenchCel/VWorks labware settings paired with PLR plate dimensions. + + Args: + name: Human-readable labware name. + plate_size_x: Full outside plate length in mm. + plate_size_y: Full outside plate width in mm. + plate_size_z: Full outside plate height in mm. + stacking_thickness: Vertical pitch of nested plates in the BenchCel stacker. + This is usually smaller than full plate height because plates nest. + robot_gripper_offset: BenchCel robot gripper contact height from the bottom + of the plate. This maps to ``Plate.preferred_pickup_location.z``. + stacker_gripper_offset: Stacker clamp/gripper contact height from the bottom + of the plate. + """ + + name: str + plate_size_x: float + plate_size_y: float + plate_size_z: float + stacking_thickness: float + robot_gripper_offset: float + stacker_gripper_offset: float + sensor_offset: float + gripper_open_position: float = -1.0 + gripper_holding_plate_position: float = 8.0 + gripper_holding_stack_position: float = 8.5 + orientation_sensor_threshold: int = 100 + plate_presence_threshold: int = 225 + sensor_intensity: int = 50 + error_correction_offset: float = 0.0 + can_be_lidded: bool = False + lidded_plate_stacking_thickness: Optional[float] = None + lidded_plate_thickness: Optional[float] = None + lidded_plate_resting_height: Optional[float] = None + lidded_plate_gripper_offset: Optional[float] = None + lidded_plate_gripper_position: Optional[float] = None + lidded_plate_departure_height: Optional[float] = None + can_be_sealed: bool = False + sealed_plate_stacking_thickness: Optional[float] = None + sealed_plate_thickness: Optional[float] = None + stack_plate_presence_threshold: int = 50 + rack_presence_threshold: int = 3 + additional_release_height: float = DEFAULT_ADDITIONAL_RELEASE_HEIGHT + low_pressure_warning: int = 30 + tilt_margin: float = 2.0 + tilt_margin_enabled: bool = False + notch_settings: PlateNotchSettings = dataclasses.field(default_factory=PlateNotchSettings) + identifier: Optional[str] = None + source: Optional[str] = None + + @classmethod + def from_plate(cls, plate: Plate, **kwargs) -> "BenchCelLabwareSettings": + """Calculate BenchCel settings from a PLR plate resource.""" + return calculate_benchcel_labware_settings(plate, **kwargs) + + @classmethod + def from_xml_file( + cls, + path: Union[str, Path], + *, + plate_size_x: Optional[float] = None, + plate_size_y: Optional[float] = None, + plate_size_z: Optional[float] = None, + identifier: Optional[str] = None, + ) -> "BenchCelLabwareSettings": + """Parse a user-supplied BenchCel/VWorks XML file. + + XML files contain BenchCel stack/gripper values but not necessarily the full + physical plate dimensions PLR needs. Provide ``plate_size_*`` values when the + XML should be used to validate or annotate PLR plate resources. + """ + path = Path(path) + root = ET.parse(path).getroot() + labware = root.find("Labware") + if labware is None: + raise ValueError(f"BenchCel labware XML has no section: {path}") + stack = root.find("StackSettings") + notch = labware.find("PlateNotchesOrientationOptions") + + def text(parent: ET.Element, name: str) -> str: + element = parent.find(name) + if element is None or element.text is None: + raise ValueError(f"Missing <{name}> in {path}") + return element.text.strip() + + def optional_text(parent: Optional[ET.Element], name: str) -> Optional[str]: + if parent is None: + return None + element = parent.find(name) + if element is None or element.text is None: + return None + value = element.text.strip() + return value if value != "" else None + + def as_bool(value: Optional[str]) -> bool: + return value is not None and value.strip().lower() in {"yes", "true", "1", "enabled"} + + def as_optional_float(value: Optional[str]) -> Optional[float]: + return None if value is None else float(value) + + def as_optional_int(value: Optional[str]) -> Optional[int]: + return None if value is None else int(value) + + name = text(labware, "Name") + stacking_thickness = float(text(labware, "StackingThickness")) + robot_gripper_offset = float(text(labware, "RobotGripperOffset")) + stacker_gripper_offset = float(text(labware, "StackerGripperOffset")) + sensor_offset = float(text(labware, "SensorOffset")) + + # If full physical height is not supplied, use stack pitch as the best-known + # fallback. Callers should pass real dimensions for PLR integration. + px = 127.76 if plate_size_x is None else plate_size_x + py = 85.48 if plate_size_y is None else plate_size_y + pz = stacking_thickness if plate_size_z is None else plate_size_z + + tilt = stack.find("TiltMargin") if stack is not None else None + notch_settings = PlateNotchSettings() + if notch is not None: + notch_settings = PlateNotchSettings( + check_orientation=as_bool(optional_text(notch, "CheckOrientation")), + a1_notch=as_bool(optional_text(notch, "A1Notch")), + top_right_notch=as_bool(optional_text(notch, "TopRightNotch")), + bottom_left_notch=as_bool(optional_text(notch, "BottomLeftNotch")), + bottom_right_notch=as_bool(optional_text(notch, "BottomRightNotch")), + ) + + return cls( + identifier=identifier or path.stem, + name=name, + plate_size_x=px, + plate_size_y=py, + plate_size_z=pz, + stacking_thickness=stacking_thickness, + robot_gripper_offset=robot_gripper_offset, + stacker_gripper_offset=stacker_gripper_offset, + sensor_offset=sensor_offset, + gripper_open_position=float(text(labware, "GripperOpenPosition")), + gripper_holding_plate_position=float(text(labware, "GripperHoldingPlatePosition")), + gripper_holding_stack_position=float(text(labware, "GripperHoldingStackPosition")), + orientation_sensor_threshold=int(text(labware, "OrientationSensorThreshold")), + plate_presence_threshold=int(text(labware, "PlatePresenceThreshold")), + sensor_intensity=int(text(labware, "SensorIntensity")), + error_correction_offset=float(text(labware, "ErrorCorrectionOffset")), + can_be_lidded=as_bool(optional_text(labware, "CanBeLidded")), + lidded_plate_stacking_thickness=as_optional_float( + optional_text(labware, "LiddedPlateStackingThickness") + ), + lidded_plate_thickness=as_optional_float(optional_text(labware, "LiddedPlateThickness")), + lidded_plate_resting_height=as_optional_float( + optional_text(labware, "LiddedPlateRestingHeight") + ), + lidded_plate_gripper_offset=as_optional_float( + optional_text(labware, "LiddedPlateGripperOffset") + ), + lidded_plate_gripper_position=as_optional_float( + optional_text(labware, "LiddedPlateGripperPosition") + ), + lidded_plate_departure_height=as_optional_float( + optional_text(labware, "LiddedPlateDepartureHeight") + ), + can_be_sealed=as_bool(optional_text(labware, "CanBeSealed")), + sealed_plate_stacking_thickness=as_optional_float( + optional_text(labware, "SealedPlateStackingThickness") + ), + sealed_plate_thickness=as_optional_float(optional_text(labware, "SealedPlateThickness")), + stack_plate_presence_threshold=as_optional_int(optional_text(stack, "PlatePresenceThreshold")) + or 50, + rack_presence_threshold=as_optional_int(optional_text(stack, "RackPresenceThreshold")) or 3, + additional_release_height=as_optional_float(optional_text(stack, "AdditionalReleaseHeight")) + or DEFAULT_ADDITIONAL_RELEASE_HEIGHT, + low_pressure_warning=as_optional_int(optional_text(stack, "LowPressureWarning")) or 30, + tilt_margin=as_optional_float(tilt.text.strip() if tilt is not None and tilt.text else None) + or 2.0, + tilt_margin_enabled=as_bool(tilt.get("Enabled") if tilt is not None else None), + notch_settings=notch_settings, + source=str(path), + ) + + @classmethod + def from_dict(cls, data: dict) -> "BenchCelLabwareSettings": + """Deserialize settings from :meth:`to_dict`.""" + data = dict(data) + notch = data.get("notch_settings") + if isinstance(notch, dict): + data["notch_settings"] = PlateNotchSettings(**notch) + return cls(**data) + + def to_dict(self) -> dict: + """Return JSON-serialisable settings.""" + return dataclasses.asdict(self) + + def effective_stacking_thickness(self, *, sealed: bool = False, lidded: bool = False) -> float: + """Return the BenchCel stack pitch for the selected labware state.""" + sealed_pitch = self.sealed_plate_stacking_thickness + if sealed and sealed_pitch not in (None, 0): + return float(sealed_pitch) # type: ignore[arg-type] + lidded_pitch = self.lidded_plate_stacking_thickness + if lidded and lidded_pitch not in (None, 0): + return float(lidded_pitch) # type: ignore[arg-type] + return self.stacking_thickness + + def effective_plate_height(self, *, sealed: bool = False, lidded: bool = False) -> float: + """Return full outside plate height for PLR rack/site sizing.""" + sealed_height = self.sealed_plate_thickness + if sealed and sealed_height not in (None, 0): + return float(sealed_height) # type: ignore[arg-type] + lidded_height = self.lidded_plate_thickness + if lidded and lidded_height not in (None, 0): + return float(lidded_height) # type: ignore[arg-type] + return self.plate_size_z + + def robot_pickup_distance_from_top(self, *, sealed: bool = False, lidded: bool = False) -> float: + """Return PLR ``pickup_distance_from_top`` implied by ``RobotGripperOffset``.""" + return self.effective_plate_height(sealed=sealed, lidded=lidded) - self.robot_gripper_offset + + def preferred_pickup_location(self, plate: Plate) -> Coordinate: + """Return a PLR preferred pickup location using the BenchCel robot offset.""" + return Coordinate( + x=plate.get_size_x() / 2, + y=plate.get_size_y() / 2, + z=self.robot_gripper_offset, + ) + + def dimension_differences(self, plate: Plate) -> dict[str, float]: + """Return profile minus plate-resource dimensions for each axis.""" + return { + "x": self.plate_size_x - plate.get_size_x(), + "y": self.plate_size_y - plate.get_size_y(), + "z": self.plate_size_z - plate.get_size_z(), + } + + def validate_plate_dimensions(self, plate: Plate, *, tolerance_mm: float = 0.25) -> None: + """Raise if a PLR plate resource differs too far from this BenchCel profile.""" + differences = self.dimension_differences(plate) + failures = [ + f"{axis}: expected {expected:.3f} mm, got {actual:.3f} mm" + for axis, expected, actual in ( + ("x", self.plate_size_x, plate.get_size_x()), + ("y", self.plate_size_y, plate.get_size_y()), + ("z", self.plate_size_z, plate.get_size_z()), + ) + if abs(differences[axis]) > tolerance_mm + ] + if failures: + raise ValueError( + f"Plate {plate.name!r} does not match BenchCel labware {self.name!r}: " + + "; ".join(failures) + ) + + def apply_to_plate( + self, + plate: Plate, + *, + validate_dimensions: bool = True, + tolerance_mm: float = 0.25, + ) -> Plate: + """Set PLR pickup metadata on ``plate`` using this BenchCel profile.""" + if validate_dimensions: + self.validate_plate_dimensions(plate, tolerance_mm=tolerance_mm) + plate.preferred_pickup_location = self.preferred_pickup_location(plate) + return plate + + def to_device_payload(self) -> bytes: + """Encode the 77-byte ``0x7d`` BenchCel labware-settings payload. + + The layout was reverse-engineered from VWorks packet captures and validated + byte-for-byte against several real plates. Fields that were always zero in + the captures for standard flat microplates (``ErrorCorrectionOffset`` and the + lidded/sealed sub-fields) are sent as zero and are not yet mapped. + """ + if not 0 <= int(self.orientation_sensor_threshold) <= 0xFFFF: + raise ValueError("orientation_sensor_threshold must fit in uint16") + if not 0 <= int(self.sensor_intensity) <= 0xFFFF: + raise ValueError("sensor_intensity must fit in uint16") + if not 0 <= int(self.plate_presence_threshold) <= 0xFFFF: + raise ValueError("plate_presence_threshold must fit in uint16") + + notch = self.notch_settings + payload = bytearray(DEVICE_PAYLOAD_LENGTH) + struct.pack_into(" "BenchCelLabwareSettings": + """Decode a 77-byte ``0x7d`` payload back into settings. + + Only the confidently-mapped fields are recovered; unmapped lidded/sealed + fields stay at their defaults. + """ + if len(payload) != DEVICE_PAYLOAD_LENGTH: + raise ValueError(f"expected {DEVICE_PAYLOAD_LENGTH}-byte 0x7d payload, got {len(payload)}") + + def fp(o: int) -> float: + return float(struct.unpack_from(" int: + return int(struct.unpack_from(" float: + """Estimate BenchCel ``StackingThickness`` from full plate height. + + The default overlap (1.5 mm) matches the supplied example XML/dimension pairs + within about 0.2 mm. Override this for labware with unusual nesting behavior. + """ + if plate_height <= 0: + raise ValueError(f"plate_height must be positive, got {plate_height}") + if nesting_overlap < 0: + raise ValueError(f"nesting_overlap must be non-negative, got {nesting_overlap}") + if nesting_overlap >= plate_height: + raise ValueError( + f"nesting_overlap ({nesting_overlap}) must be smaller than plate_height ({plate_height})" + ) + return plate_height - nesting_overlap + + +def calculate_robot_gripper_offset( + plate_height: float, + *, + min_offset: float = DEFAULT_MIN_ROBOT_GRIPPER_OFFSET, + max_offset: float = DEFAULT_MAX_ROBOT_GRIPPER_OFFSET, + min_pickup_distance_from_top: float = DEFAULT_MIN_PICKUP_DISTANCE_FROM_TOP, +) -> float: + """Estimate BenchCel ``RobotGripperOffset`` from plate height. + + The BenchCel manual says plates are typically gripped 5-10 mm above the bottom. + The default calculation keeps at least ~5.4 mm above the grip point where + possible while capping the grip height at 8 mm from the bottom. + """ + if plate_height <= 0: + raise ValueError(f"plate_height must be positive, got {plate_height}") + if min_offset > max_offset: + raise ValueError("min_offset must be <= max_offset") + return max(min_offset, min(max_offset, plate_height - min_pickup_distance_from_top)) + + +def calculate_stacker_gripper_offset( + plate_height: float, + robot_gripper_offset: float, + *, + low_profile_height_cutoff: float = DEFAULT_LOW_PROFILE_HEIGHT_CUTOFF, + tall_plate_height_cutoff: float = DEFAULT_TALL_PLATE_HEIGHT_CUTOFF, +) -> float: + """Estimate BenchCel ``StackerGripperOffset`` from plate height. + + This is a heuristic: low plates need clamps lower, very tall plates can use a + slightly higher clamp point, and standard SBS microplates sit in between. + """ + if plate_height <= low_profile_height_cutoff: + return min(robot_gripper_offset, 4.0) + if plate_height >= tall_plate_height_cutoff: + return min(robot_gripper_offset, 6.0) + return min(robot_gripper_offset, 5.0) + + +def calculate_sensor_offset( + plate_height: float, + *, + low_profile_height_cutoff: float = DEFAULT_LOW_PROFILE_HEIGHT_CUTOFF, + tall_plate_height_cutoff: float = DEFAULT_TALL_PLATE_HEIGHT_CUTOFF, +) -> float: + """Estimate BenchCel ``SensorOffset`` from plate height.""" + if plate_height <= low_profile_height_cutoff: + return 7.0 + if plate_height >= tall_plate_height_cutoff: + return max(7.0, plate_height - 4.0) + return 8.0 + + +def calculate_benchcel_labware_settings( + plate: Plate, + *, + name: Optional[str] = None, + identifier: Optional[str] = None, + nesting_overlap: float = DEFAULT_NESTING_OVERLAP, + stacking_thickness: Optional[float] = None, + robot_gripper_offset: Optional[float] = None, + stacker_gripper_offset: Optional[float] = None, + sensor_offset: Optional[float] = None, + orientation_sensor_threshold: int = 100, + plate_presence_threshold: int = 225, + sensor_intensity: int = 50, + error_correction_offset: float = 0.0, + gripper_open_position: float = -1.0, + gripper_holding_plate_position: float = 8.0, + gripper_holding_stack_position: float = 8.5, + can_be_lidded: Optional[bool] = None, + can_be_sealed: bool = False, + sealed_plate_stacking_thickness: Optional[float] = None, + sealed_plate_thickness: Optional[float] = None, + notch_settings: Optional[PlateNotchSettings] = None, +) -> BenchCelLabwareSettings: + """Calculate BenchCel labware settings from a PLR plate resource. + + Geometry fields are calculated from ``plate``. Optical sensor thresholds and + notch options cannot be reliably inferred from dimensions, so they are exposed + as optional overrides with conservative defaults. + """ + height = plate.get_size_z() + robot_offset = robot_gripper_offset + if robot_offset is None: + robot_offset = calculate_robot_gripper_offset(height) + stacker_offset = stacker_gripper_offset + if stacker_offset is None: + stacker_offset = calculate_stacker_gripper_offset(height, robot_offset) + sensor = sensor_offset + if sensor is None: + sensor = calculate_sensor_offset(height) + + # BenchCel ``StackingThickness`` is the per-plate vertical pitch of a nested stack, which is + # exactly PLR ``Plate.stacking_z_height``. Prefer an explicit override, then the plate's own + # declared pitch, and only estimate from height (``size_z - nesting_overlap``) as a last resort. + resolved_stacking_thickness = stacking_thickness + if resolved_stacking_thickness is None: + resolved_stacking_thickness = plate.stacking_z_height + if resolved_stacking_thickness is None: + resolved_stacking_thickness = calculate_stacking_thickness( + height, nesting_overlap=nesting_overlap + ) + + return BenchCelLabwareSettings( + identifier=identifier, + name=name or plate.model or plate.name, + plate_size_x=plate.get_size_x(), + plate_size_y=plate.get_size_y(), + plate_size_z=height, + stacking_thickness=resolved_stacking_thickness, + robot_gripper_offset=robot_offset, + stacker_gripper_offset=stacker_offset, + sensor_offset=sensor, + gripper_open_position=gripper_open_position, + gripper_holding_plate_position=gripper_holding_plate_position, + gripper_holding_stack_position=gripper_holding_stack_position, + orientation_sensor_threshold=orientation_sensor_threshold, + plate_presence_threshold=plate_presence_threshold, + sensor_intensity=sensor_intensity, + error_correction_offset=error_correction_offset, + can_be_lidded=plate.has_lid() if can_be_lidded is None else can_be_lidded, + can_be_sealed=can_be_sealed, + sealed_plate_stacking_thickness=sealed_plate_stacking_thickness, + sealed_plate_thickness=sealed_plate_thickness, + notch_settings=notch_settings or PlateNotchSettings(), + source="calculated from PLR plate dimensions", + ) + + +def resolve_benchcel_labware_settings( + labware: Union[Plate, BenchCelLabwareSettings, dict], +) -> BenchCelLabwareSettings: + """Resolve a PLR plate, settings object, or serialized settings dict.""" + if isinstance(labware, BenchCelLabwareSettings): + return labware + if isinstance(labware, Plate): + return calculate_benchcel_labware_settings(labware) + if isinstance(labware, dict): + return BenchCelLabwareSettings.from_dict(labware) + raise TypeError( + "labware must be a Plate, BenchCelLabwareSettings, or serialized settings dict; " + f"got {type(labware).__name__}" + ) + + +def apply_benchcel_labware_settings( + plate: Plate, + labware: Optional[Union[BenchCelLabwareSettings, dict]] = None, + *, + validate_dimensions: bool = True, + tolerance_mm: float = 0.25, + **calculation_kwargs, +) -> BenchCelLabwareSettings: + """Apply BenchCel pickup metadata to ``plate`` and return the settings used. + + If ``labware`` is omitted, settings are calculated from the PLR plate + dimensions using :func:`calculate_benchcel_labware_settings`. + """ + settings = ( + calculate_benchcel_labware_settings(plate, **calculation_kwargs) + if labware is None + else resolve_benchcel_labware_settings(labware) + ) + settings.apply_to_plate( + plate, + validate_dimensions=validate_dimensions, + tolerance_mm=tolerance_mm, + ) + return settings + + +def benchcel_labware_summary_row(settings: BenchCelLabwareSettings) -> dict: + """Return one summary row useful for docs/tests/diagnostics.""" + height = settings.effective_plate_height() + return { + "name": settings.name, + "plate_height": height, + "stacking_thickness": settings.stacking_thickness, + "nesting_overlap": height - settings.stacking_thickness, + "robot_gripper_offset": settings.robot_gripper_offset, + "pickup_distance_from_top": settings.robot_pickup_distance_from_top(), + "stacker_gripper_offset": settings.stacker_gripper_offset, + "sensor_offset": settings.sensor_offset, + } diff --git a/pylabrobot/storage/agilent/benchcel_mock_server.py b/pylabrobot/storage/agilent/benchcel_mock_server.py new file mode 100644 index 00000000000..495287253c7 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_mock_server.py @@ -0,0 +1,437 @@ +"""In-process mock server for the Agilent BenchCel 4R TCP protocol.""" + +from __future__ import annotations + +import argparse +import asyncio +import dataclasses +import logging +import struct +from typing import Dict, Optional + +from .benchcel_backend import ( + AXIS_GRIPPER, + AXIS_THETA, + AXIS_X, + AXIS_Z, + CMD_ACK, + CMD_AXIS_BOUNDS, + CMD_CURRENT_POSITION, + CMD_ERROR, + CMD_GENERAL_STATUS, + CMD_HOME, + CMD_HOME_MOTORS, + CMD_JOG, + CMD_LOAD_PLATE, + CMD_MOVE_TO_TARGET, + CMD_PICK, + CMD_PLACE, + CMD_ROBOT_GRIPPER, + CMD_SAVE_TEACHPOINT, + CMD_SENSOR_STATUS, + CMD_SET_LABWARE, + CMD_SETTINGS_COMMIT, + CMD_STACKER_GRIPPER, + CMD_UNLOAD_PLATE, + AxisBoundsResponse, + Frame, + Teachpoint, + make_frame, + parse_frame_from_buffer, +) +from .benchcel_labware import DEVICE_PAYLOAD_LENGTH, BenchCelLabwareSettings + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class _Pose: + theta: float = 0.0 + x: float = 0.0 + z: float = 10.0 + gripper: float = -1.0 + + +_DEFAULT_BOUNDS = AxisBoundsResponse( + theta_min=-115.0, + x_min=-360.9, + z_min=-1.5, + gripper_min=-1.5, + theta_max=115.0, + x_max=360.9, + z_max=104.0, + gripper_max=11.0, + raw_payload=b"", + float_values=(-115.0, -360.9, -1.5, -1.5, 115.0, 360.9, 104.0, 11.0), +) + + +class BenchCelMockServer: + """Small asyncio TCP server emulating the BenchCel binary protocol. + + The mock is wire-compatible for the commands implemented by + :class:`~pylabrobot.storage.agilent.benchcel_backend.BenchCel4RBackend` and is + intended for backend tests and manual protocol debugging. + """ + + def __init__( + self, + host: str = "127.0.0.1", + port: int = 0, + *, + close_on_home_motors: bool = True, + ): + self.host = host + self.port = port + self.close_on_home_motors = close_on_home_motors + self._server: Optional[asyncio.AbstractServer] = None + self._pose = _Pose() + self._bounds = _DEFAULT_BOUNDS + self._teachpoints: Dict[int, Teachpoint] = {} + self._plate_in_gripper = False + self.plate_presence = [0, 1, 128, 118] + self.air_pressure = [56, 56, 45, 47] + self.stacker_grippers_open = [False, False, False, False] + self.labware: Optional[BenchCelLabwareSettings] = None + self.received_frames: list[Frame] = [] + + async def __aenter__(self) -> "BenchCelMockServer": + await self.start() + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.stop() + + async def start(self) -> None: + """Start accepting TCP connections.""" + if self._server is not None: + return + self._server = await asyncio.start_server(self._handle_client, host=self.host, port=self.port) + sockets = list(self._server.sockets or []) + if sockets: + self.port = sockets[0].getsockname()[1] + logger.info("BenchCelMockServer listening on %s:%d", self.host, self.port) + + async def stop(self) -> None: + """Stop the server and wait for its listening socket to close.""" + if self._server is None: + return + self._server.close() + await self._server.wait_closed() + self._server = None + + async def serve_forever(self) -> None: + """Run until cancelled.""" + if self._server is None: + await self.start() + assert self._server is not None + await self._server.serve_forever() + + async def _handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + buffer = bytearray() + try: + while not reader.at_eof(): + chunk = await reader.read(4096) + if not chunk: + break + buffer.extend(chunk) + while True: + frame = parse_frame_from_buffer(buffer) + if frame is None: + break + self.received_frames.append(frame) + close_after_response = await self._handle_frame(frame, writer) + await writer.drain() + if close_after_response: + writer.close() + await writer.wait_closed() + return + finally: + if not writer.is_closing(): + writer.close() + await writer.wait_closed() + + async def _handle_frame(self, frame: Frame, writer: asyncio.StreamWriter) -> bool: + """Handle one incoming frame. Return True to close the TCP session.""" + if frame.command_id == CMD_HOME_MOTORS: + self._pose = _Pose() + if not self.close_on_home_motors: + self._write_ack(writer, CMD_HOME_MOTORS) + return self.close_on_home_motors + + if frame.command_id == CMD_HOME: + self._pose = _Pose() + self._write_ack(writer, CMD_HOME) + return False + + if frame.command_id == CMD_LOAD_PLATE: + stacker = self._read_stacker_payload(frame, writer) + if stacker is not None: + self._write_ack(writer, CMD_LOAD_PLATE, stacker) + return False + + if frame.command_id == CMD_UNLOAD_PLATE: + stacker = self._read_stacker_payload(frame, writer) + if stacker is not None: + self._write_ack(writer, CMD_UNLOAD_PLATE, stacker) + return False + + if frame.command_id in (CMD_PICK, CMD_PLACE): + target_id = self._read_target_payload(frame, writer) + if target_id is None: + return False + self._move_to_target_id(target_id, approach_height=0.0) + self._plate_in_gripper = frame.command_id == CMD_PICK + self._pose.gripper = 5.0 if self._plate_in_gripper else -1.0 + self._write_ack(writer, frame.command_id) + return False + + if frame.command_id == CMD_MOVE_TO_TARGET: + if len(frame.payload) != 10: + self._write_error(writer, "Malformed move-to-target payload") + return False + _, target_id, _, approach_height = struct.unpack(" Optional[int]: + try: + return self._validate_stacker_payload(frame.payload) + except ValueError as exc: + self._write_error(writer, str(exc)) + return None + + def _read_target_payload(self, frame: Frame, writer: asyncio.StreamWriter) -> Optional[int]: + try: + return self._validate_target_payload(frame.payload) + except ValueError as exc: + self._write_error(writer, str(exc)) + return None + + def _write_ack(self, writer: asyncio.StreamWriter, command_id: int, *extra: int) -> None: + writer.write(make_frame(CMD_ACK, bytes([command_id, *extra]))) + + def _write_error(self, writer: asyncio.StreamWriter, message: str) -> None: + writer.write(make_frame(CMD_ERROR, message.encode("ascii", errors="replace"))) + + @staticmethod + def _validate_stacker_payload(payload: bytes) -> int: + if len(payload) < 2 or payload[0] != 0x01 or payload[1] not in (0, 1, 2, 3): + raise ValueError(f"invalid stacker payload: {payload.hex()}") + return payload[1] + + @staticmethod + def _validate_target_payload(payload: bytes) -> int: + if len(payload) != 4 or payload[0] != 0x01 or payload[2:] != b"\x00\x01": + raise ValueError(f"invalid target payload: {payload.hex()}") + return payload[1] + + def _handle_jog(self, frame: Frame, writer: asyncio.StreamWriter) -> None: + if len(frame.payload) != 5: + self._write_error(writer, "Malformed jog payload") + return + axis, delta = struct.unpack(" None: + if len(frame.payload) != 27: + self._write_error(writer, "Malformed save-teachpoint payload") + return + values = struct.unpack(" None: + if len(frame.payload) != DEVICE_PAYLOAD_LENGTH: + self._write_error(writer, "Malformed labware settings payload") + return + settings = BenchCelLabwareSettings.from_device_payload(frame.payload) + # The device rejects geometry where the stack hold position is not above the + # plate hold position (observed "too close" rejections had stack <= plate). + if settings.gripper_holding_stack_position <= settings.gripper_holding_plate_position: + self._write_error(writer, "The labware gripper positions are too close") + return + self.labware = settings + + def _move_to_target_id(self, target_id: int, approach_height: float) -> None: + if target_id in (0, 1, 2, 3): + self._pose.theta = 0.0 + self._pose.x = (-270.0, -90.0, 90.0, 270.0)[target_id] + self._pose.z = max(self._bounds.z_min, min(self._bounds.z_max, approach_height)) + return + + teachpoint = self._teachpoints.get(target_id) + if teachpoint is None: + self._pose.theta = 0.0 + self._pose.x = 0.0 + self._pose.z = 10.0 + return + + self._pose.theta = teachpoint.theta + self._pose.x = teachpoint.x + self._pose.z = max( + self._bounds.z_min, + min(self._bounds.z_max, teachpoint.z + approach_height), + ) + + def _sensor_payload(self, stacker_index: int) -> bytes: + return struct.pack( + " bytes: + payload = bytearray(66) + struct.pack_into(" bytes: + payload = bytearray(33) + payload[0] = selector + struct.pack_into(" argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Run a mock Agilent BenchCel 4R TCP server") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=7612) + parser.add_argument("--verbose", action="store_true") + return parser + + +async def _amain() -> None: + args = build_arg_parser().parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + server = BenchCelMockServer(host=args.host, port=args.port) + await server.start() + print(f"BenchCelMockServer listening on {server.host}:{server.port}") + try: + await server.serve_forever() + except asyncio.CancelledError: + pass + + +def main() -> None: + asyncio.run(_amain()) + + +if __name__ == "__main__": + main() diff --git a/pylabrobot/storage/agilent/benchcel_mock_server_tests.py b/pylabrobot/storage/agilent/benchcel_mock_server_tests.py new file mode 100644 index 00000000000..fd0af953fa9 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_mock_server_tests.py @@ -0,0 +1,166 @@ +"""End-to-end BenchCel backend tests against the in-process mock server.""" + +from __future__ import annotations + +import unittest + +from pylabrobot.storage.agilent.benchcel_backend import BenchCel4RBackend, BenchCelDeviceError +from pylabrobot.storage.agilent.benchcel_mock_server import BenchCelMockServer + + +class BenchCelMockServerTests(unittest.IsolatedAsyncioTestCase): + async def test_motion_status_and_axis_bounds_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + await backend.home() + await backend.move_to_stacker(3) + status = await backend.request_arm_status() + self.assertAlmostEqual(status.x, 90.0) + self.assertAlmostEqual(status.z, 0.0) + + await backend.move_x(5.0) + status = await backend.request_arm_status() + self.assertAlmostEqual(status.x, 95.0) + + bounds = await backend.request_axis_bounds() + self.assertAlmostEqual(bounds.theta_min, -115.0) + self.assertAlmostEqual(bounds.x_max, 360.9, places=3) + finally: + await backend.stop() + + async def test_stacker_sensor_query_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + sensors = await backend.request_all_stacker_sensors() + self.assertEqual([s.stacker for s in sensors], [1, 2, 3, 4]) + self.assertEqual(sensors[2].plate_presence, 128) + self.assertTrue(sensors[2].plate_present()) + finally: + await backend.stop() + + async def test_jog_out_of_bounds_raises_device_error(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + with self.assertRaises(BenchCelDeviceError) as cm: + await backend.move_x(500.0) + self.assertEqual(cm.exception.message, "X position out of bounds") + finally: + await backend.stop() + + async def test_teachpoint_save_then_move_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + await backend.save_test_left_teachpoint() + await backend.move_to_teachpoint(0x1F, approach_height=20.0) + status = await backend.request_arm_status() + self.assertAlmostEqual(status.theta, 89.99874114990234, places=3) + self.assertAlmostEqual(status.x, -360.8802795410156, places=3) + self.assertAlmostEqual(status.z, 10.0) + finally: + await backend.stop() + + async def test_stacker_gripper_diagnostic_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + await backend.dangerously_open_stacker_grippers(1) + self.assertTrue(server.stacker_grippers_open[0]) + await backend.close_stacker_grippers(1) + self.assertFalse(server.stacker_grippers_open[0]) + finally: + await backend.stop() + + async def test_set_labware_over_tcp(self): + from pylabrobot.resources.plate import Plate + + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + plate = Plate("p", size_x=127.76, size_y=85.48, size_z=14.4, ordered_items={}) + settings = await backend.set_labware(plate) + self.assertAlmostEqual(settings.stacking_thickness, 12.9) + self.assertIs(backend.labware_settings, settings) + assert server.labware is not None + self.assertAlmostEqual(server.labware.plate_size_z, 14.4, places=3) + # A second call still works (stream stays in sync after the commit echo). + await backend.set_labware(settings) + finally: + await backend.stop() + + async def test_set_labware_rejects_too_close_gripper_positions(self): + from pylabrobot.storage.agilent import BenchCelLabwareSettings, PlateNotchSettings + + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + bad = BenchCelLabwareSettings( + name="bad", + plate_size_x=127.76, + plate_size_y=85.48, + plate_size_z=14.4, + stacking_thickness=12.9, + robot_gripper_offset=8.0, + stacker_gripper_offset=5.0, + sensor_offset=8.0, + gripper_holding_plate_position=8.0, + gripper_holding_stack_position=8.0, # not above plate -> rejected + notch_settings=PlateNotchSettings(), + ) + with self.assertRaises(BenchCelDeviceError) as cm: + await backend.set_labware(bad) + self.assertEqual(cm.exception.message, "The labware gripper positions are too close") + # The connection is still usable after a rejection. + status = await backend.request_arm_status() + self.assertIsNotNone(status) + finally: + await backend.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/storage/agilent/benchcel_tests.py b/pylabrobot/storage/agilent/benchcel_tests.py new file mode 100644 index 00000000000..eb03e788b47 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_tests.py @@ -0,0 +1,500 @@ +import asyncio +import struct +import tempfile +import unittest +from pathlib import Path + +from pylabrobot.resources import Coordinate +from pylabrobot.resources.plate import Plate +from pylabrobot.resources.resource_stack import ResourceStack +from pylabrobot.storage.agilent import ( + BenchCel4R, + BenchCelLabwareSettings, + PlateNotchSettings, + apply_benchcel_labware_settings, + calculate_benchcel_labware_settings, + calculate_robot_gripper_offset, + calculate_sensor_offset, + calculate_stacker_gripper_offset, + calculate_stacking_thickness, +) +from pylabrobot.storage.agilent.benchcel_backend import ( + TEST_LEFT_TEACHPOINT, + BenchCel4RBackend, + BenchCelDeviceError, + BenchCelProtocolError, + Frame, + parse_arm_status_from_87_payload, + parse_frame_from_buffer, + parse_sensor_response, + split_frames, +) +from pylabrobot.storage.agilent.stacks import benchcel_4r_stacks +from pylabrobot.storage.stacker import Stacker +from pylabrobot.storage.stacker_backend import StackerBackend + + +class _FakeWriter: + def __init__(self) -> None: + self.sent = bytearray() + self.closed = False + + def write(self, data: bytes) -> None: + self.sent.extend(data) + + async def drain(self) -> None: + return None + + def close(self) -> None: + self.closed = True + + async def wait_closed(self) -> None: + return None + + +class _FakeReader: + def __init__(self, chunks: list[bytes]) -> None: + self._chunks = list(chunks) + + async def read(self, num_bytes: int) -> bytes: + if not self._chunks: + return b"" + chunk = self._chunks.pop(0) + if len(chunk) > num_bytes: + self._chunks.insert(0, chunk[num_bytes:]) + return chunk[:num_bytes] + return chunk + + +def _make_backend(chunks: list[bytes]) -> tuple[BenchCel4RBackend, _FakeWriter]: + backend = BenchCel4RBackend(host="ignored", port=0, timeout=1.0, read_poll_timeout=0.01) + writer = _FakeWriter() + backend.io._writer = writer # type: ignore[assignment] + backend.io._reader = _FakeReader(chunks) # type: ignore[assignment] + return backend, writer + + +def _sensor_payload(stacker_index: int = 2) -> bytes: + return struct.pack( + " + + + Example + 12.34 + 100 + 225 + 50 + 0.0 + 8.0 + 5.0 + 8.0 + -1.0 + 8.0 + 8.5 + + Yes + Yes + No + Yes + No + + No + No + + + 50 + 3 + 2.0 + 30 + 2.0 + + +""" + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "example.xml" + path.write_text(xml, encoding="ascii") + settings = BenchCelLabwareSettings.from_xml_file( + path, + identifier="example", + plate_size_x=127.76, + plate_size_y=85.48, + plate_size_z=14.4, + ) + self.assertEqual(settings.identifier, "example") + self.assertEqual(settings.name, "Example") + self.assertAlmostEqual(settings.stacking_thickness, 12.34) + self.assertEqual(settings.stack_plate_presence_threshold, 50) + self.assertTrue(settings.notch_settings.a1_notch) + + +class BenchCelFactoryTests(unittest.TestCase): + def setUp(self): + # Constructing the backend creates ``asyncio.Lock``/``Socket`` objects, which on Python 3.9 + # bind to the current event loop at init time; ensure one exists for these synchronous tests. + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + def tearDown(self): + asyncio.set_event_loop(None) + self._loop.close() + + def test_factory_creates_stacker_with_backend_and_four_stacks(self): + benchcel = BenchCel4R(name="bc", host="192.168.0.100") + self.assertIsInstance(benchcel, Stacker) + backend = benchcel.backend + assert isinstance(backend, BenchCel4RBackend) + self.assertEqual(backend.host, "192.168.0.100") + self.assertEqual(len(benchcel.stacks), 4) + self.assertTrue(all(isinstance(s, ResourceStack) for s in benchcel.stacks)) + self.assertEqual(benchcel.model, "Agilent BenchCel 4R") + + def test_factory_records_labware_settings(self): + plate = Plate("plate", size_x=127.76, size_y=85.47, size_z=44.04, ordered_items={}) + benchcel = BenchCel4R(name="bc", host="192.168.0.100", labware=plate) + backend = benchcel.backend + assert isinstance(backend, BenchCel4RBackend) + assert backend.labware_settings is not None + self.assertEqual(backend.labware_settings.name, "plate") + # no stacking_z_height on the plate -> estimated pitch (44.04 - 1.5) + self.assertAlmostEqual(backend.labware_settings.stacking_thickness, 42.54) + + +class BenchCelBackendWireTests(unittest.IsolatedAsyncioTestCase): + async def test_home_writes_command_and_waits_for_split_ack(self): + backend, writer = _make_backend([b"\x69", b"\x01\x00\x48"]) + ack = await backend.home() + self.assertEqual(writer.sent.hex(), "48010001") + self.assertEqual(ack, Frame(0x69, b"\x48")) + + async def test_move_to_stacker_writes_expected_frame(self): + backend, writer = _make_backend([Frame(0x69, b"\x65").to_bytes()]) + await backend.move_to_stacker(3) + self.assertEqual(writer.sent.hex(), "650a0001020000204100000000") + + async def test_save_teachpoint_writes_captured_shape(self): + backend, writer = _make_backend([]) + await backend.save_teachpoint(TEST_LEFT_TEACHPOINT) + self.assertEqual( + writer.sent.hex(), + "731b001f5bffb342ad70b4c3000020c100010000a041000000000000c0bf", + ) + + async def test_device_error_raises(self): + payload = b"X position out of bounds" + frame = Frame(0x02, payload).to_bytes() + backend, writer = _make_backend([frame]) + with self.assertRaises(BenchCelDeviceError) as cm: + await backend.move_x(500) + self.assertEqual(writer.sent.hex(), Frame(0x66, struct.pack(" 0x62 01 02 00 01 ACK 0x69 62 + # Upstack -> 0x63 01 02 00 01 ACK 0x69 63 + # Load -> 0x60 01 02 ACK 0x69 60 02 + # Unload -> 0x61 01 02 00 00 00 00 ACK 0x69 61 02 + backend, writer = _make_backend( + [ + Frame(0x69, b"\x62").to_bytes(), + Frame(0x69, b"\x63").to_bytes(), + Frame(0x69, b"\x60\x02").to_bytes(), + Frame(0x69, b"\x61\x02").to_bytes(), + ] + ) + await backend.downstack_plate(3) + await backend.upstack_plate(3) + await backend.load_stacker(3) + await backend.unload_stacker(3) + self.assertEqual( + writer.sent.hex(), + "62040001020001" # downstack stacker 3 + "63040001020001" # upstack stacker 3 + "6002000102" # load stacker 3 + "610600010200000000", # unload stacker 3 + ) + + async def test_serialize_includes_connection_info(self): + plate = Plate("plate", size_x=127.76, size_y=85.48, size_z=14.6, ordered_items={}) + backend = BenchCel4RBackend( + host="192.168.0.100", + port=7612, + timeout=12.5, + labware=plate, + ) + serialized = backend.serialize() + self.assertEqual(serialized["type"], "BenchCel4RBackend") + self.assertEqual(serialized["host"], "192.168.0.100") + self.assertEqual(serialized["timeout"], 12.5) + self.assertEqual(serialized["labware"]["name"], "plate") + self.assertAlmostEqual(serialized["labware"]["stacking_thickness"], 13.1) + deserialized = StackerBackend.deserialize(serialized.copy()) + self.assertIsInstance(deserialized, BenchCel4RBackend) + self.assertEqual(deserialized.host, "192.168.0.100") + self.assertEqual(deserialized.labware_settings.name, "plate") + + +class BenchCelStackerMappingTests(unittest.IsolatedAsyncioTestCase): + async def test_transfers_require_a_configured_teachpoint(self): + backend = BenchCel4RBackend(host="ignored") # no loading_tray_teachpoint_id + self.assertIsNone(backend.loading_tray_teachpoint_id) + stacks = benchcel_4r_stacks() + await backend.set_stacks(stacks) + plate = Plate("plate", size_x=1, size_y=1, size_z=1, ordered_items={}) + + with self.assertRaisesRegex(ValueError, "teachpoint"): + await backend.downstack(stacks[0]) + with self.assertRaisesRegex(ValueError, "teachpoint"): + await backend.upstack(stacks[0], plate) + + async def test_downstack_maps_stack_to_stacker(self): + backend = BenchCel4RBackend(host="ignored") + stacks = benchcel_4r_stacks() + await backend.set_stacks(stacks) + + sent: list[Frame] = [] + + async def fake_send(frame: Frame, **kwargs) -> Frame: + sent.append(frame) + return Frame(0x69, kwargs.get("ack_payload") or bytes([frame.command_id])) + + backend._send_frame_expect_ack_no_lock = fake_send # type: ignore[method-assign] + # stacks[2] is human stacker 3 (zero-based index 0x02) + await backend.downstack(stacks[2], teachpoint_id=0x1E) + + self.assertEqual( + [f.hex() for f in sent], + ["6a010001", "62040001020001", "630400011e0001"], + ) + + async def test_upstack_maps_stack_to_stacker(self): + backend = BenchCel4RBackend(host="ignored") + stacks = benchcel_4r_stacks() + await backend.set_stacks(stacks) + plate = Plate("plate", size_x=1, size_y=1, size_z=1, ordered_items={}) + + sent: list[Frame] = [] + + async def fake_send(frame: Frame, **kwargs) -> Frame: + sent.append(frame) + return Frame(0x69, kwargs.get("ack_payload") or bytes([frame.command_id])) + + backend._send_frame_expect_ack_no_lock = fake_send # type: ignore[method-assign] + # stacks[0] is human stacker 1 (zero-based index 0x00) + await backend.upstack(stacks[0], plate, teachpoint_id=0x1E) + + self.assertEqual( + [f.hex() for f in sent], + ["6a010001", "620400011e0001", "63040001000001"], + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/storage/agilent/stacks.py b/pylabrobot/storage/agilent/stacks.py new file mode 100644 index 00000000000..50d43f9571f --- /dev/null +++ b/pylabrobot/storage/agilent/stacks.py @@ -0,0 +1,20 @@ +"""Stacker resources for Agilent BenchCel stackers.""" + +from typing import List + +from pylabrobot.resources.resource_stack import ResourceStack + + +def benchcel_4r_stacks(name_prefix: str = "benchcel_stacker") -> List[ResourceStack]: + """Create the four LIFO stacks for an Agilent BenchCel 4R. + + Each stacker is modelled as a z-direction + :class:`~pylabrobot.resources.resource_stack.ResourceStack` (a single-ended LIFO stack), used + with the :class:`~pylabrobot.storage.Stacker` capability. Stack height is computed from each + plate's ``stacking_z_height`` as plates are added, so -- unlike the previous fixed-site rack + model -- no ``num_sites``/``site_pitch``/``site_height`` needs to be supplied. + + The stacks are named ``{name_prefix}_1`` .. ``{name_prefix}_4`` and are ordered to match the + human stacker numbers 1-4 used by the backend and firmware. + """ + return [ResourceStack(name=f"{name_prefix}_{i}", direction="z") for i in range(1, 5)] diff --git a/pylabrobot/storage/cytomat/cytomat.py b/pylabrobot/storage/cytomat/cytomat.py index ca9227fe003..006a064dcd5 100644 --- a/pylabrobot/storage/cytomat/cytomat.py +++ b/pylabrobot/storage/cytomat/cytomat.py @@ -159,6 +159,7 @@ async def send_action( self, command_type: str, command: str, params: str, timeout: Optional[int] = 60 ) -> OverviewRegisterState: """Calls send_command, but has a timeout handler and returns the overview register state. + Args: timeout: The maximum time to wait for the command to complete. If None, the command will not wait for completion.