Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def _analyse(self):
)
self.post_transfer(transferred_file)
self.queue.task_done()
logger.debug("Analyer thread has stopped analysing incoming files")
self.notify(final=True)

def _xml_file(self, data_file: Path) -> Path:
Expand Down Expand Up @@ -403,6 +404,12 @@ def request_stop(self):
self._stopping = True
self._halt_thread = True

def is_safe_to_stop(self):
    """
    Report whether the analyser thread can be stopped.

    The result is truthy only when both the ``_stopping`` and ``_halt_thread``
    flags are set and ``self.queue.qsize()`` reports no queued items. If one
    of the flags is falsy, that flag's value is returned unchanged.
    """
    stop_flag = self._stopping
    if not stop_flag:
        return stop_flag
    halt_flag = self._halt_thread
    if not halt_flag:
        return halt_flag
    # Both flags are set; the only remaining condition is an empty queue
    pending = self.queue.qsize()
    return not pending

def stop(self):
logger.debug("Analyser thread stop requested")
self._stopping = True
Expand All @@ -412,5 +419,8 @@ def stop(self):
self.queue.put(None)
self.thread.join()
except Exception as e:
logger.error(f"Exception encountered while stopping analyser: {e}")
logger.error(
f"Exception encountered while stopping Analyser: {e}",
exc_info=True,
)
logger.debug("Analyser thread stop completed")
98 changes: 71 additions & 27 deletions src/murfey/client/multigrid_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import subprocess
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from functools import partial
Expand Down Expand Up @@ -36,6 +37,7 @@ class MultigridController:
rsync_url: str = ""
rsync_module: str = "data"
demo: bool = False
finalising: bool = False
dormant: bool = False
multigrid_watcher_active: bool = True
processing_enabled: bool = True
Expand Down Expand Up @@ -117,34 +119,62 @@ def __post_init__(self):

def _multigrid_watcher_finalised(self):
self.multigrid_watcher_active = False
self.dormancy_check()

def dormancy_check(self):
def is_ready_for_dormancy(self):
"""
When the multigrid watcher is no longer active, sends a request to safely stop
the analyser and file watcher threads, then checks to see that those threads
and the RSyncer processes associated with the current session have all been
safely stopped
"""
log.debug(
f"Starting dormancy check for MultigridController for session {self.session_id}"
)
if not self.multigrid_watcher_active:
if (
for a in self.analysers.values():
if a.is_safe_to_stop():
a.stop()
for w in self._environment.watchers.values():
if w.is_safe_to_stop():
w.stop()
return (
all(r._finalised for r in self.rsync_processes.values())
and not any(a.thread.is_alive() for a in self.analysers.values())
and not any(
w.thread.is_alive() for w in self._environment.watchers.values()
)
):
)
log.debug(f"Multigrid watcher for session {self.session_id} is still active")
return False

def clean_up_once_dormant(self, running_threads: list[threading.Thread]):
"""
A function run in a separate thread that runs the post-session cleanup logic
once all threads associated with this current session are halted, and marks
the controller as being fully dormant after doing so.
"""
for thread in running_threads:
thread.join()
log.debug(f"RSyncer cleanup thread {thread.ident} has stopped safely")
while not self.is_ready_for_dormancy():
time.sleep(10)

# Once all threads are stopped, remove session from the database
log.debug(
f"Submitting request to remove session {self.session_id} from database"
)
response = capture_delete(
f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
)
success = response.status_code == 200 if response else False
if not success:
log.warning(f"Could not delete database data for {self.session_id}")

def call_remove_session():
response = capture_delete(
f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
)
success = response.status_code == 200 if response else False
if not success:
log.warning(
f"Could not delete database data for {self.session_id}"
)
# Send message to frontend to trigger a refresh
self.ws.send(json.dumps({"message": "refresh"}))

dormancy_thread = threading.Thread(
name=f"Session deletion thread {self.session_id}",
target=call_remove_session,
)
dormancy_thread.start()
self.dormant = True
# Mark as dormant
self.dormant = True

def abandon(self):
for a in self.analysers.values():
Expand All @@ -155,12 +185,26 @@ def abandon(self):
p.request_stop()

def finalise(self):
self.finalising = True
for a in self.analysers.values():
a.request_stop()
log.debug(f"Stop request sent to analyser {a}")
for w in self._environment.watchers.values():
w.request_stop()
log.debug(f"Stop request sent to watcher {w}")
rsync_finaliser_threads = []
for p in self.rsync_processes.keys():
self._finalise_rsyncer(p)
# Collect the running rsyncer finaliser threads to pass to the dormancy checker
rsync_finaliser_threads.append(self._finalise_rsyncer(p))
log.debug(f"Finalised rsyncer {p}")

