diff --git a/openviking/storage/queuefs/queue_manager.py b/openviking/storage/queuefs/queue_manager.py index 76c14c3c2..e8f2b5906 100644 --- a/openviking/storage/queuefs/queue_manager.py +++ b/openviking/storage/queuefs/queue_manager.py @@ -199,6 +199,9 @@ def _queue_worker_loop( finally: loop.close() + # Maximum time a single queue task may run before being cancelled. + _TASK_TTL_SECONDS: float = 600.0 # 10 minutes + async def _worker_async_concurrent( self, queue: NamedQueue, stop_event: threading.Event, max_concurrent: int ) -> None: @@ -213,9 +216,20 @@ async def process_one(data: Dict[str, Any]) -> None: async with sem: msg_id = data.get("id", "") if isinstance(data, dict) else "" try: - await queue.process_dequeued(data) + await asyncio.wait_for( + queue.process_dequeued(data), + timeout=self._TASK_TTL_SECONDS, + ) # Ack after successful processing (delete from persistent storage). await queue.ack(msg_id) + except asyncio.TimeoutError: + queue._on_process_error( + f"Task timed out after {self._TASK_TTL_SECONDS}s", data + ) + logger.error( + f"[QueueManager] Task timeout for {queue.name} " + f"(msg_id={msg_id}, TTL={self._TASK_TTL_SECONDS}s)" + ) except Exception as e: # Handler did not call report_error; decrement in_progress manually. # Do NOT ack — let RecoverStale re-queue on next startup.