Skip to content
Open
217 changes: 217 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
)
from pylabrobot.resources.liquid import Liquid
from pylabrobot.resources.rotation import Rotation
from pylabrobot.resources.tip_tracker import does_tip_tracking
from pylabrobot.resources.trash import Trash

T = TypeVar("T")
Expand Down Expand Up @@ -8601,6 +8602,209 @@ async def head96_move_tool_z(self, z: float, speed: Optional[float] = None):

return await self.head96_move_stop_disk_z(z + tip_overhang, speed=speed)

@need_iswap_parked
@_requires_head96
async def head96_probe_z_using_clld(
self,
start_pos_search: Optional[float] = None,
tip_len: Optional[float] = None,
lowest_immers_pos: Optional[float] = None,
approach_speed: Optional[float] = None,
speed: float = 10.0,
acceleration: float = 300.0,
lld_sensor: Literal["A1 or B2", "G11 or H12", "any", "all"] = "any",
detection_edge: int = 10,
detection_drop: int = 2,
post_detection_dist: float = 2.0,
current_protection_limiter: int = 15,
move_to_z_safety_after: bool = False,
) -> float:
"""Probe the liquid-surface Z-height with the 96-head's capacitive LLD (cLLD).

Runs a downward cLLD search on the 96-head stop-disk Z drive (H0 ZL), stopping at the detected
surface, and returns the tip-bottom (= liquid-surface) Z-height. The 96-head counterpart of the
single-channel cLLD probe; `lld_sensor` is head-specific (the head has two cLLD sensors, a
channel has one).

Positions are tip-bottom referenced like `head96_move_tool_z`: the tip overhang (stop disk minus
tip bottom, a rigid constant for the mounted tip) maps them to the firmware's stop-disk zh / zc,
and the deck floor caps the deepest immersion. The head should be at Z-safety before calling -
`start_pos_search` defaults to the top and the firmware brings the head there before searching.
cLLD needs a conductive path, so tips must be loaded.

The ZL wire format changed between the 2008 and 2013 firmware command sets, so the parameters are
formatted per the head's reported firmware date. `lld_sensor` other than "any" requires 2013+
firmware, since the 2008 ZL has no sensor-selection field.

Args:
tip_len: mounted tip length in mm, used to map tip-bottom positions to the stop disk. None
(default) measures it via `head96_request_tip_length`.
lowest_immers_pos: lowest tip-bottom the search may reach in mm; None is the deepest safe value.
start_pos_search: tip-bottom position the search starts from in mm; None is the highest safe.
speed: cLLD search speed in mm/sec.
acceleration: search acceleration in mm/sec**2.
approach_speed: fast descent speed in mm/sec for the upper section, before the slow search.
None uses `head96_z_drive_speed_default`.
current_protection_limiter: motor current limit (hardware units; 0-15 on 2013+, 0-7 on 2008).
lld_sensor: which head cLLD sensor(s) trigger detection.
detection_edge: edge steepness threshold for cLLD detection (0-1023).
detection_drop: offset applied after cLLD edge detection (0-1023).
post_detection_dist: signed distance to move after detection in mm; positive moves up / out of
liquid, negative moves down / deeper.
move_to_z_safety_after: if True, retract the head to Z-safety after reading the height.

Returns:
The detected liquid-surface Z-height as a tip-bottom position in mm.

Raises:
ValueError: if the head holds no tips, the chosen sensor's corner channel(s) hold no tip (when
tip tracking is on), a parameter is out of range, or `lld_sensor` other than "any" is
requested on pre-2013 firmware.
"""
assert self._head96_information is not None, (
"requires 96-head firmware version information for safe operation"
)

lld_sensor_map = {"G11 or H12": 0, "A1 or B2": 1, "any": 2, "all": 3}
if lld_sensor not in lld_sensor_map:
raise ValueError(f"lld_sensor must be one of {list(lld_sensor_map)}, is {lld_sensor!r}")

z_speed_min, z_speed_max = self._head96_information.z_speed_range
z_accel_min, z_accel_max = self._head96_information.z_acceleration_range

if approach_speed is None:
approach_speed = self.head96_z_drive_speed_default
if not z_speed_min <= approach_speed <= z_speed_max:
raise ValueError(
f"approach_speed must be between {z_speed_min} - {z_speed_max} mm/sec, is {approach_speed}"
)
if not z_speed_min <= speed <= z_speed_max:
raise ValueError(f"speed must be between {z_speed_min} - {z_speed_max} mm/sec, is {speed}")
if not z_accel_min <= acceleration <= z_accel_max:
raise ValueError(
f"acceleration must be between {z_accel_min} - {z_accel_max} mm/sec**2, is {acceleration}"
)
Comment on lines +8672 to +8686

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe at some point we can abstract this now that many features use the same pattern

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree, probably 2-3 more commands and we'll be able to see a nice abstraction to apply :)

if not 0 <= detection_edge <= 1023:
raise ValueError(f"detection_edge must be between 0 - 1023, is {detection_edge}")
if not 0 <= detection_drop <= 1023:
raise ValueError(f"detection_drop must be between 0 - 1023, is {detection_drop}")

