diff --git a/docs/source/_static/prefetch_dynamic_transition.png b/docs/source/_static/prefetch_dynamic_transition.png new file mode 100644 index 000000000..0a3cef445 Binary files /dev/null and b/docs/source/_static/prefetch_dynamic_transition.png differ diff --git a/docs/source/prefetcher.rst b/docs/source/prefetcher.rst index bec545285..b24afaa47 100644 --- a/docs/source/prefetcher.rst +++ b/docs/source/prefetcher.rst @@ -189,3 +189,18 @@ Multi Stream Performance (48 Process) +---------+--------------+-----------------+-----------------+--------------------------+------------------+------------------+--------------------------+ | rand | 100.00 | 8276.16 | 10171.59 | 10172.54 | 20621.17 | 23598.05 | 24086.18 | +---------+--------------+-----------------+-----------------+--------------------------+------------------+------------------+--------------------------+ + +Seeing the Prefetcher in Action +=============================== + +Real-world applications rarely do just one thing; they often switch between entirely different reading patterns mid-file. To understand how the engine balances aggressive fetching with careful memory management, we can look at a dynamic workload transition. + +The graph below plots the size of the data requested by the application (User Read Size) against the volume of data the prefetcher is fetching in the background (Scheduled/Queued Data). + +.. image:: _static/prefetch_dynamic_transition.png + +Notice how perfectly the prefetcher mirrors the application's changing behaviour across three distinct phases: + +1. **Smooth Sequential Reading (0–10s):** The application starts by reading steady 16MB chunks. After three consistent reads, the prefetcher confirms the pattern and confidently ramps up its background fetching. It maintains a comfortable buffer ahead of the application so the user never waits on the network. +2. 
**Pausing for Random Seeks (10–20s):** The application suddenly stops reading in a straight line and starts jumping randomly around the file. Prefetching is actively harmful here. The engine instantly detects the broken streak and drops the background buffer to zero, ensuring no network bandwidth or memory is wasted downloading unneeded data. +3. **Adjusting to Massive Reads (20–30s):** The application resumes reading sequentially, but this time stepping up to massive 100MB chunks. The algorithm quickly catches on. It detects the new streak, calculates the new rolling average, and rebuilds the background buffer—this time scaling it up safely to handle the much larger data chunks. diff --git a/gcsfs/prefetcher.py b/gcsfs/prefetcher.py index 840cec63b..639bee1fa 100644 --- a/gcsfs/prefetcher.py +++ b/gcsfs/prefetcher.py @@ -78,6 +78,24 @@ def average(self) -> int: return 1024 * 1024 # 1MB return self._sum // count + @property + def is_variable(self) -> bool: + """Determines if the history contains distinct chunk sizes.""" + count = len(self._history) + if count < 2: + return False + + first_val = self._history[0] + return any(val != first_val for val in self._history) + + @property + def last_value(self) -> int: + """Returns the most recent entry in the history.""" + if not self._history: + raise RuntimeError("No entry found in history") + + return self._history[-1] + def clear(self): """Clears the history and resets the sum to zero.""" logger.debug("Clearing RunningAverageTracker history.") @@ -101,6 +119,24 @@ class PrefetchProducer: # to maximum of 2 * io_size and 128MB MIN_PREFETCH_SIZE = 128 * 1024 * 1024 + # The prefetching starts on the third read. + MIN_STREAKS_FOR_PREFETCHING = 3 + + # Threshold for disabling proactive prefetching on large, variable reads. + # + # If the average read size exceeds this value and patterns are variable, + # prefetching shifts from an I/O bottleneck to a CPU bottleneck. 
When a user + # requests random massive sizes (e.g., jumping between 64MB and INF), the + # producer still fetches chunks based on the rolling average. The consumer + # then has to pick up multiple chunks and stitch them together to match the + # exact requested size. + # + # For small average read sizes, this byte assembly is fast and the bottleneck + # remains the network I/O. However, for massive reads (>= 64MB), the extra + # step of copying and assembling huge byte strings in memory severely slows + # down the operation. + VARIABLE_IO_THRESHOLD = 64 * 1024 * 1024 + def __init__( self, fetcher, @@ -108,10 +144,9 @@ def __init__( concurrency: int, queue: asyncio.Queue, wakeup_event: asyncio.Event, - get_user_offset, - get_io_size, - get_sequential_streak, - on_error, + consumer: "PrefetchConsumer", + tracker: RunningAverageTracker, + orchestrator: "BackgroundPrefetcher", user_max_prefetch_size=None, ): """Initializes the background producer. @@ -122,10 +157,9 @@ def __init__( concurrency (int): Maximum number of concurrent fetch tasks. queue (asyncio.Queue): The shared queue to push download tasks into. wakeup_event (asyncio.Event): Event used to wake the producer from an idle state. - get_user_offset (Callable): Function returning the user's current read offset. - get_io_size (Callable): Function returning the adaptive IO size. - get_sequential_streak (Callable): Function returning the current sequential read streak. - on_error (Callable): Callback triggered when a background error occurs. + consumer (PrefetchConsumer): The consumer reading the prefetched chunks. + tracker (RunningAverageTracker): Tracker for history of read sizes. + orchestrator (BackgroundPrefetcher): The parent object managing the operation. user_max_prefetch_size (int, optional): A hard limit for prefetch size overrides. 
""" logger.debug( @@ -140,10 +174,9 @@ def __init__( self.queue = queue self.wakeup_event = wakeup_event - self.get_user_offset = get_user_offset - self.get_io_size = get_io_size - self.get_sequential_streak = get_sequential_streak - self.on_error = on_error + self.consumer = consumer + self.tracker = tracker + self.orchestrator = orchestrator self._user_max_prefetch_size = user_max_prefetch_size self.current_offset = 0 @@ -161,9 +194,9 @@ def max_prefetch_size(self) -> int: if self._user_max_prefetch_size is not None: return min( self._user_max_prefetch_size, - max(2 * self.get_io_size(), self.MIN_PREFETCH_SIZE), + max(2 * self.tracker.average, self.MIN_PREFETCH_SIZE), ) - return max(2 * self.get_io_size(), self.MIN_PREFETCH_SIZE) + return max(2 * self.tracker.average, self.MIN_PREFETCH_SIZE) def start(self): """Starts the background producer loop. @@ -246,23 +279,60 @@ async def _loop(self): if self.is_stopped: break - io_size = self.get_io_size() - streak = self.get_sequential_streak() - prefetch_size = min((streak + 1) * io_size, self.max_prefetch_size) + avg_io_size = self.tracker.average + streak = self.consumer.sequential_streak + is_variable = self.tracker.is_variable + last_read_size = self.tracker.last_value + + exceeds_user_max = ( + self._user_max_prefetch_size is not None + and avg_io_size > self._user_max_prefetch_size + ) + + # Disable prefetching ahead if highly variable AND average > 64MB, or if it exceeds user max + if ( + is_variable and avg_io_size > PrefetchProducer.VARIABLE_IO_THRESHOLD + ) or exceeds_user_max: + logger.debug( + "Large IO detected (variable > 64MB or > user max). Disabling background prefetching." 
+ ) + prefetch_multiplier = 1 + elif streak < self.MIN_STREAKS_FOR_PREFETCHING: + prefetch_multiplier = 1 + else: + prefetch_multiplier = streak - self.MIN_STREAKS_FOR_PREFETCHING + 1 + + if self.queue.empty() or prefetch_multiplier == 1: + io_size = last_read_size + else: + io_size = avg_io_size + + prefetch_size = min( + prefetch_multiplier * io_size, self.max_prefetch_size + ) + if self.consumer.offset + prefetch_size < self.consumer.target_offset: + prefetch_size = self.consumer.target_offset - self.consumer.offset + + if is_variable: + effective_prefetch_size = prefetch_size + else: + effective_prefetch_size = (prefetch_size // io_size) * io_size + if effective_prefetch_size == 0: + effective_prefetch_size = prefetch_size logger.debug( "Producer awake. Current offset: %d, User offset: %d, Prefetch size: %d", self.current_offset, - self.get_user_offset(), + self.consumer.offset, prefetch_size, ) while ( not self.is_stopped - and (self.current_offset - self.get_user_offset()) < prefetch_size + and (self.current_offset - self.consumer.offset) < prefetch_size and self.current_offset < self.size ): - user_offset = self.get_user_offset() + user_offset = self.consumer.offset space_remaining = self.size - self.current_offset prefetch_space_available = prefetch_size - ( self.current_offset - user_offset @@ -278,14 +348,22 @@ async def _loop(self): else: actual_size = min(io_size, space_remaining) - if streak < 2: + if prefetch_space_available < actual_size: + if is_variable or prefetch_space_available == prefetch_size: + actual_size = prefetch_space_available + else: + break + + if streak < PrefetchProducer.MIN_STREAKS_FOR_PREFETCHING: sfactor = self.concurrency else: sfactor = min( self.concurrency, max( 1, - actual_size * self.concurrency // prefetch_size, + actual_size + * self.concurrency + // effective_prefetch_size, ), ) @@ -317,7 +395,7 @@ async def _loop(self): exc_info=True, ) self.is_stopped = True - self.on_error(e) + self.orchestrator._set_error(e) await 
self.queue.put(e) @@ -332,24 +410,25 @@ def __init__( self, queue: asyncio.Queue, wakeup_event: asyncio.Event, - is_producer_stopped, - on_error, + tracker: RunningAverageTracker, + orchestrator: "BackgroundPrefetcher", ): """Initializes the consumer. Args: queue (asyncio.Queue): The shared queue containing fetch tasks. wakeup_event (asyncio.Event): Event used to wake the producer when more data is needed. - is_producer_stopped (Callable): Function returning whether the producer has been halted. - on_error (Callable): Callback triggered when a fetch error is encountered. + tracker (RunningAverageTracker): Tracker for history of read sizes. + orchestrator (BackgroundPrefetcher): The parent object managing the operation. """ logger.debug("Initializing PrefetchConsumer.") self.queue = queue self.wakeup_event = wakeup_event - self.is_producer_stopped = is_producer_stopped - self.on_error = on_error + self.tracker = tracker + self.orchestrator = orchestrator self.sequential_streak = 0 self.offset = 0 + self.target_offset = 0 self._current_block = b"" self._current_block_idx = 0 @@ -364,6 +443,7 @@ def seek(self, new_offset: int): new_offset, ) self.offset = new_offset + self.target_offset = new_offset self.sequential_streak = 0 self._current_block = b"" self._current_block_idx = 0 @@ -384,12 +464,18 @@ async def _advance(self, size: int, save_data: bool) -> list[bytes]: chunks = [] processed = 0 + self.target_offset = self.offset + size while processed < size: available = len(self._current_block) - self._current_block_idx + trigger_wakeup = False if not available: - if self.is_producer_stopped() and self.queue.empty(): + is_producer_stopped = ( + not hasattr(self.orchestrator, "producer") + or self.orchestrator.producer.is_stopped + ) + if is_producer_stopped and self.queue.empty(): logger.debug("Consumer reached EOF.") break @@ -401,15 +487,38 @@ async def _advance(self, size: int, save_data: bool) -> list[bytes]: if isinstance(task, Exception): logger.error("Consumer 
retrieved an exception: %s", task) - self.on_error(task) + self.orchestrator._set_error(task) raise task try: block = await task self.sequential_streak += 1 - if self.sequential_streak >= 2: - self.wakeup_event.set() + if ( + self.sequential_streak + >= PrefetchProducer.MIN_STREAKS_FOR_PREFETCHING + ): + is_variable = self.tracker.is_variable + avg_io_size = self.tracker.average + + exceeds_user_max = ( + self.orchestrator.max_prefetch_size is not None + and avg_io_size > self.orchestrator.max_prefetch_size + ) + is_massive_variable = ( + is_variable + and avg_io_size > PrefetchProducer.VARIABLE_IO_THRESHOLD + ) + + # Suppress proactive wakeups to prevent large CPU assembly + # on erratic large reads or exceeding max + if not (is_massive_variable or exceeds_user_max): + trigger_wakeup = True + else: + logger.debug( + "Suppressing proactive producer wakeup due to massive variable" + " workload or exceeding user max prefetch." + ) self._current_block = block self._current_block_idx = 0 @@ -418,7 +527,7 @@ async def _advance(self, size: int, save_data: bool) -> list[bytes]: raise except Exception as e: logger.error("Consumer caught an error: %s", e, exc_info=True) - self.on_error(e) + self.orchestrator._set_error(e) raise e if not self._current_block: @@ -440,6 +549,8 @@ async def _advance(self, size: int, save_data: bool) -> list[bytes]: self._current_block_idx += take processed += take self.offset += take + if trigger_wakeup: + self.wakeup_event.set() return chunks @@ -504,6 +615,7 @@ def __init__(self, fetcher, size: int, concurrency: int, max_prefetch_size=None) ) self.size = size self.concurrency = concurrency + self.max_prefetch_size = max_prefetch_size if max_prefetch_size is not None and max_prefetch_size <= 0: logger.error("Invalid max_prefetch_size provided: %s", max_prefetch_size) @@ -523,8 +635,8 @@ def __init__(self, fetcher, size: int, concurrency: int, max_prefetch_size=None) self.consumer = PrefetchConsumer( queue=self.queue, 
wakeup_event=self.wakeup_event, - is_producer_stopped=self._is_producer_stopped, - on_error=self._set_error, + tracker=self.read_tracker, + orchestrator=self, ) self.producer = PrefetchProducer( @@ -533,10 +645,9 @@ def __init__(self, fetcher, size: int, concurrency: int, max_prefetch_size=None) concurrency=self.concurrency, queue=self.queue, wakeup_event=self.wakeup_event, - get_user_offset=lambda: self.consumer.offset, - get_io_size=self._get_adaptive_io_size, - get_sequential_streak=lambda: self.consumer.sequential_streak, - on_error=self._set_error, + consumer=self.consumer, + tracker=self.read_tracker, + orchestrator=self, user_max_prefetch_size=max_prefetch_size, ) @@ -554,12 +665,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit point. Ensures the prefetcher is cleanly closed.""" self.close() - def _get_adaptive_io_size(self) -> int: - return self.read_tracker.average - - def _is_producer_stopped(self) -> bool: - return self.producer.is_stopped if hasattr(self, "producer") else True - def _set_error(self, e: Exception): logger.error("Global error state set in BackgroundPrefetcher: %s", e) self._error = e diff --git a/gcsfs/tests/test_prefetcher.py b/gcsfs/tests/test_prefetcher.py index e0dc8e859..acba2213e 100644 --- a/gcsfs/tests/test_prefetcher.py +++ b/gcsfs/tests/test_prefetcher.py @@ -136,9 +136,9 @@ def test_producer_concurrency_streak_and_min_chunk(): original_min_chunk = bp.producer.MIN_CHUNK_SIZE bp.producer.MIN_CHUNK_SIZE = 10 - bp._fetch(0, 50) - bp._fetch(50, 100) - bp._fetch(100, 150) + # Do 6 reads to push the streak well past the MIN_STREAKS threshold + for i in range(6): + bp._fetch(i * 50, (i + 1) * 50) fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1) @@ -172,17 +172,14 @@ def test_producer_loop_space_constraints(): def test_producer_error_propagation(): - fetcher = MockFetcher(b"A" * 1000, fail_at_call=3) - bp = BackgroundPrefetcher(fetcher=fetcher, size=1000, concurrency=4) - bp.read_tracker.add(100) + fetcher = 
MockFetcher(b"A" * 2000, fail_at_call=3) + bp = BackgroundPrefetcher(fetcher=fetcher, size=2000, concurrency=4) - assert bp._fetch(0, 100) == b"A" * 100 + for i in range(2): + bp._fetch(i * 100, (i + 1) * 100) with pytest.raises(OSError, match="Simulated Network Timeout"): - bp._fetch(100, 500) - - assert bp.is_stopped is True - bp.close() + bp._fetch(400, 500) def test_read_after_close_or_error(): @@ -353,18 +350,18 @@ def test_producer_min_chunk_logic(): def test_producer_loop_exception(): - bp = BackgroundPrefetcher(fetcher=MockFetcher(b""), size=100, concurrency=4) + bp = BackgroundPrefetcher(fetcher=MockFetcher(b"A" * 100), size=100, concurrency=4) error_object = ValueError("Producer crash") - bp.producer.get_io_size = mock.Mock(side_effect=error_object) - with pytest.raises(ValueError, match="Producer crash"): - bp._fetch(0, 10) + with mock.patch( + "gcsfs.prefetcher.RunningAverageTracker.average", new_callable=mock.PropertyMock + ) as mocked_avg: + mocked_avg.side_effect = error_object + with pytest.raises(ValueError, match="Producer crash"): + bp._fetch(0, 10) assert bp.is_stopped is True assert bp._error == error_object - - with pytest.raises(ValueError, match="Producer crash"): - bp._fetch(0, 10) bp.close() @@ -507,7 +504,10 @@ def test_producer_min_chunk_inner_break(): async def trigger_loop(): bp.producer.current_offset = 250 bp.consumer.offset = 0 - bp.consumer.sequential_streak = 3 # makes prefetch_size = (3+1) * 100 = 400 + bp.consumer.target_offset = 0 + # streak=6 makes prefetch_multiplier = 4 (6 - 3 + 1) + # prefetch_size = 4 * 100 = 400 + bp.consumer.sequential_streak = 6 bp.wakeup_event.set() await asyncio.sleep(0.05) @@ -533,3 +533,95 @@ async def trigger_stop_and_wake(): # Verify the producer gracefully exited without doing work assert fetcher.call_count == 0 bp.close() + + +def test_massive_read_disables_proactive_prefetching(): + fetcher = MockFetcher(b"X" * 1000) + + # max_prefetch_size = 40 + bp = BackgroundPrefetcher( + fetcher=fetcher, 
size=1000, concurrency=4, max_prefetch_size=40 + ) + + # Do enough reads to build a sequential streak and trigger large averages + # Reading 60 bytes at a time. Average = 60. Threshold = 40. + for i in range(4): + bp._fetch(i * 60, (i + 1) * 60) + + fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1) + + # Because average (60) > threshold (40), prefetch_multiplier is pinned to 1. + # The producer should only fetch what the user specifically read (4 * 60 = 240) + # and should NOT have pre-fetched any additional data ahead into the queue. + assert bp.producer.current_offset == 240 + bp.close() + + +def test_normal_read_allows_proactive_prefetching(): + fetcher = MockFetcher(b"X" * 1000) + + # max_prefetch_size = 200 sets the user-max threshold to 200 + bp = BackgroundPrefetcher( + fetcher=fetcher, size=1000, concurrency=4, max_prefetch_size=200 + ) + + # Reading 60 bytes at a time. Average = 60. Threshold = 200. + for i in range(4): + bp._fetch(i * 60, (i + 1) * 60) + + fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1) + + # Because average (60) <= threshold (200), the producer allows prefetching. + # It calculates a normal prefetch_multiplier > 1 and pre-fetches data ahead. + assert bp.producer.current_offset > 240 + bp.close() + + +def test_target_offset_expands_prefetch(): + fetcher = MockFetcher(b"X" * 1000) + bp = BackgroundPrefetcher(fetcher=fetcher, size=1000, concurrency=4) + + # Seed tracker to keep the default `max_prefetch_size` calculation small + bp.read_tracker.add(10) + + # The consumer requests a massive chunk (500 bytes), far exceeding normal prefetch windows + bp._fetch(0, 500) + + fsspec.asyn.sync(bp.loop, asyncio.sleep, 0.1) + + # The new target_offset logic should explicitly tell the producer to expand its + # boundary to cover the requested 500 bytes, overriding the tiny multiplier logic. 
+ assert bp.consumer.target_offset == 500 + assert bp.producer.current_offset >= 500 + bp.close() + + +def test_producer_min_chunk_inner_empty_queue_shrink(): + fetcher = MockFetcher(b"X" * 1000) + bp = BackgroundPrefetcher( + fetcher=fetcher, size=1000, concurrency=4, max_prefetch_size=400 + ) + + bp.read_tracker.add(100) + + original_min_chunk = bp.producer.MIN_CHUNK_SIZE + bp.producer.MIN_CHUNK_SIZE = 200 + + async def trigger_loop(): + # Setup conditions where the queue is empty and the user is waiting + # This makes prefetch_space_available exactly equal to prefetch_size + bp.producer.current_offset = 0 + bp.consumer.offset = 0 + bp.consumer.target_offset = 0 + bp.consumer.sequential_streak = 6 + bp.wakeup_event.set() + await asyncio.sleep(0.05) + + fsspec.asyn.sync(bp.loop, trigger_loop) + + # Because space_available == prefetch_size, it triggers the shrink condition + # instead of breaking, ensuring the blocked consumer gets its data. + assert fetcher.call_count > 0 + + bp.producer.MIN_CHUNK_SIZE = original_min_chunk + bp.close()