From 043c2494720f95e6e0ad9995928a6f8549f28bbf Mon Sep 17 00:00:00 2001 From: yc111233 Date: Mon, 6 Apr 2026 00:31:23 +0800 Subject: [PATCH] fix(queue): add per-task TTL to prevent indefinitely stuck queue tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Queue tasks that hang (e.g. due to an unresponsive VLM service or a deadlocked event loop) hold a semaphore slot forever and increment in_progress permanently. The only recovery mechanism — RecoverStale — runs only at startup, so a stuck task blocks the queue until the service is restarted. Wrap each task's process_dequeued call with asyncio.wait_for using a 10-minute TTL. On timeout the task is cancelled, _on_process_error decrements in_progress, and the message remains in SQLite 'processing' state for RecoverStale to re-queue on the next restart. Co-Authored-By: Claude Opus 4.6 --- openviking/storage/queuefs/queue_manager.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/openviking/storage/queuefs/queue_manager.py b/openviking/storage/queuefs/queue_manager.py index 76c14c3c2..e8f2b5906 100644 --- a/openviking/storage/queuefs/queue_manager.py +++ b/openviking/storage/queuefs/queue_manager.py @@ -199,6 +199,9 @@ def _queue_worker_loop( finally: loop.close() + # Maximum time a single queue task may run before being cancelled. + _TASK_TTL_SECONDS: float = 600.0 # 10 minutes + async def _worker_async_concurrent( self, queue: NamedQueue, stop_event: threading.Event, max_concurrent: int ) -> None: @@ -213,9 +216,20 @@ async def process_one(data: Dict[str, Any]) -> None: async with sem: msg_id = data.get("id", "") if isinstance(data, dict) else "" try: - await queue.process_dequeued(data) + await asyncio.wait_for( + queue.process_dequeued(data), + timeout=self._TASK_TTL_SECONDS, + ) # Ack after successful processing (delete from persistent storage). await queue.ack(msg_id) + except asyncio.TimeoutError: + queue._on_process_error( + f"Task timed out after {self._TASK_TTL_SECONDS}s", data + ) + logger.error( + f"[QueueManager] Task timeout for {queue.name} " + f"(msg_id={msg_id}, TTL={self._TASK_TTL_SECONDS}s)" + ) except Exception as e: # Handler did not call report_error; decrement in_progress manually. # Do NOT ack — let RecoverStale re-queue on next startup.