Skip to content

Commit 1d13f73

Browse files
committed
Stopped using ExceptionIteratingRequestsError so that CancelledError thrown into async persistent subscription request iterators can be raised directly (this only happens since Python 3.12), also changed in sync client for behavioural consistency.
1 parent e4b1db0 commit 1d13f73

5 files changed

Lines changed: 25 additions & 9 deletions

File tree

kurrentdbclient/exceptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from typing import TYPE_CHECKING
44

5+
from typing_extensions import deprecated
6+
57
if TYPE_CHECKING:
68
from typing import Any
79
from uuid import UUID
@@ -246,6 +248,7 @@ class ReadOnlyReplicaNotFoundError(DiscoveryFailedError):
246248
"""
247249

248250

251+
@deprecated("Actual error raised instead")
249252
class ExceptionIteratingRequestsError(KurrentDBClientError):
250253
"""
251254
Raised when a persistent subscription errors whilst iterating requests.

kurrentdbclient/persistent.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
from kurrentdbclient.events import RecordedEvent
5555
from kurrentdbclient.exceptions import (
5656
CancelledByClientError,
57-
ExceptionIteratingRequestsError,
5857
KurrentDBClientError,
5958
NodeIsNotLeaderError,
6059
ProgrammingError,
@@ -620,7 +619,7 @@ async def stop(self, *, wait_until_stopped: bool = True) -> None:
620619
except grpc.RpcError as e:
621620
raise handle_rpc_error(e) from e
622621
if self._read_reqs.errored:
623-
raise ExceptionIteratingRequestsError from self._read_reqs.errored
622+
raise self._read_reqs.errored
624623

625624
async def ack(self, item: UUID | RecordedEvent) -> None:
626625
await self._read_reqs.ack(event_id=self._get_event_id(item))
@@ -638,6 +637,13 @@ def _get_event_id(item: UUID | RecordedEvent) -> UUID:
638637
return item.ack_id
639638
return item
640639

640+
def __del__(self) -> None:
641+
# Safety net, last chance to cancel the streaming call.
642+
if hasattr(self, "_stream_stream_call"):
643+
self._stream_stream_call.cancel()
644+
else: # pragma: no cover
645+
pass
646+
641647

642648
class PersistentSubscription(
643649
GrpcStreamer, AbstractPersistentSubscription, BasePersistentSubscription
@@ -717,7 +723,7 @@ def _get_next_read_resp(self) -> persistent_pb2.ReadResp:
717723
and "Exception iterating requests!" in details
718724
and self._read_reqs.errored
719725
):
720-
raise ExceptionIteratingRequestsError from self._read_reqs.errored
726+
raise self._read_reqs.errored from None
721727
raise handle_rpc_error(e) from None
722728
assert isinstance(read_resp, persistent_pb2.ReadResp)
723729
return read_resp

kurrentdbclient/streams.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,13 @@ async def stop(self, *, wait_until_stopped: bool = True) -> None:
239239
except grpc.RpcError as e:
240240
raise handle_streams_rpc_error(e) from e
241241

242+
def __del__(self) -> None:
243+
# Safety net, last chance to cancel the streaming call.
244+
if hasattr(self, "_unary_stream_call"):
245+
self._unary_stream_call.cancel()
246+
else: # pragma: no cover
247+
pass
248+
242249

243250
class AsyncCatchupSubscription(AsyncReadResponse, AbstractAsyncCatchupSubscription):
244251
def __init__(

tests/test_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
ConsumerTooSlowError,
5151
DeadlineExceededError,
5252
DiscoveryFailedError,
53-
ExceptionIteratingRequestsError,
5453
ExceptionThrownByHandlerError,
5554
FailedPreconditionError,
5655
FollowerNotFoundError,
@@ -3564,10 +3563,10 @@ class NotUUID:
35643563
pass
35653564

35663565
# Ack with wrong type of object.
3567-
with self.assertRaises(ExceptionIteratingRequestsError) as cm:
3566+
with self.assertRaises(ValueError) as cm:
35683567
for _ in subscription:
35693568
subscription.ack(NotUUID()) # type: ignore
3570-
self.assertIsInstance(cm.exception.__cause__, ValueError)
3569+
self.assertIn("is not a UUID", str(cm.exception))
35713570

35723571
def test_subscription_to_stream_replay_parked(self) -> None:
35733572
self.construct_client()

tests/test_client_async.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
AlreadyExistsError,
3636
DeadlineExceededError,
3737
DiscoveryFailedError,
38-
ExceptionIteratingRequestsError,
3938
ExceptionThrownByHandlerError,
4039
FollowerNotFoundError,
4140
GrpcDeadlineExceededError,
@@ -1601,9 +1600,10 @@ async def test_persistent_subscription_to_all(self) -> None:
16011600

16021601
# Read subscription - error iterating requests is propagated.
16031602
persistent_subscription = await self.client.read_subscription_to_all(group_name)
1604-
with self.assertRaises(ExceptionIteratingRequestsError):
1603+
with self.assertRaises(ValueError) as cm:
16051604
async for _ in persistent_subscription:
16061605
await persistent_subscription.ack("a") # type: ignore[arg-type]
1606+
self.assertIn("event_id 'a' is not a UUID", str(cm.exception))
16071607

16081608
# Read subscription - success.
16091609
persistent_subscription = await self.client.read_subscription_to_all(group_name)
@@ -2420,9 +2420,10 @@ async def test_persistent_subscription_to_stream(self) -> None:
24202420
subscription = await self.client.read_subscription_to_stream(
24212421
group_name, stream_name1
24222422
)
2423-
with self.assertRaises(ExceptionIteratingRequestsError):
2423+
with self.assertRaises(ValueError) as cm:
24242424
async for _ in subscription:
24252425
await subscription.ack("a") # type: ignore[arg-type]
2426+
self.assertIn("event_id 'a' is not a UUID", str(cm.exception))
24262427

24272428
# Read subscription - success.
24282429
subscription = await self.client.read_subscription_to_stream(

0 commit comments

Comments
 (0)