Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 76 additions & 70 deletions qubesadmin/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,61 +63,77 @@


class VMCollection:
"""Collection of VMs objects"""
"""A lazily-loaded, cached view of the VMs known to qubesd.

Membership (``__contains__``, ``__iter__``, ``keys()``, ``values()``)
reflects exclusively the VMs returned by the most recent
``admin.vm.List`` call. That list is fetched on the first membership
query and cached; call :py:meth:`refresh_cache` to reload.

:py:meth:`get_blind` is **not** a membership operation.
It returns (or creates) a :py:class:`~qubesadmin.vm.QubesVM` handle for
the given name without querying the API and without registering the name
in the membership view. Its purpose is to provide a unique
:py:class:`~qubesadmin.vm.QubesVM` object for the same name on repeated
calls.
Blind objects desynced with ``vm.List`` are pruned from the internal cache
on the next :py:meth:`refresh_cache` call.

``del domains[name]`` removes the VM by calling ``admin.vm.Remove``.
There is no ``__setitem__``; use
:py:meth:`~qubesadmin.app.QubesBase.add_new_vm` to create VMs.
"""

def __init__(self, app: "QubesBase"):
self.app = app
# TODO we should properly document what's in / the
# purpose of _vm_dict and _vm_objects
self._vm_dict: dict[str, dict[str, str]] = {}
self._vm_objects: dict[str, QubesVM] = {}
self._vm_dict_initialized: bool = False
# object cache — may contain blind objects not confirmed by vm.List
self._vms: dict[str, QubesVM] = {}
# names returned by the last admin.vm.List call
self._known_names: set[str] = set()
self._initialized: bool = False

def clear_cache(self, invalidate_name: str | None=None) -> None:
"""Clear cached list of VMs
If *invalidate_name* is given, remove that object from cache
explicitly too.
If *invalidate_name* is given, remove that object from the collection.
"""
self._vm_dict_initialized = False
self._vm_dict = {}
self._initialized = False
self._known_names = set()
if invalidate_name:
self._vm_objects.pop(invalidate_name, None)
self._vms.pop(invalidate_name, None)

def refresh_cache(self, force: bool=False) -> None:
"""Refresh cached list of VMs"""
if not force and self._vm_dict_initialized:
return
def _ensure_cache_loaded(self) -> None:
"""Load the VM list from qubesd if not already cached."""
if not self._initialized:
self.refresh_cache()

def refresh_cache(self) -> None:
"""Reload the VM list from qubesd, discarding any cached data."""
vm_list_data = self.app.qubesd_call("dom0", "admin.vm.List")
new_vm_dict = {}
# FIXME: this will probably change
# Handle renamed VMs
vms_by_current_name = {vm.name: vm for vm in self._vms.values()}
new_known_names: set[str] = set()
for vm_data in vm_list_data.splitlines():
vm_name, props = vm_data.decode("ascii").split(" ", 1)
vm_name = str(vm_name)
props = props.split(" ")
new_vm_dict[vm_name] = dict(
[vm_prop.split("=", 1) for vm_prop in props]
)
# if cache not enabled, drop power state
if not self.app.cache_enabled:
try:
del new_vm_dict[vm_name]["state"]
except KeyError:
pass

self._vm_dict = new_vm_dict
for name, vm in list(self._vm_objects.items()):
if vm.name not in self._vm_dict:
# VM no longer exists
del self._vm_objects[name]
elif vm.klass != self._vm_dict[vm.name]["class"]:
# VM class have changed
del self._vm_objects[name]
# TODO: some generation ID, to detect VM re-creation
elif name != vm.name:
# renamed
self._vm_objects[vm.name] = vm
del self._vm_objects[name]
self._vm_dict_initialized = True
props_dict = dict(vm_prop.split("=", 1)
for vm_prop in props.split(" "))
klass = props_dict["class"]
power_state = typing.cast(PowerState, props_dict.get("state"))
new_known_names.add(vm_name)
existing_vm = self._vms.get(vm_name) or\
vms_by_current_name.get(vm_name)
# TODO: some generation ID (e.g. uuid), to detect VM re-creation
if existing_vm is None or existing_vm.klass != klass:
self._vms[vm_name] = QubesVM(
self.app, vm_name, klass=klass, power_state=power_state
)
elif existing_vm is not self._vms.get(vm_name):
# renamed: existing_vm was found via vms_by_current_name
self._vms[vm_name] = existing_vm
# Drop objects for VMs that no longer exist
self._vms = {name: vm for name, vm in self._vms.items()
if name in new_known_names}
self._known_names = new_known_names
self._initialized = True

def __getitem__(self, item: str | QubesVM) -> QubesVM:
if isinstance(item, QubesVM):
Expand All @@ -128,23 +144,12 @@ def __getitem__(self, item: str | QubesVM) -> QubesVM:

def get_blind(self, item: str) -> QubesVM:
"""
Get a vm without downloading the list
and checking if exists
Get a vm from the collection. If the vm is not in the collection
already, a new basic entry will be created from the provided name.
"""
if item not in self._vm_objects:
# provide class name to constructor, if already cached (which can be
# done by 'item not in self' check above, unless blind_mode is
# enabled
klass: Klass | None = None
power_state: PowerState | None = None
if item in self._vm_dict:
klass = typing.cast(Klass | None, self._vm_dict[item]["class"])
power_state = typing.cast(PowerState | None,
self._vm_dict[item].get("state"))
self._vm_objects[item] = QubesVM(
self.app, item, klass=klass, power_state=power_state
)
return self._vm_objects[item]
if item not in self._vms:
self._vms[item] = QubesVM(self.app, item)
return self._vms[item]

T = TypeVar("T")

