Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ def execute(self):
return result
pilotProxy = result["Value"]

# Dump the proxy to a file to get DiracX token (it's later used by DiracX)
result = gProxyManager.dumpProxyToFile(pilotProxy)
if not result["OK"]:
return result
os.environ["X509_USER_PROXY"] = result["Value"]

for queueName, queueDictionary in queueDictItems:
# Make sure there is no problem with the queue before trying to submit
if not self._allowedToSubmit(queueName):
Expand Down
16 changes: 16 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
f"site {self._workloadSite}, CE {self._workloadCE}, queue {self._workloadQueue}",
)

# The CE interface needs to drop the token section from the proxy file to interact with the CE
# So we save the current proxy file location (which likely contains the DiracX token)
# and we will restore it at the end of the job
originalProxyLocation = os.environ.get("X509_USER_PROXY")

# Set up Application Queue
if not (result := self._setUpWorkloadCE(numberOfProcessors))["OK"]:
result["Errno"] = DErrno.ERESUNA
Expand Down Expand Up @@ -87,6 +92,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
time.sleep(timeBetweenRetries)
else:
result["Errno"] = DErrno.EWMSSUBM
# Restore the original proxy location
os.environ["X509_USER_PROXY"] = originalProxyLocation
return result

jobID = result["Value"][0]
Expand All @@ -107,6 +114,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
time.sleep(timeBetweenRetries)
else:
result["Errno"] = DErrno.EWMSSTATUS
# Restore the original proxy location
os.environ["X509_USER_PROXY"] = originalProxyLocation
return result

jobStatus = result["Value"][jobID]
Expand All @@ -123,6 +132,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
time.sleep(timeBetweenRetries)
else:
result["Errno"] = DErrno.EWMSJMAN
# Restore the original proxy location
os.environ["X509_USER_PROXY"] = originalProxyLocation
return result

output, error = result["Value"]
Expand All @@ -131,6 +142,8 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
self.log.info("Checking the integrity of the outputs...")
if not (result := self._checkOutputIntegrity("."))["OK"]:
result["Errno"] = DErrno.EWMSJMAN
# Restore the original proxy location
os.environ["X509_USER_PROXY"] = originalProxyLocation
return result
self.log.info("The output has been retrieved and declared complete")

Expand All @@ -146,6 +159,9 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo
self.log.warn("Failed to clean the output remotely", result["Message"])
self.log.info("The job has been remotely removed")

# Restore the original proxy location
os.environ["X509_USER_PROXY"] = originalProxyLocation

commandStatus = {"Done": 0, "Failed": -1, "Killed": -2}
return S_OK((commandStatus[jobStatus], output, error))

Expand Down
Loading