diff --git a/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md b/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md index 9a17ff3db8c0..0574f148a2f6 100644 --- a/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md +++ b/sdk/keyvault/azure-keyvault-keys/CHANGELOG.md @@ -1,14 +1,12 @@ # Release History -## 4.12.0b2 (Unreleased) +## 4.12.0b2 (2026-05-29) ### Features Added -### Breaking Changes - -### Bugs Fixed - -### Other Changes +- Added the `ExternalKey` model and the new `KeyClient.create_external_key` method + for registering a Key Vault key whose material is held in an external HSM [#47200](https://github.com/Azure/azure-sdk-for-python/pull/47200). +- Added the `KeyProperties.external_key` read-only property. ## 4.12.0b1 (2026-05-26) diff --git a/sdk/keyvault/azure-keyvault-keys/README.md b/sdk/keyvault/azure-keyvault-keys/README.md index 53261a130b56..85b76f9a8c95 100644 --- a/sdk/keyvault/azure-keyvault-keys/README.md +++ b/sdk/keyvault/azure-keyvault-keys/README.md @@ -86,6 +86,7 @@ This section contains code snippets covering common tasks: * [Update an existing key](#update-an-existing-key) * [Delete a key](#delete-a-key) * [Configure automatic key rotation](#configure-automatic-key-rotation) +* [Register external keys](#register-external-keys-managed-hsm-only) * [List keys](#list-keys) * [Perform cryptographic operations](#cryptographic-operations) * [Async API](#async-api) @@ -204,6 +205,29 @@ print(f"Rotated the key on-demand; new version is {rotated_key.properties.versio +### Register external keys (Managed HSM only) +[create_external_key](https://aka.ms/azsdk/python/keyvault-keys/docs#azure.keyvault.keys.KeyClient.create_external_key) +registers an external key with a Managed HSM that is configured to use External Key Management (EKM). The external HSM +owns the key material; the Managed HSM stores only a reference to the key. + +> **NOTE:** External keys are only supported on Managed HSM, not regular Key Vault. The Managed HSM must be configured +> with an external HSM source. + +```python +from azure.identity import DefaultAzureCredential +from azure.keyvault.keys import ExternalKey, KeyClient + +credential = DefaultAzureCredential() + +key_client = KeyClient(vault_url="https://my-managed-hsm.managedhsm.azure.net/", credential=credential) + +external_key = ExternalKey(id="external-key-reference-id") +key = key_client.create_external_key("external-key-name", external_key=external_key) + +print(key.name) +print(key.properties.external_key.id) +``` + ### List keys [list_properties_of_keys](https://aka.ms/azsdk/python/keyvault-keys/docs#azure.keyvault.keys.KeyClient.list_properties_of_keys) lists the properties of all of the keys in the client's vault. diff --git a/sdk/keyvault/azure-keyvault-keys/assets.json b/sdk/keyvault/azure-keyvault-keys/assets.json index 112f3a5b3c5e..be0788c9d3ab 100644 --- a/sdk/keyvault/azure-keyvault-keys/assets.json +++ b/sdk/keyvault/azure-keyvault-keys/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "python", "TagPrefix": "python/keyvault/azure-keyvault-keys", - "Tag": "python/keyvault/azure-keyvault-keys_e47dc10e22" + "Tag": "python/keyvault/azure-keyvault-keys_652b9e1d39" } diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/__init__.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/__init__.py index ace593d6ff4d..e706108a5a00 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/__init__.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/__init__.py @@ -12,6 +12,7 @@ from ._shared.client_base import ApiVersion from ._models import ( DeletedKey, + ExternalKey, JsonWebKey, KeyAttestation, KeyProperties, @@ -27,6 +28,7 @@ __all__ = [ "ApiVersion", "KeyClient", + "ExternalKey", "JsonWebKey", "KeyAttestation", "KeyVaultKey", diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py index b1c2eb3a4b93..399d89d22e4a 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_client.py @@ -17,7 +17,15 @@ from ._models import JsonWebKey, KeyRotationLifetimeAction from ._shared import KeyVaultClientBase from ._shared._polling import DeleteRecoverPollingMethod, KeyVaultOperationPoller -from ._models import DeletedKey, KeyVaultKey, KeyProperties, KeyReleasePolicy, KeyRotationPolicy, ReleaseKeyResult +from ._models import ( + DeletedKey, + ExternalKey, + KeyVaultKey, + KeyProperties, + KeyReleasePolicy, + KeyRotationPolicy, + ReleaseKeyResult, +) def _get_key_id(vault_url, key_name, version=None): @@ -57,6 +65,7 @@ def _get_attributes( not_before: Optional[datetime], expires_on: Optional[datetime], exportable: Optional[bool] = None, + external_key: Optional[ExternalKey] = None, ) -> Optional[KeyAttributes]: """Return a KeyAttributes object if non-None attributes are provided, or None otherwise. @@ -68,13 +77,25 @@ def _get_attributes( :type expires_on: ~datetime.datetime or None :param exportable: Whether the private key can be exported. :type exportable: bool or None + :param external_key: A reference to an external key, when registering an external key. + :type external_key: ~azure.keyvault.keys.ExternalKey or None :returns: An autorest-generated model of the key's attributes. :rtype: KeyAttributes """ - if enabled is not None or not_before is not None or expires_on is not None or exportable is not None: + if ( + enabled is not None + or not_before is not None + or expires_on is not None + or exportable is not None + or external_key is not None + ): return self._models.KeyAttributes( - enabled=enabled, not_before=not_before, expires=expires_on, exportable=exportable + enabled=enabled, + not_before=not_before, + expires=expires_on, + exportable=exportable, + external_key=external_key._to_generated() if external_key is not None else None, ) return None @@ -398,6 +419,64 @@ def create_oct_key( **kwargs, ) + @distributed_trace + def create_external_key( + self, + name: str, + external_key: ExternalKey, + *, + enabled: Optional[bool] = None, + tags: Optional[Dict[str, str]] = None, + not_before: Optional[datetime] = None, + expires_on: Optional[datetime] = None, + release_policy: Optional[KeyReleasePolicy] = None, + **kwargs: Any, + ) -> KeyVaultKey: + """Register a Managed HSM key that points at material managed by an external HSM. + + Requires the keys/create permission. Only available with API version + ``2026-01-01-preview`` and newer, and only supported on Managed HSM. + + :param str name: The name for the new key. + :param external_key: A reference identifying the external key material. + :type external_key: ~azure.keyvault.keys.ExternalKey + + :keyword enabled: Whether the key is enabled for use. + :paramtype enabled: bool or None + :keyword tags: Application specific metadata in the form of key-value pairs. + :paramtype tags: dict[str, str] or None + :keyword not_before: Not before date of the key in UTC. + :paramtype not_before: ~datetime.datetime or None + :keyword expires_on: Expiry date of the key in UTC. + :paramtype expires_on: ~datetime.datetime or None + :keyword release_policy: The policy rules under which the key can be exported. + :paramtype release_policy: ~azure.keyvault.keys.KeyReleasePolicy or None + + :returns: The created key. + :rtype: ~azure.keyvault.keys.KeyVaultKey + + :raises ~azure.core.exceptions.HttpResponseError: + """ + attributes = self._get_attributes( + enabled=enabled, not_before=not_before, expires_on=expires_on, external_key=external_key + ) + + policy = release_policy + if policy is not None: + policy = self._models.KeyReleasePolicy( + encoded_policy=policy.encoded_policy, content_type=policy.content_type, immutable=policy.immutable + ) + # External keys are mutually exclusive with `kty`. The generated overload requires `kty`, + # but the runtime constructor accepts arbitrary kwargs. + parameters = self._models.KeyCreateParameters( # type: ignore[call-overload] + key_attributes=attributes, + tags=tags, + release_policy=policy, + ) + + bundle = self._client.create_key(key_name=name, parameters=parameters, **kwargs) + return KeyVaultKey._from_key_bundle(bundle) + @distributed_trace def begin_delete_key( # pylint:disable=bad-option-value,delete-operation-wrong-return-type self, name: str, **kwargs: Any diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/model_base.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/model_base.py index 4102784f9a85..d725c55906d3 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/model_base.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/model_base.py @@ -590,6 +590,239 @@ def _create_value(rf: typing.Optional["_RestField"], value: typing.Any) -> typin return _serialize(value, rf._format) +# ============================================================================ +# Fast-path scalar deserializer functions for rest_field(deserializer=...) +# These are referenced from rest_field declarations to bypass the generic +# _deserialize -> _deserialize_with_callable chain. +# Only simple/primitive types — no models or container types. +# ============================================================================ + + +def _xml_deser_str(value): + if isinstance(value, ET.Element): + return value.text or "" + return str(value) if value is not None else None + + +def _xml_deser_int(value): + if isinstance(value, ET.Element): + return int(value.text) if value.text else None + return int(value) if value is not None else None + + +def _xml_deser_float(value): + if isinstance(value, ET.Element): + return float(value.text) if value.text else None + return float(value) if value is not None else None + + +def _xml_deser_bool(value): + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + if text in (True, False): + return text + return text.lower() == "true" + + +# pylint: disable=docstring-missing-param +def _xml_deser_bytes(value): + """Deserialize bytes from XML (base64).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_bytes(text) + + +def _xml_deser_bytes_base64url(value): + """Deserialize bytes from XML (base64url).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_bytes_base64(text) + + +def _xml_deser_datetime(value): + """Deserialize a datetime from XML (ISO 8601 / rfc3339).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_datetime(text) + + +def _xml_deser_datetime_rfc7231(value): + """Deserialize a datetime from XML (RFC7231 format).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_datetime_rfc7231(text) + + +def _xml_deser_datetime_unix_timestamp(value): + """Deserialize a datetime from XML (Unix timestamp).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_datetime_unix_timestamp(float(text)) + + +def _xml_deser_date(value): + """Deserialize a date from XML (ISO 8601).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_date(text) + + +def _xml_deser_time(value): + """Deserialize a time from XML (ISO 8601).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_time(text) + + +def _xml_deser_duration(value): + """Deserialize a timedelta from XML (ISO 8601 duration).""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_duration(text) + + +def _xml_deser_decimal(value): + """Deserialize a Decimal from XML.""" + if isinstance(value, ET.Element): + text = value.text + else: + text = value + if text is None: + return None + return _deserialize_decimal(text) + + +def _xml_deser_enum_or_str(enum_cls, value): + """Deserialize a Union[EnumType, str] from XML.""" + text = value.text if isinstance(value, ET.Element) else value + if text is None: + return None + try: + return enum_cls(text) + except ValueError: + return text + + +def _extract_xml_model_type(rf_type): + """Extract the concrete Model class from a resolved rf._type partial chain. + + Unwraps ``Optional[Model]`` and ``_deserialize_model(Model, ...)`` + wrappers. Only handles Model and Optional[Model] — other composite + types (List, Dict, Union, etc.) return None and fall through to the + generic ``_deserialize`` path at runtime. + """ + if rf_type is None: + return None + if isinstance(rf_type, type) and _is_model(rf_type): + return rf_type + if not isinstance(rf_type, functools.partial): + return None + func = rf_type.func + args = rf_type.args + if func is _deserialize_with_optional and args: + return _extract_xml_model_type(args[0]) + if func is _deserialize_model and args: + cls = args[0] + return cls if isinstance(cls, type) and _is_model(cls) else None + return None + + +def _build_xml_field_plan( # pylint: disable=docstring-missing-return, docstring-missing-rtype, unused-variable + cls, attr_to_rest_field: dict +) -> list: + """Build a precomputed XML field plan for fast _init_from_xml iteration. + + Called once per model class in __new__. Returns a list of tuples: + (rest_name, xml_name, kind, deser, rf_type, is_optional, items_name) + + kind: 0=wrapped, 1=attribute, 2=unwrapped, 3=text + + For Model and Optional[Model] fields that lack a scalar + ``_deserializer``, this function precomputes the Model class as the + deserializer so ``_init_from_xml`` can call ``ModelClass(element)`` + directly instead of going through the expensive + ``_get_deserialize_callable_from_annotation`` chain at runtime. + """ + model_meta = getattr(cls, "_xml", {}) + model_ns = model_meta.get("ns") or model_meta.get("namespace") + plan = [] + + for rf in attr_to_rest_field.values(): + prop_meta = getattr(rf, "_xml", {}) + deser = rf._deserializer + + xml_name = prop_meta.get("name", rf._rest_name) + xml_ns = _resolve_xml_ns(prop_meta, model_meta) + if xml_ns: + xml_name = "{" + xml_ns + "}" + xml_name + + is_optional = rf._is_optional + + # For Model / Optional[Model] fields without a scalar deserializer, + # precompute the Model class as the deserializer. + if deser is None and rf._type is not None: + model_cls = _extract_xml_model_type(rf._type) + if model_cls is not None: + deser = model_cls + + if prop_meta.get("attribute", False): + plan.append((rf._rest_name, xml_name, 1, deser, rf._type, is_optional, None)) + elif prop_meta.get("unwrapped", False): + items_name = prop_meta.get("itemsName") + if items_name: + items_ns = prop_meta.get("itemsNs") + if items_ns is not None: + xml_ns = items_ns + if xml_ns: + items_name = "{" + xml_ns + "}" + items_name + else: + items_name = xml_name + plan.append((rf._rest_name, xml_name, 2, deser, rf._type, is_optional, items_name)) + elif prop_meta.get("text", False): + plan.append((rf._rest_name, xml_name, 3, deser, rf._type, is_optional, None)) + else: + plan.append((rf._rest_name, xml_name, 0, deser, rf._type, is_optional, None)) + + return plan + + +# pylint: enable=docstring-missing-param class Model(_MyMutableMapping): _is_model = True # label whether current class's _attr_to_rest_field has been calculated @@ -630,7 +863,9 @@ def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None: dict_to_pass[rf._rest_name] = _create_value(rf, rf._default) super().__init__(dict_to_pass) - def _init_from_xml(self, element: ET.Element) -> dict[str, typing.Any]: + def _init_from_xml( # pylint: disable=too-many-branches, too-many-statements + self, element: ET.Element + ) -> dict[str, typing.Any]: """Deserialize an XML element into a dict mapping rest field names to values. :param ET.Element element: The XML element to deserialize from. @@ -638,53 +873,89 @@ def _init_from_xml(self, element: ET.Element) -> dict[str, typing.Any]: :rtype: dict """ result: dict[str, typing.Any] = {} - model_meta = getattr(self, "_xml", {}) existed_attr_keys: list[str] = [] - for rf in self._attr_to_rest_field.values(): - prop_meta = getattr(rf, "_xml", {}) - xml_name = prop_meta.get("name", rf._rest_name) - xml_ns = _resolve_xml_ns(prop_meta, model_meta) - if xml_ns: - xml_name = "{" + xml_ns + "}" + xml_name - - # attribute - if prop_meta.get("attribute", False) and element.get(xml_name) is not None: - existed_attr_keys.append(xml_name) - result[rf._rest_name] = _deserialize(rf._type, element.get(xml_name)) - continue - - # unwrapped element is array - if prop_meta.get("unwrapped", False): - # unwrapped array could either use prop items meta/prop meta - _items_name = prop_meta.get("itemsName") - if _items_name: - xml_name = _items_name - _items_ns = prop_meta.get("itemsNs") - if _items_ns is not None: - xml_ns = _items_ns - if xml_ns: - xml_name = "{" + xml_ns + "}" + xml_name - items = element.findall(xml_name) # pyright: ignore - if len(items) > 0: + field_plan = getattr(self, "_xml_field_plan", None) + if field_plan: + for rest_name, xml_name, kind, deser, rf_type, is_optional, items_name in field_plan: + if kind == 0: # wrapped element (most common) + item = element.find(xml_name) + if item is not None: + existed_attr_keys.append(xml_name) + if deser: + result[rest_name] = deser(item) + else: + result[rest_name] = _deserialize(rf_type, item) + elif kind == 1: # attribute + attr_val = element.get(xml_name) + if attr_val is not None: + existed_attr_keys.append(xml_name) + if deser: + result[rest_name] = deser(attr_val) + else: + result[rest_name] = attr_val + elif kind == 2: # unwrapped array + items = element.findall(items_name) # pyright: ignore + if len(items) > 0: + existed_attr_keys.append(items_name) + if deser: + result[rest_name] = deser(items) + else: + result[rest_name] = _deserialize(rf_type, items) + elif not is_optional: + existed_attr_keys.append(items_name) + result[rest_name] = [] + elif kind == 3: # text + if element.text is not None: + if deser: + result[rest_name] = deser(element.text) + else: + result[rest_name] = element.text + else: + model_meta = getattr(self, "_xml", {}) + for rf in self._attr_to_rest_field.values(): + prop_meta = getattr(rf, "_xml", {}) + xml_name = prop_meta.get("name", rf._rest_name) + xml_ns = _resolve_xml_ns(prop_meta, model_meta) + if xml_ns: + xml_name = "{" + xml_ns + "}" + xml_name + + # attribute + if prop_meta.get("attribute", False) and element.get(xml_name) is not None: existed_attr_keys.append(xml_name) - result[rf._rest_name] = _deserialize(rf._type, items) - elif not rf._is_optional: + result[rf._rest_name] = _deserialize(rf._type, element.get(xml_name)) + continue + + # unwrapped element is array + if prop_meta.get("unwrapped", False): + _items_name = prop_meta.get("itemsName") + if _items_name: + xml_name = _items_name + _items_ns = prop_meta.get("itemsNs") + if _items_ns is not None: + xml_ns = _items_ns + if xml_ns: + xml_name = "{" + xml_ns + "}" + xml_name + items = element.findall(xml_name) # pyright: ignore + if len(items) > 0: + existed_attr_keys.append(xml_name) + result[rf._rest_name] = _deserialize(rf._type, items) + elif not rf._is_optional: + existed_attr_keys.append(xml_name) + result[rf._rest_name] = [] + continue + + # text element is primitive type + if prop_meta.get("text", False): + if element.text is not None: + result[rf._rest_name] = _deserialize(rf._type, element.text) + continue + + # wrapped element could be normal property or array + item = element.find(xml_name) + if item is not None: existed_attr_keys.append(xml_name) - result[rf._rest_name] = [] - continue - - # text element is primitive type - if prop_meta.get("text", False): - if element.text is not None: - result[rf._rest_name] = _deserialize(rf._type, element.text) - continue - - # wrapped element could be normal property or array, it should only have one element - item = element.find(xml_name) - if item is not None: - existed_attr_keys.append(xml_name) - result[rf._rest_name] = _deserialize(rf._type, item) + result[rf._rest_name] = _deserialize(rf._type, item) # rest thing is additional properties for e in element: @@ -717,6 +988,9 @@ def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> Self: if not rf._rest_name_input: rf._rest_name_input = attr cls._attr_to_rest_field: dict[str, _RestField] = dict(attr_to_rest_field.items()) + # Build XML field plan for fast _init_from_xml (only for XML models) + if getattr(cls, "_xml", None): + cls._xml_field_plan = _build_xml_field_plan(cls, attr_to_rest_field) cls._calculated.add(f"{cls.__module__}.{cls.__qualname__}") return super().__new__(cls) @@ -1091,6 +1365,7 @@ def __init__( format: typing.Optional[str] = None, is_multipart_file_input: bool = False, xml: typing.Optional[dict[str, typing.Any]] = None, + deserializer: typing.Optional[typing.Callable] = None, ): self._type = type self._rest_name_input = name @@ -1103,6 +1378,7 @@ def __init__( self._format = format self._is_multipart_file_input = is_multipart_file_input self._xml = xml if xml is not None else {} + self._deserializer = deserializer @property def _class_type(self) -> typing.Any: @@ -1138,7 +1414,11 @@ def __get__(self, obj: Model, type=None): # pylint: disable=redefined-builtin # Return the value from _data directly (it's been deserialized in place) return obj._data.get(self._rest_name) - deserialized = _deserialize(self._type, _serialize(item, self._format), rf=self) + # Fast path: use _deserializer directly (avoids _serialize/_deserialize chain) + if self._deserializer: + deserialized = self._deserializer(item) + else: + deserialized = _deserialize(self._type, _serialize(item, self._format), rf=self) # For mutable types, store the deserialized value back in _data # so mutations directly affect _data @@ -1184,6 +1464,7 @@ def rest_field( format: typing.Optional[str] = None, is_multipart_file_input: bool = False, xml: typing.Optional[dict[str, typing.Any]] = None, + deserializer: typing.Optional[typing.Callable] = None, ) -> typing.Any: return _RestField( name=name, @@ -1193,6 +1474,7 @@ def rest_field( format=format, is_multipart_file_input=is_multipart_file_input, xml=xml, + deserializer=deserializer, ) @@ -1426,6 +1708,8 @@ def _deserialize_xml( value: str, ) -> typing.Any: element = ET.fromstring(value) # nosec + if _is_model(deserializer): + return deserializer._deserialize(element, []) return _deserialize(deserializer, element) diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/serialization.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/serialization.py index 954bf7ebffa7..a088671e9c51 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/serialization.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_generated/_utils/serialization.py @@ -1405,7 +1405,7 @@ def __init__(self, classes: Optional[Mapping[str, type]] = None) -> None: # Otherwise, result are unexpected self.additional_properties_detection = True - def __call__(self, target_obj, response_data, content_type=None): + def __call__(self, target_obj, response_data, content_type=None): # pylint: disable=too-many-return-statements """Call the deserializer to process a REST response. :param str target_obj: Target data type to deserialize to. @@ -1415,6 +1415,27 @@ def __call__(self, target_obj, response_data, content_type=None): :return: Deserialized object. :rtype: object """ + # Fast path for header deserialization: response_data is a plain str or None + # and target_obj is a simple scalar type. This avoids the expensive + # _unpack_content → _deserialize → _classify_target → deserialize_data chain. + if response_data is None: + return None + if target_obj == "str" and isinstance(response_data, str): + return response_data + if isinstance(response_data, str): + if target_obj == "int": + return int(response_data) + if target_obj == "bool": + if response_data in ("true", "1", "True"): + return True + if response_data in ("false", "0", "False"): + return False + return bool(response_data) + if target_obj == "rfc-1123": + return Deserializer.deserialize_rfc(response_data) + if target_obj == "bytearray": + return Deserializer.deserialize_bytearray(response_data) + data = self._unpack_content(response_data, content_type) return self._deserialize(target_obj, data) diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_models.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_models.py index 487b58416427..97eca0b2437a 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_models.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/_models.py @@ -93,6 +93,34 @@ def _from_generated(cls, attestation: "_models.KeyAttestation") -> "KeyAttestati ) +class ExternalKey: + """A reference to a key whose material lives outside Azure Managed HSM. + + Used with :func:`~azure.keyvault.keys.KeyClient.create_external_key` (and its async equivalent) + to register a Managed HSM key that points at material stored in an external HSM. Only available + with API version ``2026-01-01-preview`` and newer, and only supported on Managed HSM. + + :keyword str id: The external key identifier. Must contain only characters in the set + ``[a-zA-Z0-9-]`` and be at most 64 characters long. Required. + """ + + def __init__(self, *, id: str) -> None: # pylint: disable=redefined-builtin + self.id = id + + def __repr__(self) -> str: + return f""[:1024] + + @classmethod + def _from_generated(cls, external_key: "_models.ExternalKey") -> "ExternalKey": + return cls(id=external_key.id) + + def _to_generated(self) -> "_models.ExternalKey": + # Imported lazily to avoid importing the generated layer when this class is not used. + from ._generated.models import ExternalKey as _ExternalKey + + return _ExternalKey(id=self.id) + + class KeyProperties(object): """A key's ID and attributes. @@ -332,6 +360,22 @@ def key_size(self) -> Optional[int]: return getattr(self._attributes, "key_size", None) return None + @property + def external_key(self) -> Optional[ExternalKey]: + """The external key reference, if available. + + Only available on Managed HSM with API version 2026-01-01-preview or later. + + :returns: A reference to the external key material backing this key, or None. + :rtype: ~azure.keyvault.keys.ExternalKey or None + """ + # external_key was added in 2026-01-01-preview + if self._attributes: + external_key = getattr(self._attributes, "external_key", None) + if external_key: + return ExternalKey._from_generated(external_key=external_key) # pylint:disable=protected-access + return None + class KeyReleasePolicy(object): """The policy rules under which a key can be exported. diff --git a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py index 41084614a416..b8443db4377f 100644 --- a/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py +++ b/sdk/keyvault/azure-keyvault-keys/azure/keyvault/keys/aio/_client.py @@ -20,6 +20,7 @@ from .._shared import AsyncKeyVaultClientBase from .. import ( DeletedKey, + ExternalKey, JsonWebKey, KeyProperties, KeyReleasePolicy, @@ -63,6 +64,7 @@ def _get_attributes( not_before: Optional[datetime], expires_on: Optional[datetime], exportable: Optional[bool] = None, + external_key: Optional[ExternalKey] = None, ) -> Optional[KeyAttributes]: """Return a KeyAttributes object if non-None attributes are provided, or None otherwise. @@ -74,13 +76,25 @@ def _get_attributes( :type expires_on: ~datetime.datetime or None :param exportable: Whether the private key can be exported. :type exportable: bool or None + :param external_key: A reference to an external key, when registering an external key. + :type external_key: ~azure.keyvault.keys.ExternalKey or None :returns: An autorest-generated model of the key's attributes. :rtype: KeyAttributes """ - if enabled is not None or not_before is not None or expires_on is not None or exportable is not None: + if ( + enabled is not None + or not_before is not None + or expires_on is not None + or exportable is not None + or external_key is not None + ): return self._models.KeyAttributes( - enabled=enabled, not_before=not_before, expires=expires_on, exportable=exportable + enabled=enabled, + not_before=not_before, + expires=expires_on, + exportable=exportable, + external_key=external_key._to_generated() if external_key is not None else None, ) return None @@ -409,6 +423,64 @@ async def create_oct_key( **kwargs, ) + @distributed_trace_async + async def create_external_key( + self, + name: str, + external_key: ExternalKey, + *, + enabled: Optional[bool] = None, + tags: Optional[Dict[str, str]] = None, + not_before: Optional[datetime] = None, + expires_on: Optional[datetime] = None, + release_policy: Optional[KeyReleasePolicy] = None, + **kwargs: Any, + ) -> KeyVaultKey: + """Register a Managed HSM key that points at material managed by an external HSM. + + Requires the keys/create permission. Only available with API version + ``2026-01-01-preview`` and newer, and only supported on Managed HSM. + + :param str name: The name for the new key. + :param external_key: A reference identifying the external key material. + :type external_key: ~azure.keyvault.keys.ExternalKey + + :keyword enabled: Whether the key is enabled for use. + :paramtype enabled: bool or None + :keyword tags: Application specific metadata in the form of key-value pairs. + :paramtype tags: dict[str, str] or None + :keyword not_before: Not before date of the key in UTC. + :paramtype not_before: ~datetime.datetime or None + :keyword expires_on: Expiry date of the key in UTC. + :paramtype expires_on: ~datetime.datetime or None + :keyword release_policy: The policy rules under which the key can be exported. + :paramtype release_policy: ~azure.keyvault.keys.KeyReleasePolicy or None + + :returns: The created key. + :rtype: ~azure.keyvault.keys.KeyVaultKey + + :raises ~azure.core.exceptions.HttpResponseError: + """ + attributes = self._get_attributes( + enabled=enabled, not_before=not_before, expires_on=expires_on, external_key=external_key + ) + + policy = release_policy + if policy is not None: + policy = self._models.KeyReleasePolicy( + encoded_policy=policy.encoded_policy, content_type=policy.content_type, immutable=policy.immutable + ) + # External keys are mutually exclusive with `kty`. The generated overload requires `kty`, + # but the runtime constructor accepts arbitrary kwargs. + parameters = self._models.KeyCreateParameters( # type: ignore[call-overload] + key_attributes=attributes, + tags=tags, + release_policy=policy, + ) + + bundle = await self._client.create_key(key_name=name, parameters=parameters, **kwargs) + return KeyVaultKey._from_key_bundle(bundle) + @distributed_trace_async async def delete_key(self, name: str, **kwargs: Any) -> DeletedKey: """Delete all versions of a key and its cryptographic material. diff --git a/sdk/keyvault/azure-keyvault-keys/samples/README.md b/sdk/keyvault/azure-keyvault-keys/samples/README.md index cb976e6c6951..605934cb4775 100644 --- a/sdk/keyvault/azure-keyvault-keys/samples/README.md +++ b/sdk/keyvault/azure-keyvault-keys/samples/README.md @@ -36,12 +36,16 @@ pip install azure-keyvault-keys azure-identity | [backup_restore_operations.py][backup_operations_sample] ([async version][backup_operations_async_sample]) | back up and recover keys | | [recover_purge_operations.py][recover_purge_sample] ([async version][recover_purge_async_sample]) | recover and purge keys | | [key_rotation.py][key_rotation_sample] ([async version][key_rotation_async_sample]) | create/update key rotation policies and rotate keys on-demand | +| [external_key_operations.py][external_key_sample] ([async version][external_key_async_sample]) | register and manage external keys with Managed HSM (EKM) | | [send_request.py][send_request_sample] | use the `send_request` client method | [backup_operations_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/keyvault/azure-keyvault-keys/samples/backup_restore_operations.py [backup_operations_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/keyvault/azure-keyvault-keys/samples/backup_restore_operations_async.py +[external_key_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations.py +[external_key_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations_async.py + [hello_world_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/keyvault/azure-keyvault-keys/samples/hello_world.py [hello_world_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/keyvault/azure-keyvault-keys/samples/hello_world_async.py diff --git a/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations.py b/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations.py new file mode 100644 index 000000000000..6bb1fa9e84ba --- /dev/null +++ b/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations.py @@ -0,0 +1,75 @@ +# pylint: disable=line-too-long,useless-suppression +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import os + +from azure.identity import DefaultAzureCredential +from azure.keyvault.keys import ExternalKey, KeyClient + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites: +# 1. An Azure Key Vault Managed HSM (https://learn.microsoft.com/azure/key-vault/managed-hsm/quick-create-cli) +# +# 2. azure-keyvault-keys and azure-identity libraries (pip install these) +# +# 3. Set environment variable MANAGED_HSM_URL with the URL of your Managed HSM +# +# 4. Set up your environment to use azure-identity's DefaultAzureCredential. For more information about how to configure +# the DefaultAzureCredential, refer to https://aka.ms/azsdk/python/identity/docs#azure.identity.DefaultAzureCredential +# +# 5. Key create, get, and delete permissions for your service principal in your Managed HSM +# +# 6. The Managed HSM is configured with an external HSM source that owns the key material referenced by external_key.id +# +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates External Key Management (EKM) operations against a Managed HSM that is backed by an +# external HSM. The external HSM owns the key material; Managed HSM stores a reference (`ExternalKey.id`) to +# that key. +# +# Note: External Key Management requires API version 2026-01-01-preview or later and is only supported on +# Managed HSM (not regular Key Vault). +# +# 1. Register a key whose material is owned by an external HSM (create_external_key) +# +# 2. Retrieve the key and inspect the external_key reference (get_key) +# +# 3. Delete the key registration (begin_delete_key) +# +# 4. Purge the key registration (purge_deleted_key) +# ---------------------------------------------------------------------------------------------------------- + +# Instantiate a key client that will be used to call the service. +# Here we use the DefaultAzureCredential, but any azure-identity credential can be used. +MANAGED_HSM_URL = os.environ["MANAGED_HSM_URL"] +credential = DefaultAzureCredential() +client = KeyClient(vault_url=MANAGED_HSM_URL, credential=credential) + +# Build an ExternalKey that references the key material managed in the external HSM. +# The id must be at most 64 characters and may only contain letters, digits, and hyphens. +print("\n.. Create an External Key") +key_name = "externalKeyName" +external_key = ExternalKey(id="external-key-reference-id") +key = client.create_external_key(key_name, external_key=external_key) +assert key.properties.external_key is not None +print(f"External key '{key.name}' was registered with external id '{key.properties.external_key.id}'.") +print(f"Key type is '{key.key_type}' and key size is '{key.properties.key_size}'.") + +# Read the registration back to confirm the external_key reference is round-tripped. +print("\n.. Get the External Key by name") +fetched = client.get_key(key.name) +assert fetched.properties.external_key is not None +print(f"Key with name '{fetched.name}' has external_key id '{fetched.properties.external_key.id}'.") +print(f"Key type is '{fetched.key_type}' and key size is '{fetched.properties.key_size}'.") + +# The external key registration is no longer used; delete it from the Managed HSM. +# Deleting the registration does not delete the key material in the external HSM. +print("\n.. Delete the External Key") +client.begin_delete_key(key.name).wait() +print(f"Deleted external key '{key.name}'.") + +# The deleted key registration still exists in the Managed HSM's soft-deleted state. Purge it to remove it permanently. +print("\n.. Purge the deleted External Key") +client.purge_deleted_key(key.name) +print(f"Purged external key '{key.name}'.") diff --git a/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations_async.py b/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations_async.py new file mode 100644 index 000000000000..faad2bac68ff --- /dev/null +++ b/sdk/keyvault/azure-keyvault-keys/samples/external_key_operations_async.py @@ -0,0 +1,87 @@ +# pylint: disable=line-too-long,useless-suppression +# ------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------ +import asyncio +import os + +from azure.identity.aio import DefaultAzureCredential +from azure.keyvault.keys import ExternalKey +from azure.keyvault.keys.aio import KeyClient + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites: +# 1. An Azure Key Vault Managed HSM (https://learn.microsoft.com/azure/key-vault/managed-hsm/quick-create-cli) +# +# 2. azure-keyvault-keys and azure-identity libraries (pip install these) +# +# 3. Set environment variable MANAGED_HSM_URL with the URL of your Managed HSM +# +# 4. Set up your environment to use azure-identity's DefaultAzureCredential. For more information about how to configure +# the DefaultAzureCredential, refer to https://aka.ms/azsdk/python/identity/docs#azure.identity.DefaultAzureCredential +# +# 5. Key create, get, and delete permissions for your service principal in your Managed HSM +# +# 6. The Managed HSM is configured with an external HSM source that owns the key material referenced by external_key.id +# +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates External Key Management (EKM) operations against a Managed HSM that is backed by an +# external HSM. The external HSM owns the key material; Managed HSM stores a reference (`ExternalKey.id`) to +# that key. +# +# Note: External Key Management requires API version 2026-01-01-preview or later and is only supported on +# Managed HSM (not regular Key Vault). +# +# 1. Register a key whose material is owned by an external HSM (create_external_key) +# +# 2. Retrieve the key and inspect the external_key reference (get_key) +# +# 3. Delete the key registration (begin_delete_key) +# +# 4. Purge the key registration (purge_deleted_key) +# ---------------------------------------------------------------------------------------------------------- + + +async def run_sample(): + # Instantiate a key client that will be used to call the service. + # Here we use the DefaultAzureCredential, but any azure-identity credential can be used. + MANAGED_HSM_URL = os.environ["MANAGED_HSM_URL"] + credential = DefaultAzureCredential() + client = KeyClient(vault_url=MANAGED_HSM_URL, credential=credential) + + # Build an ExternalKey that references the key material managed in the external HSM. + # The id must be at most 64 characters and may only contain letters, digits, and hyphens. + print("\n.. Create an External Key") + key_name = "externalKeyNameAsync" + external_key = ExternalKey(id="external-key-reference-id") + key = await client.create_external_key(key_name, external_key=external_key) + assert key.properties.external_key is not None + print(f"External key '{key.name}' was registered with external id '{key.properties.external_key.id}'.") + print(f"Key type is '{key.key_type}' and key size is '{key.properties.key_size}'.") + + # Read the registration back to confirm the external_key reference is round-tripped. + print("\n.. Get the External Key by name") + fetched = await client.get_key(key.name) + assert fetched.properties.external_key is not None + print(f"Key with name '{fetched.name}' has external_key id '{fetched.properties.external_key.id}'.") + print(f"Key type is '{fetched.key_type}' and key size is '{fetched.properties.key_size}'.") + + # The external key registration is no longer used; delete it from the Managed HSM. + # Deleting the registration does not delete the key material in the external HSM. + print("\n.. Delete the External Key") + await client.delete_key(key.name) + print(f"Deleted external key '{key.name}'.") + + # The deleted key registration still exists in the Managed HSM's soft-deleted state. Purge it to remove it permanently. + print("\n.. Purge the deleted External Key") + await client.purge_deleted_key(key.name) + print(f"Purged external key '{key.name}'.") + + print("\nrun_sample done") + await credential.close() + await client.close() + + +if __name__ == "__main__": + asyncio.run(run_sample()) diff --git a/sdk/keyvault/azure-keyvault-keys/tests/_async_test_case.py b/sdk/keyvault/azure-keyvault-keys/tests/_async_test_case.py index ad68d6a8a1e4..fc9936b7d041 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/_async_test_case.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/_async_test_case.py @@ -22,15 +22,18 @@ class AsyncKeysClientPreparer(AzureRecordedTestCase): def __init__(self, *args, **kwargs): vault_playback_url = "https://vaultname.vault.azure.net" hsm_playback_url = "https://managedhsmvaultname.managedhsm.azure.net" + playback_ekm_external_id = "fake-external-key" self.is_logging_enabled = kwargs.pop("logging_enable", True) if self.is_live: self.vault_url = os.environ["AZURE_KEYVAULT_URL"] hsm = os.environ.get("AZURE_MANAGEDHSM_URL") self.managed_hsm_url = hsm if hsm else None + self.ekm_external_id = os.getenv("EKM_EXTERNAL_ID") else: self.vault_url = vault_playback_url self.managed_hsm_url = hsm_playback_url + self.ekm_external_id = playback_ekm_external_id self._set_mgmt_settings_real_values() @@ -44,7 +47,12 @@ async def _preparer(test_class, api_version, is_hsm, **kwargs): client = self.create_key_client(endpoint_url, api_version=api_version, **kwargs) async with client: await fn( - test_class, client, is_hsm=is_hsm, managed_hsm_url=self.managed_hsm_url, vault_url=self.vault_url + test_class, + client, + is_hsm=is_hsm, + managed_hsm_url=self.managed_hsm_url, + vault_url=self.vault_url, + ekm_external_id=self.ekm_external_id, ) return _preparer diff --git a/sdk/keyvault/azure-keyvault-keys/tests/_test_case.py b/sdk/keyvault/azure-keyvault-keys/tests/_test_case.py index 6663690fc1ca..416b7ebef956 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/_test_case.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/_test_case.py @@ -61,6 +61,7 @@ class KeysClientPreparer(AzureRecordedTestCase): def __init__(self, *args, **kwargs): vault_playback_url = "https://vaultname.vault.azure.net" hsm_playback_url = "https://managedhsmvaultname.managedhsm.azure.net" + playback_ekm_external_id = "fake-external-key" self.is_logging_enabled = kwargs.pop("logging_enable", True) if self.is_live: @@ -70,9 +71,11 @@ def __init__(self, *args, **kwargs): self.managed_hsm_url = hsm if hsm else None if self.managed_hsm_url: self.managed_hsm_url = self.managed_hsm_url.rstrip("/") + self.ekm_external_id = os.getenv("EKM_EXTERNAL_ID") else: self.vault_url = vault_playback_url self.managed_hsm_url = hsm_playback_url + self.ekm_external_id = playback_ekm_external_id self._set_mgmt_settings_real_values() @@ -86,7 +89,14 @@ def _preparer(test_class, api_version, is_hsm, **kwargs): client = self.create_key_client(endpoint_url, api_version=api_version, **kwargs) with client: - fn(test_class, client, is_hsm=is_hsm, managed_hsm_url=self.managed_hsm_url, vault_url=self.vault_url) + fn( + test_class, + client, + is_hsm=is_hsm, + managed_hsm_url=self.managed_hsm_url, + vault_url=self.vault_url, + ekm_external_id=self.ekm_external_id, + ) return _preparer diff --git a/sdk/keyvault/azure-keyvault-keys/tests/conftest.py b/sdk/keyvault/azure-keyvault-keys/tests/conftest.py index bdd0c0faf09d..d6bc321cd4e2 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/conftest.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/conftest.py @@ -49,12 +49,14 @@ def add_sanitizers(test_proxy): "AZURE_KEYVAULT_ATTESTATION_URL", "https://fakeattestation.azurewebsites.net" ) azure_attestation_uri = azure_attestation_uri.rstrip("/") + ekm_external_id = os.environ.get("EKM_EXTERNAL_ID", "fake-external-key") add_general_string_sanitizer(target=azure_keyvault_url, value="https://vaultname.vault.azure.net") add_general_string_sanitizer(target=keyvault_tenant_id, value="00000000-0000-0000-0000-000000000000") add_general_string_sanitizer(target=keyvault_subscription_id, value="00000000-0000-0000-0000-000000000000") add_general_string_sanitizer(target=azure_managedhsm_url, value="https://managedhsmvaultname.managedhsm.azure.net") add_general_string_sanitizer(target=azure_attestation_uri, value="https://fakeattestation.azurewebsites.net") + add_general_string_sanitizer(target=ekm_external_id, value="fake-external-key") add_oauth_response_sanitizer() set_custom_default_matcher(compare_bodies=False, ignore_query_ordering=True, ignored_headers="Accept") diff --git a/sdk/keyvault/azure-keyvault-keys/tests/test_key_client.py b/sdk/keyvault/azure-keyvault-keys/tests/test_key_client.py index e84667fc8ba1..6c869a95881b 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/test_key_client.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/test_key_client.py @@ -17,6 +17,7 @@ from azure.core.rest import HttpRequest from azure.keyvault.keys import ( ApiVersion, + ExternalKey, JsonWebKey, KeyClient, KeyProperties, @@ -807,6 +808,38 @@ def test_send_request(self, client, is_hsm, **kwargs): response = client.send_request(request) assert response.json()["key"]["kid"] == key.id + @pytest.mark.parametrize("api_version,is_hsm", only_hsm_default) + @KeysClientPreparer() + @recorded_by_proxy + def test_create_external_key(self, client, **kwargs): + """Register an external HSM key and verify the external_key reference round-trips.""" + external_id = kwargs.pop("ekm_external_id") + if not external_id: + pytest.skip( + "No external key ID provided. This test requires an EKM-connected HSM and an existing external key." + ) + + key_name = self.get_resource_name("ext-key") + external_key = ExternalKey(id=external_id) + + created = client.create_external_key(key_name, external_key=external_key) + assert created is not None + assert created.name == key_name + assert created.properties.external_key is not None + assert created.properties.external_key.id == external_id + assert created.key_type is not None + + # Verify the external_key reference is also returned by a subsequent get_key. + fetched = client.get_key(key_name) + assert fetched.properties.external_key is not None + assert fetched.properties.external_key.id == external_id + assert fetched.key_type is not None + + # Delete the external key registration. + deleted_key = client.begin_delete_key(key_name).result() + assert deleted_key is not None + assert deleted_key.name == key_name + @pytest.mark.parametrize("api_version,is_hsm", only_hsm_default) @KeysClientPreparer() @recorded_by_proxy diff --git a/sdk/keyvault/azure-keyvault-keys/tests/test_keys_async.py b/sdk/keyvault/azure-keyvault-keys/tests/test_keys_async.py index 9843202745c3..1334a49f6f42 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/test_keys_async.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/test_keys_async.py @@ -17,6 +17,7 @@ from azure.core.rest import HttpRequest from azure.keyvault.keys import ( ApiVersion, + ExternalKey, JsonWebKey, KeyProperties, KeyReleasePolicy, @@ -824,6 +825,39 @@ async def test_send_request(self, client, is_hsm, **kwargs): response = await client.send_request(request) assert response.json()["key"]["kid"] == key.id + @pytest.mark.asyncio + @pytest.mark.parametrize("api_version,is_hsm", only_hsm_default) + @AsyncKeysClientPreparer() + @recorded_by_proxy_async + async def test_create_external_key(self, client, **kwargs): + """Register an external HSM key and verify the external_key reference round-trips.""" + external_id = kwargs.pop("ekm_external_id") + if not external_id: + pytest.skip( + "No external key ID provided. This test requires an EKM-connected HSM and an existing external key." + ) + + key_name = self.get_resource_name("ext-key") + external_key = ExternalKey(id=external_id) + + created = await client.create_external_key(key_name, external_key=external_key) + assert created is not None + assert created.name == key_name + assert created.properties.external_key is not None + assert created.properties.external_key.id == external_id + assert created.key_type is not None + + # Verify the external_key reference is also returned by a subsequent get_key. + fetched = await client.get_key(key_name) + assert fetched.properties.external_key is not None + assert fetched.properties.external_key.id == external_id + assert fetched.key_type is not None + + # Delete the external key registration. + deleted_key = await client.delete_key(key_name) + assert deleted_key is not None + assert deleted_key.name == key_name + @pytest.mark.asyncio @pytest.mark.parametrize("api_version,is_hsm", only_hsm_default) @AsyncKeysClientPreparer() diff --git a/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys.py b/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys.py index a8c2293ba2dd..e3a34ea189b8 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys.py @@ -17,6 +17,7 @@ all_api_versions = get_decorator(only_vault=True) default_version = get_decorator(api_versions=[DEFAULT_VERSION]) only_hsm = get_decorator(only_hsm=True) +only_hsm_default = get_decorator(only_hsm=True, api_versions=[DEFAULT_VERSION]) def print(*args): @@ -148,6 +149,30 @@ def test_example_create_oct_key(self, key_client, **kwargs): print(key.properties.key_size) # [END create_oct_key] + @pytest.mark.parametrize("api_version,is_hsm", only_hsm_default) + @KeysClientPreparer() + @recorded_by_proxy + def test_example_create_external_key(self, key_client, **kwargs): + external_id = kwargs.pop("ekm_external_id") + if not external_id: + pytest.skip( + "No external key ID provided. This test requires an EKM-connected HSM and an existing external key." + ) + key_name = self.get_resource_name("ext-key") + + # [START create_external_key] + from azure.keyvault.keys import ExternalKey + + # the external_key.id refers to the key material managed by an external HSM + external_key = ExternalKey(id=external_id) + key = key_client.create_external_key(key_name, external_key=external_key) + + print(key.id) + print(key.name) + print(key.properties.external_key.id) + print(key.key_type) + # [END create_external_key] + @pytest.mark.parametrize("api_version,is_hsm", all_api_versions) @KeysClientPreparer() @recorded_by_proxy diff --git a/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys_async.py b/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys_async.py index 3ab572522834..1e01792e2b93 100644 --- a/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys_async.py +++ b/sdk/keyvault/azure-keyvault-keys/tests/test_samples_keys_async.py @@ -17,6 +17,7 @@ all_api_versions = get_decorator(is_async=True, only_vault=True) default_version = get_decorator(is_async=True, api_versions=[DEFAULT_VERSION]) only_hsm = get_decorator(only_hsm=True, is_async=True) +only_hsm_default = get_decorator(only_hsm=True, is_async=True, api_versions=[DEFAULT_VERSION]) def print(*args): @@ -150,6 +151,31 @@ async def test_example_create_oct_key(self, key_client, **kwargs): print(key.properties.key_size) # [END create_oct_key] + @pytest.mark.asyncio + @pytest.mark.parametrize("api_version,is_hsm", only_hsm_default) + @AsyncKeysClientPreparer() + @recorded_by_proxy_async + async def test_example_create_external_key(self, key_client, **kwargs): + external_id = kwargs.pop("ekm_external_id") + if not external_id: + pytest.skip( + "No external key ID provided. This test requires an EKM-connected HSM and an existing external key." + ) + key_name = self.get_resource_name("ext-key") + + # [START create_external_key] + from azure.keyvault.keys import ExternalKey + + # the external_key.id refers to the key material managed by an external HSM + external_key = ExternalKey(id=external_id) + key = await key_client.create_external_key(key_name, external_key=external_key) + + print(key.id) + print(key.name) + print(key.properties.external_key.id) + print(key.key_type) + # [END create_external_key] + @pytest.mark.asyncio @pytest.mark.parametrize("api_version,is_hsm", all_api_versions) @AsyncKeysClientPreparer()