2727KEYSTORE_FILENAME = "mavlink_signing_keys.json"
2828KEYSTORE_VERSION = 1
2929
30+ # PBKDF2 iteration counts
31+ # File storage uses lower iterations as vehicle_id provides limited entropy
32+ PBKDF2_ITERATIONS_FILE = 480_000
33+ # Export/import uses higher iterations for password-based encryption
34+ PBKDF2_ITERATIONS_EXPORT = 600_000
35+
3036
3137@dataclass
3238class StoredKey :
@@ -39,12 +45,12 @@ class StoredKey:
3945 salt : str # Base64 encoded salt for encryption
4046 description : str = ""
4147
42- def to_dict (self ) -> dict :
48+ def to_dict (self ) -> dict [ str , str ] :
4349 """Convert to dictionary for JSON serialization."""
4450 return asdict (self )
4551
4652 @classmethod
47- def from_dict (cls , data : dict ) -> "StoredKey" :
53+ def from_dict (cls , data : dict [ str , str ] ) -> "StoredKey" :
4854 """Create from dictionary."""
4955 return cls (** data )
5056
@@ -56,18 +62,19 @@ class KeystoreData:
5662 version : int = KEYSTORE_VERSION
5763 keys : dict [str , StoredKey ] = field (default_factory = dict )
5864
59- def to_dict (self ) -> dict :
65+ def to_dict (self ) -> dict [ str , object ] :
6066 """Convert to dictionary for JSON serialization."""
6167 return {
6268 "version" : self .version ,
6369 "keys" : {k : v .to_dict () for k , v in self .keys .items ()},
6470 }
6571
6672 @classmethod
67- def from_dict (cls , data : dict ) -> "KeystoreData" :
73+ def from_dict (cls , data : dict [ str , object ] ) -> "KeystoreData" :
6874 """Create from dictionary."""
69- keys = {k : StoredKey .from_dict (v ) for k , v in data .get ("keys" , {}).items ()}
70- return cls (version = data .get ("version" , KEYSTORE_VERSION ), keys = keys )
75+ keys_data : dict [str , dict [str , str ]] = data .get ("keys" , {}) # type: ignore[assignment]
76+ keys = {k : StoredKey .from_dict (v ) for k , v in keys_data .items ()}
77+ return cls (version = data .get ("version" , KEYSTORE_VERSION ), keys = keys ) # type: ignore[arg-type]
7178
7279
7380class SigningKeystore :
@@ -84,6 +91,14 @@ class SigningKeystore:
8491 - Password-protected export/import
8592 - Encrypted storage with key derivation
8693
94+ **IMPORTANT SECURITY LIMITATION:**
95+ The file-based fallback storage uses vehicle_id as the encryption key derivation input.
96+ This provides obfuscation but NOT strong encryption against attackers with filesystem access.
97+ For production use, consider:
98+ - Using the OS keyring when available (provides proper security)
99+ - Implementing additional master password protection
100+ - Restricting filesystem access to the keystore file
101+
87102 Attributes:
88103 keyring_available: Whether OS keyring is available
89104 fallback_path: Path to encrypted fallback file
@@ -141,7 +156,10 @@ def _check_keyring_available(self) -> bool:
141156 # Test with a dummy operation
142157 test_key = f"_test_keyring_{ secrets .token_hex (4 )} "
143158 keyring .set_password (self ._keyring_service , test_key , "test" )
144- keyring .delete_password (self ._keyring_service , test_key )
159+ try :
160+ keyring .delete_password (self ._keyring_service , test_key )
161+ except Exception as cleanup_exc : # pylint: disable=broad-except
162+ self ._logger .warning ("Failed to cleanup keyring test: %s" , cleanup_exc )
145163 return True
146164 except ImportError :
147165 self ._logger .debug ("Keyring package not installed" )
@@ -202,13 +220,13 @@ def store_key(self, vehicle_id: str, key: bytes, description: str = "") -> bool:
202220 # Fall back to encrypted file storage
203221 return self ._store_key_in_file (vehicle_id , key , description )
204222
205- def _store_key_in_file (self , vehicle_id : str , key : bytes , description : str = "" ) -> bool :
223+ def _store_key_in_file (self , vehicle_id : str , key : bytes , description : str = "" ) -> bool : # pylint: disable=too-many-locals
206224 """Store key in encrypted file."""
207225 try :
208226 from cryptography .fernet import Fernet # noqa: PLC0415 # pylint: disable=import-outside-toplevel
209227 from cryptography .hazmat .primitives import hashes # noqa: PLC0415 # pylint: disable=import-outside-toplevel
210- from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415
211- PBKDF2HMAC , # pylint: disable=import-outside-toplevel
228+ from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415 # pylint: disable=import-outside-toplevel
229+ PBKDF2HMAC ,
212230 )
213231
214232 # Load existing keystore or create new
@@ -223,7 +241,7 @@ def _store_key_in_file(self, vehicle_id: str, key: bytes, description: str = "")
223241 algorithm = hashes .SHA256 (),
224242 length = 32 ,
225243 salt = salt ,
226- iterations = 480000 ,
244+ iterations = PBKDF2_ITERATIONS_FILE ,
227245 )
228246 derived_key = kdf .derive (vehicle_id .encode ())
229247 fernet_key = base64 .urlsafe_b64encode (derived_key )
@@ -286,8 +304,8 @@ def _retrieve_key_from_file(self, vehicle_id: str) -> Optional[bytes]:
286304 try :
287305 from cryptography .fernet import Fernet # noqa: PLC0415 # pylint: disable=import-outside-toplevel
288306 from cryptography .hazmat .primitives import hashes # noqa: PLC0415 # pylint: disable=import-outside-toplevel
289- from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415
290- PBKDF2HMAC , # pylint: disable=import-outside-toplevel
307+ from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415 # pylint: disable=import-outside-toplevel
308+ PBKDF2HMAC ,
291309 )
292310
293311 keystore = self ._load_keystore_file ()
@@ -306,14 +324,15 @@ def _retrieve_key_from_file(self, vehicle_id: str) -> Optional[bytes]:
306324 algorithm = hashes .SHA256 (),
307325 length = 32 ,
308326 salt = salt ,
309- iterations = 480000 ,
327+ iterations = PBKDF2_ITERATIONS_FILE ,
310328 )
311329 derived_key = kdf .derive (vehicle_id .encode ())
312330 fernet_key = base64 .urlsafe_b64encode (derived_key )
313331 fernet = Fernet (fernet_key )
314332
315333 # Decrypt the signing key
316- return fernet .decrypt (encrypted_key )
334+ decrypted_key : bytes = fernet .decrypt (encrypted_key )
335+ return decrypted_key
317336
318337 except Exception as exc : # pylint: disable=broad-except
319338 self ._logger .exception ("Failed to retrieve key from file: %s" , exc )
@@ -397,8 +416,8 @@ def export_key(self, vehicle_id: str, password: str) -> Optional[str]:
397416 try :
398417 from cryptography .fernet import Fernet # noqa: PLC0415 # pylint: disable=import-outside-toplevel
399418 from cryptography .hazmat .primitives import hashes # noqa: PLC0415 # pylint: disable=import-outside-toplevel
400- from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415
401- PBKDF2HMAC , # pylint: disable=import-outside-toplevel
419+ from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415 # pylint: disable=import-outside-toplevel
420+ PBKDF2HMAC ,
402421 )
403422
404423 # Generate salt
@@ -409,7 +428,7 @@ def export_key(self, vehicle_id: str, password: str) -> Optional[str]:
409428 algorithm = hashes .SHA256 (),
410429 length = 32 ,
411430 salt = salt ,
412- iterations = 600000 , # Higher iterations for password-based
431+ iterations = PBKDF2_ITERATIONS_EXPORT ,
413432 )
414433 derived_key = kdf .derive (password .encode ())
415434 fernet_key = base64 .urlsafe_b64encode (derived_key )
@@ -433,7 +452,7 @@ def export_key(self, vehicle_id: str, password: str) -> Optional[str]:
433452 self ._logger .exception ("Failed to export key: %s" , exc )
434453 return None
435454
436- def import_key (self , export_data : str , password : str ) -> Optional [str ]:
455+ def import_key (self , export_data : str , password : str ) -> Optional [str ]: # pylint: disable=too-many-locals
437456 """
438457 Import a signing key from an encrypted export.
439458
@@ -448,8 +467,8 @@ def import_key(self, export_data: str, password: str) -> Optional[str]:
448467 try :
449468 from cryptography .fernet import Fernet , InvalidToken # noqa: PLC0415 # pylint: disable=import-outside-toplevel
450469 from cryptography .hazmat .primitives import hashes # noqa: PLC0415 # pylint: disable=import-outside-toplevel
451- from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415
452- PBKDF2HMAC , # pylint: disable=import-outside-toplevel
470+ from cryptography .hazmat .primitives .kdf .pbkdf2 import ( # noqa: PLC0415 # pylint: disable=import-outside-toplevel
471+ PBKDF2HMAC ,
453472 )
454473
455474 # Decode export package
@@ -464,7 +483,7 @@ def import_key(self, export_data: str, password: str) -> Optional[str]:
464483 algorithm = hashes .SHA256 (),
465484 length = 32 ,
466485 salt = salt ,
467- iterations = 600000 ,
486+ iterations = PBKDF2_ITERATIONS_EXPORT ,
468487 )
469488 derived_key = kdf .derive (password .encode ())
470489 fernet_key = base64 .urlsafe_b64encode (derived_key )
@@ -495,19 +514,24 @@ def _load_keystore_file(self) -> KeystoreData:
495514 with open (self ._fallback_path , encoding = "utf-8" ) as f :
496515 data = json .load (f )
497516 return KeystoreData .from_dict (data )
498- except Exception as exc : # pylint: disable=broad-except
517+ except ( json . JSONDecodeError , OSError , KeyError , TypeError , ValueError ) as exc :
499518 self ._logger .warning ("Failed to load keystore file: %s" , exc )
500519 return KeystoreData ()
501520
502521 def _save_keystore_file (self , keystore : KeystoreData ) -> None :
503- """Save keystore to file."""
522+ """Save keystore to file with file locking for concurrent access protection ."""
504523 # Ensure directory exists
505524 self ._fallback_path .parent .mkdir (parents = True , exist_ok = True )
506525
507526 # Write atomically using a temporary file
508527 temp_path = self ._fallback_path .with_suffix (".tmp" )
509528 try :
510529 with open (temp_path , "w" , encoding = "utf-8" ) as f :
530+ # Apply file lock on Unix systems to prevent concurrent writes
531+ if os .name != "nt" :
532+ import fcntl # noqa: PLC0415 # pylint: disable=import-outside-toplevel
533+
534+ fcntl .flock (f .fileno (), fcntl .LOCK_EX )
511535 json .dump (keystore .to_dict (), f , indent = 2 )
512536
513537 # Atomic rename
0 commit comments