UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIER_BACKEND)#2053
UN-3548 [FEAT] PgBarrier — Postgres fan-in barrier (3rd WORKER_BARRIER_BACKEND)#2053muhammad-ali-e wants to merge 3 commits into
Conversation
…R_BACKEND)
Add a Postgres Barrier substrate selected by WORKER_BARRIER_BACKEND=pg (default
stays chord). Moves the fan-in aggregation ("wait for N header tasks, then fire
the callback with their results") onto a pg_barrier_state row — the same DB that
holds the PG queue, so an execution can coordinate without Redis/RabbitMQ. The
9e pipeline on-ramp primitive.
Mirrors RedisDecrBarrier 1:1 — same Barrier protocol, fairness plumbing,
Celery-dispatched header tasks with .link/.link_error, empty->None,
missing-execution_id->raise, mid-loop dispatch cleanup. Defaults-off, zero
behaviour change until the flag flips.
- Schema (backend/pg_queue): pg_barrier_state (execution_id PK, remaining,
results jsonb, aborted, expires_at) + migration 0004.
- pg_barrier.py: PgBarrier + barrier_pg_decr_and_check / barrier_pg_abort.
Atomic decrement is ONE statement (UPDATE ... SET remaining = remaining-1,
results = results || jsonb_build_array(%s) ... RETURNING remaining, results,
aborted) — row lock serialises concurrent decrements so exactly one sees 0;
no Lua. Guards: reads aborted in the same statement (never fires partial),
row-missing / negative-remaining clean up without firing, callback dispatched
BEFORE row delete. Orphan bound via expires_at + opportunistic sweep in
enqueue (periodic sweep is the backstop).
- __init__.py: BarrierBackend.PG -> PgBarrier() in get_barrier().
Tests: protocol shape, TTL env validation, enqueue (upsert/links/fairness/
stale-reset/expiry-sweep/mid-loop-cleanup), decr paths (pending/complete-fires/
aborted/negative/missing/unserialisable), abort (claim+delete/dedup), and a real
two-connection decrement-atomicity check (exactly one sees 0). Selector PG case.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
muhammad-ali-e
left a comment
There was a problem hiding this comment.
Automated PR review (PR Review Toolkit: code-reviewer, silent-failure-hunter, type-design-analyzer, pr-test-analyzer, comment-analyzer, code-simplifier).
The implementation is high quality and faithfully mirrors RedisDecrBarrier — the atomic single-statement decrement, the aborted-read-in-decrement guard, and dispatch-before-delete are all sound. Findings below are inline. Highest priority: abort claim/delete atomicity, the per-enqueue orphan sweep on the hot path, and jsonb's rejection of \u0000 (which json.dumps accepts). Nothing here is blocking the safety-critical "never fire with partial results" property, which holds.
High: - Abort is now ONE atomic statement: `WITH claimed AS (DELETE ... RETURNING) ...` — claim+teardown in a single transaction (no claimed-but-not-deleted window; a crash rolls back so a sibling retries). This makes the `aborted` column redundant — dropped it; the decrement's "row missing -> abandoned" branch now covers the failed-task case. The callback can only fire when remaining hits 0 (all tasks succeeded), so a failed task (which deletes the row) can never let a partial-results fire. - Dropped the per-enqueue global orphan sweep (unbounded DELETE on the hot path, deadlock-prone, shared the UPSERT txn). Reclaim is a future periodic sweep. - A NUL byte survives json.dumps but jsonb rejects it -> catch the DataError and tear the barrier down (fail fast) instead of hanging to expiry. Medium: - Post-dispatch row delete is best-effort (logged, not raised) so a delete error can't mask the already-fired callback; documented the no-double-fire invariant (last decrement + max_retries=0). - Added a DB CheckConstraint (expires_at > created_at) — the one writer-proof invariant; Meta comment warns off a `remaining >= 0` check (teardown needs negative). Softened the "periodic sweep" comments to future/not-yet-shipped. Low: - Extracted shared `barrier_ttl_seconds()` + `CallbackDescriptor` into barrier.py; both backends import them (redis keeps back-compat aliases). signature_kwargs dict instead of inline ** spread. Atomicity comment notes the per-transaction premise. Tests: callback-dispatch-failure-preserves-row; decrement-after-abort-no-fire; atomicity through barrier_pg_decr_and_check (two threads, exactly one fires); list-result-as-single-element; NUL-byte teardown; DB-constraint; max_retries=0. 92 barrier tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Review feedback addressed —
|
|
| Filename | Overview |
|---|---|
| workers/queue_backend/pg_barrier.py | New PgBarrier implementation — atomic UPDATE…RETURNING decrement, DELETE-based abort, thread-local connection management, and mid-loop cleanup are all correct; module docstring incorrectly lists a non-existent aborted column in the wire-model description. |
| backend/pg_queue/models.py | Adds PgBarrierState model with correct fields (execution_id PK, remaining, results jsonb, created_at, expires_at), expires_at index, and a CHECK constraint enforcing expires_at > created_at. |
| backend/pg_queue/migrations/0004_pgbarrierstate_and_more.py | Clean auto-generated migration creating pg_barrier_state table and the expires_after_created CHECK constraint; correctly depends on migration 0003. |
| workers/queue_backend/barrier.py | Moves barrier_ttl_seconds() and CallbackDescriptor TypedDict from redis_barrier.py to the shared barrier.py module so both backends share a single definition; no behavioural changes. |
| workers/queue_backend/redis_barrier.py | Pure refactor — removes the local TTL function and CallbackDescriptor, replaces them with aliases to the shared definitions in barrier.py; no behavioural change to RedisDecrBarrier. |
| workers/queue_backend/init.py | Adds BarrierBackend.PG enum value and the corresponding get_barrier() branch; exports PgBarrier, barrier_pg_abort, barrier_pg_decr_and_check through all. |
| workers/tests/test_pg_barrier.py | Good three-layer test coverage (protocol shape, real-DB enqueue/decrement/abort, two-connection atomicity); PR description claims expiry-sweep is tested but no such test is present. |
| workers/tests/test_barrier_backend_selection.py | Adds enum value and factory assertions for BarrierBackend.PG; straightforward additions mirroring existing chord/redis patterns. |
Sequence Diagram
sequenceDiagram
participant Caller
participant PgBarrier
participant PgDB as pg_barrier_state
participant HeaderTask
participant DecrTask as barrier_pg_decr_and_check
participant AbortTask as barrier_pg_abort
participant Callback
Caller->>PgBarrier: enqueue(header_tasks, callback_kwargs)
PgBarrier->>PgDB: "UPSERT (execution_id, remaining=N, results=[])"
loop for each header_task
PgBarrier->>HeaderTask: apply_async() + .link(DecrTask) + .link_error(AbortTask)
end
PgBarrier-->>Caller: "PgBarrierHandle(id=execution_id)"
alt Header task succeeds
HeaderTask->>DecrTask: invoke(result, execution_id, callback_descriptor)
DecrTask->>PgDB: "UPDATE SET remaining-1, results||result RETURNING remaining,results"
alt "remaining > 0"
DecrTask-->>DecrTask: return pending
else "remaining == 0"
DecrTask->>Callback: apply_async(all_results)
DecrTask->>PgDB: DELETE best-effort
DecrTask-->>DecrTask: return complete
else "remaining < 0 or row is None"
DecrTask->>PgDB: DELETE if needed
DecrTask-->>DecrTask: return abandoned
end
else Header task fails
HeaderTask->>AbortTask: invoke(execution_id)
AbortTask->>PgDB: DELETE WHERE execution_id RETURNING
alt row existed
AbortTask-->>AbortTask: return aborted
else row gone
AbortTask-->>AbortTask: return already_aborted
end
end
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
workers/queue_backend/pg_barrier.py:19-20
The wire-model docstring in point 1 lists `aborted = false` as a column in the UPSERT, but no such column exists — neither in the `PgBarrierState` Django model, the migration `0004`, nor in any of the SQL statements in this file. The abort path achieves its dedup guarantee through `DELETE … RETURNING` (the row's existence is the token), not through a boolean flag. A future developer reading this docstring would look for an `aborted` column that isn't there, or — worse — add one thinking it was accidentally dropped.
```suggestion
1. ``enqueue``: UPSERT one ``pg_barrier_state`` row (``remaining = N``,
``results = []``, ``expires_at = now() + ttl``) — the
```
### Issue 2 of 2
workers/tests/test_pg_barrier.py:1-10
**Missing expiry-sweep test despite PR description claiming coverage**
The PR description lists "expiry-sweep" as an explicitly tested enqueue path, but no such test exists in this file. The `test_upsert_overwrites_stale_state` test covers re-use of a prior barrier row, not an expiry-sweep scenario (seeding rows past `expires_at` and verifying they are cleaned up by a sweep query). Since no sweep job ships in this phase there is nothing wrong with the implementation, but the test suite description is inaccurate and could give the wrong confidence signal to reviewers of the future sweep PR.
Reviews (1): Last reviewed commit: "UN-3548 [FIX] Address PR #2053 review fe..." | Re-trigger Greptile
| """Tests for :class:`queue_backend.pg_barrier.PgBarrier`. | ||
|
|
||
| Three layers: | ||
|
|
||
| 1. **Protocol / TTL** — no DB, no Celery. | ||
| 2. **Enqueue + link/abort tasks** — a real autocommit Postgres connection is | ||
| injected into the module thread-local (the barrier's SQL runs for real); the | ||
| Celery header-task dispatch + callback are mocked. Skips if Postgres is | ||
| unreachable or the ``pg_barrier_state`` migration is unapplied. | ||
| 3. **Atomicity** — two real connections race the decrement SQL directly. |
There was a problem hiding this comment.
Missing expiry-sweep test despite PR description claiming coverage
The PR description lists "expiry-sweep" as an explicitly tested enqueue path, but no such test exists in this file. The test_upsert_overwrites_stale_state test covers re-use of a prior barrier row, not an expiry-sweep scenario (seeding rows past expires_at and verifying they are cleaned up by a sweep query). Since no sweep job ships in this phase there is nothing wrong with the implementation, but the test suite description is inaccurate and could give the wrong confidence signal to reviewers of the future sweep PR.
Prompt To Fix With AI
This is a comment left during a code review.
Path: workers/tests/test_pg_barrier.py
Line: 1-10
Comment:
**Missing expiry-sweep test despite PR description claiming coverage**
The PR description lists "expiry-sweep" as an explicitly tested enqueue path, but no such test exists in this file. The `test_upsert_overwrites_stale_state` test covers re-use of a prior barrier row, not an expiry-sweep scenario (seeding rows past `expires_at` and verifying they are cleaned up by a sweep query). Since no sweep job ships in this phase there is nothing wrong with the implementation, but the test suite description is inaccurate and could give the wrong confidence signal to reviewers of the future sweep PR.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
There was a problem hiding this comment.
Good catch — the PR description was stale, not the code. I removed the inline opportunistic sweep during the toolkit pass (it was unbounded/deadlock-prone on the hot path), and its test_opportunistic_expiry_sweep went with it, but the description still listed "expiry-sweep". Updated the PR body: dropped the "expiry-sweep" tested-path claim and corrected the orphan-bound line to "expires_at + a future (not-yet-shipped) periodic sweep." No sweep ships this phase, so nothing to test yet.
…ring The wire-model docstring's enqueue step still listed `aborted = false` as an UPSERT column after the column was removed (abort now dedups via DELETE … RETURNING / row existence). Remove it so a reader doesn't hunt for — or re-add — a column that no longer exists. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Greptile feedback addressedBoth findings were stale references my own refactor left behind (the code is correct) — Greptile 4/5, safe-to-merge.
No code-behaviour change beyond the docstring; 92 barrier tests still green. |



Adds a third
Barriersubstrate,PgBarrier, selected byWORKER_BARRIER_BACKEND=pg(default stayschord). It moves the fan-in aggregation — "wait for N header tasks, then fire the callback with their results" — onto a Postgres row, so an execution can coordinate in the same DB that holds the PG queue, with no Redis or RabbitMQ chord backend. The 9e pipeline on-ramp primitive. Targetsfeat/UN-3445-pg-queue-integration.Mirrors
RedisDecrBarrier(6b) 1:1 — sameBarrierprotocol, fairness plumbing, Celery-dispatched header tasks with.link/.link_error,empty → None, missing-execution_id→ raise, mid-loop dispatch cleanup. Defaults-off, zero behaviour change until the flag flips. (Header-task transport stays Celery; only the coordination moves — routing header tasks through PG is 9e proper.)What
backend/pg_queue/):pg_barrier_state(execution_id PK, remaining, results jsonb, aborted, expires_at) + migration0004.pg_barrier.py:PgBarrier+barrier_pg_decr_and_check/barrier_pg_abort.__init__.py:BarrierBackend.PG→PgBarrier()inget_barrier().The atomic decrement is one statement (no Lua)
The row lock serialises concurrent decrements, so exactly one task observes
remaining = 0— verified by a real two-connection atomicity test.Failure-masking guards
abortedin the same statement → never fires the callback with partial results, even if an abort set the flag but its DELETE hadn't run.remaining < 0(expiry/replay) → clean up, no fire.apply_asyncfailure leaves the row + expiry in place rather than stranding the execution.UPDATE … SET aborted WHERE NOT aborted RETURNING.expires_at; a periodic sweep job (not yet shipped) is the reclaim backstop. (Abort dedups viaDELETE … RETURNING/ row existence — noabortedcolumn.)Testing
pgcase — protocol shape, TTL env validation, enqueue (upsert / links / fairness / stale-reset / mid-loop-cleanup), decrement paths (pending / complete-fires / decrement-after-abort / negative / missing / unserialisable / NUL-teardown / dispatch-failure-preserves-row / list-result), abort (claim+delete / dedup), a real two-connection atomicity check driven throughbarrier_pg_decr_and_check(exactly one fires), plus DB-constraint andmax_retries=0.get_barrier(pg)→PgBarrier.pg_barrier_staterow via the liveenqueuepath (remaining counts down, results grows) to confirm the on-the-wire shape.Out of scope
🤖 Generated with Claude Code