Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ Depends: ${shlibs:Depends},
python3-libusb1,
# Needed for acq.fastem
python3-shapely,
# Needed for communicating with AWS S3
python3-boto3,
Suggests: imagej
Description: Open Delmic Microscope Software
Odemis is the acquisition software for the Delmic microscopes. In particular,
Expand Down
26 changes: 26 additions & 0 deletions scripts/odemis-dc-fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Fetch DataCollector ZIP samples from S3.

Examples:
./scripts/odemis-dc-fetch.py
./scripts/odemis-dc-fetch.py --output ./downloads
./scripts/odemis-dc-fetch.py --event z_stack_acquired
./scripts/odemis-dc-fetch.py --since 2026-03-01
./scripts/odemis-dc-fetch.py --host meteor-5099
./scripts/odemis-dc-fetch.py --host meteor-5099,atlas-001,secom-22
./scripts/odemis-dc-fetch.py --bucket delmic-odemis-collect-test --region eu-west-1
./scripts/odemis-dc-fetch.py --since 2026-03-01T12:30:00 --event z_stack_acquired --output ./dc_samples
"""

import logging
import sys

from odemis.util.dc_fetch import main


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
rc = main(sys.argv[1:])
logging.shutdown()
sys.exit(rc)
176 changes: 173 additions & 3 deletions src/odemis/acq/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@
MicroscopePostureManager,
)
from odemis.acq.stitching._tiledacq import SAFE_REL_RANGE_DEFAULT
from odemis.acq.stream import Stream, StaticFluoStream
from odemis.acq.stream import Stream, StaticFluoStream, StaticSEMStream, StaticFIBStream
from odemis.dataio import find_fittest_converter, get_available_formats
from odemis.util import dataio, executeAsyncTask
from odemis.util.comp import generate_zlevels
from odemis.util.datacollector import DataCollector
from odemis.util.dataio import data_to_static_streams, open_acquisition, splitext
from odemis.util.driver import estimate_stage_movement_time
from odemis.util.filename import create_filename
Expand All @@ -70,6 +71,9 @@

REFERENCE_IMAGE_FILENAME = "Reference-Alignment-FIB.ome.tiff"

# Probability that a newly created feature is marked as eligible for data collection.
FEATURE_COLLECT_PROBABILITY = 0.2

USER_MILLING_TASKS_PATH = os.path.expanduser("~/.config/odemis/milling_tasks.yaml")


Expand Down Expand Up @@ -173,14 +177,19 @@ def __init__(self, name: str,
stage_position: Dict[str, float],
fm_focus_position: Dict[str, float],
streams: Optional[List[Stream]] = None,
milling_tasks: Optional[Dict[str, MillingTaskSettings]] = None, correlation_data=None):
milling_tasks: Optional[Dict[str, MillingTaskSettings]] = None,
correlation_data=None,
collect: bool = False):
"""
:param name: (string) the feature name
:param stage_position: (dict) the stage position of the feature (stage-bare)
:param fm_focus_position: (dict) the focus position of the feature
:param streams: (List of StaticStream) list of acquired streams on this feature
:param correlation_data: (Dict[str,FIBFMCorrelationData]) Dictionary mapping the feature status to
FIBFMCorrelationData, where feature status like Active, Rough Milled or polished is the key.
:param collect: (bool) Whether this feature is eligible for data collection.
Defaults to False. The GUI sets this based on the per-project sampling
decision made when a project is opened or created.
"""
self.name = model.StringVA(name)
# FIXME: The 'position' parameter should eventually contain the SampleStage coordinates and not stage bare from the stage_position!
Expand Down Expand Up @@ -211,6 +220,10 @@ def __init__(self, name: str,
correlation_data = {}
self.correlation_data = correlation_data

# Whether this feature is eligible for data collection.
# Set by the GUI from the per-project sampling decision; persisted in features.json.
self.collect: bool = collect

# attributes for automated milling
self.path: str = None # TODO:support path creation here, rather than on milling data save
self.reference_image: model.DataArray = None
Expand Down Expand Up @@ -318,6 +331,7 @@ def get_features_dict(features: List[CryoFeature]) -> Dict[str, str]:
for feature in features:
feature_item = {'name': feature.name.value,
'status': feature.status.value,
'collect': feature.collect,
'stage_position': feature.stage_position.value,
'fm_focus_position': feature.fm_focus_position.value,
'posture_positions': feature.posture_positions,
Expand Down Expand Up @@ -351,7 +365,8 @@ def object_hook(self, obj):
milling_task_json = obj.get('milling_tasks', {})
feature = CryoFeature(name=obj['name'],
stage_position=stage_position,
fm_focus_position=fm_focus_position
fm_focus_position=fm_focus_position,
collect=obj.get('collect', False)
)
feature.correlation_data = FIBFMCorrelationData.from_dict(correlation_data) if correlation_data else None
feature.status.value = obj['status']
Expand Down Expand Up @@ -471,6 +486,161 @@ def _create_fibsem_filename(filename: str, acq_type: str) -> str:

return create_filename(path, ptn, ext, count="001")


def _is_zstack_stream(stream: "Stream") -> bool:
"""Return True if the stream contains z-stack (3-D ZYX) data."""
return hasattr(stream, "zIndex")


def _stream_overlaps_position(stream: "Stream", x: float, y: float) -> bool:
"""Return True if the stage position (x, y) falls within the stream's field of view.

