Skip to content

Commit 8b07fd5

Browse files
Resolve "Websocket error propagation lacks usability" (#363)
1 parent ffa0a1a commit 8b07fd5

7 files changed

Lines changed: 83 additions & 51 deletions

File tree

src/kraken/futures/websocket/__init__.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from collections.abc import Callable
2727

2828
from kraken.futures import FuturesWSClient
29+
from kraken.utils.utils import WSState
2930

3031
LOG: logging.Logger = logging.getLogger(__name__)
3132

@@ -53,6 +54,7 @@ def __init__(
5354
endpoint: str,
5455
callback: Callable,
5556
) -> None:
57+
self.state = WSState.INIT
5658
self.__client: FuturesWSClient = client
5759
self.__ws_endpoint: str = endpoint
5860
self.__callback: Any = callback
@@ -86,14 +88,17 @@ async def start(self: ConnectFuturesWebsocket) -> None:
8688

8789
async def stop(self: ConnectFuturesWebsocket) -> None:
8890
"""Stops the websocket connection"""
91+
self.state = WSState.CANCELLING
8992
self.keep_alive = False
9093
if hasattr(self, "task") and not self.task.done():
9194
await self.task
95+
self.state = WSState.CLOSED
9296

9397
async def __run( # noqa: C901
9498
self: ConnectFuturesWebsocket,
9599
event: asyncio.Event,
96100
) -> None:
101+
self.state = WSState.CONNECTING
97102
self.__new_challenge = None
98103
self.__last_challenge = None
99104

@@ -103,7 +108,8 @@ async def __run( # noqa: C901
103108
ping_interval=30,
104109
max_queue=None, # FIXME: This is not recommended by the docs https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client
105110
) as socket:
106-
LOG.info("Websocket connected!")
111+
self.state = WSState.CONNECTED
112+
LOG.info("Websocket connection established!")
107113
self.socket = socket
108114

109115
if not event.is_set():
@@ -121,7 +127,6 @@ async def __run( # noqa: C901
121127
except asyncio.CancelledError:
122128
LOG.exception("asyncio.CancelledError")
123129
self.keep_alive = False
124-
await self.__callback({"error": "asyncio.CancelledError"})
125130
else:
126131
try:
127132
message: dict = json.loads(_message)
@@ -148,19 +153,23 @@ async def __run_forever(self: ConnectFuturesWebsocket) -> None:
148153
while self.keep_alive:
149154
await self.__reconnect()
150155
except MaxReconnectError:
156+
self.state = WSState.ERROR
151157
await self.__callback(
152-
{"error": "kraken.exceptions.MaxReconnectError"},
158+
{"python-kraken-sdk": {"error": "kraken.exceptions.MaxReconnectError"}},
153159
)
154160
self.exception_occur = True
155-
except Exception:
161+
except Exception: # pylint: disable=broad-except
162+
self.state = WSState.ERROR
156163
LOG.exception(traceback.format_exc())
157164
self.exception_occur = True
158165

159166
async def close_connection(self: ConnectFuturesWebsocket) -> None:
160167
"""Closes the connection -/ will force reconnect"""
168+
self.state = WSState.CANCELLING
161169
await self.socket.close()
162170

163171
async def __reconnect(self: ConnectFuturesWebsocket) -> None:
172+
self.state = WSState.RECONNECTING
164173
LOG.info("Websocket start connect/reconnect")
165174

166175
self.__reconnect_num += 1
@@ -174,9 +183,8 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None:
174183
self.__reconnect_num,
175184
)
176185
await asyncio.sleep(reconnect_wait)
177-
LOG.debug("asyncio sleep done")
178-
event: asyncio.Event = asyncio.Event()
179186

187+
event: asyncio.Event = asyncio.Event()
180188
tasks: dict = {
181189
asyncio.ensure_future(
182190
self.__recover_subscription_req_msg(event),
@@ -192,22 +200,23 @@ async def __reconnect(self: ConnectFuturesWebsocket) -> None:
192200
exception_occur = False
193201
for task in finished:
194202
if task.exception():
203+
self.state = WSState.ERRORHANDLING
195204
exception_occur = True
196205
self.__challenge_ready = False
197-
traceback.print_stack()
198206
message = f"{task} got an exception {task.exception()}\n {task.get_stack()}"
199207
LOG.warning(message)
200208
for process in pending:
201-
LOG.warning("pending %s", process)
209+
LOG.warning("Pending %s", process)
202210
try:
203211
process.cancel()
212+
LOG.warning("Cancelled %s", process)
204213
except asyncio.CancelledError:
205-
LOG.exception("CancelledError")
206-
LOG.warning("cancel ok")
207-
await self.__callback({"error": message})
214+
LOG.error("Failed to cancel %s", process)
215+
await self.__callback({"python-kraken-sdk": {"error": message}})
208216
if exception_occur:
209217
break
210-
LOG.warning("Connection closed")
218+
self.state = WSState.CLOSED
219+
LOG.info("Connection closed!")
211220

212221
async def __recover_subscription_req_msg(
213222
self: ConnectFuturesWebsocket,

src/kraken/spot/funding.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def withdraw_funds(
341341
... key="MyPolkadotWallet"
342342
... amount=4
343343
... )
344-
{ 'refid': 'I7KGS6-UFMTTQ-AGBSO6T'}
344+
{'refid': 'I7KGS6-UFMTTQ-AGBSO6T'}
345345
"""
346346
params: dict = {"asset": asset, "key": str(key), "amount": str(amount)}
347347
if defined(max_fee):

src/kraken/spot/websocket/__init__.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from kraken.spot import SpotAsyncClient
1919
from kraken.spot.websocket.connectors import ConnectSpotWebsocket
20-
from kraken.utils.utils import deprecated
20+
from kraken.utils.utils import WSState, deprecated
2121

2222
if TYPE_CHECKING:
2323
from collections.abc import Callable
@@ -51,6 +51,8 @@ class SpotWSClientBase(SpotAsyncClient):
5151
LOG: logging.Logger = logging.getLogger(__name__)
5252
PROD_ENV_URL: str = "ws.kraken.com"
5353
AUTH_PROD_ENV_URL: str = "ws-auth.kraken.com"
54+
# Changing this can cause errors, as this class is designed for v2.
55+
API_V: str = "/v2"
5456

5557
def __init__( # nosec: B107
5658
self: SpotWSClientBase,
@@ -61,7 +63,7 @@ def __init__( # nosec: B107
6163
no_public: bool = False,
6264
) -> None:
6365
super().__init__(key=key, secret=secret)
64-
66+
self.state: WSState = WSState.INIT
6567
self._is_auth: bool = bool(key and secret)
6668
self.__callback: Callable | None = callback
6769
self._pub_conn: ConnectSpotWebsocket | None = None
@@ -77,16 +79,12 @@ def exception_occur(self: SpotWSClientBase) -> bool:
7779

7880
# --------------------------------------------------------------------------
7981
# Internals
80-
def __prepare_connect(
81-
self: SpotWSClientBase,
82-
*,
83-
no_public: bool,
84-
) -> None:
82+
def __prepare_connect(self: SpotWSClientBase, *, no_public: bool) -> None:
8583
"""Set up functions and attributes based on the API version."""
8684

8785
# pylint: disable=invalid-name
88-
self.PROD_ENV_URL += "/v2"
89-
self.AUTH_PROD_ENV_URL += "/v2"
86+
self.PROD_ENV_URL += self.API_V
87+
self.AUTH_PROD_ENV_URL += self.API_V
9088

9189
self._pub_conn = (
9290
ConnectSpotWebsocket(
@@ -112,6 +110,7 @@ def __prepare_connect(
112110

113111
async def start(self: SpotWSClientBase) -> None:
114112
"""Method to start the websocket connection."""
113+
self.state = WSState.CONNECTING
115114
if self._pub_conn:
116115
await self._pub_conn.start()
117116
if self._priv_conn:
@@ -126,39 +125,45 @@ async def start(self: SpotWSClientBase) -> None:
126125
else:
127126
public_conntection_waiting = False
128127

129-
private_conection_waiting = True
128+
private_connection_waiting = True
130129
if self._priv_conn:
131130
if self._priv_conn.socket is not None:
132-
private_conection_waiting = False
131+
private_connection_waiting = False
133132
else:
134-
private_conection_waiting = False
133+
private_connection_waiting = False
135134

136-
if not public_conntection_waiting and not private_conection_waiting:
135+
if not public_conntection_waiting and not private_connection_waiting:
137136
break
138137
await async_sleep(0.2)
139138
timeout += 0.2
140139
else:
140+
self.state = WSState.ERROR
141141
raise TimeoutError("Could not connect to the Kraken API!")
142+
self.state = WSState.CONNECTED
142143

143144
async def close(self: SpotWSClientBase) -> None:
144145
"""Method to close the websocket connection."""
146+
self.state = WSState.CANCELLING
145147
if self._pub_conn:
146148
await self._pub_conn.stop()
147149
if self._priv_conn:
148150
await self._priv_conn.stop()
149151
await super().close()
152+
self.state = WSState.CLOSED
150153

151154
@deprecated(
152155
"The 'stop' function is deprecated and will be replaced by"
153156
" 'close' in a future release.",
154157
)
155158
async def stop(self: SpotWSClientBase) -> None:
156159
"""Method to stop the websocket connection."""
160+
self.state = WSState.CANCELLING
157161
if self._pub_conn:
158162
await self._pub_conn.stop()
159163
if self._priv_conn:
160164
await self._priv_conn.stop()
161165
await super().close()
166+
self.state = WSState.CLOSED
162167

163168
async def on_message(
164169
self: SpotWSClientBase,

src/kraken/spot/websocket/connectors.py

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from websockets.asyncio.client import connect
2828

2929
from kraken.exceptions import MaxReconnectError
30+
from kraken.utils.utils import WSState
3031

3132
if TYPE_CHECKING:
3233
from collections.abc import Callable
@@ -68,6 +69,7 @@ def __init__(
6869
*,
6970
is_auth: bool = False,
7071
) -> None:
72+
self.state: WSState = WSState.INIT
7173
self.__client: SpotWSClientBase = client
7274
self.__ws_endpoint: str = endpoint
7375
self.__callback: Callable = callback
@@ -104,16 +106,19 @@ async def start(self: ConnectSpotWebsocketBase) -> None:
104106
hasattr(self, "task")
105107
and not self.task.done() # pylint: disable=access-member-before-definition
106108
):
109+
LOG.warning("Websocket connection already running!")
107110
return
108111
self.task: asyncio.Task = asyncio.create_task(
109112
self.__run_forever(),
110113
)
111114

112115
async def stop(self: ConnectSpotWebsocketBase) -> None:
113116
"""Stops the websocket connection"""
117+
self.state = WSState.CANCELLING
114118
self.keep_alive = False
115119
if hasattr(self, "task") and not self.task.done():
116120
await self.task
121+
self.state = WSState.CLOSED
117122

118123
async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
119124
"""
@@ -123,6 +128,7 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
123128
:param event: Event used to control the information flow
124129
:type event: asyncio.Event
125130
"""
131+
self.state = WSState.CONNECTING
126132
self._last_ping = time()
127133
self.ws_conn_details = (
128134
None if not self.__is_auth else await self.__client.get_ws_token()
@@ -135,9 +141,10 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
135141
ping_interval=30,
136142
max_queue=None, # FIXME: This is not recommended by the docs https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client
137143
) as socket:
138-
LOG.info("Websocket connected!")
139-
self.socket = socket
144+
self.state = WSState.CONNECTED
145+
LOG.info("Websocket connection established!")
140146

147+
self.socket = socket
141148
if not event.is_set():
142149
await self.send_ping()
143150
event.set()
@@ -153,7 +160,6 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
153160
except asyncio.CancelledError:
154161
LOG.exception("asyncio.CancelledError")
155162
self.keep_alive = False
156-
await self.__callback({"error": "asyncio.CancelledError"})
157163
else:
158164
try:
159165
message: dict = json.loads(_message)
@@ -172,22 +178,19 @@ async def __run_forever(self: ConnectSpotWebsocketBase) -> None:
172178
while self.keep_alive:
173179
await self.__reconnect()
174180
except MaxReconnectError:
181+
self.state = WSState.ERROR
175182
await self.__callback(
176-
{"error": "kraken.exceptions.MaxReconnectError"},
183+
{"python-kraken-sdk": {"error": "kraken.exceptions.MaxReconnectError"}},
177184
)
178185
self.exception_occur = True
179-
except Exception as exc:
180-
traceback_: str = traceback.format_exc()
181-
LOG.exception(
182-
"%s: %s",
183-
exc,
184-
traceback_,
185-
)
186-
await self.__callback({"error": traceback_})
186+
except Exception: # pylint: disable=broad-except
187+
self.state = WSState.ERROR
188+
LOG.exception(traceback.format_exc())
187189
self.exception_occur = True
188190

189191
async def close_connection(self: ConnectSpotWebsocketBase) -> None:
190192
"""Closes the websocket connection and thus forces a reconnect"""
193+
self.state = WSState.CANCELLING
191194
await self.socket.close()
192195

193196
async def __reconnect(self: ConnectSpotWebsocketBase) -> None:
@@ -198,6 +201,7 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None:
198201
:raises KrakenException.MaxReconnectError: If there are to many
199202
reconnect retries
200203
"""
204+
self.state = WSState.RECONNECTING
201205
LOG.info("Websocket start connect/reconnect")
202206

203207
self.__reconnect_num += 1
@@ -225,25 +229,25 @@ async def __reconnect(self: ConnectSpotWebsocketBase) -> None:
225229
tasks,
226230
return_when=asyncio.FIRST_EXCEPTION,
227231
)
228-
exception_occur: bool = False
232+
exception_occur = False
229233
for task in finished:
230234
if task.exception():
235+
self.state = WSState.ERRORHANDLING
231236
exception_occur = True
232-
traceback.print_stack()
233-
message: str = (
234-
f"{task} got an exception {task.exception()}\n {task.get_stack()}"
235-
)
237+
message = f"{task} got an exception {task.exception()}\n {task.get_stack()}"
236238
LOG.warning(message)
237239
for process in pending:
238240
LOG.warning("pending %s", process)
239241
try:
240242
process.cancel()
243+
LOG.warning("Cancelled %s", process)
241244
except asyncio.CancelledError:
242-
LOG.exception("asyncio.CancelledError")
243-
await self.__callback({"error": message})
245+
LOG.error("Failed to cancel %s", process)
246+
await self.__callback({"python-kraken-sdk": {"error": message}})
244247
if exception_occur:
245248
break
246-
LOG.warning("Connection closed")
249+
self.state = WSState.CLOSED
250+
LOG.info("Connection closed!")
247251

248252
def __get_reconnect_wait(
249253
self: ConnectSpotWebsocketBase,
@@ -353,10 +357,10 @@ async def _recover_subscriptions(
353357
it is set to ``True`` - which is when the connection is ready)
354358
:type event: asyncio.Event
355359
"""
356-
log_msg: str = (
357-
f'Recover {"authenticated" if self.is_auth else "public"} subscriptions {self._subscriptions}'
360+
LOG.info(
361+
"%s: waiting",
362+
log_msg := f'Recover {"authenticated" if self.is_auth else "public"} subscriptions {self._subscriptions}',
358363
)
359-
LOG.info("%s: waiting", log_msg)
360364
await event.wait()
361365

362366
for subscription in self._subscriptions:

0 commit comments

Comments
 (0)