|
2 | 2 | # Copyright (c) Microsoft Corporation. |
3 | 3 | # Licensed under the MIT License. |
4 | 4 | # ------------------------------------ |
| 5 | + |
| 6 | +import base64 |
5 | 7 | import pytest |
6 | 8 |
|
7 | 9 | from azure.core.exceptions import HttpResponseError |
8 | | -from azure.keyvault.administration import ( |
9 | | - KeyVaultEkmClient, |
10 | | - KeyVaultEkmConnection, |
11 | | - KeyVaultEkmProxyClientCertificateInfo, |
12 | | - KeyVaultEkmProxyInfo, |
13 | | -) |
14 | | -from azure.keyvault.administration._generated.models import EkmConnection as _GeneratedEkmConnection |
| 10 | +from azure.keyvault.administration import KeyVaultEkmClient, KeyVaultEkmConnection |
| 11 | +from azure.keyvault.administration._internal.client_base import DEFAULT_VERSION |
15 | 12 |
|
16 | 13 | from devtools_testutils import recorded_by_proxy |
17 | 14 |
|
18 | 15 | from _shared.test_case import KeyVaultTestCase |
19 | 16 | from _test_case import KeyVaultEkmClientPreparer, get_decorator |
20 | 17 |
|
| 18 | +only_latest = get_decorator(api_versions=[DEFAULT_VERSION]) |
21 | 19 |
|
22 | | -# EKM operations are only available on the 2026-01-01-preview API version. |
23 | | -ekm_api_versions = get_decorator(api_versions=["2026-01-01-preview"]) |
24 | | - |
25 | | -# Minimal placeholder bytes; real ca certificates are PEM/DER chains pinned to your EKM proxy. |
26 | | -_PROXY_CA_BYTES = b"\x30\x82\x01\x00" |
27 | | - |
28 | | - |
29 | | -def _build_connection(host: str = "ekm.contoso.com") -> KeyVaultEkmConnection: |
30 | | - return KeyVaultEkmConnection( |
31 | | - host=host, |
32 | | - server_ca_certificates=[_PROXY_CA_BYTES], |
33 | | - path_prefix="/api", |
34 | | - server_subject_common_name="ekm.contoso.com", |
35 | | - ) |
36 | | - |
37 | | - |
38 | | -class TestEkmModels: |
39 | | - """Unit tests for EKM public model wrappers (no service interaction).""" |
40 | | - |
41 | | - def test_ekm_connection_init_required_and_optional_fields(self): |
42 | | - connection = _build_connection() |
43 | | - assert connection.host == "ekm.contoso.com" |
44 | | - assert connection.server_ca_certificates == [_PROXY_CA_BYTES] |
45 | | - assert connection.path_prefix == "/api" |
46 | | - assert connection.server_subject_common_name == "ekm.contoso.com" |
47 | | - |
48 | | - def test_ekm_connection_optional_fields_default_to_none(self): |
49 | | - connection = KeyVaultEkmConnection(host="h.example", server_ca_certificates=[_PROXY_CA_BYTES]) |
50 | | - assert connection.path_prefix is None |
51 | | - assert connection.server_subject_common_name is None |
| 20 | +# Note: These tests require an EKM connection to be established with an EKM Sample Proxy. |
52 | 21 |
|
53 | | - def test_ekm_connection_repr_includes_host(self): |
54 | | - connection = _build_connection("repr.example.net") |
55 | | - assert "repr.example.net" in repr(connection) |
56 | 22 |
|
57 | | - def test_ekm_connection_round_trip_to_and_from_generated(self): |
58 | | - original = _build_connection() |
59 | | - generated = original._to_generated() |
60 | | - assert isinstance(generated, _GeneratedEkmConnection) |
61 | | - assert generated.host == original.host |
62 | | - assert generated.server_ca_certificates == original.server_ca_certificates |
63 | | - assert generated.path_prefix == original.path_prefix |
64 | | - assert generated.server_subject_common_name == original.server_subject_common_name |
65 | | - |
66 | | - round_tripped = KeyVaultEkmConnection._from_generated(generated) |
67 | | - assert round_tripped.host == original.host |
68 | | - assert round_tripped.server_ca_certificates == original.server_ca_certificates |
69 | | - assert round_tripped.path_prefix == original.path_prefix |
70 | | - assert round_tripped.server_subject_common_name == original.server_subject_common_name |
71 | | - |
72 | | - def test_ekm_proxy_client_certificate_info_kwargs(self): |
73 | | - info = KeyVaultEkmProxyClientCertificateInfo( |
74 | | - ca_certificates=[_PROXY_CA_BYTES], subject_common_name="ekm-client.contoso.com" |
75 | | - ) |
76 | | - assert info.ca_certificates == [_PROXY_CA_BYTES] |
77 | | - assert info.subject_common_name == "ekm-client.contoso.com" |
78 | | - assert "ekm-client.contoso.com" in repr(info) |
79 | | - |
80 | | - def test_ekm_proxy_info_kwargs(self): |
81 | | - info = KeyVaultEkmProxyInfo( |
82 | | - api_version="2.0", proxy_vendor="Contoso", proxy_name="ProxyX 1.2", ekm_vendor="Acme", ekm_product="HSM 9000" |
83 | | - ) |
84 | | - assert info.api_version == "2.0" |
85 | | - assert info.proxy_vendor == "Contoso" |
86 | | - assert info.proxy_name == "ProxyX 1.2" |
87 | | - assert info.ekm_vendor == "Acme" |
88 | | - assert info.ekm_product == "HSM 9000" |
89 | | - rendered = repr(info) |
90 | | - assert "Contoso" in rendered and "Acme" in rendered |
91 | | - |
92 | | - |
93 | | -class TestEkmClient(KeyVaultTestCase): |
94 | | - @pytest.mark.parametrize("api_version", ekm_api_versions) |
| 23 | +class TestEkm(KeyVaultTestCase): |
| 24 | + @pytest.mark.live_test_only |
| 25 | + @pytest.mark.parametrize("api_version", only_latest) |
95 | 26 | @KeyVaultEkmClientPreparer() |
96 | 27 | @recorded_by_proxy |
97 | | - def test_ekm_connection_lifecycle(self, client: KeyVaultEkmClient, **kwargs): |
98 | | - """Exercise create -> get -> update -> delete on an EKM connection.""" |
99 | | - # Pre-condition: there should not be an existing EKM connection. |
100 | | - # If the recording was made against a clean HSM, delete is a no-op below. |
| 28 | + def test_ekm_connection(self, client: KeyVaultEkmClient, **kwargs): |
| 29 | + ekm_host = kwargs.pop("ekm_host") |
| 30 | + server_ca_certificate = kwargs.pop("ekm_certificate") |
| 31 | + if not server_ca_certificate or not ekm_host: |
| 32 | + pytest.skip( |
| 33 | + "EKM CA certificate is required for live tests. Please set the EKM_PROXY_HOST and EKM_SERVER_CA_CERTIFICATE environment variables." |
| 34 | + ) |
| 35 | + |
| 36 | + # Cleanup |
101 | 37 | try: |
102 | 38 | client.delete_ekm_connection() |
103 | 39 | except HttpResponseError: |
104 | 40 | pass |
105 | 41 |
|
106 | | - # Create |
107 | | - created = client.create_ekm_connection(_build_connection()) |
108 | | - assert isinstance(created, KeyVaultEkmConnection) |
109 | | - assert created.host == "ekm.contoso.com" |
110 | | - assert created.path_prefix == "/api" |
111 | | - |
112 | | - # Get |
113 | | - fetched = client.get_ekm_connection() |
114 | | - assert isinstance(fetched, KeyVaultEkmConnection) |
115 | | - assert fetched.host == created.host |
116 | | - assert fetched.path_prefix == created.path_prefix |
117 | | - |
118 | | - # Update |
119 | | - updated_input = _build_connection() |
120 | | - updated_input.path_prefix = "/v2" |
121 | | - updated = client.update_ekm_connection(updated_input) |
122 | | - assert updated.path_prefix == "/v2" |
123 | | - |
124 | | - # Delete |
125 | | - deleted = client.delete_ekm_connection() |
126 | | - assert isinstance(deleted, KeyVaultEkmConnection) |
127 | | - assert deleted.host == created.host |
128 | | - |
129 | | - @pytest.mark.parametrize("api_version", ekm_api_versions) |
130 | | - @KeyVaultEkmClientPreparer() |
131 | | - @recorded_by_proxy |
132 | | - def test_get_ekm_certificate(self, client: KeyVaultEkmClient, **kwargs): |
133 | | - """The EKM proxy client certificate info should always be retrievable.""" |
134 | | - info = client.get_ekm_certificate() |
135 | | - assert isinstance(info, KeyVaultEkmProxyClientCertificateInfo) |
136 | | - # The service may return either populated certificate info or an empty payload |
137 | | - # depending on EKM provisioning state. Both are valid response shapes. |
138 | | - |
139 | | - @pytest.mark.parametrize("api_version", ekm_api_versions) |
140 | | - @KeyVaultEkmClientPreparer() |
141 | | - @recorded_by_proxy |
142 | | - def test_check_ekm_connection(self, client: KeyVaultEkmClient, **kwargs): |
143 | | - """Verify check_ekm_connection returns proxy info when an EKM proxy is reachable.""" |
144 | | - try: |
145 | | - client.create_ekm_connection(_build_connection()) |
146 | | - except HttpResponseError: |
147 | | - # An EKM connection may already exist from a previous test/recording |
148 | | - pass |
149 | | - |
150 | | - try: |
151 | | - info = client.check_ekm_connection() |
152 | | - assert isinstance(info, KeyVaultEkmProxyInfo) |
153 | | - finally: |
154 | | - try: |
155 | | - client.delete_ekm_connection() |
156 | | - except HttpResponseError: |
157 | | - pass |
| 42 | + # Create an EKM connection |
| 43 | + ekm_connection = KeyVaultEkmConnection( |
| 44 | + host=ekm_host, |
| 45 | + server_ca_certificates=[base64.b64decode(server_ca_certificate)], |
| 46 | + path_prefix="/api/v1", |
| 47 | + ) |
| 48 | + created_ekm_connection = client.create_ekm_connection(connection=ekm_connection) |
| 49 | + assert created_ekm_connection is not None |
| 50 | + assert created_ekm_connection.host == ekm_host |
| 51 | + assert created_ekm_connection.server_ca_certificates is not None |
| 52 | + assert len(created_ekm_connection.server_ca_certificates) == 1 |
| 53 | + assert created_ekm_connection.path_prefix == ekm_connection.path_prefix |
| 54 | + assert created_ekm_connection.server_subject_common_name == ekm_connection.server_subject_common_name |
| 55 | + |
| 56 | + # Get the EKM connection |
| 57 | + retrieved_ekm_connection = client.get_ekm_connection() |
| 58 | + assert retrieved_ekm_connection is not None |
| 59 | + assert retrieved_ekm_connection.host == ekm_host |
| 60 | + assert retrieved_ekm_connection.server_ca_certificates is not None |
| 61 | + assert len(retrieved_ekm_connection.server_ca_certificates) == 1 |
| 62 | + assert retrieved_ekm_connection.path_prefix == ekm_connection.path_prefix |
| 63 | + assert retrieved_ekm_connection.server_subject_common_name == created_ekm_connection.server_subject_common_name |
| 64 | + |
| 65 | + # Get the EKM certificate |
| 66 | + ekm_certificate = client.get_ekm_certificate() |
| 67 | + assert ekm_certificate is not None |
| 68 | + assert ekm_certificate.ca_certificates is not None |
| 69 | + assert len(ekm_certificate.ca_certificates) == 1 |
| 70 | + |
| 71 | + # Check the EKM connection status |
| 72 | + connection_status = client.check_ekm_connection() |
| 73 | + assert connection_status is not None |
| 74 | + assert connection_status.api_version is not None |
| 75 | + assert connection_status.proxy_vendor is not None |
| 76 | + assert connection_status.proxy_name is not None |
| 77 | + assert connection_status.ekm_vendor is not None |
| 78 | + assert connection_status.ekm_product is not None |
| 79 | + |
| 80 | + # Update the EKM connection |
| 81 | + updated_ekm_connection = KeyVaultEkmConnection( |
| 82 | + host=ekm_host, |
| 83 | + server_ca_certificates=[base64.b64decode(server_ca_certificate)], |
| 84 | + path_prefix="/api/v1", |
| 85 | + ) |
| 86 | + result = client.update_ekm_connection(connection=updated_ekm_connection) |
| 87 | + assert result is not None |
| 88 | + assert result.host == updated_ekm_connection.host |
| 89 | + assert result.server_ca_certificates is not None |
| 90 | + assert len(result.server_ca_certificates) == 1 |
| 91 | + assert result.path_prefix == updated_ekm_connection.path_prefix |
| 92 | + assert result.server_subject_common_name == updated_ekm_connection.server_subject_common_name |
| 93 | + |
| 94 | + # Delete the EKM connection |
| 95 | + result = client.delete_ekm_connection() |
| 96 | + assert result is not None |
| 97 | + assert result.host == updated_ekm_connection.host |
| 98 | + assert result.server_ca_certificates is not None |
| 99 | + assert len(result.server_ca_certificates) == 1 |
| 100 | + assert result.path_prefix == updated_ekm_connection.path_prefix |
| 101 | + assert result.server_subject_common_name == updated_ekm_connection.server_subject_common_name |
0 commit comments