Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion openviking/storage/queuefs/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ def _queue_worker_loop(
finally:
loop.close()

# Maximum time a single queue task may run before being cancelled.
_TASK_TTL_SECONDS: float = 600.0 # 10 minutes

async def _worker_async_concurrent(
self, queue: NamedQueue, stop_event: threading.Event, max_concurrent: int
) -> None:
Expand All @@ -213,9 +216,20 @@ async def process_one(data: Dict[str, Any]) -> None:
async with sem:
msg_id = data.get("id", "") if isinstance(data, dict) else ""
try:
await queue.process_dequeued(data)
await asyncio.wait_for(
queue.process_dequeued(data),
timeout=self._TASK_TTL_SECONDS,
)
# Ack after successful processing (delete from persistent storage).
await queue.ack(msg_id)
except asyncio.TimeoutError:
queue._on_process_error(
f"Task timed out after {self._TASK_TTL_SECONDS}s", data
)
logger.error(
f"[QueueManager] Task timeout for {queue.name} "
f"(msg_id={msg_id}, TTL={self._TASK_TTL_SECONDS}s)"
)
except Exception as e:
# Handler did not call report_error; decrement in_progress manually.
# Do NOT ack — let RecoverStale re-queue on next startup.
Expand Down
Loading