Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b13c4ad
Modified workflow entry points to return a dictionary so that we can …
tieneupin Oct 27, 2025
51717c4
Migrated 'data_collection_group' feedback-callback block into its own…
tieneupin Oct 27, 2025
97fef8d
Migrated 'data_collection' feedback-callback block into its own workf…
tieneupin Oct 28, 2025
005f2c6
Merged recent changes from 'main' branch
tieneupin Oct 28, 2025
e5e4506
Updated tomo picking workflow to return a dict instead of a boolean
tieneupin Oct 28, 2025
add807f
Fixed wrong extraction of results from 'do_insert_data_collection' ca…
tieneupin Oct 28, 2025
00730cb
Added logs
tieneupin Oct 28, 2025
33d84a2
Migrated 'atlas_update' feedback-callback block to its own workflows …
tieneupin Oct 28, 2025
9034eb8
'do_create_ispyb_job' function in TransportManager should return a di…
tieneupin Oct 28, 2025
4591e76
Migrated 'processing_job' feedback-callback block to its own workflow…
tieneupin Oct 28, 2025
95512ff
'ExtendedRecord' class and '_register' set of functions can now be re…
tieneupin Oct 28, 2025
a71358c
Added logs
tieneupin Oct 28, 2025
b39fd01
Removed previous version restrictions on fastapi and stomp-py due to …
tieneupin Oct 28, 2025
424dafe
Added initial test for 'feedback_callback' block to check that entry …
tieneupin Oct 28, 2025
c0115ad
Added unit test for 'register_atlas_update'
tieneupin Oct 28, 2025
83dfb75
Annotated int only needed on the API endpoints that call 'register_gr…
tieneupin Oct 28, 2025
a42f8d9
Import module for test only after mocking out the functions used to g…
tieneupin Oct 28, 2025
ce56f3a
Added unit test for 'register_data_collection' workflow
tieneupin Oct 29, 2025
8057a5c
Removed unneeded 'if _transport_object:' logic block
tieneupin Oct 29, 2025
e83b703
Fixed broken test iterations for 'register_data_collection'
tieneupin Oct 29, 2025
62d4724
Added unit test for 'register_data_collection_group' workflow
tieneupin Oct 29, 2025
7de09ab
Some formatting
tieneupin Oct 29, 2025
cb94acb
Added unit test for 'register_processing_job' workflow
tieneupin Oct 29, 2025
55fda57
Removed unreachable code found by CodeQL
tieneupin Oct 29, 2025
bbf6956
'backports.entry_points_selectable' no longer needed as a package dep…
tieneupin Oct 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ developer = [
]
instrument-server = [
"aiohttp",
"fastapi[standard]<0.116.0",
"fastapi[standard-no-fastapi-cloud-cli]>=0.116.0",
"python-jose",
]
server = [
"aiohttp",
"cryptography",
"fastapi[standard]<0.116.0",
"fastapi[standard-no-fastapi-cloud-cli]>=0.116.0",
"graypy",
"ispyb>=10.2.4", # Responsible for setting requirements for SQLAlchemy and mysql-connector-python;
"jinja2",
"mrcfile",
Expand All @@ -73,7 +74,7 @@ server = [
"python-jose[cryptography]",
"sqlalchemy[postgresql]", # Add as explicit dependency
"sqlmodel",
"stomp-py<=8.1.0", # 8.1.1 (released 2024-04-06) doesn't work with our project
"stomp-py>8.1.1", # 8.1.1 (released 2024-04-06) doesn't work with our project
"zocalo>=1",
]
[project.urls]
Expand All @@ -100,14 +101,18 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey"
[project.entry-points."murfey.config.extraction"]
"murfey_machine" = "murfey.util.config:get_extended_machine_config"
[project.entry-points."murfey.workflows"]
"atlas_update" = "murfey.workflows.register_atlas_update:run"
"clem.align_and_merge" = "murfey.workflows.clem.align_and_merge:submit_cluster_request"
"clem.process_raw_lifs" = "murfey.workflows.clem.process_raw_lifs:zocalo_cluster_request"
"clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request"
"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
"clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run"
"data_collection" = "murfey.workflows.register_data_collection:run"
"data_collection_group" = "murfey.workflows.register_data_collection_group:run"
"pato" = "murfey.workflows.notifications:notification_setup"
"picked_particles" = "murfey.workflows.spa.picking:particles_picked"
"picked_tomogram" = "murfey.workflows.tomo.picking:picked_tomogram"
"processing_job" = "murfey.workflows.register_processing_job:run"
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"

[tool.setuptools]
Expand Down
331 changes: 4 additions & 327 deletions src/murfey/server/feedback.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/murfey/server/ispyb.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def do_create_ispyb_job(
dcid = record.dataCollectionId
if not dcid:
log.error("Can not create job: DCID not specified")
return False
return {"success": False, "return_value": None}

jp = self.ispyb.mx_processing.get_job_params()
jp["automatic"] = record.automatic
Expand Down
2 changes: 1 addition & 1 deletion src/murfey/workflows/clem/align_and_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,4 @@ def submit_cluster_request(
},
new_connection=True,
)
return True
return {"success": True}
10 changes: 5 additions & 5 deletions src/murfey/workflows/clem/register_align_and_merge_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def parse_stringified_list(cls, value):

def register_align_and_merge_result(
message: dict, murfey_db: Session, demo: bool = False
) -> bool:
) -> dict[str, bool]:
"""
session_id (recipe)
register (wrapper)
Expand Down Expand Up @@ -69,13 +69,13 @@ def register_align_and_merge_result(
"Invalid type for align-and-merge processing result: "
f"{type(message['result'])}"
)
return False
return {"success": False, "requeue": False}
except Exception:
logger.error(
"Exception encountered when parsing align-and-merge processing result: \n"
f"{traceback.format_exc()}"
)
return False
return {"success": False, "requeue": False}

# Outer try-finally block for tidying up database-related section of function
try:
Expand Down Expand Up @@ -103,8 +103,8 @@ def register_align_and_merge_result(
f"{result.series_name!r}: \n"
f"{traceback.format_exc()}"
)
return False
return {"success": False, "requeue": False}

return True
return {"success": True}
finally:
murfey_db.close()
14 changes: 7 additions & 7 deletions src/murfey/workflows/clem/register_preprocessing_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CLEMPreprocessingResult(BaseModel):
extent: list[float]


def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool]:
session_id: int = (
int(message["session_id"])
if not isinstance(message["session_id"], int)
Expand All @@ -72,13 +72,13 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
logger.error(
f"Invalid type for TIFF preprocessing result: {type(message['result'])}"
)
return False
return {"success": False, "requeue": False}
except Exception:
logger.error(
"Exception encountered when parsing TIFF preprocessing result: \n"
f"{traceback.format_exc()}"
)
return False
return {"success": False, "requeue": False}

# Outer try-finally block for tidying up database-related section of function
try:
Expand Down Expand Up @@ -181,7 +181,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
f"{result.series_name!r}: \n"
f"{traceback.format_exc()}"
)
return False
return {"success": False, "requeue": False}

# Load instrument name
try:
Expand All @@ -197,7 +197,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
f"Error requesting data from database for {result.series_name!r} series: \n"
f"{traceback.format_exc()}"
)
return False
return {"success": False, "requeue": False}

# Construct list of files to use for image alignment and merging steps
image_combos_to_process = [
Expand Down Expand Up @@ -234,12 +234,12 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
f"{result.series_name!r} series",
exc_info=True,
)
return False
return {"success": False, "requeue": False}
logger.info(
"Successfully requested image alignment and merging job for "
f"{result.series_name!r} series"
)
return True
return {"success": True}

finally:
murfey_db.close()
4 changes: 2 additions & 2 deletions src/murfey/workflows/notifications/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def notification_setup(
message: dict, murfey_db: Session, num_instances_between_triggers: int = 500
) -> bool:
) -> dict[str, bool]:
parameters: Dict[str, Tuple[float, float]] = {}
for k in message.keys():
parameter_name = ""
Expand Down Expand Up @@ -48,4 +48,4 @@ def notification_setup(
murfey_db.add_all(existing_notification_parameters + new_notification_parameters)
murfey_db.commit()
murfey_db.close()
return True
return {"success": True}
36 changes: 36 additions & 0 deletions src/murfey/workflows/register_atlas_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging

from backports.entry_points_selectable import entry_points
Comment thread
tieneupin marked this conversation as resolved.
Outdated
from sqlmodel.orm.session import Session as SQLModelSession

from murfey.server import _transport_object

logger = logging.getLogger("murfey.workflows.register_atlas_update")


def run(
message: dict,
murfey_db: SQLModelSession, # Defined for compatibility but unused
demo: bool = False,
):
if _transport_object is None:
logger.error("Unable to find transport manager")
return {"success": False, "requeue": False}

logger.info(f"Registering updated atlas: \n{message}")

_transport_object.do_update_atlas(
message["atlas_id"],
message["atlas"],
message["atlas_pixel_size"],
message["sample"],
)
if dcg_hooks := entry_points().select(
group="murfey.hooks", name="data_collection_group"
):
try:
for hook in dcg_hooks:
hook.load()(message["dcgid"], session_id=message["session_id"])
except Exception:
logger.error("Call to data collection group hook failed", exc_info=True)
return {"success": True}
104 changes: 104 additions & 0 deletions src/murfey/workflows/register_data_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import logging

import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
from sqlmodel import select
from sqlmodel.orm.session import Session as SQLModelSession

import murfey.util.db as MurfeyDB
from murfey.server import _transport_object
from murfey.server.ispyb import ISPyBSession, get_session_id
from murfey.util import sanitise

logger = logging.getLogger("murfey.workflows.register_data_collection")


def run(
message: dict, murfey_db: SQLModelSession, demo: bool = False
) -> dict[str, bool]:
# Fail immediately if transport manager was not provided
if _transport_object is None:
logger.error("Unable to find transport manager")
return {"success": False, "requeue": False}

logger.info(f"Registering the following data collection: \n{message}")

murfey_session_id = message["session_id"]
ispyb_session_id = get_session_id(
microscope=message["microscope"],
proposal_code=message["proposal_code"],
proposal_number=message["proposal_number"],
visit_number=message["visit_number"],
db=ISPyBSession(),
)
dcg = murfey_db.exec(
select(MurfeyDB.DataCollectionGroup)
.where(MurfeyDB.DataCollectionGroup.session_id == murfey_session_id)
.where(MurfeyDB.DataCollectionGroup.tag == message["source"])
).all()
if dcg:
dcgid = dcg[0].id
# flush_data_collections(message["source"], murfey_db)
else:
logger.warning(
"No data collection group ID was found for image directory "
f"{sanitise(message['image_directory'])} and source "
f"{sanitise(message['source'])}"
)
return {"success": False, "requeue": True}

if dc_murfey := murfey_db.exec(
select(MurfeyDB.DataCollection)
.where(MurfeyDB.DataCollection.tag == message.get("tag"))
.where(MurfeyDB.DataCollection.dcg_id == dcgid)
).all():
dcid = dc_murfey[0].id
else:
if ispyb_session_id is None:
murfey_dc = MurfeyDB.DataCollection(
tag=message.get("tag"),
dcg_id=dcgid,
)
else:
record = ISPyBDB.DataCollection(
SESSIONID=ispyb_session_id,
experimenttype=message["experiment_type"],
imageDirectory=message["image_directory"],
imageSuffix=message["image_suffix"],
voltage=message["voltage"],
dataCollectionGroupId=dcgid,
pixelSizeOnImage=message["pixel_size"],
imageSizeX=message["image_size_x"],
imageSizeY=message["image_size_y"],
slitGapHorizontal=message.get("slit_width"),
magnification=message.get("magnification"),
exposureTime=message.get("exposure_time"),
totalExposedDose=message.get("total_exposed_dose"),
c2aperture=message.get("c2aperture"),
phasePlate=int(message.get("phase_plate", 0)),
)
dcid = _transport_object.do_insert_data_collection(
record,
tag=(
message.get("tag")
if message["experiment_type"] == "tomography"
else ""
),
).get("return_value", None)
murfey_dc = MurfeyDB.DataCollection(
id=dcid,
tag=message.get("tag"),
dcg_id=dcgid,
)
murfey_db.add(murfey_dc)
murfey_db.commit()
dcid = murfey_dc.id
murfey_db.close()

if dcid is None:
logger.error(
"Failed to register the following data collection: \n"
f"{message} \n"
"Requeueing message"
)
return {"success": False, "requeue": True}
return {"success": True}
Loading