-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfile_io_instrument.py
More file actions
197 lines (170 loc) · 7.19 KB
/
Copy pathfile_io_instrument.py
File metadata and controls
197 lines (170 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from datetime import datetime
from logging import getLogger
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlmodel import select
from werkzeug.utils import secure_filename
from murfey.server.api.auth import (
MurfeySessionIDInstrument as MurfeySessionID,
validate_instrument_token,
)
from murfey.server.api.file_io_shared import (
GainReference,
process_gain as _process_gain,
)
from murfey.server.murfey_db import murfey_db
from murfey.util import sanitise, secure_path
from murfey.util.config import get_machine_config
from murfey.util.db import Session, SessionProcessingParameters
from murfey.util.eer import num_frames
logger = getLogger("murfey.server.api.file_io_instrument")
router = APIRouter(
prefix="/file_io/instrument",
dependencies=[Depends(validate_instrument_token)],
tags=["File I/O: Instrument"],
)
class SuggestedPathParameters(BaseModel):
base_path: Path
touch: bool = False
extra_directory: str = ""
@router.post("/visits/{visit_name}/sessions/{session_id}/suggested_path")
def suggest_path(
visit_name: str, session_id: int, params: SuggestedPathParameters, db=murfey_db
):
count: Optional[int] = None
secure_path_parts = [secure_filename(p) for p in params.base_path.parts]
base_path = "/".join(secure_path_parts)
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if not machine_config:
raise ValueError(
"No machine configuration set when suggesting destination path"
)
# Construct the full path to where the dataset is to be saved
check_path = machine_config.rsync_basepath / base_path
# Check previous year to account for the year rolling over during data collection
if not check_path.parent.exists():
base_path_parts = base_path.split("/")
for part in base_path_parts:
# Find the path part corresponding to the year
if len(part) == 4 and part.isdigit():
year_idx = base_path_parts.index(part)
base_path_parts[year_idx] = str(int(part) - 1)
base_path = "/".join(base_path_parts)
check_path_prev = check_path
check_path = machine_config.rsync_basepath / base_path
# If it's not in the previous year either, it's a genuine error
if not check_path.parent.exists():
log_message = (
"Unable to find current visit folder under "
f"{str(check_path_prev.parent)!r} or {str(check_path.parent)!r}"
)
logger.error(log_message)
raise FileNotFoundError(log_message)
check_path_name = check_path.name
while check_path.exists():
count = count + 1 if count else 2
check_path = check_path.parent / f"{check_path_name}{count}"
if params.touch:
check_path.mkdir(mode=0o750)
if params.extra_directory:
(check_path / secure_filename(params.extra_directory)).mkdir(mode=0o750)
return {"suggested_path": check_path.relative_to(machine_config.rsync_basepath)}
class Dest(BaseModel):
destination: Path
@router.post("/sessions/{session_id}/make_rsyncer_destination")
def make_rsyncer_destination(session_id: int, destination: Dest, db=murfey_db):
secure_path_parts = [secure_filename(p) for p in destination.destination.parts]
destination_path = "/".join(secure_path_parts)
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if not machine_config:
raise ValueError("No machine configuration set when making rsyncer destination")
full_destination_path = machine_config.rsync_basepath / destination_path
for parent_path in full_destination_path.parents:
parent_path.mkdir(mode=0o750, exist_ok=True)
return destination
@router.post("/sessions/{session_id}/process_gain")
async def process_gain(
session_id: MurfeySessionID, gain_reference_params: GainReference, db=murfey_db
):
result = await _process_gain(session_id, gain_reference_params, db)
return result
class FractionationParameters(BaseModel):
fractionation: int
dose_per_frame: Optional[float] = None
num_frames: int = 0
eer_path: Optional[str] = None
fractionation_file_name: str = "eer_fractionation.txt"
@router.post("/visits/{visit_name}/sessions/{session_id}/eer_fractionation_file")
async def write_eer_fractionation_file(
visit_name: str,
session_id: int,
fractionation_params: FractionationParameters,
db=murfey_db,
) -> dict:
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if machine_config.eer_fractionation_file_template:
file_path = Path(
machine_config.eer_fractionation_file_template.format(
visit=secure_filename(visit_name),
year=str(datetime.now().year),
)
) / secure_filename(fractionation_params.fractionation_file_name)
else:
file_path = (
Path(machine_config.rsync_basepath)
/ str(datetime.now().year)
/ secure_filename(visit_name)
/ machine_config.gain_directory_name
/ secure_filename(fractionation_params.fractionation_file_name)
)
session_parameters = db.exec(
select(SessionProcessingParameters).where(
SessionProcessingParameters.session_id == session_id
)
).all()
if session_parameters:
fractionation_params.dose_per_frame = session_parameters[0].dose_per_frame
fractionation_params.fractionation = session_parameters[0].eer_fractionation
session_parameters[0].eer_fractionation_file = str(file_path)
db.add(session_parameters[0])
db.commit()
if file_path.is_file():
return {"eer_fractionation_file": str(file_path)}
if not fractionation_params.dose_per_frame:
logger.error("EER fractionation dose not set")
return {"eer_fractionation_file": None}
if fractionation_params.num_frames:
num_eer_frames = fractionation_params.num_frames
elif (
fractionation_params.eer_path
and secure_path(Path(fractionation_params.eer_path)).is_file()
):
num_eer_frames = num_frames(Path(fractionation_params.eer_path))
else:
logger.warning(
f"EER fractionation unable to find {secure_path(Path(fractionation_params.eer_path)) if fractionation_params.eer_path else None} "
f"or use {int(sanitise(str(fractionation_params.num_frames)))} frames"
)
return {"eer_fractionation_file": None}
with open(file_path, "w") as frac_file:
frac_file.write(
f"{num_eer_frames} {fractionation_params.fractionation} {fractionation_params.dose_per_frame / fractionation_params.fractionation}"
)
return {"eer_fractionation_file": str(file_path)}