1818from castellan .common import exception as castellan_exc
1919from castellan .common .objects import passphrase
2020from castellan .key_manager import key_manager
21+ import ddt
2122import fixtures
2223from oslo_log import log as logging
2324from oslo_utils import uuidutils
@@ -140,6 +141,7 @@ def remove_consumer(self, context, managed_object_id, consumer_data):
140141 )
141142
142143
144+ @ddt .ddt
143145class VTPMServersTest (base .ServersTestBase ):
144146
145147 # NOTE: ADMIN_API is intentionally not set to True in order to catch key
@@ -197,6 +199,14 @@ def assertInstanceHasNoSecret(self, server):
197199 self .assertNotIn ('vtpm_secret_uuid' , instance .system_metadata )
198200 self .assertEqual (0 , len (self .key_mgr ._passphrases ))
199201
202+ def _assert_libvirt_has_secret (self , host , instance_uuid ):
203+ s = host .driver ._host .find_secret ('vtpm' , instance_uuid )
204+ self .assertIsNotNone (s )
205+ ctx = nova_context .get_admin_context ()
206+ instance = objects .Instance .get_by_uuid (ctx , instance_uuid )
207+ secret_uuid = instance .system_metadata ['vtpm_secret_uuid' ]
208+ self .assertEqual (secret_uuid , s .UUIDString ())
209+
200210 def _assert_libvirt_had_secret (self , compute , secret_uuid ):
201211 # This assert is for ephemeral private libvirt secrets that we
202212 # undefine immediately after guest creation. Examples include 'user'
@@ -206,6 +216,10 @@ def _assert_libvirt_had_secret(self, compute, secret_uuid):
206216 conn = compute .driver ._host .get_connection ()
207217 self .assertIn (secret_uuid , conn ._removed_secrets )
208218
219+ def _assert_libvirt_secret_missing (self , host , instance_uuid ):
220+ s = host .driver ._host .find_secret ('vtpm' , instance_uuid )
221+ self .assertIsNone (s )
222+
209223 def test_tpm_secret_security_user (self ):
210224 self .flags (supported_tpm_secret_security = ['user' ], group = 'libvirt' )
211225 host = self .start_compute (hostname = 'tpm-host' )
@@ -251,6 +265,38 @@ def test_create_server(self):
251265 # ensure we deleted the key now that we no longer need it
252266 self .assertEqual (0 , len (self .key_mgr ._passphrases ))
253267
268+ def test_create_server_secret_security_host (self ):
269+ self .flags (supported_tpm_secret_security = ['host' ], group = 'libvirt' )
270+ compute = self .start_compute ()
271+
272+ # ensure we are reporting the correct traits
273+ traits = self ._get_provider_traits (self .compute_rp_uuids [compute ])
274+ self .assertIn ('COMPUTE_SECURITY_TPM_SECRET_SECURITY_HOST' , traits )
275+
276+ # create a server with vTPM
277+ server = self ._create_server_with_vtpm (secret_security = 'host' )
278+
279+ # ensure our instance's system_metadata field and key manager inventory
280+ # is correct
281+ self .assertInstanceHasSecret (server )
282+
283+ # ensure the libvirt secret is defined correctly
284+ ctx = nova_context .get_admin_context ()
285+ instance = objects .Instance .get_by_uuid (ctx , server ['id' ])
286+ conn = self .computes [compute ].driver ._host .get_connection ()
287+ secret = conn ._secrets [instance .system_metadata ['vtpm_secret_uuid' ]]
288+ self .assertFalse (secret ._ephemeral )
289+ self .assertFalse (secret ._private )
290+
291+ # now delete the server
292+ self ._delete_server (server )
293+
294+ # ensure we deleted the key and undefined the secret now that we no
295+ # longer need it
296+ self .assertEqual (0 , len (self .key_mgr ._passphrases ))
297+ self .assertNotIn (instance .system_metadata ['vtpm_secret_uuid' ],
298+ conn ._secrets )
299+
254300 def test_suspend_resume_server (self ):
255301 self .start_compute ()
256302
@@ -300,6 +346,24 @@ def test_hard_reboot_server(self):
300346 # is still correct
301347 self .assertInstanceHasSecret (server )
302348
349+ @ddt .data (None , 'user' , 'host' )
350+ def test_hard_reboot_server_as_admin (self , secret_security ):
351+ """Test hard rebooting a non-admin user's instance as admin.
352+
353+ This should only work for the 'host' TPM secret security policy.
354+ """
355+ self .start_compute ()
356+
357+ # create a server with vTPM
358+ server = self ._create_server_with_vtpm (secret_security = secret_security )
359+
360+ # Attempt to reboot the server as admin, should only work for 'host'.
361+ if secret_security == 'host' :
362+ self ._reboot_server (server , hard = True , api = self .admin_api )
363+ else :
364+ self ._reboot_server (server , hard = True , expected_state = 'ERROR' ,
365+ api = self .admin_api )
366+
303367 def _test_resize_revert_server__vtpm_to_vtpm (self , extra_specs = None ):
304368 """Test behavior of revert when a vTPM is retained across a resize.
305369
0 commit comments