diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 59809494..e57ecb90 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -9,6 +9,7 @@ import json from urllib.parse import urljoin from functools import wraps +from pathlib import Path import requests from requests.auth import HTTPBasicAuth @@ -108,3 +109,22 @@ def set_task_result( pdr_sk = self._post_resource(f"tasks/{task}/results", data) return ScopedKey.from_dict(pdr_sk) + + def push_resultfile( + self, task: ScopedKey, + protocoldagresult: ProtocolDAGResult, + path: Path + ) -> ScopedKey: + data = dict( + protocoldagresult=json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ) + ) + + pdr_sk = self._post_resource(f"tasks/{task}/results", data) + + return ScopedKey.from_dict(pdr_sk) + + + def check_exists_resultfile(self, location): + ... diff --git a/alchemiscale/compute/filestorage.py b/alchemiscale/compute/filestorage.py new file mode 100644 index 00000000..9e0f74cf --- /dev/null +++ b/alchemiscale/compute/filestorage.py @@ -0,0 +1,66 @@ +import pathlib +import shutil +import os +from typing import Union, Tuple, ContextManager + +from gufe.storage.externalresource.base import ExternalStorage +from gufe.storage.storagemanager import StorageManager, SingleProcDAGContextManager + +from gufe.storage.errors import ( + MissingExternalResourceError, ChangedExternalResourceError +) + +from ..models import ScopedKey +from .client import AlchemiscaleComputeClient + + +class ResultFileDAGContextManager(SingleProcDAGContextManager): + ... + + +class ResultFileStorageManager(StorageManager): + ... + + +class ResultFileStorage(ExternalStorage): + + # need some way of making sure files land in the right place in object store + # so somehow we need to communicate this in every call to API service, so + # it can translate what is being requested into the true location in the + # object store + + # task_sk may be the right thing here, but depends on if paths get shipped + # *before* or *after* ProtocolDAGResult returned by executor and uploaded + # it's better for us if paths get shipped *after*, since then we'll have + # the reference in the state store to use for routing into object store + def __init__(self, client: AlchemiscaleComputeClient, task_sk: ScopedKey): + self.client = client + + def _iter_contents(self, prefix=""): + raise NotImplementedError() + + def _store_bytes(self, location, byte_data): + """ + For implementers: This should be blocking, even if the storage + backend allows asynchronous storage. + """ + raise NotImplementedError() + + def _store_path(self, location, path): + """ + For implementers: This should be blocking, even if the storage + backend allows asynchronous storage. + """ + raise NotImplementedError() + + def _exists(self, location): + return self.client.check_exists_resultfile(location) + + def _delete(self, location): + raise NotImplementedError() + + def _get_filename(self, location): + raise NotImplementedError() + + def _load_stream(self, location): + raise NotImplementedError() diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index c143c53a..5948d329 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -24,7 +24,7 @@ from gufe.protocols.protocoldag import execute_DAG, ProtocolDAG, ProtocolDAGResult from .client import AlchemiscaleComputeClient -from ..storage.models import Task, TaskHub, ComputeServiceID +from ..storage.models import Task, TaskHub, ComputeServiceID, ObjectStoreRef from ..models import Scope, ScopedKey @@ -289,13 +289,20 @@ def task_to_protocoldag( ) return protocoldag, transformation, extends_protocoldagresult + + def _paths_to_objectstorerefs(self, outputs, task, protocoldagresult): + if isinstance(outputs, dict): + return {key: self.paths_to_objectstorerefs(value, task, protocoldagresult) for key, value in outputs.items()} + elif isinstance(outputs, list): + return [self.paths_to_objectstorerefs(value, task, protocoldagresult) for value in outputs] + elif isinstance(outputs, Path): + return self.client.push_result_path(task, protocoldagresult, outputs) + else: + return outputs + def push_result( self, task: ScopedKey, protocoldagresult: ProtocolDAGResult ) -> ScopedKey: - # TODO: this method should postprocess any paths, - # leaf nodes in DAG for blob results that should go to object store - - # TODO: ship paths to object store # finally, push ProtocolDAGResult sk: ScopedKey = self.client.set_task_result( @@ -304,6 +311,8 @@ def push_result( return sk + + def execute(self, task: ScopedKey) -> ScopedKey: """Executes given Task. diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 2f9d00f7..30b562fd 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -668,13 +668,6 @@ def cancel_tasks( return [ScopedKey.from_str(i) if i is not None else None for i in canceled_sks] - def _set_task_status( - self, task: ScopedKey, status: TaskStatusEnum - ) -> Optional[ScopedKey]: - """Set the status of a `Task`.""" - task_sk = self._post_resource(f"/tasks/{task}/status", status.value) - return ScopedKey.from_str(task_sk) if task_sk is not None else None - async def _set_task_status( self, tasks: List[ScopedKey], status: TaskStatusEnum ) -> List[Optional[ScopedKey]]: diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index bbb89a5f..f339107a 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -138,20 +138,12 @@ def _get_bytes(self, location): return self.resource.Object(self.bucket, key).get()["Body"].read() def _store_path(self, location, path): - """ - For implementers: This should be blocking, even if the storage - backend allows asynchronous storage. - """ """ For implementers: This should be blocking, even if the storage backend allows asynchronous storage. """ key = os.path.join(self.prefix, location) - - with open(path, "rb") as f: - self.resource.Bucket(self.bucket).upload_fileobj(f, key) - - b = self.resource.Bucket(self.bucket) + self.resource.Bucket(self.bucket).upload_file(path, key) def _exists(self, location) -> bool: from botocore.exceptions import ClientError @@ -192,6 +184,54 @@ def _get_filename(self, location): return url + def push_result_artifact( + self, + protocoldagresult_key: GufeKey, + scope: Scope, + ) -> ProtocolDAGResultRef: + """Push given `ProtocolDAGResult` to this `ObjectStore`. + + Parameters + ---------- + protocoldagresult + ProtocolDAGResult to store. + scope + Scope to store ProtocolDAGResult under. + + Returns + ------- + ProtocolDAGResultRef + Reference to the serialized `ProtocolDAGResult` in the object store. + + """ + ok = protocoldagresult.ok() + route = "results" if ok else "failures" + + # build `location` based on gufe key + location = os.path.join( + "protocoldagresult", + *scope.to_tuple(), + protocoldagresult.transformation_key, + route, + protocoldagresult.key, + "obj.json", + ) + + # TODO: add support for compute client-side compressed protocoldagresults + pdr_jb = json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ).encode("utf-8") + response = self._store_bytes(location, pdr_jb) + + return ProtocolDAGResultRef( + location=location, + obj_key=protocoldagresult.key, + scope=scope, + ok=ok, + ) + + + def push_protocoldagresult( self, protocoldagresult: ProtocolDAGResult,