Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/opal-server/opal_server/git_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,26 @@ def base_dir(base_dir: Path) -> Path:
def repo_clone_path(base_dir: Path, source: GitPolicyScopeSource) -> Path:
    """Return the clone directory used for ``source`` under ``base_dir``.

    The result is ``GitPolicyFetcher.base_dir(base_dir)`` joined with
    ``GitPolicyFetcher.source_id(source)``.
    """
    clones_root = GitPolicyFetcher.base_dir(base_dir)
    return clones_root / GitPolicyFetcher.source_id(source)

@staticmethod
def forget_repo(path: str) -> None:
    """Drop the cached repository for a clone path and release its handles.

    The cached ``pygit2.Repository`` keeps OS file descriptors and mmapped
    pack indexes open; without this, a deleted scope's repo pins memory and
    inodes for the lifetime of the process even after the clone is removed.
    ``Repository.free()`` is called only when available (it is not part of
    every pygit2 release); otherwise the dropped reference is reclaimed by GC.

    Args:
        path: Key of the entry in ``GitPolicyFetcher.repos`` to drop.
            An unknown path is a no-op.
    """
    repo = GitPolicyFetcher.repos.pop(path, None)
    if repo is None:
        return

    free = getattr(repo, "free", None)
    if not callable(free):
        # Nothing explicit to release; the popped reference is simply dropped.
        return

    try:
        free()
    except Exception as exc:
        # Best-effort cleanup: never let a failed free() break the caller,
        # but keep the failure reason in the log so it can be diagnosed.
        logger.debug(
            f"pygit2 Repository.free() failed ({exc!r}); relying on GC"
        )


class GitCallback(RemoteCallbacks):
def __init__(self, source: GitPolicyScopeSource):
Expand Down
9 changes: 4 additions & 5 deletions packages/opal-server/opal_server/policy/watcher/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ async def __aexit__(self, exc_type, exc, tb):

async def _on_webhook(self, topic: Topic, data: Any):
    """Handle a webhook notification by scheduling a new trigger task.

    Finished tasks are pruned from ``self._webhook_tasks`` first, then
    ``self.trigger(topic, data)`` is scheduled as a new asyncio task and
    tracked in the same list.

    Args:
        topic: Topic the notification arrived on; forwarded to ``trigger``.
        data: Notification payload; forwarded to ``trigger``.
    """
    logger.info(f"Webhook listener triggered ({len(self._webhook_tasks)})")

    # Rebuild rather than remove-while-iterating: list.remove() inside a
    # `for t in self._webhook_tasks` loop skips the element after each removal,
    # so finished tasks accumulate.
    self._webhook_tasks = [t for t in self._webhook_tasks if not t.done()]
    self._webhook_tasks.append(asyncio.create_task(self.trigger(topic, data)))

async def _listen_to_webhook_notifications(self):
Expand Down
40 changes: 25 additions & 15 deletions packages/opal-server/opal_server/scopes/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,34 @@ async def delete_scope(self, scope_id: str):
with tracer.trace("scopes_service.delete_scope", resource=scope_id):
logger.info(f"Delete scope: {scope_id}")
scope = await self._scopes.get(scope_id)
url = scope.policy.url

scopes = await self._scopes.all()
remove_repo_clone = True

for scope in scopes:
if scope.scope_id != scope_id and scope.policy.url == url:
logger.info(
f"found another scope with same remote url ({scope.scope_id}), skipping clone deletion"
)
remove_repo_clone = False
break
deleted_source = cast(GitPolicyScopeSource, scope.policy)
deleted_source_id = GitPolicyFetcher.source_id(deleted_source)
scope_dir = GitPolicyFetcher.repo_clone_path(self._base_dir, deleted_source)

# Clone dir, the `repos` handle cache, and `repos_last_fetched` are
# all keyed by source_id (= the clone path). A sibling only shares
# storage when it resolves to the same source_id; same url with a
# different branch can shard to a different source_id (and a
# different clone dir) when SCOPES_REPO_CLONES_SHARDS > 1, so gate on
# source_id, not url — otherwise the deleted scope's clone + pygit2
# handle leak.
other_scopes = [
s for s in await self._scopes.all() if s.scope_id != scope_id
]
source_id_shared = any(
isinstance(s.policy, GitPolicyScopeSource)
and GitPolicyFetcher.source_id(s.policy) == deleted_source_id
for s in other_scopes
)

