@@ -39,8 +39,7 @@ def __init__(
3939 self ._check_mirrors ()
4040
4141 # Will hold a list of all the gpg keys (public and private)
42- # self._keys: Optional[List[pathlib.Path]] = []
43- self ._keys = self ._key_init ()
42+ self ._keys : Optional [List [pathlib .Path ]] = []
4443
4544 def _load_mirrors (self , cmdline_cache : Optional [pathlib .Path ]) -> Dict [str , Dict ]:
4645 """Load the mirrors file, if one exists."""
@@ -156,12 +155,14 @@ def _check_mirrors(self):
156155 f"Check the url listed in mirrors.yaml in system config. \n { err } "
157156 )
158157
159- def key_files (self , config_root : pathlib .Path ):
158+ @property
159+ def keys (self ):
160160 """Return the list of public and private key file paths."""
161+
161162 if self ._keys is None :
162163 raise RuntimeError ("The mirror.keys method was accessed before setup_configs() was called." )
163- key_dir = config_root / self . KEY_STORE_DIR
164- return [ key_dir / info [ "path" ] for info in self ._keys ]
164+
165+ return self ._keys
165166
166167 def setup_configs (self , config_root : pathlib .Path ):
167168 """Setup all mirror configs in the given config_root."""
@@ -230,67 +231,64 @@ def _create_bootstrap_configs(self, config_root: pathlib.Path):
230231 with (config_root / "bootstrap.yaml" ).open ("w" ) as file :
231232 yaml .dump (bootstrap_yaml , file , default_flow_style = False )
232233
233- def _key_init (self ):
234- key_info = {}
235-
236- key = self .mirrors ["buildcache" ].get ("private_key" )
234+ def _load_key (self , key : str , dest : pathlib .Path , name : str ):
235+ """Validate mirror keys, relocate to key_store, and update mirror config with new key paths."""
237236
238- if key is None :
239- return
237+ # key will be saved under key_store/mirror_name.[pub/priv].gpg
240238
241239 # if path, check if abs path, if not, append sys config path in front and check again
242240 path = pathlib .Path (os .path .expandvars (key ))
243-
244241 if not path .is_absolute ():
245242 # try prepending system config path
246243 path = self ._system_config_root / path
247244
248245 if path .is_file ():
249- # use the user-provided file
250- key_info = {"path" : pathlib .Path (f"buildcache.pgp" ), "source" : path }
246+ with open (path , "rb" ) as reader :
247+ binary_key = reader .read ()
248+
249+ # convert base64 key to binary
251250 else :
252- # convert base64 key to binary
253251 try :
254- binary_key = base64 .b64decode (key , validate = True )
255- print (binary_key )
252+ binary_key = base64 .b64decode (key )
256253 except ValueError :
257254 raise MirrorError (
258- f"Key for mirror 'buildcache ' is not valid. \n "
255+ f"Key for mirror '{ name } ' is not valid: ' { path } ' . \n "
259256 f"Must be a path to a GPG public key or a base64 encoded GPG public key. \n "
260257 f"Check the key listed in mirrors.yaml in system config."
261258 )
262259
263- file_type = magic .from_buffer (binary_key , mime = True )
264- if file_type not in ("application/x-gnupg-keyring" , "application/pgp-keys" ):
265- raise MirrorError (
266- f"Key for mirror 'buildcache' is not a valid GPG key. \n "
267- f"The file (or base64) was readable, but the data itself was not a PGP key.\n "
268- f"Check the key listed in mirrors.yaml in system config."
269- )
270-
271- key_info = {"path" : pathlib .Path ("buildcache.pgp" ), "source" : binary_key }
260+ # private keys will evaluate as "application/octet-stream"
261+ file_type = magic .from_buffer (binary_key , mime = True )
262+ if file_type not in ("application/x-gnupg-keyring" , "application/pgp-keys" , "application/octet-stream" ):
263+ raise MirrorError (
264+ f"Key for mirror { name } is not a valid GPG key. \n "
265+ f"The file (or base64) was readable, but the data itself was not a PGP key.\n "
266+ f"Check the key listed in mirrors.yaml in system config."
267+ )
272268
273- return key_info
269+ # copy key to new destination in key store
270+ with open (dest , "wb" ) as writer :
271+ writer .write (binary_key )
274272
273+ self ._keys .append (dest )
274+
275+
275276 def _key_setup (self , key_store : pathlib .Path ):
276- """Validate mirror keys, relocate to key_store, and update mirror config with new key paths. """
277+ """Iterate through mirror keys and load + relocate each one to key_store """
277278
279+ self ._keys = []
278280 key_store .mkdir (exist_ok = True )
279281
280- #for key_info in self._keys:
281-
282- #path = key_store / key_info["path"]
283- path = key_store / self ._keys ["path" ]
284- #source = key_info["source"]
285- source = self ._keys ["source" ]
286-
287- match source :
288- case pathlib .Path ():
289- # copy source -> path
290- shutil .copy2 (source , path )
291- case bytes ():
292- # open path and copy in bytes
293- with open (path , "wb" ) as writer :
294- writer .write (source )
295- case _:
296- raise TypeError (f"Expected Path or bytes, got { type (source ).__name__ } " )
282+ for name , mirror in self .mirrors .items ():
283+ if name == "buildcache" :
284+ if mirror .get ("private_key" ):
285+ key = mirror ["private_key" ]
286+ dest = pathlib .Path (key_store / f"{ name } .priv.gpg" )
287+ self ._load_key (key , dest , name )
288+
289+ if mirror .get ("public_key" ) is None :
290+ continue
291+
292+ key = mirror ["public_key" ]
293+ dest = pathlib .Path (key_store / f"{ name } .pub.gpg" )
294+ self ._load_key (key , dest , name )
0 commit comments