Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class AuthContext:
username: str
oauth_provider: Optional[str]
role_assignments: List[RoleAssignment] # Direct + groups, flattened
is_admin: bool = False

@classmethod
async def from_user(
Expand Down Expand Up @@ -76,6 +77,7 @@ async def from_user(
username=user.username,
oauth_provider=user.oauth_provider,
role_assignments=assignments,
is_admin=bool(user.is_admin),
)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Authorization service implementations for access control.
"""

import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from functools import lru_cache
Expand All @@ -22,6 +23,8 @@
get_settings,
)

logger = logging.getLogger(__name__)

settings = get_settings()


Expand Down Expand Up @@ -121,6 +124,22 @@ def authorize(
Returns:
Same list of requests with approved=True/False set
"""
# Break-glass: admins bypass all RBAC checks. Kept as a single explicit
# check (and logged for audit) so the bypass is easy to find and, if
# ever needed, to scope down to "admin bypasses grants but still
# respects X".
if auth_context.is_admin:
logger.info(
"Admin access bypass: user=%s (id=%s) approved %d request(s): %s",
auth_context.username,
auth_context.user_id,
len(requests),
", ".join(str(request) for request in requests),
)
return [
AccessDecision(request=request, approved=True, reason="admin")
for request in requests
]
return [self._make_decision(auth_context, request) for request in requests]

def _make_decision(
Expand Down
89 changes: 89 additions & 0 deletions datajunction-server/tests/internal/authorization_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,95 @@ async def test_get_authorization_service_factory(self, mocker):
assert "passthrough" in str(exc_info.value).lower()


@pytest.mark.asyncio
class TestAdminBypass:
"""Tests for the admin break-glass bypass in RBACAuthorizationService."""

async def test_admin_bypasses_restrictive_policy(self, mocker):
"""An admin is approved for everything, even under restrictive policy."""
mock_settings = mocker.patch(
"datajunction_server.internal.access.authorization.service.settings",
)
mock_settings.default_access_policy = "restrictive"

service = RBACAuthorizationService()
auth_context = AuthContext(
user_id=1,
username="root",
oauth_provider="basic",
role_assignments=[],
is_admin=True,
)
requests = [
ResourceRequest(
verb=ResourceAction.MANAGE,
access_object=Resource(
name="anything.at.all",
resource_type=ResourceType.NODE,
),
),
ResourceRequest(
verb=ResourceAction.WRITE,
access_object=Resource(
name="finance",
resource_type=ResourceType.NAMESPACE,
),
),
]
decisions = service.authorize(auth_context, requests)
assert all(decision.approved for decision in decisions)
assert all(decision.reason == "admin" for decision in decisions)

async def test_non_admin_denied_under_restrictive(self, mocker):
"""A non-admin with no grants is denied under restrictive policy."""
mock_settings = mocker.patch(
"datajunction_server.internal.access.authorization.service.settings",
)
mock_settings.default_access_policy = "restrictive"

service = RBACAuthorizationService()
auth_context = AuthContext(
user_id=2,
username="bob",
oauth_provider="basic",
role_assignments=[],
is_admin=False,
)
requests = [
ResourceRequest(
verb=ResourceAction.WRITE,
access_object=Resource(
name="finance.revenue",
resource_type=ResourceType.NODE,
),
),
]
decisions = service.authorize(auth_context, requests)
assert decisions[0].approved is False

async def test_auth_context_from_user_carries_is_admin(
self,
default_user: User,
session: AsyncSession,
):
"""AuthContext.from_user reflects the user's is_admin flag."""
admin = User(
username="admin-user",
kind=PrincipalKind.USER,
oauth_provider="basic",
is_admin=True,
)
session.add(admin)
await session.commit()

admin = await get_user(username="admin-user", session=session)
auth_context = await AuthContext.from_user(session, admin)
assert auth_context.is_admin is True

non_admin_context = await AuthContext.from_user(session, default_user)
assert non_admin_context.is_admin is False


@pytest.mark.asyncio
class TestGroupBasedPermissions:
"""Tests for group-based role assignments."""
Expand Down
Loading