diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 492c1c0e200..fe133ecc390 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -230,6 +230,12 @@ def execute(self): return result pilotProxy = result["Value"] + # Dump the proxy to a file to get DiracX token (it's later used by DiracX) + result = gProxyManager.dumpProxyToFile(pilotProxy) + if not result["OK"]: + return result + os.environ["X509_USER_PROXY"] = result["Value"] + for queueName, queueDictionary in queueDictItems: # Make sure there is no problem with the queue before trying to submit if not self._allowedToSubmit(queueName): diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py index 5a2eb8a13e5..39d5f2d4a26 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py @@ -49,6 +49,11 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo f"site {self._workloadSite}, CE {self._workloadCE}, queue {self._workloadQueue}", ) + # The CE interface needs to drop the token section from the proxy file to interact with the CE + # So we save the current proxy file location (which likely contains the DiracX token) + # and we will restore it at the end of the job + originalProxyLocation = os.environ.get("X509_USER_PROXY") + # Set up Application Queue if not (result := self._setUpWorkloadCE(numberOfProcessors))["OK"]: result["Errno"] = DErrno.ERESUNA @@ -87,6 +92,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo time.sleep(timeBetweenRetries) else: result["Errno"] = DErrno.EWMSSUBM + # Restore the original proxy location + os.environ["X509_USER_PROXY"] = originalProxyLocation return result jobID = result["Value"][0] @@ -107,6 +114,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo time.sleep(timeBetweenRetries) else: result["Errno"] = DErrno.EWMSSTATUS + # Restore the original proxy location + os.environ["X509_USER_PROXY"] = originalProxyLocation return result jobStatus = result["Value"][jobID] @@ -123,6 +132,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo time.sleep(timeBetweenRetries) else: result["Errno"] = DErrno.EWMSJMAN + # Restore the original proxy location + os.environ["X509_USER_PROXY"] = originalProxyLocation return result output, error = result["Value"] @@ -131,6 +142,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo self.log.info("Checking the integrity of the outputs...") if not (result := self._checkOutputIntegrity("."))["OK"]: result["Errno"] = DErrno.EWMSJMAN + # Restore the original proxy location + os.environ["X509_USER_PROXY"] = originalProxyLocation return result self.log.info("The output has been retrieved and declared complete") @@ -146,6 +159,9 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo self.log.warn("Failed to clean the output remotely", result["Message"]) self.log.info("The job has been remotely removed") + # Restore the original proxy location + os.environ["X509_USER_PROXY"] = originalProxyLocation + commandStatus = {"Done": 0, "Failed": -1, "Killed": -2} return S_OK((commandStatus[jobStatus], output, error))