Skip to content

Commit 46d5afc

Browse files
Brian McCaffreyMasses
authored andcommitted
Add in min_connections to available arguments
Signed-off-by: Brian McCaffrey <brian@musubilabs.ai>
1 parent 9a3b547 commit 46d5afc

9 files changed

Lines changed: 245 additions & 6 deletions

File tree

tests/test_asyncio/test_cluster.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,24 @@ async def read_response_mocked(*args: Any, **kwargs: Any) -> None:
468468

469469
await rc.aclose()
470470

471+
async def test_min_connections(
472+
self, create_valkey: Callable[..., ValkeyCluster]
473+
) -> None:
474+
rc = await create_valkey(cls=ValkeyCluster, min_connections=5)
475+
for node in rc.get_nodes():
476+
assert node.min_connections == 5
477+
assert len(node._connections) == 5
478+
assert len(node._free) == 5
479+
await rc.aclose()
480+
481+
async def test_min_connections_greater_than_max(
482+
self, create_valkey: Callable[..., ValkeyCluster]
483+
) -> None:
484+
with pytest.raises(ValkeyClusterException):
485+
await create_valkey(
486+
cls=ValkeyCluster, min_connections=20, max_connections=10
487+
)
488+
471489
async def test_execute_command_errors(self, r: ValkeyCluster) -> None:
472490
"""
473491
Test that if no key is provided then exception should be raised.

tests/test_asyncio/test_connection_pool.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,51 @@ async def test_repr_contains_db_info_unix(self):
196196
expected = "path=/abc,db=1,client_name=test-client"
197197
assert expected in repr(pool)
198198

199+
async def test_min_connections(self):
200+
pool = valkey.ConnectionPool(
201+
connection_class=DummyConnection,
202+
min_connections=5,
203+
)
204+
assert pool.min_connections == 5
205+
assert len(pool._available_connections) == 5
206+
await pool.disconnect(inuse_connections=True)
207+
208+
async def test_min_connections_default(self):
209+
pool = valkey.ConnectionPool(
210+
connection_class=DummyConnection,
211+
)
212+
assert pool.min_connections == 0
213+
assert len(pool._available_connections) == 0
214+
await pool.disconnect(inuse_connections=True)
215+
216+
async def test_min_connections_greater_than_max_raises(self):
217+
with pytest.raises(ValueError):
218+
valkey.ConnectionPool(
219+
connection_class=DummyConnection,
220+
min_connections=20,
221+
max_connections=10,
222+
)
223+
224+
async def test_min_connections_negative_raises(self):
225+
with pytest.raises(ValueError):
226+
valkey.ConnectionPool(
227+
connection_class=DummyConnection,
228+
min_connections=-1,
229+
)
230+
231+
async def test_min_connections_initialize(self):
232+
pool = valkey.ConnectionPool(
233+
connection_class=DummyConnection,
234+
min_connections=3,
235+
)
236+
assert not pool._initialized
237+
await pool.initialize()
238+
assert pool._initialized
239+
# Calling initialize again is a no-op
240+
await pool.initialize()
241+
assert pool._initialized
242+
await pool.disconnect(inuse_connections=True)
243+
199244

200245
class TestBlockingConnectionPool:
201246
@asynccontextmanager

tests/test_connection_pool.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,37 @@ def test_repr_contains_db_info_unix(self):
107107
expected = "path=/abc,db=1,client_name=test-client"
108108
assert expected in repr(pool)
109109

110+
def test_min_connections(self):
111+
pool = valkey.ConnectionPool(
112+
connection_class=DummyConnection,
113+
min_connections=5,
114+
)
115+
assert pool.min_connections == 5
116+
assert len(pool._available_connections) == 5
117+
assert pool._created_connections == 5
118+
119+
def test_min_connections_default(self):
120+
pool = valkey.ConnectionPool(
121+
connection_class=DummyConnection,
122+
)
123+
assert pool.min_connections == 0
124+
assert len(pool._available_connections) == 0
125+
126+
def test_min_connections_greater_than_max_raises(self):
127+
with pytest.raises(ValueError):
128+
valkey.ConnectionPool(
129+
connection_class=DummyConnection,
130+
min_connections=20,
131+
max_connections=10,
132+
)
133+
134+
def test_min_connections_negative_raises(self):
135+
with pytest.raises(ValueError):
136+
valkey.ConnectionPool(
137+
connection_class=DummyConnection,
138+
min_connections=-1,
139+
)
140+
110141

111142
class TestBlockingConnectionPool:
112143
def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20):
@@ -196,6 +227,31 @@ def test_repr_contains_db_info_unix(self):
196227
expected = "path=abc,db=0,client_name=test-client"
197228
assert expected in repr(pool)
198229

230+
def test_min_connections(self):
231+
pool = valkey.BlockingConnectionPool(
232+
connection_class=DummyConnection,
233+
max_connections=10,
234+
min_connections=5,
235+
)
236+
assert pool.min_connections == 5
237+
assert len(pool._connections) == 5
238+
239+
def test_min_connections_default(self):
240+
pool = valkey.BlockingConnectionPool(
241+
connection_class=DummyConnection,
242+
max_connections=10,
243+
)
244+
assert pool.min_connections == 0
245+
assert len(pool._connections) == 0
246+
247+
def test_min_connections_greater_than_max_raises(self):
248+
with pytest.raises(ValueError):
249+
valkey.BlockingConnectionPool(
250+
connection_class=DummyConnection,
251+
min_connections=20,
252+
max_connections=10,
253+
)
254+
199255

200256
class TestConnectionPoolURLParsing:
201257
def test_hostname(self):

valkey/asyncio/client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ def __init__(
226226
ssl_min_version: Optional[ssl.TLSVersion] = None,
227227
ssl_ciphers: Optional[str] = None,
228228
max_connections: Optional[int] = None,
229+
min_connections: int = 0,
229230
single_connection_client: bool = False,
230231
health_check_interval: int = 0,
231232
client_name: Optional[str] = None,
@@ -251,6 +252,13 @@ def __init__(
251252
`retry_on_error` to a list of the error/s to retry on, then set
252253
`retry` to a valid `Retry` object.
253254
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
255+
256+
Args:
257+
max_connections: The maximum number of connections for the pool.
258+
min_connections: The minimum number of connections to pre-create
259+
when the pool is initialized. These connections are eagerly
260+
connected when the client is first awaited or used.
261+
Defaults to 0.
254262
"""
255263
kwargs: Dict[str, Any]
256264
# auto_close_connection_pool only has an effect if connection_pool is
@@ -287,6 +295,7 @@ def __init__(
287295
"retry_on_error": retry_on_error,
288296
"retry": copy.deepcopy(retry),
289297
"max_connections": max_connections,
298+
"min_connections": min_connections,
290299
"health_check_interval": health_check_interval,
291300
"client_name": client_name,
292301
"lib_name": lib_name,
@@ -368,6 +377,7 @@ def __await__(self):
368377
return self.initialize().__await__()
369378

370379
async def initialize(self: _ValkeyT) -> _ValkeyT:
380+
await self.connection_pool.initialize()
371381
if self.single_connection_client:
372382
async with self._single_conn_lock:
373383
if self.connection is None:

valkey/asyncio/cluster.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ class ValkeyCluster(AbstractValkey, AbstractValkeyCluster, AsyncValkeyClusterCom
174174
maximum number of connections are already created, a
175175
:class:`~.MaxConnectionsError` is raised. This error may be retried as defined
176176
by :attr:`connection_error_retry_attempts`
177+
:param min_connections:
178+
| Minimum number of connections per node to pre-create when the cluster is
179+
initialized. These connections are eagerly connected during cluster setup,
180+
reducing latency on the first requests. Must be less than or equal to
181+
``max_connections``. Defaults to 0.
177182
:param address_remap:
178183
| An optional callable which, when provided with an internal network
179184
address of a node, e.g. a `(host, port)` tuple, will return the address
@@ -255,6 +260,7 @@ def __init__(
255260
cluster_error_retry_attempts: int = 3,
256261
connection_error_retry_attempts: int = 3,
257262
max_connections: int = 2**31,
263+
min_connections: int = 0,
258264
# Client related kwargs
259265
db: Union[str, int] = 0,
260266
path: Optional[str] = None,
@@ -317,6 +323,7 @@ def __init__(
317323

318324
kwargs: Dict[str, Any] = {
319325
"max_connections": max_connections,
326+
"min_connections": min_connections,
320327
"connection_class": Connection,
321328
"parser_class": ClusterParser,
322329
# Client related kwargs
@@ -979,6 +986,7 @@ class ClusterNode:
979986
"connection_kwargs",
980987
"host",
981988
"max_connections",
989+
"min_connections",
982990
"name",
983991
"port",
984992
"response_callbacks",
@@ -992,12 +1000,18 @@ def __init__(
9921000
server_type: Optional[str] = None,
9931001
*,
9941002
max_connections: int = 2**31,
1003+
min_connections: int = 0,
9951004
connection_class: Type[Connection] = Connection,
9961005
**connection_kwargs: Any,
9971006
) -> None:
9981007
if host == "localhost":
9991008
host = socket.gethostbyname(host)
10001009

1010+
if min_connections > max_connections:
1011+
raise ValkeyClusterException(
1012+
'"min_connections" must be less than or equal to "max_connections"'
1013+
)
1014+
10011015
connection_kwargs["host"] = host
10021016
connection_kwargs["port"] = port
10031017
self.host = host
@@ -1006,13 +1020,28 @@ def __init__(
10061020
self.server_type = server_type
10071021

10081022
self.max_connections = max_connections
1023+
self.min_connections = min_connections
10091024
self.connection_class = connection_class
10101025
self.connection_kwargs = connection_kwargs
10111026
self.response_callbacks = connection_kwargs.pop("response_callbacks", {})
10121027

10131028
self._connections: List[Connection] = []
10141029
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
10151030

1031+
# Pre-create min_connections connection objects
1032+
for _ in range(self.min_connections):
1033+
connection = self.connection_class(**self.connection_kwargs)
1034+
self._connections.append(connection)
1035+
self._free.append(connection)
1036+
1037+
async def initialize(self) -> None:
1038+
"""Connect all pre-created connections from min_connections."""
1039+
if not self._connections:
1040+
return
1041+
await asyncio.gather(
1042+
*(connection.connect() for connection in self._connections)
1043+
)
1044+
10161045
def __repr__(self) -> str:
10171046
return (
10181047
f"[host={self.host}, port={self.port}, "
@@ -1415,6 +1444,9 @@ async def initialize(self) -> None:
14151444
# If initialize was called after a MovedError, clear it
14161445
self._moved_exception = None
14171446

1447+
# Eagerly connect min_connections for each node
1448+
await asyncio.gather(*(node.initialize() for node in self.nodes_cache.values()))
1449+
14181450
async def aclose(self, attr: str = "nodes_cache") -> None:
14191451
self.default_node = None
14201452
await asyncio.gather(

valkey/asyncio/connection.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,16 @@ class ConnectionPool:
984984
985985
Any additional keyword arguments are passed to the constructor of
986986
``connection_class``.
987+
988+
Args:
989+
connection_class: The class to use for creating connections.
990+
Defaults to :py:class:`~valkey.asyncio.connection.Connection`.
991+
max_connections: The maximum number of connections to create.
992+
If not set, there is no limit.
993+
min_connections: The minimum number of connections to pre-create
994+
when the pool is initialized. Call :meth:`initialize` to eagerly
995+
connect them, or they will be connected lazily on first use.
996+
Must be less than or equal to ``max_connections``. Defaults to 0.
987997
"""
988998

989999
@classmethod
@@ -1036,20 +1046,43 @@ def __init__(
10361046
self,
10371047
connection_class: Type[AbstractConnection] = Connection,
10381048
max_connections: Optional[int] = None,
1049+
min_connections: int = 0,
10391050
**connection_kwargs,
10401051
):
10411052
max_connections = max_connections or 2**31
10421053
if not isinstance(max_connections, int) or max_connections < 0:
10431054
raise ValueError('"max_connections" must be a positive integer')
1055+
if not isinstance(min_connections, int) or min_connections < 0:
1056+
raise ValueError('"min_connections" must be a non-negative integer')
1057+
if min_connections > max_connections:
1058+
raise ValueError(
1059+
'"min_connections" must be less than or equal to "max_connections"'
1060+
)
10441061

10451062
self.connection_class = connection_class
10461063
self.connection_kwargs = connection_kwargs
10471064
self.max_connections = max_connections
1065+
self.min_connections = min_connections
10481066

10491067
self._available_connections: List[AbstractConnection] = []
10501068
self._in_use_connections: Set[AbstractConnection] = set()
10511069
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)
10521070

1071+
self._initialized = False
1072+
1073+
# Pre-create min_connections connection objects (connected lazily,
1074+
# or call initialize() to connect them eagerly)
1075+
for _ in range(self.min_connections):
1076+
self._available_connections.append(self.make_connection())
1077+
1078+
async def initialize(self):
1079+
"""Connect all pre-created connections from min_connections."""
1080+
if self._initialized:
1081+
return
1082+
for connection in self._available_connections:
1083+
await connection.connect()
1084+
self._initialized = True
1085+
10531086
def __repr__(self):
10541087
return (
10551088
f"<{self.__class__.__module__}.{self.__class__.__name__}"
@@ -1059,6 +1092,11 @@ def __repr__(self):
10591092
def reset(self):
10601093
self._available_connections = []
10611094
self._in_use_connections = weakref.WeakSet()
1095+
self._initialized = False
1096+
1097+
# Pre-create min_connections connection objects
1098+
for _ in range(self.min_connections):
1099+
self._available_connections.append(self.make_connection())
10621100

10631101
def can_get_connection(self) -> bool:
10641102
"""Return True if a connection can be retrieved from the pool."""

valkey/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ def __init__(
204204
ssl_min_version=None,
205205
ssl_ciphers=None,
206206
max_connections=None,
207+
min_connections=0,
207208
single_connection_client=False,
208209
health_check_interval=0,
209210
client_name=None,
@@ -231,6 +232,11 @@ def __init__(
231232
232233
Args:
233234
235+
max_connections:
236+
The maximum number of connections for the pool.
237+
min_connections:
238+
The minimum number of connections to pre-create and connect
239+
when the pool is initialized. Defaults to 0.
234240
single_connection_client:
235241
if `True`, connection pool is not used. In that case `Valkey`
236242
instance use is not thread safe.
@@ -265,6 +271,7 @@ def __init__(
265271
"retry_on_error": retry_on_error,
266272
"retry": copy.deepcopy(retry),
267273
"max_connections": max_connections,
274+
"min_connections": min_connections,
268275
"health_check_interval": health_check_interval,
269276
"client_name": client_name,
270277
"lib_name": lib_name,

valkey/cluster.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def parse_cluster_myshardid(resp, **options):
146146
"lib_name",
147147
"lib_version",
148148
"max_connections",
149+
"min_connections",
149150
"nodes_flag",
150151
"valkey_connect_func",
151152
"password",
@@ -562,7 +563,8 @@ def __init__(
562563
563564
:**kwargs:
564565
Extra arguments that will be sent into Valkey instance when created
565-
(See Official valkey-py doc for supported kwargs)
566+
(See Official valkey-py doc for supported kwargs
567+
e.g. ``max_connections``, ``min_connections``)
566568
Some kwargs are not supported and will raise a
567569
ValkeyClusterException:
568570
- db (Valkey do not support database SELECT in cluster mode)

0 commit comments

Comments
 (0)