# Run the session cleanup function in a separate thread
cleanup_upon_dormancy_thread = threading.Thread(
target=self.clean_up_once_dormant,
args=[rsync_finaliser_threads],
daemon=True,
)
cleanup_upon_dormancy_thread.start()

def update_visit_time(self, new_end_time: datetime):
# Convert the received server timestamp into the local equivalent
Expand Down Expand Up @@ -235,15 +279,19 @@ def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
capture_post(stop_url, json={"source": str(source)})

def _finalise_rsyncer(self, source: Path):
"""
Starts a new Rsyncer thread that cleans up the directories, and returns that
thread to be managed by a central thread.
"""
finalise_thread = threading.Thread(
name=f"Controller finaliser thread ({source})",
target=partial(
self.rsync_processes[source].finalise, callback=self.dormancy_check
),
target=self.rsync_processes[source].finalise,
kwargs={"thread": False},
daemon=True,
)
finalise_thread.start()
log.debug(f"Started RSync cleanup for {str(source)}")
return finalise_thread

def _restart_rsyncer(self, source: Path):
self.rsync_processes[source].restart()
Expand Down Expand Up @@ -368,7 +416,6 @@ def rsync_result(update: RSyncerUpdate):
)
else:
self.analysers[source].subscribe(self._data_collection_form)
self.analysers[source].subscribe(self.dormancy_check, final=True)
self.analysers[source].start()
if transfer:
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
Expand Down Expand Up @@ -408,9 +455,6 @@ def _rsync_update_converter(p: Path) -> None:
),
secondary=True,
)
self._environment.watchers[source].subscribe(
self.dormancy_check, final=True
)
self._environment.watchers[source].start()

def _data_collection_form(self, response: dict):
Expand Down
20 changes: 16 additions & 4 deletions src/murfey/client/watchdir.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,27 @@ def request_stop(self):
self._stopping = True
self._halt_thread = True

def is_safe_to_stop(self):
    """
    Report whether the directory watcher thread can be stopped.

    Truthy only when the ``_stopping`` and ``_halt_thread`` flags are both set
    and ``self.queue.qsize()`` reports zero queued items; otherwise the first
    falsy flag (or ``False`` for a non-empty queue) is returned.
    """
    for flag in (self._stopping, self._halt_thread):
        if not flag:
            return flag
    # Stop was requested on both flags; check that nothing is left in the queue
    return not self.queue.qsize()

def stop(self):
log.debug("DirWatcher thread stop requested")
self._stopping = True
if self.thread.is_alive():
self.queue.join()

self._halt_thread = True
if self.thread.is_alive():
self.queue.put(None)
self.thread.join()
try:
if self.thread.is_alive():
self.queue.put(None)
self.thread.join()
except Exception as e:
log.error(
f"Exception encountered while stopping DirWatcher: {e}",
exc_info=True,
)
log.debug("DirWatcher thread stop completed")

def _process(self):
Expand All @@ -94,6 +105,7 @@ def _process(self):
modification_time=modification_time, transfer_all=self._transfer_all
)
time.sleep(15)
log.debug(f"DirWatcher {self} has stopped scanning")
self.notify(final=True)

def scan(self, modification_time: float | None = None, transfer_all: bool = False):
Expand Down
21 changes: 16 additions & 5 deletions src/murfey/instrument_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,32 @@ def start_instrument_server():

LogFilter.install()

# Log everything from Murfey by default
logging.getLogger("murfey").setLevel(logging.DEBUG)

# Show only logs at INFO level and above in the console
rich_handler = RichHandler(enable_link_path=False)
logging.getLogger("murfey").setLevel(logging.INFO)
rich_handler.setLevel(logging.INFO)
logging.getLogger("murfey").addHandler(rich_handler)
logging.getLogger("fastapi").addHandler(rich_handler)
logging.getLogger("uvicorn").addHandler(rich_handler)

# Create a websocket app to connect to the backend
ws = murfey.client.websocket.WSApp(
server=read_config().get("Murfey", "server", fallback=""),
register_client=False,
)

handler = CustomHandler(ws.send)
logging.getLogger("murfey").addHandler(handler)
logging.getLogger("fastapi").addHandler(handler)
logging.getLogger("uvicorn").addHandler(handler)
# Forward DEBUG levels logs and above from Murfey to the backend
murfey_ws_handler = CustomHandler(ws.send)
murfey_ws_handler.setLevel(logging.DEBUG)
logging.getLogger("murfey").addHandler(murfey_ws_handler)

# Forward only INFO level logs and above for other packages
other_ws_handler = CustomHandler(ws.send)
other_ws_handler.setLevel(logging.INFO)
logging.getLogger("fastapi").addHandler(other_ws_handler)
logging.getLogger("uvicorn").addHandler(other_ws_handler)

