-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdestinations.py
More file actions
75 lines (70 loc) · 3 KB
/
Copy pathdestinations.py
File metadata and controls
75 lines (70 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from __future__ import annotations
import logging
from pathlib import Path
from murfey.client.instance_environment import (
MurfeyInstanceEnvironment,
global_env_lock,
)
from murfey.util.client import capture_get, capture_post
logger = logging.getLogger("murfey.client.destinations")
def determine_default_destination(
visit: str,
source: Path,
destination: str,
environment: MurfeyInstanceEnvironment,
token: str,
touch: bool = False,
extra_directory: str = "",
include_mid_path: bool = True,
use_suggested_path: bool = True,
) -> str:
machine_data = capture_get(
base_url=str(environment.url.geturl()),
router_name="session_control.router",
function_name="machine_info_by_instrument",
token=token,
instrument_name=environment.instrument_name,
).json()
_default = f"{destination}/{visit}"
for data_dir in machine_data.get("data_directories", []):
if source.absolute() == Path(data_dir).absolute():
_default = f"{destination}/{visit}"
break
else:
mid_path = source.absolute().relative_to(Path(data_dir).absolute())
if use_suggested_path:
with global_env_lock:
if source.name == "Images-Disc1":
source_name = source.parent.name
elif source.name.startswith("Sample"):
source_name = f"{source.parent.name}_{source.name}"
else:
source_name = source.name
if environment.destination_registry.get(source_name):
_default = environment.destination_registry[source_name]
else:
suggested_path_response = capture_post(
base_url=str(environment.url.geturl()),
router_name="file_io_instrument.router",
function_name="suggest_path",
token=token,
visit_name=visit,
session_id=environment.murfey_session,
data={
"base_path": f"{destination}/{visit}/{mid_path.parent if include_mid_path else ''}/raw",
"touch": touch,
"extra_directory": extra_directory,
},
)
if suggested_path_response is None:
raise RuntimeError("Murfey server is unreachable")
_default = suggested_path_response.json().get("suggested_path")
environment.destination_registry[source_name] = _default
else:
_default = f"{destination}/{visit}/{mid_path if include_mid_path else source.name}"
break
return (
_default + f"/{extra_directory}"
if not _default.endswith("/")
else _default + f"{extra_directory}"
)