Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,16 @@
def setup_multigrid_watcher(
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
):
# Return True if controllers are already set up
if controllers.get(session_id) is not None:
return {"success": True}

# Load machine config as dictionary
machine_config: dict[str, Any] = requests.get(

Check warning on line 149 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L149

Added line #L149 was not covered by tests
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(watcher_spec.instrument_name)}/machine",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
).json()
Comment thread Dismissed

label = watcher_spec.label
for sid, controller in controllers.items():
if controller.dormant:
Expand All @@ -156,22 +164,19 @@
demo=True,
do_transfer=True,
processing_enabled=not watcher_spec.skip_existing_processing,
_machine_config=watcher_spec.configuration.dict(),
_machine_config=machine_config,
token=tokens.get(session_id, "token"),
data_collection_parameters=data_collection_parameters.get(label, {}),
rsync_restarts=watcher_spec.rsync_restarts,
visit_end_time=watcher_spec.visit_end_time,
)
watcher_spec.source.mkdir(exist_ok=True)
machine_config = requests.get(
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(watcher_spec.instrument_name)}/machine",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
).json()

for d in machine_config.get("create_directories", []):
(watcher_spec.source / d).mkdir(exist_ok=True)
watchers[session_id] = MultigridDirWatcher(
watcher_spec.source,
watcher_spec.configuration.dict(),
machine_config,
skip_existing_processing=watcher_spec.skip_existing_processing,
)
watchers[session_id].subscribe(
Expand Down
11 changes: 1 addition & 10 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,13 @@ async def setup_multigrid_watcher(
if machine_config.instrument_server_url:
session = db.exec(select(Session).where(Session.id == session_id)).one()
visit = session.visit
_config = {
"acquisition_software": machine_config.acquisition_software,
"calibrations": machine_config.calibrations,
"data_directories": [str(k) for k in machine_config.data_directories],
"create_directories": [str(k) for k in machine_config.create_directories],
"rsync_basepath": str(machine_config.rsync_basepath),
"visit": visit,
"default_model": str(machine_config.default_model),
}

async with aiohttp.ClientSession() as clientsession:
async with clientsession.post(
f"{machine_config.instrument_server_url}/sessions/{session_id}/multigrid_watcher",
json={
"source": str(secure_path(watcher_spec.source / visit)),
"visit": visit,
"configuration": _config,
"label": visit,
"instrument_name": instrument_name,
"skip_existing_processing": watcher_spec.skip_existing_processing,
Expand Down
131 changes: 88 additions & 43 deletions src/murfey/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,62 @@
import socket
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Literal, Optional, Union
from typing import Literal, Optional, Union

import yaml
from backports.entry_points_selectable import entry_points
from pydantic import BaseModel, BaseSettings, Extra, validator


class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore
acquisition_software: List[str]
calibrations: Dict[str, Dict[str, Union[dict, float]]]
data_directories: List[Path]
rsync_basepath: Path
default_model: Path
class MachineConfig(BaseModel): # type: ignore
"""
Keys that describe the type of workflow conducted on the client side, and how
Murfey will handle its data transfer and processing
"""

# General info --------------------------------------------------------------------
display_name: str = ""
instrument_name: str = ""
image_path: Optional[Path] = None
software_versions: Dict[str, str] = {}
external_executables: Dict[str, str] = {}
external_executables_eer: Dict[str, str] = {}
external_environment: Dict[str, str] = {}
rsync_module: str = ""
machine_override: str = ""

# Hardware and software -----------------------------------------------------------
camera: str = "FALCON"
superres: bool = False
calibrations: dict[str, dict[str, Union[dict, float]]]
acquisition_software: list[str]
software_versions: dict[str, str] = {}
software_settings_output_directories: dict[str, list[str]] = {}
data_required_substrings: dict[str, dict[str, list[str]]] = {}

# Client side directory setup -----------------------------------------------------
data_directories: list[Path]
create_directories: list[str] = ["atlas"]
analyse_created_directories: List[str] = []
analyse_created_directories: list[str] = []
gain_reference_directory: Optional[Path] = None
eer_fractionation_file_template: str = ""
processed_directory_name: str = "processed"
gain_directory_name: str = "processing"
node_creator_queue: str = "node_creator"
superres: bool = False
camera: str = "FALCON"
data_required_substrings: Dict[str, Dict[str, List[str]]] = {}
allow_removal: bool = False

# Data transfer setup -------------------------------------------------------------
# Rsync setup
data_transfer_enabled: bool = True
rsync_url: str = ""
rsync_module: str = ""
rsync_basepath: Path
allow_removal: bool = False

# Upstream data download setup
upstream_data_directories: list[Path] = [] # Previous sessions
upstream_data_download_directory: Optional[Path] = None # Set by microscope config
upstream_data_tiff_locations: list[str] = ["processed"] # Location of CLEM TIFFs

# Data processing setup -----------------------------------------------------------
# General processing setup
processing_enabled: bool = True
machine_override: str = ""
processed_extra_directory: str = ""
plugin_packages: Dict[str, Path] = {}
software_settings_output_directories: Dict[str, List[str]] = {}
process_by_default: bool = True
recipes: Dict[str, str] = {
gain_directory_name: str = "processing"
processed_directory_name: str = "processed"
processed_extra_directory: str = ""
recipes: dict[str, str] = {
"em-spa-bfactor": "em-spa-bfactor",
"em-spa-class2d": "em-spa-class2d",
"em-spa-class3d": "em-spa-class3d",
Expand All @@ -53,26 +69,41 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore
"em-tomo-align": "em-tomo-align",
}

# Find and download upstream directories
upstream_data_directories: List[Path] = [] # Previous sessions
upstream_data_download_directory: Optional[Path] = None # Set by microscope config
upstream_data_tiff_locations: List[str] = ["processed"] # Location of CLEM TIFFs

# Particle picking setup
default_model: Path
model_search_directory: str = "processing"
initial_model_search_directory: str = "processing/initial_model"

failure_queue: str = ""
instrument_server_url: str = "http://localhost:8001"
frontend_url: str = "http://localhost:3000"
murfey_url: str = "http://localhost:8000"
rsync_url: str = ""
# Data analysis plugins
external_executables: dict[str, str] = {}
external_executables_eer: dict[str, str] = {}
external_environment: dict[str, str] = {}
plugin_packages: dict[str, Path] = {}

# Server and network setup --------------------------------------------------------
# Configurations and URLs
security_configuration_path: Optional[Path] = None
murfey_url: str = "http://localhost:8000"
frontend_url: str = "http://localhost:3000"
instrument_server_url: str = "http://localhost:8001"

# Messaging queues
failure_queue: str = ""
node_creator_queue: str = "node_creator"
notifications_queue: str = "pato_notification"

class Config:
"""
Inner class that defines this model's parsing and serialising behaviour
"""

def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]:
extra = Extra.allow
json_encoders = {
Path: str,
}


def from_file(config_file_path: Path, instrument: str = "") -> dict[str, MachineConfig]:
with open(config_file_path, "r") as config_stream:
config = yaml.safe_load(config_stream)
return {
Expand All @@ -83,22 +114,36 @@ def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, Machine


class Security(BaseModel):
# Murfey database settings
murfey_db_credentials: Path
crypto_key: str
auth_key: str = ""
sqlalchemy_pooling: bool = True

# ISPyB settings
ispyb_credentials: Optional[Path] = None

# Murfey server connection settings
auth_algorithm: str = ""
auth_key: str = ""
auth_type: Literal["password", "cookie"] = "password"
auth_url: str = ""
sqlalchemy_pooling: bool = True
allow_origins: List[str] = ["*"]
cookie_key: str = ""
session_validation: str = ""
session_token_timeout: Optional[int] = None
auth_type: Literal["password", "cookie"] = "password"
cookie_key: str = ""
allow_origins: list[str] = ["*"]

# RabbitMQ settings
rabbitmq_credentials: Path
feedback_queue: str = "murfey_feedback"

# Graylog settings
graylog_host: str = ""
graylog_port: Optional[int] = None
ispyb_credentials: Optional[Path] = None

class Config:
json_encoders = {
Path: str,
}

@validator("graylog_port")
def check_port_present_if_host_is(
Expand Down Expand Up @@ -158,7 +203,7 @@ def get_security_config() -> Security:


@lru_cache(maxsize=1)
def get_machine_config(instrument_name: str = "") -> Dict[str, MachineConfig]:
def get_machine_config(instrument_name: str = "") -> dict[str, MachineConfig]:
machine_config = {
"": MachineConfig(
acquisition_software=[],
Expand Down
3 changes: 0 additions & 3 deletions src/murfey/util/instrument_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@

from pydantic import BaseModel

from murfey.util.config import MachineConfig


class MultigridWatcherSpec(BaseModel):
source: Path
configuration: MachineConfig
label: str
visit: str
instrument_name: str
Expand Down