2424proxy_connect = None
2525try :
2626 from websockets_proxy import Proxy as w_Proxy , proxy_connect as w_proxy_connect
27+
2728 Proxy = w_Proxy
2829 proxy_connect = w_proxy_connect
2930except ImportError :
3031 pass
3132
3233import websockets as ws
3334
34- from binance .exceptions import BinanceWebsocketUnableToConnect
35+ from binance .exceptions import (
36+ BinanceWebsocketClosed ,
37+ BinanceWebsocketUnableToConnect ,
38+ BinanceWebsocketQueueOverflow ,
39+ )
3540from binance .helpers import get_loop
3641from binance .ws .constants import WSListenerState
3742
@@ -89,6 +94,7 @@ async def close(self):
8994 await self .__aexit__ (None , None , None )
9095
9196 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
97+ self ._log .debug (f"Closing Websocket { self ._url } { self ._prefix } { self ._path } " )
9298 if self ._exit_coro :
9399 await self ._exit_coro (self ._path )
94100 self .ws_state = WSListenerState .EXITING
@@ -98,7 +104,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
98104 await self ._conn .__aexit__ (exc_type , exc_val , exc_tb )
99105 self .ws = None
100106 if self ._handle_read_loop :
101- self ._log .error ("CANCEL read_loop" )
102107 await self ._kill_read_loop ()
103108
104109 async def connect (self ):
@@ -113,22 +118,25 @@ async def connect(self):
113118 # handle https_proxy
114119 if self ._https_proxy :
115120 if not Proxy or not proxy_connect :
116- raise ImportError ("websockets_proxy is not installed, please install it to use a websockets proxy (pip install websockets_proxy)" )
117- proxy = Proxy .from_url (self ._https_proxy ) # type: ignore
118- self ._conn = proxy_connect (ws_url , close_timeout = 0.1 , proxy = proxy , ** self ._ws_kwargs ) # type: ignore
121+ raise ImportError (
122+ "websockets_proxy is not installed, please install it to use a websockets proxy (pip install websockets_proxy)"
123+ )
124+ proxy = Proxy .from_url (self ._https_proxy ) # type: ignore
125+ self ._conn = proxy_connect (
126+ ws_url , close_timeout = 0.1 , proxy = proxy , ** self ._ws_kwargs
127+ ) # type: ignore
119128 else :
120129 self ._conn = ws .connect (ws_url , close_timeout = 0.1 , ** self ._ws_kwargs ) # type: ignore
121130
122131 try :
123132 self .ws = await self ._conn .__aenter__ ()
124133 except Exception as e : # noqa
125134 self ._log .error (f"Failed to connect to websocket: { e } " )
126- self .ws_state = WSListenerState .INITIALISING
127- return
135+ self .ws_state = WSListenerState .RECONNECTING
136+ raise e
128137 self .ws_state = WSListenerState .STREAMING
129138 self ._reconnects = 0
130139 await self ._after_connect ()
131- # To manage the "cannot call recv while another coroutine is already waiting for the next message"
132140 if not self ._handle_read_loop :
133141 self ._handle_read_loop = self ._loop .call_soon_threadsafe (
134142 asyncio .create_task , self ._read_loop ()
@@ -150,13 +158,23 @@ def _handle_message(self, evt):
150158 if self ._is_binary :
151159 try :
152160 evt = gzip .decompress (evt )
153- except (ValueError , OSError ):
154- return None
161+ except (ValueError , OSError ) as e :
162+ self ._log .error (f"Failed to decompress message: { (e )} " )
163+ raise
164+ except Exception as e :
165+ self ._log .error (f"Unexpected decompression error: { (e )} " )
166+ raise
155167 try :
156168 return self .json_loads (evt )
157- except ValueError :
158- self ._log .debug (f"error parsing evt json:{ evt } " )
159- return None
169+ except ValueError as e :
170+ self ._log .error (f"JSON Value Error parsing message: Error: { (e )} " )
171+ raise
172+ except TypeError as e :
173+ self ._log .error (f"JSON Type Error parsing message. Error: { (e )} " )
174+ raise
175+ except Exception as e :
176+ self ._log .error (f"Unexpected error parsing message. Error: { (e )} " )
177+ raise
160178
161179 async def _read_loop (self ):
162180 try :
@@ -174,45 +192,56 @@ async def _read_loop(self):
174192 await asyncio .sleep (0.1 )
175193 continue
176194 elif self .ws .state == ws .protocol .State .CLOSED : # type: ignore
177- await self ._reconnect ()
195+ self ._reconnect ()
196+ raise BinanceWebsocketClosed (
197+ "Connection closed. Reconnecting..."
198+ )
178199 elif self .ws_state == WSListenerState .STREAMING :
179200 assert self .ws
180201 res = await asyncio .wait_for (
181202 self .ws .recv (), timeout = self .TIMEOUT
182203 )
183204 res = self ._handle_message (res )
205+ self ._log .debug (f"Received message: { res } " )
184206 if res :
185207 if self ._queue .qsize () < self .MAX_QUEUE_SIZE :
186208 await self ._queue .put (res )
187209 else :
188- self ._log .debug (
189- f"Queue overflow { self .MAX_QUEUE_SIZE } . Message not filled"
190- )
191- await self ._queue .put (
192- {
193- "e" : "error" ,
194- "m" : "Queue overflow. Message not filled" ,
195- }
210+ raise BinanceWebsocketQueueOverflow (
211+ f"Message queue size { self ._queue .qsize ()} exceeded maximum { self .MAX_QUEUE_SIZE } "
196212 )
197- raise BinanceWebsocketUnableToConnect
198213 except asyncio .TimeoutError :
199214 self ._log .debug (f"no message in { self .TIMEOUT } seconds" )
200215 # _no_message_received_reconnect
201216 except asyncio .CancelledError as e :
202- self ._log .debug (f"cancelled error { e } " )
217+ self ._log .debug (f"_read_loop cancelled error { e } " )
203218 break
204- except asyncio .IncompleteReadError as e :
205- self ._log .debug (f"incomplete read error ({ e } )" )
206- except ConnectionClosedError as e :
207- self ._log .debug (f"connection close error ({ e } )" )
208- except gaierror as e :
209- self ._log .debug (f"DNS Error ({ e } )" )
210- except BinanceWebsocketUnableToConnect as e :
211- self ._log .debug (f"BinanceWebsocketUnableToConnect ({ e } )" )
219+ except (
220+ asyncio .IncompleteReadError ,
221+ gaierror ,
222+ ConnectionClosedError ,
223+ BinanceWebsocketClosed ,
224+ ) as e :
225+ # reports errors and continue loop
226+ self ._log .error (f"{ e .__class__ .__name__ } ({ e } )" )
227+ await self ._queue .put ({
228+ "e" : "error" ,
229+ "type" : f"{ e .__class__ .__name__ } " ,
230+ "m" : f"{ e } " ,
231+ })
232+ except (
233+ BinanceWebsocketUnableToConnect ,
234+ BinanceWebsocketQueueOverflow ,
235+ Exception ,
236+ ) as e :
237+ # reports errors and break the loop
238+ self ._log .error (f"Unknown exception ({ e } )" )
239+ await self ._queue .put ({
240+ "e" : "error" ,
241+ "type" : e .__class__ .__name__ ,
242+ "m" : f"{ e } " ,
243+ })
212244 break
213- except Exception as e :
214- self ._log .debug (f"Unknown exception ({ e } )" )
215- continue
216245 finally :
217246 self ._handle_read_loop = None # Signal the coro is stopped
218247 self ._reconnects = 0
@@ -226,11 +255,13 @@ async def _run_reconnect(self):
226255 f"waiting { reconnect_wait } "
227256 )
228257 await asyncio .sleep (reconnect_wait )
229- await self .connect ()
258+ try :
259+ await self .connect ()
260+ except Exception as e :
261+ pass
230262 else :
231263 self ._log .error (f"Max reconnections { self .MAX_RECONNECTS } reached:" )
232264 # Signal the error
233- await self ._queue .put ({"e" : "error" , "m" : "Max reconnect retries reached" })
234265 raise BinanceWebsocketUnableToConnect
235266
236267 async def recv (self ):
@@ -262,9 +293,5 @@ async def before_reconnect(self):
262293
263294 self ._reconnects += 1
264295
265- def _no_message_received_reconnect (self ):
266- self ._log .debug ("No message received, reconnecting" )
267- self .ws_state = WSListenerState .RECONNECTING
268-
269- async def _reconnect (self ):
296+ def _reconnect (self ):
270297 self .ws_state = WSListenerState .RECONNECTING
0 commit comments