Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def setup(app):
intersphinx_mapping = {
"python": ("https://docs.python.org/3/", None),
"matplotlib": ("https://matplotlib.org/", None),
"requests": ("https://requests.readthedocs.io/en/latest/", None),
}

# check for :param / :return in html, points to faulty syntax, missing empty lines, etc.
Expand Down
76 changes: 74 additions & 2 deletions src/DIRAC/FrameworkSystem/Utilities/diracx.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import random
import requests

from cachetools import TTLCache, LRUCache, cached
from cachetools.keys import hashkey
from pathlib import Path
from requests.adapters import HTTPAdapter
from tempfile import NamedTemporaryFile
from typing import Any
from collections.abc import Generator
from urllib3 import PoolManager
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool

from DIRAC import gConfig
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from contextlib import contextmanager
Expand All @@ -26,7 +31,74 @@
DEFAULT_TOKEN_CACHE_TTL = 5 * 60
DEFAULT_TOKEN_CACHE_SIZE = 1024

legacy_exchange_session = requests.Session()
# Number of pools to use for a given host.
# It should be in the order of host behind the alias
SESSION_NUM_POOLS = 20
# Number of connection per Pool
SESSION_CONNECTION_POOL_MAX_SIZE = 10


class RandomizedPoolManager(PoolManager):
"""
A PoolManager subclass that creates multiple connection pools per host.
Each connection request randomly picks one of the available pools.
"""

def __init__(self, num_pools=3, **kwargs):
self.num_pools = num_pools
super().__init__(**kwargs)

def connection_from_host(self, host, port=None, scheme="http", pool_kwargs=None):
# Pick a random index to diversify the pool key.

rand_index = random.randint(0, self.num_pools - 1)
pool_key = (f"{host}-{rand_index}", port, scheme)
if pool_key in self.pools:
return self.pools[pool_key]

# Create a new pool if none exists for this key.
if scheme == "http":
self.pools[pool_key] = HTTPConnectionPool(host, port, **self.connection_pool_kw)
elif scheme == "https":
self.pools[pool_key] = HTTPSConnectionPool(host, port, **self.connection_pool_kw)
else:
raise ValueError(f"Unsupported scheme: {scheme}")

return self.pools[pool_key]


class RandomizedHTTPAdapter(HTTPAdapter):
"""
An HTTPAdapter that uses the RandomizedPoolManager.
"""

def __init__(self, num_pools=3, maxsize=10, **kwargs):
self.num_pools = num_pools
self.custom_maxsize = maxsize
super().__init__(**kwargs)

def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
"""
Initialize the pool manager with our custom RandomizedPoolManager.
"""
# This ends up being passed to the HTTP(s)ConnectionPool constructors
pool_kwargs.update(
{
"maxsize": self.custom_maxsize,
"block": block,
Comment thread
andresailer marked this conversation as resolved.
}
)
self.poolmanager = RandomizedPoolManager(**pool_kwargs)


# Create a requests session.
diracx_session = requests.Session()
# Create an instance of the custom adapter.
diracx_pool_adapter = RandomizedHTTPAdapter(num_pools=SESSION_NUM_POOLS, maxsize=SESSION_CONNECTION_POOL_MAX_SIZE)

# Mount the adapter to handle both HTTP and HTTPS.
diracx_session.mount("http://", diracx_pool_adapter)
diracx_session.mount("https://", diracx_pool_adapter)


def get_token(
Expand All @@ -45,7 +117,7 @@ def get_token(
vo = Registry.getVOForGroup(group)
scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]

r = legacy_exchange_session.get(
r = diracx_session.get(
f"{diracxUrl}/api/auth/legacy-exchange",
params={
"preferred_username": username,
Expand Down
Loading