:param stream: A static stream with a getBoundingBox() method.
:param x: Stage x position in metres.
:param y: Stage y position in metres.
:returns: True when the position is inside the bounding box, False otherwise.
"""
try:
bbox = stream.getBoundingBox() # (left, top, right, bottom) in metres
except Exception:
return False
left, top, right, bottom = bbox
return left <= x <= right and top <= y <= bottom


def collect_feature_data(
feature: "CryoFeature",
overview_streams: Optional[List["Stream"]] = None,
project_dir: Optional[str] = None,
) -> None:
"""Collect anonymized data for a feature and submit it to the data collector.

Skips immediately if feature.collect is False or if data collection consent
has not been granted. Never raises — all errors are logged and suppressed.

The payload contains:
- First acquired z-stack per FM channel (or first FM image if no z-stack).
- FM and SEM overview images that spatially overlap the feature's position.
- Feature status, stage position, and FM focus position.

Privacy rules enforced:
- Feature name is never included.
- Image payload keys are generic (channel_0, overview_fm_0, etc.).
- Original filenames are not included in the payload.

After collection feature.collect is set to False to prevent re-collection.

:param feature: The feature to collect data for.
:param overview_streams: Optional list of overview static streams.
Used to find FM / SEM overviews that overlap the feature position.
:param project_dir: Optional project directory path. When provided and
feature.streams is empty, streams are loaded from disk first.
"""
if not feature.collect:
return

try:
_dc = DataCollector()
if not _dc.get_consent():
return
Comment on lines +542 to +545
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collect_feature_data() instantiates a new DataCollector() on every call. When consent is granted this will create a new background worker thread per invocation (and a new config instance), which can leak threads and increase CPU/memory usage—especially since collection is triggered from multiple GUI events and also runs inside separate threads already. Prefer reusing a single shared DataCollector instance (e.g., inject it from the GUI/controller, or use a module-level singleton) rather than constructing a new one each time.

Copilot uses AI. Check for mistakes.
except Exception:
logging.exception("collect_feature_data: failed to access DataCollector; skipping.")
return

try:

# Load feature streams from disk when not yet in memory.
if not feature.streams.value and project_dir:
try:
load_feature_streams_from_disk(feature, project_dir)
except Exception:
logging.exception(
"collect_feature_data: failed to load streams; skipping.")
return

feature_streams = list(feature.streams.value)

# Collect first z-stack per FM channel; fall back to first FM image per channel.
fm_zstacks: List = []
fm_images: List = []
for s in feature_streams:
if isinstance(s, StaticFluoStream):
if _is_zstack_stream(s):
fm_zstacks.append(s)
else:
fm_images.append(s)

# Per channel: prefer z-stack, then plain FM image.
# Channels are delineated by MD_OUT_WL; use index as fallback.
selected_fm: List = []
seen_channels: set = set()
for s in fm_zstacks + fm_images:
try:
channel_key = s.raw[0].metadata.get(model.MD_OUT_WL)
except (IndexError, AttributeError):
channel_key = None
if channel_key not in seen_channels:
seen_channels.add(channel_key)
selected_fm.append(s)
Comment on lines +573 to +584
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use a real per-stream fallback channel key.

The comment says “use index as fallback,” but Line 581 sets channel_key = None; every stream missing MD_OUT_WL then shares the same key, so only the first such FM stream is collected.

Proposed fix
-        for s in fm_zstacks + fm_images:
+        for idx, s in enumerate(fm_zstacks + fm_images):
+            channel_key = ("index", idx)
             try:
-                channel_key = s.raw[0].metadata.get(model.MD_OUT_WL)
-            except (IndexError, AttributeError):
-                channel_key = None
+                out_wl = s.raw[0].metadata.get(model.MD_OUT_WL)
+                if out_wl is not None:
+                    channel_key = ("out_wl", repr(out_wl))
+            except (IndexError, AttributeError, TypeError):
+                pass
             if channel_key not in seen_channels:
                 seen_channels.add(channel_key)
                 selected_fm.append(s)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/odemis/acq/feature.py` around lines 573 - 584, The code currently sets
channel_key = None when MD_OUT_WL is missing, causing all such streams to
collide and only the first to be kept; change to use a per-stream fallback like
the stream's position so missing MD_OUT_WL yields a unique key. Iterate with an
index over the concatenated list (fm_zstacks + fm_images) and set channel_key =
s.raw[0].metadata.get(model.MD_OUT_WL, f"fallback:{i}") (or similar unique
identifier from the stream) before checking seen_channels; update references to
selected_fm and seen_channels accordingly.


# Collect spatially overlapping overview streams.
stage_pos = feature.stage_position.value
feat_x = stage_pos.get("x", 0.0)
feat_y = stage_pos.get("y", 0.0)

overview_fm: List = []
overview_sem: List = []
for s in (overview_streams or []):
if not _stream_overlaps_position(s, feat_x, feat_y):
continue
if isinstance(s, StaticFluoStream):
overview_fm.append(s)
elif isinstance(s, (StaticSEMStream, StaticFIBStream)):
overview_sem.append(s)

# Build privacy-preserving payload — generic keys, no names or filenames.
payload: dict = {
"status": feature.status.value,
"stage_position": dict(stage_pos),
"fm_focus_position": dict(feature.fm_focus_position.value),
}

def _get_raw(stream: "Stream") -> Optional["model.DataArray"]:
try:
return stream.raw[0] if stream.raw else None
except Exception:
return None

for idx, s in enumerate(selected_fm):
da = _get_raw(s)
if da is not None:
payload[f"channel_{idx}"] = da

for idx, s in enumerate(overview_fm):
da = _get_raw(s)
if da is not None:
payload[f"overview_fm_{idx}"] = da

for idx, s in enumerate(overview_sem):
da = _get_raw(s)
if da is not None:
payload[f"overview_sem_{idx}"] = da

image_keys = [k for k in payload if k.startswith(("channel_", "overview_fm_", "overview_sem_"))]
if not image_keys:
logging.debug(
"collect_feature_data: no images found for feature; skipping upload."
)
return

_dc.record("feature_collected", "1.0", payload)

feature.collect = False
Comment on lines +539 to +638
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make the one-shot collect transition atomic.

Concurrent status/posture/delete triggers can all pass Line 539 before Line 638 flips feature.collect, causing duplicate submissions for the same feature.

Proposed fix
 FEATURE_COLLECT_PROBABILITY = 0.2
+_COLLECTION_STATE_LOCK = threading.Lock()
-        _dc.record("feature_collected", "1.0", payload)
-
-        feature.collect = False
+        with _COLLECTION_STATE_LOCK:
+            if not feature.collect:
+                return
+            feature.collect = False
+
+        try:
+            _dc.record("feature_collected", "1.0", payload)
+        except Exception:
+            with _COLLECTION_STATE_LOCK:
+                feature.collect = True
+            raise
         logging.debug("collect_feature_data: submitted feature data for collection.")
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 611-611: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/odemis/acq/feature.py` around lines 539 - 638, The race is that multiple
threads can pass the initial if not feature.collect check and all run until
feature.collect is set False at the end; make the one-shot flip atomic by
performing an immediate compare-and-set at the start of the routine (e.g., in
collect_feature_data): replace the plain boolean check of feature.collect with
an atomic operation that sets feature.collect to False only if it was True (or
acquire a per-feature lock keyed by feature id, check feature.collect and set it
False while holding the lock), and if the CAS/lock indicates collect was already
False return early; reference feature.collect and the top-level routine
(collect_feature_data in this file) when adding the atomic CAS or lock.

logging.debug("collect_feature_data: submitted feature data for collection.")

except Exception:
logging.exception("collect_feature_data: unexpected error; feature data not collected.")

# To handle the timeout error when the stage is not able to move to the desired position
# It logs the message and raises the MoveError exception
class MoveError(Exception):
Expand Down
Loading
Loading