-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfile_io_instrument.py
More file actions
222 lines (193 loc) · 8.2 KB
/
Copy pathfile_io_instrument.py
File metadata and controls
222 lines (193 loc) · 8.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import os
from datetime import datetime
from logging import getLogger
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlmodel import select
from werkzeug.utils import secure_filename
from murfey.server.api.auth import (
MurfeySessionIDInstrument as MurfeySessionID,
validate_instrument_token,
)
from murfey.server.api.file_io_shared import (
GainReference,
process_gain as _process_gain,
)
from murfey.server.murfey_db import murfey_db
from murfey.util import sanitise, secure_path
from murfey.util.config import get_machine_config
from murfey.util.db import Session, SessionProcessingParameters
from murfey.util.eer import num_frames
logger = getLogger("murfey.server.api.file_io_instrument")
router = APIRouter(
prefix="/file_io/instrument",
dependencies=[Depends(validate_instrument_token)],
tags=["File I/O: Instrument"],
)
class SuggestedPathParameters(BaseModel):
base_path: (
Path # Partial Path starting from immediately after the rsync destination
)
touch: bool = False
extra_directory: str = ""
@router.post("/visits/{visit_name}/sessions/{session_id}/suggested_path")
def suggest_path(
visit_name: str, session_id: int, params: SuggestedPathParameters, db=murfey_db
):
count: Optional[int] = None
secure_path_parts = [secure_filename(p) for p in params.base_path.parts]
base_path = "/".join(secure_path_parts)
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if not machine_config:
raise ValueError(
"No machine configuration set when suggesting destination path"
)
# Construct the full path to where the dataset is to be saved
rsync_basepath = (machine_config.rsync_basepath or Path("")).resolve()
check_path = rsync_basepath / base_path
# Check previous year to account for the year rolling over during data collection
if not check_path.parent.exists():
base_path_parts = base_path.split("/")
for part in base_path_parts:
# Find the path part corresponding to the year
if len(part) == 4 and part.isdigit():
year_idx = base_path_parts.index(part)
base_path_parts[year_idx] = str(int(part) - 1)
base_path = "/".join(base_path_parts)
check_path_prev = check_path
check_path = rsync_basepath / base_path
# If it's not in the previous year either, it's a genuine error
if not check_path.parent.exists():
log_message = (
"Unable to find current visit folder under "
f"{str(check_path_prev.parent)!r} or {str(check_path.parent)!r}"
)
logger.error(log_message)
raise FileNotFoundError(log_message)
check_path_name = check_path.name
while check_path.exists():
count = count + 1 if count else 2
check_path = check_path.parent / f"{check_path_name}{count}"
if params.touch:
check_path.mkdir()
os.chmod(check_path, mode=machine_config.mkdir_chmod)
if params.extra_directory:
extra_dir = check_path / secure_filename(params.extra_directory)
extra_dir.mkdir()
os.chmod(extra_dir, mode=machine_config.mkdir_chmod)
return {"suggested_path": check_path.relative_to(rsync_basepath)}
class Dest(BaseModel):
destination: Path
@router.post("/sessions/{session_id}/make_rsyncer_destination")
def make_rsyncer_destination(session_id: int, destination: Dest, db=murfey_db):
secure_path_parts = [secure_filename(p) for p in destination.destination.parts]
destination_path = "/".join(secure_path_parts)
session_entry = db.exec(select(Session).where(Session.id == session_id)).one()
instrument_name = session_entry.instrument_name
visit = session_entry.visit
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if not machine_config:
raise ValueError("No machine configuration set when making rsyncer destination")
# Make the destination directory and all parents
full_destination_path = (
machine_config.rsync_basepath or Path("")
).resolve() / destination_path
full_destination_path.mkdir(parents=True, exist_ok=True)
# Change permissions for every folder after the visit directory
try:
visit_index = full_destination_path.parts.index(visit)
except ValueError:
logger.error(f"Could not find directory level {visit!r} in destination path")
raise
current_path = full_destination_path.parents[-(visit_index + 1)]
for part in full_destination_path.parts[visit_index + 1 :]:
current_path = current_path / part
try:
os.chmod(current_path, mode=machine_config.mkdir_chmod)
except PermissionError:
logger.warning(f"Unable to change permissions for {current_path}")
continue
return destination
@router.post("/sessions/{session_id}/process_gain")
async def process_gain(
session_id: MurfeySessionID, gain_reference_params: GainReference, db=murfey_db
):
result = await _process_gain(session_id, gain_reference_params, db)
return result
class FractionationParameters(BaseModel):
fractionation: int
dose_per_frame: Optional[float] = None
num_frames: int = 0
eer_path: Optional[str] = None
fractionation_file_name: str = "eer_fractionation.txt"
@router.post("/visits/{visit_name}/sessions/{session_id}/eer_fractionation_file")
async def write_eer_fractionation_file(
visit_name: str,
session_id: int,
fractionation_params: FractionationParameters,
db=murfey_db,
) -> dict:
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if machine_config.eer_fractionation_file_template:
file_path = Path(
machine_config.eer_fractionation_file_template.format(
visit=secure_filename(visit_name),
year=str(datetime.now().year),
)
) / secure_filename(fractionation_params.fractionation_file_name)
else:
file_path = (
(machine_config.rsync_basepath or Path("")).resolve()
/ str(datetime.now().year)
/ secure_filename(visit_name)
/ machine_config.gain_directory_name
/ secure_filename(fractionation_params.fractionation_file_name)
)
session_parameters = db.exec(
select(SessionProcessingParameters).where(
SessionProcessingParameters.session_id == session_id
)
).all()
if session_parameters:
fractionation_params.dose_per_frame = session_parameters[0].dose_per_frame
fractionation_params.fractionation = session_parameters[0].eer_fractionation
session_parameters[0].eer_fractionation_file = str(file_path)
db.add(session_parameters[0])
db.commit()
if file_path.is_file():
return {"eer_fractionation_file": str(file_path)}
if not fractionation_params.dose_per_frame:
logger.error("EER fractionation dose not set")
return {"eer_fractionation_file": None}
if fractionation_params.num_frames:
num_eer_frames = fractionation_params.num_frames
elif (
fractionation_params.eer_path
and secure_path(Path(fractionation_params.eer_path)).is_file()
):
num_eer_frames = num_frames(Path(fractionation_params.eer_path))
else:
logger.warning(
f"EER fractionation unable to find {secure_path(Path(fractionation_params.eer_path)) if fractionation_params.eer_path else None} "
f"or use {int(sanitise(str(fractionation_params.num_frames)))} frames"
)
return {"eer_fractionation_file": None}
with open(file_path, "w") as frac_file:
frac_file.write(
f"{num_eer_frames} {fractionation_params.fractionation} {fractionation_params.dose_per_frame / fractionation_params.fractionation}"
)
return {"eer_fractionation_file": str(file_path)}