-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathwatchdir_multigrid.py
More file actions
132 lines (119 loc) · 4.61 KB
/
Copy pathwatchdir_multigrid.py
File metadata and controls
132 lines (119 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from __future__ import annotations
import logging
import os
import threading
import time
from pathlib import Path
from typing import List
from murfey.util.client import Observer
log = logging.getLogger("murfey.client.watchdir_multigrid")
class MultigridDirWatcher(Observer):
def __init__(
self,
path: str | os.PathLike,
machine_config: dict,
):
super().__init__()
self._basepath = Path(path)
self._machine_config = machine_config
self._seen_dirs: List[Path] = []
self.thread = threading.Thread(
name=f"MultigridDirWatcher {self._basepath}",
target=self._process,
daemon=True,
)
# Toggleable settings
self._analyse = True
self._stopping = False
def start(self):
if self.thread.is_alive():
raise RuntimeError("DirWatcher already running")
log.info(f"MultigridDirWatcher thread starting for {self}")
self.thread.start()
def request_stop(self):
self._stopping = True
self._halt_thread = True
def stop(self):
log.debug("MultigridDirWatcher thread stop requested")
self._stopping = True
self._halt_thread = True
if self.thread.is_alive():
self.thread.join()
log.debug("MultigridDirWatcher thread stop completed")
def _handle_metadata(self, directory: Path, extra_directory: str):
self.notify(
directory,
extra_directory=extra_directory,
analyse=self._analyse,
limited=True,
tag="metadata",
)
self._seen_dirs.append(directory)
def _handle_fractions(self, directory: Path):
processing_started = False
for d02 in directory.glob("Images-Disc*"):
if d02 not in self._seen_dirs:
self.notify(
d02,
remove_files=True,
analyse=self._analyse,
tag="fractions",
)
self._seen_dirs.append(d02)
processing_started = d02 in self._seen_dirs
if not processing_started:
if (
directory.is_dir()
and directory not in self._seen_dirs
and list(directory.iterdir())
):
self.notify(
directory,
analyse=self._analyse,
tag="fractions",
)
self._seen_dirs.append(directory)
def _process(self):
while not self._stopping:
for d in self._basepath.glob("*"):
if d.name in self._machine_config["create_directories"]:
if d.is_dir() and d not in self._seen_dirs:
self.notify(
d,
use_suggested_path=False,
analyse=(
(
d.name
in self._machine_config[
"analyse_created_directories"
]
)
if self._analyse
else False
),
tag="atlas",
)
self._seen_dirs.append(d)
else:
# hack for tomo multigrid metadata structure
sample_dirs = list(d.glob("Sample*"))
if d.is_dir() and len(sample_dirs):
for sample in sample_dirs:
if len(list(sample.glob("*.mdoc"))):
if sample not in self._seen_dirs:
self._handle_metadata(
sample,
extra_directory=f"metadata_{sample.parent.name}_{sample.name}",
)
self._handle_fractions(
sample.parent.parent.parent
/ f"{sample.parent.name}_{sample.name}",
)
else:
if d.is_dir() and d not in self._seen_dirs:
self._handle_metadata(
d, extra_directory=f"metadata_{d.name}"
)
self._handle_fractions(d.parent.parent / d.name)
time.sleep(15)
self.notify(final=True)