diff --git a/docs/source/tutorials/basic.md b/docs/source/tutorials/basic.md index e490fa13f7..6acf442a5b 100644 --- a/docs/source/tutorials/basic.md +++ b/docs/source/tutorials/basic.md @@ -189,9 +189,16 @@ y = orm.Int(3) Now it's time to multiply the two numbers! ```{code-cell} ipython3 -multiply(x, y) +await multiply.run_async(x, y) ``` +:::{note} +Because this tutorial runs as a Jupyter notebook, we use `await multiply.run_async(...)` instead of the synchronous `multiply(...)`. +Jupyter's kernel has an event loop already running, so process execution must be `await`-ed rather than run synchronously. +In a regular Python script, the event loop is not running, so you can call `multiply(x, y)` directly. +The same applies to `engine.run_async` vs `engine.run` used later in this tutorial. +::: + Success! The `calcfunction`-decorated `multiply` function has multiplied the two `Int` data nodes and returned a new `Int` data node whose value is the product of the two input nodes. Note that by executing the `multiply` function, all input and output nodes are automatically stored in the database: @@ -372,10 +379,10 @@ One nifty feature of the builder is the ability to use tab completion for the in Try it out by typing `builder.` + `` in the verdi shell. ::: -To execute the `CalcJob`, we use the `run` function provided by the AiiDA engine, and wait for the process to complete: +To execute the `CalcJob`, we use the `run_async` function provided by the AiiDA engine, and wait for the process to complete: ```{code-cell} ipython3 -engine.run(builder) +await engine.run_async(builder) ``` Besides the sum of the two `Int` nodes, the calculation function also returns two other outputs: one of type `RemoteData` and one of type `FolderData`. @@ -532,7 +539,7 @@ Once the `WorkChain` input has been set up, we run it with the AiiDA engine: ```{code-cell} ipython3 from aiida import engine -engine.run(builder) +await engine.run_async(builder) ``` Now quickly leave the IPython shell and check the process list: @@ -608,7 +615,7 @@ The provenance graph should be similar to the one we showed at the start of this ## Submitting to the daemon -When we used the `run` command in the previous sections, the IPython shell was blocked while it was waiting for the `CalcJob` to finish. +When we used the `run_async` function in the previous sections, the notebook cell was waiting for the `CalcJob` to finish. This is not a problem when we're simply adding two number together, but if we want to run multiple calculations that take hours or days, this is no longer practical. Instead, we are going to *submit* the `CalcJob` to the AiiDA *daemon*. The daemon is a program that runs in the background and manages submitted calculations until they are *terminated*. diff --git a/environment.yml b/environment.yml index 62aa69f83f..d189874b35 100644 --- a/environment.yml +++ b/environment.yml @@ -16,11 +16,11 @@ dependencies: - docstring_parser - get-annotations~=0.1 - python-graphviz~=0.19 -- plumpy~=0.25.0 +- plumpy@ git+https://github.com/khsrali/plumpy.git@greenlet - ipython>=7.6 - jedi<0.19 - jinja2~=3.0 -- kiwipy[rmq]~=0.8.4 +- kiwipy[rmq]~=0.9.0 - importlib-metadata~=6.0 - numpy<3,>=1.21 - paramiko~=3.0 diff --git a/pyproject.toml b/pyproject.toml index 5c9d205c87..16224dbdf0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,11 +39,11 @@ dependencies = [ 'docstring-parser', 'get-annotations~=0.1;python_version<"3.10"', 'graphviz~=0.19', - 'plumpy~=0.25.0', + 'plumpy @ git+https://github.com/khsrali/plumpy.git@greenlet', 'ipython>=7.6', 'jedi<0.19', 'jinja2~=3.0', - 'kiwipy[rmq]~=0.8.4', + 'kiwipy[rmq]~=0.9.0', 'importlib-metadata~=6.0', 'numpy>=1.21,<3', 'paramiko~=3.0', @@ -412,7 +412,6 @@ filterwarnings = [ 'ignore:The `Code` class is deprecated.*:aiida.common.warnings.AiidaDeprecationWarning', # https://github.com/aiidateam/plumpy/issues/283 'ignore:There is no current event loop:DeprecationWarning:plumpy', - 'ignore:There is no current event loop:DeprecationWarning:nest_asyncio', # spglib deprecation 'ignore:dict interface is deprecated:DeprecationWarning', # https://github.com/aiidateam/archive-path/issues/21 diff --git a/src/aiida/engine/__init__.py b/src/aiida/engine/__init__.py index 0a855c6484..869308be5a 100644 --- a/src/aiida/engine/__init__.py +++ b/src/aiida/engine/__init__.py @@ -68,8 +68,11 @@ 'process_handler', 'return_', 'run', + 'run_async', 'run_get_node', + 'run_get_node_async', 'run_get_pk', + 'run_get_pk_async', 'submit', 'while_', 'workfunction', diff --git a/src/aiida/engine/launch.py b/src/aiida/engine/launch.py index 78e42df19b..c7b7b8685b 100644 --- a/src/aiida/engine/launch.py +++ b/src/aiida/engine/launch.py @@ -25,7 +25,16 @@ from .runners import ResultAndPk from .utils import instantiate_process, is_process_scoped, prepare_inputs -__all__ = ('await_processes', 'run', 'run_get_node', 'run_get_pk', 'submit') +__all__ = ( + 'await_processes', + 'run', + 'run_async', + 'run_get_node', + 'run_get_node_async', + 'run_get_pk', + 'run_get_pk_async', + 'submit', +) TYPE_RUN_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder] # run can also be process function, but it is not clear what type this should be @@ -48,6 +57,23 @@ def run(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwa return runner.run(process, inputs, **kwargs) +async def run_async( + process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any +) -> dict[str, t.Any]: + """Run the process with the supplied inputs in a local runner without blocking the event loop. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: the outputs of the process + """ + if isinstance(process, Process): + runner = process.runner + else: + runner = manager.get_manager().get_runner() + + return await runner.run_async(process, inputs, **kwargs) + + def run_get_node( process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any ) -> tuple[dict[str, t.Any], ProcessNode]: @@ -65,6 +91,23 @@ def run_get_node( return runner.run_get_node(process, inputs, **kwargs) +async def run_get_node_async( + process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any +) -> tuple[dict[str, t.Any], ProcessNode]: + """Run the process with the supplied inputs in a local runner without blocking the event loop. + + :param process: the process class, instance, builder or function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and the process node + """ + if isinstance(process, Process): + runner = process.runner + else: + runner = manager.get_manager().get_runner() + + return await runner.run_get_node_async(process, inputs, **kwargs) + + def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> ResultAndPk: """Run the process with the supplied inputs in a local runner that will block until the process is completed. @@ -80,6 +123,23 @@ def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None return runner.run_get_pk(process, inputs, **kwargs) +async def run_get_pk_async( + process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any +) -> ResultAndPk: + """Run the process with the supplied inputs in a local runner without blocking the event loop. + + :param process: the process class, instance, builder or function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and process node pk + """ + if isinstance(process, Process): + runner = process.runner + else: + runner = manager.get_manager().get_runner() + + return await runner.run_get_pk_async(process, inputs, **kwargs) + + def submit( process: TYPE_SUBMIT_PROCESS, inputs: dict[str, t.Any] | None = None, @@ -182,3 +242,5 @@ def await_processes(nodes: t.Sequence[ProcessNode], wait_interval: int = 1) -> N # Allow one to also use run.get_node and run.get_pk as a shortcut, without having to import the functions themselves run.get_node = run_get_node # type: ignore[attr-defined] run.get_pk = run_get_pk # type: ignore[attr-defined] +run_async.get_node = run_get_node_async # type: ignore[attr-defined] +run_async.get_pk = run_get_pk_async # type: ignore[attr-defined] diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 08441cddcc..030e087caf 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -107,6 +107,12 @@ def run_get_pk(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any def run_get_node(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, N]: ... + async def run_async(self, *args: P.args, **kwargs: P.kwargs) -> R_co: ... + + async def run_get_pk_async(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, int]: ... + + async def run_get_node_async(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, N]: ... + is_process_function: bool node_class: t.Type[N] @@ -253,6 +259,83 @@ def kill_process(_num, _frame): return result, process.node + async def run_get_node_async(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode']: + """Run the FunctionProcess with the supplied inputs in a local runner without blocking the event loop. + + :param args: input arguments to construct the FunctionProcess + :param kwargs: input keyword arguments to construct the FunctionProcess + :return: tuple of the outputs of the process and the process node + """ + frame_delta = 1000 + frame_count = get_stack_size() + stack_limit = sys.getrecursionlimit() + LOGGER.info('Executing process function, current stack status: %d frames of %d', frame_count, stack_limit) + + # If the current frame count is more than 80% of the stack limit, or comes within 200 frames, increase the + # stack limit by ``frame_delta``. + if frame_count > min(0.8 * stack_limit, stack_limit - 200): + LOGGER.warning( + 'Current stack contains %d frames which is close to the limit of %d. Increasing the limit by %d', + frame_count, + stack_limit, + frame_delta, + ) + sys.setrecursionlimit(stack_limit + frame_delta) + + manager = get_manager() + runner = manager.get_runner() + inputs = process_class.create_inputs(*args, **kwargs) + + # Remove all the known inputs from the kwargs + for port in process_class.spec().inputs: + kwargs.pop(port, None) + + # If any kwargs remain, the spec should be dynamic, so we raise if it isn't + if kwargs and not process_class.spec().inputs.dynamic: + raise ValueError(f'{function.__name__} does not support these kwargs: {kwargs.keys()}') + + process: Process = process_class(inputs=inputs, runner=runner) + + # Only add handlers for interrupt signal to kill the process if we are in a local and not a daemon runner. + # Without this check, running process functions in a daemon worker would be killed if the daemon is shutdown + current_runner = manager.get_runner() + original_handler = None + kill_signal = signal.SIGINT + + if not current_runner.is_daemon_runner: + + def kill_process(_num, _frame): + """Send the kill signal to the process in the current scope.""" + LOGGER.critical('runner received interrupt, killing process %s', process.pid) + result = process.kill(msg_text='Process was killed because the runner received an interrupt') + return result + + # Store the current handler on the signal such that it can be restored after process has terminated + original_handler = signal.getsignal(kill_signal) + signal.signal(kill_signal, kill_process) + + try: + from aiida.engine import utils as engine_utils + + with engine_utils.loop_scope(runner.loop): + await process.step_until_terminated() + process.future().result() + finally: + # If the `original_handler` is set, that means the `kill_process` was bound, which needs to be reset + if original_handler: + signal.signal(signal.SIGINT, original_handler) + + store_provenance = inputs.get('metadata', {}).get('store_provenance', True) + if not store_provenance: + process.node._storable = False + process.node._unstorable_message = 'cannot store node because it was run with `store_provenance=False`' + + result = process.outputs + if result and len(result) == 1 and process.SINGLE_OUTPUT_LINKNAME in result: + return result[process.SINGLE_OUTPUT_LINKNAME], process.node + + return result, process.node + def run_get_pk(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]: """Recreate the `run_get_pk` utility launcher. @@ -265,15 +348,36 @@ def run_get_pk(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]: assert node.pk is not None return result, node.pk + async def run_get_pk_async(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]: + """Recreate the `run_get_pk` utility launcher without blocking the event loop. + + :param args: input arguments to construct the FunctionProcess + :param kwargs: input keyword arguments to construct the FunctionProcess + :return: tuple of the outputs of the process and the process node pk + + """ + result, node = await run_get_node_async(*args, **kwargs) + assert node.pk is not None + return result, node.pk + @functools.wraps(function) def decorated_function(*args, **kwargs): """This wrapper function is the actual function that is called.""" result, _ = run_get_node(*args, **kwargs) return result + @functools.wraps(function) + async def decorated_function_async(*args, **kwargs): + """Async wrapper for running a process function without blocking the event loop.""" + result, _ = await run_get_node_async(*args, **kwargs) + return result + decorated_function.run = decorated_function # type: ignore[attr-defined] decorated_function.run_get_pk = run_get_pk # type: ignore[attr-defined] decorated_function.run_get_node = run_get_node # type: ignore[attr-defined] + decorated_function.run_async = decorated_function_async # type: ignore[attr-defined] + decorated_function.run_get_pk_async = run_get_pk_async # type: ignore[attr-defined] + decorated_function.run_get_node_async = run_get_node_async # type: ignore[attr-defined] decorated_function.is_process_function = True # type: ignore[attr-defined] decorated_function.node_class = node_class # type: ignore[attr-defined] decorated_function.process_class = process_class # type: ignore[attr-defined] @@ -601,7 +705,9 @@ async def run(self) -> 'ExitCode' | None: # The remaining inputs have to be keyword arguments. kwargs.update(**inputs) - result = self._func(*args, **kwargs) + from plumpy.greenlet_bridge import run_in_greenlet + + result = await run_in_greenlet(self._func, *args, **kwargs) if result is None or isinstance(result, ExitCode): # type: ignore[redundant-expr] return result # type: ignore[unreachable] diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index b16bd95826..c83e748f33 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -40,6 +40,7 @@ import plumpy.persistence import plumpy.processes from kiwipy.communications import UnroutableError +from plumpy import run_until_complete from plumpy.process_states import Finished, ProcessState from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.processes import Process as PlumpyProcess @@ -361,7 +362,7 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b coro = self._launch_task(task_kill_job, self.node, self.runner.transport) self._cancelling_scheduler_job = asyncio.create_task(coro) try: - self.loop.run_until_complete(self._cancelling_scheduler_job) + run_until_complete(self.loop, self._cancelling_scheduler_job) except Exception as exc: self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}') return False diff --git a/src/aiida/engine/processes/workchains/workchain.py b/src/aiida/engine/processes/workchains/workchain.py index 4b847722a0..7f2ca6d513 100644 --- a/src/aiida/engine/processes/workchains/workchain.py +++ b/src/aiida/engine/processes/workchains/workchain.py @@ -15,6 +15,7 @@ import logging import typing as t +from plumpy.greenlet_bridge import run_in_greenlet from plumpy.persistence import auto_persist from plumpy.process_states import Continue, Wait from plumpy.processes import ProcessStateMachineMeta @@ -299,7 +300,7 @@ def _update_process_status(self) -> None: @Protect.final async def run(self) -> t.Any: self._stepper = self.spec().get_outline().create_stepper(self) # type: ignore[arg-type] - return self._do_step() + return await run_in_greenlet(self._do_step) def _do_step(self) -> t.Any: """Execute the next step in the outline and return the result. diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index b19821b2e7..b95f2ae991 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -19,6 +19,7 @@ from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union import kiwipy +from plumpy import run_until_complete from plumpy.communications import wrap_communicator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister @@ -156,8 +157,9 @@ def stop(self) -> None: def run_until_complete(self, future: asyncio.Future) -> Any: """Run the loop until the future has finished and return the result.""" + with utils.loop_scope(self._loop): - return self._loop.run_until_complete(future) + return run_until_complete(self._loop, future) def close(self) -> None: """Close the runner by stopping the loop.""" @@ -265,6 +267,56 @@ def kill_process(_num, _frame): return process_inited.outputs, process_inited.node + async def _run_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> Tuple[Dict[str, Any], ProcessNode]: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and the calculation node + """ + assert not self._closed + + inputs = utils.prepare_inputs(inputs, **kwargs) + + if utils.is_process_function(process): + run_get_node_async = getattr(process, 'run_get_node_async', None) + if run_get_node_async is None: + raise exceptions.InvalidOperation( + 'Process function does not support async execution. ' + 'Use the synchronous launcher or update to a version that provides `run_get_node_async`.' + ) + result, node = await run_get_node_async(**inputs) + return result, node + + with utils.loop_scope(self.loop): + process_inited = self.instantiate_process(process, **inputs) + + def kill_process(_num, _frame): + """Send the kill signal to the process in the current scope.""" + if process_inited.is_killing: + LOGGER.warning('runner received interrupt, process %s already being killed', process_inited.pid) + return + LOGGER.critical('runner received interrupt, killing process %s', process_inited.pid) + process_inited.kill(msg_text='Process was killed because the runner received an interrupt') + + original_handler_int = signal.getsignal(signal.SIGINT) + original_handler_term = signal.getsignal(signal.SIGTERM) + + try: + signal.signal(signal.SIGINT, kill_process) + signal.signal(signal.SIGTERM, kill_process) + await process_inited.step_until_terminated() + process_inited.future().result() + finally: + signal.signal(signal.SIGINT, original_handler_int) + signal.signal(signal.SIGTERM, original_handler_term) + + return process_inited.outputs, process_inited.node + def run(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any) -> Dict[str, Any]: """Run the process with the supplied inputs in this runner that will block until the process is completed. @@ -277,6 +329,20 @@ def run(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, * result, _ = self._run(process, inputs, **kwargs) return result + async def run_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> Dict[str, Any]: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: the outputs of the process + """ + result, _ = await self._run_async(process, inputs, **kwargs) + return result + def run_get_node( self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any ) -> ResultAndNode: @@ -291,6 +357,20 @@ def run_get_node( result, node = self._run(process, inputs, **kwargs) return ResultAndNode(result, node) + async def run_get_node_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> ResultAndNode: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and the calculation node + """ + result, node = await self._run_async(process, inputs, **kwargs) + return ResultAndNode(result, node) + def run_get_pk(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ResultAndPk: """Run the process with the supplied inputs in this runner that will block until the process is completed. @@ -303,6 +383,20 @@ def run_get_pk(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = result, node = self._run(process, inputs, **kwargs) return ResultAndPk(result, node.pk) + async def run_get_pk_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> ResultAndPk: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and process node pk + """ + result, node = await self._run_async(process, inputs, **kwargs) + return ResultAndPk(result, node.pk) + def call_on_process_finish(self, pk: int, callback: Callable[[], Any]) -> None: """Schedule a callback when the process of the given pk is terminated. diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 9b7e14fe83..9f606b7128 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -119,13 +119,11 @@ def do_open(): if transport_request.count == 0: # IMPORTANT: Pop from _transport_requests BEFORE closing the transport. # This prevents a race condition with async transports where: - # 1. close() is called, which for AsyncTransport uses run_until_complete(close_async) - # 2. With nest_asyncio (used by plumpy), this call yields back to the event loop - # 3. The event loop schedules close_async, then continues running another tasks - # - for example one that requests the transport which is scheduled to be closed - # 4. The task now using the transport to do some operation awaits, - # next the close_async task closes the transport while still in use -> error - # By poping first, new tasks will create a fresh transport request. + # 1. close() is called, which for AsyncTransport uses run_until_complete to run close_async + # 2. This can yield back to the event loop (via greenlet or re-entrant call) + # 3. The event loop then runs another task that requests the same transport + # 4. That task uses a transport that is being closed -> error + # By popping first, new tasks will create a fresh transport request. self._transport_requests.pop(authinfo.pk, None) if transport_request.future.done(): diff --git a/src/aiida/transports/transport.py b/src/aiida/transports/transport.py index cbe4c38db7..1030eacf20 100644 --- a/src/aiida/transports/transport.py +++ b/src/aiida/transports/transport.py @@ -187,6 +187,30 @@ def __exit__(self, type_, value, traceback): if self._enters == 0: self.close() + async def __aenter__(self): + """Async context manager entry. Opens the transport connection. + + For sync transports, this just calls the sync open() method. + AsyncTransport subclasses override this to use async open. + """ + if self._enters == 0: + if self.is_open: + self._enters += 1 + else: + self.open() + self._enters += 1 + return self + + async def __aexit__(self, type_, value, traceback): + """Async context manager exit. Closes the transport connection if needed. + + For sync transports, this just calls the sync close() method. + AsyncTransport subclasses override this to use async close. + """ + self._enters -= 1 + if self._enters == 0: + self.close() + @property def is_open(self): return self._is_open @@ -1843,13 +1867,30 @@ class AsyncTransport(Transport): because they will block the event loop. """ + async def __aenter__(self): + """Async context manager entry. Opens the transport connection.""" + if self._enters == 0: + if self.is_open: + self._enters += 1 + else: + await self.open_async() + self._enters += 1 + return self + + async def __aexit__(self, type_, value, traceback): + """Async context manager exit. Closes the transport connection if needed.""" + self._enters -= 1 + if self._enters == 0: + await self.close_async() + def run_command_blocking(self, func, *args, **kwargs): - """The event loop must be the one of manager.""" + """Run an async transport method synchronously, handling nested event loop scenarios.""" + from plumpy import run_until_complete from aiida.manage import get_manager - loop = get_manager().get_runner() - return loop.run_until_complete(func(*args, **kwargs)) + loop = get_manager().get_runner().loop + return run_until_complete(loop, func(*args, **kwargs)) def open(self): return self.run_command_blocking(self.open_async) diff --git a/tests/engine/daemon/test_execmanager.py b/tests/engine/daemon/test_execmanager.py index 16269df352..5305a8aeed 100644 --- a/tests/engine/daemon/test_execmanager.py +++ b/tests/engine/daemon/test_execmanager.py @@ -178,7 +178,7 @@ async def test_upload_local_copy_list( node, calc_info = node_and_calc_info calc_info.local_copy_list = [[folder.uuid] + local_copy_list] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) # Check that none of the files were written to the repository of the calculation node, since they were communicated @@ -216,7 +216,7 @@ async def test_upload_local_copy_list_files_folders( (inputs['folder'].uuid, None, '.'), ] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) # Check that none of the files were written to the repository of the calculation node, since they were communicated @@ -248,7 +248,7 @@ async def test_upload_remote_symlink_list( (node.computer.uuid, str(tmp_path / 'file_a.txt'), 'file_a.txt'), ] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) filepath_workdir = pathlib.Path(node.get_remote_workdir()) @@ -313,7 +313,7 @@ async def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, or if order is not None: calc_info.file_copy_operation_order = order - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, sandbox, inputs) filepath = pathlib.Path(node.get_remote_workdir()) / 'file.txt' assert filepath.is_file() @@ -613,14 +613,14 @@ async def test_upload_combinations( ) if expected_exception is not None: with pytest.raises(expected_exception): - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) filepath_workdir = pathlib.Path(node.get_remote_workdir()) assert serialize_file_hierarchy(filepath_workdir, read_bytes=False) == expected_hierarchy else: - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) filepath_workdir = pathlib.Path(node.get_remote_workdir()) @@ -649,7 +649,7 @@ async def test_upload_calculation_portable_code(fixture_sandbox, node_and_calc_i code_info.code_uuid = code.uuid calc_info.codes_info = [code_info] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation( node, transport, diff --git a/tests/engine/test_transport.py b/tests/engine/test_transport.py index 2d11eafc6b..6fdb0d5395 100644 --- a/tests/engine/test_transport.py +++ b/tests/engine/test_transport.py @@ -137,7 +137,7 @@ def test_request_removed_before_close(self): This is a regression test for a race condition with async transports where: 1. close() is called, which for AsyncTransport uses run_until_complete() - 2. With nest_asyncio (used by plumpy), this can yield to the event loop + 2. This can yield back to the event loop (via greenlet for instance) 3. Another task might enter and get the same transport_request 4. That task tries to use the transport that's being closed -> error diff --git a/tests/transports/test_asyncssh_plugin.py b/tests/transports/test_asyncssh_plugin.py index 7177d5dfb2..fa6feac54d 100644 --- a/tests/transports/test_asyncssh_plugin.py +++ b/tests/transports/test_asyncssh_plugin.py @@ -34,7 +34,7 @@ async def test_semaphore_released_after_errors(self, tmp_path_factory): } async_transport = AsyncSshTransport(**transport_params) - with async_transport as transport: + async with async_transport as transport: # Each operation should fail but release the semaphore with pytest.raises(OSError, match='Error while downloading file'): await transport.getfile_async('non_existing', local_dir) @@ -46,7 +46,7 @@ async def test_semaphore_released_after_errors(self, tmp_path_factory): with pytest.raises(OSError, match='Error while downloading file'): await transport.getfile_async('non_existing', local_dir) - assert transport._semaphore._value == 1, 'Semaphore should be fully released' + assert async_transport._semaphore._value == 1, 'Semaphore should be fully released' @pytest.mark.asyncio async def test_semaphore_limits_concurrent_operations(self): diff --git a/uv.lock b/uv.lock index ff82b1d777..5b21ea378b 100644 --- a/uv.lock +++ b/uv.lock @@ -224,7 +224,7 @@ requires-dist = [ { name = "jinja2", specifier = "~=3.0" }, { name = "jupyter", marker = "extra == 'notebook'", specifier = "~=1.0" }, { name = "jupyter-client", marker = "extra == 'notebook'", specifier = "~=8.0" }, - { name = "kiwipy", extras = ["rmq"], specifier = "~=0.8.4" }, + { name = "kiwipy", extras = ["rmq"], specifier = "~=0.9.0" }, { name = "matplotlib", marker = "extra == 'atomic-tools'", specifier = "~=3.3,>=3.3.4" }, { name = "mypy", marker = "extra == 'pre-commit'", specifier = "~=1.19.0" }, { name = "myst-nb", marker = "extra == 'docs'", specifier = "~=1.0.0" }, @@ -235,7 +235,7 @@ requires-dist = [ { name = "pg8000", marker = "extra == 'tests'", specifier = "~=1.13" }, { name = "pgsu", specifier = "~=0.3.0" }, { name = "pgtest", marker = "extra == 'tests'", specifier = "~=1.3,>=1.3.1" }, - { name = "plumpy", specifier = "~=0.25.0" }, + { name = "plumpy", git = "https://github.com/khsrali/plumpy.git?rev=greenlet" }, { name = "pre-commit", marker = "extra == 'pre-commit'", specifier = "~=3.5" }, { name = "psutil", specifier = "~=7.0" }, { name = "psycopg", extras = ["binary"], specifier = ">=3.0.2,<4" }, @@ -302,15 +302,44 @@ sdist = { url = "https://files.pythonhosted.org/packages/ea/90/1c9c13f2fdfa96602 [[package]] name = "aio-pika" -version = "9.4.3" +version = "9.5.6" source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version < '3.10'", +] dependencies = [ - { name = "aiormq" }, - { name = "yarl" }, + { name = "aiormq", marker = "python_full_version < '3.10'" }, + { name = "exceptiongroup", marker = "python_full_version < '3.10'" }, + { name = "typing-extensions", marker = "python_full_version < '3.10'" }, + { name = "yarl", marker = "python_full_version < '3.10'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c9/69/8649bdb97fa1521af3dafe23dbc5debadd4b01abb2850a4d193dae9b0451/aio_pika-9.4.3.tar.gz", hash = "sha256:fd2b1fce25f6ed5203ef1dd554dc03b90c9a46a64aaf758d032d78dc31e5295d", size = 47693, upload-time = "2024-08-13T06:49:09.619Z" } +sdist = { url = "https://files.pythonhosted.org/packages/59/52/fe35c898bce5cc8af839ba786b38f7db8932aac48a67ba8ca7de3b074e07/aio_pika-9.5.6.tar.gz", hash = "sha256:5013f429e1235e1ce8df054a821e0eea140ea9afc94a09725b96590ea2dad001", size = 47308, upload-time = "2025-08-05T14:18:35.949Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/85/66/cad391d83b7266a667c85c826bb6c0d7f68519a0eed7634098c12fb39a4b/aio_pika-9.4.3-py3-none-any.whl", hash = "sha256:f1423d2d5a8b7315d144efe1773763bf687ac17aa1535385982687e9e5ed49bb", size = 53240, upload-time = "2024-08-13T06:49:07.276Z" }, + { url = "https://files.pythonhosted.org/packages/ec/fb/c1cfb7cb98ccd2abdc91e170e7ba0e1e3088b6a9d051e4f2899d3249a231/aio_pika-9.5.6-py3-none-any.whl", hash = "sha256:47b532419185cf1105ae18daa45a5052ff98064915c5e080b2433431fe808193", size = 54303, upload-time = "2025-08-05T14:18:34.62Z" }, +] + +[[package]] +name = "aio-pika" +version = "9.5.8" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.14' and sys_platform == 'win32'", + "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", + "python_full_version == '3.11.*' and sys_platform == 'win32'", + "python_full_version >= '3.14' and sys_platform != 'win32'", + "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'win32'", + "python_full_version == '3.11.*' and sys_platform != 'win32'", + "python_full_version == '3.10.*' and sys_platform == 'win32'", + "python_full_version == '3.10.*' and sys_platform != 'win32'", +] +dependencies = [ + { name = "aiormq", marker = "python_full_version >= '3.10'" }, + { name = "exceptiongroup", marker = "python_full_version == '3.10.*'" }, + { name = "yarl", marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c5/73/8d1020683970de5532b3b01732d75c8bf922a6505fcdad1a9c7c6405242a/aio_pika-9.5.8.tar.gz", hash = "sha256:7c36874115f522bbe7486c46d8dd711a4dbedd67c4e8a8c47efe593d01862c62", size = 47408, upload-time = "2025-11-12T10:37:10.215Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/91/513971861d845d28160ecb205ae2cfaf618b16918a9cd4e0b832b5360ce7/aio_pika-9.5.8-py3-none-any.whl", hash = "sha256:f4c6cb8a6c5176d00f39fd7431e9702e638449bc6e86d1769ad7548b2a506a8d", size = 54397, upload-time = "2025-11-12T10:37:08.374Z" }, ] [[package]] @@ -2923,21 +2952,22 @@ wheels = [ [[package]] name = "kiwipy" -version = "0.8.5" +version = "0.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "deprecation" }, { name = "pyyaml" }, { name = "shortuuid" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f7/c9/60f4597b2f7ce9f1ce9f202c1ddc70b857716597d828fc5baa123a2fa17e/kiwipy-0.8.5.tar.gz", hash = "sha256:23b746f60577120764d662673335cea40cf34083d15f1ee8ab4fa6155b50d60f", size = 41087, upload-time = "2024-12-02T08:19:59.85Z" } +sdist = { url = "https://files.pythonhosted.org/packages/41/d1/a56aea1ee27c9aa73c7c6c785f4eb0539799392566b758fc920397afea91/kiwipy-0.9.0.tar.gz", hash = "sha256:3dc5a2cbe4bf7127da2c8a6c20476ddad30849b32fa12b495c622059c633db4f", size = 242121, upload-time = "2025-10-21T05:58:10.836Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/68/50/2d180b54272d467a3e5eb4d7e64df80a8bb11d483e908404d71905a2801b/kiwipy-0.8.5-py3-none-any.whl", hash = "sha256:b6acf17ba69fdfc9ce246673efd35e1db06a27b2c624ba1735d2159f8e665a1b", size = 41820, upload-time = "2024-12-02T08:19:58.573Z" }, + { url = "https://files.pythonhosted.org/packages/c0/c0/2ed83a3b88048db2504ddd67a419148e23a1cf2bc64ee0bbaacb46c80bbb/kiwipy-0.9.0-py3-none-any.whl", hash = "sha256:8d861310d64dc15de50667c1c7b7295f2dba50a5561284e8a889e3a6c2b197f9", size = 41865, upload-time = "2025-10-21T05:58:09.033Z" }, ] [package.optional-dependencies] rmq = [ - { name = "aio-pika" }, + { name = "aio-pika", version = "9.5.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, + { name = "aio-pika", version = "9.5.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, { name = "pamqp" }, { name = "pytray" }, ] @@ -4899,17 +4929,13 @@ wheels = [ [[package]] name = "plumpy" -version = "0.25.0" -source = { registry = "https://pypi.org/simple" } +version = "0.25.1" +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#c036472726030f58dadf8d93bf2cfb6019040fff" } dependencies = [ + { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] }, - { name = "nest-asyncio" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ea/b1/7c8141f04fb9060e9de7fd4fafed7ce429a16c1a903675a093d390f14b16/plumpy-0.25.0.tar.gz", hash = "sha256:5eccca0f11757db652b15bfb0bb95dc010a9a5fa000df5f9db51cf6a4d1e682f", size = 198187, upload-time = "2025-05-01T07:48:10.931Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/97/9a/e939c60e6376bdcc84ed9ae92ed96312c13781044c08b1b96be036d4b11b/plumpy-0.25.0-py3-none-any.whl", hash = "sha256:f15e7b471185265e6bbd0cfd37b9cd4b5bf51a9019fdd74247d9cfe28c5da617", size = 75249, upload-time = "2025-05-01T07:48:09.428Z" }, -] [[package]] name = "ply"