From 9681d6f2f6e41707cae0f02ac771295cc8d12909 Mon Sep 17 00:00:00 2001 From: Paolo Saviano Date: Tue, 3 Feb 2026 12:16:22 +0100 Subject: [PATCH 1/2] feat(control-payload): add CONFIGURE/RESET control signals - _node.py now handles the new CONFIGURE and RESET ControlSignals - _node.py now has both the update_config and the reset_config signatures - add a data dictionary to ControlPayload - generate a new CONFIGURE signal in remote service on incoming requests to manage the configuration attribute - update passthrough identity node to handle the configuration update properly --- juturna/components/_node.py | 14 +++++++++++++ juturna/payloads/_control_signal.py | 2 ++ juturna/payloads/_payloads.py | 1 + juturna/remotizer/_remote_service.py | 20 ++++++++++++++----- .../passthrough_identity.py | 8 ++++++++ 5 files changed, 40 insertions(+), 5 deletions(-) diff --git a/juturna/components/_node.py b/juturna/components/_node.py index dcac7a46..3c580260 100644 --- a/juturna/components/_node.py +++ b/juturna/components/_node.py @@ -402,6 +402,10 @@ def configure(self): ... def update(self, message: Message[T_Input]): ... + def update_config(self, config: dict): ... + + def reset_config(self): ... + def set_on_config(self, prop: str, value: Any): ... def warmup(self): ... @@ -504,6 +508,16 @@ def _control(self, message: Message): self._suspended = False self._logger.info('node resumed') + return + case ControlSignal.CONFIGURE: + self.update_config(message.payload.data) + self._logger.info('node reconfigured') + + return + case ControlSignal.RESET: + self.reset_config() + self._logger.info('node reset') + return case None: return diff --git a/juturna/payloads/_control_signal.py b/juturna/payloads/_control_signal.py index 13093094..a61d9084 100644 --- a/juturna/payloads/_control_signal.py +++ b/juturna/payloads/_control_signal.py @@ -20,3 +20,5 @@ class ControlSignal(IntEnum): WARMUP: int = 2 SUSPEND: int = 3 RESUME: int = 4 + CONFIGURE: int = 5 + RESET: int = 6 diff --git a/juturna/payloads/_payloads.py b/juturna/payloads/_payloads.py index 730a57fd..91d31062 100644 --- a/juturna/payloads/_payloads.py +++ b/juturna/payloads/_payloads.py @@ -25,6 +25,7 @@ def serialize(obj): @dataclass(frozen=True) class ControlPayload(BasePayload): signal: ControlSignal = ControlSignal.STOP + data: dict = field(default_factory=dict) @dataclass(frozen=True) diff --git a/juturna/remotizer/_remote_service.py b/juturna/remotizer/_remote_service.py index 92739ec4..659b7504 100644 --- a/juturna/remotizer/_remote_service.py +++ b/juturna/remotizer/_remote_service.py @@ -11,10 +11,13 @@ import json import time import itertools +from concurrent import futures -from juturna.remotizer._remote_context import RequestContext +from juturna.components import Message, Node +from juturna.payloads import ControlPayload, ControlSignal -from concurrent import futures +from juturna.remotizer._remote_context import RequestContext +from juturna.remotizer._remote_builder import _standalone_builder from juturna.remotizer.utils import ( deserialize_envelope, @@ -22,8 +25,6 @@ message_to_proto, ) -from juturna.components import Message, Node -from juturna.remotizer._remote_builder import _standalone_builder from c_protos.payloads_pb2 import ( ProtoEnvelope, @@ -185,9 +186,18 @@ def SendAndReceive(self, request: ProtoEnvelope, context): request_message.id = tracking_id if envelope_dict.get('configuration'): - _configuration_to_be_applied = envelope_dict.get( + configuration_to_be_applied = envelope_dict.get( 'configuration', {} ) + configuration_message = Message( + creator=self.remote_name, + version=request_message.version, + payload=ControlPayload( + signal=ControlSignal.CONFIGURE, + data=configuration_to_be_applied, + ), + ) + self.node.put(configuration_message) request_message._freeze() self.node.put(request_message) diff --git a/plugins/nodes/proc/_passthrough_identity/passthrough_identity.py b/plugins/nodes/proc/_passthrough_identity/passthrough_identity.py index 86ffb71c..58892698 100644 --- a/plugins/nodes/proc/_passthrough_identity/passthrough_identity.py +++ b/plugins/nodes/proc/_passthrough_identity/passthrough_identity.py @@ -33,6 +33,14 @@ def __init__(self, delay: int, **kwargs): self._delay = delay self._transmitted = 0 + def update_config(self, config: dict): + """Update the node configuration""" + if 'delay' in config: + self._delay = config['delay'] + self.logger.info( + f'config updated: delay set to {self._delay} seconds' + ) + def update(self, message: Message[BasePayload]): """Receive a message from downstream, transmit a message upstream""" self.logger.info( From 742618d03b809875daaf821970e35a37a6c6a71f Mon Sep 17 00:00:00 2001 From: Paolo Saviano Date: Tue, 3 Feb 2026 12:31:38 +0100 Subject: [PATCH 2/2] fix(warp): add configuration to proto deserialization utils --- juturna/remotizer/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/juturna/remotizer/utils.py b/juturna/remotizer/utils.py index 36487d53..f678654a 100644 --- a/juturna/remotizer/utils.py +++ b/juturna/remotizer/utils.py @@ -360,6 +360,7 @@ def deserialize_envelope(envelope: ProtoEnvelope) -> dict[str, Any]: 'ttl': envelope.ttl, 'request_type': envelope.request_type, 'response_type': envelope.response_type, + 'configuration': dict(envelope.configuration), 'message': message, } return envelope_dict