diff --git a/.gitignore b/.gitignore index 77faef34a..c3224ec1c 100644 --- a/.gitignore +++ b/.gitignore @@ -137,3 +137,6 @@ dmypy.json *.iml .DS_Store + +# Private Claude Code working artifacts (plans/specs) — never commit +.claude/ diff --git a/app-tests/git-leak/README.md b/app-tests/git-leak/README.md new file mode 100644 index 000000000..4b5d9c486 --- /dev/null +++ b/app-tests/git-leak/README.md @@ -0,0 +1,82 @@ +# OPAL git-leak / resilience test bed + +Reproduces (as failing tests) the four issues fixed by PR2–PR5: memory leak, +offline-repo hang, slow serial boot, broadcaster no-reconnect. + +Every assertion is driven through `GET /internal/git-fetcher-cache-stats`, which +**this PR (PR1) adds** — it does not exist on `master`. So the suite runs against +*this branch*: the leak/offline tests fail here *until PR2/PR3 land*, then go +green. Run against true `master` they would all error at setup on the missing +endpoint, not "fail for the targeted bug." + +## Stack +- `opal_server` (single worker, scopes on, Postgres broadcaster, built from `docker/Dockerfile`) +- `redis`, `postgres`, `gitea` (+ one-shot `gitea-admin` and `seed` sidecars) +- `blackhole` (alpine/socat: accepts TCP then never answers — the offline repo) + +Only `opal_server` (`:7002`) and `gitea` (`:13000` on the host) are published; +Postgres and `blackhole` are internal to the compose network. + +## Helpers (`helpers.py`) +- `OpalServerClient` — drive opal over HTTP (`stats`, `put_scope`, `delete_scope`, + `refresh_all`, `get_scope_policy`, `list_scope_ids`, `delete_all_scopes`). +- `GiteaAdmin` — host-side Gitea admin client (`list_repos`, `repo_exists`, + `create_repo`, `delete_repo`); also exposed as the `gitea_admin` pytest fixture. +- `make_repo_unreachable(name)` — git URL on the `blackhole` sidecar (completes + the TCP handshake, never answers) so the clone hangs for the offline-repo test. +- `bounce_postgres(down_seconds)` — stop Postgres, then `up -d --wait` it back to + simulate a broadcaster outage and await readiness before the recovery poll. + +## Run +```bash +cd app-tests/git-leak +python -m pytest -v --boot-scopes=50 # full set +python -m pytest test_leak.py -v --boot-scopes=20 # just the leak gates +``` +Useful flags: `--boot-scopes=N` (any N), `--keep-stack` (skip teardown), +env `BOOT_TARGET_SECONDS=120` (tighten the boot gate). + +## Expected behavior + +Gate-coverage matrix (what each flagship test actually does): + +| Test | Role | Behaviour here | +|---|---|---| +| `test_churn_releases_caches` | **gate (PR2)** | FAILS without the PR2 leak fix — delete leaves the caches populated; flips green when PR2 lands | +| `test_offline_repo_does_not_block_healthy_scopes` | **gate (PR3)** | FAILS without the PR3 fetch timeout — 40 hung clones starve the executor so a healthy scope never serves; flips green when PR3 lands | +| `test_boot_loads_all_scopes` | **baseline → gate (PR4)** | PASSES with the loose default target; set `BOOT_TARGET_SECONDS` low (plan: 120 @ 50) on PR4 to gate the parallel-boot fix | +| `test_repeat_sync_rss_stays_bounded` | **RSS guard** | PASSES; an RSS-budget guard against per-sync allocation leaks (the cache *count* can't grow for any impl, so there is no count assertion — see below) | +| `test_server_recovers_after_postgres_bounce` | **guard (PER-15065)** | PASSES on this branch (which has #915); guards the in-place broadcaster reconnect | + +Notes on the two guards: +- `test_repeat_sync_rss_stays_bounded` — clone paths are keyed by the repo URL, + so re-syncing identical scopes reuses cache entries and the cache *counts* + can't grow for any implementation; the load-bearing assertion is therefore on + RSS only (a `len(repos)` check would be tautological and is intentionally + omitted), guarding against a regression that leaks per-sync allocations. +- `test_server_recovers_after_postgres_bounce` — runs **2 workers** so the + Postgres backbone is actually exercised (cross-worker fan-out needs >=2 + workers; a single worker fans out in-process and never touches the backbone). + Across a transient bounce it asserts the gunicorn **worker PIDs are unchanged** + — proving #915's reconnecting broadcaster recovered the reader *in place* + rather than gunicorn respawning a graceful-shutdown worker (the pre-fix + behaviour) — and that a scope PUT after the bounce becomes servable, proving + the broadcast/sync path recovered (not just HTTP). + +## Requires +Docker + docker compose v2, plus host Python with `pytest pytest-timeout requests GitPython`. + +## Notes +- Auth is disabled in the stack: `OPAL_AUTH_PUBLIC_KEY` is left unset so the JWT + verifier is disabled and the harness can call scope routes without minting JWTs. + Local test bed only; never a production setting. (The `/internal` endpoint is + registered with the same `JWTAuthenticator` dependency as the other routes, so + it is protected when JWT verification is enabled and open only here.) +- The server runs a **single** uvicorn worker. The `GitPolicyFetcher` caches read + by `/internal/git-fetcher-cache-stats` are per-process, so a multi-worker stack + would make a round-robin read miss the worker that fetched and let a `== 0` + drain assertion pass falsely. One worker makes every cache read deterministic; + the leak/boot/offline bugs all reproduce single-worker. +- First-sync of a fresh scope takes the clone path, which fills only `repo_locks`; + `repos` / `repos_last_fetched` are filled by the discover/fetch path on a second + sync, so the load helpers issue a `refresh_all()` before asserting on `repos`. diff --git a/app-tests/git-leak/conftest.py b/app-tests/git-leak/conftest.py new file mode 100644 index 000000000..547c22404 --- /dev/null +++ b/app-tests/git-leak/conftest.py @@ -0,0 +1,133 @@ +import os +import shutil + +import pytest +from helpers import ( + HEALTHY_PROBE_REPO, + GiteaAdmin, + OpalServerClient, + compose, + list_seeded_repos, + worker_pids, +) + + +def pytest_addoption(parser): + parser.addoption( + "--boot-scopes", + action="store", + default="50", + help="number of repos to seed/boot (default 50)", + ) + parser.addoption( + "--keep-stack", + action="store_true", + default=False, + help="do not tear the compose stack down after the run", + ) + + +@pytest.fixture(scope="session") +def repo_count(request) -> int: + return int(request.config.getoption("--boot-scopes")) + + +@pytest.fixture(scope="session") +def stack(request, repo_count): + # Defense-in-depth: this docker-compose suite is already excluded from the + # repo's default `pytest` run via `testpaths = packages` in pytest.ini, so + # the unit-test CI matrix never collects it. If it is ever collected in an + # environment without docker, skip cleanly instead of erroring. + if shutil.which("docker") is None: + pytest.skip("docker (compose) is required for the git-leak test bed") + os.environ["REPO_COUNT"] = str(repo_count) + # build + start infra; seed runs to completion then exits + compose("up", "-d", "--build") + # block until seeding sidecar has finished creating repos. compose() raises + # (with output) if the seed container exited non-zero, so a hard seed + # failure surfaces here rather than as a confusing later test failure. + compose("wait", "seed") + # Verify the seed actually produced all N repos before any test runs: a + # partial seed would otherwise look like a server bug when the load gate + # can't reach N. Fail loudly with the gap. + # include the reserved probe repo the resilience test relies on, so a + # partial seed of it is caught here too rather than as a later test failure + expected = set(list_seeded_repos(repo_count)) | {HEALTHY_PROBE_REPO} + present = set(GiteaAdmin().list_repos()) + missing = expected - present + assert not missing, ( + f"seed incomplete: {len(missing)}/{repo_count} repos missing " + f"(e.g. {sorted(missing)[:5]})" + ) + client = OpalServerClient() + client.wait_healthy() + yield client + if not request.config.getoption("--keep-stack"): + compose("down", "-v") + + +@pytest.fixture() +def opal(stack) -> OpalServerClient: + # The compose stack is session-scoped (one server for the whole run), but + # scopes must not leak between tests: clone paths are keyed by repo URL, so + # a scope left behind by one test shares a cache entry with any later test + # using the same seeded repo and would pollute its drain assertions. + # + # Delete every scope the *server* currently knows (not just this client's + # tracked set) at setup, so a scope orphaned by a prior failed test can't + # contaminate this one; then again on teardown. + stack.delete_all_scopes() + # Guard the single-worker invariant the cache gates depend on: if a prior + # opal_multiworker teardown failed to restore 1 worker, the per-process cache + # reads would be nondeterministic here (a `== 0` drain could false-pass). + # Fail loudly and ordering-independently rather than silently mis-measure. + assert ( + len(worker_pids()) == 1 + ), f"expected a single-worker stack, found workers {sorted(worker_pids())}" + yield stack + stack.delete_all_scopes() + + +@pytest.fixture() +def opal_multiworker(stack) -> OpalServerClient: + """opal_server reconfigured to 2 gunicorn workers, for the broadcaster + test. + + The session stack is single-worker (the right call for the per- + process cache drain assertions), but the Postgres broadcaster's + cross-worker fan-out — the reason it is in this compose file at all + — is only exercised with >=2 workers (references/debug-pubsub.md + §3-4). This force-recreates opal_server with 2 workers for one test, + then restores the single-worker stack on teardown so the cache tests + keep their determinism. Each side starts from a clean slate: the + recreate wipes the container's on-disk clones, and clearing scopes + stops a leftover scope (whose clone is URL-keyed) from being re- + cloned on boot. + """ + os.environ["OPAL_TEST_WORKERS"] = "2" + try: + # --no-deps: don't bounce redis/postgres/gitea; --force-recreate: apply + # the new worker count. No --wait (opal_server has no compose + # healthcheck) — wait_healthy() polls the HTTP surface instead. + compose("up", "-d", "--no-deps", "--force-recreate", "opal_server") + stack.wait_healthy() + stack.delete_all_scopes() + yield stack + finally: + os.environ["OPAL_TEST_WORKERS"] = "1" + compose("up", "-d", "--no-deps", "--force-recreate", "opal_server") + stack.wait_healthy() + stack.delete_all_scopes() + # Verify the restore actually reduced the stack back to one worker. If it + # did not (a botched recreate), fail loudly here rather than leave a + # 2-worker stack that would silently break later single-worker cache + # gates' determinism. + assert ( + len(worker_pids()) == 1 + ), f"opal_multiworker teardown left workers {sorted(worker_pids())}, expected 1" + + +@pytest.fixture(scope="session") +def gitea_admin(stack) -> GiteaAdmin: + """Host-side Gitea admin client (depends on `stack` so Gitea is up).""" + return GiteaAdmin() diff --git a/app-tests/git-leak/docker-compose.yml b/app-tests/git-leak/docker-compose.yml new file mode 100644 index 000000000..66b5d7a59 --- /dev/null +++ b/app-tests/git-leak/docker-compose.yml @@ -0,0 +1,141 @@ +name: opal-git-leak-test + +services: + redis: + image: redis:7-alpine + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 2s + timeout: 3s + retries: 30 + + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: opal + POSTGRES_PASSWORD: opal + POSTGRES_DB: opal + # not published to the host: only opal_server reaches it over the compose + # network, and bounce_postgres() uses `docker compose stop/start`. Publishing + # 5432 would collide with any Postgres already running on the host. + healthcheck: + test: ["CMD-SHELL", "pg_isready -U opal"] + interval: 2s + timeout: 3s + retries: 30 + + gitea: + image: gitea/gitea:1.21 + environment: + GITEA__security__INSTALL_LOCK: "true" + GITEA__server__ROOT_URL: "http://gitea:3000/" + GITEA__database__DB_TYPE: "sqlite3" + # published on 13000 (not 3000) for the host-side GiteaAdmin helper; the + # uncommon port avoids the usual :3000 clash. opal_server and the seed + # sidecar still reach it over the compose network via http://gitea:3000. + ports: + - "13000:3000" + volumes: + - gitea-data:/data + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:3000/api/v1/version || exit 1"] + interval: 3s + timeout: 5s + retries: 40 + + gitea-admin: + # creates the admin user once gitea is healthy + image: gitea/gitea:1.21 + depends_on: + gitea: + condition: service_healthy + user: git + entrypoint: ["/bin/sh", "-c"] + # Tolerate the idempotent "already exists" case, and RETRY on "database is + # locked": this CLI mutates the same SQLite file the live gitea server holds + # open, so it can lose a lock race and fail transiently. Any other failure + # aborts so `seed` (which depends on this completing) doesn't run against a + # Gitea with no admin user and fail with a confusing 401. + # `|` (literal) block, not `>` (folded): the `gitea admin user create` call is + # kept on ONE line so YAML can't fold a newline into the middle of its args + # (that would run `--email ...` as its own command -> exit 127). + command: + - | + for attempt in 1 2 3 4 5 6; do + out=$$(gitea admin user create --username opaladmin --password opaladmin --email admin@example.com --admin --must-change-password=false --config /data/gitea/conf/app.ini 2>&1) + rc=$$? + echo "$$out" + if [ $$rc -eq 0 ] || echo "$$out" | grep -qi "already exist"; then exit 0; fi + if echo "$$out" | grep -qi "database is locked"; then echo "gitea db locked; retry $$attempt"; sleep 2; continue; fi + exit $$rc + done + echo "gitea admin create failed after retries" + exit 1 + volumes: + - gitea-data:/data + restart: "no" + + blackhole: + # Accepts the TCP handshake then never answers — a clone connects and + # blocks reading the git smart-HTTP response, holding the fetch executor. + # Deterministic, unlike a TEST-NET-1 address which many networks reject + # fast with ICMP-unreachable (so the clone would fail fast, not hang). + image: alpine/socat:1.8.0.3 + command: ["TCP-LISTEN:80,fork,reuseaddr", "SYSTEM:sleep 3600"] + + seed: + build: ./seed + depends_on: + gitea: + condition: service_healthy + gitea-admin: + condition: service_completed_successfully + environment: + GITEA_URL: "http://gitea:3000" + GITEA_ADMIN_USER: "opaladmin" + GITEA_ADMIN_PASSWORD: "opaladmin" + REPO_COUNT: "${REPO_COUNT:-50}" + restart: "no" + + opal_server: + build: + context: ../.. + dockerfile: docker/Dockerfile + target: server + environment: + # Default single worker: the GitPolicyFetcher caches read by + # /internal/git-fetcher-cache-stats are per-process, so with >1 worker a + # round-robin read can miss the worker that fetched and make a `== 0` + # drain assertion pass falsely. One worker makes every cache read + # deterministic. The leak/boot/offline bugs all reproduce single-worker. + # The postgres-bounce test (test_resilience.py) overrides this to 2 via + # OPAL_TEST_WORKERS for its own container, because cross-worker fan-out + # over the Postgres backbone only happens with >=2 workers + # (references/debug-pubsub.md §3-4) — a single worker can't tell #915's + # in-place broadcaster reconnect from a plain worker respawn. + UVICORN_NUM_WORKERS: "${OPAL_TEST_WORKERS:-1}" + OPAL_SCOPES: "1" + OPAL_REDIS_URL: "redis://redis:6379" + OPAL_BROADCAST_URI: "postgres://opal:opal@postgres:5432/opal" + # Make the broadcaster reconnect fast + deterministic for the postgres- + # bounce test (defaults are 30s max backoff / 2s settle). Harmless to the + # single-worker tests, which don't exercise cross-worker fan-out. + OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS: "2" + OPAL_BROADCAST_RESYNC_SETTLE_SECONDS: "2" + OPAL_BASE_DIR: "/opal" + OPAL_POLICY_REFRESH_INTERVAL: "0" + OPAL_DEBUG_INTERNAL_STATS: "1" + # OPAL_AUTH_PUBLIC_KEY is intentionally left unset: with no public key the + # JWT verifier is disabled, so the harness can call scope routes without + # minting JWTs. Local test bed only; never a production setting. + OPAL_LOG_FORMAT_INCLUDE_PID: "true" + ports: + - "7002:7002" + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + +volumes: + gitea-data: diff --git a/app-tests/git-leak/helpers.py b/app-tests/git-leak/helpers.py new file mode 100644 index 000000000..fd88c0d5a --- /dev/null +++ b/app-tests/git-leak/helpers.py @@ -0,0 +1,397 @@ +"""HTTP + infra helpers for the git-leak test bed.""" +import subprocess +import time +from pathlib import Path +from typing import Dict, List + +import requests + +OPAL_URL = "http://localhost:7002" +# reachable from inside the opal_server container (compose network) +GITEA_INTERNAL_URL = "http://gitea:3000" +# reachable from the host-side test harness (published port, see docker-compose.yml) +GITEA_HOST_URL = "http://localhost:13000" +GITEA_USER = "opaladmin" +GITEA_PASSWORD = "opaladmin" + +# the `blackhole` compose service (alpine/socat) accepts the TCP handshake then +# never answers, so a clone connects and blocks reading the response — a +# deterministic hang. Reachable from the opal_server container on the compose +# network. (A TEST-NET-1 address was rejected too fast on many networks, so the +# clone failed fast instead of hanging and the offline scenario wasn't exercised.) +UNREACHABLE_HOST = "blackhole" + +# the compose project lives next to this file; compose() runs from here +_COMPOSE_DIR = str(Path(__file__).resolve().parent) + + +class OpalServerClient: + def __init__(self, base_url: str = OPAL_URL): + self.base_url = base_url.rstrip("/") + # scope_ids created via put_scope, so a per-test fixture can delete them + # on teardown. Clone paths are keyed by repo URL (not scope_id), so a + # scope left behind by one test shares a GitPolicyFetcher cache entry + # with any other test pointing at the same seeded repo — without cleanup + # that leftover keeps the entry alive and pollutes a drain assertion. + self._created_scopes: set = set() + + def wait_healthy(self, timeout: int = 180) -> None: + deadline = time.time() + timeout + last = None + while time.time() < deadline: + try: + if ( + requests.get(f"{self.base_url}/healthcheck", timeout=5).status_code + == 200 + ): + return + except requests.RequestException as exc: + last = exc + time.sleep(2) + raise RuntimeError(f"opal-server not healthy in {timeout}s (last: {last})") + + def stats(self, samples: int = 3, interval: float = 0.1) -> Dict[str, int]: + """Read the git-fetcher cache stats, merged across a few reads. + + The stack runs a single uvicorn worker (see docker-compose.yml), so the + per-process ``GitPolicyFetcher`` caches are read deterministically — a + read can't miss the worker that fetched. Sampling a few times and taking + the ``max`` per key only smooths over a read that races an in-flight + mutation; it is not relied on to paper over multi-worker nondeterminism + (which the single-worker setup removes outright). + """ + merged: Dict[str, int] = {} + for i in range(max(1, samples)): + resp = requests.get( + f"{self.base_url}/internal/git-fetcher-cache-stats", timeout=10 + ) + resp.raise_for_status() + for key, value in resp.json().items(): + merged[key] = max(merged.get(key, 0), value) + if i < samples - 1: + time.sleep(interval) + return merged + + def put_scope(self, scope_id: str, repo_url: str, branch: str = "main") -> None: + body = { + "scope_id": scope_id, + "policy": { + "source_type": "git", + "url": repo_url, + "auth": {"auth_type": "none"}, + "branch": branch, + "directories": ["."], + "extensions": [".rego", ".json"], + "manifest": ".manifest", + "poll_updates": False, + }, + "data": {"entries": []}, + } + # the scope router mounts at prefix="/scopes" with @router.put("") + resp = requests.put(f"{self.base_url}/scopes", json=body, timeout=30) + resp.raise_for_status() + self._created_scopes.add(scope_id) + + def delete_scope(self, scope_id: str) -> None: + resp = requests.delete(f"{self.base_url}/scopes/{scope_id}", timeout=30) + if resp.status_code not in (200, 204, 404): + resp.raise_for_status() + self._created_scopes.discard(scope_id) + + def list_scope_ids(self) -> List[str]: + """All scope ids the server currently knows (GET /scopes).""" + resp = requests.get(f"{self.base_url}/scopes", timeout=30) + resp.raise_for_status() + return [s["scope_id"] for s in resp.json()] + + def hard_reset(self, timeout: int = 600) -> None: + """Recover the server from a saturated fetch executor by wiping state. + + When a test leaves many clones hung (the offline-repo test saturates the + executor on purpose), per-scope DELETEs would queue *behind* those hung + threads, and a plain restart would have ``preload_scopes`` re-clone the + offline scopes and saturate again. Instead: stop the server (killing the + hung threads), flush the Redis scope store so nothing is re-cloned, then + start clean. Used in that test's teardown so the session-scoped stack is + usable by every later test. + """ + compose("stop", "opal_server") + try: + compose("exec", "-T", "redis", "redis-cli", "FLUSHALL") + finally: + # Always bring the server back up, even if the flush failed: leaving + # it stopped would fail every later session-scoped test, and since + # this runs in a test's `finally` it would also mask the real result. + compose("start", "opal_server") + self._created_scopes.clear() + self.wait_healthy(timeout=timeout) + + def delete_all_scopes(self, drain_timeout: int = 3) -> None: + """Delete every scope the *server* knows (not just this client's), then + best-effort wait for the caches to drain — a clean slate independent of + what any prior, possibly-failed, test left behind. + + Best-effort drain by design: on master, delete never purges the caches + (the leak this suite gates), so the wait can't succeed there — hence the + short ``drain_timeout`` (this runs in *every* test's setup and teardown, + so a long wait for a state that can't occur on master would be pure dead + time per test). Post-PR2 the purge is near-instant, so a few seconds is + ample. The DELETEs themselves are synchronous, so the scope store is + already clean before this wait — the wait only smooths the in-process + cache count. This runs in fixture setup/teardown, so a failure here must + not mask the test, hence the broad excepts and bounded wait. + """ + try: + for scope_id in self.list_scope_ids(): + try: + self.delete_scope(scope_id) + except Exception: + self._created_scopes.discard(scope_id) + except Exception: + pass + self._created_scopes.clear() + deadline = time.time() + drain_timeout + while time.time() < deadline: + try: + # Single snapshot: we're waiting for zero, so the peak-merge + # (max over samples) would only delay observing the drain. + if self.stats(samples=1)["repo_locks"] == 0: + return + except Exception: + # A transient stats-read failure is not proof of a drain — keep + # polling until the deadline rather than returning early, which + # would let a not-yet-drained cache leak into the next test. + pass + time.sleep(1) + + def get_scope_policy(self, scope_id: str) -> requests.Response: + """Fetch a scope's policy bundle (GET /scopes/{id}/policy). + + A 200 here proves the scope's repo was cloned and is being + served — the signal that a healthy scope still works while + another scope's clone is hanging. + """ + return requests.get(f"{self.base_url}/scopes/{scope_id}/policy", timeout=30) + + def refresh_all(self) -> None: + # POST /scopes/refresh publishes on the webhook topic so the leader + # re-syncs all scopes. The second sync takes the discover/fetch path + # (not the first-sync clone path), which is what populates the `repos` + # and `repos_last_fetched` caches. A missing route is a real error and + # is surfaced via raise_for_status. + resp = requests.post(f"{self.base_url}/scopes/refresh", timeout=30) + resp.raise_for_status() + + +class GiteaAdmin: + """Host-side admin client for the test bed's Gitea. + + The ``seed`` sidecar does the bulk repo creation from inside the compose + network; this class lets a test inspect or mutate Gitea repos directly + from the host (e.g. assert seeding happened, or add/remove a single repo + for a specific scenario). It authenticates with the admin user that the + ``gitea-admin`` sidecar created, over the published host port. + """ + + def __init__( + self, + base_url: str = GITEA_HOST_URL, + user: str = GITEA_USER, + password: str = GITEA_PASSWORD, + ): + self.base_url = base_url.rstrip("/") + self._user = user + self._auth = (user, password) + + def repo_exists(self, name: str) -> bool: + resp = requests.get( + f"{self.base_url}/api/v1/repos/{self._user}/{name}", + auth=self._auth, + timeout=10, + ) + return resp.status_code == 200 + + def list_repos(self) -> List[str]: + names: List[str] = [] + page = 1 + while True: + resp = requests.get( + f"{self.base_url}/api/v1/users/{self._user}/repos", + params={"page": page, "limit": 50}, + auth=self._auth, + timeout=10, + ) + resp.raise_for_status() + batch = resp.json() + if not batch: + break + names.extend(r["name"] for r in batch) + page += 1 + return names + + def create_repo(self, name: str) -> None: + if self.repo_exists(name): + return + resp = requests.post( + f"{self.base_url}/api/v1/user/repos", + json={"name": name, "private": False, "auto_init": True}, + auth=self._auth, + timeout=10, + ) + resp.raise_for_status() + + def delete_repo(self, name: str) -> None: + resp = requests.delete( + f"{self.base_url}/api/v1/repos/{self._user}/{name}", + auth=self._auth, + timeout=10, + ) + if resp.status_code not in (204, 404): + resp.raise_for_status() + + +def gitea_repo_url(name: str) -> str: + # url reachable from inside the opal_server container + return f"{GITEA_INTERNAL_URL}/{GITEA_USER}/{name}.git" + + +def make_repo_unreachable(name: str) -> str: + """Return a git URL for ``name`` pointing at the ``blackhole`` sidecar. + + Simulates an offline/unreachable policy repo: ``blackhole`` (alpine/socat) + accepts the TCP handshake then never answers, so the clone connects and + blocks reading the git smart-HTTP response — a deterministic hang that + exercises the missing fetch timeout on the scopes path (the bug PR3 fixes). + The URL keeps the same ``/{user}/{name}.git`` shape as a real Gitea repo so + the scope looks ordinary apart from the unreachable host. + """ + return f"http://{UNREACHABLE_HOST}/{GITEA_USER}/{name}.git" + + +def compose(*args: str, timeout: int = 1200) -> subprocess.CompletedProcess: + """Run `docker compose `; on failure, surface the captured output. + + `capture_output=True` keeps compose noise out of passing tests, but + a raw CalledProcessError shows only the exit code — so on failure we + re-raise with the captured stdout/stderr embedded, otherwise a + broken build/seed/ restart is opaque to debug. + + ``timeout`` (default 1200s) bounds each call: ``@pytest.mark.timeout`` does + not cover session-scoped *fixture setup*, so a wedged ``up``/``wait``/build + would otherwise hang to the CI job limit. On expiry we raise a clear error + (subprocess.run kills the process group) instead of blocking indefinitely. + """ + try: + proc = subprocess.run( + ["docker", "compose", *args], + cwd=_COMPOSE_DIR, + capture_output=True, + text=True, + timeout=timeout, + ) + except subprocess.TimeoutExpired as exc: + raise RuntimeError( + f"`docker compose {' '.join(args)}` timed out after {timeout}s\n" + f"--- stdout ---\n{exc.stdout or ''}\n--- stderr ---\n{exc.stderr or ''}" + ) from exc + if proc.returncode != 0: + raise RuntimeError( + f"`docker compose {' '.join(args)}` failed (exit {proc.returncode})\n" + f"--- stdout ---\n{proc.stdout}\n--- stderr ---\n{proc.stderr}" + ) + return proc + + +def worker_pids(service: str = "opal_server") -> set: + """Return the set of gunicorn *worker* PIDs running inside ``service``. + + The server runs ``gunicorn`` (master) + ``UvicornWorker`` children (see + ``scripts/start.sh``). When a worker's broadcaster reader gives up on a + backbone disconnect it triggers a graceful shutdown and gunicorn respawns + the worker with a *new* PID; the reconnecting broadcaster (PER-15065 / #915) + instead recovers the reader in place and the worker keeps its PID. Comparing + this set across a transient bounce is how the broadcaster test tells an + in-place reconnect apart from a worker respawn. + + Implemented over ``/proc`` (no ``ps`` in the slim image): every gunicorn + process' ``cmdline`` contains "gunicorn", and the master is the lowest PID + (it exists before it forks any worker), so the workers are the rest. The + match is done **host-side in Python**, not with ``grep gunicorn`` in the + container: the scanning command's own ``sh -c`` wrapper has "gunicorn" in + its command line, so an in-container grep would count that wrapper as a + third "worker". The dump command below contains neither "gunicorn" nor + "grep", so it cannot match itself. + """ + out = compose( + "exec", + "-T", + service, + "sh", + "-c", + # emit " " per process; tr -d strips the NUL arg + # separators so the args concatenate into one searchable token. + # `|| true`: a momentary read failure must not raise from compose(). + "for d in /proc/[0-9]*/; do p=${d#/proc/}; p=${p%/}; " + 'echo "$p $(cat "$d/cmdline" 2>/dev/null | tr -d "\\000")"; ' + "done || true", + ).stdout + pids = [] + for line in out.splitlines(): + pid_str, _, cmd = line.partition(" ") + if pid_str.isdigit() and "gunicorn" in cmd: + pids.append(int(pid_str)) + pids.sort() + if len(pids) <= 1: + return set() # only the master (or nothing) observed: no workers + return set(pids[1:]) # drop the master (lowest PID); the rest are workers + + +# The reconnecting broadcaster (PER-15065 / #915) logs this line every time its +# reader (re)connects to the backbone channel — once at boot, and once more on +# each reconnect after a backbone drop (pubsub_resilience.py `_reader_loop` -> +# `_ensure_connected`). Counting it across a Postgres bounce positively proves a +# disconnect+reconnect actually happened. +_BROADCASTER_CONNECT_LOG = "Broadcaster listener connected to channel" + + +def broadcaster_connect_count(service: str = "opal_server") -> int: + """Count broadcaster reader (re)connect log lines for ``service``. + + The postgres-bounce test asserts this COUNT *increased* across the bounce so + the gate positively confirms the backbone actually dropped and the reader + reconnected — without this, a bounce that failed to break the reader (a + future Postgres shutdown-signal change, connection pooling, etc.) would leave + the worker PIDs unchanged and pass the gate vacuously. Paired with + ``worker_pids()`` unchanged (which proves the recovery was *in place*, not a + respawn), the two together pin down the PER-15065 property. + """ + # --no-log-prefix strips the "service | " column so the marker matches cleanly. + out = compose("logs", "--no-log-prefix", service).stdout + return out.count(_BROADCASTER_CONNECT_LOG) + + +def bounce_postgres(down_seconds: int = 5) -> None: + compose("stop", "postgres") + time.sleep(down_seconds) + # `up -d --wait` blocks until Postgres passes its healthcheck again (plain + # `compose start` has no --wait), so a recovery poll that follows isn't + # racing an unready broadcaster. --no-recreate keeps the same container. + compose("up", "-d", "--wait", "--no-recreate", "postgres") + + +def list_seeded_repos(count: int) -> List[str]: + return [f"policy-repo-{i:04d}" for i in range(count)] + + +# A reserved repo seeded *outside* the numeric ``policy-repo-NNNN`` range that +# ``list_seeded_repos`` enumerates, so no boot/leak test ever clones it. The +# resilience offline-hang test uses it as its "healthy" probe so the scope must +# perform a genuine *clone* through the starved executor, rather than reusing an +# on-disk clone left by another test (clones live at ``base_dir/`` +# keyed by URL-hash and survive ``compose restart/stop/start`` — opal_server +# mounts no volume at ``/opal``; only ``down -v`` wipes them). Note serving the +# bundle (``make_bundle`` via ``run_sync``) shares that same fetch executor, so a +# pre-cloned shared repo would be starved on serve too — the never-cloned probe +# is belt-and-suspenders that additionally exercises the clone path. Keep this +# name in sync with ``RESERVED_REPOS`` in ``seed/seed_gitea.py``. +HEALTHY_PROBE_REPO = "policy-repo-healthy-probe" diff --git a/app-tests/git-leak/pytest.ini b/app-tests/git-leak/pytest.ini new file mode 100644 index 000000000..0d501534f --- /dev/null +++ b/app-tests/git-leak/pytest.ini @@ -0,0 +1,11 @@ +[pytest] +# Self-root the git-leak suite so `cd app-tests/git-leak && pytest --boot-scopes=N` +# is deterministic regardless of pytest version or cwd. +# +# The repo-root pytest.ini sets `testpaths = packages` to keep this docker suite +# out of the CI unit run. Per pytest, `testpaths` only applies when pytest is +# invoked *from the rootdir*, so running from here already collects this dir and +# loads its conftest (where --boot-scopes is registered). This empty [pytest] +# marker makes that guarantee explicit — it pins the rootdir to this directory +# when run in place, independent of the root config — while the root run (from +# the repo root) still never descends here. diff --git a/app-tests/git-leak/seed/Dockerfile b/app-tests/git-leak/seed/Dockerfile new file mode 100644 index 000000000..540118cd2 --- /dev/null +++ b/app-tests/git-leak/seed/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.11-slim +RUN apt-get update && apt-get install -y --no-install-recommends git \ + && rm -rf /var/lib/apt/lists/* +RUN pip install --no-cache-dir requests==2.32.3 GitPython==3.1.50 +COPY seed_gitea.py /seed_gitea.py +ENTRYPOINT ["python", "/seed_gitea.py"] diff --git a/app-tests/git-leak/seed/seed_gitea.py b/app-tests/git-leak/seed/seed_gitea.py new file mode 100644 index 000000000..674a54689 --- /dev/null +++ b/app-tests/git-leak/seed/seed_gitea.py @@ -0,0 +1,168 @@ +"""Seed a Gitea instance with N policy repos for the OPAL git-leak test bed. + +Idempotent: re-running creates only the missing repos. Each repo gets a +single commit containing a minimal OPA policy tree. + +Env: + GITEA_URL e.g. http://gitea:3000 + GITEA_ADMIN_USER admin username (created out-of-band by compose) + GITEA_ADMIN_PASSWORD admin password + REPO_COUNT how many repos to ensure exist (default 50) +""" +import os +import sys +import time +from pathlib import Path +from urllib.parse import urlsplit, urlunsplit + +import requests +from git import Actor, Repo + +POLICY_REGO = """package example + +default allow = false + +allow { + input.user == "admin" +} +""" + +DATA_JSON = '{"roles": {"admin": ["read", "write"]}}\n' + +# Reserved repos seeded in addition to the numeric ``policy-repo-NNNN`` set. +# These sit outside the range the boot/leak tests enumerate (so no test ever +# clones them) and back the resilience offline-hang test's "healthy" probe, +# which must force a fresh clone through the saturated executor rather than +# reuse a surviving on-disk clone. Keep in sync with ``HEALTHY_PROBE_REPO`` in +# ``helpers.py``. +RESERVED_REPOS = ("policy-repo-healthy-probe",) + + +def _wait_for_gitea(base_url: str, timeout: int = 120) -> None: + deadline = time.time() + timeout + while time.time() < deadline: + try: + if requests.get(f"{base_url}/api/v1/version", timeout=5).status_code == 200: + return + except requests.RequestException: + pass + time.sleep(2) + raise RuntimeError(f"Gitea not reachable at {base_url} within {timeout}s") + + +def _ensure_token(base_url: str, user: str, password: str) -> str: + name = "seed-token" + resp = requests.post( + f"{base_url}/api/v1/users/{user}/tokens", + auth=(user, password), + json={"name": name, "scopes": ["write:repository", "write:user"]}, + timeout=10, + ) + if resp.status_code == 201: + return resp.json()["sha1"] + # token already exists -> delete then recreate (Gitea won't reveal an existing secret) + requests.delete( + f"{base_url}/api/v1/users/{user}/tokens/{name}", + auth=(user, password), + timeout=10, + ) + resp = requests.post( + f"{base_url}/api/v1/users/{user}/tokens", + auth=(user, password), + json={"name": name, "scopes": ["write:repository", "write:user"]}, + timeout=10, + ) + resp.raise_for_status() + return resp.json()["sha1"] + + +def _ensure_repo(base_url: str, token: str, user: str, name: str) -> None: + headers = {"Authorization": f"token {token}"} + exists = requests.get( + f"{base_url}/api/v1/repos/{user}/{name}", headers=headers, timeout=10 + ) + if exists.status_code == 200: + return + created = requests.post( + f"{base_url}/api/v1/user/repos", + headers=headers, + json={"name": name, "private": False, "auto_init": False}, + timeout=10, + ) + created.raise_for_status() + + +def _push_policy( + base_url: str, token: str, user: str, name: str, workdir: Path +) -> None: + repo_dir = workdir / name + repo_dir.mkdir(parents=True, exist_ok=True) + (repo_dir / "example.rego").write_text(POLICY_REGO) + (repo_dir / "data.json").write_text(DATA_JSON) + + repo = Repo.init(repo_dir, initial_branch="main") + repo.index.add(["example.rego", "data.json"]) + author = Actor("seed", "seed@example.com") + repo.index.commit("seed policy", author=author, committer=author) + + # Inject credentials scheme-agnostically (works for http and https) rather + # than string-replacing "http://", which would silently drop the creds if + # GITEA_URL were ever https and produce an opaque auth failure. + parts = urlsplit(base_url) + push_url = urlunsplit( + (parts.scheme, f"{user}:{token}@{parts.netloc}", f"/{user}/{name}.git", "", "") + ) + origin = repo.create_remote("origin", push_url) + origin.push(refspec="main:main") + + +def main() -> int: + base_url = os.environ["GITEA_URL"].rstrip("/") + user = os.environ["GITEA_ADMIN_USER"] + password = os.environ["GITEA_ADMIN_PASSWORD"] + count = int(os.environ.get("REPO_COUNT", "50")) + + _wait_for_gitea(base_url) + token = _ensure_token(base_url, user, password) + + workdir = Path("/tmp/seed-work") + failures = [] + # the numeric set the boot/leak tests enumerate, plus the reserved repos + # (e.g. the resilience offline-hang test's never-cloned healthy probe) + names = [f"policy-repo-{i:04d}" for i in range(count)] + list(RESERVED_REPOS) + for name in names: + # Isolate per-repo failures: one bad push must not abort the loop and + # leave an indeterminate subset seeded. Collect failures and exit + # non-zero with a count so the harness sees a real seed error (and + # `docker compose wait seed` surfaces it) instead of a later, confusing + # load-gate timeout. + try: + _ensure_repo(base_url, token, user, name) + # only push if the repo is empty (freshly created) + head = requests.get( + f"{base_url}/api/v1/repos/{user}/{name}/branches/main", + headers={"Authorization": f"token {token}"}, + timeout=10, + ) + if head.status_code != 200: + _push_policy(base_url, token, user, name, workdir) + print(f"seeded {name}", flush=True) + except Exception as exc: # noqa: BLE001 - report, don't abort the loop + failures.append((name, repr(exc))) + print(f"FAILED {name}: {exc!r}", flush=True) + + total = len(names) + if failures: + print( + f"ERROR: seeded {total - len(failures)}/{total} repos; " + f"{len(failures)} failed (e.g. {failures[:3]})", + flush=True, + ) + return 1 + + print(f"DONE: ensured {total} repos", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/app-tests/git-leak/test_boot.py b/app-tests/git-leak/test_boot.py new file mode 100644 index 000000000..f0b66c697 --- /dev/null +++ b/app-tests/git-leak/test_boot.py @@ -0,0 +1,77 @@ +import os +import time + +import pytest +import requests +from helpers import compose, gitea_repo_url, list_seeded_repos + + +@pytest.mark.timeout(2400) +def test_boot_loads_all_scopes(opal, repo_count): + """Measure how long a fresh boot takes to *serve* all scope repos. + + On master boot-sync is serial and slow (the ~20-min problem at scale). PR1 + records the baseline (loose target, so it passes here as a baseline + recorder); PR4 (boot parallelism) is what makes this a hard gate. + + Carry-forward for PR4: run with a tight ``BOOT_TARGET_SECONDS`` (the plan: + 120s @ 50 scopes) so the assertion actually gates the parallel-boot fix — + with the default loose target it always passes and only records a number. + + We measure a **deterministic cold boot**: ``--force-recreate`` gives the + container a fresh filesystem (opal_server mounts no volume at ``/opal``), so + ``preload_scopes`` must cold-clone all N repos from the Redis scope store — + exactly the fresh-pod scenario PR4 parallelizes. A plain ``restart`` would + instead leave a run-dependent mix of complete on-disk clones (fast warm + re-discover) and clones killed mid-flight by the restart (cold re-clone), + making ``elapsed`` non-deterministic and a poor basis for PR4's tight gate. + + Completion is keyed on every scope's bundle being *served* + (``GET /scopes/{id}/policy`` == 200), not on the ``repo_locks`` cache count: + ``repo_locks`` is set at fetch *start*, so stopping the clock on it would end + before the final clone finishes and undercount the boot-sync window this gate + exists to measure. + """ + n = repo_count + repos = list_seeded_repos(n) + scope_ids = [f"boot-{i}" for i in range(n)] + for scope_id, name in zip(scope_ids, repos): + opal.put_scope(scope_id, gitea_repo_url(name)) + + # Start the clock at the recreate, not after wait_healthy: preload_scopes() + # runs in gunicorn's `when_ready` (before workers accept traffic), so by the + # time /healthcheck answers, boot-sync may already be partly done — starting + # the clock later would undercount it. --force-recreate wipes the container FS + # so the scopes (still in Redis) are all cold-cloned by preload, a + # deterministic fresh-boot measurement. --no-deps leaves redis/postgres/gitea + # untouched. + start = time.time() + compose("up", "-d", "--no-deps", "--force-recreate", "opal_server") + opal.wait_healthy(timeout=600) + + # Poll until every scope serves its bundle. Re-check only the not-yet-served + # ones so the work drains as scopes come online; a not-yet-cloned scope + # returns a non-200 quickly (it does not block), so this stays cheap. + served = set() + poll_deadline = time.time() + 1800 + while time.time() < poll_deadline: + for scope_id in scope_ids: + if scope_id in served: + continue + try: + if opal.get_scope_policy(scope_id).status_code == 200: + served.add(scope_id) + except requests.RequestException: + pass + if len(served) == n: + break + time.sleep(2) + elapsed = time.time() - start + + BOOT_TARGET_SECONDS = int(os.environ.get("BOOT_TARGET_SECONDS", "2000")) + print( + f"boot served {len(served)}/{n} scopes in {elapsed:.1f}s " + f"(target {BOOT_TARGET_SECONDS}s)" + ) + assert len(served) == n, f"only {len(served)}/{n} scopes served after boot" + assert elapsed < BOOT_TARGET_SECONDS, f"boot too slow: {elapsed:.1f}s" diff --git a/app-tests/git-leak/test_leak.py b/app-tests/git-leak/test_leak.py new file mode 100644 index 000000000..800b23afc --- /dev/null +++ b/app-tests/git-leak/test_leak.py @@ -0,0 +1,125 @@ +import time + +import pytest +import requests +from helpers import gitea_repo_url, list_seeded_repos + + +def _wait_until(predicate, timeout=30, interval=0.5): + """Poll ``predicate`` until true or ``timeout`` elapses. + + The predicates here read ``/internal`` via ``opal.stats()``, which does + ``raise_for_status()``. A *transient* request error (a momentary 5xx, a + connection reset, or a read racing an in-flight mutation) is treated as "not + yet" and retried instead of propagating and turning the whole test into an + ERROR before the deadline. A *persistent* failure still surfaces: the wait + just returns ``False`` and the caller's ``assert`` fires with the last stats. + """ + deadline = time.time() + timeout + while time.time() < deadline: + try: + if predicate(): + return True + except requests.RequestException: + pass + time.sleep(interval) + return False + + +def _load_scopes(opal, prefix, names): + """PUT a scope per repo, then force a second sync so all three caches fill. + + The first sync of a fresh scope takes the clone path, which populates only + ``repo_locks`` — ``repos`` and ``repos_last_fetched`` are filled solely by + the discover/fetch path on a *subsequent* sync. ``refresh_all()`` triggers + that second sync, so after this returns all three caches reflect the N + scopes and the drain assertions test a cache the sync path actually fills. + + Returns the per-key load count reached (max over a wait). + """ + n = len(names) + for i, name in enumerate(names): + opal.put_scope(f"{prefix}-{i}", gitea_repo_url(name)) + # repo_locks is populated on the first sync (the clone path), so it is the + # deterministic signal that every scope was at least picked up. + locked = _wait_until(lambda: opal.stats()["repo_locks"] >= n, timeout=600) + assert locked, f"initial load never locked {n} repos: {opal.stats()}" + # force the discover/fetch path so `repos` / `repos_last_fetched` fill too + opal.refresh_all() + fetched = _wait_until(lambda: opal.stats()["repos"] >= n, timeout=600) + assert fetched, f"refresh never populated {n} repos: {opal.stats()}" + return opal.stats() + + +@pytest.mark.timeout(900) +def test_churn_releases_caches(opal, repo_count): + """Create then delete many scopes; the three caches must return to empty. + + FAILS on this branch (without PR2): delete_scope never purges the + GitPolicyFetcher caches, so they stay populated after every scope is + gone. Becomes green once PR2 lands. + """ + n = min(repo_count, 100) + repos = list_seeded_repos(n) + loaded = _load_scopes(opal, "churn", repos) + assert loaded["repo_locks"] >= n and loaded["repos"] >= n, loaded + rss_loaded = loaded["rss_kb"] + + for i in range(n): + opal.delete_scope(f"churn-{i}") + + # all three caches must drain to empty once every scope is deleted. Read a + # single stats snapshot per poll so the three keys reflect the same + # observation (and to avoid 3x the HTTP round-trips per iteration). + def _all_caches_empty() -> bool: + s = opal.stats(samples=1) + return s["repo_locks"] == 0 and s["repos"] == 0 and s["repos_last_fetched"] == 0 + + released = _wait_until(_all_caches_empty, timeout=60) + stats = opal.stats() + assert released, f"caches did not drain after deleting all scopes: {stats}" + + # The cache drain above is the gate. RSS is only a loose backstop here: + # freeing the caches need not return memory to the OS (glibc/Python keep + # arenas), so this guards against a *gross* leak — RSS ballooning well past + # the loaded peak — without false-failing on allocator slack once PR2 lands. + rss_budget = rss_loaded + max(100_000, rss_loaded // 2) + assert stats["rss_kb"] <= rss_budget, ( + f"RSS ballooned across churn: {rss_loaded} -> {stats['rss_kb']} kb " + f"(budget {rss_budget})" + ) + + +@pytest.mark.timeout(900) +def test_repeat_sync_rss_stays_bounded(opal, repo_count): + """Re-syncing the *same* scopes must not leak per-sync memory (RSS guard). + + Deliberately an **RSS** guard, not a cache-count leak gate. The clone caches + are keyed by repo URL (``source_id`` = sha256(url)+branch-shard), so + re-syncing identical scopes reuses the existing ``repos`` / + ``repos_last_fetched`` entries; the cache *counts* cannot grow for *any* + implementation. A ``len(repos)`` assertion would therefore be tautological + (it can't fail), so it is intentionally omitted here — do not re-add it as a + "gate". The load-bearing assertion is RSS: it catches a regression where each + repeat sync leaks per-sync allocations even while the entry count stays flat. + + The unbounded-growth-then-no-purge-on-delete leak is covered by + ``test_churn_releases_caches`` above, which uses *distinct* scopes. + """ + n = min(repo_count, 50) + repos = list_seeded_repos(n) + loaded = _load_scopes(opal, "stable", repos) + baseline_rss = loaded["rss_kb"] + + for _ in range(10): + opal.refresh_all() + time.sleep(2) + + grown = opal.stats() + # allow generous headroom for allocator slack; fail only on a real per-sync + # leak (10 refreshes of N scopes ballooning RSS). + rss_budget = baseline_rss + max(50_000, baseline_rss // 5) + assert grown["rss_kb"] <= rss_budget, ( + f"RSS grew unboundedly on repeat sync: " + f"{baseline_rss} -> {grown['rss_kb']} kb (budget {rss_budget})" + ) diff --git a/app-tests/git-leak/test_resilience.py b/app-tests/git-leak/test_resilience.py new file mode 100644 index 000000000..d2ef2fd3e --- /dev/null +++ b/app-tests/git-leak/test_resilience.py @@ -0,0 +1,189 @@ +import time + +import pytest +import requests +from helpers import ( + HEALTHY_PROBE_REPO, + bounce_postgres, + broadcaster_connect_count, + gitea_repo_url, + list_seeded_repos, + make_repo_unreachable, + worker_pids, +) + +# Enough hanging clones to exhaust opal's default fetch executor +# (run_sync -> run_in_executor(None, ...), a ThreadPoolExecutor of +# min(32, cpu+4) workers). One hang wouldn't starve a multi-thread pool, so we +# saturate it with many; after PR3's fetch timeout these give up and free their +# threads, letting the healthy scope through. +OFFLINE_REPOS = 40 + + +@pytest.mark.timeout(420) +def test_offline_repo_does_not_block_healthy_scopes(opal, repo_count): + """Unreachable repos must not stop a healthy scope from serving. + + FAILS on this branch (without PR3): the scopes path has no fetch + timeout, so the hung clones of the offline repos occupy the shared + fetch executor and the healthy scope's bundle never becomes + available. + """ + try: + # the `blackhole` sidecar accepts the TCP handshake then never answers, so + # each of these clones hangs (holding a fetch-executor thread) rather than + # failing fast; enough of them saturate the pool. These PUTs live *inside* + # the try so that if one fails partway through, the finally still runs + # hard_reset() — otherwise the clones already dispatched to the executor + # would hang for the blackhole's full duration and starve the + # session-scoped stack for every later test. + for i in range(OFFLINE_REPOS): + opal.put_scope( + f"offline-{i}", make_repo_unreachable(f"dead-{i}"), branch="main" + ) + + # Point the healthy scope at a repo *no other test clones* + # (HEALTHY_PROBE_REPO is seeded outside the numeric range the boot/leak + # tests enumerate) so it must perform a genuine clone rather than reuse a + # surviving on-disk clone via _discover_repository. Serving the bundle + # shares the same starved executor too, so a shared repo would also stay + # red here — the never-cloned probe additionally guarantees the *clone* + # itself is exercised through the starved pool (fails correctly on this + # branch, no PR3 timeout). + opal.put_scope("healthy", gitea_repo_url(HEALTHY_PROBE_REPO)) + + # The healthy scope must become *servable* within a bounded time even + # while the offline scopes hang. A 200 from its policy bundle proves the + # clone completed and the scope is served — a stronger signal than a + # cache count, and exactly what the offline hang starves on master. + # + # A 200 here can't be a *masked* default bundle: GET /{scope}/policy + # falls back to the "default" scope on a bad/missing repo, but this bed + # never creates a "default" scope, so that fallback raises instead of + # returning 200. The only way to get 200 is the healthy clone completing. + deadline = time.time() + 90 + served = False + last = None + while time.time() < deadline: + try: + resp = opal.get_scope_policy("healthy") + last = resp.status_code + if resp.status_code == 200: + served = True + break + except requests.RequestException as exc: # may stall when starved + last = repr(exc) + time.sleep(2) + assert served, ( + f"healthy scope never served while {OFFLINE_REPOS} offline repos " + f"were hanging (last policy response: {last})" + ) + finally: + # The offline clones hang for the blackhole's full duration, occupying + # executor threads. On the session-scoped stack that would starve every + # later test, and per-scope DELETEs would queue behind the hung threads. + # hard_reset stops the server (killing the hung threads), flushes the + # Redis scope store so preload doesn't re-clone them, and restarts clean. + opal.hard_reset() + + +@pytest.mark.timeout(420) +def test_server_recovers_after_postgres_bounce(opal_multiworker, repo_count): + """A transient Postgres (broadcaster) outage must reconnect *in place*. + + Runs **2 workers** (via the ``opal_multiworker`` fixture) so the Postgres + backbone is actually exercised: cross-worker fan-out only happens with >=2 + workers (references/debug-pubsub.md §3-4). A single worker fans out + in-process and never touches the backbone, so it can't tell #915's in-place + reconnect from a plain worker respawn — which is why the previous + single-worker version of this test passed either way. + + Guards PER-15065 (#915): on a backbone disconnect the reconnecting + broadcaster recovers the reader in process (retry-forever by default), so the + worker keeps its PID. Before that fix the disconnect escalated to a graceful + worker shutdown and gunicorn respawned the worker with a *new* PID. We assert + (a) the worker PIDs are unchanged across the bounce — the in-place-reconnect + signal; (b) the backbone reader actually dropped and reconnected (its connect + log count increased), so (a) is not vacuously true because the bounce failed + to break anything; and (c) a scope PUT after the bounce becomes servable, + proving the broadcast/sync path itself recovered (not just HTTP liveness). + """ + opal = opal_multiworker + + before = worker_pids() + assert ( + len(before) == 2 + ), f"expected 2 gunicorn workers for this test, got {sorted(before)}" + # baseline reader (re)connect count — assertion (b) requires it to increase + before_connects = broadcaster_connect_count() + + bounce_postgres(down_seconds=5) + + # HTTP must come back first. A respawn would also satisfy this — hence the + # PID check below, which a respawn would *not* satisfy. + deadline = time.time() + 90 + recovered = False + while time.time() < deadline: + try: + opal.wait_healthy(timeout=5) + recovered = True + break + except (requests.RequestException, RuntimeError): + # RequestException: HTTP not back yet; RuntimeError: wait_healthy timed out + time.sleep(2) + assert recovered, "server did not recover HTTP within 90s of a postgres bounce" + + # (a) in-place reconnect: the workers must be the *same* processes. A changed + # PID means the broadcaster gave up and gunicorn respawned the worker — the + # pre-#915 behavior this guards against. + after = worker_pids() + assert after == before, ( + f"workers respawned across the bounce ({sorted(before)} -> {sorted(after)}); " + f"the broadcaster did not reconnect in place (PER-15065 regressed)" + ) + + # (b) the bounce actually broke and reconnected the backbone reader — so (a) + # is not vacuously true because nothing dropped. The reconnecting broadcaster + # logs a fresh connect line on every reconnect; the count must strictly + # increase over the pre-bounce baseline (poll, since the reconnect lags the + # Postgres healthcheck by the backoff + resync-settle window). + deadline = time.time() + 60 + reconnected = False + after_connects = before_connects + while time.time() < deadline: + after_connects = broadcaster_connect_count() + if after_connects > before_connects: + reconnected = True + break + time.sleep(2) + assert reconnected, ( + f"broadcaster reader never logged a reconnect after the bounce " + f"(connect count stayed {before_connects}); the bounce may not have " + f"dropped the backbone, which would make the PID-unchanged check above " + f"vacuous" + ) + + # (c) the broadcast/sync path recovered: a freshly PUT scope must become + # servable. A 200 from its bundle proves the leader received the sync and + # cloned the repo after the backbone returned. We assert on a served bundle + # rather than /internal cache counts, which are per-process and so not + # deterministic to read on a 2-worker stack. + healthy = list_seeded_repos(1)[0] + opal.put_scope("post-bounce", gitea_repo_url(healthy)) + served = False + last = None + deadline = time.time() + 120 + while time.time() < deadline: + try: + resp = opal.get_scope_policy("post-bounce") + last = resp.status_code + if resp.status_code == 200: + served = True + break + except requests.RequestException as exc: + last = repr(exc) + time.sleep(2) + assert served, ( + f"scope PUT after the bounce never became servable (last: {last}); " + f"the broadcaster/sync path did not recover" + ) diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 58341ff37..8c107dff5 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -404,6 +404,15 @@ class OpalServerConfig(Confi): description="Set if OPAL server should enable tracing with datadog APM", ) + DEBUG_INTERNAL_STATS = confi.bool( + "DEBUG_INTERNAL_STATS", + False, + description=( + "Expose GET /internal/git-fetcher-cache-stats with in-memory cache " + "sizes and process RSS. For diagnostics/tests only; keep off in production." + ), + ) + SCOPES = confi.bool("SCOPES", default=False, description="Enable scopes") SCOPES_REPO_CLONES_SHARDS = confi.int( diff --git a/packages/opal-server/opal_server/debug_stats.py b/packages/opal-server/opal_server/debug_stats.py new file mode 100644 index 000000000..5439ce950 --- /dev/null +++ b/packages/opal-server/opal_server/debug_stats.py @@ -0,0 +1,61 @@ +"""Read-only introspection of the git-fetcher in-memory caches. + +Used only by the off-by-default /internal stats endpoint so tests can +observe the cache growth that the memory-leak fix (PR2) eliminates. +""" +from pathlib import Path +from typing import Dict, List, Optional + +from fastapi import FastAPI, params +from opal_server.git_fetcher import GitPolicyFetcher + + +def _read_rss_kb() -> int: + """Resident set size of this process in kilobytes (Linux), else 0.""" + try: + for line in Path("/proc/self/status").read_text().splitlines(): + if line.startswith("VmRSS:"): + return int(line.split()[1]) + except (OSError, ValueError, IndexError): + return 0 + return 0 + + +def git_fetcher_cache_stats() -> Dict[str, int]: + """Sizes of the three process-global GitPolicyFetcher caches + RSS.""" + return { + "repo_locks": len(GitPolicyFetcher.repo_locks), + "repos": len(GitPolicyFetcher.repos), + "repos_last_fetched": len(GitPolicyFetcher.repos_last_fetched), + "rss_kb": _read_rss_kb(), + } + + +def register_internal_stats_route( + app: FastAPI, + enabled: bool, + dependencies: Optional[List[params.Depends]] = None, +) -> None: + """Mount GET /internal/git-fetcher-cache-stats only when enabled. + + ``dependencies`` are applied to the route (e.g. the server's + ``JWTAuthenticator``) so the endpoint is protected when JWT verification + is enabled. When verification is disabled — as in the test bed, which + leaves ``OPAL_AUTH_PUBLIC_KEY`` unset — the authenticator is a no-op and + the route stays reachable without a token. + """ + if not enabled: + return + + # Deliberately a sync def: Starlette runs it in its own threadpool, which is + # independent of the default loop executor opal uses for git fetches + # (run_sync -> run_in_executor(None, ...)). So this endpoint keeps answering + # even when hung clones saturate the fetch executor — which is exactly the + # condition the offline-repo test observes through it. + @app.get( + "/internal/git-fetcher-cache-stats", + include_in_schema=False, + dependencies=dependencies or [], + ) + def _git_fetcher_cache_stats() -> Dict[str, int]: + return git_fetcher_cache_stats() diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index 00de60747..0946d3d2c 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -27,6 +27,7 @@ from opal_server.config import opal_server_config from opal_server.data.api import init_data_updates_router from opal_server.data.data_update_publisher import DataUpdatePublisher +from opal_server.debug_stats import register_internal_stats_route from opal_server.loadlimiting import init_loadlimit_router from opal_server.policy.bundles.api import router as bundles_router from opal_server.policy.watcher.factory import setup_watcher_task @@ -313,6 +314,12 @@ def healthcheck(): ) return {"status": "ok"} + register_internal_stats_route( + app, + enabled=opal_server_config.DEBUG_INTERNAL_STATS, + dependencies=[Depends(authenticator)], + ) + return app def _configure_lifecycle_callbacks(self, app: FastAPI): diff --git a/packages/opal-server/opal_server/tests/debug_stats_endpoint_test.py b/packages/opal-server/opal_server/tests/debug_stats_endpoint_test.py new file mode 100644 index 000000000..4abd1abb1 --- /dev/null +++ b/packages/opal-server/opal_server/tests/debug_stats_endpoint_test.py @@ -0,0 +1,40 @@ +from fastapi import Depends, FastAPI, HTTPException +from fastapi.testclient import TestClient +from opal_server.debug_stats import register_internal_stats_route + + +def _app_with_flag(enabled: bool) -> FastAPI: + app = FastAPI() + register_internal_stats_route(app, enabled=enabled) + return app + + +def test_endpoint_absent_when_disabled(): + client = TestClient(_app_with_flag(False)) + assert client.get("/internal/git-fetcher-cache-stats").status_code == 404 + + +def test_endpoint_present_when_enabled(): + client = TestClient(_app_with_flag(True)) + resp = client.get("/internal/git-fetcher-cache-stats") + assert resp.status_code == 200 + body = resp.json() + assert set(body) == {"repo_locks", "repos", "repos_last_fetched", "rss_kb"} + + +def test_endpoint_applies_passed_dependencies(): + """A route dependency (e.g. the server's authenticator) is enforced. + + Mirrors how server.py wires the real JWTAuthenticator: when + verification is enabled the dependency rejects unauthenticated + reads; when disabled it is a no-op (covered by the test above, which + passes no dependency). + """ + + def _deny(): + raise HTTPException(status_code=401, detail="unauthorized") + + app = FastAPI() + register_internal_stats_route(app, enabled=True, dependencies=[Depends(_deny)]) + resp = TestClient(app).get("/internal/git-fetcher-cache-stats") + assert resp.status_code == 401 diff --git a/packages/opal-server/opal_server/tests/debug_stats_test.py b/packages/opal-server/opal_server/tests/debug_stats_test.py new file mode 100644 index 000000000..c67fb3530 --- /dev/null +++ b/packages/opal-server/opal_server/tests/debug_stats_test.py @@ -0,0 +1,28 @@ +import sys + +from opal_server.config import opal_server_config +from opal_server.debug_stats import git_fetcher_cache_stats +from opal_server.git_fetcher import GitPolicyFetcher + + +def test_stats_report_dict_sizes(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repo_locks", {"a": object()}) + monkeypatch.setattr(GitPolicyFetcher, "repos", {"p1": object(), "p2": object()}) + monkeypatch.setattr(GitPolicyFetcher, "repos_last_fetched", {}) + + stats = git_fetcher_cache_stats() + + assert stats["repo_locks"] == 1 + assert stats["repos"] == 2 + assert stats["repos_last_fetched"] == 0 + assert isinstance(stats["rss_kb"], int) + # On Linux /proc/self/status exists, so RSS reading must actually work; on + # other platforms _read_rss_kb falls back to 0 and the wiring is untestable. + if sys.platform.startswith("linux"): + assert stats["rss_kb"] > 0 + else: + assert stats["rss_kb"] >= 0 + + +def test_internal_stats_flag_defaults_off(): + assert opal_server_config.DEBUG_INTERNAL_STATS is False diff --git a/pytest.ini b/pytest.ini index 16c88ba91..c98ec88b9 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,10 @@ # Handling DeprecationWarning 'asyncio_mode' default value [pytest] asyncio_mode = strict +# Scope the default (rootdir) collection to the unit tests under packages/. +# The heavyweight, docker-compose-driven test bed under app-tests/git-leak/ is +# designed to FAIL on master (it is the regression gate for later PRs) and must +# not be collected by the repo's CI `pytest` run. testpaths only applies when +# pytest is invoked from the rootdir with no args, so running it explicitly +# (e.g. `cd app-tests/git-leak && pytest`) still works. +testpaths = packages