From 86f3bb8a912fd7ca2500808fc73f93eb01042213 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Tue, 12 May 2026 14:41:55 +0200 Subject: [PATCH 01/10] Format utils with black --- qubesadmin/utils.py | 91 +++++++++++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 32 deletions(-) diff --git a/qubesadmin/utils.py b/qubesadmin/utils.py index a8131d54..8b54d0ca 100644 --- a/qubesadmin/utils.py +++ b/qubesadmin/utils.py @@ -43,12 +43,18 @@ def parse_size(size: str) -> int: """Parse human readable size into bytes.""" units = [ - ('K', 1000), ('KB', 1000), - ('M', 1000 * 1000), ('MB', 1000 * 1000), - ('G', 1000 * 1000 * 1000), ('GB', 1000 * 1000 * 1000), - ('Ki', 1024), ('KiB', 1024), - ('Mi', 1024 * 1024), ('MiB', 1024 * 1024), - ('Gi', 1024 * 1024 * 1024), ('GiB', 1024 * 1024 * 1024), + ("K", 1000), + ("KB", 1000), + ("M", 1000 * 1000), + ("MB", 1000 * 1000), + ("G", 1000 * 1000 * 1000), + ("GB", 1000 * 1000 * 1000), + ("Ki", 1024), + ("KiB", 1024), + ("Mi", 1024 * 1024), + ("MiB", 1024 * 1024), + ("Gi", 1024 * 1024 * 1024), + ("GiB", 1024 * 1024 * 1024), ] size = size.strip().upper() @@ -57,7 +63,7 @@ def parse_size(size: str) -> int: for unit, multiplier in units: if size.endswith(unit.upper()): - size = size[:-len(unit)].strip() + size = size[: -len(unit)].strip() return int(size) * multiplier raise qubesadmin.exc.QubesException("Invalid size: {0}.".format(size)) @@ -89,14 +95,15 @@ def size_to_human(size: int) -> str: if size < 1024: return str(size) if size < 1024 * 1024: - return str(round(size / 1024.0, 1)) + ' KiB' + return str(round(size / 1024.0, 1)) + " KiB" if size < 1024 * 1024 * 1024: - return str(round(size / (1024.0 * 1024), 1)) + ' MiB' - return str(round(size / (1024.0 * 1024 * 1024), 1)) + ' GiB' + return str(round(size / (1024.0 * 1024), 1)) + " MiB" + return str(round(size / (1024.0 * 1024 * 1024), 1)) + " GiB" -UPDATES_DEFAULT_VM_DISABLE_FLAG = \ - '/var/lib/qubes/updates/vm-default-disable-updates' +UPDATES_DEFAULT_VM_DISABLE_FLAG = ( + "/var/lib/qubes/updates/vm-default-disable-updates" +) def updates_vms_status(qvm_collection: QubesBase) -> bool | None: @@ -109,14 +116,15 @@ def updates_vms_status(qvm_collection: QubesBase) -> bool | None: for vm in qvm_collection.domains: if vm.qid == 0: continue - if vm.features.get('check-updates', True) != status: + if vm.features.get("check-updates", True) != status: # "mixed" return None return status -def vm_dependencies(app: QubesBase, reference_vm: QubesVM)\ - -> list[tuple[QubesVM | None, str]]: +def vm_dependencies( + app: QubesBase, reference_vm: QubesVM +) -> list[tuple[QubesVM | None, str]]: """Helper function that returns a list of all the places a given VM is used in. Output is a list of tuples (property_holder, property_name), with None as property_holder for global properties @@ -124,16 +132,29 @@ def vm_dependencies(app: QubesBase, reference_vm: QubesVM)\ result = [] - global_properties = ['default_dispvm', 'default_netvm', 'default_guivm', - 'default_audiovm', 'default_template', 'clockvm', - 'updatevm', 'management_dispvm'] + global_properties = [ + "default_dispvm", + "default_netvm", + "default_guivm", + "default_audiovm", + "default_template", + "clockvm", + "updatevm", + "management_dispvm", + ] for prop in global_properties: if reference_vm == getattr(app, prop, None): result.append((None, prop)) - vm_properties = ['template', 'netvm', 'guivm', 'audiovm', - 'default_dispvm', 'management_dispvm'] + vm_properties = [ + "template", + "netvm", + "guivm", + "audiovm", + "default_dispvm", + "management_dispvm", + ] for vm in app.domains: if vm == reference_vm: @@ -169,24 +190,26 @@ def encode_for_vmexec(args: Iterable[str]) -> str: """ def encode(part: re.Match) -> bytes: - if part.group(0) == b'-': - return b'--' - return '-{:02X}'.format(ord(part.group(0))).encode('ascii') + if part.group(0) == b"-": + return b"--" + return "-{:02X}".format(ord(part.group(0))).encode("ascii") parts = [] for arg in args: - part = re.sub(br'[^a-zA-Z0-9_.]', encode, arg.encode('utf-8')) + part = re.sub(rb"[^a-zA-Z0-9_.]", encode, arg.encode("utf-8")) parts.append(part) - return b'+'.join(parts).decode('ascii') + return b"+".join(parts).decode("ascii") + class LockFile: """Simple locking context manager. It opens a file with an advisory lock taken (fcntl.lockf)""" - def __init__(self, path: str, nonblock: bool=False): + + def __init__(self, path: str, nonblock: bool = False): """Open the file. Call *acquire* or enter the context to lock the file""" # pylint: disable=consider-using-with - self.file = open(path, "w", encoding='ascii') + self.file = open(path, "w", encoding="ascii") self.nonblock = nonblock def __enter__(self, *args, **kwargs) -> LockFile: @@ -195,12 +218,16 @@ def __enter__(self, *args, **kwargs) -> LockFile: def acquire(self) -> None: """Lock the opened file""" - fcntl.lockf(self.file, - fcntl.LOCK_EX | (fcntl.LOCK_NB if self.nonblock else 0)) + fcntl.lockf( + self.file, fcntl.LOCK_EX | (fcntl.LOCK_NB if self.nonblock else 0) + ) - def __exit__(self, exc_type: object | None = None, - exc_value: object | None = None, - traceback: object | None = None) -> None: + def __exit__( + self, + exc_type: object | None = None, + exc_value: object | None = None, + traceback: object | None = None, + ) -> None: self.release() def release(self) -> None: From 9503264c131dd533bbfa5243a5b5344f3e781a15 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Thu, 21 May 2026 09:22:22 +0200 Subject: [PATCH 02/10] Stop conversion of domains list to set It loses the input order the user intended. --- qubesadmin/tests/tools/init.py | 23 +++++++++++++++++++++++ qubesadmin/tools/__init__.py | 14 +++++++++----- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/qubesadmin/tests/tools/init.py b/qubesadmin/tests/tools/init.py index 91d67f4d..1d82ba7f 100644 --- a/qubesadmin/tests/tools/init.py +++ b/qubesadmin/tests/tools/init.py @@ -149,3 +149,26 @@ def test_105_set_prop_positional(self): args = parser.parse_args(['testvalue']) self.assertIn( ('testprop', 'testvalue'), args.properties.items()) + + +class TC_02_QubesArgumentParser(qubesadmin.tests.QubesTestCase): + def test_000_domain_preserve_order(self): + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( + b'0\x00some-vm class=AppVM state=Running\n' + b'some-a class=AppVM state=Halted\n' + b'some-b class=AppVM state=Paused\n' + ) + + parser = qubesadmin.tools.QubesArgumentParser(vmname_nargs="+") + wanted_args = ["some-vm", "some-a", "some-vm"] + args = parser.parse_args(wanted_args, app=self.app) + self.assertEqual( + ["some-a", "some-b", "some-vm"], + [qube.name for qube in self.app.domains], + "app namespace must have domains in alphabetical order", + ) + self.assertEqual( + ["some-vm", "some-a"], + [qube.name for qube in args.domains], + "args namespace must have domains in input order", + ) diff --git a/qubesadmin/tools/__init__.py b/qubesadmin/tools/__init__.py index cec8be55..85cfa7fa 100644 --- a/qubesadmin/tools/__init__.py +++ b/qubesadmin/tools/__init__.py @@ -191,14 +191,18 @@ def parse_qubes_app(self, parser, namespace): except KeyError: parser.error('no such domain: {!r}'.format(vm_name)) else: - destinations = set() + destinations = [] for destination in getattr(namespace, self.dest): if any(wildcard in destination for wildcard in '*?[!]'): - for domain in app.domains: + for domain in [ + qube + for qube in app.domains + if qube.name not in destinations + ]: if fnmatch.fnmatch(domain.name, destination): - destinations.add(domain.name) - else: - destinations.add(destination) + destinations.append(domain.name) + elif destination not in destinations: + destinations.append(destination) for vm_name in destinations: try: From 33906d2b2aa5d86ade9b9d5a69d0bc92a58517a3 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Wed, 6 May 2026 16:32:46 +0200 Subject: [PATCH 03/10] Add option to wait for domain shutdown By waiting for the full shutdown, it becomes possible to receive appropriate exception, that would allow adapting the client to react differently. A timeout can be interpreted and the method adjusted to kill the domain. For: https://github.com/QubesOS/qubes-issues/issues/10835 --- qubesadmin/tests/vm/actions.py | 21 +++++++++++++++++++++ qubesadmin/vm/__init__.py | 15 ++++++++++----- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/qubesadmin/tests/vm/actions.py b/qubesadmin/tests/vm/actions.py index 833a82b2..525b5357 100644 --- a/qubesadmin/tests/vm/actions.py +++ b/qubesadmin/tests/vm/actions.py @@ -40,6 +40,27 @@ def test_001_shutdown(self): self.vm.shutdown() self.assertAllCalled() + def test_001_shutdown_force(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.Shutdown', 'force', None)] = \ + b'0\x00' + self.vm.shutdown(force=True) + self.assertAllCalled() + + def test_001_shutdown_wait(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.Shutdown', 'wait', None)] = \ + b'0\x00' + self.vm.shutdown(wait=True) + self.assertAllCalled() + + def test_001_shutdown_force_wait(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.Shutdown', 'force+wait', None)] = \ + b'0\x00' + self.vm.shutdown(force=True, wait=True) + self.assertAllCalled() + def test_002_kill(self): self.app.expected_calls[ ('test-vm', 'admin.vm.Kill', None, None)] = \ diff --git a/qubesadmin/vm/__init__.py b/qubesadmin/vm/__init__.py index 55943747..e9d35359 100644 --- a/qubesadmin/vm/__init__.py +++ b/qubesadmin/vm/__init__.py @@ -113,17 +113,22 @@ def start(self): """ self.qubesd_call(self._method_dest, "admin.vm.Start") - def shutdown(self, force=False): + def shutdown(self, force=False, wait=False): """ Shutdown domain. :return: """ - # TODO: wait parameter (using event?) + arg_list = [] if force: - self.qubesd_call(self._method_dest, "admin.vm.Shutdown", "force") - else: - self.qubesd_call(self._method_dest, "admin.vm.Shutdown") + arg_list.append("force") + if wait: + arg_list.append("wait") + args = "+".join(arg_list) + params = [self._method_dest, "admin.vm.Shutdown"] + if args: + params.append(args) + self.qubesd_call(*params) def kill(self): """ From fa1f766d66c5df0dc29dac3b89c6d289bbbcf541 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Fri, 8 May 2026 15:13:24 +0200 Subject: [PATCH 04/10] Format qvm-shutdown with black --- qubesadmin/tools/qvm_shutdown.py | 92 ++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 34 deletions(-) diff --git a/qubesadmin/tools/qvm_shutdown.py b/qubesadmin/tools/qvm_shutdown.py index 793e0471..a07eb536 100644 --- a/qubesadmin/tools/qvm_shutdown.py +++ b/qubesadmin/tools/qvm_shutdown.py @@ -20,7 +20,7 @@ # You should have received a copy of the GNU Lesser General Public License along # with this program; if not, see . -''' Shutdown a qube ''' +"""Shutdown a qube""" from __future__ import print_function @@ -33,38 +33,56 @@ import qubesadmin.exc parser = qubesadmin.tools.QubesArgumentParser( - description=__doc__, vmname_nargs='+') + description=__doc__, vmname_nargs="+" +) -parser.add_argument('--wait', - action='store_true', default=False, - help='wait for the VMs to shut down') +parser.add_argument( + "--wait", + action="store_true", + default=False, + help="wait for the VMs to shut down", +) -parser.add_argument('--timeout', - action='store', type=float, +parser.add_argument( + "--timeout", + action="store", + type=float, default=60, - help='timeout after which domains are killed when using --wait' - ' (default: %(default)d)') + help="timeout after which domains are killed when using --wait" + " (default: %(default)d)", +) parser.add_argument( - '--force', - action='store_true', default=False, - help='shut down even if other qubes depend on this one (e.g. as NetVM' - ' or AudioVM); does not affect how the qube itself is shut down;' - ' use with caution') + "--force", + action="store_true", + default=False, + help="shut down even if other qubes depend on this one (e.g. as NetVM" + " or AudioVM); does not affect how the qube itself is shut down;" + " use with caution", +) parser.add_argument( - '--dry-run', - action='store_true', dest='dry_run', default=False, - help='don\'t really shutdown or kill the domains; useful with --wait') + "--dry-run", + action="store_true", + dest="dry_run", + default=False, + help="don't really shutdown or kill the domains; useful with --wait", +) def failed_domains(vms): - '''Find the domains that have not successfully been shut down''' + """Find the domains that have not successfully been shut down""" # DispVM might have been deleted before we check them, so NA is acceptable. - return [vm for vm in vms - if not (vm.get_power_state() == 'Halted' - or (vm.klass == 'DispVM' and vm.get_power_state() == 'NA'))] + return [ + vm + for vm in vms + if not ( + vm.get_power_state() == "Halted" + or (vm.klass == "DispVM" and vm.get_power_state() == "NA") + ) + ] + def main(args=None, app=None): # pylint: disable=missing-docstring args = parser.parse_args(args, app=app) @@ -86,14 +104,15 @@ def main(args=None, app=None): # pylint: disable=missing-docstring pass except qubesadmin.exc.QubesException as e: if not args.wait: - vm.log.error('Shutdown error: {}'.format(e)) + vm.log.error("Shutdown error: {}".format(e)) shutdown_failed.add(vm) if not args.wait: if shutdown_failed: parser.error_runtime( - 'Failed to shut down: ' + - ', '.join(vm.name for vm in shutdown_failed), - len(shutdown_failed)) + "Failed to shut down: " + + ", ".join(vm.name for vm in shutdown_failed), + len(shutdown_failed), + ) return awaiting = remaining_domains - shutdown_failed remaining_domains = shutdown_failed @@ -103,16 +122,21 @@ def main(args=None, app=None): # pylint: disable=missing-docstring try: # pylint: disable=no-member - loop.run_until_complete(asyncio.wait_for( - qubesadmin.events.utils.wait_for_domain_shutdown( - awaiting), args.timeout)) + loop.run_until_complete( + asyncio.wait_for( + qubesadmin.events.utils.wait_for_domain_shutdown(awaiting), + args.timeout, + ) + ) except (TimeoutError, asyncio.TimeoutError): if not args.dry_run: current_vms = failed_domains(awaiting) if current_vms: args.app.log.info( - 'Killing remaining qubes: {}' - .format(', '.join([str(vm) for vm in current_vms]))) + "Killing remaining qubes: {}".format( + ", ".join([str(vm) for vm in current_vms]) + ) + ) for vm in current_vms: try: vm.kill() @@ -126,10 +150,10 @@ def main(args=None, app=None): # pylint: disable=missing-docstring failed = failed_domains(args.domains) if failed: parser.error_runtime( - 'Failed to shut down: ' + - ', '.join(vm.name for vm in failed), - len(failed)) + "Failed to shut down: " + ", ".join(vm.name for vm in failed), + len(failed), + ) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) From 60ae3ef9886812a46f0f1c8ed39c4920df25af8e Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Fri, 8 May 2026 15:15:54 +0200 Subject: [PATCH 05/10] Wait for multiple shutdowns without events - Avoids extra calls that might not be available - Gather returned exception to raise appropriate message and act accordingly - Don't retry shutdown by checking power state, but just by waiting for the call to return Fixes: https://github.com/QubesOS/qubes-issues/issues/6132 --- qubesadmin/tests/tools/qvm_shutdown.py | 266 +++++++++++-------------- qubesadmin/tools/qvm_shutdown.py | 227 ++++++++++++++------- 2 files changed, 265 insertions(+), 228 deletions(-) diff --git a/qubesadmin/tests/tools/qvm_shutdown.py b/qubesadmin/tests/tools/qvm_shutdown.py index a43808c4..9623aa29 100644 --- a/qubesadmin/tests/tools/qvm_shutdown.py +++ b/qubesadmin/tests/tools/qvm_shutdown.py @@ -21,7 +21,6 @@ # pylint: disable=missing-docstring import asyncio -import unittest.mock import qubesadmin.tests import qubesadmin.tests.tools @@ -88,27 +87,12 @@ def test_010_wait(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - mock_events = unittest.mock.AsyncMock() - patch = unittest.mock.patch( - 'qubesadmin.events.EventsDispatcher._get_events_reader', - mock_events) - patch.start() - self.addCleanup(patch.stop) - mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([ - b'1\0\0connection-established\0\0', - b'1\0some-vm\0domain-shutdown\0\0', - ]) - self.app.expected_calls[ - ('some-vm', 'admin.vm.Shutdown', None, None)] = \ + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = \ b'0\x00' self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' - self.app.expected_calls[ - ('some-vm', 'admin.vm.CurrentState', None, None)] = \ - [b'0\x00power_state=Running'] + \ - [b'0\x00power_state=Halted'] qubesadmin.tools.qvm_shutdown.main(['--wait', 'some-vm'], app=self.app) self.assertAllCalled() @@ -117,27 +101,14 @@ def test_012_wait_all(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - mock_events = unittest.mock.AsyncMock() - patch = unittest.mock.patch( - 'qubesadmin.events.EventsDispatcher._get_events_reader', - mock_events) - patch.start() - self.addCleanup(patch.stop) - mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([ - b'1\0\0connection-established\0\0', - b'1\0sys-net\0domain-shutdown\0\0', - b'1\0some-vm\0domain-shutdown\0\0', - b'1\0other-vm\0domain-shutdown\0\0', - ]) - self.app.expected_calls[ - ('some-vm', 'admin.vm.Shutdown', 'force', None)] = \ + ('some-vm', 'admin.vm.Shutdown', 'force+wait', None)] = \ b'0\x00' self.app.expected_calls[ - ('other-vm', 'admin.vm.Shutdown', 'force', None)] = \ + ('other-vm', 'admin.vm.Shutdown', 'force+wait', None)] = \ b'0\x00' self.app.expected_calls[ - ('sys-net', 'admin.vm.Shutdown', 'force', None)] = \ + ('sys-net', 'admin.vm.Shutdown', 'force+wait', None)] = \ b'0\x00' self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ @@ -145,15 +116,6 @@ def test_012_wait_all(self): b'sys-net class=AppVM state=Running\n' \ b'some-vm class=AppVM state=Running\n' \ b'other-vm class=AppVM state=Running\n' - self.app.expected_calls[ - ('some-vm', 'admin.vm.CurrentState', None, None)] = \ - b'0\x00power_state=Halted' - self.app.expected_calls[ - ('other-vm', 'admin.vm.CurrentState', None, None)] = \ - b'0\x00power_state=Halted' - self.app.expected_calls[ - ('sys-net', 'admin.vm.CurrentState', None, None)] = \ - b'0\x00power_state=Halted' qubesadmin.tools.qvm_shutdown.main(['--wait', '--all'], app=self.app) self.assertAllCalled() @@ -162,31 +124,20 @@ def test_015_wait_all_kill_timeout(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - mock_events = unittest.mock.AsyncMock() - patch = unittest.mock.patch( - 'qubesadmin.events.EventsDispatcher._get_events_reader', - mock_events) - patch.start() - self.addCleanup(patch.stop) - mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([ - b'1\0\0connection-established\0\0', - b'1\0sys-net\0domain-shutdown\0\0', - ]) - self.app.expected_calls[ - ('some-vm', 'admin.vm.Shutdown', 'force', None)] = \ - b'0\x00' + ('some-vm', 'admin.vm.Shutdown', 'force+wait', None)] = \ + b'2\x00QubesVMShutdownTimeoutError\x00\x00Shutdown timed out\x00' self.app.expected_calls[ ('some-vm', 'admin.vm.Kill', None, None)] = \ b'2\x00QubesVMNotStartedError\x00\x00Domain is powered off\x00' self.app.expected_calls[ - ('other-vm', 'admin.vm.Shutdown', 'force', None)] = \ - b'0\x00' + ('other-vm', 'admin.vm.Shutdown', 'force+wait', None)] = \ + b'2\x00QubesVMShutdownTimeoutError\x00\x00Shutdown timed out\x00' self.app.expected_calls[ ('other-vm', 'admin.vm.Kill', None, None)] = \ b'0\x00' self.app.expected_calls[ - ('sys-net', 'admin.vm.Shutdown', 'force', None)] = \ + ('sys-net', 'admin.vm.Shutdown', 'force+wait', None)] = \ b'0\x00' self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ @@ -194,27 +145,9 @@ def test_015_wait_all_kill_timeout(self): b'sys-net class=AppVM state=Running\n' \ b'some-vm class=AppVM state=Running\n' \ b'other-vm class=AppVM state=Running\n' - self.app.expected_calls[ - ('some-vm', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Running', - b'0\x00power_state=Running', - b'0\x00power_state=Running', - ] - self.app.expected_calls[ - ('other-vm', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Running', - b'0\x00power_state=Running', - b'0\x00power_state=Running', - ] - self.app.expected_calls[ - ('sys-net', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Halted', - b'0\x00power_state=Halted', - b'0\x00power_state=Halted', - ] with self.assertRaisesRegex(SystemExit, '2'): qubesadmin.tools.qvm_shutdown.main( - ['--wait', '--all', '--timeout=1'], app=self.app) + ['--wait', '--all'], app=self.app) self.assertAllCalled() def test_016_all_exclude_noforce(self): @@ -266,49 +199,114 @@ def test_006_dry_run(self): ['--dry-run', 'some-vm'], app=self.app) self.assertAllCalled() - def test_011_wait_retry(self): + def test_011_wait_retry_simple(self): '''test --wait retries VMs whose shutdown request failed''' loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - mock_events = unittest.mock.AsyncMock() - patch = unittest.mock.patch( - 'qubesadmin.events.EventsDispatcher._get_events_reader', - mock_events) - patch.start() - self.addCleanup(patch.stop) - mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([ - # round 1: wait for some-vm - b'1\0\0connection-established\0\0', - b'1\0some-vm\0domain-shutdown\0\0', - # round 2: wait for other-vm - b'1\0\0connection-established\0\0', - b'1\0other-vm\0domain-shutdown\0\0', - ]) - self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00' \ b'some-vm class=AppVM state=Running\n' \ b'other-vm class=AppVM state=Running\n' self.app.expected_calls[ - ('some-vm', 'admin.vm.Shutdown', None, None)] = \ + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = \ b'0\x00' # other-vm fails first attempt, succeeds on retry self.app.expected_calls[ - ('other-vm', 'admin.vm.Shutdown', None, None)] = [ - b'2\x00QubesException\x00\x00Shutdown refused\x00', + ('other-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', b'0\x00', ] + qubesadmin.tools.qvm_shutdown.main( + ['--wait', 'some-vm', 'other-vm'], app=self.app) + self.assertAllCalled() + + def test_011_wait_retry_once_succeed(self): + '''test --wait retry quits early if none fail''' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( + b'0\x00' + b'some-vm class=AppVM state=Running\n' + b'other-vm class=AppVM state=Running\n' + b'another-vm class=AppVM state=Running\n' + ) + self.app.expected_calls[ + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'0\x00' + ] + self.app.expected_calls[ + ('other-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'0\x00', + ] + self.app.expected_calls[ + ('another-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'0\x00', + ] + qubesadmin.tools.qvm_shutdown.main( + ['--wait', 'some-vm', 'other-vm', 'another-vm'], app=self.app) + self.assertAllCalled() + + def test_011_wait_retry_once_fail(self): + '''test --wait retry quits early if none succeed''' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = ( + b'0\x00' + b'some-vm class=AppVM state=Running\n' + b'other-vm class=AppVM state=Running\n' + b'another-vm class=AppVM state=Running\n' + ) + self.app.expected_calls[ + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + ] + self.app.expected_calls[ + ('other-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + ] + self.app.expected_calls[ + ('another-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + ] + with self.assertRaises(SystemExit) as cm: + qubesadmin.tools.qvm_shutdown.main( + ['--wait', 'some-vm', 'other-vm', 'another-vm'], app=self.app) + self.assertEqual(cm.exception.code, 3) + self.assertAllCalled() + + def test_011_wait_retry_as_many_as_in_use(self): + '''test --wait retries VMs whose shutdown request failed as many as + there were used qubes''' + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.app.expected_calls[ - ('some-vm', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Running', - b'0\x00power_state=Halted', + ('dom0', 'admin.vm.List', None, None)] = \ + b'0\x00' \ + b'some-vm class=AppVM state=Running\n' \ + b'other-vm class=AppVM state=Running\n' + # some-vm fails first attempt, succeeds on retry + self.app.expected_calls[ + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', \ + b'0\x00' ] + # other-vm fails first and second attempt, succeeds on retry self.app.expected_calls[ - ('other-vm', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Running', - b'0\x00power_state=Halted', + ('other-vm', 'admin.vm.Shutdown', 'wait', None)] = [ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', \ + b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', + b'0\x00', ] qubesadmin.tools.qvm_shutdown.main( ['--wait', 'some-vm', 'other-vm'], app=self.app) @@ -320,80 +318,46 @@ def test_013_wait_all_shutdown_fail(self): ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' self.app.expected_calls[ - ('some-vm', 'admin.vm.Shutdown', None, None)] = \ + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = \ b'2\x00QubesException\x00\x00Shutdown refused\x00' - self.app.expected_calls[ - ('some-vm', 'admin.vm.CurrentState', None, None)] = \ - b'0\x00power_state=Running' with self.assertRaises(SystemExit): qubesadmin.tools.qvm_shutdown.main( ['--wait', 'some-vm'], app=self.app) self.assertAllCalled() - def test_016_wait_kill_exception(self): - '''test --wait timeout where kill raises QubesException''' + def test_014_timeout_deprecated_means_wait(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - mock_events = unittest.mock.AsyncMock() - patch = unittest.mock.patch( - 'qubesadmin.events.EventsDispatcher._get_events_reader', - mock_events) - patch.start() - self.addCleanup(patch.stop) - mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([ - b'1\0\0connection-established\0\0', - ]) - self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' self.app.expected_calls[ - ('some-vm', 'admin.vm.Shutdown', None, None)] = \ - b'0\x00' + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = \ + b'2\x00QubesVMShutdownTimeoutError\x00\x00Shutdown timed out\x00' self.app.expected_calls[ ('some-vm', 'admin.vm.Kill', None, None)] = \ b'2\x00QubesException\x00\x00Kill failed\x00' - self.app.expected_calls[ - ('some-vm', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Running', - b'0\x00power_state=Running', - ] with self.assertRaises(SystemExit): qubesadmin.tools.qvm_shutdown.main( - ['--wait', '--timeout=1', 'some-vm'], app=self.app) + ['--timeout=3', 'some-vm'], app=self.app) self.assertAllCalled() - def test_017_wait_dispvm_na(self): - '''test --wait treats DispVM with NA power state as shut down''' + def test_016_wait_kill_exception(self): + '''test --wait timeout where kill raises QubesException''' loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - mock_events = unittest.mock.AsyncMock() - patch = unittest.mock.patch( - 'qubesadmin.events.EventsDispatcher._get_events_reader', - mock_events) - patch.start() - self.addCleanup(patch.stop) - mock_events.side_effect = qubesadmin.tests.tools.MockEventsReader([ - b'1\0\0connection-established\0\0', - b'1\0disp123\0domain-shutdown\0\0', - ]) - self.app.expected_calls[ ('dom0', 'admin.vm.List', None, None)] = \ - b'0\x00disp123 class=DispVM state=Running\n' + b'0\x00some-vm class=AppVM state=Running\n' self.app.expected_calls[ - ('disp123', 'admin.vm.Shutdown', None, None)] = \ - b'0\x00' + ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = \ + b'2\x00QubesVMShutdownTimeoutError\x00\x00Shutdown timed out\x00' self.app.expected_calls[ - ('disp123', 'admin.vm.CurrentState', None, None)] = [ - b'0\x00power_state=Running', - # failed_domains: first get_power_state() != 'Halted', - # then klass == 'DispVM' triggers second get_power_state() - b'0\x00power_state=NA', - b'0\x00power_state=NA', - ] - qubesadmin.tools.qvm_shutdown.main( - ['--wait', 'disp123'], app=self.app) + ('some-vm', 'admin.vm.Kill', None, None)] = \ + b'2\x00QubesException\x00\x00Kill failed\x00' + with self.assertRaises(SystemExit): + qubesadmin.tools.qvm_shutdown.main( + ['--wait', 'some-vm'], app=self.app) self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_shutdown.py b/qubesadmin/tools/qvm_shutdown.py index a07eb536..9b67a6a3 100644 --- a/qubesadmin/tools/qvm_shutdown.py +++ b/qubesadmin/tools/qvm_shutdown.py @@ -24,9 +24,9 @@ from __future__ import print_function -import sys - import asyncio +import sys +from warnings import warn import qubesadmin.events.utils import qubesadmin.tools @@ -46,10 +46,8 @@ parser.add_argument( "--timeout", action="store", - type=float, - default=60, - help="timeout after which domains are killed when using --wait" - " (default: %(default)d)", + help="Deprecated, use --wait instead. Setting this option will enable " + "--wait", ) parser.add_argument( @@ -70,89 +68,164 @@ ) -def failed_domains(vms): - """Find the domains that have not successfully been shut down""" - - # DispVM might have been deleted before we check them, so NA is acceptable. - return [ - vm - for vm in vms - if not ( - vm.get_power_state() == "Halted" - or (vm.klass == "DispVM" and vm.get_power_state() == "NA") - ) +def shutdown( + args, + loop: asyncio.AbstractEventLoop, + domains: list[qubesadmin.vm.QubesVM], + force: bool, +): + """ + Asynchronously shutdown qubes and return qubes that failed to shutdown + because and the client can't handle, as well as qubes that were in use + while --force was not provided, as well as timed out. + """ + # pylint: disable=missing-docstring + unhandled, used, timedout = [], [], [] + tasks = [ + asyncio.to_thread(qube.shutdown, force=force, wait=args.wait) + for qube in domains ] + results = loop.run_until_complete( + asyncio.gather(*tasks, return_exceptions=True) + ) + for qube, res in zip(domains, results): + if not isinstance(res, BaseException): + qube.log.info("Shutdown succeeded") + continue + try: + raise res + except qubesadmin.exc.QubesVMNotStartedError: + pass + except qubesadmin.exc.QubesVMInUseError as e: + if args.wait: + qube.log.error("Shutdown error: {}".format(e)) + else: + qube.log.error("Shutdown error: (try --force): {}".format(e)) + used.append(qube) + except qubesadmin.exc.QubesVMShutdownTimeoutError as e: + if args.wait: + qube.log.error("Shutdown error: {}".format(e)) + else: + qube.log.error("Shutdown error: (try qvm-kill): {}".format(e)) + timedout.append(qube) + except qubesadmin.exc.QubesException as e: + qube.log.error("Shutdown error: {}".format(e)) + unhandled.append(qube) + return unhandled, used, timedout + + +def kill(loop: asyncio.AbstractEventLoop, domains: list[qubesadmin.vm.QubesVM]): + """ + Asynchronously kill qubes and return qubes that failed to shutdown. + """ + # pylint: disable=missing-docstring + unhandled = domains.copy() + tasks = [asyncio.to_thread(qube.kill) for qube in domains] + results = loop.run_until_complete( + asyncio.gather(*tasks, return_exceptions=True) + ) + for qube, res in zip(domains, results): + if not isinstance(res, BaseException): + qube.log.info("Killing succeeded") + unhandled.remove(qube) + continue + try: + raise res + except qubesadmin.exc.QubesVMNotStartedError: + unhandled.remove(qube) + except qubesadmin.exc.QubesException as e: + qube.log.error("Kill error: {}".format(e)) + return unhandled -def main(args=None, app=None): # pylint: disable=missing-docstring +def main(args=None, app=None): + # pylint: disable=missing-docstring args = parser.parse_args(args, app=app) - force = args.force or (args.all_domains and not args.exclude) + if args.dry_run: + return + if args.timeout: + warn( + "Call to deprecated --timeout option, use --wait instead", + FutureWarning, + ) + args.wait = True loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - remaining_domains = set(args.domains) - for _ in range(len(args.domains)): - if not remaining_domains: - break - shutdown_failed = set() - if not args.dry_run: - for vm in remaining_domains: - try: - vm.shutdown(force=force) - except qubesadmin.exc.QubesVMNotStartedError: - pass - except qubesadmin.exc.QubesException as e: - if not args.wait: - vm.log.error("Shutdown error: {}".format(e)) - shutdown_failed.add(vm) - if not args.wait: - if shutdown_failed: - parser.error_runtime( - "Failed to shut down: " - + ", ".join(vm.name for vm in shutdown_failed), - len(shutdown_failed), - ) - return - awaiting = remaining_domains - shutdown_failed - remaining_domains = shutdown_failed - if not awaiting: - # no VM shutdown request succeeded, no sense to try again + unhandled, used, timedout = shutdown( + args=args, force=force, loop=loop, domains=args.domains + ) + unhandled_retry = [] + timedout_retry = [] + if used: + old_failed = unhandled, used, timedout + parser.print_error( + "Retrying shutdown of qubes that were in use for {} time(s)".format( + len(used) + ) + ) + for _ in range(len(used)): + parser.print_error( + "Retrying shutdown of qubes that were in use: {}".format( + ", ".join(qube.name for qube in used) + ) + ) + failed = shutdown(args=args, force=force, loop=loop, domains=used) + unhandled_retry, used, timedout_retry = failed + if not failed: break + if old_failed: + len_failed = sum(len(item) for item in failed) + len_old_failed = sum(len(item) for item in old_failed) + if len_failed == len_old_failed: + break + old_failed = failed + + unhandled.extend(qube for qube in unhandled_retry if qube not in unhandled) + timedout.extend(qube for qube in timedout_retry if qube not in timedout) + + # Retry timed out only once, as it can take a long time, 60s by default. + if timedout: + parser.print_error( + "Retrying shutdown of qubes that timed out: {}".format( + ", ".join(qube.name for qube in timedout) + ) + ) + unhandled, used, timedout = shutdown( + args=args, force=force, loop=loop, domains=timedout + ) - try: - # pylint: disable=no-member - loop.run_until_complete( - asyncio.wait_for( - qubesadmin.events.utils.wait_for_domain_shutdown(awaiting), - args.timeout, - ) + if timedout: + parser.print_error( + "Killing timed out qubes: {}".format( + ", ".join(qube.name for qube in timedout) + ) + ) + unhandled = kill(loop=loop, domains=timedout) + + if not unhandled and not used and not timedout: + return + + if unhandled: + parser.print_error( + "Failed to shut down for unknown reason: {}".format( + ", ".join(qube.name for qube in unhandled) + ) + ) + if used: + parser.print_error( + "Failed to shut down because it's in use: {}".format( + ", ".join(qube.name for qube in used) + ) + ) + if timedout: + parser.print_error( + "Failed to shut down because of time out: {}".format( + ", ".join(qube.name for qube in timedout) ) - except (TimeoutError, asyncio.TimeoutError): - if not args.dry_run: - current_vms = failed_domains(awaiting) - if current_vms: - args.app.log.info( - "Killing remaining qubes: {}".format( - ", ".join([str(vm) for vm in current_vms]) - ) - ) - for vm in current_vms: - try: - vm.kill() - except qubesadmin.exc.QubesVMNotStartedError: - # already shut down - pass - except qubesadmin.exc.QubesException as e: - parser.error_runtime(e) - - loop.close() - failed = failed_domains(args.domains) - if failed: - parser.error_runtime( - "Failed to shut down: " + ", ".join(vm.name for vm in failed), - len(failed), ) + raise SystemExit(len(unhandled + used + timedout)) if __name__ == "__main__": From 4fdae1542a64e7d3f8303b8c56dd2ae50546b107 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Wed, 20 May 2026 20:56:32 +0200 Subject: [PATCH 06/10] Make qvm_shutdown reusable for other tools It was necessary to make alternative main async functions, as Python is picky about running async code when mixing async and sync functions. --- qubesadmin/tests/tools/qvm_kill.py | 10 ++--- .../tests/tools/qvm_template_postprocess.py | 41 +----------------- qubesadmin/tools/qvm_kill.py | 36 ++++++++-------- qubesadmin/tools/qvm_shutdown.py | 43 +++++++------------ qubesadmin/tools/qvm_template_postprocess.py | 37 +++------------- 5 files changed, 48 insertions(+), 119 deletions(-) diff --git a/qubesadmin/tests/tools/qvm_kill.py b/qubesadmin/tests/tools/qvm_kill.py index 081fd83e..6a07ee00 100644 --- a/qubesadmin/tests/tools/qvm_kill.py +++ b/qubesadmin/tests/tools/qvm_kill.py @@ -74,9 +74,9 @@ def test_004_other_error(self): ('dom0', 'admin.vm.List', None, None)] = \ b'0\x00some-vm class=AppVM state=Running\n' with qubesadmin.tests.tools.StderrBuffer() as stderr: - self.assertEqual( - qubesadmin.tools.qvm_kill.main(['some-vm'], app=self.app), - 1) + with self.assertRaises(SystemExit): + self.assertEqual( + qubesadmin.tools.qvm_kill.main(['some-vm'], app=self.app), + 1) self.assertAllCalled() - self.assertIn("Failed to kill 'some-vm': Error message", - stderr.getvalue()) + self.assertIn("Failed to kill: some-vm", stderr.getvalue()) diff --git a/qubesadmin/tests/tools/qvm_template_postprocess.py b/qubesadmin/tests/tools/qvm_template_postprocess.py index f04a5030..0818a1bc 100644 --- a/qubesadmin/tests/tools/qvm_template_postprocess.py +++ b/qubesadmin/tests/tools/qvm_template_postprocess.py @@ -400,18 +400,7 @@ def test_020_post_install(self, mock_import_root_img, self.app.expected_calls[ ('test-vm', 'admin.vm.Start', None, None)] = b'0\0' self.app.expected_calls[ - ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0' - - if qubesadmin.tools.qvm_template_postprocess.have_events: - patch_domain_shutdown = mock.patch( - 'qubesadmin.events.utils.wait_for_domain_shutdown') - self.addCleanup(patch_domain_shutdown.stop) - mock_domain_shutdown = patch_domain_shutdown.start() - mock_domain_shutdown.side_effect = self.wait_for_shutdown - else: - self.app.expected_calls[ - ('test-vm', 'admin.vm.List', None, None)] = \ - b'0\0test-vm class=TemplateVM state=Halted\n' + ('test-vm', 'admin.vm.Shutdown', 'wait', None)] = b'0\0' asyncio.set_event_loop(asyncio.new_event_loop()) ret = qubesadmin.tools.qvm_template_postprocess.main([ @@ -424,9 +413,6 @@ def test_020_post_install(self, mock_import_root_img, 'test-vm'], self.source_dir.name) mock_import_appmenus.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name, skip_generate=True) - if qubesadmin.tools.qvm_template_postprocess.have_events: - mock_domain_shutdown.assert_called_once_with([self.app.domains[ - 'test-vm']]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.PostInstall', { 'stdin': subprocess.PIPE, @@ -458,18 +444,7 @@ def test_021_post_install_reinstall(self, mock_reset_private_img, self.app.expected_calls[ ('test-vm', 'admin.vm.Start', None, None)] = b'0\0' self.app.expected_calls[ - ('test-vm', 'admin.vm.Shutdown', None, None)] = b'0\0' - - if qubesadmin.tools.qvm_template_postprocess.have_events: - patch_domain_shutdown = mock.patch( - 'qubesadmin.events.utils.wait_for_domain_shutdown') - self.addCleanup(patch_domain_shutdown.stop) - mock_domain_shutdown = patch_domain_shutdown.start() - mock_domain_shutdown.side_effect = self.wait_for_shutdown - else: - self.app.expected_calls[ - ('test-vm', 'admin.vm.List', None, None)] = \ - b'0\0test-vm class=TemplateVM state=Halted\n' + ('test-vm', 'admin.vm.Shutdown', 'wait', None)] = b'0\0' asyncio.set_event_loop(asyncio.new_event_loop()) ret = qubesadmin.tools.qvm_template_postprocess.main([ @@ -483,9 +458,6 @@ def test_021_post_install_reinstall(self, mock_reset_private_img, 'test-vm']) mock_import_appmenus.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name, skip_generate=True) - if qubesadmin.tools.qvm_template_postprocess.have_events: - mock_domain_shutdown.assert_called_once_with([self.app.domains[ - 'test-vm']]) self.assertEqual(self.app.service_calls, [ ('test-vm', 'qubes.PostInstall', { 'stdin': subprocess.PIPE, @@ -508,13 +480,6 @@ def test_022_post_install_skip_start(self, mock_reset_private_img, = b'0\0' self.app.add_new_vm = mock.Mock() - if qubesadmin.tools.qvm_template_postprocess.have_events: - patch_domain_shutdown = mock.patch( - 'qubesadmin.events.utils.wait_for_domain_shutdown') - self.addCleanup(patch_domain_shutdown.stop) - mock_domain_shutdown = patch_domain_shutdown.start() - mock_domain_shutdown.side_effect = self.wait_for_shutdown - asyncio.set_event_loop(asyncio.new_event_loop()) ret = qubesadmin.tools.qvm_template_postprocess.main([ '--really', '--skip-start', 'post-install', 'test-vm', @@ -528,8 +493,6 @@ def test_022_post_install_skip_start(self, mock_reset_private_img, 'test-vm']) mock_import_appmenus.assert_called_once_with(self.app.domains[ 'test-vm'], self.source_dir.name, skip_generate=False) - if qubesadmin.tools.qvm_template_postprocess.have_events: - self.assertFalse(mock_domain_shutdown.called) self.assertEqual(self.app.service_calls, []) self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_kill.py b/qubesadmin/tools/qvm_kill.py index 305b4ddc..2a6190ca 100644 --- a/qubesadmin/tools/qvm_kill.py +++ b/qubesadmin/tools/qvm_kill.py @@ -21,36 +21,36 @@ '''Immediately terminate a qube without a graceful shutdown sequence.''' +import asyncio import sys + import qubesadmin.exc import qubesadmin.tools +import qubesadmin.tools.qvm_shutdown parser = qubesadmin.tools.QubesArgumentParser( description='immediately terminate a qube without a graceful shutdown' ' sequence', vmname_nargs='+') -def main(args=None, app=None): - '''Main routine of :program:`qvm-kill`. - - :param list args: Optional arguments to override those delivered from \ - command line. - ''' +async def run_async(args=None, app=None): + # pylint: disable=missing-docstring args = parser.parse_args(args, app=app) + remnants = await qubesadmin.tools.qvm_shutdown.kill(domains=args.domains) + if not remnants: + return 0 + parser.error_runtime( + "Failed to kill: {}".format( + ", ".join(qube.name for qube in remnants) + ), + len(remnants) + ) - exit_code = 0 - for domain in args.domains: - try: - domain.kill() - except qubesadmin.exc.QubesVMNotStartedError: - pass - except (IOError, OSError, qubesadmin.exc.QubesException) as e: - exit_code = 1 - parser.print_error("Failed to kill '{}': {}".format( - domain.name, e)) - - return exit_code + +def main(args=None, app=None): + # pylint: disable=missing-docstring + return asyncio.run(run_async(args=args, app=app)) if __name__ == '__main__': diff --git a/qubesadmin/tools/qvm_shutdown.py b/qubesadmin/tools/qvm_shutdown.py index 9b67a6a3..3ea85a56 100644 --- a/qubesadmin/tools/qvm_shutdown.py +++ b/qubesadmin/tools/qvm_shutdown.py @@ -28,7 +28,6 @@ import sys from warnings import warn -import qubesadmin.events.utils import qubesadmin.tools import qubesadmin.exc @@ -68,12 +67,7 @@ ) -def shutdown( - args, - loop: asyncio.AbstractEventLoop, - domains: list[qubesadmin.vm.QubesVM], - force: bool, -): +async def shutdown(args, domains: list[qubesadmin.vm.QubesVM]): """ Asynchronously shutdown qubes and return qubes that failed to shutdown because and the client can't handle, as well as qubes that were in use @@ -82,12 +76,10 @@ def shutdown( # pylint: disable=missing-docstring unhandled, used, timedout = [], [], [] tasks = [ - asyncio.to_thread(qube.shutdown, force=force, wait=args.wait) + asyncio.to_thread(qube.shutdown, force=args.force, wait=args.wait) for qube in domains ] - results = loop.run_until_complete( - asyncio.gather(*tasks, return_exceptions=True) - ) + results = await asyncio.gather(*tasks, return_exceptions=True) for qube, res in zip(domains, results): if not isinstance(res, BaseException): qube.log.info("Shutdown succeeded") @@ -114,16 +106,14 @@ def shutdown( return unhandled, used, timedout -def kill(loop: asyncio.AbstractEventLoop, domains: list[qubesadmin.vm.QubesVM]): +async def kill(domains: list[qubesadmin.vm.QubesVM]): """ Asynchronously kill qubes and return qubes that failed to shutdown. """ # pylint: disable=missing-docstring unhandled = domains.copy() tasks = [asyncio.to_thread(qube.kill) for qube in domains] - results = loop.run_until_complete( - asyncio.gather(*tasks, return_exceptions=True) - ) + results = await asyncio.gather(*tasks, return_exceptions=True) for qube, res in zip(domains, results): if not isinstance(res, BaseException): qube.log.info("Killing succeeded") @@ -138,10 +128,9 @@ def kill(loop: asyncio.AbstractEventLoop, domains: list[qubesadmin.vm.QubesVM]): return unhandled -def main(args=None, app=None): +async def run_async(args=None, app=None): # pylint: disable=missing-docstring args = parser.parse_args(args, app=app) - force = args.force or (args.all_domains and not args.exclude) if args.dry_run: return if args.timeout: @@ -150,12 +139,9 @@ def main(args=None, app=None): FutureWarning, ) args.wait = True + args.force = args.force or (args.all_domains and not args.exclude) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - unhandled, used, timedout = shutdown( - args=args, force=force, loop=loop, domains=args.domains - ) + unhandled, used, timedout = await shutdown(args=args, domains=args.domains) unhandled_retry = [] timedout_retry = [] if used: @@ -171,7 +157,7 @@ def main(args=None, app=None): ", ".join(qube.name for qube in used) ) ) - failed = shutdown(args=args, force=force, loop=loop, domains=used) + failed = await shutdown(args=args, domains=used) unhandled_retry, used, timedout_retry = failed if not failed: break @@ -192,9 +178,7 @@ def main(args=None, app=None): ", ".join(qube.name for qube in timedout) ) ) - unhandled, used, timedout = shutdown( - args=args, force=force, loop=loop, domains=timedout - ) + unhandled, used, timedout = await shutdown(args=args, domains=timedout) if timedout: parser.print_error( @@ -202,7 +186,7 @@ def main(args=None, app=None): ", ".join(qube.name for qube in timedout) ) ) - unhandled = kill(loop=loop, domains=timedout) + unhandled = await kill(domains=timedout) if not unhandled and not used and not timedout: return @@ -228,5 +212,10 @@ def main(args=None, app=None): raise SystemExit(len(unhandled + used + timedout)) +def main(args=None, app=None): + # pylint: disable=missing-docstring + asyncio.run(run_async(args=args, app=app)) + + if __name__ == "__main__": sys.exit(main()) diff --git a/qubesadmin/tools/qvm_template_postprocess.py b/qubesadmin/tools/qvm_template_postprocess.py index 52ada236..25aa9c28 100644 --- a/qubesadmin/tools/qvm_template_postprocess.py +++ b/qubesadmin/tools/qvm_template_postprocess.py @@ -35,12 +35,6 @@ import qubesadmin import qubesadmin.exc import qubesadmin.tools -try: - # pylint: disable=wrong-import-position - import qubesadmin.events.utils - have_events = True -except ImportError: - have_events = False parser = qubesadmin.tools.QubesArgumentParser( description='Postprocess template package') @@ -244,30 +238,13 @@ async def call_postinstall_service(vm): vm.run_service_for_stdio('qubes.PostInstall') except subprocess.CalledProcessError: vm.log.error('qubes.PostInstall service failed') - vm.shutdown() - if have_events: - try: - # pylint: disable=no-member - await asyncio.wait_for( - qubesadmin.events.utils.wait_for_domain_shutdown([vm]), - qubesadmin.config.defaults['shutdown_timeout']) - except asyncio.TimeoutError: - try: - vm.kill() - except qubesadmin.exc.QubesVMNotStartedError: - pass - else: - timeout = qubesadmin.config.defaults['shutdown_timeout'] - while timeout >= 0: - if vm.is_halted(): - break - await asyncio.sleep(1) - timeout -= 1 - if not vm.is_halted(): - try: - vm.kill() - except qubesadmin.exc.QubesVMNotStartedError: - pass + try: + await asyncio.wait_for( + asyncio.to_thread(vm.shutdown, wait=True), + timeout=qubesadmin.config.defaults['shutdown_timeout'], + ) + except TimeoutError: + await asyncio.to_thread(vm.kill) finally: vm.netvm = qubesadmin.DEFAULT From 55bb4aaee62c8a513554c0261f3c6a027e363c85 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Fri, 22 May 2026 11:26:48 +0200 Subject: [PATCH 07/10] Move qube actions to utils for reusability --- qubesadmin/tests/tools/qvm_shutdown.py | 17 ++-- qubesadmin/tools/qvm_kill.py | 13 +-- qubesadmin/tools/qvm_pause.py | 20 ++-- qubesadmin/tools/qvm_shutdown.py | 132 +++++++++---------------- qubesadmin/tools/qvm_start.py | 90 ++++++++++------- qubesadmin/tools/qvm_unpause.py | 17 ++-- qubesadmin/utils.py | 104 +++++++++++++++++++ 7 files changed, 234 insertions(+), 159 deletions(-) diff --git a/qubesadmin/tests/tools/qvm_shutdown.py b/qubesadmin/tests/tools/qvm_shutdown.py index 9623aa29..0bf9a741 100644 --- a/qubesadmin/tests/tools/qvm_shutdown.py +++ b/qubesadmin/tests/tools/qvm_shutdown.py @@ -145,9 +145,7 @@ def test_015_wait_all_kill_timeout(self): b'sys-net class=AppVM state=Running\n' \ b'some-vm class=AppVM state=Running\n' \ b'other-vm class=AppVM state=Running\n' - with self.assertRaisesRegex(SystemExit, '2'): - qubesadmin.tools.qvm_shutdown.main( - ['--wait', '--all'], app=self.app) + qubesadmin.tools.qvm_shutdown.main(['--wait', '--all'], app=self.app) self.assertAllCalled() def test_016_all_exclude_noforce(self): @@ -278,10 +276,10 @@ def test_011_wait_retry_once_fail(self): b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', b'2\x00QubesVMInUseError\x00\x00Denied as qube is in use\x00', ] - with self.assertRaises(SystemExit) as cm: + with self.assertRaises(SystemExit) as cmg: qubesadmin.tools.qvm_shutdown.main( ['--wait', 'some-vm', 'other-vm', 'another-vm'], app=self.app) - self.assertEqual(cm.exception.code, 3) + self.assertEqual(cmg.exception.code, 3) self.assertAllCalled() def test_011_wait_retry_as_many_as_in_use(self): @@ -320,9 +318,10 @@ def test_013_wait_all_shutdown_fail(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Shutdown', 'wait', None)] = \ b'2\x00QubesException\x00\x00Shutdown refused\x00' - with self.assertRaises(SystemExit): + with self.assertRaises(SystemExit) as cmg: qubesadmin.tools.qvm_shutdown.main( ['--wait', 'some-vm'], app=self.app) + self.assertEqual(cmg.exception.code, 1) self.assertAllCalled() def test_014_timeout_deprecated_means_wait(self): @@ -338,9 +337,10 @@ def test_014_timeout_deprecated_means_wait(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Kill', None, None)] = \ b'2\x00QubesException\x00\x00Kill failed\x00' - with self.assertRaises(SystemExit): + with self.assertRaises(SystemExit) as cmg: qubesadmin.tools.qvm_shutdown.main( ['--timeout=3', 'some-vm'], app=self.app) + self.assertEqual(cmg.exception.code, 1) self.assertAllCalled() def test_016_wait_kill_exception(self): @@ -357,7 +357,8 @@ def test_016_wait_kill_exception(self): self.app.expected_calls[ ('some-vm', 'admin.vm.Kill', None, None)] = \ b'2\x00QubesException\x00\x00Kill failed\x00' - with self.assertRaises(SystemExit): + with self.assertRaises(SystemExit) as cmg: qubesadmin.tools.qvm_shutdown.main( ['--wait', 'some-vm'], app=self.app) + self.assertEqual(cmg.exception.code, 1) self.assertAllCalled() diff --git a/qubesadmin/tools/qvm_kill.py b/qubesadmin/tools/qvm_kill.py index 2a6190ca..10a2e877 100644 --- a/qubesadmin/tools/qvm_kill.py +++ b/qubesadmin/tools/qvm_kill.py @@ -37,15 +37,12 @@ async def run_async(args=None, app=None): # pylint: disable=missing-docstring args = parser.parse_args(args, app=app) - remnants = await qubesadmin.tools.qvm_shutdown.kill(domains=args.domains) - if not remnants: + failed = await qubesadmin.utils.kill(domains=args.domains) + if not failed: return 0 - parser.error_runtime( - "Failed to kill: {}".format( - ", ".join(qube.name for qube in remnants) - ), - len(remnants) - ) + for qube, exc in failed.items(): + parser.print_error("Failed to kill: {}: {}".format(qube, exc)) + raise SystemExit(len(failed)) def main(args=None, app=None): diff --git a/qubesadmin/tools/qvm_pause.py b/qubesadmin/tools/qvm_pause.py index 7a4b777e..a13b9104 100644 --- a/qubesadmin/tools/qvm_pause.py +++ b/qubesadmin/tools/qvm_pause.py @@ -20,6 +20,7 @@ '''qvm-pause - Pause a domain''' +import asyncio import sys import qubesadmin @@ -45,15 +46,16 @@ def main(args=None, app=None): args = parser.parse_args(args, app=app) exit_code = 0 - for domain in args.domains: - try: - if args.suspend: - domain.suspend() - else: - domain.pause() - except (IOError, OSError, qubesadmin.exc.QubesException) as e: - exit_code = 1 - parser.print_error(str(e)) + if args.suspend: + action = "suspend" + failed = asyncio.run(qubesadmin.utils.suspend(domains=args.domains)) + else: + action = "pause" + failed = asyncio.run(qubesadmin.utils.pause(domains=args.domains)) + if failed: + exit_code = 1 + for qube, exc in failed.items(): + parser.print_error("Failed to {}: {}: {}".format(action, qube, exc)) return exit_code diff --git a/qubesadmin/tools/qvm_shutdown.py b/qubesadmin/tools/qvm_shutdown.py index 3ea85a56..dbd5eced 100644 --- a/qubesadmin/tools/qvm_shutdown.py +++ b/qubesadmin/tools/qvm_shutdown.py @@ -67,67 +67,27 @@ ) -async def shutdown(args, domains: list[qubesadmin.vm.QubesVM]): - """ - Asynchronously shutdown qubes and return qubes that failed to shutdown - because and the client can't handle, as well as qubes that were in use - while --force was not provided, as well as timed out. - """ - # pylint: disable=missing-docstring - unhandled, used, timedout = [], [], [] - tasks = [ - asyncio.to_thread(qube.shutdown, force=args.force, wait=args.wait) - for qube in domains - ] - results = await asyncio.gather(*tasks, return_exceptions=True) - for qube, res in zip(domains, results): - if not isinstance(res, BaseException): - qube.log.info("Shutdown succeeded") - continue - try: - raise res - except qubesadmin.exc.QubesVMNotStartedError: - pass - except qubesadmin.exc.QubesVMInUseError as e: - if args.wait: - qube.log.error("Shutdown error: {}".format(e)) - else: - qube.log.error("Shutdown error: (try --force): {}".format(e)) - used.append(qube) - except qubesadmin.exc.QubesVMShutdownTimeoutError as e: - if args.wait: - qube.log.error("Shutdown error: {}".format(e)) - else: - qube.log.error("Shutdown error: (try qvm-kill): {}".format(e)) - timedout.append(qube) - except qubesadmin.exc.QubesException as e: - qube.log.error("Shutdown error: {}".format(e)) - unhandled.append(qube) +async def shutdown(domains, **shutdown_kwargs): + # pylint: disable=missing-function-docstring + failed = await qubesadmin.utils.shutdown(domains=domains, **shutdown_kwargs) + used = { + qube: exc + for qube, exc in failed.items() + if isinstance(exc, qubesadmin.exc.QubesVMInUseError) + } + timedout = { + qube: exc + for qube, exc in failed.items() + if isinstance(exc, qubesadmin.exc.QubesVMShutdownTimeoutError) + } + unhandled = { + qube: exc + for qube, exc in failed.items() + if qube not in used and qube not in timedout + } return unhandled, used, timedout -async def kill(domains: list[qubesadmin.vm.QubesVM]): - """ - Asynchronously kill qubes and return qubes that failed to shutdown. - """ - # pylint: disable=missing-docstring - unhandled = domains.copy() - tasks = [asyncio.to_thread(qube.kill) for qube in domains] - results = await asyncio.gather(*tasks, return_exceptions=True) - for qube, res in zip(domains, results): - if not isinstance(res, BaseException): - qube.log.info("Killing succeeded") - unhandled.remove(qube) - continue - try: - raise res - except qubesadmin.exc.QubesVMNotStartedError: - unhandled.remove(qube) - except qubesadmin.exc.QubesException as e: - qube.log.error("Kill error: {}".format(e)) - return unhandled - - async def run_async(args=None, app=None): # pylint: disable=missing-docstring args = parser.parse_args(args, app=app) @@ -140,10 +100,16 @@ async def run_async(args=None, app=None): ) args.wait = True args.force = args.force or (args.all_domains and not args.exclude) - - unhandled, used, timedout = await shutdown(args=args, domains=args.domains) - unhandled_retry = [] - timedout_retry = [] + shutdown_kwargs = { + "force": args.force, + "wait": args.wait, + } + + unhandled, used, timedout = await shutdown( + domains=args.domains, **shutdown_kwargs + ) + unhandled_retry = {} + timedout_retry = {} if used: old_failed = unhandled, used, timedout parser.print_error( @@ -157,7 +123,7 @@ async def run_async(args=None, app=None): ", ".join(qube.name for qube in used) ) ) - failed = await shutdown(args=args, domains=used) + failed = await shutdown(domains=used, **shutdown_kwargs) unhandled_retry, used, timedout_retry = failed if not failed: break @@ -168,8 +134,8 @@ async def run_async(args=None, app=None): break old_failed = failed - unhandled.extend(qube for qube in unhandled_retry if qube not in unhandled) - timedout.extend(qube for qube in timedout_retry if qube not in timedout) + unhandled.update(unhandled_retry) + timedout.update(timedout_retry) # Retry timed out only once, as it can take a long time, 60s by default. if timedout: @@ -178,7 +144,9 @@ async def run_async(args=None, app=None): ", ".join(qube.name for qube in timedout) ) ) - unhandled, used, timedout = await shutdown(args=args, domains=timedout) + unhandled, used, timedout = await shutdown( + domains=timedout, **shutdown_kwargs + ) if timedout: parser.print_error( @@ -186,30 +154,18 @@ async def run_async(args=None, app=None): ", ".join(qube.name for qube in timedout) ) ) - unhandled = await kill(domains=timedout) + timedout = await qubesadmin.utils.kill(domains=timedout) - if not unhandled and not used and not timedout: - return - - if unhandled: - parser.print_error( - "Failed to shut down for unknown reason: {}".format( - ", ".join(qube.name for qube in unhandled) - ) - ) - if used: - parser.print_error( - "Failed to shut down because it's in use: {}".format( - ", ".join(qube.name for qube in used) + for item in [unhandled, used, timedout]: + for qube, exc in item.items(): + parser.print_error( + "Failed to shut down: {}: {}".format(qube.name, str(exc)) ) - ) - if timedout: - parser.print_error( - "Failed to shut down because of time out: {}".format( - ", ".join(qube.name for qube in timedout) - ) - ) - raise SystemExit(len(unhandled + used + timedout)) + + exit_code = len(unhandled) + len(used) + len(timedout) + if exit_code == 0: + return + raise SystemExit(exit_code) def main(args=None, app=None): diff --git a/qubesadmin/tools/qvm_start.py b/qubesadmin/tools/qvm_start.py index 187782c6..9f8543c2 100644 --- a/qubesadmin/tools/qvm_start.py +++ b/qubesadmin/tools/qvm_start.py @@ -19,6 +19,7 @@ # with this program; if not, see . '''qvm-start - start a domain''' +import asyncio import argparse import string import sys @@ -165,51 +166,66 @@ def get_drive_assignment(app, drive_str): return assignment -def main(args=None, app=None): - '''Main routine of :program:`qvm-start`. +class QubesVMAlreadyRunningError(qubesadmin.exc.QubesVMError): + """Requested qube to start, but it's already running""" - :param list args: Optional arguments to override those delivered from \ - command line. - ''' +def startup(domain, args=None): + # pylint: disable=missing-function-docstring + if domain.is_running(): + if args.skip_if_running: + return + raise QubesVMAlreadyRunningError("Domain is already running") + drive_assignment = None + try: + if args.drive: + drive_assignment = get_drive_assignment(args.app, args.drive) + try: + domain.devices['block'].assign(drive_assignment) + except Exception: + drive_assignment = None + raise + + domain.start() + + if drive_assignment: + # don't reconnect this device after VM reboot + domain.devices['block'].unassign(drive_assignment) + except (IOError, OSError, qubesadmin.exc.QubesException, ValueError) as e: + if drive_assignment: + try: + domain.devices['block'].detach(drive_assignment) + except qubesadmin.exc.QubesException: + pass + raise e + + +async def run_async(args=None, app=None): + # pylint: disable=missing-function-docstring args = parser.parse_args(args, app=app) - + tasks = [ + asyncio.to_thread(startup, domain=qube, args=args) + for qube in args.domains + ] + results = await asyncio.gather(*tasks, return_exceptions=True) exit_code = 0 - for domain in args.domains: - if domain.is_running(): - if args.skip_if_running: - continue + for qube, res in zip(args.domains, results): + if isinstance(res, BaseException): exit_code = 1 parser.print_error( - 'domain {} is already running'.format(domain.name)) - return exit_code - drive_assignment = None - try: - if args.drive: - drive_assignment = get_drive_assignment(args.app, args.drive) - try: - domain.devices['block'].assign(drive_assignment) - except Exception: - drive_assignment = None - raise - - domain.start() - - if drive_assignment: - # don't reconnect this device after VM reboot - domain.devices['block'].unassign(drive_assignment) - except (IOError, OSError, qubesadmin.exc.QubesException, - ValueError) as e: - if drive_assignment: - try: - domain.devices['block'].detach(drive_assignment) - except qubesadmin.exc.QubesException: - pass - exit_code = 1 - parser.print_error(str(e)) - + 'Starting qube failed: {}: {}'.format(qube.name, str(res)) + ) return exit_code +def main(args=None, app=None): + '''Main routine of :program:`qvm-start`. + + :param list args: Optional arguments to override those delivered from \ + command line. + ''' + return asyncio.run(run_async(args=args, app=app)) + + if __name__ == '__main__': sys.exit(main()) diff --git a/qubesadmin/tools/qvm_unpause.py b/qubesadmin/tools/qvm_unpause.py index fca9eceb..b555622a 100644 --- a/qubesadmin/tools/qvm_unpause.py +++ b/qubesadmin/tools/qvm_unpause.py @@ -20,6 +20,7 @@ '''qvm-unpause - Unpause a domain''' +import asyncio import sys import qubesadmin @@ -45,15 +46,13 @@ def main(args=None, app=None): for vm in domains if not vm.features.get("internal") ] - for domain in domains: - try: - if domain.is_suspended(): - domain.resume() - else: - domain.unpause() - except (IOError, OSError, qubesadmin.exc.QubesException) as e: - exit_code = 1 - parser.print_error(str(e)) + + failed = asyncio.run(qubesadmin.utils.unpause(domains=domains)) + action = "unpause/resume" + if failed: + exit_code = 1 + for qube, exc in failed.items(): + parser.print_error("Failed to {}: {}: {}".format(action, qube, exc)) return exit_code diff --git a/qubesadmin/utils.py b/qubesadmin/utils.py index 8b54d0ca..0a8cb788 100644 --- a/qubesadmin/utils.py +++ b/qubesadmin/utils.py @@ -26,6 +26,7 @@ from __future__ import annotations +import asyncio import fcntl import os import re @@ -256,3 +257,106 @@ def qbool(value: str | int | bool) -> bool: ) return bool(value) + + +async def start(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously start qubes and return ones that failed. + """ + return await generic_action(domains, action="start", **kwargs) + + +async def pause(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously pause qubes and return ones that failed. + """ + return await generic_action(domains, action="pause", **kwargs) + + +async def unpause(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously unpause qubes and return ones that failed. + """ + return await generic_action( + domains, + action=lambda qube: "resume" if qube.is_suspended() else "unpause", + **kwargs, + ) + + +async def suspend(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously suspend qubes and return ones that failed. + """ + return await generic_action(domains, action="suspend", **kwargs) + + +async def resume(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously resume qubes and return ones that failed. + """ + return await generic_action(domains, action="resume", **kwargs) + + +async def shutdown(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously resume shutdown and return ones that failed. + """ + return await generic_action( + domains, + action="shutdown", + ignored_exceptions=(qubesadmin.exc.QubesVMNotStartedError), + **kwargs, + ) + + +async def kill(domains: list[qubesadmin.vm.QubesVM], **kwargs): + """ + Asynchronously kill qubes and return ones that failed. + """ + return await generic_action( + domains, + action="kill", + ignored_exceptions=(qubesadmin.exc.QubesVMNotStartedError), + **kwargs, + ) + + +async def generic_action( + domains: list[qubesadmin.vm.QubesVM], + action: str | typing.Callable, + *args, + **kwargs, +): + """ + Asynchronously run action on qubes and return ones that failed. + """ + + def wrapper(qube, action): + func = None + if isinstance(action, str): + func = getattr(qube, action) + elif callable(action): + method = action(qube) + func = getattr(qube, method) + if not func: + raise ValueError("Invalid action provided") + return func + + ignored_exceptions: tuple = tuple() + if "ignored_exceptions" in kwargs: + ignored_exceptions = kwargs.pop("ignored_exceptions") + + tasks = [ + asyncio.to_thread(wrapper(qube, action), *args, **kwargs) + for qube in domains + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + failed: dict[qubesadmin.vm.QubesVM, BaseException] = {} + for qube, res in zip(domains, results): + if not isinstance(res, BaseException): + continue + if isinstance(res, ignored_exceptions): + continue + failed[qube] = res + return failed From 209fd42ddaf2538363bfc2af6516da985c78b642 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Thu, 28 May 2026 13:13:24 +0200 Subject: [PATCH 08/10] Blacken qvm-start --- qubesadmin/tools/qvm_start.py | 183 ++++++++++++++++++++-------------- 1 file changed, 108 insertions(+), 75 deletions(-) diff --git a/qubesadmin/tools/qvm_start.py b/qubesadmin/tools/qvm_start.py index 9f8543c2..c4340e80 100644 --- a/qubesadmin/tools/qvm_start.py +++ b/qubesadmin/tools/qvm_start.py @@ -18,7 +18,7 @@ # You should have received a copy of the GNU Lesser General Public License along # with this program; if not, see . -'''qvm-start - start a domain''' +"""qvm-start - start a domain""" import asyncio import argparse import string @@ -32,20 +32,22 @@ import qubesadmin.exc import qubesadmin.tools + class DriveAction(argparse.Action): - '''Action for argument parser that stores drive image path.''' + """Action for argument parser that stores drive image path.""" # pylint: disable=redefined-builtin - def __init__(self, - option_strings, - dest='drive', - *, - prefix='cdrom:', - metavar='IMAGE', - required=False, - help='Attach drive'): - super().__init__(option_strings, dest, - metavar=metavar, help=help) + def __init__( + self, + option_strings, + dest="drive", + *, + prefix="cdrom:", + metavar="IMAGE", + required=False, + help="Attach drive", + ): + super().__init__(option_strings, dest, metavar=metavar, help=help) self.prefix = prefix def __call__(self, parser, namespace, values, option_string=None): @@ -54,30 +56,50 @@ def __call__(self, parser, namespace, values, option_string=None): parser = qubesadmin.tools.QubesArgumentParser( - description='start a domain', vmname_nargs='+') + description="start a domain", vmname_nargs="+" +) -parser.add_argument('--skip-if-running', - action='store_true', default=False, - help='Do not fail if the qube is already runnning') +parser.add_argument( + "--skip-if-running", + action="store_true", + default=False, + help="Do not fail if the qube is already runnning", +) parser_drive = parser.add_mutually_exclusive_group() -parser_drive.add_argument('--drive', metavar='DRIVE', - help='temporarily attach specified drive as CD/DVD or hard disk (can be' - ' specified with prefix "hd:" or "cdrom:", default is cdrom)') - -parser_drive.add_argument('--hddisk', - action=DriveAction, dest='drive', prefix='hd:', - help='temporarily attach specified drive as hard disk') - -parser_drive.add_argument('--cdrom', metavar='IMAGE', - action=DriveAction, dest='drive', prefix='cdrom:', - help='temporarily attach specified drive as CD/DVD') - -parser_drive.add_argument('--install-windows-tools', - action='store_const', dest='drive', default=False, - const='cdrom:dom0:/usr/lib/qubes/qubes-windows-tools.iso', - help='temporarily attach Windows tools CDROM to the domain') +parser_drive.add_argument( + "--drive", + metavar="DRIVE", + help="temporarily attach specified drive as CD/DVD or hard disk (can be" + ' specified with prefix "hd:" or "cdrom:", default is cdrom)', +) + +parser_drive.add_argument( + "--hddisk", + action=DriveAction, + dest="drive", + prefix="hd:", + help="temporarily attach specified drive as hard disk", +) + +parser_drive.add_argument( + "--cdrom", + metavar="IMAGE", + action=DriveAction, + dest="drive", + prefix="cdrom:", + help="temporarily attach specified drive as CD/DVD", +) + +parser_drive.add_argument( + "--install-windows-tools", + action="store_const", + dest="drive", + default=False, + const="cdrom:dom0:/usr/lib/qubes/qubes-windows-tools.iso", + help="temporarily attach Windows tools CDROM to the domain", +) def get_drive_assignment(app, drive_str): @@ -93,75 +115,86 @@ def get_drive_assignment(app, drive_str): :param drive_str: drive argument :return: DeviceAssignment matching *drive_str* """ - devtype = 'cdrom' - if drive_str.startswith('cdrom:'): - devtype = 'cdrom' - drive_str = drive_str[len('cdrom:'):] - elif drive_str.startswith('hd:'): - devtype = 'disk' - drive_str = drive_str[len('hd:'):] + devtype = "cdrom" + if drive_str.startswith("cdrom:"): + devtype = "cdrom" + drive_str = drive_str[len("cdrom:") :] + elif drive_str.startswith("hd:"): + devtype = "disk" + drive_str = drive_str[len("hd:") :] try: - backend_domain_name, port_id = drive_str.split(':', 1) + backend_domain_name, port_id = drive_str.split(":", 1) except ValueError: - raise ValueError("Incorrect image name: image must be in the format " - "of VMNAME:full_path, for example " - "dom0:/home/user/test.iso") + raise ValueError( + "Incorrect image name: image must be in the format " + "of VMNAME:full_path, for example " + "dom0:/home/user/test.iso" + ) try: backend_domain = app.domains[backend_domain_name] except KeyError: raise qubesadmin.exc.QubesVMNotFoundError( - 'No such VM: %s', backend_domain_name) - if port_id.startswith('/'): + "No such VM: %s", backend_domain_name + ) + if port_id.startswith("/"): # it is a path - if we're running in dom0, try to call losetup to # export the device, otherwise reject - if app.qubesd_connection_type == 'qrexec': + if app.qubesd_connection_type == "qrexec": raise qubesadmin.exc.QubesException( - 'Existing block device identifier needed when running from ' - 'outside of dom0 (see qvm-block)') + "Existing block device identifier needed when running from " + "outside of dom0 (see qvm-block)" + ) try: - if backend_domain.klass == 'AdminVM': + if backend_domain.klass == "AdminVM": loop_name = subprocess.check_output( - ['sudo', 'losetup', '-f', '--show', port_id]) + ["sudo", "losetup", "-f", "--show", port_id] + ) loop_name = loop_name.strip() else: untrusted_loop_name, _ = backend_domain.run_with_args( - 'losetup', '-f', '--show', port_id, - user='root') + "losetup", "-f", "--show", port_id, user="root" + ) untrusted_loop_name = untrusted_loop_name.strip() - allowed_chars = string.ascii_lowercase + string.digits + '/' - allowed_chars = allowed_chars.encode('ascii') + allowed_chars = string.ascii_lowercase + string.digits + "/" + allowed_chars = allowed_chars.encode("ascii") if not all(c in allowed_chars for c in untrusted_loop_name): raise qubesadmin.exc.QubesException( - 'Invalid loop device name received from {}'.format( - backend_domain.name)) + "Invalid loop device name received from {}".format( + backend_domain.name + ) + ) loop_name = untrusted_loop_name del untrusted_loop_name except subprocess.CalledProcessError: raise qubesadmin.exc.QubesException( - 'Failed to setup loop device for %s', port_id) - assert loop_name.startswith(b'/dev/loop') - port_id = loop_name.decode().split('/')[2] + "Failed to setup loop device for %s", port_id + ) + assert loop_name.startswith(b"/dev/loop") + port_id = loop_name.decode().split("/")[2] # wait for device to appear # FIXME: convert this to waiting for event timeout = 10 while isinstance( - backend_domain.devices['block'][port_id], UnknownDevice + backend_domain.devices["block"][port_id], UnknownDevice ): if timeout == 0: raise qubesadmin.exc.QubesException( - 'Timeout waiting for {}:{} device to appear'.format( - backend_domain.name, port_id)) + "Timeout waiting for {}:{} device to appear".format( + backend_domain.name, port_id + ) + ) timeout -= 1 time.sleep(1) - options = { - 'devtype': devtype, - 'read-only': devtype == 'cdrom' - } + options = {"devtype": devtype, "read-only": devtype == "cdrom"} assignment = DeviceAssignment.new( - backend_domain=backend_domain, port_id=port_id, devclass='block', - options=options, mode="required") + backend_domain=backend_domain, + port_id=port_id, + devclass="block", + options=options, + mode="required", + ) return assignment @@ -181,7 +214,7 @@ def startup(domain, args=None): if args.drive: drive_assignment = get_drive_assignment(args.app, args.drive) try: - domain.devices['block'].assign(drive_assignment) + domain.devices["block"].assign(drive_assignment) except Exception: drive_assignment = None raise @@ -190,11 +223,11 @@ def startup(domain, args=None): if drive_assignment: # don't reconnect this device after VM reboot - domain.devices['block'].unassign(drive_assignment) + domain.devices["block"].unassign(drive_assignment) except (IOError, OSError, qubesadmin.exc.QubesException, ValueError) as e: if drive_assignment: try: - domain.devices['block'].detach(drive_assignment) + domain.devices["block"].detach(drive_assignment) except qubesadmin.exc.QubesException: pass raise e @@ -213,19 +246,19 @@ async def run_async(args=None, app=None): if isinstance(res, BaseException): exit_code = 1 parser.print_error( - 'Starting qube failed: {}: {}'.format(qube.name, str(res)) + "Starting qube failed: {}: {}".format(qube.name, str(res)) ) return exit_code def main(args=None, app=None): - '''Main routine of :program:`qvm-start`. + """Main routine of :program:`qvm-start`. :param list args: Optional arguments to override those delivered from \ command line. - ''' + """ return asyncio.run(run_async(args=args, app=app)) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) From 2743424acebd71acbaaa7a83b2690ef5e97a8e96 Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Thu, 28 May 2026 13:40:00 +0200 Subject: [PATCH 09/10] Make qvm-start drive assignment reusable Qube Manager is calling qvm_start.main on 3 separate locations, and that function should never have been called from other modules because it's written to expect a shell client, and exception raised are converted to what a shell user needs to consume. With the necessity of using asyncio.run in the main function, it broke Qube Manager as an event loop was already running, therefore make it use function intended for other python modules via utils. --- qubesadmin/exc.py | 5 + qubesadmin/tools/qvm_start.py | 172 ++-------------------------------- qubesadmin/utils.py | 160 ++++++++++++++++++++++++++++++- 3 files changed, 173 insertions(+), 164 deletions(-) diff --git a/qubesadmin/exc.py b/qubesadmin/exc.py index 282d318b..6b374ee9 100644 --- a/qubesadmin/exc.py +++ b/qubesadmin/exc.py @@ -101,6 +101,10 @@ class QubesNoTemplateError(QubesVMError): """Cannot start domain, because there is no template""" +class QubesVMAlreadyStartedError(QubesVMError): + """Requested qube to start, but it's already running""" + + class QubesPoolInUseError(QubesException): """VM is in use, cannot remove.""" @@ -236,5 +240,6 @@ def __init__(self, prop: str): class QubesNotesError(QubesException): """Some problem with qube notes.""" + # legacy name QubesDaemonNoResponseError = QubesDaemonAccessError diff --git a/qubesadmin/tools/qvm_start.py b/qubesadmin/tools/qvm_start.py index c4340e80..dba9c4b5 100644 --- a/qubesadmin/tools/qvm_start.py +++ b/qubesadmin/tools/qvm_start.py @@ -20,39 +20,11 @@ """qvm-start - start a domain""" import asyncio -import argparse -import string import sys -import subprocess - -import time - -from qubesadmin.device_protocol import DeviceAssignment, UnknownDevice import qubesadmin.exc import qubesadmin.tools - - -class DriveAction(argparse.Action): - """Action for argument parser that stores drive image path.""" - - # pylint: disable=redefined-builtin - def __init__( - self, - option_strings, - dest="drive", - *, - prefix="cdrom:", - metavar="IMAGE", - required=False, - help="Attach drive", - ): - super().__init__(option_strings, dest, metavar=metavar, help=help) - self.prefix = prefix - - def __call__(self, parser, namespace, values, option_string=None): - # pylint: disable=redefined-outer-name - setattr(namespace, self.dest, self.prefix + values) +import qubesadmin.utils parser = qubesadmin.tools.QubesArgumentParser( @@ -77,7 +49,7 @@ def __call__(self, parser, namespace, values, option_string=None): parser_drive.add_argument( "--hddisk", - action=DriveAction, + action=qubesadmin.utils.DriveAction, dest="drive", prefix="hd:", help="temporarily attach specified drive as hard disk", @@ -86,7 +58,7 @@ def __call__(self, parser, namespace, values, option_string=None): parser_drive.add_argument( "--cdrom", metavar="IMAGE", - action=DriveAction, + action=qubesadmin.utils.DriveAction, dest="drive", prefix="cdrom:", help="temporarily attach specified drive as CD/DVD", @@ -102,142 +74,16 @@ def __call__(self, parser, namespace, values, option_string=None): ) -def get_drive_assignment(app, drive_str): - """ - Prepare :py:class:`qubesadmin.device_protocol.DeviceAssignment` object for a - given drive. - - If running in dom0, it will also take care about creating the appropriate - loop device (if necessary). Otherwise, only existing block devices are - supported. - - :param app: Qubes() instance - :param drive_str: drive argument - :return: DeviceAssignment matching *drive_str* - """ - devtype = "cdrom" - if drive_str.startswith("cdrom:"): - devtype = "cdrom" - drive_str = drive_str[len("cdrom:") :] - elif drive_str.startswith("hd:"): - devtype = "disk" - drive_str = drive_str[len("hd:") :] - - try: - backend_domain_name, port_id = drive_str.split(":", 1) - except ValueError: - raise ValueError( - "Incorrect image name: image must be in the format " - "of VMNAME:full_path, for example " - "dom0:/home/user/test.iso" - ) - try: - backend_domain = app.domains[backend_domain_name] - except KeyError: - raise qubesadmin.exc.QubesVMNotFoundError( - "No such VM: %s", backend_domain_name - ) - if port_id.startswith("/"): - # it is a path - if we're running in dom0, try to call losetup to - # export the device, otherwise reject - if app.qubesd_connection_type == "qrexec": - raise qubesadmin.exc.QubesException( - "Existing block device identifier needed when running from " - "outside of dom0 (see qvm-block)" - ) - try: - if backend_domain.klass == "AdminVM": - loop_name = subprocess.check_output( - ["sudo", "losetup", "-f", "--show", port_id] - ) - loop_name = loop_name.strip() - else: - untrusted_loop_name, _ = backend_domain.run_with_args( - "losetup", "-f", "--show", port_id, user="root" - ) - untrusted_loop_name = untrusted_loop_name.strip() - allowed_chars = string.ascii_lowercase + string.digits + "/" - allowed_chars = allowed_chars.encode("ascii") - if not all(c in allowed_chars for c in untrusted_loop_name): - raise qubesadmin.exc.QubesException( - "Invalid loop device name received from {}".format( - backend_domain.name - ) - ) - loop_name = untrusted_loop_name - del untrusted_loop_name - except subprocess.CalledProcessError: - raise qubesadmin.exc.QubesException( - "Failed to setup loop device for %s", port_id - ) - assert loop_name.startswith(b"/dev/loop") - port_id = loop_name.decode().split("/")[2] - # wait for device to appear - # FIXME: convert this to waiting for event - timeout = 10 - while isinstance( - backend_domain.devices["block"][port_id], UnknownDevice - ): - if timeout == 0: - raise qubesadmin.exc.QubesException( - "Timeout waiting for {}:{} device to appear".format( - backend_domain.name, port_id - ) - ) - timeout -= 1 - time.sleep(1) - - options = {"devtype": devtype, "read-only": devtype == "cdrom"} - assignment = DeviceAssignment.new( - backend_domain=backend_domain, - port_id=port_id, - devclass="block", - options=options, - mode="required", - ) - - return assignment - - -class QubesVMAlreadyRunningError(qubesadmin.exc.QubesVMError): - """Requested qube to start, but it's already running""" - - -def startup(domain, args=None): - # pylint: disable=missing-function-docstring - if domain.is_running(): - if args.skip_if_running: - return - raise QubesVMAlreadyRunningError("Domain is already running") - drive_assignment = None - try: - if args.drive: - drive_assignment = get_drive_assignment(args.app, args.drive) - try: - domain.devices["block"].assign(drive_assignment) - except Exception: - drive_assignment = None - raise - - domain.start() - - if drive_assignment: - # don't reconnect this device after VM reboot - domain.devices["block"].unassign(drive_assignment) - except (IOError, OSError, qubesadmin.exc.QubesException, ValueError) as e: - if drive_assignment: - try: - domain.devices["block"].detach(drive_assignment) - except qubesadmin.exc.QubesException: - pass - raise e - - async def run_async(args=None, app=None): # pylint: disable=missing-function-docstring args = parser.parse_args(args, app=app) tasks = [ - asyncio.to_thread(startup, domain=qube, args=args) + asyncio.to_thread( + qubesadmin.utils.start_expert, + domain=qube, + skip_if_running=args.skip_if_running, + drive=args.drive, + ) for qube in args.domains ] results = await asyncio.gather(*tasks, return_exceptions=True) diff --git a/qubesadmin/utils.py b/qubesadmin/utils.py index 0a8cb788..062136fc 100644 --- a/qubesadmin/utils.py +++ b/qubesadmin/utils.py @@ -26,15 +26,20 @@ from __future__ import annotations +import argparse import asyncio import fcntl import os import re +import string +import subprocess +import time import typing from collections.abc import Iterable import qubesadmin.exc -from qubesadmin.exc import QubesValueError +from qubesadmin.exc import QubesValueError, QubesVMAlreadyStartedError +from qubesadmin.device_protocol import DeviceAssignment, UnknownDevice if typing.TYPE_CHECKING: from qubesadmin.app import QubesBase @@ -360,3 +365,156 @@ def wrapper(qube, action): continue failed[qube] = res return failed + + +class DriveAction(argparse.Action): + """Action for argument parser that stores drive image path.""" + + # pylint: disable=redefined-builtin + def __init__( + self, + option_strings, + dest="drive", + *, + prefix="cdrom:", + metavar="IMAGE", + required=False, + help="Attach drive", + ): + super().__init__(option_strings, dest, metavar=metavar, help=help) + self.prefix = prefix + + def __call__(self, parser, namespace, values, option_string=None): + # pylint: disable=redefined-outer-name + setattr(namespace, self.dest, self.prefix + values) + + +def get_drive_assignment(app, drive_str): + """ + Prepare :py:class:`qubesadmin.device_protocol.DeviceAssignment` object for a + given drive. + + If running in dom0, it will also take care about creating the appropriate + loop device (if necessary). Otherwise, only existing block devices are + supported. + + :param app: Qubes() instance + :param drive_str: drive argument + :return: DeviceAssignment matching *drive_str* + """ + devtype = "cdrom" + if drive_str.startswith("cdrom:"): + devtype = "cdrom" + drive_str = drive_str[len("cdrom:") :] + elif drive_str.startswith("hd:"): + devtype = "disk" + drive_str = drive_str[len("hd:") :] + + try: + backend_domain_name, port_id = drive_str.split(":", 1) + except ValueError: + raise ValueError( + "Incorrect image name: image must be in the format " + "of VMNAME:full_path, for example " + "dom0:/home/user/test.iso" + ) + try: + backend_domain = app.domains[backend_domain_name] + except KeyError: + raise qubesadmin.exc.QubesVMNotFoundError( + "No such VM: %s", backend_domain_name + ) + if port_id.startswith("/"): + # it is a path - if we're running in dom0, try to call losetup to + # export the device, otherwise reject + if app.qubesd_connection_type == "qrexec": + raise qubesadmin.exc.QubesException( + "Existing block device identifier needed when running from " + "outside of dom0 (see qvm-block)" + ) + try: + if backend_domain.klass == "AdminVM": + loop_name = subprocess.check_output( + ["sudo", "losetup", "-f", "--show", port_id] + ) + loop_name = loop_name.strip() + else: + untrusted_loop_name, _ = backend_domain.run_with_args( + "losetup", "-f", "--show", port_id, user="root" + ) + untrusted_loop_name = untrusted_loop_name.strip() + allowed_chars = string.ascii_lowercase + string.digits + "/" + allowed_chars = allowed_chars.encode("ascii") + if not all(c in allowed_chars for c in untrusted_loop_name): + raise qubesadmin.exc.QubesException( + "Invalid loop device name received from {}".format( + backend_domain.name + ) + ) + loop_name = untrusted_loop_name + del untrusted_loop_name + except subprocess.CalledProcessError: + raise qubesadmin.exc.QubesException( + "Failed to setup loop device for %s", port_id + ) + assert loop_name.startswith(b"/dev/loop") + port_id = loop_name.decode().split("/")[2] + # wait for device to appear + # FIXME: convert this to waiting for event + timeout = 10 + while isinstance( + backend_domain.devices["block"][port_id], UnknownDevice + ): + if timeout == 0: + raise qubesadmin.exc.QubesException( + "Timeout waiting for {}:{} device to appear".format( + backend_domain.name, port_id + ) + ) + timeout -= 1 + time.sleep(1) + + options = {"devtype": devtype, "read-only": devtype == "cdrom"} + assignment = DeviceAssignment.new( + backend_domain=backend_domain, + port_id=port_id, + devclass="block", + options=options, + mode="required", + ) + + return assignment + + +def start_expert( + domain, skip_if_running: bool = False, drive: str | None = None +): + """ + Providing a drive in argument is not required. + """ + if domain.is_running(): + if skip_if_running: + return + raise QubesVMAlreadyStartedError("Domain is already running") + drive_assignment = None + try: + if drive: + drive_assignment = get_drive_assignment(domain.app, drive) + try: + domain.devices["block"].assign(drive_assignment) + except Exception: + drive_assignment = None + raise + + domain.start() + + if drive_assignment: + # don't reconnect this device after VM reboot + domain.devices["block"].unassign(drive_assignment) + except (IOError, OSError, qubesadmin.exc.QubesException, ValueError) as e: + if drive_assignment: + try: + domain.devices["block"].detach(drive_assignment) + except qubesadmin.exc.QubesException: + pass + raise e From 3d421e7b673cfd8095c64e8e6cb9c396d63b78ef Mon Sep 17 00:00:00 2001 From: Ben Grande Date: Fri, 29 May 2026 07:50:19 +0200 Subject: [PATCH 10/10] Move utils to its own module Prepare migration of each action function to its own module. --- qubesadmin/{utils.py => utils/__init__.py} | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) rename qubesadmin/{utils.py => utils/__init__.py} (98%) diff --git a/qubesadmin/utils.py b/qubesadmin/utils/__init__.py similarity index 98% rename from qubesadmin/utils.py rename to qubesadmin/utils/__init__.py index 062136fc..144141fc 100644 --- a/qubesadmin/utils.py +++ b/qubesadmin/utils/__init__.py @@ -368,7 +368,8 @@ def wrapper(qube, action): class DriveAction(argparse.Action): - """Action for argument parser that stores drive image path.""" + """Action for argument parser that stores drive image path. + Intended use for device attachment before domain is started.""" # pylint: disable=redefined-builtin def __init__( @@ -392,7 +393,7 @@ def __call__(self, parser, namespace, values, option_string=None): def get_drive_assignment(app, drive_str): """ Prepare :py:class:`qubesadmin.device_protocol.DeviceAssignment` object for a - given drive. + given drive. Intended to be used during before domain is started. If running in dom0, it will also take care about creating the appropriate loop device (if necessary). Otherwise, only existing block devices are @@ -490,7 +491,7 @@ def start_expert( domain, skip_if_running: bool = False, drive: str | None = None ): """ - Providing a drive in argument is not required. + Start the domain, optionally specifying a drive. """ if domain.is_running(): if skip_if_running: