Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/dot_ext/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from datetime import timedelta

# REGEX of paths that should be updated with auth flow info in hhs_oauth_server.request_logging.py
AUTH_FLOW_REQUEST_LOGGING_PATHS_REGEX = ('(^/v[1|2|3]/o/authorize/.*'
Expand Down Expand Up @@ -581,6 +582,8 @@
' the support team for the application you are trying to access.'
)

CLIENT_CREDENTIALS = 'client_credentials'

AUTH_CODE_TYPE = 'AUTH_CODE'
CLIENT_CREDENTIALS_TYPE = 'CLIENT_CREDENTIALS'
AUTH_CODE_AND_CLIENT_CREDENTIALS_TYPE = 'AUTH_CODE_AND_CLIENT_CREDS'
Expand All @@ -589,6 +592,9 @@
AUTH_CODE_SUPPORTED_TYPES = [AUTH_CODE_TYPE, AUTH_CODE_AND_CLIENT_CREDENTIALS_TYPE]
JWKS_URI_CAN_NOT_BE_NULL_ALLOWED_AUTH_TYPES = ['CLIENT_CREDENTIALS', 'AUTH_CODE_AND_CLIENT_CREDS']

CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME = timedelta(minutes=30)
CLIENT_CREDENTIALS_REFRESH_WINDOW = timedelta(hours=24)

YYYY_MM_DD_REGEX = r'^\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])$'

CC_SYSTEM_CODING_SYSTEM = 'http://terminology.hl7.org/CodeSystem/v2-0203'
Expand Down
37 changes: 36 additions & 1 deletion apps/dot_ext/tests/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import uuid
from waffle.testutils import override_switch
from apps.fhir.bluebutton.models import Crosswalk
from apps.constants import CODE_CHALLENGE_METHOD_S256
from apps.constants import CLIENT_CREDENTIALS, CODE_CHALLENGE_METHOD_S256
from apps.authorization.models import DataAccessGrant, ArchivedDataAccessGrant
from apps.dot_ext.models import Application, ArchivedToken
from apps.dot_ext.views import AuthorizationView, TokenView
Expand Down Expand Up @@ -238,6 +238,41 @@ def test_post_with_invalid_non_standard_scheme_granttype_authcode_clienttype_con
response = self.client.post(reverse('oauth2_provider:authorize'), data=payload)
self.assertEqual(response.status_code, 400)

def test_authorize_rejects_authorization_code_when_app_only_allows_client_credentials(self):
redirect_uri = 'http://localhost'
self._create_user('anna', '123456')
capability_a = self._create_capability('Capability A', [])
capability_b = self._create_capability('Capability B', [])
application = self._create_application(
'an app',
grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
redirect_uris=redirect_uri)
application.scope.add(capability_a, capability_b)
application.jwks_uri = 'https://test.com'
application.allowed_auth_type = CLIENT_CREDENTIALS.upper()
application.save()

request = HttpRequest()
self.client.login(request=request, username='anna', password='123456')

payload = {
'client_id': application.client_id,
'response_type': 'code',
'redirect_uri': redirect_uri,
'scope': ['capability-a'],
'expires_in': 86400,
'allow': True,
'state': '0123456789abcdef',
}
response = self.client.post(reverse('oauth2_provider:authorize'), data=payload)

self.assertEqual(response.status_code, HTTPStatus.FOUND)
query_dict = parse_qs(urlparse(response['Location']).query)
self.assertEqual(query_dict.get('error'), ['unauthorized_client'])
self.assertEqual(query_dict.get('state'), ['0123456789abcdef'])
self.assertNotIn('code', query_dict)

