Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion packages/opal-server/opal_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ async def start_server_background_tasks(self):
await self.broadcast_listening_context.__aenter__()
# if the broadcast channel is closed, we want to restart worker process because statistics can't be reliable anymore
self.broadcast_listening_context._event_broadcaster.get_reader_task().add_done_callback(
lambda _: self._graceful_shutdown()
self._handle_broadcast_listener_done
)
asyncio.create_task(self.opal_statistics.run())
self.pubsub.endpoint.notifier.register_unsubscribe_event(
Expand Down Expand Up @@ -412,6 +412,26 @@ async def stop_server_background_tasks(self):
except Exception:
logger.exception("exception while shutting down background tasks")

def _handle_broadcast_listener_done(self, task: asyncio.Task):
    """Log why the broadcast listener task finished, then shut the worker down.

    Intended to be used as the done-callback of the broadcaster reader task.
    Three outcomes are distinguished:

    * the task was cancelled -> an info message is logged;
    * the task outcome could not be read -> the lookup error is logged;
    * the task ended with an exception -> the error is logged together with
      the configured broadcast URI (password masked) to help diagnose
      misconfiguration.

    In every case the worker's graceful shutdown is triggered afterwards.

    Args:
        task: the finished broadcast listener task.
    """
    try:
        try:
            exception = task.exception()
        except asyncio.CancelledError:
            logger.info("Broadcast listener task was cancelled")
        except Exception as exc:
            logger.error(
                "Failed to read broadcast listener task result: {err}",
                err=repr(exc),
            )
        else:
            if exception is not None:
                # Local import keeps this block self-contained.
                from urllib.parse import urlsplit, urlunsplit

                uri = self.broadcaster_uri
                # Broadcast URIs commonly embed credentials
                # (scheme://user:password@host/...). Never write the
                # password to the logs; keep the rest for diagnostics.
                if isinstance(uri, str):
                    try:
                        parts = urlsplit(uri)
                        userinfo, at_sign, host_part = parts.netloc.rpartition("@")
                        if at_sign and ":" in userinfo:
                            username = userinfo.partition(":")[0]
                            uri = urlunsplit(
                                parts._replace(
                                    netloc=f"{username}:***@{host_part}"
                                )
                            )
                    except ValueError:
                        # Unparseable URI: prefer losing detail over
                        # risking a credential leak.
                        uri = "<unparseable broadcast URI>"

                logger.error(
                    "Broadcast listener failed; check OPAL server broadcast URI '{uri}': {err}",
                    uri=uri,
                    err=repr(exception),
                )
    finally:
        # Shut down even if logging itself fails: once the listener is gone
        # the worker must not keep running.
        self._graceful_shutdown()

def _graceful_shutdown(self):
    """Ask the current worker process to terminate by sending itself SIGTERM."""
    logger.info("Trigger worker graceful shutdown")
    # Signal our own PID rather than exiting directly.
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGTERM)
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio

from opal_server import server as server_module
from opal_server.server import OpalServer


class CapturingLogger:
    """Logger stand-in that records calls instead of emitting them.

    Every call is stored as a ``(message, kwargs)`` tuple, in call order,
    in ``errors`` or ``infos`` depending on the method used.
    """

    def __init__(self):
        # Separate, initially empty call logs per level.
        self.errors, self.infos = [], []

    def error(self, message, **kwargs):
        """Record an error-level call."""
        entry = (message, kwargs)
        self.errors.append(entry)

    def info(self, message, **kwargs):
        """Record an info-level call."""
        entry = (message, kwargs)
        self.infos.append(entry)


class DoneTask:
    """Test double exposing only the ``exception()`` part of a finished task.

    ``result`` is the value ``exception()`` returns (the exception the fake
    task "ended with", or ``None``); ``error`` is an exception that
    ``exception()`` raises instead of returning.
    """

    def __init__(self, result=None, error=None):
        self.result, self.error = result, error

    def exception(self):
        """Raise ``error`` when one was given, otherwise return ``result``."""
        to_raise = self.error
        if to_raise is None:
            return self.result
        raise to_raise


def make_server(monkeypatch):
    """Build a bare ``OpalServer`` whose shutdown is recorded, not executed.

    The instance is created without running ``__init__``; only the
    ``broadcaster_uri`` attribute is set. ``_graceful_shutdown`` is replaced
    so each call appends ``"shutdown"`` to the returned list.

    Returns:
        A ``(server, shutdown_calls)`` tuple.
    """
    shutdown_calls = []

    def record_shutdown():
        shutdown_calls.append("shutdown")

    server = object.__new__(OpalServer)
    server.broadcaster_uri = "postgres://bad-host:5432/postgres"
    monkeypatch.setattr(server, "_graceful_shutdown", record_shutdown)
    return server, shutdown_calls


def test_logs_broadcast_listener_exception(monkeypatch):
    """A listener that ended with an exception is logged with the URI, then shutdown runs."""
    server, shutdown_calls = make_server(monkeypatch)
    fake_logger = CapturingLogger()
    monkeypatch.setattr(server_module, "logger", fake_logger)

    listener_failure = RuntimeError("could not resolve host")
    finished_task = DoneTask(result=listener_failure)
    server._handle_broadcast_listener_done(finished_task)

    expected_message = (
        "Broadcast listener failed; check OPAL server broadcast URI '{uri}': {err}"
    )
    expected_kwargs = {
        "uri": "postgres://bad-host:5432/postgres",
        "err": "RuntimeError('could not resolve host')",
    }
    assert shutdown_calls == ["shutdown"]
    assert fake_logger.errors == [(expected_message, expected_kwargs)]


def test_logs_cancelled_broadcast_listener(monkeypatch):
    """A cancelled listener produces an info log and still triggers shutdown."""
    server, shutdown_calls = make_server(monkeypatch)
    fake_logger = CapturingLogger()
    monkeypatch.setattr(server_module, "logger", fake_logger)

    cancelled_task = DoneTask(error=asyncio.CancelledError())
    server._handle_broadcast_listener_done(cancelled_task)

    expected_info = ("Broadcast listener task was cancelled", {})
    assert shutdown_calls == ["shutdown"]
    assert fake_logger.infos == [expected_info]


def test_logs_failure_to_read_listener_result(monkeypatch):
    """An unexpected error while reading the task outcome is logged, then shutdown runs."""
    server, shutdown_calls = make_server(monkeypatch)
    fake_logger = CapturingLogger()
    monkeypatch.setattr(server_module, "logger", fake_logger)

    unreadable_task = DoneTask(error=ValueError("task state unavailable"))
    server._handle_broadcast_listener_done(unreadable_task)

    expected_entry = (
        "Failed to read broadcast listener task result: {err}",
        {"err": "ValueError('task state unavailable')"},
    )
    assert shutdown_calls == ["shutdown"]
    assert fake_logger.errors == [expected_entry]