|
4 | 4 | import asyncio |
5 | 5 | from traceback import format_exc |
6 | 6 | from concurrent.futures._base import CancelledError |
| 7 | +from aiohttp._websocket.models import WSMessage, WSMsgType |
7 | 8 |
|
8 | 9 | from aiohttp import ClientSession |
9 | 10 | import aiohttp |
| 11 | + |
10 | 12 | from easyrpc.register import ( |
11 | 13 | create_proxy_from_config, |
12 | 14 | Coroutine, |
13 | 15 | Generator, |
14 | 16 | AsyncGenerator, |
15 | 17 | async_generator_asend |
16 | 18 | ) |
17 | | -from easyrpc.auth import encode |
| 19 | +from easyrpc.auth import encode, decode |
18 | 20 | from easyrpc.origin import Origin |
19 | 21 | from easyrpc.generator import RpcGenerator |
20 | 22 | from easyrpc.exceptions import ( |
@@ -250,6 +252,7 @@ async def get_origin_registered_functions(self): |
250 | 252 | self.server.origin(func, namespace=self.namespace) |
251 | 253 |
|
252 | 254 | async def cleanup_proxy_session(self): |
| 255 | + self.log.warning(f"cleanup_proxy_session called") |
253 | 256 | if not self.session_id in self.client_connections: |
254 | 257 | return |
255 | 258 | try: |
@@ -328,14 +331,24 @@ async def ws_receiver(): |
328 | 331 | try: |
329 | 332 | while True: |
330 | 333 | message = await ws.receive() |
| 334 | + self.log.debug(f"ws_receiver got message: {message}") |
| 335 | + |
| 336 | + if message.type == WSMsgType.CLOSE: |
| 337 | + self.log.info(f"Server sent WSCLOSE") |
| 338 | + break |
| 339 | + |
331 | 340 | if message.data == None: |
332 | 341 | break |
333 | 342 |
|
334 | 343 | if self.serialization == 'json': |
335 | 344 | if 'error' in message.data and not 'ws_action' in message.data: |
336 | 345 | break |
337 | 346 |
|
338 | | - message = self.deserialize(message.data) |
| 347 | + self.log.info(message.data) |
| 348 | + try: |
| 349 | + message: WSMessage = self.deserialize(message.data) |
| 350 | + except Exception as e: |
| 351 | + self.log.warning(f"error deserializing message: {repr(e)} - message: {message.data}") |
339 | 352 |
|
340 | 353 | if not 'ws_action' in message: |
341 | 354 | continue |
@@ -433,9 +446,9 @@ async def ws_receiver(): |
433 | 446 | }) |
434 | 447 |
|
435 | 448 | except Exception as e: |
436 | | - if not isinstance(e, CancelledError): |
437 | | - self.log.exception(f"error with ws_receiver") |
438 | | - await self.cleanup_proxy_session() |
| 449 | + self.log.info(f"ws_receiver exiting: reason - {repr(e)}") |
| 450 | + finally: |
| 451 | + await self.cleanup_proxy_session() |
439 | 452 | return ws_receiver |
440 | 453 | async def get_proxy_ws_session(self): |
441 | 454 | """ |
|
0 commit comments