1616
1717
1818class DecryptCommand (Command ):
19- """Concrete command to perform decryption with secure parameters"""
20-
21- PBKDF2_ITERATIONS = 600000 # Must match encryption iterations
22-
23- def __init__ (self , config : BackupConfig , file_path : str ):
24- self .config = config
25- self .file_path = file_path
26- self .logger = logging .getLogger (__name__ )
19+ """
20+ Command to securely decrypt backup archives using OpenSSL with PBKDF2.
21+
22+ This class ensures decryption parameters match those used for encryption and
23+ verifies file integrity post-decryption. Logging uses %s formatting for performance.
24+ """
25+
26+ PBKDF2_ITERATIONS : int = 600000 # Must match encryption iterations
27+
28+ def __init__ (self , config : BackupConfig , file_path : str ) -> None :
29+ """
30+ Initialize the DecryptCommand.
31+
32+ Args:
33+ config (BackupConfig): The backup configuration object.
34+ file_path (str): Path to the encrypted file to decrypt.
35+ """
36+ self .config : BackupConfig = config
37+ self .file_path : str = file_path
38+ self .logger : logging .Logger = logging .getLogger (__name__ )
2739 self ._password_context = ContextManager ()._password_context
2840 self ._safe_cleanup = ContextManager ()._safe_cleanup
2941
3042 def execute (self ) -> bool :
31- """Secure decryption with matched PBKDF2 parameters"""
32- output_path = os .path .splitext (self .file_path )[0 ]
33- decrypted_path = f"{ output_path } -decrypted"
43+ """
44+ Perform secure decryption with matched PBKDF2 parameters.
45+
46+ Returns:
47+ bool: True if decryption and integrity check succeed, False otherwise.
48+ """
49+ output_path : str = os .path .splitext (self .file_path )[0 ]
50+ decrypted_path : str = f"{ output_path } -decrypted"
3451
52+ # Use password context manager to securely obtain password
3553 with self ._password_context () as password :
3654 if not password :
3755 return False
3856
39- cmd = [
57+ cmd : list [ str ] = [
4058 "openssl" ,
4159 "enc" ,
4260 "-d" ,
@@ -66,7 +84,10 @@ def execute(self) -> bool:
6684 self ._verify_integrity (decrypted_path )
6785 return True
6886 except subprocess .CalledProcessError as e :
69- self .logger .error (f"Decryption failed: { self ._sanitize_logs (e .stderr )} " )
87+ # Log sanitized error output to avoid leaking sensitive data
88+ self .logger .error (
89+ "Decryption failed: %s" , self ._sanitize_logs (e .stderr )
90+ )
7091 self ._safe_cleanup (decrypted_path )
7192 return False
7293 except subprocess .TimeoutExpired :
@@ -75,13 +96,18 @@ def execute(self) -> bool:
7596 return False
7697
7798 def _verify_integrity (self , decrypted_path : str ) -> None :
78- """Verify decrypted file matches original backup checksum"""
79- original_path = os .path .splitext (self .file_path )[0 ]
99+ """
100+ Verify decrypted file matches original backup checksum.
101+
102+ Args:
103+ decrypted_path (str): Path to the decrypted file.
104+ """
105+ original_path : str = os .path .splitext (self .file_path )[0 ]
80106 if os .path .exists (original_path ):
81- decrypted_hash = self ._calculate_sha256 (decrypted_path )
82- original_hash = self ._calculate_sha256 (original_path )
107+ decrypted_hash : str = self ._calculate_sha256 (decrypted_path )
108+ original_hash : str = self ._calculate_sha256 (original_path )
83109
84- # Compare hashes
110+ # Print hashes for manual inspection (acceptable for CLI tools)
85111 print (f"Decrypted file hash: { decrypted_hash } " )
86112 print (f"Original file hash: { original_hash } " )
87113
@@ -91,7 +117,15 @@ def _verify_integrity(self, decrypted_path: str) -> None:
91117 self .logger .error ("Integrity check failed" )
92118
93119 def _calculate_sha256 (self , file_path : str ) -> str :
94- """Calculate SHA256 checksum for a file"""
120+ """
121+ Calculate SHA256 checksum for a file.
122+
123+ Args:
124+ file_path (str): Path to the file.
125+
126+ Returns:
127+ str: SHA256 hex digest of the file contents.
128+ """
95129 sha256 = hashlib .sha256 ()
96130 with open (file_path , "rb" ) as f :
97131 while True :
@@ -102,8 +136,16 @@ def _calculate_sha256(self, file_path: str) -> str:
102136 return sha256 .hexdigest ()
103137
104138 def _sanitize_logs (self , output : bytes ) -> str :
105- """Safe log sanitization without modifying bytes"""
106- # Replace password=<value> with password=[REDACTED]
139+ """
140+ Sanitize log output to redact sensitive information.
141+
142+ Args:
143+ output (bytes): Raw stderr output from subprocess.
144+
145+ Returns:
146+ str: Sanitized string safe for logging.
147+ """
148+ # Redact password and IP addresses from logs for security
107149 sanitized = re .sub (rb"password=[^\s]*" , b"password=[REDACTED]" , output )
108150 sanitized = re .sub (rb"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b" , b"[IP_REDACTED]" , sanitized )
109151 return sanitized .decode ("utf-8" , errors = "replace" )
0 commit comments