Expand All @@ -168,27 +173,28 @@ def get(self, item: str | QubesVM, default: object=None)\
def __contains__(self, item: QubesVM | str) -> bool:
if isinstance(item, qubesadmin.vm.QubesVM):
item = item.name
self.refresh_cache()
return item in self._vm_dict
self._ensure_cache_loaded()
return item in self._known_names

def __delitem__(self, key: str) -> None:
self.app.qubesd_call(key, "admin.vm.Remove")
self.clear_cache()
self._known_names.discard(key)
self._vms.pop(key, None)

def __iter__(self) -> Generator[QubesVM, None, None]:
self.refresh_cache()
for vm in sorted(self._vm_dict):
yield self[vm]
self._ensure_cache_loaded()
for vm in sorted(self._known_names):
yield self.get_blind(vm)

def keys(self) -> Iterable[str]:
"""Get list of VM names."""
self.refresh_cache()
return self._vm_dict.keys()
self._ensure_cache_loaded()
return self._known_names

def values(self) -> list[QubesVM]:
"""Get list of VM objects."""
self.refresh_cache()
return [self[name] for name in self._vm_dict]
self._ensure_cache_loaded()
return [self.get_blind(name) for name in self._known_names]


class QubesBase(qubesadmin.base.PropertyHolder):
Expand Down Expand Up @@ -838,7 +844,7 @@ def _invalidate_cache_all(self) -> None:
"""
# pylint: disable=protected-access
self.domains.clear_cache()
for vm in self.domains._vm_objects.values():
for vm in self.domains._vms.values():
assert isinstance(vm, qubesadmin.vm.QubesVM)
vm._power_state_cache = None
vm._properties_cache = {}
Expand Down
65 changes: 65 additions & 0 deletions qubesadmin/tests/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,71 @@ def test_012_getitem_cached_object(self):
self.assertIsNot(vm1, vm4)
self.assertAllCalled()

def test_013_get_blind_not_in_membership(self):
"""Blind objects must not appear in membership operations."""
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
self.assertIn('test-vm', self.app.domains)
self.app.domains.get_blind('other-vm')
self.assertNotIn('other-vm', self.app.domains)
self.assertNotIn('other-vm', self.app.domains.keys())
self.assertEqual([vm.name for vm in self.app.domains], ['test-vm'])
self.assertAllCalled()

def test_014_refresh_cache_forces_reload(self):
"""refresh_cache() must trigger a new admin.vm.List call even if
already initialised."""
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
self.assertIn('test-vm', self.app.domains)
self.assertNotIn('test-vm2', self.app.domains)
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm2 class=AppVM state=Running\n'
self.app.domains.refresh_cache()
self.assertNotIn('test-vm', self.app.domains)
self.assertIn('test-vm2', self.app.domains)
self.assertAllCalled()

def test_015_delitem_targeted_cleanup(self):
"""del domains[name] must remove the VM immediately without a
further admin.vm.List call."""
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
self.app.expected_calls[('test-vm', 'admin.vm.Remove', None, None)] = \
b'0\x00'
self.assertIn('test-vm', self.app.domains)
del self.app.domains['test-vm']
self.assertNotIn('test-vm', self.app.domains)
self.assertAllCalled()

def test_016_rename_preserves_identity(self):
"""After a rename, refresh_cache() must return the same object
under the new name."""
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
vm = self.app.domains['test-vm']
# pylint: disable=protected-access
vm._method_dest = 'new-name'
self.app.domains.clear_cache()
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00new-name class=AppVM state=Running\n'
vm2 = self.app.domains['new-name']
self.assertIs(vm, vm2)
self.assertAllCalled()

def test_017_clear_cache_invalidate_name(self):
"""clear_cache(invalidate_name) must drop that VM's object from the
cache so a fresh one is created on next access."""
self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \
b'0\x00test-vm class=AppVM state=Running\n'
vm1 = self.app.domains['test-vm']
self.app.domains.clear_cache()
vm2 = self.app.domains['test-vm']
self.assertIs(vm1, vm2)
self.app.domains.clear_cache(invalidate_name = 'test-vm')
vm3 = self.app.domains['test-vm']
self.assertIsNot(vm1, vm3)
self.assertAllCalled()


class TC_10_QubesBase(qubesadmin.tests.QubesTestCase):
Expand Down
7 changes: 0 additions & 7 deletions qubesadmin/tests/tools/qvm_ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,17 +388,10 @@ def test_101_list_selected(self):
b'0\x00vm1 class=AppVM state=Running\n' \
b'template1 class=TemplateVM state=Halted\n' \
b'sys-net class=AppVM state=Running\n'
self.app.expected_calls[
('vm1', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
self.app.expected_calls[
('sys-net', 'admin.vm.CurrentState', None, None)] = \
b'0\x00power_state=Running'
props = {
'label': 'type=label green',
'template': 'type=vm template1',
'netvm': 'type=vm sys-net',
# 'virt_mode': b'type=str pv',
}
self.app.expected_calls[
('vm1', 'admin.vm.property.GetAll', None, None)] = \
Expand Down
2 changes: 1 addition & 1 deletion qubesadmin/tools/qvm_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ def verify(rpmfile, reponame, package_hdr=None):
name,
target + PATH_PREFIX + '/' + name])

app.domains.refresh_cache(force=True)
app.domains.refresh_cache()
tpl = app.domains[name]

tpl.features['template-name'] = name
Expand Down
2 changes: 1 addition & 1 deletion qubesadmin/vm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def get_power_state(self):

"""

if self._power_state_cache is not None:
if self._power_state_cache is not None and self.app.cache_enabled:
return self._power_state_cache
try:
power_state = self._get_current_state()["power_state"]
Expand Down