diff --git a/docs/source/AdministratorGuide/ExternalsSupport/index.rst b/docs/source/AdministratorGuide/ExternalsSupport/index.rst index 2cdf39fb09d..0f206bb1ac4 100644 --- a/docs/source/AdministratorGuide/ExternalsSupport/index.rst +++ b/docs/source/AdministratorGuide/ExternalsSupport/index.rst @@ -55,6 +55,7 @@ You can run your OpenSearch cluster without authentication, or using User name a - ``ca_certs`` (default:``None``) - ``client_key`` (default:``None``) - ``client_cert`` (default:``None``) + - ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes. The prefix will be lower case only. to the location:: diff --git a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst index 531c04e06be..6e85e39933c 100644 --- a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst +++ b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst @@ -50,13 +50,14 @@ For example:: { User = test Password = password + IndexPrefix = mydirac- } } -The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`: +The following global option can be set in `Systems/NoSQLDatabases`: - *IndexPrefix*: Prefix used to prepend to indexes created in the ES instance. + *IndexPrefix*: Prefix prepended to all indexes created in the OpenSearch instance. For each monitoring types managed, the Period (how often a new index is created) can be defined with:: diff --git a/src/DIRAC/ConfigurationSystem/Client/Utilities.py b/src/DIRAC/ConfigurationSystem/Client/Utilities.py index 63c38da2354..de5c59bb591 100644 --- a/src/DIRAC/ConfigurationSystem/Client/Utilities.py +++ b/src/DIRAC/ConfigurationSystem/Client/Utilities.py @@ -532,6 +532,13 @@ def _getCACerts(cs_path): if ca_certs: parameters["ca_certs"] = ca_certs + # Global index prefix for all OpenSearch databases + result = gConfig.getOption("/Systems/NoSQLDatabases/IndexPrefix") + if result["OK"]: + parameters["IndexPrefix"] = str(result["Value"]).strip().lower() + else: + parameters["IndexPrefix"] = "" + # Check optional parameters: Host, Port, SSL result = gConfig.getOption(cs_path + "/Host") if not result["OK"]: diff --git a/src/DIRAC/Core/Base/ElasticDB.py b/src/DIRAC/Core/Base/ElasticDB.py index 7f73806bbbc..9b26178f72a 100644 --- a/src/DIRAC/Core/Base/ElasticDB.py +++ b/src/DIRAC/Core/Base/ElasticDB.py @@ -34,6 +34,7 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None): self.__ca_certs = dbParameters.get("ca_certs", None) self.__client_key = dbParameters.get("client_key", None) self.__client_cert = dbParameters.get("client_cert", None) + self.__globalIndexPrefix = dbParameters.get("IndexPrefix", "") super().__init__( host=self._dbHost, @@ -41,6 +42,7 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None): user=self.__user, password=self.__dbPassword, indexPrefix=indexPrefix, + globalIndexPrefix=self.__globalIndexPrefix, useSSL=self.__useSSL, useCRT=self.__useCRT, ca_certs=self.__ca_certs, diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 46322129abd..786d4e994e7 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -103,6 +103,7 @@ def __init__( user=None, password=None, indexPrefix="", + globalIndexPrefix="", useSSL=True, useCRT=False, ca_certs=None, @@ -117,6 +118,7 @@ def __init__( :param str user: user name to access the db :param str password: if the db is password protected we need to provide a password :param str indexPrefix: it is the indexPrefix used to get all indexes + :param str globalIndexPrefix: prefix prepended to all index names and patterns :param bool useSSL: We can disable using secure connection. By default we use secure connection. :param bool useCRT: Use certificates. :param str ca_certs: CA certificates bundle. @@ -125,6 +127,7 @@ def __init__( """ self._connected = False + self.globalIndexPrefix = globalIndexPrefix if user and password: sLog.debug("Specified username and password") password = urlparse.quote_plus(password) @@ -191,6 +194,43 @@ def __init__( except ElasticConnectionError as e: sLog.error(repr(e)) + @property + def globalIndexPrefix(self) -> str: + """Global prefix prepended to all index names and patterns.""" + return self._globalIndexPrefix + + @globalIndexPrefix.setter + def globalIndexPrefix(self, value: str): + self._globalIndexPrefix = (value or "").strip().lower() + + def _withGlobalPrefix(self, indexName): + """Prepend the global index prefix to an index name or pattern.""" + if not self._globalIndexPrefix: + return indexName + + prefixedTokens = [] + for token in indexName.split(","): + strippedToken = token.strip() + if not strippedToken: + prefixedTokens.append(strippedToken) + continue + + excluded = strippedToken.startswith("-") + if excluded: + strippedToken = strippedToken[1:] + + if strippedToken == "_all": + strippedToken = "*" + + if not strippedToken.startswith(self._globalIndexPrefix): + strippedToken = f"{self._globalIndexPrefix}{strippedToken}" + + if excluded: + strippedToken = f"-{strippedToken}" + prefixedTokens.append(strippedToken) + + return ",".join(prefixedTokens) + @ifConnected def addIndexTemplate( self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None @@ -204,6 +244,7 @@ def addIndexTemplate( """ if settings is None: settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}} + index_patterns = [self._withGlobalPrefix(pattern) for pattern in index_patterns] body = { "index_patterns": index_patterns, "priority": priority, @@ -225,6 +266,7 @@ def query(self, index: str, query): :param dict query: It is the query in OpenSearch DSL language """ + index = self._withGlobalPrefix(index) try: esDSLQueryResult = self.client.search(index=index, body=query) return S_OK(esDSLQueryResult) @@ -247,6 +289,7 @@ def update(self, index: str, query=None, updateByQuery: bool = True, docID: str if not index or not query: return S_ERROR("Missing index or query") + index = self._withGlobalPrefix(index) try: if updateByQuery: esDSLQueryResult = self.client.update_by_query(index=index, body=query) @@ -263,6 +306,7 @@ def getDoc(self, index: str, docID: str) -> dict: :param index: name of the index :param docID: document ID """ + index = self._withGlobalPrefix(index) sLog.debug(f"Retrieving document {docID} in index {index}") try: return S_OK(self.client.get(index, docID)["_source"]) @@ -280,7 +324,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]: :param docIDs: document IDs """ sLog.debug(f"Retrieving documents {docIDs}") - docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs] + docs = [{"_index": self._withGlobalPrefix(indexFunc(docID, vo)), "_id": docID} for docID in docIDs] try: response = self.client.mget({"docs": docs}) except RequestError as re: @@ -298,6 +342,7 @@ def updateDoc(self, index: str, docID: str, body) -> dict: :param body: The request definition requires either `script` or partial `doc` """ + index = self._withGlobalPrefix(index) sLog.debug(f"Updating document {docID} in index {index}") try: self.client.update(index, docID, body) @@ -317,6 +362,7 @@ def deleteDoc(self, index: str, docID: str): :param index: name of the index :param docID: document ID """ + index = self._withGlobalPrefix(index) sLog.debug(f"Deleting document {docID} in index {index}") try: return S_OK(self.client.delete(index, docID)) @@ -333,6 +379,7 @@ def existsDoc(self, index: str, docID: str) -> bool: :param index: name of the index :param docID: document ID """ + index = self._withGlobalPrefix(index) sLog.debug(f"Checking if document {docID} in index {index} exists") return self.client.exists(index, docID) @@ -341,6 +388,7 @@ def _Search(self, indexname): """ it returns the object which can be used for retreiving certain value from the DB """ + indexname = self._withGlobalPrefix(indexname) return Search(using=self.client, index=indexname) def _Q(self, name_or_query="match", **params): @@ -363,6 +411,7 @@ def getIndexes(self, indexName=None): """ if not indexName: indexName = "" + indexName = self._withGlobalPrefix(indexName) sLog.debug(f"Getting indices alias of {indexName}") # we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc. return list(self.client.indices.get_alias(f"{indexName}*")) @@ -376,6 +425,7 @@ def getDocTypes(self, indexName): :return: S_OK or S_ERROR """ result = [] + indexName = self._withGlobalPrefix(indexName) try: sLog.debug("Getting mappings for ", indexName) result = self.client.indices.get_mapping(indexName) @@ -407,6 +457,7 @@ def existingIndex(self, indexName): :param str indexName: the name of the index :returns: S_OK/S_ERROR if the request is successful """ + indexName = self._withGlobalPrefix(indexName) sLog.debug(f"Checking existance of index {indexName}") try: return S_OK(self.client.indices.exists(indexName)) @@ -428,6 +479,7 @@ def createIndex(self, indexPrefix, mapping=None, period="day"): else: sLog.warn("The period is not provided, so using non-periodic indexes names") fullIndex = indexPrefix + fullIndex = self._withGlobalPrefix(fullIndex) try: if not mapping: @@ -444,6 +496,7 @@ def deleteIndex(self, indexName): """ :param str indexName: the name of the index to be deleted... """ + indexName = self._withGlobalPrefix(indexName) sLog.info("Deleting index", indexName) try: retVal = self.client.indices.delete(indexName) @@ -474,6 +527,7 @@ def index(self, indexName, body=None, docID=None, op_type="index"): if not indexName or not body: return S_ERROR("Missing index or body") + indexName = self._withGlobalPrefix(indexName) try: res = self.client.index(index=indexName, body=body, id=docID, params={"op_type": op_type}) except (RequestError, TransportError) as e: @@ -505,8 +559,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim indexName = self.generateFullIndexName(indexPrefix, period) else: indexName = indexPrefix - sLog.debug(f"Bulk indexing into {indexName} of {len(data)}") + sLog.debug(f"Bulk indexing into {self._withGlobalPrefix(indexName)} of {len(data)}") + # Keep existence/creation checks on the raw name path; methods apply global prefix internally. res = self.existingIndex(indexName) if not res["OK"]: return res @@ -515,6 +570,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim if not retVal["OK"]: return retVal + # Prefix exactly once for the direct bulk API call. + indexName = self._withGlobalPrefix(indexName) + try: res = bulk(client=self.client, index=indexName, actions=generateDocs(data, withTimeStamp)) except (BulkIndexError, RequestError) as e: @@ -534,6 +592,7 @@ def getUniqueValue(self, indexName, key, orderBy=False): :param dict orderBy: it is a dictionary in case we want to order the result {key:'desc'} or {key:'asc'} :returns: a list of unique value for a certain key from the dictionary. """ + indexName = self._withGlobalPrefix(indexName) query = self._Search(indexName) @@ -592,6 +651,7 @@ def deleteByQuery(self, indexName, query): :param str indexName: the name of the index :param str query: the JSON-formatted query for which we want to issue the delete """ + indexName = self._withGlobalPrefix(indexName) try: self.client.delete_by_query(index=indexName, body=query) except Exception as inst: diff --git a/src/DIRAC/Core/scripts/install_full.cfg b/src/DIRAC/Core/scripts/install_full.cfg index 4ea5e518ed2..233dfb60144 100755 --- a/src/DIRAC/Core/scripts/install_full.cfg +++ b/src/DIRAC/Core/scripts/install_full.cfg @@ -142,5 +142,7 @@ LocalInstallation Password = Host = Port = + # Optional global prefix prepended to all DIRAC-created OpenSearch indexes + # IndexPrefix = } } diff --git a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py index 4c09ef93c30..37112bf3d07 100644 --- a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py +++ b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py @@ -3,9 +3,8 @@ **Configuration Parameters**: -The following option can be set in `Systems/Monitoring/Databases/MonitoringDB` - -* *IndexPrefix*: Prefix used to prepend to indexes created in the OpenSearch instance. +The global OpenSearch index prefix can be set in +`Systems/NoSQLDatabases/IndexPrefix`. For each monitoring types managed, the Period (how often a new index is created) can be defined with:: @@ -31,8 +30,6 @@ import time from DIRAC import S_ERROR, S_OK -from DIRAC.ConfigurationSystem.Client.Config import gConfig -from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection from DIRAC.Core.Base.ElasticDB import ElasticDB from DIRAC.Core.Utilities.Plotting.TypeLoader import TypeLoader @@ -45,10 +42,8 @@ def __init__(self, name="Monitoring/MonitoringDB"): """Standard constructor""" try: - section = getDatabaseSection("Monitoring/MonitoringDB") - indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", "").lower() # Connecting to the ES cluster - super().__init__(fullName=name, indexPrefix=indexPrefix) + super().__init__(fullName=name) except RuntimeError as ex: self.log.error("Can't connect to MonitoringDB", repr(ex)) raise ex