11import asyncio
2+ import uuid
23from binance .async_client import AsyncClient
34from binance .ws .reconnecting_websocket import ReconnectingWebsocket
45from binance .ws .constants import KEEPALIVE_TIMEOUT
@@ -28,23 +29,34 @@ def __init__(
2829 self ._client = client
2930 self ._user_timeout = user_timeout or KEEPALIVE_TIMEOUT
3031 self ._timer = None
31- self ._listen_key = None
32+ self ._subscription_id = None
33+ self ._listen_key = None # Used for non spot stream types
3234
3335 async def __aexit__ (self , * args , ** kwargs ):
3436 if not self ._path :
3537 return
3638 if self ._timer :
3739 self ._timer .cancel ()
3840 self ._timer = None
41+ # Clean up subscription if it exists
42+ if self ._subscription_id is not None :
43+ await self ._unsubscribe_from_user_data_stream ()
3944 await super ().__aexit__ (* args , ** kwargs )
4045
4146 def _build_path (self ):
4247 self ._path = self ._listen_key
4348 time_unit = getattr (self ._client , "TIME_UNIT" , None )
44- if time_unit and self . _keepalive_type == "user" :
49+ if time_unit :
4550 self ._path = f"{ self ._listen_key } ?timeUnit={ time_unit } "
4651
4752 async def _before_connect (self ):
53+ if self ._keepalive_type == "user" :
54+ self ._subscription_id = await self ._subscribe_to_user_data_stream ()
55+ # Reuse the ws_api connection that's already established
56+ self .ws = self ._client .ws_api .ws
57+ self .ws_state = self ._client .ws_api .ws_state
58+ self ._queue = self ._client .ws_api ._queue
59+ return
4860 if not self ._listen_key :
4961 self ._listen_key = await self ._get_listen_key ()
5062 self ._build_path ()
@@ -57,6 +69,32 @@ def _start_socket_timer(self):
5769 self ._user_timeout , lambda : asyncio .create_task (self ._keepalive_socket ())
5870 )
5971
72+ async def _subscribe_to_user_data_stream (self ):
73+ """Subscribe to user data stream using WebSocket API"""
74+ params = {
75+ "id" : str (uuid .uuid4 ()),
76+ }
77+ response = await self ._client ._ws_api_request (
78+ "userDataStream.subscribe.signature" ,
79+ signed = True ,
80+ params = params
81+ )
82+ return response .get ("subscriptionId" )
83+
84+ async def _unsubscribe_from_user_data_stream (self ):
85+ """Unsubscribe from user data stream using WebSocket API"""
86+ if self ._keepalive_type == "user" and self ._subscription_id is not None :
87+ params = {
88+ "id" : str (uuid .uuid4 ()),
89+ "subscriptionId" : self ._subscription_id ,
90+ }
91+ await self ._client ._ws_api_request (
92+ "userDataStream.unsubscribe" ,
93+ signed = False ,
94+ params = params
95+ )
96+ self ._subscription_id = None
97+
6098 async def _get_listen_key (self ):
6199 if self ._keepalive_type == "user" :
62100 listen_key = await self ._client .stream_get_listen_key ()
@@ -77,21 +115,22 @@ async def _get_listen_key(self):
77115
78116 async def _keepalive_socket (self ):
79117 try :
118+ if self ._keepalive_type == "user" :
119+ return
80120 listen_key = await self ._get_listen_key ()
81121 if listen_key != self ._listen_key :
82122 self ._log .debug ("listen key changed: reconnect" )
123+ self ._listen_key = listen_key
83124 self ._build_path ()
84125 self ._reconnect ()
85126 else :
86127 self ._log .debug ("listen key same: keepalive" )
87- if self ._keepalive_type == "user" :
88- await self ._client .stream_keepalive (self ._listen_key )
89- elif self ._keepalive_type == "margin" : # cross-margin
128+ if self ._keepalive_type == "margin" : # cross-margin
90129 await self ._client .margin_stream_keepalive (self ._listen_key )
91130 elif self ._keepalive_type == "futures" :
92131 await self ._client .futures_stream_keepalive (self ._listen_key )
93132 elif self ._keepalive_type == "coin_futures" :
94- await self ._client .futures_coin_stream_keepalive (self ._listen_key )
133+ await self ._client .futures_coin_stream_keepalive (self ._listen_key )
95134 elif self ._keepalive_type == "portfolio_margin" :
96135 await self ._client .papi_stream_keepalive (self ._listen_key )
97136 else : # isolated margin
0 commit comments