diff --git a/pyproject.toml b/pyproject.toml index bf3f52d..3bb7d24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,11 +29,12 @@ dependencies = [ "tables >= 3.7", "tqdm", "h5py", - "click" + "click", + "tabulate" ] [project.scripts] -topsim = "topsim.cli:cli" +topsim = "topsim.runtime.commands:cli" [project.urls] Homepage = "https://github.com/top-sim/topsim" diff --git a/test/simulation_data/heft_single_observation_simulation.json b/test/simulation_data/heft_single_observation_simulation.json index 3601245..394e9c9 100644 --- a/test/simulation_data/heft_single_observation_simulation.json +++ b/test/simulation_data/heft_single_observation_simulation.json @@ -1,57 +1,59 @@ { - "instrument": { - "telescope": { - "total_arrays": 36, - "max_ingest_resources": 1, - "pipelines": { - "emu": { - "workflow": "test/data/config/workflow_config_minutes_longtask.json", - "ingest_demand": 1 - } - }, - "observations": [ - { - "name": "emu", - "start": 1, - "duration": 10, - "instrument_demand": 36, - "data_product_rate": 0.03333333333333333 - } - ] - } - }, - "cluster": { - "header": { - }, - "system": { - "resources": { - "cat0_m0": { - "flops": 7000.0, - "compute_bandwidth": 1.0 - }, - "cat1_m1": { - "flops": 6000.0, - "compute_bandwidth": 1.0 - }, - "cat2_m2": { - "flops": 11000.0, - "compute_bandwidth": 1.0 - } - }, - "system_bandwidth": 1.0 - } - }, - "buffer": { - "hot": { - "capacity": 500, - "max_ingest_rate": 0.08333333333333333 - }, - "cold": { - "capacity": 250, - "max_data_rate": 0.03333333333333333 - } - }, - "planning": "heft", - "scheduling": "fifo", - "timestep": "minutes" -} + "instrument": { + "telescope": { + "total_arrays": 36, + "max_ingest_resources": 1, + "pipelines": { + "emu": { + "workflow": "test/data/config/workflow_config_minutes_longtask.json", + "ingest_demand": 1 + } + }, + "observations": [ + { + "name": "emu", + "start": 1, + "duration": 10, + "instrument_demand": 36, + "data_product_rate": 0.03333333333333333 + } + ] + } + }, + "cluster": { + "header": {}, + "system": { + "resources": { + "cat0": { + "compute_bandwidth": 1.0, + "flops": 7000.0, + "count": 1 + }, + "cat1": { + "compute_bandwidth": 1.0, + "flops": 6000.0, + "count": 1 + }, + "cat2": { + "compute_bandwidth": 1.0, + "flops": 11000.0, + "count": 1 + } + }, + "system_bandwidth": 1.0 + } + }, + "buffer": { + "hot": { + "capacity": 500, + "max_ingest_rate": 0.08333333333333333 + }, + "cold": { + "capacity": 250, + "max_data_rate": 0.03333333333333333 + } + }, + "planning": "heft", + "scheduling": "fifo", + "timestep": "minutes" +} \ No newline at end of file diff --git a/test/test_runtime.py b/test/test_runtime.py new file mode 100644 index 0000000..779af77 --- /dev/null +++ b/test/test_runtime.py @@ -0,0 +1,69 @@ +# Copyright (C) 2025 RW Bunney + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Test runtime.commands and runtime.parser +""" + +import unittest + +from topsim.runtime.parser import import_class, parse_experiment +from topsim.user.schedule.batch_allocation import BatchProcessing +from topsim.user.schedule.dynamic_plan import DynamicSchedulingFromPlan +from topsim.user.plan.batch_planning import BatchPlanning +from topsim.user.plan.static_planning import SHADOWPlanning + +class TestImportClass(unittest.TestCase): + + def test_import_class(self): + """ + Confirm we import classes correctly + """ + class_exists = "topsim.user.schedule.batch_allocation.BatchProcessing" + cls = import_class(class_exists) + self.assertEqual(BatchProcessing, cls) + + def test_import_class_doesnt_exist(self): + + class_fake = "topsim.user.schedule.fake_allocation.TestFake" + self.assertRaises(ModuleNotFoundError, import_class, class_fake) + + def test_import_module_node_class(self): + + module = "topsim.user.schedule.batch_allocation" + self.assertRaises(RuntimeError, import_class, module) + +class TestExperimentParser(unittest.TestCase): + + planning = ["topsim.user.plan.batch_planning.BatchPlanning", + "topsim.user.plan.static_planning.SHADOWPlanning"] + + scheduling = ["topsim.user.schedule.batch_allocation.BatchProcessing", + "topsim.user.schedule.dynamic_plan.DynamicSchedulingFromPlan"] + + data = ["NoData", "TaskEdgeData", "EdgeData"] + + def test_parse_experiment(self): + + allocator_combinations, data_combinations = parse_experiment(self.planning, + self.scheduling, + self.data) + exp_alloc_comb = [(BatchPlanning, BatchProcessing), + (SHADOWPlanning, DynamicSchedulingFromPlan)] + exp_data_comb = [ {'use_task_data': False, 'use_edge_data': False}, + {'use_task_data': True, 'use_edge_data': True}, + {'use_task_data': False, 'use_edge_data': True}] + self.assertListEqual(exp_alloc_comb, allocator_combinations) + self.assertListEqual(exp_data_comb, data_combinations) \ No newline at end of file diff --git a/test/test_simulation.py b/test/test_simulation.py index 5807abe..76f2ec8 100644 --- a/test/test_simulation.py +++ b/test/test_simulation.py @@ -50,12 +50,6 @@ def setUp(self) -> None: self.env = simpy.Environment() self.output = f'test/data/output/hdf5.h5' - def tearDown(self): - output = f'test/data/output/hdf5.h5' - os.remove(output) - # os.remove(f'{output}') - # os.remove(f'{output}-tasks.pkl') - def test_simulation_produces_file(self): simulation = Simulation( self.env, @@ -74,10 +68,19 @@ def test_simulation_produces_file(self): store = pd.HDFStore(self.output) store.close() + os.remove(self.output) - # store[f'{s}/standard_simulation/sim'] - - # def + def test_simulation_nofile_exception(self): + self.assertRaises(ValueError, Simulation, + self.env, + CONFIG, + Telescope, + planning_model=SHADOWPlanning('heft'), + scheduling=DynamicSchedulingFromPlan(), + delay=None, + timestamp=0, + to_file=True, + ) class TestSimulationBatchProcessing(unittest.TestCase): diff --git a/topsim/cli.py b/topsim/cli.py deleted file mode 100644 index 505f5c2..0000000 --- a/topsim/cli.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (C) 2025 RW Bunney - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -Command-line interface for the TopSim project -""" -import click -import sys - -from importlib.metadata import version as vs - - -@click.group() -def cli(): - """ - Command-line interface for the TOpSim simulation environment. - - This tool is used to provide an interface with Experiments so that you do not have to - write scripts to do so using the topsim.core library. - """ - -@cli.command() -# @click.option("--module", default='user.telescope') -def version(module=''): - """ - Print the current version of TOpSim - """ - click.echo(f"TOpSim: {vs('topsim')}") # using the {module} module.") - - -if __name__ == '__main__': - cli() \ No newline at end of file diff --git a/topsim/core/scheduler.py b/topsim/core/scheduler.py index 707d2b6..c7daa81 100644 --- a/topsim/core/scheduler.py +++ b/topsim/core/scheduler.py @@ -272,7 +272,7 @@ def allocate_tasks(self, observation): task_pool = set() _total_tasks = 0 # len(current_plan.tasks) _curr_tasks = 0 # len(current_plan.tasks) - _tqdm = True + _tqdm = False pbar_setup = False pbar = None _event_added = False diff --git a/topsim/core/simulation.py b/topsim/core/simulation.py index aad0790..b67b373 100644 --- a/topsim/core/simulation.py +++ b/topsim/core/simulation.py @@ -16,7 +16,7 @@ from topsim.core.delay import DelayModel LOGGER = logging.getLogger(__name__) - +HEARTBEAT_INT_SECONDS = 10*60 # Seconds class Simulation: """ @@ -193,6 +193,8 @@ def __init__( self._hdf5_store.close() except Exception as e: LOGGER.error('%s', e) + self._hdf5_store = None + elif self.to_file and hdf5_path is None: raise ValueError( 'Attempted to initialise Simulation object that outputs' @@ -257,12 +259,21 @@ def start(self, runtime=-1): self.env.process(self.scheduler.run()) self.env.process(self.buffer.run()) + heartbeat = time.monotonic() # use monotonic to avoid going backward + if runtime > 0: self.env.run(until=runtime) else: + while not self.is_finished(): self.env.run(self.env.now + 1) - # self.env.run(self.env.now + 1) + + now = time.monotonic() + if now - heartbeat > HEARTBEAT_INT_SECONDS: + wall_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + LOGGER.info("Wall time: %s / sim_time: %d", wall_time, self.env.now) + + heartbeat = now # self.env.run(self.env.now + 1) LOGGER.info("Simulation Finished @ %s", self.env.now) self.monitor.collate_events() diff --git a/topsim/runtime/__init__.py b/topsim/runtime/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/topsim/runtime/commands.py b/topsim/runtime/commands.py new file mode 100644 index 0000000..bd2a2c1 --- /dev/null +++ b/topsim/runtime/commands.py @@ -0,0 +1,150 @@ +# Copyright (C) 2025 RW Bunney + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Command-line interface for the TopSim project +""" + +import click +import sys + +import enum + +from importlib.metadata import version as vs + +from topsim.runtime.parser import parse_experiment, DataUse + +from topsim.utils.experiment import Experiment + + +# See topsim.utils.experiment.Experiment kwargs +PARAMS_DATA_COMBINATIONS = "data_combinations" +PARAMS_ALLOCATION_COMBINATIONS = "alloc_combinations" +PARAMS_SIMULATION_CONFIG = "configuration" +PARAMS_OUTPUT_DIR = "output" +PARMS_SCHEDULER_ARGUMENTS = "sched_args" + +@click.group() +def cli(): + """ + Command-line interface for the TOpSim simulation environment. + + This tool is used to provide an interface with Experiments so that you do not have to + write scripts to do so using the topsim.core library. + """ + + +@cli.command() +# @click.option("--module", default='user.telescope') +def version(module=''): + """ + Print the current version of TOpSim + """ + click.echo(f"TOpSim: {vs('topsim')}") # using the {module} module.") + + +pass_params = click.make_pass_decorator(dict, ensure=True) + +@cli.group(chain=True) +@pass_params +@click.option("-i", + "--input", + "input_config", + required=True, + help="Input configuration file for experiment.") +@click.option( + "-p", + "--planning", + "planning", + multiple=True, + default=["topsim.user.plan.batch_planning.BatchPlanning"], + help="The planning algorithm used to generate the WorkflowPlan.", +) +@click.option( + "-s", + "--scheduling", + "scheduling", + multiple=True, + default=["topsim.user.schedule.batch_allocation.BatchProcessing"], + help="The scheduling algorithm used to generate the WorkflowPlan.", +) +@click.option( + "-d", + "--data_usage", + "data", + type=click.Choice([e.value for e in DataUse], case_sensitive=False), + help="The data usage option for calculating runtime duration", + # default=[str(DataUse.EdgeData)], + multiple=True, + required=True, +) +@click.option( + "-o", + "--output_dir", + "output_dir", + type=click.Path(), + help="The output directory for where the results are stored", + required=True +) +def experiment(params, input_config, planning, scheduling, data: DataUse, + output_dir): + + pac, pdc = parse_experiment(planning, scheduling, data) + + params[PARAMS_ALLOCATION_COMBINATIONS] = pac + params[PARAMS_DATA_COMBINATIONS] = pdc + params[PARAMS_SIMULATION_CONFIG] = input_config + params[PARAMS_OUTPUT_DIR] = output_dir + + +@experiment.command() +@pass_params +@click.option( + "--ignore_ingest", + "ignore_ingest", + default=False, +) +@click.option( + "--use_workflow_dop", + "use_workflow_dop", + default=True +) +def scheduler_options(params, **kwargs): + """ + Add runtime options for scheduling heuristics + """ + params[PARMS_SCHEDULER_ARGUMENTS] = kwargs + +@experiment.command() +@pass_params +def describe(params): + e = Experiment(**params) + e.describe() + +@experiment.command() +@pass_params +def run(params): + """ + Run the experiment + + """ + click.echo(f"Running experiment with {params}") + e = Experiment( + **params + ) + e.run() + +if __name__ == '__main__': + experiment() \ No newline at end of file diff --git a/topsim/runtime/parser.py b/topsim/runtime/parser.py new file mode 100644 index 0000000..e80ad1a --- /dev/null +++ b/topsim/runtime/parser.py @@ -0,0 +1,88 @@ +# Copyright (C) 2025 RW Bunney + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Parsing logic to support the runtime.cli module. + +This is to improve the 'cleanliness' of that module, as it requires lots +of preambles for each of ther grouped commands that have 'options'. +""" +import enum +import inspect + +from importlib import import_module + +class DataUse(enum.Enum): + NoData = "NoData" + TaskData = "TaskData" + EdgeData = "EdgeData" + TaskEdgeData = "TaskEdgeData" + + def __str__(self): + return self.value + +DATA_USE_MAP = { + str(DataUse.NoData): {'use_task_data': False, 'use_edge_data': False}, + str(DataUse.TaskData): {'use_task_data': True, 'use_edge_data': False}, + str(DataUse.EdgeData): {'use_task_data': False, 'use_edge_data': True}, + str(DataUse.TaskEdgeData): {'use_task_data': True, 'use_edge_data': True} +} + +def import_class(module_path: str): + """ + Dynamically import a class from a module path string. + """ + module_name, class_name = module_path.rsplit(".", maxsplit=1) + module = import_module(module_name) + cls = getattr(module, class_name) + if inspect.isclass(cls): + return cls + else: + raise RuntimeError(f"You did not pass a class {cls}, cannot instantialise object!, cls") + + +def parse_experiment(planning, scheduling, data): + """ + Build combinations of planning, scheduling, and data experiments. + + Parameters + ---------- + planning: list, planning modules + scheduling: list, scheduling modules + data: + + Returns + ------- + allocator_combinations, data_combinations + """ + + plan_modules = [] + for p in planning: + plan_modules.append(import_class(p)) + + scheduling_modules = [] + for s in scheduling: + scheduling_modules.append(import_class(s)) + + allocator_combinations = [] + for i in range(min(len(plan_modules), len(scheduling_modules))): + allocator_combinations.append((plan_modules[i], + scheduling_modules[i])) + + data_combinations = [] + for d in data: + data_combinations.append(DATA_USE_MAP[d]) + + return allocator_combinations, data_combinations \ No newline at end of file diff --git a/topsim/user/plan/batch_planning.py b/topsim/user/plan/batch_planning.py index 7188bfd..7bd23a2 100644 --- a/topsim/user/plan/batch_planning.py +++ b/topsim/user/plan/batch_planning.py @@ -47,7 +47,7 @@ class BatchPlanning(Planning): """ - def __init__(self, algorithm, delay_model=None): + def __init__(self, algorithm="batch", delay_model=None): super().__init__(algorithm, delay_model) def __str__(self): @@ -71,7 +71,7 @@ def generate_plan(self, clock, cluster, buffer, observation, max_ingest, task_da """ plan = None - if self.algorithm is 'batch': + if self.algorithm == 'batch': graph = _workflow_to_nx(observation.workflow) est = clock # self._calc_workflow_est(observation, buffer) # new_graph = nx.DiGraph() diff --git a/topsim/user/plan/static_planning.py b/topsim/user/plan/static_planning.py index e98e586..89e4f00 100644 --- a/topsim/user/plan/static_planning.py +++ b/topsim/user/plan/static_planning.py @@ -37,13 +37,9 @@ class SHADOWPlanning(Planning): delay_model """ - def __init__(self, algorithm, delay_model=None): + def __init__(self, algorithm='heft', delay_model=None): super().__init__(algorithm, delay_model) - # self.observation = observation - # self.algorithm = algorithm - # self.buffer = buffer - # self.delay_model = delay_model def __str__(self): return 'SHADOWPlanning' diff --git a/topsim/utils/experiment.py b/topsim/utils/experiment.py index a827c73..76ae9a7 100644 --- a/topsim/utils/experiment.py +++ b/topsim/utils/experiment.py @@ -25,10 +25,12 @@ import itertools import logging +import pandas as pd import simpy from datetime import date from pathlib import Path + logging.basicConfig(level="INFO") LOGGER = logging.getLogger(__name__) @@ -37,11 +39,10 @@ # User defined models from topsim.user.telescope import Telescope # Instrument -from topsim.user.schedule.batch_allocation import BatchProcessing -from topsim.user.plan.batch_planning import BatchPlanning # Planning -from topsim.user.plan.static_planning import SHADOWPlanning -from topsim.user.schedule.dynamic_plan import DynamicSchedulingFromPlan +pretty_print_map = { + "configuration": "Simulation Configuration", +} class Experiment: """ @@ -56,21 +57,36 @@ class Experiment: def __init__( self, - configuration: list = None, + configuration: str = None, alloc_combinations: list[tuple] = None, - data_combinations: list[tuple] = None, + data_combinations: list[dict] = None, output=None, - delay: bool = False, + delay=None, **kwargs): - self._configurations = configuration + self._configuration = configuration self._combinations = list(itertools.product(alloc_combinations, data_combinations)) self._delay = delay self._output = Path(output) self._sims = [] - self.sched_args = kwargs['sched_args'] + self.sched_args = kwargs.get('sched_args', {}) self._batch = kwargs['slurm'] if 'batch' in kwargs else False + self._experiments = { + "Planning model":[], + "Scheduling model":[], + "Use task data":[], + "Use edge data":[]} + + self._built_simulations = False + + def _update_experiments(self, planning, scheduling, use_task_data, use_edge_data): + # self._experiments['Simulation configuration'].append(self._configuration) + self._experiments['Planning model'].append(planning) + self._experiments['Scheduling model'].append(scheduling) + self._experiments['Use task data'].append(use_task_data) + self._experiments['Use edge data'].append(use_edge_data) + # self._experiments['Output directory'].append(self.output) def _build_simulations(self): if not self._output.exists(): @@ -78,60 +94,30 @@ def _build_simulations(self): self._output.mkdir(parents=True) except OSError as e: LOGGER.critical("Failed to make output directory: %s", e) - for c in self._configurations: - for combination in self._combinations: - ac, dc = combination - plan, sched = ac - use_task_data, use_edge_data = dc - if plan == "batch": - plan = BatchPlanning("batch") - elif plan == "static": - plan = SHADOWPlanning("heft") - else: - raise RuntimeError("Planning '%s' is not supported", plan) - - if sched == "dynamic_plan": - sched = DynamicSchedulingFromPlan(**self.sched_args) - else: - sched = BatchProcessing(**self.sched_args) - env = simpy.Environment() - instrument = Telescope - result_path_hash = _generate_truncated_hash(c, hash_length=6) - yield Simulation(env=env, config=c, instrument=instrument, - planning_model=plan, scheduling=sched, delay=self._delay, timestamp=None, - to_file=True, - hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{result_path_hash}.h5", - use_task_data=use_task_data, use_edge_data=use_edge_data) - - def _run_batch(self): - """ - Batch experiments are single-run experiments, which means we don't run combinations - """ - c= self._configurations[0] - plan, sched = self._combinations[0] - if plan == "batch": - plan = BatchPlanning("batch") - elif plan == "static": - plan = SHADOWPlanning("heft") - else: - raise RuntimeError("Planning '%s' is not supported", plan) + for combination in self._combinations: + ac, dc = combination + plan, sched = ac + # TODO introduce delay models to experiment construction + planning_model = plan() + scheduling_model = sched(**self.sched_args) - if sched == "dynamic_plan": - sched = DynamicSchedulingFromPlan(**self.sched_args) - else: - sched = BatchProcessing(**self.sched_args) - env = simpy.Environment() - instrument = Telescope - result_path_hash = _generate_truncated_hash(c, hash_length=6) - yield Simulation(env=env, config=c, instrument=instrument, - planning_model=plan, scheduling=sched, delay=self._delay, timestamp=None, - to_file=True, - hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{result_path_hash}.h5") + use_task_data = dc.get("use_task_data", False) + use_edge_data = dc.get("use_edge_data", True) + + self._update_experiments(planning_model, scheduling_model, + use_task_data, use_edge_data) + env = simpy.Environment() + instrument = Telescope + result_path_hash = _generate_truncated_hash(self._configuration, hash_length=6) + yield Simulation(env=env, config=self._configuration, instrument=instrument, + planning_model=planning_model, scheduling=scheduling_model, delay=self._delay, timestamp=None, + to_file=True, + hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{result_path_hash}.h5", + use_task_data=use_task_data, use_edge_data=use_edge_data) + self._built_simulations = True - def _review_experiment_combinations(self): - pass def run(self, review=False, threading=False): """ @@ -166,7 +152,7 @@ def run(self, review=False, threading=False): i = 0 for s in self._build_simulations(): LOGGER.info("Simulation %s/%s running...", - i+1, len(self._combinations) * len(self._configurations)) + i + 1, len(self._combinations)) LOGGER.info("Simulation is using %s to plan and %s to schedule", s.planner.model.algorithm, s.scheduler.algorithm) print(s.planner.model.algorithm, s.scheduler.algorithm) @@ -183,6 +169,19 @@ def run(self, review=False, threading=False): LOGGER.info("Runtime: %s.", ft - st) LOGGER.info("Experiment complete.") + def describe(self): + """ + Produce a tabulated list of the experiments that are to be run within this experiment. + + Used for checking prior to running a suit of experiments. + """ + for s in self._build_simulations(): + continue + print("Experiment combinations\n") + print(f"Input: {self._configuration}\nOutput dir: {self._output}\n\n") + df = pd.DataFrame(self._experiments) + print(df.to_markdown()) + def _generate_truncated_hash(path: Path, hash_length: int ) -> str: """ Generate a truncated string hash of the pathname. This is to balance