Skip to content

Commit cf07eb4

Browse files
committed
re-worked the start timing overview
1 parent 146ab58 commit cf07eb4

2 files changed

Lines changed: 77 additions & 51 deletions

File tree

rc/control/daqinterface.py

Lines changed: 75 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -713,56 +713,83 @@ def disable(self):
713713
self.__do_disable = True
714714

715715
def timing_trace_is_enabled(self):
716-
token = os.environ.get("DAQINTERFACE_TIMING_TRACE", "true")
717-
return token.lower() not in ["0", "false", "no", "off"]
718-
719-
def timing_trace_filename(self):
720-
if "DAQINTERFACE_TIMING_TRACE_FILE" in os.environ:
721-
return os.environ["DAQINTERFACE_TIMING_TRACE_FILE"]
722-
return "/tmp/daqinterface_timing_%s_partition%s.log" % (
723-
os.environ.get("USER", "unknown"),
724-
self.partition_number,
725-
)
716+
return getattr(self, "timing_trace_enabled", False)
717+
718+
# Timing-trace entries are accumulated during a transition and emitted as a
719+
# single summary block when the outermost timed stage completes, rather than
720+
# logging a line per begin/end as each stage runs. Nesting is tracked via a
721+
# depth counter (a timed stage such as do_config_total may itself contain a
722+
# nested do_command), so the flush happens exactly once, at depth zero.
723+
724+
def _timing_trace_init(self):
725+
if not hasattr(self, "_timing_trace_entries"):
726+
self._timing_trace_entries = []
727+
self._timing_trace_depth = 0
726728

727729
def timing_trace(self, event, stage, elapsed_s=None, extra_fields=None):
728-
if not self.timing_trace_is_enabled() or getattr(self, "_timing_trace_failed", False):
730+
if not self.timing_trace_is_enabled():
731+
return
732+
733+
self._timing_trace_init()
734+
self._timing_trace_entries.append((stage, elapsed_s, extra_fields))
735+
736+
def timing_trace_start(self, stage, extra_fields=None):
737+
if self.timing_trace_is_enabled():
738+
self._timing_trace_init()
739+
self._timing_trace_depth += 1
740+
return time()
741+
742+
def timing_trace_end(self, stage, start_time, extra_fields=None):
743+
self.timing_trace(
744+
"end", stage, elapsed_s=(time() - start_time), extra_fields=extra_fields
745+
)
746+
747+
if not self.timing_trace_is_enabled():
729748
return
730749

