Skip to content

Commit 78f65f2

Browse files
committed
feat (diracx): add a randomized connection pooling
1 parent 7d82709 commit 78f65f2

1 file changed

Lines changed: 66 additions & 1 deletion

File tree

src/DIRAC/FrameworkSystem/Utilities/diracx.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import random
12
import requests
23

4+
35
from cachetools import TTLCache, cached
46
from cachetools.keys import hashkey
57
from pathlib import Path
8+
from requests.adapters import HTTPAdapter
69
from tempfile import NamedTemporaryFile
710
from typing import Any
11+
from urllib3 import PoolManager
12+
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
13+
814
from DIRAC import gConfig
915
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
1016

@@ -26,6 +32,65 @@
2632
DEFAULT_TOKEN_CACHE_SIZE = 1024
2733

2834

35+
class RandomizedPoolManager(PoolManager):
36+
"""
37+
A PoolManager subclass that creates multiple connection pools per host.
38+
Each connection request randomly picks one of the available pools.
39+
"""
40+
41+
def __init__(self, num_pools=3, **kwargs):
42+
self.num_pools = num_pools
43+
super().__init__(**kwargs)
44+
45+
def connection_from_host(self, host, port=None, scheme="http"):
46+
# Pick a random index to diversify the pool key.
47+
rand_index = random.randint(0, self.num_pools - 1)
48+
pool_key = (f"{host}-{rand_index}", port, scheme)
49+
50+
if pool_key in self.pools:
51+
return self.pools[pool_key]
52+
53+
# Create a new pool if none exists for this key.
54+
if scheme == "http":
55+
self.pools[pool_key] = HTTPConnectionPool(host, port, **self.connection_pool_kw)
56+
elif scheme == "https":
57+
self.pools[pool_key] = HTTPSConnectionPool(host, port, **self.connection_pool_kw)
58+
else:
59+
raise ValueError(f"Unsupported scheme: {scheme}")
60+
61+
return self.pools[pool_key]
62+
63+
64+
class RandomizedHTTPAdapter(HTTPAdapter):
65+
"""
66+
An HTTPAdapter that uses the RandomizedPoolManager.
67+
"""
68+
69+
def __init__(self, num_pools=3, maxsize=10, **kwargs):
70+
self.num_pools = num_pools
71+
self.custom_maxsize = maxsize
72+
super().__init__(**kwargs)
73+
74+
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
75+
"""
76+
Initialize the pool manager with our custom RandomizedPoolManager.
77+
"""
78+
pool_kwargs.update({"num_pools": self.num_pools, "maxsize": self.custom_maxsize, "block": block})
79+
self.poolmanager = RandomizedPoolManager(**pool_kwargs)
80+
81+
82+
# Create a requests session.
83+
dirax_session = requests.Session()
84+
85+
# Create an instance of the custom adapter.
86+
# For example, 5 sub-pools per host, each allowing up to 10 connections.
87+
diracx_pool_adapter = RandomizedHTTPAdapter(num_pools=10, maxsize=10)
88+
89+
# Mount the adapter to handle both HTTP and HTTPS.
90+
dirax_session.mount("http://", diracx_pool_adapter)
91+
dirax_session.mount("https://", diracx_pool_adapter)
92+
93+
2994
def get_token(
3095
username: str, group: str, dirac_properties: set[str], *, expires_minutes: int | None = None, source: str = ""
3196
):
@@ -42,7 +107,7 @@ def get_token(
42107
vo = Registry.getVOForGroup(group)
43108
scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]
44109

45-
r = requests.get(
110+
r = dirax_session.get(
46111
f"{diracxUrl}/api/auth/legacy-exchange",
47112
params={
48113
"preferred_username": username,

0 commit comments

Comments
 (0)