@override_switch('v3_endpoints', active=True)
def test_refresh_token(self):
redirect_uri = 'http://localhost'
Expand Down
176 changes: 137 additions & 39 deletions apps/dot_ext/tests/test_authorization_token.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
import json
from datetime import timedelta
from oauthlib.oauth2.rfc6749.errors import InvalidRequestError
from oauthlib.oauth2.rfc6749.errors import InvalidGrantError
from oauth2_provider.models import get_access_token_model, get_refresh_token_model
from django.http import HttpRequest
from django.utils import timezone
from unittest.mock import MagicMock
from unittest import skipIf
from urllib.parse import parse_qs, urlencode, urlparse
from urllib.parse import urlencode
from waffle.testutils import override_switch
from apps.constants import CLIENT_CREDENTIALS, CODE_CHALLENGE_METHOD_S256, TEST_APP_CLIENT_ID, TEST_APP_CLIENT_SECRET
from apps.constants import (
CLIENT_CREDENTIALS,
TEST_APP_CLIENT_ID,
TEST_APP_CLIENT_SECRET,
USER_TYPE_ALIGNED_NETWORKS_BENEFICIARY,
)
from apps.accounts.models import UserProfile
from apps.dot_ext.constants import (
APPLICATION_DOES_NOT_HAVE_CLIENT_CREDENTIALS_ENABLED,
APPLICATION_HAS_CLIENT_CREDENTIALS_ENABLED_NON_CLIENT_CREDENTIALS_AUTH_CALL_MADE,
CLIENT_ASSERTION_TYPE_VALUE,
CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME,
CLIENT_CREDENTIALS_REFRESH_WINDOW,
CLIENT_CREDENTIALS_TYPE,
AUTH_CODE_TYPE,
)
Expand Down Expand Up @@ -86,10 +96,7 @@ def test_client_credentials(self):

@override_switch('v3_endpoints', active=True)
def test_authorization_code_grant_type_when_app_is_only_allowed_client_credentials(self):
"""Purpose of this test is to show that if a call is made to the token endpoint, and the app has
allowed_auth_type of CLIENT_CREDENTIALS, and the grant_type is not client_credentials, that a 403 error
with a specific message will be returned
"""
"""Ensure the token endpoint rejects non-client_credentials requests for client_credentials-only apps."""
redirect_uri = 'com.custom.bluebutton://example.it'
# create a user
self._create_user('anna', '123456')
Expand All @@ -105,40 +112,9 @@ def test_authorization_code_grant_type_when_app_is_only_allowed_client_credentia
application.jwks_uri = 'https://test.com'
application.allowed_auth_type = AUTH_CODE_TYPE # need to set this to be able to generate auth code initially
application.save()
# user logs in
request = HttpRequest()
self.client.login(request=request, username='anna', password='123456')

code_challenge = "sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo"

payload = {
'client_id': application.client_id,
'response_type': 'code',
'redirect_uri': redirect_uri,
'code_challenge': code_challenge,
'code_challenge_method': CODE_CHALLENGE_METHOD_S256,
}
response = self.client.get('/v3/o/authorize', data=payload)
# post the authorization form with only one scope selected
payload = {
'client_id': application.client_id,
'response_type': 'code',
'redirect_uri': redirect_uri,
'scope': ['capability-a'],
'expires_in': 86400,
'allow': True,
"state": "0123456789abcdef",
'code_challenge': code_challenge,
'code_challenge_method': CODE_CHALLENGE_METHOD_S256,
}
response = self.client.post(response['Location'], data=payload)

self.assertEqual(response.status_code, HTTPStatus.FOUND)
query_dict = parse_qs(urlparse(response['Location']).query)
authorization_code = query_dict.pop('code')
token_request_data = {
'grant_type': 'authorization_code',
'code': authorization_code,
'code': 'dummy-auth-code',
'redirect_uri': redirect_uri,
'client_id': application.client_id,
}
Expand Down Expand Up @@ -200,3 +176,125 @@ def test_validate_client_credentials_request(self) -> None:

result = view_instance._validate_client_credentials_request(mock_request)
assert result is None

def test_apply_client_credentials_access_token_lifetime(self) -> None:
view_instance = TokenView()

