Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 104 additions & 18 deletions coriolis/tests/integration/deployments/test_luks_osmorphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,32 @@
from coriolis import constants
from coriolis.tests.integration import base as integration_base
from coriolis.tests.integration import harness as integration_harness
from coriolis.tests.integration import utils as test_utils
from coriolis.tests.integration import osmorphing_utils

_LUKS_PASSPHRASE = "it-luks-encrypted"
_MIGRATION_PASSPHRASE = "it-luks-migratable"
_TPM_SLOT_SECRET = "fake-tpm-sealed-secret"


class _LUKSOSMorphingMixin:
"""Mixin for LUKS OS morphing integration tests.

Simulates the pre-migration workflow during setUp: the source disk is
encrypted with an original key (which may be TPM-sealed on a real VM),
then a separate migration key is added via the volume master key without
needing the original passphrase. Only the migration passphrase is given
to Coriolis, matching what would happen in an actual migration.

After OS morphing:
- The original keyslot is intact (Coriolis never touches it).
- The migration passphrase is written to /etc/luks/coriolis_<dev>.key, so
the firstboot script can remove the keyslot autonomously.
"""

# Extra space for initramfs-tools and cryptsetup-initramfs packages that
# the LUKS morphing tools install on top of the base OS image.
_SCSI_DEBUG_SIZE_MB = 512
_CONTAINER_IMAGE = "ubuntu:24.04"

@classmethod
def setUpClass(cls):
Expand All @@ -49,25 +65,68 @@ def setUp(self):
fh.write(_LUKS_PASSPHRASE)
self.addCleanup(os.unlink, self._key_file)

with tempfile.NamedTemporaryFile(
mode="w", suffix=".key", delete=False) as fh:
self._migration_key_file = fh.name
fh.write(_MIGRATION_PASSPHRASE)
self.addCleanup(os.unlink, self._migration_key_file)

with tempfile.NamedTemporaryFile(
mode="w", suffix=".key", delete=False) as fh:
self._tpm_key_file = fh.name
fh.write(_TPM_SLOT_SECRET)
self.addCleanup(os.unlink, self._tpm_key_file)

super().setUp()
self._prepare_src_device()

def _prepare_src_device(self):
test_utils.make_luks_device(
self._src_device, self._key_file, "ubuntu:24.04")
osmorphing_utils.make_luks_device(
self._src_device, self._key_file, self._CONTAINER_IMAGE)

# Simulate the real pre-migration step: the disk is already mounted
# (the VM is running), so the migration key can be added via the
# volume master key without needing the original passphrase (which
# may be TPM-sealed, Tang-bound, etc.).
with osmorphing_utils.luks_open(
self._src_device, self._key_file,
disable_keyring=True) as mapper:
osmorphing_utils.luks_add_key_from_mapper(
mapper, self._src_device, self._migration_key_file)

# Add a keyslot representing the TPM-sealed key. LUKS fills slots in
# ascending order: original (0), migration (1), and this is slot 2.
osmorphing_utils.luks_add_key(
self._src_device, self._key_file, self._tpm_key_file)

# Inject a fake systemd-tpm2 token pointing at the TPM keyslot.
osmorphing_utils.luks_add_tpm2_token(self._src_device, keyslot_id=2)

# Overwrite crypttab with the real LUKS UUID but adding
# tpm2-device=auto, matching what systemd-cryptenroll sets up on a
# TPM-enrolled system. The real UUID is required so
# _update_crypttab_keyfile can match it.
luks_uuid = osmorphing_utils.get_luks_uuid(self._src_device)
osmorphing_utils.write_file_on_luks_device(
self._src_device, self._key_file, "etc/crypttab",
"luks-%s\tUUID=%s\tnone\tluks,tpm2-device=auto,tpm2-pcrs=7\n"
% (luks_uuid, luks_uuid),
)

# Pass only the migration passphrase to Coriolis; the original key
# is never shared.
dest_env = {
"devices": [self._dst_device],
constants.ENCRYPTED_DISKS_PASS: _LUKS_PASSPHRASE,
constants.ENCRYPTED_DISKS_PASS: _MIGRATION_PASSPHRASE,
}
self._client.transfers.update(
self._transfer.id,
{"destination_environment": dest_env},
)

def _check_path_exists(self, device, path):
with test_utils.luks_open(device, self._key_file) as mapper_path:
return test_utils.path_exists_on_device(mapper_path, path)
with osmorphing_utils.luks_open(device, self._key_file) as mapper_path:
return osmorphing_utils.path_exists_on_device(mapper_path, path)

def _assert_luks_common_firstboot_files(self):
dst_basename = os.path.basename(self._dst_device)
Expand Down Expand Up @@ -98,6 +157,44 @@ def test_deployment_with_os_morphing(self):
)
self._assert_firstboot_setup()

# The migration keyslot must still be present after morphing; it is
# supposed to be removed on the first VM boot by the firstboot script.
self.assertTrue(
osmorphing_utils.luks_can_open(
self._dst_device, self._migration_key_file),
"Migration LUKS keyslot should still be present after OS morphing",
)

