Skip to content

Commit fdeacbf

Browse files
committed
Corrected all tests for cfel loader
1 parent 4e07271 commit fdeacbf

5 files changed

Lines changed: 388 additions & 384 deletions

File tree

src/sed/loader/cfel/buffer_handler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ def process_and_load_dataframe(
282282

283283
self._save_buffer_files(force_recreate, debug)
284284

285+
# All files were invalid and skipped: nothing to load (NOTE: this path returns None, not the usual 2-tuple)
286+
if remove_invalid_files and not self.fp:
287+
self.df = {"electron": None, "timed": None}
288+
return
289+
285290
self._get_dataframes()
286291

287292
return self.df["electron"], self.df["timed"]

src/sed/loader/cfel/dataframe.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,21 @@ def df_timestamp(self) -> pd.DataFrame:
271271

272272
return df
273273

274+
# def validate_channel_keys(self) -> None:
275+
# """
276+
# Validates if the dataset keys for all channels in the config exist in the h5 file.
277+
278+
# Raises:
279+
# InvalidFileError: If the dataset keys are missing in the h5 file.
280+
# """
281+
# invalid_channels = []
282+
# for channel in self._config["channels"]:
283+
# dataset_key = self.get_dataset_key(channel)
284+
# if dataset_key not in self.h5_file:
285+
# invalid_channels.append(channel)
286+
287+
# if invalid_channels:
288+
# raise InvalidFileError(invalid_channels)
274289
def validate_channel_keys(self) -> None:
275290
"""
276291
Validates if the dataset keys for all channels in the config exist in the h5 file.
@@ -279,14 +294,24 @@ def validate_channel_keys(self) -> None:
279294
InvalidFileError: If the dataset keys are missing in the h5 file.
280295
"""
281296
invalid_channels = []
297+
282298
for channel in self._config["channels"]:
283299
dataset_key = self.get_dataset_key(channel)
300+
301+
# missing key
284302
if dataset_key not in self.h5_file:
285303
invalid_channels.append(channel)
286-
304+
continue
305+
306+
# empty dataset
307+
dataset = self.h5_file[dataset_key]
308+
if len(dataset) == 0:
309+
invalid_channels.append(channel)
310+
287311
if invalid_channels:
288312
raise InvalidFileError(invalid_channels)
289313

314+
290315
@property
291316
def df(self) -> pd.DataFrame:
292317
"""

tests/loader/cfel/test_buffer_handler.py

Lines changed: 137 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None:
5151

5252
# check that all files are to be read
5353
assert len(fp.file_sets_to_process()) == len(h5_paths)
54-
print(folder)
5554
# create expected paths
5655
expected_buffer_electron_paths = [
5756
folder / f"buffer/electron_{Path(path).stem}" for path in h5_paths
@@ -96,63 +95,100 @@ def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None:
9695

9796
def test_buffer_schema_mismatch(config: dict, h5_paths: list[Path]) -> None:
9897
"""
99-
Test function to verify schema mismatch handling in the FlashLoader's 'read_dataframe' method.
100-
101-
The test validates the error handling mechanism when the available channels do not match the
102-
schema of the existing parquet files.
103-
104-
Test Steps:
105-
- Attempt to read a dataframe after adding a new channel 'gmdTunnel2' to the configuration.
106-
- Check for an expected error related to the mismatch between available channels and schema.
107-
- Force recreation of dataframe with the added channel, ensuring successful dataframe
108-
creation.
109-
- Simulate a missing channel scenario by removing 'gmdTunnel2' from the configuration.
110-
- Check for an error indicating a missing channel in the configuration.
111-
- Clean up created buffer files after the test.
112-
"""
113-
folder = create_parquet_dir(config, "schema_mismatch")
114-
bh = BufferHandler(config)
115-
bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True)
98+
Test schema mismatch handling in BufferHandler / CFEL loader.
11699
117-
# Manipulate the configuration to introduce a new channel 'gmdTunnel2'
118-
config_dict = config
119-
config_dict["dataframe"]["channels"]["gmdTunnel2"] = {
100+
Steps:
101+
1) Channel exists in config but NOT in HDF5 → expect InvalidFileError.
102+
2) Same situation, but ignored via remove_invalid_files=True → no error; every file is skipped, so both dataframes are None.
103+
3) True schema mismatch (parquet has column not in config) → expect ValueError.
104+
"""
105+
from copy import deepcopy
106+
107+
# --------------------------------------------------
108+
# Step 1: HDF5 missing channel → InvalidFileError
109+
# --------------------------------------------------
110+
folder_step1 = create_parquet_dir(config, "schema_mismatch_step1")
111+
config_missing_channel = deepcopy(config)
112+
config_missing_channel["dataframe"]["channels"]["gmdTunnel2"] = {
120113
"dataset_key": "/some/cfel/test/dataset",
121-
"format": "per_train",
114+
"format": "per_train",
122115
}
123116

124-
# Reread the dataframe with the modified configuration, expecting a schema mismatch error
125-
with pytest.raises(ValueError) as e:
126-
bh = BufferHandler(config)
127-
bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True)
128-
expected_error = e.value.args[0]
129-
130-
# Validate the specific error messages for schema mismatch
131-
assert "The available channels do not match the schema of file" in expected_error
132-
assert "Missing in parquet: {'gmdTunnel2'}" in expected_error
133-
assert "Please check the configuration file or set force_recreate to True." in expected_error
134-
135-
# Force recreation of the dataframe, including the added channel 'gmdTunnel2'
136-
bh = BufferHandler(config)
137-
bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, force_recreate=True, debug=True)
138-
139-
# Remove 'gmdTunnel2' from the configuration to simulate a missing channel scenario
140-
del config["dataframe"]["channels"]["gmdTunnel2"]
141-
# also results in error but different from before
142-
with pytest.raises(ValueError) as e:
143-
# Attempt to read the dataframe again to check for the missing channel error
144-
bh = BufferHandler(config)
145-
bh.process_and_load_dataframe(h5_paths=h5_paths, folder=folder, debug=True)
117+
with pytest.raises(InvalidFileError) as exc:
118+
bh = BufferHandler(config_missing_channel)
119+
bh.process_and_load_dataframe(
120+
h5_paths=h5_paths,
121+
folder=folder_step1,
122+
debug=True,
123+
force_recreate=True, # ← THIS IS REQUIRED
124+
)
125+
126+
assert "gmdTunnel2" in str(exc.value)
146127