# First guard (firmware, always verifiable): some channel must hold a tip for the conductive path.
if not await self.head96_request_tip_presence():
raise ValueError("96-head cLLD requires tips loaded (conductive path); none detected")

# When tip tracking is on, also require a tip on the corner channel(s) that feed the chosen
# sensor - the firmware guard above only confirms *some* channel does. cLLD sensor 0 reads
# G11(86)/H12(95), sensor 1 reads A1(0)/B2(9) (column-major head96 indices).
if does_tip_tracking() and self.head96 is not None:
sensor_0 = self.head96[86].has_tip or self.head96[95].has_tip
sensor_1 = self.head96[0].has_tip or self.head96[9].has_tip
sensor_ready = {
"G11 or H12": sensor_0,
"A1 or B2": sensor_1,
"any": sensor_0 or sensor_1,
"all": sensor_0 and sensor_1,
}[lld_sensor]
if not sensor_ready:
raise ValueError(
f"lld_sensor={lld_sensor!r}: the tip tracker reports no tip on the corner channel(s) "
"that feed it"
)

# Tip length: measure unless the caller supplied it.
if tip_len is None:
tip_len = await self.head96_request_tip_length()
tip_overhang = tip_len - STARBackend.DEFAULT_TIP_FITTING_DEPTH

# Reachable tip-bottom window: z_range shifted down by the overhang, floored at the deck.
z_min, z_max = self._head96_information.z_range
deck = STARBackend.MINIMUM_CHANNEL_Z_POSITION
height_min = max(z_min - tip_overhang, deck)
height_max = z_max - tip_overhang

if lowest_immers_pos is None:
lowest_immers_pos = height_min
if start_pos_search is None:
start_pos_search = height_max
if not (height_min <= lowest_immers_pos <= height_max):
raise ValueError(
f"lowest_immers_pos={lowest_immers_pos} mm out of reach "
f"[{round(height_min, 1)}, {round(height_max, 1)}] mm (tip-bottom)"
)
if not (height_min <= start_pos_search <= height_max):
raise ValueError(
f"start_pos_search={start_pos_search} mm out of reach "
f"[{round(height_min, 1)}, {round(height_max, 1)}] mm (tip-bottom)"
)

# lm and the raw 6-digit zr arrived with the 2013 firmware; pre-2013 has no lm and scales zr.
uses_2013_structure = self._head96_information.fw_version >= datetime.date(2013, 1, 1)
if not uses_2013_structure and lld_sensor != "any":
raise ValueError(
f"lld_sensor={lld_sensor!r} requires 2013+ firmware; the 2008 command set has no "
"sensor-selection field"
)

# zw (current protection limiter) range narrows on pre-2013 firmware.
zw_max = 15 if uses_2013_structure else 7
if not 0 <= current_protection_limiter <= zw_max:
raise ValueError(
f"current_protection_limiter must be between 0 - {zw_max}, is {current_protection_limiter}"
)

# Back to stop-disk space (zh / zc) via the overhang.
lowest_immers_pos_increments = self._head96_z_drive_mm_to_increment(
lowest_immers_pos + tip_overhang
)
start_pos_search_increments = self._head96_z_drive_mm_to_increment(
start_pos_search + tip_overhang
)
approach_speed_increments = self._head96_z_drive_mm_to_increment(approach_speed)
speed_increments = self._head96_z_drive_mm_to_increment(speed)
acceleration_increments = self._head96_z_drive_mm_to_increment(acceleration)

# Signed post-detection move -> direction (zj) and magnitude (zi).
post_detection_direction = 1 if post_detection_dist >= 0 else 0
post_detection_dist_increments = self._head96_z_drive_mm_to_increment(abs(post_detection_dist))
if not 0 <= post_detection_dist_increments <= 9999:
raise ValueError(
f"abs(post_detection_dist) must be <= "
f"{self._head96_z_drive_increment_to_mm(9999)} mm, is {abs(post_detection_dist)}"
)

lm_field = {"lm": str(lld_sensor_map[lld_sensor])} if uses_2013_structure else {}
if uses_2013_structure:
zr_field = f"{acceleration_increments:06}" # raw [increment/second**2]
zw_field = f"{current_protection_limiter:02}"
else:
zr_field = f"{acceleration_increments // 1000:03}" # [1000 increment/second**2]
zw_field = f"{current_protection_limiter:01}"
zl_params: Dict[str, Any] = {
"zh": f"{lowest_immers_pos_increments:05}", # lowest immersion position [increment]
"zc": f"{start_pos_search_increments:05}", # start position of LLD search [increment]
"zi": f"{post_detection_dist_increments:04}", # immersion depth after LLD [increment]
"zj": f"{post_detection_direction}", # direction of immersion depth (0 down, 1 up)
**lm_field, # which cLLD sensor(s) trigger detection (2013+ only)
"gt": f"{detection_edge:04}", # edge steepness at cLLD detection
"gl": f"{detection_drop:04}", # offset after cLLD edge detection
"zv": f"{approach_speed_increments:05}", # upper-section (fast approach) speed
"zl": f"{speed_increments:05}", # cLLD search speed
"zr": zr_field, # acceleration
"zw": zw_field, # current protection limiter
}
try:
await self.send_command(module="H0", command="ZL", **zl_params)
except STARFirmwareError:
await self.head96_move_to_z_safety()
raise

