Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,13 @@ def rsync_result(update: RSyncerUpdate):
session_id=self._environment.murfey_session,
data=rsyncer_data,
)
self._environment.watchers[source] = DirWatcher(source, settling_time=30)
self._environment.watchers[source] = DirWatcher(
source,
settling_time=30,
substrings_blacklist=self._machine_config.get(
"substrings_blacklist", {"directories": [], "files": []}
),
)

if not self.analysers.get(source) and analyse:
log.info(f"Starting analyser for {source}")
Expand Down
24 changes: 19 additions & 5 deletions src/murfey/client/watchdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
path: str | os.PathLike,
settling_time: float = 60,
appearance_time: float | None = None,
substrings_blacklist: dict[str, dict] = {},
transfer_all: bool = True,
status_bar: StatusBar | None = None,
):
Expand All @@ -42,6 +43,7 @@ def __init__(
self._statusbar = status_bar
self.settling_time = settling_time
self._appearance_time = appearance_time
self._substrings_blacklist = substrings_blacklist
self._transfer_all = transfer_all
self._modification_overwrite: float | None = None
self._init_time: float = time.time()
Expand Down Expand Up @@ -128,7 +130,7 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals
settling_time=scan_completion
)

# Create a list of files sroted based on their timestamps
# Create a list of files sorted based on their timestamps
Comment thread
tieneupin marked this conversation as resolved.
Outdated
files_for_transfer = []
time_ordered_file_candidates = sorted(
self._file_candidates,
Expand All @@ -150,8 +152,9 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals
continue

if (
self._file_candidates[x].settling_time + self.settling_time # type: ignore
< time.time()
current_file_settling_time := self._file_candidates[x].settling_time
) is not None and (
current_file_settling_time + self.settling_time < time.time()
):
try:
file_stat = os.stat(x)
Expand Down Expand Up @@ -252,15 +255,26 @@ def _scan_directory(
raise
for entry in directory_contents:
entry_name = os.path.join(path, entry.name)
if entry.is_dir() and (
# Skip any directories with matching blacklisted substrings
if entry.is_dir() and any(
char in entry.name
for char in self._substrings_blacklist.get("directories", [])
):
continue
Comment thread
tieneupin marked this conversation as resolved.
elif entry.is_dir() and (
modification_time is None or entry.stat().st_ctime >= modification_time
):
result.update(self._scan_directory(entry_name))
else:
# Exclude textual log
if "textual" in str(entry):
continue

# Exclude files with blacklisted substrings
if any(
char in entry.name
for char in self._substrings_blacklist.get("files", [])
):
continue
Comment thread
tieneupin marked this conversation as resolved.
# Get file statistics and append file to dictionary
try:
file_stat = entry.stat()
Expand Down
4 changes: 4 additions & 0 deletions src/murfey/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class MachineConfig(BaseModel): # type: ignore
analyse_created_directories: list[str] = []
gain_reference_directory: Optional[Path] = None
eer_fractionation_file_template: str = ""
substrings_blacklist: dict[str, list] = {
"directories": [],
"files": [],
}

# Data transfer setup -------------------------------------------------------------
# Rsync setup
Expand Down
16 changes: 11 additions & 5 deletions src/murfey/workflows/clem/register_preprocessing_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ class CLEMPreprocessingResult(BaseModel):
extent: list[float] # [x0, x1, y0, y1]


def _is_clem_atlas(result: CLEMPreprocessingResult):
    """
    Decide whether a preprocessed CLEM image series qualifies as an atlas.

    An image whose larger physical dimension (pixel count multiplied by the
    pixel size) is at least 1.5 mm is treated as an atlas.
    """
    atlas_size_threshold = 0.0015  # In metres
    physical_width = result.pixels_x * result.pixel_size
    physical_height = result.pixels_y * result.pixel_size
    return max(physical_width, physical_height) >= atlas_size_threshold


def _register_clem_image_series(
session_id: int,
result: CLEMPreprocessingResult,
Expand Down Expand Up @@ -142,9 +150,7 @@ def _register_clem_image_series(

# Add metadata for this series
clem_img_series.search_string = str(output_file.parent / "*tiff")
clem_img_series.data_type = (
"atlas" if "Overview_" in result.series_name else "grid_square"
)
clem_img_series.data_type = "atlas" if _is_clem_atlas(result) else "grid_square"
clem_img_series.number_of_members = result.number_of_members
clem_img_series.pixels_x = result.pixels_x
clem_img_series.pixels_y = result.pixels_y
Expand Down Expand Up @@ -181,7 +187,7 @@ def _register_dcg_and_atlas(
dcg_name += f"--{result.series_name.split('--')[1]}"

# Determine values for atlas
if "Overview_" in result.series_name: # These are atlas datasets
if _is_clem_atlas(result):
output_file = list(result.output_files.values())[0]
atlas_name = str(output_file.parent / "*.tiff")
atlas_pixel_size = result.pixel_size
Expand All @@ -197,7 +203,7 @@ def _register_dcg_and_atlas(
dcg_entry = dcg_search[0]
# Update atlas if registering atlas dataset
# and data collection group already exists
if "Overview_" in result.series_name:
if _is_clem_atlas(result):
atlas_message = {
"session_id": session_id,
"dcgid": dcg_entry.id,
Expand Down
180 changes: 180 additions & 0 deletions tests/client/test_watchdir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import os
import queue
import threading
from pathlib import Path

import pytest

from murfey.client.watchdir import DirWatcher
from tests.conftest import ExampleVisit


def test_dirwatcher_initialises(tmp_path: Path):
    """Verify that a DirWatcher built from only a path carries the default attributes."""
    expected_basepath = os.fspath(str(tmp_path))
    watcher = DirWatcher(path=str(tmp_path))

    # Path and scan bookkeeping
    assert watcher._basepath == expected_basepath
    assert watcher._lastscan == {}
    assert watcher._file_candidates == {}

    # Constructor arguments left at their defaults
    assert watcher._statusbar is None
    assert watcher.settling_time == 60
    assert watcher._appearance_time is None
    assert watcher._substrings_blacklist == {}
    assert watcher._transfer_all is True

    # Internal state set up during initialisation
    assert watcher._modification_overwrite is None
    assert isinstance(watcher._init_time, float)
    assert isinstance(watcher.queue, queue.Queue)
    assert isinstance(watcher.thread, threading.Thread)
    assert watcher._stopping is False
    assert watcher._halt_thread is False

    # The string representation should embed the watched path
    assert str(watcher) == f"<DirWatcher ({expected_basepath})>"


@pytest.fixture
def clem_visit_dir(tmp_path: Path):
    """Create an example CLEM visit directory beneath ``tmp_path`` and return it."""
    visit_name = (
        f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}"
        f"-{ExampleVisit.visit_number}"
    )
    visit_dir = tmp_path.joinpath("clem", "data", "2025", visit_name)
    visit_dir.mkdir(parents=True, exist_ok=True)
    return visit_dir


@pytest.fixture
def clem_test_files(clem_visit_dir: Path):
    """
    Create the example CLEM files for the DirWatcher to scan.

    Returns the sorted list of created file paths.
    """
    project_dir = clem_visit_dir / "images" / "test_grid"

    # Example atlas collection: one image per stage position, plus its metadata
    atlas_dir = project_dir / "Overview 1" / "Image 1"
    file_list: list[Path] = [
        atlas_dir / f"Image 1--Stage{s:02d}.tif" for s in range(20)
    ]
    file_list.append(atlas_dir / "Metadata" / "Image 1.xlif")

    # Example image stack collection: one image per channel and z-slice,
    # plus its metadata
    stack_dir = project_dir / "TileScan 1" / "Position 1"
    file_list.extend(
        stack_dir / f"Position 1--C{c:02d}--Z{z:02d}.tif"
        for c in range(3)
        for z in range(10)
    )
    file_list.append(stack_dir / "Metadata" / "Position 1.xlif")

    # Materialise every listed file, creating parent directories on demand
    for path in file_list:
        path.parent.mkdir(parents=True, exist_ok=True)
        if not path.exists():
            path.touch()
    return sorted(file_list)


@pytest.fixture
def clem_junk_files(clem_visit_dir: Path):
    """
    Create junk files that are to be blacklisted from the CLEM workflow.

    Returns the sorted list of created file paths.
    """
    project_dir = clem_visit_dir / "images" / "test_grid"
    file_list: list[Path] = []

    # Junk atlas data
    for n in range(5):
        junk_atlas_dir = project_dir / "Image 1" / f"Image 1_pmd_{n}"
        file_list.extend(
            junk_atlas_dir / f"Image 1_pmd_{n}--Stage{s:02d}.tif" for s in range(20)
        )
        file_list.append(junk_atlas_dir / "Metadata" / "Image 1.xlif")

    # Junk image data
    for n in range(5):
        junk_stack_dir = project_dir / "Position 1" / f"Position 1_pmd_{n}"
        file_list.extend(
            junk_stack_dir / f"Position 1_pmd_{n}--C{c:02d}--Z{z:02d}.tif"
            for c in range(3)
            for z in range(10)
        )
        file_list.append(junk_stack_dir / "Metadata" / "Position 1.xlif")

    # Remaining junk files, given relative to the project directory
    remaining_junk = (
        "1.xlef",
        "Metadata/IOManagerConfiguation.xlif",
        "Metadata/Overview 1.xlcf",
        "Metadata/TileScan 1.xlcf",
        "Overview 1/Image 1/Image 1_histo.lof",
        "TileScan 1/Position 1/Position 1_histo.lof",
        "Overview 1/Image 1/Metadata/Image 1_histo.xlif",
        "TileScan 1/Position 1/Metadata/Position 1_histo.xlif",
    )
    file_list.extend(project_dir / relative_path for relative_path in remaining_junk)

    # Materialise every listed file, creating parent directories on demand
    for path in file_list:
        path.parent.mkdir(parents=True, exist_ok=True)
        if not path.exists():
            path.touch()
    return sorted(file_list)


# Substrings identifying directories and files the CLEM workflow should skip
_clem_substrings_blacklist: dict[str, list[str]] = {
    "directories": ["_pmd_"],
    "files": [
        ".xlef",
        ".xlcf",
        "_histo.lof",
        "_histo.xlif",
        "IOManagerConfiguation.xlif",
    ],
}

# Parameter sets for test_scan_directory, one tuple per case:
# (workflow type, substrings blacklist)
scan_directory_params_matrix: tuple[tuple[str, dict[str, list[str]]], ...] = (
    ("clem", _clem_substrings_blacklist),
)


@pytest.mark.parametrize("test_params", scan_directory_params_matrix)
def test_scan_directory(
    clem_visit_dir: Path,
    clem_test_files: list[Path],
    clem_junk_files: list[Path],
    test_params: tuple[str, dict[str, list[str]]],
):
    """
    Check that ``DirWatcher._scan_directory`` returns exactly the expected
    files when it is given a substrings blacklist.

    ``clem_junk_files`` is requested so that the junk files exist on disk
    during the scan; its return value is not inspected.

    Raises:
        ValueError: if the parametrised workflow type has no watcher set-up
            defined in this test.
    """
    # Unpack test params
    workflow_type, substrings_blacklist = test_params

    # Initialise different watchers based on the workflow to test
    if workflow_type == "clem":
        watcher = DirWatcher(
            path=str(clem_visit_dir),
            substrings_blacklist=substrings_blacklist,
        )
        expected_files = clem_test_files
    else:
        # Fail explicitly rather than falling through with no watcher defined
        raise ValueError(f"Unsupported workflow type: {workflow_type!r}")

    result = watcher._scan_directory()

    # Check that the result does not contain the junk files.
    # Both sides are sorted as strings: Path objects are ordered component by
    # component, which is not guaranteed to match plain string ordering
    # (e.g. "a/b" vs "a-c/d"), so sorting Paths first and converting afterwards
    # could yield a spurious mismatch.
    assert sorted(str(file) for file in expected_files) == sorted(result)
Loading