Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion labgrid/remote/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from contextlib import contextmanager
import copy
import random
import signal

import attr
import grpc
Expand Down Expand Up @@ -1143,7 +1144,7 @@ async def serve(listen, cleanup) -> None:
except ImportError:
logging.info("Module grpcio-channelz not available")

server.add_insecure_port(listen)
bound = server.add_insecure_port(listen)
logging.debug("Starting server")
await server.start()

Expand All @@ -1157,8 +1158,18 @@ async def server_graceful_shutdown():
# existing RPCs to continue within the grace period.
await server.stop(5)

def callback():
asyncio.ensure_future(server_graceful_shutdown())

cleanup.append(server_graceful_shutdown())
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, callback)
loop.add_signal_handler(signal.SIGTERM, callback)
logging.info("Coordinator ready")
host, sep, port = listen.rpartition(":")
if not sep or not port.isdigit():
host = listen
print(f"listening on {host}:{bound}", flush=True)
await server.wait_for_termination()


Expand Down
20 changes: 18 additions & 2 deletions labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import os.path
import traceback
import signal
import shutil
import subprocess
from urllib.parse import urlsplit
Expand Down Expand Up @@ -884,8 +885,11 @@ async def run(self) -> None:
for task in pending:
task.cancel()

await self.pump_task
await self.poll_task
try:
await self.pump_task
await self.poll_task
except asyncio.CancelledError:
return

def send_started(self):
msg = labgrid_coordinator_pb2.ExporterInMessage()
Expand All @@ -901,6 +905,7 @@ async def message_pump(self):
logging.debug("received message %s", out_message)
kind = out_message.WhichOneof("kind")
if kind == "hello":
print("Exporter ready", flush=True)
logging.info("connected to coordinator version %s", out_message.hello.version)
elif kind == "set_acquired_request":
logging.debug("acquire request")
Expand Down Expand Up @@ -1009,6 +1014,10 @@ async def poll(self):
except Exception: # pylint: disable=broad-except
traceback.print_exc(file=sys.stderr)

async def stop(self):
if self.poll_task is not None:
self.poll_task.cancel()

async def add_resource(self, group_name, resource_name, cls, params):
"""Add a resource to the exporter and update status on the coordinator"""
print(f"add resource {group_name}/{resource_name}: {cls}/{params}")
Expand Down Expand Up @@ -1051,6 +1060,13 @@ async def amain(config) -> bool:
if inspect:
inspect.exporter = exporter

def _stop():
asyncio.ensure_future(exporter.stop())

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, _stop)
loop.add_signal_handler(signal.SIGTERM, _stop)

await exporter.run()


Expand Down
Loading