Skip to content

Commit 890d191

Browse files
committed
Merge branch 'master' of github.com:sammchardy/python-binance
2 parents 2ed81d9 + dccb3d9 commit 890d191

29 files changed

Lines changed: 586 additions & 462 deletions

.github/workflows/python-app.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
build:
3535
needs: lint
3636
runs-on: ubuntu-22.04
37-
timeout-minutes: 60
37+
timeout-minutes: 40
3838
env:
3939
PROXY: "http://51.83.140.52:16301"
4040
TEST_TESTNET: "true"

binance/async_client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ async def create(
6464
testnet: bool = False,
6565
loop=None,
6666
session_params: Optional[Dict[str, Any]] = None,
67+
private_key: Optional[Union[str, Path]] = None,
68+
private_key_pass: Optional[str] = None,
6769
https_proxy: Optional[str] = None,
70+
time_unit: Optional[str] = None,
6871
):
6972
self = cls(
7073
api_key,
@@ -75,6 +78,10 @@ async def create(
7578
testnet,
7679
loop,
7780
session_params,
81+
private_key,
82+
private_key_pass,
83+
https_proxy,
84+
time_unit
7885
)
7986
self.https_proxy = https_proxy # move this to the constructor
8087

@@ -162,7 +169,8 @@ async def _handle_response(self, response: aiohttp.ClientResponse):
162169
if not str(response.status).startswith("2"):
163170
raise BinanceAPIException(response, response.status, await response.text())
164171

165-
if response.text == "":
172+
text = await response.text()
173+
if text == "":
166174
return {}
167175

168176
try:

binance/base_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class BaseClient:
3636
OPTIONS_TESTNET_URL = "https://testnet.binanceops.{}/eapi"
3737
PAPI_URL = "https://papi.binance.{}/papi"
3838
WS_API_URL = "wss://ws-api.binance.{}/ws-api/v3"
39-
WS_API_TESTNET_URL = "wss://testnet.binance.vision/ws-api/v3"
39+
WS_API_TESTNET_URL = "wss://ws-api.testnet.binance.vision/ws-api/v3"
4040
WS_FUTURES_URL = "wss://ws-fapi.binance.{}/ws-fapi/v1"
4141
WS_FUTURES_TESTNET_URL = "wss://testnet.binancefuture.com/ws-fapi/v1"
4242
PUBLIC_API_VERSION = "v1"

binance/helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import dateparser
77
import pytz
88

9-
from datetime import datetime
9+
from datetime import datetime, timezone
1010

1111
from binance.exceptions import UnknownDateFormat
1212

@@ -21,7 +21,7 @@ def date_to_milliseconds(date_str: str) -> int:
2121
:param date_str: date in readable format, i.e. "January 01, 2018", "11 hours ago UTC", "now UTC"
2222
"""
2323
# get epoch value in UTC
24-
epoch: datetime = datetime.utcfromtimestamp(0).replace(tzinfo=pytz.utc)
24+
epoch: datetime = datetime.fromtimestamp(0,timezone.utc)
2525
# parse our date string
2626
d: Optional[datetime] = dateparser.parse(date_str, settings={"TIMEZONE": "UTC"})
2727
if not d:

binance/ws/depthcache.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,17 @@ async def __aenter__(self):
185185
return self
186186

187187
async def __aexit__(self, *args, **kwargs):
188+
self._log.debug(f"Exiting depth cache manager for {self._symbol}")
188189
await self._socket.__aexit__(*args, **kwargs)
189190

190191
async def recv(self):
191192
dc = None
192193
while not dc:
193194
try:
194195
res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT)
196+
self._log.debug(f"Received message: {res}")
195197
except Exception as e:
196-
self._log.warning(e)
198+
self._log.warning(f"Exception recieving message: {e.__class__.__name__} (e) ")
197199
else:
198200
dc = await self._depth_event(res)
199201
return dc
@@ -203,7 +205,7 @@ async def _init_cache(self):
203205
204206
:return:
205207
"""
206-
208+
self._log.debug(f"Initialising depth cache for {self._symbol}")
207209
# initialise or clear depth cache
208210
self._depth_cache = DepthCache(self._symbol, conv_type=self._conv_type)
209211

@@ -228,16 +230,15 @@ async def _depth_event(self, msg):
228230
:return:
229231
230232
"""
233+
self._log.debug(f"Received depth event: {msg}")
231234

232235
if not msg:
233236
return None
234237

235238
if "e" in msg and msg["e"] == "error":
236-
# close the socket
237-
await self.close()
238-
239-
# notify the user by returning a None value
240-
return None
239+
# notify user by return msg with error
240+
self._log.error(f"Error in depth event restarting cache: {msg}")
241+
return msg
241242

242243
return await self._process_depth_message(msg)
243244

binance/ws/reconnecting_websocket.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class ReconnectingWebsocket:
4747
MIN_RECONNECT_WAIT = 0.1
4848
TIMEOUT = 10
4949
NO_MESSAGE_RECONNECT_TIMEOUT = 60
50-
MAX_QUEUE_SIZE = 100
5150

5251
def __init__(
5352
self,
@@ -57,6 +56,7 @@ def __init__(
5756
is_binary: bool = False,
5857
exit_coro=None,
5958
https_proxy: Optional[str] = None,
59+
max_queue_size: int = 100,
6060
**kwargs,
6161
):
6262
self._loop = get_loop()
@@ -75,6 +75,7 @@ def __init__(
7575
self._handle_read_loop = None
7676
self._https_proxy = https_proxy
7777
self._ws_kwargs = kwargs
78+
self.max_queue_size = max_queue_size
7879

7980
def json_dumps(self, msg) -> str:
8081
if orjson:
@@ -203,17 +204,22 @@ async def _read_loop(self):
203204
res = self._handle_message(res)
204205
self._log.debug(f"Received message: {res}")
205206
if res:
206-
if self._queue.qsize() < self.MAX_QUEUE_SIZE:
207+
if self._queue.qsize() < self.max_queue_size:
207208
await self._queue.put(res)
208209
else:
209210
raise BinanceWebsocketQueueOverflow(
210-
f"Message queue size {self._queue.qsize()} exceeded maximum {self.MAX_QUEUE_SIZE}"
211+
f"Message queue size {self._queue.qsize()} exceeded maximum {self.max_queue_size}"
211212
)
212213
except asyncio.TimeoutError:
213214
self._log.debug(f"no message in {self.TIMEOUT} seconds")
214215
# _no_message_received_reconnect
215216
except asyncio.CancelledError as e:
216217
self._log.debug(f"_read_loop cancelled error {e}")
218+
await self._queue.put({
219+
"e": "error",
220+
"type": f"{e.__class__.__name__}",
221+
"m": f"{e}",
222+
})
217223
break
218224
except (
219225
asyncio.IncompleteReadError,
@@ -234,7 +240,7 @@ async def _read_loop(self):
234240
Exception,
235241
) as e:
236242
# reports errors and break the loop
237-
self._log.error(f"Unknown exception ({e})")
243+
self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})")
238244
await self._queue.put({
239245
"e": "error",
240246
"type": e.__class__.__name__,

binance/ws/streams.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class BinanceSocketType(str, Enum):
2525

2626
class BinanceSocketManager:
2727
STREAM_URL = "wss://stream.binance.{}:9443/"
28-
STREAM_TESTNET_URL = "wss://testnet.binance.vision/"
28+
STREAM_TESTNET_URL = "wss://stream.testnet.binance.vision/"
2929
FSTREAM_URL = "wss://fstream.binance.{}/"
3030
FSTREAM_TESTNET_URL = "wss://stream.binancefuture.com/"
3131
DSTREAM_URL = "wss://dstream.binance.{}/"
@@ -36,12 +36,19 @@ class BinanceSocketManager:
3636
WEBSOCKET_DEPTH_10 = "10"
3737
WEBSOCKET_DEPTH_20 = "20"
3838

39-
def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
39+
def __init__(
40+
self,
41+
client: AsyncClient,
42+
user_timeout=KEEPALIVE_TIMEOUT,
43+
max_queue_size: int = 100,
44+
):
4045
"""Initialise the BinanceSocketManager
4146
4247
:param client: Binance API client
4348
:type client: binance.AsyncClient
44-
49+
:param user_timeout: Timeout for user socket in seconds
50+
:param max_queue_size: Max size of the websocket queue, defaults to 100
51+
:type max_queue_size: int
4552
"""
4653
self.STREAM_URL = self.STREAM_URL.format(client.tld)
4754
self.FSTREAM_URL = self.FSTREAM_URL.format(client.tld)
@@ -52,8 +59,8 @@ def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
5259
self._loop = get_loop()
5360
self._client = client
5461
self._user_timeout = user_timeout
55-
5662
self.testnet = self._client.testnet
63+
self._max_queue_size = max_queue_size
5764
self.ws_kwargs = {}
5865

5966
def _get_stream_url(self, stream_url: Optional[str] = None):
@@ -84,6 +91,7 @@ def _get_socket(
8491
exit_coro=lambda p: self._exit_socket(f"{socket_type}_{p}"),
8592
is_binary=is_binary,
8693
https_proxy=self._client.https_proxy,
94+
max_queue_size=self._max_queue_size,
8795
**self.ws_kwargs,
8896
)
8997

@@ -1202,6 +1210,7 @@ def __init__(
12021210
session_params: Optional[Dict[str, Any]] = None,
12031211
https_proxy: Optional[str] = None,
12041212
loop: Optional[asyncio.AbstractEventLoop] = None,
1213+
max_queue_size: int = 100,
12051214
):
12061215
super().__init__(
12071216
api_key,
@@ -1214,10 +1223,14 @@ def __init__(
12141223
loop,
12151224
)
12161225
self._bsm: Optional[BinanceSocketManager] = None
1226+
self._max_queue_size = max_queue_size
12171227

12181228
async def _before_socket_listener_start(self):
12191229
assert self._client
1220-
self._bsm = BinanceSocketManager(client=self._client)
1230+
self._bsm = BinanceSocketManager(
1231+
client=self._client,
1232+
max_queue_size=self._max_queue_size
1233+
)
12211234

12221235
def _start_async_socket(
12231236
self,
@@ -1226,7 +1239,10 @@ def _start_async_socket(
12261239
params: Dict[str, Any],
12271240
path: Optional[str] = None,
12281241
) -> str:
1242+
start_time = time.time()
12291243
while not self._bsm:
1244+
if time.time() - start_time > 5:
1245+
raise RuntimeError("Binance Socket Manager failed to initialize after 5 seconds")
12301246
time.sleep(0.1)
12311247
socket = getattr(self._bsm, socket_name)(**params)
12321248
socket_path: str = path or socket._path # noqa

binance/ws/threaded_stream.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import threading
34
from typing import Optional, Dict, Any
45

@@ -24,6 +25,7 @@ def __init__(
2425
self._client: Optional[AsyncClient] = None
2526
self._running: bool = True
2627
self._socket_running: Dict[str, bool] = {}
28+
self._log = logging.getLogger(__name__)
2729
self._client_params = {
2830
"api_key": api_key,
2931
"api_secret": api_secret,
@@ -37,12 +39,17 @@ def __init__(
3739
async def _before_socket_listener_start(self): ...
3840

3941
async def socket_listener(self):
40-
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
41-
await self._before_socket_listener_start()
42+
try:
43+
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
44+
await self._before_socket_listener_start()
45+
except Exception as e:
46+
self._log.error(f"Failed to create client: {e}")
47+
self.stop()
4248
while self._running:
4349
await asyncio.sleep(0.2)
4450
while self._socket_running:
4551
await asyncio.sleep(0.2)
52+
self._log.info("Socket listener stopped")
4653

4754
async def start_listener(self, socket, path: str, callback):
4855
async with socket as s:
@@ -52,13 +59,19 @@ async def start_listener(self, socket, path: str, callback):
5259
except asyncio.TimeoutError:
5360
...
5461
continue
62+
except Exception as e:
63+
self._log.error(f"Error receiving message: {e}")
64+
msg = {
65+
"e": "error",
66+
"type": e.__class__.__name__,
67+
"m": f"{e}",
68+
}
69+
if not msg:
70+
continue # Handle both async and sync callbacks
71+
if asyncio.iscoroutinefunction(callback):
72+
asyncio.create_task(callback(msg))
5573
else:
56-
if not msg:
57-
continue # Handle both async and sync callbacks
58-
if asyncio.iscoroutinefunction(callback):
59-
await callback(msg)
60-
else:
61-
callback(msg)
74+
callback(msg)
6275
del self._socket_running[path]
6376

6477
def run(self):
@@ -74,6 +87,7 @@ async def stop_client(self):
7487
await self._client.close_connection()
7588

7689
def stop(self):
90+
self._log.debug("Stopping ThreadedApiManager")
7791
if not self._running:
7892
return
7993
self._running = False
@@ -85,6 +99,6 @@ def stop(self):
8599
future.result(timeout=5) # Add timeout to prevent hanging
86100
except Exception as e:
87101
# Log the error but don't raise it
88-
print(f"Error stopping client: {e}")
102+
self._log.error(f"Error stopping client: {e}")
89103
for socket_name in self._socket_running.keys():
90104
self._socket_running[socket_name] = False

docs/depth_cache.rst

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,31 @@ Websocket Errors
153153
----------------
154154

155155
If the underlying websocket is disconnected and is unable to reconnect None is returned for the depth_cache parameter.
156+
If the underlying websocket is disconnected an error msg is passed to the callback and to recv() containing the error message.
157+
In the case the BinanceWebsocketClosed is returned, the websocket will attempt to reconnect 5 times before returning a BinanceUnableToConnect error.
158+
Example:
159+
160+
.. code:: python
161+
162+
depth_cache = await dcm.recv()
163+
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
164+
logger.error(f"Received depth cache error in callback: {depth_cache}")
165+
if type == 'BinanceWebsocketClosed':
166+
# ignore as attempts to reconnect
167+
continue
168+
break
169+
170+
.. code:: python
171+
def handle_depth_cache(depth_cache):
172+
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
173+
logger.error(f"Received depth cache error in callback: {depth_cache}")
174+
type = depth_cache.get('type')
175+
if type == 'BinanceWebsocketClosed':
176+
# Automatically attempts to reconnect
177+
return
178+
dcm.stop()
179+
return
180+
# handle non error cases here
156181
157182
Examples
158183
--------

0 commit comments

Comments
 (0)