Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# The pinned formatter versions below (black 23.1.0, isort 5.12.0, docformatter
# 1.7.x) do not run on Python 3.13+, so pin the hook toolchain to 3.12 (the version
# CI uses). Without this, hooks fail on machines whose default python is newer.
default_language_version:
python: python3.12

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
Expand All @@ -18,8 +24,11 @@ repos:
hooks:
- id: codespell
args: [--skip, "*.json,*.lock,*pnpm-lock.yaml"]
# v1.7.6 dropped the `docformatter-venv` hook whose `language: python_venv` made
# prek / pre-commit>=4 reject the whole manifest. Pinned to v1.7.6 (not v1.7.7,
# whose docstring wrapping differs) so the bump introduces no reformatting.
- repo: https://github.com/PyCQA/docformatter
rev: v1.7.5
rev: v1.7.6
hooks:
- id: docformatter
args: [--in-place]
14 changes: 14 additions & 0 deletions app-tests/docker-compose-app-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ services:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 3s
retries: 10

opal_server:
image: permitio/opal-server:${OPAL_IMAGE_TAG:-latest}
Expand All @@ -56,6 +61,15 @@ services:
endpoint_mode: vip
environment:
- OPAL_BROADCAST_URI=postgres://postgres:postgres@broadcast_channel:5432/postgres
# Make the broadcaster reconnect fast and replay after both replicas have
# re-subscribed, so a cross-instance update published during a backbone
# outage deterministically converges to both clients (see run.sh
# "cross-instance consistency" test). With backoff_max=2 both servers
# re-subscribe within ~2s of the DB returning, while settle=3 delays the
# replay until after that, guaranteeing the replayed update is delivered.
- OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS=0.5
- OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS=2
- OPAL_BROADCAST_RESYNC_SETTLE_SECONDS=3
- UVICORN_NUM_WORKERS=4
- OPAL_POLICY_REPO_URL=http://gitea:3000/gitea_admin/policy-repo.git
- OPAL_POLICY_REPO_MAIN_BRANCH=${POLICY_REPO_BRANCH:-main}
Expand Down
90 changes: 83 additions & 7 deletions app-tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,33 @@ function check_no_error {
fi
}

function check_servers_logged {
echo "- Looking for msg '$1' in server's logs"
compose logs opal_server | grep -q "$1"
}

function check_servers_not_logged {
echo "- Ensuring msg '$1' is absent from server's logs"
if compose logs opal_server | grep -q "$1"; then
echo "- Unexpectedly found '$1' in server logs:"
compose logs opal_server | grep "$1"
exit 1
fi
}

function wait_for_broadcaster {
echo "- Waiting for broadcast_channel to accept connections"
for _ in $(seq 1 30); do
if compose exec -T broadcast_channel pg_isready -U postgres -q; then
echo " broadcast_channel is ready"
return 0
fi
sleep 1
done
echo " broadcast_channel did not become ready in time"
exit 1
}

function clean_up {
ARG=$?
# Ensure we're in the script directory for cleanup
Expand Down Expand Up @@ -277,11 +304,9 @@ function test_push_policy {
check_clients_logged "PUT /v1/policies/$regofile -> 200"
}

function test_data_publish {
echo "- Testing data publish for user $1"
function publish_data {
# POST a data update to a single OPAL server (no assertion).
user=$1

# Use curl to publish data update via OPAL server API
curl -s -X POST http://localhost:7002/data/config \
-H "Authorization: Bearer $OPAL_DATA_SOURCE_TOKEN" \
-H "Content-Type: application/json" \
Expand All @@ -294,9 +319,13 @@ function test_data_publish {
"save_method": "PUT"
}]
}'
}

function test_data_publish {
echo "- Testing data publish for user $1"
publish_data "$1"
sleep 5
check_clients_logged "PUT /v1/data/users/$user/location -> 204"
check_clients_logged "PUT /v1/data/users/$1/location -> 204"
}

