diff --git a/pyeudiw/openid4vp/authorization_request.py b/pyeudiw/openid4vp/authorization_request.py index f987b7d24..8b93b6d35 100644 --- a/pyeudiw/openid4vp/authorization_request.py +++ b/pyeudiw/openid4vp/authorization_request.py @@ -25,6 +25,7 @@ def build_authorization_request_claims( response_uri: str, authorization_config: dict, nonce: str = "", + metadata: dict = None, ) -> dict: """ Primitive function to build the payload claims of the (JAR) authorization request. @@ -41,6 +42,8 @@ def build_authorization_request_claims( :param nonce: optional nonce to be inserted in the request object; if not \ set, a new cryptographically safe uuid v4 nonce is generated. :type nonce: str + :param metadata: optional metadata to be included in the request object + :type metadata: dict :raises KeyError: if authorization_config misses mandatory configuration options :returns: a dictionary with the *complete* set of jar jwt playload claims :rtype: dict @@ -62,6 +65,10 @@ def build_authorization_request_claims( "iat": iat_now(), "exp": exp_from_now(minutes=authorization_config["expiration_time"]), } + + if metadata: + claims["client_metadata"] = metadata + if authorization_config.get("scopes"): claims["scope"] = " ".join(authorization_config["scopes"]) # backend configuration validation should check that at least PE or DCQL must be configured within the authz request conf diff --git a/pyeudiw/openid4vp/authorization_response.py b/pyeudiw/openid4vp/authorization_response.py index b35392027..06afbb474 100644 --- a/pyeudiw/openid4vp/authorization_response.py +++ b/pyeudiw/openid4vp/authorization_response.py @@ -1,3 +1,4 @@ +from typing import TypeVar import cryptojwt.jwe.exception import satosa.context from pyeudiw.jwt.exceptions import JWEDecryptionError @@ -14,6 +15,27 @@ ) +_S = TypeVar('_S', str, list[str]) + + +def normalize_jsonstring_to_string(s: _S) -> _S: + """ + Normalize s from string (or list of string) or JSON String (or list + of JSON String) to simply string (or list of string). + For example, this would map a vp_token from JSON String "ey...Ui5" to + the naitve string ey...Ui5 (note the missing quote "). + + Note that this method is NOT intended to parse JSON String. + For that purpose, json.loads should be preferred. Instead, this method + should be used when an imput might be a string OR a JSON string. + """ + if isinstance(s, str): + return s.strip('"') + if isinstance(s, list): + return [v.strip('"') for v in s] + return s + + def detect_response_mode(context: satosa.context.Context) -> ResponseMode: """ Try to make inference on which response mode type this is based on the @@ -67,7 +89,16 @@ def parse_and_validate( resp_data: dict = context.request try: - return AuthorizeResponsePayload(**resp_data) + d = {} + if (vp_token := resp_data.get("vp_token", None)): + # vp_token should be a JSON string but caller might not be compliant and use string instead + vp_token = normalize_jsonstring_to_string(vp_token) + d["vp_token"] = vp_token + if (state := resp_data.get("state", None)): + d["state"] = state + if (presentation_submission := resp_data["presentation_submission"]): + d["presentation_submission"] = presentation_submission + return AuthorizeResponsePayload(**d) except Exception as e: raise AuthRespParsingException( "invalid data in direct_post request body", e diff --git a/pyeudiw/openid4vp/exceptions.py b/pyeudiw/openid4vp/exceptions.py index 20426d3df..371891a93 100644 --- a/pyeudiw/openid4vp/exceptions.py +++ b/pyeudiw/openid4vp/exceptions.py @@ -32,4 +32,16 @@ class MissingIssuer(Exception): """ Raised when a given VP not contain the issuer """ + pass + +class MdocCborValidationError(Exception): + """ + Raised when a given VP not contain the issuer + """ + pass + +class VPExpired(Exception): + """ + Raised when a given VP is expired + """ pass \ No newline at end of file diff --git a/pyeudiw/openid4vp/vp_mdoc_cbor.py b/pyeudiw/openid4vp/vp_mdoc_cbor.py index d89e2a6aa..7bf37ea74 100644 --- a/pyeudiw/openid4vp/vp_mdoc_cbor.py +++ b/pyeudiw/openid4vp/vp_mdoc_cbor.py @@ -1,19 +1,42 @@ from pymdoccbor.mdoc.verifier import MdocCbor +from datetime import datetime +from pyeudiw.openid4vp.exceptions import MdocCborValidationError +import logging -from pyeudiw.openid4vp.vp import Vp +logger = logging.getLogger(__name__) -class VpMDocCbor(Vp): +class VpMDocCbor: def __init__(self, data: str) -> None: self.data = data self.mdoc = MdocCbor() self.parse_digital_credential() - def parse_digital_credential(self) -> None: - self.mdoc.load(data=self.data) + def get_documents(self) -> dict: + return self.mdoc.data_as_cbor_dict["documents"] + + def is_revoked(self) -> bool: + return False + + def is_expired(self) -> bool: + _val_until: str = "" + try: + _val_until = self.mdoc.data_as_cbor_dict()["issuerSigned"]["issuerAuth"]["validityInfo"].get("validUntil") + except KeyError as e: + logger.error(f'Unconsitent issuerSigned schema ["issuerSigned"]["issuerAuth"]["validityInfo"], {e}, in mdoc cbor: {self.mdoc.data_as_cbor_dict()}') + if _val_until: + exp_date = datetime.fromisoformat(_val_until) + else: + logger.warning(f"Missing issuerSigned velidUntil in mdoc cbor: {self.mdoc.data_as_cbor_dict()}") - def verify(self, **kwargs) -> bool: - return self.mdoc.verify() + return exp_date < datetime.now() + + def verify_signature(self) -> None: + if self.mdoc.verify() == False: + raise MdocCborValidationError("Signature is invalid") + + def parse_digital_credential(self) -> None: + self.mdoc.loads(data=self.data) def _detect_vp_type(self) -> str: return "mdoc_cbor" diff --git a/pyeudiw/openid4vp/vp_sd_jwt_vc.py b/pyeudiw/openid4vp/vp_sd_jwt_vc.py index 605285674..fc78d3479 100644 --- a/pyeudiw/openid4vp/vp_sd_jwt_vc.py +++ b/pyeudiw/openid4vp/vp_sd_jwt_vc.py @@ -5,9 +5,9 @@ from pyeudiw.jwt.helper import is_jwt_expired from pyeudiw.openid4vp.interface import VpTokenParser, VpTokenVerifier -from pyeudiw.sd_jwt.schema import VerifierChallenge, is_sd_jwt_kb_format +from pyeudiw.sd_jwt.schema import VerifierChallenge from pyeudiw.sd_jwt.sd_jwt import SdJwt -from pyeudiw.openid4vp.exceptions import NotKBJWT, MissingIssuer +from pyeudiw.openid4vp.exceptions import MissingIssuer class VpVcSdJwtParserVerifier(VpTokenParser, VpTokenVerifier): @@ -18,10 +18,6 @@ def __init__( verifier_nonce: Optional[str] = None, ): self.token = token - if not is_sd_jwt_kb_format(token): - raise NotKBJWT( - f"input [token]={token} is not an sd-jwt with key binding: maybe it is a regular jwt or key binding jwt is missing?" - ) self.verifier_id = verifier_id self.verifier_nonce = verifier_nonce # precomputed values @@ -64,7 +60,7 @@ def is_expired(self) -> bool: :returns: if the credential is expired :rtype: bool """ - return is_jwt_expired(self.sdjwt.issuer_jwt) + return is_jwt_expired(self.sdjwt.issuer_jwt.jwt) def verify_signature(self, public_key: ECKey | RSAKey | dict) -> None: """ diff --git a/pyeudiw/satosa/default/request_handler.py b/pyeudiw/satosa/default/request_handler.py index f54f093ba..43358cfcd 100644 --- a/pyeudiw/satosa/default/request_handler.py +++ b/pyeudiw/satosa/default/request_handler.py @@ -28,12 +28,18 @@ def request_endpoint(self, context: Context, *args) -> Response: "request error: missing or invalid parameter [id]", e400 ) + + try: + metadata = self.trust_evaluator.get_metadata(self.client_id) + except Exception: + metadata = None data = build_authorization_request_claims( self.client_id, state, self.absolute_response_url, self.config["authorization"], + metadata=metadata, ) if _aud := self.config["authorization"].get("aud"): diff --git a/pyeudiw/satosa/default/response_handler.py b/pyeudiw/satosa/default/response_handler.py index 3e024d52f..9e32d468d 100644 --- a/pyeudiw/satosa/default/response_handler.py +++ b/pyeudiw/satosa/default/response_handler.py @@ -41,6 +41,9 @@ from pyeudiw.trust.exceptions import InvalidJwkMetadataException from pyeudiw.jwt.exceptions import JWSVerificationError from pyeudiw.sd_jwt.exceptions import UnsupportedSdAlg, InvalidKeyBinding +from pyeudiw.sd_jwt.schema import is_sd_jwt_kb_format +from pyeudiw.openid4vp.vp_mdoc_cbor import VpMDocCbor +from pyeudiw.openid4vp.exceptions import MdocCborValidationError, VPExpired @@ -49,11 +52,11 @@ class ResponseHandler(ResponseHandlerInterface): _SUPPORTED_RESPONSE_CONTENT_TYPE = "application/x-www-form-urlencoded" _ACCEPTED_ISSUER_METADATA_TYPE = "openid_credential_issuer" - def _extract_all_user_attributes(self, attributes_by_issuers: dict) -> dict: + def _extract_all_user_attributes(self, extracted_attributess: dict) -> dict: # for all the valid credentials, take the payload and the disclosure and disclose user attributes # returns the user attributes ... all_user_attributes = dict() - for i in attributes_by_issuers.values(): + for i in extracted_attributess.values(): all_user_attributes.update(**i) return all_user_attributes @@ -158,7 +161,7 @@ def response_endpoint( # (1) we don't check that presentation submission matches definition (yet) # (2) we don't check that vp tokens are aligned with information declared in the presentation submission # (3) we use all disclosed claims in vp tokens to build the user identity - attributes_by_issuer: dict[str, dict[str, Any]] = {} + extracted_attributes: dict[str, dict[str, Any]] = {} credential_issuers: list[str] = [] encoded_vps: list[str] = ( [authz_payload.vp_token] @@ -168,71 +171,116 @@ def response_endpoint( for vp_token in encoded_vps: # verify vp token and extract user information - try: - token_parser, token_verifier = self._vp_verifier_factory( - authz_payload.presentation_submission, vp_token, request_session - ) - except NotKBJWT as e400: - return self._handle_400( - context, - "invalid vp token: not a key-bound jwt", - e400 - ) - - try: - token_issuer = token_parser.get_issuer_name() - whitelisted_keys = self.trust_evaluator.get_public_keys(token_issuer) - token_verifier.verify_signature(whitelisted_keys) - token_verifier.verify_challenge() - except MissingIssuer as e400: - return self._handle_400( - context, - "invalid vp token: missing issuer information", - e400 - ) - except InvalidJwkMetadataException as e500: - return self._handle_500( - context, - "trust error: cannot fetch public keys", - e500 - ) - except JWSVerificationError as e400: - return self._handle_400( - context, - "invalid vp token: invalid signature", - e400 - ) - except UnsupportedSdAlg as e400: - return self._handle_400( - context, - "invalid vp token: unsupported signature algorithm", - e400 - ) - except InvalidKeyBinding as e400: - return self._handle_400( - context, - "invalid vp token: nonce or aud mismatch", - e400 - ) - except ValueError as e400: - return self._handle_400( - context, - "invalid vp token: missing or invalid iat claim", - e400 - ) - except Exception as e400: - return self._handle_400( - context, - "trust error: cannot verify vp token", - e400 - ) - - claims = token_parser.get_credentials() - iss = token_parser.get_issuer_name() - attributes_by_issuer[iss] = claims - self._log_debug(context, f"disclosed claims {claims} from issuer {iss}") - - all_attributes = self._extract_all_user_attributes(attributes_by_issuer) + if is_sd_jwt_kb_format(vp_token): + try: + challenge = self._get_verifier_challenge(request_session) + + token_processor = VpVcSdJwtParserVerifier( + vp_token, challenge["aud"], challenge["nonce"] + ) + + token_issuer = token_processor.get_issuer_name() + + whitelisted_keys = self.trust_evaluator.get_public_keys(token_issuer) + token_processor.verify_signature(whitelisted_keys) + token_processor.verify_challenge() + + if token_processor.is_expired() == True: + raise VPExpired("VP is expired") + + claims = token_processor.get_credentials() + iss = token_processor.get_issuer_name() + + extracted_attributes[iss] = claims + except MissingIssuer as e400: + return self._handle_400( + context, + "invalid vp token: missing issuer information", + e400 + ) + except InvalidJwkMetadataException as e500: + return self._handle_500( + context, + "trust error: cannot fetch public keys", + e500 + ) + except JWSVerificationError as e400: + return self._handle_400( + context, + "invalid vp token: invalid signature", + e400 + ) + except UnsupportedSdAlg as e400: + return self._handle_400( + context, + "invalid vp token: unsupported signature algorithm", + e400 + ) + except InvalidKeyBinding as e400: + return self._handle_400( + context, + "invalid vp token: nonce or aud mismatch", + e400 + ) + except ValueError as e400: + return self._handle_400( + context, + "invalid vp token: missing or invalid iat claim", + e400 + ) + except VPExpired as e400: + return self._handle_400( + context, + "invalid vp token: expired", + e400 + ) + except Exception as e400: + return self._handle_400( + context, + "trust error: cannot verify vp token", + e400 + ) + else: + try: + token_processor = VpMDocCbor(vp_token) + except Exception as e400: + return self._handle_400( + context, + "invalid vp token: cannot parse vp token", + e400 + ) + + try: + token_processor.verify_signature() + + docs = token_processor.get_documents() + + #TODO: implement decode of attributes + + #for doc in docs: + # data = doc["issuerSigned"]["nameSpaces"].values() + # extracted_attributes[data["doc_type"]] = doc + + except MdocCborValidationError as e400: + return self._handle_400( + context, + "invalid vp token: invalid signature", + e400 + ) + except VPExpired as e400: + return self._handle_400( + context, + "invalid vp token: expired", + e400 + ) + except Exception as e400: + return self._handle_400( + context, + "trust error: cannot verify vp token", + e400 + ) + + all_attributes = self._extract_all_user_attributes(extracted_attributes) iss_list_serialized = ";".join(credential_issuers) # marshaling is whatever internal_resp = self._translate_response( all_attributes, iss_list_serialized, context @@ -358,17 +406,6 @@ def _parse_authorization_response( Exception("invalid program state"), ) - def _vp_verifier_factory( - self, presentation_submission: dict, token: str, session_data: dict - ) -> tuple[VpTokenParser, VpTokenVerifier]: - # TODO: la funzione dovrebbe consumare la presentation submission per sapere quale token - # ritornare - per ora viene ritornata l'unica implementazione possibile - challenge = self._get_verifier_challenge(session_data) - token_processor = VpVcSdJwtParserVerifier( - token, challenge["aud"], challenge["nonce"] - ) - return (token_processor, deepcopy(token_processor)) - def _get_verifier_challenge(self, session_data: dict) -> VerifierChallenge: # TODO: check aud according to the LSP Potential singularities ... return {"aud": self.client_id, "nonce": session_data["nonce"]} diff --git a/pyeudiw/tests/openid4vp/test_authorization_response.py b/pyeudiw/tests/openid4vp/test_authorization_response.py index fded18a8b..98826ebfb 100644 --- a/pyeudiw/tests/openid4vp/test_authorization_response.py +++ b/pyeudiw/tests/openid4vp/test_authorization_response.py @@ -5,6 +5,7 @@ from pyeudiw.openid4vp.authorization_response import ( DirectPostJwtJweParser, DirectPostParser, + normalize_jsonstring_to_string, ) from pyeudiw.openid4vp.exceptions import ( AuthRespParsingException, @@ -42,6 +43,7 @@ def test_direct_post_parser_good_case(): {"id": "verifiable-credential-type", "format": "dc+sd-jwt", "path": "$.vct"} ], } + # case 0: vp_token is string ctx.request = { "vp_token": vp_token, "state": state, @@ -53,6 +55,18 @@ def test_direct_post_parser_good_case(): assert resp.state == state assert resp.presentation_submission == presentation_submission + # case 1: vp_token is a json string + ctx.request = { + "vp_token": f'"{vp_token}"', + "state": state, + "presentation_submission": presentation_submission + } + + resp = parser.parse_and_validate(ctx) + assert resp.vp_token == vp_token + assert resp.state == state + assert resp.presentation_submission == presentation_submission + def test_direct_post_response_bad_parse_case(): # case 0: bad method @@ -114,6 +128,7 @@ def test_direct_post_jwt_jwe_parser_good_case(jwe_helper): {"id": "verifiable-credential-type", "format": "dc+sd-jwt", "path": "$.vct"} ], } + data = { "vp_token": vp_token, "state": state, @@ -213,3 +228,13 @@ def test_direct_post_jwt_jwe_parser_bad_validation_case(jwe_helper): assert False, f"obtained unexpected parsing exception: {e}" except AuthRespValidationException: assert True + + +def test_normalize_json_string(): + s = 'asd' + assert s == normalize_jsonstring_to_string(s) + assert s == normalize_jsonstring_to_string(f'"{s}"') + + sl = ['asd', 'fgh'] + assert sl == normalize_jsonstring_to_string(sl) + assert sl == normalize_jsonstring_to_string([f'"{sl[0]}"', f'"{sl[1]}"']) diff --git a/pyeudiw/tests/satosa/test_backend.py b/pyeudiw/tests/satosa/test_backend.py index ddfca08ed..faaece702 100644 --- a/pyeudiw/tests/satosa/test_backend.py +++ b/pyeudiw/tests/satosa/test_backend.py @@ -1,3 +1,4 @@ +import os import uuid import base64 import datetime @@ -20,6 +21,7 @@ from pyeudiw.storage.base_storage import TrustType from pyeudiw.storage.db_engine import DBEngine from pyeudiw.jwt.jws_helper import DEFAULT_SIG_KTY_MAP +from pymdoccbor.mdoc.issuer import MdocCborIssuer from pyeudiw.tests.federation.base import ( EXP, NOW, @@ -49,6 +51,31 @@ from pyeudiw.jwt.jwe_helper import JWEHelper from pyeudiw.satosa.utils.response import JsonResponse +PKEY = { + 'KTY': 'EC2', + 'CURVE': 'P_256', + 'ALG': 'ES256', + 'D': os.urandom(32), + 'KID': b"demo-kid" +} + +PID_DATA = { + "eu.europa.ec.eudiw.pid.1": { + "family_name": "Raffaello", + "given_name": "Mascetti", + "birth_date": "1922-03-13", + "birth_place": "Rome", + "birth_country": "IT" + }, + "eu.europa.ec.eudiw.pid.it.1": { + "tax_id_code": "TINIT-XXXXXXXXXXXXXXX" + } +} + +mdoci = MdocCborIssuer( + private_key=PKEY +) + def issue_sd_jwt(specification: dict, settings: dict, issuer_key: JWK, holder_key: JWK) -> dict: claims = { @@ -174,9 +201,17 @@ def _generate_payload(self, issuer_jwk, holder_jwk, nonce, state, aud): vp_token = sdjwt_at_holder.sd_jwt_presentation + mdoci.new( + doctype="eu.europa.ec.eudiw.pid.1", + data=PID_DATA, + devicekeyinfo=PKEY + ) + + vp_token_mdoc = mdoci.dumps().decode() + return { "state": state, - "vp_token": vp_token, + "vp_token": [vp_token, vp_token_mdoc], "presentation_submission": { "definition_id": "32f54163-7166-48f1-93d8-ff217bdb0653", "id": "04a98be3-7fb0-4cf5-af9a-31579c8b0e7d", @@ -336,7 +371,7 @@ def test_fail_vp_validation_in_response_endpoint(self, context): assert response_endpoint.status == "400" msg = json.loads(response_endpoint.message) assert msg["error"] == "invalid_request" - assert msg["error_description"] == "invalid vp token: not a key-bound jwt" + assert msg["error_description"] == "invalid vp token: cannot parse vp token" def test_response_endpoint(self, context): nonce = str(uuid.uuid4()) @@ -448,7 +483,7 @@ def test_response_endpoint_no_key_binding_jwt(self, context): msg = json.loads(response_endpoint.message) assert response_endpoint.status == "400" assert msg["error"] == "invalid_request" - assert msg["error_description"] == "invalid vp token: not a key-bound jwt" + assert msg["error_description"] == "invalid vp token: cannot parse vp token" def test_response_endpoint_invalid_signature(self, context): nonce = str(uuid.uuid4()) @@ -459,7 +494,7 @@ def test_response_endpoint_invalid_signature(self, context): response = self._generate_payload(self.issuer_jwk, self.holder_jwk, nonce, state, self.backend.client_id) - jwt_segments = response["vp_token"].split(".") + jwt_segments = response["vp_token"][0].split(".") midlen = len(jwt_segments[2]) // 2 jwt_segments[2] = jwt_segments[2][:midlen] + jwt_segments[2][midlen+1:] diff --git a/pyeudiw/tests/trust/handler/test_direct_trust_jar.py b/pyeudiw/tests/trust/handler/test_direct_trust_jar.py index 2be35905c..c9723a50b 100644 --- a/pyeudiw/tests/trust/handler/test_direct_trust_jar.py +++ b/pyeudiw/tests/trust/handler/test_direct_trust_jar.py @@ -23,6 +23,22 @@ def signing_private_key() -> list[dict]: } ] +@pytest.fixture +def rsa_signing_private_key() -> list[dict]: + return [{ + "kty": "RSA", + "use": "sig", + "alg": "RS256", + "kid": "123114cf-ebef-48d9-9602-3be85e6e12dd", + "d": "b41VkvQv083zdtsqX9Q4RqW6DOH7LcSMSSK-KaUi-jtR4SdPkans1vY9QwfZ1gL-iQm0UP50Txow1Xawnh_-O45efpTOJ0sEXno5gXregQQNXxum-ATh7npYTv3Zjfl1lw4GX9UvXwtko3zHA01OtvOdXxtDHtatvoojFEwTisBT5j9f_q7Dmmgmtml17U_M1heANv9O9PqOey2U7_wZRji2lLGpeP7DxeBpTVztyKdnBZCjBnwfyrES3eAPlO5GI3zWAxHuaSsms3F8WQKJqHQs8xDxHpC1MCPMqmnCZnrBxZXxeeg6gMuEJ72RtzziOwH2gr3alND6gpARwwgEYQ", + "n": "oV1dBQQpxKhVpJzouceEvuJQ_0nIvK3GVF4FEKRunCWK1amBupkegZgIXq98WsvfNHLwKPhhFXO1unONb44Q51VeFet7ThWyJSB9dhXmr21wvqFA4HVQj4vGPLiGUmacKL-9W4vd_ElLyf1TEtcolUafEI83zfg6bsVkJrwdSRDkxYU5Kh28ayCgoaqXUwLsuR-xT5EiksJESHtqW5_8sqrp5v95UOxxK8NdbEQ54Fr2pfeKQ6Id5VyUlwOnfnV6zgJJ7qBM1NxcyQ7OkQHrh03LfoPF2Hl7-EuZ0ET8p9RVC7eC2NH033O9rSiWljwwsvmRG7nyVN7bkB5wbInp0Q", + "e": "AQAB", + "p": "0RHnCQZiI6VomMmRcfDyRgqZjUEHLPF17u4TAxqFys3-lgxuRCn8cjXkzJ7t9C0FmGNQy2zrwhQZRUlKotPwB9t0qTRwshqmG40O4EHfdgqu_sqNe8toCJ9xGqkDJFdYvmPy-SkqMYyszRf1GEwMjgj1Ncyx4WciaEbHZUllQo0", + "q": "xZYanwkJJGOD4b7Z2PwCA_ubEYU8O2C3UoeINv2P5fXicXRK278o4WelaQBhyvDcPyS3lJyyusB_ro3Fax1fm4IDV1buITar671NzooWKOUQgG0MoVHS8k7qFmGXGDhFBrO_khsvc3FNAjdqkNpH5slo8AwvN2SrbHO3GX6aVVU", + "dp": "tk7iJCCI24SVXQYH6k-tNB5yH5ag5zP3Hs5DjeVG3b4bTkSwsofaNs2AIl5EKTRJOMUB4yGrw6U7FAwBJVOib3eSlym_S8-pIUUzv6IxdgGC73M5RMXuhfZi7liLANmZ7QvDCDo5LNP6qy1E8FcAa6qsCKniQydn_X4aydvijNE", + "dq": "Ml9mQg1Hq2NDiBXj7BGzYdiPXBQfmvO5SO0MqRhTy0i4hjwjqYo-ndiSrwZN6DMns2Fk_BpG5p2U76dtITXH3hlzSJz88LLDecI1R-akZ6CeaF9kzOvTX7sGqtYOczpFPsQsns8XddL40wvVu0Aq_Id0nV49211q5qdJktJX_lE", + "qi": "rQ5SbqNeVrGOZ1rJXWbiAxux_-E1HBunOKWN6HQpoStLpRzJ6zz8aEXhSXMAnbeQOi1ZBS1escmlSupkgz4TEnrhionAJ2orIJ1rOiZIii7stJVkB3fs2LBoxs17Msj9AVrBA-tHhWpoBj63t-ahhEuxhgReq_0DjzQgcP7xUA" + }] @pytest.fixture def all_private_keys(signing_private_key: list[dict]) -> list[dict]: @@ -119,6 +135,25 @@ def test_direct_trust_jar_metadata(direct_trust_jar): assert "d" not in pub_key assert pub_key.get("use", "") != "enc" +def test_direct_trust_jar_metadata_no_private_key(all_private_keys, rsa_signing_private_key): + dtj = DirectTrustJar(jwks=all_private_keys+rsa_signing_private_key) + + backend = "openid4vp" + entity_id = f"https://rp.example/{backend}" + metadata = dtj._build_metadata_with_issuer_jwk(entity_id) + assert len(metadata["jwks"]["keys"]) == 2 + + assert "d" not in metadata["jwks"]["keys"][0] + assert metadata["jwks"]["keys"][0].get("use", "") != "enc" + + assert "d" not in metadata["jwks"]["keys"][1] + assert "p" not in metadata["jwks"]["keys"][1] + assert "q" not in metadata["jwks"]["keys"][1] + assert "dp" not in metadata["jwks"]["keys"][1] + assert "dq" not in metadata["jwks"]["keys"][1] + assert "qi" not in metadata["jwks"]["keys"][1] + assert metadata["jwks"]["keys"][1].get("use", "") != "enc" + def test_direct_trust_metadata_handler(direct_trust_jar, signing_private_key): backend = "openid4vp" diff --git a/pyeudiw/tests/trust/mock_trust_handler.py b/pyeudiw/tests/trust/mock_trust_handler.py index 620b05455..059783557 100644 --- a/pyeudiw/tests/trust/mock_trust_handler.py +++ b/pyeudiw/tests/trust/mock_trust_handler.py @@ -13,6 +13,17 @@ "y": "fUEsB8IrX2DgzqABfVsCody1RypAXX54fXQ1keoPP5Y", } +mock_jwk_private = { + "kty": "EC", + "d": "Md6-VEjd5ZEFTnKo7AUCzUdejljdXdSAurSywGNw8oo", + "use": "sig", + "crv": "P-256", + "kid": "8OQ2P3OVYOys0Aobqr1HbK2HVpJiiWBl1Z2c2v732Sw", + "x": "SMbScr0uzMGAsEMaGbAMXjQwv45h5Lpx3oMLljgsAeA", + "y": "NGbl7KWmrDc_LiM2oLOm-wrNPmlhtSDdBV6noIB7jyw", + "alg": "ES256" +} + class MockTrustHandler(TrustHandlerInterface): """ @@ -25,7 +36,15 @@ def __init__(self, *args, **kwargs): def get_metadata(self, issuer: str, trust_source: TrustSourceData) -> dict: if issuer == self.client_id: - trust_source.metadata = {"default_key": "default_value"} + trust_source.metadata = { + "default_key": "default_value", + "jwks": { + "keys": [ + mock_jwk, + mock_jwk_private + ] + }, + } return trust_source trust_source.metadata = {"json_key": "json_value"} @@ -39,7 +58,7 @@ def extract_and_update_trust_materials( if issuer == self.client_id: trust_param = TrustParameterData( attribute_name="trust_param_name", - jwks=[mock_jwk], + jwks=[mock_jwk, mock_jwk_private], expiration_date=datetime.fromtimestamp(exp_from_now(self.exp)), trust_param_name={'trust_param_key': 'trust_param_value'}, trust_handler_name=str(self.__class__.__name__) @@ -47,7 +66,7 @@ def extract_and_update_trust_materials( else: trust_param = TrustParameterData( attribute_name="trust_param_name", - jwks=[mock_jwk], + jwks=[mock_jwk, mock_jwk_private], expiration_date=datetime.fromtimestamp(exp_from_now(self.exp)), trust_param_name={"trust_param_key": "trust_param_value"}, trust_handler_name=str(self.__class__.__name__) diff --git a/pyeudiw/tests/trust/test_dynamic.py b/pyeudiw/tests/trust/test_dynamic.py index 3a8ced5b7..711b72824 100644 --- a/pyeudiw/tests/trust/test_dynamic.py +++ b/pyeudiw/tests/trust/test_dynamic.py @@ -54,14 +54,24 @@ def test_public_key_and_metadata_retrive(): "config": {}, }, - }, db_engine, default_client_id="default-client-id" + }, db_engine, default_client_id="default-client-id", mode="update_first" ) uuid_url = f"http://{uuid4()}.issuer.it" assert trust_ev.get_jwt_header_trust_parameters(uuid_url) == {'trust_param_name': {'trust_param_key': 'trust_param_value'}} - assert trust_ev.get_metadata() == {"default_key": "default_value"} + metadata = trust_ev.get_metadata() + + assert metadata["default_key"] == "default_value" + assert len(metadata["jwks"]["keys"]) == 2 + assert "d" not in metadata["jwks"]["keys"][0] + assert "d" not in metadata["jwks"]["keys"][1] + + keys = trust_ev.get_public_keys() + assert len(keys) == 2 + assert "d" not in keys[0] + assert "d" not in keys[1] def test_update_first_strategy(): db_engine = DBEngine(CONFIG["storage"]) diff --git a/pyeudiw/trust/dynamic.py b/pyeudiw/trust/dynamic.py index 5133b9c7d..baf3e5bcb 100644 --- a/pyeudiw/trust/dynamic.py +++ b/pyeudiw/trust/dynamic.py @@ -9,7 +9,7 @@ from pyeudiw.storage.exceptions import EntryNotFound from pyeudiw.tools.base_logger import BaseLogger from pyeudiw.tools.utils import dynamic_class_loader -from pyeudiw.trust.exceptions import NoCriptographicMaterial, TrustConfigurationError +from pyeudiw.trust.exceptions import NoCriptographicMaterial, TrustConfigurationError, NoMetadata from pyeudiw.trust.handler.direct_trust_jar import DirectTrustJar from pyeudiw.trust.handler.direct_trust_sd_jwt_vc import DirectTrustSdJwtVc from pyeudiw.trust.handler.interface import TrustHandlerInterface @@ -172,10 +172,11 @@ def get_public_keys(self, issuer: Optional[str] = None, force_update: bool = Fal for handler in self.handlers: if (key := trust_source.get_trust_param_by_handler_name(handler.__class__.__name__)) is not None: for jwk in key.jwks: - thumbprint = key_from_jwk_dict(jwk).thumbprint("SHA-256") + key = key_from_jwk_dict(jwk) + thumbprint = key.thumbprint("SHA-256") if thumbprint not in thumbprints: thumbprints.append(thumbprint) - keys.append(jwk) + keys.append(key.serialize(private=False)) if not keys: raise NoCriptographicMaterial( @@ -188,16 +189,29 @@ def get_public_keys(self, issuer: Optional[str] = None, force_update: bool = Fal def get_metadata(self, issuer: Optional[str] = None, force_update: bool = False) -> dict: """ Yields a dictionary of metadata about an issuer, according to some trust model. + + :param issuer: The issuer + :type issuer: str + :param force_update: If the metadata should be updated even if it is already present in the cache + :type force_update: bool + + :returns: The metadata + :rtype: dict """ trust_source = self._get_trust_source(issuer, force_update) if not trust_source.metadata: - raise Exception( + raise NoMetadata( f"no trust evaluator can provide metadata for {issuer}: " f"searched among: {self.handlers_names}" ) + + metadata = trust_source.metadata.copy() + + if "jwks" in metadata and "keys" in metadata["jwks"]: + metadata["jwks"]["keys"] = [key_from_jwk_dict(jwk).serialize(private=False) for jwk in metadata["jwks"]["keys"]] - return trust_source.metadata + return metadata def is_revoked(self, issuer: Optional[str] = None, force_update: bool = False) -> bool: """ diff --git a/pyeudiw/trust/exceptions.py b/pyeudiw/trust/exceptions.py index d557a3b2c..a5cf3c035 100644 --- a/pyeudiw/trust/exceptions.py +++ b/pyeudiw/trust/exceptions.py @@ -32,3 +32,6 @@ class TrustConfigurationError(Exception): class NoCriptographicMaterial(Exception): pass + +class NoMetadata(Exception): + pass \ No newline at end of file diff --git a/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py b/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py index e8e48f3f3..fcd78be56 100644 --- a/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py +++ b/pyeudiw/trust/handler/direct_trust_sd_jwt_vc.py @@ -1,6 +1,7 @@ from pyeudiw.tools.utils import cacheable_get_http_url, get_http_url from pyeudiw.trust.handler._direct_trust_jwk import _DirectTrustJwkHandler from pyeudiw.trust.model.trust_source import TrustSourceData +from cryptojwt.jwk.jwk import key_from_jwk_dict from .commons import DEFAULT_HTTPC_PARAMS, DEFAULT_OPENID4VCI_METADATA_ENDPOINT @@ -45,14 +46,19 @@ def get_metadata( """ url = build_metadata_issuer_endpoint(issuer, self.metadata_endpoint) if self.cache_ttl == 0: - trust_source.metadata = get_http_url( + metadata = get_http_url( url, self.httpc_params, self.http_async_calls )[0].json() else: - trust_source.metadata = cacheable_get_http_url( + metadata = cacheable_get_http_url( self.cache_ttl, url, self.httpc_params, self.http_async_calls ).json() + if "jwks" in metadata and "keys" in metadata["jwks"]: + metadata["jwks"]["keys"] = [key_from_jwk_dict(jwk).serialize(private=False) for jwk in metadata["jwks"]["keys"]] + + trust_source.metadata = metadata + return trust_source diff --git a/pyeudiw/trust/model/trust_source.py b/pyeudiw/trust/model/trust_source.py index 6da7bd2b8..9710f8699 100644 --- a/pyeudiw/trust/model/trust_source.py +++ b/pyeudiw/trust/model/trust_source.py @@ -2,6 +2,8 @@ from datetime import datetime from typing import Optional +from cryptojwt.jwk.jwk import key_from_jwk_dict + @dataclass class TrustParameterData: """ @@ -31,8 +33,8 @@ def __init__( self.attribute_name = attribute_name self.expiration_date = expiration_date - self.jwks = jwks self.trust_handler_name = trust_handler_name + self.jwks = [key_from_jwk_dict(jwk).serialize(private=False) for jwk in jwks] for type, tp in kwargs.items(): setattr(self, type, tp) @@ -47,7 +49,7 @@ def serialize(self) -> dict[str, any]: return { "attribute_name": self.attribute_name, "expiration_date": self.expiration_date, - "jwks": self.jwks, + "jwks": [key_from_jwk_dict(jwk).serialize(private=False) for jwk in self.jwks], "trust_handler_name": self.trust_handler_name, self.attribute_name: getattr(self, self.attribute_name) } @@ -93,9 +95,13 @@ def __init__( """ self.entity_id = entity_id self.policies = policies - self.metadata = metadata self.revoked = revoked + if "jwks" in metadata and "keys" in metadata["jwks"]: + metadata["jwks"]["keys"] = [key_from_jwk_dict(jwk).serialize(private=False) for jwk in metadata["jwks"]["keys"]] + + self.metadata = metadata + for type, tp in kwargs.items(): setattr(self, type, TrustParameterData(**tp)) @@ -161,10 +167,16 @@ def serialize(self) -> dict[str, any]: trust_source = { "entity_id": self.entity_id, "policies": self.policies, - "metadata": self.metadata, "revoked": self.revoked, } + tmp_metadata = self.metadata.copy() + + if "jwks" in tmp_metadata and "keys" in tmp_metadata["jwks"]: + tmp_metadata["jwks"]["keys"] = [key_from_jwk_dict(jwk).serialize(private=False) for jwk in tmp_metadata["jwks"]["keys"]] + + trust_source["metadata"] = tmp_metadata + for type in dir(self): if isinstance(getattr(self, type), TrustParameterData): trust_source[type] = getattr(self, type).serialize()