diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index e52083c98..8daee2a8e 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -366,6 +366,26 @@ def base_dir(base_dir: Path) -> Path: def repo_clone_path(base_dir: Path, source: GitPolicyScopeSource) -> Path: return GitPolicyFetcher.base_dir(base_dir) / GitPolicyFetcher.source_id(source) + @staticmethod + def forget_repo(path: str) -> None: + """Drop the cached repository for a clone path and release its handles. + + The cached ``pygit2.Repository`` keeps OS file descriptors and mmapped + pack indexes open; without this, a deleted scope's repo pins memory and + inodes for the lifetime of the process even after the clone is removed. + ``Repository.free()`` is called only when available (it is not part of + every pygit2 release); otherwise the dropped reference is reclaimed by GC. + """ + repo = GitPolicyFetcher.repos.pop(path, None) + if repo is None: + return + free = getattr(repo, "free", None) + if callable(free): + try: + free() + except Exception: + logger.debug("pygit2 Repository.free() failed; relying on GC") + class GitCallback(RemoteCallbacks): def __init__(self, source: GitPolicyScopeSource): diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index d420d183c..1afba7ef2 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -28,11 +28,10 @@ async def __aexit__(self, exc_type, exc, tb): async def _on_webhook(self, topic: Topic, data: Any): logger.info(f"Webhook listener triggered ({len(self._webhook_tasks)})") - for task in self._webhook_tasks: - if task.done(): - # Clean references to finished tasks - self._webhook_tasks.remove(task) - + # Rebuild rather than remove-while-iterating: list.remove() inside a + # `for t in self._webhook_tasks` loop skips the element after each removal, + # so finished tasks accumulate. + self._webhook_tasks = [t for t in self._webhook_tasks if not t.done()] self._webhook_tasks.append(asyncio.create_task(self.trigger(topic, data))) async def _listen_to_webhook_notifications(self): diff --git a/packages/opal-server/opal_server/scopes/service.py b/packages/opal-server/opal_server/scopes/service.py index d3df4972f..64cf3b8a5 100644 --- a/packages/opal-server/opal_server/scopes/service.py +++ b/packages/opal-server/opal_server/scopes/service.py @@ -162,24 +162,34 @@ async def delete_scope(self, scope_id: str): with tracer.trace("scopes_service.delete_scope", resource=scope_id): logger.info(f"Delete scope: {scope_id}") scope = await self._scopes.get(scope_id) - url = scope.policy.url - - scopes = await self._scopes.all() - remove_repo_clone = True - - for scope in scopes: - if scope.scope_id != scope_id and scope.policy.url == url: - logger.info( - f"found another scope with same remote url ({scope.scope_id}), skipping clone deletion" - ) - remove_repo_clone = False - break + deleted_source = cast(GitPolicyScopeSource, scope.policy) + deleted_source_id = GitPolicyFetcher.source_id(deleted_source) + scope_dir = GitPolicyFetcher.repo_clone_path(self._base_dir, deleted_source) + + # Clone dir, the `repos` handle cache, and `repos_last_fetched` are + # all keyed by source_id (= the clone path). A sibling only shares + # storage when it resolves to the same source_id; same url with a + # different branch can shard to a different source_id (and a + # different clone dir) when SCOPES_REPO_CLONES_SHARDS > 1, so gate on + # source_id, not url — otherwise the deleted scope's clone + pygit2 + # handle leak. + other_scopes = [ + s for s in await self._scopes.all() if s.scope_id != scope_id + ] + source_id_shared = any( + isinstance(s.policy, GitPolicyScopeSource) + and GitPolicyFetcher.source_id(s.policy) == deleted_source_id + for s in other_scopes + ) - if remove_repo_clone: - scope_dir = GitPolicyFetcher.repo_clone_path( - self._base_dir, cast(GitPolicyScopeSource, scope.policy) + if source_id_shared: + logger.info( + "Another scope shares the same clone (source id), skipping clone deletion" ) + else: shutil.rmtree(scope_dir, ignore_errors=True) + GitPolicyFetcher.forget_repo(str(scope_dir)) + GitPolicyFetcher.repos_last_fetched.pop(deleted_source_id, None) await self._scopes.delete(scope_id) diff --git a/packages/opal-server/opal_server/tests/delete_scope_cache_purge_test.py b/packages/opal-server/opal_server/tests/delete_scope_cache_purge_test.py new file mode 100644 index 000000000..4a4e0a5e7 --- /dev/null +++ b/packages/opal-server/opal_server/tests/delete_scope_cache_purge_test.py @@ -0,0 +1,132 @@ +import pytest +from opal_common.schemas.policy_source import GitPolicyScopeSource, NoAuthData +from opal_common.schemas.scopes import Scope +from opal_server.git_fetcher import GitPolicyFetcher +from opal_server.scopes.scope_repository import ScopeNotFoundError +from opal_server.scopes.service import ScopesService + + +class FakeScopeRepository: + def __init__(self, scopes): + self._scopes = {s.scope_id: s for s in scopes} + + async def get(self, scope_id): + if scope_id not in self._scopes: + raise ScopeNotFoundError(scope_id) + return self._scopes[scope_id] + + async def all(self): + return list(self._scopes.values()) + + async def delete(self, scope_id): + self._scopes.pop(scope_id, None) + + +def _scope(scope_id, url, branch="main"): + return Scope( + scope_id=scope_id, + policy=GitPolicyScopeSource( + source_type="git", + url=url, + branch=branch, + auth=NoAuthData(auth_type="none"), + ), + data={"entries": []}, + ) + + +@pytest.fixture(autouse=True) +def clear_caches(): + GitPolicyFetcher.repos.clear() + GitPolicyFetcher.repos_last_fetched.clear() + GitPolicyFetcher.repo_locks.clear() + yield + GitPolicyFetcher.repos.clear() + GitPolicyFetcher.repos_last_fetched.clear() + GitPolicyFetcher.repo_locks.clear() + + +@pytest.mark.asyncio +async def test_delete_unique_scope_purges_caches(tmp_path, monkeypatch): + scope = _scope("only", "https://git/repo-a.git") + repo = FakeScopeRepository([scope]) + svc = ScopesService(base_dir=tmp_path, scopes=repo, pubsub_endpoint=None) + + src = scope.policy + sid = GitPolicyFetcher.source_id(src) + clone_path = str(GitPolicyFetcher.repo_clone_path(tmp_path, src)) + GitPolicyFetcher.repos[clone_path] = object() + GitPolicyFetcher.repos_last_fetched[sid] = "ts" + + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", lambda *a, **k: None + ) + + await svc.delete_scope("only") + + assert clone_path not in GitPolicyFetcher.repos + assert sid not in GitPolicyFetcher.repos_last_fetched + + +@pytest.mark.asyncio +async def test_delete_keeps_caches_when_sibling_shares_source(tmp_path, monkeypatch): + a = _scope("a", "https://git/shared.git") + b = _scope("b", "https://git/shared.git") # same url+branch -> same source_id + repo = FakeScopeRepository([a, b]) + svc = ScopesService(base_dir=tmp_path, scopes=repo, pubsub_endpoint=None) + + sid = GitPolicyFetcher.source_id(a.policy) + clone_path = str(GitPolicyFetcher.repo_clone_path(tmp_path, a.policy)) + GitPolicyFetcher.repos[clone_path] = object() + GitPolicyFetcher.repos_last_fetched[sid] = "ts" + + rmtree_calls = [] + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", + lambda p, **k: rmtree_calls.append(p), + ) + + await svc.delete_scope("a") + + assert rmtree_calls == [] # sibling shares the source id; clone must survive + assert clone_path in GitPolicyFetcher.repos + assert sid in GitPolicyFetcher.repos_last_fetched + + +@pytest.mark.asyncio +async def test_delete_purges_when_sibling_shares_url_but_not_source( + tmp_path, monkeypatch +): + """Same url, different branch, sharded clones (SCOPES_REPO_CLONES_SHARDS>1) + resolve to different source_ids -> different clone dirs. + + Deleting one must still purge its own clone + caches; the url- + sharing sibling lives elsewhere. + """ + # shards=4: branch "main" -> index 1, "dev" -> index 3 (distinct source_ids). + monkeypatch.setattr( + "opal_server.git_fetcher.opal_server_config.SCOPES_REPO_CLONES_SHARDS", 4 + ) + a = _scope("a", "https://git/shared.git", branch="main") + b = _scope("b", "https://git/shared.git", branch="dev") # same url, diff source_id + assert GitPolicyFetcher.source_id(a.policy) != GitPolicyFetcher.source_id(b.policy) + + repo = FakeScopeRepository([a, b]) + svc = ScopesService(base_dir=tmp_path, scopes=repo, pubsub_endpoint=None) + + sid_a = GitPolicyFetcher.source_id(a.policy) + clone_path_a = str(GitPolicyFetcher.repo_clone_path(tmp_path, a.policy)) + GitPolicyFetcher.repos[clone_path_a] = object() + GitPolicyFetcher.repos_last_fetched[sid_a] = "ts" + + rmtree_calls = [] + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", + lambda p, **k: rmtree_calls.append(str(p)), + ) + + await svc.delete_scope("a") + + assert rmtree_calls == [clone_path_a] # its own clone dir removed + assert clone_path_a not in GitPolicyFetcher.repos + assert sid_a not in GitPolicyFetcher.repos_last_fetched diff --git a/packages/opal-server/opal_server/tests/forget_repo_test.py b/packages/opal-server/opal_server/tests/forget_repo_test.py new file mode 100644 index 000000000..b6e223cf1 --- /dev/null +++ b/packages/opal-server/opal_server/tests/forget_repo_test.py @@ -0,0 +1,31 @@ +from opal_server.git_fetcher import GitPolicyFetcher + + +class _FakeRepo: + def __init__(self): + self.freed = False + + def free(self): + self.freed = True + + +def test_forget_repo_pops_and_frees(monkeypatch): + fake = _FakeRepo() + monkeypatch.setattr(GitPolicyFetcher, "repos", {"/clones/x": fake}) + + GitPolicyFetcher.forget_repo("/clones/x") + + assert "/clones/x" not in GitPolicyFetcher.repos + assert fake.freed is True + + +def test_forget_repo_unknown_path_is_noop(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repos", {}) + GitPolicyFetcher.forget_repo("/clones/missing") # must not raise + assert GitPolicyFetcher.repos == {} + + +def test_forget_repo_without_free_method(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repos", {"/clones/y": object()}) + GitPolicyFetcher.forget_repo("/clones/y") # object() has no .free(); must not raise + assert "/clones/y" not in GitPolicyFetcher.repos diff --git a/packages/opal-server/opal_server/tests/webhook_task_cleanup_test.py b/packages/opal-server/opal_server/tests/webhook_task_cleanup_test.py new file mode 100644 index 000000000..a576d924c --- /dev/null +++ b/packages/opal-server/opal_server/tests/webhook_task_cleanup_test.py @@ -0,0 +1,35 @@ +import asyncio + +import pytest +from opal_server.policy.watcher.task import BasePolicyWatcherTask + + +class _Watcher(BasePolicyWatcherTask): + async def trigger(self, topic, data): + return None # fast no-op so the created task finishes quickly + + +@pytest.mark.asyncio +async def test_done_tasks_are_all_removed(): + w = _Watcher(pubsub_endpoint=None) + + async def _done(): + return None + + # three already-finished tasks pre-loaded into the list + finished = [asyncio.create_task(_done()) for _ in range(3)] + await asyncio.gather(*finished) + w._webhook_tasks = list(finished) + + await w._on_webhook("webhook", None) + await asyncio.sleep(0) # let the newly created trigger task finish + + # all 3 done ones removed... + remaining_done = [t for t in w._webhook_tasks if t in finished] + assert remaining_done == [], f"stale done tasks leaked: {remaining_done}" + # ...and exactly the one freshly scheduled trigger task survives. + survivors = [t for t in w._webhook_tasks if t not in finished] + assert len(w._webhook_tasks) == 1 + assert len(survivors) == 1, f"new trigger task not scheduled: {w._webhook_tasks}" + + await asyncio.gather(*survivors) # drain the dangling task