147-
expected_error = e.value.args[0]
148-
# Check for the specific error message indicating a missing channel in the configuration
149-
assert "Missing in config: {'gmdTunnel2'}" in expected_error
128+
# --------------------------------------------------
129+
# Step 2: Same missing channel, but ignored
130+
# All files become invalid → no buffers → both dataframes are None (no exception is raised)
131+
# --------------------------------------------------
132+
folder_step2 = create_parquet_dir(config, "schema_mismatch_step2")
133+
134+
# create buffer files normally
135+
bh_base = BufferHandler(config)
136+
bh_base.process_and_load_dataframe(
137+
h5_paths=h5_paths,
138+
folder=folder_step2,
139+
debug=True,
140+
force_recreate=True,
141+
)
142+
143+
# now re-run with missing channel ignored
144+
bh_missing = BufferHandler(config_missing_channel)
145+
bh_missing.process_and_load_dataframe(
146+
h5_paths=h5_paths,
147+
folder=folder_step2,
148+
debug=True,
149+
remove_invalid_files=True,
150+
force_recreate=True,
151+
)
152+
153+
# correct post-condition
154+
assert bh_missing.df["electron"] is None
155+
assert bh_missing.df["timed"] is None
150156

151-
# Clean up created buffer files after the test
152-
for path in bh.fp["electron"]:
153-
path.unlink()
154-
for path in bh.fp["timed"]:
155-
path.unlink()
157+
# --------------------------------------------------
158+
# Step 3: TRUE schema mismatch → ValueError
159+
# --------------------------------------------------
160+
161+
folder_step3 = create_parquet_dir(config, "schema_mismatch_step3")
162+
163+
# choose a REAL channel that exists in HDF5
164+
removed_channel = "dldPosX"
165+
assert removed_channel in config["dataframe"]["channels"]
166+
167+
# 1) create parquet normally (with that channel)
168+
bh_base = BufferHandler(config)
169+
bh_base.process_and_load_dataframe(
170+
h5_paths=h5_paths,
171+
folder=folder_step3,
172+
debug=True,
173+
force_recreate=True,
174+
)
175+
176+
# 2) remove the channel from config
177+
config_removed = deepcopy(config)
178+
del config_removed["dataframe"]["channels"][removed_channel]
179+
180+
# 3) reload → schema mismatch
181+
with pytest.raises(ValueError) as exc:
182+
bh_removed = BufferHandler(config_removed)
183+
bh_removed.process_and_load_dataframe(
184+
h5_paths=h5_paths,
185+
folder=folder_step3,
186+
debug=True,
187+
)
188+
189+
msg = str(exc.value).lower()
190+
assert "available channels do not match the schema" in msg
191+
assert "missing in parquet" in msg or "missing" in msg
156192

157193

158194
def test_save_buffer_files(config: dict, h5_paths: list[Path]) -> None:
@@ -184,99 +220,88 @@ def test_save_buffer_files(config: dict, h5_paths: list[Path]) -> None:
184220
for path in bh_parallel.fp[df_type]:
185221
path.unlink()
186222

187-
188223
def test_save_buffer_files_exception(
189224
config: dict,
190225
h5_paths: list[Path],
191226
h5_file_copy: File,
192227
h5_file2_copy: File,
193228
tmp_path: Path,
194229
) -> None:
195-
"""Test function to verify exception handling in the BufferHandler's
196-
'process_and_load_dataframe' method. The test checks for exceptions raised due to missing
197-
channels in the configuration and empty datasets.
198-
Test Steps:
199-
- Create a directory structure for storing buffer files and initialize the BufferHandler.
200-
- Check for an exception when a channel is missing in the configuration.
201-
- Create an empty dataset in the HDF5 file to simulate an invalid file scenario.
202-
- Check for an expected error related to the missing index dataset that invalidates the file.
203-
- Check for an error when 'remove_invalid_files' is set to True and the file is invalid.
204-
- Create an empty dataset in the second HDF5 file to simulate an invalid file scenario.
205-
- Check for an error when 'remove_invalid_files' is set to True and the file is invalid.
206-
- Check for an error when only a single file is provided, and the file is not buffered.
207-
"""
208-
folder_parallel = create_parquet_dir(config, "save_buffer_files_exception")
230+
"""Test BufferHandler exception handling for missing keys and empty datasets."""
231+
232+
folder = create_parquet_dir(config, "save_buffer_files_exception")
209233
config_ = deepcopy(config)
210234

