Skip to content

Commit 965420d

Browse files
committed
feat (diracx): add a randomized connection pooling
1 parent 7d82709 commit 965420d

1 file changed

Lines changed: 75 additions & 1 deletion

File tree

src/DIRAC/FrameworkSystem/Utilities/diracx.py

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import random
12
import requests
23

4+
35
from cachetools import TTLCache, cached
46
from cachetools.keys import hashkey
57
from pathlib import Path
8+
from requests.adapters import HTTPAdapter
69
from tempfile import NamedTemporaryFile
710
from typing import Any
11+
from urllib3 import PoolManager
12+
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
13+
814
from DIRAC import gConfig
915
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
1016

@@ -25,6 +31,74 @@
2531
DEFAULT_TOKEN_CACHE_TTL = 5 * 60
2632
DEFAULT_TOKEN_CACHE_SIZE = 1024
2733

34+
# Number of pools to use for a given host.
35+
# It should be in the order of host behind the alias
36+
SESSION_NUM_POOLS = 20
37+
# Number of connection per Pool
38+
SESSION_CONNECTION_POOL_MAX_SIZE = 10
39+
40+
41+
class RandomizedPoolManager(PoolManager):
42+
"""
43+
A PoolManager subclass that creates multiple connection pools per host.
44+
Each connection request randomly picks one of the available pools.
45+
"""
46+
47+
def __init__(self, num_pools=3, **kwargs):
48+
self.num_pools = num_pools
49+
super().__init__(**kwargs)
50+
51+
def connection_from_host(self, host, port=None, scheme="http", pool_kwargs=None):
52+
# Pick a random index to diversify the pool key.
53+
54+
rand_index = random.randint(0, self.num_pools - 1)
55+
pool_key = (f"{host}-{rand_index}", port, scheme)
56+
if pool_key in self.pools:
57+
return self.pools[pool_key]
58+
59+
# Create a new pool if none exists for this key.
60+
if scheme == "http":
61+
self.pools[pool_key] = HTTPConnectionPool(host, port, **self.connection_pool_kw)
62+
elif scheme == "https":
63+
self.pools[pool_key] = HTTPSConnectionPool(host, port, **self.connection_pool_kw)
64+
else:
65+
raise ValueError(f"Unsupported scheme: {scheme}")
66+
67+
return self.pools[pool_key]
68+
69+
70+
class RandomizedHTTPAdapter(HTTPAdapter):
71+
"""
72+
An HTTPAdapter that uses the RandomizedPoolManager.
73+
"""
74+
75+
def __init__(self, num_pools=3, maxsize=10, **kwargs):
76+
self.num_pools = num_pools
77+
self.custom_maxsize = maxsize
78+
super().__init__(**kwargs)
79+
80+
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
81+
"""
82+
Initialize the pool manager with our custom RandomizedPoolManager.
83+
"""
84+
pool_kwargs.update(
85+
{
86+
"maxsize": self.custom_maxsize,
87+
"block": block,
88+
}
89+
)
90+
self.poolmanager = RandomizedPoolManager(**pool_kwargs)
91+
92+
93+
# Create a requests session.
94+
diracx_session = requests.Session()
95+
# Create an instance of the custom adapter.
96+
diracx_pool_adapter = RandomizedHTTPAdapter(num_pools=SESSION_NUM_POOLS, maxsize=SESSION_CONNECTION_POOL_MAX_SIZE)
97+
98+
# Mount the adapter to handle both HTTP and HTTPS.
99+
diracx_session.mount("http://", diracx_pool_adapter)
100+
diracx_session.mount("https://", diracx_pool_adapter)
101+
28102

29103
def get_token(
30104
username: str, group: str, dirac_properties: set[str], *, expires_minutes: int | None = None, source: str = ""
@@ -42,7 +116,7 @@ def get_token(
42116
vo = Registry.getVOForGroup(group)
43117
scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]
44118

45-
r = requests.get(
119+
r = diracx_session.get(
46120
f"{diracxUrl}/api/auth/legacy-exchange",
47121
params={
48122
"preferred_username": username,

0 commit comments

Comments
 (0)