Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ You can run your OpenSearch cluster without authentication, or using User name a
- ``ca_certs`` (default:``None``)
- ``client_key`` (default:``None``)
- ``client_cert`` (default:``None``)
- ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes. The prefix will be lower case only.


to the location::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ For example::
{
User = test
Password = password
IndexPrefix = mydirac-
}
}


The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`:
The following global option can be set in `Systems/NoSQLDatabases`:

*IndexPrefix*: Prefix used to prepend to indexes created in the ES instance.
*IndexPrefix*: Prefix prepended to all indexes created in the OpenSearch instance.

For each monitoring types managed, the Period (how often a new index is created)
can be defined with::
Expand Down
7 changes: 7 additions & 0 deletions src/DIRAC/ConfigurationSystem/Client/Utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,13 @@ def _getCACerts(cs_path):
if ca_certs:
parameters["ca_certs"] = ca_certs

# Global index prefix for all OpenSearch databases
result = gConfig.getOption("/Systems/NoSQLDatabases/IndexPrefix")
if result["OK"]:
parameters["IndexPrefix"] = str(result["Value"]).strip().lower()
else:
parameters["IndexPrefix"] = ""

# Check optional parameters: Host, Port, SSL
result = gConfig.getOption(cs_path + "/Host")
if not result["OK"]:
Expand Down
2 changes: 2 additions & 0 deletions src/DIRAC/Core/Base/ElasticDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None):
self.__ca_certs = dbParameters.get("ca_certs", None)
self.__client_key = dbParameters.get("client_key", None)
self.__client_cert = dbParameters.get("client_cert", None)
self.__globalIndexPrefix = dbParameters.get("IndexPrefix", "")

super().__init__(
host=self._dbHost,
port=self._dbPort,
user=self.__user,
password=self.__dbPassword,
indexPrefix=indexPrefix,
globalIndexPrefix=self.__globalIndexPrefix,
useSSL=self.__useSSL,
useCRT=self.__useCRT,
ca_certs=self.__ca_certs,
Expand Down
64 changes: 62 additions & 2 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(
user=None,
password=None,
indexPrefix="",
globalIndexPrefix="",
useSSL=True,
useCRT=False,
ca_certs=None,
Expand All @@ -117,6 +118,7 @@ def __init__(
:param str user: user name to access the db
:param str password: if the db is password protected we need to provide a password
:param str indexPrefix: it is the indexPrefix used to get all indexes
:param str globalIndexPrefix: prefix prepended to all index names and patterns
:param bool useSSL: We can disable using secure connection. By default we use secure connection.
:param bool useCRT: Use certificates.
:param str ca_certs: CA certificates bundle.
Expand All @@ -125,6 +127,7 @@ def __init__(
"""

self._connected = False
self.globalIndexPrefix = globalIndexPrefix
if user and password:
sLog.debug("Specified username and password")
password = urlparse.quote_plus(password)
Expand Down Expand Up @@ -191,6 +194,43 @@ def __init__(
except ElasticConnectionError as e:
sLog.error(repr(e))

@property
def globalIndexPrefix(self) -> str:
"""Global prefix prepended to all index names and patterns."""
return self._globalIndexPrefix

@globalIndexPrefix.setter
def globalIndexPrefix(self, value: str):
self._globalIndexPrefix = (value or "").strip().lower()

def _withGlobalPrefix(self, indexName):
"""Prepend the global index prefix to an index name or pattern."""
if not self._globalIndexPrefix:
return indexName

prefixedTokens = []
for token in indexName.split(","):
strippedToken = token.strip()
if not strippedToken:
prefixedTokens.append(strippedToken)
continue

excluded = strippedToken.startswith("-")
if excluded:
strippedToken = strippedToken[1:]

if strippedToken == "_all":
strippedToken = "*"

if not strippedToken.startswith(self._globalIndexPrefix):
strippedToken = f"{self._globalIndexPrefix}{strippedToken}"

if excluded:
strippedToken = f"-{strippedToken}"
prefixedTokens.append(strippedToken)

return ",".join(prefixedTokens)

@ifConnected
def addIndexTemplate(
self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None
Expand All @@ -204,6 +244,7 @@ def addIndexTemplate(
"""
if settings is None:
settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}}
index_patterns = [self._withGlobalPrefix(pattern) for pattern in index_patterns]
body = {
"index_patterns": index_patterns,
"priority": priority,
Expand All @@ -225,6 +266,7 @@ def query(self, index: str, query):
:param dict query: It is the query in OpenSearch DSL language

"""
index = self._withGlobalPrefix(index)
try:
esDSLQueryResult = self.client.search(index=index, body=query)
return S_OK(esDSLQueryResult)
Expand All @@ -247,6 +289,7 @@ def update(self, index: str, query=None, updateByQuery: bool = True, docID: str
if not index or not query:
return S_ERROR("Missing index or query")

index = self._withGlobalPrefix(index)
try:
if updateByQuery:
esDSLQueryResult = self.client.update_by_query(index=index, body=query)
Expand All @@ -263,6 +306,7 @@ def getDoc(self, index: str, docID: str) -> dict:
:param index: name of the index
:param docID: document ID
"""
index = self._withGlobalPrefix(index)
sLog.debug(f"Retrieving document {docID} in index {index}")
try:
return S_OK(self.client.get(index, docID)["_source"])
Expand All @@ -280,7 +324,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
:param docIDs: document IDs
"""
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
docs = [{"_index": self._withGlobalPrefix(indexFunc(docID, vo)), "_id": docID} for docID in docIDs]
try:
response = self.client.mget({"docs": docs})
except RequestError as re:
Expand All @@ -298,6 +342,7 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
:param body: The request definition requires either `script` or
partial `doc`
"""
index = self._withGlobalPrefix(index)
sLog.debug(f"Updating document {docID} in index {index}")
try:
self.client.update(index, docID, body)
Expand All @@ -317,6 +362,7 @@ def deleteDoc(self, index: str, docID: str):
:param index: name of the index
:param docID: document ID
"""
index = self._withGlobalPrefix(index)
sLog.debug(f"Deleting document {docID} in index {index}")
try:
return S_OK(self.client.delete(index, docID))
Expand All @@ -333,6 +379,7 @@ def existsDoc(self, index: str, docID: str) -> bool:
:param index: name of the index
:param docID: document ID
"""
index = self._withGlobalPrefix(index)
sLog.debug(f"Checking if document {docID} in index {index} exists")
return self.client.exists(index, docID)

Expand All @@ -341,6 +388,7 @@ def _Search(self, indexname):
"""
it returns the object which can be used for retreiving certain value from the DB
"""
indexname = self._withGlobalPrefix(indexname)
return Search(using=self.client, index=indexname)

def _Q(self, name_or_query="match", **params):
Expand All @@ -363,6 +411,7 @@ def getIndexes(self, indexName=None):
"""
if not indexName:
indexName = ""
indexName = self._withGlobalPrefix(indexName)
sLog.debug(f"Getting indices alias of {indexName}")
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
return list(self.client.indices.get_alias(f"{indexName}*"))
Expand All @@ -376,6 +425,7 @@ def getDocTypes(self, indexName):
:return: S_OK or S_ERROR
"""
result = []
indexName = self._withGlobalPrefix(indexName)
try:
sLog.debug("Getting mappings for ", indexName)
result = self.client.indices.get_mapping(indexName)
Expand Down Expand Up @@ -407,6 +457,7 @@ def existingIndex(self, indexName):
:param str indexName: the name of the index
:returns: S_OK/S_ERROR if the request is successful
"""
indexName = self._withGlobalPrefix(indexName)
sLog.debug(f"Checking existance of index {indexName}")
try:
return S_OK(self.client.indices.exists(indexName))
Expand All @@ -428,6 +479,7 @@ def createIndex(self, indexPrefix, mapping=None, period="day"):
else:
sLog.warn("The period is not provided, so using non-periodic indexes names")
fullIndex = indexPrefix
fullIndex = self._withGlobalPrefix(fullIndex)

try:
if not mapping:
Expand All @@ -444,6 +496,7 @@ def deleteIndex(self, indexName):
"""
:param str indexName: the name of the index to be deleted...
"""
indexName = self._withGlobalPrefix(indexName)
sLog.info("Deleting index", indexName)
try:
retVal = self.client.indices.delete(indexName)
Expand Down Expand Up @@ -474,6 +527,7 @@ def index(self, indexName, body=None, docID=None, op_type="index"):
if not indexName or not body:
return S_ERROR("Missing index or body")

indexName = self._withGlobalPrefix(indexName)
try:
res = self.client.index(index=indexName, body=body, id=docID, params={"op_type": op_type})
except (RequestError, TransportError) as e:
Expand Down Expand Up @@ -505,8 +559,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
indexName = self.generateFullIndexName(indexPrefix, period)
else:
indexName = indexPrefix
sLog.debug(f"Bulk indexing into {indexName} of {len(data)}")
sLog.debug(f"Bulk indexing into {self._withGlobalPrefix(indexName)} of {len(data)}")

# Keep existence/creation checks on the raw name path; methods apply global prefix internally.
res = self.existingIndex(indexName)
if not res["OK"]:
return res
Expand All @@ -515,6 +570,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim
if not retVal["OK"]:
return retVal

# Prefix exactly once for the direct bulk API call.
indexName = self._withGlobalPrefix(indexName)

try:
res = bulk(client=self.client, index=indexName, actions=generateDocs(data, withTimeStamp))
except (BulkIndexError, RequestError) as e:
Expand All @@ -534,6 +592,7 @@ def getUniqueValue(self, indexName, key, orderBy=False):
:param dict orderBy: it is a dictionary in case we want to order the result {key:'desc'} or {key:'asc'}
:returns: a list of unique value for a certain key from the dictionary.
"""
indexName = self._withGlobalPrefix(indexName)

query = self._Search(indexName)

Expand Down Expand Up @@ -592,6 +651,7 @@ def deleteByQuery(self, indexName, query):
:param str indexName: the name of the index
:param str query: the JSON-formatted query for which we want to issue the delete
"""
indexName = self._withGlobalPrefix(indexName)
try:
self.client.delete_by_query(index=indexName, body=query)
except Exception as inst:
Expand Down
2 changes: 2 additions & 0 deletions src/DIRAC/Core/scripts/install_full.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,7 @@ LocalInstallation
Password = <password>
Host = <host>
Port = <port>
# Optional global prefix prepended to all DIRAC-created OpenSearch indexes
# IndexPrefix = <prefix>
}
}
11 changes: 3 additions & 8 deletions src/DIRAC/MonitoringSystem/DB/MonitoringDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

**Configuration Parameters**:

The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`

* *IndexPrefix*: Prefix used to prepend to indexes created in the OpenSearch instance.
The global OpenSearch index prefix can be set in
`Systems/NoSQLDatabases/IndexPrefix`.

For each monitoring types managed, the Period (how often a new index is created)
can be defined with::
Expand All @@ -31,8 +30,6 @@
import time

from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Config import gConfig
from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection
from DIRAC.Core.Base.ElasticDB import ElasticDB
from DIRAC.Core.Utilities.Plotting.TypeLoader import TypeLoader

Expand All @@ -45,10 +42,8 @@ def __init__(self, name="Monitoring/MonitoringDB"):
"""Standard constructor"""

try:
section = getDatabaseSection("Monitoring/MonitoringDB")
indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", "").lower()
# Connecting to the ES cluster
super().__init__(fullName=name, indexPrefix=indexPrefix)
super().__init__(fullName=name)
except RuntimeError as ex:
self.log.error("Can't connect to MonitoringDB", repr(ex))
raise ex
Expand Down
Loading