Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions pylabrobot/barcode_scanners/keyence/keyence_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,37 @@ async def initialize(self):
)

async def send_command(self, command: str) -> str:
"""Send a command to the barcode scanner and return the response.
Keyence uses carriage return \r as the line ending by default."""
"""Send a command and return its reply, accumulated byte-by-byte up to the \r terminator.

Replies are bare \r-terminated and silence is a valid empty reply (e.g. no barcode), so
stop at \r or the first byte-less read rather than reading a fixed byte count."""

await self.io.write((command + "\r").encode(self.serial_messaging_encoding))
response = await self.io.read()
return response.decode(self.serial_messaging_encoding).strip()
buf = bytearray()
while True:
chunk = await self.io.read()
if not chunk:
break # port timeout elapsed with no byte: reply done, or none coming
buf.extend(chunk)
if chunk == b"\r":
break
return buf.decode(self.serial_messaging_encoding).strip()

async def stop(self):
await self.io.stop()

async def scan_barcode(self) -> Barcode:
data = await self.send_command("LON")
if data.startswith("NG"):
raise BarcodeScannerError("Barcode reader is off: cannot read barcode")
if data.startswith("ERR99"):
raise BarcodeScannerError(f"Error response from barcode reader: {data}")
return Barcode(data=data, symbology="unknown", position_on_resource="front")
try:
data = await self.send_command("LON")
if data.startswith("NG"):
raise BarcodeScannerError("Barcode reader is off: cannot read barcode")
if data.startswith("ERR99"):
raise BarcodeScannerError(f"Error response from barcode reader: {data}")
return Barcode(data=data, symbology="unknown", position_on_resource="front")
finally:
# LON latches the read beam on; release it whether the read succeeded or raised.
# try/except so a LOFF failure can't mask the scan's result.
try:
await self.send_command("LOFF")
except Exception:
logger.warning("Failed to turn off barcode reader beam (LOFF)", exc_info=True)
96 changes: 96 additions & 0 deletions pylabrobot/barcode_scanners/keyence/keyence_backend_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import unittest
from unittest.mock import AsyncMock, call

from pylabrobot.barcode_scanners.backend import BarcodeScannerError
from pylabrobot.barcode_scanners.keyence.keyence_backend import (
HAS_SERIAL,
KeyenceBarcodeScannerBackend,
)
from pylabrobot.resources.barcode import Barcode


@unittest.skipUnless(HAS_SERIAL, "pyserial is not installed")
class TestKeyenceSendCommand(unittest.IsolatedAsyncioTestCase):
"""send_command must read a full \\r-terminated reply, not a single byte."""

def setUp(self):
self.backend = KeyenceBarcodeScannerBackend(port="COM1")
self.backend.io.write = AsyncMock() # type: ignore
self.backend.io.read = AsyncMock() # type: ignore

async def test_writes_carriage_return_terminated_command(self):
self.backend.io.read.side_effect = [b"\r"] # type: ignore
await self.backend.send_command("RMOTOR")
self.backend.io.write.assert_awaited_once_with(b"RMOTOR\r") # type: ignore

async def test_accumulates_reply_until_carriage_return(self):
# The bug: a single io.read() returned one byte and truncated the reply.
self.backend.io.read.side_effect = [b"M", b"O", b"T", b"O", b"R", b"O", b"N", b"\r"] # type: ignore
response = await self.backend.send_command("RMOTOR")
self.assertEqual(response, "MOTORON")

async def test_stops_reading_at_terminator(self):
# Once \r lands the reply is complete; reading again would block on the next command.
self.backend.io.read.side_effect = [b"O", b"K", b"\r"] # type: ignore
await self.backend.send_command("LOFF")
self.assertEqual(self.backend.io.read.await_count, 3) # type: ignore

async def test_byteless_read_is_an_empty_reply(self):
# A byte-less read means the port timeout elapsed with no reply (e.g. no barcode).
self.backend.io.read.side_effect = [b""] # type: ignore
response = await self.backend.send_command("LON")
self.assertEqual(response, "")


@unittest.skipUnless(HAS_SERIAL, "pyserial is not installed")
class TestKeyenceScanBarcode(unittest.IsolatedAsyncioTestCase):
"""scan_barcode must release the LON-latched read beam with LOFF, success or failure."""

def setUp(self):
self.backend = KeyenceBarcodeScannerBackend(port="COM1")
self.replies: dict[str, str] = {}
self.raise_on: set[str] = set()

def _send(command: str) -> str:
if command in self.raise_on:
raise RuntimeError(f"simulated failure on {command}")
return self.replies.get(command, "")

self.backend.send_command = AsyncMock(side_effect=_send) # type: ignore[method-assign]

async def test_success_returns_barcode_then_releases_beam(self):
self.replies = {"LON": "ABC123", "LOFF": ""}
barcode = await self.backend.scan_barcode()
self.assertIsInstance(barcode, Barcode)
self.assertEqual(barcode.data, "ABC123")
self.backend.send_command.assert_has_calls([call("LON"), call("LOFF")]) # type: ignore

async def test_releases_beam_when_reader_off(self):
self.replies = {"LON": "NG", "LOFF": ""}
with self.assertRaises(BarcodeScannerError):
await self.backend.scan_barcode()
self.backend.send_command.assert_has_calls([call("LON"), call("LOFF")]) # type: ignore

async def test_releases_beam_on_error_response(self):
self.replies = {"LON": "ERR99", "LOFF": ""}
with self.assertRaises(BarcodeScannerError):
await self.backend.scan_barcode()
self.backend.send_command.assert_has_calls([call("LON"), call("LOFF")]) # type: ignore

async def test_loff_failure_does_not_mask_successful_scan(self):
# A failing LOFF is logged, never propagated, so the scan result survives.
self.replies = {"LON": "ABC123"}
self.raise_on = {"LOFF"}
barcode = await self.backend.scan_barcode()
self.assertEqual(barcode.data, "ABC123")

async def test_loff_failure_does_not_mask_scan_error(self):
# The reader error must propagate, not the swallowed LOFF failure.
self.replies = {"LON": "NG"}
self.raise_on = {"LOFF"}
with self.assertRaises(BarcodeScannerError):
await self.backend.scan_barcode()


if __name__ == "__main__":
unittest.main()
Loading