Skip to content

Commit 0d5e514

Browse files
authored
fix: subscriptions bookkeeping in pubsub clients (#510)
Pubsub clients should do some bookkeeping of the number of active subscriptions so they can gracefully reject new subscribe requests rather than letting them silently queue up on the client. Updated the client to use a pool of 4 unary gRPC managers by default to match the other SDKs. The stream managers use the existing with_max_subscriptions configuration to statically set the number of gRPC managers in the stream pool.
1 parent 62ac4ae commit 0d5e514

12 files changed

Lines changed: 484 additions & 107 deletions

File tree

.github/workflows/on-pull-request.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ jobs:
5757
run: poetry run ruff format --check --diff src tests
5858

5959
- name: Run tests
60-
run: poetry run pytest -p no:sugar -q -m "not local"
60+
run: poetry run pytest -p no:sugar -q -m "not local and not subscription_initialization"
6161

6262
test-examples:
6363
runs-on: ubuntu-24.04

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,16 @@ gen-sync: do-gen-sync format lint
5050
.PHONY: test
5151
## Run unit and integration tests with pytest
5252
test:
53-
@poetry run pytest -m "not local"
53+
@poetry run pytest -m "not local and not subscription_initialization"
5454

5555
.PHONY: test-local
5656
## Run the integration tests that require Momento Local
5757
test-local:
5858
@poetry run pytest -m local
5959

60+
test-subscription-initialization:
61+
@poetry run pytest -m subscription_initialization
62+
6063
.PHONY: precommit
6164
## Run format, lint, and test as a step before committing.
6265
precommit: gen-sync format lint test

poetry.lock

Lines changed: 79 additions & 66 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pytest = "^7.1.3"
4444
pytest-asyncio = "^0.19.0"
4545
pytest-describe = "^2.0.1"
4646
pytest-sugar = "^0.9.5"
47+
pytest-timeout = "^2.4.0"
4748

4849
[tool.poetry.group.lint.dependencies]
4950
mypy = "^1.0"
@@ -61,6 +62,7 @@ log_cli_format = "%(asctime)s [%(levelname)s] %(message)s"
6162
log_cli_date_format = "%Y-%m-%d %H:%M:%S.%f"
6263
markers = [
6364
"local: tests that require Momento Local",
65+
"subscription_initialization: tests that require higher subscriptions limit when testing against the live service",
6466
]
6567

6668
[tool.mypy]

src/momento/errors/exceptions.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,3 +371,21 @@ def __init__(
371371
transport_details,
372372
message_wrapper="Service returned an unknown response; please contact us at support@momentohq.com",
373373
)
374+
375+
376+
class ClientResourceExhaustedException(SdkException):
377+
"""Client resource (such as memory or number of concurrent streams) exhausted."""
378+
379+
def __init__(
380+
self,
381+
message: str,
382+
service: Service,
383+
transport_details: Optional[MomentoErrorTransportDetails] = None,
384+
):
385+
super().__init__(
386+
message,
387+
MomentoErrorCode.CLIENT_RESOURCE_EXHAUSTED,
388+
service,
389+
transport_details,
390+
message_wrapper="Client resource (such as memory or number of concurrent streams) exhausted",
391+
)

src/momento/internal/aio/_scs_grpc_manager.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from momento.auth import CredentialProvider
1414
from momento.config import Configuration, TopicConfiguration
1515
from momento.config.auth_configuration import AuthConfiguration
16-
from momento.errors.exceptions import ConnectionException
16+
from momento.errors.exceptions import ClientResourceExhaustedException, ConnectionException
1717
from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType
1818
from momento.internal._utilities._channel_credentials import (
1919
channel_credentials_from_root_certs_or_default,
@@ -220,13 +220,23 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
220220
configuration.get_transport_strategy().get_grpc_configuration()
221221
),
222222
)
223+
self._active_streams_count = 0
223224

224225
async def close(self) -> None:
225226
await self._channel.close()
226227

227228
def async_stub(self) -> pubsub_client.PubsubStub:
229+
if self._active_streams_count >= 100:
230+
raise ClientResourceExhaustedException(
231+
message="Already at max number of concurrent streams",
232+
service=Service.TOPICS,
233+
)
234+
self._active_streams_count += 1
228235
return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call]
229236

237+
def decrement_stream_count(self) -> None:
238+
self._active_streams_count -= 1
239+
230240

231241
class _TokenGrpcManager:
232242
"""Internal gRPC token manager."""

src/momento/internal/aio/_scs_pubsub_client.py

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import math
44
from datetime import timedelta
5+
from typing import Callable
56