# Coriolis writes the migration passphrase to the keyfile on the OS,
# so the filesystem can be unlocked and mounted on the first boot, and
# the firstboot script can run.
dst_basename = os.path.basename(self._dst_device)
keyfile_content = osmorphing_utils.read_file_from_luks_device(
self._dst_device, self._key_file,
"etc/luks/coriolis_%s.key" % dst_basename)
self.assertEqual(
keyfile_content.strip(), _MIGRATION_PASSPHRASE,
"Migration keyfile content does not match migration passphrase",
)

self.assertFalse(
osmorphing_utils.luks_has_tpm2_token(self._dst_device),
"systemd-tpm2 token not removed from destination after morphing",
)
self.assertFalse(
osmorphing_utils.luks_can_open(
self._dst_device, self._tpm_key_file),
"TPM2 keyslot was not killed on destination after OS morphing",
)
crypttab = osmorphing_utils.read_file_from_luks_device(
self._dst_device, self._key_file, "etc/crypttab")
self.assertIsNotNone(
crypttab, "/etc/crypttab not found on destination")
self.assertNotIn(
"tpm2-", crypttab,
"TPM2 options remain in /etc/crypttab after OS morphing",
)


class LUKSOSMorphingDeploymentTest(
_LUKSOSMorphingMixin, integration_base.ReplicaIntegrationTestBase):
Expand All @@ -116,18 +213,7 @@ class LUKSRockyLinuxOSMorphingDeploymentTest(
_LUKSOSMorphingMixin, integration_base.ReplicaIntegrationTestBase):
"""LUKS + dracut OS morphing test using Rocky Linux 9."""

def _prepare_src_device(self):
test_utils.make_luks_device(
self._src_device, self._key_file, "rockylinux:9")

dest_env = {
"devices": [self._dst_device],
constants.ENCRYPTED_DISKS_PASS: _LUKS_PASSPHRASE,
}
self._client.transfers.update(
self._transfer.id,
{"destination_environment": dest_env},
)
_CONTAINER_IMAGE = "rockylinux:9"

def _assert_firstboot_setup(self):
self._assert_luks_common_firstboot_files()
Expand Down
23 changes: 13 additions & 10 deletions coriolis/tests/integration/deployments/test_osmorphing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from coriolis.tests.integration import base as integration_base
from coriolis.tests.integration import harness as integration_harness
from coriolis.tests.integration import utils as test_utils
from coriolis.tests.integration import osmorphing_utils


class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
Expand All @@ -33,18 +33,21 @@ def setUpClass(cls):

def setUp(self):
super().setUp()
test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04")
osmorphing_utils.write_os_image_to_disk(
self._src_device, "ubuntu:24.04")

def test_deployment_with_os_morphing(self):
self.assertFalse(
test_utils.path_exists_on_device(self._src_device, "usr/bin/jq"),
osmorphing_utils.path_exists_on_device(
self._src_device, "usr/bin/jq"),
"jq was found on the source device before OS morphing",
)

self._execute_transfer_and_deployment()

self.assertTrue(
test_utils.path_exists_on_device(self._dst_device, "usr/bin/jq"),
osmorphing_utils.path_exists_on_device(
self._dst_device, "usr/bin/jq"),
"jq was not found on the destination device after OS morphing",
)

Expand All @@ -64,7 +67,7 @@ def test_os_morphing_global_script_basic_format(self):
}
self._execute_transfer_and_deployment(deployment_kwargs)

file_contents = test_utils.read_file_from_device(
file_contents = osmorphing_utils.read_file_from_device(
self._dst_device,
"cookie")
self.assertEqual(expected_string, file_contents)
Expand All @@ -82,7 +85,7 @@ def test_os_morphing_instance_script_basic_format(self):
}
self._execute_transfer_and_deployment(deployment_kwargs)

file_contents = test_utils.read_file_from_device(
file_contents = osmorphing_utils.read_file_from_device(
self._dst_device,
"cookie")
self.assertEqual(expected_string, file_contents)
Expand Down Expand Up @@ -124,10 +127,10 @@ def test_os_morphing_global_script_extended_format(self):
}
self._execute_transfer_and_deployment(deployment_kwargs)

pre_mounts = test_utils.read_file_from_device(
pre_mounts = osmorphing_utils.read_file_from_device(
self._dst_device,
"pre_mounts")
post_mounts = test_utils.read_file_from_device(
post_mounts = osmorphing_utils.read_file_from_device(
self._dst_device,
"post_mounts")

Expand Down Expand Up @@ -167,7 +170,7 @@ def test_os_morphing_global_script_first_boot(self):
# actually get executed. We'll merely verify that those files
# have been injected at the expected location.
first_boot_script_dir = "usr/lib/coriolis/firstboot/user"
first_boot_scripts = test_utils.list_files_from_device(
first_boot_scripts = osmorphing_utils.list_files_from_device(
self._dst_device, first_boot_script_dir)
if not first_boot_scripts:
raise AssertionError("Couldn't find first boot script dir.")
Expand All @@ -177,7 +180,7 @@ def test_os_morphing_global_script_first_boot(self):
if re.match(r"\d+-\w+\.sh", file_name):
first_boot_script_path = os.path.join(
first_boot_script_dir, file_name)
first_boot_script = test_utils.read_file_from_device(
first_boot_script = osmorphing_utils.read_file_from_device(
self._dst_device,
first_boot_script_path)
if payload == first_boot_script:
Expand Down
Loading
Loading