diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py index a63a21482..89ad66da8 100644 --- a/labgrid/remote/coordinator.py +++ b/labgrid/remote/coordinator.py @@ -9,6 +9,7 @@ from contextlib import contextmanager import copy import random +import signal import attr import grpc @@ -1143,7 +1144,7 @@ async def serve(listen, cleanup) -> None: except ImportError: logging.info("Module grpcio-channelz not available") - server.add_insecure_port(listen) + bound = server.add_insecure_port(listen) logging.debug("Starting server") await server.start() @@ -1157,8 +1158,18 @@ async def server_graceful_shutdown(): # existing RPCs to continue within the grace period. await server.stop(5) + def callback(): + asyncio.ensure_future(server_graceful_shutdown()) + cleanup.append(server_graceful_shutdown()) + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, callback) + loop.add_signal_handler(signal.SIGTERM, callback) logging.info("Coordinator ready") + host, sep, port = listen.rpartition(":") + if not sep or not port.isdigit(): + host = listen + print(f"listening on {host}:{bound}", flush=True) await server.wait_for_termination() diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index 0e48f1942..383d97c82 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -8,6 +8,7 @@ import os import os.path import traceback +import signal import shutil import subprocess from urllib.parse import urlsplit @@ -884,8 +885,11 @@ async def run(self) -> None: for task in pending: task.cancel() - await self.pump_task - await self.poll_task + try: + await self.pump_task + await self.poll_task + except asyncio.CancelledError: + return def send_started(self): msg = labgrid_coordinator_pb2.ExporterInMessage() @@ -901,6 +905,7 @@ async def message_pump(self): logging.debug("received message %s", out_message) kind = out_message.WhichOneof("kind") if kind == "hello": + print("Exporter ready", flush=True) logging.info("connected to coordinator version %s", out_message.hello.version) elif kind == "set_acquired_request": logging.debug("acquire request") @@ -1009,6 +1014,10 @@ async def poll(self): except Exception: # pylint: disable=broad-except traceback.print_exc(file=sys.stderr) + async def stop(self): + if self.poll_task is not None: + self.poll_task.cancel() + async def add_resource(self, group_name, resource_name, cls, params): """Add a resource to the exporter and update status on the coordinator""" print(f"add resource {group_name}/{resource_name}: {cls}/{params}") @@ -1051,6 +1060,13 @@ async def amain(config) -> bool: if inspect: inspect.exporter = exporter + def _stop(): + asyncio.ensure_future(exporter.stop()) + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, _stop) + loop.add_signal_handler(signal.SIGTERM, _stop) + await exporter.run()