From e5a5f9670fcc45fa4e43bf2a85d80c678724c31e Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Wed, 28 Jan 2026 15:07:54 +0100 Subject: [PATCH 01/11] . --- environment.yml | 4 ++-- pyproject.toml | 4 ++-- uv.lock | 62 +++++++++++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/environment.yml b/environment.yml index 62aa69f83f..d189874b35 100644 --- a/environment.yml +++ b/environment.yml @@ -16,11 +16,11 @@ dependencies: - docstring_parser - get-annotations~=0.1 - python-graphviz~=0.19 -- plumpy~=0.25.0 +- plumpy@ git+https://github.com/khsrali/plumpy.git@greenlet - ipython>=7.6 - jedi<0.19 - jinja2~=3.0 -- kiwipy[rmq]~=0.8.4 +- kiwipy[rmq]~=0.9.0 - importlib-metadata~=6.0 - numpy<3,>=1.21 - paramiko~=3.0 diff --git a/pyproject.toml b/pyproject.toml index 5c9d205c87..fb244af379 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,11 +39,11 @@ dependencies = [ 'docstring-parser', 'get-annotations~=0.1;python_version<"3.10"', 'graphviz~=0.19', - 'plumpy~=0.25.0', + 'plumpy @ git+https://github.com/khsrali/plumpy.git@greenlet', 'ipython>=7.6', 'jedi<0.19', 'jinja2~=3.0', - 'kiwipy[rmq]~=0.8.4', + 'kiwipy[rmq]~=0.9.0', 'importlib-metadata~=6.0', 'numpy>=1.21,<3', 'paramiko~=3.0', diff --git a/uv.lock b/uv.lock index ff82b1d777..768fb79e54 100644 --- a/uv.lock +++ b/uv.lock @@ -224,7 +224,7 @@ requires-dist = [ { name = "jinja2", specifier = "~=3.0" }, { name = "jupyter", marker = "extra == 'notebook'", specifier = "~=1.0" }, { name = "jupyter-client", marker = "extra == 'notebook'", specifier = "~=8.0" }, - { name = "kiwipy", extras = ["rmq"], specifier = "~=0.8.4" }, + { name = "kiwipy", extras = ["rmq"], specifier = "~=0.9.0" }, { name = "matplotlib", marker = "extra == 'atomic-tools'", specifier = "~=3.3,>=3.3.4" }, { name = "mypy", marker = "extra == 'pre-commit'", specifier = "~=1.19.0" }, { name = "myst-nb", marker = "extra == 'docs'", specifier = "~=1.0.0" }, @@ -235,7 +235,7 @@ requires-dist = [ { name = "pg8000", marker = "extra == 'tests'", specifier = "~=1.13" }, { name = "pgsu", specifier = "~=0.3.0" }, { name = "pgtest", marker = "extra == 'tests'", specifier = "~=1.3,>=1.3.1" }, - { name = "plumpy", specifier = "~=0.25.0" }, + { name = "plumpy", git = "https://github.com/khsrali/plumpy.git?rev=greenlet" }, { name = "pre-commit", marker = "extra == 'pre-commit'", specifier = "~=3.5" }, { name = "psutil", specifier = "~=7.0" }, { name = "psycopg", extras = ["binary"], specifier = ">=3.0.2,<4" }, @@ -302,15 +302,44 @@ sdist = { url = "https://files.pythonhosted.org/packages/ea/90/1c9c13f2fdfa96602 [[package]] name = "aio-pika" -version = "9.4.3" +version = "9.5.6" source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version < '3.10'", +] dependencies = [ - { name = "aiormq" }, - { name = "yarl" }, + { name = "aiormq", marker = "python_full_version < '3.10'" }, + { name = "exceptiongroup", marker = "python_full_version < '3.10'" }, + { name = "typing-extensions", marker = "python_full_version < '3.10'" }, + { name = "yarl", marker = "python_full_version < '3.10'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c9/69/8649bdb97fa1521af3dafe23dbc5debadd4b01abb2850a4d193dae9b0451/aio_pika-9.4.3.tar.gz", hash = "sha256:fd2b1fce25f6ed5203ef1dd554dc03b90c9a46a64aaf758d032d78dc31e5295d", size = 47693, upload-time = "2024-08-13T06:49:09.619Z" } +sdist = { url = "https://files.pythonhosted.org/packages/59/52/fe35c898bce5cc8af839ba786b38f7db8932aac48a67ba8ca7de3b074e07/aio_pika-9.5.6.tar.gz", hash = "sha256:5013f429e1235e1ce8df054a821e0eea140ea9afc94a09725b96590ea2dad001", size = 47308, upload-time = "2025-08-05T14:18:35.949Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/85/66/cad391d83b7266a667c85c826bb6c0d7f68519a0eed7634098c12fb39a4b/aio_pika-9.4.3-py3-none-any.whl", hash = "sha256:f1423d2d5a8b7315d144efe1773763bf687ac17aa1535385982687e9e5ed49bb", size = 53240, upload-time = "2024-08-13T06:49:07.276Z" }, + { url = "https://files.pythonhosted.org/packages/ec/fb/c1cfb7cb98ccd2abdc91e170e7ba0e1e3088b6a9d051e4f2899d3249a231/aio_pika-9.5.6-py3-none-any.whl", hash = "sha256:47b532419185cf1105ae18daa45a5052ff98064915c5e080b2433431fe808193", size = 54303, upload-time = "2025-08-05T14:18:34.62Z" }, +] + +[[package]] +name = "aio-pika" +version = "9.5.8" +source = { registry = "https://pypi.org/simple" } +resolution-markers = [ + "python_full_version >= '3.14' and sys_platform == 'win32'", + "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", + "python_full_version == '3.11.*' and sys_platform == 'win32'", + "python_full_version >= '3.14' and sys_platform != 'win32'", + "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'win32'", + "python_full_version == '3.11.*' and sys_platform != 'win32'", + "python_full_version == '3.10.*' and sys_platform == 'win32'", + "python_full_version == '3.10.*' and sys_platform != 'win32'", +] +dependencies = [ + { name = "aiormq", marker = "python_full_version >= '3.10'" }, + { name = "exceptiongroup", marker = "python_full_version == '3.10.*'" }, + { name = "yarl", marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c5/73/8d1020683970de5532b3b01732d75c8bf922a6505fcdad1a9c7c6405242a/aio_pika-9.5.8.tar.gz", hash = "sha256:7c36874115f522bbe7486c46d8dd711a4dbedd67c4e8a8c47efe593d01862c62", size = 47408, upload-time = "2025-11-12T10:37:10.215Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/91/513971861d845d28160ecb205ae2cfaf618b16918a9cd4e0b832b5360ce7/aio_pika-9.5.8-py3-none-any.whl", hash = "sha256:f4c6cb8a6c5176d00f39fd7431e9702e638449bc6e86d1769ad7548b2a506a8d", size = 54397, upload-time = "2025-11-12T10:37:08.374Z" }, ] [[package]] @@ -2923,21 +2952,22 @@ wheels = [ [[package]] name = "kiwipy" -version = "0.8.5" +version = "0.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "deprecation" }, { name = "pyyaml" }, { name = "shortuuid" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/f7/c9/60f4597b2f7ce9f1ce9f202c1ddc70b857716597d828fc5baa123a2fa17e/kiwipy-0.8.5.tar.gz", hash = "sha256:23b746f60577120764d662673335cea40cf34083d15f1ee8ab4fa6155b50d60f", size = 41087, upload-time = "2024-12-02T08:19:59.85Z" } +sdist = { url = "https://files.pythonhosted.org/packages/41/d1/a56aea1ee27c9aa73c7c6c785f4eb0539799392566b758fc920397afea91/kiwipy-0.9.0.tar.gz", hash = "sha256:3dc5a2cbe4bf7127da2c8a6c20476ddad30849b32fa12b495c622059c633db4f", size = 242121, upload-time = "2025-10-21T05:58:10.836Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/68/50/2d180b54272d467a3e5eb4d7e64df80a8bb11d483e908404d71905a2801b/kiwipy-0.8.5-py3-none-any.whl", hash = "sha256:b6acf17ba69fdfc9ce246673efd35e1db06a27b2c624ba1735d2159f8e665a1b", size = 41820, upload-time = "2024-12-02T08:19:58.573Z" }, + { url = "https://files.pythonhosted.org/packages/c0/c0/2ed83a3b88048db2504ddd67a419148e23a1cf2bc64ee0bbaacb46c80bbb/kiwipy-0.9.0-py3-none-any.whl", hash = "sha256:8d861310d64dc15de50667c1c7b7295f2dba50a5561284e8a889e3a6c2b197f9", size = 41865, upload-time = "2025-10-21T05:58:09.033Z" }, ] [package.optional-dependencies] rmq = [ - { name = "aio-pika" }, + { name = "aio-pika", version = "9.5.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, + { name = "aio-pika", version = "9.5.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, { name = "pamqp" }, { name = "pytray" }, ] @@ -4899,17 +4929,13 @@ wheels = [ [[package]] name = "plumpy" -version = "0.25.0" -source = { registry = "https://pypi.org/simple" } +version = "0.25.1" +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#243b7fe1f57773de38c206c300c86d0ad5029075" } dependencies = [ + { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] }, - { name = "nest-asyncio" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ea/b1/7c8141f04fb9060e9de7fd4fafed7ce429a16c1a903675a093d390f14b16/plumpy-0.25.0.tar.gz", hash = "sha256:5eccca0f11757db652b15bfb0bb95dc010a9a5fa000df5f9db51cf6a4d1e682f", size = 198187, upload-time = "2025-05-01T07:48:10.931Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/97/9a/e939c60e6376bdcc84ed9ae92ed96312c13781044c08b1b96be036d4b11b/plumpy-0.25.0-py3-none-any.whl", hash = "sha256:f15e7b471185265e6bbd0cfd37b9cd4b5bf51a9019fdd74247d9cfe28c5da617", size = 75249, upload-time = "2025-05-01T07:48:09.428Z" }, -] [[package]] name = "ply" From 74c409ce58b9681a3de12f0488a03560c617f146 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Wed, 28 Jan 2026 20:50:53 +0100 Subject: [PATCH 02/11] async with --- src/aiida/engine/processes/process.py | 4 +- src/aiida/engine/runners.py | 4 +- src/aiida/transports/transport.py | 53 ++++++++++++++++++++++++- tests/engine/daemon/test_execmanager.py | 14 +++---- 4 files changed, 64 insertions(+), 11 deletions(-) diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index b16bd95826..12635c2820 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -361,7 +361,9 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b coro = self._launch_task(task_kill_job, self.node, self.runner.transport) self._cancelling_scheduler_job = asyncio.create_task(coro) try: - self.loop.run_until_complete(self._cancelling_scheduler_job) + import plumpy + + plumpy.run_until_complete(self._cancelling_scheduler_job, self.loop) except Exception as exc: self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}') return False diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index b19821b2e7..b94e57d1ca 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -156,8 +156,10 @@ def stop(self) -> None: def run_until_complete(self, future: asyncio.Future) -> Any: """Run the loop until the future has finished and return the result.""" + import plumpy + with utils.loop_scope(self._loop): - return self._loop.run_until_complete(future) + return plumpy.run_until_complete(future, self._loop) def close(self) -> None: """Close the runner by stopping the loop.""" diff --git a/src/aiida/transports/transport.py b/src/aiida/transports/transport.py index cbe4c38db7..475d74f599 100644 --- a/src/aiida/transports/transport.py +++ b/src/aiida/transports/transport.py @@ -187,6 +187,30 @@ def __exit__(self, type_, value, traceback): if self._enters == 0: self.close() + async def __aenter__(self): + """Async context manager entry. Opens the transport connection. + + For sync transports, this just calls the sync open() method. + AsyncTransport subclasses override this to use async open. + """ + if self._enters == 0: + if self.is_open: + self._enters += 1 + else: + self.open() + self._enters += 1 + return self + + async def __aexit__(self, type_, value, traceback): + """Async context manager exit. Closes the transport connection if needed. + + For sync transports, this just calls the sync close() method. + AsyncTransport subclasses override this to use async close. + """ + self._enters -= 1 + if self._enters == 0: + self.close() + @property def is_open(self): return self._is_open @@ -1843,13 +1867,38 @@ class AsyncTransport(Transport): because they will block the event loop. """ + async def __aenter__(self): + """Async context manager entry. Opens the transport connection.""" + if self._enters == 0: + if self.is_open: + self._enters += 1 + else: + await self.open_async() + self._enters += 1 + return self + + async def __aexit__(self, type_, value, traceback): + """Async context manager exit. Closes the transport connection if needed.""" + self._enters -= 1 + if self._enters == 0: + await self.close_async() + def run_command_blocking(self, func, *args, **kwargs): """The event loop must be the one of manager.""" + from plumpy.greenlet_bridge import await_only, in_worker_greenlet, run_in_thread from aiida.manage import get_manager - loop = get_manager().get_runner() - return loop.run_until_complete(func(*args, **kwargs)) + loop = get_manager().get_runner().loop + + if loop.is_running(): + if in_worker_greenlet(): + return await_only(func(*args, **kwargs)) + else: + # Pass a factory so the awaitable is created fresh in the new thread's loop + return run_in_thread(lambda: func(*args, **kwargs)) + else: + return loop.run_until_complete(func(*args, **kwargs)) def open(self): return self.run_command_blocking(self.open_async) diff --git a/tests/engine/daemon/test_execmanager.py b/tests/engine/daemon/test_execmanager.py index 16269df352..5305a8aeed 100644 --- a/tests/engine/daemon/test_execmanager.py +++ b/tests/engine/daemon/test_execmanager.py @@ -178,7 +178,7 @@ async def test_upload_local_copy_list( node, calc_info = node_and_calc_info calc_info.local_copy_list = [[folder.uuid] + local_copy_list] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) # Check that none of the files were written to the repository of the calculation node, since they were communicated @@ -216,7 +216,7 @@ async def test_upload_local_copy_list_files_folders( (inputs['folder'].uuid, None, '.'), ] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) # Check that none of the files were written to the repository of the calculation node, since they were communicated @@ -248,7 +248,7 @@ async def test_upload_remote_symlink_list( (node.computer.uuid, str(tmp_path / 'file_a.txt'), 'file_a.txt'), ] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) filepath_workdir = pathlib.Path(node.get_remote_workdir()) @@ -313,7 +313,7 @@ async def test_upload_file_copy_operation_order(node_and_calc_info, tmp_path, or if order is not None: calc_info.file_copy_operation_order = order - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, sandbox, inputs) filepath = pathlib.Path(node.get_remote_workdir()) / 'file.txt' assert filepath.is_file() @@ -613,14 +613,14 @@ async def test_upload_combinations( ) if expected_exception is not None: with pytest.raises(expected_exception): - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) filepath_workdir = pathlib.Path(node.get_remote_workdir()) assert serialize_file_hierarchy(filepath_workdir, read_bytes=False) == expected_hierarchy else: - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation(node, transport, calc_info, fixture_sandbox) filepath_workdir = pathlib.Path(node.get_remote_workdir()) @@ -649,7 +649,7 @@ async def test_upload_calculation_portable_code(fixture_sandbox, node_and_calc_i code_info.code_uuid = code.uuid calc_info.codes_info = [code_info] - with node.computer.get_transport() as transport: + async with node.computer.get_transport() as transport: await execmanager.upload_calculation( node, transport, From ed288b4878488dd27453cda784b5e51e9774ba10 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Thu, 29 Jan 2026 11:00:28 +0100 Subject: [PATCH 03/11] more fixes --- src/aiida/engine/processes/functions.py | 4 +++- src/aiida/engine/processes/process.py | 4 ++-- src/aiida/engine/processes/workchains/workchain.py | 4 +++- src/aiida/engine/runners.py | 4 ++-- tests/transports/test_asyncssh_plugin.py | 4 ++-- 5 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 08441cddcc..56b5f27309 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -601,7 +601,9 @@ async def run(self) -> 'ExitCode' | None: # The remaining inputs have to be keyword arguments. kwargs.update(**inputs) - result = self._func(*args, **kwargs) + from plumpy.greenlet_bridge import greenlet_spawn + + result = await greenlet_spawn(self._func, *args, **kwargs) if result is None or isinstance(result, ExitCode): # type: ignore[redundant-expr] return result # type: ignore[unreachable] diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index 12635c2820..4c2ac15d7a 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -361,9 +361,9 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b coro = self._launch_task(task_kill_job, self.node, self.runner.transport) self._cancelling_scheduler_job = asyncio.create_task(coro) try: - import plumpy + from plumpy.greenlet_bridge import run_until_complete - plumpy.run_until_complete(self._cancelling_scheduler_job, self.loop) + run_until_complete(self.loop, self._cancelling_scheduler_job) except Exception as exc: self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}') return False diff --git a/src/aiida/engine/processes/workchains/workchain.py b/src/aiida/engine/processes/workchains/workchain.py index 4b847722a0..f661653fdf 100644 --- a/src/aiida/engine/processes/workchains/workchain.py +++ b/src/aiida/engine/processes/workchains/workchain.py @@ -298,8 +298,10 @@ def _update_process_status(self) -> None: @override @Protect.final async def run(self) -> t.Any: + from plumpy.greenlet_bridge import greenlet_spawn + self._stepper = self.spec().get_outline().create_stepper(self) # type: ignore[arg-type] - return self._do_step() + return await greenlet_spawn(self._do_step) def _do_step(self) -> t.Any: """Execute the next step in the outline and return the result. diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index b94e57d1ca..6adfa1d5f8 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -156,10 +156,10 @@ def stop(self) -> None: def run_until_complete(self, future: asyncio.Future) -> Any: """Run the loop until the future has finished and return the result.""" - import plumpy + from plumpy.greenlet_bridge import run_until_complete with utils.loop_scope(self._loop): - return plumpy.run_until_complete(future, self._loop) + return run_until_complete(self._loop, future) def close(self) -> None: """Close the runner by stopping the loop.""" diff --git a/tests/transports/test_asyncssh_plugin.py b/tests/transports/test_asyncssh_plugin.py index 7177d5dfb2..fa6feac54d 100644 --- a/tests/transports/test_asyncssh_plugin.py +++ b/tests/transports/test_asyncssh_plugin.py @@ -34,7 +34,7 @@ async def test_semaphore_released_after_errors(self, tmp_path_factory): } async_transport = AsyncSshTransport(**transport_params) - with async_transport as transport: + async with async_transport as transport: # Each operation should fail but release the semaphore with pytest.raises(OSError, match='Error while downloading file'): await transport.getfile_async('non_existing', local_dir) @@ -46,7 +46,7 @@ async def test_semaphore_released_after_errors(self, tmp_path_factory): with pytest.raises(OSError, match='Error while downloading file'): await transport.getfile_async('non_existing', local_dir) - assert transport._semaphore._value == 1, 'Semaphore should be fully released' + assert async_transport._semaphore._value == 1, 'Semaphore should be fully released' @pytest.mark.asyncio async def test_semaphore_limits_concurrent_operations(self): From 4070d1437fbb7a6ced99503472c6eeae5a41a938 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Thu, 29 Jan 2026 16:16:03 +0100 Subject: [PATCH 04/11] import issues --- src/aiida/engine/processes/process.py | 2 +- src/aiida/engine/runners.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index 4c2ac15d7a..fdadf1b3f7 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -361,7 +361,7 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b coro = self._launch_task(task_kill_job, self.node, self.runner.transport) self._cancelling_scheduler_job = asyncio.create_task(coro) try: - from plumpy.greenlet_bridge import run_until_complete + from plumpy import run_until_complete # type: ignore[attr-defined] run_until_complete(self.loop, self._cancelling_scheduler_job) except Exception as exc: diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 6adfa1d5f8..3a331b62d8 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -156,7 +156,7 @@ def stop(self) -> None: def run_until_complete(self, future: asyncio.Future) -> Any: """Run the loop until the future has finished and return the result.""" - from plumpy.greenlet_bridge import run_until_complete + from plumpy import run_until_complete # type: ignore[attr-defined] with utils.loop_scope(self._loop): return run_until_complete(self._loop, future) From 655d40c32b2f2be4786edf05013b5aea824a9c09 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Thu, 29 Jan 2026 16:55:20 +0100 Subject: [PATCH 05/11] lock file update --- src/aiida/engine/processes/process.py | 2 +- src/aiida/engine/runners.py | 2 +- uv.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index fdadf1b3f7..83f9fd667c 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -361,7 +361,7 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b coro = self._launch_task(task_kill_job, self.node, self.runner.transport) self._cancelling_scheduler_job = asyncio.create_task(coro) try: - from plumpy import run_until_complete # type: ignore[attr-defined] + from plumpy import run_until_complete run_until_complete(self.loop, self._cancelling_scheduler_job) except Exception as exc: diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 3a331b62d8..512704cc76 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -156,7 +156,7 @@ def stop(self) -> None: def run_until_complete(self, future: asyncio.Future) -> Any: """Run the loop until the future has finished and return the result.""" - from plumpy import run_until_complete # type: ignore[attr-defined] + from plumpy import run_until_complete with utils.loop_scope(self._loop): return run_until_complete(self._loop, future) diff --git a/uv.lock b/uv.lock index 768fb79e54..f98d5eb882 100644 --- a/uv.lock +++ b/uv.lock @@ -4930,7 +4930,7 @@ wheels = [ [[package]] name = "plumpy" version = "0.25.1" -source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#243b7fe1f57773de38c206c300c86d0ad5029075" } +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#cf186b0d02d6c709e2ed87abbdd4cefe2cda659f" } dependencies = [ { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] }, From bac6466879372f14dbb6a2d3b09c2bea44f404c6 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Thu, 29 Jan 2026 17:04:45 +0100 Subject: [PATCH 06/11] pre-commit fixed? --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index f98d5eb882..aae2b8fca2 100644 --- a/uv.lock +++ b/uv.lock @@ -4930,7 +4930,7 @@ wheels = [ [[package]] name = "plumpy" version = "0.25.1" -source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#cf186b0d02d6c709e2ed87abbdd4cefe2cda659f" } +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#df5ef958b7909d815092b46b61194bb6f7eeb19a" } dependencies = [ { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] }, From 0d1debce7afa3d86e9090a202a8fae383566e4bb Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Fri, 30 Jan 2026 14:38:43 +0100 Subject: [PATCH 07/11] run_async --- src/aiida/engine/__init__.py | 3 + src/aiida/engine/launch.py | 64 ++++++++++++++- src/aiida/engine/processes/functions.py | 104 ++++++++++++++++++++++++ src/aiida/engine/runners.py | 92 +++++++++++++++++++++ uv.lock | 2 +- 5 files changed, 263 insertions(+), 2 deletions(-) diff --git a/src/aiida/engine/__init__.py b/src/aiida/engine/__init__.py index 0a855c6484..869308be5a 100644 --- a/src/aiida/engine/__init__.py +++ b/src/aiida/engine/__init__.py @@ -68,8 +68,11 @@ 'process_handler', 'return_', 'run', + 'run_async', 'run_get_node', + 'run_get_node_async', 'run_get_pk', + 'run_get_pk_async', 'submit', 'while_', 'workfunction', diff --git a/src/aiida/engine/launch.py b/src/aiida/engine/launch.py index 78e42df19b..c7b7b8685b 100644 --- a/src/aiida/engine/launch.py +++ b/src/aiida/engine/launch.py @@ -25,7 +25,16 @@ from .runners import ResultAndPk from .utils import instantiate_process, is_process_scoped, prepare_inputs -__all__ = ('await_processes', 'run', 'run_get_node', 'run_get_pk', 'submit') +__all__ = ( + 'await_processes', + 'run', + 'run_async', + 'run_get_node', + 'run_get_node_async', + 'run_get_pk', + 'run_get_pk_async', + 'submit', +) TYPE_RUN_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder] # run can also be process function, but it is not clear what type this should be @@ -48,6 +57,23 @@ def run(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwa return runner.run(process, inputs, **kwargs) +async def run_async( + process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any +) -> dict[str, t.Any]: + """Run the process with the supplied inputs in a local runner without blocking the event loop. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: the outputs of the process + """ + if isinstance(process, Process): + runner = process.runner + else: + runner = manager.get_manager().get_runner() + + return await runner.run_async(process, inputs, **kwargs) + + def run_get_node( process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any ) -> tuple[dict[str, t.Any], ProcessNode]: @@ -65,6 +91,23 @@ def run_get_node( return runner.run_get_node(process, inputs, **kwargs) +async def run_get_node_async( + process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any +) -> tuple[dict[str, t.Any], ProcessNode]: + """Run the process with the supplied inputs in a local runner without blocking the event loop. + + :param process: the process class, instance, builder or function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and the process node + """ + if isinstance(process, Process): + runner = process.runner + else: + runner = manager.get_manager().get_runner() + + return await runner.run_get_node_async(process, inputs, **kwargs) + + def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> ResultAndPk: """Run the process with the supplied inputs in a local runner that will block until the process is completed. @@ -80,6 +123,23 @@ def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None return runner.run_get_pk(process, inputs, **kwargs) +async def run_get_pk_async( + process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any +) -> ResultAndPk: + """Run the process with the supplied inputs in a local runner without blocking the event loop. + + :param process: the process class, instance, builder or function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and process node pk + """ + if isinstance(process, Process): + runner = process.runner + else: + runner = manager.get_manager().get_runner() + + return await runner.run_get_pk_async(process, inputs, **kwargs) + + def submit( process: TYPE_SUBMIT_PROCESS, inputs: dict[str, t.Any] | None = None, @@ -182,3 +242,5 @@ def await_processes(nodes: t.Sequence[ProcessNode], wait_interval: int = 1) -> N # Allow one to also use run.get_node and run.get_pk as a shortcut, without having to import the functions themselves run.get_node = run_get_node # type: ignore[attr-defined] run.get_pk = run_get_pk # type: ignore[attr-defined] +run_async.get_node = run_get_node_async # type: ignore[attr-defined] +run_async.get_pk = run_get_pk_async # type: ignore[attr-defined] diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 56b5f27309..9d79bfcb15 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -107,6 +107,12 @@ def run_get_pk(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any def run_get_node(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, N]: ... + async def run_async(self, *args: P.args, **kwargs: P.kwargs) -> R_co: ... + + async def run_get_pk_async(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, int]: ... + + async def run_get_node_async(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, N]: ... + is_process_function: bool node_class: t.Type[N] @@ -253,6 +259,83 @@ def kill_process(_num, _frame): return result, process.node + async def run_get_node_async(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode']: + """Run the FunctionProcess with the supplied inputs in a local runner without blocking the event loop. + + :param args: input arguments to construct the FunctionProcess + :param kwargs: input keyword arguments to construct the FunctionProcess + :return: tuple of the outputs of the process and the process node + """ + frame_delta = 1000 + frame_count = get_stack_size() + stack_limit = sys.getrecursionlimit() + LOGGER.info('Executing process function, current stack status: %d frames of %d', frame_count, stack_limit) + + # If the current frame count is more than 80% of the stack limit, or comes within 200 frames, increase the + # stack limit by ``frame_delta``. + if frame_count > min(0.8 * stack_limit, stack_limit - 200): + LOGGER.warning( + 'Current stack contains %d frames which is close to the limit of %d. Increasing the limit by %d', + frame_count, + stack_limit, + frame_delta, + ) + sys.setrecursionlimit(stack_limit + frame_delta) + + manager = get_manager() + runner = manager.get_runner() + inputs = process_class.create_inputs(*args, **kwargs) + + # Remove all the known inputs from the kwargs + for port in process_class.spec().inputs: + kwargs.pop(port, None) + + # If any kwargs remain, the spec should be dynamic, so we raise if it isn't + if kwargs and not process_class.spec().inputs.dynamic: + raise ValueError(f'{function.__name__} does not support these kwargs: {kwargs.keys()}') + + process: Process = process_class(inputs=inputs, runner=runner) + + # Only add handlers for interrupt signal to kill the process if we are in a local and not a daemon runner. + # Without this check, running process functions in a daemon worker would be killed if the daemon is shutdown + current_runner = manager.get_runner() + original_handler = None + kill_signal = signal.SIGINT + + if not current_runner.is_daemon_runner: + + def kill_process(_num, _frame): + """Send the kill signal to the process in the current scope.""" + LOGGER.critical('runner received interrupt, killing process %s', process.pid) + result = process.kill(msg_text='Process was killed because the runner received an interrupt') + return result + + # Store the current handler on the signal such that it can be restored after process has terminated + original_handler = signal.getsignal(kill_signal) + signal.signal(kill_signal, kill_process) + + try: + from aiida.engine import utils as engine_utils + + with engine_utils.loop_scope(runner.loop): + await process.step_until_terminated() + process.future().result() + finally: + # If the `original_handler` is set, that means the `kill_process` was bound, which needs to be reset + if original_handler: + signal.signal(signal.SIGINT, original_handler) + + store_provenance = inputs.get('metadata', {}).get('store_provenance', True) + if not store_provenance: + process.node._storable = False + process.node._unstorable_message = 'cannot store node because it was run with `store_provenance=False`' + + result = process.outputs + if result and len(result) == 1 and process.SINGLE_OUTPUT_LINKNAME in result: + return result[process.SINGLE_OUTPUT_LINKNAME], process.node + + return result, process.node + def run_get_pk(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]: """Recreate the `run_get_pk` utility launcher. @@ -265,15 +348,36 @@ def run_get_pk(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]: assert node.pk is not None return result, node.pk + async def run_get_pk_async(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]: + """Recreate the `run_get_pk` utility launcher without blocking the event loop. + + :param args: input arguments to construct the FunctionProcess + :param kwargs: input keyword arguments to construct the FunctionProcess + :return: tuple of the outputs of the process and the process node pk + + """ + result, node = await run_get_node_async(*args, **kwargs) + assert node.pk is not None + return result, node.pk + @functools.wraps(function) def decorated_function(*args, **kwargs): """This wrapper function is the actual function that is called.""" result, _ = run_get_node(*args, **kwargs) return result + @functools.wraps(function) + async def decorated_function_async(*args, **kwargs): + """Async wrapper for running a process function without blocking the event loop.""" + result, _ = await run_get_node_async(*args, **kwargs) + return result + decorated_function.run = decorated_function # type: ignore[attr-defined] decorated_function.run_get_pk = run_get_pk # type: ignore[attr-defined] decorated_function.run_get_node = run_get_node # type: ignore[attr-defined] + decorated_function.run_async = decorated_function_async # type: ignore[attr-defined] + decorated_function.run_get_pk_async = run_get_pk_async # type: ignore[attr-defined] + decorated_function.run_get_node_async = run_get_node_async # type: ignore[attr-defined] decorated_function.is_process_function = True # type: ignore[attr-defined] decorated_function.node_class = node_class # type: ignore[attr-defined] decorated_function.process_class = process_class # type: ignore[attr-defined] diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 512704cc76..d9093a389d 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -267,6 +267,56 @@ def kill_process(_num, _frame): return process_inited.outputs, process_inited.node + async def _run_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> Tuple[Dict[str, Any], ProcessNode]: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and the calculation node + """ + assert not self._closed + + inputs = utils.prepare_inputs(inputs, **kwargs) + + if utils.is_process_function(process): + run_get_node_async = getattr(process, 'run_get_node_async', None) + if run_get_node_async is None: + raise exceptions.InvalidOperation( + 'Process function does not support async execution. ' + 'Use the synchronous launcher or update to a version that provides `run_get_node_async`.' + ) + result, node = await run_get_node_async(**inputs) + return result, node + + with utils.loop_scope(self.loop): + process_inited = self.instantiate_process(process, **inputs) + + def kill_process(_num, _frame): + """Send the kill signal to the process in the current scope.""" + if process_inited.is_killing: + LOGGER.warning('runner received interrupt, process %s already being killed', process_inited.pid) + return + LOGGER.critical('runner received interrupt, killing process %s', process_inited.pid) + process_inited.kill(msg_text='Process was killed because the runner received an interrupt') + + original_handler_int = signal.getsignal(signal.SIGINT) + original_handler_term = signal.getsignal(signal.SIGTERM) + + try: + signal.signal(signal.SIGINT, kill_process) + signal.signal(signal.SIGTERM, kill_process) + await process_inited.step_until_terminated() + process_inited.future().result() + finally: + signal.signal(signal.SIGINT, original_handler_int) + signal.signal(signal.SIGTERM, original_handler_term) + + return process_inited.outputs, process_inited.node + def run(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any) -> Dict[str, Any]: """Run the process with the supplied inputs in this runner that will block until the process is completed. @@ -279,6 +329,20 @@ def run(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, * result, _ = self._run(process, inputs, **kwargs) return result + async def run_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> Dict[str, Any]: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: the outputs of the process + """ + result, _ = await self._run_async(process, inputs, **kwargs) + return result + def run_get_node( self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any ) -> ResultAndNode: @@ -293,6 +357,20 @@ def run_get_node( result, node = self._run(process, inputs, **kwargs) return ResultAndNode(result, node) + async def run_get_node_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> ResultAndNode: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and the calculation node + """ + result, node = await self._run_async(process, inputs, **kwargs) + return ResultAndNode(result, node) + def run_get_pk(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ResultAndPk: """Run the process with the supplied inputs in this runner that will block until the process is completed. @@ -305,6 +383,20 @@ def run_get_pk(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = result, node = self._run(process, inputs, **kwargs) return ResultAndPk(result, node.pk) + async def run_get_pk_async( + self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any + ) -> ResultAndPk: + """Run the process with the supplied inputs in this runner without blocking the event loop. + + The return value will be the results of the completed process. + + :param process: the process class or process function to run + :param inputs: the inputs to be passed to the process + :return: tuple of the outputs of the process and process node pk + """ + result, node = await self._run_async(process, inputs, **kwargs) + return ResultAndPk(result, node.pk) + def call_on_process_finish(self, pk: int, callback: Callable[[], Any]) -> None: """Schedule a callback when the process of the given pk is terminated. diff --git a/uv.lock b/uv.lock index aae2b8fca2..f9af23068c 100644 --- a/uv.lock +++ b/uv.lock @@ -4930,7 +4930,7 @@ wheels = [ [[package]] name = "plumpy" version = "0.25.1" -source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#df5ef958b7909d815092b46b61194bb6f7eeb19a" } +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#58a0b9b02fa1bb49967c70630ee5644ffe062581" } dependencies = [ { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] }, From 6af5aa439aba6183a23c10e5187f86d7c0b49fae Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Fri, 30 Jan 2026 16:07:30 +0100 Subject: [PATCH 08/11] basic.md switch to run_async --- docs/source/tutorials/basic.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/source/tutorials/basic.md b/docs/source/tutorials/basic.md index e490fa13f7..30893ae9a5 100644 --- a/docs/source/tutorials/basic.md +++ b/docs/source/tutorials/basic.md @@ -189,7 +189,7 @@ y = orm.Int(3) Now it's time to multiply the two numbers! ```{code-cell} ipython3 -multiply(x, y) +await multiply.run_async(x, y) ``` Success! @@ -372,10 +372,10 @@ One nifty feature of the builder is the ability to use tab completion for the in Try it out by typing `builder.` + `` in the verdi shell. ::: -To execute the `CalcJob`, we use the `run` function provided by the AiiDA engine, and wait for the process to complete: +To execute the `CalcJob`, we use the `run_async` function provided by the AiiDA engine, and wait for the process to complete: ```{code-cell} ipython3 -engine.run(builder) +await engine.run_async(builder) ``` Besides the sum of the two `Int` nodes, the calculation function also returns two other outputs: one of type `RemoteData` and one of type `FolderData`. @@ -532,7 +532,7 @@ Once the `WorkChain` input has been set up, we run it with the AiiDA engine: ```{code-cell} ipython3 from aiida import engine -engine.run(builder) +await engine.run_async(builder) ``` Now quickly leave the IPython shell and check the process list: @@ -608,7 +608,7 @@ The provenance graph should be similar to the one we showed at the start of this ## Submitting to the daemon -When we used the `run` command in the previous sections, the IPython shell was blocked while it was waiting for the `CalcJob` to finish. +When we used the `run_async` function in the previous sections, the notebook cell was waiting for the `CalcJob` to finish. This is not a problem when we're simply adding two number together, but if we want to run multiple calculations that take hours or days, this is no longer practical. Instead, we are going to *submit* the `CalcJob` to the AiiDA *daemon*. The daemon is a program that runs in the background and manages submitted calculations until they are *terminated*. From 9676baad33b21ba2b2708b2fcb4a4b7dbf1092d0 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Fri, 30 Jan 2026 16:13:20 +0100 Subject: [PATCH 09/11] basic.md note updated --- docs/source/tutorials/basic.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/source/tutorials/basic.md b/docs/source/tutorials/basic.md index 30893ae9a5..6acf442a5b 100644 --- a/docs/source/tutorials/basic.md +++ b/docs/source/tutorials/basic.md @@ -192,6 +192,13 @@ Now it's time to multiply the two numbers! await multiply.run_async(x, y) ``` +:::{note} +Because this tutorial runs as a Jupyter notebook, we use `await multiply.run_async(...)` instead of the synchronous `multiply(...)`. +Jupyter's kernel has an event loop already running, so process execution must be `await`-ed rather than run synchronously. +In a regular Python script, the event loop is not running, so you can call `multiply(x, y)` directly. +The same applies to `engine.run_async` vs `engine.run` used later in this tutorial. +::: + Success! The `calcfunction`-decorated `multiply` function has multiplied the two `Int` data nodes and returned a new `Int` data node whose value is the product of the two input nodes. Note that by executing the `multiply` function, all input and output nodes are automatically stored in the database: From bd9e7b6ff63e85394d809a504572c4485f20e877 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Fri, 30 Jan 2026 17:15:24 +0100 Subject: [PATCH 10/11] transport run_until_complete, now imports the logic from plumpy --- pyproject.toml | 1 - src/aiida/engine/processes/process.py | 3 +-- src/aiida/engine/processes/workchains/workchain.py | 3 +-- src/aiida/engine/runners.py | 2 +- src/aiida/engine/transports.py | 12 +++++------- src/aiida/transports/transport.py | 14 +++----------- tests/engine/test_transport.py | 2 +- uv.lock | 2 +- 8 files changed, 13 insertions(+), 26 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fb244af379..16224dbdf0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -412,7 +412,6 @@ filterwarnings = [ 'ignore:The `Code` class is deprecated.*:aiida.common.warnings.AiidaDeprecationWarning', # https://github.com/aiidateam/plumpy/issues/283 'ignore:There is no current event loop:DeprecationWarning:plumpy', - 'ignore:There is no current event loop:DeprecationWarning:nest_asyncio', # spglib deprecation 'ignore:dict interface is deprecated:DeprecationWarning', # https://github.com/aiidateam/archive-path/issues/21 diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index 83f9fd667c..c83e748f33 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -40,6 +40,7 @@ import plumpy.persistence import plumpy.processes from kiwipy.communications import UnroutableError +from plumpy import run_until_complete from plumpy.process_states import Finished, ProcessState from plumpy.processes import ConnectionClosed # type: ignore[attr-defined] from plumpy.processes import Process as PlumpyProcess @@ -361,8 +362,6 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b coro = self._launch_task(task_kill_job, self.node, self.runner.transport) self._cancelling_scheduler_job = asyncio.create_task(coro) try: - from plumpy import run_until_complete - run_until_complete(self.loop, self._cancelling_scheduler_job) except Exception as exc: self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}') diff --git a/src/aiida/engine/processes/workchains/workchain.py b/src/aiida/engine/processes/workchains/workchain.py index f661653fdf..dafef0f74b 100644 --- a/src/aiida/engine/processes/workchains/workchain.py +++ b/src/aiida/engine/processes/workchains/workchain.py @@ -15,6 +15,7 @@ import logging import typing as t +from plumpy.greenlet_bridge import greenlet_spawn from plumpy.persistence import auto_persist from plumpy.process_states import Continue, Wait from plumpy.processes import ProcessStateMachineMeta @@ -298,8 +299,6 @@ def _update_process_status(self) -> None: @override @Protect.final async def run(self) -> t.Any: - from plumpy.greenlet_bridge import greenlet_spawn - self._stepper = self.spec().get_outline().create_stepper(self) # type: ignore[arg-type] return await greenlet_spawn(self._do_step) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index d9093a389d..b95f2ae991 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -19,6 +19,7 @@ from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Type, Union import kiwipy +from plumpy import run_until_complete from plumpy.communications import wrap_communicator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister @@ -156,7 +157,6 @@ def stop(self) -> None: def run_until_complete(self, future: asyncio.Future) -> Any: """Run the loop until the future has finished and return the result.""" - from plumpy import run_until_complete with utils.loop_scope(self._loop): return run_until_complete(self._loop, future) diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 9b7e14fe83..9f606b7128 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -119,13 +119,11 @@ def do_open(): if transport_request.count == 0: # IMPORTANT: Pop from _transport_requests BEFORE closing the transport. # This prevents a race condition with async transports where: - # 1. close() is called, which for AsyncTransport uses run_until_complete(close_async) - # 2. With nest_asyncio (used by plumpy), this call yields back to the event loop - # 3. The event loop schedules close_async, then continues running another tasks - # - for example one that requests the transport which is scheduled to be closed - # 4. The task now using the transport to do some operation awaits, - # next the close_async task closes the transport while still in use -> error - # By poping first, new tasks will create a fresh transport request. + # 1. close() is called, which for AsyncTransport uses run_until_complete to run close_async + # 2. This can yield back to the event loop (via greenlet or re-entrant call) + # 3. The event loop then runs another task that requests the same transport + # 4. That task uses a transport that is being closed -> error + # By popping first, new tasks will create a fresh transport request. self._transport_requests.pop(authinfo.pk, None) if transport_request.future.done(): diff --git a/src/aiida/transports/transport.py b/src/aiida/transports/transport.py index 475d74f599..1030eacf20 100644 --- a/src/aiida/transports/transport.py +++ b/src/aiida/transports/transport.py @@ -1884,21 +1884,13 @@ async def __aexit__(self, type_, value, traceback): await self.close_async() def run_command_blocking(self, func, *args, **kwargs): - """The event loop must be the one of manager.""" - from plumpy.greenlet_bridge import await_only, in_worker_greenlet, run_in_thread + """Run an async transport method synchronously, handling nested event loop scenarios.""" + from plumpy import run_until_complete from aiida.manage import get_manager loop = get_manager().get_runner().loop - - if loop.is_running(): - if in_worker_greenlet(): - return await_only(func(*args, **kwargs)) - else: - # Pass a factory so the awaitable is created fresh in the new thread's loop - return run_in_thread(lambda: func(*args, **kwargs)) - else: - return loop.run_until_complete(func(*args, **kwargs)) + return run_until_complete(loop, func(*args, **kwargs)) def open(self): return self.run_command_blocking(self.open_async) diff --git a/tests/engine/test_transport.py b/tests/engine/test_transport.py index 2d11eafc6b..6fdb0d5395 100644 --- a/tests/engine/test_transport.py +++ b/tests/engine/test_transport.py @@ -137,7 +137,7 @@ def test_request_removed_before_close(self): This is a regression test for a race condition with async transports where: 1. close() is called, which for AsyncTransport uses run_until_complete() - 2. With nest_asyncio (used by plumpy), this can yield to the event loop + 2. This can yield back to the event loop (via greenlet for instance) 3. Another task might enter and get the same transport_request 4. That task tries to use the transport that's being closed -> error diff --git a/uv.lock b/uv.lock index f9af23068c..30763d8660 100644 --- a/uv.lock +++ b/uv.lock @@ -4930,7 +4930,7 @@ wheels = [ [[package]] name = "plumpy" version = "0.25.1" -source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#58a0b9b02fa1bb49967c70630ee5644ffe062581" } +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#5bdb0f5ee50ef31ea2b720634ab4dcdaf7f5977d" } dependencies = [ { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] }, From 74984a22df29b1d4bbc9c164312a742d07310a34 Mon Sep 17 00:00:00 2001 From: Ali Khosravi Date: Mon, 2 Feb 2026 11:18:54 +0100 Subject: [PATCH 11/11] refactor naming --- src/aiida/engine/processes/functions.py | 4 ++-- src/aiida/engine/processes/workchains/workchain.py | 4 ++-- uv.lock | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 9d79bfcb15..030e087caf 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -705,9 +705,9 @@ async def run(self) -> 'ExitCode' | None: # The remaining inputs have to be keyword arguments. kwargs.update(**inputs) - from plumpy.greenlet_bridge import greenlet_spawn + from plumpy.greenlet_bridge import run_in_greenlet - result = await greenlet_spawn(self._func, *args, **kwargs) + result = await run_in_greenlet(self._func, *args, **kwargs) if result is None or isinstance(result, ExitCode): # type: ignore[redundant-expr] return result # type: ignore[unreachable] diff --git a/src/aiida/engine/processes/workchains/workchain.py b/src/aiida/engine/processes/workchains/workchain.py index dafef0f74b..7f2ca6d513 100644 --- a/src/aiida/engine/processes/workchains/workchain.py +++ b/src/aiida/engine/processes/workchains/workchain.py @@ -15,7 +15,7 @@ import logging import typing as t -from plumpy.greenlet_bridge import greenlet_spawn +from plumpy.greenlet_bridge import run_in_greenlet from plumpy.persistence import auto_persist from plumpy.process_states import Continue, Wait from plumpy.processes import ProcessStateMachineMeta @@ -300,7 +300,7 @@ def _update_process_status(self) -> None: @Protect.final async def run(self) -> t.Any: self._stepper = self.spec().get_outline().create_stepper(self) # type: ignore[arg-type] - return await greenlet_spawn(self._do_step) + return await run_in_greenlet(self._do_step) def _do_step(self) -> t.Any: """Execute the next step in the outline and return the result. diff --git a/uv.lock b/uv.lock index 30763d8660..5b21ea378b 100644 --- a/uv.lock +++ b/uv.lock @@ -4930,7 +4930,7 @@ wheels = [ [[package]] name = "plumpy" version = "0.25.1" -source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#5bdb0f5ee50ef31ea2b720634ab4dcdaf7f5977d" } +source = { git = "https://github.com/khsrali/plumpy.git?rev=greenlet#c036472726030f58dadf8d93bf2cfb6019040fff" } dependencies = [ { name = "greenlet" }, { name = "kiwipy", extra = ["rmq"] },