Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions docs/source/tutorials/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,16 @@ y = orm.Int(3)
Now it's time to multiply the two numbers!

```{code-cell} ipython3
multiply(x, y)
await multiply.run_async(x, y)
```

:::{note}
Because this tutorial runs as a Jupyter notebook, we use `await multiply.run_async(...)` instead of the synchronous `multiply(...)`.
Jupyter's kernel has an event loop already running, so process execution must be `await`-ed rather than run synchronously.
In a regular Python script, the event loop is not running, so you can call `multiply(x, y)` directly.
The same applies to `engine.run_async` vs `engine.run` used later in this tutorial.
:::

Success!
The `calcfunction`-decorated `multiply` function has multiplied the two `Int` data nodes and returned a new `Int` data node whose value is the product of the two input nodes.
Note that by executing the `multiply` function, all input and output nodes are automatically stored in the database:
Expand Down Expand Up @@ -372,10 +379,10 @@ One nifty feature of the builder is the ability to use tab completion for the in
Try it out by typing `builder.` + `<TAB>` in the verdi shell.
:::

To execute the `CalcJob`, we use the `run` function provided by the AiiDA engine, and wait for the process to complete:
To execute the `CalcJob`, we use the `run_async` function provided by the AiiDA engine, and wait for the process to complete:

```{code-cell} ipython3
engine.run(builder)
await engine.run_async(builder)
```

Besides the sum of the two `Int` nodes, the calculation function also returns two other outputs: one of type `RemoteData` and one of type `FolderData`.
Expand Down Expand Up @@ -532,7 +539,7 @@ Once the `WorkChain` input has been set up, we run it with the AiiDA engine:

```{code-cell} ipython3
from aiida import engine
engine.run(builder)
await engine.run_async(builder)
```

Now quickly leave the IPython shell and check the process list:
Expand Down Expand Up @@ -608,7 +615,7 @@ The provenance graph should be similar to the one we showed at the start of this

## Submitting to the daemon