67
from momento_wire_types import cachepubsub_pb2 as pubsub_pb
78
from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc
@@ -10,6 +11,7 @@
1011
from momento.auth import CredentialProvider
1112
from momento.config import TopicConfiguration
1213
from momento.errors import convert_error
14+
from momento.errors.exceptions import ClientResourceExhaustedException
1315
from momento.internal._utilities import _validate_cache_name, _validate_topic_name
1416
from momento.internal.aio._scs_grpc_manager import (
1517
_PubsubGrpcManager,
@@ -27,8 +29,6 @@
2729
class _ScsPubsubClient:
2830
"""Internal pubsub client."""
2931

30-
stream_topic_manager_count = 0
31-
3232
def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
3333
endpoint = credential_provider.cache_endpoint
3434
self._logger = logs.logger
@@ -38,20 +38,23 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
3838
default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline()
3939
self._default_deadline_seconds = default_deadline.total_seconds()
4040

41-
num_subscriptions = configuration.get_max_subscriptions()
4241
# Default to a single channel and scale up if necessary. Each channel can support
4342
# 100 subscriptions. Issuing more subscribe requests than you have channels to handle
44-
# will cause the last request to hang indefinitely, so it's important to get this right.
43+
# will cause a ClientResourceExhaustedException.
4544
num_channels = 1
45+
num_subscriptions = configuration.get_max_subscriptions()
4646
if num_subscriptions > 0:
4747
num_channels = math.ceil(num_subscriptions / 100.0)
4848
self._logger.debug(f"creating {num_channels} subscription channels")
49-
50-
self._grpc_manager = _PubsubGrpcManager(configuration, credential_provider)
5149
self._stream_managers = [
5250
_PubsubGrpcStreamManager(configuration, credential_provider) for i in range(0, num_channels)
5351
]
5452

53+
# Default to 4 unary pubsub channels. TODO: Make this configurable.
54+
self._unary_managers = [_PubsubGrpcManager(configuration, credential_provider) for i in range(0, 4)]
55+
self._stream_manager_count = 0
56+
self._unary_manager_count = 0
57+
5558
@property
5659
def endpoint(self) -> str:
5760
return self._endpoint
@@ -72,7 +75,7 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) ->
7275
value=topic_value,
7376
)
7477

