Commit 369771e

Merge pull request #66 from UniversalScientificTechnologies/update-parsers
airdos04c parser time and memory optimization
2 parents: 05e06d0 + 20ce4ab

2 files changed: 52 additions & 67 deletions

backend/DOSPORTAL/services/parsing/parsers/airdos_04c.py

Lines changed: 49 additions & 67 deletions
@@ -1,5 +1,6 @@
 from typing import BinaryIO

+import numpy as np
 import pandas as pd

 from ..contracts import ParsedUnifiedData
@@ -45,109 +46,97 @@ def parse(
     def _parse_file(self, file_obj: BinaryIO, raw_header: str) -> dict[str, object]:
         file_obj.seek(0)

-        # ── Pass 1: collect raw blocks ────────────────────────────────────────
-        blocks: list[dict[str, object]] = []
-        current_events: list[int] = []
+        bin_width = (self.ADC_MAX - self.ADC_MIN) / self.N_HIGH_BINS
+        total_channels = self.LOW_CHANNELS + self.N_HIGH_BINS
+
+        time_list: list[float] = []
+        particle_count_list: list[int] = []
+        low_ch_list: list[list[int]] = []
+        high_hist_list: list[list[int]] = []
+        current_hist: list[int] = []
         in_block = False
         skipped_rows = 0
         start_unix_s: int | None = None
+        unix_offset: float | None = None

         for raw_line in file_obj:
-            line = self._decode_line(raw_line)
-            if not line or line.startswith("#"):
+            raw_line = raw_line.rstrip()
+            if not raw_line or raw_line.startswith(b"#"):
                 continue

-            if line.startswith("$TIME,") and start_unix_s is None:
-                tokens = line.split(",")
+            if raw_line.startswith(b"$TIME,"):
+                tokens = raw_line.split(b",")
                 if len(tokens) >= 4:
-                    start_unix_s = self._safe_int(tokens[3])
-
-            elif line.startswith("$START,"):
-                tokens = line.split(",")
+                    device_s = self._safe_float(tokens[1].decode())
+                    unix_s = self._safe_float(tokens[3].decode())
+                    if device_s is not None and unix_s is not None:
+                        unix_offset = unix_s - device_s
+                    if start_unix_s is None and unix_s is not None:
+                        start_unix_s = int(unix_s)
+
+            elif raw_line.startswith(b"$START,"):
+                tokens = raw_line.split(b",")
                 if len(tokens) >= 3:
                     in_block = True
-                    current_events = []
+                    current_hist = [0] * self.N_HIGH_BINS
                 else:
                     skipped_rows += 1

-            elif line.startswith("$E,"):
+            elif raw_line.startswith(b"$E,"):
                 if not in_block:
                     skipped_rows += 1
                     continue
-                tokens = line.split(",")
+                tokens = raw_line.split(b",")
                 if len(tokens) >= 3:
-                    adc = self._safe_int(tokens[2])
+                    adc = self._safe_int(tokens[2].decode())
                     if adc is not None and adc >= self.ADC_MIN:
-                        current_events.append(adc)
+                        current_hist[min(int((adc - self.ADC_MIN) / bin_width), self.N_HIGH_BINS - 1)] += 1
                     else:
                         skipped_rows += 1
                 else:
                     skipped_rows += 1

-            elif line.startswith("$STOP,"):
+            elif raw_line.startswith(b"$STOP,"):
                 if not in_block:
                     skipped_rows += 1
                     continue
                 in_block = False

                 # $STOP,<count>,<tm>.<tm_s100>,<systime>,<events_count>,<h0>,<h1>,<h2>,<h3>
-                tokens = line.split(",")
+                tokens = raw_line.split(b",")
                 if len(tokens) < 9:
                     skipped_rows += 1
                     continue

-                time_s = self._safe_float(tokens[2])
-                events_count = self._safe_int(tokens[4])
-                h = [self._safe_int(tokens[5 + i]) for i in range(self.LOW_CHANNELS)]
+                time_s = self._safe_float(tokens[2].decode())
+                events_count = self._safe_int(tokens[4].decode())
+                h = [self._safe_int(tokens[5 + i].decode()) for i in range(self.LOW_CHANNELS)]

                 if time_s is None or events_count is None:
                     skipped_rows += 1
                     continue

-                blocks.append(
-                    {
-                        "time_ms": time_s * 1000.0,
-                        "low_channels": [v if v is not None else 0 for v in h],
-                        "high_events": list(current_events),
-                        "events_count": events_count,
-                    }
-                )
+                low_ch = [v if v is not None else 0 for v in h]
+                time_list.append((time_s + unix_offset) * 1000.0 if unix_offset is not None else time_s * 1000.0)
+                particle_count_list.append(events_count + sum(low_ch))
+                low_ch_list.append(low_ch)
+                high_hist_list.append(current_hist)

-        if not blocks:
+        if not time_list:
             raise ParsingError("No valid measurement blocks found in AIRDOS04C log file")

-        # ── Fixed high-energy bins: ADC_MIN..ADC_MAX → N_HIGH_BINS bins ────────
-        bin_width = (self.ADC_MAX - self.ADC_MIN) / self.N_HIGH_BINS
-
-        def adc_to_bin(adc: int) -> int:
-            return min(int((adc - self.ADC_MIN) / bin_width), self.N_HIGH_BINS - 1)
-
-        # ── Pass 2: build histogram per block ────────────────────────────────
-        rows: list[dict[str, int | float]] = []
-
-        for index, block in enumerate(blocks):
-            high_hist = [0] * self.N_HIGH_BINS
-            for adc in block["high_events"]:  # type: ignore[union-attr]
-                high_hist[adc_to_bin(adc)] += 1
-
-            low_ch: list[int] = block["low_channels"]  # type: ignore[assignment]
-            row: dict[str, int | float] = {
-                "id": index,
-                "time_ms": float(block["time_ms"]),  # type: ignore[arg-type]
-                "particle_count": int(block["events_count"]) + sum(low_ch),  # type: ignore[arg-type]
-            }
-            for i, count in enumerate(low_ch):
-                row[f"channel_{i}"] = count
-            for i, count in enumerate(high_hist):
-                row[f"channel_{self.LOW_CHANNELS + i}"] = count
+        n = len(time_list)
+        channel_names = [f"channel_{i}" for i in range(total_channels)]

-            rows.append(row)
+        channel_arr = np.empty((n, total_channels), dtype=np.int32)
+        for i in range(n):
+            channel_arr[i, :self.LOW_CHANNELS] = low_ch_list[i]
+            channel_arr[i, self.LOW_CHANNELS:] = high_hist_list[i]

-        df = pd.DataFrame(rows)
-        df["time_ms"] = df["time_ms"] - df["time_ms"].min()
-
-        total_channels = self.LOW_CHANNELS + self.N_HIGH_BINS
-        channel_names = [f"channel_{i}" for i in range(total_channels)]
+        df = pd.DataFrame(channel_arr, columns=channel_names)
+        df.insert(0, "particle_count", np.array(particle_count_list, dtype=np.int32))
+        df.insert(0, "time_ms", np.array(time_list, dtype=np.float64))
+        df.insert(0, "id", np.arange(n, dtype=np.int32))

         # Bin edges: ADC value at the start of each high-energy bin
         bin_edges = [self.ADC_MIN + i * bin_width for i in range(self.N_HIGH_BINS + 1)]
@@ -163,7 +152,6 @@ def adc_to_bin(adc: int) -> int:
                 float(df["time_ms"].min()),
                 float(df["time_ms"].max()),
             ],
-            "channel_columns": channel_names,
             "high_energy_bin_edges": bin_edges,
             "high_energy_adc_max": self.ADC_MAX,
             "start_unix_s": int(start_unix_s) if start_unix_s is not None else None,
@@ -172,12 +160,6 @@ def adc_to_bin(adc: int) -> int:
         file_obj.seek(0)
         return {"dataframe": df, "metadata": metadata}

-    @staticmethod
-    def _decode_line(raw_line: bytes | str) -> str:
-        if isinstance(raw_line, (bytes, bytearray)):
-            return raw_line.decode("utf-8", errors="ignore").strip()
-        return str(raw_line).strip()
-
     @staticmethod
     def _safe_float(value: str) -> float | None:
         try:
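
The core of this change is that high-energy $E events are now binned into a fixed-size histogram as they are read, instead of being buffered per block and histogrammed in a second pass, so per-block memory stays proportional to N_HIGH_BINS rather than to the number of events. A minimal sketch of that on-the-fly binning, using made-up values in place of the parser's ADC_MIN / ADC_MAX / N_HIGH_BINS class attributes:

# Streaming binning as in the $E branch above; the constants below are
# hypothetical stand-ins for the parser's ADC_MIN / ADC_MAX / N_HIGH_BINS.
ADC_MIN, ADC_MAX, N_HIGH_BINS = 64, 1024, 240
bin_width = (ADC_MAX - ADC_MIN) / N_HIGH_BINS

hist = [0] * N_HIGH_BINS
for adc in (70, 512, 1023, 1024):  # synthetic ADC readings
    if adc < ADC_MIN:
        continue  # the parser counts such readings as skipped rows
    # Clamp to the last bin so adc == ADC_MAX cannot index out of range.
    hist[min(int((adc - ADC_MIN) / bin_width), N_HIGH_BINS - 1)] += 1

The clamping expression is the same one used in the parser's $E branch; binning as events arrive also avoids keeping every block's raw ADC list alive until the end of the file, which is where the memory saving comes from.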

backend/DOSPORTAL/tasks/spectral_records.py

Lines changed: 3 additions & 0 deletions
@@ -120,7 +120,10 @@ def process_spectral_record_into_spectral_file_async(spectral_record_id):
     combined = _combine_raw_log_files(raw_files)

     try:
+        import time
+        _t0 = time.perf_counter()
         parsed = parse_log_to_unified(combined)
+        print(f"Parsing done in {time.perf_counter() - _t0:.3f}s (record {record.id}, {len(raw_files)} files)")
     except Exception as e:
         logger.exception("Error parsing SpectralRecord %s", record.id)
         record.processing_status = ProcessingStatusMixin.PROCESSING_FAILED
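
The second file only wraps the parse call with a wall-clock measurement. A standalone sketch of the same time.perf_counter() pattern, with a placeholder function standing in for parse_log_to_unified:

import time

def do_work():  # placeholder for parse_log_to_unified(combined)
    return sum(i * i for i in range(1_000_000))

_t0 = time.perf_counter()
result = do_work()
print(f"Parsing done in {time.perf_counter() - _t0:.3f}s")

perf_counter() is intended for measuring short durations; only the difference between two calls is meaningful, not the absolute value.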
