diff --git a/codecarbon/core/cpu.py b/codecarbon/core/cpu.py index 74549a14f..ad74e4481 100644 --- a/codecarbon/core/cpu.py +++ b/codecarbon/core/cpu.py @@ -12,9 +12,16 @@ from typing import Dict, Optional, Tuple import pandas as pd -import psutil from rapidfuzz import fuzz, process, utils +try: + import psutil + + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + psutil = None + from codecarbon.core.rapl import RAPLFile from codecarbon.core.units import Time from codecarbon.core.util import count_cpus, detect_cpu_model @@ -207,6 +214,9 @@ def is_rapl_available(rapl_dir: Optional[str] = None) -> bool: def is_psutil_available(): + if not PSUTIL_AVAILABLE: + logger.debug("psutil module is not available.") + return False try: cpu_times = psutil.cpu_times() diff --git a/codecarbon/core/resource_tracker.py b/codecarbon/core/resource_tracker.py index 67786189d..326d8536d 100644 --- a/codecarbon/core/resource_tracker.py +++ b/codecarbon/core/resource_tracker.py @@ -127,16 +127,39 @@ def _setup_fallback_tracking(self, tdp, max_power): self.cpu_tracker = "TDP constant" model = tdp.model - if (max_power is None) and self.tracker._force_cpu_power: + if self.tracker._force_cpu_power is not None: user_input_power = self.tracker._force_cpu_power logger.debug(f"Using user input TDP: {user_input_power} W") self.cpu_tracker = "User Input TDP constant" - max_power = user_input_power + if max_power is None: + max_power = user_input_power logger.info(f"CPU Model on constant consumption mode: {model}") self.tracker._conf["cpu_model"] = model - if tdp: + # Check for forced constant mode first + if self.tracker._conf.get("force_mode_constant", False): + logger.info( + "Force constant mode requested - bypassing psutil and using constant CPU power" + ) + model = tdp.model + if max_power is None and self.tracker._force_cpu_power: + max_power = self.tracker._force_cpu_power + logger.debug(f"Using user input TDP for constant mode: {max_power} W") + self.cpu_tracker = "User Input TDP constant" + else: + self.cpu_tracker = "TDP constant" + logger.info(f"CPU Model on forced constant consumption mode: {model}") + self.tracker._conf["cpu_model"] = model + hardware_cpu = CPU.from_utils( + self.tracker._output_dir, "constant", model, max_power + ) + self.tracker._hardware.append(hardware_cpu) + return + + if self.tracker._conf.get("force_mode_cpu_load", False) and ( + tdp.tdp is not None or self.tracker._force_cpu_power is not None + ): if cpu.is_psutil_available(): logger.warning( "No CPU tracking mode found. Falling back on CPU load mode." @@ -156,7 +179,8 @@ def _setup_fallback_tracking(self, tdp, max_power): hardware_cpu = CPU.from_utils( self.tracker._output_dir, "constant", model, max_power ) - self.cpu_tracker = "global constant" + if max_power is None: + self.cpu_tracker = "global constant" self.tracker._hardware.append(hardware_cpu) else: if cpu.is_psutil_available(): @@ -172,11 +196,19 @@ def _setup_fallback_tracking(self, tdp, max_power): ) self.cpu_tracker = MODE_CPU_LOAD else: - logger.warning( - "Failed to match CPU TDP constant. Falling back on a global constant." - ) - self.cpu_tracker = "global constant" - hardware_cpu = CPU.from_utils(self.tracker._output_dir, "constant") + if max_power is None: + logger.warning( + "Failed to match CPU TDP constant. Falling back on a global constant." + ) + self.cpu_tracker = "global constant" + hardware_cpu = CPU.from_utils(self.tracker._output_dir, "constant") + else: + logger.warning( + "No CPU tracking mode found. Falling back on CPU constant mode." + ) + hardware_cpu = CPU.from_utils( + self.tracker._output_dir, "constant", model, max_power + ) self.tracker._hardware.append(hardware_cpu) def set_CPU_tracking(self): @@ -192,6 +224,24 @@ def set_CPU_tracking(self): self.cpu_tracker = "User Input TDP constant" max_power = self.tracker._force_cpu_power + # Force constant mode takes precedence over every other CPU tracking backend. + if self.tracker._conf.get("force_mode_constant", False): + if tdp is None: + tdp = cpu.TDP() + if max_power is None: + max_power = tdp.tdp * cpu_number if tdp.tdp is not None else None + + logger.info( + "Force constant mode requested - bypassing dynamic CPU power backends" + ) + self.cpu_tracker = "TDP constant" + self.tracker._conf["cpu_model"] = tdp.model + hardware_cpu = CPU.from_utils( + self.tracker._output_dir, "constant", tdp.model, max_power + ) + self.tracker._hardware.append(hardware_cpu) + return + # Try force CPU load mode if requested if self.tracker._conf.get("force_mode_cpu_load", False): if tdp is None: diff --git a/codecarbon/core/util.py b/codecarbon/core/util.py index 744b2e3e5..afa65e054 100644 --- a/codecarbon/core/util.py +++ b/codecarbon/core/util.py @@ -9,7 +9,14 @@ from typing import Optional, Union import cpuinfo -import psutil + +try: + import psutil + + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + psutil = None from codecarbon.external.logger import logger @@ -146,6 +153,12 @@ def _windows_get_physical_sockets(): def count_cpus() -> int: + if not PSUTIL_AVAILABLE: + logger.warning("psutil not available, using fallback CPU count detection") + # Fallback to using os.cpu_count() or physical CPU count + cpu_count = os.cpu_count() + return cpu_count if cpu_count is not None else 1 + if SLURM_JOB_ID is None: return psutil.cpu_count() diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index ad712c9f0..0336f6bee 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -197,6 +197,7 @@ def __init__( wue: Optional[float] = _sentinel, force_carbon_intensity_g_co2e_kwh: Optional[float] = _sentinel, force_mode_cpu_load: Optional[bool] = _sentinel, + force_mode_constant: Optional[bool] = _sentinel, allow_multiple_runs: Optional[bool] = _sentinel, rapl_include_dram: Optional[bool] = _sentinel, rapl_prefer_psys: Optional[bool] = _sentinel, @@ -275,6 +276,8 @@ def __init__( (CPU + chipset + PCIe). When False, uses package domains which are more reliable. Note: psys can report higher values than CPU TDP and may be unreliable on older systems. + :param force_mode_constant: Force the addition of a CPU in constant mode, bypassing psutil + :param allow_multiple_runs: Allow multiple instances of codecarbon running in parallel. Defaults to False. """ # logger.info("base tracker init") @@ -377,6 +380,7 @@ def __init__( self._set_from_conf(force_mode_cpu_load, "force_mode_cpu_load", False, bool) self._set_from_conf(rapl_include_dram, "rapl_include_dram", False, bool) self._set_from_conf(rapl_prefer_psys, "rapl_prefer_psys", False, bool) + self._set_from_conf(force_mode_constant, "force_mode_constant", False, bool) self._set_from_conf( experiment_id, "experiment_id", "5b0fa12a-3dd7-45bb-9766-cc326314d9f1" ) diff --git a/tests/test_force_constant_mode.py b/tests/test_force_constant_mode.py new file mode 100644 index 000000000..4b1ca8429 --- /dev/null +++ b/tests/test_force_constant_mode.py @@ -0,0 +1,146 @@ +import os +import tempfile +import time +import unittest +from unittest import mock + +import pandas as pd + +from codecarbon.emissions_tracker import ( + EmissionsTracker, + OfflineEmissionsTracker, +) + + +def light_computation(run_time_secs: int = 1): + end_time: float = ( + time.perf_counter() + run_time_secs + ) # Run for `run_time_secs` seconds + while time.perf_counter() < end_time: + pass + + +class TestForceConstantMode(unittest.TestCase): + def setUp(self) -> None: + self.project_name = "project_TestForceConstantMode" + self.emissions_file = "emissions-test-TestForceConstantMode.csv" + self.emissions_path = tempfile.gettempdir() + self.emissions_file_path = os.path.join( + self.emissions_path, self.emissions_file + ) + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + def tearDown(self) -> None: + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + def test_force_constant_mode_online(self): + """Test force_mode_constant parameter with online tracker""" + tracker = EmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file, + force_mode_constant=True, + ) + tracker.start() + light_computation(run_time_secs=1) + emissions = tracker.stop() + + # Check that emissions were calculated + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + + # Verify output file was created + self.verify_output_file(self.emissions_file_path) + + # Check CSV content shows constant mode + df = pd.read_csv(self.emissions_file_path) + # The cpu_power should be a constant value (not varying like in load mode) + self.assertGreater(df["cpu_power"].iloc[0], 0) + + def test_force_constant_mode_offline(self): + """Test force_mode_constant parameter with offline tracker""" + tracker = OfflineEmissionsTracker( + country_iso_code="USA", + output_dir=self.emissions_path, + output_file=self.emissions_file, + force_mode_constant=True, + ) + tracker.start() + light_computation(run_time_secs=1) + emissions = tracker.stop() + + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + self.verify_output_file(self.emissions_file_path) + + def test_force_constant_mode_with_custom_cpu_power(self): + """Test force_mode_constant with custom CPU power""" + custom_cpu_power = 200 # 200W + tracker = EmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file, + force_mode_constant=True, + force_cpu_power=custom_cpu_power, + ) + tracker.start() + light_computation(run_time_secs=1) + emissions = tracker.stop() + + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + + # Check that the custom CPU power was used + df = pd.read_csv(self.emissions_file_path) + # CPU power should be 50% of the TDP (constant mode assumption) + expected_cpu_power = custom_cpu_power / 2 + self.assertEqual(df["cpu_power"].iloc[0], expected_cpu_power) + + @mock.patch("codecarbon.core.cpu.PSUTIL_AVAILABLE", False) + @mock.patch("codecarbon.core.util.PSUTIL_AVAILABLE", False) + def test_force_constant_mode_without_psutil(self): + """Test that force_mode_constant works when psutil is not available""" + tracker = EmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file, + force_mode_constant=True, + ) + tracker.start() + light_computation(run_time_secs=1) + emissions = tracker.stop() + + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + self.verify_output_file(self.emissions_file_path) + + def test_force_constant_mode_takes_precedence_over_cpu_load(self): + """Test that force_mode_constant takes precedence over force_mode_cpu_load""" + tracker = EmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file, + force_mode_constant=True, + force_mode_cpu_load=True, # This should be ignored + ) + tracker.start() + light_computation(run_time_secs=1) + emissions = tracker.stop() + + assert isinstance(emissions, float) + self.assertNotEqual(emissions, 0.0) + self.verify_output_file(self.emissions_file_path) + + def verify_output_file(self, file_path: str) -> None: + """Verify that the output CSV file exists and has expected structure""" + with open(file_path, "r") as f: + lines = [line.rstrip() for line in f] + assert len(lines) == 2 # Header + 1 data row + + # Check that it's a valid CSV with expected columns + df = pd.read_csv(file_path) + expected_columns = ["emissions", "cpu_power", "cpu_energy"] + for col in expected_columns: + self.assertIn(col, df.columns) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_resource_tracker.py b/tests/test_resource_tracker.py index 632aee464..60600e694 100644 --- a/tests/test_resource_tracker.py +++ b/tests/test_resource_tracker.py @@ -198,6 +198,31 @@ def __bool__(self): assert tracker._hardware == [hardware_cpu] +def test_setup_fallback_tracking_uses_forced_power_without_psutil(): + tracker = make_tracker(_force_cpu_power=42) + resource_tracker = ResourceTracker(tracker) + hardware_cpu = MagicMock() + tdp = SimpleNamespace(model="Unknown CPU", tdp=None) + + with ( + patch( + "codecarbon.core.resource_tracker.cpu.is_psutil_available", + return_value=False, + ), + patch( + "codecarbon.core.resource_tracker.CPU.from_utils", return_value=hardware_cpu + ) as mock_from_utils, + patch.object( + resource_tracker, "_get_install_instructions", return_value="instructions" + ), + ): + resource_tracker._setup_fallback_tracking(tdp, 42) + + mock_from_utils.assert_called_once_with("out", "constant", "Unknown CPU", 42) + assert resource_tracker.cpu_tracker == "User Input TDP constant" + assert tracker._hardware == [hardware_cpu] + + def test_set_cpu_tracking_force_mode_uses_cpu_load_and_returns(): tracker = make_tracker(_conf={"cpu_physical_count": 4, "force_mode_cpu_load": True}) resource_tracker = ResourceTracker(tracker) @@ -214,6 +239,55 @@ def test_set_cpu_tracking_force_mode_uses_cpu_load_and_returns(): mock_setup.assert_called_once_with(fake_tdp, 80) +def test_set_cpu_tracking_force_mode_constant_takes_precedence_over_all_backends(): + tracker = make_tracker( + _conf={ + "cpu_physical_count": 4, + "force_mode_constant": True, + "force_mode_cpu_load": True, + } + ) + resource_tracker = ResourceTracker(tracker) + fake_tdp = SimpleNamespace(tdp=20, model="CPU") + fake_cpu = MagicMock() + + with ( + patch("codecarbon.core.resource_tracker.cpu.TDP", return_value=fake_tdp), + patch( + "codecarbon.core.resource_tracker.CPU.from_utils", return_value=fake_cpu + ) as mock_from_utils, + patch( + "codecarbon.core.resource_tracker.cpu.is_powergadget_available", + return_value=True, + ), + patch( + "codecarbon.core.resource_tracker.cpu.is_rapl_available", return_value=True + ), + patch( + "codecarbon.core.resource_tracker.powermetrics.is_powermetrics_available", + return_value=True, + ), + patch.object(resource_tracker, "_setup_cpu_load_mode") as mock_setup_cpu_load, + patch.object( + resource_tracker, "_setup_power_gadget" + ) as mock_setup_power_gadget, + patch.object(resource_tracker, "_setup_rapl") as mock_setup_rapl, + patch.object( + resource_tracker, "_setup_powermetrics" + ) as mock_setup_powermetrics, + ): + resource_tracker.set_CPU_tracking() + + mock_from_utils.assert_called_once_with("out", "constant", "CPU", 80) + mock_setup_cpu_load.assert_not_called() + mock_setup_power_gadget.assert_not_called() + mock_setup_rapl.assert_not_called() + mock_setup_powermetrics.assert_not_called() + assert resource_tracker.cpu_tracker == "TDP constant" + assert tracker._conf["cpu_model"] == "CPU" + assert tracker._hardware == [fake_cpu] + + def test_set_cpu_tracking_prefers_power_gadget(): tracker = make_tracker() resource_tracker = ResourceTracker(tracker)