diff --git a/CHANGELOG.md b/CHANGELOG.md index b0c4231645a..7170f85f261 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Added +- Agilent BenchCel 4R storage backend (`pylabrobot.storage.agilent.BenchCel4RBackend`) speaking the reverse-engineered binary TCP/7612 protocol, plus a `BenchCel4R(...)` factory, calculation-based BenchCel labware/rack sizing helpers, a `set_labware(...)` command that pushes labware geometry to the device via the decoded `0x7d`/`0x9f` settings protocol, an in-process mock server, and a user guide. Opening the stacker clamps (which can drop a plate stack) is exposed as `dangerously_open_stacker_grippers`. - HighRes Biosolutions MicroSpin centrifuge backend (`pylabrobot.centrifuge.highres.MicroSpinBackend`) speaking the device's ASCII command/response protocol over TCP/1000, plus a `MicroSpin(...)` factory. - In-process `MicroSpinMockServer` (`pylabrobot.centrifuge.highres.mock_server`) that faithfully emulates the MicroSpin's wire protocol -- including the firmware's "`status` blocks until the spindle has stopped" semantics and the low-G spin-down-detection hang -- usable as a Python async context manager or runnable as a script (`python -m pylabrobot.centrifuge.highres.mock_server`) for `nc`/`telnet` debugging. - `MicroSpinBackend.reset()` recovery helper that issues `abort` -> `clearbuttonabort` -> `status`, using the last as the gate that genuinely confirms the rotor has stopped. diff --git a/docs/api/pylabrobot.rst b/docs/api/pylabrobot.rst index 051a61e8bd1..e6e4d16584b 100644 --- a/docs/api/pylabrobot.rst +++ b/docs/api/pylabrobot.rst @@ -19,6 +19,7 @@ Subpackages pylabrobot.only_fans pylabrobot.resources pylabrobot.scales + pylabrobot.storage pylabrobot.io.sila pylabrobot.shaking pylabrobot.temperature_controlling diff --git a/docs/api/pylabrobot.storage.rst b/docs/api/pylabrobot.storage.rst new file mode 100644 index 00000000000..0e98fc38972 --- /dev/null +++ b/docs/api/pylabrobot.storage.rst @@ -0,0 +1,70 @@ +.. currentmodule:: pylabrobot.storage + +pylabrobot.storage package +========================== + +This package contains APIs for automated storage devices and incubators. + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + incubator.Incubator + agilent.benchcel.BenchCel4R + + +Backends +-------- + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + backend.IncubatorBackend + agilent.benchcel_backend.BenchCel4RBackend + cytomat.cytomat.CytomatBackend + liconic.liconic_backend.ExperimentalLiconicBackend + + +Agilent BenchCel support classes +-------------------------------- + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + agilent.benchcel_backend.Frame + agilent.benchcel_backend.SensorStatus + agilent.benchcel_backend.ArmStatus + agilent.benchcel_backend.GeneralStatus + agilent.benchcel_backend.Teachpoint + agilent.benchcel_backend.AxisBoundsResponse + agilent.benchcel_backend.CurrentPositionResponse + agilent.benchcel_labware.BenchCelLabwareSettings + agilent.benchcel_labware.PlateNotchSettings + agilent.benchcel_labware.apply_benchcel_labware_settings + agilent.benchcel_labware.calculate_benchcel_labware_settings + agilent.benchcel_labware.calculate_robot_gripper_offset + agilent.benchcel_labware.calculate_sensor_offset + agilent.benchcel_labware.calculate_stacker_gripper_offset + agilent.benchcel_labware.calculate_stacking_thickness + agilent.benchcel_mock_server.BenchCelMockServer + agilent.racks.benchcel_stacker_rack + agilent.racks.benchcel_4r_racks + agilent.racks.benchcel_4r_racks_for_labware + + +Errors +------ + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + agilent.benchcel_backend.BenchCelDeviceError + agilent.benchcel_backend.BenchCelProtocolError + agilent.benchcel_backend.BenchCelTimeoutError diff --git a/docs/contributor_guide/contributing-new-resources.md b/docs/contributor_guide/contributing-new-resources.md index addf70c5614..45e5653cbe7 100644 --- a/docs/contributor_guide/contributing-new-resources.md +++ b/docs/contributor_guide/contributing-new-resources.md @@ -59,6 +59,10 @@ def AGenBio_96_wellplate_Ub_2200ul(name: str, lid: Optional[Lid] = None) -> Plat size_x=127.76, # from spec size_y=85.48, # from spec size_z=42.5, # from spec + # Optional: the vertical pitch one plate adds to a stack of identical plates + # (size_z minus how far two identical plates nest). Only needed by plate + # stackers (e.g. the Agilent BenchCel); leave unset (None) otherwise. + stacking_z_height=39.0, # measured ... ) ```` diff --git a/docs/resources/itemized-resource/plate/plate.rst b/docs/resources/itemized-resource/plate/plate.rst index 0f670b97369..7e2de7dbf36 100644 --- a/docs/resources/itemized-resource/plate/plate.rst +++ b/docs/resources/itemized-resource/plate/plate.rst @@ -49,3 +49,21 @@ The ``nesting_z_height`` is the overlap between the lid and the plate when the l .. image:: /resources/img/plate/lid_nesting_z_height.jpeg :alt: nesting_z_height measurement + +---- + +``stacking_z_height`` +--------------------- +:class:`~pylabrobot.resources.plate.Plate` accepts an optional ``stacking_z_height`` argument: the vertical pitch (in mm) that one plate adds to a stack when an identical plate is placed directly on top of it. Equivalently, it is ``size_z`` minus the amount two identical plates nest into one another. + +It defaults to ``None`` (unknown) and is only required by devices that physically stack plates, such as the Agilent BenchCel microplate handler. It mirrors the ``stacking_z_height`` of a nested tip rack (:class:`~pylabrobot.resources.tip_rack.NestedTipRack`). + +To measure it, stack two identical plates and measure the total height with a caliper; then:: + + stacking_z_height = height_of_two_stacked_plates - size_z + +More generally, a stack of ``N`` identical plates is ``size_z + (N - 1) * stacking_z_height`` tall. + +When set, :class:`~pylabrobot.resources.resource_stack.ResourceStack` uses this value so that bare plates stacked in the z direction nest into one another (a plate placed on another bare plate sinks in by ``size_z - stacking_z_height``). Plates without a ``stacking_z_height``, and plates wearing a lid, do not nest. + +Because ``stacking_z_height`` is a physical dimension, two plates that differ in it are treated as different labware and do not compare equal. diff --git a/docs/resources/resource-stack/resource-stack.ipynb b/docs/resources/resource-stack/resource-stack.ipynb index 76f6faf9c08..fb6b9710a7e 100644 --- a/docs/resources/resource-stack/resource-stack.ipynb +++ b/docs/resources/resource-stack/resource-stack.ipynb @@ -240,6 +240,37 @@ "stacking_area.get_top_item() is plate" ] }, + { + "cell_type": "markdown", + "id": "a1b2c3d4", + "metadata": {}, + "source": [ + "### Nesting plates by `stacking_z_height`\n", + "\n", + "Bare plates that define a `stacking_z_height` nest into one another when stacked in the\n", + "z direction: a plate placed on another bare plate sinks in by `size_z - stacking_z_height`,\n", + "so a stack of `N` identical plates is `size_z + (N - 1) * stacking_z_height` tall instead of\n", + "`N * size_z`. Plates without a `stacking_z_height`, and plates wearing a lid, do not nest.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d5", + "metadata": {}, + "outputs": [], + "source": [ + "nesting_stack = ResourceStack(\"nesting_stack\", \"z\")\n", + "nesting_stack.location = Coordinate.zero()\n", + "for i in range(3):\n", + " nesting_stack.assign_child_resource(\n", + " Plate(f\"plate_{i}\", size_x=127, size_y=86, size_z=14, ordered_items={}, stacking_z_height=4)\n", + " )\n", + "\n", + "# 14 + (3 - 1) * 4 = 22, not 3 * 14 = 42\n", + "nesting_stack.get_size_z()" + ] + }, { "cell_type": "markdown", "id": "dc9633b5", diff --git a/docs/user_guide/01_material-handling/storage/agilent_benchcel.rst b/docs/user_guide/01_material-handling/storage/agilent_benchcel.rst new file mode 100644 index 00000000000..fc46791fd0c --- /dev/null +++ b/docs/user_guide/01_material-handling/storage/agilent_benchcel.rst @@ -0,0 +1,332 @@ +Agilent BenchCel 4R +=================== + +The Agilent BenchCel Microplate Handler is an open, sequential stacker system +for moving SBS/ANSI-format plates between vertical stackers and one or more +taught robot-accessible positions. PyLabRobot supports the four-stacker +configuration with +:class:`~pylabrobot.storage.agilent.benchcel_backend.BenchCel4RBackend` and the +:func:`~pylabrobot.storage.agilent.benchcel.BenchCel4R` factory. + +.. warning:: + + The BenchCel protocol implemented here is reverse-engineered from Agilent + VWorks packet captures and live tests, not vendor Ethernet documentation. + Keep the robot/stacker area clear, make sure E-stop/power-off is available, + and ensure VWorks or any other control client is disconnected before issuing + motion commands. + +Manual safety notes +------------------- + +The Agilent BenchCel Microplate Handler R-Series Quick Guide (G5400-90003A) +contains several operational details that matter when automating the device: + +* The pendant has a red robot-disable button. Pressing it cuts power to the + motors and stops motion. +* Compressed air drives the stacker-head mechanisms. Power and compressed air + must be on for normal operation and rack install/removal workflows. +* The stacker clamps (also called stacker grippers) hold or release the bottom + plate at the base of a rack. They normally open/close automatically during + loading, unloading, downstacking, and stacking. Manual open/close is a + diagnostic/recovery action and can drop plates if the stack is unsupported. +* The stacker shelves temporarily support and level plates during downstacking + and upstacking. Retracting shelves can drop plates. PyLabRobot does not expose + a shelf command yet because the captured shelf-related command is not mapped + with enough confidence. +* The BenchCel is designed for ANSI/SBS-compatible labware. It typically grips + plates 5-10 mm above the bottom, between the plate top and skirt. Deep lids or + flexible skirts can cause unreliable gripping or accidental lid removal. + +Connection +---------- + +The BenchCel uses a framed binary protocol over TCP. The default IP address in +our captures was ``192.168.0.100`` and the observed TCP port was ``7612``. + +.. code-block:: python + + from pylabrobot.storage.agilent import BenchCel4RBackend + + backend = BenchCel4RBackend( + host="192.168.0.100", + port=7612, + # Optional: bind to the BenchCel-facing network interface on multi-NIC hosts. + source_ip="192.168.0.200", + ) + await backend.setup() + +Labware profiles and PLR plate dimensions +------------------------------------------ + +BenchCel/VWorks labware XML uses device-specific dimensions that are not exactly +identical to the dimensions PLR needs: + +* ``StackingThickness`` is the vertical pitch between nested plates in a stack. + It is usually smaller than the full plate height. +* PLR plate ``size_z`` should be the full outside plate height. +* ``RobotGripperOffset`` is the robot gripper contact height from the bottom of + the plate. In PLR this maps to ``plate.preferred_pickup_location.z`` and to + ``pickup_distance_from_top = plate.size_z - RobotGripperOffset``. + +PyLabRobot calculates BenchCel labware geometry from a PLR plate resource rather +than bundling per-catalog XML profiles. The default calculation is: + +* ``StackingThickness = plate.size_z - 1.5 mm``. Override + ``nesting_overlap`` if a plate family nests differently. +* ``RobotGripperOffset`` is kept between 5 and 8 mm from the bottom while + preserving at least about 5.4 mm above the grip point where possible. +* ``StackerGripperOffset`` is estimated as 4 mm for low-profile plates, 5 mm for + standard plates, and 6 mm for tall/deep plates. +* ``SensorOffset`` is estimated as 7 mm for low-profile plates, 8 mm for standard + plates, and near the top of tall/deep plates. + +The supplied example XML/dimension pairs were used to choose the defaults, but +optical thresholds and exact nesting behavior cannot be perfectly inferred from +outside dimensions. Pass explicit overrides when your measured/VWorks values +are known. + +.. code-block:: python + + from pylabrobot.resources.plate import Plate + from pylabrobot.storage.agilent import ( + BenchCel4R, + apply_benchcel_labware_settings, + calculate_benchcel_labware_settings, + ) + + plate = Plate("p1", size_x=127.76, size_y=85.48, size_z=10.4, ordered_items={}) + settings = apply_benchcel_labware_settings(plate) + assert settings.robot_gripper_offset == 5.0 + assert plate.preferred_pickup_location.z == 5.0 + + benchcel = BenchCel4R( + name="benchcel", + host="192.168.0.100", + labware=settings, + stacker_num_sites=20, + ) + + # If you know the exact nesting overlap for a plate family, override it. + settings = calculate_benchcel_labware_settings(plate, nesting_overlap=1.3) + +You can also parse user-supplied VWorks XML labware files with +:meth:`~pylabrobot.storage.agilent.benchcel_labware.BenchCelLabwareSettings.from_xml_file` +and supply the full measured/manufacturer plate dimensions. This is useful for +comparing calculated values against the current VWorks settings on the BenchCel +laptop, but the integration does not bundle those XML profiles. + +.. note:: + + If validation says a PLR plate resource has the wrong height, do not use + BenchCel stacker motion until the PLR resource and the VWorks labware profile + agree. For example, ``StackingThickness`` is not an acceptable substitute for + PLR ``size_z``. + +Pushing labware settings to the device +-------------------------------------- + +VWorks pushes the active labware geometry to the BenchCel over TCP whenever you +apply labware settings. This was confirmed from packet captures: the laptop +sends a 77-byte ``0x7d`` settings frame followed by an empty ``0x9f`` commit that +the device echoes back. Invalid geometry (for example, gripper hold positions +that are not separated) is rejected with a ``0x02`` error such as +``"The labware gripper positions are too close"``. + +The backend can push the same settings directly, so you do not need VWorks to +configure the active labware. The payload encoder is byte-for-byte compatible +with the captured VWorks frames for standard flat microplates and includes the +full plate height (offset 37), gripper offsets, sensor thresholds, notch options, +and ``PlatePresenceThreshold`` (offset 75). + +.. code-block:: python + + from pylabrobot.resources.plate import Plate + from pylabrobot.storage.agilent import BenchCel4RBackend + + backend = BenchCel4RBackend(host="192.168.0.100") + await backend.setup() + + plate = Plate("p1", size_x=127.76, size_y=85.48, size_z=14.4, ordered_items={}) + # Calculates geometry from the plate, encodes 0x7d, sends 0x7d + 0x9f, and + # raises BenchCelDeviceError if the device rejects the geometry. + settings = await backend.set_labware(plate) + + # Or push an explicit settings object / serialized dict. + await backend.set_labware(settings) + +.. warning:: + + The lidded/sealed sub-fields and ``ErrorCorrectionOffset`` were always zero + in the captures for standard flat microplates and are not yet mapped, so they + are sent as zero. Pushing settings for lidded or sealed labware is therefore + not fully supported yet. + +Status and sensors +------------------ + +VWorks continuously polls each stacker's sensors and a general arm-status frame. +The backend exposes decoded helpers for both. + +.. code-block:: python + + sensors = await backend.request_all_stacker_sensors() + for sensor in sensors: + print(sensor.stacker, sensor.air_pressure, sensor.plate_presence) + + arm = await backend.request_arm_status() + print(arm.theta, arm.x, arm.z, arm.gripper) + + bounds = await backend.request_axis_bounds() + print(bounds.x_min, bounds.x_max) + +Stacker and teachpoint moves +---------------------------- + +Stackers are addressed as human numbers 1 through 4 in the high-level API. The +wire protocol uses zero-based target IDs internally. + +.. code-block:: python + + await backend.home() + await backend.move_to_stacker(3) + await backend.fully_open_grippers() + await backend.downstack_plate(3) + await backend.upstack_plate(4) + +VWorks packet captures confirm that ``downstack_plate`` / ``upstack_plate`` are +exactly what the VWorks "Downstack" / "Upstack" buttons emit (a single ``0x62`` +/ ``0x63`` robot pick/place at the stacker target), and that ``load_stacker`` / +``unload_stacker`` are the distinct ``0x60`` / ``0x61`` stacker-mechanism commands +behind the "Load" / "Unload" buttons. + +If a stacker mechanism is in a bad state, the real device may request an unload +then load recovery cycle, or report errors such as ``Stack not loaded``, +``Stacker shelf position error``, ``Stacker shelf not retracted``, or +``Stacker gripper extended``. The stacker load/unload methods operate the +stacker mechanism, not the robot grippers. + +.. code-block:: python + + await backend.unload_stacker(3) + await backend.load_stacker(3) + +The backend also exposes the diagnostic stacker clamp command observed as +``0x67``. Opening the stacker clamps can release/drop a plate stack, so that one +operation keeps a ``dangerously_`` prefix; use it only for recovery/diagnostics +and only when the plate stack is physically supported. + +.. code-block:: python + + await backend.dangerously_open_stacker_grippers(1) # can drop plates + await backend.close_stacker_grippers(1) + +Teachpoints +----------- + +BenchCel transfer points are numeric teachpoint slots. The VWorks "right" +teachpoint observed in captures used target ID ``0x1e``. Live tests confirmed +that teachpoints can be written standalone with command ``0x73``; an undefined +teachpoint slot may move the arm to a home-like pose instead of to the desired +location. + +.. code-block:: python + + from pylabrobot.storage.agilent import Teachpoint + + await backend.save_teachpoint( + Teachpoint( + teachpoint_id=0x1e, + theta=0.0, + x=350.0, + z=0.0, + approach_height=20.0, + cavity_depth=0.0, + gripper_open_limit=-1.5, + respect_approach_height_when_not_holding_plate=True, + something_above_this_point=False, + name="right-transfer", # metadata only; not sent to the BenchCel + ), + ) + await backend.move_to_teachpoint(0x1e, approach_height=20.0) + +The BenchCel does not provide a known command to read saved teachpoints back, +and PyLabRobot does not write any files on the host. If you need to keep the +exact numeric teachpoints you wrote, persist them in your own protocol/config. + +Using with Incubator resource state +----------------------------------- + +PyLabRobot models the BenchCel as a storage machine using +:class:`~pylabrobot.storage.incubator.Incubator`. The BenchCel firmware only +addresses whole stackers, so the rack resources are primarily for PLR-side state +tracking of expected plate order/content. The convenience factory creates the +backend plus four generic stacker racks. + +.. note:: + + A BenchCel stacker is, physically, a single-ended **LIFO stack of nesting + plates** -- i.e. a :class:`~pylabrobot.resources.resource_stack.ResourceStack` + (``direction="z"``), whose height is described by the per-plate + ``Plate.stacking_z_height``. It is modelled here as a + :class:`~pylabrobot.resources.carrier.PlateCarrier` + of fixed sites only because the :class:`~pylabrobot.storage.incubator.Incubator` + frontend is built around *random-access* racks. The ``site_pitch``-spaced + sites are cosmetic state and only the **stacker identity** is sent to the + device, so random-access site selection (e.g. ``find_random_site``) is not + honoured by the hardware -- it can only downstack/upstack at one end. A + ResourceStack-native model is a possible future improvement, pending a + storage-abstraction design discussion. + +.. important:: + + The BenchCel has **no fixed loading/unloading position** -- the transfer + point is a teachpoint you taught in VWorks (or with ``save_teachpoint``). + ``fetch_plate_to_loading_tray`` and ``take_in_plate`` therefore require a + teachpoint, either via ``loading_tray_teachpoint_id=`` on the factory/backend + or ``teachpoint_id=`` per call. They raise if none is configured, because an + unset/wrong teachpoint can send the arm to a home-like pose. + + The :class:`~pylabrobot.storage.incubator.Incubator` ``loading_tray`` resource + and its ``loading_tray_location`` ``Coordinate`` are **cosmetic** -- they are + only used for the PLR resource tree and visualization and do not drive any + motion. The physical transfer position is determined entirely by the + teachpoint ID on the device. PLR also models a single loading tray, so it + cannot represent transfers to several different teachpoints distinctly. + +.. code-block:: python + + from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + from pylabrobot.storage.agilent import BenchCel4R, apply_benchcel_labware_settings + + plate = Cor_96_wellplate_360ul_Fb(name="plate_1") + settings = apply_benchcel_labware_settings(plate) + + benchcel = BenchCel4R( + name="benchcel", + host="192.168.0.100", + loading_tray_teachpoint_id=0x1e, + labware=settings, + stacker_num_sites=20, + # Provide your instrument's calibrated deck footprint for visualization. + size_x=0, + size_y=0, + size_z=0, + ) + + benchcel.racks[2][0].assign_child_resource(plate) # PLR state: stacker 3 + + await benchcel.setup() + fetched = await benchcel.fetch_plate_to_loading_tray("plate_1", teachpoint_id=0x1e) + await benchcel.take_in_plate(benchcel.racks[3][0], teachpoint_id=0x1e) # to stacker 4 + +Mock server +----------- + +The implementation includes an in-process mock server for tests and debugging. +It speaks the same framed protocol and can also be run as a small standalone TCP +server. + +.. code-block:: bash + + python -m pylabrobot.storage.agilent.benchcel_mock_server --port 7612 diff --git a/docs/user_guide/01_material-handling/storage/storage.rst b/docs/user_guide/01_material-handling/storage/storage.rst index 698d153b308..7b400799c3d 100644 --- a/docs/user_guide/01_material-handling/storage/storage.rst +++ b/docs/user_guide/01_material-handling/storage/storage.rst @@ -7,6 +7,7 @@ A storage machine is defined as a **machine whose primary feature is** Examples of this simplest form of a storage machine include: +- `Agilent BenchCel Microplate Handler `_ - open sequential stacker storage with robotic plate handling - `Agilent Labware MiniHub `_ - open storage of labware with rotation feature - `Lab Services PlateCarousel `_ - open storage of labware with rotation feature @@ -83,6 +84,7 @@ Retrieval Pattern: Stacking (Sequential) vs. Random Access - More flexible but mechanically complex. * - **Examples:** + - Agilent BenchCel 4R - Agilent Labware MiniHub - Lab Services PlateCarousel - **Examples:** @@ -108,6 +110,7 @@ Accessibility: Open vs. Closed Storage * - No protection from contamination or temperature drift. - Ideal for incubators, cold storage, and sterile handling. * - **Examples:** + - Agilent BenchCel 4R - Agilent Labware MiniHub - Manual stackers - **Examples:** @@ -124,7 +127,8 @@ Combined Retrieval & Access Summary - **Open Storage** - **Closed Storage** * - **Stacking Access (Sequential)** - - Agilent Labware MiniHub + - Agilent BenchCel 4R + Agilent Labware MiniHub Lab Services PlateCarousel - STX incubators with drawer-based shelves * - **Random Access** @@ -139,6 +143,7 @@ Combined Retrieval & Access Summary :maxdepth: 1 :hidden: + agilent_benchcel cytomat inheco/incubator_shaker inheco/scila diff --git a/pylabrobot/resources/resource_stack.py b/pylabrobot/resources/resource_stack.py index b66b4b87632..c526220f81f 100644 --- a/pylabrobot/resources/resource_stack.py +++ b/pylabrobot/resources/resource_stack.py @@ -16,6 +16,13 @@ class ResourceStack(Resource): back. Stacks growing in the z direction are from bottom to top, and function as the `stack data type `. + When stacking in the z direction, bare plates nest into one another: if a plate defines a + ``stacking_z_height`` (the vertical pitch it adds to a stack) and is placed directly on top of + another bare plate, it sinks in by ``size_z - stacking_z_height`` instead of resting at the + lower plate's full height. A stack of ``N`` identical such plates is therefore + ``size_z + (N - 1) * stacking_z_height`` tall. Plates without a ``stacking_z_height``, and plates + wearing a lid, do not nest. + Attributes: name: The name of the resource group. location: The location of the resource group. This will be the location of the first resource in @@ -84,22 +91,45 @@ def get_size_y(self) -> float: return sum(child.get_size_y() for child in self.children) return max(resource.get_size_y() for resource in self.children) + @staticmethod + def _actual_resource_height(resource: Resource) -> float: + """The height a resource occupies on its own, accounting for the lid nesting height if the + resource is a plate with a lid.""" + if isinstance(resource, Plate) and resource.lid is not None: + return resource.get_size_z() + resource.lid.get_size_z() - resource.lid.nesting_z_height + return resource.get_size_z() + + def _nesting_overlap(self, upper: Resource, lower: Optional[Resource]) -> float: + """How far ``upper`` sinks into ``lower`` when stacked in the z direction (``0`` if they do not + nest). Only a bare plate stacked on a bare plate with a known ``stacking_z_height`` nests; the + overlap is then ``size_z - stacking_z_height`` (i.e. the plate adds only its stacking pitch to + the stack instead of its full height).""" + if ( + self.direction == "z" + and isinstance(upper, Plate) + and upper.stacking_z_height is not None + and isinstance(lower, Plate) + and lower.lid is None + ): + return upper.get_size_z() - upper.stacking_z_height + return 0.0 + def get_size_z(self) -> float: """Get local size in the z direction.""" - def get_actual_resource_height(resource: Resource) -> float: - """Helper function to get the actual height of a resource, accounting for the lid nesting - height if the resource is a plate with a lid.""" - if isinstance(resource, Plate) and resource.lid is not None: - return resource.get_size_z() + resource.lid.get_size_z() - resource.lid.nesting_z_height - return resource.get_size_z() - if len(self.children) == 0: return 0 if self.direction != "z": - return max(get_actual_resource_height(child) for child in self.children) - return sum(get_actual_resource_height(child) for child in self.children) + return max(self._actual_resource_height(child) for child in self.children) + + # Sum bottom -> top, letting bare plates nest into one another by their stacking pitch. + total = 0.0 + prev: Optional[Resource] = None + for child in self.children: + total += self._actual_resource_height(child) - self._nesting_overlap(child, prev) + prev = child + return total def get_resource_stack_edge(self) -> Coordinate: if self.direction == "x": @@ -115,7 +145,9 @@ def get_resource_stack_edge(self) -> Coordinate: def get_new_child_location(self, resource: Resource) -> Coordinate: """Get the location where a new child resource should be placed in the stack.""" - return get_child_location(resource) + self.get_resource_stack_edge() + lower = self.children[-1] if len(self.children) > 0 else None + overlap = Coordinate(0, 0, self._nesting_overlap(resource, lower)) + return get_child_location(resource) + self.get_resource_stack_edge() - overlap def assign_child_resource( self, resource: Resource, location: Optional[Coordinate] = None, reassign: bool = True diff --git a/pylabrobot/resources/resource_stack_tests.py b/pylabrobot/resources/resource_stack_tests.py index 4a31fe60d0a..17083caf534 100644 --- a/pylabrobot/resources/resource_stack_tests.py +++ b/pylabrobot/resources/resource_stack_tests.py @@ -113,3 +113,59 @@ def test_get_absolute_location_stack_height(self): top_item = stacking_area.get_top_item() assert top_item is not None self.assertEqual(top_item.get_absolute_location(), Coordinate(0, 0, 1)) + + +class ResourceStackPlateNestingTests(unittest.TestCase): + """Bare plates with a known `stacking_z_height` should nest into one another in a z-stack.""" + + def _plate(self, name, stacking_z_height=None): + return Plate( + name, + size_x=10, + size_y=10, + size_z=10, + ordered_items={}, + stacking_z_height=stacking_z_height, + ) + + def test_without_stacking_z_height_no_nesting(self): + # backwards-compatible: plates without a stacking pitch stack at full size_z. + stack = ResourceStack("s", "z") + stack.location = Coordinate.zero() + stack.assign_child_resource(self._plate("p1")) + stack.assign_child_resource(self._plate("p2")) + self.assertEqual(stack.get_size_z(), 20) + self.assertEqual(stack.get_top_item().get_absolute_location(), Coordinate(0, 0, 10)) + + def test_two_plates_nest(self): + stack = ResourceStack("s", "z") + stack.location = Coordinate.zero() + stack.assign_child_resource(self._plate("p1", stacking_z_height=4)) + stack.assign_child_resource(self._plate("p2", stacking_z_height=4)) + # height = size_z + (N-1) * stacking_z_height = 10 + 4 + self.assertEqual(stack.get_size_z(), 14) + # second plate sinks into the first: base at the stacking pitch (4), not at full height (10). + self.assertEqual(stack.get_top_item().get_absolute_location(), Coordinate(0, 0, 4)) + + def test_three_plates_nest(self): + stack = ResourceStack("s", "z") + stack.location = Coordinate.zero() + for i in range(3): + stack.assign_child_resource(self._plate(f"p{i}", stacking_z_height=4)) + # height = 10 + 2 * 4 + self.assertEqual(stack.get_size_z(), 18) + self.assertEqual(stack.get_top_item().get_absolute_location(), Coordinate(0, 0, 8)) + + def test_no_nesting_onto_lidded_plate(self): + # a plate cannot nest into a plate that is wearing a lid. + lower = self._plate("lower", stacking_z_height=4) + lower.assign_child_resource( + Lid("lid", size_x=10, size_y=10, size_z=3, nesting_z_height=1), + location=Coordinate(0, 0, 0), + ) + stack = ResourceStack("s", "z") + stack.location = Coordinate.zero() + stack.assign_child_resource(lower) + stack.assign_child_resource(self._plate("upper", stacking_z_height=4)) + # lower occupies size_z + lid overhang = 10 + (3 - 1) = 12; upper sits on top at full height. + self.assertEqual(stack.get_top_item().get_absolute_location(), Coordinate(0, 0, 12)) diff --git a/pylabrobot/storage/__init__.py b/pylabrobot/storage/__init__.py index 3ccfc9cd4de..844f4cd1db2 100644 --- a/pylabrobot/storage/__init__.py +++ b/pylabrobot/storage/__init__.py @@ -1,3 +1,9 @@ +from .agilent import ( + BenchCel4R, + BenchCel4RBackend, + BenchCelBackend, + BenchCelLabwareSettings, +) from .backend import IncubatorBackend from .chatterbox import IncubatorChatterboxBackend from .cytomat import CytomatBackend diff --git a/pylabrobot/storage/agilent/__init__.py b/pylabrobot/storage/agilent/__init__.py new file mode 100644 index 00000000000..aacbc6341cd --- /dev/null +++ b/pylabrobot/storage/agilent/__init__.py @@ -0,0 +1,35 @@ +from .benchcel import BenchCel4R +from .benchcel_backend import ( + AXIS_GRIPPER, + AXIS_NAMES, + AXIS_THETA, + AXIS_X, + AXIS_Z, + RIGHT_TEACHPOINT_ID, + TEST_LEFT_TEACHPOINT, + TEST_LEFT_TEACHPOINT_ID, + AxisBoundsResponse, + BenchCel4RBackend, + BenchCelBackend, + BenchCelDeviceError, + BenchCelProtocolError, + BenchCelTimeoutError, + CurrentPositionResponse, + Frame, + GeneralStatus, + SensorStatus, + Teachpoint, +) +from .benchcel_labware import ( + DEVICE_PAYLOAD_LENGTH, + BenchCelLabwareSettings, + PlateNotchSettings, + apply_benchcel_labware_settings, + benchcel_labware_summary_row, + calculate_benchcel_labware_settings, + calculate_robot_gripper_offset, + calculate_sensor_offset, + calculate_stacker_gripper_offset, + calculate_stacking_thickness, +) +from .racks import benchcel_4r_racks, benchcel_4r_racks_for_labware, benchcel_stacker_rack diff --git a/pylabrobot/storage/agilent/benchcel.py b/pylabrobot/storage/agilent/benchcel.py new file mode 100644 index 00000000000..57d0dd1304b --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel.py @@ -0,0 +1,124 @@ +"""Factory for the Agilent BenchCel 4R storage device.""" + +from __future__ import annotations + +from typing import Optional, Union + +from pylabrobot.resources import Coordinate, Plate, PlateCarrier +from pylabrobot.storage.incubator import Incubator + +from .benchcel_backend import BenchCel4RBackend +from .benchcel_labware import BenchCelLabwareSettings, resolve_benchcel_labware_settings +from .racks import benchcel_4r_racks, benchcel_4r_racks_for_labware + + +def BenchCel4R( + name: str, + host: str, + *, + port: int = BenchCel4RBackend.DEFAULT_PORT, + timeout: float = 30.0, + read_poll_timeout: float = 0.25, + loading_tray_teachpoint_id: Optional[int] = None, + source_ip: Optional[str] = None, + backend: Optional[BenchCel4RBackend] = None, + racks: Optional[list[PlateCarrier]] = None, + labware: Optional[Union[Plate, BenchCelLabwareSettings, dict]] = None, + stacker_num_sites: int = 20, + stacker_site_pitch: Optional[float] = None, + stacker_site_height: Optional[float] = None, + loading_tray_location: Optional[Coordinate] = None, + size_x: float = 0.0, + size_y: float = 0.0, + size_z: float = 0.0, +) -> Incubator: + """Construct an Agilent BenchCel 4R as a PLR storage machine. + + The BenchCel protocol addresses whole stackers rather than individual storage + slots. The generated rack resources therefore represent PLR-side state tracking + of expected plate order/content. + + If ``labware`` is supplied as a PLR plate or calculated settings object, the + generated racks use the calculated stack pitch (``StackingThickness``) and + full plate height. + + Args: + name: Resource name for the returned :class:`~pylabrobot.storage.Incubator`. + Like all PLR resources, the BenchCel needs a unique name for the resource + tree, serialization, and lookups (consistent with other device factories + such as ``MicroSpin(...)``). + host: IP address or DNS name of the BenchCel Ethernet interface. + loading_tray_teachpoint_id: Teachpoint target ID used as the transfer point + by ``fetch_plate_to_loading_tray``/``take_in_plate``. The BenchCel has no + fixed loading position; this must be a teachpoint taught on the device. + Transfers raise unless it is set here or passed per call. There is no + default because an unset/wrong teachpoint can send the arm to a home-like + pose. + loading_tray_location: Cosmetic only. This is the ``Coordinate`` of the PLR + ``Incubator.loading_tray`` resource used for the resource tree and + visualization. It does NOT drive any motion -- the real transfer position + is determined entirely by ``loading_tray_teachpoint_id`` on the device. + """ + labware_settings = resolve_benchcel_labware_settings(labware) if labware is not None else None + + if backend is None: + backend = BenchCel4RBackend( + host=host, + port=port, + timeout=timeout, + read_poll_timeout=read_poll_timeout, + loading_tray_teachpoint_id=loading_tray_teachpoint_id, + source_ip=source_ip, + labware=labware_settings, + ) + elif labware_settings is not None: + existing_labware = backend.labware_settings + if existing_labware is not None and existing_labware != labware_settings: + raise ValueError( + "BenchCel4R backend labware " + f"{existing_labware.name!r} does not match factory labware " + f"{labware_settings.name!r}" + ) + backend.labware_settings = labware_settings + + if labware_settings is None: + labware_settings = backend.labware_settings + + if racks is None: + if labware_settings is not None: + if stacker_site_pitch is None and stacker_site_height is None: + racks = benchcel_4r_racks_for_labware( + labware_settings, + num_sites=stacker_num_sites, + ) + else: + racks = benchcel_4r_racks( + num_sites=stacker_num_sites, + site_pitch=( + labware_settings.stacking_thickness + if stacker_site_pitch is None + else stacker_site_pitch + ), + site_height=( + labware_settings.effective_plate_height() + if stacker_site_height is None + else stacker_site_height + ), + ) + else: + racks = benchcel_4r_racks( + num_sites=stacker_num_sites, + site_pitch=14.5 if stacker_site_pitch is None else stacker_site_pitch, + site_height=9.0 if stacker_site_height is None else stacker_site_height, + ) + + return Incubator( + backend=backend, + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + racks=racks, + loading_tray_location=loading_tray_location or Coordinate.zero(), + model="Agilent BenchCel 4R", + ) diff --git a/pylabrobot/storage/agilent/benchcel_backend.py b/pylabrobot/storage/agilent/benchcel_backend.py new file mode 100644 index 00000000000..af21708ee16 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_backend.py @@ -0,0 +1,1303 @@ +"""Backend for the Agilent BenchCel 4R microplate handler. + +The BenchCel exposes a binary TCP protocol on port 7612 by default. This module +implements the command set reverse-engineered from Agilent VWorks packet +captures and live tests against a BenchCel / 4-stacker system running firmware +3.2.20.0. + +This is NOT vendor protocol documentation. The public Agilent Quick Guide +(G5400-90003) documents operation, safety, stacker clamps/shelves, labware +requirements, and diagnostic workflows, but not the Ethernet wire protocol. + +Protocol summary +---------------- +Every application frame observed so far has the shape:: + + [1 byte command_id][2 byte little-endian payload_length][payload] + +Host -> BenchCel commands implemented here: + +* ``0x47`` home motors. Live tests showed this drops the TCP session while the + device homes, then accepts connections again when homing is complete. +* ``0x48`` home. +* ``0x60`` / ``0x61`` stacker load / unload. These operate the stacker + mechanism, not the robot grippers, and are what the VWorks "Load"/"Unload" + buttons emit (confirmed from captures). ``0x60`` payload is ``01 ``; + ``0x61`` payload is ``01 00 00 00 00``. Their ``0x69`` ACK echoes the + command id and the stacker index, e.g. ``60 `` / ``61 ``. +* ``0x62`` / ``0x63`` pick/downstack and place/upstack for stackers or + teachpoints. Stackers are target IDs ``0x00``..``0x03``. Captures confirm the + VWorks "Downstack" button emits a single ``0x62`` and "Upstack" a single + ``0x63``, each with payload ``01 00 01`` (so ``0x62``/``0x63`` ARE + the VWorks downstack/upstack tasks when the target is a stacker; with a + teachpoint target they are a plain pick/place at that taught position). +* ``0x65`` move to stacker/teachpoint. +* ``0x66`` relative jog: axis 0 theta, 1 X, 2 Z, 3 robot gripper. +* ``0x67`` open/close pneumatic stacker grippers/clamps. These are diagnostics + and can drop plates if used when a stack is unsupported. +* ``0x6a`` full-open/full-close robot grippers. +* ``0x73`` save teachpoint. Captures did not show a command-specific ACK. +* ``0x7e`` stacker sensor query. +* ``0x85`` current-position read. Live tests showed the selector is ignored; + this is not a stored teachpoint reader. +* ``0x87`` general/arm status query. The decoded float32 fields are theta, X, + Z, and robot gripper position at offsets 4, 12, 20, and 28. +* ``0x99`` axis bounds query. Live tests decoded this as theta/X/Z/gripper + min/max limits, not as stored teachpoint data. + +Device errors are returned as ``0x02`` frames containing an ASCII message, for +example ``"X position out of bounds"``. Successful motion commands return a +``0x69`` ACK after motion is complete. Plate load/unload ACKs include the +stacker index in addition to the command ID. + +Safety +------ +Keep the robot and stacker area clear, make sure E-stop/power-off is available, +and ensure VWorks or any other control client is disconnected before using this +backend. The BenchCel appears to allow only one effective control client at a +time; if another client owns the session, connections may be accepted and then +immediately closed. + +Manual notes from Agilent G5400-90003A that matter for automation: + +* The pendant has a red robot-disable button that cuts power to the motors. +* Compressed air drives the stacker-head mechanisms; air must be on for normal + operation and for rack install/removal workflows. +* Stacker clamps/grippers hold/release the bottom plate at the rack base. Opening + clamps can release/drop a plate stack. The manual says clamps normally open and + close automatically during loading, unloading, downstacking, and stacking; use + manual open/close only for diagnostics/recovery. +* Stacker shelves temporarily support/level the stack during downstack/upstack. + Retracting shelves can drop plates. We have not exposed a shelf command because + the captured command is not yet confidently mapped. +* Labware should be ANSI/SBS-compatible. The BenchCel typically grips plates + about 5-10 mm above the bottom, between the top of the plate and the skirt. + Deep lids/flexible skirts can be problematic. +""" + +from __future__ import annotations + +import asyncio +import dataclasses +import logging +import struct +from typing import Callable, List, Optional, Tuple, Union + +from pylabrobot.io.socket import Socket +from pylabrobot.resources import Plate, PlateCarrier, PlateHolder +from pylabrobot.storage.backend import IncubatorBackend + +from .benchcel_labware import BenchCelLabwareSettings, resolve_benchcel_labware_settings + +logger = logging.getLogger(__name__) + +# Stackers use target IDs 0x00..0x03 (same as zero-based stacker index). The +# right teachpoint captured from VWorks used 0x1e. Live tests confirmed that +# teachpoint slots can be written standalone with command 0x73; an undefined +# teachpoint slot may instead send the arm to a home-like position. +RIGHT_TEACHPOINT_ID = 0x1E +TEST_LEFT_TEACHPOINT_ID = 0x1F + + +class BenchCelProtocolError(RuntimeError): + """Raised when the BenchCel sends malformed or unexpected protocol data.""" + + +class BenchCelTimeoutError(TimeoutError): + """Raised when an expected BenchCel frame is not received in time.""" + + +class BenchCelDeviceError(RuntimeError): + """Raised when the BenchCel returns a ``0x02`` error frame. + + Attributes: + message: The decoded ASCII error string returned by the BenchCel. + frame: The raw error frame. + """ + + def __init__(self, message: str, frame: "Frame") -> None: + super().__init__(message) + self.message = message + self.frame = frame + + +@dataclasses.dataclass(frozen=True) +class Frame: + """One application-level BenchCel protocol frame.""" + + command_id: int + payload: bytes = b"" + + @property + def length(self) -> int: + return len(self.payload) + + def to_bytes(self) -> bytes: + """Serialize frame as ``[cmd][uint16le length][payload]``.""" + return make_frame(self.command_id, self.payload) + + def hex(self) -> str: + """Return full serialized frame bytes as lowercase hex.""" + return self.to_bytes().hex() + + def __str__(self) -> str: + return f"Frame(cmd=0x{self.command_id:02x}, len={self.length}, payload={self.payload.hex()})" + + +@dataclasses.dataclass(frozen=True) +class SensorStatus: + """Decoded ``0x7e`` stacker sensor/status response. + + Fields are named according to the reverse-engineered interpretation. The four + notch sensor names A-D are arbitrary labels until the physical sensor positions + are mapped. ``plate_presence`` is analog-ish; observed empty stackers were + around 0-1 and stackers with plates around 116-129. + """ + + stacker: int + stacker_index: int + constant_08: int + air_pressure: int + notch_sensor_a: int + notch_sensor_b: int + unknown_a: int + plate_presence: int + unknown_b: int + notch_sensor_c: int + notch_sensor_d: int + raw_payload: bytes + + def plate_present(self, threshold: int = 50) -> bool: + """Return a rough plate-present boolean from the analog presence value.""" + return self.plate_presence >= threshold + + def notch_values(self) -> Tuple[int, int, int, int]: + """Return the four binary notch sensor fields as currently mapped.""" + return ( + self.notch_sensor_a, + self.notch_sensor_b, + self.notch_sensor_c, + self.notch_sensor_d, + ) + + +@dataclasses.dataclass(frozen=True) +class ArmStatus: + """Partially decoded ``0x87`` general status response. + + The remaining bytes are still unknown and preserved in ``raw_payload``. + """ + + theta: float + x: float + z: float + gripper: float + raw_payload: bytes + + +@dataclasses.dataclass(frozen=True) +class GeneralStatus: + """``0x87`` general status response.""" + + raw_payload: bytes + arm_status: Optional[ArmStatus] = None + + +@dataclasses.dataclass(frozen=True) +class Teachpoint: + """Numeric teachpoint data for command ``0x73``. + + The human-readable name is metadata only and is not serialized. It did not + appear in the VWorks packet captures. + """ + + theta: float + x: float + z: float + approach_height: float + cavity_depth: float + gripper_open_limit: float + respect_approach_height_when_not_holding_plate: bool + something_above_this_point: bool + teachpoint_id: int = TEST_LEFT_TEACHPOINT_ID + name: Optional[str] = None + + +TEST_LEFT_TEACHPOINT = Teachpoint( + theta=89.99874114990234, + x=-360.8802795410156, + z=-10.0, + approach_height=20.0, + cavity_depth=0.0, + gripper_open_limit=-1.5, + respect_approach_height_when_not_holding_plate=True, + something_above_this_point=False, + teachpoint_id=TEST_LEFT_TEACHPOINT_ID, + name="test-left", +) + + +@dataclasses.dataclass(frozen=True) +class AxisBoundsResponse: + """Decoded response to command ``0x99`` (axis min/max travel limits).""" + + theta_min: float + x_min: float + z_min: float + gripper_min: float + theta_max: float + x_max: float + z_max: float + gripper_max: float + raw_payload: bytes + float_values: Tuple[float, ...] + + +@dataclasses.dataclass(frozen=True) +class CurrentPositionResponse: + """Response to command ``0x85``. + + Live tests showed the selector byte is ignored and the response is the current + arm position/config payload, not a stored teachpoint reader. Raw bytes are + preserved because only the status ``0x87`` layout is currently decoded. + """ + + selector: int + raw_payload: bytes + + +# --------------------------------------------------------------------------- +# Low-level protocol helpers +# --------------------------------------------------------------------------- + + +def make_frame(command_id: int, payload: bytes = b"") -> bytes: + """Build a BenchCel protocol frame.""" + if not 0 <= command_id <= 0xFF: + raise ValueError(f"command_id must fit in one byte, got {command_id!r}") + if len(payload) > 0xFFFF: + raise ValueError(f"payload too large: {len(payload)} bytes") + return bytes([command_id]) + len(payload).to_bytes(2, "little") + payload + + +def parse_frame_from_buffer(buffer: bytearray) -> Optional[Frame]: + """Parse one complete frame from the front of ``buffer``, if available. + + TCP packet boundaries are not protocol boundaries. This function removes one + full frame from ``buffer`` only when the complete payload is available. + """ + if len(buffer) < 3: + return None + command_id = buffer[0] + length = int.from_bytes(buffer[1:3], "little") + total = 3 + length + if len(buffer) < total: + return None + payload = bytes(buffer[3:total]) + del buffer[:total] + return Frame(command_id, payload) + + +def split_frames(data: bytes) -> List[Frame]: + """Split a byte string containing one or more complete frames.""" + buffer = bytearray(data) + frames: List[Frame] = [] + while buffer: + frame = parse_frame_from_buffer(buffer) + if frame is None: + raise BenchCelProtocolError(f"partial/truncated frame data: {bytes(buffer).hex()}") + frames.append(frame) + return frames + + +def _u16le(payload: bytes, offset: int) -> int: + return int.from_bytes(payload[offset : offset + 2], "little") + + +def _f32le(payload: bytes, offset: int) -> float: + return float(struct.unpack(" int: + """Validate human 1-based stacker number and return zero-based protocol index.""" + if stacker not in (1, 2, 3, 4): + raise ValueError(f"stacker must be 1, 2, 3, or 4; got {stacker!r}") + return stacker - 1 + + +def _target_id(target_id: int) -> int: + if not 0 <= target_id <= 0xFF: + raise ValueError(f"target_id must fit in one byte, got {target_id!r}") + return target_id + + +# Command IDs from VWorks captures/live tests. The backend methods construct +# frames directly, using private payload helpers for nontrivial binary layouts. +CMD_ERROR = 0x02 +CMD_HOME_MOTORS = 0x47 +CMD_HOME = 0x48 +CMD_LOAD_PLATE = 0x60 +CMD_UNLOAD_PLATE = 0x61 +CMD_PICK = 0x62 +CMD_PLACE = 0x63 +CMD_MOVE_TO_TARGET = 0x65 +CMD_JOG = 0x66 +CMD_STACKER_GRIPPER = 0x67 +CMD_ROBOT_GRIPPER = 0x6A +CMD_ACK = 0x69 +CMD_SAVE_TEACHPOINT = 0x73 +CMD_SET_LABWARE = 0x7D +CMD_SENSOR_STATUS = 0x7E +CMD_CURRENT_POSITION = 0x85 +CMD_GENERAL_STATUS = 0x87 +CMD_SETTINGS_COMMIT = 0x9F +CMD_AXIS_BOUNDS = 0x99 + + +def _target_payload(target_id: int) -> bytes: + """Shared payload for pick/place target commands.""" + return bytes([0x01, _target_id(target_id), 0x00, 0x01]) + + +def _move_to_target_payload(target_id: int, approach_height: float) -> bytes: + """Payload for command ``0x65`` (move to stacker/teachpoint target).""" + return struct.pack(" bytes: + """Payload for command ``0x73`` (save teachpoint).""" + if not 0 <= teachpoint.teachpoint_id <= 0xFF: + raise ValueError(f"teachpoint_id must fit in one byte, got {teachpoint.teachpoint_id!r}") + return struct.pack( + " str: + """Parse a ``0x02`` device error frame and return the ASCII message.""" + if frame.command_id != CMD_ERROR: + raise BenchCelProtocolError(f"not an error frame: {frame}") + return frame.payload.decode("ascii", errors="replace") + + +def parse_ack_frame(frame: Frame) -> int: + """Parse a standard ``0x69`` command completion ACK and return the command id.""" + if frame.command_id != CMD_ACK or len(frame.payload) < 1: + raise BenchCelProtocolError(f"not an ACK frame: {frame}") + return frame.payload[0] + + +def parse_sensor_response(frame: Frame) -> SensorStatus: + """Parse a ``0x7e`` / 18-byte stacker sensor response.""" + if frame.command_id != CMD_SENSOR_STATUS: + raise BenchCelProtocolError(f"expected 0x7e sensor frame, got {frame}") + if len(frame.payload) != 18: + raise BenchCelProtocolError( + f"expected 18-byte sensor payload, got {len(frame.payload)}: {frame}" + ) + + p = frame.payload + stacker_index = p[0] + if stacker_index not in (0, 1, 2, 3): + raise BenchCelProtocolError(f"unexpected stacker index in sensor payload: {stacker_index}") + + return SensorStatus( + stacker=stacker_index + 1, + stacker_index=stacker_index, + constant_08=p[1], + air_pressure=_u16le(p, 2), + notch_sensor_a=_u16le(p, 4), + notch_sensor_b=_u16le(p, 6), + unknown_a=_u16le(p, 8), + plate_presence=_u16le(p, 10), + unknown_b=_u16le(p, 12), + notch_sensor_c=_u16le(p, 14), + notch_sensor_d=_u16le(p, 16), + raw_payload=p, + ) + + +def parse_arm_status_from_87_payload(payload: bytes) -> ArmStatus: + """Decode known arm-position fields from a 66-byte ``0x87`` payload.""" + if len(payload) != 66: + raise BenchCelProtocolError(f"expected 66-byte 0x87 payload, got {len(payload)} bytes") + return ArmStatus( + theta=_f32le(payload, 4), + x=_f32le(payload, 12), + z=_f32le(payload, 20), + gripper=_f32le(payload, 28), + raw_payload=payload, + ) + + +def parse_general_status_response(frame: Frame) -> GeneralStatus: + """Parse the ``0x87`` general status response.""" + if frame.command_id != CMD_GENERAL_STATUS: + raise BenchCelProtocolError(f"expected 0x87 general status frame, got {frame}") + arm_status = parse_arm_status_from_87_payload(frame.payload) if len(frame.payload) == 66 else None + return GeneralStatus(raw_payload=frame.payload, arm_status=arm_status) + + +def parse_axis_bounds_response(frame: Frame) -> AxisBoundsResponse: + """Parse ``0x99`` response into per-axis min/max travel limits.""" + if frame.command_id != CMD_AXIS_BOUNDS: + raise BenchCelProtocolError(f"expected 0x99 axis bounds response, got {frame}") + if len(frame.payload) != 32: + raise BenchCelProtocolError(f"expected 32-byte 0x99 payload, got {len(frame.payload)}") + f = struct.unpack("<8f", frame.payload) + return AxisBoundsResponse( + theta_min=f[0], + x_min=f[1], + z_min=f[2], + gripper_min=f[3], + theta_max=f[4], + x_max=f[5], + z_max=f[6], + gripper_max=f[7], + raw_payload=frame.payload, + float_values=f, + ) + + +def parse_current_position_response(frame: Frame, *, selector: int = 1) -> CurrentPositionResponse: + """Preserve the observed ``0x85`` response as raw bytes.""" + if frame.command_id != CMD_CURRENT_POSITION: + raise BenchCelProtocolError(f"expected 0x85 current-position response, got {frame}") + return CurrentPositionResponse(selector=selector, raw_payload=frame.payload) + + +# --------------------------------------------------------------------------- +# Backend +# --------------------------------------------------------------------------- + + +class _BenchCelSocket(Socket): + """Socket variant that can bind to a specific local/source IP.""" + + def __init__(self, *args, source_ip: Optional[str] = None, **kwargs): + super().__init__(*args, **kwargs) + self._source_ip = source_ip + + async def _connect(self): + local_addr = (self._source_ip, 0) if self._source_ip is not None else None + self._reader, self._writer = await asyncio.open_connection( + host=self._host, + port=self._port, + ssl=self._ssl_context, + server_hostname=self._server_hostname, + local_addr=local_addr, + ) + + +class BenchCel4RBackend(IncubatorBackend): + """Asynchronous backend for an Agilent BenchCel 4R microplate handler.""" + + DEFAULT_PORT = 7612 + NUM_STACKERS = 4 + + def __init__( + self, + host: str, + port: int = DEFAULT_PORT, + timeout: float = 30.0, + read_poll_timeout: float = 0.25, + loading_tray_teachpoint_id: Optional[int] = None, + source_ip: Optional[str] = None, + labware: Optional[Union[Plate, BenchCelLabwareSettings, dict]] = None, + ): + """ + Args: + host: IP address or DNS name of the BenchCel Ethernet interface. + port: TCP port. Defaults to 7612, as observed in VWorks captures. + timeout: Default command timeout in seconds. + read_poll_timeout: Per-read timeout used while assembling framed replies. + loading_tray_teachpoint_id: Teachpoint target ID used as the transfer + (loading/unloading) point by :meth:`fetch_plate_to_loading_tray` and + :meth:`take_in_plate`. There is no fixed loading position on the + BenchCel: the transfer point is a teachpoint you taught in VWorks (or + with :meth:`save_teachpoint`). This is intentionally not defaulted -- + transfers raise unless a teachpoint is configured here or passed per + call via ``teachpoint_id``, because an unset/wrong teachpoint can send + the arm to a home-like pose. The captured VWorks right teachpoint was + ``0x1e``, but do not rely on that without verifying it on your device. + source_ip: Optional local/source IP to bind, useful on hosts with multiple + network interfaces connected to different subnets. + labware: Optional PLR plate, calculated settings object, or serialized + settings dict. The device must still be configured with matching VWorks + labware settings; this value is used for PLR metadata, serialization, + and validation. + """ + super().__init__() + self.host = host + self.port = port + self.timeout = timeout + self.read_poll_timeout = read_poll_timeout + self.loading_tray_teachpoint_id = loading_tray_teachpoint_id + self.source_ip = source_ip + self.labware_settings = ( + resolve_benchcel_labware_settings(labware) if labware is not None else None + ) + self.io = _BenchCelSocket( + human_readable_device_name="Agilent BenchCel 4R", + host=host, + port=port, + read_timeout=read_poll_timeout, + write_timeout=timeout, + source_ip=source_ip, + ) + self._lock = asyncio.Lock() + self._rx_buffer = bytearray() + + async def setup(self) -> None: + """Open the TCP connection to the BenchCel.""" + logger.debug("[benchcel] connecting to %s:%d", self.host, self.port) + await asyncio.wait_for(self.io.setup(), timeout=self.timeout) + + async def stop(self) -> None: + """Close the TCP connection. Safe to call even if never set up.""" + await self.io.stop() + self._rx_buffer.clear() + + def serialize(self) -> dict: + """Return a JSON-serialisable view of this backend's construction args.""" + return { + **super().serialize(), + "host": self.host, + "port": self.port, + "timeout": self.timeout, + "read_poll_timeout": self.read_poll_timeout, + "loading_tray_teachpoint_id": self.loading_tray_teachpoint_id, + "source_ip": self.source_ip, + "labware": self.labware_settings.to_dict() if self.labware_settings is not None else None, + } + + # ------------------------------------------------------------------ wire IO + + async def _write_frame(self, frame: Frame, *, timeout: Optional[float] = None) -> None: + data = frame.to_bytes() + logger.debug("[benchcel] >>> %s raw=%s", frame, data.hex()) + await self.io.write(data, timeout=self.timeout if timeout is None else timeout) + + async def _read_frame(self, *, timeout: Optional[float] = None) -> Frame: + effective_timeout = self.timeout if timeout is None else timeout + loop = asyncio.get_running_loop() + deadline = loop.time() + effective_timeout + + while True: + frame = parse_frame_from_buffer(self._rx_buffer) + if frame is not None: + logger.debug("[benchcel] <<< %s raw=%s", frame, frame.hex()) + return frame + + remaining = deadline - loop.time() + if remaining <= 0: + raise BenchCelTimeoutError(f"timed out after {effective_timeout}s waiting for frame") + + try: + chunk = await self.io.read(4096, timeout=min(self.read_poll_timeout, remaining)) + except TimeoutError: + continue + if not chunk: + raise BenchCelProtocolError("socket closed while waiting for frame") + logger.debug("[benchcel] <<< chunk %d bytes: %s", len(chunk), chunk.hex()) + self._rx_buffer.extend(chunk) + + async def _read_until( + self, + predicate: Callable[[Frame], bool], + *, + timeout: Optional[float] = None, + ) -> Frame: + effective_timeout = self.timeout if timeout is None else timeout + loop = asyncio.get_running_loop() + deadline = loop.time() + effective_timeout + while True: + remaining = deadline - loop.time() + if remaining <= 0: + raise BenchCelTimeoutError( + f"timed out after {effective_timeout}s waiting for matching frame" + ) + frame = await self._read_frame(timeout=remaining) + if frame.command_id == CMD_ERROR: + raise BenchCelDeviceError(parse_error_frame(frame), frame) + if predicate(frame): + return frame + + async def _wait_for_ack_payload( + self, + ack_payload: bytes, + *, + timeout: Optional[float] = None, + ) -> Frame: + """Wait for an exact ``0x69`` ACK payload or raise on device error.""" + return await self._read_until( + lambda f: f.command_id == CMD_ACK and f.payload == ack_payload, + timeout=timeout, + ) + + async def _wait_for_command_ack( + self, + command_id: int, + *, + timeout: Optional[float] = None, + ) -> Frame: + return await self._wait_for_ack_payload(bytes([command_id]), timeout=timeout) + + async def _send_frame_expect_ack_no_lock( + self, + frame: Frame, + *, + ack_payload: Optional[bytes] = None, + timeout: Optional[float] = None, + ) -> Frame: + await self._write_frame(frame, timeout=timeout) + if ack_payload is None: + return await self._wait_for_command_ack(frame.command_id, timeout=timeout) + return await self._wait_for_ack_payload(ack_payload, timeout=timeout) + + async def send_frame( + self, + frame: Frame, + *, + ack_payload: Optional[bytes] = None, + timeout: Optional[float] = None, + ) -> Frame: + """Send one frame and wait for its completion ACK. + + Most motion commands ACK with ``69 01 00 ``. Plate load/unload + ACKs include the stacker index too; pass that exact payload via + ``ack_payload`` for those commands. + """ + async with self._lock: + return await self._send_frame_expect_ack_no_lock( + frame, + ack_payload=ack_payload, + timeout=timeout, + ) + + # --------------------------------------------------------------- movements + + async def home_motors(self, *, timeout: float = 90.0, reconnect: bool = True) -> bool: + """Send VWorks ``home motors`` (``0x47``) and wait for the device to recover. + + Live testing showed this command drops the TCP control session while the + motors home. If ``reconnect=True`` (default), this method reconnects and + polls ``0x87`` status until the device responds again. + """ + async with self._lock: + cmd = Frame(CMD_HOME_MOTORS, b"\x01") + try: + await self._write_frame(cmd, timeout=min(self.timeout, 5.0)) + except OSError: + pass + + try: + await self._wait_for_command_ack(cmd.command_id, timeout=5.0) + if not reconnect: + return True + except (BenchCelProtocolError, BenchCelTimeoutError, OSError, ConnectionError): + pass + + if not reconnect: + return True + + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + try: + await self.io.stop() + except Exception: # pragma: no cover - defensive cleanup + pass + self._rx_buffer.clear() + try: + await asyncio.wait_for(self.io.setup(), timeout=min(self.timeout, 5.0)) + await self._write_frame(Frame(CMD_GENERAL_STATUS), timeout=2.0) + await self._read_until(lambda f: f.command_id == CMD_GENERAL_STATUS, timeout=2.0) + return True + except ( + BenchCelProtocolError, + BenchCelTimeoutError, + TimeoutError, + OSError, + ConnectionError, + asyncio.TimeoutError, + ): + await asyncio.sleep(2.0) + raise BenchCelTimeoutError(f"home-motors: device not responsive within {timeout}s") + + async def home(self, *, timeout: float = 15.0) -> Frame: + """Send the home command and wait for completion.""" + return await self.send_frame(Frame(CMD_HOME, b"\x01"), timeout=timeout) + + async def move_to_stacker(self, stacker: int, *, timeout: float = 20.0) -> Frame: + """Move the arm to stacker 1, 2, 3, or 4.""" + return await self.move_to_target(_stacker_index(stacker), timeout=timeout) + + async def move_to_target( + self, + target_id: int, + *, + approach_height: float = 0.0, + timeout: float = 20.0, + ) -> Frame: + """Move to a one-byte BenchCel target id using command ``0x65``.""" + return await self.send_frame( + Frame(CMD_MOVE_TO_TARGET, _move_to_target_payload(target_id, approach_height)), + timeout=timeout, + ) + + async def move_to_teachpoint( + self, + teachpoint_id: int, + *, + approach_height: float = 20.0, + timeout: float = 20.0, + ) -> Frame: + """Move to a teachpoint target id using command ``0x65``.""" + return await self.move_to_target( + teachpoint_id, + approach_height=approach_height, + timeout=timeout, + ) + + async def move_to_right_teachpoint( + self, + *, + approach_height: float = 20.0, + timeout: float = 20.0, + ) -> Frame: + """Move to the captured right teachpoint target id ``0x1e``.""" + return await self.move_to_teachpoint( + RIGHT_TEACHPOINT_ID, + approach_height=approach_height, + timeout=timeout, + ) + + async def pick_plate_from_target( + self, + target_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Pick/downstack a plate from a target id using command ``0x62``.""" + return await self.send_frame( + Frame(CMD_PICK, _target_payload(target_id)), + timeout=timeout, + ) + + async def place_plate_at_target( + self, + target_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Place/upstack a plate at a target id using command ``0x63``.""" + return await self.send_frame( + Frame(CMD_PLACE, _target_payload(target_id)), + timeout=timeout, + ) + + async def pick_plate_from_teachpoint( + self, + teachpoint_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Pick a plate from a teachpoint target id using command ``0x62``.""" + return await self.pick_plate_from_target(teachpoint_id, timeout=timeout) + + async def place_plate_at_teachpoint( + self, + teachpoint_id: int, + *, + timeout: float = 30.0, + ) -> Frame: + """Place a plate at a teachpoint target id using command ``0x63``.""" + return await self.place_plate_at_target(teachpoint_id, timeout=timeout) + + async def pick_plate_from_right_teachpoint( + self, + *, + timeout: float = 30.0, + ) -> Frame: + """Pick a plate from the captured right teachpoint target id ``0x1e``.""" + return await self.pick_plate_from_teachpoint( + RIGHT_TEACHPOINT_ID, + timeout=timeout, + ) + + async def place_plate_at_right_teachpoint( + self, + *, + timeout: float = 30.0, + ) -> Frame: + """Place a plate at the captured right teachpoint target id ``0x1e``.""" + return await self.place_plate_at_teachpoint( + RIGHT_TEACHPOINT_ID, + timeout=timeout, + ) + + async def load_stacker(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Send the ``0x60`` stacker load command for stacker 1-4. + + Confirmed from VWorks captures: pressing "Load" emits a single ``0x60`` with + payload ``01 `` and the device replies ``0x69`` ``60 ``. + This operates the whole-stacker mechanism, not the robot grippers, and is + distinct from :meth:`downstack_plate`/:meth:`upstack_plate` (the + ``0x62``/``0x63`` per-plate robot pick/place). + """ + stacker_index = _stacker_index(stacker) + cmd = Frame(CMD_LOAD_PLATE, bytes([0x01, stacker_index])) + return await self.send_frame( + cmd, + ack_payload=bytes([cmd.command_id, stacker_index]), + timeout=timeout, + ) + + async def unload_stacker(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Send the ``0x61`` stacker unload command for stacker 1-4. + + Confirmed from VWorks captures: pressing "Unload" emits a single ``0x61`` + with payload ``01 00 00 00 00`` and the device replies + ``0x69`` ``61 ``. Like :meth:`load_stacker` this drives the + whole-stacker mechanism, not the robot grippers. + """ + stacker_index = _stacker_index(stacker) + cmd = Frame(CMD_UNLOAD_PLATE, bytes([0x01, stacker_index]) + b"\x00\x00\x00\x00") + return await self.send_frame( + cmd, + ack_payload=bytes([cmd.command_id, stacker_index]), + timeout=timeout, + ) + + async def dangerously_open_stacker_grippers( + self, + stacker: int, + *, + timeout: float = 15.0, + ) -> Frame: + """Open pneumatic stacker grippers/clamps using command ``0x67``. + + Caution: this diagnostic command can release/drop a plate stack if it is not + physically supported. These are stacker clamps, not the robot grippers. + """ + payload = bytes([_stacker_index(stacker), 0x01]) + return await self.send_frame( + Frame(CMD_STACKER_GRIPPER, payload), + timeout=timeout, + ) + + async def close_stacker_grippers( + self, + stacker: int, + *, + timeout: float = 15.0, + ) -> Frame: + """Close pneumatic stacker grippers/clamps using command ``0x67``.""" + payload = bytes([_stacker_index(stacker), 0x00]) + return await self.send_frame( + Frame(CMD_STACKER_GRIPPER, payload), + timeout=timeout, + ) + + async def downstack_plate(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Pick/downstack one plate from stacker 1-4. + + Equivalent to the VWorks "Downstack" task: confirmed from captures to emit a + single ``0x62`` with payload ``01 00 01``. + """ + return await self.pick_plate_from_target(_stacker_index(stacker), timeout=timeout) + + async def upstack_plate(self, stacker: int, *, timeout: float = 30.0) -> Frame: + """Place/upstack one plate to stacker 1-4. + + Equivalent to the VWorks "Upstack" task: confirmed from captures to emit a + single ``0x63`` with payload ``01 00 01``. + """ + return await self.place_plate_at_target(_stacker_index(stacker), timeout=timeout) + + async def jog(self, axis: int, delta: float, *, timeout: float = 10.0) -> Frame: + """Send a relative jog command on one axis and wait for ACK/error.""" + if axis not in AXIS_NAMES: + raise ValueError(f"axis must be one of {sorted(AXIS_NAMES)}, got {axis!r}") + return await self.send_frame( + Frame(CMD_JOG, struct.pack(" Frame: + """Relative theta jog. Positive is CCW/left in observed tests.""" + return await self.jog(AXIS_THETA, delta_degrees, timeout=timeout) + + async def move_x(self, delta_mm: float, *, timeout: float = 10.0) -> Frame: + """Relative X jog. Positive is right in observed tests.""" + return await self.jog(AXIS_X, delta_mm, timeout=timeout) + + async def move_z(self, delta_mm: float, *, timeout: float = 10.0) -> Frame: + """Relative Z jog. Positive is up in observed tests.""" + return await self.jog(AXIS_Z, delta_mm, timeout=timeout) + + async def move_gripper_relative( + self, + delta: float, + *, + timeout: float = 10.0, + ) -> Frame: + """Relative robot-gripper jog in internal units. Positive closes grippers.""" + return await self.jog(AXIS_GRIPPER, delta, timeout=timeout) + + async def fully_close_grippers(self, *, timeout: float = 10.0) -> Frame: + """Fully close robot grippers using command ``0x6a``.""" + return await self.send_frame(Frame(CMD_ROBOT_GRIPPER, b"\x00"), timeout=timeout) + + async def fully_open_grippers(self, *, timeout: float = 10.0) -> Frame: + """Fully open robot grippers using command ``0x6a``.""" + return await self.send_frame(Frame(CMD_ROBOT_GRIPPER, b"\x01"), timeout=timeout) + + async def save_teachpoint( + self, + teachpoint: Teachpoint, + *, + expect_ack: bool = False, + timeout: float = 5.0, + ) -> Frame: + """Send ``0x73`` save-teachpoint. + + Captures did not show a command-specific ACK after ``0x73``, so + ``expect_ack`` defaults to ``False`` and the sent frame is returned after + writing. The device cannot read teachpoints back; keep a record of the + numeric teachpoints you write in your own protocol/config if you need them. + """ + cmd = Frame(CMD_SAVE_TEACHPOINT, _teachpoint_payload(teachpoint)) + async with self._lock: + await self._write_frame(cmd, timeout=timeout) + if expect_ack: + return await self._wait_for_command_ack(cmd.command_id, timeout=timeout) + return cmd + + async def save_test_left_teachpoint( + self, + *, + expect_ack: bool = False, + timeout: float = 5.0, + ) -> Frame: + """Send exactly the captured numeric payload for teachpoint ``test-left``.""" + return await self.save_teachpoint( + TEST_LEFT_TEACHPOINT, + expect_ack=expect_ack, + timeout=timeout, + ) + + async def move_plate_between_stackers( + self, + source_stacker: int, + destination_stacker: int, + *, + open_grippers_first: bool = True, + timeout: float = 30.0, + ) -> None: + """Move one plate from the source stacker to the destination stacker.""" + async with self._lock: + if open_grippers_first: + await self._send_frame_expect_ack_no_lock( + Frame(CMD_ROBOT_GRIPPER, b"\x01"), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PICK, _target_payload(_stacker_index(source_stacker))), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PLACE, _target_payload(_stacker_index(destination_stacker))), + timeout=timeout, + ) + + # --------------------------------------------------------------- labware config + + async def set_labware( + self, + labware: Union[Plate, BenchCelLabwareSettings, dict], + *, + timeout: float = 10.0, + ) -> BenchCelLabwareSettings: + """Push labware geometry to the BenchCel using the ``0x7d`` settings command. + + VWorks sends the labware settings as a 77-byte ``0x7d`` frame followed by an + empty ``0x9f`` commit, which the device echoes back. Invalid geometry (for + example, gripper hold positions that are too close) is rejected with a + ``0x02`` error. ``labware`` may be a PLR :class:`~pylabrobot.resources.Plate` + (settings are calculated from its dimensions), a + :class:`~pylabrobot.storage.agilent.benchcel_labware.BenchCelLabwareSettings` + object, or a serialized settings dict. + + On success, the resolved settings are stored on ``self.labware_settings`` and + returned. + """ + settings = resolve_benchcel_labware_settings(labware) + payload = settings.to_device_payload() + async with self._lock: + await self._write_frame(Frame(CMD_SET_LABWARE, payload), timeout=timeout) + await self._write_frame(Frame(CMD_SETTINGS_COMMIT), timeout=timeout) + # The device replies with a 0x9f commit echo. On invalid geometry it first + # sends a 0x02 error, then still echoes 0x9f; consume through the echo so + # the stream is not left out of sync, and raise the error afterwards. + error: Optional[str] = None + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while True: + remaining = deadline - loop.time() + if remaining <= 0: + raise BenchCelTimeoutError(f"timed out after {timeout}s waiting for 0x9f settings commit") + frame = await self._read_frame(timeout=remaining) + if frame.command_id == CMD_ERROR: + error = parse_error_frame(frame) + elif frame.command_id == CMD_SETTINGS_COMMIT: + break + if error is not None: + raise BenchCelDeviceError(error, Frame(CMD_ERROR, error.encode("ascii", "replace"))) + self.labware_settings = settings + return settings + + # --------------------------------------------------------------- status APIs + + async def request_stacker_sensors(self, stacker: int, *, timeout: float = 5.0) -> SensorStatus: + """Query and decode one stacker's sensor/status frame.""" + expected_index = _stacker_index(stacker) + query = Frame(CMD_SENSOR_STATUS, bytes([expected_index])) + + def is_matching_sensor_response(frame: Frame) -> bool: + return ( + frame.command_id == CMD_SENSOR_STATUS + and len(frame.payload) == 18 + and frame.payload[0] == expected_index + ) + + async with self._lock: + await self._write_frame(query, timeout=timeout) + frame = await self._read_until(is_matching_sensor_response, timeout=timeout) + return parse_sensor_response(frame) + + async def request_all_stacker_sensors( + self, + *, + timeout_per_stacker: float = 5.0, + ) -> List[SensorStatus]: + """Query and decode all four stacker sensor/status frames.""" + sensors: List[SensorStatus] = [] + for stacker in (1, 2, 3, 4): + sensors.append(await self.request_stacker_sensors(stacker, timeout=timeout_per_stacker)) + return sensors + + async def request_general_status(self, *, timeout: float = 5.0) -> GeneralStatus: + """Send ``87 00 00`` and return decoded/raw general status.""" + async with self._lock: + await self._write_frame(Frame(CMD_GENERAL_STATUS), timeout=timeout) + frame = await self._read_until( + lambda f: f.command_id == CMD_GENERAL_STATUS, + timeout=timeout, + ) + return parse_general_status_response(frame) + + async def request_arm_status(self, *, timeout: float = 5.0) -> ArmStatus: + """Send ``87 00 00`` and return decoded theta/X/Z/gripper fields.""" + status = await self.request_general_status(timeout=timeout) + if status.arm_status is None: + raise BenchCelProtocolError( + f"0x87 response did not contain decoded 66-byte arm status: len={len(status.raw_payload)}" + ) + return status.arm_status + + async def request_axis_bounds(self, *, timeout: float = 5.0) -> AxisBoundsResponse: + """Send ``0x99`` query and parse per-axis min/max travel limits.""" + async with self._lock: + await self._write_frame(Frame(CMD_AXIS_BOUNDS), timeout=timeout) + frame = await self._read_until(lambda f: f.command_id == CMD_AXIS_BOUNDS, timeout=timeout) + return parse_axis_bounds_response(frame) + + async def request_current_position( + self, + selector: int = 1, + *, + timeout: float = 5.0, + ) -> CurrentPositionResponse: + """Send ``0x85`` query and return the raw response. + + The selector is preserved for diagnostics, but live tests showed it is + ignored by the device. + """ + if not 0 <= selector <= 0xFF: + raise ValueError(f"selector must fit in one byte, got {selector!r}") + async with self._lock: + await self._write_frame(Frame(CMD_CURRENT_POSITION, bytes([selector])), timeout=timeout) + frame = await self._read_until( + lambda f: f.command_id == CMD_CURRENT_POSITION, + timeout=timeout, + ) + return parse_current_position_response(frame, selector=selector) + + async def vworks_style_idle_poll_once( + self, + *, + timeout_per_response: float = 5.0, + ) -> Tuple[List[SensorStatus], GeneralStatus]: + """Perform one VWorks-like idle polling cycle.""" + sensors = await self.request_all_stacker_sensors(timeout_per_stacker=timeout_per_response) + general = await self.request_general_status(timeout=timeout_per_response) + return sensors, general + + # ------------------------------------------------------- IncubatorBackend API + + def _site_to_stacker(self, site: PlateHolder) -> int: + rack = site.parent + if not isinstance(rack, PlateCarrier): + raise ValueError(f"Site {site.name!r} is not assigned to a PlateCarrier") + rack_names = [r.name for r in self.racks] + try: + return rack_names.index(rack.name) + 1 + except ValueError as exc: + raise ValueError(f"Rack {rack.name!r} is not configured on this BenchCel") from exc + + def _plate_to_stacker(self, plate: Plate) -> int: + if not isinstance(plate.parent, PlateHolder): + raise ValueError(f"Plate {plate.name!r} is not directly assigned to a BenchCel site") + return self._site_to_stacker(plate.parent) + + def _resolve_loading_tray_target(self, teachpoint_id: Optional[int]) -> int: + """Return the teachpoint target for a transfer, or raise if none configured.""" + target = self.loading_tray_teachpoint_id if teachpoint_id is None else teachpoint_id + if target is None: + raise ValueError( + "No BenchCel loading/transfer teachpoint configured. The BenchCel has no " + "fixed loading position; set loading_tray_teachpoint_id on the backend/factory " + "or pass teachpoint_id=... to this call. Make sure the teachpoint is taught on " + "the device (VWorks or save_teachpoint) first." + ) + if not 0 <= target <= 0xFF: + raise ValueError(f"teachpoint_id must fit in one byte, got {target!r}") + return target + + async def fetch_plate_to_loading_tray( + self, + plate: Plate, + *, + teachpoint_id: Optional[int] = None, + open_grippers_first: bool = True, + timeout: float = 30.0, + **backend_kwargs, + ) -> None: + """Fetch a plate from its stacker and place it at the loading teachpoint. + + The BenchCel firmware addresses stackers, not individual slots. The PLR + ``plate`` is used to determine which configured stacker contains the plate + and to update resource state in :class:`~pylabrobot.storage.Incubator`. + """ + _ = backend_kwargs + source_stacker = self._plate_to_stacker(plate) + destination_target = self._resolve_loading_tray_target(teachpoint_id) + async with self._lock: + if open_grippers_first: + await self._send_frame_expect_ack_no_lock( + Frame(CMD_ROBOT_GRIPPER, b"\x01"), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PICK, _target_payload(_stacker_index(source_stacker))), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PLACE, _target_payload(destination_target)), + timeout=timeout, + ) + + async def take_in_plate( + self, + plate: Plate, + site: PlateHolder, + *, + teachpoint_id: Optional[int] = None, + open_grippers_first: bool = True, + timeout: float = 30.0, + **backend_kwargs, + ) -> None: + """Pick a plate from the loading teachpoint and upstack it into ``site``'s stacker.""" + _ = (plate, backend_kwargs) + source_target = self._resolve_loading_tray_target(teachpoint_id) + destination_stacker = self._site_to_stacker(site) + async with self._lock: + if open_grippers_first: + await self._send_frame_expect_ack_no_lock( + Frame(CMD_ROBOT_GRIPPER, b"\x01"), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PICK, _target_payload(source_target)), + timeout=timeout, + ) + await self._send_frame_expect_ack_no_lock( + Frame(CMD_PLACE, _target_payload(_stacker_index(destination_stacker))), + timeout=timeout, + ) + + async def open_door(self): + """BenchCel is open storage and has no door primitive.""" + raise NotImplementedError("BenchCel 4R is open storage and has no door to open") + + async def close_door(self): + """BenchCel is open storage and has no door primitive.""" + raise NotImplementedError("BenchCel 4R is open storage and has no door to close") + + async def set_temperature(self, temperature: float): + """BenchCel has no temperature-control feature.""" + _ = temperature + raise NotImplementedError("BenchCel 4R does not support temperature control") + + async def get_temperature(self) -> float: + """BenchCel has no temperature-control feature.""" + raise NotImplementedError("BenchCel 4R does not support temperature control") + + async def start_shaking(self, frequency: float): + """BenchCel has no shaker feature.""" + _ = frequency + raise NotImplementedError("BenchCel 4R does not support shaking") + + async def stop_shaking(self): + """BenchCel has no shaker feature.""" + raise NotImplementedError("BenchCel 4R does not support shaking") + + +# Short alias for users who do not need the configuration-specific name. +BenchCelBackend = BenchCel4RBackend diff --git a/pylabrobot/storage/agilent/benchcel_labware.py b/pylabrobot/storage/agilent/benchcel_labware.py new file mode 100644 index 00000000000..985590b6ee3 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_labware.py @@ -0,0 +1,602 @@ +"""BenchCel labware calculation and PyLabRobot plate integration helpers. + +BenchCel/VWorks labware settings separate three geometry concepts that are easy +to conflate: + +* ``StackingThickness`` is the vertical pitch of plates in a nested stack. +* PLR ``Plate.size_z`` is the full outside height of one plate. +* ``RobotGripperOffset`` is the BenchCel robot gripper contact height measured + from the bottom of the plate. + +This module does not bundle per-catalog BenchCel XML profiles. Instead, it +calculates a conservative BenchCel geometry profile from a PLR plate resource and +allows explicit overrides for values that cannot be inferred from dimensions +alone, such as optical sensor thresholds. +""" + +from __future__ import annotations + +import dataclasses +import struct +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Optional, Union + +from pylabrobot.resources import Coordinate, Plate + +DEVICE_PAYLOAD_LENGTH = 77 +DEFAULT_NESTING_OVERLAP = 1.5 +DEFAULT_MIN_ROBOT_GRIPPER_OFFSET = 5.0 +DEFAULT_MAX_ROBOT_GRIPPER_OFFSET = 8.0 +DEFAULT_MIN_PICKUP_DISTANCE_FROM_TOP = 5.4 +DEFAULT_LOW_PROFILE_HEIGHT_CUTOFF = 11.5 +DEFAULT_TALL_PLATE_HEIGHT_CUTOFF = 30.0 +DEFAULT_ADDITIONAL_RELEASE_HEIGHT = 2.0 + + +@dataclasses.dataclass(frozen=True) +class PlateNotchSettings: + """Orientation-notch settings from BenchCel/VWorks labware XML or overrides.""" + + check_orientation: bool = True + a1_notch: bool = True + top_right_notch: bool = False + bottom_left_notch: bool = True + bottom_right_notch: bool = False + + +@dataclasses.dataclass(frozen=True) +class BenchCelLabwareSettings: + """BenchCel/VWorks labware settings paired with PLR plate dimensions. + + Args: + name: Human-readable labware name. + plate_size_x: Full outside plate length in mm. + plate_size_y: Full outside plate width in mm. + plate_size_z: Full outside plate height in mm. + stacking_thickness: Vertical pitch of nested plates in the BenchCel stacker. + This is usually smaller than full plate height because plates nest. + robot_gripper_offset: BenchCel robot gripper contact height from the bottom + of the plate. This maps to ``Plate.preferred_pickup_location.z``. + stacker_gripper_offset: Stacker clamp/gripper contact height from the bottom + of the plate. + """ + + name: str + plate_size_x: float + plate_size_y: float + plate_size_z: float + stacking_thickness: float + robot_gripper_offset: float + stacker_gripper_offset: float + sensor_offset: float + gripper_open_position: float = -1.0 + gripper_holding_plate_position: float = 8.0 + gripper_holding_stack_position: float = 8.5 + orientation_sensor_threshold: int = 100 + plate_presence_threshold: int = 225 + sensor_intensity: int = 50 + error_correction_offset: float = 0.0 + can_be_lidded: bool = False + lidded_plate_stacking_thickness: Optional[float] = None + lidded_plate_thickness: Optional[float] = None + lidded_plate_resting_height: Optional[float] = None + lidded_plate_gripper_offset: Optional[float] = None + lidded_plate_gripper_position: Optional[float] = None + lidded_plate_departure_height: Optional[float] = None + can_be_sealed: bool = False + sealed_plate_stacking_thickness: Optional[float] = None + sealed_plate_thickness: Optional[float] = None + stack_plate_presence_threshold: int = 50 + rack_presence_threshold: int = 3 + additional_release_height: float = DEFAULT_ADDITIONAL_RELEASE_HEIGHT + low_pressure_warning: int = 30 + tilt_margin: float = 2.0 + tilt_margin_enabled: bool = False + notch_settings: PlateNotchSettings = dataclasses.field(default_factory=PlateNotchSettings) + identifier: Optional[str] = None + source: Optional[str] = None + + @classmethod + def from_plate(cls, plate: Plate, **kwargs) -> "BenchCelLabwareSettings": + """Calculate BenchCel settings from a PLR plate resource.""" + return calculate_benchcel_labware_settings(plate, **kwargs) + + @classmethod + def from_xml_file( + cls, + path: Union[str, Path], + *, + plate_size_x: Optional[float] = None, + plate_size_y: Optional[float] = None, + plate_size_z: Optional[float] = None, + identifier: Optional[str] = None, + ) -> "BenchCelLabwareSettings": + """Parse a user-supplied BenchCel/VWorks XML file. + + XML files contain BenchCel stack/gripper values but not necessarily the full + physical plate dimensions PLR needs. Provide ``plate_size_*`` values when the + XML should be used to validate or annotate PLR plate resources. + """ + path = Path(path) + root = ET.parse(path).getroot() + labware = root.find("Labware") + if labware is None: + raise ValueError(f"BenchCel labware XML has no section: {path}") + stack = root.find("StackSettings") + notch = labware.find("PlateNotchesOrientationOptions") + + def text(parent: ET.Element, name: str) -> str: + element = parent.find(name) + if element is None or element.text is None: + raise ValueError(f"Missing <{name}> in {path}") + return element.text.strip() + + def optional_text(parent: Optional[ET.Element], name: str) -> Optional[str]: + if parent is None: + return None + element = parent.find(name) + if element is None or element.text is None: + return None + value = element.text.strip() + return value if value != "" else None + + def as_bool(value: Optional[str]) -> bool: + return value is not None and value.strip().lower() in {"yes", "true", "1", "enabled"} + + def as_optional_float(value: Optional[str]) -> Optional[float]: + return None if value is None else float(value) + + def as_optional_int(value: Optional[str]) -> Optional[int]: + return None if value is None else int(value) + + name = text(labware, "Name") + stacking_thickness = float(text(labware, "StackingThickness")) + robot_gripper_offset = float(text(labware, "RobotGripperOffset")) + stacker_gripper_offset = float(text(labware, "StackerGripperOffset")) + sensor_offset = float(text(labware, "SensorOffset")) + + # If full physical height is not supplied, use stack pitch as the best-known + # fallback. Callers should pass real dimensions for PLR integration. + px = 127.76 if plate_size_x is None else plate_size_x + py = 85.48 if plate_size_y is None else plate_size_y + pz = stacking_thickness if plate_size_z is None else plate_size_z + + tilt = stack.find("TiltMargin") if stack is not None else None + notch_settings = PlateNotchSettings() + if notch is not None: + notch_settings = PlateNotchSettings( + check_orientation=as_bool(optional_text(notch, "CheckOrientation")), + a1_notch=as_bool(optional_text(notch, "A1Notch")), + top_right_notch=as_bool(optional_text(notch, "TopRightNotch")), + bottom_left_notch=as_bool(optional_text(notch, "BottomLeftNotch")), + bottom_right_notch=as_bool(optional_text(notch, "BottomRightNotch")), + ) + + return cls( + identifier=identifier or path.stem, + name=name, + plate_size_x=px, + plate_size_y=py, + plate_size_z=pz, + stacking_thickness=stacking_thickness, + robot_gripper_offset=robot_gripper_offset, + stacker_gripper_offset=stacker_gripper_offset, + sensor_offset=sensor_offset, + gripper_open_position=float(text(labware, "GripperOpenPosition")), + gripper_holding_plate_position=float(text(labware, "GripperHoldingPlatePosition")), + gripper_holding_stack_position=float(text(labware, "GripperHoldingStackPosition")), + orientation_sensor_threshold=int(text(labware, "OrientationSensorThreshold")), + plate_presence_threshold=int(text(labware, "PlatePresenceThreshold")), + sensor_intensity=int(text(labware, "SensorIntensity")), + error_correction_offset=float(text(labware, "ErrorCorrectionOffset")), + can_be_lidded=as_bool(optional_text(labware, "CanBeLidded")), + lidded_plate_stacking_thickness=as_optional_float( + optional_text(labware, "LiddedPlateStackingThickness") + ), + lidded_plate_thickness=as_optional_float(optional_text(labware, "LiddedPlateThickness")), + lidded_plate_resting_height=as_optional_float( + optional_text(labware, "LiddedPlateRestingHeight") + ), + lidded_plate_gripper_offset=as_optional_float( + optional_text(labware, "LiddedPlateGripperOffset") + ), + lidded_plate_gripper_position=as_optional_float( + optional_text(labware, "LiddedPlateGripperPosition") + ), + lidded_plate_departure_height=as_optional_float( + optional_text(labware, "LiddedPlateDepartureHeight") + ), + can_be_sealed=as_bool(optional_text(labware, "CanBeSealed")), + sealed_plate_stacking_thickness=as_optional_float( + optional_text(labware, "SealedPlateStackingThickness") + ), + sealed_plate_thickness=as_optional_float(optional_text(labware, "SealedPlateThickness")), + stack_plate_presence_threshold=as_optional_int(optional_text(stack, "PlatePresenceThreshold")) + or 50, + rack_presence_threshold=as_optional_int(optional_text(stack, "RackPresenceThreshold")) or 3, + additional_release_height=as_optional_float(optional_text(stack, "AdditionalReleaseHeight")) + or DEFAULT_ADDITIONAL_RELEASE_HEIGHT, + low_pressure_warning=as_optional_int(optional_text(stack, "LowPressureWarning")) or 30, + tilt_margin=as_optional_float(tilt.text.strip() if tilt is not None and tilt.text else None) + or 2.0, + tilt_margin_enabled=as_bool(tilt.get("Enabled") if tilt is not None else None), + notch_settings=notch_settings, + source=str(path), + ) + + @classmethod + def from_dict(cls, data: dict) -> "BenchCelLabwareSettings": + """Deserialize settings from :meth:`to_dict`.""" + data = dict(data) + notch = data.get("notch_settings") + if isinstance(notch, dict): + data["notch_settings"] = PlateNotchSettings(**notch) + return cls(**data) + + def to_dict(self) -> dict: + """Return JSON-serialisable settings.""" + return dataclasses.asdict(self) + + def effective_stacking_thickness(self, *, sealed: bool = False, lidded: bool = False) -> float: + """Return the BenchCel stack pitch for the selected labware state.""" + sealed_pitch = self.sealed_plate_stacking_thickness + if sealed and sealed_pitch not in (None, 0): + return float(sealed_pitch) # type: ignore[arg-type] + lidded_pitch = self.lidded_plate_stacking_thickness + if lidded and lidded_pitch not in (None, 0): + return float(lidded_pitch) # type: ignore[arg-type] + return self.stacking_thickness + + def effective_plate_height(self, *, sealed: bool = False, lidded: bool = False) -> float: + """Return full outside plate height for PLR rack/site sizing.""" + sealed_height = self.sealed_plate_thickness + if sealed and sealed_height not in (None, 0): + return float(sealed_height) # type: ignore[arg-type] + lidded_height = self.lidded_plate_thickness + if lidded and lidded_height not in (None, 0): + return float(lidded_height) # type: ignore[arg-type] + return self.plate_size_z + + def robot_pickup_distance_from_top(self, *, sealed: bool = False, lidded: bool = False) -> float: + """Return PLR ``pickup_distance_from_top`` implied by ``RobotGripperOffset``.""" + return self.effective_plate_height(sealed=sealed, lidded=lidded) - self.robot_gripper_offset + + def preferred_pickup_location(self, plate: Plate) -> Coordinate: + """Return a PLR preferred pickup location using the BenchCel robot offset.""" + return Coordinate( + x=plate.get_size_x() / 2, + y=plate.get_size_y() / 2, + z=self.robot_gripper_offset, + ) + + def dimension_differences(self, plate: Plate) -> dict[str, float]: + """Return profile minus plate-resource dimensions for each axis.""" + return { + "x": self.plate_size_x - plate.get_size_x(), + "y": self.plate_size_y - plate.get_size_y(), + "z": self.plate_size_z - plate.get_size_z(), + } + + def validate_plate_dimensions(self, plate: Plate, *, tolerance_mm: float = 0.25) -> None: + """Raise if a PLR plate resource differs too far from this BenchCel profile.""" + differences = self.dimension_differences(plate) + failures = [ + f"{axis}: expected {expected:.3f} mm, got {actual:.3f} mm" + for axis, expected, actual in ( + ("x", self.plate_size_x, plate.get_size_x()), + ("y", self.plate_size_y, plate.get_size_y()), + ("z", self.plate_size_z, plate.get_size_z()), + ) + if abs(differences[axis]) > tolerance_mm + ] + if failures: + raise ValueError( + f"Plate {plate.name!r} does not match BenchCel labware {self.name!r}: " + + "; ".join(failures) + ) + + def apply_to_plate( + self, + plate: Plate, + *, + validate_dimensions: bool = True, + tolerance_mm: float = 0.25, + ) -> Plate: + """Set PLR pickup metadata on ``plate`` using this BenchCel profile.""" + if validate_dimensions: + self.validate_plate_dimensions(plate, tolerance_mm=tolerance_mm) + plate.preferred_pickup_location = self.preferred_pickup_location(plate) + return plate + + def to_device_payload(self) -> bytes: + """Encode the 77-byte ``0x7d`` BenchCel labware-settings payload. + + The layout was reverse-engineered from VWorks packet captures and validated + byte-for-byte against several real plates. Fields that were always zero in + the captures for standard flat microplates (``ErrorCorrectionOffset`` and the + lidded/sealed sub-fields) are sent as zero and are not yet mapped. + """ + if not 0 <= int(self.orientation_sensor_threshold) <= 0xFFFF: + raise ValueError("orientation_sensor_threshold must fit in uint16") + if not 0 <= int(self.sensor_intensity) <= 0xFFFF: + raise ValueError("sensor_intensity must fit in uint16") + if not 0 <= int(self.plate_presence_threshold) <= 0xFFFF: + raise ValueError("plate_presence_threshold must fit in uint16") + + notch = self.notch_settings + payload = bytearray(DEVICE_PAYLOAD_LENGTH) + struct.pack_into(" "BenchCelLabwareSettings": + """Decode a 77-byte ``0x7d`` payload back into settings. + + Only the confidently-mapped fields are recovered; unmapped lidded/sealed + fields stay at their defaults. + """ + if len(payload) != DEVICE_PAYLOAD_LENGTH: + raise ValueError(f"expected {DEVICE_PAYLOAD_LENGTH}-byte 0x7d payload, got {len(payload)}") + + def fp(o: int) -> float: + return float(struct.unpack_from(" int: + return int(struct.unpack_from(" float: + """Estimate BenchCel ``StackingThickness`` from full plate height. + + The default overlap (1.5 mm) matches the supplied example XML/dimension pairs + within about 0.2 mm. Override this for labware with unusual nesting behavior. + """ + if plate_height <= 0: + raise ValueError(f"plate_height must be positive, got {plate_height}") + if nesting_overlap < 0: + raise ValueError(f"nesting_overlap must be non-negative, got {nesting_overlap}") + if nesting_overlap >= plate_height: + raise ValueError( + f"nesting_overlap ({nesting_overlap}) must be smaller than plate_height ({plate_height})" + ) + return plate_height - nesting_overlap + + +def calculate_robot_gripper_offset( + plate_height: float, + *, + min_offset: float = DEFAULT_MIN_ROBOT_GRIPPER_OFFSET, + max_offset: float = DEFAULT_MAX_ROBOT_GRIPPER_OFFSET, + min_pickup_distance_from_top: float = DEFAULT_MIN_PICKUP_DISTANCE_FROM_TOP, +) -> float: + """Estimate BenchCel ``RobotGripperOffset`` from plate height. + + The BenchCel manual says plates are typically gripped 5-10 mm above the bottom. + The default calculation keeps at least ~5.4 mm above the grip point where + possible while capping the grip height at 8 mm from the bottom. + """ + if plate_height <= 0: + raise ValueError(f"plate_height must be positive, got {plate_height}") + if min_offset > max_offset: + raise ValueError("min_offset must be <= max_offset") + return max(min_offset, min(max_offset, plate_height - min_pickup_distance_from_top)) + + +def calculate_stacker_gripper_offset( + plate_height: float, + robot_gripper_offset: float, + *, + low_profile_height_cutoff: float = DEFAULT_LOW_PROFILE_HEIGHT_CUTOFF, + tall_plate_height_cutoff: float = DEFAULT_TALL_PLATE_HEIGHT_CUTOFF, +) -> float: + """Estimate BenchCel ``StackerGripperOffset`` from plate height. + + This is a heuristic: low plates need clamps lower, very tall plates can use a + slightly higher clamp point, and standard SBS microplates sit in between. + """ + if plate_height <= low_profile_height_cutoff: + return min(robot_gripper_offset, 4.0) + if plate_height >= tall_plate_height_cutoff: + return min(robot_gripper_offset, 6.0) + return min(robot_gripper_offset, 5.0) + + +def calculate_sensor_offset( + plate_height: float, + *, + low_profile_height_cutoff: float = DEFAULT_LOW_PROFILE_HEIGHT_CUTOFF, + tall_plate_height_cutoff: float = DEFAULT_TALL_PLATE_HEIGHT_CUTOFF, +) -> float: + """Estimate BenchCel ``SensorOffset`` from plate height.""" + if plate_height <= low_profile_height_cutoff: + return 7.0 + if plate_height >= tall_plate_height_cutoff: + return max(7.0, plate_height - 4.0) + return 8.0 + + +def calculate_benchcel_labware_settings( + plate: Plate, + *, + name: Optional[str] = None, + identifier: Optional[str] = None, + nesting_overlap: float = DEFAULT_NESTING_OVERLAP, + stacking_thickness: Optional[float] = None, + robot_gripper_offset: Optional[float] = None, + stacker_gripper_offset: Optional[float] = None, + sensor_offset: Optional[float] = None, + orientation_sensor_threshold: int = 100, + plate_presence_threshold: int = 225, + sensor_intensity: int = 50, + error_correction_offset: float = 0.0, + gripper_open_position: float = -1.0, + gripper_holding_plate_position: float = 8.0, + gripper_holding_stack_position: float = 8.5, + can_be_lidded: Optional[bool] = None, + can_be_sealed: bool = False, + sealed_plate_stacking_thickness: Optional[float] = None, + sealed_plate_thickness: Optional[float] = None, + notch_settings: Optional[PlateNotchSettings] = None, +) -> BenchCelLabwareSettings: + """Calculate BenchCel labware settings from a PLR plate resource. + + Geometry fields are calculated from ``plate``. Optical sensor thresholds and + notch options cannot be reliably inferred from dimensions, so they are exposed + as optional overrides with conservative defaults. + """ + height = plate.get_size_z() + robot_offset = robot_gripper_offset + if robot_offset is None: + robot_offset = calculate_robot_gripper_offset(height) + stacker_offset = stacker_gripper_offset + if stacker_offset is None: + stacker_offset = calculate_stacker_gripper_offset(height, robot_offset) + sensor = sensor_offset + if sensor is None: + sensor = calculate_sensor_offset(height) + + # BenchCel ``StackingThickness`` is the per-plate vertical pitch of a nested stack, which is + # exactly PLR ``Plate.stacking_z_height``. Prefer an explicit override, then the plate's own + # declared pitch, and only estimate from height (``size_z - nesting_overlap``) as a last resort. + resolved_stacking_thickness = stacking_thickness + if resolved_stacking_thickness is None: + resolved_stacking_thickness = plate.stacking_z_height + if resolved_stacking_thickness is None: + resolved_stacking_thickness = calculate_stacking_thickness( + height, nesting_overlap=nesting_overlap + ) + + return BenchCelLabwareSettings( + identifier=identifier, + name=name or plate.model or plate.name, + plate_size_x=plate.get_size_x(), + plate_size_y=plate.get_size_y(), + plate_size_z=height, + stacking_thickness=resolved_stacking_thickness, + robot_gripper_offset=robot_offset, + stacker_gripper_offset=stacker_offset, + sensor_offset=sensor, + gripper_open_position=gripper_open_position, + gripper_holding_plate_position=gripper_holding_plate_position, + gripper_holding_stack_position=gripper_holding_stack_position, + orientation_sensor_threshold=orientation_sensor_threshold, + plate_presence_threshold=plate_presence_threshold, + sensor_intensity=sensor_intensity, + error_correction_offset=error_correction_offset, + can_be_lidded=plate.has_lid() if can_be_lidded is None else can_be_lidded, + can_be_sealed=can_be_sealed, + sealed_plate_stacking_thickness=sealed_plate_stacking_thickness, + sealed_plate_thickness=sealed_plate_thickness, + notch_settings=notch_settings or PlateNotchSettings(), + source="calculated from PLR plate dimensions", + ) + + +def resolve_benchcel_labware_settings( + labware: Union[Plate, BenchCelLabwareSettings, dict], +) -> BenchCelLabwareSettings: + """Resolve a PLR plate, settings object, or serialized settings dict.""" + if isinstance(labware, BenchCelLabwareSettings): + return labware + if isinstance(labware, Plate): + return calculate_benchcel_labware_settings(labware) + if isinstance(labware, dict): + return BenchCelLabwareSettings.from_dict(labware) + raise TypeError( + "labware must be a Plate, BenchCelLabwareSettings, or serialized settings dict; " + f"got {type(labware).__name__}" + ) + + +def apply_benchcel_labware_settings( + plate: Plate, + labware: Optional[Union[BenchCelLabwareSettings, dict]] = None, + *, + validate_dimensions: bool = True, + tolerance_mm: float = 0.25, + **calculation_kwargs, +) -> BenchCelLabwareSettings: + """Apply BenchCel pickup metadata to ``plate`` and return the settings used. + + If ``labware`` is omitted, settings are calculated from the PLR plate + dimensions using :func:`calculate_benchcel_labware_settings`. + """ + settings = ( + calculate_benchcel_labware_settings(plate, **calculation_kwargs) + if labware is None + else resolve_benchcel_labware_settings(labware) + ) + settings.apply_to_plate( + plate, + validate_dimensions=validate_dimensions, + tolerance_mm=tolerance_mm, + ) + return settings + + +def benchcel_labware_summary_row(settings: BenchCelLabwareSettings) -> dict: + """Return one summary row useful for docs/tests/diagnostics.""" + height = settings.effective_plate_height() + return { + "name": settings.name, + "plate_height": height, + "stacking_thickness": settings.stacking_thickness, + "nesting_overlap": height - settings.stacking_thickness, + "robot_gripper_offset": settings.robot_gripper_offset, + "pickup_distance_from_top": settings.robot_pickup_distance_from_top(), + "stacker_gripper_offset": settings.stacker_gripper_offset, + "sensor_offset": settings.sensor_offset, + } diff --git a/pylabrobot/storage/agilent/benchcel_mock_server.py b/pylabrobot/storage/agilent/benchcel_mock_server.py new file mode 100644 index 00000000000..495287253c7 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_mock_server.py @@ -0,0 +1,437 @@ +"""In-process mock server for the Agilent BenchCel 4R TCP protocol.""" + +from __future__ import annotations + +import argparse +import asyncio +import dataclasses +import logging +import struct +from typing import Dict, Optional + +from .benchcel_backend import ( + AXIS_GRIPPER, + AXIS_THETA, + AXIS_X, + AXIS_Z, + CMD_ACK, + CMD_AXIS_BOUNDS, + CMD_CURRENT_POSITION, + CMD_ERROR, + CMD_GENERAL_STATUS, + CMD_HOME, + CMD_HOME_MOTORS, + CMD_JOG, + CMD_LOAD_PLATE, + CMD_MOVE_TO_TARGET, + CMD_PICK, + CMD_PLACE, + CMD_ROBOT_GRIPPER, + CMD_SAVE_TEACHPOINT, + CMD_SENSOR_STATUS, + CMD_SET_LABWARE, + CMD_SETTINGS_COMMIT, + CMD_STACKER_GRIPPER, + CMD_UNLOAD_PLATE, + AxisBoundsResponse, + Frame, + Teachpoint, + make_frame, + parse_frame_from_buffer, +) +from .benchcel_labware import DEVICE_PAYLOAD_LENGTH, BenchCelLabwareSettings + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class _Pose: + theta: float = 0.0 + x: float = 0.0 + z: float = 10.0 + gripper: float = -1.0 + + +_DEFAULT_BOUNDS = AxisBoundsResponse( + theta_min=-115.0, + x_min=-360.9, + z_min=-1.5, + gripper_min=-1.5, + theta_max=115.0, + x_max=360.9, + z_max=104.0, + gripper_max=11.0, + raw_payload=b"", + float_values=(-115.0, -360.9, -1.5, -1.5, 115.0, 360.9, 104.0, 11.0), +) + + +class BenchCelMockServer: + """Small asyncio TCP server emulating the BenchCel binary protocol. + + The mock is wire-compatible for the commands implemented by + :class:`~pylabrobot.storage.agilent.benchcel_backend.BenchCel4RBackend` and is + intended for backend tests and manual protocol debugging. + """ + + def __init__( + self, + host: str = "127.0.0.1", + port: int = 0, + *, + close_on_home_motors: bool = True, + ): + self.host = host + self.port = port + self.close_on_home_motors = close_on_home_motors + self._server: Optional[asyncio.AbstractServer] = None + self._pose = _Pose() + self._bounds = _DEFAULT_BOUNDS + self._teachpoints: Dict[int, Teachpoint] = {} + self._plate_in_gripper = False + self.plate_presence = [0, 1, 128, 118] + self.air_pressure = [56, 56, 45, 47] + self.stacker_grippers_open = [False, False, False, False] + self.labware: Optional[BenchCelLabwareSettings] = None + self.received_frames: list[Frame] = [] + + async def __aenter__(self) -> "BenchCelMockServer": + await self.start() + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.stop() + + async def start(self) -> None: + """Start accepting TCP connections.""" + if self._server is not None: + return + self._server = await asyncio.start_server(self._handle_client, host=self.host, port=self.port) + sockets = list(self._server.sockets or []) + if sockets: + self.port = sockets[0].getsockname()[1] + logger.info("BenchCelMockServer listening on %s:%d", self.host, self.port) + + async def stop(self) -> None: + """Stop the server and wait for its listening socket to close.""" + if self._server is None: + return + self._server.close() + await self._server.wait_closed() + self._server = None + + async def serve_forever(self) -> None: + """Run until cancelled.""" + if self._server is None: + await self.start() + assert self._server is not None + await self._server.serve_forever() + + async def _handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + buffer = bytearray() + try: + while not reader.at_eof(): + chunk = await reader.read(4096) + if not chunk: + break + buffer.extend(chunk) + while True: + frame = parse_frame_from_buffer(buffer) + if frame is None: + break + self.received_frames.append(frame) + close_after_response = await self._handle_frame(frame, writer) + await writer.drain() + if close_after_response: + writer.close() + await writer.wait_closed() + return + finally: + if not writer.is_closing(): + writer.close() + await writer.wait_closed() + + async def _handle_frame(self, frame: Frame, writer: asyncio.StreamWriter) -> bool: + """Handle one incoming frame. Return True to close the TCP session.""" + if frame.command_id == CMD_HOME_MOTORS: + self._pose = _Pose() + if not self.close_on_home_motors: + self._write_ack(writer, CMD_HOME_MOTORS) + return self.close_on_home_motors + + if frame.command_id == CMD_HOME: + self._pose = _Pose() + self._write_ack(writer, CMD_HOME) + return False + + if frame.command_id == CMD_LOAD_PLATE: + stacker = self._read_stacker_payload(frame, writer) + if stacker is not None: + self._write_ack(writer, CMD_LOAD_PLATE, stacker) + return False + + if frame.command_id == CMD_UNLOAD_PLATE: + stacker = self._read_stacker_payload(frame, writer) + if stacker is not None: + self._write_ack(writer, CMD_UNLOAD_PLATE, stacker) + return False + + if frame.command_id in (CMD_PICK, CMD_PLACE): + target_id = self._read_target_payload(frame, writer) + if target_id is None: + return False + self._move_to_target_id(target_id, approach_height=0.0) + self._plate_in_gripper = frame.command_id == CMD_PICK + self._pose.gripper = 5.0 if self._plate_in_gripper else -1.0 + self._write_ack(writer, frame.command_id) + return False + + if frame.command_id == CMD_MOVE_TO_TARGET: + if len(frame.payload) != 10: + self._write_error(writer, "Malformed move-to-target payload") + return False + _, target_id, _, approach_height = struct.unpack(" Optional[int]: + try: + return self._validate_stacker_payload(frame.payload) + except ValueError as exc: + self._write_error(writer, str(exc)) + return None + + def _read_target_payload(self, frame: Frame, writer: asyncio.StreamWriter) -> Optional[int]: + try: + return self._validate_target_payload(frame.payload) + except ValueError as exc: + self._write_error(writer, str(exc)) + return None + + def _write_ack(self, writer: asyncio.StreamWriter, command_id: int, *extra: int) -> None: + writer.write(make_frame(CMD_ACK, bytes([command_id, *extra]))) + + def _write_error(self, writer: asyncio.StreamWriter, message: str) -> None: + writer.write(make_frame(CMD_ERROR, message.encode("ascii", errors="replace"))) + + @staticmethod + def _validate_stacker_payload(payload: bytes) -> int: + if len(payload) < 2 or payload[0] != 0x01 or payload[1] not in (0, 1, 2, 3): + raise ValueError(f"invalid stacker payload: {payload.hex()}") + return payload[1] + + @staticmethod + def _validate_target_payload(payload: bytes) -> int: + if len(payload) != 4 or payload[0] != 0x01 or payload[2:] != b"\x00\x01": + raise ValueError(f"invalid target payload: {payload.hex()}") + return payload[1] + + def _handle_jog(self, frame: Frame, writer: asyncio.StreamWriter) -> None: + if len(frame.payload) != 5: + self._write_error(writer, "Malformed jog payload") + return + axis, delta = struct.unpack(" None: + if len(frame.payload) != 27: + self._write_error(writer, "Malformed save-teachpoint payload") + return + values = struct.unpack(" None: + if len(frame.payload) != DEVICE_PAYLOAD_LENGTH: + self._write_error(writer, "Malformed labware settings payload") + return + settings = BenchCelLabwareSettings.from_device_payload(frame.payload) + # The device rejects geometry where the stack hold position is not above the + # plate hold position (observed "too close" rejections had stack <= plate). + if settings.gripper_holding_stack_position <= settings.gripper_holding_plate_position: + self._write_error(writer, "The labware gripper positions are too close") + return + self.labware = settings + + def _move_to_target_id(self, target_id: int, approach_height: float) -> None: + if target_id in (0, 1, 2, 3): + self._pose.theta = 0.0 + self._pose.x = (-270.0, -90.0, 90.0, 270.0)[target_id] + self._pose.z = max(self._bounds.z_min, min(self._bounds.z_max, approach_height)) + return + + teachpoint = self._teachpoints.get(target_id) + if teachpoint is None: + self._pose.theta = 0.0 + self._pose.x = 0.0 + self._pose.z = 10.0 + return + + self._pose.theta = teachpoint.theta + self._pose.x = teachpoint.x + self._pose.z = max( + self._bounds.z_min, + min(self._bounds.z_max, teachpoint.z + approach_height), + ) + + def _sensor_payload(self, stacker_index: int) -> bytes: + return struct.pack( + " bytes: + payload = bytearray(66) + struct.pack_into(" bytes: + payload = bytearray(33) + payload[0] = selector + struct.pack_into(" argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Run a mock Agilent BenchCel 4R TCP server") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=7612) + parser.add_argument("--verbose", action="store_true") + return parser + + +async def _amain() -> None: + args = build_arg_parser().parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + server = BenchCelMockServer(host=args.host, port=args.port) + await server.start() + print(f"BenchCelMockServer listening on {server.host}:{server.port}") + try: + await server.serve_forever() + except asyncio.CancelledError: + pass + + +def main() -> None: + asyncio.run(_amain()) + + +if __name__ == "__main__": + main() diff --git a/pylabrobot/storage/agilent/benchcel_mock_server_tests.py b/pylabrobot/storage/agilent/benchcel_mock_server_tests.py new file mode 100644 index 00000000000..fd0af953fa9 --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_mock_server_tests.py @@ -0,0 +1,166 @@ +"""End-to-end BenchCel backend tests against the in-process mock server.""" + +from __future__ import annotations + +import unittest + +from pylabrobot.storage.agilent.benchcel_backend import BenchCel4RBackend, BenchCelDeviceError +from pylabrobot.storage.agilent.benchcel_mock_server import BenchCelMockServer + + +class BenchCelMockServerTests(unittest.IsolatedAsyncioTestCase): + async def test_motion_status_and_axis_bounds_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + await backend.home() + await backend.move_to_stacker(3) + status = await backend.request_arm_status() + self.assertAlmostEqual(status.x, 90.0) + self.assertAlmostEqual(status.z, 0.0) + + await backend.move_x(5.0) + status = await backend.request_arm_status() + self.assertAlmostEqual(status.x, 95.0) + + bounds = await backend.request_axis_bounds() + self.assertAlmostEqual(bounds.theta_min, -115.0) + self.assertAlmostEqual(bounds.x_max, 360.9, places=3) + finally: + await backend.stop() + + async def test_stacker_sensor_query_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + sensors = await backend.request_all_stacker_sensors() + self.assertEqual([s.stacker for s in sensors], [1, 2, 3, 4]) + self.assertEqual(sensors[2].plate_presence, 128) + self.assertTrue(sensors[2].plate_present()) + finally: + await backend.stop() + + async def test_jog_out_of_bounds_raises_device_error(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + with self.assertRaises(BenchCelDeviceError) as cm: + await backend.move_x(500.0) + self.assertEqual(cm.exception.message, "X position out of bounds") + finally: + await backend.stop() + + async def test_teachpoint_save_then_move_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + await backend.save_test_left_teachpoint() + await backend.move_to_teachpoint(0x1F, approach_height=20.0) + status = await backend.request_arm_status() + self.assertAlmostEqual(status.theta, 89.99874114990234, places=3) + self.assertAlmostEqual(status.x, -360.8802795410156, places=3) + self.assertAlmostEqual(status.z, 10.0) + finally: + await backend.stop() + + async def test_stacker_gripper_diagnostic_over_tcp(self): + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + await backend.dangerously_open_stacker_grippers(1) + self.assertTrue(server.stacker_grippers_open[0]) + await backend.close_stacker_grippers(1) + self.assertFalse(server.stacker_grippers_open[0]) + finally: + await backend.stop() + + async def test_set_labware_over_tcp(self): + from pylabrobot.resources.plate import Plate + + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + plate = Plate("p", size_x=127.76, size_y=85.48, size_z=14.4, ordered_items={}) + settings = await backend.set_labware(plate) + self.assertAlmostEqual(settings.stacking_thickness, 12.9) + self.assertIs(backend.labware_settings, settings) + assert server.labware is not None + self.assertAlmostEqual(server.labware.plate_size_z, 14.4, places=3) + # A second call still works (stream stays in sync after the commit echo). + await backend.set_labware(settings) + finally: + await backend.stop() + + async def test_set_labware_rejects_too_close_gripper_positions(self): + from pylabrobot.storage.agilent import BenchCelLabwareSettings, PlateNotchSettings + + async with BenchCelMockServer() as server: + backend = BenchCel4RBackend( + host=server.host, + port=server.port, + timeout=2.0, + read_poll_timeout=0.01, + ) + await backend.setup() + try: + bad = BenchCelLabwareSettings( + name="bad", + plate_size_x=127.76, + plate_size_y=85.48, + plate_size_z=14.4, + stacking_thickness=12.9, + robot_gripper_offset=8.0, + stacker_gripper_offset=5.0, + sensor_offset=8.0, + gripper_holding_plate_position=8.0, + gripper_holding_stack_position=8.0, # not above plate -> rejected + notch_settings=PlateNotchSettings(), + ) + with self.assertRaises(BenchCelDeviceError) as cm: + await backend.set_labware(bad) + self.assertEqual(cm.exception.message, "The labware gripper positions are too close") + # The connection is still usable after a rejection. + status = await backend.request_arm_status() + self.assertIsNotNone(status) + finally: + await backend.stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/storage/agilent/benchcel_tests.py b/pylabrobot/storage/agilent/benchcel_tests.py new file mode 100644 index 00000000000..490873a29fa --- /dev/null +++ b/pylabrobot/storage/agilent/benchcel_tests.py @@ -0,0 +1,495 @@ +import asyncio +import struct +import tempfile +import unittest +from pathlib import Path + +from pylabrobot.resources import Coordinate +from pylabrobot.resources.plate import Plate +from pylabrobot.storage.agilent import ( + BenchCel4R, + BenchCelLabwareSettings, + PlateNotchSettings, + apply_benchcel_labware_settings, + calculate_benchcel_labware_settings, + calculate_robot_gripper_offset, + calculate_sensor_offset, + calculate_stacker_gripper_offset, + calculate_stacking_thickness, +) +from pylabrobot.storage.agilent.benchcel_backend import ( + TEST_LEFT_TEACHPOINT, + BenchCel4RBackend, + BenchCelDeviceError, + BenchCelProtocolError, + Frame, + parse_arm_status_from_87_payload, + parse_frame_from_buffer, + parse_sensor_response, + split_frames, +) +from pylabrobot.storage.agilent.racks import benchcel_4r_racks, benchcel_4r_racks_for_labware +from pylabrobot.storage.backend import IncubatorBackend +from pylabrobot.storage.incubator import Incubator + + +class _FakeWriter: + def __init__(self) -> None: + self.sent = bytearray() + self.closed = False + + def write(self, data: bytes) -> None: + self.sent.extend(data) + + async def drain(self) -> None: + return None + + def close(self) -> None: + self.closed = True + + async def wait_closed(self) -> None: + return None + + +class _FakeReader: + def __init__(self, chunks: list[bytes]) -> None: + self._chunks = list(chunks) + + async def read(self, num_bytes: int) -> bytes: + if not self._chunks: + return b"" + chunk = self._chunks.pop(0) + if len(chunk) > num_bytes: + self._chunks.insert(0, chunk[num_bytes:]) + return chunk[:num_bytes] + return chunk + + +def _make_backend(chunks: list[bytes]) -> tuple[BenchCel4RBackend, _FakeWriter]: + backend = BenchCel4RBackend(host="ignored", port=0, timeout=1.0, read_poll_timeout=0.01) + writer = _FakeWriter() + backend.io._writer = writer # type: ignore[assignment] + backend.io._reader = _FakeReader(chunks) # type: ignore[assignment] + return backend, writer + + +def _sensor_payload(stacker_index: int = 2) -> bytes: + return struct.pack( + " + + + Example + 12.34 + 100 + 225 + 50 + 0.0 + 8.0 + 5.0 + 8.0 + -1.0 + 8.0 + 8.5 + + Yes + Yes + No + Yes + No + + No + No + + + 50 + 3 + 2.0 + 30 + 2.0 + + +""" + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "example.xml" + path.write_text(xml, encoding="ascii") + settings = BenchCelLabwareSettings.from_xml_file( + path, + identifier="example", + plate_size_x=127.76, + plate_size_y=85.48, + plate_size_z=14.4, + ) + self.assertEqual(settings.identifier, "example") + self.assertEqual(settings.name, "Example") + self.assertAlmostEqual(settings.stacking_thickness, 12.34) + self.assertEqual(settings.stack_plate_presence_threshold, 50) + self.assertTrue(settings.notch_settings.a1_notch) + + +class BenchCelFactoryTests(unittest.TestCase): + def setUp(self): + # Constructing the backend creates ``asyncio.Lock``/``Socket`` objects, which on Python 3.9 + # bind to the current event loop at init time; ensure one exists for these synchronous tests. + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + def tearDown(self): + asyncio.set_event_loop(None) + self._loop.close() + + def test_factory_creates_incubator_with_backend_and_four_racks(self): + benchcel = BenchCel4R(name="bc", host="192.168.0.100", stacker_num_sites=3) + self.assertIsInstance(benchcel, Incubator) + backend = benchcel.backend + assert isinstance(backend, BenchCel4RBackend) + self.assertEqual(backend.host, "192.168.0.100") + self.assertEqual(len(benchcel.racks), 4) + self.assertEqual(len(benchcel.racks[0]), 3) + self.assertEqual(benchcel.model, "Agilent BenchCel 4R") + + def test_factory_can_size_racks_from_plate_labware_dimensions(self): + plate = Plate("plate", size_x=127.76, size_y=85.47, size_z=44.04, ordered_items={}) + benchcel = BenchCel4R( + name="bc", + host="192.168.0.100", + stacker_num_sites=3, + labware=plate, + ) + backend = benchcel.backend + assert isinstance(backend, BenchCel4RBackend) + assert backend.labware_settings is not None + self.assertEqual(backend.labware_settings.name, "plate") + self.assertAlmostEqual(benchcel.racks[0][0].get_size_z(), 44.04) + self.assertAlmostEqual(benchcel.racks[0][1].location.z, 42.54) + + +class BenchCelBackendWireTests(unittest.IsolatedAsyncioTestCase): + async def test_home_writes_command_and_waits_for_split_ack(self): + backend, writer = _make_backend([b"\x69", b"\x01\x00\x48"]) + ack = await backend.home() + self.assertEqual(writer.sent.hex(), "48010001") + self.assertEqual(ack, Frame(0x69, b"\x48")) + + async def test_move_to_stacker_writes_expected_frame(self): + backend, writer = _make_backend([Frame(0x69, b"\x65").to_bytes()]) + await backend.move_to_stacker(3) + self.assertEqual(writer.sent.hex(), "650a0001020000204100000000") + + async def test_save_teachpoint_writes_captured_shape(self): + backend, writer = _make_backend([]) + await backend.save_teachpoint(TEST_LEFT_TEACHPOINT) + self.assertEqual( + writer.sent.hex(), + "731b001f5bffb342ad70b4c3000020c100010000a041000000000000c0bf", + ) + + async def test_device_error_raises(self): + payload = b"X position out of bounds" + frame = Frame(0x02, payload).to_bytes() + backend, writer = _make_backend([frame]) + with self.assertRaises(BenchCelDeviceError) as cm: + await backend.move_x(500) + self.assertEqual(writer.sent.hex(), Frame(0x66, struct.pack(" 0x62 01 02 00 01 ACK 0x69 62 + # Upstack -> 0x63 01 02 00 01 ACK 0x69 63 + # Load -> 0x60 01 02 ACK 0x69 60 02 + # Unload -> 0x61 01 02 00 00 00 00 ACK 0x69 61 02 + backend, writer = _make_backend( + [ + Frame(0x69, b"\x62").to_bytes(), + Frame(0x69, b"\x63").to_bytes(), + Frame(0x69, b"\x60\x02").to_bytes(), + Frame(0x69, b"\x61\x02").to_bytes(), + ] + ) + await backend.downstack_plate(3) + await backend.upstack_plate(3) + await backend.load_stacker(3) + await backend.unload_stacker(3) + self.assertEqual( + writer.sent.hex(), + "62040001020001" # downstack stacker 3 + "63040001020001" # upstack stacker 3 + "6002000102" # load stacker 3 + "610600010200000000", # unload stacker 3 + ) + + async def test_serialize_includes_connection_info(self): + plate = Plate("plate", size_x=127.76, size_y=85.48, size_z=14.6, ordered_items={}) + backend = BenchCel4RBackend( + host="192.168.0.100", + port=7612, + timeout=12.5, + labware=plate, + ) + serialized = backend.serialize() + self.assertEqual(serialized["type"], "BenchCel4RBackend") + self.assertEqual(serialized["host"], "192.168.0.100") + self.assertEqual(serialized["timeout"], 12.5) + self.assertEqual(serialized["labware"]["name"], "plate") + self.assertAlmostEqual(serialized["labware"]["stacking_thickness"], 13.1) + deserialized = IncubatorBackend.deserialize(serialized.copy()) + self.assertIsInstance(deserialized, BenchCel4RBackend) + self.assertEqual(deserialized.host, "192.168.0.100") + self.assertEqual(deserialized.labware_settings.name, "plate") + + +class BenchCelIncubatorMappingTests(unittest.IsolatedAsyncioTestCase): + async def test_transfers_require_a_configured_teachpoint(self): + backend = BenchCel4RBackend(host="ignored") # no loading_tray_teachpoint_id + self.assertIsNone(backend.loading_tray_teachpoint_id) + racks = benchcel_4r_racks(num_sites=1) + await backend.set_racks(racks) + plate = Plate("plate", size_x=1, size_y=1, size_z=1, ordered_items={}) + racks[0][0].assign_child_resource(plate, location=Coordinate.zero()) + + with self.assertRaisesRegex(ValueError, "teachpoint"): + await backend.fetch_plate_to_loading_tray(plate) + with self.assertRaisesRegex(ValueError, "teachpoint"): + await backend.take_in_plate(plate, racks[0][0]) + + async def test_fetch_plate_to_loading_tray_maps_plate_site_to_stacker(self): + backend = BenchCel4RBackend(host="ignored") + racks = benchcel_4r_racks(num_sites=2) + await backend.set_racks(racks) + + plate = Plate("plate", size_x=1, size_y=1, size_z=1, ordered_items={}) + racks[2][0].assign_child_resource(plate, location=Coordinate.zero()) + + sent: list[Frame] = [] + + async def fake_send(frame: Frame, **kwargs) -> Frame: + sent.append(frame) + return Frame(0x69, kwargs.get("ack_payload") or bytes([frame.command_id])) + + backend._send_frame_expect_ack_no_lock = fake_send # type: ignore[method-assign] + await backend.fetch_plate_to_loading_tray(plate, teachpoint_id=0x1E) + + self.assertEqual( + [f.hex() for f in sent], + ["6a010001", "62040001020001", "630400011e0001"], + ) + + async def test_take_in_plate_maps_destination_site_to_stacker(self): + backend = BenchCel4RBackend(host="ignored") + racks = benchcel_4r_racks(num_sites=2) + await backend.set_racks(racks) + plate = Plate("plate", size_x=1, size_y=1, size_z=1, ordered_items={}) + + sent: list[Frame] = [] + + async def fake_send(frame: Frame, **kwargs) -> Frame: + sent.append(frame) + return Frame(0x69, kwargs.get("ack_payload") or bytes([frame.command_id])) + + backend._send_frame_expect_ack_no_lock = fake_send # type: ignore[method-assign] + await backend.take_in_plate(plate, racks[0][1], teachpoint_id=0x1E) + + self.assertEqual( + [f.hex() for f in sent], + ["6a010001", "620400011e0001", "63040001000001"], + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/storage/agilent/racks.py b/pylabrobot/storage/agilent/racks.py new file mode 100644 index 00000000000..6a9d1b99f42 --- /dev/null +++ b/pylabrobot/storage/agilent/racks.py @@ -0,0 +1,102 @@ +"""State-tracking rack resources for Agilent BenchCel stackers.""" + +from typing import Union + +from pylabrobot.resources import Coordinate, Plate +from pylabrobot.resources.carrier import PlateCarrier, PlateHolder + +from .benchcel_labware import BenchCelLabwareSettings, resolve_benchcel_labware_settings + +SBS_FOOTPRINT_X_MM = 127.76 +SBS_FOOTPRINT_Y_MM = 85.48 + + +def benchcel_stacker_rack( + name: str, + num_sites: int = 20, + site_pitch: float = 14.5, + site_height: float = 9.0, +) -> PlateCarrier: + """Create a generic vertical stacker rack for BenchCel state tracking. + + The BenchCel firmware addresses stackers, not individual slots. This resource + exists so PLR can track the expected plate order/content of each stacker. Use + ``num_sites`` and ``site_pitch`` values matching the physical rack/labware used + on your instrument. + + Note: a BenchCel stacker is logically a single-ended LIFO stack of nesting + plates -- i.e. a :class:`~pylabrobot.resources.resource_stack.ResourceStack` + (``direction="z"``), whose height the per-plate ``stacking_z_height`` already + models. We model it here as a :class:`PlateCarrier` of fixed sites only because + the :class:`~pylabrobot.storage.Incubator` frontend is built around + random-access racks; the ``site_pitch``-spaced sites are cosmetic state, and + only the stacker identity is sent to the hardware. Random-access site selection + (e.g. ``find_random_site``) is therefore not honoured by the device, which can + only downstack/upstack at one end. A ResourceStack-native model is a possible + future improvement (pending a storage-abstraction design discussion). + """ + if num_sites <= 0: + raise ValueError(f"num_sites must be positive, got {num_sites}") + if site_pitch <= 0: + raise ValueError(f"site_pitch must be positive, got {site_pitch}") + if site_height < 0: + raise ValueError(f"site_height must be non-negative, got {site_height}") + + total_height = site_pitch * (num_sites - 1) + site_height + return PlateCarrier( + name=name, + size_x=SBS_FOOTPRINT_X_MM, + size_y=SBS_FOOTPRINT_Y_MM, + size_z=total_height, + sites={ + i: PlateHolder( + name=f"{name}-{i}", + size_x=SBS_FOOTPRINT_X_MM, + size_y=SBS_FOOTPRINT_Y_MM, + size_z=site_height, + pedestal_size_z=0, + ).at(Coordinate(x=0, y=0, z=site_pitch * i)) + for i in range(num_sites) + }, + model="agilent_benchcel_stacker_rack", + ) + + +def benchcel_4r_racks( + name_prefix: str = "benchcel_stacker", + num_sites: int = 20, + site_pitch: float = 14.5, + site_height: float = 9.0, +) -> list[PlateCarrier]: + """Create four generic stacker racks for an Agilent BenchCel 4R.""" + return [ + benchcel_stacker_rack( + name=f"{name_prefix}_{stacker}", + num_sites=num_sites, + site_pitch=site_pitch, + site_height=site_height, + ) + for stacker in range(1, 5) + ] + + +def benchcel_4r_racks_for_labware( + labware: Union[Plate, BenchCelLabwareSettings, dict], + name_prefix: str = "benchcel_stacker", + num_sites: int = 20, + *, + sealed: bool = False, + lidded: bool = False, +) -> list[PlateCarrier]: + """Create four stacker racks sized from a BenchCel labware profile. + + ``site_pitch`` is BenchCel ``StackingThickness``. ``site_height`` is full PLR + plate height, which must be large enough for Incubator site-fit checks. + """ + settings = resolve_benchcel_labware_settings(labware) + return benchcel_4r_racks( + name_prefix=name_prefix, + num_sites=num_sites, + site_pitch=settings.effective_stacking_thickness(sealed=sealed, lidded=lidded), + site_height=settings.effective_plate_height(sealed=sealed, lidded=lidded), + ) diff --git a/pylabrobot/storage/cytomat/cytomat.py b/pylabrobot/storage/cytomat/cytomat.py index ca9227fe003..006a064dcd5 100644 --- a/pylabrobot/storage/cytomat/cytomat.py +++ b/pylabrobot/storage/cytomat/cytomat.py @@ -159,6 +159,7 @@ async def send_action( self, command_type: str, command: str, params: str, timeout: Optional[int] = 60 ) -> OverviewRegisterState: """Calls send_command, but has a timeout handler and returns the overview register state. + Args: timeout: The maximum time to wait for the command to complete. If None, the command will not wait for completion.