# test_user_socket_integration.py
# From a fork of sammchardy/python-binance.
"""
Integration tests for user socket with ws_api subscription routing.

These tests verify that the user socket correctly:
1. Uses ws_api for subscription (not creating its own connection)
2. Has its own queue for receiving events (not sharing ws_api's queue)
3. Does not start its own read loop (ws_api handles reading)
4. Properly cleans up subscriptions on exit

Requirements:
- Binance testnet API credentials (configured in conftest.py)
- Network connectivity to testnet

Run with: pytest tests/test_user_socket_integration.py -v
"""
import asyncio
import pytest
import pytest_asyncio
from binance import BinanceSocketManager
@pytest_asyncio.fixture
async def socket_manager(clientAsync):
    """Provide a BinanceSocketManager wrapping the conftest ``clientAsync`` fixture."""
    manager = BinanceSocketManager(clientAsync)
    return manager
class TestUserSocketArchitecture:
    """Structural checks on how the user socket is wired to ws_api."""

    @pytest.mark.asyncio
    async def test_user_socket_has_separate_queue(self, clientAsync, socket_manager):
        """The user socket's queue must be a different object from ws_api's queue."""
        sock = socket_manager.user_socket()
        async with sock:
            own_queue = sock._queue
            shared_queue = clientAsync.ws_api._queue
            # Identity comparison: the two must not be the same queue object.
            assert own_queue is not shared_queue, (
                "user_socket should have its own queue, not share ws_api's queue"
            )

    @pytest.mark.asyncio
    async def test_user_socket_uses_ws_api_subscription(self, clientAsync, socket_manager):
        """Inside the context the socket is flagged as ws_api-subscribed and has an ID."""
        sock = socket_manager.user_socket()
        async with sock:
            # The flag marks the socket as subscribed through ws_api.
            assert sock._uses_ws_api_subscription is True, (
                "user_socket should be marked as using ws_api subscription"
            )
            # A subscription ID must be present while inside the context.
            assert sock._subscription_id is not None, (
                "user_socket should have a subscription ID"
            )

    @pytest.mark.asyncio
    async def test_user_socket_no_read_loop(self, clientAsync, socket_manager):
        """The user socket must not run a read loop of its own; ws_api must have one."""
        sock = socket_manager.user_socket()
        async with sock:
            # No read-loop handle on the user socket itself ...
            assert sock._handle_read_loop is None, (
                "user_socket should not have its own read loop"
            )
            # ... while ws_api holds one.
            assert clientAsync.ws_api._handle_read_loop is not None, (
                "ws_api should have a read loop"
            )

    @pytest.mark.asyncio
    async def test_user_socket_queue_registered_with_ws_api(self, clientAsync, socket_manager):
        """ws_api must map the socket's subscription ID to the socket's own queue."""
        sock = socket_manager.user_socket()
        async with sock:
            subscription_id = sock._subscription_id
            registry = clientAsync.ws_api._subscription_queues
            # The subscription ID has to be a key in ws_api's registry.
            assert subscription_id in registry, (
                "Subscription should be registered with ws_api"
            )
            # And the value stored under it has to be this socket's queue.
            routed_queue = registry[subscription_id]
            assert routed_queue is sock._queue, (
                "Registered queue should be user_socket's queue"
            )

    @pytest.mark.asyncio
    async def test_user_socket_cleanup_on_exit(self, clientAsync, socket_manager):
        """Leaving the context must remove the subscription from ws_api's registry."""
        sock = socket_manager.user_socket()
        async with sock:
            subscription_id = sock._subscription_id
            # Sanity check: registered while the context is active.
            assert subscription_id in clientAsync.ws_api._subscription_queues
        # Re-read the registry after exit; the entry must be gone.
        assert subscription_id not in clientAsync.ws_api._subscription_queues, (
            "Subscription should be unregistered after exit"
        )
