diff --git a/apps/authorization/tests/test_data_access_grant.py b/apps/authorization/tests/test_data_access_grant.py index 99bda5e19..b73e55927 100644 --- a/apps/authorization/tests/test_data_access_grant.py +++ b/apps/authorization/tests/test_data_access_grant.py @@ -35,7 +35,9 @@ class TestDataAccessGrant(BaseApiTest): @staticmethod def _create_authorization_header(client_id, client_secret): - return 'Basic {0}'.format(base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8')) + return 'Basic {0}'.format( + base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8') + ) def test_create_update_delete(self): # 1. Test create and default expiration_date @@ -76,7 +78,9 @@ def test_create_update_delete(self): dag.delete() # Verify it does exist and archived. - arch_dag = ArchivedDataAccessGrant.objects.get(beneficiary__username='test_beneficiary', application__name='test_app') + arch_dag = ArchivedDataAccessGrant.objects.get( + beneficiary__username='test_beneficiary', application__name='test_app' + ) # Verify expiration_date copied OK. self.assertEqual('2030-01-15 00:00:00+00:00', str(arch_dag.expiration_date)) @@ -190,6 +194,7 @@ def test_delete_authenticated_user_grant(self): def setup_test_application_with_user(self, test_user, application_name='an app'): redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' capability_a = self._create_capability('Capability A', []) capability_b = self._create_capability('Capability B', []) # create an application and add capabilities @@ -207,6 +212,9 @@ def setup_test_application_with_user(self, test_user, application_name='an app') 'client_id': application.client_id, 'response_type': 'code', 'redirect_uri': redirect_uri, + 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.get('/v1/o/authorize', data=payload) # post the authorization form with only one scope selected @@ -218,6 +226,8 @@ def setup_test_application_with_user(self, test_user, application_name='an app') 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.post(response['Location'], data=payload) self.assertEqual(response.status_code, 302) @@ -229,6 +239,7 @@ def setup_test_application_with_user(self, test_user, application_name='an app') 'code': authorization_code, 'redirect_uri': redirect_uri, 'client_id': application.client_id, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data) fhir_id = json.loads(response.content)['patient'] @@ -237,6 +248,7 @@ def setup_test_application_with_user(self, test_user, application_name='an app') def test_no_action_on_reapproval(self): redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' user = self._create_user('anna', '123456') application, fhir_id = self.setup_test_application_with_user(user) @@ -248,6 +260,9 @@ def test_no_action_on_reapproval(self): 'client_id': application.client_id, 'response_type': 'code', 'redirect_uri': redirect_uri, + 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.get('/v1/o/authorize', data=payload) # post the authorization form with only one scope selected @@ -259,6 +274,8 @@ def test_no_action_on_reapproval(self): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.post(response['Location'], data=payload) @@ -271,6 +288,7 @@ def test_no_action_on_reapproval(self): 'code': authorization_code, 'redirect_uri': redirect_uri, 'client_id': application.client_id, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data) self.assertEqual(response.status_code, 200) @@ -342,6 +360,7 @@ def test_permission_deny_on_app_or_org_disabled(self): to an application or applications under a user (organization) """ redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' # create a user user = self._create_user('anna', '123456') capability_a = self._create_capability('Capability A', []) @@ -363,6 +382,9 @@ def test_permission_deny_on_app_or_org_disabled(self): 'client_id': application.client_id, 'response_type': 'code', 'redirect_uri': redirect_uri, + 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.get('/v1/o/authorize', data=payload) payload = { @@ -373,6 +395,8 @@ def test_permission_deny_on_app_or_org_disabled(self): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.post(response['Location'], data=payload) diff --git a/apps/dot_ext/tests/test_authorization.py b/apps/dot_ext/tests/test_authorization.py index 6eb4797cc..094a239e8 100644 --- a/apps/dot_ext/tests/test_authorization.py +++ b/apps/dot_ext/tests/test_authorization.py @@ -42,6 +42,13 @@ def _create_authorization_header(self, client_id, client_secret): base64.b64encode('{0}:{1}'.format(client_id, client_secret).encode('utf-8')).decode('utf-8') ) + def _add_pkce_defaults(self, payload): + auth_payload = dict(payload) + auth_payload.setdefault('state', '0123456789abcdef') + auth_payload.setdefault('code_challenge', 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo') + auth_payload.setdefault('code_challenge_method', CODE_CHALLENGE_METHOD_S256) + return auth_payload + def test_post_with_valid_non_standard_scheme_granttype_authcode_clienttype_public(self): # Test with application setup as grant_type=authorization_code and client_type=public redirect_uri = 'com.custom.bluebutton://example.it' @@ -71,7 +78,7 @@ def test_post_with_valid_non_standard_scheme_granttype_authcode_clienttype_publi 'code_challenge': code_challenge, 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, } - response = self.client.get('/v1/o/authorize', data=payload) + response = self.client.get('/v1/o/authorize', data=self._add_pkce_defaults(payload)) # post the authorization form with only one scope selected payload = { 'client_id': application.client_id, @@ -95,6 +102,7 @@ def test_post_with_valid_non_standard_scheme_granttype_authcode_clienttype_publi 'code': authorization_code, 'redirect_uri': redirect_uri, 'client_id': application.client_id, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } # Test that using a BAD code_verifier has a bad request response token_request_data.update({'code_verifier': 'test1234567bad9verifier23456789123456789123456789'}) @@ -138,7 +146,7 @@ def test_post_with_invalid_non_standard_scheme_granttype_authcode_clienttype_pub 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.assertEqual(response.status_code, 400) def test_post_with_valid_non_standard_scheme_granttype_authcode_clienttype_confidential(self): @@ -170,7 +178,7 @@ def test_post_with_valid_non_standard_scheme_granttype_authcode_clienttype_confi 'code_challenge': code_challenge, 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, } - response = self.client.get('/v1/o/authorize', data=payload) + response = self.client.get('/v1/o/authorize', data=self._add_pkce_defaults(payload)) # post the authorization form with only one scope selected payload = { 'client_id': application.client_id, @@ -194,6 +202,7 @@ def test_post_with_valid_non_standard_scheme_granttype_authcode_clienttype_confi 'code': authorization_code, 'redirect_uri': redirect_uri, 'client_id': application.client_id, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } # Test that request is unauthorized WITH OUT the client_secret. token_request_data.update({'code_verifier': 'test123456789123456789123456789123456789123456789'}) @@ -246,7 +255,7 @@ def test_post_with_invalid_non_standard_scheme_granttype_authcode_clienttype_con 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.assertEqual(response.status_code, 400) @override_switch('v3_endpoints', active=True) @@ -279,7 +288,7 @@ def test_refresh_token(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -291,6 +300,7 @@ def test_refresh_token(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post(f'/v{Versions.V2}/o/token/', data=token_request_data) @@ -356,7 +366,7 @@ def test_refresh_with_expired_token(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -368,6 +378,7 @@ def test_refresh_with_expired_token(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -414,7 +425,7 @@ def test_refresh_13_month_with_expired_grant(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -426,6 +437,7 @@ def test_refresh_13_month_with_expired_grant(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -475,7 +487,7 @@ def test_refresh_with_one_time_access_retrieve_app_using_refresh_token(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -487,6 +499,7 @@ def test_refresh_with_one_time_access_retrieve_app_using_refresh_token(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -532,7 +545,7 @@ def test_refresh_with_one_time_access_retrieve_app_from_auth_header(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -544,6 +557,7 @@ def test_refresh_with_one_time_access_retrieve_app_from_auth_header(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -596,7 +610,7 @@ def test_refresh_token_with_scope_parameter_valid_request(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -609,6 +623,7 @@ def test_refresh_token_with_scope_parameter_valid_request(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post(f'/v{Versions.V2}/o/token/', data=token_request_data) @@ -665,7 +680,7 @@ def test_refresh_token_with_scope_parameter_invalid_request(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -678,6 +693,7 @@ def test_refresh_token_with_scope_parameter_invalid_request(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post(f'/v{Versions.V2}/o/token/', data=token_request_data) @@ -741,7 +757,7 @@ def test_dag_expiration_exists(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() # now extract the authorization code and use it to request an access_token @@ -753,6 +769,7 @@ def test_dag_expiration_exists(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -798,7 +815,7 @@ def test_revoke_endpoint(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -810,6 +827,7 @@ def test_revoke_endpoint(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -857,7 +875,7 @@ def test_refresh_with_revoked_token(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -869,6 +887,7 @@ def test_refresh_with_revoked_token(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -925,7 +944,7 @@ def test_application_delete_after_auth(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -937,6 +956,7 @@ def test_application_delete_after_auth(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -983,7 +1003,7 @@ def test_user_delete_after_auth(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -995,6 +1015,7 @@ def test_user_delete_after_auth(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -1045,7 +1066,7 @@ def test_revoked_token_on_inactive_app(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -1057,6 +1078,7 @@ def test_revoked_token_on_inactive_app(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -1116,7 +1138,7 @@ def test_introspect_token_on_inactive_app(self): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -1128,6 +1150,7 @@ def test_introspect_token_on_inactive_app(self): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post('/v1/o/token/', data=token_request_data) @@ -1203,7 +1226,7 @@ def _execute_token_endpoint(self, token_path): 'allow': True, 'state': '0123456789abcdef', } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.client.logout() self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token @@ -1215,6 +1238,7 @@ def _execute_token_endpoint(self, token_path): 'redirect_uri': redirect_uri, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } c = Client() response = c.post(token_path, data=token_request_data) @@ -1321,6 +1345,7 @@ def test_cancel_button_clicked_flow_thirteen_month_data_access_type(self): That we do not delete the associated data_access_grant, access_token, and refresh_token """ redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' # create a user self._create_user('anna', '123456') capability_patient = self._create_capability('patient/Patient.rs', []) @@ -1350,8 +1375,10 @@ def test_cancel_button_clicked_flow_thirteen_month_data_access_type(self): 'expires_in': 86400, 'allow': False, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.assertEqual(response.status_code, 302) query_dict = parse_qs(urlparse(response['Location']).query) @@ -1369,6 +1396,7 @@ def test_cancel_button_clicked_flow_one_time_data_access_type(self): That we do not delete the associated data_access_grant, access_token, and refresh_token """ redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' # create a user self._create_user('anna', '123456') capability_patient = self._create_capability('patient/Patient.rs', []) @@ -1399,8 +1427,10 @@ def test_cancel_button_clicked_flow_one_time_data_access_type(self): 'expires_in': 86400, 'allow': False, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, } - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + response = self.client.post(reverse('oauth2_provider:authorize'), data=self._add_pkce_defaults(payload)) self.assertEqual(response.status_code, 302) query_dict = parse_qs(urlparse(response['Location']).query) @@ -1429,18 +1459,105 @@ def test_invalid_uuid_authorize_call(self): def test_valid_uuid_authorize_call(self): """BB2-4326: Ensure a 302 is thrown if a valid UUID is passed to an authorize endpoint""" app = self._create_application('an app') + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' + query_data = { + 'client_id': app.client_id, + 'response_type': 'code', + 'redirect_uri': 'http://localhost', + 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, + 'scope': 'patient/Patient.read', + } auth_uri_v1 = reverse('oauth2_provider:authorize-instance', args=[uuid.uuid4()]) auth_uri_v2 = reverse('oauth2_provider_v2:authorize-instance-v2', args=[uuid.uuid4()]) auth_uri_v3 = reverse('oauth2_provider_v3:authorize-instance-v3', args=[uuid.uuid4()]) - response_v1 = self.client.get(auth_uri_v1, data={'client_id': app.client_id}) - response_v2 = self.client.get(auth_uri_v2, data={'client_id': app.client_id}) - response_v3 = self.client.get(auth_uri_v3, data={'client_id': app.client_id}) + response_v1 = self.client.get(auth_uri_v1, data=query_data) + response_v2 = self.client.get(auth_uri_v2, data=query_data) + response_v3 = self.client.get(auth_uri_v3, data=query_data) assert response_v1.status_code == HTTPStatus.FOUND assert response_v2.status_code == HTTPStatus.FOUND assert response_v3.status_code == HTTPStatus.FOUND + @override_switch('v3_endpoints', active=True) + def test_missing_code_challenge_returns_400(self): + """Ensure a 400 is returned when code_challenge is missing from the authorize request.""" + app = self._create_application('an app') + for version in Versions.latest_versions(): + auth_uri = f'/v{version}/o/authorize/' + response = self.client.get( + auth_uri, + data={ + 'client_id': app.client_id, + 'response_type': 'code', + 'redirect_uri': 'http://localhost', + 'state': '0123456789abcdef', + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, + }, + ) + self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST) + self.assertIn('code_challenge', response.json()['message']) + + @override_switch('v3_endpoints', active=True) + def test_missing_code_challenge_method_returns_400(self): + """Ensure a 400 is returned when code_challenge_method is missing from the authorize request.""" + app = self._create_application('an app') + for version in Versions.latest_versions(): + auth_uri = f'/v{version}/o/authorize/' + response = self.client.get( + auth_uri, + data={ + 'client_id': app.client_id, + 'response_type': 'code', + 'redirect_uri': 'http://localhost', + 'state': '0123456789abcdef', + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', + }, + ) + self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST) + self.assertIn('code_challenge_method', response.json()['message']) + + @override_switch('v3_endpoints', active=True) + def test_invalid_code_challenge_method_returns_400(self): + """Ensure a 400 is returned when code_challenge_method is not S256.""" + app = self._create_application('an app') + for version in Versions.latest_versions(): + auth_uri = f'/v{version}/o/authorize/' + response = self.client.get( + auth_uri, + data={ + 'client_id': app.client_id, + 'response_type': 'code', + 'redirect_uri': 'http://localhost', + 'state': '0123456789abcdef', + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', + 'code_challenge_method': 'plain', + }, + ) + self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST) + self.assertIn('must be S256', response.json()['error_description']) + + @override_switch('v3_endpoints', active=True) + def test_missing_state_returns_400(self): + """Ensure a 400 is returned when state is missing from the authorize request.""" + app = self._create_application('an app') + for version in Versions.latest_versions(): + auth_uri = f'/v{version}/o/authorize/' + response = self.client.get( + auth_uri, + data={ + 'client_id': app.client_id, + 'response_type': 'code', + 'redirect_uri': 'http://localhost', + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, + }, + ) + self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST) + self.assertIn('state', response.json()['message']) + @patch( 'apps.mymedicare_cb.models.match_fhir_id', return_value=( @@ -1481,7 +1598,7 @@ def test_failure_response_v1_refresh_token_flow_match_fhir_id_failure(self, mock 'code_challenge': code_challenge, 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, } - response = self.client.get('/v1/o/authorize', data=payload) + response = self.client.get('/v1/o/authorize', data=self._add_pkce_defaults(payload)) # post the authorization form with only one scope selected payload = { 'client_id': application.client_id, diff --git a/apps/dot_ext/tests/test_beneficiary_demographic_scope_changes.py b/apps/dot_ext/tests/test_beneficiary_demographic_scope_changes.py index 5a9f5a4f7..3042dc6ee 100644 --- a/apps/dot_ext/tests/test_beneficiary_demographic_scope_changes.py +++ b/apps/dot_ext/tests/test_beneficiary_demographic_scope_changes.py @@ -20,7 +20,11 @@ class TestBeneficiaryDemographicScopesChanges(BaseApiTest): @override_switch('require-scopes', active=True) def _authorize_and_request_token(self, payload, application): - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + auth_payload = dict(payload) + auth_payload.setdefault('state', '0123456789abcdef') + auth_payload.setdefault('code_challenge', 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo') + auth_payload.setdefault('code_challenge_method', 'S256') + response = self.client.post(reverse('oauth2_provider:authorize'), data=auth_payload) self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token query_dict = parse_qs(urlparse(response['Location']).query) @@ -30,6 +34,7 @@ def _authorize_and_request_token(self, payload, application): 'code': authorization_code, 'redirect_uri': 'http://example.it', 'client_id': application.client_id, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } response = self.client.post(reverse('oauth2_provider:token'), data=token_request_data) @@ -114,6 +119,8 @@ def test_bene_demo_scopes_change(self, mock_get_and_update): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', + 'code_challenge_method': 'S256', } request_scopes = APPLICATION_SCOPES_FULL diff --git a/apps/dot_ext/tests/test_verify_bfd_headers.py b/apps/dot_ext/tests/test_verify_bfd_headers.py index 3f10c037a..5a064ae77 100644 --- a/apps/dot_ext/tests/test_verify_bfd_headers.py +++ b/apps/dot_ext/tests/test_verify_bfd_headers.py @@ -63,6 +63,8 @@ def _create_test_token(self, user, application): 'allow': True, 'state': '0123456789abcdef', 'share_demographic_scopes': True, + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', + 'code_challenge_method': 'S256', } if application.authorization_grant_type == Application.GRANT_IMPLICIT: payload['response_type'] = 'token' @@ -84,6 +86,7 @@ def _create_test_token(self, user, application): 'redirect_uri': application.redirect_uris, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } response = self.client.post('/v1/o/token/', data=token_request_data) diff --git a/apps/dot_ext/tests/test_views.py b/apps/dot_ext/tests/test_views.py index 0a57b6339..488971764 100644 --- a/apps/dot_ext/tests/test_views.py +++ b/apps/dot_ext/tests/test_views.py @@ -182,7 +182,11 @@ def fhir_request_coverage_success_mock(self, url, request): @override_switch('require-scopes', active=True) def _authorize_and_request_token(self, payload, application): - response = self.client.post(reverse('oauth2_provider:authorize'), data=payload) + auth_payload = dict(payload) + auth_payload.setdefault('state', '0123456789abcdef') + auth_payload.setdefault('code_challenge', 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo') + auth_payload.setdefault('code_challenge_method', 'S256') + response = self.client.post(reverse('oauth2_provider:authorize'), data=auth_payload) self.assertEqual(response.status_code, 302) # now extract the authorization code and use it to request an access_token query_dict = parse_qs(urlparse(response['Location']).query) @@ -192,6 +196,7 @@ def _authorize_and_request_token(self, payload, application): 'code': authorization_code, 'redirect_uri': 'http://example.it', 'client_id': application.client_id, + 'code_verifier': 'test123456789123456789123456789123456789123456789', } return self.client.post(reverse('oauth2_provider:token'), data=token_request_data) @@ -437,6 +442,8 @@ def _create_test_token(self, user: User, application: Application): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': 'E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM', + 'code_challenge_method': 'S256', } if application.authorization_grant_type == Application.GRANT_IMPLICIT: payload['response_type'] = 'token' @@ -458,6 +465,7 @@ def _create_test_token(self, user: User, application: Application): 'redirect_uri': application.redirect_uris, 'client_id': application.client_id, 'client_secret': application.client_secret_plain, + 'code_verifier': 'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk', } response = self.client.post('/v1/o/token/', data=token_request_data) diff --git a/apps/dot_ext/views/authorization.py b/apps/dot_ext/views/authorization.py index acf03d3e9..82fd4ba84 100644 --- a/apps/dot_ext/views/authorization.py +++ b/apps/dot_ext/views/authorization.py @@ -184,14 +184,26 @@ def _check_for_required_params(self, request): missing_params = [] v3 = True if request.path.startswith('/v3/o/authorize') else False - if not request.GET.get('code_challenge', None): + if not self._get_param(request, 'code_challenge'): missing_params.append('code_challenge') - if not request.GET.get('code_challenge_method', None): + + code_challenge_method = self._get_param(request, 'code_challenge_method') + if not code_challenge_method: missing_params.append('code_challenge_method') + elif code_challenge_method != 'S256': + return JsonResponse( + { + 'status_code': HTTPStatus.BAD_REQUEST, + 'error': 'invalid_request', + 'error_description': 'code_challenge_method must be S256', + }, + status=HTTPStatus.BAD_REQUEST, + ) - if not request.GET.get('state', None): + state = self._get_param(request, 'state') + if not state: missing_params.append('state') - elif len(request.GET.get('state', None)) < 16: + elif len(state) < 16: error_message = 'State parameter should have a minimum of 16 characters' return JsonResponse( {'status_code': HTTPStatus.BAD_REQUEST, 'message': error_message}, @@ -201,7 +213,7 @@ def _check_for_required_params(self, request): # BB2-4250: This code will not execute if the application is not in the v3_early_adopter flag # so it will not be modified as part of BB2-4250 if switch_is_active('v3_endpoints') and v3: - if 'scope' not in request.GET: + if not self._has_param(request, 'scope'): missing_params.append('scope') if missing_params: @@ -308,6 +320,10 @@ def dispatch(self, request, *args, **kwargs): if sensitive_info_detected: return sensitive_info_detected + param_check = self._check_for_required_params(request) + if param_check: + return param_check + request.session['version'] = self.version # Accept lang from GET or POST @@ -359,16 +375,9 @@ def get_initial(self): return initial_data def post(self, request, *args, **kwargs): - kwargs['code_challenge'] = request.POST.get('code_challenge') - kwargs['code_challenge_method'] = request.POST.get('code_challenge_method') return super().post(request, *args, **kwargs) def get(self, request, *args, **kwargs): - param_check = self._check_for_required_params(request) - if param_check: - return param_check - kwargs['code_challenge'] = request.GET.get('code_challenge', None) - kwargs['code_challenge_method'] = request.GET.get('code_challenge_method', None) return super().get(request, *args, **kwargs) def validate_v3_authorization_request(self): @@ -399,16 +408,10 @@ def form_valid(self, form): 'redirect_uri': form.cleaned_data.get('redirect_uri'), 'response_type': form.cleaned_data.get('response_type', None), 'state': form.cleaned_data.get('state', None), - # "code_challenge": form.cleaned_data.get("code_challenge", None), - # "code_challenge_method": form.cleaned_data.get("code_challenge_method", None), + 'code_challenge': form.cleaned_data.get('code_challenge', None), + 'code_challenge_method': form.cleaned_data.get('code_challenge_method', None), } - if form.cleaned_data.get('code_challenge'): - credentials['code_challenge'] = form.cleaned_data.get('code_challenge') - - if form.cleaned_data.get('code_challenge_method'): - credentials['code_challenge_method'] = form.cleaned_data.get('code_challenge_method') - scopes = form.cleaned_data.get('scope') allow = form.cleaned_data.get('allow') diff --git a/apps/logging/constants.py b/apps/logging/constants.py index f39ae16b4..2ef27b177 100644 --- a/apps/logging/constants.py +++ b/apps/logging/constants.py @@ -129,7 +129,7 @@ 'auth_app_id': {'pattern': '^1$'}, 'auth_app_name': {'pattern': 'an app'}, 'auth_app_data_access_type': {'pattern': 'THIRTEEN_MONTH'}, - 'auth_pkce_method': {'type': 'null'}, + 'auth_pkce_method': {'type': 'string', 'pattern': '^S256$'}, 'auth_share_demographic_scopes': {'pattern': '^$'}, 'auth_require_demographic_scopes': {'pattern': '^True$'}, }, diff --git a/apps/logging/tests/test_audit_loggers.py b/apps/logging/tests/test_audit_loggers.py index 6e646d92d..2e9eaa323 100644 --- a/apps/logging/tests/test_audit_loggers.py +++ b/apps/logging/tests/test_audit_loggers.py @@ -419,6 +419,7 @@ def test_creation_on_approval_token_logger_v2(self): def _creation_on_approval_token_logger(self, version=1): # copy and adapted to test token logger redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' self._create_user('anna', '123456') capability_a = self._create_capability('Capability A', []) capability_b = self._create_capability('Capability B', []) @@ -438,6 +439,9 @@ def _creation_on_approval_token_logger(self, version=1): 'client_id': application.client_id, 'response_type': 'code', 'redirect_uri': redirect_uri, + 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.get('/{}/o/authorize'.format(api_ver), data=payload) payload = { @@ -448,6 +452,8 @@ def _creation_on_approval_token_logger(self, version=1): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.post(response['Location'], data=payload) self.assertEqual(response.status_code, status.HTTP_302_FOUND) @@ -544,6 +550,7 @@ def _request_logger_data_facilitator_end_user(self, version=1): def test_auth_flow_lang_logger(self, version=1): # copy and adapted to test auth flow logger redirect_uri = 'http://localhost' + code_challenge = 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo' capability_a = self._create_capability('Capability A', []) capability_b = self._create_capability('Capability B', []) application = self._create_application( @@ -560,6 +567,9 @@ def test_auth_flow_lang_logger(self, version=1): 'response_type': 'code', 'redirect_uri': redirect_uri, 'lang': 'es', + 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.get('/v2/o/authorize', data=payload) @@ -571,6 +581,8 @@ def test_auth_flow_lang_logger(self, version=1): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge': code_challenge, + 'code_challenge_method': 'S256', } response = self.client.post(response['Location'], data=payload) request_log_content = get_log_content(self.logger_registry, logging.AUDIT_HHS_AUTH_SERVER_REQ_LOGGER) diff --git a/apps/mymedicare_cb/tests/test_callback_slsx.py b/apps/mymedicare_cb/tests/test_callback_slsx.py index 901def070..e884b73e6 100644 --- a/apps/mymedicare_cb/tests/test_callback_slsx.py +++ b/apps/mymedicare_cb/tests/test_callback_slsx.py @@ -1,32 +1,33 @@ import copy import json -import jsonschema import re import uuid - -import apps.logging.request_logger as logging - from datetime import datetime +from http import HTTPStatus +from unittest.mock import MagicMock, patch +from urllib.parse import parse_qs, urlparse + +import jsonschema from django.conf import settings from django.contrib.auth.models import Group, User +from django.test.client import Client +from django.urls import reverse from django.utils.dateparse import parse_duration from django.utils.text import slugify -from django.urls import reverse -from django.test.client import Client -from httmock import urlmatch, all_requests, HTTMock +from httmock import HTTMock, all_requests, urlmatch from jsonschema import validate from requests.exceptions import HTTPError from rest_framework import status -from unittest.mock import patch, MagicMock -from urllib.parse import urlparse, parse_qs from waffle.testutils import override_switch +import apps.logging.request_logger as logging from apps.accounts.models import UserProfile -from apps.fhir.server.authentication import MatchFhirIdErrorType, MatchFhirIdResult, MatchFhirIdLookupType from apps.capabilities.models import ProtectedCapability -from apps.dot_ext.models import Approval, Application +from apps.constants import CODE_CHALLENGE_METHOD_S256, MYMEDICARE_CB_GET_UPDATE_BENE_LOG_SCHEMA +from apps.dot_ext.models import Application, Approval from apps.fhir.bluebutton.models import ArchivedCrosswalk, Crosswalk -from apps.logging.utils import redirect_loggers, cleanup_logger, get_log_lines_list, get_log_content +from apps.fhir.server.authentication import MatchFhirIdErrorType, MatchFhirIdLookupType, MatchFhirIdResult +from apps.logging.utils import cleanup_logger, get_log_content, get_log_lines_list, redirect_loggers from apps.mymedicare_cb.authorization import OAuth2ConfigSLSx from apps.mymedicare_cb.constants import ( ERR_MSG_HICN_EMPTY_OR_NONE, @@ -37,16 +38,11 @@ ) from apps.mymedicare_cb.models import AnonUserState from apps.mymedicare_cb.tests.mock_url_responses_slsx import MockUrlSLSxResponses +from apps.mymedicare_cb.tests.responses import patient_response from apps.mymedicare_cb.views import generate_nonce -from apps.constants import CODE_CHALLENGE_METHOD_S256, MYMEDICARE_CB_GET_UPDATE_BENE_LOG_SCHEMA from apps.test import BaseApiTest - -from apps.mymedicare_cb.tests.responses import patient_response - from hhs_oauth_server.settings.base import MOCK_FHIR_ENDPOINT_HOSTNAME, MOCK_FHIR_V3_ENDPOINT_HOSTNAME -from http import HTTPStatus - class MyMedicareSLSxBlueButtonClientApiUserInfoTest(BaseApiTest): """ @@ -133,7 +129,15 @@ def test_callback_url_missing_relay(self): def test_authorize_uuid_dne(self): app = self._create_application('an app') auth_uri = reverse('oauth2_provider:authorize-instance', args=[uuid.uuid4()]) - response = self.client.get(auth_uri, data={'client_id': app.client_id}) + response = self.client.get( + auth_uri, + data={ + 'client_id': app.client_id, + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', + 'code_challenge_method': 'S256', + 'state': '0123456789abcdef', + }, + ) self.assertEqual(status.HTTP_302_FOUND, response.status_code) def test_authorize_uuid(self): @@ -190,6 +194,8 @@ def test_authorize_uuid(self): 'expires_in': 86400, 'allow': True, 'state': '0123456789abcdef', + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', } response = self.client.post(auth_uri, data=payload) self.assertEqual(status.HTTP_302_FOUND, response.status_code) @@ -202,7 +208,9 @@ def test_authorize_uuid(self): 'client_id': application.client_id, 'redirect_uri': 'http://test.com', 'response_type': 'code', - 'state': '1234567890', + 'state': '0123456789abcdef', + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', }, ) self.assertEqual(status.HTTP_302_FOUND, response.status_code) @@ -213,9 +221,11 @@ def test_authorize_uuid(self): 'client_id': application.client_id, 'redirect_uri': 'http://test.com', 'response_type': 'code', + 'code_challenge_method': CODE_CHALLENGE_METHOD_S256, + 'code_challenge': 'sZrievZsrYqxdnu2NVD603EiYBM18CuzZpwB-pOSZjo', }, ) - self.assertEqual(status.HTTP_302_FOUND, response.status_code) + self.assertEqual(HTTPStatus.BAD_REQUEST, response.status_code) def test_callback_url_success_v1(self): self._test_callback_url_success(1) @@ -1079,13 +1089,19 @@ def catchall(url, request): self.assertTrue(self.validate_json_schema(log_schema, log_dict)) def test_callback_usrinfo_invalid_hicn_none(self): - self._callback_usrinfo_invalid_hicn_mbi(self.mock_response.slsx_user_info_none_hicn_mock, ERR_MSG_HICN_EMPTY_OR_NONE) + self._callback_usrinfo_invalid_hicn_mbi( + self.mock_response.slsx_user_info_none_hicn_mock, ERR_MSG_HICN_EMPTY_OR_NONE + ) def test_callback_usrinfo_invalid_hicn_empty(self): - self._callback_usrinfo_invalid_hicn_mbi(self.mock_response.slsx_user_info_empty_hicn_mock, ERR_MSG_HICN_EMPTY_OR_NONE) + self._callback_usrinfo_invalid_hicn_mbi( + self.mock_response.slsx_user_info_empty_hicn_mock, ERR_MSG_HICN_EMPTY_OR_NONE + ) def test_callback_usrinfo_invalid_hicn_non_str(self): - self._callback_usrinfo_invalid_hicn_mbi(self.mock_response.slsx_user_info_non_str_hicn_mock, ERR_MSG_HICN_NOT_STR) + self._callback_usrinfo_invalid_hicn_mbi( + self.mock_response.slsx_user_info_non_str_hicn_mock, ERR_MSG_HICN_NOT_STR + ) def test_callback_usrinfo_invalid_mbi_non_str(self): self._callback_usrinfo_invalid_hicn_mbi(self.mock_response.slsx_user_info_non_str_mbi_mock, ERR_MSG_MBI_NOT_STR) @@ -1120,7 +1136,9 @@ def catchall(url, request): } ) s.save() - response = self.client.get(self.callback_url, data={'req_token': '0000-test_req_token-0000', 'relay': state}) + response = self.client.get( + self.callback_url, data={'req_token': '0000-test_req_token-0000', 'relay': state} + ) resp_json = response.json() self.assertEqual(response.status_code, HTTPStatus.CONFLICT) self.assertIsNotNone(resp_json) @@ -1139,7 +1157,9 @@ def catchall(url, request): @patch( 'apps.mymedicare_cb.models.match_fhir_id', return_value=( - MatchFhirIdResult(error='Failure', error_type=MatchFhirIdErrorType.UPSTREAM, lookup_type=MatchFhirIdLookupType.MBI) + MatchFhirIdResult( + error='Failure', error_type=MatchFhirIdErrorType.UPSTREAM, lookup_type=MatchFhirIdLookupType.MBI + ) ), ) def test_failure_response_v1_auth_flow_match_fhir_id_failure(self, mock_match_fhir): diff --git a/apps/pkce/oauth2_server.py b/apps/pkce/oauth2_server.py index 6d8476854..f5ba1abe5 100644 --- a/apps/pkce/oauth2_server.py +++ b/apps/pkce/oauth2_server.py @@ -1,12 +1,13 @@ import base64 import hashlib from urllib.parse import urlparse -from oauthlib.oauth2.rfc6749.errors import OAuth2Error, InvalidRequestError +from django.core.exceptions import ObjectDoesNotExist from oauth2_provider.models import get_grant_model from oauth2_provider.settings import oauth2_settings -from django.core.exceptions import ObjectDoesNotExist -from apps.pkce.constants import ERR_CC_REQUIRED, ERR_CCM_S256_REQUIRED, ERR_CCM_REQUIRED +from oauthlib.oauth2.rfc6749.errors import InvalidRequestError, OAuth2Error + +from apps.pkce.constants import ERR_CC_REQUIRED, ERR_CCM_REQUIRED, ERR_CCM_S256_REQUIRED Grant = get_grant_model()