1919# with this program; if not, see <http://www.gnu.org/licenses/>.
2020
2121'''Base classes for managed objects'''
22+ from __future__ import annotations
23+
24+ import typing
25+ from typing import BinaryIO , Any , TypeAlias , TypeVar , Generic
26+ from collections .abc import Generator
2227
2328import qubesadmin .exc
2429
30+ if typing .TYPE_CHECKING :
31+ from qubesadmin .vm import QubesVM
32+ from qubesadmin .app import QubesBase
33+
2534DEFAULT = object ()
2635
36+ # We use Any because the dynamic metatada handling of the current code
37+ # is too complex for type checkers otherwise
38+ VMProperty : TypeAlias = Any # noqa: ANN401
39+
2740
2841class PropertyHolder :
2942 '''A base class for object having properties retrievable using mgmt API.
@@ -34,28 +47,29 @@ class PropertyHolder:
3447 '''
3548 #: a place for appropriate Qubes() object (QubesLocal or QubesRemote),
3649 # use None for self
37- app = None
50+ app : QubesBase
3851
39- def __init__ (self , app , method_prefix , method_dest ):
52+ def __init__ (self , app : QubesBase , method_prefix : str , method_dest : str ):
4053 #: appropriate Qubes() object (QubesLocal or QubesRemote), use None
4154 # for self
4255 self .app = app
4356 self ._method_prefix = method_prefix
4457 self ._method_dest = method_dest
45- self ._properties = None
58+ self ._properties : list [ str ] | None = None
4659 self ._properties_help = None
4760 # the cache is maintained by EventsDispatcher(),
4861 # through helper functions in QubesBase()
4962 self ._properties_cache = {}
5063
51- def clear_cache (self ):
64+ def clear_cache (self ) -> None :
5265 """
5366 Clear property cache.
5467 """
5568 self ._properties_cache = {}
5669
57- def qubesd_call (self , dest , method , arg = None , payload = None ,
58- payload_stream = None ):
70+ def qubesd_call (self , dest : str | None , method : str ,
71+ arg : str | None = None , payload : bytes | None = None ,
72+ payload_stream : BinaryIO | None = None ) -> bytes :
5973 '''
6074 Call into qubesd using appropriate mechanism. This method should be
6175 defined by a subclass.
@@ -69,26 +83,23 @@ def qubesd_call(self, dest, method, arg=None, payload=None,
6983 :param payload_stream: file-like object to read payload from
7084 :return: Data returned by qubesd (string)
7185 '''
72- if not self .app :
73- raise NotImplementedError
74- if dest is None :
75- dest = self ._method_dest
86+ dest : str = dest or self ._method_dest
7687 if (
7788 getattr (self , "_redirect_dispvm_calls" , False )
7889 and dest .startswith ("@dispvm" )
7990 ):
8091 if dest .startswith ("@dispvm:" ):
8192 dest = dest [len ("@dispvm:" ) :]
8293 else :
83- dest = getattr (self .app , "default_dispvm" , None )
94+ dest : QubesVM | None = getattr (self .app , "default_dispvm" , None )
8495 if dest :
8596 dest = dest .name
8697 # have the actual implementation at Qubes() instance
8798 return self .app .qubesd_call (dest , method , arg , payload ,
8899 payload_stream )
89100
90101 @staticmethod
91- def _parse_qubesd_response (response_data ) :
102+ def _parse_qubesd_response (response_data : bytes ) -> bytes :
92103 '''Parse response from qubesd.
93104
94105 In case of success, return actual data. In case of error,
@@ -122,7 +133,7 @@ def _parse_qubesd_response(response_data):
122133 raise qubesadmin .exc .QubesDaemonCommunicationError (
123134 'Invalid response format' )
124135
125- def property_list (self ):
136+ def property_list (self ) -> list [ str ] :
126137 '''
127138 List available properties (their names).
128139
@@ -138,7 +149,7 @@ def property_list(self):
138149 # TODO: make it somehow immutable
139150 return self ._properties
140151
141- def property_help (self , name ) :
152+ def property_help (self , name : str ) -> str :
142153 '''
143154 Get description of a property.
144155
@@ -151,7 +162,7 @@ def property_help(self, name):
151162 None )
152163 return help_text .decode ('ascii' )
153164
154- def property_is_default (self , item ) :
165+ def property_is_default (self , item : str ) -> bool :
155166 '''
156167 Check if given property have default value
157168
@@ -183,7 +194,7 @@ def property_is_default(self, item):
183194 self ._properties_cache [item ] = (is_default , value )
184195 return is_default
185196
186- def property_get_default (self , item ) :
197+ def property_get_default (self , item : str ) -> VMProperty :
187198 '''
188199 Get default property value, regardless of the current value
189200
@@ -206,7 +217,8 @@ def property_get_default(self, item):
206217 (prop_type , value ) = property_str .split (b' ' , 1 )
207218 return self ._parse_type_value (prop_type , value )
208219
209- def clone_properties (self , src , proplist = None ):
220+ def clone_properties (self , src : PropertyHolder ,
221+ proplist : list [str ] | None = None ) -> None :
210222 '''Clone properties from other object.
211223
212224 :param PropertyHolder src: source object
@@ -223,7 +235,7 @@ def clone_properties(self, src, proplist=None):
223235 except AttributeError :
224236 continue
225237
226- def __getattr__ (self , item ) :
238+ def __getattr__ (self , item : str ) -> VMProperty :
227239 if item .startswith ('_' ):
228240 raise AttributeError (item )
229241 # pre-fill cache if enabled
@@ -254,7 +266,8 @@ def __getattr__(self, item):
254266 raise AttributeError (item )
255267 return value
256268
257- def _deserialize_property (self , api_response ):
269+ def _deserialize_property (self , api_response : bytes ) \
270+ -> tuple [bool , VMProperty ]:
258271 """
259272 Deserialize property.Get response format
260273 :param api_response: bytes, as retrieved from qubesd
@@ -267,7 +280,7 @@ def _deserialize_property(self, api_response):
267280 value = self ._parse_type_value (prop_type , value )
268281 return is_default , value
269282
270- def _parse_type_value (self , prop_type , value ) :
283+ def _parse_type_value (self , prop_type : bytes , value : bytes ) -> VMProperty :
271284 '''
272285 Parse `type=... ...` qubesd response format. Return a value of
273286 appropriate type.
@@ -278,20 +291,23 @@ def _parse_type_value(self, prop_type, value):
278291 :return: parsed value
279292 '''
280293 # pylint: disable=too-many-return-statements
281- prop_type = prop_type .decode ('ascii' )
294+ prop_type : str = prop_type .decode ('ascii' )
282295 if not prop_type .startswith ('type=' ):
283296 raise qubesadmin .exc .QubesDaemonCommunicationError (
284297 'Invalid type prefix received: {}' .format (prop_type ))
285298 (_ , prop_type ) = prop_type .split ('=' , 1 )
286- value = value .decode ()
299+ value : str = value .decode ()
287300 if prop_type == 'str' :
288301 return str (value )
289302 if prop_type == 'bool' :
290303 if value == '' :
304+ # TODO shouldn't that at least be ValueError ?
305+ # but then we need to properly propagate that modification
291306 return AttributeError
292307 return value == "True"
293308 if prop_type == 'int' :
294309 if value == '' :
310+ # TODO same as above
295311 return AttributeError
296312 return int (value )
297313 if prop_type == 'vm' :
@@ -305,7 +321,7 @@ def _parse_type_value(self, prop_type, value):
305321 raise qubesadmin .exc .QubesDaemonCommunicationError (
306322 'Received invalid value type: {}' .format (prop_type ))
307323
308- def _fetch_all_properties (self ):
324+ def _fetch_all_properties (self ) -> None :
309325 """
310326 Retrieve all properties values at once using (prefix).property.GetAll
311327 method. If it succeed, save retrieved values in the properties cache.
@@ -315,7 +331,7 @@ def _fetch_all_properties(self):
315331 :return: None
316332 """
317333
318- def unescape (line ) :
334+ def unescape (line : bytes ) -> Generator [ int ] :
319335 """Handle \\ -escaped values, generates a list of character codes"""
320336 escaped = False
321337 for char in line :
@@ -342,15 +358,15 @@ def unescape(line):
342358 return
343359 for line in properties_str .splitlines ():
344360 # decode newlines
345- line = bytes (unescape (line ))
346- name , property_str = line .split (b' ' , 1 )
361+ line_bytes = bytes (list ( unescape (line ) ))
362+ name , property_str = line_bytes .split (b' ' , 1 )
347363 name = name .decode ()
348364 is_default , value = self ._deserialize_property (property_str )
349365 self ._properties_cache [name ] = (is_default , value )
350366 self ._properties = list (self ._properties_cache .keys ())
351367
352368 @classmethod
353- def _local_properties (cls ):
369+ def _local_properties (cls ) -> set :
354370 '''
355371 Get set of property names that are properties on the Python object,
356372 and must not be set on the remote object
@@ -367,7 +383,7 @@ def _local_properties(cls):
367383
368384 return cls ._local_properties_set
369385
370- def __setattr__ (self , key , value ):
386+ def __setattr__ (self , key : str , value : typing . Any ) -> None : # noqa: ANN401
371387 if key .startswith ('_' ) or key in self ._local_properties ():
372388 return super ().__setattr__ (key , value )
373389 if value is qubesadmin .DEFAULT :
@@ -381,7 +397,9 @@ def __setattr__(self, key, value):
381397 qubesadmin .exc .QubesVMNotFoundError ):
382398 raise qubesadmin .exc .QubesPropertyAccessError (key )
383399 else :
384- if isinstance (value , qubesadmin .vm .QubesVM ):
400+ # Dynamic import because qubesadmin.vm imports base.py
401+ from qubesadmin .vm import QubesVM
402+ if isinstance (value , QubesVM ):
385403 value = value .name
386404 if value is None :
387405 value = ''
@@ -395,7 +413,7 @@ def __setattr__(self, key, value):
395413 qubesadmin .exc .QubesVMNotFoundError ):
396414 raise qubesadmin .exc .QubesPropertyAccessError (key )
397415
398- def __delattr__ (self , name ) :
416+ def __delattr__ (self , name : str ) -> None :
399417 if name .startswith ('_' ) or name in self ._local_properties ():
400418 return super ().__delattr__ (name )
401419 try :
@@ -408,10 +426,13 @@ def __delattr__(self, name):
408426 qubesadmin .exc .QubesVMNotFoundError ):
409427 raise qubesadmin .exc .QubesPropertyAccessError (name )
410428
429+ WrapperObjectsCollectionKey : TypeAlias = int | str
430+ T = TypeVar ('T' )
411431
412- class WrapperObjectsCollection :
432+ class WrapperObjectsCollection ( Generic [ T ]) :
413433 '''Collection of simple named objects'''
414- def __init__ (self , app , list_method , object_class ):
434+ def __init__ (self , app : QubesBase ,
435+ list_method : str , object_class : type [T ]):
415436 '''
416437 Construct manager of named wrapper objects.
417438
@@ -425,11 +446,13 @@ def __init__(self, app, list_method, object_class):
425446 self ._list_method = list_method
426447 self ._object_class = object_class
427448 #: names cache
428- self ._names_list = None
449+ self ._names_list : list [ WrapperObjectsCollectionKey ] | None = None
429450 #: returned objects cache
430- self ._objects = {}
451+ self ._objects : dict [ WrapperObjectsCollectionKey , T ] = {}
431452
432- def clear_cache (self , invalidate_name = None ):
453+ def clear_cache (self ,
454+ invalidate_name : WrapperObjectsCollectionKey | None = None )\
455+ -> None :
433456 """Clear cached list of names.
434457 If *invalidate_name* is given, remove that object from cache
435458 explicitly too.
@@ -438,7 +461,7 @@ def clear_cache(self, invalidate_name=None):
438461 if invalidate_name :
439462 self ._objects .pop (invalidate_name , None )
440463
441- def refresh_cache (self , force = False ):
464+ def refresh_cache (self , force : bool = False ) -> None :
442465 '''Refresh cached list of names'''
443466 if not force and self ._names_list is not None :
444467 return
@@ -447,17 +470,18 @@ def refresh_cache(self, force=False):
447470 assert list_data [- 1 ] == '\n '
448471 self ._names_list = [str (name ) for name in list_data [:- 1 ].splitlines ()]
449472
450- for name , obj in list (self ._objects .items ()):
473+ for name , obj in self ._objects .items ():
474+ assert hasattr (obj , "name" )
451475 if obj .name not in self ._names_list :
452476 # Object no longer exists
453477 del self ._objects [name ]
454478
455- def __getitem__ (self , item ) :
479+ def __getitem__ (self , item : WrapperObjectsCollectionKey ) -> T :
456480 if not self .app .blind_mode and item not in self :
457481 raise KeyError (item )
458482 return self .get_blind (item )
459483
460- def get_blind (self , item ) :
484+ def get_blind (self , item : WrapperObjectsCollectionKey ) -> T :
461485 '''
462486 Get a property without downloading the list
463487 and checking if it's present
@@ -466,25 +490,30 @@ def get_blind(self, item):
466490 self ._objects [item ] = self ._object_class (self .app , item )
467491 return self ._objects [item ]
468492
469- def __contains__ (self , item ) :
493+ def __contains__ (self , item : WrapperObjectsCollectionKey ) -> bool :
470494 self .refresh_cache ()
495+ assert self ._names_list is not None
471496 return item in self ._names_list
472497
473- def __iter__ (self ):
498+ def __iter__ (self ) -> Generator [ WrapperObjectsCollectionKey ] :
474499 self .refresh_cache ()
500+ assert self ._names_list is not None
475501 yield from self ._names_list
476502
477- def keys (self ):
503+ def keys (self ) -> list [ WrapperObjectsCollectionKey ] :
478504 '''Get list of names.'''
479505 self .refresh_cache ()
506+ assert self ._names_list is not None
480507 return list (self ._names_list )
481508
482- def items (self ):
509+ def items (self ) -> list [ tuple [ WrapperObjectsCollectionKey , T ]] :
483510 '''Get list of (key, value) pairs'''
484511 self .refresh_cache ()
512+ assert self ._names_list is not None
485513 return [(key , self .get_blind (key )) for key in self ._names_list ]
486514
487- def values (self ):
515+ def values (self ) -> list [ T ] :
488516 '''Get list of objects'''
489517 self .refresh_cache ()
518+ assert self ._names_list is not None
490519 return [self .get_blind (key ) for key in self ._names_list ]
0 commit comments