forked from sammchardy/python-binance
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebsocket_api.py
More file actions
169 lines (143 loc) · 7.05 KB
/
websocket_api.py
File metadata and controls
169 lines (143 loc) · 7.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from typing import Dict, Optional
import asyncio
from websockets import WebSocketClientProtocol # type: ignore
from .constants import WSListenerState
from .reconnecting_websocket import ReconnectingWebsocket
from binance.exceptions import BinanceAPIException, BinanceWebsocketUnableToConnect
class WebsocketAPI(ReconnectingWebsocket):
    """Request/response layer on top of ``ReconnectingWebsocket``.

    Incoming messages are routed in :meth:`_handle_message`:

    * messages carrying ``subscriptionId`` and ``event`` are pushed to the
      queue registered for that subscription (or returned to the caller when
      no queue is registered);
    * messages carrying an ``id`` that matches a pending :meth:`request`
      resolve (or fail) the future stored for that id.
    """

    def __init__(
        self,
        url: str,
        tld: str = "com",
        testnet: bool = False,
        https_proxy: Optional[str] = None,
    ):
        """Store configuration and initialise the underlying websocket.

        Args:
            url: Endpoint URL forwarded to the parent class.
            tld: Stored on the instance as ``_tld``.
            testnet: Stored on the instance as ``_testnet``.
            https_proxy: Optional proxy forwarded to the parent class.
        """
        self._tld = tld
        self._testnet = testnet
        # Pending requests: request id -> future resolved by _handle_message.
        self._responses: Dict[str, asyncio.Future] = {}
        # Created lazily by the ``connection_lock`` property.
        self._connection_lock: Optional[asyncio.Lock] = None
        # Subscription queues for routing user data stream events.
        self._subscription_queues: Dict[str, asyncio.Queue] = {}
        super().__init__(
            url=url, prefix="", path="", is_binary=False, https_proxy=https_proxy
        )

    def register_subscription_queue(
        self, subscription_id: str, queue: asyncio.Queue
    ) -> None:
        """Register a queue to receive events for a specific subscription.

        An existing registration for the same ``subscription_id`` is replaced.
        """
        self._subscription_queues[subscription_id] = queue

    def unregister_subscription_queue(self, subscription_id: str) -> None:
        """Unregister a subscription queue; unknown ids are ignored."""
        self._subscription_queues.pop(subscription_id, None)

    @property
    def connection_lock(self) -> asyncio.Lock:
        """Lock serialising connection setup, created on first access."""
        if self._connection_lock is None:
            # No explicit event-loop lookup: the lock does not take a loop
            # argument, and asyncio.get_event_loop() is deprecated when no
            # loop is running.
            self._connection_lock = asyncio.Lock()
        return self._connection_lock

    def _handle_message(self, msg):
        """Override message handling to support request-response.

        Returns:
            The subscription ``event`` payload when a subscription event has
            no registered queue (backward compatibility with the main queue);
            ``None`` in every other case.

        Raises:
            BinanceAPIException: For a non-200 ``status`` message that does
                not belong to a pending request.
        """
        parsed_msg = super()._handle_message(msg)
        self._log.debug(f"Received message: {parsed_msg}")
        if parsed_msg is None:
            return None

        # Subscription events (user data stream, etc.) carry 'subscriptionId'
        # and 'event' fields instead of 'id'.
        if "subscriptionId" in parsed_msg and "event" in parsed_msg:
            subscription_id = parsed_msg["subscriptionId"]
            event = parsed_msg["event"]
            if subscription_id in self._subscription_queues:
                queue = self._subscription_queues[subscription_id]
                try:
                    queue.put_nowait(event)
                except asyncio.QueueFull:
                    self._log.error(
                        f"Subscription queue full for {subscription_id}, dropping event"
                    )
                except Exception as e:
                    self._log.error(
                        f"Error putting event in subscription queue for {subscription_id}: {e}"
                    )
                return None  # Don't put in main queue
            # No registered queue, return event for main queue (backward compat)
            return event

        req_id = parsed_msg["id"] if "id" in parsed_msg else None
        exception = None
        if "status" in parsed_msg and parsed_msg["status"] != 200:
            # Fall back to an empty object so a reply without an "error"
            # field still becomes an API exception rather than a KeyError.
            exception = BinanceAPIException(
                parsed_msg,
                parsed_msg["status"],
                self.json_dumps(parsed_msg.get("error", {})),
            )

        if req_id is not None and req_id in self._responses:
            future = self._responses[req_id]
            if future.done():
                # Completing a finished/cancelled future would raise
                # InvalidStateError here; drop the late or duplicate reply.
                self._log.warning(
                    f"Ignoring message for already completed request {req_id}: {parsed_msg}"
                )
            elif exception is not None:
                future.set_exception(exception)
            else:
                future.set_result(parsed_msg)
            return None  # Don't queue request-response messages

        if exception is not None:
            raise exception

        self._log.warning(f"WS api receieved unknown message: {parsed_msg}")
        return None

    async def _ensure_ws_connection(self) -> None:
        """Ensure WebSocket connection is established and ready.

        This function will:
        1. Check if connection exists and is streaming
        2. Attempt to connect if not
        3. Wait for connection to be ready
        4. Handle reconnection if needed

        Raises:
            BinanceWebsocketUnableToConnect: If the connection is not in the
                streaming state after the retry budget, or if any error
                occurs while connecting (the original error is chained).
        """
        async with self.connection_lock:
            try:
                if (
                    self.ws is None
                    or (isinstance(self.ws, WebSocketClientProtocol) and self.ws.closed)
                    or self.ws_state != WSListenerState.STREAMING
                ):
                    await self.connect()

                    # Wait for connection to be ready
                    retries = 0
                    while (
                        self.ws_state != WSListenerState.STREAMING
                        and retries < self.MAX_RECONNECTS
                    ):
                        if self.ws_state == WSListenerState.RECONNECTING:
                            self._log.info("Connection is reconnecting, waiting...")
                            await self._wait_for_reconnect()
                        # getattr: a connection object without a ``closed``
                        # attribute is treated as open instead of raising.
                        elif self.ws is None or getattr(self.ws, "closed", False):
                            self._log.info("Connection lost, reconnecting...")
                            await self.connect()

                        retries += 1
                        await asyncio.sleep(self.MIN_RECONNECT_WAIT)

                    if self.ws_state != WSListenerState.STREAMING:
                        raise BinanceWebsocketUnableToConnect(
                            f"Failed to establish connection after {retries} attempts"
                        )

                    self._log.debug("WebSocket connection established")

            except Exception as e:
                self._log.error(f"Error ensuring WebSocket connection: {e}")
                raise BinanceWebsocketUnableToConnect(
                    f"Connection failed: {str(e)}"
                ) from e

    async def request(self, id: str, payload: dict) -> dict:
        """Send request and wait for response.

        Args:
            id: Request id used to match the response (the name shadows the
                builtin but is kept for interface compatibility).
            payload: Message body, serialised with ``self.json_dumps``.

        Returns:
            The ``result`` field of the response when present, otherwise the
            whole response.

        Raises:
            BinanceWebsocketUnableToConnect: If the socket is not connected,
                the response contains ``error``, or no response arrives
                within ``self.TIMEOUT``.
            BinanceAPIException: If the response has a non-200 status (set on
                the future by ``_handle_message``).
        """
        await self._ensure_ws_connection()

        # Register the future before sending so the reply cannot be missed.
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._responses[id] = future

        try:
            if self.ws is None:
                raise BinanceWebsocketUnableToConnect(
                    "Trying to send request while WebSocket is not connected"
                )
            await self.ws.send(self.json_dumps(payload))

            response = await asyncio.wait_for(future, timeout=self.TIMEOUT)

            if "error" in response:
                raise BinanceWebsocketUnableToConnect(response["error"])
            return response.get("result", response)
        except asyncio.TimeoutError as e:
            raise BinanceWebsocketUnableToConnect("Request timed out") from e
        finally:
            # Always drop the bookkeeping entry, whatever the outcome.
            self._responses.pop(id, None)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up responses before closing.

        Every pending request future is removed and, if not yet done, failed
        with ``BinanceWebsocketUnableToConnect`` before the parent class's
        ``__aexit__`` runs.
        """
        # Snapshot first: the mapping is emptied before futures are failed.
        pending = list(self._responses.values())
        self._responses.clear()
        for future in pending:
            if not future.done():
                future.set_exception(
                    BinanceWebsocketUnableToConnect("WebSocket closing")
                )
        await super().__aexit__(exc_type, exc_val, exc_tb)