Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@
tag: str = "",
limited: bool = False,
):
log.info(f"starting multigrid rsyncer: {source}")
log.info(f"Starting multigrid rsyncer: {source}")
log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}")

Check warning on line 154 in src/murfey/client/multigrid_control.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/multigrid_control.py#L153-L154

Added lines #L153 - L154 were not covered by tests
destination_overrides = destination_overrides or {}
machine_data = requests.get(
f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine"
Expand Down
38 changes: 25 additions & 13 deletions src/murfey/client/watchdir_multigrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
):
super().__init__()
self._basepath = Path(path)
self._skip_existing_processing = skip_existing_processing
self._seen_dirs: List[Path] = []
self._stopping = False
self._machine_config = machine_config
self._seen_dirs: List[Path] = []

Check warning on line 25 in src/murfey/client/watchdir_multigrid.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir_multigrid.py#L25

Added line #L25 was not covered by tests
self.thread = threading.Thread(
name=f"MultigridDirWatcher {self._basepath}",
target=self._process,
daemon=True,
)
# Toggleable settings
self._analyse = True
self._skip_existing_processing = skip_existing_processing
self._stopping = False

Check warning on line 34 in src/murfey/client/watchdir_multigrid.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/client/watchdir_multigrid.py#L32-L34

Added lines #L32 - L34 were not covered by tests

def start(self):
if self.thread.is_alive():
Expand Down Expand Up @@ -60,8 +62,14 @@
include_mid_path=False,
use_suggested_path=False,
analyse=(
d.name
in self._machine_config["analyse_created_directories"]
(
d.name
in self._machine_config[
"analyse_created_directories"
]
)
if self._analyse
else False
),
tag="atlas",
)
Expand All @@ -72,23 +80,25 @@
d,
extra_directory=f"metadata_{d.name}",
include_mid_path=False,
analyse=True, # not (first_loop and self._skip_existing_processing),
analyse=self._analyse,
limited=True,
tag="metadata",
)
self._seen_dirs.append(d)
processing_started = False
for d02 in (d.parent.parent / d.name).glob("Images-Disc*"):
if d02 not in self._seen_dirs:
# if skip exisiting processing is set then do not process for any
# data directories found on the first loop
# this allows you to avoid triggering processing again if murfey is restarted
# If 'skip_existing_processing' is set, do not process for
# any data directories found on the first loop.
# This allows you to avoid triggering processing again if Murfey is restarted
self.notify(
d02,
include_mid_path=False,
remove_files=True,
analyse=not (
first_loop and self._skip_existing_processing
analyse=(
not (first_loop and self._skip_existing_processing)
if self._analyse
else False
),
tag="fractions",
)
Expand All @@ -104,8 +114,10 @@
self.notify(
d02,
include_mid_path=False,
analyse=not (
first_loop and self._skip_existing_processing
analyse=(
not (first_loop and self._skip_existing_processing)
if self._analyse
else False
),
tag="fractions",
)
Expand Down
14 changes: 8 additions & 6 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from functools import partial
from logging import getLogger
from pathlib import Path
from typing import Annotated, Any, Dict, List, Optional, Union
from typing import Annotated, Any, Optional
from urllib.parse import urlparse

import requests
Expand All @@ -27,9 +27,9 @@

logger = getLogger("murfey.instrument_server.api")

watchers: Dict[Union[str, int], MultigridDirWatcher] = {}
rsyncers: Dict[str, RSyncer] = {}
controllers: Dict[int, MultigridController] = {}
watchers: dict[str | int, MultigridDirWatcher] = {}
rsyncers: dict[str, RSyncer] = {}
controllers: dict[int, MultigridController] = {}
data_collection_parameters: dict = {}
tokens = {}

Expand Down Expand Up @@ -186,9 +186,11 @@


@router.post("/sessions/{session_id}/start_multigrid_watcher")
def start_multigrid_watcher(session_id: MurfeySessionID):
def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True):
if watchers.get(session_id) is None:
return {"success": False}
if not process:
watchers[session_id]._analyse = False

Check warning on line 193 in src/murfey/instrument_server/api.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/instrument_server/api.py#L193

Added line #L193 was not covered by tests
watchers[session_id].start()
return {"success": True}

Expand Down Expand Up @@ -319,7 +321,7 @@
)
def get_possible_gain_references(
instrument_name: str, session_id: MurfeySessionID
) -> List[File]:
) -> list[File]:
machine_config = requests.get(
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(instrument_name)}/machine",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
Expand Down
32 changes: 26 additions & 6 deletions src/murfey/server/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,22 @@
def increment_rsync_file_count(
visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
):
rsync_instance = db.exec(
select(RsyncInstance).where(
RsyncInstance.source == rsyncer_info.source,
RsyncInstance.destination == rsyncer_info.destination,
RsyncInstance.session_id == rsyncer_info.session_id,
try:
rsync_instance = db.exec(

Check warning on line 371 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L370-L371

Added lines #L370 - L371 were not covered by tests
select(RsyncInstance).where(
RsyncInstance.source == rsyncer_info.source,
RsyncInstance.destination == rsyncer_info.destination,
RsyncInstance.session_id == rsyncer_info.session_id,
)
).one()
except Exception:
log.error(

Check warning on line 379 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L378-L379

Added lines #L378 - L379 were not covered by tests
f"Failed to find rsync instance for visit {sanitise(visit_name)} "
"with the following properties: \n"
f"{rsyncer_info.dict()}",
exc_info=True,
)
).one()
return None

Check warning on line 385 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L385

Added line #L385 was not covered by tests
rsync_instance.files_counted += rsyncer_info.increment_count
db.add(rsync_instance)
db.commit()
Expand Down Expand Up @@ -1976,6 +1985,17 @@
return sid


@router.post("/sessions/{session_id}")
def update_session(
session_id: MurfeySessionID, process: bool = True, db=murfey_db
) -> None:
session = db.exec(select(Session).where(Session.id == session_id)).one()
session.process = process
db.add(session)
db.commit()
Comment thread Fixed
return None

Check warning on line 1996 in src/murfey/server/api/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/__init__.py#L1992-L1996

Added lines #L1992 - L1996 were not covered by tests


@router.put("/sessions/{session_id}/current_gain_ref")
def update_current_gain_ref(
session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db=murfey_db
Expand Down
13 changes: 9 additions & 4 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,26 @@
@router.post("/sessions/{session_id}/start_multigrid_watcher")
async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db):
data = {}
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
)
session = db.exec(select(Session).where(Session.id == session_id)).one()
process = session.process
instrument_name = session.instrument_name

