-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathwebsocket_envelope_reader.py
More file actions
51 lines (43 loc) · 1.52 KB
/
websocket_envelope_reader.py
File metadata and controls
51 lines (43 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import ssl
from typing import Callable, Tuple
import websocket
class WebsocketFrameReader(object):
def __init__(
self,
url,
access_token_provider: Callable[[], str],
verify_ssl: bool = True,
proxy_host: str | None = None,
proxy_port: int | None = None,
proxy_auth: Tuple[str, str] | None = None,
):
if not verify_ssl:
self._ws = websocket.WebSocket(sslopt=dict(cert_reqs=ssl.CERT_NONE))
else:
self._ws = websocket.WebSocket()
self._url = url
self._proxy_host = proxy_host
self._proxy_port = proxy_port
self._proxy_auth = proxy_auth
self._access_token_provider = access_token_provider
def connect(self):
kw_args = dict(header=dict(Authorization="Bearer %s" % self._access_token_provider()))
if self._proxy_host is not None and self._proxy_port is not None:
kw_args["http_proxy_host"] = self._proxy_host
kw_args["http_proxy_port"] = str(self._proxy_port)
kw_args["http_proxy_auth"] = self._proxy_auth
self._ws.connect(self._url, **kw_args)
def close(self):
if self._ws.connected:
self._ws.close()
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __iter__(self):
try:
for frame in self._ws:
yield frame
except websocket.WebSocketConnectionClosedException:
pass