Skip to content

Commit f16d34c

Browse files
authored
fix: reconnect keep alive (#1637)
* fix: reconnect keep alive * lint
1 parent a76774b commit f16d34c

File tree

2 files changed

+224
-9
lines changed

2 files changed

+224
-9
lines changed

binance/ws/keepalive_websocket.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ async def _before_connect(self):
6262
self._build_path()
6363

6464
async def _after_connect(self):
65-
self._start_socket_timer()
65+
if self._timer is None:
66+
self._start_socket_timer()
6667

6768
def _start_socket_timer(self):
6869
self._timer = self._loop.call_later(
@@ -75,9 +76,7 @@ async def _subscribe_to_user_data_stream(self):
7576
"id": str(uuid.uuid4()),
7677
}
7778
response = await self._client._ws_api_request(
78-
"userDataStream.subscribe.signature",
79-
signed=True,
80-
params=params
79+
"userDataStream.subscribe.signature", signed=True, params=params
8180
)
8281
return response.get("subscriptionId")
8382

@@ -89,9 +88,7 @@ async def _unsubscribe_from_user_data_stream(self):
8988
"subscriptionId": self._subscription_id,
9089
}
9190
await self._client._ws_api_request(
92-
"userDataStream.unsubscribe",
93-
signed=False,
94-
params=params
91+
"userDataStream.unsubscribe", signed=False, params=params
9592
)
9693
self._subscription_id = None
9794

@@ -130,7 +127,7 @@ async def _keepalive_socket(self):
130127
elif self._keepalive_type == "futures":
131128
await self._client.futures_stream_keepalive(self._listen_key)
132129
elif self._keepalive_type == "coin_futures":
133-
await self._client.futures_coin_stream_keepalive(self._listen_key)
130+
await self._client.futures_coin_stream_keepalive(self._listen_key)
134131
elif self._keepalive_type == "portfolio_margin":
135132
await self._client.papi_stream_keepalive(self._listen_key)
136133
else: # isolated margin
@@ -141,4 +138,7 @@ async def _keepalive_socket(self):
141138
except Exception as e:
142139
self._log.error(f"error in keepalive_socket: {e}")
143140
finally:
144-
self._start_socket_timer()
141+
if self._timer is not None:
142+
self._start_socket_timer()
143+
else:
144+
self._log.info("skip timer restart - web socket is exiting")