75-
await self._get_stub().Publish( # type: ignore[misc]
78+
await self._get_unary_stub().Publish( # type: ignore[misc]
7679
request,
7780
timeout=self._default_deadline_seconds,
7881
)
@@ -98,7 +101,8 @@ async def subscribe(
98101
resume_at_topic_sequence_number=resume_at_topic_sequence_number,
99102
sequence_page=resume_at_topic_sequence_page,
100103
)
101-
stream = self._get_stream_stub().Subscribe( # type: ignore[misc]
104+
stub, decrement_stream_count = self._get_stream_stub()
105+
stream = stub.Subscribe( # type: ignore[misc]
102106
request,
103107
)
104108

@@ -112,23 +116,48 @@ async def subscribe(
112116
err = Exception(f"expected a heartbeat message but got '{msg_type}'")
113117
self._log_request_error("subscribe", err)
114118
return TopicSubscribe.Error(convert_error(err, Service.TOPICS))
115-
return TopicSubscribe.SubscriptionAsync(cache_name, topic_name, client_stream=stream) # type: ignore[misc]
119+
return TopicSubscribe.SubscriptionAsync(
120+
cache_name,
121+
topic_name,
122+
client_stream=stream, # type: ignore[misc]
123+
decrement_stream_count_method=decrement_stream_count,
124+
)
116125
except Exception as e:
117126
self._log_request_error("subscribe", e)
118127
return TopicSubscribe.Error(convert_error(e, Service.TOPICS))
119128

120129
def _log_request_error(self, request_type: str, e: Exception) -> None:
121130
self._logger.warning(f"{request_type} failed with exception: {e}")
122131

123-
def _get_stub(self) -> pubsub_grpc.PubsubStub:
124-
return self._grpc_manager.async_stub()
125-
126-
def _get_stream_stub(self) -> pubsub_grpc.PubsubStub:
127-
stub = self._stream_managers[self.stream_topic_manager_count % len(self._stream_managers)].async_stub()
128-
self.stream_topic_manager_count += 1
129-
return stub
132+
def _get_unary_stub(self) -> pubsub_grpc.PubsubStub:
133+
# Simply round-robin through the unary managers.
134+
# Unary requests will eventually complete (unlike long-lived subscriptions),
135+
# so we do not need the same bookkeeping logic here.
136+
manager = self._unary_managers[self._unary_manager_count % len(self._unary_managers)]
137+
self._unary_manager_count += 1
138+
return manager.async_stub()
139+
140+
def _get_stream_stub(self) -> tuple[pubsub_grpc.PubsubStub, Callable[[], None]]:
141+
# Try to get a client with capacity for another subscription by round-robining through the stubs.
142+
# Allow up to max_stream_capacity attempts to account for large bursts of requests.
143+
max_stream_capacity = len(self._stream_managers) * 100
144+
for _ in range(0, max_stream_capacity):
145+
try:
146+
manager = self._stream_managers[self._stream_manager_count % len(self._stream_managers)]
147+
self._stream_manager_count += 1
148+
return manager.async_stub(), manager.decrement_stream_count
149+
except ClientResourceExhaustedException:
150+
# If the stub is at capacity, continue to the next one.
151+
continue
152+
153+
# Otherwise return an error if no stubs have capacity.
154+
raise ClientResourceExhaustedException(
155+
message="Maximum number of active subscriptions reached",
156+
service=Service.TOPICS,
157+
)
130158

131159
async def close(self) -> None:
132-
await self._grpc_manager.close()
160+
for unary_client in self._unary_managers:
161+
await unary_client.close()
133162
for stream_client in self._stream_managers:
134163
await stream_client.close()

src/momento/internal/synchronous/_scs_grpc_manager.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from momento.config.middleware import MiddlewareRequestHandlerContext
1818
from momento.config.middleware.models import CONNECTION_ID_KEY
1919
from momento.config.middleware.synchronous import Middleware
20-
from momento.errors.exceptions import ConnectionException
20+
from momento.errors.exceptions import ClientResourceExhaustedException, ConnectionException
2121
from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType
2222
from momento.internal._utilities._channel_credentials import (
2323
channel_credentials_from_root_certs_or_default,
@@ -158,7 +158,7 @@ def on_state_change(state: grpc.ChannelConnectivity) -> None:
158158
elif state == connecting:
159159
self._logger.debug("State transitioned to CONNECTING; waiting to get READY")
160160
else:
161-
self._logger.warn(f"Unexpected connection state while trying to eagerly connect: {state}")
161+
self._logger.warning(f"Unexpected connection state while trying to eagerly connect: {state}")
162162
# we could not connect within the timeout and we no longer need this subscription
163163
self._channel.unsubscribe(on_state_change)
164164
connection_event.set()
@@ -233,13 +233,23 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
233233
self._secure_channel, *_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC)
234234
)
235235
self._stub = pubsub_client.PubsubStub(intercept_channel) # type: ignore[no-untyped-call]
236+
self._active_streams_count = 0
236237

237238
def close(self) -> None:
238239
self._secure_channel.close()
239240

240241
def stub(self) -> pubsub_client.PubsubStub:
242+
if self._active_streams_count >= 100:
243+
raise ClientResourceExhaustedException(
244+
message="Already at max number of concurrent streams",
245+
service=Service.TOPICS,
246+
)
247+
self._active_streams_count += 1
241248
return self._stub
242249

250+
def decrement_stream_count(self) -> None:
251+
self._active_streams_count -= 1
252+
243253

244254
class _TokenGrpcManager:
245255
"""Internal gRPC token manager."""

src/momento/internal/synchronous/_scs_pubsub_client.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import math
44
from datetime import timedelta
5+
from typing import Callable
56

67
from momento_wire_types import cachepubsub_pb2 as pubsub_pb
78
from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc
@@ -10,6 +11,7 @@
1011
from momento.auth import CredentialProvider
1112
from momento.config import TopicConfiguration
1213
from momento.errors import convert_error
14+
from momento.errors.exceptions import ClientResourceExhaustedException
1315
from momento.internal._utilities import _validate_cache_name, _validate_topic_name
1416
from momento.internal.services import Service
1517
from momento.internal.synchronous._scs_grpc_manager import (
@@ -27,8 +29,6 @@
2729
class _ScsPubsubClient:
2830
"""Internal pubsub client."""
2931

30-
stream_topic_manager_count = 0
31-
3232
def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
3333
endpoint = credential_provider.cache_endpoint
3434
self._logger = logs.logger
@@ -38,20 +38,23 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
3838
default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline()
3939
self._default_deadline_seconds = default_deadline.total_seconds()
4040

41-
num_subscriptions = configuration.get_max_subscriptions()
4241
# Default to a single channel and scale up if necessary. Each channel can support
4342
# 100 subscriptions. Issuing more subscribe requests than you have channels to handle
44-
# will cause the last request to hang indefinitely, so it's important to get this right.
43+
# will cause a ClientResourceExhaustedException.
4544
num_channels = 1
45+
num_subscriptions = configuration.get_max_subscriptions()
4646
if num_subscriptions > 0:
4747
num_channels = math.ceil(num_subscriptions / 100.0)
4848
self._logger.debug(f"creating {num_channels} subscription channels")
49-
50-
self._grpc_manager = _PubsubGrpcManager(configuration, credential_provider)
5149
self._stream_managers = [
5250
_PubsubGrpcStreamManager(configuration, credential_provider) for i in range(0, num_channels)
5351
]
5452

53+
# Default to 4 unary pubsub channels. TODO: Make this configurable.
54+
self._unary_managers = [_PubsubGrpcManager(configuration, credential_provider) for i in range(0, 4)]
55+
self._stream_manager_count = 0
56+
self._unary_manager_count = 0
57+
5558
@property
5659
def endpoint(self) -> str:
5760
return self._endpoint
@@ -72,7 +75,7 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic
7275
value=topic_value,
7376
)
7477

75-
self._get_stub().Publish(
78+
self._get_unary_stub().Publish(
7679
request,
7780
timeout=self._default_deadline_seconds,
7881
)
@@ -98,7 +101,8 @@ def subscribe(
98101
resume_at_topic_sequence_number=resume_at_topic_sequence_number,
99102
sequence_page=resume_at_topic_sequence_page,
100103
)
101-
stream = self._get_stream_stub().Subscribe( # type: ignore[misc]
104+
stub, decrement_stream_count = self._get_stream_stub()
105+
stream = stub.Subscribe( # type: ignore[misc]
102106
request,
103107
)
104108

@@ -112,23 +116,48 @@ def subscribe(
112116
err = Exception(f"expected a heartbeat message but got '{msg_type}'")
113117
self._log_request_error("subscribe", err)
114118
return TopicSubscribe.Error(convert_error(err, Service.TOPICS))
115-
return TopicSubscribe.Subscription(cache_name, topic_name, client_stream=stream) # type: ignore[misc]
119+
return TopicSubscribe.Subscription(
120+
cache_name,
121+
topic_name,
122+
client_stream=stream, # type: ignore[misc]
123+
decrement_stream_count_method=decrement_stream_count,
124+
)
116125
except Exception as e:
117126
self._log_request_error("subscribe", e)
118127
return TopicSubscribe.Error(convert_error(e, Service.TOPICS))
119128

120129
def _log_request_error(self, request_type: str, e: Exception) -> None:
121130
self._logger.warning(f"{request_type} failed with exception: {e}")
122131

123-
def _get_stub(self) -> pubsub_grpc.PubsubStub:
124-
return self._grpc_manager.stub()
125-
126-
def _get_stream_stub(self) -> pubsub_grpc.PubsubStub:
127-
stub = self._stream_managers[self.stream_topic_manager_count % len(self._stream_managers)].stub()
128-
self.stream_topic_manager_count += 1
132+
def _get_unary_stub(self) -> pubsub_grpc.PubsubStub:
133+
# Simply round-robin through the unary managers.
134+
# Unary requests will eventually complete (unlike long-lived subscriptions),
135+
# so we do not need the same bookkeeping logic here.
136+
stub = self._unary_managers[self._unary_manager_count % len(self._unary_managers)].stub()
137+
self._unary_manager_count += 1
129138
return stub
130139

140+
def _get_stream_stub(self) -> tuple[pubsub_grpc.PubsubStub, Callable[[], None]]:
141+
# Try to get a client with capacity for another subscription by round-robining through the stubs.
142+
# Allow up to max_stream_capacity attempts to account for large bursts of requests.
143+
max_stream_capacity = len(self._stream_managers) * 100
144+
for _ in range(0, max_stream_capacity):
145+
try:
146+
manager = self._stream_managers[self._stream_manager_count % len(self._stream_managers)]
147+
self._stream_manager_count += 1
148+
return manager.stub(), manager.decrement_stream_count
149+
except ClientResourceExhaustedException:
150+
# If the stub is at capacity, continue to the next one.
151+
continue
152+
153+
# Otherwise return an error if no stubs have capacity.
154+
raise ClientResourceExhaustedException(
155+
message="Maximum number of active subscriptions reached",
156+
service=Service.TOPICS,
157+
)
158+
131159
def close(self) -> None:
132-
self._grpc_manager.close()
160+
for unary_manager in self._unary_managers:
161+
unary_manager.close()
133162
for stream_client in self._stream_managers:
134163
stream_client.close()

0 commit comments

Comments
 (0)