diff --git a/qubesadmin/app.py b/qubesadmin/app.py index 7ed68226..2fa11558 100644 --- a/qubesadmin/app.py +++ b/qubesadmin/app.py @@ -63,61 +63,77 @@ class VMCollection: - """Collection of VMs objects""" + """A lazily-loaded, cached view of the VMs known to qubesd. + + Membership (``__contains__``, ``__iter__``, ``keys()``, ``values()``) + reflects exclusively the VMs returned by the most recent + ``admin.vm.List`` call. That list is fetched on the first membership + query and cached; call :py:meth:`refresh_cache` to reload. + + :py:meth:`get_blind` is **not** a membership operation. + It returns (or creates) a :py:class:`~qubesadmin.vm.QubesVM` handle for + the given name without querying the API and without registering the name + in the membership view. Its purpose is to provide a unique + :py:class:`~qubesadmin.vm.QubesVM` object for the same name on repeated + calls. + Blind objects desynced with ``vm.List`` are pruned from the internal cache + on the next :py:meth:`refresh_cache` call. + + ``del domains[name]`` removes the VM by calling ``admin.vm.Remove``. + There is no ``__setitem__``; use + :py:meth:`~qubesadmin.app.QubesBase.add_new_vm` to create VMs. + """ def __init__(self, app: "QubesBase"): self.app = app - # TODO we should properly document what's in / the - # purpose of _vm_dict and _vm_objects - self._vm_dict: dict[str, dict[str, str]] = {} - self._vm_objects: dict[str, QubesVM] = {} - self._vm_dict_initialized: bool = False + # object cache — may contain blind objects not confirmed by vm.List + self._vms: dict[str, QubesVM] = {} + # names returned by the last admin.vm.List call + self._known_names: set[str] = set() + self._initialized: bool = False def clear_cache(self, invalidate_name: str | None=None) -> None: """Clear cached list of VMs - If *invalidate_name* is given, remove that object from cache - explicitly too. + If *invalidate_name* is given, remove that object from the collection. """ - self._vm_dict_initialized = False - self._vm_dict = {} + self._initialized = False + self._known_names = set() if invalidate_name: - self._vm_objects.pop(invalidate_name, None) + self._vms.pop(invalidate_name, None) - def refresh_cache(self, force: bool=False) -> None: - """Refresh cached list of VMs""" - if not force and self._vm_dict_initialized: - return + def _ensure_cache_loaded(self) -> None: + """Load the VM list from qubesd if not already cached.""" + if not self._initialized: + self.refresh_cache() + + def refresh_cache(self) -> None: + """Reload the VM list from qubesd, discarding any cached data.""" vm_list_data = self.app.qubesd_call("dom0", "admin.vm.List") - new_vm_dict = {} - # FIXME: this will probably change + # Handle renamed VMs + vms_by_current_name = {vm.name: vm for vm in self._vms.values()} + new_known_names: set[str] = set() for vm_data in vm_list_data.splitlines(): vm_name, props = vm_data.decode("ascii").split(" ", 1) - vm_name = str(vm_name) - props = props.split(" ") - new_vm_dict[vm_name] = dict( - [vm_prop.split("=", 1) for vm_prop in props] - ) - # if cache not enabled, drop power state - if not self.app.cache_enabled: - try: - del new_vm_dict[vm_name]["state"] - except KeyError: - pass - - self._vm_dict = new_vm_dict - for name, vm in list(self._vm_objects.items()): - if vm.name not in self._vm_dict: - # VM no longer exists - del self._vm_objects[name] - elif vm.klass != self._vm_dict[vm.name]["class"]: - # VM class have changed - del self._vm_objects[name] - # TODO: some generation ID, to detect VM re-creation - elif name != vm.name: - # renamed - self._vm_objects[vm.name] = vm - del self._vm_objects[name] - self._vm_dict_initialized = True + props_dict = dict(vm_prop.split("=", 1) + for vm_prop in props.split(" ")) + klass = props_dict["class"] + power_state = typing.cast(PowerState, props_dict.get("state")) + new_known_names.add(vm_name) + existing_vm = self._vms.get(vm_name) or\ + vms_by_current_name.get(vm_name) + # TODO: some generation ID (e.g. uuid), to detect VM re-creation + if existing_vm is None or existing_vm.klass != klass: + self._vms[vm_name] = QubesVM( + self.app, vm_name, klass=klass, power_state=power_state + ) + elif existing_vm is not self._vms.get(vm_name): + # renamed: existing_vm was found via vms_by_current_name + self._vms[vm_name] = existing_vm + # Drop objects for VMs that no longer exist + self._vms = {name: vm for name, vm in self._vms.items() + if name in new_known_names} + self._known_names = new_known_names + self._initialized = True def __getitem__(self, item: str | QubesVM) -> QubesVM: if isinstance(item, QubesVM): @@ -128,23 +144,12 @@ def __getitem__(self, item: str | QubesVM) -> QubesVM: def get_blind(self, item: str) -> QubesVM: """ - Get a vm without downloading the list - and checking if exists + Get a vm from the collection. If the vm is not in the collection + already, a new basic entry will be created from the provided name. """ - if item not in self._vm_objects: - # provide class name to constructor, if already cached (which can be - # done by 'item not in self' check above, unless blind_mode is - # enabled - klass: Klass | None = None - power_state: PowerState | None = None - if item in self._vm_dict: - klass = typing.cast(Klass | None, self._vm_dict[item]["class"]) - power_state = typing.cast(PowerState | None, - self._vm_dict[item].get("state")) - self._vm_objects[item] = QubesVM( - self.app, item, klass=klass, power_state=power_state - ) - return self._vm_objects[item] + if item not in self._vms: + self._vms[item] = QubesVM(self.app, item) + return self._vms[item] T = TypeVar("T") @@ -168,27 +173,28 @@ def get(self, item: str | QubesVM, default: object=None)\ def __contains__(self, item: QubesVM | str) -> bool: if isinstance(item, qubesadmin.vm.QubesVM): item = item.name - self.refresh_cache() - return item in self._vm_dict + self._ensure_cache_loaded() + return item in self._known_names def __delitem__(self, key: str) -> None: self.app.qubesd_call(key, "admin.vm.Remove") - self.clear_cache() + self._known_names.discard(key) + self._vms.pop(key, None) def __iter__(self) -> Generator[QubesVM, None, None]: - self.refresh_cache() - for vm in sorted(self._vm_dict): - yield self[vm] + self._ensure_cache_loaded() + for vm in sorted(self._known_names): + yield self.get_blind(vm) def keys(self) -> Iterable[str]: """Get list of VM names.""" - self.refresh_cache() - return self._vm_dict.keys() + self._ensure_cache_loaded() + return self._known_names def values(self) -> list[QubesVM]: """Get list of VM objects.""" - self.refresh_cache() - return [self[name] for name in self._vm_dict] + self._ensure_cache_loaded() + return [self.get_blind(name) for name in self._known_names] class QubesBase(qubesadmin.base.PropertyHolder): @@ -838,7 +844,7 @@ def _invalidate_cache_all(self) -> None: """ # pylint: disable=protected-access self.domains.clear_cache() - for vm in self.domains._vm_objects.values(): + for vm in self.domains._vms.values(): assert isinstance(vm, qubesadmin.vm.QubesVM) vm._power_state_cache = None vm._properties_cache = {} diff --git a/qubesadmin/tests/app.py b/qubesadmin/tests/app.py index 254869ba..c14861cc 100644 --- a/qubesadmin/tests/app.py +++ b/qubesadmin/tests/app.py @@ -187,6 +187,71 @@ def test_012_getitem_cached_object(self): self.assertIsNot(vm1, vm4) self.assertAllCalled() + def test_013_get_blind_not_in_membership(self): + """Blind objects must not appear in membership operations.""" + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00test-vm class=AppVM state=Running\n' + self.assertIn('test-vm', self.app.domains) + self.app.domains.get_blind('other-vm') + self.assertNotIn('other-vm', self.app.domains) + self.assertNotIn('other-vm', self.app.domains.keys()) + self.assertEqual([vm.name for vm in self.app.domains], ['test-vm']) + self.assertAllCalled() + + def test_014_refresh_cache_forces_reload(self): + """refresh_cache() must trigger a new admin.vm.List call even if + already initialised.""" + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00test-vm class=AppVM state=Running\n' + self.assertIn('test-vm', self.app.domains) + self.assertNotIn('test-vm2', self.app.domains) + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00test-vm2 class=AppVM state=Running\n' + self.app.domains.refresh_cache() + self.assertNotIn('test-vm', self.app.domains) + self.assertIn('test-vm2', self.app.domains) + self.assertAllCalled() + + def test_015_delitem_targeted_cleanup(self): + """del domains[name] must remove the VM immediately without a + further admin.vm.List call.""" + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00test-vm class=AppVM state=Running\n' + self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \ + b'0\x00' + self.assertIn('test-vm', self.app.domains) + del self.app.domains['test-vm'] + self.assertNotIn('test-vm', self.app.domains) + self.assertAllCalled() + + def test_016_rename_preserves_identity(self): + """After a rename, refresh_cache() must return the same object + under the new name.""" + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00test-vm class=AppVM state=Running\n' + vm = self.app.domains['test-vm'] + # pylint: disable=protected-access + vm._method_dest = 'new-name' + self.app.domains.clear_cache() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00new-name class=AppVM state=Running\n' + vm2 = self.app.domains['new-name'] + self.assertIs(vm, vm2) + self.assertAllCalled() + + def test_017_clear_cache_invalidate_name(self): + """clear_cache(invalidate_name) must drop that VM's object from the + cache so a fresh one is created on next access.""" + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00test-vm class=AppVM state=Running\n' + vm1 = self.app.domains['test-vm'] + self.app.domains.clear_cache() + vm2 = self.app.domains['test-vm'] + self.assertIs(vm1, vm2) + self.app.domains.clear_cache(invalidate_name = 'test-vm') + vm3 = self.app.domains['test-vm'] + self.assertIsNot(vm1, vm3) + self.assertAllCalled() class TC_10_QubesBase(qubesadmin.tests.QubesTestCase): diff --git a/qubesadmin/tests/tools/qvm_ls.py b/qubesadmin/tests/tools/qvm_ls.py index 6a837794..507802a4 100644 --- a/qubesadmin/tests/tools/qvm_ls.py +++ b/qubesadmin/tests/tools/qvm_ls.py @@ -388,17 +388,10 @@ def test_101_list_selected(self): b'0\x00vm1 class=AppVM state=Running\n' \ b'template1 class=TemplateVM state=Halted\n' \ b'sys-net class=AppVM state=Running\n' - self.app.expected_calls[ - ('vm1', 'admin.vm.CurrentState', None, None)] = \ - b'0\x00power_state=Running' - self.app.expected_calls[ - ('sys-net', 'admin.vm.CurrentState', None, None)] = \ - b'0\x00power_state=Running' props = { 'label': 'type=label green', 'template': 'type=vm template1', 'netvm': 'type=vm sys-net', -# 'virt_mode': b'type=str pv', } self.app.expected_calls[ ('vm1', 'admin.vm.property.GetAll', None, None)] = \ diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index 95a2b77d..6f2a5758 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -1200,7 +1200,7 @@ def verify(rpmfile, reponame, package_hdr=None): name, target + PATH_PREFIX + '/' + name]) - app.domains.refresh_cache(force=True) + app.domains.refresh_cache() tpl = app.domains[name] tpl.features['template-name'] = name diff --git a/qubesadmin/vm/__init__.py b/qubesadmin/vm/__init__.py index 55943747..6d6aae20 100644 --- a/qubesadmin/vm/__init__.py +++ b/qubesadmin/vm/__init__.py @@ -212,7 +212,7 @@ def get_power_state(self): """ - if self._power_state_cache is not None: + if self._power_state_cache is not None and self.app.cache_enabled: return self._power_state_cache try: power_state = self._get_current_state()["power_state"]