diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 3a689f05938..669dc255fb0 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -59,6 +59,7 @@ Drop, DropTipRack, GripDirection, + Mix, MultiHeadAspirationContainer, MultiHeadAspirationPlate, MultiHeadDispenseContainer, @@ -9093,6 +9094,171 @@ async def head96_experimental_dispense( du=f"{stop_flow_rate_increment:05}", ) + @_requires_head96 + @need_iswap_parked + async def mix96( + self, + mix: Mix, + resource: Optional[Union[Plate, Container, List[Well]]] = None, + a1_coordinate: Optional[Coordinate] = None, + minimum_traverse_height_start: Optional[float] = None, + offset: Coordinate = Coordinate.zero(), + blowout_air_volume: float = 5.0, + lld_mode: Optional[LLDMode] = None, + descent_speed: float = 80.0, + swap_speed: float = 5.0, + settling_time: float = 0.0, + minimum_traverse_height_end: Optional[float] = None, + ): + """Position the 96-head over a target and mix in place. + + Raises the single channels to safe Z, then moves X/Y over the target and descends into the + well, then runs ``mix.repetitions`` symmetric aspirate / dispense cycles: each aspirate + follows the surface down by ``surface_following_distance`` and each dispense follows it + back up, so the tip oscillates without drifting. Returns to a traverse height when done. + + Z targets are in tip-bottom space (the target Z is where the tip end goes, not the stop + disk). Declare the target in exactly one of two ways: + + - ``a1_coordinate``: an explicit deck Coordinate for the channel-A1 position. + - ``resource`` (+ ``offset``): like ``aspirate96`` - a Plate (head A1 over well A1), a + Container, or a list of Wells. + + Args: + mix: volume, repetitions, flow_rate and optional surface_following_distance. + resource: aspirate96-style target (Plate / Container / list[Well]); mutually exclusive + with ``a1_coordinate``. + a1_coordinate: explicit channel-A1 deck target; mutually exclusive with ``resource``. + minimum_traverse_height_start: absolute tip-bottom Z before the X/Y move; None uses full Z + safety. + offset: added to the resolved target position. + blowout_air_volume: air gap taken above the well before descent and expelled above it on + exit, to clear the tips of residual on the way out; 0 skips both. + lld_mode: liquid-level-detection mode; only ``LLDMode.OFF`` is supported (the default). + descent_speed: speed for the fast descent down to just above the well. + swap_speed: speed from there into the well to the target Z. + settling_time: seconds to wait after the last cycle, before retracting out of the well. + minimum_traverse_height_end: absolute tip-bottom Z after mixing; None uses full Z safety. + + Raises: + ValueError: if neither or both of ``a1_coordinate`` / ``resource`` are given, + ``lld_mode`` is not ``LLDMode.OFF``, or ``settling_time`` < 0. + RuntimeError: if the 96-head holds no tips. + """ + if (a1_coordinate is None) == (resource is None): + raise ValueError("provide exactly one of a1_coordinate or resource") + + lld_mode = lld_mode if lld_mode is not None else self.LLDMode.OFF + if lld_mode is not self.LLDMode.OFF: + raise ValueError("mix96 currently supports only LLDMode.OFF") + + if settling_time < 0: + raise ValueError("settling_time must be >= 0") + + if await self.head96_request_tip_presence() == 0: + raise RuntimeError("96-head has no tips (firmware reports none); pick up tips first") + + # H0 direct-drive moves don't raise the single channels (the C0 core-96 commands do so at + # firmware level), so do it explicitly before the X/Y traverse. + await self.move_all_channels_in_z_safety() + + # resolve the channel-A1 deck target (and the container top, when a resource gives us one) + if a1_coordinate is not None: + a1 = a1_coordinate + offset + z_top: Optional[float] = None + else: + anchor: Container + if isinstance(resource, Plate): + anchor = resource.get_item(0) # head A1 over well A1 (as aspirate96 resolves it) + elif isinstance(resource, list): + anchor = resource[0] + else: + assert resource is not None + anchor = resource + a1 = anchor.get_absolute_location(x="c", y="c", z="cavity_bottom") + offset + z_top = anchor.get_absolute_location(x="c", y="c", z="top").z + + # traverse to start height; None retracts to full (stop-disk) Z safety, a value is the + # tip-bottom height the rest of the method works in + if minimum_traverse_height_start is None: + await self.head96_move_to_z_safety() + else: + await self.head96_move_tool_z(minimum_traverse_height_start, speed=descent_speed) + + # move X, Y; X acceleration_level=1 below y=200 mm, the low-Y zone where the head is + # cantilevered forward off the X-drive and wobbles most + # TODO: replace head96_move_y with a primitive Y move to enable parallelised addressing of + # the X and Y drives. Its speed/acceleration request+set round-trip currently blocks + # parallelisation of the Y drive: the second read is trapped behind the in-flight X move on + # the single connection, so the Y move only starts once X has finished. + await asyncio.gather( + self.head96_move_x(a1.x, acceleration_level=1 if a1.y <= 200.0 else 3), + self.head96_move_y(a1.y), + ) + + # the tip oscillates between the floor (a1.z) and mix_start (floor + sf), starting at + # mix_start so the first aspirate can follow the surface down without hitting the bottom. + sf = 0.0 if mix.surface_following_distance is None else mix.surface_following_distance + mix_floor = a1.z + mix_start = a1.z + sf + + # 2-stage Z descent in tip-bottom space: descent_speed to the swap-start height just above + # the well, aspirate blowout_air_volume, then swap_speed down to mix_start; move_tool_z lands + # the tip end each move. + z_clearance = 5.0 + swap_start_z = (z_top if z_top is not None else mix_start) + z_clearance + await self.head96_move_tool_z(swap_start_z, speed=descent_speed) + if blowout_air_volume: + await self.head96_experimental_aspirate( + blowout_air_volume, + flow_rate=mix.flow_rate, + minimum_height=mix_floor, + surface_following_distance=0, + requires_tip=False, + ) + await self.head96_move_tool_z(mix_start, speed=swap_speed) + + # symmetric mix cycles (no per-cycle drift): each aspirate follows the surface down by sf + # to the floor, each dispense back up to mix_start. minimum_height is the tip-bottom floor; + # the experimental commands convert it to the stop-disk reference. + for _ in range(mix.repetitions): + await self.head96_experimental_aspirate( + mix.volume, + flow_rate=mix.flow_rate, + minimum_height=mix_floor, + surface_following_distance=sf, + requires_tip=False, + ) + await self.head96_experimental_dispense( + mix.volume, + flow_rate=mix.flow_rate, + minimum_height=mix_floor, + surface_following_distance=sf, + requires_tip=False, + ) + + # settle in place (tip still in the liquid) after the last cycle + if settling_time: + await asyncio.sleep(settling_time) + + # careful exit at swap_speed back up to the swap-start height (mirrors the descent), before + # the fast traverse out + await self.head96_move_tool_z(swap_start_z, speed=swap_speed) + + if blowout_air_volume: + await self.head96_experimental_dispense( + blowout_air_volume, + flow_rate=mix.flow_rate, + requires_tip=False, + ) + + # traverse to end height; None retracts to full (stop-disk) Z safety, a value is the + # tip-bottom height the rest of the method works in + if minimum_traverse_height_end is None: + await self.head96_move_to_z_safety() + else: + await self.head96_move_tool_z(minimum_traverse_height_end, speed=descent_speed) + # # # Granular commands # # # async def head96_dispensing_drive_move_to_home_volume( diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index 46ec9a6169d..0a35b60acf9 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -9,7 +9,7 @@ from pylabrobot.arms.standard import CartesianCoords from pylabrobot.liquid_handling import LiquidHandler -from pylabrobot.liquid_handling.standard import GripDirection, Pickup +from pylabrobot.liquid_handling.standard import GripDirection, Mix, Pickup from pylabrobot.plate_reading import PlateReader from pylabrobot.plate_reading.chatterbox import PlateReaderChatterboxBackend from pylabrobot.resources import ( @@ -169,6 +169,24 @@ def _make_head96_information(star): ) +def _stub_mix96_motion(star): + """Stub the 96-head primitives mix96 orchestrates so tests can assert the arguments it passes + without touching firmware. Tips present; iSWAP already parked via setUp.""" + star._head96_information = _make_head96_information(star) + star.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=1) + for method in ( + "move_all_channels_in_z_safety", + "head96_move_to_z_safety", + "head96_move_z", + "head96_move_x", + "head96_move_y", + "head96_move_tool_z", + "head96_experimental_aspirate", + "head96_experimental_dispense", + ): + setattr(star, method, unittest.mock.AsyncMock()) + + class TestPipChannelInformationParsing(unittest.TestCase): """VW (pip channel hardware-configuration) response parsing. @@ -1364,6 +1382,71 @@ async def test_head96_probe_z_using_clld_retracts_on_firmware_error(self): await self.STAR.head96_probe_z_using_clld(tip_len=50.0) self.STAR.head96_move_to_z_safety.assert_awaited_once() + async def test_mix96_floor_maps_to_minimum_height_with_offset(self): + """mix96 sends the resolved tip-bottom floor (well cavity_bottom + offset.z) as the + experimental command's minimum_height - guards offset.z reaching the floor.""" + _stub_mix96_motion(self.STAR) + offset_z = 2.0 + await self.STAR.mix96( + Mix(volume=50, repetitions=1, flow_rate=100), + resource=self.plate, + offset=Coordinate(0, 0, offset_z), + ) + well = self.plate.get_item(0) + expected_floor = well.get_absolute_location(x="c", y="c", z="cavity_bottom").z + offset_z + self.assertEqual( + self.STAR.head96_experimental_aspirate.call_args.kwargs["minimum_height"], expected_floor + ) + + async def test_mix96_stroke_starts_surface_following_above_floor(self): + """The careful (swap_speed) descent lands at floor + surface_following_distance and that + distance reaches the aspirate, so the stroke spans [floor, floor+sf], never below floor.""" + _stub_mix96_motion(self.STAR) + floor_z, sf = 100.0, 8.0 + await self.STAR.mix96( + Mix(volume=50, repetitions=1, flow_rate=100, surface_following_distance=sf), + a1_coordinate=Coordinate(500, 300, floor_z), + swap_speed=5.0, + ) + # move_tool_z calls: [0] fast to swap-start, [1] careful to mix_start, [2] exit retract + careful_descent = self.STAR.head96_move_tool_z.call_args_list[1] + self.assertEqual(careful_descent.args[0], floor_z + sf) + self.assertEqual(careful_descent.kwargs["speed"], 5.0) + self.assertEqual( + self.STAR.head96_experimental_aspirate.call_args.kwargs["surface_following_distance"], sf + ) + + async def test_mix96_specified_traverse_heights_are_tip_bottom_moves(self): + """A specified minimum_traverse_height_start/end is a tip-bottom Z (head96_move_tool_z), like + the rest of the method; only the None default retracts to stop-disk Z safety. Guards against a + geometric (tip-bottom) traverse height being driven as a stop-disk position.""" + _stub_mix96_motion(self.STAR) + start_z, end_z = 250.0, 240.0 + await self.STAR.mix96( + Mix(volume=50, repetitions=1, flow_rate=100), + a1_coordinate=Coordinate(500, 300, 100.0), + minimum_traverse_height_start=start_z, + minimum_traverse_height_end=end_z, + ) + self.STAR.head96_move_to_z_safety.assert_not_called() + tool_z_targets = [call.args[0] for call in self.STAR.head96_move_tool_z.call_args_list] + self.assertEqual(tool_z_targets[0], start_z) # first tool move is the start traverse + self.assertEqual(tool_z_targets[-1], end_z) # last tool move is the end traverse + + async def test_mix96_zero_blowout_skips_air_gap_calls(self): + """blowout_air_volume=0 issues no firmware aspirate/dispense for the air gap: every + experimental aspirate/dispense is a mix-cycle stroke (mix.volume), none a zero-volume blow-out.""" + _stub_mix96_motion(self.STAR) + await self.STAR.mix96( + Mix(volume=50, repetitions=1, flow_rate=100), + a1_coordinate=Coordinate(500, 300, 100.0), + blowout_air_volume=0.0, + ) + asp_vols = [call.args[0] for call in self.STAR.head96_experimental_aspirate.call_args_list] + disp_vols = [call.args[0] for call in self.STAR.head96_experimental_dispense.call_args_list] + self.assertEqual(asp_vols, [50]) # one cycle aspirate, no blow-out aspirate + self.assertEqual(disp_vols, [50]) # one cycle dispense, no blow-out dispense + async def test_core_96_dispense_quadrant(self): """Test that each quadrant of a 384-well plate produces the correct firmware command.