From 1e5cebb7dd83f83a39a14ee238e319c25a0996c3 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 3 Apr 2023 22:18:40 -0700 Subject: [PATCH] Adding TaskProvenance usage to compute execution and results --- alchemiscale/compute/client.py | 8 +++++- alchemiscale/compute/service.py | 19 +++++++++++--- alchemiscale/storage/models.py | 19 -------------- alchemiscale/storage/statestore.py | 26 ++++++++++++++++--- .../interface/client/test_client.py | 3 +++ 5 files changed, 49 insertions(+), 26 deletions(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index e28c58dd..a39f5f5d 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -9,6 +9,7 @@ import json from urllib.parse import urljoin from functools import wraps +from datetime import datetime import requests from requests.auth import HTTPBasicAuth @@ -93,7 +94,12 @@ def get_task_transformation( ) def set_task_result( - self, task: ScopedKey, protocoldagresult: ProtocolDAGResult + self, + task: ScopedKey, + protocoldagresult: ProtocolDAGResult, + compute_service_id: Optional[ComputeServiceID] = None, + start: Optional[datetime] = None, + end: Optional[datetime] = None, ) -> ScopedKey: data = dict( protocoldagresult=json.dumps( diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index f6dd4220..29f294da 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -16,6 +16,7 @@ from pathlib import Path from threading import Thread import tempfile +from datetime import datetime import requests @@ -243,7 +244,8 @@ def task_to_protocoldag( return protocoldag, transformation, extends_protocoldagresult def push_result( - self, task: ScopedKey, protocoldagresult: ProtocolDAGResult + self, task: ScopedKey, protocoldagresult: ProtocolDAGResult, + start: datetime, end: datetime ) -> ScopedKey: # TODO: this method should postprocess any paths, # leaf nodes in DAG for blob results that should go to object store @@ -251,7 +253,10 @@ def push_result( # TODO: ship paths to object store # finally, push ProtocolDAGResult - sk: ScopedKey = self.client.set_task_result(task, protocoldagresult) + sk: ScopedKey = self.client.set_task_result(task, protocoldagresult, + compute_service_id=self.compute_service_id, + start=start, + end=end) return sk @@ -271,6 +276,8 @@ def execute(self, task: ScopedKey) -> ScopedKey: ) shared = Path(shared_tmp.name) + start = datetime.utcnow() + protocoldagresult = execute_DAG( protocoldag, shared=shared, @@ -279,11 +286,17 @@ def execute(self, task: ScopedKey) -> ScopedKey: raise_error=False, ) + end = datetime.utcnow() + if not self.keep_shared: shared_tmp.cleanup() # push the result (or failure) back to the compute API - result_sk = self.push_result(task, protocoldagresult) + result_sk = self.push_result(task, + protocoldagresult, + start=start, + end=end + ) return result_sk diff --git a/alchemiscale/storage/models.py b/alchemiscale/storage/models.py index b428d6e3..b69db382 100644 --- a/alchemiscale/storage/models.py +++ b/alchemiscale/storage/models.py @@ -191,21 +191,6 @@ def _defaults(cls): return super()._defaults() -class TaskArchive(GufeTokenizable): - ... - - def _to_dict(self): - return {} - - @classmethod - def _from_dict(cls, d): - return cls(**d) - - @classmethod - def _defaults(cls): - return super()._defaults() - - class ObjectStoreRef(GufeTokenizable): location: Optional[str] obj_key: Optional[GufeKey] @@ -257,7 +242,3 @@ def _to_dict(self): "scope": str(self.scope), "ok": self.ok, } - - -class TaskArchive(GufeTokenizable): - ... diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index d5ad080e..db6783fa 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -25,9 +25,9 @@ from .models import ( ComputeServiceID, ComputeServiceRegistration, + TaskProvenance, Task, TaskHub, - TaskArchive, TaskStatusEnum, ProtocolDAGResultRef, ) @@ -1570,9 +1570,17 @@ def get_task_transformation( return transformation, protocoldagresultref def set_task_result( - self, task: ScopedKey, protocoldagresultref: ProtocolDAGResultRef + self, + task: ScopedKey, + protocoldagresultref: ProtocolDAGResultRef, + taskprovenance: Optional[TaskProvenance] = None ) -> ScopedKey: - """Set a `ProtocolDAGResultRef` pointing to a `ProtocolDAGResult` for the given `Task`.""" + """Set a `ProtocolDAGResultRef` pointing to a `ProtocolDAGResult` for the given `Task`. + + If a `TaskProvenance` is given, this will also be associated with the + `ProtocolDAGResultRef` via a RECORDS relationship. + + """ if task.qualname != "Task": raise ValueError("`task` ScopedKey does not correspond to a `Task`") @@ -1595,6 +1603,18 @@ def set_task_result( _project=scope.project, ) + if taskprovenance is not None: + taskprovenance_node = Node( + "TaskProvenance", **taskprovenance.to_dict() + ) + subgraph = subgraph | Relationship.type("RECORDS")( + taskprovenance_node, + protocoldagresultref_node, + _org=scope.org, + _campaign=scope.campaign, + _project=scope.project, + ) + with self.transaction() as tx: tx.merge(subgraph, "GufeTokenizable", "_scoped_key") diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 32b5277d..2d93b178 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -453,6 +453,9 @@ def test_get_task_results( assert isinstance(pdr.extends_key, GufeKey) or pdr.extends_key is None assert pdr.ok() + import pdb + pdb.set_trace() + def test_get_task_failures( self, scope_test,