if remove_repo_clone:
scope_dir = GitPolicyFetcher.repo_clone_path(
self._base_dir, cast(GitPolicyScopeSource, scope.policy)
if source_id_shared:
logger.info(
"Another scope shares the same clone (source id), skipping clone deletion"
)
else:
shutil.rmtree(scope_dir, ignore_errors=True)
GitPolicyFetcher.forget_repo(str(scope_dir))
GitPolicyFetcher.repos_last_fetched.pop(deleted_source_id, None)

await self._scopes.delete(scope_id)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import pytest
from opal_common.schemas.policy_source import GitPolicyScopeSource, NoAuthData
from opal_common.schemas.scopes import Scope
from opal_server.git_fetcher import GitPolicyFetcher
from opal_server.scopes.scope_repository import ScopeNotFoundError
from opal_server.scopes.service import ScopesService


class FakeScopeRepository:
    """In-memory stand-in for the scope repository used by these tests.

    Scopes are stored in a dict keyed by their ``scope_id``.
    """

    def __init__(self, scopes):
        # Later entries win when two scopes share a scope_id.
        by_id = {}
        for item in scopes:
            by_id[item.scope_id] = item
        self._scopes = by_id

    async def get(self, scope_id):
        """Return the stored scope, raising ScopeNotFoundError if it is absent."""
        if scope_id in self._scopes:
            return self._scopes[scope_id]
        raise ScopeNotFoundError(scope_id)

    async def all(self):
        """Return a new list holding every stored scope."""
        return [*self._scopes.values()]

    async def delete(self, scope_id):
        """Remove the scope if present; unknown ids are ignored."""
        if scope_id in self._scopes:
            del self._scopes[scope_id]


def _scope(scope_id, url, branch="main"):
    """Build a Scope backed by an unauthenticated git policy source.

    The scope's data section has an empty ``entries`` list.
    """
    git_source = GitPolicyScopeSource(
        source_type="git",
        url=url,
        branch=branch,
        auth=NoAuthData(auth_type="none"),
    )
    return Scope(scope_id=scope_id, policy=git_source, data={"entries": []})


@pytest.fixture(autouse=True)
def clear_caches():
    """Empty the GitPolicyFetcher class-level caches before and after each test."""

    def _wipe():
        # Attributes are looked up at call time so whatever object is bound
        # to each name at that moment is the one that gets cleared.
        GitPolicyFetcher.repos.clear()
        GitPolicyFetcher.repos_last_fetched.clear()
        GitPolicyFetcher.repo_locks.clear()

    _wipe()
    yield
    _wipe()


@pytest.mark.asyncio
async def test_delete_unique_scope_purges_caches(tmp_path, monkeypatch):
    """Deleting the only scope for a source drops its cache entries."""
    lone_scope = _scope("only", "https://git/repo-a.git")
    scope_store = FakeScopeRepository([lone_scope])
    service = ScopesService(
        base_dir=tmp_path, scopes=scope_store, pubsub_endpoint=None
    )

    source = lone_scope.policy
    source_id = GitPolicyFetcher.source_id(source)
    cached_path = str(GitPolicyFetcher.repo_clone_path(tmp_path, source))
    # Seed both caches so the test can observe them being purged.
    GitPolicyFetcher.repos[cached_path] = object()
    GitPolicyFetcher.repos_last_fetched[source_id] = "ts"

    def _skip_rmtree(*args, **kwargs):
        # Filesystem removal is irrelevant here; make it a no-op.
        return None

    monkeypatch.setattr("opal_server.scopes.service.shutil.rmtree", _skip_rmtree)

    await service.delete_scope("only")

    assert cached_path not in GitPolicyFetcher.repos
    assert source_id not in GitPolicyFetcher.repos_last_fetched


@pytest.mark.asyncio
async def test_delete_keeps_caches_when_sibling_shares_source(tmp_path, monkeypatch):
    """A sibling scope with the same source id keeps the clone and caches alive."""
    shared_url = "https://git/shared.git"
    first = _scope("a", shared_url)
    second = _scope("b", shared_url)  # same url+branch -> same source_id
    service = ScopesService(
        base_dir=tmp_path,
        scopes=FakeScopeRepository([first, second]),
        pubsub_endpoint=None,
    )

    source_id = GitPolicyFetcher.source_id(first.policy)
    cached_path = str(GitPolicyFetcher.repo_clone_path(tmp_path, first.policy))
    GitPolicyFetcher.repos[cached_path] = object()
    GitPolicyFetcher.repos_last_fetched[source_id] = "ts"

    removed_dirs = []

    def _record_rmtree(p, **k):
        # Record the target instead of touching the filesystem.
        return removed_dirs.append(p)

    monkeypatch.setattr("opal_server.scopes.service.shutil.rmtree", _record_rmtree)

    await service.delete_scope("a")

    assert removed_dirs == []  # sibling shares the source id; clone must survive
    assert cached_path in GitPolicyFetcher.repos
    assert source_id in GitPolicyFetcher.repos_last_fetched


@pytest.mark.asyncio
async def test_delete_purges_when_sibling_shares_url_but_not_source(
    tmp_path, monkeypatch
):
    """A sibling that only shares the url must not block the purge.

    With SCOPES_REPO_CLONES_SHARDS > 1, two scopes on the same url but
    different branches can resolve to different source ids, and therefore
    to different clone directories. Deleting one of them must still remove
    its own clone and cache entries.
    """
    # Per the original note: with 4 shards, branch "main" lands on index 1
    # and "dev" on index 3, giving distinct source ids (asserted below).
    monkeypatch.setattr(
        "opal_server.git_fetcher.opal_server_config.SCOPES_REPO_CLONES_SHARDS", 4
    )
    shared_url = "https://git/shared.git"
    main_scope = _scope("a", shared_url, branch="main")
    dev_scope = _scope("b", shared_url, branch="dev")  # same url, diff source_id
    assert GitPolicyFetcher.source_id(main_scope.policy) != GitPolicyFetcher.source_id(
        dev_scope.policy
    )

    service = ScopesService(
        base_dir=tmp_path,
        scopes=FakeScopeRepository([main_scope, dev_scope]),
        pubsub_endpoint=None,
    )

    main_source_id = GitPolicyFetcher.source_id(main_scope.policy)
    main_clone_path = str(
        GitPolicyFetcher.repo_clone_path(tmp_path, main_scope.policy)
    )
    GitPolicyFetcher.repos[main_clone_path] = object()
    GitPolicyFetcher.repos_last_fetched[main_source_id] = "ts"

    removed_dirs = []

    def _record_rmtree(p, **k):
        # Capture the removal target as a string for comparison below.
        return removed_dirs.append(str(p))

    monkeypatch.setattr("opal_server.scopes.service.shutil.rmtree", _record_rmtree)

    await service.delete_scope("a")

    assert removed_dirs == [main_clone_path]  # its own clone dir removed
    assert main_clone_path not in GitPolicyFetcher.repos
    assert main_source_id not in GitPolicyFetcher.repos_last_fetched
31 changes: 31 additions & 0 deletions packages/opal-server/opal_server/tests/forget_repo_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from opal_server.git_fetcher import GitPolicyFetcher


class _FakeRepo:
    """Minimal repository double that records whether ``free()`` was called."""

    def __init__(self) -> None:
        # Flipped to True by free(); tests assert on it.
        self.freed: bool = False

    def free(self) -> None:
        """Mark this fake repository as released."""
        self.freed = True


def test_forget_repo_pops_and_frees(monkeypatch):
    """forget_repo removes the cache entry and calls free() on the repo."""
    clone_key = "/clones/x"
    tracked_repo = _FakeRepo()
    monkeypatch.setattr(GitPolicyFetcher, "repos", {clone_key: tracked_repo})

    GitPolicyFetcher.forget_repo(clone_key)

    assert clone_key not in GitPolicyFetcher.repos
    assert tracked_repo.freed is True


def test_forget_repo_unknown_path_is_noop(monkeypatch):
    """Forgetting a path that was never cached leaves the cache empty."""
    empty_cache = {}
    monkeypatch.setattr(GitPolicyFetcher, "repos", empty_cache)

    GitPolicyFetcher.forget_repo("/clones/missing")  # must not raise

    assert GitPolicyFetcher.repos == {}


def test_forget_repo_without_free_method(monkeypatch):
    """A cached object lacking ``free`` is still dropped without error."""
    clone_key = "/clones/y"
    monkeypatch.setattr(GitPolicyFetcher, "repos", {clone_key: object()})

    # object() has no .free(); must not raise
    GitPolicyFetcher.forget_repo(clone_key)

    assert clone_key not in GitPolicyFetcher.repos
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio

import pytest
from opal_server.policy.watcher.task import BasePolicyWatcherTask


class _Watcher(BasePolicyWatcherTask):
    """Watcher whose trigger does nothing, so scheduled tasks finish at once."""

    async def trigger(self, topic, data):
        """Ignore the notification and return None."""
        # fast no-op so the created task finishes quickly
        return


@pytest.mark.asyncio
async def test_done_tasks_are_all_removed():
    """All finished tasks are pruned and only the new trigger task remains."""
    watcher = _Watcher(pubsub_endpoint=None)

    async def _noop():
        return None

    # Pre-load the watcher with three tasks that have already completed.
    stale = []
    for _ in range(3):
        stale.append(asyncio.create_task(_noop()))
    await asyncio.gather(*stale)
    watcher._webhook_tasks = list(stale)

    await watcher._on_webhook("webhook", None)
    await asyncio.sleep(0)  # let the newly created trigger task finish

    # None of the pre-loaded finished tasks may still be tracked...
    leaked = [task for task in watcher._webhook_tasks if task in stale]
    assert leaked == [], f"stale done tasks leaked: {leaked}"
    # ...and the freshly scheduled trigger task is the only entry left.
    fresh = [task for task in watcher._webhook_tasks if task not in stale]
    assert len(watcher._webhook_tasks) == 1
    assert len(fresh) == 1, f"new trigger task not scheduled: {watcher._webhook_tasks}"

    await asyncio.gather(*fresh)  # drain the dangling task
Loading