application = self._create_application(
'cc app lifetime',
grant_type=Application.GRANT_CLIENT_CREDENTIALS,
client_type=Application.CLIENT_CONFIDENTIAL,
redirect_uris='http://localhost',
)
user = self._create_user('cc_lifetime_user', 'pw')

token = AccessToken.objects.create(
user=user,
token='cc-lifetime-token',
application=application,
expires=timezone.now() + timedelta(days=1),
)

body = {'expires_in': 36000}
view_instance._apply_client_credentials_access_token_lifetime(token, body)
token.refresh_from_db()

self.assertEqual(body['expires_in'], int(CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME.total_seconds()))
expected_expires = timezone.now() + CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME
self.assertLessEqual(abs((token.expires - expected_expires).total_seconds()), 5)

def test_validate_client_credentials_refresh_window_expired(self) -> None:
view_instance = TokenView()

application = self._create_application(
'cc app refresh',
grant_type=Application.GRANT_CLIENT_CREDENTIALS,
client_type=Application.CLIENT_CONFIDENTIAL,
redirect_uris='http://localhost',
)
user = self._create_user('cc_refresh_user', 'pw')
UserProfile.objects.update_or_create(
user=user,
defaults={'user_type': USER_TYPE_ALIGNED_NETWORKS_BENEFICIARY},
)
user.crosswalk.user_id_type = 'M'
user.crosswalk.save(update_fields=['user_id_type'])

access_token = AccessToken.objects.create(
user=user,
token='cc-refresh-token',
application=application,
expires=timezone.now() + timedelta(hours=1),
)
refresh_token = RefreshToken.objects.create(
user=user,
token='cc-refresh-token-value',
application=application,
access_token=access_token,
)

access_token.created = timezone.now() - CLIENT_CREDENTIALS_REFRESH_WINDOW - timedelta(seconds=1)
access_token.expires = access_token.created + CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME
access_token.save(update_fields=['created', 'expires'])

with self.assertRaises(InvalidGrantError):
view_instance._validate_client_credentials_refresh_window(refresh_token)

def test_validate_client_credentials_refresh_window_ignores_non_anb_user(self) -> None:
view_instance = TokenView()

application = self._create_application(
'cc app refresh non anb',
grant_type=Application.GRANT_CLIENT_CREDENTIALS,
client_type=Application.CLIENT_CONFIDENTIAL,
redirect_uris='http://localhost',
)
user = self._create_user('cc_non_anb_refresh_user', 'pw')

access_token = AccessToken.objects.create(
user=user,
token='cc-refresh-token-non-anb',
application=application,
expires=timezone.now() + timedelta(hours=1),
)
refresh_token = RefreshToken.objects.create(
user=user,
token='cc-refresh-token-value-non-anb',
application=application,
access_token=access_token,
)

access_token.created = timezone.now() - CLIENT_CREDENTIALS_REFRESH_WINDOW - timedelta(seconds=1)
access_token.expires = access_token.created + CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME
access_token.save(update_fields=['created', 'expires'])

# Non-ANB beneficiaries should not use client_credentials refresh-chain expiration checks.
view_instance._validate_client_credentials_refresh_window(refresh_token)

def test_is_client_credentials_access_token_uses_token_lifetime_for_refresh(self) -> None:
view_instance = TokenView()

application = self._create_application(
'cc token-tied refresh app',
grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
redirect_uris='http://localhost',
)
user = self._create_user('cc_token_tied_user', 'pw')
UserProfile.objects.update_or_create(
user=user,
defaults={'user_type': USER_TYPE_ALIGNED_NETWORKS_BENEFICIARY},
)
user.crosswalk.user_id_type = 'M'
user.crosswalk.save(update_fields=['user_id_type'])

access_token = AccessToken.objects.create(
user=user,
token='cc-token-tied-refresh-token',
application=application,
expires=timezone.now() + CLIENT_CREDENTIALS_ACCESS_TOKEN_LIFETIME,
)

self.assertTrue(
view_instance._is_client_credentials_access_token(access_token, 'refresh_token')
)
Loading
Loading