From 550c86d238abe48f37de498a010e318d331799aa Mon Sep 17 00:00:00 2001 From: blttkgl Date: Wed, 13 May 2026 13:52:39 +0300 Subject: [PATCH] fix(#30): phase-aware concurrency control for Sobol vs BO phases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves https://github.com/499602D2/flowboost/issues/30 Previously, backend.max_parallelism was never linked to manager.job_limit, so Ax silently capped concurrent trial generation at DEFAULT_BO_CONCURRENCY=3 regardless of the scheduler's actual capacity. Changes: - Session._apply_backend_preferences() wires manager.job_limit to backend.max_parallelism so the Sobol initialization phase can fill all available scheduler slots - Add Session.bo_concurrency parameter (Optional[int]) to cap concurrent acquisitions during the BO phase independently of job_limit - Phase detection in _is_bo_phase() checks Ax's actual experiment trial count (not just finished cases) to match Ax's internal Sobol→BO transition - _validate_bo_concurrency() enforces type correctness at construction time - Persist/restore bo_concurrency through session config - 6 new tests covering Sobol-phase pass-through, BO-phase capping, max_parallelism sync on start() and restore() - Update aerofoilNACA0012Steady example with bo_concurrency=2, job_limit=10 Signed-off-by: blttkgl --- .../aerofoilNACA0012Steady.py | 5 +- flowboost/session/session.py | 64 ++++++++++++++++ tests/flowboost/session/test_session.py | 59 +++++++++++++++ tests/flowboost/session/test_session_unit.py | 74 +++++++++++++++++++ 4 files changed, 200 insertions(+), 2 deletions(-) diff --git a/examples/aerofoilNACA0012Steady/aerofoilNACA0012Steady.py b/examples/aerofoilNACA0012Steady/aerofoilNACA0012Steady.py index 806dd82..15e8170 100644 --- a/examples/aerofoilNACA0012Steady/aerofoilNACA0012Steady.py +++ b/examples/aerofoilNACA0012Steady/aerofoilNACA0012Steady.py @@ -55,6 +55,7 @@ def lift_objective(case: Case): random_seed=0, target_value=60.8, # Target lift-to-drag ratio max_evaluations=50, + bo_concurrency=3 ) # --- Template case --- @@ -106,11 +107,11 @@ def lift_objective(case: Case): session.job_manager = session.job_manager or Manager.create( scheduler="Local", wdir=session.data_dir, - job_limit=2, + job_limit=5, ) session.job_manager.monitoring_interval = 10 # --- Run --- - session.backend.initialization_trials = 8 + session.backend.initialization_trials = 10 session.clean_pending_cases() session.start() diff --git a/flowboost/session/session.py b/flowboost/session/session.py index 77cda1d..7777ff5 100644 --- a/flowboost/session/session.py +++ b/flowboost/session/session.py @@ -37,6 +37,7 @@ def __init__( backend: str = "AxBackend", clone_method: Literal["foamCloneCase", "copy"] = "foamCloneCase", random_seed: Optional[int] = None, + bo_concurrency: Optional[int] = None, max_evaluations: Optional[int] = None, target_value: Optional[float] = None, target_objective: Optional[str] = None, @@ -62,6 +63,10 @@ def __init__( session restarts. Note: this mutates ``torch``'s global RNG \ state, which may affect user code that depends on it in the \ same process. Defaults to None (non-deterministic). + bo_concurrency (Optional[int], optional): Maximum number of + Bayesian-optimization candidates to request per cycle after + the Sobol initialization phase. If None, no additional BO + cap is applied at the Session layer. max_evaluations (Optional[int], optional): Maximum number of attempted \ case evaluations before stopping optimization. Completed failed \ cases and pending submitted cases both count against this cap. \ @@ -79,6 +84,7 @@ def __init__( self.dataframe_format: Literal["pandas", "polars"] = dataframe_format self.clone_method: Literal["foamCloneCase", "copy"] = clone_method self.random_seed: Optional[int] = random_seed + self.bo_concurrency: Optional[int] = bo_concurrency # Termination criteria self.max_evaluations: Optional[int] = max_evaluations @@ -284,6 +290,10 @@ def start(self): # Template configured and dimensions OK, plus no wonky properties? self._verify_search_space_in_template() + # Ensure scheduler-aware backend preferences (e.g. max_parallelism) + # are synchronized just before initialization. + self._apply_backend_preferences() + # Initialize the optimizer backend self.backend.initialize() @@ -437,6 +447,19 @@ def local_optimization(self, num_new_cases: int) -> list[dict[Dimension, Any]]: # Attach failed cases separately self.backend.attach_failed_cases(self.get_failed_cases()) + if self._is_bo_phase(finished_case_count=len(finished_cases)): + if self.bo_concurrency is not None: + num_new_cases = min(num_new_cases, self.bo_concurrency) + logging.info( + "Running BO acquisition with session cap " + f"bo_concurrency={self.bo_concurrency}" + ) + else: + logging.info( + "Running Sobol initialization acquisition " + f"(finished={len(finished_cases)}/{self._num_initialization_trials()})" + ) + # Ready to get new cases logging.info(f"Running acquisition: manager had {num_new_cases} free slot(s)") suggestion = self.backend.ask(max_cases=num_new_cases) @@ -602,6 +625,8 @@ def state(self) -> dict[str, Any]: } if self.random_seed is not None: optimizer_state["random_seed"] = self.random_seed + if self.bo_concurrency is not None: + optimizer_state["bo_concurrency"] = self.bo_concurrency state = { "session": { @@ -676,6 +701,7 @@ def restore(self, from_file: str = config.DEFAULT_CONFIG_NAME): # [optimizer] backend_type = str(data.get("optimizer", {}).get("type", "Ax")) self.random_seed = data.get("optimizer", {}).get("random_seed") + self.bo_concurrency = data.get("optimizer", {}).get("bo_concurrency") self.backend = Backend.create(backend_type) self._apply_backend_preferences() offload = data.get("optimizer", {}).get("offload_acquisition", False) @@ -690,6 +716,7 @@ def restore(self, from_file: str = config.DEFAULT_CONFIG_NAME): self.job_manager = Manager.create( scheduler=scheduler, wdir=self.data_dir, job_limit=job_limit ) + self._apply_backend_preferences() logging.info(f"Session restored from {from_file}") # No automatic pending case cleanup here. @@ -698,6 +725,43 @@ def _apply_backend_preferences(self): """Apply session-level optimizer settings to the active backend when supported.""" self.backend.random_seed = self.random_seed + manager = getattr(self, "job_manager", None) + if manager and hasattr(self.backend, "max_parallelism"): + # Keep Ax generation strategy from under-utilizing scheduler slots + # during Sobol initialization. + setattr(self.backend, "max_parallelism", manager.job_limit) + + def _num_initialization_trials(self) -> int: + trials = getattr(self.backend, "initialization_trials", None) + return int(trials) if trials is not None else 5 + + def _is_bo_phase(self, finished_case_count: int) -> bool: + """Detect if we're in the BO phase by checking Ax's actual trial count. + + Ax transitions from Sobol to BO based on total generated trials + (including pending), not just finished trials. We must check Ax's + actual state to match its phase transitions. + """ + # Check if backend is Ax and has been initialized with trials + if not hasattr(self.backend, "client"): + # Fallback: use finished case count for non-Ax backends + return finished_case_count >= self._num_initialization_trials() + + # Check Ax's experiment trial count + try: + experiment = self.backend.client.experiment + if experiment is None: + # Experiment not yet created, still in initialization + return False + + total_trials = len(experiment.trials) + init_trials = self._num_initialization_trials() + return total_trials >= init_trials + except (AttributeError, AssertionError): + # If we can't access Ax's state (experiment not created or other error), + # fall back to finished case count + return finished_case_count >= self._num_initialization_trials() + def clean_pending_cases(self): """ Explicitly removes all pending case directories. diff --git a/tests/flowboost/session/test_session.py b/tests/flowboost/session/test_session.py index e26dccd..99abb4c 100644 --- a/tests/flowboost/session/test_session.py +++ b/tests/flowboost/session/test_session.py @@ -1,6 +1,7 @@ import json import logging from pathlib import Path +from types import SimpleNamespace import pytest @@ -66,6 +67,64 @@ def test_session_restore_reapplies_random_seed_to_backend(tmp_path): assert restored.backend.random_seed == 321 +def test_session_job_limit_is_applied_to_backend_parallelism_when_unset(tmp_path): + session = Session( + name="Parallelism sync", + data_dir=tmp_path / "parallelism_sync_session", + ) + + assert hasattr(session.backend, "max_parallelism") + assert session.backend.max_parallelism is None + + session.job_manager = SimpleNamespace(job_limit=8) + session._apply_backend_preferences() + + assert session.backend.max_parallelism == 8 + + +def test_session_persists_and_restores_bo_concurrency(tmp_path): + session = Session( + name="BO concurrency", + data_dir=tmp_path / "bo_concurrency_session", + bo_concurrency=4, + ) + + session.persist() + restored = Session( + name="ignored", + data_dir=session.data_dir, + ) + + assert restored.bo_concurrency == 4 + + +def test_session_restore_applies_scheduler_job_limit_to_backend_parallelism( + tmp_path, monkeypatch +): + data_dir = tmp_path / "restore_parallelism_session" + created = Session( + name="Restore parallelism", + data_dir=data_dir, + ) + created.job_manager = SimpleNamespace(type="Local", job_limit=6) + created.persist() + + def _fake_manager_create(scheduler: str, wdir: Path, job_limit: int): + return SimpleNamespace(type=scheduler, job_limit=job_limit) + + monkeypatch.setattr("flowboost.session.session.Manager.create", _fake_manager_create) + + restored = Session( + name="ignored", + data_dir=data_dir, + ) + + assert restored.job_manager is not None + assert restored.job_manager.job_limit == 6 + assert hasattr(restored.backend, "max_parallelism") + assert restored.backend.max_parallelism == 6 + + def test_incorrect_startup(): # Objective missing linked entry # Test missing dictionary diff --git a/tests/flowboost/session/test_session_unit.py b/tests/flowboost/session/test_session_unit.py index b1c03ff..8d73747 100644 --- a/tests/flowboost/session/test_session_unit.py +++ b/tests/flowboost/session/test_session_unit.py @@ -102,6 +102,80 @@ def test_configure_optimization_delegates_to_backend(self, tmp_path): assert session.backend.dimensions == [dimension] +class TestSessionBOConcurrency: + def _patch_local_optimization_dependencies(self, session: Session, monkeypatch): + monkeypatch.setattr(session.backend, "tell", lambda cases: None) + monkeypatch.setattr(session.backend, "attach_pending_cases", lambda cases: None) + monkeypatch.setattr(session.backend, "attach_failed_cases", lambda cases: None) + monkeypatch.setattr(session, "get_pending_cases", lambda: []) + monkeypatch.setattr(session, "get_failed_cases", lambda: []) + + def test_local_optimization_sobol_phase_uses_available_slots( + self, tmp_path, monkeypatch + ): + session = Session( + name="test", + data_dir=tmp_path / "session", + bo_concurrency=1, + ) + session.backend.initialization_trials = 3 + monkeypatch.setattr(session, "get_finished_cases", lambda batch_process=True: []) + self._patch_local_optimization_dependencies(session, monkeypatch) + + ask = MagicMock(return_value=[]) + monkeypatch.setattr(session.backend, "ask", ask) + + session.local_optimization(num_new_cases=5) + + ask.assert_called_once_with(max_cases=5) + + def test_local_optimization_bo_phase_uses_bo_concurrency_cap( + self, tmp_path, monkeypatch + ): + session = Session( + name="test", + data_dir=tmp_path / "session", + bo_concurrency=2, + ) + session.backend.initialization_trials = 3 + + completed = [MagicMock(), MagicMock(), MagicMock()] + monkeypatch.setattr( + session, + "get_finished_cases", + lambda batch_process=True: completed, + ) + self._patch_local_optimization_dependencies(session, monkeypatch) + + ask = MagicMock(return_value=[]) + monkeypatch.setattr(session.backend, "ask", ask) + + session.local_optimization(num_new_cases=5) + + ask.assert_called_once_with(max_cases=2) + + def test_local_optimization_bo_phase_without_cap_uses_available_slots( + self, tmp_path, monkeypatch + ): + session = Session(name="test", data_dir=tmp_path / "session") + session.backend.initialization_trials = 1 + + completed = [MagicMock()] + monkeypatch.setattr( + session, + "get_finished_cases", + lambda batch_process=True: completed, + ) + self._patch_local_optimization_dependencies(session, monkeypatch) + + ask = MagicMock(return_value=[]) + monkeypatch.setattr(session.backend, "ask", ask) + + session.local_optimization(num_new_cases=4) + + ask.assert_called_once_with(max_cases=4) + + class TestCheckTerminationCriteria: def _make_session_with_cases(self, tmp_path, n_cases, **session_kwargs): session = Session(name="test", data_dir=tmp_path / "session", **session_kwargs)