class TestUserSocketFunctionality:
    """Behavioural checks for the user socket."""

    @pytest.mark.asyncio
    async def test_user_socket_recv_timeout(self, clientAsync, socket_manager):
        """With no events arriving, a bounded recv() must end in asyncio.TimeoutError."""
        sock = socket_manager.user_socket()
        async with sock:
            # Expects a quiet account: nothing arrives within the 2-second window.
            with pytest.raises(asyncio.TimeoutError):
                pending_event = sock.recv()
                await asyncio.wait_for(pending_event, timeout=2)

    @pytest.mark.asyncio
    async def test_user_socket_context_manager(self, clientAsync, socket_manager):
        """The subscription ID is set only while inside the async context."""
        sock = socket_manager.user_socket()

        # Before entering: no subscription yet.
        assert sock._subscription_id is None

        async with sock:
            # While inside: subscribed via ws_api.
            assert sock._subscription_id is not None
            assert sock._uses_ws_api_subscription is True

        # After leaving: the subscription ID has been cleared again.
        assert sock._subscription_id is None
class TestNonUserSockets:
    """Checks that other socket types are unaffected by the ws_api routing."""

    @pytest.mark.asyncio
    async def test_margin_socket_not_using_ws_api_subscription(self, clientAsync, socket_manager):
        """A margin socket must not be flagged as using ws_api subscription."""
        # The socket is only constructed here, never connected; the test
        # inspects its attributes as they are right after creation.
        sock = socket_manager.margin_socket()

        # The ws_api-subscription flag is expected to be False on a fresh margin socket.
        uses_ws_api = sock._uses_ws_api_subscription
        assert uses_ws_api is False, (
            "Margin socket should not use ws_api subscription"
        )

        # Its keepalive type must be "margin" rather than "user".
        keepalive_type = sock._keepalive_type
        assert keepalive_type == "margin"
class TestWsApiSubscriptionRouting:
    """Tests verifying ws_api correctly routes subscription events."""

    @pytest.mark.asyncio
    async def test_ws_api_has_subscription_queues(self, clientAsync):
        """ws_api should expose a ``_subscription_queues`` dict."""
        ws_api = clientAsync.ws_api
        # Make sure the ws_api connection is set up before inspecting it.
        await ws_api._ensure_ws_connection()

        assert hasattr(ws_api, '_subscription_queues'), \
            "ws_api should have _subscription_queues attribute"
        assert isinstance(ws_api._subscription_queues, dict), \
            "_subscription_queues should be a dict"

    @pytest.mark.asyncio
    async def test_ws_api_register_unregister_queue(self, clientAsync):
        """ws_api should be able to register and unregister queues.

        The unregister call runs in a ``finally`` clause so the dummy
        subscription is removed from the ``clientAsync`` fixture's ws_api
        even if one of the registration assertions fails.
        """
        ws_api = clientAsync.ws_api
        await ws_api._ensure_ws_connection()

        test_queue = asyncio.Queue()
        test_sub_id = "test_subscription_123"

        # Register
        ws_api.register_subscription_queue(test_sub_id, test_queue)
        try:
            assert test_sub_id in ws_api._subscription_queues
            assert ws_api._subscription_queues[test_sub_id] is test_queue
        finally:
            # Unregister (always, so a failed assertion above cannot leave
            # the dummy entry behind in ws_api's registry).
            ws_api.unregister_subscription_queue(test_sub_id)

        assert test_sub_id not in ws_api._subscription_queues

    @pytest.mark.asyncio
    async def test_ws_api_unregister_nonexistent_is_safe(self, clientAsync):
        """Unregistering a non-existent subscription should not raise."""
        await clientAsync.ws_api._ensure_ws_connection()

        # Should not raise; the test passes if the call simply returns.
        clientAsync.ws_api.unregister_subscription_queue("nonexistent_sub_id")
if __name__ == "__main__":
    # Run this file's tests verbosely when executed directly, and hand
    # pytest's exit status to the interpreter so a failing run produces a
    # non-zero process exit code (the bare call discarded it).
    raise SystemExit(pytest.main([__file__, "-v"]))