function test_statistics {
Expand Down Expand Up @@ -344,15 +373,54 @@ function main {
test_push_policy "something"
test_statistics

echo "- Testing broadcast channel disconnection"
echo "- Testing broadcast channel disconnection (graceful restart)"
compose restart broadcast_channel
sleep 10
wait_for_broadcaster
# Give the servers' reconnecting broadcaster a moment to re-establish the backbone
sleep 5

test_data_publish "alice"
test_push_policy "another"

echo "- Testing broadcast channel disconnection (ungraceful kill)"
compose kill broadcast_channel
sleep 3
compose up -d broadcast_channel
wait_for_broadcaster
sleep 5

test_data_publish "sunil"
test_data_publish "eve"
test_push_policy "best_one_yet"

# Regression guards for the broadcaster-disconnect storm (see pubsub_resilience.py):
# the servers must have reconnected to the backbone (this line is logged on every
# (re)connect, so it fires on both the graceful-restart and ungraceful-kill paths),
# and must NOT have spewed the non-idempotent-disconnect ValueError that drove the
# fleet-wide drop storm.
check_servers_logged "Broadcaster listener connected to channel"
check_servers_not_logged "list.remove(x): x not in list"

# Cross-instance consistency: publish an update WHILE the backbone is down, then
# recover. The two clients connect to different server replicas via the service VIP,
# so for BOTH to end up with the value the missed cross-server update must converge
# after recovery (via the replay buffer and/or the resync-on-reconnect path).
echo "- Testing cross-instance consistency across a backbone outage"
compose kill broadcast_channel
sleep 3
publish_data "consistency_user"
sleep 2
compose up -d broadcast_channel
wait_for_broadcaster
# allow buffered replay + (if needed) client resync + full refetch to settle
sleep 15
# The server that received the publish while the backbone was down must have
# buffered it and replayed it on recovery (proves the replay path actually ran,
# not just a client refetch).
check_servers_logged "buffered for replay"
check_servers_logged "Replaying"
# BOTH clients (on different replicas via the VIP) must end up with the value.
check_clients_logged "PUT /v1/data/users/consistency_user/location -> 204"
# TODO: Test statistics feature again after broadcaster restart (should first fix statistics bug)
}

Expand All @@ -365,6 +433,14 @@ while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do
main && break
RETRY_COUNT=$((RETRY_COUNT + 1))
echo "Test failed, retrying..."
# Tear the stack down before retrying so the next attempt starts clean:
# generate_opal_keys binds host port 7002, which conflicts with a leftover
# stack, and stale (transient) client ERRORs from the previous attempt's
# broadcaster kills would otherwise trip check_no_error. The compose helper
# uses --env-file .env, which exists after the first attempt's keygen.
compose down --remove-orphans --volumes 2>/dev/null || true
docker rm -f --wait opal-server-keygen 2>/dev/null || true
rm -rf ./opal-tests-policy-repo ./temp-repo ./gitea-data ./git-repos 2>/dev/null || true
done

if [ $RETRY_COUNT -ge $MAX_RETRIES ]; then
Expand Down
46 changes: 44 additions & 2 deletions documentation/docs/getting-started/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -526,11 +526,53 @@ The channel name for broadcasting messages.

For more information, see [running OPAL with Kafka](/tutorials/run_opal_with_kafka) and [running OPAL with Apache Pulsar](/tutorials/run_opal_with_pulsar).

#### OPAL_BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED
#### OPAL_BROADCAST_RECONNECT_ENABLED

Default: `True`

Enable experimental fix for broadcast connection loss issues.
Reconnect the broadcaster reader on a backbone disconnect instead of dropping all client connections. Set to `False` to revert to the legacy (non-reconnecting) broadcaster.

#### OPAL_BROADCAST_RECONNECT_MAX_RETRIES

Default: `0`

Maximum consecutive broadcaster reconnect attempts before giving up and letting the worker restart (`0` = retry forever).

#### OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS

Default: `0.5`

Minimum backoff in seconds between broadcaster reconnect attempts.

#### OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS

Default: `30.0`

Maximum backoff in seconds between broadcaster reconnect attempts.

#### OPAL_BROADCAST_REPLAY_BUFFER_SIZE

Default: `10000`

Maximum number of outbound broadcasts buffered while the backbone is down and replayed on reconnect (`0` disables buffering).

#### OPAL_BROADCAST_RESYNC_ON_RECONNECT

Default: `True`

After a backbone gap that may have lost updates, force this worker's connected clients to reconnect so they re-fetch full policy + data state (guarantees cross-instance consistency).

#### OPAL_BROADCAST_RESYNC_SETTLE_SECONDS

Default: `2.0`

Grace period after a broadcaster reconnect before replaying buffered broadcasts and resyncing clients, to let peer servers re-subscribe.

#### OPAL_BROADCAST_HEALTHCHECK_ENABLED

Default: `True`

Make `/healthcheck` reflect the broadcaster reader's health, so a Kubernetes readiness/liveness probe can route away from or restart a worker whose reader is wedged while clients depend on it.

#### OPAL_BROADCAST_KEEPALIVE_INTERVAL

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,27 @@ This is how you define the number of workers (pay attention: this env var is not
| Env Var Name | Function |
| :------------------ | :--------------------------------------------------------------- |
| UVICORN_NUM_WORKERS | the number of workers in a single container (example value: `4`) |

#### 4) Broadcaster reconnection (resilience)

If the broadcast backbone (Postgres/Redis/Kafka) briefly drops — for example during a managed-database failover or restart — OPAL servers reconnect to it automatically with bounded exponential backoff, instead of dropping their connected clients. This is enabled by default; the following **server-side** env vars (all prefixed with `OPAL_`) tune it:

| Env Var Name | Default | Function |
| :------------------------------------------- | :------ | :----------------------------------------------------------------------------------------------------------------------------------------------- |
| OPAL_BROADCAST_RECONNECT_ENABLED | `true` | Reconnect the broadcaster reader on a backbone disconnect instead of dropping all client connections. Set to `false` for the legacy behavior. |
| OPAL_BROADCAST_RECONNECT_MAX_RETRIES | `0` | Maximum consecutive reconnect attempts before giving up and letting the worker restart. `0` means retry forever. |
| OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS | `0.5` | Minimum backoff (seconds) between reconnect attempts. |
| OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS | `30` | Maximum backoff (seconds) between reconnect attempts. |
| OPAL_BROADCAST_REPLAY_BUFFER_SIZE | `10000` | Max number of outbound broadcasts buffered while the backbone is down and replayed on reconnect (`0` disables buffering). On overflow the oldest are dropped. |
| OPAL_BROADCAST_RESYNC_ON_RECONNECT | `true` | After a backbone gap, force this worker's clients to reconnect so they re-fetch full policy + data state. Set to `false` to rely only on best-effort replay. |
| OPAL_BROADCAST_RESYNC_SETTLE_SECONDS | `2` | Grace period after a reconnect before replaying buffered broadcasts and resyncing clients, to let peer servers re-subscribe. |
| OPAL_BROADCAST_HEALTHCHECK_ENABLED | `true` | Make `/healthcheck` reflect the broadcaster reader's health: it returns `503` (instead of `200`) when the reader is wedged while clients depend on it, so a k8s readiness/liveness probe can route away from or restart the worker. A normal transient reconnect still reads healthy. Set to `false` to keep `/healthcheck` always `200`. |

Consistency across the outage is handled in two layers. While the backbone is unreachable, client websocket connections are kept alive but cross-server fan-out is paused. On reconnect:

- **Replay buffer** — broadcasts that failed to reach the backbone during the outage are replayed, so peer servers that have re-subscribed catch up without a client refetch. This is best-effort: the backbone keeps no replay of its own, so a peer that is slow to re-subscribe may miss a replayed message.
- **Resync** (the guarantee) — each server forces its own clients to reconnect and re-fetch the full policy/data state. Because every server experienced the same gap, every server reconciles its own clients and the fleet converges to current truth. Updates missed during the gap are therefore reconciled even if the replay did not reach a peer in time.

**Scope of the resync guarantee.** The resync re-fetch covers the policy and the **configured data sources** (those in `OPAL_DATA_CONFIG_SOURCES` / the server's data-source config) — clients fully refetch them. Runtime-published **incremental** updates — for example a `POST /data/config` that carries an inline `data` payload or a one-off fetch URL that is not part of the configured sources — are **not** part of that full refetch, so they are recoverable **only** through the best-effort replay buffer; if their replay is dropped (buffer overflow, or a peer that did not re-subscribe in time) the update is lost and not reconciled by the resync.

**Replay ordering.** Cross-worker replay ordering is not enforced, so a buffered update can be delivered after a newer live update (this self-heals for fetch-URL entries, which re-fetch current state, but a stale value can win for inline-`data` entries that carry the value directly).
52 changes: 49 additions & 3 deletions packages/opal-server/opal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,56 @@ class OpalServerConfig(Confi):
"EventNotifier",
description="The name to be used for segmentation in the backbone pub/sub",
)
BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED = confi.bool(
"BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED",
BROADCAST_RECONNECT_ENABLED = confi.bool(
"BROADCAST_RECONNECT_ENABLED",
True,
description="Enable experimental bugfix for broadcast connection loss",
description="Reconnect the broadcaster reader on a backbone disconnect instead "
"of dropping all client connections. Set to False to revert to the legacy "
"(non-reconnecting) broadcaster.",
)
BROADCAST_RECONNECT_MAX_RETRIES = confi.int(
"BROADCAST_RECONNECT_MAX_RETRIES",
0,
description="Maximum consecutive broadcaster reconnect attempts before giving "
"up and letting the worker restart (0 = retry forever).",
)
BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS = confi.float(
"BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS",
0.5,
description="Minimum backoff in seconds between broadcaster reconnect attempts.",
)
BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS = confi.float(
"BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS",
30.0,
description="Maximum backoff in seconds between broadcaster reconnect attempts.",
)
BROADCAST_REPLAY_BUFFER_SIZE = confi.int(
"BROADCAST_REPLAY_BUFFER_SIZE",
10000,
description="Max number of outbound broadcasts buffered while the backbone is "
"down and replayed on reconnect (0 disables buffering). On overflow the oldest "
"buffered broadcasts are dropped; the resync on reconnect still reconciles clients.",
)
BROADCAST_RESYNC_ON_RECONNECT = confi.bool(
"BROADCAST_RESYNC_ON_RECONNECT",
True,
description="After a backbone gap that may have lost updates, force this "
"worker's connected clients to reconnect so they re-fetch full policy + data "
"state (guarantees cross-instance consistency).",
)
BROADCAST_RESYNC_SETTLE_SECONDS = confi.float(
"BROADCAST_RESYNC_SETTLE_SECONDS",
2.0,
description="Grace period after a broadcaster reconnect before replaying "
"buffered broadcasts and resyncing clients, to let peer servers re-subscribe.",
)
BROADCAST_HEALTHCHECK_ENABLED = confi.bool(
"BROADCAST_HEALTHCHECK_ENABLED",
True,
description="Make /healthcheck reflect the broadcaster reader's health so a "
"k8s readiness/liveness probe can route away from or restart a worker whose "
"reader is wedged while clients depend on it. Set to False to revert "
"/healthcheck to always returning ok.",
)

# server security
Expand Down
Loading
Loading