Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions authentik/core/api/tokens.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tokens API Viewset"""

from datetime import timedelta
from typing import Any

from django.utils.timezone import now
Expand All @@ -18,12 +19,15 @@
from authentik.core.api.users import UserSerializer
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import (
USER_ATTRIBUTE_AGENT_OWNER_PK,
USER_ATTRIBUTE_TOKEN_EXPIRING,
USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
Token,
TokenIntents,
User,
UserTypes,
default_token_duration,
default_token_key,
)
from authentik.events.models import Event, EventAction
from authentik.events.utils import model_to_dict
Expand Down Expand Up @@ -171,6 +175,40 @@ def view_key(self, request: Request, identifier: str) -> Response:
Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request) # noqa # nosec
return Response(TokenViewSerializer({"key": token.key}).data)

@extend_schema(
request=None,
responses={
200: TokenViewSerializer(many=False),
403: OpenApiResponse(description="Not the token owner, agent owner, or superuser"),
},
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
def rotate(self, request: Request, identifier: str) -> Response:
"""Rotate the token key and reset the expiry to 24 hours. Only callable by the token
owner, the owning agent's human owner, or a superuser."""
token = (
Token.objects.including_expired()
.select_related("user")
.filter(identifier=identifier)
.first()
)
if not token:
return Response(status=404)

if not request.user.is_superuser:
is_token_owner = token.user_id == request.user.pk
is_agent_owner = token.user.type == UserTypes.AGENT and str(
request.user.pk
) == token.user.attributes.get(USER_ATTRIBUTE_AGENT_OWNER_PK)
if not is_token_owner and not is_agent_owner:
return Response(status=403)

token.key = default_token_key()
token.expires = now() + timedelta(hours=24)
token.save()
Event.new(EventAction.SECRET_ROTATE, secret=token).from_http(request) # noqa # nosec
return Response(TokenViewSerializer({"key": token.key}).data)

@permission_required("authentik_core.set_token_key")
@extend_schema(
request=TokenSetKeySerializer(),
Expand Down
296 changes: 296 additions & 0 deletions authentik/core/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,13 @@
SESSION_KEY_IMPERSONATE_USER,
)
from authentik.core.models import (
USER_ATTRIBUTE_AGENT_ALLOWED_APPS,
USER_ATTRIBUTE_AGENT_OWNER_PK,
USER_ATTRIBUTE_TOKEN_EXPIRING,
USER_PATH_AGENT,
USER_PATH_SERVICE_ACCOUNT,
USERNAME_MAX_LENGTH,
Application,
Group,
Session,
Token,
Expand All @@ -88,6 +92,7 @@
)
from authentik.endpoints.connectors.agent.auth import AgentAuth
from authentik.events.models import Event, EventAction
from authentik.events.utils import model_to_dict, sanitize_dict
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import FlowToken
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
Expand Down Expand Up @@ -249,8 +254,25 @@ def validate_type(self, user_type: str) -> str:
raise ValidationError(_("Can't change internal service account to other user type."))
if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
raise ValidationError(_("Setting a user to internal service account is not allowed."))
if (
self.instance
and self.instance.type == UserTypes.AGENT
and user_type != UserTypes.AGENT.value
):
raise ValidationError(_("Can't change agent user type."))
return user_type

def validate_attributes(self, attrs: dict) -> dict:
"""Prevent changes to agent owner"""
if not self.instance:
return attrs
if self.instance.type == UserTypes.AGENT:
existing_owner = self.instance.attributes.get(USER_ATTRIBUTE_AGENT_OWNER_PK)
new_owner = attrs.get(USER_ATTRIBUTE_AGENT_OWNER_PK)
if existing_owner is not None and new_owner != existing_owner:
raise ValidationError(_("Can't change owner of agent user."))
return attrs

def validate(self, attrs: dict) -> dict:
if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
raise ValidationError(_("Can't modify internal service account users"))
Expand Down Expand Up @@ -405,6 +427,26 @@ class UserServiceAccountSerializer(PassiveSerializer):
)


class UserAgentSerializer(PassiveSerializer):
"""Payload to create an agent user"""

name = CharField(max_length=150)
owner = PrimaryKeyRelatedField(queryset=User.objects.all(), required=False, default=None)


