Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/aerofoilNACA0012Steady/aerofoilNACA0012Steady.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def lift_objective(case: Case):
random_seed=0,
target_value=60.8, # Target lift-to-drag ratio
max_evaluations=50,
bo_concurrency=3
)

# --- Template case ---
Expand Down Expand Up @@ -106,11 +107,11 @@ def lift_objective(case: Case):
session.job_manager = session.job_manager or Manager.create(
scheduler="Local",
wdir=session.data_dir,
job_limit=2,
job_limit=5,
)
session.job_manager.monitoring_interval = 10

# --- Run ---
session.backend.initialization_trials = 8
session.backend.initialization_trials = 10
session.clean_pending_cases()
session.start()
64 changes: 64 additions & 0 deletions flowboost/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(
backend: str = "AxBackend",
clone_method: Literal["foamCloneCase", "copy"] = "foamCloneCase",
random_seed: Optional[int] = None,
bo_concurrency: Optional[int] = None,
max_evaluations: Optional[int] = None,
target_value: Optional[float] = None,
target_objective: Optional[str] = None,
Expand All @@ -62,6 +63,10 @@ def __init__(
session restarts. Note: this mutates ``torch``'s global RNG \
state, which may affect user code that depends on it in the \
same process. Defaults to None (non-deterministic).
bo_concurrency (Optional[int], optional): Maximum number of
Bayesian-optimization candidates to request per cycle after
the Sobol initialization phase. If None, no additional BO
cap is applied at the Session layer.
max_evaluations (Optional[int], optional): Maximum number of attempted \
case evaluations before stopping optimization. Completed failed \
cases and pending submitted cases both count against this cap. \
Expand All @@ -79,6 +84,7 @@ def __init__(
self.dataframe_format: Literal["pandas", "polars"] = dataframe_format
self.clone_method: Literal["foamCloneCase", "copy"] = clone_method
self.random_seed: Optional[int] = random_seed
self.bo_concurrency: Optional[int] = bo_concurrency

# Termination criteria
self.max_evaluations: Optional[int] = max_evaluations
Expand Down Expand Up @@ -284,6 +290,10 @@ def start(self):
# Template configured and dimensions OK, plus no wonky properties?
self._verify_search_space_in_template()

# Ensure scheduler-aware backend preferences (e.g. max_parallelism)
# are synchronized just before initialization.
self._apply_backend_preferences()

# Initialize the optimizer backend
self.backend.initialize()

Expand Down Expand Up @@ -437,6 +447,19 @@ def local_optimization(self, num_new_cases: int) -> list[dict[Dimension, Any]]:
# Attach failed cases separately
self.backend.attach_failed_cases(self.get_failed_cases())

if self._is_bo_phase(finished_case_count=len(finished_cases)):
if self.bo_concurrency is not None:
num_new_cases = min(num_new_cases, self.bo_concurrency)
logging.info(
"Running BO acquisition with session cap "
f"bo_concurrency={self.bo_concurrency}"
)
else:
logging.info(
"Running Sobol initialization acquisition "
f"(finished={len(finished_cases)}/{self._num_initialization_trials()})"
)

# Ready to get new cases
logging.info(f"Running acquisition: manager had {num_new_cases} free slot(s)")
suggestion = self.backend.ask(max_cases=num_new_cases)
Expand Down Expand Up @@ -602,6 +625,8 @@ def state(self) -> dict[str, Any]:
}
if self.random_seed is not None:
optimizer_state["random_seed"] = self.random_seed
if self.bo_concurrency is not None:
optimizer_state["bo_concurrency"] = self.bo_concurrency

state = {
"session": {
Expand Down Expand Up @@ -676,6 +701,7 @@ def restore(self, from_file: str = config.DEFAULT_CONFIG_NAME):
# [optimizer]
backend_type = str(data.get("optimizer", {}).get("type", "Ax"))
self.random_seed = data.get("optimizer", {}).get("random_seed")
self.bo_concurrency = data.get("optimizer", {}).get("bo_concurrency")
self.backend = Backend.create(backend_type)
self._apply_backend_preferences()
offload = data.get("optimizer", {}).get("offload_acquisition", False)
Expand All @@ -690,6 +716,7 @@ def restore(self, from_file: str = config.DEFAULT_CONFIG_NAME):
self.job_manager = Manager.create(
scheduler=scheduler, wdir=self.data_dir, job_limit=job_limit
)
self._apply_backend_preferences()

logging.info(f"Session restored from {from_file}")
# No automatic pending case cleanup here.
Expand All @@ -698,6 +725,43 @@ def _apply_backend_preferences(self):
"""Apply session-level optimizer settings to the active backend when supported."""
self.backend.random_seed = self.random_seed

manager = getattr(self, "job_manager", None)
if manager and hasattr(self.backend, "max_parallelism"):
# Keep Ax generation strategy from under-utilizing scheduler slots
# during Sobol initialization.
setattr(self.backend, "max_parallelism", manager.job_limit)

def _num_initialization_trials(self) -> int:
    """Return the number of initialization trials configured on the backend.

    Reads ``initialization_trials`` from ``self.backend``. When the attribute
    is missing or ``None``, falls back to 5.

    Returns:
        int: The backend's ``initialization_trials`` coerced with ``int()``,
        or 5 when the backend provides no value.
    """
    trials = getattr(self.backend, "initialization_trials", None)
    if trials is None:
        # Backend does not expose (or has not set) a trial count.
        return 5
    return int(trials)

def _is_bo_phase(self, finished_case_count: int) -> bool:
    """Detect if we're in the BO phase by checking Ax's actual trial count.

    Ax transitions from Sobol to BO based on total generated trials
    (including pending), not just finished trials. We must check Ax's
    actual state to match its phase transitions.

    Args:
        finished_case_count: Number of finished cases known to the caller.
            Only used by the fallback comparison, i.e. when the backend has
            no ``client`` attribute or its experiment state cannot be read.

    Returns:
        bool: True once the relevant count has reached
        ``self._num_initialization_trials()``. False when the backend has a
        ``client`` whose ``experiment`` is ``None``.
    """
    if not hasattr(self.backend, "client"):
        # Fallback: use finished case count for non-Ax backends.
        return finished_case_count >= self._num_initialization_trials()

    try:
        experiment = self.backend.client.experiment
        if experiment is None:
            # Experiment not yet created, still in initialization.
            return False
        # Count every trial on the experiment, not only finished ones.
        return len(experiment.trials) >= self._num_initialization_trials()
    except (AttributeError, AssertionError):
        # If we can't access Ax's state (experiment not created or other
        # error), fall back to finished case count.
        return finished_case_count >= self._num_initialization_trials()

def clean_pending_cases(self):
"""
Explicitly removes all pending case directories.
Expand Down
59 changes: 59 additions & 0 deletions tests/flowboost/session/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
from pathlib import Path
from types import SimpleNamespace

import pytest

Expand Down Expand Up @@ -66,6 +67,64 @@ def test_session_restore_reapplies_random_seed_to_backend(tmp_path):
assert restored.backend.random_seed == 321


def test_session_job_limit_is_applied_to_backend_parallelism_when_unset(tmp_path):
    """A backend with no ``max_parallelism`` picks up the manager's ``job_limit``."""
    session = Session(
        name="Parallelism sync",
        data_dir=tmp_path / "parallelism_sync_session",
    )

    # Precondition: the attribute exists on the backend but is still unset.
    assert hasattr(session.backend, "max_parallelism")
    assert session.backend.max_parallelism is None

    # A stand-in manager is enough; only ``job_limit`` is set on it.
    session.job_manager = SimpleNamespace(job_limit=8)
    session._apply_backend_preferences()

    assert session.backend.max_parallelism == 8


def test_session_persists_and_restores_bo_concurrency(tmp_path):
    """``bo_concurrency`` survives a persist, then a new Session on the same dir."""
    session = Session(
        name="BO concurrency",
        data_dir=tmp_path / "bo_concurrency_session",
        bo_concurrency=4,
    )

    session.persist()

    # Second Session on the same data_dir, constructed without bo_concurrency.
    restored = Session(
        name="ignored",
        data_dir=session.data_dir,
    )

    assert restored.bo_concurrency == 4


def test_session_restore_applies_scheduler_job_limit_to_backend_parallelism(
    tmp_path, monkeypatch
):
    """After restore, the backend's ``max_parallelism`` equals the persisted job limit."""
    data_dir = tmp_path / "restore_parallelism_session"
    created = Session(
        name="Restore parallelism",
        data_dir=data_dir,
    )
    # Stand-in manager carrying only the attributes this test sets.
    created.job_manager = SimpleNamespace(type="Local", job_limit=6)
    created.persist()

    def _fake_manager_create(scheduler: str, wdir: Path, job_limit: int):
        """Return a lightweight stand-in instead of a real job manager."""
        return SimpleNamespace(type=scheduler, job_limit=job_limit)

    monkeypatch.setattr("flowboost.session.session.Manager.create", _fake_manager_create)

    # Second Session on the same data_dir, with Manager.create patched above.
    restored = Session(
        name="ignored",
        data_dir=data_dir,
    )

    assert restored.job_manager is not None
    assert restored.job_manager.job_limit == 6
    assert hasattr(restored.backend, "max_parallelism")
    assert restored.backend.max_parallelism == 6


def test_incorrect_startup():
# Objective missing linked entry
# Test missing dictionary
Expand Down
74 changes: 74 additions & 0 deletions tests/flowboost/session/test_session_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,80 @@ def test_configure_optimization_delegates_to_backend(self, tmp_path):
assert session.backend.dimensions == [dimension]


class TestSessionBOConcurrency:
    """Check how ``Session.local_optimization`` applies ``bo_concurrency``.

    Each test replaces ``backend.ask`` with a mock and asserts the
    ``max_cases`` value that ``local_optimization`` passes to it.
    """

    def _patch_local_optimization_dependencies(self, session: Session, monkeypatch):
        """Stub the backend and session calls so only ``backend.ask`` is observed."""
        monkeypatch.setattr(session.backend, "tell", lambda cases: None)
        monkeypatch.setattr(session.backend, "attach_pending_cases", lambda cases: None)
        monkeypatch.setattr(session.backend, "attach_failed_cases", lambda cases: None)
        monkeypatch.setattr(session, "get_pending_cases", lambda: [])
        monkeypatch.setattr(session, "get_failed_cases", lambda: [])

    def test_local_optimization_sobol_phase_uses_available_slots(
        self, tmp_path, monkeypatch
    ):
        """With no finished cases, ``bo_concurrency`` does not cap the request."""
        session = Session(
            name="test",
            data_dir=tmp_path / "session",
            bo_concurrency=1,
        )
        session.backend.initialization_trials = 3
        monkeypatch.setattr(session, "get_finished_cases", lambda batch_process=True: [])
        self._patch_local_optimization_dependencies(session, monkeypatch)

        ask = MagicMock(return_value=[])
        monkeypatch.setattr(session.backend, "ask", ask)

        session.local_optimization(num_new_cases=5)

        ask.assert_called_once_with(max_cases=5)

    def test_local_optimization_bo_phase_uses_bo_concurrency_cap(
        self, tmp_path, monkeypatch
    ):
        """With three finished cases and three init trials, the request is capped."""
        session = Session(
            name="test",
            data_dir=tmp_path / "session",
            bo_concurrency=2,
        )
        session.backend.initialization_trials = 3

        completed = [MagicMock(), MagicMock(), MagicMock()]
        monkeypatch.setattr(
            session,
            "get_finished_cases",
            lambda batch_process=True: completed,
        )
        self._patch_local_optimization_dependencies(session, monkeypatch)

        ask = MagicMock(return_value=[])
        monkeypatch.setattr(session.backend, "ask", ask)

        session.local_optimization(num_new_cases=5)

        ask.assert_called_once_with(max_cases=2)

    def test_local_optimization_bo_phase_without_cap_uses_available_slots(
        self, tmp_path, monkeypatch
    ):
        """Without ``bo_concurrency``, all requested slots reach ``backend.ask``."""
        session = Session(name="test", data_dir=tmp_path / "session")
        session.backend.initialization_trials = 1

        completed = [MagicMock()]
        monkeypatch.setattr(
            session,
            "get_finished_cases",
            lambda batch_process=True: completed,
        )
        self._patch_local_optimization_dependencies(session, monkeypatch)

        ask = MagicMock(return_value=[])
        monkeypatch.setattr(session.backend, "ask", ask)

        session.local_optimization(num_new_cases=4)

        ask.assert_called_once_with(max_cases=4)


class TestCheckTerminationCriteria:
def _make_session_with_cases(self, tmp_path, n_cases, **session_kwargs):
session = Session(name="test", data_dir=tmp_path / "session", **session_kwargs)
Expand Down
Loading