@@ -60,6 +60,8 @@ class RucioFileCatalogClient(FileCatalogClientBase):
6060 "resolveDataset" ,
6161 "getLFNForPFN" ,
6262 "getUserDirectory" ,
63+ "getFileUserMetadata" ,
64+ "findFilesByMetadata" ,
6365 ]
6466
6567 WRITE_METHODS = FileCatalogClientBase .WRITE_METHODS + [
@@ -78,13 +80,15 @@ class RucioFileCatalogClient(FileCatalogClientBase):
7880 "createDataset" ,
7981 "changePathOwner" ,
8082 "changePathMode" ,
83+ "setMetadata" ,
8184 ]
8285
8386 NO_LFN_METHODS = FileCatalogClientBase .NO_LFN_METHODS + [
8487 "getUserDirectory" ,
8588 "createUserDirectory" ,
8689 "createUserMapping" ,
8790 "removeUserDirectory" ,
91+ "findFilesByMetadata" ,
8892 ]
8993
9094 ADMIN_METHODS = FileCatalogClientBase .ADMIN_METHODS + [
@@ -697,3 +701,193 @@ def getDirectorySize(self, lfns, longOutput=False, rawFiles=False):
697701 except Exception as err :
698702 return S_ERROR (str (err ))
699703 return S_OK (resDict )
704+
705+ @checkCatalogArguments
706+ def getFileUserMetadata (self , path ):
707+ """Get the meta data attached to a file, but also to
708+ all its parents
709+ """
710+ path = next (iter (path ))
711+ resDict = {"Successful" : {}, "Failed" : {}}
712+ try :
713+ did = self .__getDidsFromLfn (path )
714+ meta = next (self .client .get_metadata_bulk (dids = [did ], inherit = True , plugin = "ALL" ))
715+ if meta ["did_type" ] == "FILE" : # Should we also return the metadata for the directories ?
716+ resDict ["Successful" ][path ] = meta
717+ else :
718+ resDict ["Failed" ][path ] = "Not a file"
719+ except DataIdentifierNotFound :
720+ resDict ["Failed" ][path ] = "No such file or directory"
721+ except Exception as err :
722+ return S_ERROR (str (err ))
723+ return S_OK (resDict )
724+
725+ @checkCatalogArguments
726+ def getFileUserMetadataBulk (self , lfns ):
727+ """Get the meta data attached to a list of files, but also to
728+ all their parents
729+ """
730+ resDict = {"Successful" : {}, "Failed" : {}}
731+ dids = []
732+ lfnChunks = breakListIntoChunks (lfns , 1000 )
733+ for lfnList in lfnChunks :
734+ try :
735+ dids = [self .__getDidsFromLfn (lfn ) for lfn in lfnList ]
736+ except Exception as err :
737+ return S_ERROR (str (err ))
738+ try :
739+ for met in self .client .get_metadata_bulk (dids = dids , inherit = True ):
740+ lfn = met ["name" ]
741+ resDict ["Successful" ][lfn ] = met
742+ for lfn in lfnList :
743+ if lfn not in resDict ["Successful" ]:
744+ resDict ["Failed" ][lfn ] = "No such file or directory"
745+ except Exception as err :
746+ return S_ERROR (str (err ))
747+ return S_OK (resDict )
748+
749+ @checkCatalogArguments
750+ def setMetadataBulk (self , pathMetadataDict ):
751+ """Add metadata for the given paths"""
752+ resDict = {"Successful" : {}, "Failed" : {}}
753+ dids = []
754+ for path , metadataDict in pathMetadataDict .items ():
755+ try :
756+ did = self .__getDidsFromLfn (path )
757+ did ["meta" ] = metadataDict
758+ dids .append (did )
759+ except Exception as err :
760+ return S_ERROR (str (err ))
761+ try :
762+ self .client .set_dids_metadata_bulk (dids = dids , recursive = False )
763+ except Exception as err :
764+ return S_ERROR (str (err ))
765+ return S_OK (resDict )
766+
767+ @checkCatalogArguments
768+ def setMetadata (self , path , metadataDict ):
769+ """Add metadata to the given path"""
770+ pathMetadataDict = {}
771+ path = next (iter (path ))
772+ pathMetadataDict [path ] = metadataDict
773+ return self .setMetadataBulk (pathMetadataDict )
774+
775+ @checkCatalogArguments
776+ def removeMetadata (self , path , metadata ):
777+ """Remove the specified metadata for the given file"""
778+ resDict = {"Successful" : {}, "Failed" : {}}
779+ try :
780+ did = self .__getDidsFromLfn (path )
781+ failedMeta = {}
782+ # TODO : Implement bulk delete_metadata method in Rucio
783+ for meta in metadata :
784+ try :
785+ self .client .delete_metadata (scope = did ["scope" ], name = did ["name" ], key = meta )
786+ except DataIdentifierNotFound :
787+ return S_ERROR (f"File { path } not found" )
788+ except Exception as err :
789+ failedMeta [meta ] = str (err )
790+
791+ if failedMeta :
792+ metaExample = list (failedMeta )[0 ]
793+ result = S_ERROR (f"Failed to remove { len (failedMeta )} metadata, e.g. { failedMeta [metaExample ]} " )
794+ result ["FailedMetadata" ] = failedMeta
795+ except Exception as err :
796+ return S_ERROR (str (err ))
797+ return S_OK ()
798+
799+ def findFilesByMetadata (self , metadataFilterDict , path = "/" , timeout = 120 ):
800+ """find the dids for the given metadataFilterDict"""
801+ ruciometadataFilterDict = self .__transform_DIRAC_filter_dict_to_Rucio_filter_dict ([metadataFilterDict ])
802+ dids = []
803+ for scope in self .scopes :
804+ try :
805+ dids .extend (self .client .list_dids (scope = scope , filters = ruciometadataFilterDict , did_type = "all" ))
806+ except Exception as err :
807+ return S_ERROR (str (err ))
808+ return S_OK (dids )
809+
810+ def __transform_DIRAC_operator_to_Rucio (self , DIRAC_dict ):
811+ """
812+ Transforms a DIRAC's metadata Query dictionary to a Rucio-compatible dictionary.
813+ This method takes a dictionary with DIRAC operators and converts it to a
814+ dictionary with Rucio-compatible operators based on predefined mappings.
815+ for example :
816+ input_dict={'key1': 'value1', 'key2': {'>': 10}, 'key3': {'=': 10}}
817+ return = {'key1': 'value1', 'key2.gt': 10, 'key3': 10}
818+ """
819+ rucio_dict = {}
820+ operator_mapping = {">" : ".gt" , "<" : ".lt" , ">=" : ".gte" , "<=" : ".lte" , "=<" : ".lte" , "!=" : ".ne" , "=" : "" }
821+
822+ for key , value in DIRAC_dict .items ():
823+ if isinstance (value , dict ):
824+ for operator , num in value .items ():
825+ if operator in operator_mapping :
826+ mapped_operator = operator_mapping [operator ]
827+ rucio_dict [f"{ key } { mapped_operator } " ] = num
828+ else :
829+ rucio_dict [key ] = value
830+
831+ return rucio_dict
832+
833+ def __transform_dict_with_in_operateur (self , DIRAC_dict_with_in_operator_list ):
834+ """
835+ Transforms a list of DIRAC dictionaries containing 'in' operators into a combined list of dictionaries,
836+ expanding the 'in' operator into individual dictionaries while preserving other keys.
837+ example
838+ input_dict_list = [{'particle': {'in': ['proton','electron']},'site': {'in': [ "LaPalma", 'paranal']},'configuration_id': {'=': 14} } ]
839+ return = [{'particle': 'proton', 'site': 'LaPalma', 'configuration_id': {'=': 14} }, {'particle': 'proton', 'site': 'paranal', 'configuration_id': {'=': 14} }, {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': {'=': 14} }, {'particle': 'electron', 'site': 'paranal', 'configuration_id': {'=': 14} }]
840+ """
841+ if not isinstance (DIRAC_dict_with_in_operator_list , list ):
842+ raise TypeError ("DIRAC_dict_with_in_operator_list must be a list of dictionaries" )
843+
844+ combined_dict_list = [] # Final list of transformed dictionaries
845+ break_reached = False # Boolean to track if 'in' was found and processed in any dictionary
846+
847+ # Process each dictionary in the input list
848+ for DIRAC_dict_with_in_operator in DIRAC_dict_with_in_operator_list :
849+ if not isinstance (DIRAC_dict_with_in_operator , dict ):
850+ raise TypeError ("Each element in DIRAC_dict_with_in_operator_list must be a dictionary" )
851+
852+ in_key = None
853+ in_values = []
854+
855+ # Extract the key with 'in' operator and the list of values
856+ for key , value in DIRAC_dict_with_in_operator .items ():
857+ if isinstance (value , dict ) and "in" in value :
858+ in_key = key
859+ in_values = value ["in" ]
860+ break_reached = True # 'in' operator found
861+ break
862+
863+ # If an 'in' key exists, expand the dictionary for each value
864+ if in_key :
865+ for val in in_values :
866+ # Copy the original dictionary and replace the 'in' key
867+ new_dict = DIRAC_dict_with_in_operator .copy ()
868+ new_dict [in_key ] = val # Replace the 'in' key with the current value
869+ combined_dict_list .append (new_dict )
870+ else :
871+ # If no 'in' key, simply add the input dictionary as-is
872+ combined_dict_list .append (DIRAC_dict_with_in_operator )
873+
874+ return combined_dict_list , break_reached
875+
876+ def __transform_DIRAC_filter_dict_to_Rucio_filter_dict (self , DIRAC_filter_dict_list ):
877+ """
878+ Transforms a list of DIRAC filter dictionaries into a list of Rucio filter dictionaries.
879+ This method takes a list of filter dictionaries used in DIRAC and converts them into a format
880+ that is compatible with Rucio. It handles the transformation of operators and expands filters
881+ that use the 'in' operator.
882+ example:
883+ input_dict_list = [{'particle': {'in': ['proton','electron']},'site': {'in': [ "LaPalma", 'paranal']},'configuration_id': {'=': 14} } ]
884+ return = [{'particle': 'proton', 'site': 'LaPalma', 'configuration_id': 14}, {'particle': 'proton', 'site': 'paranal', 'configuration_id': 14}, {'particle': 'electron', 'site': 'LaPalma', 'configuration_id': 14}, {'particle': 'electron', 'site': 'paranal', 'configuration_id': 14}]
885+ """
886+ break_detected = True
887+ DIRAC_expanded_filters = DIRAC_filter_dict_list
888+ while break_detected :
889+ DIRAC_expanded_filters , break_detected = self .__transform_dict_with_in_operateur (DIRAC_expanded_filters )
890+ Rucio_filters = []
891+ for filter in DIRAC_expanded_filters :
892+ Rucio_filters .append (self .__transform_DIRAC_operator_to_Rucio (filter ))
893+ return Rucio_filters
0 commit comments