Skip to content

Commit 475eb8c

Browse files
committed
Adjustments to support multiple files
1 parent dda08a9 commit 475eb8c

5 files changed

Lines changed: 338 additions & 90 deletions

File tree

src/sed/config/lab_example_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dataframe:
3535

3636
first_event_time_stamp_key: /ScanParam/StartTime
3737
ms_markers_key: /SlowData/exposure_time
38+
millis_counter_key: /DLD/millisecCounter
3839

3940
# Time and binning settings
4041
tof_binwidth: 2.0576131995767355E-11 # Base time-of-flight bin width in seconds

src/sed/core/config_model.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ class DataframeModel(BaseModel):
133133
# mpes specific settings
134134
first_event_time_stamp_key: Optional[str] = None
135135
ms_markers_key: Optional[str] = None
136+
# cfel specific settings
137+
millis_counter_key: Optional[str] = None
136138
# flash specific settings
137139
forward_fill_iterations: Optional[int] = None
138140
ubid_offset: Optional[int] = None

src/sed/loader/cfel/buffer_handler.py

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from pathlib import Path
55

66
import dask.dataframe as dd
7+
from joblib import delayed
8+
from joblib import Parallel
79

810
from sed.core.logging import setup_logging
911
from sed.loader.cfel.dataframe import DataFrameCreator
@@ -45,14 +47,88 @@ def _validate_h5_files(self, config, h5_paths: list[Path]) -> list[Path]:
4547

4648
return valid_h5_paths
4749

48-
def _save_buffer_file(self, paths: dict[str, Path]) -> None:
49-
"""Creates the electron and timed buffer files from the raw H5 file."""
50-
logger.debug(f"Processing file: {paths['raw'].stem}")
51-
start_time = time.time()
50+
def _save_buffer_files(self, force_recreate: bool, debug: bool) -> None:
51+
"""
52+
Creates the buffer files that are missing, handling multi-file runs properly.
5253
53-
# Create DataFrameCreator and get get dataframe
54-
dfc = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"])
54+
Args:
55+
force_recreate (bool): Flag to force recreation of buffer files.
56+
debug (bool): Flag to enable debug mode, which serializes the creation.
57+
"""
58+
file_sets = self.fp.file_sets_to_process(force_recreate)
59+
logger.info(f"Reading files: {len(file_sets)} new files of {len(self.fp)} total.")
60+
61+
if len(file_sets) == 0:
62+
return
63+
64+
# Sort file sets by filename to ensure proper order
65+
file_sets = sorted(file_sets, key=lambda x: x['raw'].name)
66+
67+
# Get base timestamp from the first file if we have multiple files
68+
base_timestamp = None
69+
if len(file_sets) > 1:
70+
try:
71+
# Find the first file (ends with _0000)
72+
first_file_set = None
73+
for file_set in file_sets:
74+
if file_set['raw'].stem.endswith('_0000'):
75+
first_file_set = file_set
76+
break
77+
78+
if first_file_set:
79+
# Create a temporary DataFrameCreator to extract base timestamp
80+
first_dfc = DataFrameCreator(
81+
config_dataframe=self._config,
82+
h5_path=first_file_set['raw'],
83+
is_first_file=True
84+
)
85+
base_timestamp = first_dfc.get_base_timestamp()
86+
first_dfc.h5_file.close() # Clean up
87+
logger.info(f"Multi-file run detected. Base timestamp: {base_timestamp}")
88+
except Exception as e:
89+
logger.warning(f"Could not extract base timestamp: {e}. Processing files independently.")
90+
base_timestamp = None
91+
92+
n_cores = min(len(file_sets), self.n_cores)
93+
if n_cores > 0:
94+
if debug:
95+
for file_set in file_sets:
96+
is_first_file = file_set['raw'].stem.endswith('_0000')
97+
self._save_buffer_file(file_set, is_first_file, base_timestamp)
98+
else:
99+
# For parallel processing, we need to be careful about the order
100+
# Process all files in parallel with the correct parameters
101+
from joblib import delayed, Parallel
102+
103+
Parallel(n_jobs=n_cores, verbose=10)(
104+
delayed(self._save_buffer_file)(
105+
file_set,
106+
file_set['raw'].stem.endswith('_0000'),
107+
base_timestamp
108+
)
109+
for file_set in file_sets
110+
)
111+
112+
def _save_buffer_file(self, file_set, is_first_file=True, base_timestamp=None):
113+
"""
114+
Saves an HDF5 file to a Parquet file using the DataFrameCreator class.
115+
116+
Args:
117+
file_set: Dictionary containing file paths
118+
is_first_file: Whether this is the first file in a multi-file run
119+
base_timestamp: Base timestamp from the first file (for subsequent files)
120+
"""
121+
start_time = time.time() # Add this line
122+
paths = file_set
123+
124+
dfc = DataFrameCreator(
125+
config_dataframe=self._config,
126+
h5_path=paths["raw"],
127+
is_first_file=is_first_file,
128+
base_timestamp=base_timestamp
129+
)
55130
df = dfc.df
131+
df_timed = dfc.df_timed
56132