logger.info(
f"Starting Murfey server version {murfey.__version__}, listening on {args.host}:{args.port}"
Expand Down
34 changes: 25 additions & 9 deletions src/murfey/instrument_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from functools import partial
from logging import getLogger
from pathlib import Path
from threading import Lock
from typing import Annotated, Any, Optional
from urllib.parse import urlparse

Expand All @@ -31,6 +32,7 @@
watchers: dict[str | int, MultigridDirWatcher] = {}
rsyncers: dict[str, RSyncer] = {}
controllers: dict[int, MultigridController] = {}
controller_lock = Lock()
data_collection_parameters: dict = {}
tokens = {}

Expand Down Expand Up @@ -145,22 +147,26 @@
def setup_multigrid_watcher(
session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
):
# Remove dormant controllers from memory
with controller_lock:
controllers_to_remove = [
sid for sid, controller in controllers.items() if controller.dormant
]
for sid in controllers_to_remove:
del controllers[sid]

# Return 'True' if controllers are already set up
if controllers.get(session_id) is not None:
return {"success": True}

label = watcher_spec.label
for sid, controller in controllers.items():
if controller.dormant:
del controllers[sid]

# Load machine config as dictionary
machine_config: dict[str, Any] = requests.get(
f"{_get_murfey_url()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=sanitise_nonpath(watcher_spec.instrument_name))}",
headers={"Authorization": f"Bearer {tokens[session_id]}"},
).json()

# Set up the multigrid controll controller
# Set up the multigrid controller
label = watcher_spec.label
controllers[session_id] = MultigridController(
[],
watcher_spec.visit,
Expand Down Expand Up @@ -205,7 +211,11 @@
return {"success": False}
if not process:
watchers[session_id]._analyse = False
watchers[session_id].start()
try:
watchers[session_id].start()
# Ignore RuntimeError; this happens when reconnecting after a backend server restart
except RuntimeError:
logger.debug(f"MultigridWatcher for session {session_id} is already active")
Comment thread Dismissed
return {"success": True}


Expand All @@ -216,11 +226,15 @@


@router.get("/sessions/{session_id}/multigrid_controller/status")
def check_multigrid_controller_exists(
def check_multigrid_controller_status(
session_id: MurfeySessionID,
):
if controllers.get(session_id, None) is not None:
return {"exists": True}
return {
"dormant": controllers[session_id].dormant,
"exists": True,
"finalising": controllers[session_id].finalising,
}
return {"exists": False}


Expand Down Expand Up @@ -268,7 +282,9 @@
@router.post("/sessions/{session_id}/finalise_session")
def finalise_session(session_id: MurfeySessionID):
    """
    Ask the multigrid watcher for this session to stop, then finalise the
    session's multigrid controller.

    Both objects are looked up by ``session_id`` in the module-level
    ``watchers`` and ``controllers`` dictionaries; a missing entry raises
    ``KeyError``. Returns ``{"success": True}`` once both calls have returned.
    """
    # Stop the watcher first, before the controller starts finalising
    session_watcher = watchers[session_id]
    session_watcher.request_stop()
    logger.debug(f"Stop request sent to multigrid watcher for session {session_id}")

    session_controller = controllers[session_id]
    session_controller.finalise()
    logger.debug(f"Stop orders sent to multigrid controller for session {session_id} ")

    return {"success": True}


Expand Down
6 changes: 4 additions & 2 deletions src/murfey/server/api/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async def check_if_session_is_active(


@router.get("/sessions/{session_id}/multigrid_controller/status")
async def check_multigrid_controller_exists(session_id: MurfeySessionID, db=murfey_db):
async def check_multigrid_controller_status(session_id: MurfeySessionID, db=murfey_db):
session = db.exec(select(Session).where(Session.id == session_id)).one()
instrument_name = session.instrument_name
machine_config = get_machine_config(instrument_name=instrument_name)[
Expand All @@ -114,7 +114,7 @@ async def check_multigrid_controller_exists(session_id: MurfeySessionID, db=murf
)
async with aiohttp.ClientSession() as clientsession:
async with clientsession.get(
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'check_multigrid_controller_exists', session_id=session_id)}",
f"{machine_config.instrument_server_url}{url_path_for('api.router', 'check_multigrid_controller_status', session_id=session_id)}",
headers={
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
},
Expand Down Expand Up @@ -433,6 +433,7 @@ async def finalise_rsyncer(

@router.post("/sessions/{session_id}/finalise_session")
async def finalise_session(session_id: MurfeySessionID, db=murfey_db):
log.debug(f"Finalising session {session_id}")
Comment thread Dismissed
data = {}
instrument_name = (
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
Expand All @@ -449,6 +450,7 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db):
},
) as resp:
data = await resp.json()
log.debug(f"Received response {data}")
return data


Expand Down
Loading
Loading