diff --git a/syllabus/core/curriculum_base.py b/syllabus/core/curriculum_base.py index 03284dab..79a2c79e 100644 --- a/syllabus/core/curriculum_base.py +++ b/syllabus/core/curriculum_base.py @@ -12,8 +12,8 @@ class Curriculum: """Base class and API for defining curricula to interface with Gym environments. """ - - def __init__(self, task_space: TaskSpace, random_start_tasks: int = 0, task_names: Callable = None) -> None: + + def __init__(self, task_space: TaskSpace, random_start_tasks: int = 0, seed: int = None, sample_weights: list = None, task_names: Callable = None) -> None: """Initialize the base Curriculum :param task_space: the environment's task space from which new tasks are sampled @@ -28,6 +28,8 @@ def __init__(self, task_space: TaskSpace, random_start_tasks: int = 0, task_name self.completed_tasks = 0 self.task_names = task_names self.n_updates = 0 + self.seed = seed + self.sample_weights = sample_weights if self.num_tasks == 0: warnings.warn("Task space is empty. This will cause errors during sampling if no tasks are added.") @@ -64,6 +66,12 @@ def tasks(self) -> List[tuple]: """ return list(self.task_space.tasks) + def set_seed(self, seed: int = None) -> None: + self.seed = seed + if(seed!=None) : + np.random.seed(seed) + + def add_task(self, task: typing.Any) -> None: # TODO raise NotImplementedError("This curriculum does not support adding tasks after initialization.") @@ -194,8 +202,12 @@ def sample(self, k: int = 1) -> Union[List, Any]: # Use list of indices because np.choice does not play nice with tuple tasks # tasks = self.tasks + np.random.seed(self.seed) n_tasks = self.num_tasks - task_dist = self._sample_distribution() + if self.sample_weights == None: + task_dist = self._sample_distribution() + else : + task_dist = self.sample_weights task_idx = np.random.choice(list(range(n_tasks)), size=k, p=task_dist) return task_idx diff --git a/syllabus/core/curriculum_sync_wrapper.py b/syllabus/core/curriculum_sync_wrapper.py index 6e069d8c..fd3f94ac 100644 --- a/syllabus/core/curriculum_sync_wrapper.py +++ b/syllabus/core/curriculum_sync_wrapper.py @@ -29,6 +29,10 @@ def count_tasks(self, task_space=None): def tasks(self): return self.task_space.tasks + @property + def seed(self): + return self.seed + def get_tasks(self, task_space=None): return self.task_space.get_tasks(gym_space=task_space) diff --git a/syllabus/core/environment_sync_wrapper.py b/syllabus/core/environment_sync_wrapper.py index c995aa19..c33568b1 100644 --- a/syllabus/core/environment_sync_wrapper.py +++ b/syllabus/core/environment_sync_wrapper.py @@ -39,7 +39,8 @@ def __init__(self, self.task_progress = 0.0 self._batch_step = 0 self.instance_id = components.get_id() - + self.seed = seed + self.episode_length = 0 self.episode_return = 0 diff --git a/syllabus/curricula/plr/central_plr_wrapper.py b/syllabus/curricula/plr/central_plr_wrapper.py index 7f69ea85..bdf39ba3 100644 --- a/syllabus/curricula/plr/central_plr_wrapper.py +++ b/syllabus/curricula/plr/central_plr_wrapper.py @@ -102,6 +102,7 @@ class CentralizedPrioritizedLevelReplay(Curriculum): def __init__( self, task_space: TaskSpace, + seed : int = None, *curriculum_args, task_sampler_kwargs_dict: dict = None, action_space: gym.Space = None, @@ -117,6 +118,7 @@ def __init__( if task_sampler_kwargs_dict is None: task_sampler_kwargs_dict = {} + self.seed = seed self._strategy = task_sampler_kwargs_dict.get("strategy", None) if not isinstance(task_space.gym_space, Discrete) and not isinstance(task_space.gym_space, MultiDiscrete): raise ValueError( @@ -133,7 +135,7 @@ def __init__( self._gae_lambda = gae_lambda self._supress_usage_warnings = suppress_usage_warnings self._task2index = {task: i for i, task in enumerate(self.tasks)} - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict, seed = seed) self._rollouts = RolloutStorage( self._num_steps, self._num_processes, diff --git a/syllabus/curricula/plr/plr_wrapper.py b/syllabus/curricula/plr/plr_wrapper.py index 9515df4b..92dc071c 100644 --- a/syllabus/curricula/plr/plr_wrapper.py +++ b/syllabus/curricula/plr/plr_wrapper.py @@ -191,6 +191,7 @@ def __init__( self, task_space: TaskSpace, observation_space: gym.Space, + seed: int = None, *curriculum_args, task_sampler_kwargs_dict: dict = None, action_space: gym.Space = None, @@ -225,8 +226,9 @@ def __init__( self._supress_usage_warnings = suppress_usage_warnings self._get_action_log_dist = get_action_log_dist self._task2index = {task: i for i, task in enumerate(self.tasks)} + self.seed = seed - self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict) + self._task_sampler = TaskSampler(self.tasks, action_space=action_space, **task_sampler_kwargs_dict, seed = self.seed) self._rollouts = RolloutStorage( self._num_steps, self._num_processes, diff --git a/syllabus/curricula/plr/task_sampler.py b/syllabus/curricula/plr/task_sampler.py index 15ad4852..3b06eb13 100644 --- a/syllabus/curricula/plr/task_sampler.py +++ b/syllabus/curricula/plr/task_sampler.py @@ -40,6 +40,7 @@ def __init__( staleness_coef: float = 0.1, staleness_transform: str = "power", staleness_temperature: float = 1.0, + seed : int = None ): self.action_space = action_space self.tasks = tasks @@ -62,7 +63,7 @@ def __init__( self.partial_task_scores = np.zeros((num_actors, self.num_tasks), dtype=float) self.partial_task_steps = np.zeros((num_actors, self.num_tasks), dtype=np.int64) self.task_staleness = np.array([0.0] * self.num_tasks, dtype=float) - + self.seed = seed self.next_task_index = 0 # Only used for sequential strategy # Logging metrics @@ -280,6 +281,8 @@ def sample(self, strategy=None): proportion_seen = (self.num_tasks - num_unseen) / self.num_tasks if self.replay_schedule == "fixed": + if self.seed != None : + np.random.seed(self.seed) if proportion_seen >= self.rho: # Sample replay level with fixed prob = 1 - nu OR if all levels seen if np.random.rand() > self.nu or not proportion_seen < 1.0: @@ -289,6 +292,8 @@ def sample(self, strategy=None): return self._sample_unseen_level() elif self.replay_schedule == "proportionate": + if self.seed != None : + np.random.seed(self.seed) if proportion_seen >= self.rho and np.random.rand() < proportion_seen: return self._sample_replay_level() else: diff --git a/syllabus/curricula/sequential.py b/syllabus/curricula/sequential.py index baa12637..7de7ded9 100644 --- a/syllabus/curricula/sequential.py +++ b/syllabus/curricula/sequential.py @@ -12,7 +12,7 @@ class SequentialCurriculum(Curriculum): REQUIRES_EPISODE_UPDATES = True REQUIRES_CENTRAL_UPDATES = False - def __init__(self, curriculum_list: List[Curriculum], stopping_conditions: List[Any], *curriculum_args, **curriculum_kwargs): + def __init__(self, curriculum_list: List[Curriculum], stopping_conditions: List[Any], seed : int = None, *curriculum_args, **curriculum_kwargs): super().__init__(*curriculum_args, **curriculum_kwargs) assert len(curriculum_list) > 0, "Must provide at least one curriculum" assert len(stopping_conditions) == len(curriculum_list) - 1, f"Stopping conditions must be one less than the number of curricula. Final curriculum is used for the remainder of training. Expected {len(curriculum_list) - 1}, got {len(stopping_conditions)}." @@ -31,6 +31,7 @@ def __init__(self, curriculum_list: List[Curriculum], stopping_conditions: List[ self.n_tasks = 0 self.total_tasks = 0 self.episode_returns = [] + self.seed = seed def _parse_curriculum_list(self, curriculum_list: List[Curriculum]) -> List[Curriculum]: """ Parse the curriculum list to ensure that all items are curricula. @@ -39,12 +40,13 @@ def _parse_curriculum_list(self, curriculum_list: List[Curriculum]) -> List[Curr parsed_list = [] for item in curriculum_list: if isinstance(item, Curriculum): + item.set_seed(self.seed) parsed_list.append(item) elif isinstance(item, TaskSpace): - parsed_list.append(DomainRandomization(item)) + parsed_list.append(DomainRandomization(item, seed = self.seed)) elif isinstance(item, list): task_space = TaskSpace(len(item), item) - parsed_list.append(DomainRandomization(task_space)) + parsed_list.append(DomainRandomization(task_space, seed = self.seed)) elif self.task_space.contains(item): parsed_list.append(NoopCurriculum(item, self.task_space)) else: @@ -153,6 +155,7 @@ def sample(self, k: int = 1) -> Union[List, Any]: Choose the next k tasks from the list. """ curriculum = self.current_curriculum + curriculum.set_seed(self.seed) tasks = curriculum.sample(k) # Recode tasks into environment task space diff --git a/syllabus/curricula/test_seeding.py b/syllabus/curricula/test_seeding.py new file mode 100644 index 00000000..46f9c89a --- /dev/null +++ b/syllabus/curricula/test_seeding.py @@ -0,0 +1,106 @@ +from syllabus.core import Curriculum +import gymnasium as gym + +from syllabus.task_space import TaskSpace +from domain_randomization import DomainRandomization +from learning_progress import LearningProgressCurriculum +from sequential import SequentialCurriculum +from syllabus.curricula.plr import CentralizedPrioritizedLevelReplay +from syllabus.curricula.plr import PrioritizedLevelReplay +from syllabus.curricula.plr import TaskSampler +import numpy as np + +def seed_test(c: Curriculum): + sample = c.sample() + for i in range(5): + next_sample = c.sample() + print(next_sample) + assert sample == next_sample, f"Expected all samples to be the same, got {sample} and {next_sample}" + sample = next_sample + + return True + +def no_seed_test(c: Curriculum): + sample = c.sample() + list = [int(sample[0])] + for i in range(5): + next_sample = c.sample() + list.append(int(next_sample[0])) + print(next_sample) + sample = next_sample + + if(len(set(list))<=1) : + raise Exception(f"Expected samples to variable, only one sample value {sample}") + else : + return True + +#Seed Tests +task_space = TaskSpace(200) +# task_space = TaskSpace(gym.spaces.Box(low=0, high=1, shape=(2,)), [(0, 0), (0, 1), (1, 0), (1, 1)]) +seed = 3 + +#1: DomainRandomization with seed +c = DomainRandomization(task_space = task_space, seed = seed) +if seed_test(c = c) : + print("DomainRandomization with seed! SUCCESSFUL") + +#2: DomainRandomization without seed +c = DomainRandomization(task_space = task_space) +if no_seed_test(c = c) : + print("DomainRandomization without seed! SUCCESSFUL") + + +#3: LearningProgressCurriculum with seed +c = LearningProgressCurriculum(task_space = task_space, seed = seed) +if seed_test(c = c) : + print("LearningProgressCurriculum with seed! SUCCESSFUL") + +#4: LearningProgressCurriculum without seed +c = LearningProgressCurriculum(task_space = task_space) +if no_seed_test(c = c) : + print("LearningProgressCurriculum without seed! SUCCESSFUL") + +#5: SequentialCurriculum with seed +list = [LearningProgressCurriculum(task_space = task_space),DomainRandomization(task_space = task_space) ] +c = SequentialCurriculum(task_space = task_space, curriculum_list = list, stopping_conditions = ["steps>1"], seed = seed) +if seed_test(c = c) : + print("SequentialCurriculum with seed! SUCCESSFUL") + +#6: SequentialCurriculum without seed +list = [LearningProgressCurriculum(task_space = task_space),DomainRandomization(task_space = task_space) ] +c = SequentialCurriculum(task_space = task_space, curriculum_list = list, stopping_conditions = ["steps>1"]) +if no_seed_test(c = c) : + print("SequentialCurriculum without seed! SUCCESSFUL") + +#7 CentralizedPrioritizedLevelReplay with seed +c = CentralizedPrioritizedLevelReplay(task_space = task_space, seed = seed) +if seed_test(c = c) : + print("CentralizedPrioritizedLevelReplay with seed! SUCCESSFUL") + +#8 CentralizedPrioritizedLevelReplay without seed +c = CentralizedPrioritizedLevelReplay(task_space = task_space) +if no_seed_test(c = c) : + print("CentralizedPrioritizedLevelReplay without seed! SUCCESSFUL") + +#9 PrioritizedLevelReplay with seed +c = PrioritizedLevelReplay(task_space = task_space, observation_space = gym.spaces.Discrete(3), seed = seed) +if seed_test(c = c) : + print("PrioritizedLevelReplay with seed! SUCCESSFUL") + +#10 PrioritizedLevelReplay without seed +c = PrioritizedLevelReplay(task_space = task_space, observation_space = gym.spaces.Discrete(3)) +if no_seed_test(c = c) : + print("PrioritizedLevelReplay without seed! SUCCESSFUL") + +#11 DomainRandomization with seed with sample_weights +space = TaskSpace(gym.spaces.Discrete(4), ["a", "b", "c","d"]) +c = DomainRandomization(task_space = space, seed = seed, sample_weights = [0.6,0.2,0.1,0.1]) +if seed_test(c = c) : + print("DomainRandomization with seed with sample weights! SUCCESSFUL") + +#2: DomainRandomization without seed +c = DomainRandomization(task_space = space, sample_weights = [0.3,0.2,0.4,0.1]) +if no_seed_test(c = c) : + print("DomainRandomization without seed with sample weights! SUCCESSFUL") + + diff --git a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py index e13c22ed..73511911 100644 --- a/syllabus/examples/training_scripts/cleanrl_procgen_plr.py +++ b/syllabus/examples/training_scripts/cleanrl_procgen_plr.py @@ -311,7 +311,7 @@ def get_value(obs): # env setup print("Creating env") - envs = gym.vector.AsyncVectorEnv( + envs = gym.vector.SyncVectorEnv( [ make_env( args.env_id, @@ -324,7 +324,7 @@ def get_value(obs): ) envs = wrap_vecenv(envs) - test_eval_envs = gym.vector.AsyncVectorEnv( + test_eval_envs = gym.vector.SyncVectorEnv( [ make_env(args.env_id, args.seed + i, num_levels=0) for i in range(args.num_eval_episodes) @@ -332,7 +332,7 @@ def get_value(obs): ) test_eval_envs = wrap_vecenv(test_eval_envs) - train_eval_envs = gym.vector.AsyncVectorEnv( + train_eval_envs = gym.vector.SyncVectorEnv( [ make_env(args.env_id, args.seed + i, num_levels=200) for i in range(args.num_eval_episodes) @@ -368,6 +368,7 @@ def get_value(obs): episode_rewards = deque(maxlen=10) completed_episodes = 0 + for update in range(1, num_updates + 1): # Annealing the rate if instructed to do so. if args.anneal_lr: