diff --git a/docs/source/conf.py b/docs/source/conf.py index 73f0b1dc3e0..e17191c8170 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -306,6 +306,7 @@ def setup(app): intersphinx_mapping = { "python": ("https://docs.python.org/3/", None), "matplotlib": ("https://matplotlib.org/", None), + "requests": ("https://requests.readthedocs.io/en/latest/", None), } # check for :param / :return in html, points to faulty syntax, missing empty lines, etc. diff --git a/src/DIRAC/FrameworkSystem/Utilities/diracx.py b/src/DIRAC/FrameworkSystem/Utilities/diracx.py index 631e4167c3e..2a8d5d24c16 100644 --- a/src/DIRAC/FrameworkSystem/Utilities/diracx.py +++ b/src/DIRAC/FrameworkSystem/Utilities/diracx.py @@ -1,11 +1,16 @@ +import random import requests from cachetools import TTLCache, LRUCache, cached from cachetools.keys import hashkey from pathlib import Path +from requests.adapters import HTTPAdapter from tempfile import NamedTemporaryFile from typing import Any from collections.abc import Generator +from urllib3 import PoolManager +from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool + from DIRAC import gConfig from DIRAC.ConfigurationSystem.Client.Helpers import Registry from contextlib import contextmanager @@ -26,7 +31,74 @@ DEFAULT_TOKEN_CACHE_TTL = 5 * 60 DEFAULT_TOKEN_CACHE_SIZE = 1024 -legacy_exchange_session = requests.Session() +# Number of pools to use for a given host. +# It should be in the order of host behind the alias +SESSION_NUM_POOLS = 20 +# Number of connection per Pool +SESSION_CONNECTION_POOL_MAX_SIZE = 10 + + +class RandomizedPoolManager(PoolManager): + """ + A PoolManager subclass that creates multiple connection pools per host. + Each connection request randomly picks one of the available pools. + """ + + def __init__(self, num_pools=3, **kwargs): + self.num_pools = num_pools + super().__init__(**kwargs) + + def connection_from_host(self, host, port=None, scheme="http", pool_kwargs=None): + # Pick a random index to diversify the pool key. + + rand_index = random.randint(0, self.num_pools - 1) + pool_key = (f"{host}-{rand_index}", port, scheme) + if pool_key in self.pools: + return self.pools[pool_key] + + # Create a new pool if none exists for this key. + if scheme == "http": + self.pools[pool_key] = HTTPConnectionPool(host, port, **self.connection_pool_kw) + elif scheme == "https": + self.pools[pool_key] = HTTPSConnectionPool(host, port, **self.connection_pool_kw) + else: + raise ValueError(f"Unsupported scheme: {scheme}") + + return self.pools[pool_key] + + +class RandomizedHTTPAdapter(HTTPAdapter): + """ + An HTTPAdapter that uses the RandomizedPoolManager. + """ + + def __init__(self, num_pools=3, maxsize=10, **kwargs): + self.num_pools = num_pools + self.custom_maxsize = maxsize + super().__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs): + """ + Initialize the pool manager with our custom RandomizedPoolManager. + """ + # This ends up being passed to the HTTP(s)ConnectionPool constructors + pool_kwargs.update( + { + "maxsize": self.custom_maxsize, + "block": block, + } + ) + self.poolmanager = RandomizedPoolManager(**pool_kwargs) + + +# Create a requests session. +diracx_session = requests.Session() +# Create an instance of the custom adapter. +diracx_pool_adapter = RandomizedHTTPAdapter(num_pools=SESSION_NUM_POOLS, maxsize=SESSION_CONNECTION_POOL_MAX_SIZE) + +# Mount the adapter to handle both HTTP and HTTPS. +diracx_session.mount("http://", diracx_pool_adapter) +diracx_session.mount("https://", diracx_pool_adapter) def get_token( @@ -45,7 +117,7 @@ def get_token( vo = Registry.getVOForGroup(group) scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties] - r = legacy_exchange_session.get( + r = diracx_session.get( f"{diracxUrl}/api/auth/legacy-exchange", params={ "preferred_username": username,