57133
# Save electron resolved dataframe
58134
electron_channels = get_channels(self._config, "per_electron")
@@ -62,7 +138,6 @@ def _save_buffer_file(self, paths: dict[str, Path]) -> None:
62138
electron_df.to_parquet(paths["electron"])
63139

64140
# Create and save timed dataframe
65-
df_timed = dfc.df_timed
66141
dtypes = get_dtypes(self._config, df_timed.columns.values)
67142
timed_df = df_timed.astype(dtypes)
68143
logger.debug(f"Saving timed buffer with shape: {timed_df.shape}")

src/sed/loader/cfel/dataframe.py

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,21 @@ class DataFrameCreator:
2929
_config (dict): The configuration dictionary for the DataFrame.
3030
"""
3131

32-
def __init__(self, config_dataframe: dict, h5_path: Path) -> None:
32+
def __init__(self, config_dataframe: dict, h5_path: Path,
33+
is_first_file: bool = True, base_timestamp: pd.Timestamp = None) -> None:
3334
"""
3435
Initializes the DataFrameCreator class.
3536
3637
Args:
3738
config_dataframe (dict): The configuration dictionary with only the dataframe key.
3839
h5_path (Path): Path to the h5 file.
40+
is_first_file (bool): Whether this is the first file in a multi-file run.
41+
base_timestamp (pd.Timestamp): Base timestamp from the first file (for subsequent files).
3942
"""
4043
self.h5_file = h5py.File(h5_path, "r")
4144
self._config = config_dataframe
45+
self.is_first_file = is_first_file
46+
self.base_timestamp = base_timestamp
4247

4348
index_alias = self._config.get("index", ["countId"])[0]
4449
# all values except the last as slow data starts from start of file
@@ -83,6 +88,19 @@ def get_dataset_array(
8388

8489
return dataset
8590

91+
def get_base_timestamp(self) -> pd.Timestamp:
92+
"""
93+
Extracts the base timestamp from the first file to be used for subsequent files.
94+
95+
Returns:
96+
pd.Timestamp: The base timestamp from the first file.
97+
"""
98+
if not self.is_first_file:
99+
raise ValueError("get_base_timestamp() should only be called on the first file")
100+
101+
first_timestamp = self.h5_file[self._config.get("first_event_time_stamp_key")][0]
102+
return pd.to_datetime(first_timestamp.decode())
103+
86104
@property
87105
def df_electron(self) -> pd.DataFrame:
88106
"""
@@ -141,14 +159,72 @@ def df_train(self) -> pd.DataFrame:
141159
@property
142160
def df_timestamp(self) -> pd.DataFrame:
143161
"""
144-
Uses the first_event_time_stamp_key to get initial timestamp and the
145-
ms_markers_key which is a dataset of exposure times same size as the index."""
162+
For files with first_event_time_stamp_key: Uses that as initial timestamp.
163+
For files with only millis_counter_key: Uses that as absolute timestamp.
164+
Both use ms_markers_key for exposure times within the file.
165+
"""
146166

147-
first_timestamp = self.h5_file[self._config.get("first_event_time_stamp_key")][
148-
0
149-
] # single value
150-
ts_start = pd.to_datetime(first_timestamp.decode())
151-
# actually in seconds but using milliseconds for consistency with mpes loader
167+
# Try to determine which timestamp approach to use based on available data
168+
first_timestamp_key = self._config.get("first_event_time_stamp_key")
169+
millis_counter_key = self._config.get("millis_counter_key", "/DLD/millisecCounter")
170+
171+
has_first_timestamp = (first_timestamp_key is not None and
172+
first_timestamp_key in self.h5_file and
173+
len(self.h5_file[first_timestamp_key]) > 0)
174+
175+
has_millis_counter = (millis_counter_key in self.h5_file and
176+
len(self.h5_file[millis_counter_key]) > 0)
177+
178+
# Log millisecond counter values for ALL files
179+
if has_millis_counter:
180+
millis_counter_values = self.h5_file[millis_counter_key][()]
181+
182+
if self.is_first_file and has_first_timestamp:
183+
logger.warning("DEBUG: Taking first file with scan start timestamp path")
184+
# First file with scan start timestamp
185+
first_timestamp = self.h5_file[first_timestamp_key][0]
186+
base_ts = pd.to_datetime(first_timestamp.decode())
187+
188+
# Also log millisecond counter values for first file if available
189+
if has_millis_counter:
190+
millis_counter_values = self.h5_file[millis_counter_key][()]
191+
millis_min = millis_counter_values[0] # First value
192+
millis_max = millis_counter_values[-1] # Last value
193+
194+
# Add the first millisecond counter value to the base timestamp
195+
ts_start = base_ts + pd.Timedelta(milliseconds=millis_min)
196+
197+
# Calculate what these would be as timestamps
198+
ts_min_from_millis = base_ts + pd.Timedelta(milliseconds=millis_min)
199+
ts_max_from_millis = base_ts + pd.Timedelta(milliseconds=millis_max)
200+
else:
201+
# Fallback if no millisecond counter
202+
ts_start = base_ts
203+
elif not self.is_first_file and self.base_timestamp is not None and has_millis_counter:
204+
# Subsequent files: use base timestamp + millisecond counter offset
205+
millis_counter_values = self.h5_file[millis_counter_key][()] # Get all values
206+
207+
# Get min (first) and max (last) millisecond values
208+
millis_min = millis_counter_values[0] # First value
209+
millis_max = millis_counter_values[-1] # Last value
210+
211+
# Calculate timestamps for min and max
212+
ts_min = self.base_timestamp + pd.Timedelta(milliseconds=millis_min)
213+
ts_max = self.base_timestamp + pd.Timedelta(milliseconds=millis_max)
214+
215+
logger.warning(f"DEBUG: Timestamp for min: {ts_min}")
216+
logger.warning(f"DEBUG: Timestamp for max: {ts_max}")
217+
218+
# Use the first value (start time) for calculating offset
219+
millis_counter = millis_counter_values[0] # First element is the start time
220+
offset = pd.Timedelta(milliseconds=millis_counter)
221+
ts_start = self.base_timestamp + offset
222+
else:
223+
logger.warning("DEBUG: Falling through to undefined ts_start - THIS IS THE PROBLEM!")
224+
logger.warning(f"DEBUG: Condition 1: is_first_file={self.is_first_file} AND has_first_timestamp={has_first_timestamp} = {self.is_first_file and has_first_timestamp}")
225+
logger.warning(f"DEBUG: Condition 2: not is_first_file={not self.is_first_file} AND base_timestamp is not None={self.base_timestamp is not None} AND has_millis_counter={has_millis_counter} = {not self.is_first_file and self.base_timestamp is not None and has_millis_counter}")
226+
227+
# Get exposure times (in seconds) for this file
152228
exposure_time = self.h5_file[self._config.get("ms_markers_key")][()]
153229

154230
# Calculate cumulative exposure times

0 commit comments

Comments
 (0)