Skip to content

Enterprise/agent service accounts#21584

Draft
fheisler wants to merge 3 commits intomainfrom
enterprise/agent-service-accounts
Draft

Enterprise/agent service accounts#21584
fheisler wants to merge 3 commits intomainfrom
enterprise/agent-service-accounts

Conversation

@fheisler
Copy link
Copy Markdown
Member

WIP agent-type service accounts based on attributes

@netlify
Copy link
Copy Markdown

netlify bot commented Apr 13, 2026

Deploy Preview for authentik-storybook ready!

Name Link
🔨 Latest commit 20389cc
🔍 Latest deploy log https://app.netlify.com/projects/authentik-storybook/deploys/69dd489602c7f50008f8103c
😎 Deploy Preview https://deploy-preview-21584--authentik-storybook.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@netlify
Copy link
Copy Markdown

netlify bot commented Apr 13, 2026

Deploy Preview for authentik-integrations ready!

Name Link
🔨 Latest commit 20389cc
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/69dd48964ccc0400082189b3
😎 Deploy Preview https://deploy-preview-21584--authentik-integrations.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@netlify
Copy link
Copy Markdown

netlify bot commented Apr 13, 2026

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit 20389cc
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/69dd4896d3578f0008c6e8f8
😎 Deploy Preview https://deploy-preview-21584--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 13, 2026

❌ 6 Tests Failed:

Tests completed Failed Passed Skipped
3197 6 3191 1
View the top 3 failed test(s) by shortest run time
authentik.enterprise.tests.test_license.TestEnterpriseLicense::test_expiry_expired
Stack Traces | 0.004s run time
self = <unittest.case._Outcome object at 0x7f956859f1c0>
test_case = <authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3............/x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>
result = <TestCaseFunction test_expiry_expired>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3............/x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>
method = <bound method TestEnterpriseLicense.test_expiry_expired of <authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3............/x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>,)
keywargs = {}
newargs = (<authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>,)
newkeywargs = {}

    @wraps(func)
    def patched(*args, **keywargs):
        with self.decoration_helper(patched,
                                    args,
                                    keywargs) as (newargs, newkeywargs):
>           return func(*newargs, **newkeywargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.../hostedtoolcache/Python/3.14.3............/x64/lib/python3.14/unittest/mock.py:1432: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.enterprise.tests.test_license.TestEnterpriseLicense testMethod=test_expiry_expired>

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_expired,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @patch(
        "authentik.enterprise.license.LicenseKey.record_usage",
        MagicMock(),
    )
    def test_expiry_expired(self):
        """Check license verification"""
>       User.objects.all().delete()
        ^^^^
E       NameError: name 'User' is not defined

.../enterprise/tests/test_license.py:235: NameError
authentik.core.tests.test_users.TestAgentUserSignals::test_deactivate_owner_clears_agent_sessions
Stack Traces | 0.033s run time
self = <unittest.case._Outcome object at 0x7f577c0c4360>
test_case = <authentik.core.tests.test_users.TestAgentUserSignals testMethod=test_deactivate_owner_clears_agent_sessions>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users.TestAgentUserSignals testMethod=test_deactivate_owner_clears_agent_sessions>
result = <TestCaseFunction test_deactivate_owner_clears_agent_sessions>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users.TestAgentUserSignals testMethod=test_deactivate_owner_clears_agent_sessions>
method = <bound method TestAgentUserSignals.test_deactivate_owner_clears_agent_sessions of <authentik.core.tests.test_users.TestAgentUserSignals testMethod=test_deactivate_owner_clears_agent_sessions>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users.TestAgentUserSignals testMethod=test_deactivate_owner_clears_agent_sessions>

    def test_deactivate_owner_clears_agent_sessions(self):
        """Deactivating an owner removes authenticated sessions for their agents"""
        owner = self._create_owner()
        agent = self._create_agent(owner)
>       session = Session.objects.create(session_key=generate_id(), session_data="{}")
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.../core/tests/test_users.py:126: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.models.ExpiringManager object at 0x7f57a49f4a50>
args = ()
kwargs = {'session_data': '{}', 'session_key': 'Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu'}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>
kwargs = {'session_data': '{}', 'session_key': 'Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu'}
reverse_one_to_one_fields = frozenset()
obj = <Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>

    def create(self, **kwargs):
        """
        Create a new object with the given kwargs, saving it to the database
        and returning the created object.
        """
        reverse_one_to_one_fields = frozenset(kwargs).intersection(
            self.model._meta._reverse_one_to_one_field_names
        )
        if reverse_one_to_one_fields:
            raise ValueError(
                "The following fields do not exist in this model: %s"
                % ", ".join(reverse_one_to_one_fields)
            )
    
        obj = self.model(**kwargs)
        self._for_write = True
>       obj.save(force_insert=True, using=self.db)

.venv/lib/python3.14.../db/models/query.py:665: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>, force_insert = True
force_update = False, using = 'default', update_fields = None

    def save(
        self,
        *args,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.
    
        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL insert or update (or equivalent for
        non-SQL backends), respectively. Normally, they should not be set.
        """
        # RemovedInDjango60Warning.
        if args:
            force_insert, force_update, using, update_fields = self._parse_save_params(
                *args,
                method_name="save",
                force_insert=force_insert,
                force_update=force_update,
                using=using,
                update_fields=update_fields,
            )
    
        self._prepare_related_fields_for_save(operation_name="save")
    
        using = using or router.db_for_write(self.__class__, instance=self)
        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")
    
        deferred_non_generated_fields = {
            f.attname
            for f in self._meta.concrete_fields
            if f.attname not in self.__dict__ and f.generated is False
        }
        if update_fields is not None:
            # If update_fields is empty, skip the save. We do also check for
            # no-op saves later on for inheritance cases. This bailout is
            # still needed for skipping signal sending.
            if not update_fields:
                return
    
            update_fields = frozenset(update_fields)
            field_names = self._meta._non_pk_concrete_field_names
            not_updatable_fields = update_fields.difference(field_names)
    
            if not_updatable_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, primary keys, or are non-concrete fields: %s"
                    % ", ".join(not_updatable_fields)
                )
    
        # If saving to the same database, and this model is deferred, then
        # automatically do an "update_fields" save on the loaded fields.
        elif (
            not force_insert
            and deferred_non_generated_fields
            and using == self._state.db
        ):
            field_names = set()
            pk_fields = self._meta.pk_fields
            for field in self._meta.concrete_fields:
                if field not in pk_fields and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_non_generated_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)
    
>       self.save_base(
            using=using,
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )

.venv/lib/python3.14.../db/models/base.py:902: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>, raw = False
force_insert = (<class 'authentik.core.models.Session'>,), force_update = False
using = 'default', update_fields = None

    def save_base(
        self,
        raw=False,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Handle the parts of saving which should be done only once per save,
        yet need to be done in raw saves, too. This includes some sanity
        checks and signal sending.
    
        The 'raw' argument is telling save_base not to save any parent
        models and not to do any changes to the values before save. This
        is used by fixture loading.
        """
        using = using or router.db_for_write(self.__class__, instance=self)
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = origin = self.__class__
        # Skip proxies, but keep the origin as the proxy model.
        if cls._meta.proxy:
            cls = cls._meta.concrete_model
        meta = cls._meta
        if not meta.auto_created:
            pre_save.send(
                sender=origin,
                instance=self,
                raw=raw,
                using=using,
                update_fields=update_fields,
            )
        # A transaction isn't needed if one query is issued.
        if meta.parents:
            context_manager = transaction.atomic(using=using, savepoint=False)
        else:
            context_manager = transaction.mark_for_rollback_on_error(using=using)
        with context_manager:
            parent_inserted = False
            if not raw:
                # Validate force insert only when parents are inserted.
                force_insert = self._validate_force_insert(force_insert)
                parent_inserted = self._save_parents(
                    cls, using, update_fields, force_insert
                )
>           updated = self._save_table(
                raw,
                cls,
                force_insert or parent_inserted,
                force_update,
                using,
                update_fields,
            )

.venv/lib/python3.14.../db/models/base.py:1008: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>, raw = False
cls = <class 'authentik.core.models.Session'>
force_insert = (<class 'authentik.core.models.Session'>,), force_update = False
using = 'default', update_fields = None

    def _save_table(
        self,
        raw=False,
        cls=None,
        force_insert=False,
        force_update=False,
        using=None,
        update_fields=None,
    ):
        """
        Do the heavy-lifting involved in saving. Update or insert the data
        for a single table.
        """
        meta = cls._meta
        pk_fields = meta.pk_fields
        non_pks_non_generated = [
            f
            for f in meta.local_concrete_fields
            if f not in pk_fields and not f.generated
        ]
    
        if update_fields:
            non_pks_non_generated = [
                f
                for f in non_pks_non_generated
                if f.name in update_fields or f.attname in update_fields
            ]
    
        if not self._is_pk_set(meta):
            pk_val = meta.pk.get_pk_value_on_save(self)
            setattr(self, meta.pk.attname, pk_val)
        pk_set = self._is_pk_set(meta)
        if not pk_set and (force_update or update_fields):
            raise ValueError("Cannot force an update in save() with no primary key.")
        updated = False
        # Skip an UPDATE when adding an instance and primary key has a default.
        if (
            not raw
            and not force_insert
            and not force_update
            and self._state.adding
            and all(f.has_default() or f.has_db_default() for f in meta.pk_fields)
        ):
            force_insert = True
        # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
        if pk_set and not force_insert:
            base_qs = cls._base_manager.using(using)
            values = [
                (
                    f,
                    None,
                    (getattr(self, f.attname) if raw else f.pre_save(self, False)),
                )
                for f in non_pks_non_generated
            ]
            forced_update = update_fields or force_update
            pk_val = self._get_pk_val(meta)
            updated = self._do_update(
                base_qs, using, pk_val, values, update_fields, forced_update
            )
            if force_update and not updated:
                raise DatabaseError("Forced update did not affect any rows.")
            if update_fields and not updated:
                raise DatabaseError("Save with update_fields did not affect any rows.")
        if not updated:
            if meta.order_with_respect_to:
                # If this is a model with an order_with_respect_to
                # autopopulate the _order field
                field = meta.order_with_respect_to
                filter_args = field.get_filter_kwargs_for_object(self)
                self._order = (
                    cls._base_manager.using(using)
                    .filter(**filter_args)
                    .aggregate(
                        _order__max=Coalesce(
                            ExpressionWrapper(
                                Max("_order") + Value(1), output_field=IntegerField()
                            ),
                            Value(0),
                        ),
                    )["_order__max"]
                )
            fields = [
                f
                for f in meta.local_concrete_fields
                if not f.generated and (pk_set or f is not meta.auto_field)
            ]
            returning_fields = meta.db_returning_fields
>           results = self._do_insert(
                cls._base_manager, using, fields, returning_fields, raw
            )

.venv/lib/python3.14.../db/models/base.py:1169: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>
manager = <django.db.models.manager.Manager object at 0x7f57a49f4b50>
using = 'default'
fields = [<django.db.models.fields.CharField: session_key>, <django.db.models.fields.DateTimeField: expires>, <django.db.models...>, <django.db.models.fields.GenericIPAddressField: last_ip>, <django.db.models.fields.TextField: last_user_agent>, ...]
returning_fields = [], raw = False

    def _do_insert(self, manager, using, fields, returning_fields, raw):
        """
        Do an INSERT. If returning_fields is defined then this method should
        return the newly created data for the model.
        """
>       return manager._insert(
            [self],
            fields=fields,
            returning_fields=returning_fields,
            using=using,
            raw=raw,
        )

.venv/lib/python3.14.../db/models/base.py:1210: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.models.manager.Manager object at 0x7f57a49f4b50>
args = ([<Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>],)
kwargs = {'fields': [<django.db.models.fields.CharField: session_key>, <django.db.models.fields.DateTimeField: expires>, <djang..., <django.db.models.fields.TextField: last_user_agent>, ...], 'raw': False, 'returning_fields': [], 'using': 'default'}

    @wraps(method)
    def manager_method(self, *args, **kwargs):
>       return getattr(self.get_queryset(), name)(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../db/models/manager.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <QuerySet []>
objs = [<Session: Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu>]
fields = [<django.db.models.fields.CharField: session_key>, <django.db.models.fields.DateTimeField: expires>, <django.db.models...>, <django.db.models.fields.GenericIPAddressField: last_ip>, <django.db.models.fields.TextField: last_user_agent>, ...]
returning_fields = [], raw = False, using = 'default', on_conflict = None
update_fields = None, unique_fields = None

    def _insert(
        self,
        objs,
        fields,
        returning_fields=None,
        raw=False,
        using=None,
        on_conflict=None,
        update_fields=None,
        unique_fields=None,
    ):
        """
        Insert a new record for the given model. This provides an interface to
        the InsertQuery class and is how Model.save() is implemented.
        """
        self._for_write = True
        if using is None:
            using = self.db
        query = sql.InsertQuery(
            self.model,
            on_conflict=on_conflict,
            update_fields=update_fields,
            unique_fields=unique_fields,
        )
        query.insert_values(fields, objs, raw=raw)
>       return query.get_compiler(using=using).execute_sql(returning_fields)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../db/models/query.py:1873: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <SQLInsertCompiler model=Session connection=<DatabaseWrapper vendor='postgresql' alias='default'> using='default'>
returning_fields = []

    def execute_sql(self, returning_fields=None):
        assert not (
            returning_fields
            and len(self.query.objs) != 1
            and not self.connection.features.can_return_rows_from_bulk_insert
        )
        opts = self.query.get_meta()
        self.returning_fields = returning_fields
        cols = []
        with self.connection.cursor() as cursor:
            for sql, params in self.as_sql():
>               cursor.execute(sql, params)

.venv/lib/python3.14.../models/sql/compiler.py:1882: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>, 'INSERT INTO "authentik_core_session" ("session_ke...', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...))
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs") -> "R":
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../site-packages/sentry_sdk/utils.py:1900: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>
sql = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(
        self: "CursorWrapper", sql: "Any", params: "Optional[Any]" = None
    ) -> "Any":
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../integrations/django/__init__.py:645: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>
sql = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.14.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>
sql = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)
many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>
sql = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f577b19f950>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
>               return self.cursor.execute(sql, params)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../db/backends/utils.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [INTRANS] (host=localhost database=authentik) at 0x7f577b98aa90>
args = ('INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_ag...', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...))
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [INTRANS] (host=localhost database=authentik) at 0x7f577b98aa90>
query = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
>               self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )

