|
10 | 10 | from datetime import datetime, timedelta, timezone |
11 | 11 | from pathlib import Path |
12 | 12 |
|
13 | | -from construct import Container, ChecksumError, CheckError |
| 13 | +from construct import Container, ListContainer, ChecksumError, CheckError |
14 | 14 |
|
15 | 15 | from lxml import etree |
16 | 16 | from lxml.builder import E |
|
30 | 30 |
|
31 | 31 | logger = logging.getLogger(__name__) |
32 | 32 |
|
33 | | -BLANK_DATABASE_FILENAME = "blank_database.kdbx" |
34 | | -BLANK_DATABASE_LOCATION = os.path.join(os.path.dirname(os.path.realpath(__file__)), BLANK_DATABASE_FILENAME) |
35 | | -BLANK_DATABASE_PASSWORD = "password" |
36 | 33 |
|
37 | 34 | class PyKeePass: |
38 | 35 | """Open a KeePass database |
@@ -211,6 +208,58 @@ def kdf_algorithm(self): |
211 | 208 | elif kdf_parameters['$UUID'].value == kdf_uuids['aeskdf']: |
212 | 209 | return 'aeskdf' |
213 | 210 |
|
| 211 | + def _get_kdf_parameters(self): |
| 212 | + """Get KDF parameters dict, raising error if not KDBX4 with Argon2.""" |
| 213 | + if self.version != (4, 0): |
| 214 | + raise ValueError("KDF parameters only available for KDBX4 databases") |
| 215 | + kdf_parameters = self.kdbx.header.value.dynamic_header.kdf_parameters.data.dict |
| 216 | + if kdf_parameters['$UUID'].value not in (kdf_uuids['argon2'], kdf_uuids['argon2id']): |
| 217 | + raise ValueError("KDF parameters only available for Argon2/Argon2id databases") |
| 218 | + return kdf_parameters |
| 219 | + |
| 220 | + def _invalidate_header_cache(self): |
| 221 | + """Invalidate cached header bytes to force rebuild on save. |
| 222 | +
|
| 223 | + The KDBX header is wrapped in construct's RawCopy which caches the |
| 224 | + original parsed bytes. When modifying header values (like KDF parameters), |
| 225 | + we must delete this cache to ensure the modified values are serialized. |
| 226 | + """ |
| 227 | + if hasattr(self.kdbx.header, 'data'): |
| 228 | + del self.kdbx.header.data |
| 229 | + # Also clear cached transformed key since KDF params affect it |
| 230 | + self.kdbx.body.transformed_key = None |
| 231 | + |
| 232 | + @property |
| 233 | + def argon2_iterations(self): |
| 234 | + """`int`: Argon2 time cost (iterations). Only available for KDBX4 with Argon2.""" |
| 235 | + return self._get_kdf_parameters()['I'].value |
| 236 | + |
| 237 | + @argon2_iterations.setter |
| 238 | + def argon2_iterations(self, value): |
| 239 | + self._get_kdf_parameters()['I'].value = int(value) |
| 240 | + self._invalidate_header_cache() |
| 241 | + |
| 242 | + @property |
| 243 | + def argon2_memory(self): |
| 244 | + """`int`: Argon2 memory cost in kibibytes (KiB). Only available for KDBX4 with Argon2.""" |
| 245 | + return self._get_kdf_parameters()['M'].value // 1024 |
| 246 | + |
| 247 | + @argon2_memory.setter |
| 248 | + def argon2_memory(self, value): |
| 249 | + """Set Argon2 memory cost in kibibytes (KiB).""" |
| 250 | + self._get_kdf_parameters()['M'].value = int(value) * 1024 |
| 251 | + self._invalidate_header_cache() |
| 252 | + |
| 253 | + @property |
| 254 | + def argon2_parallelism(self): |
| 255 | + """`int`: Argon2 parallelism (threads). Only available for KDBX4 with Argon2.""" |
| 256 | + return self._get_kdf_parameters()['P'].value |
| 257 | + |
| 258 | + @argon2_parallelism.setter |
| 259 | + def argon2_parallelism(self, value): |
| 260 | + self._get_kdf_parameters()['P'].value = int(value) |
| 261 | + self._invalidate_header_cache() |
| 262 | + |
214 | 263 | @property |
215 | 264 | def transformed_key(self): |
216 | 265 | """`bytes`: transformed key used in database decryption. May be cached |
@@ -1029,35 +1078,254 @@ def _decode_time(self, text): |
1029 | 1078 | else: |
1030 | 1079 | return datetime.fromisoformat(text.replace('Z','+00:00')).replace(tzinfo=timezone.utc) |
1031 | 1080 |
|
| 1081 | +def _build_kdbx4_structure( |
| 1082 | + encryption='aes256', |
| 1083 | + kdf='argon2id', |
| 1084 | + argon2_iterations=19, |
| 1085 | + argon2_memory=65536, |
| 1086 | + argon2_parallelism=2, |
| 1087 | +): |
| 1088 | + """Build a KDBX4 Container structure from scratch. |
| 1089 | +
|
| 1090 | + Args: |
| 1091 | + encryption (`str`): encryption algorithm ('aes256', 'chacha20', 'twofish') |
| 1092 | + kdf (`str`): key derivation function ('argon2id', 'argon2') |
| 1093 | + argon2_iterations (`int`): Argon2 time cost (iterations) |
| 1094 | + argon2_memory (`int`): Argon2 memory cost in kibibytes (KiB) |
| 1095 | + argon2_parallelism (`int`): Argon2 parallelism (threads) |
| 1096 | +
|
| 1097 | + Returns: |
| 1098 | + `Container`: KDBX4 structure ready for building |
| 1099 | + """ |
| 1100 | + # Validate parameters |
| 1101 | + if encryption not in ('aes256', 'chacha20', 'twofish'): |
| 1102 | + raise ValueError(f"Unsupported encryption: {encryption}") |
| 1103 | + if kdf not in ('argon2id', 'argon2'): |
| 1104 | + raise ValueError(f"Unsupported KDF: {kdf}") |
| 1105 | + |
| 1106 | + # Generate random security values |
| 1107 | + master_seed = os.urandom(32) |
| 1108 | + kdf_salt = os.urandom(32) |
| 1109 | + protected_stream_key = os.urandom(64) |
| 1110 | + root_group_uuid = uuid.uuid4().bytes |
| 1111 | + |
| 1112 | + # IV size depends on cipher |
| 1113 | + if encryption == 'chacha20': |
| 1114 | + encryption_iv = os.urandom(12) |
| 1115 | + else: # aes256, twofish |
| 1116 | + encryption_iv = os.urandom(16) |
| 1117 | + |
| 1118 | + # Helper for KeePass time encoding (base64-encoded binary timestamp) |
| 1119 | + def encode_time(): |
| 1120 | + # KeePass uses seconds since 0001-01-01 00:00:00 UTC, stored as 8-byte LE int |
| 1121 | + # For simplicity, encode current time |
| 1122 | + epoch = datetime(1, 1, 1, tzinfo=timezone.utc) |
| 1123 | + now = datetime.now(timezone.utc) |
| 1124 | + seconds = int((now - epoch).total_seconds()) |
| 1125 | + return base64.b64encode(struct.pack('<Q', seconds)).decode('ascii') |
| 1126 | + |
| 1127 | + def encode_uuid(u): |
| 1128 | + return base64.b64encode(u).decode('ascii') |
| 1129 | + |
| 1130 | + null_uuid = b'\x00' * 16 |
| 1131 | + |
| 1132 | + # Build the XML structure |
| 1133 | + xml_root = E.KeePassFile( |
| 1134 | + E.Meta( |
| 1135 | + E.Generator("pykeepass"), |
| 1136 | + E.DatabaseName("Database"), |
| 1137 | + E.DatabaseNameChanged(encode_time()), |
| 1138 | + E.DatabaseDescription(""), |
| 1139 | + E.DatabaseDescriptionChanged(encode_time()), |
| 1140 | + E.DefaultUserName(""), |
| 1141 | + E.DefaultUserNameChanged(encode_time()), |
| 1142 | + E.MaintenanceHistoryDays("365"), |
| 1143 | + E.Color(""), |
| 1144 | + E.MasterKeyChanged(encode_time()), |
| 1145 | + E.MasterKeyChangeRec("-1"), |
| 1146 | + E.MasterKeyChangeForce("-1"), |
| 1147 | + E.MemoryProtection( |
| 1148 | + E.ProtectTitle("False"), |
| 1149 | + E.ProtectUserName("False"), |
| 1150 | + E.ProtectPassword("True"), |
| 1151 | + E.ProtectURL("False"), |
| 1152 | + E.ProtectNotes("False"), |
| 1153 | + ), |
| 1154 | + E.CustomIcons(), |
| 1155 | + E.RecycleBinEnabled("True"), |
| 1156 | + E.RecycleBinUUID(encode_uuid(null_uuid)), |
| 1157 | + E.RecycleBinChanged(encode_time()), |
| 1158 | + E.EntryTemplatesGroup(encode_uuid(null_uuid)), |
| 1159 | + E.EntryTemplatesGroupChanged(encode_time()), |
| 1160 | + E.LastSelectedGroup(encode_uuid(null_uuid)), |
| 1161 | + E.LastTopVisibleGroup(encode_uuid(null_uuid)), |
| 1162 | + E.HistoryMaxItems("10"), |
| 1163 | + E.HistoryMaxSize("6291456"), |
| 1164 | + E.SettingsChanged(encode_time()), |
| 1165 | + E.CustomData(), |
| 1166 | + ), |
| 1167 | + E.Root( |
| 1168 | + E.Group( |
| 1169 | + E.UUID(encode_uuid(root_group_uuid)), |
| 1170 | + E.Name("Root"), |
| 1171 | + E.Notes(""), |
| 1172 | + E.IconID("48"), |
| 1173 | + E.Times( |
| 1174 | + E.LastModificationTime(encode_time()), |
| 1175 | + E.CreationTime(encode_time()), |
| 1176 | + E.LastAccessTime(encode_time()), |
| 1177 | + E.ExpiryTime(encode_time()), |
| 1178 | + E.Expires("False"), |
| 1179 | + E.UsageCount("0"), |
| 1180 | + E.LocationChanged(encode_time()), |
| 1181 | + ), |
| 1182 | + E.IsExpanded("True"), |
| 1183 | + E.DefaultAutoTypeSequence(""), |
| 1184 | + E.EnableAutoType("null"), |
| 1185 | + E.EnableSearching("null"), |
| 1186 | + E.LastTopVisibleEntry(encode_uuid(null_uuid)), |
| 1187 | + ), |
| 1188 | + E.DeletedObjects(), |
| 1189 | + ), |
| 1190 | + ) |
| 1191 | + xml_tree = etree.ElementTree(xml_root) |
| 1192 | + |
| 1193 | + # Build KDF parameters (VariantDictionary format) |
| 1194 | + kdf_uuid = kdf_uuids['argon2id'] if kdf == 'argon2id' else kdf_uuids['argon2'] |
| 1195 | + kdf_params = Container( |
| 1196 | + version=b'\x00\x01', |
| 1197 | + dict={ |
| 1198 | + '$UUID': Container(type=0x42, key='$UUID', value=kdf_uuid, next_byte=0x05), |
| 1199 | + 'I': Container(type=0x05, key='I', value=argon2_iterations, next_byte=0x05), |
| 1200 | + 'M': Container(type=0x05, key='M', value=argon2_memory * 1024, next_byte=0x04), |
| 1201 | + 'P': Container(type=0x04, key='P', value=argon2_parallelism, next_byte=0x42), |
| 1202 | + 'S': Container(type=0x42, key='S', value=kdf_salt, next_byte=0x04), |
| 1203 | + 'V': Container(type=0x04, key='V', value=0x13, next_byte=0x00), # Argon2 version 19 |
| 1204 | + } |
| 1205 | + ) |
| 1206 | + |
| 1207 | + # Build the complete KDBX structure |
| 1208 | + kdbx = Container( |
| 1209 | + header=Container( |
| 1210 | + # No 'data' attribute - will be built from value |
| 1211 | + value=Container( |
| 1212 | + sig1=b'\x03\xd9\xa2\x9a', |
| 1213 | + sig2=b'\x67\xfb\x4b\xb5', |
| 1214 | + sig_check=True, |
| 1215 | + minor_version=0, |
| 1216 | + major_version=4, |
| 1217 | + dynamic_header=Container({ |
| 1218 | + 'cipher_id': Container(id='cipher_id', data=encryption), |
| 1219 | + 'compression_flags': Container(id='compression_flags', data=Container(compression=True)), |
| 1220 | + 'master_seed': Container(id='master_seed', data=master_seed), |
| 1221 | + 'encryption_iv': Container(id='encryption_iv', data=encryption_iv), |
| 1222 | + 'kdf_parameters': Container(id='kdf_parameters', data=kdf_params), |
| 1223 | + 'end': Container(id='end', data=b''), |
| 1224 | + }), |
| 1225 | + ), |
| 1226 | + ), |
| 1227 | + body=Container( |
| 1228 | + transformed_key=None, # Computed during build |
| 1229 | + master_key=None, # Computed during build |
| 1230 | + sha256=None, # Computed during build |
| 1231 | + cred_check=None, # Computed during build |
| 1232 | + payload=Container( |
| 1233 | + inner_header=Container({ |
| 1234 | + 'protected_stream_id': Container(type='protected_stream_id', data='chacha20'), |
| 1235 | + 'protected_stream_key': Container(type='protected_stream_key', data=protected_stream_key), |
| 1236 | + 'binary': ListContainer([]), |
| 1237 | + 'end': Container(type='end', data=b''), |
| 1238 | + }), |
| 1239 | + xml=xml_tree, |
| 1240 | + ), |
| 1241 | + ), |
| 1242 | + ) |
| 1243 | + |
| 1244 | + return kdbx |
| 1245 | + |
| 1246 | + |
1032 | 1247 | def create_database( |
1033 | | - filename, password=None, keyfile=None, transformed_key=None |
| 1248 | + filename, |
| 1249 | + password=None, |
| 1250 | + keyfile=None, |
| 1251 | + transformed_key=None, |
| 1252 | + encryption='aes256', |
| 1253 | + kdf='argon2id', |
| 1254 | + argon2_iterations=19, |
| 1255 | + argon2_memory=65536, |
| 1256 | + argon2_parallelism=2, |
1034 | 1257 | ): |
1035 | 1258 | """ |
1036 | | - Create a new database at ``filename`` with supplied credentials. |
| 1259 | + Create a new KDBX4 database at ``filename`` with supplied credentials. |
1037 | 1260 |
|
1038 | 1261 | Args: |
1039 | | - filename (`str`, optional): path to database or stream object. |
1040 | | - If None, the path given when the database was opened is used. |
1041 | | - password (`str`, optional): database password. If None, |
| 1262 | + filename (`str`): path to database or stream object |
| 1263 | + password (`str`, optional): database password. If None, |
1042 | 1264 | database is assumed to have no password |
1043 | | - keyfile (`str`, optional): path to keyfile. If None, |
| 1265 | + keyfile (`str`, optional): path to keyfile. If None, |
1044 | 1266 | database is assumed to have no keyfile |
1045 | | - transformed_key (`bytes`, optional): precomputed transformed |
1046 | | - key. |
| 1267 | + transformed_key (`bytes`, optional): precomputed transformed key |
| 1268 | + encryption (`str`): encryption algorithm. One of 'aes256' (default), |
| 1269 | + 'chacha20', or 'twofish' |
| 1270 | + kdf (`str`): key derivation function. One of 'argon2id' (default) |
| 1271 | + or 'argon2' |
| 1272 | + argon2_iterations (`int`): Argon2 time cost / iterations (default: 19) |
| 1273 | + argon2_memory (`int`): Argon2 memory cost in kibibytes (default: 65536 = 64 MiB) |
| 1274 | + argon2_parallelism (`int`): Argon2 parallelism / threads (default: 2) |
1047 | 1275 |
|
1048 | 1276 | Returns: |
1049 | | - `PyKeePass` |
| 1277 | + `PyKeePass`: The newly created database instance |
| 1278 | +
|
| 1279 | + Example: |
| 1280 | + >>> from pykeepass import create_database |
| 1281 | + >>> kp = create_database('new.kdbx', password='secret') |
| 1282 | +
|
| 1283 | + >>> # With custom KDF settings for cold storage |
| 1284 | + >>> kp = create_database( |
| 1285 | + ... 'secure.kdbx', |
| 1286 | + ... password='secret', |
| 1287 | + ... argon2_iterations=100, |
| 1288 | + ... argon2_memory=256*1024, # 256 MiB |
| 1289 | + ... argon2_parallelism=4 |
| 1290 | + ... ) |
1050 | 1291 | """ |
1051 | | - keepass_instance = PyKeePass( |
1052 | | - BLANK_DATABASE_LOCATION, BLANK_DATABASE_PASSWORD |
| 1292 | + # Build the KDBX4 structure from scratch |
| 1293 | + kdbx = _build_kdbx4_structure( |
| 1294 | + encryption=encryption, |
| 1295 | + kdf=kdf, |
| 1296 | + argon2_iterations=argon2_iterations, |
| 1297 | + argon2_memory=argon2_memory, |
| 1298 | + argon2_parallelism=argon2_parallelism, |
1053 | 1299 | ) |
1054 | 1300 |
|
1055 | | - keepass_instance.filename = filename |
1056 | | - keepass_instance.password = password |
1057 | | - keepass_instance.keyfile = keyfile |
1058 | | - |
1059 | | - keepass_instance.save(transformed_key) |
1060 | | - return keepass_instance |
| 1301 | + # Build and save to file |
| 1302 | + if hasattr(filename, "write"): |
| 1303 | + KDBX.build_stream( |
| 1304 | + kdbx, |
| 1305 | + filename, |
| 1306 | + password=password, |
| 1307 | + keyfile=keyfile, |
| 1308 | + transformed_key=transformed_key, |
| 1309 | + decrypt=True, |
| 1310 | + ) |
| 1311 | + stream_filename = filename |
| 1312 | + else: |
| 1313 | + KDBX.build_file( |
| 1314 | + kdbx, |
| 1315 | + filename, |
| 1316 | + password=password, |
| 1317 | + keyfile=keyfile, |
| 1318 | + transformed_key=transformed_key, |
| 1319 | + decrypt=True, |
| 1320 | + ) |
| 1321 | + stream_filename = None |
| 1322 | + |
| 1323 | + # Open the newly created database and return PyKeePass instance |
| 1324 | + if stream_filename: |
| 1325 | + stream_filename.seek(0) |
| 1326 | + return PyKeePass(stream_filename, password=password, keyfile=keyfile) |
| 1327 | + else: |
| 1328 | + return PyKeePass(filename, password=password, keyfile=keyfile) |
1061 | 1329 |
|
1062 | 1330 | def debug_setup(): |
1063 | 1331 | """Convenience function to quickly enable debug messages""" |
|
0 commit comments