Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/746.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for the `cylc validate-reinstall` command.
186 changes: 161 additions & 25 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
DEVNULL,
PIPE,
Popen,
TimeoutExpired,
)
from textwrap import indent
from time import time
Expand Down Expand Up @@ -155,6 +156,35 @@ def _build_cmd(cmd: List, args: Dict) -> List:
return cmd


def _rename_keys(
    dictionary: dict[str, Any], substitutions: Iterable[tuple[str, str]]
) -> dict[str, Any]:
    """Rename dictionary keys in place.

    Substitutions are applied one after another in the order given, so a
    later pair may rename a key produced by an earlier pair.

    Args:
        dictionary: The dictionary to modify (it is mutated in place).
        substitutions: `[(before, after)]` pairs of keys to rename.
            Pairs whose `before` key is absent are skipped. If `after`
            is already present its value is overwritten.

    Returns:
        The provided dictionary post-modification (the same object).

    Examples:
        >>> _rename_keys({'a': 1, 'b': 2}, [('b', 'c')])
        {'a': 1, 'c': 2}

        >>> _rename_keys({'a': 1, 'b': 2}, [('b', 'c'), ('c', 'd')])
        {'a': 1, 'd': 2}

        >>> _rename_keys({'a': 1, 'b': 2}, [('x', 'y')])
        {'a': 1, 'b': 2}

        >>> _rename_keys({'a': 1, 'b': 2}, [('a', 'a')])
        {'a': 1, 'b': 2}

    """
    for before, after in substitutions:
        if before == after:
            # identity rename: leave the entry (and its position) untouched
            continue
        if before in dictionary:
            dictionary[after] = dictionary.pop(before)
    return dictionary


def process_cat_log_stderr(text: bytes) -> str:
"""Tidy up cylc cat-log stderr.

Expand Down Expand Up @@ -219,31 +249,34 @@ class Services:
# log file stream lag
CAT_LOG_SLEEP = 1

# command timeout for commands which start schedulers
START_TIMEOUT = 120

@staticmethod
def _error(message: Union[Exception, str]):
"""Format error case response."""
return [
return (
False,
str(message)
]
)

@staticmethod
def _return(message: str):
"""Format success case response."""
return [
return (
True,
message
]
)

@classmethod
async def clean(
cls,
workflows_mgr: 'WorkflowsManager',
workflows: Iterable['Tokens'],
args: dict,
workflows_mgr: 'WorkflowsManager',
executor: 'Executor',
log: 'Logger'
):
) -> tuple[bool, str]:
"""Calls `cylc clean`"""
# Convert Schema options → cylc.flow.workflow_files.init_clean opts:
opts = _schema_opts_to_api_opts(args, schema=CleanOptions)
Expand Down Expand Up @@ -276,25 +309,51 @@ async def scan(
cls,
args: dict,
workflows_mgr: 'WorkflowsManager',
):
) -> tuple[bool, str]:
await workflows_mgr.scan()
return cls._return("Scan requested")

@classmethod
Comment thread
MetRonnie marked this conversation as resolved.
async def play(
async def run_command(
cls,
command: Iterable[str],
workflows: Iterable[Tokens],
args: Dict[str, Any],
workflows_mgr: 'WorkflowsManager',
log: 'Logger',
) -> List[Union[bool, str]]:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed from list[bool | str] to tuple[bool, str] as this is safer for the way we are trying to use it (('msg', False) would have been acceptable before).

"""Calls `cylc play`."""
timeout: int,
success_msg: str = 'Command succeeded',
fail_msg: str = 'Command failed',
) -> tuple[bool, str]:
"""Calls the specified Cylc command.

Args:
command:
The Cylc subcommand to run.
e.g ["play"] or ["cat-log", "-m", "p"].
workflows:
The workflows to run this command against.
args:
CLI arguments to be provided to this command.
e.g {'color': 'never'} would result in "--color=never".
log:
The application log, used to record this command invocation.
timeout:
Length of time to wait for the command to complete.
The command will be killed if the timeout elapses.
success_msg:
Message to be used in the response if the command succeeds.
fail_msg:
Message to be used in the response if the command fails.

Returns:

