Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 21 additions & 25 deletions qubesusbproxy/core3ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,27 +426,32 @@ def _load_usb_known_devices(cls) -> Dict[str, Dict[str, Tuple[str, str]]]:
vendor_name: Optional[str] = None
for line in usb_ids.readlines():
line = line.rstrip()
if line.startswith("#"):
# skip comments
continue
if not line:
# skip empty lines
continue
if line.startswith("\t\t"):
# skip interfaces
first_char = line[0]
if first_char == "#":
# skip comments
continue
if line.startswith("C "):
if first_char == "C" and len(line) >= 2 and line[1] == " ":
# description of classes starts here, we can finish
break
if line.startswith("\t"):
if first_char == "\t":
if len(line) >= 2 and line[1] == "\t":
# skip interfaces
continue
# device line
parts = line[1:].split(" ", 2)
if len(parts) < 3:
continue
# save vendor, device pair
device_id, _, device_name = line[1:].split(" ", 2)
device_id, _, device_name = parts
if vendor_id is None or vendor_name is None:
continue
result[vendor_id][device_id] = vendor_name, device_name
result[vendor_id][device_id] = (vendor_name, device_name)
else:
# new vendor
vendor_id, _, vendor_name = line[:].split(" ", 2)
parts = line.split(" ", 2)
vendor_id, _, vendor_name = parts
result[vendor_id] = {}

cls._usb_known_devices = result
Expand Down Expand Up @@ -648,26 +653,17 @@ def on_device_get_usb(self, vm, event, port_id):
):
yield USBDevice(Port(vm, port_id, "usb"))

@staticmethod
def get_all_devices(app):
for vm in app.domains:
if not vm.is_running() or not hasattr(vm, "devices"):
continue

for dev in vm.devices["usb"]:
# there may be more than one USB-passthrough implementation
if isinstance(dev, USBDevice):
yield dev

@qubes.ext.handler("device-list-attached:usb")
def on_device_list_attached(self, vm, event, **kwargs):
# pylint: disable=unused-argument
if not vm.is_running():
return

for dev in self.get_all_devices(vm.app):
if dev.attachment == vm:
yield (dev, {})
for ports in self.devices_cache.values():
for port_id, attachment in ports.items():
if attachment == vm:
dev = USBDevice(Port(vm, port_id, "usb"))
yield (dev, {})

@qubes.ext.handler("device-pre-attach:usb")
async def on_device_attach_usb(self, vm, event, device, options):
Expand Down