From fa89e9d3559503f20e0e54669279b011bd71982a Mon Sep 17 00:00:00 2001 From: Alok Kamatar Date: Wed, 2 Apr 2025 14:53:59 -0500 Subject: [PATCH 1/2] Adding utilization to parallel jobs --- platforms/energy_platform_homogeneous.xml | 8 +-- src/profiles.cpp | 31 ++++++++ src/profiles.hpp | 3 + src/task_execution.cpp | 87 +++++++++++++++++++++++ test/conftest.py | 10 ++- test/test_parallel_usage.py | 49 +++++++++++++ workloads/test_parallel_usage.json | 37 ++++++++++ 7 files changed, 219 insertions(+), 6 deletions(-) create mode 100644 test/test_parallel_usage.py create mode 100644 workloads/test_parallel_usage.json diff --git a/platforms/energy_platform_homogeneous.xml b/platforms/energy_platform_homogeneous.xml index c7961d6c..da61efe8 100644 --- a/platforms/energy_platform_homogeneous.xml +++ b/platforms/energy_platform_homogeneous.xml @@ -11,25 +11,25 @@ When switching from a computing state to the state 1, passing by the virtual pstate 2 is mandatory to simulate the time and energy consumed by the switch off. When switching from the state 1 to a computing state, passing by the virtual pstate 3 is mandatory to simulate the time and energy consumed by the switch on. --> - + - + - + - + diff --git a/src/profiles.cpp b/src/profiles.cpp index d53f251f..3a67c887 100644 --- a/src/profiles.cpp +++ b/src/profiles.cpp @@ -374,6 +374,17 @@ ProfilePtr Profile::from_json(const std::string & profile_name, "elements must be non-negative", error_prefix.c_str(), profile_name.c_str()); } + // get and check util. Default to full utilization if not exists + if (json_desc.HasMember("util")) { + xbt_assert(json_desc["util"].IsNumber(), "%s: profile '%s' has a non-number 'util' field", + error_prefix.c_str(), profile_name.c_str()); + data->util = json_desc["util"].GetDouble(); + xbt_assert(data->util > 0.0 && data->util <= 1.0 , "%s: profile '%s' has an invalid 'util' field (%g)", + error_prefix.c_str(), profile_name.c_str(), data->util); + } else { + data->util = 1.0; + } + profile->data = data; } else if (profile_type == "parallel_homogeneous") @@ -404,6 +415,16 @@ ProfilePtr Profile::from_json(const std::string & profile_name, xbt_assert(data->com >= 0, "%s: profile '%s' has a non-positive 'com' field (%g)", error_prefix.c_str(), profile_name.c_str(), data->com); + if (json_desc.HasMember("util")) { + xbt_assert(json_desc["util"].IsNumber(), "%s: profile '%s' has a non-number 'util' field", + error_prefix.c_str(), profile_name.c_str()); + data->util = json_desc["util"].GetDouble(); + xbt_assert(data->util > 0.0 && data->util <= 1.0 , "%s: profile '%s' has an invalid 'util' field (%g)", + error_prefix.c_str(), profile_name.c_str(), data->util); + } else { + data->util = 1.0; + } + profile->data = data; } else if (profile_type == "parallel_homogeneous_total") @@ -434,6 +455,16 @@ ProfilePtr Profile::from_json(const std::string & profile_name, xbt_assert(data->com >= 0, "%s: profile '%s' has a non-positive 'com' field (%g)", error_prefix.c_str(), profile_name.c_str(), data->com); + if (json_desc.HasMember("util")) { + xbt_assert(json_desc["util"].IsNumber(), "%s: profile '%s' has a non-number 'util' field", + error_prefix.c_str(), profile_name.c_str()); + data->util = json_desc["util"].GetDouble(); + xbt_assert(data->util > 0.0 && data->util <= 1.0 , "%s: profile '%s' has an invalid 'util' field (%g)", + error_prefix.c_str(), profile_name.c_str(), data->util); + } else { + data->util = 1.0; + } + profile->data = data; } else if (profile_type == "composed") diff --git a/src/profiles.hpp b/src/profiles.hpp index 1b44d0cd..fbed4964 100644 --- a/src/profiles.hpp +++ b/src/profiles.hpp @@ -102,6 +102,7 @@ struct ParallelProfileData unsigned int nb_res; //!< The number of resources double * cpu = nullptr; //!< The computation vector double * com = nullptr; //!< The communication matrix + double util; //!< The cpu utilization on each node }; /** @@ -111,6 +112,7 @@ struct ParallelHomogeneousProfileData { double cpu; //!< The computation amount on each node double com; //!< The communication amount between each pair of nodes + double util; //!< The cpu utilization on each node }; /** @@ -120,6 +122,7 @@ struct ParallelHomogeneousTotalAmountProfileData { double cpu; //!< The computation amount to spread over the nodes double com; //!< The communication amount to spread over each pair of nodes + double util; //!< The cpu utilization on each node }; /** * @brief The data associated to DELAY profiles diff --git a/src/task_execution.cpp b/src/task_execution.cpp index b5cb757d..8e4b352e 100644 --- a/src/task_execution.cpp +++ b/src/task_execution.cpp @@ -21,11 +21,13 @@ using namespace roles; * parallel task profile. Also set the prefix name of the task. * @param[out] computation_amount the computation matrix to be simulated by the parallel task * @param[out] communication_amount the communication matrix to be simulated by the parallel task + * @param[out] utilization the max CPU utilization to be simulated (as a decimal between 0 and 1) * @param[in] nb_res the number of resources the task have to run on * @param[in] profile_data the profile data */ void generate_parallel_task(std::vector& computation_amount, std::vector& communication_amount, + double& utilization, unsigned int nb_res, void * profile_data) { @@ -42,6 +44,7 @@ void generate_parallel_task(std::vector& computation_amount, // Retrieve the matrices from the profile memcpy(computation_amount.data(), data->cpu, sizeof(double) * nb_res); memcpy(communication_amount.data(), data->com, sizeof(double) * nb_res * nb_res); + utilization = data->util; } /** @@ -49,11 +52,13 @@ void generate_parallel_task(std::vector& computation_amount, * parallel homogeneous task profile. Also set the prefix name of the task. * @param[out] computation_amount the computation matrix to be simulated by the parallel task * @param[out] communication_amount the communication matrix to be simulated by the parallel task + * @param[out] utilization the max CPU utilization to be simulated (as a decimal between 0 and 1) * @param[in] nb_res the number of resources the task have to run on * @param[in] profile_data the profile data */ void generate_parallel_homogeneous(std::vector& computation_amount, std::vector& communication_amount, + double& utilization, unsigned int nb_res, void * profile_data) { @@ -61,6 +66,7 @@ void generate_parallel_homogeneous(std::vector& computation_amount, double cpu = data->cpu; double com = data->com; + utilization = data->util; // Prepare buffers computation_amount.reserve(nb_res); @@ -100,6 +106,7 @@ void generate_parallel_homogeneous(std::vector& computation_amount, * * @param[out] computation_amount the computation matrix to be simulated by the parallel task * @param[out] communication_amount the communication matrix to be simulated by the parallel task + * @param[out] utilization the max CPU utilization to be simulated (as a decimal between 0 and 1) * @param[in] nb_res the number of resources the task have to run on * @param[in] profile_data the profile data * @@ -109,6 +116,7 @@ void generate_parallel_homogeneous(std::vector& computation_amount, */ void generate_parallel_homogeneous_total_amount(std::vector& computation_amount, std::vector& communication_amount, + double& utilization, unsigned int nb_res, void * profile_data) { @@ -147,6 +155,8 @@ void generate_parallel_homogeneous_total_amount(std::vector& computation } } } + + utilization = data->util; } /** @@ -155,6 +165,7 @@ void generate_parallel_homogeneous_total_amount(std::vector& computation * * @param[out] computation_amount the computation matrix to be simulated by the parallel task * @param[out] communication_amount the communication matrix to be simulated by the parallel task + * @param[out] utilization the max CPU utilization to be simulated (as a decimal between 0 and 1) * @param[in,out] hosts_to_use the list of host to be used by the task * @param[in] storage_mapping mapping from label given in the profile and machine id * @param[in] profile_data the profile data @@ -165,6 +176,7 @@ void generate_parallel_homogeneous_total_amount(std::vector& computation */ void generate_parallel_homogeneous_with_pfs(std::vector& computation_amount, std::vector& communication_amount, + double& utilization, std::vector & hosts_to_use, const std::map * storage_mapping, void * profile_data, @@ -243,6 +255,8 @@ void generate_parallel_homogeneous_with_pfs(std::vector& computation_amo } } } + + utilization = 1.0; } /** @@ -253,6 +267,7 @@ void generate_parallel_homogeneous_with_pfs(std::vector& computation_amo * name of the task. * @param[out] computation_amount the computation matrix to be simulated by the parallel task * @param[out] communication_amount the communication matrix to be simulated by the parallel task + * @param[out] utilization the max CPU utilization to be simulated (as a decimal between 0 and 1) * @param[in,out] hosts_to_use the list of host to be used by the task * @param[in] storage_mapping mapping from label given in the profile and machine id * @param[in] profile_data the profile data @@ -260,6 +275,7 @@ void generate_parallel_homogeneous_with_pfs(std::vector& computation_amo */ void generate_data_staging_task(std::vector& computation_amount, std::vector& communication_amount, + double& utilization, std::vector & hosts_to_use, const std::map * storage_mapping, void * profile_data, @@ -330,12 +346,15 @@ void generate_data_staging_task(std::vector& computation_amount, } } } + + utilization = 1.0; } /** * @brief Debug print of a parallel task (via XBT_DEBUG) * @param[in] computation_vector The ptask computation vector * @param[in] communication_matrix The ptask communication matrix + * @param[in] utilization the max CPU utilization d * @param[in] nb_res The number of hosts involved in the parallel task * @param[in] alloc The resource ids allocated for the parallel task * @param[in] mapping The mapping between executor id and resource id, if any @@ -374,6 +393,7 @@ void debug_print_ptask(const std::vector& computation_vector, * @brief * @param[out] computation_vector The computation vector to be simulated by the parallel task * @param[out] communication_matrix The communication matrix to be simulated by the parallel task + * @param[out] utilization the max CPU utilization to be simulated (as a decimal between 0 and 1) * @param[in,out] hosts_to_use The list of host to be used by the task * @param[in] profile The profile to be converted to a compute/comm matrix * @param[in] storage_mapping The storage mapping @@ -381,6 +401,7 @@ void debug_print_ptask(const std::vector& computation_vector, */ void generate_matrices_from_profile(std::vector& computation_vector, std::vector& communication_matrix, + double& utilization, std::vector & hosts_to_use, ProfilePtr profile, const std::map * storage_mapping, @@ -396,24 +417,28 @@ void generate_matrices_from_profile(std::vector& computation_vector, case ProfileType::PARALLEL: generate_parallel_task(computation_vector, communication_matrix, + utilization, nb_res, profile->data); break; case ProfileType::PARALLEL_HOMOGENEOUS: generate_parallel_homogeneous(computation_vector, communication_matrix, + utilization, nb_res, profile->data); break; case ProfileType::PARALLEL_HOMOGENEOUS_TOTAL_AMOUNT: generate_parallel_homogeneous_total_amount(computation_vector, communication_matrix, + utilization, nb_res, profile->data); break; case ProfileType::PARALLEL_HOMOGENEOUS_PFS: generate_parallel_homogeneous_with_pfs(computation_vector, communication_matrix, + utilization, hosts_to_use, storage_mapping, profile->data, @@ -422,6 +447,7 @@ void generate_matrices_from_profile(std::vector& computation_vector, case ProfileType::DATA_STAGING: generate_data_staging_task(computation_vector, communication_matrix, + utilization, hosts_to_use, storage_mapping, profile->data, @@ -462,6 +488,60 @@ void check_ptask_execution_permission(const IntervalSet & alloc, } } +/** + * @brief + * @param[in, out] computation_vector The computation vector to be simulated by the parallel task + * @param[in, out] communication_matrix The communication matrix to be simulated by the parallel task + * @param[in,out] hosts_to_use The list of host to be used by the task + */ +void replicate_for_cores(std::vector& computation_vector, + std::vector& communication_matrix, + std::vector & hosts_to_use, + double usage) +{ + // compute how many cores should be used depending on usage and on which host is used + const double nb_cores = simgrid::s4u::this_actor::get_host()->get_core_count(); + const int nb_cores_to_use = std::max(round(usage * nb_cores), 1.0); // use at least 1 core, otherwise using flops is impossible + const int nb_hosts = hosts_to_use.size(); + + std::vector hosts_copy(hosts_to_use); + std::vector computation_copy(computation_vector); + std::vector communication_copy(communication_matrix); + hosts_to_use.reserve(nb_cores_to_use * nb_hosts); + computation_vector.reserve(nb_cores_to_use * nb_hosts); + + for (int i = 0; i < nb_cores_to_use - 1; ++i) { + for (auto host : hosts_copy) { + hosts_to_use.push_back(host); + } + } + + for (int i = 0; i < nb_cores_to_use - 1; ++i) { + for (auto computation : computation_copy) { + computation_vector.push_back(computation); + } + } + + if(!communication_matrix.empty()) { + communication_matrix.clear(); + communication_matrix.reserve(nb_cores_to_use * nb_hosts * nb_cores_to_use * nb_cores); + for(int row = 0; row < nb_hosts; row++) { + for(int col = 0; col < nb_hosts; col++) { + communication_matrix.push_back(communication_copy[(row * nb_hosts) + col]); + } + for(int col = nb_hosts; col < (nb_cores_to_use - 1) * nb_hosts; col++) { + communication_matrix.push_back(0); + } + } + + for(int row = nb_hosts; row < (nb_cores_to_use - 1) * nb_hosts; row++) { + for(int col = 0; col < nb_cores_to_use * nb_hosts; col++) { + communication_matrix.push_back(0); + } + } + } +} + int execute_parallel_task(BatTask * btask, const SchedulingAllocation* allocation, double * remaining_time, @@ -472,6 +552,7 @@ int execute_parallel_task(BatTask * btask, std::vector computation_vector; std::vector communication_matrix; + double utilization; string task_name = profile_type_to_string(profile->type) + '_' + static_cast(btask->parent_job)->id.to_string() + "_" + btask->profile->name; @@ -480,6 +561,7 @@ int execute_parallel_task(BatTask * btask, generate_matrices_from_profile(computation_vector, communication_matrix, + utilization, hosts_to_use, profile, & allocation->storage_mapping, @@ -500,6 +582,7 @@ int execute_parallel_task(BatTask * btask, std::vector io_hosts = allocation->io_hosts; generate_matrices_from_profile(io_computation_vector, io_communication_matrix, + utilization, io_hosts, io_profile, nullptr, @@ -640,6 +723,10 @@ int execute_parallel_task(BatTask * btask, simgrid::s4u::ExecPtr ptask = simgrid::s4u::this_actor::exec_init(hosts_to_use, computation_vector, communication_matrix); ptask->set_name(task_name.c_str()); + // double flops = ptask->get_host()->get_speed() * utilization; + // XBT_DEBUG("Setting utilization of '%s' on %zu resources to %g", task_name.c_str(), hosts_to_use.size(), flops); + // ptask->set_bound(flops); // Set bound can only be set per task not per host + // Keep track of the task to get information on kill btask->ptask = ptask; diff --git a/test/conftest.py b/test/conftest.py index 4196de79..a71bb63a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -62,6 +62,7 @@ def pytest_generate_tests(metafunc): "cluster512_pfs": "cluster512_pfs.xml", "energy128notopo": "energy_platform_homogeneous_no_net_128.xml", "energy128cluster": "cluster_energy_128.xml", + "energysmall": "energy_platform_homogeneous.xml", "properties_platform": "properties_example.xml", } energy_platforms = ["energy128notopo", "energy128cluster"] @@ -91,8 +92,9 @@ def pytest_generate_tests(metafunc): "usagetrace": "test_usage_trace.json", "walltime": "test_walltime.json", "walltimesmpi": "test_walltime_smpi.json", + "parallel_usage": "test_parallel_usage.json" } - one_job_workloads = ["delay1", "compute1", "computetot1"] + one_job_workloads = ["delay1", "compute1", "computetot1", "compute1util"] small_workloads = ["delays", "delaysequences", "mixed"] smpi_workloads = ["smpicomp1", "smpicomp2", "smpimapping", "smpimixed", "smpicollectives"] dynsub_workloads = ["delay1", "mixed"] @@ -148,10 +150,12 @@ def pytest_generate_tests(metafunc): metafunc.parametrize('properties_platform', generate_platforms(platform_dir, platforms_def, ['properties_platform'])) if 'usage_trace_platform' in metafunc.fixturenames: metafunc.parametrize('usage_trace_platform', generate_platforms(platform_dir, platforms_def, ['smallusage'])) + if 'energy_small_platform' in metafunc.fixturenames: + metafunc.parametrize('energy_small_platform', generate_platforms(platform_dir, platforms_def, ['energysmall'])) # Workloads if 'workload' in metafunc.fixturenames: - metafunc.parametrize('workload', generate_workloads(workload_dir, workloads_def, [key for key in workload_def])) + metafunc.parametrize('workload', generate_workloads(workload_dir, workloads_def, [key for key in workloads_def])) if 'workflow' in metafunc.fixturenames: metafunc.parametrize('workflow', generate_workloads(workload_dir, workloads_def, [key for key in workflows])) if 'tuto_stencil_workload' in metafunc.fixturenames: @@ -184,6 +188,8 @@ def pytest_generate_tests(metafunc): metafunc.parametrize('delaysequences_workload', generate_workloads(workload_dir, workloads_def, ['delaysequences'])) if 'mixed_workload' in metafunc.fixturenames: metafunc.parametrize('mixed_workload', generate_workloads(workload_dir, workloads_def, ['mixed'])) + if 'parallel_usage_workload' in metafunc.fixturenames: + metafunc.parametrize('parallel_usage_workload', generate_workloads(workload_dir, workloads_def, ["parallel_usage"])) # External Events if 'simple_events' in metafunc.fixturenames: diff --git a/test/test_parallel_usage.py b/test/test_parallel_usage.py new file mode 100644 index 00000000..f501a868 --- /dev/null +++ b/test/test_parallel_usage.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +'''Usage trace tests. + +These tests check that the energy consumption of usage trace profiles are the expected ones. +''' +import pandas as pd +import pytest +from helper import * + +widle = 30 +wmin = 30 +wmax = 90 + +def joule_prediction(time, wattmin, wattmax, usage): + return time*(wattmin + (wattmax-wattmin)*usage) + +def check_ok_bool(row): + if int(row['execution_time']) != int(row['expected_execution_time']): + return False + + if int(row['consumed_energy']) != int(row['expected_consumed_energy']): + return False + + return True + +def check_ok(row): + return int(check_ok_bool(row)) + +def usage_trace(platform, workload, algorithm): + test_name = f'paralleluage-{algorithm.name}-{platform.name}-{workload.name}' + output_dir, robin_filename, _ = init_instance(test_name) + + if algorithm.sched_implem != 'batsched': raise Exception('This test only supports batsched for now') + + batcmd = gen_batsim_cmd(platform.filename, workload.filename, output_dir, "--energy") + instance = RobinInstance(output_dir=output_dir, + batcmd=batcmd, + schedcmd=f"batsched -v '{algorithm.sched_algo_name}'", + simulation_timeout=30, ready_timeout=5, + success_timeout=10, failure_timeout=0 + ) + + instance.to_file(robin_filename) + ret = run_robin(robin_filename) + if ret.returncode != 0: raise Exception(f'Bad robin return code ({ret.returncode})') + + +def test_usage_trace(energy_small_platform, parallel_usage_workload, fcfs_algorithm): + usage_trace(energy_small_platform, parallel_usage_workload, fcfs_algorithm) diff --git a/workloads/test_parallel_usage.json b/workloads/test_parallel_usage.json new file mode 100644 index 00000000..d084245b --- /dev/null +++ b/workloads/test_parallel_usage.json @@ -0,0 +1,37 @@ +{ + "nb_res": 4, + "jobs": [ + {"id":1, "subtime":0, "walltime": 100, "res": 4, "profile": "low"}, + {"id":2, "subtime":0, "walltime": 100, "res": 4, "profile": "high"}, + + {"id":3, "subtime":0, "walltime": 100, "res": 4, "profile": "comm_bound_low"}, + {"id":4, "subtime":0, "walltime": 100, "res": 4, "profile": "comm_bound_high"} + ], + + "profiles": { + "low": { + "type": "parallel_homogeneous", + "cpu": 1e9, + "com": 1e5, + "util": 0.1 + }, + "high": { + "type": "parallel_homogeneous", + "cpu": 1e9, + "com": 1e5, + "util": 1.0 + }, + "comm_bound_low": { + "type": "parallel_homogeneous", + "cpu": 0, + "com": 1e5, + "util": 0.1 + }, + "comm_bound_high": { + "type": "parallel_homogeneous", + "cpu": 0, + "com": 1e5, + "util": 1.0 + } + } +} From 15b9808e7ec18188462bf63309d68964b6554ab5 Mon Sep 17 00:00:00 2001 From: Alok Kamatar Date: Thu, 3 Apr 2025 11:35:49 -0500 Subject: [PATCH 2/2] Adding cores back to task execution and making tests robust --- src/task_execution.cpp | 10 ++++++---- test/test_parallel_usage.py | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/task_execution.cpp b/src/task_execution.cpp index 8e4b352e..694dafb3 100644 --- a/src/task_execution.cpp +++ b/src/task_execution.cpp @@ -504,6 +504,8 @@ void replicate_for_cores(std::vector& computation_vector, const int nb_cores_to_use = std::max(round(usage * nb_cores), 1.0); // use at least 1 core, otherwise using flops is impossible const int nb_hosts = hosts_to_use.size(); + XBT_DEBUG("Replicating computation to use %d cores per machine", nb_cores_to_use); + std::vector hosts_copy(hosts_to_use); std::vector computation_copy(computation_vector); std::vector communication_copy(communication_matrix); @@ -524,17 +526,17 @@ void replicate_for_cores(std::vector& computation_vector, if(!communication_matrix.empty()) { communication_matrix.clear(); - communication_matrix.reserve(nb_cores_to_use * nb_hosts * nb_cores_to_use * nb_cores); + communication_matrix.reserve(nb_cores_to_use * nb_hosts * nb_cores_to_use * nb_hosts); for(int row = 0; row < nb_hosts; row++) { for(int col = 0; col < nb_hosts; col++) { communication_matrix.push_back(communication_copy[(row * nb_hosts) + col]); } - for(int col = nb_hosts; col < (nb_cores_to_use - 1) * nb_hosts; col++) { + for(int col = nb_hosts; col < nb_cores_to_use* nb_hosts; col++) { communication_matrix.push_back(0); } } - for(int row = nb_hosts; row < (nb_cores_to_use - 1) * nb_hosts; row++) { + for(int row = nb_hosts; row < nb_cores_to_use * nb_hosts; row++) { for(int col = 0; col < nb_cores_to_use * nb_hosts; col++) { communication_matrix.push_back(0); } @@ -719,7 +721,7 @@ int execute_parallel_task(BatTask * btask, // Create the parallel task XBT_DEBUG("Creating parallel task '%s' on %zu resources", task_name.c_str(), hosts_to_use.size()); - + replicate_for_cores(computation_vector, communication_matrix, hosts_to_use, utilization); simgrid::s4u::ExecPtr ptask = simgrid::s4u::this_actor::exec_init(hosts_to_use, computation_vector, communication_matrix); ptask->set_name(task_name.c_str()); diff --git a/test/test_parallel_usage.py b/test/test_parallel_usage.py index f501a868..50b04e6b 100644 --- a/test/test_parallel_usage.py +++ b/test/test_parallel_usage.py @@ -44,6 +44,27 @@ def usage_trace(platform, workload, algorithm): ret = run_robin(robin_filename) if ret.returncode != 0: raise Exception(f'Bad robin return code ({ret.returncode})') + batjobs_filename = f'{output_dir}/batres_jobs.csv' + jobs = pd.read_csv(batjobs_filename) + jobs['job_id'] = jobs['job_id'].astype('string') + jobs.sort_values(by=['job_id'], inplace=True) + + expected = [ + ['1', 10, 4 * joule_prediction(10, wmin, wmax, 0.1)], + ['2', 10, 4 * joule_prediction(10, wmin, wmax, 0.1)], + + ['3', 0.086, 4 * joule_prediction(10, wmin, wmax, 0)], + ['4', 0.086, 4 * joule_prediction(10, wmin, wmax, 0)] + ] + expected_df = pd.DataFrame(expected, columns = ['job_id', 'expected_execution_time', 'expected_consumed_energy']) + + merged = pd.merge(jobs, expected_df) + assert(len(merged) != len(jobs), "The number of jobs completed does not match the expected amount.") + + merged['valid'] = merged.apply(check_ok, axis=1) + print(merged[["job_id", "valid", "execution_time", "expected_execution_time", "consumed_energy", "expected_consumed_energy"]]) + assert(merged['valid'].sum() != len(merged), 'The execution of some jobs did not match this test expectations.') + def test_usage_trace(energy_small_platform, parallel_usage_workload, fcfs_algorithm): usage_trace(energy_small_platform, parallel_usage_workload, fcfs_algorithm)