"""
cylc_version = args.pop('cylc_version', None)
results: Dict[str, str] = {}
failed = False
for tokens in workflows:
try:
cmd = _build_cmd(['cylc', 'play', '--color=never'], args)
cmd = _build_cmd(['cylc', *command, '--color=never'], args)

if tokens['user'] and tokens['user'] != getuser():
return cls._error(
Expand All @@ -316,7 +375,7 @@ async def play(
env.pop('CYLC_ENV_NAME', None)
env['CYLC_VERSION'] = cylc_version

# run cylc play
# run command
proc = Popen(
cmd,
env=env,
Expand All @@ -325,11 +384,21 @@ async def play(
stderr=PIPE,
text=True
)
ret_code = proc.wait(timeout=120)

if ret_code:
msg = f"Command failed ({ret_code}): {cmd_repr}"
try:
ret_code = proc.wait(timeout=timeout)
except TimeoutExpired as exc:
proc.kill()
ret_code = 124 # mimic `timeout` command error code
# NOTE: preserve any stderr that the command produced this
# far as this may help with debugging
out, err = proc.communicate()
err = str(exc) + (('\n' + err) if err else '')
Comment on lines +388 to +396
Copy link
Copy Markdown
Member Author

@oliver-sanders oliver-sanders Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it turns out this timeout logic was bunk. You have to actually .kill() the process if you want it to stop.

https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate

Additionally, this will now log the command's stderr which might be useful in the event of a timeout.

else:
out, err = proc.communicate()

if ret_code:
msg = f"{fail_msg} ({ret_code}): {cmd_repr}"
results[wflow] = err.strip() or out.strip() or msg
log.error(
f"{msg}\n"
Expand All @@ -338,26 +407,87 @@ async def play(
)
failed = True
else:
results[wflow] = 'started'
results[wflow] = success_msg

except Exception as exc: # unexpected error
log.exception(exc)
return cls._error(exc)

if failed:
if len(results) == 1:
# all commands failed
return cls._error(results.popitem()[1])
# else log each workflow result on separate lines

# some commands failed
return cls._error(
# log each workflow result on separate lines
"\n\n" + "\n\n".join(
f"{wflow}: {msg}" for wflow, msg in results.items()
)
)

# all commands succeeded
return cls._return(f'Workflow(s) {success_msg}')

@classmethod
async def play(
    cls,
    workflows_mgr: 'WorkflowsManager',
    workflows: Iterable[Tokens],
    args: dict,
    log,
    **kwargs,
) -> tuple[bool, str]:
    """Run `cylc play` against the given workflows, then re-scan.

    Args:
        workflows_mgr: Manager whose `scan()` is awaited afterwards.
        workflows: The workflows to run the command against.
        args: CLI arguments forwarded to `run_command`.
        log: The application log, forwarded to `run_command`.
        **kwargs: Extra keyword arguments forwarded to `run_command`.

    Returns:
        Whatever `run_command` returned.

    """
    outcome = await cls.run_command(
        ('play',),
        workflows,
        args,
        log,
        cls.START_TIMEOUT,
        success_msg='started',
        **kwargs,
    )

    # the scan is requested regardless of the command outcome
    await workflows_mgr.scan()

    return outcome

@classmethod
async def validate_reinstall(
    cls,
    workflows_mgr: 'WorkflowsManager',
    workflows: Iterable[Tokens],
    args: dict,
    log,
    **kwargs,
) -> tuple[bool, str]:
    """Calls `cylc validate-reinstall`.

    The command is run with `--yes` via `run_command`, after which a
    workflow re-scan is triggered.

    Args:
        workflows_mgr: Manager whose `scan()` is awaited afterwards.
        workflows: The workflows to run the command against.
        args: GraphQL arguments. Keys which differ from the CLI option
            names are renamed (this dictionary is modified in place).
        log: The application log, forwarded to `run_command`.
        **kwargs: Extra keyword arguments forwarded to `run_command`.

    Returns:
        The result of `run_command`, so failures reported by the command
        are passed back to the caller rather than masked by a success
        message.

    """
    ret = await cls.run_command(
        ('validate-reinstall', '--yes'),
        workflows,
        # map GraphQL fields to CLI args where they differ
        _rename_keys(args, [
            # from "cylc reload":
            ('reload_global', 'global'),
            # from "cylc reinstall"
            ('rose_opt_conf_keys', 'opt_conf_key'),
            ('rose_suite_defines', 'define'),
            ('rose_template_variables', 'rose_template_variable'),
        ]),
        log,
        cls.START_TIMEOUT,
        **kwargs,
        success_msg='reinstalled',
    )

    # trigger a re-scan
    await workflows_mgr.scan()

    # return results
    return ret

@staticmethod
async def enqueue(
Expand Down Expand Up @@ -602,8 +732,7 @@ async def service(
command: str,
workflows: Iterable['Tokens'],
kwargs: Dict[str, Any],
) -> List[Union[bool, str]]:

) -> tuple[bool, str]:
# GraphQL v3 includes all variables that are set, even if set to null.
kwargs = {
k: v
Expand All @@ -613,19 +742,26 @@ async def service(

if command == 'clean': # noqa: SIM116
return await Services.clean(
self.workflows_mgr,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched arg order for consistency.

workflows,
kwargs,
self.workflows_mgr,
log=self.log,
executor=self.executor
)
elif command == 'play':
elif command == 'play': # noqa: SIM116
return await Services.play(
self.workflows_mgr,
workflows,
kwargs,
self.workflows_mgr,
log=self.log
)
elif command == 'validate_reinstall':
return await Services.validate_reinstall(
self.workflows_mgr,
workflows,
kwargs,
log=self.log,
)
elif command == 'scan':
return await Services.scan(
kwargs,
Expand Down
Loading