Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 126 additions & 67 deletions xfel/command_line/cxi_mpi_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@
phil_scope = parse(phil_str + mp_phil_str, process_includes=True)
mp_phil_scope = parse(mp_phil_str, process_includes=True)

def copy_wrapper(src, dst):
# OsWrapper is a signleton => as long as it is constructed by Script() first,
# repeated calls to the constructor will return the previously constructed
# instance
from xfel.util.sfapi_connector import OsWrapper
os = OsWrapper()
with os.open(src, "r") as f_in:
with os.open(dst, "w") as f_out:
f_out.write(f_in.read())

def copy_config(config, dest_dir, root_name, params, target_num):
""" Copy a config file to a directory, and all of its referenced phil files. Recursively
copies the files and changes the includes statements to match the new file names.
Expand All @@ -141,46 +151,51 @@ def copy_config(config, dest_dir, root_name, params, target_num):
@param params phil parameters
@param target_num for target phil files found in config, latest number found
"""
# make a copy of the original cfg file
shutil.copy(config, os.path.join(dest_dir, "%s_orig.cfg"%root_name))

# OsWrapper is a signleton => as long as it is constructed by Script() first,
# repeated calls to the constructor will return the previously constructed
# instance
from xfel.util.sfapi_connector import OsWrapper
os = OsWrapper()
# make a copy of the original cfg file
# shutil.copy(config, os.path.join(dest_dir, "%s_orig.cfg"%root_name))
copy_wrapper(config, os.path.join(dest_dir, "%s_orig.cfg"%root_name))
config_path = os.path.join(dest_dir, "%s.cfg"%root_name)

# Re-write the config file, changing paths to be relative to the trial directory.
# Also copy and re-write included phil files, while updating the config file to
# the new paths
# Note, some legacy rewrites done by cxi.lsf are not included here.
f = open(config_path, 'w')
for line in open(config).readlines():
if "[pyana]" in line:
raise Sorry("Pyana not supported. Check your config file.")
if "RUN_NO" in line:
line = line.replace("RUN_NO", str(params.input.run_num))
if "RUN_STR" in line:
line = line.replace("RUN_STR", "r%04d"%(params.input.run_num))
if "trial_id" in line:
key, val = line.split("=")
line = "%s= %d\n"%(key,params.input.trial)
if "rungroup_id" in line:
key, val = line.split("=")
line = "%s= %s\n"%(key,params.input.rungroup) # None ok
elif "_dirname" in line:
key, val = line.split("=")
val = os.path.join(dest_dir, os.path.basename(val.strip()))
if not os.path.exists(val):
os.mkdir(val)
line = "%s= %s\n"%(key,val)
elif "xtal_target" in line:
key, val = line.split("=")
val = val.strip()
if not os.path.exists(val):
raise Sorry("One of the xtal_target files in the cfg file doesn't exist: %s"%val)
new_target = "params_%d"%target_num
copy_target(val, dest_dir, new_target)
target_num += 1
line = "%s= %s.phil\n"%(key,os.path.join(dest_dir, new_target))
f.write(line)
f.close()
with os.open(config_path, 'w') as f:
for line in open(config).readlines():
if "[pyana]" in line:
raise Sorry("Pyana not supported. Check your config file.")
if "RUN_NO" in line:
line = line.replace("RUN_NO", str(params.input.run_num))
if "RUN_STR" in line:
line = line.replace("RUN_STR", "r%04d"%(params.input.run_num))
if "trial_id" in line:
key, val = line.split("=")
line = "%s= %d\n"%(key,params.input.trial)
if "rungroup_id" in line:
key, val = line.split("=")
line = "%s= %s\n"%(key,params.input.rungroup) # None ok
elif "_dirname" in line:
key, val = line.split("=")
val = os.path.join(dest_dir, os.path.basename(val.strip()))
if not os.path.exists(val):
os.mkdir(val)
line = "%s= %s\n"%(key,val)
elif "xtal_target" in line:
key, val = line.split("=")
val = val.strip()
if not os.path.exists(val):
raise Sorry("One of the xtal_target files in the cfg file doesn't exist: %s"%val)
new_target = "params_%d"%target_num
copy_target(val, dest_dir, new_target)
target_num += 1
line = "%s= %s.phil\n"%(key,os.path.join(dest_dir, new_target))
f.write(line)
# f.close()
return target_num

