From df1993b266f32f6bb14e879faf2582c0cf86534f Mon Sep 17 00:00:00 2001 From: David Dotson Date: Thu, 23 Mar 2023 22:23:56 -0700 Subject: [PATCH 1/2] Beginning of work for result path upload Very much a work in progress; will be a separate PR from #98 --- alchemiscale/compute/client.py | 16 ++++++++ alchemiscale/compute/service.py | 57 +++++++++++++---------------- alchemiscale/interface/client.py | 3 ++ alchemiscale/storage/objectstore.py | 57 ++++++++++++++++++++++++----- 4 files changed, 93 insertions(+), 40 deletions(-) diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index baa19931..39f2ca9d 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -9,6 +9,7 @@ import json from urllib.parse import urljoin from functools import wraps +from pathlib import Path import requests from requests.auth import HTTPBasicAuth @@ -106,3 +107,18 @@ def set_task_result( pdr_sk = self._post_resource(f"tasks/{task}/results", data) return ScopedKey.from_dict(pdr_sk) + + def push_result_path( + self, task: ScopedKey, + protocoldagresult: ProtocolDAGResult, + path: Path + ) -> ScopedKey: + data = dict( + protocoldagresult=json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ) + ) + + pdr_sk = self._post_resource(f"tasks/{task}/results", data) + + return ScopedKey.from_dict(pdr_sk) diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index 515c1b2b..7e636a4e 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -22,7 +22,7 @@ from gufe.protocols.protocoldag import execute_DAG, ProtocolDAG, ProtocolDAGResult from .client import AlchemiscaleComputeClient -from ..storage.models import Task, TaskHub, ComputeServiceID +from ..storage.models import Task, TaskHub, ComputeServiceID, ObjectStoreRef from ..models import Scope, ScopedKey @@ -235,19 +235,40 @@ def task_to_protocoldag( ) return protocoldag, transformation, extends_protocoldagresult + + def _paths_to_objectstorerefs(self, outputs, task, protocoldagresult): + if isinstance(outputs, dict): + return {key: self.paths_to_objectstorerefs(value, task, protocoldagresult) for key, value in outputs.items()} + elif isinstance(outputs, list): + return [self.paths_to_objectstorerefs(value, task, protocoldagresult) for value in outputs] + elif isinstance(outputs, Path): + return self.client.push_result_path(task, protocoldagresult, outputs) + else: + return outputs + def push_result( self, task: ScopedKey, protocoldagresult: ProtocolDAGResult ) -> ScopedKey: - # TODO: this method should postprocess any paths, - # leaf nodes in DAG for blob results that should go to object store - - # TODO: ship paths to object store + + # for each terminal protocolunitresult, process Paths present + # push the file represented by each path to the object store, and replace + # the Path in the prototocolunitresult with an ObjectStoreRef + terminal_purs = protocoldagresult.terminal_protocol_unit_results + for terminal_pur in terminal_purs: + outputs = terminal_pur.outputs + processed_outputs = self._paths_to_objectstorerefs(outputs, task, protocoldagresult) + + # TODO: this is a little dirty; consider putting in a proper way to + # do this in gufe + terminal_pur._outputs = processed_outputs # finally, push ProtocolDAGResult sk: ScopedKey = self.client.set_task_result(task, protocoldagresult) return sk + + def execute(self, task: ScopedKey) -> ScopedKey: """Executes given Task. @@ -383,29 +404,3 @@ def start(self): def stop(self): self._stop = True - - -class AlchemiscaleComputeService(AsynchronousComputeService): - """Folding@Home-based compute service. - - This service is designed for production use with Folding@Home. - - """ - - def __init__(self, object_store, fah_work_server): - self.scheduler = sched.scheduler(time.time, self.int_sleep) - self.loop = asyncio.get_event_loop() - - self._stop = False - - async def get_new_tasks(self): - ... - - def start(self): - ... - while True: - if self._stop: - return - - def stop(self): - ... diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 6d9dd9f6..1da4ea12 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -301,6 +301,9 @@ def set_tasks_status( """ if isinstance(tasks, ScopedKey): tasks = [tasks] + + status = TaskStatusEnum(status) + task_sks = [self._set_task_status(t, status) for t in tasks] return task_sks diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index 9409e857..fbcc0c41 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -137,20 +137,12 @@ def _get_bytes(self, location): return self.resource.Object(self.bucket, key).get()["Body"].read() def _store_path(self, location, path): - """ - For implementers: This should be blocking, even if the storage - backend allows asynchronous storage. - """ """ For implementers: This should be blocking, even if the storage backend allows asynchronous storage. """ key = os.path.join(self.prefix, location) - - with open(path, "rb") as f: - self.resource.Bucket(self.bucket).upload_fileobj(f, key) - - b = self.resource.Bucket(self.bucket) + self.resource.Bucket(self.bucket).upload_file(path, key) def _exists(self, location) -> bool: from botocore.exceptions import ClientError @@ -191,6 +183,53 @@ def _get_filename(self, location): return url + def push_result_artifact( + self, + protocoldagresult_key: GufeKey, + scope: Scope, + ) -> ProtocolDAGResultRef: + """Push given `ProtocolDAGResult` to this `ObjectStore`. + + Parameters + ---------- + protocoldagresult + ProtocolDAGResult to store. + scope + Scope to store ProtocolDAGResult under. + + Returns + ------- + ProtocolDAGResultRef + Reference to the serialized `ProtocolDAGResult` in the object store. + + """ + ok = protocoldagresult.ok() + route = "results" if ok else "failures" + + # build `location` based on gufe key + location = os.path.join( + "protocoldagresult", + *scope.to_tuple(), + protocoldagresult.transformation_key, + route, + protocoldagresult.key, + "obj.json", + ) + + # TODO: add support for compute client-side compressed protocoldagresults + pdr_jb = json.dumps( + protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder + ).encode("utf-8") + response = self._store_bytes(location, pdr_jb) + + return ProtocolDAGResultRef( + location=location, + obj_key=protocoldagresult.key, + scope=scope, + ok=ok, + ) + + def push_protocoldagresult( self, protocoldagresult: ProtocolDAGResult, From c434cb17c82442cafd7f7be7067374d985841e20 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Tue, 19 Sep 2023 13:10:54 -0700 Subject: [PATCH 2/2] Current state; adding stubs for objects we plan to subclass in filestorage --- alchemiscale/compute/client.py | 6 ++- alchemiscale/compute/filestorage.py | 66 +++++++++++++++++++++++++++++ alchemiscale/compute/service.py | 12 ------ alchemiscale/storage/objectstore.py | 1 + 4 files changed, 72 insertions(+), 13 deletions(-) create mode 100644 alchemiscale/compute/filestorage.py diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index dd20f008..e57ecb90 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -110,7 +110,7 @@ def set_task_result( return ScopedKey.from_dict(pdr_sk) - def push_result_path( + def push_resultfile( self, task: ScopedKey, protocoldagresult: ProtocolDAGResult, path: Path @@ -124,3 +124,7 @@ def push_result_path( pdr_sk = self._post_resource(f"tasks/{task}/results", data) return ScopedKey.from_dict(pdr_sk) + + + def check_exists_resultfile(self, location): + ... diff --git a/alchemiscale/compute/filestorage.py b/alchemiscale/compute/filestorage.py new file mode 100644 index 00000000..9e0f74cf --- /dev/null +++ b/alchemiscale/compute/filestorage.py @@ -0,0 +1,66 @@ +import pathlib +import shutil +import os +from typing import Union, Tuple, ContextManager + +from gufe.storage.externalresource.base import ExternalStorage +from gufe.storage.storagemanager import StorageManager, SingleProcDAGContextManager + +from gufe.storage.errors import ( + MissingExternalResourceError, ChangedExternalResourceError +) + +from ..models import ScopedKey +from .client import AlchemiscaleComputeClient + + +class ResultFileDAGContextManager(SingleProcDAGContextManager): + ... + + +class ResultFileStorageManager(StorageManager): + ... + + +class ResultFileStorage(ExternalStorage): + + # need some way of making sure files land in the right place in object store + # so somehow we need to communicate this in every call to API service, so + # it can translate what is being requested into the true location in the + # object store + + # task_sk may be the right thing here, but depends on if paths get shipped + # *before* or *after* ProtocolDAGResult returned by executor and uploaded + # it's better for us if paths get shipped *after*, since then we'll have + # the reference in the state store to use for routing into object store + def __init__(self, client: AlchemiscaleComputeClient, task_sk: ScopedKey): + self.client = client + + def _iter_contents(self, prefix=""): + raise NotImplementedError() + + def _store_bytes(self, location, byte_data): + """ + For implementers: This should be blocking, even if the storage + backend allows asynchronous storage. + """ + raise NotImplementedError() + + def _store_path(self, location, path): + """ + For implementers: This should be blocking, even if the storage + backend allows asynchronous storage. + """ + raise NotImplementedError() + + def _exists(self, location): + return self.client.check_exists_resultfile(location) + + def _delete(self, location): + raise NotImplementedError() + + def _get_filename(self, location): + raise NotImplementedError() + + def _load_stream(self, location): + raise NotImplementedError() diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index 64504f2f..5948d329 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -303,18 +303,6 @@ def _paths_to_objectstorerefs(self, outputs, task, protocoldagresult): def push_result( self, task: ScopedKey, protocoldagresult: ProtocolDAGResult ) -> ScopedKey: - - # for each terminal protocolunitresult, process Paths present - # push the file represented by each path to the object store, and replace - # the Path in the prototocolunitresult with an ObjectStoreRef - terminal_purs = protocoldagresult.terminal_protocol_unit_results - for terminal_pur in terminal_purs: - outputs = terminal_pur.outputs - processed_outputs = self._paths_to_objectstorerefs(outputs, task, protocoldagresult) - - # TODO: this is a little dirty; consider putting in a proper way to - # do this in gufe - terminal_pur._outputs = processed_outputs # finally, push ProtocolDAGResult sk: ScopedKey = self.client.set_task_result( diff --git a/alchemiscale/storage/objectstore.py b/alchemiscale/storage/objectstore.py index 55a2d2fe..f339107a 100644 --- a/alchemiscale/storage/objectstore.py +++ b/alchemiscale/storage/objectstore.py @@ -230,6 +230,7 @@ def push_result_artifact( ok=ok, ) + def push_protocoldagresult( self,