Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions backend/pg_queue/migrations/0004_pgbarrierstate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 4.2.1 on 2026-06-12 14:49

import django.utils.timezone
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("pg_queue", "0003_pgqueuemessage_pg_queue_message_priority_range"),
]

operations = [
migrations.CreateModel(
name="PgBarrierState",
fields=[
("execution_id", models.TextField(primary_key=True, serialize=False)),
("remaining", models.IntegerField()),
("results", models.JSONField(default=list)),
("aborted", models.BooleanField(default=False)),
("created_at", models.DateTimeField(default=django.utils.timezone.now)),
("expires_at", models.DateTimeField()),
],
options={
"db_table": "pg_barrier_state",
"indexes": [
models.Index(fields=["expires_at"], name="pg_barrier_expires_idx")
],
},
),
]
41 changes: 41 additions & 0 deletions backend/pg_queue/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,44 @@ class Meta:
name="pg_queue_message_dequeue_idx",
)
]


class PgBarrierState(models.Model):
"""Per-execution fan-in barrier state for ``PgBarrier`` (the Postgres
``WORKER_BARRIER_BACKEND``).

One row per in-flight barrier (keyed by ``execution_id``). The worker-side
``barrier_pg_decr_and_check`` link task atomically decrements ``remaining``
and appends to ``results`` in a single ``UPDATE … RETURNING``; the task that
drives ``remaining`` to 0 dispatches the aggregating callback and deletes the
row. ``aborted`` is claimed once (``UPDATE … WHERE NOT aborted RETURNING``)
so N concurrent header-task failures collapse to a single cleanup. Like the
Redis backend's TTL, ``expires_at`` bounds an orphaned barrier (header tasks
that never complete); the tasks delete past-expiry rows opportunistically and
a periodic sweep is the backstop.

Managed=True / generated migration — no DB-side function, extension-free
(UN-3533), same posture as ``PgQueueMessage``.
"""

execution_id = models.TextField(primary_key=True)
# Header tasks still pending. The last task to decrement it to 0 fires the
# callback. A value < 0 (decrement after expiry/cleanup) means the barrier
# was already torn down — the task cleans up without firing.
remaining = models.IntegerField()
# Aggregated header-task results, appended in completion order (JSONB array).
results = models.JSONField(default=list)
# Set once on the first header-task failure so the abort cleanup runs once.
aborted = models.BooleanField(default=False)
created_at = models.DateTimeField(default=timezone.now)
# Orphan bound (Redis-TTL equivalent): a barrier whose header tasks never
# complete is reclaimable past this. Must exceed the longest execution
# wall-clock, same budgeting as WORKER_BARRIER_KEY_TTL_SECONDS.
expires_at = models.DateTimeField()

class Meta:
Comment thread
muhammad-ali-e marked this conversation as resolved.
db_table = "pg_barrier_state"
indexes = [
# Drives the opportunistic/periodic expiry sweep.
models.Index(fields=["expires_at"], name="pg_barrier_expires_idx"),
]
11 changes: 11 additions & 0 deletions workers/queue_backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
from .decorator import worker_task
from .dispatch import dispatch
from .fairness import FairnessKey
from .pg_barrier import (
PgBarrier,
barrier_pg_abort,
barrier_pg_decr_and_check,
)
from .redis_barrier import (
RedisDecrBarrier,
barrier_abort,
Expand All @@ -54,10 +59,13 @@
"BarrierHandle",
"CeleryChordBarrier",
"FairnessKey",
"PgBarrier",
"QueueBackend",
"RedisDecrBarrier",
"barrier_abort",
"barrier_decr_and_check",
"barrier_pg_abort",
"barrier_pg_decr_and_check",
"dispatch",
"get_barrier",
"select_backend",
Expand All @@ -77,6 +85,7 @@ class BarrierBackend(StrEnum):

CHORD = "chord"
REDIS = "redis"
PG = "pg"


def get_barrier() -> Barrier:
Expand Down Expand Up @@ -110,6 +119,8 @@ def get_barrier() -> Barrier:
return CeleryChordBarrier()
if backend is BarrierBackend.REDIS:
return RedisDecrBarrier()
if backend is BarrierBackend.PG:
return PgBarrier()
# Unreachable — StrEnum constructor would have raised above for
# anything not in the enum. Defensive raise so the type checker
# sees an exhaustive match.
Expand Down
Loading