def copy_target(target, dest_dir, root_name):
Expand All @@ -191,23 +206,34 @@ def copy_target(target, dest_dir, root_name):
@param root_name Name to give to copied file. Included phil files will be given
this name + _N where N is incremented for each file included in the phil file
"""
# OsWrapper is a signleton => as long as it is constructed by Script() first,
# repeated calls to the constructor will return the previously constructed
# instance
from xfel.util.sfapi_connector import OsWrapper
os = OsWrapper()
# Each included phil file will be named root_name_N.phil where N is this number,
# incremented for each included phil file
num_sub_targets = 1
f = open(os.path.join(dest_dir, root_name + ".phil"), 'w')
for line in open(target).readlines():
if "include" in line:
inc_str, include_type, sub_target = line.strip().split() # Example: include file cxi-8.2.phil
if include_type != 'file':
raise Sorry("Include isn't a file") # FIXME look up what other values are possible here
sub_target_root_name = "%s_%d"%(root_name, num_sub_targets)
line = " ".join([inc_str, include_type, sub_target_root_name + ".phil\n"])
# recursive call to check for other included files
copy_target(os.path.join(os.path.dirname(target), sub_target), dest_dir, sub_target_root_name)
num_sub_targets += 1
f.write(line)
with os.open(os.path.join(dest_dir, root_name + ".phil"), 'w') as f:
for line in open(target).readlines():
if "include" in line:
inc_str, include_type, sub_target = line.strip().split() # Example: include file cxi-8.2.phil
if include_type != 'file':
raise Sorry("Include isn't a file") # FIXME look up what other values are possible here
sub_target_root_name = "%s_%d"%(root_name, num_sub_targets)
line = " ".join([inc_str, include_type, sub_target_root_name + ".phil\n"])
# recursive call to check for other included files
copy_target(os.path.join(os.path.dirname(target), sub_target), dest_dir, sub_target_root_name)
num_sub_targets += 1
f.write(line)

def get_trialdir(output_dir, run_num, trial = None, rungroup = None, task = None):
# OsWrapper is a signleton => as long as it is constructed by Script() first,
# repeated calls to the constructor will return the previously constructed
# instance
from xfel.util.sfapi_connector import OsWrapper
os = OsWrapper()

try:
rundir = os.path.join(output_dir, "r%04d"%int(run_num))
except ValueError:
Expand Down Expand Up @@ -272,6 +298,9 @@ def get_submission_id(result, method):
elif method == 'slurm' or method == "shifter":
# Assuming that all shifter instances are running on NERSC (slurm) systems
return result.stdout_lines[0].split()[-1].strip()
elif method == 'sfapi':
submission_id = str(result)
return submission_id
elif method == 'htcondor':
return result.stdout_lines[-1].split()[-1].rstrip('.')
elif method == 'sge':
Expand Down Expand Up @@ -313,15 +342,32 @@ def do_submit(command, submit_path, stdoutdir, mp_params, log_name="log.out", er
stdout = os.open(os.path.join(stdoutdir, 'submit.log'), os.O_WRONLY|os.O_CREAT|os.O_TRUNC); os.dup2(stdout, 1)
stderr = os.open(os.path.join(stdoutdir, 'submit.err'), os.O_WRONLY|os.O_CREAT|os.O_TRUNC); os.dup2(stderr, 2)
os.execv(command.split()[0], command.split())
elif mp_params.method == 'sfapi':
job_stript_path = submit_command # the SFAPI submit command is the location of the job script

from sfapi_client import Client
from sfapi_client.compute import Machine
from xfel.util.sfapi_connector import KeyManager, LOGGER

km = KeyManager()
with Client(key=km.key) as client:
perlmutter = client.compute(Machine.perlmutter)
[job_script_remote] = perlmutter.ls(
job_stript_path.replace("~/", km.home), directory=False
)

print(f"Jobscript is at: {job_script_remote}")
job = perlmutter.submit_job(job_script_remote)
print(f"Submitted_job: {job.jobid}")
result = job.jobid
else:
try:
result = easy_run.fully_buffered(command=submit_command)
result.raise_if_errors()
except Exception as e:
if not "Warning: job being submitted without an AFS token." in str(e):
raise e

return get_submission_id(result, mp_params.method)
return get_submission_id(result, mp_params.method)

class Script(object):
""" Script to submit XFEL data for processing"""
Expand Down Expand Up @@ -359,6 +405,17 @@ def run(self, argv = None):
params = scope.extract()
dispatcher_args.extend(["%s=%s"%(u.path,u.object.words[0].value) for u in unused])

# OsWrapper is a signleton => as long as it is constructed by Script()
# first, repeated calls to the constructor will return the previously
# constructed instance
if params.mp.method == "sfapi":
import logging
from xfel.util.sfapi_connector import OsWrapper, OsSFAPI, LOGGER
LOGGER.setLevel(logging.DEBUG)
self.os = OsWrapper(backend=OsSFAPI())
else:
self.os = os

assert params.input.run_num is not None
if params.input.dispatcher in ["cxi.xtc_process", "cctbx.xfel.xtc_process"]:
# processing XTC streams at LCLS -- dispatcher will locate raw data
Expand All @@ -371,19 +428,19 @@ def run(self, argv = None):
print("Using trial", params.input.trial)

# log file will live here
stdoutdir = os.path.join(trialdir, "stdout")
os.mkdir(stdoutdir)
stdoutdir = self.os.path.join(trialdir, "stdout")
self.os.mkdir(stdoutdir)
logging_str = ""
if params.output.split_logs:# test parameter for split_log then open and close log file and loop over nprocs
if params.mp.method=='shifter' and params.mp.shifter.staging=='DataWarp':
bbdirstr = "${DW_JOB_STRIPED}/stdout"
logging_str = "output.logging_dir=%s"%bbdirstr
else:
for i in range(params.mp.nproc):
error_files = os.path.join(stdoutdir,"error_rank%04d.out"%i)
log_files = os.path.join(stdoutdir,"log_rank%04d.out"%i)
open(log_files,'a').close()
open(error_files,'a').close()
error_files = self.os.path.join(stdoutdir,"error_rank%04d.out"%i)
log_files = self.os.path.join(stdoutdir,"log_rank%04d.out"%i)
self.os.open(log_files,'a').close()
self.os.open(error_files,'a').close()
logging_str = "output.logging_dir=%s"%stdoutdir
else:
logging_str = ""
Expand All @@ -398,50 +455,52 @@ def run(self, argv = None):
continue
name, value = arg.split('=')

if "cfg" in name and os.path.splitext(value)[1].lower() == ".cfg":
if "cfg" in name and self.os.path.splitext(value)[1].lower() == ".cfg":
cfg = value
if not os.path.exists(cfg):
if not self.os.path.exists(cfg):
raise Sorry("Config file doesn't exist: %s"%cfg)
if has_config:
raise Sorry("Multiple config files found")
has_config = True
target_num = copy_config(cfg, trialdir, "psana", params, target_num)
redone_args.append("%s=%s"%(name, os.path.join(trialdir, "psana.cfg")))
elif "target" in name or os.path.splitext(value)[1].lower() == ".phil":
redone_args.append("%s=%s"%(name, self.os.path.join(trialdir, "psana.cfg")))
elif "target" in name or self.os.path.splitext(value)[1].lower() == ".phil":
phil = value
if not os.path.exists(phil):
if not self.os.path.exists(phil):
raise Sorry("Phil file doesn't exist: %s"%phil)
copy_target(phil, trialdir, "params_%d"%target_num)
redone_args.append("%s=%s"%(name, os.path.join(trialdir, "params_%d.phil"%target_num)))
redone_args.append("%s=%s"%(name, self.os.path.join(trialdir, "params_%d.phil"%target_num)))
target_num += 1
else:
redone_args.append(arg)
dispatcher_args = redone_args

# If additional phil params are provided, copy them over too
if params.input.target is not None:
# don't use self.os => need to to check host
if not os.path.exists(params.input.target):
raise Sorry("Target file doesn't exist: %s"%params.input.target)
copy_target(params.input.target, trialdir, "params_%d"%target_num)
params.input.target = os.path.join(trialdir, "params_%d.phil"%target_num)
params.input.target = self.os.path.join(trialdir, "params_%d.phil"%target_num)
target_num += 1

# Some configs files will specify out_dirname. If not, we want to explicitly create one
# so the dispatcher will have an output directory.
output_dir = os.path.join(trialdir, "out")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_dir = self.os.path.join(trialdir, "out")
if not self.os.path.exists(output_dir):
self.os.makedirs(output_dir)

# Write out a script for submitting this job and submit it
submit_path = os.path.join(trialdir, "submit.sh")
submit_path = self.os.path.join(trialdir, "submit.sh")

extra_str = ""
data_str = ""

assert [params.input.locator, params.input.experiment].count(None) != 0
if params.input.locator is not None:
locator_file = os.path.join(trialdir, "data.loc")
shutil.copyfile(params.input.locator, locator_file)
locator_file = self.os.path.join(trialdir, "data.loc")
# shutil.copyfile(params.input.locator, locator_file)
copy_wrapper(params.input.locator, locator_file)
data_str += locator_file

from xfel.ui import known_dials_dispatchers
Expand Down
3 changes: 2 additions & 1 deletion xfel/command_line/fee_calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from libtbx.phil import parse
from libtbx.utils import Sorry
import psana
from matplotlib import pyplot as plt
from serialtbx.util.energy_scan_notch_finder import notch_phil_string, find_notch, plot_notches, calibrate_energy

Expand All @@ -27,6 +26,8 @@

def tally_fee_data(experiment, runs, plot=True, verbose=True, max_events=None):
"""Check each event of each requested run in the specified experiment for a FEE spectrometer event. Report how many events are missing. Return spectrometer data if present."""
import psana

good = 0
bad = 0
events = []
Expand Down
8 changes: 8 additions & 0 deletions xfel/ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@
.help = Write images to disk whether they index or not. \
Helpful for tuning spotfinding and indexing parameters, and necessary \
for the "Should have indexed" feature of the Run Stats tab.
api {
protocol = http *https
.type = choice
.help = Use http or https for API connections
host = pswww.slac.stanford.edu
.type = str
.help = Address of the LCLS API Host
}
}
standalone {
data_dir = None
Expand Down
Loading