diff --git a/datajunction-server/datajunction_server/internal/access/authorization/context.py b/datajunction-server/datajunction_server/internal/access/authorization/context.py index 5c954be62..468e6de15 100644 --- a/datajunction-server/datajunction_server/internal/access/authorization/context.py +++ b/datajunction-server/datajunction_server/internal/access/authorization/context.py @@ -46,6 +46,7 @@ class AuthContext: username: str oauth_provider: Optional[str] role_assignments: List[RoleAssignment] # Direct + groups, flattened + is_admin: bool = False @classmethod async def from_user( @@ -76,6 +77,7 @@ async def from_user( username=user.username, oauth_provider=user.oauth_provider, role_assignments=assignments, + is_admin=bool(user.is_admin), ) @classmethod diff --git a/datajunction-server/datajunction_server/internal/access/authorization/service.py b/datajunction-server/datajunction_server/internal/access/authorization/service.py index 6abd28f12..73aabf408 100644 --- a/datajunction-server/datajunction_server/internal/access/authorization/service.py +++ b/datajunction-server/datajunction_server/internal/access/authorization/service.py @@ -2,6 +2,7 @@ Authorization service implementations for access control. """ +import logging from abc import ABC, abstractmethod from datetime import datetime, timezone from functools import lru_cache @@ -22,6 +23,8 @@ get_settings, ) +logger = logging.getLogger(__name__) + settings = get_settings() @@ -121,6 +124,22 @@ def authorize( Returns: Same list of requests with approved=True/False set """ + # Break-glass: admins bypass all RBAC checks. Kept as a single explicit + # check (and logged for audit) so the bypass is easy to find and, if + # ever needed, to scope down to "admin bypasses grants but still + # respects X". + if auth_context.is_admin: + logger.info( + "Admin access bypass: user=%s (id=%s) approved %d request(s): %s", + auth_context.username, + auth_context.user_id, + len(requests), + ", ".join(str(request) for request in requests), + ) + return [ + AccessDecision(request=request, approved=True, reason="admin") + for request in requests + ] return [self._make_decision(auth_context, request) for request in requests] def _make_decision( diff --git a/datajunction-server/tests/internal/authorization_test.py b/datajunction-server/tests/internal/authorization_test.py index 5e2b61b92..f4495d922 100644 --- a/datajunction-server/tests/internal/authorization_test.py +++ b/datajunction-server/tests/internal/authorization_test.py @@ -584,6 +584,95 @@ async def test_get_authorization_service_factory(self, mocker): assert "passthrough" in str(exc_info.value).lower() +@pytest.mark.asyncio +class TestAdminBypass: + """Tests for the admin break-glass bypass in RBACAuthorizationService.""" + + async def test_admin_bypasses_restrictive_policy(self, mocker): + """An admin is approved for everything, even under restrictive policy.""" + mock_settings = mocker.patch( + "datajunction_server.internal.access.authorization.service.settings", + ) + mock_settings.default_access_policy = "restrictive" + + service = RBACAuthorizationService() + auth_context = AuthContext( + user_id=1, + username="root", + oauth_provider="basic", + role_assignments=[], + is_admin=True, + ) + requests = [ + ResourceRequest( + verb=ResourceAction.MANAGE, + access_object=Resource( + name="anything.at.all", + resource_type=ResourceType.NODE, + ), + ), + ResourceRequest( + verb=ResourceAction.WRITE, + access_object=Resource( + name="finance", + resource_type=ResourceType.NAMESPACE, + ), + ), + ] + decisions = service.authorize(auth_context, requests) + assert all(decision.approved for decision in decisions) + assert all(decision.reason == "admin" for decision in decisions) + + async def test_non_admin_denied_under_restrictive(self, mocker): + """A non-admin with no grants is denied under restrictive policy.""" + mock_settings = mocker.patch( + "datajunction_server.internal.access.authorization.service.settings", + ) + mock_settings.default_access_policy = "restrictive" + + service = RBACAuthorizationService() + auth_context = AuthContext( + user_id=2, + username="bob", + oauth_provider="basic", + role_assignments=[], + is_admin=False, + ) + requests = [ + ResourceRequest( + verb=ResourceAction.WRITE, + access_object=Resource( + name="finance.revenue", + resource_type=ResourceType.NODE, + ), + ), + ] + decisions = service.authorize(auth_context, requests) + assert decisions[0].approved is False + + async def test_auth_context_from_user_carries_is_admin( + self, + default_user: User, + session: AsyncSession, + ): + """AuthContext.from_user reflects the user's is_admin flag.""" + admin = User( + username="admin-user", + kind=PrincipalKind.USER, + oauth_provider="basic", + is_admin=True, + ) + session.add(admin) + await session.commit() + + admin = await get_user(username="admin-user", session=session) + auth_context = await AuthContext.from_user(session, admin) + assert auth_context.is_admin is True + + non_admin_context = await AuthContext.from_user(session, default_user) + assert non_admin_context.is_admin is False + + @pytest.mark.asyncio class TestGroupBasedPermissions: """Tests for group-based role assignments."""