diff --git a/.gitignore b/.gitignore
index b18c09ad1eb..f7e26153db0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,9 @@ __pycache__/
# vscode
.vscode
+# pycharm
+.idea
+
# processed workflow configs
*.rc.processed
*.cylc.processed
diff --git a/changes.d/6663.feat.md b/changes.d/6663.feat.md
new file mode 100644
index 00000000000..dc2112fb64f
--- /dev/null
+++ b/changes.d/6663.feat.md
@@ -0,0 +1 @@
+Added a Cylc job profiler which captures CPU and memory information from job runners which use cgroups. This information can be reviewed in the Analysis view in the GUI.
diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py
index 16c68f290a3..3e04edcac56 100644
--- a/cylc/flow/cfgspec/globalcfg.py
+++ b/cylc/flow/cfgspec/globalcfg.py
@@ -1477,6 +1477,38 @@ def default_for(
.. versionadded:: 8.0.0
''')
+
+ with Conf('profiler', desc='''
+ Configure the Cylc job profiler.
+
+ This tool can capture CPU and memory information from
+ job runners which use cgroups such as PBS and Slurm.
+
+ .. versionadded:: 8.7.0
+ '''):
+ Conf('activate', VDR.V_BOOLEAN, False, desc='''
+ Enable the Cylc profiler for this platform.
+ ''')
+ Conf('cgroups path', VDR.V_STRING,
+ default='/sys/fs/cgroup',
+ desc='''
+ Configure the path to the cgroups filesystem.
+
+ The default value is the standard
+ location for cgroups on linux and should work in
+ most circumstances
+ ''')
+ Conf('polling interval', VDR.V_INTEGER,
+ default=10,
+ desc='''
+ Configure the profiler polling interval.
+
+ The interval (in seconds) at which the profiler will
+ poll the cgroups filesystem for resource usage data.
+ The default value of 10 seconds should be sufficient for
+ most use cases, but can be adjusted as needed.
+ ''')
+
Conf('job runner', VDR.V_STRING, 'background', desc=f'''
The system used to run jobs on the platform.
diff --git a/cylc/flow/etc/job.sh b/cylc/flow/etc/job.sh
index c64edfda298..263ebe489b1 100755
--- a/cylc/flow/etc/job.sh
+++ b/cylc/flow/etc/job.sh
@@ -139,6 +139,12 @@ cylc__job__main() {
mkdir -p "$(dirname "${CYLC_TASK_WORK_DIR}")" || true
mkdir -p "${CYLC_TASK_WORK_DIR}"
cd "${CYLC_TASK_WORK_DIR}"
+
+ if [[ "${CYLC_PROFILER}" == "True" ]] ; then
+ cylc profile -m "${CYLC_PROFILER_CGROUPS}" -i "${CYLC_PROFILER_POLL_INTERVAL}" &
+ export profiler_pid="$!"
+ fi
+
# Env-Script, User Environment, Pre-Script, Script and Post-Script
# Run user scripts in subshell to protect cylc job script from interference.
# Waiting on background process allows signal traps to trigger immediately.
@@ -157,11 +163,15 @@ cylc__job__main() {
cylc__set_return "$ret_code"
fi
}
+ # Grab the max rss and cpu_time and clean up before changing directory
+ cylc__kill_profiler
# Empty work directory remove
cd
rmdir "${CYLC_TASK_WORK_DIR}" 2>'/dev/null' || true
# Send task succeeded message
+
wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>'/dev/null' || true
+
cylc message -- "${CYLC_WORKFLOW_ID}" "${CYLC_TASK_JOB}" 'succeeded' || true
# (Ignore shellcheck "globbing and word splitting" warning here).
# shellcheck disable=SC2086
@@ -187,6 +197,14 @@ cylc__set_return() {
return "${1:-0}"
}
+###############################################################################
+# Save the data using cylc message and exit the profiler
+cylc__kill_profiler() {
+ if [[ -n "${profiler_pid:-}" ]] && ps -p "$profiler_pid" > /dev/null; then
+ kill -s SIGINT "${profiler_pid}" || true
+ fi
+}
+
###############################################################################
# Disable selected or all (if no arguments given) fail traps.
# Globals:
@@ -268,6 +286,9 @@ cylc__job_finish_err() {
# (Ignore shellcheck "globbing and word splitting" warning here).
# shellcheck disable=SC2086
trap '' ${CYLC_VACATION_SIGNALS:-} ${CYLC_FAIL_SIGNALS}
+
+ cylc__kill_profiler
+
if [[ -n "${CYLC_TASK_MESSAGE_STARTED_PID:-}" ]]; then
wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>'/dev/null' || true
fi
diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py
index 7e55c014756..6faab0e25b9 100644
--- a/cylc/flow/exceptions.py
+++ b/cylc/flow/exceptions.py
@@ -529,3 +529,16 @@ def __init__(self, message, expr=None):
def __str__(self):
return self.message
+
+
+class CylcProfilerError(CylcError):
+ """Exception for errors raised from the cylc profiler. These errors do not
+ affect workflows functionally, just stats gathering. We don't want to
+ panic users."""
+ def __init__(self, exc: Exception, error_msg: str) -> None:
+ CylcError.__init__(
+ self,
+ f"{exc}. {error_msg}. This error came from the Cylc profiler"
+ f" and is not a problem with your workflow. Statistics gathering "
+ f"for the analysis view may be incomplete."
+ )
diff --git a/cylc/flow/job_file.py b/cylc/flow/job_file.py
index 930331dc5a4..100d5c0da23 100644
--- a/cylc/flow/job_file.py
+++ b/cylc/flow/job_file.py
@@ -224,8 +224,16 @@ def _write_task_environment(self, handle, job_conf):
'\n export CYLC_TASK_TRY_NUMBER=%s' % job_conf['try_num'])
handle.write(
"\n export CYLC_TASK_FLOW_NUMBERS="
- f"{','.join(str(f) for f in job_conf['flow_nums'])}"
- )
+ f"{','.join(str(f) for f in job_conf['flow_nums'])}")
+ handle.write(
+ "\n export CYLC_PROFILER="
+ f"{job_conf['platform']['profiler']['activate']}")
+ handle.write(
+ "\n export CYLC_PROFILER_CGROUPS="
+ f"{job_conf['platform']['profiler']['cgroups path']}")
+ handle.write(
+ "\n export CYLC_PROFILER_POLL_INTERVAL="
+ f"{job_conf['platform']['profiler']['polling interval']}")
# Standard parameter environment variables
for var, val in job_conf['param_var'].items():
handle.write('\n export CYLC_TASK_PARAM_%s="%s"' % (var, val))
diff --git a/cylc/flow/scripts/message.py b/cylc/flow/scripts/message.py
index 03a4eed3bf8..a35295d236b 100755
--- a/cylc/flow/scripts/message.py
+++ b/cylc/flow/scripts/message.py
@@ -93,12 +93,13 @@
"""
+import asyncio
from logging import getLevelName, INFO
import os
import sys
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Sequence
-from cylc.flow.id_cli import parse_id
+from cylc.flow.id_cli import parse_id_async
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP
@@ -142,6 +143,10 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
if not args:
parser.error('No message supplied')
return
+ asyncio.run(_main(options, args))
+
+
+async def _main(options: 'Values', args: Sequence[str]) -> None:
if len(args) <= 2:
# BACK COMPAT: args <= 2
# from:
@@ -160,7 +165,7 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
message_strs = list(args)
else:
workflow_id, job_id, *message_strs = args
- workflow_id, *_ = parse_id(
+ workflow_id, *_ = await parse_id_async(
workflow_id,
constraint='workflows',
)
@@ -198,4 +203,4 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
messages.append([options.severity, message_str.strip()])
else:
messages.append([getLevelName(INFO), message_str.strip()])
- record_messages(workflow_id, job_id, messages)
+ await record_messages(workflow_id, job_id, messages)
diff --git a/cylc/flow/scripts/profiler.py b/cylc/flow/scripts/profiler.py
new file mode 100755
index 00000000000..e511fdf1519
--- /dev/null
+++ b/cylc/flow/scripts/profiler.py
@@ -0,0 +1,282 @@
+#!/usr/bin/env python3
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+"""cylc profiler [OPTIONS]
+
+Profiler which periodically polls cgroups to track
+the resource usage of jobs running on the node.
+"""
+
+import asyncio
+from contextlib import suppress
+import json
+import os
+import re
+import signal
+import psutil
+
+from pathlib import Path
+from dataclasses import dataclass
+
+from cylc.flow.exceptions import CylcProfilerError
+from cylc.flow.option_parsers import CylcOptionParser as COP
+from cylc.flow.remote import watch_and_kill
+from cylc.flow.task_message import record_messages
+from cylc.flow.terminal import cli_function
+
+INTERNAL = True
+PID_REGEX = re.compile(r"([^:]*\d{6,}.*)")
+RE_CPU_USAGE = re.compile(r'usage_usec\s*(\d+)')
+
+
+@dataclass
+class Process:
+ """Class for representing CPU and Memory usage of a process"""
+ cgroup_memory_path: str
+ max_rss: int
+ cgroup_cpu_path: str
+ memory_allocated_path: str
+ cgroup_version: int
+
+
+async def stop_profiler(process, comms_timeout, tasks, *_args):
+ """Stop the profiler and return its data to the scheduler.
+
+ This function will be executed when the profiler receives a stop signal.
+ """
+ # stop the profiler
+ for task in tasks:
+ task.cancel()
+
+ # extract the stats
+ profiler_data = get_profiler_data(process)
+
+ # send a task message to the scheduler / write message to job.status file
+ await record_messages(
+ os.environ['CYLC_WORKFLOW_ID'],
+ os.environ['CYLC_TASK_JOB'],
+ [['DEBUG', f'_cylc_profiler: {json.dumps(profiler_data)}']],
+ comms_timeout=comms_timeout,
+ )
+
+
+def get_profiler_data(process):
+ if (
+ process.cgroup_memory_path is None
+ or process.cgroup_cpu_path is None
+ or process.memory_allocated_path is None
+ ):
+ # If a task fails instantly, or finishes very quickly (< 1 second),
+ # the get config function doesn't have time to run
+ max_rss = cpu_time = memory_allocated = 0
+ else:
+ max_rss = process.max_rss
+ cpu_time = parse_cpu_file(process)
+ memory_allocated = parse_memory_allocated(process)
+ return {
+ 'max_rss': max_rss,
+ 'cpu_time': cpu_time,
+ 'memory_allocated': memory_allocated,
+ }
+
+
+def parse_memory_file(process: Process):
+ """Open the memory stat file and copy the appropriate data"""
+
+ try:
+ if process.cgroup_version == 2:
+ with open(process.cgroup_memory_path, 'r') as f:
+ for line in f:
+ if "anon" in line:
+ return int(''.join(filter(str.isdigit, line)))
+ else:
+ with open(process.cgroup_memory_path, 'r') as f:
+ for line in f:
+ if "total_rss" in line:
+ return int(''.join(filter(str.isdigit, line)))
+ except Exception as err:
+ raise CylcProfilerError(
+ err, "Unable to find memory usage data") from err
+
+
+def parse_memory_allocated(process: Process) -> int:
+ """Open the memory stat file and copy the appropriate data"""
+ if process.cgroup_version == 2:
+ cgroup_memory_path = Path(process.memory_allocated_path)
+ for _ in range(5):
+ with open(cgroup_memory_path / "memory.max", 'r') as f:
+ line = f.readline()
+ if "max" not in line:
+ return int(line)
+ cgroup_memory_path = cgroup_memory_path.parent
+ return 0
+ else: # Memory limit not tracked for cgroups v1
+ return 0
+
+
+def parse_cpu_file(process: Process) -> int:
+ """Open the CPU stat file and return the appropriate data"""
+ try:
+ if process.cgroup_version == 2:
+ with open(process.cgroup_cpu_path, 'r') as f:
+ for line in f:
+ if match := RE_CPU_USAGE.search(line):
+ return round(int(match.group(1)) / 1000)
+ raise FileNotFoundError(process.cgroup_cpu_path)
+
+ elif process.cgroup_version == 1:
+ with open(process.cgroup_cpu_path, 'r') as f:
+ for line in f:
+ # Cgroups v1 uses nanoseconds
+ return round(int(line) / 1000000)
+ raise FileNotFoundError(process.cgroup_cpu_path)
+
+ except Exception as err:
+ raise CylcProfilerError(
+ err, "Unable to find cpu usage data") from err
+ return 0
+
+
+def get_cgroup_version(cgroup_location: str, cgroup_name: str) -> int:
+ try:
+ if Path.exists(Path(cgroup_location + cgroup_name)):
+ return 2
+ elif Path.exists(Path(cgroup_location + "/memory" + cgroup_name)):
+ return 1
+ raise FileNotFoundError(cgroup_location + cgroup_name)
+ except Exception as err:
+ raise CylcProfilerError(
+ err, "Cgroup not found at " + cgroup_location +
+ cgroup_name) from err
+
+
+def get_cgroup_name():
+ """Get the cgroup directory for the current process"""
+
+ # fugly hack to allow functional tests to use test data
+ if 'profiler_test_env_var' in os.environ:
+ return os.environ['profiler_test_env_var']
+
+ # Get the PID of the current process
+ pid = os.getpid()
+ try:
+ # Get the cgroup information for the current process
+ with open('/proc/' + str(pid) + '/cgroup', 'r') as f:
+ result = f.read()
+ return PID_REGEX.search(result).group()
+
+ except Exception as err:
+ raise CylcProfilerError(
+ err, '/proc/' + str(pid) + '/cgroup not found') from err
+
+
+def get_cgroup_paths(location) -> Process:
+
+ try:
+ cgroup_name = get_cgroup_name()
+ cgroup_version = get_cgroup_version(location, cgroup_name)
+ if cgroup_version == 2:
+ return Process(
+ cgroup_memory_path=location +
+ cgroup_name + "/" + "memory.stat",
+ cgroup_cpu_path=location +
+ cgroup_name + "/" + "cpu.stat",
+ memory_allocated_path=location + cgroup_name,
+ cgroup_version=cgroup_version,
+ max_rss=0,
+ )
+
+ elif cgroup_version == 1:
+ return Process(
+ cgroup_memory_path=location + "memory/" +
+ cgroup_name + "/memory.stat",
+ cgroup_cpu_path=location + "cpu/" +
+ cgroup_name + "/cpuacct.usage",
+ memory_allocated_path="",
+ cgroup_version=cgroup_version,
+ max_rss=0,
+ )
+ raise Exception
+ except Exception as err:
+ raise CylcProfilerError(
+ err, "Unable to determine cgroup version") from err
+
+
+async def profile(process: Process, delay, keep_looping=lambda: True):
+ # The infinite loop that will constantly poll the cgroup
+ # The lambda function is used to allow the loop to be stopped in unit tests
+
+ while keep_looping():
+ # Polling the cgroup for memory and keeping track of the max rss value
+ max_rss = parse_memory_file(process)
+ if max_rss is not None and max_rss > process.max_rss:
+ process.max_rss = max_rss
+ await asyncio.sleep(delay)
+
+
+def get_option_parser() -> COP:
+ parser = COP(
+ __doc__,
+ comms=True,
+ argdoc=[
+ ],
+ )
+ parser.add_option(
+ "-i", type=int,
+ help="interval between query cycles in seconds", dest="delay")
+ parser.add_option(
+ "-m", type=str, help="Location of cgroups directory",
+ dest="cgroup_location")
+
+ return parser
+
+
+@cli_function(get_option_parser)
+def main(_parser: COP, options) -> None:
+ """CLI main."""
+ with suppress(SystemExit, asyncio.exceptions.CancelledError, Exception):
+ asyncio.run(_main(options))
+
+
+async def _main(options) -> None:
+ # get cgroup information
+ process = get_cgroup_paths(options.cgroup_location)
+
+ # list of asyncio tasks
+ tasks: list[asyncio.Task] = []
+
+ # Register the stop_profiler function with the signal library
+ # The signal library doesn't work with asyncio, so we have to use the
+ # loop's add_signal_handler function instead
+ loop = asyncio.get_running_loop()
+ for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGTERM):
+ loop.add_signal_handler(
+ sig,
+ lambda: asyncio.create_task(
+ stop_profiler(process, options.comms_timeout, tasks)
+ ),
+ )
+
+ # the profiler will run until one of these coroutines calls `sys.exit`:
+ tasks.extend([
+ # run the profiler itself
+ asyncio.create_task(profile(process, options.delay)),
+
+ # kill the profiler if its PPID changes
+ # (i.e, if the job exits before the profiler does)
+ asyncio.create_task(watch_and_kill(psutil.Process(os.getpid()))),
+ ])
+ await asyncio.gather(*tasks)
diff --git a/cylc/flow/task_message.py b/cylc/flow/task_message.py
index 6f4f41bd5ad..85067ca27d5 100644
--- a/cylc/flow/task_message.py
+++ b/cylc/flow/task_message.py
@@ -92,24 +92,31 @@ def split_run_signal(message: str) -> tuple[str, str | None]:
return prefix, signal[0] if signal else None
-def record_messages(workflow: str, job_id: str, messages: List[list]) -> None:
+async def record_messages(
+ workflow: str,
+ job_id: str,
+ messages: List[list],
+ comms_timeout: float | None = None,
+) -> None:
"""Record task job messages.
- Print the messages according to their severity.
- Write the messages in the job status file.
- Send the messages to the workflow, if possible.
+ * Print the messages according to their severity.
+ * Write the messages in the job status file.
+ * Send the messages to the workflow, if possible.
Arguments:
workflow: Workflow ID.
job_id: Job identifier "CYCLE/TASK_NAME/SUBMIT_NUM".
messages: List of messages "[[severity, message], ...]".
+ comms_timeout: Used for sending messages if appropriate.
"""
# Record the event time, in case the message is delayed in some way.
event_time = get_current_time_string(
override_use_utc=(os.getenv('CYLC_UTC') == 'True'))
write_messages(workflow, job_id, messages, event_time)
if get_comms_method() != CommsMeth.POLL:
- send_messages(workflow, job_id, messages, event_time)
+ await send_messages(workflow, job_id, messages,
+ event_time, comms_timeout)
def write_messages(workflow, job_id, messages, event_time):
@@ -125,12 +132,16 @@ def write_messages(workflow, job_id, messages, event_time):
_append_job_status_file(workflow, job_id, event_time, messages)
-def send_messages(
- workflow: str, job_id: str, messages: List[list], event_time: str
+async def send_messages(
+ workflow: str,
+ job_id: str,
+ messages: List[list],
+ event_time: str,
+ comms_timeout: float | None = None,
) -> None:
workflow = os.path.normpath(workflow)
try:
- pclient = get_client(workflow)
+ pclient = get_client(workflow, timeout=comms_timeout)
except WorkflowStopped:
# on a remote host this means the contact file is not present
# either the workflow is stopped or the contact file is not present
@@ -153,7 +164,7 @@ def send_messages(
'messages': messages,
}
}
- pclient('graphql', mutation_kwargs)
+ await pclient.async_request('graphql', mutation_kwargs)
def _append_job_status_file(workflow, job_id, event_time, messages):
diff --git a/setup.cfg b/setup.cfg
index c3b4b4057b4..8c4a131a074 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -169,6 +169,7 @@ cylc.command =
ping = cylc.flow.scripts.ping:main
play = cylc.flow.scripts.play:main
poll = cylc.flow.scripts.poll:main
+ profiler = cylc.flow.scripts.profiler:main
psutils = cylc.flow.scripts.psutil:main
reinstall = cylc.flow.scripts.reinstall:main
release = cylc.flow.scripts.release:main
diff --git a/tests/functional/cylc-cat-log/12-delete-kill.t b/tests/functional/cylc-cat-log/12-delete-kill.t
index c642d4995ec..fd43bdedd65 100644
--- a/tests/functional/cylc-cat-log/12-delete-kill.t
+++ b/tests/functional/cylc-cat-log/12-delete-kill.t
@@ -33,6 +33,7 @@ __EOF__
log_file="${WORKFLOW_RUN_DIR}/log/foo.log"
echo "Hello, Mr. Thompson" > "$log_file"
+
TEST_NAME="${TEST_NAME_BASE}-delete"
cylc cat-log --mode=tail "$WORKFLOW_NAME" -f foo.log 2>&1 &
cat_log_pid="$!"
diff --git a/tests/functional/jobscript/03-profiler-e2e.t b/tests/functional/jobscript/03-profiler-e2e.t
new file mode 100644
index 00000000000..64c88fb82f3
--- /dev/null
+++ b/tests/functional/jobscript/03-profiler-e2e.t
@@ -0,0 +1,84 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#-------------------------------------------------------------------------------
+# Cylc profile test
+# # Cylc profile test. This test will run the Cylc profiler with real data
+# and test to see that the profiler data is received and processed correctly.
+# NOTE: This test will run the Cylc profiler on the given test platform.
+# The test platform may need to be configured for this to work (e.g.
+# "cgroups path" may need to be set).
+export REQUIRE_PLATFORM='runner:?(pbs|slurm) comms:tcp'
+. "$(dirname "$0")/test_header"
+set_test_number 8
+
+create_test_global_config "
+[platforms]
+ [[${CYLC_TEST_PLATFORM}]]
+ [[[profiler]]]
+ activate = True
+ polling interval = 10
+ [[localhost]]
+ [[[profiler]]]
+ activate = True
+ polling interval = 10
+ cgroups path = the/thing/that/should/not/be
+"
+
+init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
+#!Jinja2
+
+[scheduling]
+ [[graph]]
+ R1 = the_good & the_bad? & the_ugly
+
+[runtime]
+ [[the_good]]
+ # this task should succeeded normally
+ platform = {{ environ['CYLC_TEST_PLATFORM'] }}
+ script = sleep 1
+ [[the_bad]]
+ # this task should fail (it should still send profiling info)
+ platform = {{ environ['CYLC_TEST_PLATFORM'] }}
+ script = sleep 5; false
+ [[the_ugly]]
+ # this task should succeed despite the broken profiler configuration
+ platform = localhost
+ script = sleep 5
+__FLOW_CONFIG__
+
+run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
+workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLOW_NAME}"
+
+# ensure the cpu and memory messages were received and that these messages
+# were received before the succeeded message
+log_scan "${TEST_NAME_BASE}-task-succeeded" \
+ "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \
+ '1/the_good.*(received)_cylc_profiler.*cpu_time' \
+ '1/the_good.*(received)succeeded'
+
+# ensure the cpu and memory messages were received and that these messages
+# were received before the failed message
+log_scan "${TEST_NAME_BASE}-task-succeeded" \
+ "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \
+ '1/the_bad.*(received)_cylc_profiler.*cpu_time.*' \
+ '1/the_bad.*failed'
+
+# ensure this task succeeded despite the broken profiler configuration
+grep_workflow_log_ok "${TEST_NAME_BASE}-broken" '1/the_ugly.*(received)succeeded'
+grep_ok 'Unable to determine cgroup version' "$(cylc cat-log "${WORKFLOW_NAME}//1/the_ugly" -f e -m p)"
+
+purge
diff --git a/tests/functional/jobscript/04-profiler.t b/tests/functional/jobscript/04-profiler.t
new file mode 100644
index 00000000000..1142687e517
--- /dev/null
+++ b/tests/functional/jobscript/04-profiler.t
@@ -0,0 +1,80 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#-------------------------------------------------------------------------------
+# Cylc profile test. This test will run the Cylc profiler with mocked data
+# and test to see that the profiler data is received and processed correctly.
+
+. "$(dirname "$0")/test_header"
+
+if [[ "$OSTYPE" != "linux-gnu"* ]]; then
+ skip_all "Tests not compatible with $OSTYPE"
+fi
+
+set_test_number 6
+
+# Set up test data
+mkdir -p "${PWD}/cgroups_test_data"
+
+echo 'anon 12345678' > cgroups_test_data/memory.stat
+echo '123456789' > cgroups_test_data/memory.max
+printf "blah blah 123456\nusage_usec 56781234" > cgroups_test_data/cpu.stat
+
+export profiler_test_env_var='/cgroups_test_data'
+create_test_global_config "
+[platforms]
+ [[localhost]]
+ [[[profiler]]]
+ activate = True
+ cgroups path = ${PWD}
+"
+
+init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
+#!Jinja2
+
+[scheduling]
+ [[graph]]
+ R1 = the_good & the_bad?
+
+[runtime]
+ [[the_good]]
+ # this task should succeeded normally
+ platform = localhost
+ script = sleep 5
+ [[the_bad]]
+ # this task should fail (it should still send profiling info)
+ platform = localhost
+ script = sleep 5; false
+__FLOW_CONFIG__
+
+run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
+workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --debug --no-detach "${WORKFLOW_NAME}"
+
+# ensure the cpu and memory messages were received and that these messages
+# were received before the succeeded message
+log_scan "${TEST_NAME_BASE}-task-succeeded" \
+ "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \
+ '1/the_good.*(received)_cylc_profiler.*cpu_time' \
+ '1/the_good.*(received)succeeded'
+
+# ensure the cpu and memory messages were received and that these messages
+# were received before the failed message
+log_scan "${TEST_NAME_BASE}-task-succeeded" \
+ "${WORKFLOW_RUN_DIR}/log/scheduler/log" 1 0 \
+ '1/the_bad.*(received)_cylc_profiler.*cpu_time.*' \
+ '1/the_bad.*failed'
+
+purge
diff --git a/tests/unit/scripts/test_profiler.py b/tests/unit/scripts/test_profiler.py
new file mode 100644
index 00000000000..3c92ad4138f
--- /dev/null
+++ b/tests/unit/scripts/test_profiler.py
@@ -0,0 +1,427 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+# Tests for functions contained in cylc.flow.scripts.profiler
+import os
+from unittest import mock
+
+import pytest
+
+from cylc.flow.exceptions import CylcProfilerError
+from cylc.flow.scripts.profiler import (
+ Process,
+ _main,
+ get_cgroup_name,
+ get_cgroup_paths,
+ get_cgroup_version,
+ get_profiler_data,
+ parse_cpu_file,
+ parse_memory_allocated,
+ parse_memory_file,
+ profile,
+ stop_profiler,
+)
+
+
+async def test_stop_profiler(monkeypatch, tmpdir):
+ """It should capture and record data profiler is stopped."""
+ monkeypatch.setenv('CYLC_WORKFLOW_ID', "test_value")
+ monkeypatch.setenv('CYLC_TASK_JOB', "test_task_job")
+
+ # capture record_messages calls
+ record_messages = mock.AsyncMock()
+ monkeypatch.setattr(
+ 'cylc.flow.scripts.profiler.record_messages', record_messages
+ )
+
+ # mock the cGroup filesystem
+ mem_file = tmpdir.join("memory_file.txt")
+ mem_file.write('total_rss=1234')
+ cpu_file = tmpdir.join("cpu_file.txt")
+ cpu_file.write('5678000')
+ # NOTE: memory allocated is not available for cGroups v1
+ mem_allocated_file = tmpdir.join("memory_allocated.txt")
+ mem_allocated_file.write('99999')
+
+ process_object = Process(
+ cgroup_memory_path=mem_file,
+ cgroup_cpu_path=cpu_file,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=1,
+ max_rss=42,
+ )
+
+ # tell the profiler to stop
+ await stop_profiler(process_object, 1, [])
+
+ # the profiler should record a "cylc message" with the profiler data
+ assert record_messages.call_args_list == [
+ mock.call(
+ 'test_value',
+ 'test_task_job',
+ [
+ [
+ 'DEBUG',
+ '_cylc_profiler:'
+ ' {"max_rss": 42, "cpu_time": 6, "memory_allocated": 0}',
+ ]
+ ],
+ comms_timeout=1,
+ )
+ ]
+
+
+def test_get_resource_usage():
+ """It should return 0 if cGroup information is not provided."""
+ process_object = Process(
+ cgroup_memory_path=None,
+ cgroup_cpu_path=None,
+ memory_allocated_path=None,
+ cgroup_version=1,
+ max_rss=0)
+
+ assert get_profiler_data(process_object) == {
+ 'max_rss': 0,
+ 'cpu_time': 0,
+ 'memory_allocated': 0,
+ }
+
+
+def test_parse_memory_file(tmpdir):
+ """It should return the memory usage of the process."""
+ mem_file_v1 = tmpdir.join("memory_file_v1.txt")
+ mem_file_v1.write('total_rss=1024')
+ mem_file_v2 = tmpdir.join("memory_file_v2.txt")
+ mem_file_v2.write('anon=666')
+ cpu_file = tmpdir.join("cpu_file.txt")
+ cpu_file.write('5678')
+ mem_allocated_file = tmpdir.join("memory_allocated.txt")
+ mem_allocated_file.write('99999')
+
+ good_process_object_v1 = Process(
+ cgroup_memory_path=mem_file_v1,
+ cgroup_cpu_path=cpu_file,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=1,
+ max_rss=0)
+ good_process_object_v2 = Process(
+ cgroup_memory_path=mem_file_v2,
+ cgroup_cpu_path=cpu_file,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=2,
+ max_rss=0)
+ bad_process_object = Process(
+ cgroup_memory_path='',
+ cgroup_cpu_path='',
+ memory_allocated_path='',
+ cgroup_version=1,
+ max_rss=0)
+
+ with pytest.raises(CylcProfilerError) as excinfo:
+ parse_memory_file(bad_process_object)
+ assert "Unable to find memory usage data" in str(excinfo.value)
+
+ # Test the parse_memory_file function
+ assert parse_memory_file(good_process_object_v1) == 1024
+ assert parse_memory_file(good_process_object_v2) == 666
+
+
+def test_parse_cpu_file(tmpdir):
+ """It should return the cpu usage of the process."""
+ mem_file = tmpdir.join("memory_file.txt")
+ mem_file.write('1024')
+ cpu_file_v1_good = tmpdir.join("cpu_file_v1_good.txt")
+ cpu_file_v1_good.write('1234567890')
+ cpu_file_v1_bad = tmpdir.join("cpu_file_v1_bad.txt")
+ cpu_file_v1_bad.write("I'm your dream, mind ashtray")
+ cpu_file_v2_good = tmpdir.join("cpu_file_v2_good.txt")
+ cpu_file_v2_good.write('usage_usec 1234567890')
+ cpu_file_v2_bad = tmpdir.join("cpu_file_v2_bad.txt")
+ cpu_file_v2_bad.write('Give me fuel, give me fire, '
+ 'give me that which I desire')
+ mem_allocated_file = tmpdir.join("memory_allocated.txt")
+ mem_allocated_file.write('99999')
+
+ good_process_object_v1 = Process(
+ cgroup_memory_path=mem_file,
+ cgroup_cpu_path=cpu_file_v1_good,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=1,
+ max_rss=0)
+ good_process_object_v2 = Process(
+ cgroup_memory_path=mem_file,
+ cgroup_cpu_path=cpu_file_v2_good,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=2,
+ max_rss=0)
+ bad_process_object_v1_1 = Process(
+ cgroup_memory_path='',
+ cgroup_cpu_path='',
+ memory_allocated_path='',
+ cgroup_version=1,
+ max_rss=0)
+ bad_process_object_v1_2 = Process(
+ cgroup_memory_path=mem_file,
+ cgroup_cpu_path=cpu_file_v1_bad,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=1,
+ max_rss=0)
+ bad_process_object_v2 = Process(
+ cgroup_memory_path=mem_file,
+ cgroup_cpu_path=cpu_file_v2_bad,
+ memory_allocated_path=mem_allocated_file,
+ cgroup_version=2,
+ max_rss=0)
+
+ assert parse_cpu_file(good_process_object_v1) == 1235
+ assert parse_cpu_file(good_process_object_v2) == 1234568
+
+ with pytest.raises(CylcProfilerError) as excinfo:
+ parse_cpu_file(bad_process_object_v1_1)
+ assert "Unable to find cpu usage data" in str(excinfo.value)
+ with pytest.raises(CylcProfilerError) as excinfo:
+ parse_cpu_file(bad_process_object_v1_2)
+ assert "Unable to find cpu usage data" in str(excinfo.value)
+ with pytest.raises(CylcProfilerError) as excinfo:
+ parse_cpu_file(bad_process_object_v2)
+ assert "Unable to find cpu usage data" in str(excinfo.value)
+
+
+def test_get_cgroup_name(mocker):
+ """It should return the cgroup name of the process."""
+ mock_file = mocker.mock_open(read_data="0::bad/test/cgroup/place")
+ mocker.patch("builtins.open", mock_file)
+ with pytest.raises(CylcProfilerError):
+ get_cgroup_name()
+
+ mock_file = mocker.mock_open(read_data="0::good/cgroup/place/2222222")
+ mocker.patch("builtins.open", mock_file)
+ assert get_cgroup_name() == "good/cgroup/place/2222222"
+
+
+def test_parse_memory_allocated(tmp_path_factory):
+ """It should return the memory allocated to the process."""
+ good_mem_dir = tmp_path_factory.mktemp("mem_dir")
+ mem_allocated_file = good_mem_dir / "memory.max"
+ mem_allocated_file.write_text('99999')
+
+ # We currently do not track memory allocated for cgroups v1
+ good_process_object_v1 = Process(
+ cgroup_memory_path='',
+ cgroup_cpu_path='',
+ memory_allocated_path=str(good_mem_dir),
+ cgroup_version=1,
+ max_rss=0)
+
+ good_process_object_v2 = Process(
+ cgroup_memory_path='',
+ cgroup_cpu_path='',
+ memory_allocated_path=str(good_mem_dir),
+ cgroup_version=2,
+ max_rss=0)
+
+ bad_process_object_v2_1 = Process(
+ cgroup_memory_path='',
+ cgroup_cpu_path='',
+ memory_allocated_path='/',
+ cgroup_version=2,
+ max_rss=0)
+
+ assert parse_memory_allocated(good_process_object_v1) == 0
+ assert parse_memory_allocated(good_process_object_v2) == 99999
+ with pytest.raises(CylcProfilerError) as excinfo:
+ parse_memory_file(bad_process_object_v2_1)
+ assert "Unable to find memory usage data" in str(excinfo.value)
+ # Nested directories with 'max' value
+ base_dir = tmp_path_factory.mktemp("base")
+
+ dir_1 = base_dir / "dir_1"
+ dir_1.mkdir()
+ mem_file_1 = dir_1 / "memory.max"
+ mem_file_1.write_text("max")
+
+ dir_2 = dir_1 / "dir_2"
+ dir_2.mkdir()
+ mem_file_2 = dir_2 / "memory.max"
+ mem_file_2.write_text("max")
+
+ dir_3 = dir_2 / "dir_3"
+ dir_3.mkdir()
+ mem_file_3 = dir_3 / "memory.max"
+ mem_file_3.write_text("max")
+
+ dir_4 = dir_3 / "dir_4"
+ dir_4.mkdir()
+ mem_file_4 = dir_4 / "memory.max"
+ mem_file_4.write_text("max")
+
+ dir_5 = dir_4 / "dir_5"
+ dir_5.mkdir()
+ mem_file_5 = dir_5 / "memory.max"
+ mem_file_5.write_text("max")
+
+ bad_process_object_v2_2 = Process(
+ cgroup_memory_path='',
+ cgroup_cpu_path='',
+ memory_allocated_path=str(dir_5),
+ cgroup_version=2,
+ max_rss=0)
+
+ # The function should return 0 if it cannot find a memory.max file with
+ # a value
+ assert parse_memory_allocated(bad_process_object_v2_2) == 0
+
+ # Add a memory.max file with a value to the top level directory
+ # and check it is read
+ mem_file_1.write_text("99999")
+ assert parse_memory_allocated(bad_process_object_v2_2) == 99999
+
+
+def test_get_cgroup_name_file_not_found(mocker):
+ """It should raise an error if the cgroup file is not found."""
+ def mock_os_pid():
+ return 'The Thing That Should Not Be'
+
+ mocker.patch("os.getpid", mock_os_pid)
+ with pytest.raises(CylcProfilerError) as excinfo:
+ get_cgroup_name()
+ assert "/cgroup not found" in str(excinfo.value)
+
+
+def test_get_cgroup_version(mocker):
+ """It should return the cgroup version of the process."""
+ # Mock the Path.exists function call to return True
+ mocker.patch("pathlib.Path.exists", return_value=True)
+ assert get_cgroup_version('stuff/in/place',
+ 'more_stuff') == 2
+
+ with mock.patch('pathlib.Path.exists', side_effect=[False, True]):
+ assert get_cgroup_version('stuff/in/place',
+ 'more_stuff') == 1
+
+ # Mock the Path.exists function call to return False
+ mocker.patch("pathlib.Path.exists", return_value=False)
+ with pytest.raises(CylcProfilerError) as excinfo:
+ get_cgroup_version('stuff/in/other/place',
+ 'things')
+ assert "Cgroup not found" in str(excinfo.value)
+
+
+def test_get_cgroup_paths(mocker):
+ """It should return the cgroup paths of the process."""
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_name",
+ return_value='test_name')
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version",
+ return_value=2)
+ process = get_cgroup_paths("test_location/")
+ assert process.cgroup_memory_path == "test_location/test_name/memory.stat"
+ assert process.cgroup_cpu_path == "test_location/test_name/cpu.stat"
+
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version",
+ return_value=1)
+
+ process = get_cgroup_paths("test_location/")
+ assert (process.cgroup_memory_path ==
+ "test_location/memory/test_name/memory.stat")
+ assert (process.cgroup_cpu_path ==
+ "test_location/cpu/test_name/cpuacct.usage")
+
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version",
+ return_value=3)
+ with pytest.raises(CylcProfilerError) as excinfo:
+ get_cgroup_paths("test_location/")
+ assert "Unable to determine cgroup version" in str(excinfo.value)
+
+
+async def test_profile_data(mocker):
+ """Test the profile function with mocked data to ensure it calls the parse
+ functions and handles the data correctly."""
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_name",
+ return_value='test_name')
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_version",
+ return_value=2)
+ process = get_cgroup_paths("test_location/")
+
+ mock_file = mocker.mock_open(read_data="")
+ mocker.patch("builtins.open", mock_file)
+ mocker.patch("cylc.flow.scripts.profiler.parse_memory_file",
+ side_effect=[100, 200, 150])
+ mocker.patch("cylc.flow.scripts.profiler.parse_cpu_file",
+ return_value=2048)
+
+ # The profile function runs until the callable returns False.
+ run_count = 0
+
+ def run_four_times():
+ nonlocal run_count
+ run_count += 1
+ return run_count < 4
+
+ await profile(process, 0.1, run_four_times)
+
+ # It should have been called 4 times before run_once returned False
+ assert run_count == 4
+ # The max_rss should be the highest value from parse_memory_file
+ assert process.max_rss == 200
+
+
+@pytest.fixture
+def options(mocker):
+ opts = mocker.Mock()
+ opts.cgroup_location = "/fake/path"
+ opts.cgroup_memory_path = "/another/fake/path"
+ opts.comms_timeout = 10
+ opts.delay = 1
+ return opts
+
+
+async def test_main(mocker, options):
+ """It should run the profiler and watch and kill functions concurrently."""
+ # Mock Cylc env vars
+ os.environ['CYLC_WORKFLOW_ID'] = "Exit Light"
+ os.environ['CYLC_TASK_JOB'] = "Enter Night"
+
+ # Mock the gets and parse functions to return something sensible
+ # without needing actual files
+ mocker.patch("cylc.flow.scripts.profiler.get_cgroup_paths",
+ return_value=Process(
+ cgroup_memory_path="/some/place/memory.stat",
+ cgroup_cpu_path="/some/place/cpu.stat",
+ memory_allocated_path="/some/place",
+ cgroup_version=2,
+ max_rss=0,))
+ mocker.patch("cylc.flow.scripts.profiler.parse_memory_file",
+ return_value=1234)
+ mocker.patch("cylc.flow.scripts.profiler.parse_cpu_file",
+ return_value=5678)
+ mocker.patch("cylc.flow.scripts.profiler.parse_memory_allocated",
+ return_value=90)
+
+ mock_signal = mocker.patch("cylc.flow.scripts.profiler.signal.signal")
+ mock_profile = mocker.patch("cylc.flow.scripts.profiler.profile")
+ mock_watch_and_kill = mocker.patch(
+ "cylc.flow.scripts.profiler.watch_and_kill"
+ )
+
+ await _main(options)
+
+ # Make sure the 3 types of kill signal are registered.
+ assert mock_signal.call_count == 3
+
+ # Ensure the profiler and watch and kill functions are called by
+ # asyncio.gather
+ mock_profile.assert_called_once()
+ mock_watch_and_kill.assert_called_once()
diff --git a/tests/unit/test_job_file.py b/tests/unit/test_job_file.py
index 74d97e6c2f7..08898b57361 100644
--- a/tests/unit/test_job_file.py
+++ b/tests/unit/test_job_file.py
@@ -405,11 +405,21 @@ def test_write_task_environment():
'CYLC_TASK_NAMESPACE_HIERARCHY="baa moo"\n export '
'CYLC_TASK_TRY_NUMBER=1\n export '
'CYLC_TASK_FLOW_NUMBERS=1\n export '
+ 'CYLC_PROFILER=true\n export '
+ 'CYLC_PROFILER_CGROUPS=exit_light\n export '
+ 'CYLC_PROFILER_POLL_INTERVAL=1\n export '
'CYLC_TASK_PARAM_duck="quack"\n export '
'CYLC_TASK_PARAM_mouse="squeak"\n '
'CYLC_TASK_WORK_DIR_BASE=\'farm_noises/work_d\'\n}')
job_conf = {
- "platform": {'communication method': 'ssh'},
+ "platform": {
+ 'communication method': 'ssh',
+ 'profiler': {
+ "activate": "true",
+ "cgroups path": 'exit_light',
+ "polling interval": 1
+ }
+ },
"job_d": "1/moo/01",
"namespace_hierarchy": ["baa", "moo"],
"dependencies": ['moo', 'neigh', 'quack'],
diff --git a/tests/unit/test_task_message.py b/tests/unit/test_task_message.py
index 6894631a1ac..1a6e50e222d 100644
--- a/tests/unit/test_task_message.py
+++ b/tests/unit/test_task_message.py
@@ -21,7 +21,7 @@
from cylc.flow.task_message import send_messages
-def test_send_messages_err(
+async def test_send_messages_err(
monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture
):
"""If an error occurs while initializing the client, it should be printed.
@@ -32,7 +32,7 @@ def mock_get_client(*a, **k):
raise gaierror(-2, exc_msg)
monkeypatch.setattr('cylc.flow.task_message.get_client', mock_get_client)
- send_messages(
+ await send_messages(
'arasaka', '1/v/01', [['INFO', 'silverhand']], '2077-01-01T00:00:00Z'
)
assert f"gaierror: [Errno -2] {exc_msg}" in capsys.readouterr().err