Check warning on line 147 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L145-L147

Added lines #L145 - L147 were not covered by tests
machine_config = get_machine_config(instrument_name=instrument_name)[
instrument_name
]
if machine_config.instrument_server_url:
log.debug(

Check warning on line 152 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L152

Added line #L152 was not covered by tests
f"Submitting request to start multigrid watcher for session {session_id} "
f"with processing {('enabled' if process else 'disabled')}"
Comment thread Dismissed
)
async with aiohttp.ClientSession() as clientsession:
async with clientsession.post(
f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher",
f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher?process={'true' if process else 'false'}",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
) as resp:
data = await resp.json()
log.debug(f"Received response: {data}")

Check warning on line 164 in src/murfey/server/api/instrument.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/instrument.py#L164

Added line #L164 was not covered by tests
return data


Expand Down
16 changes: 10 additions & 6 deletions src/murfey/server/api/processing_parameters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from logging import getLogger
from typing import Optional

import sqlalchemy
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlmodel import Session, select
Expand All @@ -24,12 +25,15 @@
@router.get("/sessions/{session_id}/session_processing_parameters")
def get_session_processing_parameters(
session_id: MurfeySessionID, db: Session = murfey_db
) -> EditableSessionProcessingParameters:
proc_params = db.exec(
select(SessionProcessingParameters).where(
SessionProcessingParameters.session_id == session_id
)
).one()
) -> Optional[EditableSessionProcessingParameters]:
try:
proc_params = db.exec(

Check warning on line 30 in src/murfey/server/api/processing_parameters.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/processing_parameters.py#L29-L30

Added lines #L29 - L30 were not covered by tests
select(SessionProcessingParameters).where(
SessionProcessingParameters.session_id == session_id
)
).one()
except sqlalchemy.exc.NoResultFound:
return None

Check warning on line 36 in src/murfey/server/api/processing_parameters.py

View check run for this annotation

Codecov / codecov/patch

src/murfey/server/api/processing_parameters.py#L35-L36

Added lines #L35 - L36 were not covered by tests
return EditableSessionProcessingParameters(
gain_ref=proc_params.gain_ref,
dose_per_frame=proc_params.dose_per_frame,
Expand Down
1 change: 1 addition & 0 deletions src/murfey/util/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Session(SQLModel, table=True): # type: ignore
started: bool = Field(default=False)
current_gain_ref: str = Field(default="")
instrument_name: str = Field(default="")
process: bool = Field(default=True)

# CLEM Workflow

Expand Down