|
1 | | -import threading |
2 | | -import usb |
3 | | - |
4 | | - |
5 | 1 | ############################################################################### |
6 | 2 | # driver contracts |
7 | 3 | ############################################################################### |
@@ -124,113 +120,3 @@ def listen(self): |
124 | 120 | """ |
125 | 121 | raise NotImplementedError( |
126 | 122 | 'Abstract ``Listener`` does not implement ``listen``') |
127 | | - |
128 | | - |
129 | | -############################################################################### |
130 | | -# USB backends |
131 | | -############################################################################### |
132 | | - |
133 | | -class USBBackend(Backend): |
134 | | - """Backend implementation for communicating with USB devices. |
135 | | - """ |
136 | | - |
137 | | - def __init__(self, vendor, product, bus=None, |
138 | | - port_numbers=None, interface=0): |
139 | | - self._device = None |
140 | | - # lookup devices by vendor and product |
141 | | - devices = [dev for dev in usb.core.find( |
142 | | - find_all=True, |
143 | | - idVendor=vendor, |
144 | | - idProduct=product |
145 | | - )] |
146 | | - # use first device if bus or port_numers not defined |
147 | | - if bus is None or port_numbers is None: |
148 | | - self._device = devices[0] |
149 | | - else: |
150 | | - for dev in devices: |
151 | | - if dev.bus == bus and dev.port_numbers == port_numbers: |
152 | | - self._device = dev |
153 | | - break |
154 | | - # if queried device not found, raise |
155 | | - if not self._device: |
156 | | - raise usb.core.USBError('Device not found') |
157 | | - # detach kernel driver if necessary |
158 | | - if self._device.is_kernel_driver_active(interface) is True: |
159 | | - self._device.detach_kernel_driver(interface) |
160 | | - # set device configuration |
161 | | - self._device.set_configuration() |
162 | | - # claim interface |
163 | | - usb.util.claim_interface(self._device, interface) |
164 | | - # get active configuration |
165 | | - cfg = self._device.get_active_configuration() |
166 | | - intf = cfg[(0, 0)] |
167 | | - # get write end point |
168 | | - def match_ep_out(e): |
169 | | - return usb.util.endpoint_direction(e.bEndpointAddress) == \ |
170 | | - usb.util.ENDPOINT_OUT |
171 | | - self._ep_write = usb.util.find_descriptor( |
172 | | - intf, custom_match=match_ep_out) |
173 | | - # get read end point |
174 | | - def match_ep_in(e): |
175 | | - return usb.util.endpoint_direction(e.bEndpointAddress) == \ |
176 | | - usb.util.ENDPOINT_IN |
177 | | - self._ep_read = usb.util.find_descriptor( |
178 | | - intf, custom_match=match_ep_in) |
179 | | - |
180 | | - def read(self, timeout=None): |
181 | | - """Read data from USB device. |
182 | | - """ |
183 | | - return self._ep_read.read(self._ep_read.wMaxPacketSize, timeout=timeout) |
184 | | - |
185 | | - def write(self, data): |
186 | | - """Write data to USB device. |
187 | | - """ |
188 | | - return self._ep_write.write(data) |
189 | | - |
190 | | - def close(self): |
191 | | - """Close connection to USB device. |
192 | | - """ |
193 | | - usb.util.dispose_resources(self._device) |
194 | | - |
195 | | - |
196 | | -class USBListener(USBBackend, Listener): |
197 | | - """Listener implementation for communicating with USB devices. |
198 | | - """ |
199 | | - |
200 | | - def __init__(self, driver, vendor, product, bus=None, |
201 | | - port_numbers=None, interface=0): |
202 | | - super(USBListener, self).__init__( |
203 | | - vendor, |
204 | | - product, |
205 | | - bus=bus, |
206 | | - port_numbers=port_numbers, |
207 | | - interface=interface |
208 | | - ) |
209 | | - self.driver = driver |
210 | | - # flag whether actually disconnecting from device |
211 | | - self._disconnecting = False |
212 | | - # event to stop listening |
213 | | - self._stop_listening = threading.Event() |
214 | | - # create and start listener thread |
215 | | - self._listener = threading.Thread(target=self.listen) |
216 | | - self._listener.start() |
217 | | - |
218 | | - def listen(self): |
219 | | - """Poll data from USB device. |
220 | | - """ |
221 | | - while not self._stop_listening.is_set(): |
222 | | - try: |
223 | | - self.driver.receive(self.read()) |
224 | | - except usb.core.USBError as e: |
225 | | - # read timeout |
226 | | - if e.errno == 110: |
227 | | - continue |
228 | | - if not self._disconnecting: |
229 | | - self.close() |
230 | | - |
231 | | - def close(self): |
232 | | - """Close connection to USB device. |
233 | | - """ |
234 | | - self._disconnecting = True |
235 | | - self._stop_listening.set() |
236 | | - super(USBListener, self).close() |
0 commit comments