tests/test_keepalive_reconnect.py

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
"""
2+
Test to verify that KeepAliveWebsocket doesn't create duplicate keepalive loops on reconnection.
3+
4+
This test reproduces the issue where reconnection events create duplicate keepalive loops
5+
that continue running indefinitely, leading to resource exhaustion and redundant API calls.
6+
"""
7+
8+
import sys
9+
import asyncio
10+
import pytest
11+
from unittest.mock import AsyncMock, MagicMock
12+
from binance.async_client import AsyncClient
13+
from binance.ws.keepalive_websocket import KeepAliveWebsocket
14+
15+
16+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+")
17+
@pytest.mark.asyncio
18+
async def test_no_duplicate_keepalive_loops_on_reconnect():
19+
"""
20+
Test that reconnection doesn't create duplicate keepalive loops.
21+
22+
The bug occurs when:
23+
1. A keepalive loop is running (timer -> keepalive_socket -> timer -> ...)
24+
2. Reconnection happens via _after_connect()
25+
3. A new keepalive loop is started unconditionally
26+
4. The old loop continues running in the background
27+
5. Each reconnection adds another orphaned loop
28+
"""
29+
# Create a mock client
30+
mock_client = MagicMock(spec=AsyncClient)
31+
mock_client.futures_stream_get_listen_key = AsyncMock(
32+
return_value="test_listen_key"
33+
)
34+
mock_client.futures_stream_keepalive = AsyncMock()
35+
36+
# Create the websocket instance
37+
ws = KeepAliveWebsocket(
38+
client=mock_client,
39+
url="wss://fstream.binance.com/",
40+
keepalive_type="futures",
41+
prefix="ws/",
42+
user_timeout=0.1, # Short timeout for faster test
43+
)
44+
45+
# Track how many times _keepalive_socket is called
46+
keepalive_call_count = 0
47+
original_keepalive = ws._keepalive_socket
48+
49+
async def tracked_keepalive():
50+
nonlocal keepalive_call_count
51+
keepalive_call_count += 1
52+
# Call the original method but skip the actual API call
53+
# Just track that it was called
54+
return
55+
56+
ws._keepalive_socket = tracked_keepalive
57+
58+
# Simulate the first connection
59+
await ws._before_connect()
60+
await ws._after_connect()
61+
62+
# Wait for the first timer to trigger
63+
await asyncio.sleep(0.15)
64+
first_call_count = keepalive_call_count
65+
66+
assert first_call_count >= 1, "Keepalive should have been called at least once"
67+
68+
# Simulate a reconnection (this is where the bug occurs)
69+
# In a real scenario, _after_connect() is called again by the reconnection logic
70+
await ws._after_connect()
71+
72+
# Wait for more timer triggers
73+
await asyncio.sleep(0.15)
74+
second_call_count = keepalive_call_count
75+
76+
# Calculate how many calls happened after reconnection
77+
calls_after_reconnect = second_call_count - first_call_count
78+
79+
# With the bug: multiple loops are running, so we'd see 2+ calls per timer period
80+
# Without the bug: only one loop is running, so we'd see ~1 call per timer period
81+
# Allow some margin (up to 2 calls) due to timing
82+
assert calls_after_reconnect <= 2, (
83+
f"Too many keepalive calls after reconnection: {calls_after_reconnect}. "
84+
f"This indicates duplicate keepalive loops are running. "
85+
f"Total calls: {second_call_count}, calls before reconnect: {first_call_count}"
86+
)
87+
88+
# Clean up
89+
if ws._timer:
90+
ws._timer.cancel()
91+
ws._timer = None
92+
93+
94+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+")
95+
@pytest.mark.asyncio
96+
async def test_keepalive_stops_after_exit():
97+
"""
98+
Test that keepalive loop stops properly when the websocket exits.
99+
100+
The fix should ensure that when __aexit__ sets _timer to None,
101+
the finally block in _keepalive_socket doesn't restart the timer.
102+
"""
103+
# Create a mock client
104+
mock_client = MagicMock(spec=AsyncClient)
105+
mock_client.futures_stream_get_listen_key = AsyncMock(
106+
return_value="test_listen_key"
107+
)
108+
mock_client.futures_stream_keepalive = AsyncMock()
109+
110+
# Create the websocket instance
111+
ws = KeepAliveWebsocket(
112+
client=mock_client,
113+
url="wss://fstream.binance.com/",
114+
keepalive_type="futures",
115+
prefix="ws/",
116+
user_timeout=0.1, # Short timeout for faster test
117+
)
118+
119+
# Track keepalive calls
120+
keepalive_call_count = 0
121+
122+
async def tracked_keepalive():
123+
nonlocal keepalive_call_count
124+
keepalive_call_count += 1
125+
return
126+
127+
ws._keepalive_socket = tracked_keepalive
128+
129+
# Start the keepalive
130+
await ws._before_connect()
131+
await ws._after_connect()
132+
133+
# Wait for at least one keepalive call
134+
await asyncio.sleep(0.15)
135+
calls_before_exit = keepalive_call_count
136+
assert calls_before_exit >= 1, "Keepalive should have been called before exit"
137+
138+
# Simulate exit by setting timer to None (this is what __aexit__ does)
139+
if ws._timer:
140+
ws._timer.cancel()
141+
ws._timer = None
142+
143+
# Wait to see if more keepalive calls happen (they shouldn't)
144+
await asyncio.sleep(0.15)
145+
calls_after_exit = keepalive_call_count
146+
147+
# After setting _timer to None, no more calls should happen
148+
assert calls_after_exit == calls_before_exit, (
149+
f"Keepalive should not continue after exit. "
150+
f"Calls before exit: {calls_before_exit}, calls after exit: {calls_after_exit}"
151+
)
152+
153+
154+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+")
155+
@pytest.mark.asyncio
156+
async def test_multiple_reconnects_no_loop_accumulation():
157+
"""
158+
Test that multiple reconnections don't accumulate keepalive loops.
159+
160+
This is a stress test to ensure the fix works even with many reconnections.
161+
"""
162+
# Create a mock client
163+
mock_client = MagicMock(spec=AsyncClient)
164+
mock_client.futures_stream_get_listen_key = AsyncMock(
165+
return_value="test_listen_key"
166+
)
167+
mock_client.futures_stream_keepalive = AsyncMock()
168+
169+
# Create the websocket instance
170+
ws = KeepAliveWebsocket(
171+
client=mock_client,
172+
url="wss://fstream.binance.com/",
173+
keepalive_type="futures",
174+
prefix="ws/",
175+
user_timeout=0.1, # Short timeout for faster test
176+
)
177+
178+
# Track keepalive calls
179+
keepalive_call_count = 0
180+
181+
async def tracked_keepalive():
182+
nonlocal keepalive_call_count
183+
keepalive_call_count += 1
184+
return
185+
186+
ws._keepalive_socket = tracked_keepalive
187+
188+
# Initial connection
189+
await ws._before_connect()
190+
await ws._after_connect()
191+
192+
# Wait for initial calls
193+
await asyncio.sleep(0.15)
194+
195+
# Simulate 5 reconnections
196+
for i in range(5):
197+
await ws._after_connect()
198+
199+
# Reset counter
200+
keepalive_call_count = 0
201+
202+
# Wait for a timer period
203+
await asyncio.sleep(0.15)
204+
205+
# Should only have ~1 call per timer period, not 6 (one per each connection + reconnections)
206+
# Allow margin of 2 due to timing
207+
assert keepalive_call_count <= 2, (
208+
f"Too many keepalive calls after 5 reconnections: {keepalive_call_count}. "
209+
f"This indicates keepalive loops are accumulating."
210+
)
211+
212+
# Clean up
213+
if ws._timer:
214+
ws._timer.cancel()
215+
ws._timer = None

0 commit comments

Comments
 (0)