Skip to content

Commit 9f809c8

Browse files
authored
feat: Add a credential provider for Momento Local (#501)
Add a credential provider to connect to Momento Local. Update the gRPC managers to be able to create insecure channels for connecting to Momento Local.
1 parent 7c47aab commit 9f809c8

3 files changed

Lines changed: 233 additions & 125 deletions

File tree

src/momento/auth/credential_provider.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class CredentialProvider:
1616
control_endpoint: str
1717
cache_endpoint: str
1818
token_endpoint: str
19+
port: int
1920

2021
@staticmethod
2122
def from_environment_variable(
@@ -72,7 +73,23 @@ def from_string(
7273
cache_endpoint = cache_endpoint or token_and_endpoints.cache_endpoint
7374
token_endpoint = token_endpoint or token_and_endpoints.token_endpoint
7475
auth_token = token_and_endpoints.auth_token
75-
return CredentialProvider(auth_token, control_endpoint, cache_endpoint, token_endpoint)
76+
return CredentialProvider(auth_token, control_endpoint, cache_endpoint, token_endpoint, 443)
77+
78+
@staticmethod
79+
def for_momento_local(
80+
host: str = "127.0.0.1",
81+
port: int = 8080,
82+
) -> CredentialProvider:
83+
"""Creates a credential provider for use with Momento Local.
84+
85+
Args:
86+
host (str): the Momento Local host.
87+
port (int): the Momento Local port.
88+
89+
Returns:
90+
CredentialProvider
91+
"""
92+
return CredentialProvider("", host, host, host, port)
7693

7794
def __repr__(self) -> str:
7895
attributes: Dict[str, str] = copy.copy(vars(self)) # type: ignore[misc]

src/momento/internal/aio/_scs_grpc_manager.py

Lines changed: 125 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -38,53 +38,75 @@ class _ControlGrpcManager:
3838
"""Internal gRPC control manager."""
3939

4040
def __init__(self, configuration: Configuration, credential_provider: CredentialProvider):
41-
self._secure_channel = grpc.aio.secure_channel(
42-
target=credential_provider.control_endpoint,
43-
credentials=channel_credentials_from_root_certs_or_default(configuration),
44-
interceptors=_interceptors(
45-
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
46-
),
47-
options=grpc_control_channel_options_from_grpc_config(
48-
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
49-
),
50-
)
41+
if credential_provider.port == 443:
42+
self._channel = grpc.aio.secure_channel(
43+
target=credential_provider.control_endpoint,
44+
credentials=channel_credentials_from_root_certs_or_default(configuration),
45+
interceptors=_interceptors(
46+
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
47+
),
48+
options=grpc_control_channel_options_from_grpc_config(
49+
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
50+
),
51+
)
52+
else:
53+
self._channel = grpc.aio.insecure_channel(
54+
target=f"{credential_provider.control_endpoint}:{credential_provider.port}",
55+
interceptors=_interceptors(
56+
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
57+
),
58+
options=grpc_control_channel_options_from_grpc_config(
59+
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
60+
),
61+
)
5162

5263
async def close(self) -> None:
53-
await self._secure_channel.close()
64+
await self._channel.close()
5465

5566
def async_stub(self) -> control_client.ScsControlStub:
56-
return control_client.ScsControlStub(self._secure_channel) # type: ignore[no-untyped-call]
67+
return control_client.ScsControlStub(self._channel) # type: ignore[no-untyped-call]
5768

5869

5970
class _DataGrpcManager:
6071
"""Internal gRPC data manager."""
6172

6273
def __init__(self, configuration: Configuration, credential_provider: CredentialProvider):
6374
self._logger = logs.logger
64-
self._secure_channel = grpc.aio.secure_channel(
65-
target=credential_provider.cache_endpoint,
66-
credentials=channel_credentials_from_root_certs_or_default(configuration),
67-
interceptors=_interceptors(
68-
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
69-
),
70-
# Here is where you would pass override configuration to the underlying C gRPC layer.
71-
# However, I have tried several different tuning options here and did not see any
72-
# performance improvements, so sticking with the defaults for now.
73-
# For more info on the performance investigations:
74-
# https://github.com/momentohq/client-sdk-python/issues/120
75-
# For more info on available gRPC config options:
76-
# https://grpc.github.io/grpc/python/grpc.html
77-
# https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
78-
# https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140
79-
# options=[
80-
# ('grpc.max_concurrent_streams', 1000),
81-
# ('grpc.use_local_subchannel_pool', 1),
82-
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
83-
# ],
84-
options=grpc_data_channel_options_from_grpc_config(
85-
configuration.get_transport_strategy().get_grpc_configuration()
86-
),
87-
)
75+
if credential_provider.port == 443:
76+
self._channel = grpc.aio.secure_channel(
77+
target=credential_provider.cache_endpoint,
78+
credentials=channel_credentials_from_root_certs_or_default(configuration),
79+
interceptors=_interceptors(
80+
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
81+
),
82+
# Here is where you would pass override configuration to the underlying C gRPC layer.
83+
# However, I have tried several different tuning options here and did not see any
84+
# performance improvements, so sticking with the defaults for now.
85+
# For more info on the performance investigations:
86+
# https://github.com/momentohq/client-sdk-python/issues/120
87+
# For more info on available gRPC config options:
88+
# https://grpc.github.io/grpc/python/grpc.html
89+
# https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments
90+
# https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h#L140
91+
# options=[
92+
# ('grpc.max_concurrent_streams', 1000),
93+
# ('grpc.use_local_subchannel_pool', 1),
94+
# (experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
95+
# ],
96+
options=grpc_data_channel_options_from_grpc_config(
97+
configuration.get_transport_strategy().get_grpc_configuration()
98+
),
99+
)
100+
else:
101+
self._channel = grpc.aio.insecure_channel(
102+
target=f"{credential_provider.cache_endpoint}:{credential_provider.port}",
103+
interceptors=_interceptors(
104+
credential_provider.auth_token, ClientType.CACHE, configuration.get_retry_strategy()
105+
),
106+
options=grpc_data_channel_options_from_grpc_config(
107+
configuration.get_transport_strategy().get_grpc_configuration()
108+
),
109+
)
88110

89111
async def eagerly_connect(self, timeout_seconds: float) -> None:
90112
self._logger.debug(
@@ -93,15 +115,15 @@ async def eagerly_connect(self, timeout_seconds: float) -> None:
93115
try:
94116
await asyncio.wait_for(self.wait_for_ready(), timeout_seconds)
95117
except Exception as error:
96-
self._secure_channel.close()
118+
self._channel.close()
97119
self._logger.debug(f"Failed to connect to the server within the given timeout. {error}")
98120
raise ConnectionException(
99121
message=f"Failed to connect to Momento's server within given eager connection timeout: {error}",
100122
service=Service.CACHE,
101123
) from error
102124

103125
async def wait_for_ready(self) -> None:
104-
latest_state = self._secure_channel.get_state(True) # try_to_connect
126+
latest_state = self._channel.get_state(True) # try_to_connect
105127
ready: grpc.ChannelConnectivity = grpc.ChannelConnectivity.READY
106128
connecting: grpc.ChannelConnectivity = grpc.ChannelConnectivity.CONNECTING
107129
idle: grpc.ChannelConnectivity = grpc.ChannelConnectivity.IDLE
@@ -117,80 +139,109 @@ async def wait_for_ready(self) -> None:
117139

118140
# This is a gRPC callback helper that prevents us from repeatedly polling on the state
119141
# which is highly inefficient.
120-
await self._secure_channel.wait_for_state_change(latest_state)
121-
latest_state = self._secure_channel.get_state(False) # no need to reconnect
142+
await self._channel.wait_for_state_change(latest_state)
143+
latest_state = self._channel.get_state(False) # no need to reconnect
122144

123145
if latest_state == ready:
124146
self._logger.debug("Connected to Momento's server! Happy Caching!")
125147

126148
async def close(self) -> None:
127149
self._logger.debug("Closing and tearing down gRPC channel")
128-
await self._secure_channel.close()
150+
await self._channel.close()
129151

130152
def async_stub(self) -> cache_client.ScsStub:
131-
return cache_client.ScsStub(self._secure_channel) # type: ignore[no-untyped-call]
153+
return cache_client.ScsStub(self._channel) # type: ignore[no-untyped-call]
132154

133155

134156
class _PubsubGrpcManager:
135157
"""Internal gRPC pubsub manager."""
136158

137159
def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
138-
self._secure_channel = grpc.aio.secure_channel(
139-
target=credential_provider.cache_endpoint,
140-
credentials=grpc.ssl_channel_credentials(),
141-
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
142-
options=grpc_topic_channel_options_from_grpc_config(
143-
configuration.get_transport_strategy().get_grpc_configuration()
144-
),
145-
)
160+
if credential_provider.port == 443:
161+
self._channel = grpc.aio.secure_channel(
162+
target=credential_provider.cache_endpoint,
163+
credentials=grpc.ssl_channel_credentials(),
164+
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
165+
options=grpc_topic_channel_options_from_grpc_config(
166+
configuration.get_transport_strategy().get_grpc_configuration()
167+
),
168+
)
169+
else:
170+
self._channel = grpc.aio.insecure_channel(
171+
target=f"{credential_provider.cache_endpoint}:{credential_provider.port}",
172+
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
173+
options=grpc_topic_channel_options_from_grpc_config(
174+
configuration.get_transport_strategy().get_grpc_configuration()
175+
),
176+
)
146177

147178
async def close(self) -> None:
148-
await self._secure_channel.close()
179+
await self._channel.close()
149180

150181
def async_stub(self) -> pubsub_client.PubsubStub:
151-
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]
182+
return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call]
152183

153184

154185
class _PubsubGrpcStreamManager:
155186
"""Internal gRPC pubsub stream manager."""
156187

157188
def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
158-
self._secure_channel = grpc.aio.secure_channel(
159-
target=credential_provider.cache_endpoint,
160-
credentials=grpc.ssl_channel_credentials(),
161-
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
162-
options=grpc_topic_channel_options_from_grpc_config(
163-
configuration.get_transport_strategy().get_grpc_configuration()
164-
),
165-
)
189+
if credential_provider.port == 443:
190+
self._channel = grpc.aio.secure_channel(
191+
target=credential_provider.cache_endpoint,
192+
credentials=grpc.ssl_channel_credentials(),
193+
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
194+
options=grpc_topic_channel_options_from_grpc_config(
195+
configuration.get_transport_strategy().get_grpc_configuration()
196+
),
197+
)
198+
else:
199+
self._channel = grpc.aio.insecure_channel(
200+
target=f"{credential_provider.cache_endpoint}:{credential_provider.port}",
201+
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
202+
options=grpc_topic_channel_options_from_grpc_config(
203+
configuration.get_transport_strategy().get_grpc_configuration()
204+
),
205+
)
166206

167207
async def close(self) -> None:
168-
await self._secure_channel.close()
208+
await self._channel.close()
169209

170210
def async_stub(self) -> pubsub_client.PubsubStub:
171-
return pubsub_client.PubsubStub(self._secure_channel) # type: ignore[no-untyped-call]
211+
return pubsub_client.PubsubStub(self._channel) # type: ignore[no-untyped-call]
172212

173213

174214
class _TokenGrpcManager:
175215
"""Internal gRPC token manager."""
176216

177217
def __init__(self, configuration: AuthConfiguration, credential_provider: CredentialProvider):
178-
self._secure_channel = grpc.aio.secure_channel(
179-
target=credential_provider.token_endpoint,
180-
credentials=grpc.ssl_channel_credentials(),
181-
interceptors=_interceptors(
182-
credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()
183-
),
184-
options=grpc_control_channel_options_from_grpc_config(
185-
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
186-
),
187-
)
218+
if credential_provider.port == 443:
219+
self._channel = grpc.aio.secure_channel(
220+
target=credential_provider.token_endpoint,
221+
credentials=grpc.ssl_channel_credentials(),
222+
interceptors=_interceptors(
223+
credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()
224+
),
225+
options=grpc_control_channel_options_from_grpc_config(
226+
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
227+
),
228+
)
229+
else:
230+
self._channel = grpc.aio.insecure_channel(
231+
target=f"{credential_provider.token_endpoint}:{credential_provider.port}",
232+
interceptors=_interceptors(
233+
credential_provider.auth_token, ClientType.TOKEN, configuration.get_retry_strategy()
234+
),
235+
options=grpc_control_channel_options_from_grpc_config(
236+
grpc_config=configuration.get_transport_strategy().get_grpc_configuration(),
237+
),
238+
)
188239

189240
async def close(self) -> None:
190-
await self._secure_channel.close()
241+
await self._channel.close()
191242

192243
def async_stub(self) -> token_client.TokenStub:
193-
return token_client.TokenStub(self._secure_channel) # type: ignore[no-untyped-call]
244+
return token_client.TokenStub(self._channel) # type: ignore[no-untyped-call]
194245

195246

196247
def _interceptors(

0 commit comments

Comments
 (0)