-
Notifications
You must be signed in to change notification settings - Fork 175
Add mrd pool cache #846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add mrd pool cache #846
Changes from all commits
cb7c907
04b6d1e
766be8c
5bd9ec7
43d8fe4
3e5c6ea
43840a1
f02ea16
ace3018
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| from enum import Enum | ||
| from glob import has_magic | ||
|
|
||
| import fsspec | ||
| from fsspec import asyn | ||
| from fsspec.callbacks import NoOpCallback | ||
| from google.api_core import exceptions as api_exceptions | ||
|
|
@@ -89,12 +90,23 @@ class ExtendedGcsFileSystem(GCSFileSystem): | |
| to the parent class GCSFileSystem for default processing. | ||
| """ | ||
|
|
||
| def __init__(self, *args, finalize_on_close=False, **kwargs): | ||
| def __init__( | ||
| self, | ||
| *args, | ||
| finalize_on_close=False, | ||
| mrd_pool_cache_size=16, | ||
| max_mrd_pool_cache_queue_size=8, | ||
| **kwargs, | ||
| ): | ||
| """ | ||
| Parameters | ||
| ---------- | ||
| finalize_on_close : bool, default False | ||
| By default, files in zonal buckets are left unfinalized to allow appends. | ||
| mrd_pool_cache_size : int, default 16 | ||
| Maximum number of idle pools to retain in the cache. | ||
| max_mrd_pool_cache_queue_size : int, default 8 | ||
| Maximum number of idle MRDs per key in the cache. | ||
| **kwargs : dict | ||
| Additional arguments passed to GCSFileSystem. | ||
| Supports retry configuration overrides for Storage Control API: | ||
|
|
@@ -130,6 +142,38 @@ def __init__(self, *args, finalize_on_close=False, **kwargs): | |
| max_workers=kwargs.get("memmove_max_workers", 8) | ||
| ) | ||
| weakref.finalize(self, self._memmove_executor.shutdown) | ||
| self._mrd_pool_cache = zb_hns_utils.MRDPoolCache( | ||
| self, | ||
| max_idle_pools=mrd_pool_cache_size, | ||
| max_queue_size=max_mrd_pool_cache_queue_size, | ||
| ) | ||
| weakref.finalize( | ||
| self, | ||
| self._finalize_mrd_pool_cache, | ||
| self.loop, | ||
| self._mrd_pool_cache, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _finalize_mrd_pool_cache(loop, cache): | ||
| """Tear down the MRDPoolCache when ExtendedGcsFileSystem is garbage collected.""" | ||
| if cache is None or getattr(cache, "_closed", False): | ||
| return | ||
|
|
||
| try: | ||
| current_loop = asyncio.get_running_loop() | ||
| except RuntimeError: | ||
| current_loop = None | ||
|
|
||
| if loop and loop.is_running(): | ||
| asyncio.run_coroutine_threadsafe(cache.close(), loop) | ||
| elif current_loop is not None and current_loop.is_running(): | ||
| asyncio.run_coroutine_threadsafe(cache.close(), current_loop) | ||
| elif asyn.loop[0] is not None and asyn.loop[0].is_running(): | ||
| try: | ||
| asyn.sync(asyn.loop[0], cache.close, timeout=5.0) | ||
| except fsspec.FSTimeoutError: | ||
| pass | ||
|
|
||
| @property | ||
| def _user_project(self): | ||
|
|
@@ -190,6 +234,26 @@ async def _get_control_plane_client(self): | |
| ) | ||
| return self._storage_control_client | ||
|
|
||
| async def close_resources(self): | ||
| """Close gRPC clients, channels, and other resources.""" | ||
| if self._grpc_client is not None: | ||
| try: | ||
| await self._grpc_client.grpc_client.transport.close() | ||
| except Exception as e: | ||
| logger.warning(f"Failed to close grpc_client: {e}") | ||
| self._grpc_client = None | ||
| if self._storage_control_client is not None: | ||
| try: | ||
| await self._storage_control_client.transport.close() | ||
| except Exception as e: | ||
| logger.warning(f"Failed to close storage_control_client: {e}") | ||
| self._storage_control_client = None | ||
| if self._mrd_pool_cache is not None: | ||
| try: | ||
| await self._mrd_pool_cache.close() | ||
| except Exception as e: | ||
| logger.warning(f"Failed to close MRDPoolCache: {e}") | ||
|
|
||
| async def _lookup_bucket_type(self, bucket): | ||
| if bucket in self._storage_layout_cache: | ||
| return self._storage_layout_cache[bucket] | ||
|
|
@@ -358,10 +422,9 @@ async def _fetch_range_split( | |
| if mrd is None: | ||
| # If no mrd is provided, we create one with pool size equal to passed concurrency. | ||
| pool_size = min(len(chunk_lengths), concurrency) | ||
| mrd = zb_hns_utils.MRDPool( | ||
| self, bucket, object_name, generation, pool_size=pool_size | ||
| mrd = await self._mrd_pool_cache.get( | ||
| bucket, object_name, generation, pool_size=pool_size | ||
| ) | ||
| await mrd.initialize() | ||
| pool_created_here = True | ||
|
|
||
| tasks = [] | ||
|
|
@@ -511,10 +574,9 @@ async def _cat_file( | |
| ) | ||
|
|
||
| # Instantiate an MRDPool locally for this call | ||
| mrd = zb_hns_utils.MRDPool( | ||
| self, bucket, object_name, generation, pool_size=concurrency | ||
| mrd = await self._mrd_pool_cache.get( | ||
| bucket, object_name, generation, pool_size=concurrency | ||
| ) | ||
| await mrd.initialize() | ||
| pool_created_here = True | ||
|
|
||
| try: | ||
|
|
@@ -1570,48 +1632,46 @@ async def _get_file(self, rpath, lpath, callback=None, **kwargs): | |
| return | ||
| callback = callback or NoOpCallback() | ||
|
|
||
| mrd = None | ||
| mrd_pool = await self._mrd_pool_cache.get(bucket, key, generation, pool_size=1) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With a pool size of 1, the pool cache initializes a new pool every time, which yields no performance improvement and adds overhead. Do you have any measurements showing the improvements made?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1 to this
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| try: | ||
| await self._get_grpc_client() | ||
| mrd = await zb_hns_utils.init_mrd(self.grpc_client, bucket, key, generation) | ||
|
|
||
| size = mrd.persisted_size | ||
| if size is None: | ||
| logger.warning( | ||
| f"AsyncMultiRangeDownloader (MRD) for {rpath} has no 'persisted_size'. " | ||
| "Falling back to _info() to get the file size. " | ||
| "This may result in incorrect behavior for unfinalized objects." | ||
| ) | ||
| size = (await self._info(rpath))["size"] | ||
| callback.set_size(size) | ||
| async with mrd_pool.get_mrd() as mrd: | ||
| size = mrd.persisted_size | ||
|
Yonghui-Lee marked this conversation as resolved.
|
||
| if size is None: | ||
| logger.warning( | ||
| f"AsyncMultiRangeDownloader (MRD) for {rpath} has no 'persisted_size'. " | ||
| "Falling back to _info() to get the file size. " | ||
| "This may result in incorrect behavior for unfinalized objects." | ||
| ) | ||
| size = (await self._info(rpath))["size"] | ||
| callback.set_size(size) | ||
|
|
||
| lparent = os.path.dirname(lpath) or os.curdir | ||
| os.makedirs(lparent, exist_ok=True) | ||
| lparent = os.path.dirname(lpath) or os.curdir | ||
| os.makedirs(lparent, exist_ok=True) | ||
|
|
||
| chunksize = kwargs.get("chunksize", 4096 * 32) # 128KB default | ||
| offset = 0 | ||
| chunksize = kwargs.get("chunksize", 4096 * 32) # 128KB default | ||
| offset = 0 | ||
|
|
||
| with open(lpath, "wb") as f2: | ||
| while True: | ||
| if offset >= size: | ||
| break | ||
| with open(lpath, "wb") as f2: | ||
| while True: | ||
| if offset >= size: | ||
| break | ||
|
|
||
| data = await zb_hns_utils.download_range( | ||
| offset=offset, length=chunksize, mrd=mrd | ||
| ) | ||
| if not data: | ||
| break | ||
| data = await zb_hns_utils.download_range( | ||
| offset=offset, length=chunksize, mrd=mrd | ||
| ) | ||
| if not data: | ||
| break | ||
|
|
||
| f2.write(data) | ||
| offset += len(data) | ||
| callback.relative_update(len(data)) | ||
| f2.write(data) | ||
| offset += len(data) | ||
| callback.relative_update(len(data)) | ||
| except Exception as e: | ||
| # Clean up the corrupted file before raising error | ||
| if os.path.exists(lpath): | ||
| os.remove(lpath) | ||
| raise e | ||
| finally: | ||
| await zb_hns_utils.close_mrd(mrd) | ||
| await mrd_pool.close() | ||
|
|
||
| async def _do_list_objects( | ||
| self, | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a targeted import?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for `fsspec.FSTimeoutError`. We use the same import in `core.py`.