@@ -51,10 +51,14 @@ class WebSocketConnection:
5151 response_types (dict): Map of stream names to their response types.
5252 ws_type (str): Type of WebSocket connection (API or Stream).
5353 websocket (aiohttp.ClientWebSocketResponse): The WebSocket response object.
54+ reconnect (bool): Flag indicating if the connection should reconnect.
55+ is_session_log_on (bool): Flag indicating if the session is logged on.
56+ session_logon_request (Optional[dict]): The session logon request data.
57+ url_path (Optional[str]): The URL path for the WebSocket connection.
5458 """
5559
5660 def __init__ (
57- self , websocket : aiohttp .ClientWebSocketResponse , id : Union [str , int ], ws_type : str
61+ self , websocket : aiohttp .ClientWebSocketResponse , id : Union [str , int ], ws_type : str , url_path : Optional [ str ] = None
5862 ):
5963 self .id = id
6064 self .pending_request = {}
@@ -65,6 +69,7 @@ def __init__(
6569 self .reconnect = False
6670 self .is_session_log_on = False
6771 self .session_logon_request = None
72+ self .url_path = url_path
6873
6974
7075class WebSocketCommon :
@@ -91,23 +96,27 @@ async def connect(
9196 url : str ,
9297 configuration : Union [ConfigurationWebSocketAPI , ConfigurationWebSocketStreams ],
9398 ws_id : Union [str , int ] = None ,
99+ url_paths : Optional [list [str ]] = None ,
94100 ):
95101 """Connect to the Binance WebSocket server.
96102
97103 Args:
98104 url (str): WebSocket URL.
99105 configuration (Union[ConfigurationWebSocketAPI, ConfigurationWebSocketStreams]): Configuration object.
100106 ws_id (Union[str, int]): Optional WebSocket ID for the connection.
107+ url_paths (Optional[list[str]]): Optional list of URL paths for the connection.
101108 """
102109
103110 try :
104111 if self .session is None :
105112 self .session = aiohttp .ClientSession ()
106- if configuration .mode == WebsocketMode .POOL :
107- for i in range (configuration .pool_size ):
108- await self .init_connection (url , configuration , ws_id )
109- else :
110- await self .init_connection (url , configuration , ws_id )
113+
114+ pool_size = configuration .pool_size if configuration .mode == WebsocketMode .POOL else 1
115+ urls = url_paths if url_paths else [None ]
116+
117+ for url_path in urls :
118+ for _ in range (pool_size ):
119+ await self .init_connection (url , configuration , ws_id = ws_id , url_path = url_path )
111120 return self
112121 except Exception as e :
113122 logging .error (f"WebSocket failed to connect: { e } " )
@@ -116,13 +125,15 @@ async def init_connection(
116125 self ,
117126 url ,
118127 configuration : Union [ConfigurationWebSocketAPI , ConfigurationWebSocketStreams ],
128+ url_path : Optional [str ] = None ,
119129 ws_id : Union [str , int ] = None ,
120130 ):
121131 """Initialize a WebSocket connection.
122132
123133 Args:
124134 url (str): WebSocket URL.
125135 configuration (Union[ConfigurationWebSocketAPI, ConfigurationWebSocketStreams]): Configuration object.
136+ url_path (Optional[str]): Optional URL path for the connection.
126137 ws_id (str): Optional WebSocket ID for the connection.
127138 """
128139
@@ -138,6 +149,9 @@ async def init_connection(
138149 url = f"{ url } ?timeUnit={ configuration .time_unit .value } "
139150 logging .info (f"Connecting to { url } with proxy { proxy } " )
140151
152+ if url_path :
153+ url = url .replace ("/stream" , f"/{ url_path } /stream" )
154+
141155 if type (configuration ).__name__ == "ConfigurationWebSocketAPI" :
142156 websocket = await self .session .ws_connect (
143157 url ,
@@ -168,7 +182,7 @@ async def init_connection(
168182 id = ws_id if ws_id else get_uuid ()
169183
170184 logging .info (f"Establishing Websocket connection with id { id } to: { url } " )
171- connection = WebSocketConnection (websocket , id , type (configuration ).__name__ )
185+ connection = WebSocketConnection (websocket , id , type (configuration ).__name__ , url_path )
172186
173187 self .connections .append (connection )
174188
@@ -472,22 +486,37 @@ async def close_connection(
472486
473487
474488class WebSocketStreamBase (WebSocketCommon ):
475- def __init__ (self , configuration : ConfigurationWebSocketStreams , id_strict_int : Optional [bool ] = False ):
489+ def __init__ (
490+ self ,
491+ configuration : ConfigurationWebSocketStreams ,
492+ id_strict_int : Optional [bool ] = False ,
493+ url_paths : Optional [str ] = None
494+ ):
495+ """Initialize the WebSocketStreamBase class.
496+
497+ Args:
498+ configuration (ConfigurationWebSocketStreams): Configuration object.
499+ id_strict_int (Optional[bool]): Whether to use strict integer IDs.
500+ url_paths (Optional[str]): URL paths for the WebSocket connection.
501+ """
502+
476503 if not configuration .stream_url .endswith ("stream" ):
477504 configuration .stream_url = configuration .stream_url + "/stream"
478505 super ().__init__ (configuration )
479506 self .configuration = configuration
480507 self .id_strict_int = id_strict_int
508+ self .url_paths = url_paths
481509
482510 async def create_connection (self ):
483511 """Create a WebSocket connection.
484512
485513 Returns:
486514 WebSocketConnection: The created WebSocket connection.
487515 """
488- return await self .connect (self .configuration .stream_url , self .configuration )
489516
490- async def subscribe (self , streams : list [str ], response_model : Type [T ] = None ):
517+ return await self .connect (self .configuration .stream_url , self .configuration , url_paths = self .url_paths )
518+
519+ async def subscribe (self , streams : list [str ], response_model : Type [T ] = None , stream_url : Optional [str ] = None ):
491520 """Subscribe to a list of streams.
492521
493522 Args:
@@ -516,15 +545,20 @@ async def subscribe(self, streams: list[str], response_model: Type[T] = None):
516545 ]
517546
518547 for stream in streams :
548+ if stream_url :
549+ candidates = [c for c in self .connections if c .url_path == stream_url ]
550+ else :
551+ candidates = self .connections
552+
519553 if self .configuration .mode == WebsocketMode .SINGLE :
520- connection = self . connections [0 ]
554+ connection = candidates [0 ] if candidates else None
521555 else :
522- connection = self .connections [
523- self .round_robin_index % len (self . connections )
524- ]
525- self . round_robin_index = ( self . round_robin_index + 1 ) % len (
526- self . connections
527- )
556+ connection = candidates [ self .round_robin_index % len ( candidates )] if candidates else None
557+ self . round_robin_index = ( self .round_robin_index + 1 ) % len (candidates ) if candidates else 0
558+
559+ if connection is None :
560+ logging . warning ( f"No matching connection found for stream: { stream } " )
561+ continue
528562
529563 logging .info (f"Subscribing to streams: { streams } " )
530564 json_msg = {"method" : "SUBSCRIBE" , "params" : streams , "id" : get_random_int () if self .id_strict_int else get_uuid ()}
@@ -957,7 +991,7 @@ def on(self, event: str, callback: Callable[[T], None]) -> None:
957991
958992
959993async def RequestStream (
960- websocket_base : WebSocketStreamBase or WebSocketAPIBase , stream : str , response_model : Type [T ] = None
994+ websocket_base : WebSocketStreamBase or WebSocketAPIBase , stream : str , response_model : Type [T ] = None , stream_url : Optional [ str ] = None
961995) -> RequestStreamHandle [T ]:
962996 """Decorator to create a request stream for a specific stream.
963997
@@ -968,7 +1002,7 @@ async def RequestStream(
9681002 """
9691003
9701004 if isinstance (websocket_base , WebSocketStreamBase ):
971- await websocket_base .subscribe (streams = [stream ], response_model = response_model )
1005+ await websocket_base .subscribe (streams = [stream ], response_model = response_model , stream_url = stream_url )
9721006 else :
9731007 await websocket_base .subscribe_user_data (id = stream , response_model = response_model )
9741008
0 commit comments