class UserAgentAllowedAppsSerializer(PassiveSerializer):
"""Payload to replace an agent's allowed applications"""

allowed_apps = ListField(child=UUIDField())


class UserAgentAllowedAppSerializer(PassiveSerializer):
"""Payload to add or remove a single allowed application"""

app = UUIDField()
action = ChoiceField(choices=[("add", "Add"), ("remove", "Remove")])


class UserRecoveryLinkSerializer(PassiveSerializer):
"""Payload to create a recovery link"""

Expand Down Expand Up @@ -691,6 +733,260 @@ def service_account(self, request: Request, body: UserServiceAccountSerializer)
status=500,
)

@permission_required(
None,
["authentik_core.add_user", "authentik_core.add_token", "authentik_core.add_agent_user"],
)
@extend_schema(
request=UserAgentSerializer,
responses={
200: inline_serializer(
"UserAgentResponse",
{
"username": CharField(required=True),
"token": CharField(required=True),
"user_uid": CharField(required=True),
"user_pk": IntegerField(required=True),
},
)
},
)
@action(
detail=False,
methods=["POST"],
pagination_class=None,
filter_backends=[],
)
@validate(UserAgentSerializer)
def agent(self, request: Request, body: UserAgentSerializer) -> Response:
"""Create a new agent user. Enterprise only. Caller must be an internal user."""
from authentik.enterprise.license import LicenseKey

if not LicenseKey.cached_summary().status.is_valid:
raise ValidationError(_("Enterprise is required to use this endpoint."))

if request.user.type != UserTypes.INTERNAL:
raise ValidationError(_("Only internal users can create agent users."))

requested_owner = body.validated_data.get("owner")
if requested_owner and not request.user.is_superuser:
if requested_owner.pk != request.user.pk:
raise ValidationError(
_("Non-superusers can only create agents owned by themselves.")
)
owner = requested_owner or request.user

username = body.validated_data["name"]
with atomic():
try:
user: User = User.objects.create(
username=username,
name=username,
type=UserTypes.AGENT,
attributes={
USER_ATTRIBUTE_AGENT_OWNER_PK: str(owner.pk),
USER_ATTRIBUTE_AGENT_ALLOWED_APPS: [],
},
path=USER_PATH_AGENT,
)
user.set_unusable_password()
user.save()

token = Token.objects.create(
identifier=slugify(f"agent-{username}-token"),
intent=TokenIntents.INTENT_API,
user=user,
expires=now() + timedelta(hours=24),
expiring=True,
)
user.assign_perms_to_managed_role("authentik_core.view_token_key", token)

owner.assign_perms_to_managed_role("authentik_core.view_user", user)
owner.assign_perms_to_managed_role("authentik_core.change_user", user)
owner.assign_perms_to_managed_role("authentik_core.delete_user", user)
owner.assign_perms_to_managed_role("authentik_core.view_user_applications", user)

Event.new(
EventAction.MODEL_CREATED,
model=sanitize_dict(model_to_dict(user)),
agent_owner=sanitize_dict(model_to_dict(owner)),
).from_http(request)

return Response(
{
"username": user.username,
"user_uid": user.uid,
"user_pk": user.pk,
"token": token.key,
}
)
except IntegrityError as exc:
error_msg = str(exc).lower()
if "unique" in error_msg:
return Response(
data={"non_field_errors": [_("A user with this username already exists")]},
status=400,
)
else:
LOGGER.warning("Agent user creation failed", exc=exc)
return Response(
data={"non_field_errors": [_("Unable to create user")]},
status=400,
)
except (ValueError, TypeError) as exc:
LOGGER.error("Unexpected error during agent user creation", exc=exc)
return Response(
data={"non_field_errors": [_("Unknown error occurred")]},
status=500,
)

@extend_schema(
request=UserAgentAllowedAppsSerializer,
responses={
200: UserAgentAllowedAppsSerializer,
400: OpenApiResponse(description="Invalid app UUIDs or owner lacks access"),
403: OpenApiResponse(description="Not the agent's owner or superuser"),
},
)
@action(
detail=True,
methods=["PUT"],
url_path="agent_allowed_apps",
url_name="agent-allowed-apps",
pagination_class=None,
filter_backends=[],
)
@validate(UserAgentAllowedAppsSerializer)
def agent_allowed_apps(
self, request: Request, pk: int, body: UserAgentAllowedAppsSerializer
) -> Response:
"""Replace the allowed application list for an agent user.
Caller must be the agent's owner or a superuser."""
from authentik.core.apps import AppAccessWithoutBindings
from authentik.policies.engine import PolicyEngine