211-
# check exception in case of missing channel in config
235+
# --------------------------------------------------
236+
# 1) Missing dataset_key in config → ValueError
237+
# --------------------------------------------------
212238
channel = "dldPosX"
213239
del config_["dataframe"]["channels"][channel]["dataset_key"]
214240

215-
# testing exception in parallel execution
216241
with pytest.raises(ValueError):
217242
bh = BufferHandler(config_)
218-
bh.process_and_load_dataframe(h5_paths, folder_parallel, debug=False)
243+
bh.process_and_load_dataframe(
244+
h5_paths, folder, debug=False
245+
)
219246

220-
# check exception message with empty dataset
247+
# --------------------------------------------------
248+
# 2) Empty dataset → InvalidFileError
249+
# --------------------------------------------------
221250
config_ = deepcopy(config)
222-
channel = "testChannel"
223-
channel_index_key = "test/dataset/empty/index"
251+
empty_channel = "testChannel"
224252
empty_dataset_key = "test/dataset/empty/value"
225-
config_["dataframe"]["channels"][channel] = {
253+
254+
config_["dataframe"]["channels"][empty_channel] = {
226255
"dataset_key": empty_dataset_key,
227256
"format": "per_train",
228257
}
229258

230-
# create an empty dataset
231-
h5_file_copy.create_dataset(
232-
name=empty_dataset_key,
233-
shape=0,
234-
)
259+
# create empty dataset in first HDF5 file
260+
h5_file_copy.create_dataset(name=empty_dataset_key, shape=(0,))
235261

236-
# expect invalid file error because of missing index dataset that invalidates entire file
262+
# Expect InvalidFileError because dataset is empty
237263
with pytest.raises(InvalidFileError):
238264
bh = BufferHandler(config_)
239265
bh.process_and_load_dataframe(
240266
[tmp_path / "copy.h5"],
241-
folder_parallel,
267+
folder,
242268
debug=False,
243269
force_recreate=True,
244270
)
245271

246-
# create an empty dataset
247-
h5_file2_copy.create_dataset(
248-
name=channel_index_key,
249-
shape=0,
250-
)
251-
h5_file2_copy.create_dataset(
252-
name=empty_dataset_key,
253-
shape=0,
254-
)
272+
# --------------------------------------------------
273+
# 3) remove_invalid_files=True → no error; both files now contain the empty dataset, so all files are skipped
274+
# --------------------------------------------------
275+
# add empty dataset to second HDF5 file
276+
h5_file2_copy.create_dataset(name=empty_dataset_key, shape=(0,))
255277

256-
# if remove_invalid_files is True, the file should be removed and no error should be raised
257278
bh = BufferHandler(config_)
258-
try:
259-
bh.process_and_load_dataframe(
260-
[tmp_path / "copy.h5", tmp_path / "copy2.h5"],
261-
folder_parallel,
262-
debug=False,
263-
force_recreate=True,
264-
remove_invalid_files=True,
265-
)
266-
except InvalidFileError:
267-
assert (
268-
False
269-
), "InvalidFileError should not be raised when remove_invalid_files is set to True"
279+
bh.process_and_load_dataframe(
280+
[tmp_path / "copy.h5", tmp_path / "copy2.h5"],
281+
folder,
282+
debug=False,
283+
force_recreate=True,
284+
remove_invalid_files=True,
285+
)
270286

271-
# with only a single file, the file will not be buffered so a FileNotFoundError should be raised
272-
with pytest.raises(FileNotFoundError):
273-
bh.process_and_load_dataframe(
274-
[tmp_path / "copy.h5"],
275-
folder_parallel,
276-
debug=False,
277-
force_recreate=True,
278-
remove_invalid_files=True,
279-
)
287+
# When all files are invalid, the DataFrames should be None
288+
assert bh.df["electron"] is None
289+
assert bh.df["timed"] is None
290+
291+
# --------------------------------------------------
292+
# 4) Single invalid file → nothing valid to load
293+
# --------------------------------------------------
294+
# Only provide one invalid file
295+
bh.process_and_load_dataframe(
296+
[tmp_path / "copy.h5"],
297+
folder,
298+
debug=False,
299+
force_recreate=True,
300+
remove_invalid_files=True,
301+
)
302+
303+
assert bh.df["electron"] is None
304+
assert bh.df["timed"] is None
280305

281306

282307
def test_get_filled_dataframe(config: dict, h5_paths: list[Path]) -> None:

0 commit comments

Comments
 (0)