diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index a069c0bf9..f97b1b33b 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -491,7 +491,18 @@ async def _gen(idx: int, file_path: str) -> None: logger.warning(f"Failed to generate summary for {file_path}: {e}") file_summaries[idx] = {"name": file_name, "summary": ""} - await asyncio.gather(*[_gen(i, fp) for i, fp in pending_indices]) + # Process in batches to prevent event-loop starvation when the + # pending list is large (e.g. hundreds of files). Without batching + # all coroutines are spawned at once via asyncio.gather, which can + # deadlock the event loop even with a semaphore in place. + _BATCH_SIZE = 20 + for _batch_start in range(0, len(pending_indices), _BATCH_SIZE): + _batch = pending_indices[_batch_start : _batch_start + _BATCH_SIZE] + await asyncio.gather(*[_gen(i, fp) for i, fp in _batch]) + logger.info( + f"Summary batch {_batch_start // _BATCH_SIZE + 1}/" + f"{(len(pending_indices) + _BATCH_SIZE - 1) // _BATCH_SIZE} done" + ) file_summaries = [s for s in file_summaries if s is not None]