-
Notifications
You must be signed in to change notification settings - Fork 41
[feature][MSD-506] Meteor store annoted data #3451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
1f2c680
895588f
c94530d
547d2c9
8c7f409
279f1b4
510e3ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| #!/usr/bin/env python3 | ||
| # -*- coding: utf-8 -*- | ||
| """Fetch DataCollector ZIP samples from S3. | ||
|
|
||
| Examples: | ||
| ./scripts/odemis-dc-fetch.py | ||
| ./scripts/odemis-dc-fetch.py --output ./downloads | ||
| ./scripts/odemis-dc-fetch.py --event z_stack_acquired | ||
| ./scripts/odemis-dc-fetch.py --since 2026-03-01 | ||
| ./scripts/odemis-dc-fetch.py --host meteor-5099 | ||
| ./scripts/odemis-dc-fetch.py --host meteor-5099,atlas-001,secom-22 | ||
| ./scripts/odemis-dc-fetch.py --bucket delmic-odemis-collect-test --region eu-west-1 | ||
| ./scripts/odemis-dc-fetch.py --since 2026-03-01T12:30:00 --event z_stack_acquired --output ./dc_samples | ||
| """ | ||
|
|
||
| import logging | ||
| import sys | ||
|
|
||
| from odemis.util.dc_fetch import main | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| logging.basicConfig(level=logging.INFO) | ||
| rc = main(sys.argv[1:]) | ||
| logging.shutdown() | ||
| sys.exit(rc) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,10 +51,11 @@ | |
| MicroscopePostureManager, | ||
| ) | ||
| from odemis.acq.stitching._tiledacq import SAFE_REL_RANGE_DEFAULT | ||
| from odemis.acq.stream import Stream, StaticFluoStream | ||
| from odemis.acq.stream import Stream, StaticFluoStream, StaticSEMStream, StaticFIBStream | ||
| from odemis.dataio import find_fittest_converter, get_available_formats | ||
| from odemis.util import dataio, executeAsyncTask | ||
| from odemis.util.comp import generate_zlevels | ||
| from odemis.util.datacollector import DataCollector | ||
| from odemis.util.dataio import data_to_static_streams, open_acquisition, splitext | ||
| from odemis.util.driver import estimate_stage_movement_time | ||
| from odemis.util.filename import create_filename | ||
|
|
@@ -70,6 +71,9 @@ | |
|
|
||
| REFERENCE_IMAGE_FILENAME = "Reference-Alignment-FIB.ome.tiff" | ||
|
|
||
| # Probability that a newly created feature is marked as eligible for data collection. | ||
| FEATURE_COLLECT_PROBABILITY = 0.2 | ||
|
|
||
| USER_MILLING_TASKS_PATH = os.path.expanduser("~/.config/odemis/milling_tasks.yaml") | ||
|
|
||
|
|
||
|
|
@@ -173,14 +177,19 @@ def __init__(self, name: str, | |
| stage_position: Dict[str, float], | ||
| fm_focus_position: Dict[str, float], | ||
| streams: Optional[List[Stream]] = None, | ||
| milling_tasks: Optional[Dict[str, MillingTaskSettings]] = None, correlation_data=None): | ||
| milling_tasks: Optional[Dict[str, MillingTaskSettings]] = None, | ||
| correlation_data=None, | ||
| collect: bool = False): | ||
| """ | ||
| :param name: (string) the feature name | ||
| :param stage_position: (dict) the stage position of the feature (stage-bare) | ||
| :param fm_focus_position: (dict) the focus position of the feature | ||
| :param streams: (List of StaticStream) list of acquired streams on this feature | ||
| :param correlation_data: (Dict[str,FIBFMCorrelationData]) Dictionary mapping the feature status to | ||
| FIBFMCorrelationData, where feature status like Active, Rough Milled or polished is the key. | ||
| :param collect: (bool) Whether this feature is eligible for data collection. | ||
| Defaults to False. The GUI sets this based on the per-project sampling | ||
| decision made when a project is opened or created. | ||
| """ | ||
| self.name = model.StringVA(name) | ||
| # FIXME: The 'position' parameter should eventually contain the SampleStage coordinates and not stage bare from the stage_position! | ||
|
|
@@ -211,6 +220,10 @@ def __init__(self, name: str, | |
| correlation_data = {} | ||
| self.correlation_data = correlation_data | ||
|
|
||
| # Whether this feature is eligible for data collection. | ||
| # Set by the GUI from the per-project sampling decision; persisted in features.json. | ||
| self.collect: bool = collect | ||
|
|
||
| # attributes for automated milling | ||
| self.path: str = None # TODO:support path creation here, rather than on milling data save | ||
| self.reference_image: model.DataArray = None | ||
|
|
@@ -318,6 +331,7 @@ def get_features_dict(features: List[CryoFeature]) -> Dict[str, str]: | |
| for feature in features: | ||
| feature_item = {'name': feature.name.value, | ||
| 'status': feature.status.value, | ||
| 'collect': feature.collect, | ||
| 'stage_position': feature.stage_position.value, | ||
| 'fm_focus_position': feature.fm_focus_position.value, | ||
| 'posture_positions': feature.posture_positions, | ||
|
|
@@ -351,7 +365,8 @@ def object_hook(self, obj): | |
| milling_task_json = obj.get('milling_tasks', {}) | ||
| feature = CryoFeature(name=obj['name'], | ||
| stage_position=stage_position, | ||
| fm_focus_position=fm_focus_position | ||
| fm_focus_position=fm_focus_position, | ||
| collect=obj.get('collect', False) | ||
| ) | ||
| feature.correlation_data = FIBFMCorrelationData.from_dict(correlation_data) if correlation_data else None | ||
| feature.status.value = obj['status'] | ||
|
|
@@ -471,6 +486,161 @@ def _create_fibsem_filename(filename: str, acq_type: str) -> str: | |
|
|
||
| return create_filename(path, ptn, ext, count="001") | ||
|
|
||
|
|
||
| def _is_zstack_stream(stream: "Stream") -> bool: | ||
| """Return True if the stream contains z-stack (3-D ZYX) data.""" | ||
| return hasattr(stream, "zIndex") | ||
|
|
||
|
|
||
| def _stream_overlaps_position(stream: "Stream", x: float, y: float) -> bool: | ||
| """Return True if the stage position (x, y) falls within the stream's field of view. | ||
|
|
||
| :param stream: A static stream with a getBoundingBox() method. | ||
| :param x: Stage x position in metres. | ||
| :param y: Stage y position in metres. | ||
| :returns: True when the position is inside the bounding box, False otherwise. | ||
| """ | ||
| try: | ||
| bbox = stream.getBoundingBox() # (left, top, right, bottom) in metres | ||
| except Exception: | ||
| return False | ||
| left, top, right, bottom = bbox | ||
| return left <= x <= right and top <= y <= bottom | ||
|
|
||
|
|
||
| def collect_feature_data( | ||
| feature: "CryoFeature", | ||
| overview_streams: Optional[List["Stream"]] = None, | ||
| project_dir: Optional[str] = None, | ||
| ) -> None: | ||
| """Collect anonymized data for a feature and submit it to the data collector. | ||
|
|
||
| Skips immediately if feature.collect is False or if data collection consent | ||
| has not been granted. Never raises — all errors are logged and suppressed. | ||
|
|
||
| The payload contains: | ||
| - First acquired z-stack per FM channel (or first FM image if no z-stack). | ||
| - FM and SEM overview images that spatially overlap the feature's position. | ||
| - Feature status, stage position, and FM focus position. | ||
|
|
||
| Privacy rules enforced: | ||
| - Feature name is never included. | ||
| - Image payload keys are generic (channel_0, overview_fm_0, etc.). | ||
| - Original filenames are not included in the payload. | ||
|
|
||
| After collection feature.collect is set to False to prevent re-collection. | ||
|
|
||
| :param feature: The feature to collect data for. | ||
| :param overview_streams: Optional list of overview static streams. | ||
| Used to find FM / SEM overviews that overlap the feature position. | ||
| :param project_dir: Optional project directory path. When provided and | ||
| feature.streams is empty, streams are loaded from disk first. | ||
| """ | ||
| if not feature.collect: | ||
| return | ||
|
|
||
| try: | ||
| _dc = DataCollector() | ||
| if not _dc.get_consent(): | ||
| return | ||
| except Exception: | ||
| logging.exception("collect_feature_data: failed to access DataCollector; skipping.") | ||
| return | ||
|
|
||
| try: | ||
|
|
||
| # Load feature streams from disk when not yet in memory. | ||
| if not feature.streams.value and project_dir: | ||
| try: | ||
| load_feature_streams_from_disk(feature, project_dir) | ||
| except Exception: | ||
| logging.exception( | ||
| "collect_feature_data: failed to load streams; skipping.") | ||
| return | ||
|
|
||
| feature_streams = list(feature.streams.value) | ||
|
|
||
| # Collect first z-stack per FM channel; fall back to first FM image per channel. | ||
| fm_zstacks: List = [] | ||
| fm_images: List = [] | ||
| for s in feature_streams: | ||
| if isinstance(s, StaticFluoStream): | ||
| if _is_zstack_stream(s): | ||
| fm_zstacks.append(s) | ||
| else: | ||
| fm_images.append(s) | ||
|
|
||
| # Per channel: prefer z-stack, then plain FM image. | ||
| # Channels are delineated by MD_OUT_WL; use index as fallback. | ||
| selected_fm: List = [] | ||
| seen_channels: set = set() | ||
| for s in fm_zstacks + fm_images: | ||
| try: | ||
| channel_key = s.raw[0].metadata.get(model.MD_OUT_WL) | ||
| except (IndexError, AttributeError): | ||
| channel_key = None | ||
| if channel_key not in seen_channels: | ||
| seen_channels.add(channel_key) | ||
| selected_fm.append(s) | ||
|
Comment on lines
+573
to
+584
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a real per-stream fallback channel key. The comment says “use index as fallback,” but Line 581 sets Proposed fix- for s in fm_zstacks + fm_images:
+ for idx, s in enumerate(fm_zstacks + fm_images):
+ channel_key = ("index", idx)
try:
- channel_key = s.raw[0].metadata.get(model.MD_OUT_WL)
- except (IndexError, AttributeError):
- channel_key = None
+ out_wl = s.raw[0].metadata.get(model.MD_OUT_WL)
+ if out_wl is not None:
+ channel_key = ("out_wl", repr(out_wl))
+ except (IndexError, AttributeError, TypeError):
+ pass
if channel_key not in seen_channels:
seen_channels.add(channel_key)
selected_fm.append(s)🤖 Prompt for AI Agents |
||
|
|
||
| # Collect spatially overlapping overview streams. | ||
| stage_pos = feature.stage_position.value | ||
| feat_x = stage_pos.get("x", 0.0) | ||
| feat_y = stage_pos.get("y", 0.0) | ||
|
|
||
| overview_fm: List = [] | ||
| overview_sem: List = [] | ||
| for s in (overview_streams or []): | ||
| if not _stream_overlaps_position(s, feat_x, feat_y): | ||
| continue | ||
| if isinstance(s, StaticFluoStream): | ||
| overview_fm.append(s) | ||
| elif isinstance(s, (StaticSEMStream, StaticFIBStream)): | ||
| overview_sem.append(s) | ||
|
|
||
| # Build privacy-preserving payload — generic keys, no names or filenames. | ||
| payload: dict = { | ||
| "status": feature.status.value, | ||
| "stage_position": dict(stage_pos), | ||
| "fm_focus_position": dict(feature.fm_focus_position.value), | ||
| } | ||
|
|
||
| def _get_raw(stream: "Stream") -> Optional["model.DataArray"]: | ||
| try: | ||
| return stream.raw[0] if stream.raw else None | ||
| except Exception: | ||
| return None | ||
|
|
||
| for idx, s in enumerate(selected_fm): | ||
| da = _get_raw(s) | ||
| if da is not None: | ||
| payload[f"channel_{idx}"] = da | ||
|
|
||
| for idx, s in enumerate(overview_fm): | ||
| da = _get_raw(s) | ||
| if da is not None: | ||
| payload[f"overview_fm_{idx}"] = da | ||
|
|
||
| for idx, s in enumerate(overview_sem): | ||
| da = _get_raw(s) | ||
| if da is not None: | ||
| payload[f"overview_sem_{idx}"] = da | ||
|
|
||
| image_keys = [k for k in payload if k.startswith(("channel_", "overview_fm_", "overview_sem_"))] | ||
| if not image_keys: | ||
| logging.debug( | ||
| "collect_feature_data: no images found for feature; skipping upload." | ||
| ) | ||
| return | ||
|
|
||
| _dc.record("feature_collected", "1.0", payload) | ||
|
|
||
| feature.collect = False | ||
|
Comment on lines
+539
to
+638
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make the one-shot Concurrent status/posture/delete triggers can all pass Line 539 before Line 638 flips Proposed fix FEATURE_COLLECT_PROBABILITY = 0.2
+_COLLECTION_STATE_LOCK = threading.Lock()- _dc.record("feature_collected", "1.0", payload)
-
- feature.collect = False
+ with _COLLECTION_STATE_LOCK:
+ if not feature.collect:
+ return
+ feature.collect = False
+
+ try:
+ _dc.record("feature_collected", "1.0", payload)
+ except Exception:
+ with _COLLECTION_STATE_LOCK:
+ feature.collect = True
+ raise
logging.debug("collect_feature_data: submitted feature data for collection.")🧰 Tools🪛 Ruff (0.15.10)[warning] 611-611: Do not catch blind exception: (BLE001) 🤖 Prompt for AI Agents |
||
| logging.debug("collect_feature_data: submitted feature data for collection.") | ||
|
|
||
| except Exception: | ||
| logging.exception("collect_feature_data: unexpected error; feature data not collected.") | ||
|
|
||
| # To handle the timeout error when the stage is not able to move to the desired position | ||
| # It logs the message and raises the MoveError exception | ||
| class MoveError(Exception): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
collect_feature_data()instantiates a newDataCollector()on every call. When consent is granted this will create a new background worker thread per invocation (and a new config instance), which can leak threads and increase CPU/memory usage—especially since collection is triggered from multiple GUI events and also runs inside separate threads already. Prefer reusing a single sharedDataCollectorinstance (e.g., inject it from the GUI/controller, or use a module-level singleton) rather than constructing a new one each time.