diff --git a/btcrecover/addressset.py b/btcrecover/addressset.py index d26a1953..9fefceb8 100644 --- a/btcrecover/addressset.py +++ b/btcrecover/addressset.py @@ -386,6 +386,13 @@ def xor_at(data: bytes, key: Optional[bytes], offset: int) -> bytes: m = len(key) return bytes(b ^ key[(offset + i) % m] for i, b in enumerate(data)) +# Cached script pattern constants for block parsing performance +_P2PKH_PREFIX = b"\x76\xa9\x14" +_P2PKH_SUFFIX = b"\x88\xac" +_P2SH_PREFIX = b"\xa9\x14" +_P2WPKH_PREFIX = b"\x00\x14" +_P2TR_PREFIX = b"\x51\x20" + def create_address_db(dbfilename, blockdir, table_len, startBlockDate="2019-01-01", endBlockDate="3000-12-31", startBlockFile = 0, addressDB_yolo = False, outputToText = False, update = False, progress_bar = True, addresslistfile = None, multiFile = False, forcegzip = False): """Creates an AddressSet database and saves it to a file @@ -641,13 +648,13 @@ def create_address_db(dbfilename, blockdir, table_len, startBlockDate="2019-01-0 pkscript_len, offset = varint(block, offset + 8) # skips 8-byte satoshi count # If this is a P2PKH script (OP_DUP OP_HASH160 PUSH(20) <20 address bytes> OP_EQUALVERIFY OP_CHECKSIG) - if pkscript_len == 25 and block[offset:offset+3] == b"\x76\xa9\x14" and block[offset+23:offset+25] == b"\x88\xac": + if pkscript_len == 25 and block[offset:offset+3] == _P2PKH_PREFIX and block[offset+23:offset+25] == _P2PKH_SUFFIX: address_set.add(block[offset+3:offset+23],outputToText,'P2PKH') - elif block[offset:offset+2] == b"\xa9\x14": #Check for Segwit Address + elif block[offset:offset+2] == _P2SH_PREFIX: #Check for Segwit Address address_set.add(block[offset+2:offset+22],outputToText,'P2SH') - elif block[offset:offset+2] == b"\x00\x14": #Check for Native Segwit Address + elif block[offset:offset+2] == _P2WPKH_PREFIX: #Check for Native Segwit Address address_set.add(block[offset+2:offset+22],outputToText,'Bech32') - elif block[offset:offset+2] == b"\x51\x20": #Check for Taproot Address + elif block[offset:offset+2] == _P2TR_PREFIX: #Check for Taproot Address address_set.add(block[offset + 2:offset + 34], outputToText, 'Bech32m') offset += pkscript_len # advances past the pubkey script diff --git a/btcrecover/aezeed.py b/btcrecover/aezeed.py index 77daded7..1c67b0ac 100644 --- a/btcrecover/aezeed.py +++ b/btcrecover/aezeed.py @@ -84,20 +84,28 @@ def _mk_block(size: int = BLOCK_SIZE) -> bytearray: def _xor_bytes1x16(a: Sequence[int], b: Sequence[int], dst: bytearray) -> None: - for i in range(BLOCK_SIZE): - dst[i] = a[i] ^ b[i] + # Use integer XOR to process all 16 bytes at once instead of a Python loop + int_a = int.from_bytes(a, 'big') + int_b = int.from_bytes(b, 'big') + dst[:] = (int_a ^ int_b).to_bytes(BLOCK_SIZE, 'big') def _xor_bytes4x16( a: Sequence[int], b: Sequence[int], c: Sequence[int], d: Sequence[int], dst: bytearray ) -> None: - for i in range(BLOCK_SIZE): - dst[i] = a[i] ^ b[i] ^ c[i] ^ d[i] + # Use integer XOR to process all 16 bytes at once instead of a Python loop + int_a = int.from_bytes(a, 'big') + int_b = int.from_bytes(b, 'big') + int_c = int.from_bytes(c, 'big') + int_d = int.from_bytes(d, 'big') + dst[:] = (int_a ^ int_b ^ int_c ^ int_d).to_bytes(BLOCK_SIZE, 'big') def _xor_bytes(a: Sequence[int], b: Sequence[int], dst: bytearray) -> None: - for i in range(len(dst)): - dst[i] = a[i] ^ b[i] + n = len(dst) + int_a = int.from_bytes(a[:n], 'big') + int_b = int.from_bytes(b[:n], 'big') + dst[:] = (int_a ^ int_b).to_bytes(n, 'big') def _uint32(i: int) -> int: @@ -764,17 +772,13 @@ def _aez_decrypt(key: bytes, ad_list: Iterable[bytes], tau: int, ciphertext: byt x = bytearray(len(ciphertext)) if len(ciphertext) == tau: state.aez_prf(delta, tau, x) - mismatch = 0 - for i in range(tau): - mismatch |= x[i] ^ ciphertext[i] - if mismatch != 0: + # Use bytes comparison instead of byte-by-byte XOR loop + if x[:tau] != ciphertext[:tau]: return None return bytes() state.decipher(delta, ciphertext, x) - mismatch = 0 - for i in range(tau): - mismatch |= x[len(ciphertext) - tau + i] - if mismatch != 0: + # Check if trailing tau bytes are all zero + if any(x[-tau:]): return None return bytes(x[: len(ciphertext) - tau]) diff --git a/btcrecover/btcrpass.py b/btcrecover/btcrpass.py index c7c02b85..3e65ddd3 100644 --- a/btcrecover/btcrpass.py +++ b/btcrecover/btcrpass.py @@ -154,6 +154,10 @@ def error(s: str) -> None: passwordlist_first_line_num = 1 passwordlist_embedded_arguments = False +# Pre-built set of valid base58 byte values for fast character validation +# Base58 alphabet: 123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz +_base58_bytes = frozenset(b"123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz") + searchfailedtext = "\nAll possible passwords (as specified in your tokenlist or passwordlist) have been checked and none are correct for this wallet. You could consider trying again with a different password list or expanded tokenlist..." def load_customTokenWildcard(customTokenWildcardFile): @@ -980,20 +984,11 @@ def _return_verified_password_or_false_cpu(self, orig_passwords): # Multibit # # Does it look like a base58 private key (MultiBit, MultiDoge, or oldest-format Android key backup)? if b58_privkey[0] in "LK5Q".encode(): # private keys always start with L, K, or 5, or for MultiDoge Q - for c in b58_privkey[1:]: - # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z], break - if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": - break - # If the loop above doesn't break, it's base58-looking so far - else: + if all(c in _base58_bytes for c in b58_privkey[1:]): # If another AES block is available, decrypt and check it as well to avoid false positives if len(encrypted_block) >= 32: b58_privkey = l_aes256_cbc_decrypt(key1 + key2, encrypted_block[:16], encrypted_block[16:32]) - for c in b58_privkey: - if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": - break # not base58 - # If the loop above doesn't break, it's base58; we've found it - else: + if all(c in _base58_bytes for c in b58_privkey): if self._dump_privkeys_file: self.dump_privkeys_keybackup(key1, key2, iv) return orig_passwords[count-1], count @@ -1063,20 +1058,11 @@ def _return_verified_password_or_false_opencl(self, arg_passwords): # # Does it look like a base58 private key (MultiBit, MultiDoge, or oldest-format Android key backup)? if b58_privkey[0] in "LK5Q".encode(): # private keys always start with L, K, or 5, or for MultiDoge Q - for c in b58_privkey[1:]: - # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z], break - if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": - break - # If the loop above doesn't break, it's base58-looking so far - else: + if all(c in _base58_bytes for c in b58_privkey[1:]): # If another AES block is available, decrypt and check it as well to avoid false positives if len(encrypted_block) >= 32: b58_privkey = l_aes256_cbc_decrypt(key1 + key2, encrypted_block[:16], encrypted_block[16:32]) - for c in b58_privkey: - if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": - break # not base58 - # If the loop above doesn't break, it's base58; we've found it - else: + if all(c in _base58_bytes for c in b58_privkey): if self._dump_privkeys_file: self.dump_privkeys_keybackup(key1, key2, iv) return arg_passwords[count - 1], count @@ -1946,10 +1932,7 @@ def return_verified_password_or_false(self, passwords): #Electrum2 xprv = l_aes256_cbc_decrypt(key, iv, part_encrypted_xprv) if xprv.startswith(b"xprv") or xprv.startswith(b"zprv"): # BIP32 extended private key version bytes - for c in xprv[4:]: - # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z] - if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": break # not base58 - else: # if the loop above doesn't break, it's base58 + if all(c in _base58_bytes for c in xprv[4:]): return password.decode("utf_8", "replace"), count return False, count @@ -1982,11 +1965,8 @@ def return_verified_password_or_false(self, passwords): #ElectrumLooseKey padding_len = privkey_end[-1] # Check for valid PKCS7 padding for a 52 or 51 byte "WIF" private key # (4*16-byte-blocks == 64, 64 - 52 or 51 == 12 or 13 - if (padding_len == 12 or padding_len == 13) and privkey_end.endswith((chr(padding_len) * padding_len).encode()): - for c in privkey_end[:-padding_len]: - # If it's outside of the base58 set [1-9A-HJ-NP-Za-km-z] - if c > ord("z") or c < ord("1") or ord("9") < c < ord("A") or ord("Z") < c < ord("a") or chr(c) in "IOl": break # not base58 - else: # if the loop above doesn't break, it's base58 + if (padding_len == 12 or padding_len == 13) and privkey_end.endswith(bytes([padding_len]) * padding_len): + if all(c in _base58_bytes for c in privkey_end[:-padding_len]): return password.decode("utf_8", "replace"), count return False, count diff --git a/btcrecover/btcrseed.py b/btcrecover/btcrseed.py index 50a18690..9f3b895d 100644 --- a/btcrecover/btcrseed.py +++ b/btcrecover/btcrseed.py @@ -257,21 +257,13 @@ def compress_pubkey(uncompressed_pubkey): def load_pathlist(pathlistFile): - pathlist_file = open(pathlistFile, "r") - pathlist_lines = pathlist_file.readlines() - pathlist = [] - for path in pathlist_lines: - if path[0] == '#' or len(path.strip()) == 0: - continue - pathlist.append(path.split("#")[0].strip()) - pathlist_file.close() - return pathlist + with open(pathlistFile, "r") as pathlist_file: + return [line.split("#")[0].strip() for line in pathlist_file + if line.strip() and line[0] != '#'] def load_passphraselist(passphraselistFile): - passphraselist_file = open(passphraselistFile, "r") - passphraselist = passphraselist_file.read().splitlines() - passphraselist_file.close() - return passphraselist + with open(passphraselistFile, "r") as passphraselist_file: + return passphraselist_file.read().splitlines() import hmac import hashlib @@ -650,10 +642,7 @@ def return_verified_password_or_false(self, mnemonic_ids_list): for count, mnemonic_ids in enumerate(mnemonic_ids_list, 1): # In the event that a tokenlist based recovery is happening, convert the list from string sback to ints if (type(mnemonic_ids[0]) == str): - new_mnemonic_ids = [] - for word in mnemonic_ids: - new_mnemonic_ids.append(self._words.index(word)) - mnemonic_ids = new_mnemonic_ids + mnemonic_ids = [self._words.index(word) for word in mnemonic_ids] # Compute the binary seed from the word list the Electrum1 way seed = "" @@ -663,15 +652,11 @@ def return_verified_password_or_false(self, mnemonic_ids_list): + num_words2 * ( (mnemonic_ids[i + 2] - mnemonic_ids[i + 1]) % num_words )) # + # Convert to bytes once before the stretching loop to avoid + # repeated type checks across 100,000 iterations + seed = seed.encode() unstretched_seed = seed for i in range(100000): # Electrum1's seed stretching - - #Check the types of the seed and stretched_seed variables and force back to bytes (Allows most code to stay as-is for Py3) - if type(seed) is str: - seed = seed.encode() - if type(unstretched_seed) is str: - unstretched_seed = unstretched_seed.encode() - seed = l_sha256(seed + unstretched_seed).digest() # If a master public key was provided, check the pubkey derived from the seed against it @@ -688,7 +673,12 @@ def return_verified_password_or_false(self, mnemonic_ids_list): try: master_pubkey_bytes = coincurve.PublicKey.from_valid_secret(seed).format(compressed=False)[1:] except ValueError: continue - for seq_num in range(self._address_start_index, self._address_start_index + self._addrs_to_generate): + # Cache instance attributes as locals for the inner loop + l_known_hash160s = self._known_hash160s + l_address_start_index = self._address_start_index + l_addrs_to_generate = self._addrs_to_generate + + for seq_num in range(l_address_start_index, l_address_start_index + l_addrs_to_generate): # Compute the next deterministic private/public key pair the Electrum1 way. # FYI we derive a privkey first, and then a pubkey from that because it's # likely faster than deriving a pubkey directly from the base point and @@ -704,7 +694,7 @@ def return_verified_password_or_false(self, mnemonic_ids_list): # Compute the hash160 of the *uncompressed* public key, and check for a match - if ripemd160(l_sha256(d_pubkey).digest()) in self._known_hash160s: + if ripemd160(l_sha256(d_pubkey).digest()) in l_known_hash160s: return mnemonic_ids, count # found it return False, count @@ -936,18 +926,15 @@ def mn_mod(a, b): @staticmethod def words_to_bytes(words): - byte_array = [] - for word in words: - byte_array.extend(word.to_bytes(4, byteorder='big', signed=False)) + byte_array = bytearray(len(words) * 4) + for i, word in enumerate(words): + byte_array[i*4:(i+1)*4] = word.to_bytes(4, byteorder='big', signed=False) return byte_array @staticmethod def bytes_to_words(byte_array): - words = [] - for i in range(0, len(byte_array), 4): - word = int.from_bytes(byte_array[i:i+4], byteorder='big', signed=False) - words.append(word) - return words + return [int.from_bytes(byte_array[i:i+4], byteorder='big', signed=False) + for i in range(0, len(byte_array), 4)] @staticmethod def bytes_to_string(byte_array): @@ -1619,6 +1606,12 @@ def return_verified_password_or_false(self, mnemonic_ids_list): # This is the time-consuming function executed by worker thread(s). It returns a tuple: if a mnemonic # is correct return it, else return False for item 0; return a count of mnemonics checked for item 1 def _return_verified_password_or_false_cpu(self, mnemonic_ids_list): + # Cache the hmac key and type check outside the loop + l_hmac_new = hmac.new + l_hashlib_sha512 = hashlib.sha512 + bitcoin_seed_key = b"Bitcoin seed" + is_xlm = type(self) is WalletXLM + for count, mnemonic_ids in enumerate(mnemonic_ids_list, 1): if self.pre_start_benchmark or (not self._checksum_in_generator and not self._skip_worker_checksum): @@ -1635,8 +1628,8 @@ def _return_verified_password_or_false_cpu(self, mnemonic_ids_list): _derive_seed_list = self._derive_seed(mnemonic_ids) for derived_seed, salt in _derive_seed_list: - if type(self) is not WalletXLM: - seed_bytes = hmac.new("Bitcoin seed".encode('utf-8'), derived_seed, hashlib.sha512).digest() + if not is_xlm: + seed_bytes = l_hmac_new(bitcoin_seed_key, derived_seed, l_hashlib_sha512).digest() else: seed_bytes = derived_seed @@ -1647,26 +1640,29 @@ def _return_verified_password_or_false_cpu(self, mnemonic_ids_list): def _return_verified_password_or_false_opencl(self, mnemonic_ids_list): cleaned_mnemonic_ids_list = [] + is_electrum2 = type(self) is WalletElectrum2 + is_xlm = type(self) is WalletXLM + l_hmac_new = hmac.new + l_hashlib_sha512 = hashlib.sha512 + bitcoin_seed_key = b"Bitcoin seed" for mnemonic in mnemonic_ids_list: if not self._checksum_in_generator and not self._skip_worker_checksum: if self._verify_checksum(mnemonic): - if (type(self) is WalletElectrum2): + if is_electrum2: cleaned_mnemonic_ids_list.append(self._space.join(mnemonic).encode()) else: cleaned_mnemonic_ids_list.append(" ".join(mnemonic).encode()) else: - if type(self) is WalletElectrum2: + if is_electrum2: cleaned_mnemonic_ids_list.append(self._space.join(mnemonic).encode()) else: cleaned_mnemonic_ids_list.append(" ".join(mnemonic).encode()) + salt_prefix = b"electrum" if is_electrum2 else b"mnemonic" for i, salt in enumerate(self._derivation_salts,0): - if type(self) is WalletElectrum2: - salt = b"electrum" + salt - else: - salt = b"mnemonic" + salt + salt = salt_prefix + salt clResult = self.opencl_algo.cl_pbkdf2(self.opencl_context_pbkdf2_sha512[i], cleaned_mnemonic_ids_list, salt, 2048, 64) @@ -1674,8 +1670,8 @@ def _return_verified_password_or_false_opencl(self, mnemonic_ids_list): results = zip(cleaned_mnemonic_ids_list,clResult) for cleaned_mnemonic, derived_seed in results: - if type(self) is not WalletXLM: - seed_bytes = hmac.new("Bitcoin seed".encode('utf-8'), derived_seed, hashlib.sha512).digest() + if not is_xlm: + seed_bytes = l_hmac_new(bitcoin_seed_key, derived_seed, l_hashlib_sha512).digest() else: seed_bytes = derived_seed