From 77e5ecc5f482628ac3675a3fbfe0a7603b6dfecd Mon Sep 17 00:00:00 2001 From: Isaiah Inuwa Date: Wed, 6 Aug 2025 09:47:48 -0500 Subject: [PATCH] Apply lints to Python script --- webext/app/credential_manager_shim.py | 199 +++++++++++++++----------- 1 file changed, 113 insertions(+), 86 deletions(-) diff --git a/webext/app/credential_manager_shim.py b/webext/app/credential_manager_shim.py index c20b6235..2840521f 100755 --- a/webext/app/credential_manager_shim.py +++ b/webext/app/credential_manager_shim.py @@ -14,7 +14,10 @@ from dbus_next.aio import MessageBus from dbus_next import Variant -logging.basicConfig(filename='/tmp/credential_manager_shim.log', encoding='utf-8', level=logging.DEBUG) +logging.basicConfig( + filename="/tmp/credential_manager_shim.log", encoding="utf-8", level=logging.DEBUG +) + def getMessage(): logging.debug("blocking on read") @@ -24,51 +27,55 @@ def getMessage(): sys.exit(0) try: logging.debug(f"unpacking struct: {rawLength}") - messageLength = struct.unpack('@I', rawLength)[0] + messageLength = struct.unpack("@I", rawLength)[0] logging.debug(f"reading {messageLength} bytes") except Exception as e: logging.error("Failed to convert rawLength to integer", exc_info=e) try: raw_msg = sys.stdin.buffer.read(messageLength) logging.debug(f"received bytes: {raw_msg}") - message = raw_msg.decode('utf-8') + message = raw_msg.decode("utf-8") logging.debug("received " + message) return json.loads(message) except Exception as e: logging.error("Failed to read message") raise e + # Encode a message for transmission, # given its content. def encodeMessage(messageContent): - encodedContent = json.dumps(messageContent).encode('utf-8') - encodedLength = struct.pack('@I', len(encodedContent)) - return {'length': encodedLength, 'content': encodedContent} + encodedContent = json.dumps(messageContent).encode("utf-8") + encodedLength = struct.pack("@I", len(encodedContent)) + return {"length": encodedLength, "content": encodedContent} + # Send an encoded message to stdout def sendMessage(encodedMessage): - sys.stdout.buffer.write(encodedMessage['length']) - sys.stdout.buffer.write(encodedMessage['content']) + sys.stdout.buffer.write(encodedMessage["length"]) + sys.stdout.buffer.write(encodedMessage["content"]) sys.stdout.buffer.flush() logging.debug(f"sent message: {encodedMessage}") def b64_encode(data: bytes) -> str: - return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii') + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") + def b64_decode(s) -> bytes: - padding = '=' * (len(s) % 4) + padding = "=" * (len(s) % 4) return base64.urlsafe_b64decode(s + padding) + class MajorType(Enum): - PositiveInteger = 0, - NegativeInteger = 1, - ByteString = 2, - TextString = 3, - Array = 4, - Map = 5, - Tag = 6, - SimpleOrFloat = 7, + PositiveInteger = (0,) + NegativeInteger = (1,) + ByteString = (2,) + TextString = (3,) + Array = (4,) + Map = (5,) + Tag = (6,) + SimpleOrFloat = (7,) class CborParser: @@ -89,16 +96,16 @@ def _read_value(self, buf): argument_len = 0 elif additional_info == 24: argument_len = 1 - argument = struct.unpack('>B', buf[1:1+argument_len])[0] + argument = struct.unpack(">B", buf[1 : 1 + argument_len])[0] elif additional_info == 25: argument_len = 2 - argument = struct.unpack('>H', buf[1:1+argument_len])[0] + argument = struct.unpack(">H", buf[1 : 1 + argument_len])[0] elif additional_info == 26: argument_len = 4 - argument = struct.unpack('>I', buf[1:1+argument_len])[0] + argument = struct.unpack(">I", buf[1 : 1 + argument_len])[0] elif additional_info == 27: argument_len = 8 - argument = struct.unpack('>Q', buf[1:1+argument_len])[0] + argument = struct.unpack(">Q", buf[1 : 1 + argument_len])[0] elif additional_info == 31: # Indefinite length for types 2-5 argument = None @@ -138,12 +145,12 @@ def _read_value(self, buf): string_len = 0 # indefinite length value = "" - while self.data[self.pos] != 0xff: - val = self._read_value(self.data[self.pos:])[0] + while self.data[self.pos] != 0xFF: + val = self._read_value(self.data[self.pos :])[0] value += val string_len = 1 else: - value = self.data[self.pos:self.pos+string_len] + value = self.data[self.pos : self.pos + string_len] bytes_consumed = string_len case MajorType.TextString: @@ -151,12 +158,14 @@ def _read_value(self, buf): if string_len is None: # indefinite length value = "" - while self.data[self.pos] != 0xff: - val = self._read_value(self.data[self.pos:]) + while self.data[self.pos] != 0xFF: + val = self._read_value(self.data[self.pos :]) value += val bytes_consumed = 1 else: - value = codecs.utf_8_decode(self.data[self.pos:self.pos+string_len])[0] + value = codecs.utf_8_decode( + self.data[self.pos : self.pos + string_len] + )[0] bytes_consumed = string_len case MajorType.Map: @@ -164,15 +173,15 @@ def _read_value(self, buf): if argument is None: argument = 0 value = {} - while self.data[self.pos] != 0xff: - inner_key = self._read_value(self.data[self.pos:]) - inner_value = self._read_value(self.data[self.pos:]) + while self.data[self.pos] != 0xFF: + inner_key = self._read_value(self.data[self.pos :]) + inner_value = self._read_value(self.data[self.pos :]) value[inner_key] = inner_value bytes_consumed = 1 else: for _ in range(argument): - inner_key = self._read_value(self.data[self.pos:]) - inner_value = self._read_value(self.data[self.pos:]) + inner_key = self._read_value(self.data[self.pos :]) + inner_value = self._read_value(self.data[self.pos :]) value[inner_key] = inner_value case MajorType.Array: @@ -180,13 +189,13 @@ def _read_value(self, buf): if argument is None: argument = 0 value = [] - while self.data[self.pos] != 0xff: - inner_value = self._read_value(self.data[self.pos:]) + while self.data[self.pos] != 0xFF: + inner_value = self._read_value(self.data[self.pos :]) value.append(inner_value) bytes_consumed = 1 else: for _ in range(argument): - inner_value = self._read_value(self.data[self.pos:]) + inner_value = self._read_value(self.data[self.pos :]) value.append(inner_value) case MajorType.Tag: @@ -207,10 +216,12 @@ def _read_value(self, buf): self.pos += bytes_consumed return value + def cbor_loads(data): parser = CborParser(data) return parser.parse() + def _parse_authenticator_data(auth_data): client_rp_id_hash = auth_data[:32] @@ -223,17 +234,17 @@ def _parse_authenticator_data(auth_data): flags.add(bits[i]) flag_byte = flag_byte >> 1 - sign_count = struct.unpack('>I', auth_data[33:37])[0] + sign_count = struct.unpack(">I", auth_data[33:37])[0] - if 'AT' in flags: - aaguid = auth_data[37:37 + 16] - cred_id_length = struct.unpack('>H', auth_data[53:55])[0] - cred_id = auth_data[55:55+cred_id_length] - parser = CborParser(auth_data[55 + cred_id_length:]) + if "AT" in flags: + aaguid = auth_data[37 : 37 + 16] + cred_id_length = struct.unpack(">H", auth_data[53:55])[0] + cred_id = auth_data[55 : 55 + cred_id_length] + parser = CborParser(auth_data[55 + cred_id_length :]) _ = parser.parse() - cose_key_bytes = parser.data[:parser.pos] + cose_key_bytes = parser.data[: parser.pos] cose_key_bytes_len = len(cose_key_bytes) - assert(len(cose_key_bytes) == parser.pos) + assert len(cose_key_bytes) == parser.pos attested_cred_data_len = 55 + cred_id_length + cose_key_bytes_len else: @@ -242,8 +253,8 @@ def _parse_authenticator_data(auth_data): cred_id = None cose_key_bytes = None - if 'ED' in flags: - extensions = cbor_loads(auth_data[37 + attested_cred_data_len:]) + if "ED" in flags: + extensions = cbor_loads(auth_data[37 + attested_cred_data_len :]) else: extensions = None return AuthenticatorData( @@ -253,9 +264,10 @@ def _parse_authenticator_data(auth_data): aaguid=aaguid, cred_id=cred_id, pub_key_bytes=cose_key_bytes, - extensions=extensions + extensions=extensions, ) + @dataclass class AuthenticatorData: rp_id_hash: bytes @@ -280,27 +292,31 @@ async def create_passkey(interface, options, origin, top_origin): req_json = json.dumps(options) logging.debug(req_json) req = { - "type": Variant('s', "publicKey"), - "origin": Variant('s', origin), - "is_same_origin": Variant('b', is_same_origin), - "publicKey": Variant('a{sv}', { - "request_json": Variant('s', req_json) - }) + "type": Variant("s", "publicKey"), + "origin": Variant("s", origin), + "is_same_origin": Variant("b", is_same_origin), + "publicKey": Variant("a{sv}", {"request_json": Variant("s", req_json)}), } logging.debug("Sending request to D-Bus API") rsp = await interface.call_create_credential(req) - if rsp['type'].value != 'public-key': - raise Exception(f"Invalid credential type received: expected 'public-key', received {rsp['type'.value]}") - response_json = json.loads(rsp['public_key'].value['registration_response_json'].value) - attestation = cbor_loads(b64_decode(response_json['response']['attestationObject'])) + if rsp["type"].value != "public-key": + raise Exception( + f"Invalid credential type received: expected 'public-key', received {rsp['type'.value]}" + ) + response_json = json.loads( + rsp["public_key"].value["registration_response_json"].value + ) + attestation = cbor_loads(b64_decode(response_json["response"]["attestationObject"])) auth_data_view = attestation["authData"] - response_json['response']['authenticatorData'] = b64_encode(auth_data_view) + response_json["response"]["authenticatorData"] = b64_encode(auth_data_view) auth_data = _parse_authenticator_data(auth_data_view) if auth_data.pub_key_bytes: # TODO: format this as SubjectPublicKeyInfo -_- - response_json['response']['publicKey'] = b64_encode(auth_data.pub_key_bytes) + response_json["response"]["publicKey"] = b64_encode(auth_data.pub_key_bytes) COSE_ALG = 3 - response_json['response']['publicKeyAlgorithm'] = auth_data.get_pub_key()[COSE_ALG] + response_json["response"]["publicKeyAlgorithm"] = auth_data.get_pub_key()[ + COSE_ALG + ] return response_json @@ -310,20 +326,22 @@ async def get_passkey(interface, options, origin, top_origin): req_json = json.dumps(options) logging.debug(req_json) req = { - "type": Variant('s', "publicKey"), - "origin": Variant('s', origin), - "is_same_origin": Variant('b', is_same_origin), - "publicKey": Variant('a{sv}', { - "request_json": Variant('s', req_json) - }) + "type": Variant("s", "publicKey"), + "origin": Variant("s", origin), + "is_same_origin": Variant("b", is_same_origin), + "publicKey": Variant("a{sv}", {"request_json": Variant("s", req_json)}), } logging.debug("Sending request to D-Bus API") rsp = await interface.call_get_credential(req) - if rsp['type'].value != 'public-key': - raise Exception(f"Invalid credential type received: expected 'public-key', received {rsp['type'.value]}") + if rsp["type"].value != "public-key": + raise Exception( + f"Invalid credential type received: expected 'public-key', received {rsp['type'.value]}" + ) - response_json = json.loads(rsp['public_key'].value['authentication_response_json'].value) + response_json = json.loads( + rsp["public_key"].value["authentication_response_json"].value + ) return response_json @@ -332,6 +350,7 @@ async def run(cmd, options, origin, top_origin): bus = await MessageBus().connect() logging.debug("Connected to bus") import os + logging.info(os.getcwd()) with open('../../contrib/xyz.iinuwa.credentials.CredentialManager.xml', 'r') as f: @@ -346,17 +365,25 @@ async def run(cmd, options, origin, top_origin): interface = proxy_object.get_interface("xyz.iinuwa.credentials.Credentials1") logging.debug(f"Connected to interface at {interface.path}") - if cmd == 'create': - if 'publicKey' in options: - return await create_passkey(interface, options['publicKey'], origin, top_origin) + if cmd == "create": + if "publicKey" in options: + return await create_passkey( + interface, options["publicKey"], origin, top_origin + ) else: - raise Exception(f"Could not create unknown credential type: {options.keys()[0]}") - elif cmd == 'get': - if 'publicKey' in options: - return await get_passkey(interface, options['publicKey'], origin, top_origin) + raise Exception( + f"Could not create unknown credential type: {options.keys()[0]}" + ) + elif cmd == "get": + if "publicKey" in options: + return await get_passkey( + interface, options["publicKey"], origin, top_origin + ) else: - raise Exception(f"Could not get unknown credential type: {options.keys()[0]}") - elif cmd == 'getClientCapabilities': + raise Exception( + f"Could not get unknown credential type: {options.keys()[0]}" + ) + elif cmd == "getClientCapabilities": rsp = await interface.call_get_client_capabilities() response = {} for name, val in rsp.items(): @@ -370,19 +397,19 @@ async def run(cmd, options, origin, top_origin): while True: logging.debug("starting event loop message") receivedMessage = getMessage() - request_id = receivedMessage['requestId'] + request_id = receivedMessage["requestId"] try: - cmd = receivedMessage['cmd'] + cmd = receivedMessage["cmd"] options = None - if 'options' in receivedMessage: - options = receivedMessage['options'] - origin = receivedMessage['origin'] - top_origin = receivedMessage['topOrigin'] + if "options" in receivedMessage: + options = receivedMessage["options"] + origin = receivedMessage["origin"] + top_origin = receivedMessage["topOrigin"] loop = asyncio.get_event_loop() auth_data = loop.run_until_complete(run(cmd, options, origin, top_origin)) - sendMessage(encodeMessage({ "requestId": request_id, "data": auth_data })) + sendMessage(encodeMessage({"requestId": request_id, "data": auth_data})) except Exception as e: logging.error("Failed to send message", exc_info=e) - sendMessage(encodeMessage({ "requestId": request_id, "error": str(e) })) + sendMessage(encodeMessage({"requestId": request_id, "error": str(e)})) logging.debug("Sent error message")