# RH returns the latched detected surface (stop-disk frame), unaffected by the post-detection
# move; map it to tip-bottom. TODO(hardware): confirm the RH response format against a capture.
detected_tip_bottom = round(await self.head96_request_last_lld_height() - tip_overhang, 2)
if move_to_z_safety_after:
await self.head96_move_to_z_safety()
return detected_tip_bottom

# -------------- 3.10.2 Tip handling using CoRe 96 Head --------------

@need_iswap_parked
Expand Down Expand Up @@ -9727,6 +9931,19 @@ async def head96_request_stop_disk_z(self) -> float:
resp = await self.send_command(module="H0", command="RZ", fmt="rz##### (n)")
return self._head96_z_drive_increment_to_mm(resp["rz"][1]) # [0] = FW counter, [1] = HW counter

async def head96_request_last_lld_height(self) -> float:
"""Request the liquid-surface position the last 96-head cLLD search found, in mm (H0 RH).

Unlike `head96_request_stop_disk_z` (the head's current position), this is the latched surface
the last `ZL` search detected, so it is unaffected by any post-detection move - the head
counterpart of the channel `request_pip_height_last_lld`.

Returns:
Detected liquid-surface Z position (stop-disk frame) in mm.
"""
resp = await self.send_command(module="H0", command="RH", fmt="rh#####")
return self._head96_z_drive_increment_to_mm(resp["rh"])

async def _head96_probe_z_max(self) -> float:
"""Probe the reachable Z top (mm) for this unit: drive to the firmware Z-safety height (C0 EV)
and read the stop disk there. The generic command-range max can exceed what this unit actually
Expand Down
57 changes: 57 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,63 @@ async def test_head96_experimental_aspirate_minimum_height_defaults_to_floor(sel
]
)

async def test_head96_probe_z_using_clld_wire_string(self):
"""The 2013+ ZL command assembles in the documented field order with the tip-overhang offset.

Guards the zc 5-digit width (6 caused firmware er32), the tip-bottom -> stop-disk mapping, the
zv/zw fields, and approach_speed=None -> head96_z_drive_speed_default. Returns the detected
surface as a tip-bottom position (stop disk minus overhang).
"""
self.STAR._head96_information = _make_head96_information(self.STAR)
self.STAR._head96_z_drive_speed_default = 85.0
self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=1)
self.STAR.head96_request_last_lld_height = unittest.mock.AsyncMock(return_value=200.0)
self.STAR._write_and_read_command.reset_mock()
detected = await self.STAR.head96_probe_z_using_clld(
tip_len=50.0, # overhang = 50 - 8 = 42 mm
lowest_immers_pos=140.0,
start_pos_search=250.0,
speed=10.0,
acceleration=300.0,
approach_speed=None, # -> head96_z_drive_speed_default = 85.0
current_protection_limiter=15,
lld_sensor="any",
detection_edge=10,
detection_drop=2,
post_detection_dist=2.0,
)
self.STAR._write_and_read_command.assert_has_calls(
[
_any_write_and_read_command_call(
"H0ZLid0001zh36400zc58400zi0400zj1lm2gt0010gl0002zv17000zl02000zr060000zw15"
)
]
)
self.assertEqual(detected, 158.0) # 200.0 detected surface - 42 overhang

async def test_head96_probe_z_using_clld_requires_tip(self):
"""cLLD raises if the head holds no tip, whether tip_len is measured or supplied."""
self.STAR._head96_information = _make_head96_information(self.STAR)
self.STAR._head96_z_drive_speed_default = 85.0
self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=0)
with self.assertRaises(ValueError):
await self.STAR.head96_probe_z_using_clld()
with self.assertRaises(ValueError):
await self.STAR.head96_probe_z_using_clld(tip_len=50.0)

async def test_head96_probe_z_using_clld_retracts_on_firmware_error(self):
"""A firmware error during the search retracts the head to Z-safety before re-raising."""
self.STAR._head96_information = _make_head96_information(self.STAR)
self.STAR._head96_z_drive_speed_default = 85.0
self.STAR.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=1)
self.STAR.head96_move_to_z_safety = unittest.mock.AsyncMock()
self.STAR._write_and_read_command = unittest.mock.AsyncMock(
side_effect=STARFirmwareError(errors={}, raw_response="H0ZLid0001er32")
)
with self.assertRaises(STARFirmwareError):
await self.STAR.head96_probe_z_using_clld(tip_len=50.0)
self.STAR.head96_move_to_z_safety.assert_awaited_once()

async def test_core_96_dispense_quadrant(self):
"""Test that each quadrant of a 384-well plate produces the correct firmware command.

Expand Down
Loading