.venv/lib/python3.14.../site-packages/psycopg/cursor.py:113: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.Connection [INTRANS] (host=localhost database=authentik) at 0x7f5792a8b200>
gen = <generator object BaseCursor._execute_gen at 0x7f5792a04040>
interval = 0.1

    def wait(self, gen: PQGen[RV], interval: float = _WAIT_INTERVAL) -> RV:
        """
        Consume a generator operating on the connection.
    
        The function must be used on generators that don't change connection
        fd (i.e. not on connect and reset).
        """
        try:
>           return waiting.wait(gen, self.pgconn.socket, interval=interval)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../site-packages/psycopg/connection.py:484: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

psycopg_c/_psycopg/waiting.pyx:219: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [INTRANS] (host=localhost database=authentik) at 0x7f577b98aa90>
query = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    def _execute_gen(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> PQGen[None]:
        """Generator implementing `Cursor.execute()`."""
        yield from self._start_query(query)
>       pgq = self._convert_query(query, params)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14....../site-packages/psycopg/_cursor_base.py:205: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [INTRANS] (host=localhost database=authentik) at 0x7f577b98aa90>
query = 'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
params = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    def _convert_query(
        self, query: Query, params: Params | None = None
    ) -> PostgresQuery:
        pgq = self._query_cls(self._tx)
>       pgq.convert(query, params)

.venv/lib/python3.14....../site-packages/psycopg/_cursor_base.py:465: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg._queries.PostgresClientQuery object at 0x7f5792f5ef80>
query = b'INSERT INTO "authentik_core_session" ("session_key", "expires", "expiring", "session_data", "last_ip", "last_user_agent", "last_used") VALUES (%s, %s, %s, %s, %s, %s, %s)'
vars = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    def convert(self, query: Query, vars: Params | None) -> None:
        """
        Set up the query and parameters to convert.
    
        The results of this function can be obtained accessing the object
        attributes (`query`, `params`, `types`, `formats`).
        """
        if isinstance(query, Template):
            return self._convert_template(query, vars)
    
        query = self._ensure_bytes(query)
    
        if vars is not None:
            if (
                len(query) <= MAX_CACHED_STATEMENT_LENGTH
                and len(vars) <= MAX_CACHED_STATEMENT_PARAMS
            ):
                f: _Query2PgClient = _query2pg_client
            else:
                f = _query2pg_client_nocache
    
            self.template, self._order, self._parts = f(query, self._tx.encoding)
        else:
            self.query = query
            self._order = None
    
>       self.dump(vars)

.venv/lib/python3.14........./site-packages/psycopg/_queries.py:289: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg._queries.PostgresClientQuery object at 0x7f5792f5ef80>
vars = ('Nf9SQoEMztYhhT1uXpnGJ5L4ryGOvpDr7nCi1qhu', datetime.datetime(2026, 4, 13, 20, 34, 20, 87902, tzinfo=datetime.timezone.utc), True, Binary('{}'), None, '', ...)

    def dump(self, vars: Params | None) -> None:
        """
        Process a new set of variables on the query processed by `convert()`.
    
        This method updates `params` and `types`.
        """
        if vars is not None:
            params = self.validate_and_reorder_params(self._parts, vars, self._order)
>           self.params = tuple(
                          ^^^^^
                self._tx.as_literal(p) if p is not None else b"NULL" for p in params
            )

.venv/lib/python3.14........./site-packages/psycopg/_queries.py:299: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

.0 = <tuple_iterator object at 0x7f5792b63a90>

    self.params = tuple(
>       self._tx.as_literal(p) if p is not None else b"NULL" for p in params
        ^^^^^^^^^^^^^^^^^^^^^^
    )

.venv/lib/python3.14........./site-packages/psycopg/_queries.py:300: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

psycopg_c/_psycopg/transform.pyx:205: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

psycopg_c/_psycopg/transform.pyx:214: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.dbapi20.BinaryTextDumper (oid=17) at 0x7f5792f5f4d0>
obj = Binary('{}')

    def quote(self, obj: Buffer) -> Buffer:
>       if (escaped := self.dump(obj)) is None:
                       ^^^^^^^^^^^^^^

.venv/lib/python3.14.../psycopg/types/string.py:141: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.dbapi20.BinaryTextDumper (oid=17) at 0x7f5792f5f4d0>
obj = Binary('{}')

    def dump(self, obj: Buffer | Binary) -> Buffer | None:
        if isinstance(obj, Binary):
>           return super().dump(obj.obj)
                   ^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../site-packages/psycopg/dbapi20.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.dbapi20.BinaryTextDumper (oid=17) at 0x7f5792f5f4d0>, obj = '{}'

    def dump(self, obj: Buffer) -> Buffer | None:
>       return self._esc.escape_bytea(obj)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../psycopg/types/string.py:138: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

psycopg_c/pq/escaping.pyx:90: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???

psycopg_c/pq/escaping.pyx:99: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   TypeError: bytes or buffer expected, got <class 'str'>

psycopg_c/pq/pqbuffer.pyx:111: TypeError
authentik.core.tests.test_users_api.TestAgentUserAPI::test_token_rotate_by_agent_owner
Stack Traces | 1.6s run time
self = <unittest.case._Outcome object at 0x7f4c985e6cf0>
test_case = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>
result = <TestCaseFunction test_token_rotate_by_agent_owner>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>
method = <bound method TestAgentUserAPI.test_token_rotate_by_agent_owner of <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>

    def test_token_rotate_by_agent_owner(self):
        """Agent owner can rotate the agent's token"""
        agent = self._create_agent(owner=self.user)
        token = Token.objects.create(
            identifier=generate_id(),
            intent=TokenIntents.INTENT_API,
            user=agent,
            expiring=True,
        )
        original_key = token.key
        self.client.force_login(self.user)
        response = self.client.post(
            reverse("authentik_api:token-rotate", kwargs={"identifier": token.identifier}),
        )
>       self.assertEqual(response.status_code, 200)

.../core/tests/test_users_api.py:1072: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>
first = 403, second = 200, msg = None

    def assertEqual(self, first, second, msg=None):
        """Fail if the two objects are unequal as determined by the '=='
           operator.
        """
        assertion_func = self._getAssertEqualityFunc(first, second)
>       assertion_func(first, second, msg=msg)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:925: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_token_rotate_by_agent_owner>
first = 403, second = 200, msg = '403 != 200'

    def _baseAssertEqual(self, first, second, msg=None):
        """The default assertEqual implementation, not type specific."""
        if not first == second:
            standardMsg = '%s != %s' % _common_shorten_repr(first, second)
            msg = self._formatMessage(msg, standardMsg)
>           raise self.failureException(msg)
E           AssertionError: 403 != 200

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:918: AssertionError
authentik.core.tests.test_users_api.TestAgentUserAPI::test_agent_create_non_internal_user
Stack Traces | 2.21s run time
self = <unittest.case._Outcome object at 0x7f4cb512f1c0>
test_case = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>
result = <TestCaseFunction test_agent_create_non_internal_user>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>
method = <bound method TestAgentUserAPI.test_agent_create_non_internal_user of <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>

    def test_agent_create_non_internal_user(self):
        """Only internal users can create agent users"""
        external = create_test_user()
        external.type = UserTypes.EXTERNAL
        external.save()
        external.assign_perms_to_managed_role("authentik_core.add_agent_user")
        self.client.force_login(external)
        with patch(
            "authentik.enterprise.license.LicenseKey.cached_summary",
            MagicMock(return_value=MagicMock(status=MagicMock(is_valid=True))),
        ):
            response = self.client.post(
                reverse("authentik_api:user-agent"),
                data={"name": "test-agent"},
            )
>       self.assertEqual(response.status_code, 400)

.../core/tests/test_users_api.py:968: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>
first = 403, second = 400, msg = None

    def assertEqual(self, first, second, msg=None):
        """Fail if the two objects are unequal as determined by the '=='
           operator.
        """
        assertion_func = self._getAssertEqualityFunc(first, second)
>       assertion_func(first, second, msg=msg)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:925: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_create_non_internal_user>
first = 403, second = 400, msg = '403 != 400'

    def _baseAssertEqual(self, first, second, msg=None):
        """The default assertEqual implementation, not type specific."""
        if not first == second:
            standardMsg = '%s != %s' % _common_shorten_repr(first, second)
            msg = self._formatMessage(msg, standardMsg)
>           raise self.failureException(msg)
E           AssertionError: 403 != 400

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:918: AssertionError
authentik.core.tests.test_users_api.TestAgentUserAPI::test_agent_allowed_apps_update_unauthorized
Stack Traces | 2.84s run time
self = <unittest.case._Outcome object at 0x7f4c8488d710>
test_case = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>
result = <TestCaseFunction test_agent_allowed_apps_update_unauthorized>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>
method = <bound method TestAgentUserAPI.test_agent_allowed_apps_update_unauthorized of <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>

    def test_agent_allowed_apps_update_unauthorized(self):
        """Non-owner cannot update the agent's allowed apps list"""
        other = create_test_user()
        agent = self._create_agent(owner=other)
        self.client.force_login(self.admin)
        response = self.client.put(
            reverse("authentik_api:user-agent-allowed-apps", kwargs={"pk": agent.pk}),
            data={"allowed_apps": []},
            content_type="application/json",
        )
>       self.assertEqual(response.status_code, 403)

.../core/tests/test_users_api.py:1046: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>
first = 200, second = 403, msg = None

    def assertEqual(self, first, second, msg=None):
        """Fail if the two objects are unequal as determined by the '=='
           operator.
        """
        assertion_func = self._getAssertEqualityFunc(first, second)
>       assertion_func(first, second, msg=msg)

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:925: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <authentik.core.tests.test_users_api.TestAgentUserAPI testMethod=test_agent_allowed_apps_update_unauthorized>
first = 200, second = 403, msg = '200 != 403'

    def _baseAssertEqual(self, first, second, msg=None):
        """The default assertEqual implementation, not type specific."""
        if not first == second:
            standardMsg = '%s != %s' % _common_shorten_repr(first, second)
            msg = self._formatMessage(msg, standardMsg)
>           raise self.failureException(msg)
E           AssertionError: 200 != 403

.../hostedtoolcache/Python/3.14.3.............../x64/lib/python3.14/unittest/case.py:918: AssertionError
tests.e2e.test_provider_rac.TestProviderRAC::test_rac_ssh
Stack Traces | 162s run time
self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
response = <Response [404]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
>           response.raise_for_status()

.venv/lib/python3.14.../docker/api/client.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [404]>

    def raise_for_status(self):
        """Raises :class:`HTTPError`, if one occurred."""
    
        http_error_msg = ""
        if isinstance(self.reason, bytes):
            # We attempt to decode utf-8 first because some servers
            # choose to localize their reason strings. If the string
            # isn't utf-8, we fall back to iso-8859-1 for all other
            # encodings. (See PR #3538)
            try:
                reason = self.reason.decode("utf-8")
            except UnicodeDecodeError:
                reason = self.reason.decode("iso-8859-1")
        else:
            reason = self.reason
    
        if 400 <= self.status_code < 500:
            http_error_msg = (
                f"{self.status_code} Client Error: {reason} for url: {self.url}"
            )
    
        elif 500 <= self.status_code < 600:
            http_error_msg = (
                f"{self.status_code} Server Error: {reason} for url: {self.url}"
            )
    
        if http_error_msg:
>           raise HTTPError(http_error_msg, response=self)
E           requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker:.../localhost/v1.48/containers/create

.venv/lib/python3.14....../site-packages/requests/models.py:1028: HTTPError

The above exception was the direct cause of the following exception:

self = <docker.models.containers.ContainerCollection object at 0x7fc471915fd0>
image = 'lscr.io/linuxserver/openssh-server:latest', command = None
stdout = True, stderr = False, remove = False
kwargs = {'environment': {'AUTHENTIK_HOST': 'http://10.1.0.96:44665', 'PASSWORD_ACCESS': 'true', 'SUDO_ACCESS': 'true', 'USER_N...FkKyLbMcEiS5pfz5Gxn'}, 'network': 'authentik-test-UNtGypDe6D9c4UGy16d7rnP3tMRIDzaFFoCNe34h', 'ports': {'2222': '2222'}}
stream = False, detach = True, platform = None

    def run(self, image, command=None, stdout=True, stderr=False,
            remove=False, **kwargs):
        """
        Run a container. By default, it will wait for the container to finish
        and return its logs, similar to ``docker run``.
    
        If the ``detach`` argument is ``True``, it will start the container
        and immediately return a :py:class:`Container` object, similar to
        ``docker run -d``.
    
        Example:
            Run a container and get its output:
    
            >>> import docker
            >>> client = docker.from_env()
            >>> client.containers.run('alpine', 'echo hello world')
            b'hello world\\n'
    
            Run a container and detach:
    
            >>> container = client.containers.run('bfirsh/reticulate-splines',
                                                  detach=True)
            >>> container.logs()
            'Reticulating spline 1...\\nReticulating spline 2...\\n'
    
        Args:
            image (str): The image to run.
            command (str or list): The command to run in the container.
            auto_remove (bool): enable auto-removal of the container on daemon
                side when the container's process exits.
            blkio_weight_device: Block IO weight (relative device weight) in
                the form of: ``[{"Path": "device_path", "Weight": weight}]``.
            blkio_weight: Block IO weight (relative weight), accepts a weight
                value between 10 and 1000.
            cap_add (list of str): Add kernel capabilities. For example,
                ``["SYS_ADMIN", "MKNOD"]``.
            cap_drop (list of str): Drop kernel capabilities.
            cgroup_parent (str): Override the default parent cgroup.
            cgroupns (str): Override the default cgroup namespace mode for the
                container. One of:
                - ``private`` the container runs in its own private cgroup
                  namespace.
                - ``host`` use the host system's cgroup namespace.
            cpu_count (int): Number of usable CPUs (Windows only).
            cpu_percent (int): Usable percentage of the available CPUs
                (Windows only).
            cpu_period (int): The length of a CPU period in microseconds.
            cpu_quota (int): Microseconds of CPU time that the container can
                get in a CPU period.
            cpu_rt_period (int): Limit CPU real-time period in microseconds.
            cpu_rt_runtime (int): Limit CPU real-time runtime in microseconds.
            cpu_shares (int): CPU shares (relative weight).
            cpuset_cpus (str): CPUs in which to allow execution (``0-3``,
                ``0,1``).
            cpuset_mems (str): Memory nodes (MEMs) in which to allow execution
                (``0-3``, ``0,1``). Only effective on NUMA systems.
            detach (bool): Run container in the background and return a
                :py:class:`Container` object.
            device_cgroup_rules (:py:class:`list`): A list of cgroup rules to
                apply to the container.
            device_read_bps: Limit read rate (bytes per second) from a device
                in the form of: `[{"Path": "device_path", "Rate": rate}]`
            device_read_iops: Limit read rate (IO per second) from a device.
            device_write_bps: Limit write rate (bytes per second) from a
                device.
            device_write_iops: Limit write rate (IO per second) from a device.
            devices (:py:class:`list`): Expose host devices to the container,
                as a list of strings in the form
                ``<path_on_host>:<path_in_container>:<cgroup_permissions>``.
    
                For example, ``/dev/sda:/dev/xvda:rwm`` allows the container
                to have read-write access to the host's ``/dev/sda`` via a
                node named ``/dev/xvda`` inside the container.
            device_requests (:py:class:`list`): Expose host resources such as
                GPUs to the container, as a list of
                :py:class:`docker.types.DeviceRequest` instances.
            dns (:py:class:`list`): Set custom DNS servers.
            dns_opt (:py:class:`list`): Additional options to be added to the
                container's ``resolv.conf`` file.
            dns_search (:py:class:`list`): DNS search domains.
            domainname (str or list): Set custom DNS search domains.
            entrypoint (str or list): The entrypoint for the container.
            environment (dict or list): Environment variables to set inside
                the container, as a dictionary or a list of strings in the
                format ``["SOMEVARIABLE=xxx"]``.
            extra_hosts (dict): Additional hostnames to resolve inside the
                container, as a mapping of hostname to IP address.
            group_add (:py:class:`list`): List of additional group names and/or
                IDs that the container process will run as.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy. The dict takes the following keys:
    
                - test (:py:class:`list` or str): Test to perform to determine
                    container health. Possible values:
    
                    - Empty list: Inherit healthcheck from parent image
                    - ``["NONE"]``: Disable healthcheck
                    - ``["CMD", args...]``: exec arguments directly.
                    - ``["CMD-SHELL", command]``: Run command in the system's
                      default shell.
    
                    If a string is provided, it will be used as a ``CMD-SHELL``
                    command.
                - interval (int): The time to wait between checks in
                  nanoseconds. It should be 0 or at least 1000000 (1 ms).
                - timeout (int): The time to wait before considering the check
                  to have hung. It should be 0 or at least 1000000 (1 ms).
                - retries (int): The number of consecutive failures needed to
                    consider a container as unhealthy.
                - start_period (int): Start period for the container to
                    initialize before starting health-retries countdown in
                    nanoseconds. It should be 0 or at least 1000000 (1 ms).
            hostname (str): Optional hostname for the container.
            init (bool): Run an init inside the container that forwards
                signals and reaps processes
            init_path (str): Path to the docker-init binary
            ipc_mode (str): Set the IPC mode for the container.
            isolation (str): Isolation technology to use. Default: `None`.
            kernel_memory (int or str): Kernel memory limit
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            links (dict): Mapping of links using the
                ``{'container': 'alias'}`` format. The alias is optional.
                Containers declared in this dict will be linked to the new
                container using the provided alias. Default: ``None``.
            log_config (LogConfig): Logging configuration.
            lxc_conf (dict): LXC config.
            mac_address (str): MAC address to assign to the container.
            mem_limit (int or str): Memory limit. Accepts float values
                (which represent the memory limit of the created container in
                bytes) or a string with a units identification char
                (``100000b``, ``1000k``, ``128m``, ``1g``). If a string is
                specified without a units character, bytes are assumed as an
                intended unit.
            mem_reservation (int or str): Memory soft limit.
            mem_swappiness (int): Tune a container's memory swappiness
                behavior. Accepts number between 0 and 100.
            memswap_limit (str or int): Maximum amount of memory + swap a
                container is allowed to consume.
            mounts (:py:class:`list`): Specification for mounts to be added to
                the container. More powerful alternative to ``volumes``. Each
                item in the list is expected to be a
                :py:class:`docker.types.Mount` object.
            name (str): The name for this container.
            nano_cpus (int):  CPU quota in units of 1e-9 CPUs.
            network (str): Name of the network this container will be connected
                to at creation time. You can connect to additional networks
                using :py:meth:`Network.connect`. Incompatible with
                ``network_mode``.
            network_disabled (bool): Disable networking.
            network_mode (str): One of:
    
                - ``bridge`` Create a new network stack for the container on
                  the bridge network.
                - ``none`` No networking for this container.
                - ``container:<name|id>`` Reuse another container's network
                  stack.
                - ``host`` Use the host network stack.
                  This mode is incompatible with ``ports``.
    
                Incompatible with ``network``.
            networking_config (Dict[str, EndpointConfig]):
                Dictionary of EndpointConfig objects for each container network.
                The key is the name of the network.
                Defaults to ``None``.
    
                Used in conjuction with ``network``.
    
                Incompatible with ``network_mode``.
            oom_kill_disable (bool): Whether to disable OOM killer.
            oom_score_adj (int): An integer value containing the score given
                to the container in order to tune OOM killer preferences.
            pid_mode (str): If set to ``host``, use the host PID namespace
                inside the container.
            pids_limit (int): Tune a container's pids limit. Set ``-1`` for
                unlimited.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
                Only used if the method needs to pull the requested image.
            ports (dict): Ports to bind inside the container.
    
                The keys of the dictionary are the ports to bind inside the
                container, either as an integer or a string in the form
                ``port/protocol``, where the protocol is either ``tcp``,
                ``udp``, or ``sctp``.
    
                The values of the dictionary are the corresponding ports to
                open on the host, which can be either:
    
                - The port number, as an integer. For example,
                  ``{'2222/tcp': 3333}`` will expose port 2222 inside the
                  container as port 3333 on the host.
                - ``None``, to assign a random host port. For example,
                  ``{'2222/tcp': None}``.
                - A tuple of ``(address, port)`` if you want to specify the
                  host interface. For example,
                  ``{'1111/tcp': ('127.0.0.1', 1111)}``.
                - A list of integers, if you want to bind multiple host ports
                  to a single container port. For example,
                  ``{'1111/tcp': [1234, 4567]}``.
    
                Incompatible with ``host`` network mode.
            privileged (bool): Give extended privileges to this container.
            publish_all_ports (bool): Publish all ports to the host.
            read_only (bool): Mount the container's root filesystem as read
                only.
            remove (bool): Remove the container when it has finished running.
                Default: ``False``.
            restart_policy (dict): Restart the container when it exits.
                Configured as a dictionary with keys:
    
                - ``Name`` One of ``on-failure``, or ``always``.
                - ``MaximumRetryCount`` Number of times to restart the
                  container on failure.
    
                For example:
                ``{"Name": "on-failure", "MaximumRetryCount": 5}``
    
            runtime (str): Runtime to use with this container.
            security_opt (:py:class:`list`): A list of string values to
                customize labels for MLS systems, such as SELinux.
            shm_size (str or int): Size of /dev/shm (e.g. ``1G``).
            stdin_open (bool): Keep ``STDIN`` open even if not attached.
            stdout (bool): Return logs from ``STDOUT`` when ``detach=False``.
                Default: ``True``.
            stderr (bool): Return logs from ``STDERR`` when ``detach=False``.
                Default: ``False``.
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            storage_opt (dict): Storage driver options per container as a
                key-value mapping.
            stream (bool): If true and ``detach`` is false, return a log
                generator instead of a string. Ignored if ``detach`` is true.
                Default: ``False``.
            sysctls (dict): Kernel parameters to set in the container.
            tmpfs (dict): Temporary filesystems to mount, as a dictionary
                mapping a path inside the container to options for that path.
    
                For example:
    
                .. code-block:: python
    
                    {
                        '/mnt/vol2': '',
                        '/mnt/vol1': 'size=3G,uid=1000'
                    }
    
            tty (bool): Allocate a pseudo-TTY.
            ulimits (:py:class:`list`): Ulimits to set inside the container,
                as a list of :py:class:`docker.types.Ulimit` instances.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being built.
            user (str or int): Username or UID to run commands as inside the
                container.
            userns_mode (str): Sets the user namespace mode for the container
                when user namespace remapping option is enabled. Supported
                values are: ``host``
            uts_mode (str): Sets the UTS namespace mode for the container.
                Supported values are: ``host``
            version (str): The version of the API to use. Set to ``auto`` to
                automatically detect the server's version. Default: ``1.35``
            volume_driver (str): The name of a volume driver/plugin.
            volumes (dict or list): A dictionary to configure volumes mounted
                inside the container. The key is either the host path or a
                volume name, and the value is a dictionary with the keys:
    
                - ``bind`` The path to mount the volume inside the container
                - ``mode`` Either ``rw`` to mount the volume read/write, or
                  ``ro`` to mount it read-only.
    
                For example:
    
                .. code-block:: python
    
                    {'/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
                     '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}}
    
                Or a list of strings which each one of its elements specifies a
                mount volume.
    
                For example:
    
                .. code-block:: python
    
                    ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']
    
            volumes_from (:py:class:`list`): List of container names or IDs to
                get volumes from.
            working_dir (str): Path to the working directory.
    
        Returns:
            The container logs, either ``STDOUT``, ``STDERR``, or both,
            depending on the value of the ``stdout`` and ``stderr`` arguments.
    
            ``STDOUT`` and ``STDERR`` may be read only if either ``json-file``
            or ``journald`` logging driver used. Thus, if you are using none of
            these drivers, a ``None`` object is returned instead. See the
            `Engine API documentation
            <https://docs.docker..../engine/api/v1.30/#operation/ContainerLogs/>`_
            for full details.
    
            If ``detach`` is ``True``, a :py:class:`Container` object is
            returned instead.
    
        Raises:
            :py:class:`docker.errors.ContainerError`
                If the container exits with a non-zero exit code and
                ``detach`` is ``False``.
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        stream = kwargs.pop('stream', False)
        detach = kwargs.pop('detach', False)
        platform = kwargs.get('platform', None)
    
        if detach and remove:
            if version_gte(self.client.api._version, '1.25'):
                kwargs["auto_remove"] = True
            else:
                raise RuntimeError("The options 'detach' and 'remove' cannot "
                                   "be used together in api versions < 1.25.")
    
        if kwargs.get('network') and kwargs.get('network_mode'):
            raise RuntimeError(
                'The options "network" and "network_mode" can not be used '
                'together.'
            )
    
        if kwargs.get('networking_config') and not kwargs.get('network'):
            raise RuntimeError(
                'The option "networking_config" can not be used '
                'without "network".'
            )
    
        try:
>           container = self.create(image=image, command=command,
                                    detach=detach, **kwargs)

.venv/lib/python3.14.../docker/models/containers.py:876: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.containers.ContainerCollection object at 0x7fc471915fd0>
image = 'lscr.io/linuxserver/openssh-server:latest', command = None, kwargs = {}
create_kwargs = {'command': None, 'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.96:44665', 'PASSWORD_ACCESS': 'true...st-UNtGypDe6D9c4UGy16d7rnP3tMRIDzaFFoCNe34h', 'PortBindings': {'2222/tcp': [{'HostIp': '', 'HostPort': '2222'}]}}, ...}

    def create(self, image, command=None, **kwargs):
        """
        Create a container without starting it. Similar to ``docker create``.
    
        Takes the same arguments as :py:meth:`run`, except for ``stdout``,
        ``stderr``, and ``remove``.
    
        Returns:
            A :py:class:`Container` object.
    
        Raises:
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        kwargs['image'] = image
        kwargs['command'] = command
        kwargs['version'] = self.client.api._version
        create_kwargs = _create_container_args(kwargs)
>       resp = self.client.api.create_container(**create_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../docker/models/containers.py:935: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
image = 'lscr.io/linuxserver/openssh-server:latest', command = None
hostname = None, user = None, detach = True, stdin_open = False, tty = False
ports = [('2222', 'tcp')]
environment = ['USER_NAME=authentik', 'USER_PASSWORD=UgJ2Z6aIyMblUXSfxMbG3VSa78NLugf5jTB0TU76', 'PASSWORD_ACCESS=true', 'SUDO_ACCESS=true', 'AUTHENTIK_HOST=http://10.1.0.96:44665']
volumes = None, network_disabled = False, name = None, entrypoint = None
working_dir = None, domainname = None
host_config = {'NetworkMode': 'authentik-test-UNtGypDe6D9c4UGy16d7rnP3tMRIDzaFFoCNe34h', 'PortBindings': {'2222/tcp': [{'HostIp': '', 'HostPort': '2222'}]}}
mac_address = None
labels = {'io.goauthentik.test': 'ZpFIXX6tVS97QXQg1CEfxFkKyLbMcEiS5pfz5Gxn'}
stop_signal = None
networking_config = {'authentik-test-UNtGypDe6D9c4UGy16d7rnP3tMRIDzaFFoCNe34h': None}
healthcheck = None, stop_timeout = None, runtime = None, use_config_proxy = True
platform = None

    def create_container(self, image, command=None, hostname=None, user=None,
                         detach=False, stdin_open=False, tty=False, ports=None,
                         environment=None, volumes=None,
                         network_disabled=False, name=None, entrypoint=None,
                         working_dir=None, domainname=None, host_config=None,
                         mac_address=None, labels=None, stop_signal=None,
                         networking_config=None, healthcheck=None,
                         stop_timeout=None, runtime=None,
                         use_config_proxy=True, platform=None):
        """
        Creates a container. Parameters are similar to those for the ``docker
        run`` command except it doesn't support the attach options (``-a``).
    
        The arguments that are passed directly to this function are
        host-independent configuration options. Host-specific configuration
        is passed with the `host_config` argument. You'll normally want to
        use this method in combination with the :py:meth:`create_host_config`
        method to generate ``host_config``.
    
        **Port bindings**
    
        Port binding is done in two parts: first, provide a list of ports to
        open inside the container with the ``ports`` parameter, then declare
        bindings with the ``host_config`` parameter. For example:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', ports=[1111, 2222],
                host_config=client.api.create_host_config(port_bindings={
                    1111: 4567,
                    2222: None
                })
            )
    
    
        You can limit the host address on which the port will be exposed like
        such:
    
        .. code-block:: python
    
            client.api.create_host_config(
                port_bindings={1111: ('127.0.0.1', 4567)}
            )
    
        Or without host port assignment:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={1111: ('127.0.0.1',)})
    
        If you wish to use UDP instead of TCP (default), you need to declare
        ports as such in both the config and host config:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', ports=[(1111, 'udp'), 2222],
                host_config=client.api.create_host_config(port_bindings={
                    '1111/udp': 4567, 2222: None
                })
            )
    
        To bind multiple host ports to a single container port, use the
        following syntax:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={
                1111: [1234, 4567]
            })
    
        You can also bind multiple IPs to a single container port:
    
        .. code-block:: python
    
            client.api.create_host_config(port_bindings={
                1111: [
                    ('192.168.0.100', 1234),
                    ('192.168.0.101', 1234)
                ]
            })
    
        **Using volumes**
    
        Volume declaration is done in two parts. Provide a list of
        paths to use as mountpoints inside the container with the
        ``volumes`` parameter, and declare mappings from paths on the host
        in the ``host_config`` section.
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2'],
                host_config=client.api.create_host_config(binds={
                    '/home/user1/': {
                        'bind': '/mnt/vol2',
                        'mode': 'rw',
                    },
                    '/var/www': {
                        'bind': '/mnt/vol1',
                        'mode': 'ro',
                    },
                    '/autofs/user1': {
                        'bind': '/mnt/vol3',
                        'mode': 'rw',
                        'propagation': 'shared'
                    }
                })
            )
    
        You can alternatively specify binds as a list. This code is equivalent
        to the example above:
    
        .. code-block:: python
    
            container_id = client.api.create_container(
                'busybox', 'ls', volumes=['/mnt/vol1', '/mnt/vol2', '/mnt/vol3'],
                host_config=client.api.create_host_config(binds=[
                    '/home/user1/:/mnt/vol2',
                    '/var/www:/mnt/vol1:ro',
                    '/autofs/user1:/mnt/vol3:rw,shared',
                ])
            )
    
        **Networking**
    
        You can specify networks to connect the container to by using the
        ``networking_config`` parameter. At the time of creation, you can
        only connect a container to a single networking, but you
        can create more connections by using
        :py:meth:`~connect_container_to_network`.
    
        For example:
    
        .. code-block:: python
    
            networking_config = client.api.create_networking_config({
                'network1': client.api.create_endpoint_config(
                    ipv4_address='172.28.0.124',
                    aliases=['foo', 'bar'],
                    links=['container2']
                )
            })
    
            ctnr = client.api.create_container(
                img, command, networking_config=networking_config
            )
    
        Args:
            image (str): The image to run
            command (str or list): The command to be run in the container
            hostname (str): Optional hostname for the container
            user (str or int): Username or UID
            detach (bool): Detached mode: run container in the background and
                return container ID
            stdin_open (bool): Keep STDIN open even if not attached
            tty (bool): Allocate a pseudo-TTY
            ports (list of ints): A list of port numbers
            environment (dict or list): A dictionary or a list of strings in
                the following format ``["PASSWORD=xxx"]`` or
                ``{"PASSWORD": "xxx"}``.
            volumes (str or list): List of paths inside the container to use
                as volumes.
            network_disabled (bool): Disable networking
            name (str): A name for the container
            entrypoint (str or list): An entrypoint
            working_dir (str): Path to the working directory
            domainname (str): The domain name to use for the container
            host_config (dict): A dictionary created with
                :py:meth:`create_host_config`.
            mac_address (str): The Mac Address to assign the container
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            stop_timeout (int): Timeout to stop the container, in seconds.
                Default: 10
            networking_config (dict): A networking configuration generated
                by :py:meth:`create_networking_config`.
            runtime (str): Runtime to use with this container.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being created.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
    
        Returns:
            A dictionary with an image 'Id' key and a 'Warnings' key.
    
        Raises:
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(volumes, str):
            volumes = [volumes, ]
    
        if isinstance(environment, dict):
            environment = utils.utils.format_environment(environment)
    
        if use_config_proxy:
            environment = self._proxy_configs.inject_proxy_environment(
                environment
            ) or None
    
        config = self.create_container_config(
            image, command, hostname, user, detach, stdin_open, tty,
            ports, environment, volumes,
            network_disabled, entrypoint, working_dir, domainname,
            host_config, mac_address, labels,
            stop_signal, networking_config, healthcheck,
            stop_timeout, runtime
        )
>       return self.create_container_from_config(config, name, platform)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../docker/api/container.py:440: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
config = {'Hostname': None, 'Domainname': None, 'ExposedPorts': {'2222/tcp': {}}, 'User': None, 'Tty': False, 'OpenStdin': Fals...IXX6tVS97QXQg1CEfxFkKyLbMcEiS5pfz5Gxn'}, 'StopSignal': None, 'Healthcheck': None, 'StopTimeout': None, 'Runtime': None}
name = None, platform = None

    def create_container_from_config(self, config, name=None, platform=None):
        u = self._url("/containers/create")
        params = {
            'name': name
        }
        if platform:
            if utils.version_lt(self._version, '1.41'):
                raise errors.InvalidVersion(
                    'platform is not supported for API version < 1.41'
                )
            params['platform'] = platform
        res = self._post_json(u, data=config, params=params)
>       return self._result(res, True)
               ^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../docker/api/container.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
response = <Response [404]>, json = True, binary = False

    def _result(self, response, json=False, binary=False):
        assert not (json and binary)
>       self._raise_for_status(response)

.venv/lib/python3.14.../docker/api/client.py:281: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
response = <Response [404]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
>           raise create_api_error_from_http_exception(e) from e
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../docker/api/client.py:277: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

e = HTTPError('404 Client Error: Not Found for url: http+docker:.../localhost/v1.48/containers/create')

    def create_api_error_from_http_exception(e):
        """
        Create a suitable APIError from requests.exceptions.HTTPError.
        """
        response = e.response
        try:
            explanation = response.json()['message']
        except ValueError:
            explanation = (response.text or '').strip()
        cls = APIError
        if response.status_code == 404:
            explanation_msg = (explanation or '').lower()
            if any(fragment in explanation_msg
                   for fragment in _image_not_found_explanation_fragments):
                cls = ImageNotFound
            else:
                cls = NotFound
>       raise cls(e, response=response, explanation=explanation) from e
E       docker.errors.ImageNotFound: 404 Client Error for http+docker:.../localhost/v1.48/containers/create: Not Found ("No such image: lscr.io/linuxserver/openssh-server:latest")

.venv/lib/python3.14....../site-packages/docker/errors.py:39: ImageNotFound

During handling of the above exception, another exception occurred:

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
response = <Response [500]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
>           response.raise_for_status()

.venv/lib/python3.14.../docker/api/client.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Response [500]>

    def raise_for_status(self):
        """Raises :class:`HTTPError`, if one occurred."""
    
        http_error_msg = ""
        if isinstance(self.reason, bytes):
            # We attempt to decode utf-8 first because some servers
            # choose to localize their reason strings. If the string
            # isn't utf-8, we fall back to iso-8859-1 for all other
            # encodings. (See PR #3538)
            try:
                reason = self.reason.decode("utf-8")
            except UnicodeDecodeError:
                reason = self.reason.decode("iso-8859-1")
        else:
            reason = self.reason
    
        if 400 <= self.status_code < 500:
            http_error_msg = (
                f"{self.status_code} Client Error: {reason} for url: {self.url}"
            )
    
        elif 500 <= self.status_code < 600:
            http_error_msg = (
                f"{self.status_code} Server Error: {reason} for url: {self.url}"
            )
    
        if http_error_msg:
>           raise HTTPError(http_error_msg, response=self)
E           requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http+docker:.../localhost/v1.48/images/create?tag=latest&fromImage=lscr.io%2Flinuxserver%2Fopenssh-server

.venv/lib/python3.14....../site-packages/requests/models.py:1028: HTTPError

The above exception was the direct cause of the following exception:

self = <unittest.case._Outcome object at 0x7fc47c33a3c0>
test_case = <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>
result = <TestCaseFunction test_rac_ssh>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:669: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>
method = <bound method TestProviderRAC.test_rac_ssh of <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>>

    def _callTestMethod(self, method):
>       result = method()
                 ^^^^^^^^

.../hostedtoolcache/Python/3.14.3........./x64/lib/python3.14/unittest/case.py:615: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/decorators.py:60: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>,)
kwargs = {}, file = 'default/flow-default-provider-invalidation.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Provider invalidation flow\nentries:\n- attrs:\n    designation: invalidatio...ation: none\n  identifiers:\n    slug: default-provider-invalidation-flow\n  model: authentik_flows.flow\n  id: flow\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>,)
kwargs = {}, file = 'system/providers-rac.yaml'
content = 'version: 1\nmetadata:\n  labels:\n    blueprints.goauthentik.io/system: "true"\n  name: System - RAC Provider - Mappi...uthentik default RAC Mapping: SSH Default settings"\n      static_settings:\n        terminal-type: "xterm-256color"\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>,)
kwargs = {}, config = <AuthentikCryptoConfig: authentik_crypto>

    @wraps(func)
    def wrapper(*args, **kwargs):
        config = apps.get_app_config(app_name)
        if isinstance(config, ManagedAppConfig):
            config._on_startup_callback(None)
>       return func(*args, **kwargs)
               ^^^^^^^^^^^^^^^^^^^^^

.../blueprints/tests/__init__.py:43: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>

    @retry()
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    @apply_blueprint(
        "default/flow-default-provider-authorization-implicit-consent.yaml",
        "default/flow-default-provider-invalidation.yaml",
    )
    @apply_blueprint(
        "system/providers-rac.yaml",
    )
    @reconcile_app("authentik_crypto")
    def test_rac_ssh(self):
        """Test SSH RAC"""
>       test_ssh = self.run_container(
            image="lscr.io/linuxserver/openssh-server:latest",
            ports={
                "2222": "2222",
            },
            environment={
                "USER_NAME": "authentik",
                "USER_PASSWORD": self.password,
                "PASSWORD_ACCESS": "true",
                "SUDO_ACCESS": "true",
            },
        )

tests/e2e/test_provider_rac.py:49: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_rac.TestProviderRAC testMethod=test_rac_ssh>
specs = {'detach': True, 'environment': {'AUTHENTIK_HOST': 'http://10.1.0.96:44665', 'PASSWORD_ACCESS': 'true', 'SUDO_ACCESS':...linuxserver/openssh-server:latest', 'labels': {'io.goauthentik.test': 'ZpFIXX6tVS97QXQg1CEfxFkKyLbMcEiS5pfz5Gxn'}, ...}

    def run_container(self, **specs: Any) -> Container:
        if "network_mode" not in specs:
            specs["network"] = self.__network.name
        specs["labels"] = self.docker_labels
        specs["detach"] = True
        if hasattr(self, "live_server_url"):
            specs.setdefault("environment", {})
            specs["environment"]["AUTHENTIK_HOST"] = self.live_server_url
>       container: Container = self.docker_client.containers.run(**specs)
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

tests/docker.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.containers.ContainerCollection object at 0x7fc471915fd0>
image = 'lscr.io/linuxserver/openssh-server:latest', command = None
stdout = True, stderr = False, remove = False
kwargs = {'environment': {'AUTHENTIK_HOST': 'http://10.1.0.96:44665', 'PASSWORD_ACCESS': 'true', 'SUDO_ACCESS': 'true', 'USER_N...FkKyLbMcEiS5pfz5Gxn'}, 'network': 'authentik-test-UNtGypDe6D9c4UGy16d7rnP3tMRIDzaFFoCNe34h', 'ports': {'2222': '2222'}}
stream = False, detach = True, platform = None

    def run(self, image, command=None, stdout=True, stderr=False,
            remove=False, **kwargs):
        """
        Run a container. By default, it will wait for the container to finish
        and return its logs, similar to ``docker run``.
    
        If the ``detach`` argument is ``True``, it will start the container
        and immediately return a :py:class:`Container` object, similar to
        ``docker run -d``.
    
        Example:
            Run a container and get its output:
    
            >>> import docker
            >>> client = docker.from_env()
            >>> client.containers.run('alpine', 'echo hello world')
            b'hello world\\n'
    
            Run a container and detach:
    
            >>> container = client.containers.run('bfirsh/reticulate-splines',
                                                  detach=True)
            >>> container.logs()
            'Reticulating spline 1...\\nReticulating spline 2...\\n'
    
        Args:
            image (str): The image to run.
            command (str or list): The command to run in the container.
            auto_remove (bool): enable auto-removal of the container on daemon
                side when the container's process exits.
            blkio_weight_device: Block IO weight (relative device weight) in
                the form of: ``[{"Path": "device_path", "Weight": weight}]``.
            blkio_weight: Block IO weight (relative weight), accepts a weight
                value between 10 and 1000.
            cap_add (list of str): Add kernel capabilities. For example,
                ``["SYS_ADMIN", "MKNOD"]``.
            cap_drop (list of str): Drop kernel capabilities.
            cgroup_parent (str): Override the default parent cgroup.
            cgroupns (str): Override the default cgroup namespace mode for the
                container. One of:
                - ``private`` the container runs in its own private cgroup
                  namespace.
                - ``host`` use the host system's cgroup namespace.
            cpu_count (int): Number of usable CPUs (Windows only).
            cpu_percent (int): Usable percentage of the available CPUs
                (Windows only).
            cpu_period (int): The length of a CPU period in microseconds.
            cpu_quota (int): Microseconds of CPU time that the container can
                get in a CPU period.
            cpu_rt_period (int): Limit CPU real-time period in microseconds.
            cpu_rt_runtime (int): Limit CPU real-time runtime in microseconds.
            cpu_shares (int): CPU shares (relative weight).
            cpuset_cpus (str): CPUs in which to allow execution (``0-3``,
                ``0,1``).
            cpuset_mems (str): Memory nodes (MEMs) in which to allow execution
                (``0-3``, ``0,1``). Only effective on NUMA systems.
            detach (bool): Run container in the background and return a
                :py:class:`Container` object.
            device_cgroup_rules (:py:class:`list`): A list of cgroup rules to
                apply to the container.
            device_read_bps: Limit read rate (bytes per second) from a device
                in the form of: `[{"Path": "device_path", "Rate": rate}]`
            device_read_iops: Limit read rate (IO per second) from a device.
            device_write_bps: Limit write rate (bytes per second) from a
                device.
            device_write_iops: Limit write rate (IO per second) from a device.
            devices (:py:class:`list`): Expose host devices to the container,
                as a list of strings in the form
                ``<path_on_host>:<path_in_container>:<cgroup_permissions>``.
    
                For example, ``/dev/sda:/dev/xvda:rwm`` allows the container
                to have read-write access to the host's ``/dev/sda`` via a
                node named ``/dev/xvda`` inside the container.
            device_requests (:py:class:`list`): Expose host resources such as
                GPUs to the container, as a list of
                :py:class:`docker.types.DeviceRequest` instances.
            dns (:py:class:`list`): Set custom DNS servers.
            dns_opt (:py:class:`list`): Additional options to be added to the
                container's ``resolv.conf`` file.
            dns_search (:py:class:`list`): DNS search domains.
            domainname (str or list): Set custom DNS search domains.
            entrypoint (str or list): The entrypoint for the container.
            environment (dict or list): Environment variables to set inside
                the container, as a dictionary or a list of strings in the
                format ``["SOMEVARIABLE=xxx"]``.
            extra_hosts (dict): Additional hostnames to resolve inside the
                container, as a mapping of hostname to IP address.
            group_add (:py:class:`list`): List of additional group names and/or
                IDs that the container process will run as.
            healthcheck (dict): Specify a test to perform to check that the
                container is healthy. The dict takes the following keys:
    
                - test (:py:class:`list` or str): Test to perform to determine
                    container health. Possible values:
    
                    - Empty list: Inherit healthcheck from parent image
                    - ``["NONE"]``: Disable healthcheck
                    - ``["CMD", args...]``: exec arguments directly.
                    - ``["CMD-SHELL", command]``: Run command in the system's
                      default shell.
    
                    If a string is provided, it will be used as a ``CMD-SHELL``
                    command.
                - interval (int): The time to wait between checks in
                  nanoseconds. It should be 0 or at least 1000000 (1 ms).
                - timeout (int): The time to wait before considering the check
                  to have hung. It should be 0 or at least 1000000 (1 ms).
                - retries (int): The number of consecutive failures needed to
                    consider a container as unhealthy.
                - start_period (int): Start period for the container to
                    initialize before starting health-retries countdown in
                    nanoseconds. It should be 0 or at least 1000000 (1 ms).
            hostname (str): Optional hostname for the container.
            init (bool): Run an init inside the container that forwards
                signals and reaps processes
            init_path (str): Path to the docker-init binary
            ipc_mode (str): Set the IPC mode for the container.
            isolation (str): Isolation technology to use. Default: `None`.
            kernel_memory (int or str): Kernel memory limit
            labels (dict or list): A dictionary of name-value labels (e.g.
                ``{"label1": "value1", "label2": "value2"}``) or a list of
                names of labels to set with empty values (e.g.
                ``["label1", "label2"]``)
            links (dict): Mapping of links using the
                ``{'container': 'alias'}`` format. The alias is optional.
                Containers declared in this dict will be linked to the new
                container using the provided alias. Default: ``None``.
            log_config (LogConfig): Logging configuration.
            lxc_conf (dict): LXC config.
            mac_address (str): MAC address to assign to the container.
            mem_limit (int or str): Memory limit. Accepts float values
                (which represent the memory limit of the created container in
                bytes) or a string with a units identification char
                (``100000b``, ``1000k``, ``128m``, ``1g``). If a string is
                specified without a units character, bytes are assumed as an
                intended unit.
            mem_reservation (int or str): Memory soft limit.
            mem_swappiness (int): Tune a container's memory swappiness
                behavior. Accepts number between 0 and 100.
            memswap_limit (str or int): Maximum amount of memory + swap a
                container is allowed to consume.
            mounts (:py:class:`list`): Specification for mounts to be added to
                the container. More powerful alternative to ``volumes``. Each
                item in the list is expected to be a
                :py:class:`docker.types.Mount` object.
            name (str): The name for this container.
            nano_cpus (int):  CPU quota in units of 1e-9 CPUs.
            network (str): Name of the network this container will be connected
                to at creation time. You can connect to additional networks
                using :py:meth:`Network.connect`. Incompatible with
                ``network_mode``.
            network_disabled (bool): Disable networking.
            network_mode (str): One of:
    
                - ``bridge`` Create a new network stack for the container on
                  the bridge network.
                - ``none`` No networking for this container.
                - ``container:<name|id>`` Reuse another container's network
                  stack.
                - ``host`` Use the host network stack.
                  This mode is incompatible with ``ports``.
    
                Incompatible with ``network``.
            networking_config (Dict[str, EndpointConfig]):
                Dictionary of EndpointConfig objects for each container network.
                The key is the name of the network.
                Defaults to ``None``.
    
                Used in conjuction with ``network``.
    
                Incompatible with ``network_mode``.
            oom_kill_disable (bool): Whether to disable OOM killer.
            oom_score_adj (int): An integer value containing the score given
                to the container in order to tune OOM killer preferences.
            pid_mode (str): If set to ``host``, use the host PID namespace
                inside the container.
            pids_limit (int): Tune a container's pids limit. Set ``-1`` for
                unlimited.
            platform (str): Platform in the format ``os[/arch[/variant]]``.
                Only used if the method needs to pull the requested image.
            ports (dict): Ports to bind inside the container.
    
                The keys of the dictionary are the ports to bind inside the
                container, either as an integer or a string in the form
                ``port/protocol``, where the protocol is either ``tcp``,
                ``udp``, or ``sctp``.
    
                The values of the dictionary are the corresponding ports to
                open on the host, which can be either:
    
                - The port number, as an integer. For example,
                  ``{'2222/tcp': 3333}`` will expose port 2222 inside the
                  container as port 3333 on the host.
                - ``None``, to assign a random host port. For example,
                  ``{'2222/tcp': None}``.
                - A tuple of ``(address, port)`` if you want to specify the
                  host interface. For example,
                  ``{'1111/tcp': ('127.0.0.1', 1111)}``.
                - A list of integers, if you want to bind multiple host ports
                  to a single container port. For example,
                  ``{'1111/tcp': [1234, 4567]}``.
    
                Incompatible with ``host`` network mode.
            privileged (bool): Give extended privileges to this container.
            publish_all_ports (bool): Publish all ports to the host.
            read_only (bool): Mount the container's root filesystem as read
                only.
            remove (bool): Remove the container when it has finished running.
                Default: ``False``.
            restart_policy (dict): Restart the container when it exits.
                Configured as a dictionary with keys:
    
                - ``Name`` One of ``on-failure``, or ``always``.
                - ``MaximumRetryCount`` Number of times to restart the
                  container on failure.
    
                For example:
                ``{"Name": "on-failure", "MaximumRetryCount": 5}``
    
            runtime (str): Runtime to use with this container.
            security_opt (:py:class:`list`): A list of string values to
                customize labels for MLS systems, such as SELinux.
            shm_size (str or int): Size of /dev/shm (e.g. ``1G``).
            stdin_open (bool): Keep ``STDIN`` open even if not attached.
            stdout (bool): Return logs from ``STDOUT`` when ``detach=False``.
                Default: ``True``.
            stderr (bool): Return logs from ``STDERR`` when ``detach=False``.
                Default: ``False``.
            stop_signal (str): The stop signal to use to stop the container
                (e.g. ``SIGINT``).
            storage_opt (dict): Storage driver options per container as a
                key-value mapping.
            stream (bool): If true and ``detach`` is false, return a log
                generator instead of a string. Ignored if ``detach`` is true.
                Default: ``False``.
            sysctls (dict): Kernel parameters to set in the container.
            tmpfs (dict): Temporary filesystems to mount, as a dictionary
                mapping a path inside the container to options for that path.
    
                For example:
    
                .. code-block:: python
    
                    {
                        '/mnt/vol2': '',
                        '/mnt/vol1': 'size=3G,uid=1000'
                    }
    
            tty (bool): Allocate a pseudo-TTY.
            ulimits (:py:class:`list`): Ulimits to set inside the container,
                as a list of :py:class:`docker.types.Ulimit` instances.
            use_config_proxy (bool): If ``True``, and if the docker client
                configuration file (``~/.docker/config.json`` by default)
                contains a proxy configuration, the corresponding environment
                variables will be set in the container being built.
            user (str or int): Username or UID to run commands as inside the
                container.
            userns_mode (str): Sets the user namespace mode for the container
                when user namespace remapping option is enabled. Supported
                values are: ``host``
            uts_mode (str): Sets the UTS namespace mode for the container.
                Supported values are: ``host``
            version (str): The version of the API to use. Set to ``auto`` to
                automatically detect the server's version. Default: ``1.35``
            volume_driver (str): The name of a volume driver/plugin.
            volumes (dict or list): A dictionary to configure volumes mounted
                inside the container. The key is either the host path or a
                volume name, and the value is a dictionary with the keys:
    
                - ``bind`` The path to mount the volume inside the container
                - ``mode`` Either ``rw`` to mount the volume read/write, or
                  ``ro`` to mount it read-only.
    
                For example:
    
                .. code-block:: python
    
                    {'/home/user1/': {'bind': '/mnt/vol2', 'mode': 'rw'},
                     '/var/www': {'bind': '/mnt/vol1', 'mode': 'ro'}}
    
                Or a list of strings which each one of its elements specifies a
                mount volume.
    
                For example:
    
                .. code-block:: python
    
                    ['/home/user1/:/mnt/vol2','/var/www:/mnt/vol1']
    
            volumes_from (:py:class:`list`): List of container names or IDs to
                get volumes from.
            working_dir (str): Path to the working directory.
    
        Returns:
            The container logs, either ``STDOUT``, ``STDERR``, or both,
            depending on the value of the ``stdout`` and ``stderr`` arguments.
    
            ``STDOUT`` and ``STDERR`` may be read only if either ``json-file``
            or ``journald`` logging driver used. Thus, if you are using none of
            these drivers, a ``None`` object is returned instead. See the
            `Engine API documentation
            <https://docs.docker..../engine/api/v1.30/#operation/ContainerLogs/>`_
            for full details.
    
            If ``detach`` is ``True``, a :py:class:`Container` object is
            returned instead.
    
        Raises:
            :py:class:`docker.errors.ContainerError`
                If the container exits with a non-zero exit code and
                ``detach`` is ``False``.
            :py:class:`docker.errors.ImageNotFound`
                If the specified image does not exist.
            :py:class:`docker.errors.APIError`
                If the server returns an error.
        """
        if isinstance(image, Image):
            image = image.id
        stream = kwargs.pop('stream', False)
        detach = kwargs.pop('detach', False)
        platform = kwargs.get('platform', None)
    
        if detach and remove:
            if version_gte(self.client.api._version, '1.25'):
                kwargs["auto_remove"] = True
            else:
                raise RuntimeError("The options 'detach' and 'remove' cannot "
                                   "be used together in api versions < 1.25.")
    
        if kwargs.get('network') and kwargs.get('network_mode'):
            raise RuntimeError(
                'The options "network" and "network_mode" can not be used '
                'together.'
            )
    
        if kwargs.get('networking_config') and not kwargs.get('network'):
            raise RuntimeError(
                'The option "networking_config" can not be used '
                'without "network".'
            )
    
        try:
            container = self.create(image=image, command=command,
                                    detach=detach, **kwargs)
        except ImageNotFound:
>           self.client.images.pull(image, platform=platform)

.venv/lib/python3.14.../docker/models/containers.py:879: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.models.images.ImageCollection object at 0x7fc471916120>
repository = 'lscr.io/linuxserver/openssh-server', tag = 'latest'
all_tags = False, kwargs = {'platform': None}, image_tag = 'latest'

    def pull(self, repository, tag=None, all_tags=False, **kwargs):
        """
        Pull an image of the given name and return it. Similar to the
        ``docker pull`` command.
        If ``tag`` is ``None`` or empty, it is set to ``latest``.
        If ``all_tags`` is set, the ``tag`` parameter is ignored and all image
        tags will be pulled.
    
        If you want to get the raw pull output, use the
        :py:meth:`~docker.api.image.ImageApiMixin.pull` method in the
        low-level API.
    
        Args:
            repository (str): The repository to pull
            tag (str): The tag to pull
            auth_config (dict): Override the credentials that are found in the
                config for this request.  ``auth_config`` should contain the
                ``username`` and ``password`` keys to be valid.
            platform (str): Platform in the format ``os[/arch[/variant]]``
            all_tags (bool): Pull all image tags
    
        Returns:
            (:py:class:`Image` or list): The image that has been pulled.
                If ``all_tags`` is True, the method will return a list
                of :py:class:`Image` objects belonging to this repository.
    
        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
    
        Example:
    
            >>> # Pull the image tagged `latest` in the busybox repo
            >>> image = client.images.pull('busybox')
    
            >>> # Pull all tags in the busybox repo
            >>> images = client.images.pull('busybox', all_tags=True)
        """
        repository, image_tag = parse_repository_tag(repository)
        tag = tag or image_tag or 'latest'
    
        if 'stream' in kwargs:
            warnings.warn(
                '`stream` is not a valid parameter for this method'
                ' and will be overridden',
                stacklevel=1,
            )
            del kwargs['stream']
    
>       pull_log = self.client.api.pull(
            repository, tag=tag, stream=True, all_tags=all_tags, **kwargs
        )

.venv/lib/python3.14.../docker/models/images.py:464: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
repository = 'lscr.io/linuxserver/openssh-server', tag = 'latest', stream = True
auth_config = None, decode = False, platform = None, all_tags = False

    def pull(self, repository, tag=None, stream=False, auth_config=None,
             decode=False, platform=None, all_tags=False):
        """
        Pulls an image. Similar to the ``docker pull`` command.
    
        Args:
            repository (str): The repository to pull
            tag (str): The tag to pull. If ``tag`` is ``None`` or empty, it
                is set to ``latest``.
            stream (bool): Stream the output as a generator. Make sure to
                consume the generator, otherwise pull might get cancelled.
            auth_config (dict): Override the credentials that are found in the
                config for this request.  ``auth_config`` should contain the
                ``username`` and ``password`` keys to be valid.
            decode (bool): Decode the JSON data from the server into dicts.
                Only applies with ``stream=True``
            platform (str): Platform in the format ``os[/arch[/variant]]``
            all_tags (bool): Pull all image tags, the ``tag`` parameter is
                ignored.
    
        Returns:
            (generator or str): The output
    
        Raises:
            :py:class:`docker.errors.APIError`
                If the server returns an error.
    
        Example:
    
            >>> resp = client.api.pull('busybox', stream=True, decode=True)
            ... for line in resp:
            ...     print(json.dumps(line, indent=4))
            {
                "status": "Pulling image (latest) from busybox",
                "progressDetail": {},
                "id": "e72ac664f4f0"
            }
            {
                "status": "Pulling image (latest) from busybox, endpoint: ...",
                "progressDetail": {},
                "id": "e72ac664f4f0"
            }
    
        """
        repository, image_tag = utils.parse_repository_tag(repository)
        tag = tag or image_tag or 'latest'
    
        if all_tags:
            tag = None
    
        registry, repo_name = auth.resolve_repository_name(repository)
    
        params = {
            'tag': tag,
            'fromImage': repository
        }
        headers = {}
    
        if auth_config is None:
            header = auth.get_config_header(self, registry)
            if header:
                headers['X-Registry-Auth'] = header
        else:
            log.debug('Sending supplied auth config')
            headers['X-Registry-Auth'] = auth.encode_header(auth_config)
    
        if platform is not None:
            if utils.version_lt(self._version, '1.32'):
                raise errors.InvalidVersion(
                    'platform was only introduced in API version 1.32'
                )
            params['platform'] = platform
    
        response = self._post(
            self._url('/images/create'), params=params, headers=headers,
            stream=stream, timeout=None
        )
    
>       self._raise_for_status(response)

.venv/lib/python3.14.../docker/api/image.py:429: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <docker.api.client.APIClient object at 0x7fc47cb62cf0>
response = <Response [500]>

    def _raise_for_status(self, response):
        """Raises stored :class:`APIError`, if one occurred."""
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
>           raise create_api_error_from_http_exception(e) from e
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.venv/lib/python3.14.../docker/api/client.py:277: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

e = HTTPError('500 Server Error: Internal Server Error for url: http+docker:.../localhost/v1.48/images/create?tag=latest&fromImage=lscr.io%2Flinuxserver%2Fopenssh-server')

    def create_api_error_from_http_exception(e):
        """
        Create a suitable APIError from requests.exceptions.HTTPError.
        """
        response = e.response
        try:
            explanation = response.json()['message']
        except ValueError:
            explanation = (response.text or '').strip()
        cls = APIError
        if response.status_code == 404:
            explanation_msg = (explanation or '').lower()
            if any(fragment in explanation_msg
                   for fragment in _image_not_found_explanation_fragments):
                cls = ImageNotFound
            else:
                cls = NotFound
>       raise cls(e, response=response, explanation=explanation) from e
E       docker.errors.APIError: 500 Server Error for http+docker:.../localhost/v1.48/images/create?tag=latest&fromImage=lscr.io%2Flinuxserver%2Fopenssh-server: Internal Server Error ("Head "https://ghcr..../openssh-server/manifests/latest": dial tcp 140.82.114.33:443: i/o timeout")

.venv/lib/python3.14....../site-packages/docker/errors.py:39: APIError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant