diff --git a/pylabrobot/barcode_scanners/keyence/keyence_backend.py b/pylabrobot/barcode_scanners/keyence/keyence_backend.py index e79501fda71..92dee9935c6 100644 --- a/pylabrobot/barcode_scanners/keyence/keyence_backend.py +++ b/pylabrobot/barcode_scanners/keyence/keyence_backend.py @@ -73,20 +73,37 @@ async def initialize(self): ) async def send_command(self, command: str) -> str: - """Send a command to the barcode scanner and return the response. - Keyence uses carriage return \r as the line ending by default.""" + """Send a command and return its reply, accumulated byte-by-byte up to the \r terminator. + + Replies are bare \r-terminated and silence is a valid empty reply (e.g. no barcode), so + stop at \r or the first byte-less read rather than reading a fixed byte count.""" await self.io.write((command + "\r").encode(self.serial_messaging_encoding)) - response = await self.io.read() - return response.decode(self.serial_messaging_encoding).strip() + buf = bytearray() + while True: + chunk = await self.io.read() + if not chunk: + break # port timeout elapsed with no byte: reply done, or none coming + buf.extend(chunk) + if chunk == b"\r": + break + return buf.decode(self.serial_messaging_encoding).strip() async def stop(self): await self.io.stop() async def scan_barcode(self) -> Barcode: - data = await self.send_command("LON") - if data.startswith("NG"): - raise BarcodeScannerError("Barcode reader is off: cannot read barcode") - if data.startswith("ERR99"): - raise BarcodeScannerError(f"Error response from barcode reader: {data}") - return Barcode(data=data, symbology="unknown", position_on_resource="front") + try: + data = await self.send_command("LON") + if data.startswith("NG"): + raise BarcodeScannerError("Barcode reader is off: cannot read barcode") + if data.startswith("ERR99"): + raise BarcodeScannerError(f"Error response from barcode reader: {data}") + return Barcode(data=data, symbology="unknown", position_on_resource="front") + finally: + # LON latches the read beam on; release it whether the read succeeded or raised. + # try/except so a LOFF failure can't mask the scan's result. + try: + await self.send_command("LOFF") + except Exception: + logger.warning("Failed to turn off barcode reader beam (LOFF)", exc_info=True) diff --git a/pylabrobot/barcode_scanners/keyence/keyence_backend_tests.py b/pylabrobot/barcode_scanners/keyence/keyence_backend_tests.py new file mode 100644 index 00000000000..69f7285f564 --- /dev/null +++ b/pylabrobot/barcode_scanners/keyence/keyence_backend_tests.py @@ -0,0 +1,96 @@ +import unittest +from unittest.mock import AsyncMock, call + +from pylabrobot.barcode_scanners.backend import BarcodeScannerError +from pylabrobot.barcode_scanners.keyence.keyence_backend import ( + HAS_SERIAL, + KeyenceBarcodeScannerBackend, +) +from pylabrobot.resources.barcode import Barcode + + +@unittest.skipUnless(HAS_SERIAL, "pyserial is not installed") +class TestKeyenceSendCommand(unittest.IsolatedAsyncioTestCase): + """send_command must read a full \\r-terminated reply, not a single byte.""" + + def setUp(self): + self.backend = KeyenceBarcodeScannerBackend(port="COM1") + self.backend.io.write = AsyncMock() # type: ignore + self.backend.io.read = AsyncMock() # type: ignore + + async def test_writes_carriage_return_terminated_command(self): + self.backend.io.read.side_effect = [b"\r"] # type: ignore + await self.backend.send_command("RMOTOR") + self.backend.io.write.assert_awaited_once_with(b"RMOTOR\r") # type: ignore + + async def test_accumulates_reply_until_carriage_return(self): + # The bug: a single io.read() returned one byte and truncated the reply. + self.backend.io.read.side_effect = [b"M", b"O", b"T", b"O", b"R", b"O", b"N", b"\r"] # type: ignore + response = await self.backend.send_command("RMOTOR") + self.assertEqual(response, "MOTORON") + + async def test_stops_reading_at_terminator(self): + # Once \r lands the reply is complete; reading again would block on the next command. + self.backend.io.read.side_effect = [b"O", b"K", b"\r"] # type: ignore + await self.backend.send_command("LOFF") + self.assertEqual(self.backend.io.read.await_count, 3) # type: ignore + + async def test_byteless_read_is_an_empty_reply(self): + # A byte-less read means the port timeout elapsed with no reply (e.g. no barcode). + self.backend.io.read.side_effect = [b""] # type: ignore + response = await self.backend.send_command("LON") + self.assertEqual(response, "") + + +@unittest.skipUnless(HAS_SERIAL, "pyserial is not installed") +class TestKeyenceScanBarcode(unittest.IsolatedAsyncioTestCase): + """scan_barcode must release the LON-latched read beam with LOFF, success or failure.""" + + def setUp(self): + self.backend = KeyenceBarcodeScannerBackend(port="COM1") + self.replies: dict[str, str] = {} + self.raise_on: set[str] = set() + + def _send(command: str) -> str: + if command in self.raise_on: + raise RuntimeError(f"simulated failure on {command}") + return self.replies.get(command, "") + + self.backend.send_command = AsyncMock(side_effect=_send) # type: ignore[method-assign] + + async def test_success_returns_barcode_then_releases_beam(self): + self.replies = {"LON": "ABC123", "LOFF": ""} + barcode = await self.backend.scan_barcode() + self.assertIsInstance(barcode, Barcode) + self.assertEqual(barcode.data, "ABC123") + self.backend.send_command.assert_has_calls([call("LON"), call("LOFF")]) # type: ignore + + async def test_releases_beam_when_reader_off(self): + self.replies = {"LON": "NG", "LOFF": ""} + with self.assertRaises(BarcodeScannerError): + await self.backend.scan_barcode() + self.backend.send_command.assert_has_calls([call("LON"), call("LOFF")]) # type: ignore + + async def test_releases_beam_on_error_response(self): + self.replies = {"LON": "ERR99", "LOFF": ""} + with self.assertRaises(BarcodeScannerError): + await self.backend.scan_barcode() + self.backend.send_command.assert_has_calls([call("LON"), call("LOFF")]) # type: ignore + + async def test_loff_failure_does_not_mask_successful_scan(self): + # A failing LOFF is logged, never propagated, so the scan result survives. + self.replies = {"LON": "ABC123"} + self.raise_on = {"LOFF"} + barcode = await self.backend.scan_barcode() + self.assertEqual(barcode.data, "ABC123") + + async def test_loff_failure_does_not_mask_scan_error(self): + # The reader error must propagate, not the swallowed LOFF failure. + self.replies = {"LON": "NG"} + self.raise_on = {"LOFF"} + with self.assertRaises(BarcodeScannerError): + await self.backend.scan_barcode() + + +if __name__ == "__main__": + unittest.main()