When we used the `run` command in the previous sections, the IPython shell was blocked while it was waiting for the `CalcJob` to finish.
When we used the `run_async` function in the previous sections, the notebook cell was waiting for the `CalcJob` to finish.
This is not a problem when we're simply adding two number together, but if we want to run multiple calculations that take hours or days, this is no longer practical.
Instead, we are going to *submit* the `CalcJob` to the AiiDA *daemon*.
The daemon is a program that runs in the background and manages submitted calculations until they are *terminated*.
Expand Down
4 changes: 2 additions & 2 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ dependencies:
- docstring_parser
- get-annotations~=0.1
- python-graphviz~=0.19
- plumpy~=0.25.0
- plumpy@ git+https://github.com/khsrali/plumpy.git@greenlet
- ipython>=7.6
- jedi<0.19
- jinja2~=3.0
- kiwipy[rmq]~=0.8.4
- kiwipy[rmq]~=0.9.0
- importlib-metadata~=6.0
- numpy<3,>=1.21
- paramiko~=3.0
Expand Down
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ dependencies = [
'docstring-parser',
'get-annotations~=0.1;python_version<"3.10"',
'graphviz~=0.19',
'plumpy~=0.25.0',
'plumpy @ git+https://github.com/khsrali/plumpy.git@greenlet',
'ipython>=7.6',
'jedi<0.19',
'jinja2~=3.0',
'kiwipy[rmq]~=0.8.4',
'kiwipy[rmq]~=0.9.0',
'importlib-metadata~=6.0',
'numpy>=1.21,<3',
'paramiko~=3.0',
Expand Down Expand Up @@ -412,7 +412,6 @@ filterwarnings = [
'ignore:The `Code` class is deprecated.*:aiida.common.warnings.AiidaDeprecationWarning',
# https://github.com/aiidateam/plumpy/issues/283
'ignore:There is no current event loop:DeprecationWarning:plumpy',
'ignore:There is no current event loop:DeprecationWarning:nest_asyncio',
# spglib deprecation
'ignore:dict interface is deprecated:DeprecationWarning',
# https://github.com/aiidateam/archive-path/issues/21
Expand Down
3 changes: 3 additions & 0 deletions src/aiida/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@
'process_handler',
'return_',
'run',
'run_async',
'run_get_node',
'run_get_node_async',
'run_get_pk',
'run_get_pk_async',
'submit',
'while_',
'workfunction',
Expand Down
64 changes: 63 additions & 1 deletion src/aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@
from .runners import ResultAndPk
from .utils import instantiate_process, is_process_scoped, prepare_inputs

__all__ = ('await_processes', 'run', 'run_get_node', 'run_get_pk', 'submit')
__all__ = (
'await_processes',
'run',
'run_async',
'run_get_node',
'run_get_node_async',
'run_get_pk',
'run_get_pk_async',
'submit',
)

TYPE_RUN_PROCESS = t.Union[Process, t.Type[Process], ProcessBuilder]
# run can also be process function, but it is not clear what type this should be
Expand All @@ -48,6 +57,23 @@ def run(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwa
return runner.run(process, inputs, **kwargs)


async def run_async(
process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any
) -> dict[str, t.Any]:
"""Run the process with the supplied inputs in a local runner without blocking the event loop.

:param process: the process class or process function to run
:param inputs: the inputs to be passed to the process
:return: the outputs of the process
"""
if isinstance(process, Process):
runner = process.runner
else:
runner = manager.get_manager().get_runner()

return await runner.run_async(process, inputs, **kwargs)


def run_get_node(
process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any
) -> tuple[dict[str, t.Any], ProcessNode]:
Expand All @@ -65,6 +91,23 @@ def run_get_node(
return runner.run_get_node(process, inputs, **kwargs)


async def run_get_node_async(
process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any
) -> tuple[dict[str, t.Any], ProcessNode]:
"""Run the process with the supplied inputs in a local runner without blocking the event loop.

:param process: the process class, instance, builder or function to run
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and the process node
"""
if isinstance(process, Process):
runner = process.runner
else:
runner = manager.get_manager().get_runner()

return await runner.run_get_node_async(process, inputs, **kwargs)


def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> ResultAndPk:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.

Expand All @@ -80,6 +123,23 @@ def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None
return runner.run_get_pk(process, inputs, **kwargs)


async def run_get_pk_async(
process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any
) -> ResultAndPk:
"""Run the process with the supplied inputs in a local runner without blocking the event loop.

:param process: the process class, instance, builder or function to run
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and process node pk
"""
if isinstance(process, Process):
runner = process.runner
else:
runner = manager.get_manager().get_runner()

return await runner.run_get_pk_async(process, inputs, **kwargs)


def submit(
process: TYPE_SUBMIT_PROCESS,
inputs: dict[str, t.Any] | None = None,
Expand Down Expand Up @@ -182,3 +242,5 @@ def await_processes(nodes: t.Sequence[ProcessNode], wait_interval: int = 1) -> N
# Allow one to also use run.get_node and run.get_pk as a shortcut, without having to import the functions themselves
run.get_node = run_get_node # type: ignore[attr-defined]
run.get_pk = run_get_pk # type: ignore[attr-defined]
run_async.get_node = run_get_node_async # type: ignore[attr-defined]
run_async.get_pk = run_get_pk_async # type: ignore[attr-defined]
108 changes: 107 additions & 1 deletion src/aiida/engine/processes/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ def run_get_pk(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any

def run_get_node(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, N]: ...

async def run_async(self, *args: P.args, **kwargs: P.kwargs) -> R_co: ...

async def run_get_pk_async(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, int]: ...

async def run_get_node_async(self, *args: P.args, **kwargs: P.kwargs) -> tuple[dict[str, t.Any] | None, N]: ...

is_process_function: bool

node_class: t.Type[N]
Expand Down Expand Up @@ -253,6 +259,83 @@ def kill_process(_num, _frame):

return result, process.node

async def run_get_node_async(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode']:
"""Run the FunctionProcess with the supplied inputs in a local runner without blocking the event loop.

:param args: input arguments to construct the FunctionProcess
:param kwargs: input keyword arguments to construct the FunctionProcess
:return: tuple of the outputs of the process and the process node
"""
frame_delta = 1000
frame_count = get_stack_size()
stack_limit = sys.getrecursionlimit()
LOGGER.info('Executing process function, current stack status: %d frames of %d', frame_count, stack_limit)

# If the current frame count is more than 80% of the stack limit, or comes within 200 frames, increase the
# stack limit by ``frame_delta``.
if frame_count > min(0.8 * stack_limit, stack_limit - 200):
LOGGER.warning(
'Current stack contains %d frames which is close to the limit of %d. Increasing the limit by %d',
frame_count,
stack_limit,
frame_delta,
)
sys.setrecursionlimit(stack_limit + frame_delta)

manager = get_manager()
runner = manager.get_runner()
inputs = process_class.create_inputs(*args, **kwargs)

# Remove all the known inputs from the kwargs
for port in process_class.spec().inputs:
kwargs.pop(port, None)

# If any kwargs remain, the spec should be dynamic, so we raise if it isn't
if kwargs and not process_class.spec().inputs.dynamic:
raise ValueError(f'{function.__name__} does not support these kwargs: {kwargs.keys()}')

process: Process = process_class(inputs=inputs, runner=runner)

# Only add handlers for interrupt signal to kill the process if we are in a local and not a daemon runner.
# Without this check, running process functions in a daemon worker would be killed if the daemon is shutdown
current_runner = manager.get_runner()
original_handler = None
kill_signal = signal.SIGINT

if not current_runner.is_daemon_runner:

def kill_process(_num, _frame):
"""Send the kill signal to the process in the current scope."""
LOGGER.critical('runner received interrupt, killing process %s', process.pid)
result = process.kill(msg_text='Process was killed because the runner received an interrupt')
return result

# Store the current handler on the signal such that it can be restored after process has terminated
original_handler = signal.getsignal(kill_signal)
signal.signal(kill_signal, kill_process)

try:
from aiida.engine import utils as engine_utils

with engine_utils.loop_scope(runner.loop):
await process.step_until_terminated()
process.future().result()
finally:
# If the `original_handler` is set, that means the `kill_process` was bound, which needs to be reset
if original_handler:
signal.signal(signal.SIGINT, original_handler)

store_provenance = inputs.get('metadata', {}).get('store_provenance', True)
if not store_provenance:
process.node._storable = False
process.node._unstorable_message = 'cannot store node because it was run with `store_provenance=False`'

result = process.outputs
if result and len(result) == 1 and process.SINGLE_OUTPUT_LINKNAME in result:
return result[process.SINGLE_OUTPUT_LINKNAME], process.node

return result, process.node

def run_get_pk(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]:
"""Recreate the `run_get_pk` utility launcher.

Expand All @@ -265,15 +348,36 @@ def run_get_pk(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]:
assert node.pk is not None
return result, node.pk

async def run_get_pk_async(*args, **kwargs) -> tuple[dict[str, t.Any] | None, int]:
"""Recreate the `run_get_pk` utility launcher without blocking the event loop.

:param args: input arguments to construct the FunctionProcess
:param kwargs: input keyword arguments to construct the FunctionProcess
:return: tuple of the outputs of the process and the process node pk

"""
result, node = await run_get_node_async(*args, **kwargs)
assert node.pk is not None
return result, node.pk

@functools.wraps(function)
def decorated_function(*args, **kwargs):
"""This wrapper function is the actual function that is called."""
result, _ = run_get_node(*args, **kwargs)
return result

@functools.wraps(function)
async def decorated_function_async(*args, **kwargs):
"""Async wrapper for running a process function without blocking the event loop."""
result, _ = await run_get_node_async(*args, **kwargs)
return result

decorated_function.run = decorated_function # type: ignore[attr-defined]
decorated_function.run_get_pk = run_get_pk # type: ignore[attr-defined]
decorated_function.run_get_node = run_get_node # type: ignore[attr-defined]
decorated_function.run_async = decorated_function_async # type: ignore[attr-defined]
decorated_function.run_get_pk_async = run_get_pk_async # type: ignore[attr-defined]
decorated_function.run_get_node_async = run_get_node_async # type: ignore[attr-defined]
decorated_function.is_process_function = True # type: ignore[attr-defined]
decorated_function.node_class = node_class # type: ignore[attr-defined]
decorated_function.process_class = process_class # type: ignore[attr-defined]
Expand Down Expand Up @@ -601,7 +705,9 @@ async def run(self) -> 'ExitCode' | None:
# The remaining inputs have to be keyword arguments.
kwargs.update(**inputs)

result = self._func(*args, **kwargs)
from plumpy.greenlet_bridge import run_in_greenlet

result = await run_in_greenlet(self._func, *args, **kwargs)

if result is None or isinstance(result, ExitCode): # type: ignore[redundant-expr]
return result # type: ignore[unreachable]
Expand Down
3 changes: 2 additions & 1 deletion src/aiida/engine/processes/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import plumpy.persistence
import plumpy.processes
from kiwipy.communications import UnroutableError
from plumpy import run_until_complete
from plumpy.process_states import Finished, ProcessState
from plumpy.processes import ConnectionClosed # type: ignore[attr-defined]
from plumpy.processes import Process as PlumpyProcess
Expand Down Expand Up @@ -361,7 +362,7 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b
coro = self._launch_task(task_kill_job, self.node, self.runner.transport)
self._cancelling_scheduler_job = asyncio.create_task(coro)
try:
self.loop.run_until_complete(self._cancelling_scheduler_job)
run_until_complete(self.loop, self._cancelling_scheduler_job)
except Exception as exc:
self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}')
return False
Expand Down
3 changes: 2 additions & 1 deletion src/aiida/engine/processes/workchains/workchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import typing as t

from plumpy.greenlet_bridge import run_in_greenlet
from plumpy.persistence import auto_persist
from plumpy.process_states import Continue, Wait
from plumpy.processes import ProcessStateMachineMeta
Expand Down Expand Up @@ -299,7 +300,7 @@ def _update_process_status(self) -> None:
@Protect.final
async def run(self) -> t.Any:
self._stepper = self.spec().get_outline().create_stepper(self) # type: ignore[arg-type]
return self._do_step()
return await run_in_greenlet(self._do_step)

def _do_step(self) -> t.Any:
"""Execute the next step in the outline and return the result.
Expand Down
Loading
Loading