agent, owner = self._get_agent_and_owner(request)

app_uuids = body.validated_data["allowed_apps"]
errors = []
for app_uuid in app_uuids:
try:
app = Application.objects.get(pk=app_uuid)
except Application.DoesNotExist:
errors.append(str(app_uuid))
continue
engine = PolicyEngine(app, owner, request)
engine.empty_result = AppAccessWithoutBindings.get()
engine.use_cache = False
engine.build()
if not engine.passing:
errors.append(str(app_uuid))

if errors:
return Response(
data={
"allowed_apps": [
_(
"Owner does not have access to application %(uuid)s "
"or application does not exist."
)
% {"uuid": uuid}
for uuid in errors
]
},
status=400,
)

agent.attributes[USER_ATTRIBUTE_AGENT_ALLOWED_APPS] = [str(u) for u in app_uuids]
agent.save(update_fields=["attributes"])
return Response({"allowed_apps": [str(u) for u in app_uuids]})

@extend_schema(
request=UserAgentAllowedAppSerializer,
responses={
200: UserAgentAllowedAppsSerializer,
204: OpenApiResponse(description="Application removed"),
400: OpenApiResponse(description="Invalid app UUID or owner lacks access"),
403: OpenApiResponse(description="Not the agent's owner or superuser"),
},
)
@action(
detail=True,
methods=["PATCH"],
url_path="agent_allowed_app",
url_name="agent-allowed-app",
pagination_class=None,
filter_backends=[],
)
@validate(UserAgentAllowedAppSerializer)
def agent_allowed_app(
self, request: Request, pk: int, body: UserAgentAllowedAppSerializer
) -> Response:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
"""Add or remove a single application from an agent's allowed list.
Caller must be the agent's owner or a superuser."""
from authentik.core.apps import AppAccessWithoutBindings
from authentik.policies.engine import PolicyEngine

agent, owner = self._get_agent_and_owner(request)

app_uuid = str(body.validated_data["app"])
action = body.validated_data["action"]
current = agent.attributes.get(USER_ATTRIBUTE_AGENT_ALLOWED_APPS, [])

if action == "add":
try:
app = Application.objects.get(pk=app_uuid)
except Application.DoesNotExist:
return Response(
data={"app": [_("Application does not exist.")]},
status=400,
)
engine = PolicyEngine(app, owner, request)
engine.empty_result = AppAccessWithoutBindings.get()
engine.use_cache = False
engine.build()
if not engine.passing:
return Response(
data={"app": [_("Owner does not have access to this application.")]},
status=400,
)
if app_uuid not in current:
current.append(app_uuid)
agent.attributes[USER_ATTRIBUTE_AGENT_ALLOWED_APPS] = current
agent.save(update_fields=["attributes"])
return Response({"allowed_apps": current})

if action == "remove":
if app_uuid in current:
current.remove(app_uuid)
agent.attributes[USER_ATTRIBUTE_AGENT_ALLOWED_APPS] = current
agent.save(update_fields=["attributes"])
return Response(status=204)

return Response(
data={"action": [_("Invalid action.")]},
status=400,
)

def _get_agent_and_owner(self, request: Request) -> tuple[User, User]:
"""Validate that the target is an agent and the caller is authorized."""
agent: User = self.get_object()

if agent.type != UserTypes.AGENT:
raise ValidationError(_("User is not an agent user."))

owner_pk = agent.attributes.get(USER_ATTRIBUTE_AGENT_OWNER_PK)
is_owner = str(request.user.pk) == owner_pk
if not request.user.is_superuser and not is_owner:
raise ValidationError(_("Not the agent's owner or superuser."))

try:
owner = User.objects.get(pk=owner_pk)
except User.DoesNotExist as exc:
raise ValidationError(_("Agent owner not found.")) from exc

return agent, owner

@extend_schema(responses={200: SessionUserSerializer(many=False)})
@action(
url_path="me",
Expand Down
Loading
Loading