1919from websockets .http11 import Response
2020from websockets .datastructures import Headers
2121
22- from .base import LinkBaseAsync
22+ from .base import LinkBaseAsync , _unpack_message
2323
2424
2525class WebsocketLinkBase (LinkBaseAsync ):
@@ -35,7 +35,6 @@ def __init__(self):
3535 self ._event_is_connected = threading .Event ()
3636 self ._event_is_running = threading .Event ()
3737 self ._start_handling_messages = threading .Event ()
38- self ._send_loop = asyncio .new_event_loop ()
3938
4039 self ._websocket_thread = threading .Thread (target = self ._connect , daemon = True )
4140 self ._websocket_thread .start ()
@@ -65,6 +64,7 @@ def __init__(self):
6564 self ._port = 8700
6665 self ._auth_token = secrets .token_urlsafe (32 )
6766 self ._executor = ThreadPoolExecutor (max_workers = 8 )
67+ self ._chunk_buffers = {}
6868 self ._stop = None
6969 super ().__init__ ()
7070
@@ -80,22 +80,50 @@ def _check_auth(self, connection, request):
8080 """Reject WebSocket connections that don't carry a valid token."""
8181 params = parse_qs (urlparse (request .path ).query )
8282 tokens = params .get ("token" , [])
83- if not tokens or tokens [0 ] != self ._auth_token :
83+ if not tokens or not secrets . compare_digest ( tokens [0 ], self ._auth_token ) :
8484 return Response (403 , "Forbidden" , Headers ())
8585 return None
8686
8787 @staticmethod
88- def _is_response (message ):
89- """Quick check if a message is a response (cheap, avoids full deserialization)."""
90- if isinstance ( message , ( memoryview , bytes )):
91- # Binary message: JSON metadata starts at byte 4
92- try :
88+ def _message_type (message ):
89+ """Return the top-level message type, parsing only the JSON header
90+ (not buffer payloads). Returns None on malformed input."""
91+ try :
92+ if isinstance ( message , ( memoryview , bytes )) :
9393 prefix_size = 4 + int .from_bytes (message [:4 ], byteorder = "little" )
94- header = message [4 :prefix_size ]
95- return b'"type":"response"' in bytes (header ) or b'"type": "response"' in bytes (header )
96- except Exception :
97- return False
98- return '"type":"response"' in message or '"type": "response"' in message
94+ header = json .loads (bytes (message [4 :prefix_size ]).decode ("utf-8" ))
95+ else :
96+ header = json .loads (message )
97+ return header .get ("type" ) if isinstance (header , dict ) else None
98+ except Exception :
99+ return None
100+
101+ def _is_response (self , message ):
102+ return self ._message_type (message ) == "response"
103+
104+ def _is_chunk (self , message ):
105+ return isinstance (message , (memoryview , bytes )) and self ._message_type (message ) == "chunk"
106+
107+ def _reassemble_chunk (self , message ):
108+ data , buffers = _unpack_message (message )
109+ pid = data ["parent_request_id" ]
110+ buf = self ._chunk_buffers .get (pid )
111+ if buf is None :
112+ buf = bytearray (data ["total_size" ])
113+ self ._chunk_buffers [pid ] = buf
114+ chunk = buffers [0 ]
115+ offset = data ["offset" ]
116+ buf [offset : offset + len (chunk )] = chunk
117+ if data ["chunk_id" ] + 1 == data ["n_chunks" ]:
118+ del self ._chunk_buffers [pid ]
119+ return bytes (buf )
120+ return None
121+
122+ def _dispatch (self , message ):
123+ if self ._is_response (message ):
124+ self ._on_message (message )
125+ else :
126+ self ._executor .submit (self ._on_message , message )
99127
100128 async def _websocket_handler (self , websocket , path = "" ):
101129 if self ._connection is not None :
@@ -107,13 +135,17 @@ async def _websocket_handler(self, websocket, path=""):
107135 async for message in websocket :
108136 # Handle responses inline to avoid deadlock: if all executor
109137 # threads are blocked waiting for JS responses, queued response
110- # messages would never be processed.
111- if self ._is_response (message ):
112- self ._on_message (message )
138+ # messages would never be processed. Chunks are reassembled
139+ # inline (single-threaded, ordered) then dispatched.
140+ if self ._is_chunk (message ):
141+ full = self ._reassemble_chunk (message )
142+ if full is not None :
143+ self ._dispatch (full )
113144 else :
114- self ._executor . submit ( self . _on_message , message )
145+ self ._dispatch ( message )
115146 finally :
116147 self ._connection = None
148+ self ._chunk_buffers .clear ()
117149
118150 def _connect (self ):
119151 async def start_websocket ():
0 commit comments