From 1484de46dd5e236602c1734e524c5a724b4c6b43 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 16 Sep 2025 11:57:22 -0400 Subject: [PATCH 01/19] Add unimplemented merge_networks --- alchemiscale/storage/statestore.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index de397160..b5f089df 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -861,6 +861,9 @@ def delete_network( """ raise NotImplementedError + def merge_networks(self, networks: list[ScopedKey], scope: Scope) -> ScopedKey: + raise NotImplementedError + def get_network_state(self, networks: list[ScopedKey]) -> list[str | None]: """Get the states of a group of networks. From 40cb0ab6c35b54b0390b2bdebda6e7108e0beb47 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Sat, 27 Sep 2025 10:34:07 -0400 Subject: [PATCH 02/19] Make assemble_network chainable --- alchemiscale/storage/statestore.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index b5f089df..b09e5b3c 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -766,11 +766,13 @@ def _topological_sort(graph_data: dict[GufeKey, tuple[Node, list[GufeKey]]]): graph_data.pop(rk) return L + @chainable def assemble_network( self, network: AlchemicalNetwork, scope: Scope, state: NetworkStateEnum | str = NetworkStateEnum.active, + tx=None, ) -> tuple[ScopedKey, ScopedKey, ScopedKey]: """Create all nodes and relationships needed for an AlchemicalNetwork represented in an alchemiscale state store. 
@@ -796,8 +798,7 @@ def assemble_network( subgraph = nw_subgraph | th_subgraph | nm_subgraph - with self.transaction() as tx: - merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") + merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") return nw_sk, th_sk, nm_sk From 30e7dd3682197b3a62167b513bace0c40e2e3488 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Sat, 27 Sep 2025 11:57:18 -0400 Subject: [PATCH 03/19] Partial implementation of merge_networks Networks are merged into a new network under a new scope with a given name. Task and ProtocolDAGResultRef support is not implemented here. --- alchemiscale/storage/statestore.py | 115 ++++++++++++++++- .../integration/storage/test_statestore.py | 116 ++++++++++++++++++ 2 files changed, 229 insertions(+), 2 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index b09e5b3c..6d8bd95e 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -862,8 +862,119 @@ def delete_network( """ raise NotImplementedError - def merge_networks(self, networks: list[ScopedKey], scope: Scope) -> ScopedKey: - raise NotImplementedError + @chainable + def merge_networks( + self, + network_scoped_keys: list[ScopedKey], + name: str, + scope: Scope, + clone_incomplete_tasks=False, + tx=None, + ) -> ScopedKey: + """Merge multiple ``AlchemicalNetwork`` nodes into a new ``AlchemicalNetwork``. + + Parameters + ---------- + network_scoped_keys + List of ``AlchemicalNetwork`` ``ScopedKey`` objects to merge. + name + The name of the new ``AlchemicalNetwork``. + scope + The ``Scope`` of the new ``AlchemicalNetwork``. + + Returns + ------- + The ``ScopedKey`` of the new ``AlchemicalNetwork`` in the database. 
+ """ + + # - Collect keyed chain representation for all alchemiscale networks + try: + network_keyed_chains: list[tuple[Scope, KeyedChain]] = [] + for network_scoped_key in network_scoped_keys: + keyed_chain = self.get_keyed_chain(network_scoped_key) + network_keyed_chains.append((network_scoped_key.scope, keyed_chain)) + # could not find specified network by provided scoped key + except KeyError: + raise ValueError( + f"ScopedKey ({network_scoped_key}) not found in the database." + ) + + from dataclasses import dataclass, field + + @dataclass + class TransformationData: + transformation: Transformation + task_tree: list = field( + default_factory=list + ) # flat represenation of task tree with results + + def __eq__(self, other): + if isinstance(other, (Transformation, NonTransformation)): + return other is self.transformation + return other.transformation is self.transformation + + def add_known_scoped_key_results(self, key, scope): + known_scoped_key = ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) + self.update_task_tree(known_scoped_key) + + def update_task_tree(self, tf_scoped_key): + nonlocal tx + query = """ + MATCH (task:Task)-[:PERFORMS]->(:Transformation {`_scoped_key`: $tf_scoped_key}) + OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) + OPTIONAL MATCH (task)<-[:RESULTS_IN]-(pdrr:ProtocolDAGResultRef) + RETURN task, extended_task._scoped_key as etask_sk, pdrr + """ + results = ( + tx.run(query, tf_scoped_key=str(tf_scoped_key)) + .to_eager_result() + .records + ) + + self.task_tree.extend(results) + + from gufe.tokenization import key_decode_dependencies + + # TODO: upstream to gufe + def kc_to_gufe(kc, gts): + for gufe_key, keyed_dict in kc: + if gt := gts.get(gufe_key): + continue + gt = key_decode_dependencies(keyed_dict, registry=gts) + gts[gufe_key] = gt + return gt + + # Map Transformation and NonTransformation objects to their + # potentially duplicated database gufe keys. 
This may happen + # due to minor version changes between serialization + # times. These keys are then used to get all results for Tasks + # associated with them. + transformation_data: list[TransformationData] = [] + for network_scope, network_keyed_chain in network_keyed_chains: + gts = {} + for index, (database_key, _) in enumerate(network_keyed_chain): + # only process Transformation or NonTransformation keys + if ( + "Transformation" in database_key + or "NonTransformation" in database_key + ): + # get the tokenizable at the index along with all previous data + subchain = KeyedChain(network_keyed_chain[: index + 1]) + transformation = kc_to_gufe(subchain, gts) + try: + index = transformation_data.index(transformation) + data = transformation_data[index] + except ValueError: + data = TransformationData(transformation) + transformation_data.append(data) + data.add_known_scoped_key_results(database_key, network_scope) + # - Collect all transformation gufe objects and collect into a new set of edges + new_edges = [td.transformation for td in transformation_data] + # - Make new alchemiscale network with these edges + combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) + # - assemble the network + an_sk, _, _ = self.assemble_network(combined_alchemical_network, scope, tx=tx) + return an_sk def get_network_state(self, networks: list[ScopedKey]) -> list[str | None]: """Get the states of a group of networks. 
diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index d15e168d..c42fa5ca 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -138,6 +138,122 @@ def test_create_overlapping_networks(self, n4js, network_tyk2, scope_test): def test_delete_network(self): raise NotImplementedError + def test_merge_networks(self, n4js, network_tyk2, scope_test): + network_sks = [] + + scope_args = scope_test.to_dict() + all_transformations = list(network_tyk2.edges) + all_transformations.sort(key=lambda x: x.key) + + def project_scope(iteration): + return Scope(**(scope_args | {"project": f"project{iteration}"})) + + transformations_common = all_transformations[:3] + + # note the nonlocal task_sks and transformation_sks to reduce + # clutter down below + def set_result(_slice, *, ok: bool): + nonlocal task_sks, transformation_sks + for task_sk, transformation_sk in zip( + task_sks[_slice], transformation_sks[_slice] + ): + pdrr = ProtocolDAGResultRef( + obj_key=transformation_sk.gufe_key, scope=task_sk.scope, ok=ok + ) + n4js.set_task_result(task_sk, pdrr) + + # NETWORK 1---JUST TYK2 + # 5 tasks: [completed, completed, running, error, waiting] + # no extends + an = network_tyk2.copy_with_replacements( + name=network_tyk2.name + f"_test_network_clone_1" + ) + scope = project_scope(1) + sk, th_sk, _ = n4js.assemble_network(an, scope) + network_sks.append(sk) + # add two more of the transformations + transformation_sks = [ + n4js.get_scoped_key(transformation, scope) + for transformation in transformations_common + all_transformations[3:5] + ] + task_sks = n4js.create_tasks(transformation_sks) + n4js.set_task_running(task_sks[2:3]) + n4js.action_tasks(task_sks[2:3], th_sk) + + n4js.set_task_waiting(task_sks[5:]) + n4js.action_tasks(task_sks[5:], th_sk) + + n4js.set_task_error(task_sks[4:5]) + n4js.action_tasks(task_sks[4:5], 
th_sk) + + n4js.set_task_complete(task_sks[:2]) + + set_result(slice(None, 2), ok=True) + set_result(slice(4, 5), ok=False) + + # NETWORK 2---TYK2 WITH TRIMMED EDGES + # 6 tasks: [complete, complete, complete] + # ^ ^ ^ + # | | | + # extends: [waiting, running, complete] + an = network_tyk2.copy_with_replacements( + name=network_tyk2.name + f"_test_network_clone_2_fewer_transformations", + edges=all_transformations[:-2], + ) + scope = project_scope(2) + sk, _, _ = n4js.assemble_network(an, scope) + network_sks.append(sk) + transformation_sks = [ + n4js.get_scoped_key(transformation, scope) + for transformation in transformations_common + ] + task_sks = n4js.create_tasks(transformation_sks) + n4js.set_task_complete(task_sks) + set_result(slice(None), ok=True) + + # create extending tasks + task_sks = n4js.create_tasks(transformation_sks, extends=task_sks) + + n4js.set_task_waiting(task_sks[0:1]) + n4js.set_task_running(task_sks[1:2]) + n4js.set_task_complete(task_sks[2:3]) + + set_result(slice(2, 3), ok=True) + + # network 3---tyk2 + # 6 tasks: [waiting, waiting, waiting, waiting, waiting, complete] + # no extends + # last task is for a transformation missing from NETWORK 2 + an = network_tyk2.copy_with_replacements( + name=network_tyk2.name + f"_test_network_clone_2_name_only" + ) + scope = project_scope(3) + sk, _, _ = n4js.assemble_network(an, scope) + network_sks.append(sk) + + transformation_sks = [ + n4js.get_scoped_key(transformation, scope) + for transformation in transformations_common + all_transformations[-2:] + ] + + task_sks = n4js.create_tasks(transformation_sks) + n4js.set_task_waiting(task_sks[:-1]) + n4js.set_task_complete(task_sks[-1:]) + set_result(slice(-1, None), ok=True) + + scope_dict = scope_test.to_dict() + scope_dict["project"] = "mergedproject" + new_scope = Scope(**scope_dict) + sk_merged = n4js.merge_networks( + network_sks, f"{network_tyk2.name}_combined", new_scope + ) + assert len(n4js.get_gufe(sk_merged).edges) == len( + 
n4js.get_gufe(network_sks[0]).edges + ) + assert len(n4js.get_gufe(sk_merged).edges) != len( + n4js.get_gufe(network_sks[1]).edges + ) + def test_set_network_state(self, n4js, network_tyk2, scope_test): valid_states = [state.value for state in NetworkStateEnum] network_sks = [] From 865a46f2483f78b539b87942b4181af1dcbfa91f Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Mon, 29 Sep 2025 15:23:18 -0400 Subject: [PATCH 04/19] Make _validate_extends_tasks and create_tasks chainable --- alchemiscale/storage/statestore.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 5468e7b1..7dc9b48d 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -2895,7 +2895,10 @@ def task_count(task_dict: dict): ## tasks - def _validate_extends_tasks(self, task_list) -> dict[str, tuple[Node, str]]: + @chainable + def _validate_extends_tasks( + self, task_list, tx=None + ) -> dict[str, tuple[Node, str]]: if not task_list: return {} @@ -2906,7 +2909,7 @@ def _validate_extends_tasks(self, task_list) -> dict[str, tuple[Node, str]]: return t, tf._scoped_key as tf_sk """ - results = self.execute_query(q, task_list=list(map(str, task_list))) + results = tx.run(q, task_list=list(map(str, task_list))).to_eager_result() nodes = {} @@ -2926,11 +2929,13 @@ def _validate_extends_tasks(self, task_list) -> dict[str, tuple[Node, str]]: return nodes + @chainable def create_tasks( self, transformations: list[ScopedKey], extends: list[ScopedKey | None] | None = None, creator: str | None = None, + tx=None, ) -> list[ScopedKey]: """Create Tasks for the given Transformations. 
@@ -2985,7 +2990,8 @@ def create_tasks( transformation_map[transformation.qualname][1].append(extends[i]) extends_nodes = self._validate_extends_tasks( - [_extends for _extends in extends if _extends is not None] + [_extends for _extends in extends if _extends is not None], + tx=tx, ) subgraph = Subgraph() @@ -3006,9 +3012,9 @@ def create_tasks( RETURN n """ - results = self.execute_query( + results = tx.run( q, transformation_subset=list(map(str, transformation_subset)) - ) + ).to_eager_result() transformation_nodes = {} for record in results.records: @@ -3057,8 +3063,7 @@ def create_tasks( _project=scope.project, ) - with self.transaction() as tx: - merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") + merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") return sks From fb28a599f4f21ae86c36f9e2b4cdea011ae1094e Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 7 Oct 2025 08:22:49 -0400 Subject: [PATCH 05/19] Use Subgraphs for task (+PDRR) and network assembly --- alchemiscale/storage/statestore.py | 55 ++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 7dc9b48d..a8b09af8 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -901,7 +901,6 @@ def merge_networks( ------- The ``ScopedKey`` of the new ``AlchemicalNetwork`` in the database. 
""" - # - Collect keyed chain representation for all alchemiscale networks try: network_keyed_chains: list[tuple[Scope, KeyedChain]] = [] @@ -928,17 +927,17 @@ def __eq__(self, other): return other is self.transformation return other.transformation is self.transformation - def add_known_scoped_key_results(self, key, scope): + def add_known_scoped_key_results(self, key, scope, tx): known_scoped_key = ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) - self.update_task_tree(known_scoped_key) + self.update_task_tree(known_scoped_key, tx) - def update_task_tree(self, tf_scoped_key): - nonlocal tx + def update_task_tree(self, tf_scoped_key, tx): + # TODO: filter out tasks that are not wanted before returning them query = """ - MATCH (task:Task)-[:PERFORMS]->(:Transformation {`_scoped_key`: $tf_scoped_key}) + MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: $tf_scoped_key}) OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) - OPTIONAL MATCH (task)<-[:RESULTS_IN]-(pdrr:ProtocolDAGResultRef) - RETURN task, extended_task._scoped_key as etask_sk, pdrr + OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) + RETURN task, extended_task as extended_task, collect(pdrr) as pdrrs """ results = ( tx.run(query, tf_scoped_key=str(tf_scoped_key)) @@ -948,6 +947,37 @@ def update_task_tree(self, tf_scoped_key): self.task_tree.extend(results) + def to_subgraph(self, target_scope, statestore): + if not self.task_tree: + return Subgraph() + + _, tf_node, _ = statestore._keyed_chain_to_subgraph(KeyedChain.from_gufe(self.transformation), target_scope) + subgraph = Subgraph() | tf_node + + scope_props = { + "_org": target_scope.org, + "_campaign": target_scope.campaign, + "_project": target_scope.project, + } + + def record_to_node(record): + scoped_key = record["_scoped_key"] + scoped_key = ScopedKey(gufe_key=record["_gufe_key"], **scope.to_dict()) + return Node(*record.labels, **record._properties | scope_props | {"_scoped_key": 
str(scoped_key)}) + + for record in self.task_tree: + etask_record = record["extended_task"] + pdrr_records = record["pdrrs"] + task_node = record_to_node(record["task"]) + etask_node = None if not record["extended_task"] else record_to_node(record["extended_task"]) + if etask_node: + subgraph |= Relationship.type("EXTENDS")(etask_node, task_node, **scope_props) + for pdrr_record in record["pdrrs"]: + pdrr_node = record_to_node(pdrr_record) + subgraph |= Relationship.type("RESULTS_IN")(task_node, pdrr_node, **scope_props) + + return subgraph + from gufe.tokenization import key_decode_dependencies # TODO: upstream to gufe @@ -982,13 +1012,16 @@ def kc_to_gufe(kc, gts): except ValueError: data = TransformationData(transformation) transformation_data.append(data) - data.add_known_scoped_key_results(database_key, network_scope) + data.add_known_scoped_key_results(database_key, network_scope, tx) # - Collect all transformation gufe objects and collect into a new set of edges new_edges = [td.transformation for td in transformation_data] # - Make new alchemiscale network with these edges combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) - # - assemble the network - an_sk, _, _ = self.assemble_network(combined_alchemical_network, scope, tx=tx) + an_subgraph, an_node, an_sk = self._keyed_chain_to_subgraph(KeyedChain.from_gufe(combined_alchemical_network), scope) + an_subgraph |= self.create_network_mark_subgraph(an_node)[0] | self.create_taskhub_subgraph(an_node)[0] + for td in transformation_data: + an_subgraph |= td.to_subgraph(scope, self) + merge_subgraph(tx, an_subgraph, "GufeTokenizable", "_scoped_key") return an_sk def get_network_state(self, networks: list[ScopedKey]) -> list[str | None]: From 50322959e1b7c7374f2cbbb6e9df79de5f017346 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 7 Oct 2025 10:07:13 -0400 Subject: [PATCH 06/19] Update task trees with one query and cache subchains for performance increase --- 
alchemiscale/storage/statestore.py | 41 ++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index a8b09af8..1e8163f6 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -921,37 +921,44 @@ class TransformationData: task_tree: list = field( default_factory=list ) # flat represenation of task tree with results + known_scoped_keys: list = field( + default_factory=list + ) def __eq__(self, other): if isinstance(other, (Transformation, NonTransformation)): return other is self.transformation return other.transformation is self.transformation - def add_known_scoped_key_results(self, key, scope, tx): - known_scoped_key = ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) - self.update_task_tree(known_scoped_key, tx) + def add_known_scoped_key(self, key, scope): + self.known_scoped_keys.append(ScopedKey(gufe_key=GufeKey(key), **scope.to_dict())) - def update_task_tree(self, tf_scoped_key, tx): + @staticmethod + def update_task_trees(transformation_data: list, tx): + key_to_data_map = {str(td.transformation.key): td for td in transformation_data} + transformation_sk_pairs = [[str(td.transformation.key), str(sk)] for td in transformation_data for sk in td.known_scoped_keys] # TODO: filter out tasks that are not wanted before returning them query = """ - MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: $tf_scoped_key}) + UNWIND $tf_sk_pairs as pairs + WITH pairs[0] AS tf_key, pairs[1] AS tf_scoped_key + MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) - RETURN task, extended_task as extended_task, collect(pdrr) as pdrrs + RETURN tf_key, task, extended_task as extended_task, collect(pdrr) as pdrrs """ results = ( - 
tx.run(query, tf_scoped_key=str(tf_scoped_key)) + tx.run(query, tf_sk_pairs=transformation_sk_pairs) .to_eager_result() .records ) - self.task_tree.extend(results) + for record in results: + key_to_data_map[record["tf_key"]].task_tree.append(record) - def to_subgraph(self, target_scope, statestore): + def to_subgraph(self, target_scope, statestore, subchain_cache): if not self.task_tree: return Subgraph() - - _, tf_node, _ = statestore._keyed_chain_to_subgraph(KeyedChain.from_gufe(self.transformation), target_scope) + _, tf_node, _ = statestore._keyed_chain_to_subgraph(subchain_cache[self.transformation], target_scope) subgraph = Subgraph() | tf_node scope_props = { @@ -994,7 +1001,11 @@ def kc_to_gufe(kc, gts): # due to minor version changes between serialization # times. These keys are then used to get all results for Tasks # associated with them. + # + # TODO: this is currently slow from all of the to_gufe calls + # on subchains that contain more information than is necessary transformation_data: list[TransformationData] = [] + subchain_cache = {} for network_scope, network_keyed_chain in network_keyed_chains: gts = {} for index, (database_key, _) in enumerate(network_keyed_chain): @@ -1006,21 +1017,25 @@ def kc_to_gufe(kc, gts): # get the tokenizable at the index along with all previous data subchain = KeyedChain(network_keyed_chain[: index + 1]) transformation = kc_to_gufe(subchain, gts) + subchain_cache[transformation] = subchain try: index = transformation_data.index(transformation) data = transformation_data[index] except ValueError: data = TransformationData(transformation) transformation_data.append(data) - data.add_known_scoped_key_results(database_key, network_scope, tx) + data.add_known_scoped_key(database_key, network_scope) # - Collect all transformation gufe objects and collect into a new set of edges + TransformationData.update_task_trees(transformation_data, tx) new_edges = [td.transformation for td in transformation_data] # - Make new 
alchemiscale network with these edges combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) an_subgraph, an_node, an_sk = self._keyed_chain_to_subgraph(KeyedChain.from_gufe(combined_alchemical_network), scope) an_subgraph |= self.create_network_mark_subgraph(an_node)[0] | self.create_taskhub_subgraph(an_node)[0] + task_subgraph = Subgraph() for td in transformation_data: - an_subgraph |= td.to_subgraph(scope, self) + task_subgraph |= td.to_subgraph(scope, self, subchain_cache) + an_subgraph |= task_subgraph merge_subgraph(tx, an_subgraph, "GufeTokenizable", "_scoped_key") return an_sk From d4f3dda6d9acc959bac518c619a681b2c99acb4b Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 7 Oct 2025 10:20:27 -0400 Subject: [PATCH 07/19] Remove need for methods to be chainable --- alchemiscale/storage/statestore.py | 94 ++++++++++++++++++------------ 1 file changed, 58 insertions(+), 36 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 1e8163f6..f7eb25b8 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -781,13 +781,11 @@ def _topological_sort(graph_data: dict[GufeKey, tuple[Node, list[GufeKey]]]): graph_data.pop(rk) return L - @chainable def assemble_network( self, network: AlchemicalNetwork, scope: Scope, state: NetworkStateEnum | str = NetworkStateEnum.active, - tx=None, ) -> tuple[ScopedKey, ScopedKey, ScopedKey]: """Create all nodes and relationships needed for an AlchemicalNetwork represented in an alchemiscale state store. 
@@ -813,7 +811,8 @@ def assemble_network( subgraph = nw_subgraph | th_subgraph | nm_subgraph - merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") + with self.transaction() as tx: + merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") return nw_sk, th_sk, nm_sk @@ -877,14 +876,12 @@ def delete_network( """ raise NotImplementedError - @chainable def merge_networks( self, network_scoped_keys: list[ScopedKey], name: str, scope: Scope, clone_incomplete_tasks=False, - tx=None, ) -> ScopedKey: """Merge multiple ``AlchemicalNetwork`` nodes into a new ``AlchemicalNetwork``. @@ -921,9 +918,7 @@ class TransformationData: task_tree: list = field( default_factory=list ) # flat represenation of task tree with results - known_scoped_keys: list = field( - default_factory=list - ) + known_scoped_keys: list = field(default_factory=list) def __eq__(self, other): if isinstance(other, (Transformation, NonTransformation)): @@ -931,12 +926,20 @@ def __eq__(self, other): return other.transformation is self.transformation def add_known_scoped_key(self, key, scope): - self.known_scoped_keys.append(ScopedKey(gufe_key=GufeKey(key), **scope.to_dict())) + self.known_scoped_keys.append( + ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) + ) @staticmethod - def update_task_trees(transformation_data: list, tx): - key_to_data_map = {str(td.transformation.key): td for td in transformation_data} - transformation_sk_pairs = [[str(td.transformation.key), str(sk)] for td in transformation_data for sk in td.known_scoped_keys] + def update_task_trees(transformation_data: list, statestore): + key_to_data_map = { + str(td.transformation.key): td for td in transformation_data + } + transformation_sk_pairs = [ + [str(td.transformation.key), str(sk)] + for td in transformation_data + for sk in td.known_scoped_keys + ] # TODO: filter out tasks that are not wanted before returning them query = """ UNWIND $tf_sk_pairs as pairs @@ -946,11 +949,9 @@ def 
update_task_trees(transformation_data: list, tx): OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) RETURN tf_key, task, extended_task as extended_task, collect(pdrr) as pdrrs """ - results = ( - tx.run(query, tf_sk_pairs=transformation_sk_pairs) - .to_eager_result() - .records - ) + results = statestore.execute_query( + query, tf_sk_pairs=transformation_sk_pairs + ).records for record in results: key_to_data_map[record["tf_key"]].task_tree.append(record) @@ -958,7 +959,9 @@ def update_task_trees(transformation_data: list, tx): def to_subgraph(self, target_scope, statestore, subchain_cache): if not self.task_tree: return Subgraph() - _, tf_node, _ = statestore._keyed_chain_to_subgraph(subchain_cache[self.transformation], target_scope) + _, tf_node, _ = statestore._keyed_chain_to_subgraph( + subchain_cache[self.transformation], target_scope + ) subgraph = Subgraph() | tf_node scope_props = { @@ -969,19 +972,34 @@ def to_subgraph(self, target_scope, statestore, subchain_cache): def record_to_node(record): scoped_key = record["_scoped_key"] - scoped_key = ScopedKey(gufe_key=record["_gufe_key"], **scope.to_dict()) - return Node(*record.labels, **record._properties | scope_props | {"_scoped_key": str(scoped_key)}) + scoped_key = ScopedKey( + gufe_key=record["_gufe_key"], **scope.to_dict() + ) + return Node( + *record.labels, + **record._properties + | scope_props + | {"_scoped_key": str(scoped_key)}, + ) for record in self.task_tree: etask_record = record["extended_task"] pdrr_records = record["pdrrs"] task_node = record_to_node(record["task"]) - etask_node = None if not record["extended_task"] else record_to_node(record["extended_task"]) + etask_node = ( + None + if not record["extended_task"] + else record_to_node(record["extended_task"]) + ) if etask_node: - subgraph |= Relationship.type("EXTENDS")(etask_node, task_node, **scope_props) + subgraph |= Relationship.type("EXTENDS")( + etask_node, task_node, **scope_props + ) for pdrr_record in 
record["pdrrs"]: pdrr_node = record_to_node(pdrr_record) - subgraph |= Relationship.type("RESULTS_IN")(task_node, pdrr_node, **scope_props) + subgraph |= Relationship.type("RESULTS_IN")( + task_node, pdrr_node, **scope_props + ) return subgraph @@ -1026,17 +1044,23 @@ def kc_to_gufe(kc, gts): transformation_data.append(data) data.add_known_scoped_key(database_key, network_scope) # - Collect all transformation gufe objects and collect into a new set of edges - TransformationData.update_task_trees(transformation_data, tx) + TransformationData.update_task_trees(transformation_data, self) new_edges = [td.transformation for td in transformation_data] # - Make new alchemiscale network with these edges combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) - an_subgraph, an_node, an_sk = self._keyed_chain_to_subgraph(KeyedChain.from_gufe(combined_alchemical_network), scope) - an_subgraph |= self.create_network_mark_subgraph(an_node)[0] | self.create_taskhub_subgraph(an_node)[0] + an_subgraph, an_node, an_sk = self._keyed_chain_to_subgraph( + KeyedChain.from_gufe(combined_alchemical_network), scope + ) + an_subgraph |= ( + self.create_network_mark_subgraph(an_node)[0] + | self.create_taskhub_subgraph(an_node)[0] + ) task_subgraph = Subgraph() for td in transformation_data: task_subgraph |= td.to_subgraph(scope, self, subchain_cache) an_subgraph |= task_subgraph - merge_subgraph(tx, an_subgraph, "GufeTokenizable", "_scoped_key") + with self.transaction() as tx: + merge_subgraph(tx, an_subgraph, "GufeTokenizable", "_scoped_key") return an_sk def get_network_state(self, networks: list[ScopedKey]) -> list[str | None]: @@ -2943,9 +2967,9 @@ def task_count(task_dict: dict): ## tasks - @chainable def _validate_extends_tasks( - self, task_list, tx=None + self, + task_list, ) -> dict[str, tuple[Node, str]]: if not task_list: @@ -2957,7 +2981,7 @@ def _validate_extends_tasks( return t, tf._scoped_key as tf_sk """ - results = tx.run(q, task_list=list(map(str, 
task_list))).to_eager_result() + results = self.execute_query(q, task_list=list(map(str, task_list))) nodes = {} @@ -2977,13 +3001,11 @@ def _validate_extends_tasks( return nodes - @chainable def create_tasks( self, transformations: list[ScopedKey], extends: list[ScopedKey | None] | None = None, creator: str | None = None, - tx=None, ) -> list[ScopedKey]: """Create Tasks for the given Transformations. @@ -3039,7 +3061,6 @@ def create_tasks( extends_nodes = self._validate_extends_tasks( [_extends for _extends in extends if _extends is not None], - tx=tx, ) subgraph = Subgraph() @@ -3060,9 +3081,9 @@ def create_tasks( RETURN n """ - results = tx.run( + results = self.execute_query( q, transformation_subset=list(map(str, transformation_subset)) - ).to_eager_result() + ) transformation_nodes = {} for record in results.records: @@ -3111,7 +3132,8 @@ def create_tasks( _project=scope.project, ) - merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") + with self.transaction() as tx: + merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") return sks From b9a527e6172767637ee9a280c1d73ca9a93bf424 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 7 Oct 2025 11:34:34 -0400 Subject: [PATCH 08/19] Change confusing, but benign clashing names --- alchemiscale/storage/statestore.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index f7eb25b8..0a3ac8fa 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1037,8 +1037,8 @@ def kc_to_gufe(kc, gts): transformation = kc_to_gufe(subchain, gts) subchain_cache[transformation] = subchain try: - index = transformation_data.index(transformation) - data = transformation_data[index] + idx = transformation_data.index(transformation) + data = transformation_data[idx] except ValueError: data = TransformationData(transformation) transformation_data.append(data) From 
96b527eafa4239d74b00e38ff6e48ea2ee6c7b44 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 7 Oct 2025 11:35:43 -0400 Subject: [PATCH 09/19] Merge directly into an_subgraph There is hardly any difference in performance. --- alchemiscale/storage/statestore.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 0a3ac8fa..568be782 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1055,10 +1055,8 @@ def kc_to_gufe(kc, gts): self.create_network_mark_subgraph(an_node)[0] | self.create_taskhub_subgraph(an_node)[0] ) - task_subgraph = Subgraph() for td in transformation_data: - task_subgraph |= td.to_subgraph(scope, self, subchain_cache) - an_subgraph |= task_subgraph + an_subgraph |= td.to_subgraph(scope, self, subchain_cache) with self.transaction() as tx: merge_subgraph(tx, an_subgraph, "GufeTokenizable", "_scoped_key") return an_sk From 8f3b17fa766fa9929c9c3918febb4182517b1e54 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 16 Oct 2025 13:26:43 -0400 Subject: [PATCH 10/19] Only create tasks that were complete --- alchemiscale/storage/statestore.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 568be782..e496e7d6 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -7,6 +7,7 @@ import abc import bisect import datetime +from dataclasses import dataclass, field from contextlib import contextmanager import json import re @@ -881,7 +882,6 @@ def merge_networks( network_scoped_keys: list[ScopedKey], name: str, scope: Scope, - clone_incomplete_tasks=False, ) -> ScopedKey: """Merge multiple ``AlchemicalNetwork`` nodes into a new ``AlchemicalNetwork``. @@ -910,8 +910,6 @@ def merge_networks( f"ScopedKey ({network_scoped_key}) not found in the database." 
) - from dataclasses import dataclass, field - @dataclass class TransformationData: transformation: Transformation @@ -920,11 +918,6 @@ class TransformationData: ) # flat represenation of task tree with results known_scoped_keys: list = field(default_factory=list) - def __eq__(self, other): - if isinstance(other, (Transformation, NonTransformation)): - return other is self.transformation - return other.transformation is self.transformation - def add_known_scoped_key(self, key, scope): self.known_scoped_keys.append( ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) @@ -940,11 +933,10 @@ def update_task_trees(transformation_data: list, statestore): for td in transformation_data for sk in td.known_scoped_keys ] - # TODO: filter out tasks that are not wanted before returning them query = """ UNWIND $tf_sk_pairs as pairs WITH pairs[0] AS tf_key, pairs[1] AS tf_scoped_key - MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) + MATCH (task:Task {status: "complete"})-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) RETURN tf_key, task, extended_task as extended_task, collect(pdrr) as pdrrs @@ -959,6 +951,7 @@ def update_task_trees(transformation_data: list, statestore): def to_subgraph(self, target_scope, statestore, subchain_cache): if not self.task_tree: return Subgraph() + _, tf_node, _ = statestore._keyed_chain_to_subgraph( subchain_cache[self.transformation], target_scope ) From f566d4f1dfc1445a6404153973e59be108b892a2 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 16 Oct 2025 14:37:59 -0400 Subject: [PATCH 11/19] Avoid soft failure for setting tasks to complete/error --- .../integration/storage/test_statestore.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/alchemiscale/tests/integration/storage/test_statestore.py 
b/alchemiscale/tests/integration/storage/test_statestore.py index 2469fc6e..315ab474 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -154,11 +154,11 @@ def project_scope(iteration): # clutter down below def set_result(_slice, *, ok: bool): nonlocal task_sks, transformation_sks - for task_sk, transformation_sk in zip( - task_sks[_slice], transformation_sks[_slice] - ): + for task_sk in task_sks[_slice]: pdrr = ProtocolDAGResultRef( - obj_key=transformation_sk.gufe_key, scope=task_sk.scope, ok=ok + obj_key=f"ProtocolDAGResult-{uuid.uuid4()}", + scope=task_sk.scope, + ok=ok, ) n4js.set_task_result(task_sk, pdrr) @@ -183,9 +183,11 @@ def set_result(_slice, *, ok: bool): n4js.set_task_waiting(task_sks[5:]) n4js.action_tasks(task_sks[5:], th_sk) + n4js.set_task_running(task_sks[4:5]) n4js.set_task_error(task_sks[4:5]) n4js.action_tasks(task_sks[4:5], th_sk) + n4js.set_task_running(task_sks[:2]) n4js.set_task_complete(task_sks[:2]) set_result(slice(None, 2), ok=True) @@ -208,6 +210,7 @@ def set_result(_slice, *, ok: bool): for transformation in transformations_common ] task_sks = n4js.create_tasks(transformation_sks) + n4js.set_task_running(task_sks) n4js.set_task_complete(task_sks) set_result(slice(None), ok=True) @@ -216,6 +219,7 @@ def set_result(_slice, *, ok: bool): n4js.set_task_waiting(task_sks[0:1]) n4js.set_task_running(task_sks[1:2]) + n4js.set_task_running(task_sks[2:3]) n4js.set_task_complete(task_sks[2:3]) set_result(slice(2, 3), ok=True) @@ -238,6 +242,7 @@ def set_result(_slice, *, ok: bool): task_sks = n4js.create_tasks(transformation_sks) n4js.set_task_waiting(task_sks[:-1]) + n4js.set_task_running(task_sks[-1:]) n4js.set_task_complete(task_sks[-1:]) set_result(slice(-1, None), ok=True) @@ -254,6 +259,16 @@ def set_result(_slice, *, ok: bool): n4js.get_gufe(network_sks[1]).edges ) + # we expect 7 pdrrs from the completed tasks + results = n4js.execute_query( + """ + 
MATCH (pdrr: ProtocolDAGResultRef {`_project`: $project}) + RETURN pdrr + """, + project=new_scope.project, + ) + assert len(results.records) == 7 + def test_set_network_state(self, n4js, network_tyk2, scope_test): valid_states = [state.value for state in NetworkStateEnum] network_sks = [] From 1d8dfde8d74e0243c53f8ffa335f86cd992ed1e2 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 16 Oct 2025 14:47:37 -0400 Subject: [PATCH 12/19] Also include error task ProtocolDAGResultRefs --- alchemiscale/storage/statestore.py | 3 ++- .../tests/integration/storage/test_statestore.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index e496e7d6..0dae93fc 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -936,7 +936,8 @@ def update_task_trees(transformation_data: list, statestore): query = """ UNWIND $tf_sk_pairs as pairs WITH pairs[0] AS tf_key, pairs[1] AS tf_scoped_key - MATCH (task:Task {status: "complete"})-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) + MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) + WHERE task.status IN ["complete", "error"] OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) RETURN tf_key, task, extended_task as extended_task, collect(pdrr) as pdrrs diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index 315ab474..f8f1c4c0 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -263,12 +263,24 @@ def set_result(_slice, *, ok: bool): results = n4js.execute_query( """ MATCH (pdrr: ProtocolDAGResultRef {`_project`: $project}) + WHERE pdrr.ok = True RETURN pdrr """, project=new_scope.project, ) 
assert len(results.records) == 7 + # we expect 1 pdrrs from the errored task + results = n4js.execute_query( + """ + MATCH (pdrr: ProtocolDAGResultRef {`_project`: $project}) + WHERE pdrr.ok = False + RETURN pdrr + """, + project=new_scope.project, + ) + assert len(results.records) == 1 + def test_set_network_state(self, n4js, network_tyk2, scope_test): valid_states = [state.value for state in NetworkStateEnum] network_sks = [] From f19985f9c5c939c5cdce41dfabf8761d037aa942 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Mon, 20 Oct 2025 10:13:11 -0400 Subject: [PATCH 13/19] Add comments to merge_networks --- alchemiscale/storage/statestore.py | 69 +++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 10 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 0dae93fc..2845ef49 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -25,7 +25,13 @@ Protocol, ) from gufe.settings import SettingsBaseModel -from gufe.tokenization import GufeTokenizable, GufeKey, JSON_HANDLER, KeyedChain +from gufe.tokenization import ( + GufeTokenizable, + GufeKey, + JSON_HANDLER, + KeyedChain, + key_decode_dependencies, +) from gufe.protocols import ProtocolUnitFailure from neo4j import Transaction, GraphDatabase, Driver @@ -898,7 +904,7 @@ def merge_networks( ------- The ``ScopedKey`` of the new ``AlchemicalNetwork`` in the database. """ - # - Collect keyed chain representation for all alchemiscale networks + # Collect keyed chain representation for all alchemiscale networks try: network_keyed_chains: list[tuple[Scope, KeyedChain]] = [] for network_scoped_key in network_scoped_keys: @@ -910,8 +916,23 @@ def merge_networks( f"ScopedKey ({network_scoped_key}) not found in the database." ) + # Helper dataclass for managing transformations and tasks that + # perform them. 
@dataclass class TransformationData: + """ + transformation + + task_tree + flat representation of tasks associated with the transformation, each entry has: + - transformation gufe key + - full task neo4j node + - optional: full task neo4j node that this task extends + - list of ProtocolDAGResultRef nodes + known_scoped_keys + All scoped keys that represent the transformation across networks + """ + transformation: Transformation task_tree: list = field( default_factory=list @@ -925,9 +946,16 @@ def add_known_scoped_key(self, key, scope): @staticmethod def update_task_trees(transformation_data: list, statestore): + """Given a list of TransformationData, extract all + necessary info from Neo4j and load the task trees. + """ + key_to_data_map = { str(td.transformation.key): td for td in transformation_data } + # prepare for unwind claus, include transformation key + # for updating each entry of transformation_data for + # each scoped key transformation_sk_pairs = [ [str(td.transformation.key), str(sk)] for td in transformation_data @@ -950,9 +978,26 @@ def update_task_trees(transformation_data: list, statestore): key_to_data_map[record["tf_key"]].task_tree.append(record) def to_subgraph(self, target_scope, statestore, subchain_cache): + """Create a subgraph where the "central" node is the + transformation and iteratively add Task and PDRRs + nodes with their corresponding relationships. + + """ + # if there are no tasks, can return an subgraph, no + # need to make transformation node because it exists + # already outside of this method if not self.task_tree: return Subgraph() + # create the transformation node from its keyed chain, + # this allows later nodes to be easily connected to + # the subgraph outside of this method. subchain_cache + # is a nonlocal dict[Transformation, KeyedChain] that + # removes the need to find the transformation within a + # larger KeyedChain. 
This will likely be unnecessary + # with later versions of gufe when decode_subchains is + # added. + # (https://github.com/OpenFreeEnergy/gufe/pull/634) _, tf_node, _ = statestore._keyed_chain_to_subgraph( subchain_cache[self.transformation], target_scope ) @@ -965,7 +1010,7 @@ def to_subgraph(self, target_scope, statestore, subchain_cache): } def record_to_node(record): - scoped_key = record["_scoped_key"] + # create node from a neo4j record with updated scoped key scoped_key = ScopedKey( gufe_key=record["_gufe_key"], **scope.to_dict() ) @@ -976,10 +1021,14 @@ def record_to_node(record): | {"_scoped_key": str(scoped_key)}, ) + # process each task found. Each record represents a + # single task. for record in self.task_tree: etask_record = record["extended_task"] pdrr_records = record["pdrrs"] + # update task node to have new scoped key task_node = record_to_node(record["task"]) + # create the task node this task extends if it exists etask_node = ( None if not record["extended_task"] @@ -989,6 +1038,7 @@ def record_to_node(record): subgraph |= Relationship.type("EXTENDS")( etask_node, task_node, **scope_props ) + # clone all result refs for the task for pdrr_record in record["pdrrs"]: pdrr_node = record_to_node(pdrr_record) subgraph |= Relationship.type("RESULTS_IN")( @@ -997,8 +1047,6 @@ def record_to_node(record): return subgraph - from gufe.tokenization import key_decode_dependencies - # TODO: upstream to gufe def kc_to_gufe(kc, gts): for gufe_key, keyed_dict in kc: @@ -1013,9 +1061,6 @@ def kc_to_gufe(kc, gts): # due to minor version changes between serialization # times. These keys are then used to get all results for Tasks # associated with them. 
- # - # TODO: this is currently slow from all of the to_gufe calls - # on subchains that contain more information than is necessary transformation_data: list[TransformationData] = [] subchain_cache = {} for network_scope, network_keyed_chain in network_keyed_chains: @@ -1037,20 +1082,24 @@ def kc_to_gufe(kc, gts): data = TransformationData(transformation) transformation_data.append(data) data.add_known_scoped_key(database_key, network_scope) - # - Collect all transformation gufe objects and collect into a new set of edges + # Collect all transformation gufe objects and collect into a new set of edges TransformationData.update_task_trees(transformation_data, self) new_edges = [td.transformation for td in transformation_data] - # - Make new alchemiscale network with these edges + # Make new alchemiscale network with these edges combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) an_subgraph, an_node, an_sk = self._keyed_chain_to_subgraph( KeyedChain.from_gufe(combined_alchemical_network), scope ) + # create and fold in taskhub and network mark supporting nodes an_subgraph |= ( self.create_network_mark_subgraph(an_node)[0] | self.create_taskhub_subgraph(an_node)[0] ) + # create and fold in all task and results data for td in transformation_data: an_subgraph |= td.to_subgraph(scope, self, subchain_cache) + + # merge the new network into neo4j with self.transaction() as tx: merge_subgraph(tx, an_subgraph, "GufeTokenizable", "_scoped_key") return an_sk From 9374b9be90bbcc7867b56d1ce49a19e8d602833f Mon Sep 17 00:00:00 2001 From: David Dotson Date: Tue, 11 Nov 2025 21:38:10 -0700 Subject: [PATCH 14/19] Typo fix and formatting --- alchemiscale/storage/statestore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 2845ef49..0126f5cd 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -904,7 +904,7 @@ def 
merge_networks( ------- The ``ScopedKey`` of the new ``AlchemicalNetwork`` in the database. """ - # Collect keyed chain representation for all alchemiscale networks + # Collect keyed chain representation for all alchemical networks try: network_keyed_chains: list[tuple[Scope, KeyedChain]] = [] for network_scoped_key in network_scoped_keys: @@ -1082,6 +1082,7 @@ def kc_to_gufe(kc, gts): data = TransformationData(transformation) transformation_data.append(data) data.add_known_scoped_key(database_key, network_scope) + # Collect all transformation gufe objects and collect into a new set of edges TransformationData.update_task_trees(transformation_data, self) new_edges = [td.transformation for td in transformation_data] From 6859746212ec50e32c8c62a1b138ba747b047e45 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 8 Jun 2026 17:19:52 -0600 Subject: [PATCH 15/19] Expose merge_networks on the user-facing client and API Add the client + API surface for the merge_networks database operation introduced in this branch. Users can now combine multiple existing AlchemicalNetworks into a single new AlchemicalNetwork through AlchemiscaleClient, with completed and errored Tasks (and their ProtocolDAGResultRefs) carried over so previously-computed results do not need to be re-run. - POST /networks/merge endpoint validates the destination Scope and each source network's Scope against the caller's token, then defers to Neo4jStore.merge_networks. - AlchemiscaleClient.merge_networks wraps the endpoint and guards against wildcard Scopes, empty input, and non-AlchemicalNetwork ScopedKeys client-side. - Integration tests cover the happy path, scope authorization (both bad destination and bad source), and that Tasks + PDRRs in complete/error state are cloned into the new scope and reachable through get_network_tasks. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- alchemiscale/interface/api.py | 47 +++++ alchemiscale/interface/client.py | 83 ++++++++ .../interface/client/test_client.py | 197 ++++++++++++++++++ .../tests/integration/interface/test_api.py | 91 ++++++++ news/issue-221.rst | 24 +++ 5 files changed, 442 insertions(+) create mode 100644 news/issue-221.rst diff --git a/alchemiscale/interface/api.py b/alchemiscale/interface/api.py index 341069e9..1777e6d4 100644 --- a/alchemiscale/interface/api.py +++ b/alchemiscale/interface/api.py @@ -138,6 +138,53 @@ async def create_network( return an_sk +@router.post("/networks/merge", response_model=ScopedKey) +def merge_networks( + *, + networks: list[str] = Body(embed=True), + name: str = Body(embed=True), + scope: dict = Body(embed=True), + n4js: Neo4jStore = Depends(get_n4js_depends), + token: TokenData = Depends(get_token_data_depends), +): + # validate the destination scope first + try: + target_scope = Scope(**scope) + except (TypeError, ValidationError) as e: + raise HTTPException( + status_code=http_status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=str(e), + ) + validate_scopes(target_scope, token) + + # validate each source network's scope is accessible to the token + network_sks = [] + for network in networks: + try: + network_sk = ScopedKey.from_str(network) + except ValueError as e: + raise HTTPException( + status_code=http_status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=e.args[0], + ) + validate_scopes(network_sk.scope, token) + network_sks.append(network_sk) + + try: + an_sk = n4js.merge_networks( + network_scoped_keys=network_sks, + name=name, + scope=target_scope, + ) + except ValueError as e: + raise HTTPException( + status_code=http_status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=e.args[0], + ) + + return an_sk + + @router.post("/bulk/networks/state/set") def set_networks_state( *, diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index f53859ba..d6215f43 100644 --- 
a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -187,6 +187,89 @@ def post(): return ScopedKey.from_dict(scoped_key) + def merge_networks( + self, + networks: list[ScopedKey], + name: str, + scope: Scope, + visualize: bool = True, + ) -> ScopedKey: + """Merge multiple existing AlchemicalNetworks into a new AlchemicalNetwork. + + The resulting AlchemicalNetwork contains the union of all + Transformations and NonTransformations from the source networks. + Existing Tasks for those transformations that are in ``complete`` or + ``error`` state are cloned into the new network's scope along with + their associated ProtocolDAGResultRefs, so previously-computed results + do not need to be re-run. + + The new AlchemicalNetwork is created in the active state. + + Parameters + ---------- + networks + The ScopedKeys of the AlchemicalNetworks to merge. The source + networks may live in different Scopes; the caller must have access + to each. + name + The name of the new AlchemicalNetwork. + scope + The Scope in which to create the new AlchemicalNetwork. + This must be a *specific* Scope; it must not contain wildcards. + visualize + If ``True``, show submission progress indicator. + + Returns + ------- + ScopedKey + The ScopedKey of the new, merged AlchemicalNetwork. 
+ """ + if not scope.specific(): + raise ValueError( + f"`scope` '{scope}' contains wildcards ('*'); `scope` must be *specific*" + ) + + if not networks: + raise ValueError("`networks` must contain at least one ScopedKey") + + network_sks = [ + sk if isinstance(sk, ScopedKey) else ScopedKey.from_str(sk) + for sk in networks + ] + + for network_sk in network_sks: + if network_sk.qualname not in ("AlchemicalNetwork",): + raise ValueError( + f"ScopedKey '{network_sk}' does not refer to an AlchemicalNetwork" + ) + + data = dict( + networks=[str(sk) for sk in network_sks], + name=name, + scope=scope.to_dict(), + ) + + def post(): + return self._post_resource("/networks/merge", data) + + if visualize: + from rich.progress import Progress + + with Progress(*self._rich_waiting_columns(), transient=False) as progress: + task = progress.add_task( + f"Merging [bold]{len(network_sks)}[/bold] networks into " + f"[bold]'{name}'[/bold] in scope [bold]'{scope}'[/bold]...", + total=None, + ) + + scoped_key = post() + progress.start_task(task) + progress.update(task, total=1, completed=1) + else: + scoped_key = post() + + return ScopedKey.from_dict(scoped_key) + def set_network_state( self, network: ScopedKey, state: NetworkStateEnum | str ) -> ScopedKey | None: diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 390acb71..b8dcf289 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -2,6 +2,7 @@ import datetime from time import sleep import os +import uuid from pathlib import Path from itertools import chain import json @@ -199,6 +200,202 @@ def test_create_network( # common with an existing network # user_client.create_network( + def test_merge_networks( + self, + scope_test, + multiple_scopes, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + network_tyk2, + ): + # gather source 
AlchemicalNetwork ScopedKeys across all scopes + # n4js_preloaded creates `network_tyk2` and a trimmed copy named + # "incomplete" in each of `multiple_scopes` + source_sks = user_client.query_networks(state=None) + + # destination scope: a new project under the existing org/campaign + merge_scope = Scope( + org=scope_test.org, + campaign=scope_test.campaign, + project="merged_project", + ) + + merged_sk = user_client.merge_networks( + networks=source_sks, + name="merged_tyk2", + scope=merge_scope, + visualize=False, + ) + + assert isinstance(merged_sk, ScopedKey) + assert merged_sk.scope == merge_scope + assert merged_sk.qualname == "AlchemicalNetwork" + + # the merged network should exist + assert user_client.check_exists(merged_sk) + + # the merged network should appear in queries (defaults to active state) + all_active_sks = user_client.query_networks() + assert merged_sk in all_active_sks + + # the merged network should contain the union of all source edges; + # since `network_tyk2` is a superset of `incomplete`, the union equals + # `network_tyk2.edges` + merged_network = user_client.get_network(merged_sk) + assert merged_network.name == "merged_tyk2" + assert len(merged_network.edges) == len(network_tyk2.edges) + assert {t.key for t in merged_network.edges} == { + t.key for t in network_tyk2.edges + } + + def test_merge_networks_rejects_wildcard_scope( + self, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + ): + source_sks = user_client.query_networks(state=None) + with pytest.raises(ValueError, match="wildcards"): + user_client.merge_networks( + networks=source_sks, + name="should_fail", + scope=Scope(org="test_org"), + visualize=False, + ) + + def test_merge_networks_rejects_empty_list( + self, + scope_test, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + ): + with pytest.raises(ValueError, match="at least one"): + user_client.merge_networks( + networks=[], + name="should_fail", + scope=scope_test, + visualize=False, + ) + + def 
test_merge_networks_rejects_non_network_scoped_key( + self, + scope_test, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + ): + # pass a Transformation ScopedKey rather than an AlchemicalNetwork one + tf_sks = user_client.query_transformations(scope=scope_test) + assert tf_sks + with pytest.raises(ValueError, match="does not refer to an AlchemicalNetwork"): + user_client.merge_networks( + networks=[tf_sks[0]], + name="should_fail", + scope=scope_test, + visualize=False, + ) + + def test_merge_networks_preserves_tasks_and_results( + self, + scope_test, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + network_tyk2, + ): + """The merged network must carry over Tasks in complete/error state + along with their ProtocolDAGResultRefs, cloned into the new scope.""" + # pick a source network in scope_test + source_sks = user_client.query_networks(scope=scope_test, state=None) + assert source_sks + source_sk = source_sks[0] + + # create Tasks on two of its Transformations directly through n4js so + # we can drive them to completed/errored states with PDRRs without + # actually executing protocols + transformation_sks = n4js_preloaded.get_network_transformations(source_sk) + assert len(transformation_sks) >= 2 + + task_sks = n4js_preloaded.create_tasks(transformation_sks[:2]) + + # task 0: complete, ok result + n4js_preloaded.set_task_running(task_sks[:1]) + n4js_preloaded.set_task_complete(task_sks[:1]) + ok_pdrr = ProtocolDAGResultRef( + obj_key=f"ProtocolDAGResult-{uuid.uuid4()}", + scope=task_sks[0].scope, + ok=True, + ) + n4js_preloaded.set_task_result(task_sks[0], ok_pdrr) + + # task 1: error, failure result + n4js_preloaded.set_task_running(task_sks[1:2]) + n4js_preloaded.set_task_error(task_sks[1:2]) + err_pdrr = ProtocolDAGResultRef( + obj_key=f"ProtocolDAGResult-{uuid.uuid4()}", + scope=task_sks[1].scope, + ok=False, + ) + n4js_preloaded.set_task_result(task_sks[1], err_pdrr) + + # merge into a fresh project under the same org/campaign + 
merge_scope = Scope( + org=scope_test.org, + campaign=scope_test.campaign, + project="merged_with_results", + ) + merged_sk = user_client.merge_networks( + networks=[source_sk], + name="merged_with_results", + scope=merge_scope, + visualize=False, + ) + assert user_client.check_exists(merged_sk) + + # both Tasks should appear in the new scope, with their original statuses + task_records = n4js_preloaded.execute_query( + """ + MATCH (t:Task {`_project`: $project}) + RETURN t.status AS status + """, + project=merge_scope.project, + ).records + assert sorted(r["status"] for r in task_records) == ["complete", "error"] + + # one ok and one not-ok PDRR should appear in the new scope, with their + # original object keys preserved + pdrr_records = n4js_preloaded.execute_query( + """ + MATCH (pdrr:ProtocolDAGResultRef {`_project`: $project}) + RETURN pdrr.ok AS ok, pdrr.obj_key AS obj_key + """, + project=merge_scope.project, + ).records + assert len(pdrr_records) == 2 + oks = sorted(r["ok"] for r in pdrr_records) + assert oks == [False, True] + obj_keys = {r["obj_key"] for r in pdrr_records} + assert obj_keys == {ok_pdrr.obj_key, err_pdrr.obj_key} + + # cloned PDRRs must be wired to the cloned Tasks in the new scope + linked = n4js_preloaded.execute_query( + """ + MATCH (t:Task {`_project`: $project})-[:RESULTS_IN]-> + (pdrr:ProtocolDAGResultRef {`_project`: $project}) + RETURN t.status AS status, pdrr.ok AS ok + """, + project=merge_scope.project, + ).records + pairs = sorted((r["status"], r["ok"]) for r in linked) + assert pairs == [("complete", True), ("error", False)] + + # the cloned Tasks must be reachable from the merged AlchemicalNetwork + # via the standard PERFORMS traversal that the user-facing API uses; + # this catches any case where Tasks are written to the new scope but + # not wired back to their Transformations in the merged network + merged_task_sks = user_client.get_network_tasks(merged_sk) + assert len(merged_task_sks) == 2 + assert all(sk.scope == 
merge_scope for sk in merged_task_sks) + statuses = sorted(user_client.get_tasks_status(merged_task_sks)) + assert statuses == ["complete", "error"] + def test_check_exists( self, scope_test, diff --git a/alchemiscale/tests/integration/interface/test_api.py b/alchemiscale/tests/integration/interface/test_api.py index 21322c02..24be0be1 100644 --- a/alchemiscale/tests/integration/interface/test_api.py +++ b/alchemiscale/tests/integration/interface/test_api.py @@ -103,6 +103,97 @@ def test_create_network_bad_scope( assert str(bad_scope) in details assert str(scope_test) in details + def test_merge_networks( + self, n4js_preloaded, test_client, network_tyk2, scope_test + ): + n4js = n4js_preloaded + + # source networks in scope_test (pre-loaded by n4js_preloaded) + source_sks = n4js.query_networks(scope=scope_test) + assert len(source_sks) >= 2 + + # destination scope: a new project under the same org/campaign so the + # test_client's scope_test token has access + merge_scope_dict = { + "org": scope_test.org, + "campaign": scope_test.campaign, + "project": scope_test.project, + } + + headers = {"Content-type": "application/json"} + data = dict( + networks=[str(sk) for sk in source_sks], + name="api_merged", + scope=merge_scope_dict, + ) + jsondata = json.dumps(data, cls=JSON_HANDLER.encoder) + + response = test_client.post("/networks/merge", data=jsondata, headers=headers) + assert response.status_code == 200 + + merged_sk = ScopedKey(**response.json()) + assert merged_sk.scope == scope_test + assert merged_sk.gufe_key.startswith("AlchemicalNetwork-") + + # network should now be present in the database + assert n4js.check_existence(merged_sk) + + # merged network's union-of-edges equals network_tyk2's edge set, + # since one of the sources is network_tyk2 and the other is a strict subset + merged_network = n4js.get_gufe(merged_sk) + assert merged_network.name == "api_merged" + assert {t.key for t in merged_network.edges} == { + t.key for t in network_tyk2.edges + } + 
+ def test_merge_networks_bad_scope( + self, n4js_preloaded, test_client, network_tyk2, scope_test, multiple_scopes + ): + # destination scope the test_client's token does not have access to + bad_scope = multiple_scopes[1] + assert bad_scope != scope_test + + source_sks = n4js_preloaded.query_networks(scope=scope_test) + assert source_sks + + headers = {"Content-type": "application/json"} + data = dict( + networks=[str(sk) for sk in source_sks], + name="should_fail", + scope=bad_scope.to_dict(), + ) + jsondata = json.dumps(data, cls=JSON_HANDLER.encoder) + + response = test_client.post("/networks/merge", data=jsondata, headers=headers) + assert response.status_code == 401 + details = response.json() + assert "detail" in details + assert str(bad_scope) in details["detail"] + + def test_merge_networks_bad_source_scope( + self, n4js_preloaded, test_client, network_tyk2, scope_test, multiple_scopes + ): + # source network in a scope the test_client's token does not authorize + unauth_scope = multiple_scopes[1] + assert unauth_scope != scope_test + + unauth_source_sks = n4js_preloaded.query_networks(scope=unauth_scope) + assert unauth_source_sks + + headers = {"Content-type": "application/json"} + data = dict( + networks=[str(sk) for sk in unauth_source_sks], + name="should_fail", + scope=scope_test.to_dict(), + ) + jsondata = json.dumps(data, cls=JSON_HANDLER.encoder) + + response = test_client.post("/networks/merge", data=jsondata, headers=headers) + assert response.status_code == 401 + details = response.json() + assert "detail" in details + assert str(unauth_scope) in details["detail"] + def test_get_network(self, prepared_network, test_client): network, scoped_key = prepared_network response = test_client.get(f"/networks/{scoped_key}") diff --git a/news/issue-221.rst b/news/issue-221.rst new file mode 100644 index 00000000..75a23c91 --- /dev/null +++ b/news/issue-221.rst @@ -0,0 +1,24 @@ +**Added:** + +* ``AlchemiscaleClient.merge_networks`` for combining multiple 
existing ``AlchemicalNetwork``\s into a new ``AlchemicalNetwork``, preserving completed and errored ``Task`` results so they do not need to be re-run. Backed by ``Neo4jStore.merge_networks`` and a new ``POST /networks/merge`` endpoint on the user API. + +**Changed:** + +* + +**Deprecated:** + +* + +**Removed:** + +* + +**Fixed:** + +* + +**Security:** + +* + From 7e3f5d12abf41ed32ef6fb6aa0004adb404a6762 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 8 Jun 2026 17:20:03 -0600 Subject: [PATCH 16/19] Refactor merge_networks using KeyedChain.decode_subchains Replace the inline kc_to_gufe helper and manual chain-slicing loop with a call to KeyedChain.decode_subchains, paired with a list comprehension that captures the original database GufeKeys in the same order. The decode_subchains helper (gufe >=1.8.0) reuses a shared tokenizable_map across yields, so common dependencies in a source network are decoded only once. Lift the previously nested TransformationData dataclass to a private module-level _TransformationData so merge_networks reads top-to-bottom without chasing an inline class definition, and so the helper can be referenced by future tests in isolation. The dropped key_decode_dependencies import goes with kc_to_gufe. subchain_cache is preserved with the same shape but now populated via KeyedChain.from_gufe(transformation); whether the cache (and the unconnected tf_node it feeds) is needed at all is a separate question to revisit once the PERFORMS-wiring question is settled. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- alchemiscale/storage/statestore.py | 333 ++++++++++++++--------------- 1 file changed, 166 insertions(+), 167 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index ef2fc87c..7527a38a 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -30,7 +30,6 @@ GufeKey, JSON_HANDLER, KeyedChain, - key_decode_dependencies, ) from gufe.protocols import ProtocolUnitFailure @@ -85,6 +84,141 @@ def get_n4js(settings: Neo4jStoreSettings): class Neo4JStoreError(Exception): ... +def _is_transformation_keyed_dict(keyed_dict: dict) -> bool: + """Return ``True`` if ``keyed_dict`` represents a ``Transformation`` or ``NonTransformation``. + + Used as the predicate for ``KeyedChain.decode_subchains`` when walking + an ``AlchemicalNetwork``'s keyed chain inside :meth:`Neo4jStore.merge_networks`. + """ + return keyed_dict.get("__qualname__") in ("Transformation", "NonTransformation") + + +@dataclass +class _TransformationData: + """Bookkeeping for one Transformation as it is reconstructed during + :meth:`Neo4jStore.merge_networks`. + + Attributes + ---------- + transformation + The decoded ``Transformation`` (or ``NonTransformation``) + ``GufeTokenizable``. + task_tree + Flat list of Neo4j records, one per ``Task`` associated with this + ``Transformation`` in any of the source networks. Each record carries: + + - ``tf_key``: this ``Transformation``'s decoded gufe key (string) + - ``task``: the full ``Task`` node + - ``extended_task``: optional ``Task`` node that this ``Task`` extends + - ``pdrrs``: list of ``ProtocolDAGResultRef`` nodes for this ``Task`` + known_scoped_keys + All ``ScopedKey``\\ s representing this ``Transformation`` across the + source networks. Multiple ``ScopedKey``\\ s can map to a single + decoded ``Transformation`` if the same content was serialized under + different gufe versions. 
+ """ + + transformation: Transformation + task_tree: list = field(default_factory=list) + known_scoped_keys: list = field(default_factory=list) + + def add_known_scoped_key(self, key, scope): + self.known_scoped_keys.append( + ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) + ) + + @staticmethod + def update_task_trees(transformation_data: list, statestore): + """Given a list of ``_TransformationData``, extract all necessary + info from Neo4j and load the task trees. + """ + key_to_data_map = {str(td.transformation.key): td for td in transformation_data} + # prepare for unwind clause, include transformation key + # for updating each entry of transformation_data for + # each scoped key + transformation_sk_pairs = [ + [str(td.transformation.key), str(sk)] + for td in transformation_data + for sk in td.known_scoped_keys + ] + query = """ + UNWIND $tf_sk_pairs as pairs + WITH pairs[0] AS tf_key, pairs[1] AS tf_scoped_key + MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) + WHERE task.status IN ["complete", "error"] + OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) + OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) + RETURN tf_key, task, extended_task as extended_task, collect(pdrr) as pdrrs + """ + results = statestore.execute_query( + query, tf_sk_pairs=transformation_sk_pairs + ).records + + for record in results: + key_to_data_map[record["tf_key"]].task_tree.append(record) + + def to_subgraph(self, target_scope, statestore, subchain_cache): + """Create a subgraph where the "central" node is the + transformation and iteratively add Task and PDRR + nodes with their corresponding relationships. + """ + # if there are no tasks, can return an empty subgraph; no + # need to make a transformation node because it already exists + # in the surrounding merged-network subgraph + if not self.task_tree: + return Subgraph() + + # create the transformation node. 
With ``decode_subchains`` + # available upstream (gufe >=1.8.0), ``subchain_cache`` holds the + # re-serialized chain for the decoded ``Transformation``; the + # resulting Node's ``_scoped_key`` matches the one already produced + # by the combined AlchemicalNetwork's keyed chain and will dedupe + # against it during ``merge_subgraph``. + _, tf_node, _ = statestore._keyed_chain_to_subgraph( + subchain_cache[self.transformation], target_scope + ) + subgraph = Subgraph() | tf_node + + scope_props = { + "_org": target_scope.org, + "_campaign": target_scope.campaign, + "_project": target_scope.project, + } + + def record_to_node(record): + # create node from a neo4j record with updated scoped key + scoped_key = ScopedKey( + gufe_key=record["_gufe_key"], **target_scope.to_dict() + ) + return Node( + *record.labels, + **record._properties | scope_props | {"_scoped_key": str(scoped_key)}, + ) + + # process each task found. Each record represents a single task. + for record in self.task_tree: + # update task node to have new scoped key + task_node = record_to_node(record["task"]) + # create the task node this task extends if it exists + etask_node = ( + None + if not record["extended_task"] + else record_to_node(record["extended_task"]) + ) + if etask_node: + subgraph |= Relationship.type("EXTENDS")( + etask_node, task_node, **scope_props + ) + # clone all result refs for the task + for pdrr_record in record["pdrrs"]: + pdrr_node = record_to_node(pdrr_record) + subgraph |= Relationship.type("RESULTS_IN")( + task_node, pdrr_node, **scope_props + ) + + return subgraph + + class AlchemiscaleStateStore(abc.ABC): ... @@ -916,175 +1050,40 @@ def merge_networks( f"ScopedKey ({network_scoped_key}) not found in the database." ) - # Helper dataclass for managing transformations and tasks that - # perform them. 
- @dataclass - class TransformationData: - """ - transformation - - task_tree - flat representation of tasks associated with the transformation, each entry has: - - transformation gufe key - - full task neo4j node - - optional: full task neo4j node that this task extends - - list of ProtocolDAGResultRef nodes - known_scoped_keys - All scoped keys that represent the transformation across networks - """ - - transformation: Transformation - task_tree: list = field( - default_factory=list - ) # flat represenation of task tree with results - known_scoped_keys: list = field(default_factory=list) - - def add_known_scoped_key(self, key, scope): - self.known_scoped_keys.append( - ScopedKey(gufe_key=GufeKey(key), **scope.to_dict()) - ) - - @staticmethod - def update_task_trees(transformation_data: list, statestore): - """Given a list of TransformationData, extract all - necessary info from Neo4j and load the task trees. - """ - - key_to_data_map = { - str(td.transformation.key): td for td in transformation_data - } - # prepare for unwind claus, include transformation key - # for updating each entry of transformation_data for - # each scoped key - transformation_sk_pairs = [ - [str(td.transformation.key), str(sk)] - for td in transformation_data - for sk in td.known_scoped_keys - ] - query = """ - UNWIND $tf_sk_pairs as pairs - WITH pairs[0] AS tf_key, pairs[1] AS tf_scoped_key - MATCH (task:Task)-[:PERFORMS]->(:Transformation|NonTransformation {`_scoped_key`: tf_scoped_key}) - WHERE task.status IN ["complete", "error"] - OPTIONAL MATCH (task)-[:EXTENDS]->(extended_task:Task) - OPTIONAL MATCH (task)-[:RESULTS_IN]->(pdrr:ProtocolDAGResultRef) - RETURN tf_key, task, extended_task as extended_task, collect(pdrr) as pdrrs - """ - results = statestore.execute_query( - query, tf_sk_pairs=transformation_sk_pairs - ).records - - for record in results: - key_to_data_map[record["tf_key"]].task_tree.append(record) - - def to_subgraph(self, target_scope, statestore, subchain_cache): - 
"""Create a subgraph where the "central" node is the - transformation and iteratively add Task and PDRRs - nodes with their corresponding relationships. - - """ - # if there are no tasks, can return an subgraph, no - # need to make transformation node because it exists - # already outside of this method - if not self.task_tree: - return Subgraph() - - # create the transformation node from its keyed chain, - # this allows later nodes to be easily connected to - # the subgraph outside of this method. subchain_cache - # is a nonlocal dict[Transformation, KeyedChain] that - # removes the need to find the transformation within a - # larger KeyedChain. This will likely be unnecessary - # with later versions of gufe when decode_subchains is - # added. - # (https://github.com/OpenFreeEnergy/gufe/pull/634) - _, tf_node, _ = statestore._keyed_chain_to_subgraph( - subchain_cache[self.transformation], target_scope - ) - subgraph = Subgraph() | tf_node - - scope_props = { - "_org": target_scope.org, - "_campaign": target_scope.campaign, - "_project": target_scope.project, - } - - def record_to_node(record): - # create node from a neo4j record with updated scoped key - scoped_key = ScopedKey( - gufe_key=record["_gufe_key"], **scope.to_dict() - ) - return Node( - *record.labels, - **record._properties - | scope_props - | {"_scoped_key": str(scoped_key)}, - ) - - # process each task found. Each record represents a - # single task. 
- for record in self.task_tree: - etask_record = record["extended_task"] - pdrr_records = record["pdrrs"] - # update task node to have new scoped key - task_node = record_to_node(record["task"]) - # create the task node this task extends if it exists - etask_node = ( - None - if not record["extended_task"] - else record_to_node(record["extended_task"]) - ) - if etask_node: - subgraph |= Relationship.type("EXTENDS")( - etask_node, task_node, **scope_props - ) - # clone all result refs for the task - for pdrr_record in record["pdrrs"]: - pdrr_node = record_to_node(pdrr_record) - subgraph |= Relationship.type("RESULTS_IN")( - task_node, pdrr_node, **scope_props - ) - - return subgraph - - # TODO: upstream to gufe - def kc_to_gufe(kc, gts): - for gufe_key, keyed_dict in kc: - if gt := gts.get(gufe_key): - continue - gt = key_decode_dependencies(keyed_dict, registry=gts) - gts[gufe_key] = gt - return gt - - # Map Transformation and NonTransformation objects to their - # potentially duplicated database gufe keys. This may happen - # due to minor version changes between serialization - # times. These keys are then used to get all results for Tasks - # associated with them. - transformation_data: list[TransformationData] = [] - subchain_cache = {} + # Map decoded Transformation / NonTransformation objects to all of + # their original database GufeKeys (potentially across multiple + # source networks, and including duplicates introduced by minor + # serialization-version drift). These original keys are needed to + # locate Tasks associated with each Transformation in the source + # networks. 
+ transformation_data: list[_TransformationData] = [] + subchain_cache: dict[GufeTokenizable, KeyedChain] = {} for network_scope, network_keyed_chain in network_keyed_chains: - gts = {} - for index, (database_key, _) in enumerate(network_keyed_chain): - # only process Transformation or NonTransformation keys - if ( - "Transformation" in database_key - or "NonTransformation" in database_key - ): - # get the tokenizable at the index along with all previous data - subchain = KeyedChain(network_keyed_chain[: index + 1]) - transformation = kc_to_gufe(subchain, gts) - subchain_cache[transformation] = subchain - try: - idx = transformation_data.index(transformation) - data = transformation_data[idx] - except ValueError: - data = TransformationData(transformation) - transformation_data.append(data) - data.add_known_scoped_key(database_key, network_scope) + # database keys for Transformations / NonTransformations in this + # source network's chain, in chain order + database_keys = [ + gufe_key + for gufe_key, keyed_dict in network_keyed_chain + if _is_transformation_keyed_dict(keyed_dict) + ] + # decoded Transformation / NonTransformation objects in the same + # order; decode_subchains shares a tokenizable_map across yields + # so common dependencies are decoded only once per source network + transformations = network_keyed_chain.decode_subchains( + _is_transformation_keyed_dict + ) + for database_key, transformation in zip(database_keys, transformations): + subchain_cache[transformation] = KeyedChain.from_gufe(transformation) + try: + idx = transformation_data.index(transformation) + data = transformation_data[idx] + except ValueError: + data = _TransformationData(transformation) + transformation_data.append(data) + data.add_known_scoped_key(database_key, network_scope) # Collect all transformation gufe objects and collect into a new set of edges - TransformationData.update_task_trees(transformation_data, self) + _TransformationData.update_task_trees(transformation_data, 
self) new_edges = [td.transformation for td in transformation_data] # Make new alchemiscale network with these edges combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) From df3a1807f0a35b5b0f488f963c9a237e2256ab8c Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 8 Jun 2026 17:20:12 -0600 Subject: [PATCH 17/19] Bump gufe to >=1.8.0 in test and server env files merge_networks now uses KeyedChain.decode_subchains, which was added in gufe 1.8.0. Bump the pin in: - devtools/conda-envs/test.yml (was =1.7.1) so the integration tests exercise the new code path. - devtools/conda-envs/alchemiscale-server.yml (was =1.6.1) so server deployments running merge_networks pick up the required gufe API. Client and compute env files are left alone since neither imports the new code path. Co-Authored-By: Claude Opus 4.7 (1M context) --- devtools/conda-envs/alchemiscale-server.yml | 2 +- devtools/conda-envs/test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/devtools/conda-envs/alchemiscale-server.yml b/devtools/conda-envs/alchemiscale-server.yml index fa6f22b3..298e5861 100644 --- a/devtools/conda-envs/alchemiscale-server.yml +++ b/devtools/conda-envs/alchemiscale-server.yml @@ -8,7 +8,7 @@ dependencies: - cuda-version >=12 # alchemiscale dependencies - - gufe=1.6.1 + - gufe>=1.8.0 - openfe=1.6.1 - requests - click diff --git a/devtools/conda-envs/test.yml b/devtools/conda-envs/test.yml index d8591fcf..024256e7 100644 --- a/devtools/conda-envs/test.yml +++ b/devtools/conda-envs/test.yml @@ -7,7 +7,7 @@ dependencies: - cuda-version >=12 # alchemiscale dependencies - - gufe =1.7.1 + - gufe >=1.8.0 - openfe =1.8.0 - pydantic >2 - pydantic-settings From d0bf4ec7d6c0c719308ff10fffc92335ca019d38 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 8 Jun 2026 17:42:26 -0600 Subject: [PATCH 18/19] Address PR #507 review: PERFORMS wiring, state parameter, optimizations Code review revealed that cloned Tasks on the merged network were 
written but never wired back to their Transformations, leaving every PERFORMS-based traversal (get_network_tasks, get_task_transformation, set_tasks_status, etc.) blind to them. The new test_merge_networks_preserves_tasks_and_results check via get_network_tasks asserts this exact thing and would have failed in CI. Changes ------- - Add the PERFORMS edge in _TransformationData.to_subgraph so cloned Tasks are reachable from the standard (:AlchemicalNetwork)-[:DEPENDS_ON]->(:Transformation)<-[:PERFORMS]-(:Task) traversal. The pre-existing tf_node construction is now meaningful, resolving the open question about subchain_cache. - Add a `state` parameter to Neo4jStore.merge_networks, /networks/merge, and AlchemiscaleClient.merge_networks, bringing merge_networks to feature parity with assemble_network / create_network. Defaults to NetworkStateEnum.active. - Document the "cloned Tasks are not actioned to the merged TaskHub" semantics in the store-level and client docstrings, including the remediation path (call action_tasks after merging). - Replace the O(N^2) `transformation_data.index(transformation)` dedup with a `dict[GufeKey, _TransformationData]` keyed on transformation.key, avoiding repeated GufeTokenizable equality checks on large networks. - Improve the missing-ScopedKey error to collect and report all missing source SKs in a single ValueError, rather than only the one that first triggered KeyError. - Add a chain-order contract comment at the zip(database_keys, transformations) site pointing at the decode_subchains yield-order guarantee. - Make /networks/merge an `async def` endpoint, matching the style of the neighboring create_network endpoint. - Add a parametrized client test (state in {active, inactive}) that exercises the new state parameter end-to-end. - Drop the unused network_tyk2 fixture from the two API-level authorization tests. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- alchemiscale/interface/api.py | 6 +- alchemiscale/interface/client.py | 16 ++- alchemiscale/storage/statestore.py | 106 ++++++++++++------ .../interface/client/test_client.py | 28 +++++ .../tests/integration/interface/test_api.py | 4 +- 5 files changed, 120 insertions(+), 40 deletions(-) diff --git a/alchemiscale/interface/api.py b/alchemiscale/interface/api.py index 1777e6d4..d5edf34f 100644 --- a/alchemiscale/interface/api.py +++ b/alchemiscale/interface/api.py @@ -31,7 +31,7 @@ from ..settings import get_base_api_settings from ..storage.statestore import Neo4jStore from ..storage.objectstore import S3ObjectStore -from ..storage.models import TaskStatusEnum, StrategyState +from ..storage.models import NetworkStateEnum, TaskStatusEnum, StrategyState from ..models import Scope, ScopedKey from ..security.models import TokenData, CredentialedUserIdentity @@ -139,11 +139,12 @@ async def create_network( @router.post("/networks/merge", response_model=ScopedKey) -def merge_networks( +async def merge_networks( *, networks: list[str] = Body(embed=True), name: str = Body(embed=True), scope: dict = Body(embed=True), + state: str = Body(embed=True, default=NetworkStateEnum.active.value), n4js: Neo4jStore = Depends(get_n4js_depends), token: TokenData = Depends(get_token_data_depends), ): @@ -175,6 +176,7 @@ def merge_networks( network_scoped_keys=network_sks, name=name, scope=target_scope, + state=state, ) except ValueError as e: raise HTTPException( diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index d6215f43..c0a0a0a3 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -192,6 +192,7 @@ def merge_networks( networks: list[ScopedKey], name: str, scope: Scope, + state: NetworkStateEnum | str = NetworkStateEnum.active, visualize: bool = True, ) -> ScopedKey: """Merge multiple existing AlchemicalNetworks into a new AlchemicalNetwork. 
@@ -203,7 +204,13 @@ def merge_networks( their associated ProtocolDAGResultRefs, so previously-computed results do not need to be re-run. - The new AlchemicalNetwork is created in the active state. + Cloned Tasks are wired to their Transformations via ``PERFORMS`` and + are reachable through standard network traversals + (``get_network_tasks``, ``get_network_results``, etc.). They are + intentionally **not** actioned to the new network's TaskHub; to + retry errored Tasks on the merged network, call + :meth:`action_tasks` with the merged network's ScopedKey after the + merge completes. Parameters ---------- @@ -216,6 +223,10 @@ def merge_networks( scope The Scope in which to create the new AlchemicalNetwork. This must be a *specific* Scope; it must not contain wildcards. + state + The starting state of the new AlchemicalNetwork in the database. + See :meth:`AlchemiscaleClient.set_network_state` for valid states. + Defaults to ``"active"``. visualize If ``True``, show submission progress indicator. @@ -243,10 +254,13 @@ def merge_networks( f"ScopedKey '{network_sk}' does not refer to an AlchemicalNetwork" ) + state = NetworkStateEnum(state) + data = dict( networks=[str(sk) for sk in network_sks], name=name, scope=scope.to_dict(), + state=state.value, ) def post(): diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 7527a38a..5f94f83f 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -158,22 +158,29 @@ def update_task_trees(transformation_data: list, statestore): key_to_data_map[record["tf_key"]].task_tree.append(record) def to_subgraph(self, target_scope, statestore, subchain_cache): - """Create a subgraph where the "central" node is the - transformation and iteratively add Task and PDRR - nodes with their corresponding relationships. 
- """ - # if there are no tasks, can return an empty subgraph; no - # need to make a transformation node because it already exists - # in the surrounding merged-network subgraph + """Create a subgraph anchored at the Transformation node and + iteratively add Task and PDRR nodes with their corresponding + relationships. + + Each cloned Task is wired back to the Transformation via a + ``PERFORMS`` edge so the standard + ``(:AlchemicalNetwork)-[:DEPENDS_ON]->(:Transformation)<-[:PERFORMS]-(:Task)`` + traversal keeps working on the merged network. Note that cloned + Tasks are intentionally **not** actioned to the merged network's + TaskHub; see :meth:`Neo4jStore.merge_networks` for the rationale. + """ + # if there are no tasks, return an empty subgraph; the + # Transformation node already exists in the surrounding + # merged-network subgraph if not self.task_tree: return Subgraph() - # create the transformation node. With ``decode_subchains`` - # available upstream (gufe >=1.8.0), ``subchain_cache`` holds the - # re-serialized chain for the decoded ``Transformation``; the - # resulting Node's ``_scoped_key`` matches the one already produced - # by the combined AlchemicalNetwork's keyed chain and will dedupe - # against it during ``merge_subgraph``. + # build the Transformation node as the PERFORMS anchor for cloned + # Tasks. ``subchain_cache`` holds the re-serialized chain for the + # decoded ``Transformation``; the resulting Node's ``_scoped_key`` + # matches the one already produced by the combined + # AlchemicalNetwork's keyed chain and will dedupe against it + # during ``merge_subgraph``. 
_, tf_node, _ = statestore._keyed_chain_to_subgraph( subchain_cache[self.transformation], target_scope ) @@ -199,6 +206,10 @@ def record_to_node(record): for record in self.task_tree: # update task node to have new scoped key task_node = record_to_node(record["task"]) + # wire the cloned Task back to its Transformation; without + # this edge the Task is unreachable from get_network_tasks + # and every other PERFORMS-based traversal + subgraph |= Relationship.type("PERFORMS")(task_node, tf_node, **scope_props) # create the task node this task extends if it exists etask_node = ( None @@ -1022,9 +1033,23 @@ def merge_networks( network_scoped_keys: list[ScopedKey], name: str, scope: Scope, + state: NetworkStateEnum | str = NetworkStateEnum.active, ) -> ScopedKey: """Merge multiple ``AlchemicalNetwork`` nodes into a new ``AlchemicalNetwork``. + Each ``Transformation`` / ``NonTransformation`` in the input + networks is included exactly once in the new network. ``Task``\\ s + on the source networks that are in ``complete`` or ``error`` + state are cloned into the new network's ``Scope`` along with + their ``ProtocolDAGResultRef``\\ s and ``EXTENDS`` relationships, + and are wired to their ``Transformation`` via ``PERFORMS`` so + they are reachable from the standard network traversals. + + Cloned ``Task``\\ s are intentionally **not** actioned to the new + network's ``TaskHub``. Users wanting to retry errored tasks on + the merged network should call :meth:`action_tasks` themselves + with the merged network's ``TaskHub`` ``ScopedKey``. + Parameters ---------- network_scoped_keys @@ -1033,21 +1058,30 @@ def merge_networks( The name of the new ``AlchemicalNetwork``. scope The ``Scope`` of the new ``AlchemicalNetwork``. + state + The starting state for the new ``AlchemicalNetwork``'s + ``NetworkMark``. Defaults to ``NetworkStateEnum.active``. Returns ------- The ``ScopedKey`` of the new ``AlchemicalNetwork`` in the database. 
""" - # Collect keyed chain representation for all alchemical networks - try: - network_keyed_chains: list[tuple[Scope, KeyedChain]] = [] - for network_scoped_key in network_scoped_keys: + # Collect keyed chain representation for all alchemical networks, + # gathering every missing ScopedKey up front so callers passing + # many SKs get a single, complete error. + network_keyed_chains: list[tuple[Scope, KeyedChain]] = [] + missing: list[ScopedKey] = [] + for network_scoped_key in network_scoped_keys: + try: keyed_chain = self.get_keyed_chain(network_scoped_key) - network_keyed_chains.append((network_scoped_key.scope, keyed_chain)) - # could not find specified network by provided scoped key - except KeyError: + except KeyError: + missing.append(network_scoped_key) + continue + network_keyed_chains.append((network_scoped_key.scope, keyed_chain)) + if missing: + joined = ", ".join(str(sk) for sk in missing) raise ValueError( - f"ScopedKey ({network_scoped_key}) not found in the database." + f"The following ScopedKey(s) were not found in the database: {joined}" ) # Map decoded Transformation / NonTransformation objects to all of @@ -1055,8 +1089,10 @@ def merge_networks( # source networks, and including duplicates introduced by minor # serialization-version drift). These original keys are needed to # locate Tasks associated with each Transformation in the source - # networks. - transformation_data: list[_TransformationData] = [] + # networks. We key the dedup map on ``transformation.key`` (the + # decoded GufeKey) to avoid O(N^2) GufeTokenizable equality checks + # for large networks. 
+ transformation_data: dict[GufeKey, _TransformationData] = {} subchain_cache: dict[GufeTokenizable, KeyedChain] = {} for network_scope, network_keyed_chain in network_keyed_chains: # database keys for Transformations / NonTransformations in this @@ -1066,25 +1102,25 @@ def merge_networks( for gufe_key, keyed_dict in network_keyed_chain if _is_transformation_keyed_dict(keyed_dict) ] - # decoded Transformation / NonTransformation objects in the same - # order; decode_subchains shares a tokenizable_map across yields - # so common dependencies are decoded only once per source network + # decoded Transformation / NonTransformation objects, yielded + # by decode_subchains in the same chain order as the predicate + # selects above (gufe contract); decode_subchains also shares + # a tokenizable_map across yields so common dependencies are + # decoded only once per source network. transformations = network_keyed_chain.decode_subchains( _is_transformation_keyed_dict ) for database_key, transformation in zip(database_keys, transformations): subchain_cache[transformation] = KeyedChain.from_gufe(transformation) - try: - idx = transformation_data.index(transformation) - data = transformation_data[idx] - except ValueError: + data = transformation_data.get(transformation.key) + if data is None: data = _TransformationData(transformation) - transformation_data.append(data) + transformation_data[transformation.key] = data data.add_known_scoped_key(database_key, network_scope) # Collect all transformation gufe objects and collect into a new set of edges - _TransformationData.update_task_trees(transformation_data, self) - new_edges = [td.transformation for td in transformation_data] + _TransformationData.update_task_trees(list(transformation_data.values()), self) + new_edges = [td.transformation for td in transformation_data.values()] # Make new alchemiscale network with these edges combined_alchemical_network = AlchemicalNetwork(edges=new_edges, name=name) an_subgraph, an_node, an_sk 
= self._keyed_chain_to_subgraph( @@ -1092,11 +1128,11 @@ def merge_networks( ) # create and fold in taskhub and network mark supporting nodes an_subgraph |= ( - self.create_network_mark_subgraph(an_node)[0] + self.create_network_mark_subgraph(an_node, state=state)[0] | self.create_taskhub_subgraph(an_node)[0] ) # create and fold in all task and results data - for td in transformation_data: + for td in transformation_data.values(): an_subgraph |= td.to_subgraph(scope, self, subchain_cache) # merge the new network into neo4j diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index b8dcf289..1fa56b1a 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -248,6 +248,34 @@ def test_merge_networks( t.key for t in network_tyk2.edges } + @pytest.mark.parametrize("state", ["active", "inactive"]) + def test_merge_networks_respects_state( + self, + state, + scope_test, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + ): + """The state parameter must control the NetworkMark on the merged + AlchemicalNetwork the same way it does on create_network.""" + source_sks = user_client.query_networks(scope=scope_test, state=None) + assert source_sks + + merge_scope = Scope( + org=scope_test.org, + campaign=scope_test.campaign, + project=f"merged_state_{state}", + ) + merged_sk = user_client.merge_networks( + networks=source_sks, + name=f"merged_state_{state}", + scope=merge_scope, + state=state, + visualize=False, + ) + + assert user_client.get_network_state(merged_sk) == state + def test_merge_networks_rejects_wildcard_scope( self, n4js_preloaded, diff --git a/alchemiscale/tests/integration/interface/test_api.py b/alchemiscale/tests/integration/interface/test_api.py index 24be0be1..f1427147 100644 --- a/alchemiscale/tests/integration/interface/test_api.py +++ 
b/alchemiscale/tests/integration/interface/test_api.py @@ -147,7 +147,7 @@ def test_merge_networks( } def test_merge_networks_bad_scope( - self, n4js_preloaded, test_client, network_tyk2, scope_test, multiple_scopes + self, n4js_preloaded, test_client, scope_test, multiple_scopes ): # destination scope the test_client's token does not have access to bad_scope = multiple_scopes[1] @@ -171,7 +171,7 @@ def test_merge_networks_bad_scope( assert str(bad_scope) in details["detail"] def test_merge_networks_bad_source_scope( - self, n4js_preloaded, test_client, network_tyk2, scope_test, multiple_scopes + self, n4js_preloaded, test_client, scope_test, multiple_scopes ): # source network in a scope the test_client's token does not authorize unauth_scope = multiple_scopes[1] From 5a92fbcb357a048539a1899f5ccd4f0495a7a148 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 8 Jun 2026 18:07:22 -0600 Subject: [PATCH 19/19] Pin gufe to =1.8.0 across all conda env files merge_networks now depends on KeyedChain.decode_subchains (gufe 1.8.0). Pin all five env files to an exact 1.8.0 release, matching the repo's convention of pinning gufe exactly rather than with a lower bound. Touches the test, server, compute, client, and docs envs. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- devtools/conda-envs/alchemiscale-client.yml | 2 +- devtools/conda-envs/alchemiscale-compute.yml | 2 +- devtools/conda-envs/alchemiscale-server.yml | 2 +- devtools/conda-envs/docs.yml | 2 +- devtools/conda-envs/test.yml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/devtools/conda-envs/alchemiscale-client.yml b/devtools/conda-envs/alchemiscale-client.yml index 6ef6d801..8a536682 100644 --- a/devtools/conda-envs/alchemiscale-client.yml +++ b/devtools/conda-envs/alchemiscale-client.yml @@ -8,7 +8,7 @@ dependencies: - cuda-version >=12 # alchemiscale dependencies - - gufe=1.6.1 + - gufe=1.8.0 - openfe=1.6.1 - requests - click diff --git a/devtools/conda-envs/alchemiscale-compute.yml b/devtools/conda-envs/alchemiscale-compute.yml index 92b9d4be..6d8cd255 100644 --- a/devtools/conda-envs/alchemiscale-compute.yml +++ b/devtools/conda-envs/alchemiscale-compute.yml @@ -8,7 +8,7 @@ dependencies: - cuda-version >=12 # alchemiscale dependencies - - gufe=1.6.1 + - gufe=1.8.0 - openfe=1.6.1 - requests - click diff --git a/devtools/conda-envs/alchemiscale-server.yml b/devtools/conda-envs/alchemiscale-server.yml index 298e5861..6fea2592 100644 --- a/devtools/conda-envs/alchemiscale-server.yml +++ b/devtools/conda-envs/alchemiscale-server.yml @@ -8,7 +8,7 @@ dependencies: - cuda-version >=12 # alchemiscale dependencies - - gufe>=1.8.0 + - gufe=1.8.0 - openfe=1.6.1 - requests - click diff --git a/devtools/conda-envs/docs.yml b/devtools/conda-envs/docs.yml index f080a0af..69f4c6d2 100644 --- a/devtools/conda-envs/docs.yml +++ b/devtools/conda-envs/docs.yml @@ -10,6 +10,6 @@ dependencies: - myst-parser>=0.14 - docutils - sphinx-notfound-page - - gufe=1.3.0 + - gufe=1.8.0 - py2neo - stratocaster diff --git a/devtools/conda-envs/test.yml b/devtools/conda-envs/test.yml index eed7fb35..f4f6b5e7 100644 --- a/devtools/conda-envs/test.yml +++ b/devtools/conda-envs/test.yml @@ -7,7 +7,7 @@ dependencies: - cuda-version >=12 
# alchemiscale dependencies - - gufe >=1.8.0 + - gufe =1.8.0 - openfe =1.8.0 - pydantic >2 - pydantic-settings