11from DIRAC .Core .Utilities .ReturnValues import convertToReturnValue
2+ from DIRAC .Core .Security .DiracX import DiracXClient , FutureClient
23
3- from DIRAC .Core .Security .DiracX import DiracXClient
44
5-
6- class PilotManagerClient :
5+ class PilotManagerClient (FutureClient ):
76 @convertToReturnValue
87 def addPilotReferences (self , pilot_stamps , VO , gridType = "DIRAC" , pilot_references = {}):
98 with DiracXClient () as api :
109 # We will move toward a stamp as identifier for the pilot
1110 return api .pilots .add_pilot_stamps (
12- {"pilot_stamps" : pilot_stamps , "vo" : VO , "grid_type" : gridType , "pilot_references" : pilot_references }
11+ {"pilot_stamps" : pilot_stamps , "vo" : VO , "grid_type" : gridType , "pilot_references" : pilot_references } # type: ignore
1312 )
1413
1514 def set_pilot_field (self , pilot_stamp , values_dict ):
1615 with DiracXClient () as api :
1716 values_dict ["PilotStamp" ] = pilot_stamp
18- return api .pilots .update_pilot_fields (values_dict )
19-
20- @convertToReturnValue
21- def setPilotBenchmark (self , pilotStamp , mark ):
22- return self .set_pilot_field (pilotStamp , {"BenchMark" : mark })
23-
24- @convertToReturnValue
25- def setAccountingFlag (self , pilotStamp , flag ):
26- return self .set_pilot_field (pilotStamp , {"AccountingSent" : flag })
17+ return api .pilots .update_pilot_fields ({"pilot_stamps_to_fields_mapping" : [values_dict ]}) # type: ignore
2718
2819 @convertToReturnValue
2920 def setPilotStatus (self , pilot_stamp , status , destination = None , reason = None , grid_site = None , queue = None ):
@@ -44,13 +35,74 @@ def clearPilots(self, interval=30, aborted_interval=7):
4435 api .pilots .delete_pilots (age_in_days = interval , delete_only_aborted = False )
4536 api .pilots .delete_pilots (age_in_days = aborted_interval , delete_only_aborted = True )
4637
38+ @convertToReturnValue
39+ def deletePilot (self , pilot_stamp ):
40+ with DiracXClient () as api :
41+ pilot_stamps = [pilot_stamp ]
42+ return api .pilots .delete_pilots (pilot_stamps = pilot_stamps )
43+
44+ @convertToReturnValue
45+ def getJobsForPilotByStamp (self , pilotStamp ):
46+ with DiracXClient () as api :
47+ return api .pilots .get_pilot_jobs (pilot_stamp = pilotStamp )
48+
49+ @convertToReturnValue
50+ def getPilots (self , job_id ):
51+ with DiracXClient () as api :
52+ pilot_ids = api .pilots .get_pilot_jobs (job_id = job_id )
53+ search = [{"parameter" : "PilotID" , "operator" : "in" , "value" : pilot_ids }]
54+ return api .pilots .search (parameters = [], search = search , sort = []) # type: ignore
55+
56+ @convertToReturnValue
57+ def getPilotInfo (self , pilot_reference ):
58+ """Important: We assume that to one stamp is mapped one pilot."""
59+ with DiracXClient () as api :
60+ search = [{"parameter" : "PilotJobReference" , "operator" : "eq" , "value" : pilot_reference }]
61+ pilot = api .pilots .search (parameters = [], search = search , sort = [])[0 ] # type: ignore
62+
63+ if not pilot :
64+ # Return an error as in the legacy code
65+ return []
66+
67+ # Convert all bools in pilot to str
68+ for k , v in pilot .items ():
69+ if isinstance (v , bool ):
70+ pilot [k ] = str (v )
71+
72+ # Transform the list of pilots into a dict keyed by PilotJobReference
73+ resDict = {}
74+
75+ pilotRef = pilot .get ("PilotJobReference" , None )
76+ assert pilot_reference == pilotRef
77+ pilotStamp = pilot .get ("PilotStamp" , None )
78+
79+ if pilotRef is not None :
80+ resDict [pilotRef ] = pilot
81+ else :
82+ # Fallback: use PilotStamp or another key if PilotJobReference is missing
83+ resDict [pilotStamp ] = pilot
84+
85+ jobIDs = self .getJobsForPilotByStamp (pilotStamp )
86+ if jobIDs : # Only add if jobs exist
87+ for pilotRef , pilotInfo in resDict .items ():
88+ pilotInfo ["Jobs" ] = jobIDs # Attach the entire list
89+
90+ return resDict
91+
92+ @convertToReturnValue
93+ def getGroupedPilotSummary (self , column_list ):
94+ with DiracXClient () as api :
95+ return api .pilots .summary (grouping = column_list )
96+
4797 @convertToReturnValue
4898 def deletePilots (self , pilot_stamps ):
99+ # Used by no one, but we won't raise `UnimplementedError` because we still use it in tests.
49100 with DiracXClient () as api :
50101 pilot_ids = None
51- if isinstance (pilot_stamps , list [int ]):
52- # Multiple elements (int)
53- pilot_ids = pilot_stamps # Semantic
102+ if pilot_stamps and isinstance (pilot_stamps , list ):
103+ if isinstance (pilot_stamps [0 ], int ):
104+ # Multiple elements (int)
105+ pilot_ids = pilot_stamps # Semantic
54106 elif isinstance (pilot_stamps , int ):
55107 # Only one element (int)
56108 pilot_ids = [pilot_stamps ]
@@ -61,37 +113,45 @@ def deletePilots(self, pilot_stamps):
61113
62114 if pilot_ids :
63115 # If we have defined pilot_ids, then we have to change them to pilot_stamps
64- query = [{"parameter" : "PilotID" , "operator" : "in" , "value" : pilot_ids }]
116+ search = [{"parameter" : "PilotID" , "operator" : "in" , "value" : pilot_ids }]
65117
66- pilots = api .pilots .search (parameters = ["PilotStamp" ], search = query , sort = [])
118+ pilots = api .pilots .search (parameters = ["PilotStamp" ], search = search , sort = []) # type: ignore
67119 pilot_stamps = [pilot ["PilotStamp" ] for pilot in pilots ]
68120
69- api .pilots .delete_pilots (pilot_stamps = pilot_stamps )
121+ return api .pilots .delete_pilots (pilot_stamps = pilot_stamps ) # type: ignore
70122
71123 @convertToReturnValue
72124 def setJobForPilot (self , job_id , pilot_stamp , destination = None ):
73- with DiracXClient () as api :
74- api .pilots .add_jobs_to_pilot ({"pilot_stamp" : pilot_stamp , "job_ids" : [job_id ]})
75-
76- self .set_pilot_field (
77- pilot_stamp ,
78- {
79- "DestinationSite" : destination ,
80- },
81- )
125+ raise NotImplementedError (
126+ "This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
127+ )
82128
83129 @convertToReturnValue
84- def getPilots (self , job_id ):
85- with DiracXClient () as api :
86- pilot_ids = api .pilots .get_pilot_jobs (job_id = job_id )
130+ def countPilots (self , condDict , older = None , newer = None ):
131+ raise NotImplementedError (
132+ "This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
133+ )
87134
88- query = [{"parameter" : "PilotID" , "operator" : "in" , "value" : pilot_ids }]
135+ @convertToReturnValue
136+ def selectPilots (self , condDict ):
137+ raise NotImplementedError (
138+ "This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
139+ )
89140
90- return api .pilots .search (parameters = [], search = query , sort = [])
141+ @convertToReturnValue
142+ def getCurrentPilotCounters (self , attrDict = {}):
143+ raise NotImplementedError (
144+ "This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
145+ )
91146
92147 @convertToReturnValue
93- def getPilotInfo (self , pilot_stamp ):
94- with DiracXClient () as api :
95- query = [{"parameter" : "PilotStamp" , "operator" : "eq" , "value" : pilot_stamp }]
148+ def setPilotBenchmark (self , pilotRef , mark ):
149+ raise NotImplementedError (
150+ "This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
151+ )
96152
97- return api .pilots .search (parameters = [], search = query , sort = [])
153+ @convertToReturnValue
154+ def setAccountingFlag (self , pilotRef , flag = "True" ):
155+ raise NotImplementedError (
156+ "This function is used by no one. I won't be adapted into DiracX via a legacy adaptor."
157+ )
0 commit comments