731-
now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
732-
fields = [
733-
"ts=%s" % (now),
734-
"event=%s" % (event),
735-
"stage=%s" % (stage),
750+
self._timing_trace_depth -= 1
751+
if self._timing_trace_depth <= 0:
752+
self.timing_trace_flush(stage, extra_fields)
753+
754+
def timing_trace_flush(self, top_stage, top_fields=None):
755+
entries = self._timing_trace_entries
756+
self._timing_trace_entries = []
757+
self._timing_trace_depth = 0
758+
759+
header_fields = [
760+
top_stage,
736761
"partition=%s" % (self.partition_number),
737762
"pid=%s" % (os.getpid()),
738763
]
739764

740765
if self.run_number is not None:
741-
fields.append("run=%s" % (self.run_number))
766+
header_fields.append("run=%s" % (self.run_number))
742767

743-
if elapsed_s is not None:
744-
fields.append("elapsed_s=%.6f" % (elapsed_s))
768+
if top_fields is not None:
769+
for key in sorted(top_fields.keys()):
770+
value = str(top_fields[key]).replace(" ", "_")
771+
header_fields.append("%s=%s" % (key, value))
745772

746-
if extra_fields is not None:
747-
for key in sorted(extra_fields.keys()):
748-
value = str(extra_fields[key]).replace(" ", "_")
749-
fields.append("%s=%s" % (key, value))
773+
lines = ["TIMING TRACE: %s" % (" ".join(header_fields))]
750774

751-
try:
752-
with open(self.timing_trace_filename(), "a") as outf:
753-
outf.write(" ".join(fields) + "\n")
754-
except IOError as e:
755-
self.print_log("w", "Timing trace write failed, disabling tracing: %s" % e)
756-
self._timing_trace_failed = True
775+
# Sort the timed stages slowest-first; "point" events (no elapsed_s)
776+
# are listed afterwards with their extra fields.
777+
timed = [e for e in entries if e[1] is not None]
778+
points = [e for e in entries if e[1] is None]
757779

758-
def timing_trace_start(self, stage, extra_fields=None):
759-
self.timing_trace("begin", stage, extra_fields=extra_fields)
760-
return time()
780+
for stage, elapsed_s, _ in sorted(timed, key=lambda e: e[1], reverse=True):
781+
lines.append(" %-40s %8.3fs" % (stage, elapsed_s))
761782

762-
def timing_trace_end(self, stage, start_time, extra_fields=None):
763-
self.timing_trace(
764-
"end", stage, elapsed_s=(time() - start_time), extra_fields=extra_fields
765-
)
783+
for stage, _, extra_fields in points:
784+
extras = ""
785+
if extra_fields is not None:
786+
extras = " " + " ".join(
787+
"%s=%s" % (key, str(extra_fields[key]).replace(" ", "_"))
788+
for key in sorted(extra_fields.keys())
789+
)
790+
lines.append(" %-40s (point)%s" % (stage, extras))
791+
792+
self.print_log("d", "\n".join(lines))
766793

767794
# JCF, Jan-2-2020
768795

@@ -932,6 +959,7 @@ def read_settings(self):
932959

933960
self.use_messageviewer = True
934961
self.use_messagefacility = True
962+
self.timing_trace_enabled = True
935963
self.advanced_memory_usage = False
936964
self.strict_fragment_id_mode = False
937965
self.fake_messagefacility = False
@@ -1127,6 +1155,11 @@ def read_settings(self):
11271155

11281156
if res:
11291157
self.use_messagefacility = False
1158+
elif "timing_trace_enabled" in line or "timing trace enabled" in line:
1159+
token = line.split()[-1].strip()
1160+
1161+
# Defaults to true; allow the settings file to turn it off.
1162+
self.timing_trace_enabled = re.search(r"[Ff]alse", token) is None
11301163
elif "advanced_memory_usage" in line or "advanced memory usage" in line:
11311164
token = line.split()[-1].strip()
11321165

@@ -4678,6 +4711,12 @@ def runner(self):
46784711
self.perform_periodic_action()
46794712

46804713
except Exception:
4714+
# Emit whatever timing was gathered before the exception, and reset
4715+
# the buffer so a partial transition doesn't leak into the next one.
4716+
if self.timing_trace_is_enabled() and getattr(
4717+
self, "_timing_trace_entries", None
4718+
):
4719+
self.timing_trace_flush("transition_failed")
46814720
self.in_recovery = True
46824721
self.alert_and_recover(traceback.format_exc())
46834722
self.in_recovery = False

rc/control/manage_processes_direct.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -319,26 +319,13 @@ def launch_procs_base(self):
319319
launch_commands_to_run_on_host_background[procinfo.host] = []
320320
launch_commands_on_host_to_show_user[procinfo.host] = []
321321

322-
tmp_launch_attempt_file = "/tmp/launch_attempt_tmp_%s_%s_partition%s" % (
323-
procinfo.host,
324-
os.environ["USER"],
325-
os.environ["DAQINTERFACE_PARTITION_NUMBER"],
326-
)
327-
328322
launch_commands_to_run_on_host[procinfo.host].append("set +C")
329-
launch_commands_to_run_on_host[procinfo.host].append(
330-
"echo > %s" % (tmp_launch_attempt_file)
331-
)
332323
launch_commands_to_run_on_host[procinfo.host] += get_setup_commands(
333-
self.spackdir, tmp_launch_attempt_file
324+
self.spackdir, self.launch_attempt_files[procinfo.host]
334325
)
335326
launch_commands_to_run_on_host[procinfo.host].append(
336327
"source %s for_running >> %s 2>&1 "
337-
% (self.daq_setup_script, tmp_launch_attempt_file)
338-
)
339-
launch_commands_to_run_on_host[procinfo.host].append(
340-
"cat %s >> %s && rm %s"
341-
% (tmp_launch_attempt_file, self.launch_attempt_files[procinfo.host], tmp_launch_attempt_file)
328+
% (self.daq_setup_script, self.launch_attempt_files[procinfo.host])
342329
)
343330
launch_commands_to_run_on_host[procinfo.host].append(
344331
"export ARTDAQ_LOG_ROOT=%s" % (self.log_directory)

0 commit comments

Comments
 (0)