diff --git a/ndviewer_light/core.py b/ndviewer_light/core.py index d22c87f..e364667 100644 --- a/ndviewer_light/core.py +++ b/ndviewer_light/core.py @@ -1461,8 +1461,16 @@ def __init__(self, dataset_path: str = ""): # External navigation state (push-based API for live acquisition) # _file_index is accessed from both main thread and dask workers, needs lock - self._file_index: Dict[tuple, str] = {} # (t, fov_idx, z, channel) -> filepath + # (t, fov_idx, z, channel) -> (filepath, page_idx) + self._file_index: Dict[Tuple[int, int, int, str], Tuple[str, int]] = {} self._file_index_lock = threading.Lock() + # LRU cache of open TiffFile handles to avoid re-parsing IFD chains. + # Each entry is (TiffFile, per-file Lock) so reads to different files + # proceed in parallel while same-file reads are serialized. + # Bounded to avoid fd exhaustion with thousands of files. + self._tiff_handles: OrderedDict = OrderedDict() # filepath -> (tif, lock) + self._tiff_handles_lock = threading.Lock() # protects the dict itself + self._tiff_handles_max = 128 self._fov_labels: List[str] = [] # ["A1:0", "A1:1", ...] self._channel_names: List[str] = [] self._z_levels: List[int] = [] @@ -1748,6 +1756,7 @@ def start_acquisition( # Clear previous state with self._file_index_lock: self._file_index.clear() + self._close_tiff_handle_cache() self._plane_cache.clear() self._max_fov_per_time.clear() @@ -1826,7 +1835,15 @@ def _rebuild_viewer_for_acquisition(self): self._xarray_data = xarr self._set_ndv_data(xarr) - def register_image(self, t: int, fov_idx: int, z: int, channel: str, filepath: str): + def register_image( + self, + t: int, + fov_idx: int, + z: int, + channel: str, + filepath: str, + page_idx: int = 0, + ): """Register a newly saved image file. Thread-safe: can be called from worker thread. @@ -1838,10 +1855,15 @@ def register_image(self, t: int, fov_idx: int, z: int, channel: str, filepath: s z: Z-level index channel: Channel name filepath: Path to the saved TIFF file + page_idx: Page index within the TIFF file (default 0). + For OME-TIFF stacks that store multiple planes per file, + specify which page to read. """ + if page_idx < 0: + raise ValueError(f"page_idx must be >= 0, got {page_idx}") # Update file index (protected by lock for dask worker thread safety) with self._file_index_lock: - self._file_index[(t, fov_idx, z, channel)] = filepath + self._file_index[(t, fov_idx, z, channel)] = (filepath, page_idx) # Emit signal with raw indices - main thread computes max values # to avoid race condition on _max_time_idx @@ -2014,29 +2036,71 @@ def _load_single_plane( # Load from file (lock protects concurrent access from dask workers) with self._file_index_lock: - filepath = self._file_index.get(cache_key) + entry = self._file_index.get(cache_key) - if not filepath: + if entry is None: # File not yet registered - expected during acquisition, not an error return np.zeros((self._image_height, self._image_width), dtype=np.uint16) + filepath, page_idx = entry + if not LAZY_LOADING_AVAILABLE: logger.error("tifffile not available for loading image planes") return np.zeros((self._image_height, self._image_width), dtype=np.uint16) try: - with tf.TiffFile(filepath) as tif: - plane = tif.pages[0].asarray() - self._plane_cache.put(cache_key, plane) - return plane + # Look up or create a cached (TiffFile, Lock) entry. + # Global lock is held only for dict bookkeeping; the per-file + # lock serializes reads to the same file while allowing parallel + # reads across different files. + evicted_entries = [] + with self._tiff_handles_lock: + entry = self._tiff_handles.get(filepath) + if entry is not None: + tif, file_lock = entry + self._tiff_handles.move_to_end(filepath) + else: + tif = tf.TiffFile(filepath) + file_lock = threading.Lock() + self._tiff_handles[filepath] = (tif, file_lock) + # Evict LRU handles if over limit + while len(self._tiff_handles) > self._tiff_handles_max: + _, evicted = self._tiff_handles.popitem(last=False) + evicted_entries.append(evicted) + # Close evicted handles. Safe because LRU eviction only removes + # the oldest entry, which the current thread just moved away from + # (move_to_end). For another thread to hold a reference to the + # evicted entry, 128+ new files would need to open in the + # microseconds between its lookup and file_lock acquire. + for old_tif, old_lock in evicted_entries: + with old_lock: + self._close_tiff_handles([old_tif]) + # Read page under per-file lock + with file_lock: + plane = tif.pages[page_idx].asarray() + self._plane_cache.put(cache_key, plane) + return plane except FileNotFoundError: logger.warning("Image file not found (may have been deleted): %s", filepath) + except IndexError: + # Evict stale entry so next lookup re-opens the file and sees + # newly appended pages. Don't close the handle here — another + # thread may still hold a reference and be about to read. The + # handle will be closed at end_acquisition() / closeEvent(). + with self._tiff_handles_lock: + self._tiff_handles.pop(filepath, None) + logger.warning( + "Page %d not available in %s (file may still be writing)", + page_idx, + filepath, + ) except PermissionError as e: logger.error("Permission denied reading image %s: %s", filepath, e) except Exception as e: logger.error( - "Failed to load image plane %s (t=%d, fov=%d, z=%d, ch=%s): %s", + "Failed to load image plane %s page %d (t=%d, fov=%d, z=%d, ch=%s): %s", filepath, + page_idx, t, fov_idx, z, @@ -2138,6 +2202,7 @@ def end_acquisition(self): self._acquisition_active = False # NOTE: _fov_labels is NOT cleared here - navigation must still work # after acquisition ends. Labels are cleared in start_acquisition(). + self._close_tiff_handle_cache() logger.info("NDViewer: Acquisition ended") # ───────────────────────────────────────────────────────────────────────── @@ -2774,6 +2839,16 @@ def _close_open_handles(self): self._close_tiff_handles(getattr(self, "_open_handles", [])) self._open_handles = [] + def _close_tiff_handle_cache(self): + """Close cached TiffFile handles used by push-based OME-TIFF loading.""" + with self._tiff_handles_lock: + entries = list(self._tiff_handles.values()) + self._tiff_handles.clear() + # Acquire each per-file lock before closing to wait for in-flight readers. + for tif, file_lock in entries: + with file_lock: + self._close_tiff_handles([tif]) + def closeEvent(self, event): """Clean up resources when the widget is closed.""" if self._refresh_timer: @@ -2800,6 +2875,7 @@ def closeEvent(self, event): with self._zarr_written_planes_lock: self._zarr_written_planes.clear() self._close_open_handles() + self._close_tiff_handle_cache() super().closeEvent(event) def _force_refresh(self): diff --git a/tests/test_push_api.py b/tests/test_push_api.py index aa26d13..83c24f8 100644 --- a/tests/test_push_api.py +++ b/tests/test_push_api.py @@ -449,18 +449,20 @@ def load(self, t, fov_idx, z, channel): return cached with self.lock: - filepath = self.file_index.get(cache_key) + entry = self.file_index.get(cache_key) - if not filepath: + if entry is None: return np.zeros((self.height, self.width), dtype=np.uint16) + filepath, page_idx = entry + if not self.load_from_disk: self.disk_reads += 1 return np.zeros((self.height, self.width), dtype=np.uint16) try: with self._tf.TiffFile(filepath) as tif: - plane = tif.pages[0].asarray() + plane = tif.pages[page_idx].asarray() self.cache.put(cache_key, plane) self.disk_reads += 1 return plane @@ -485,7 +487,7 @@ def test_load_single_plane_returns_zeros_when_file_not_registered(self): def test_load_single_plane_returns_zeros_on_file_not_found(self): """_load_single_plane returns zeros when file doesn't exist.""" loader = _PlaneLoader(height=100, width=100, load_from_disk=True) - loader.file_index[(0, 0, 0, "BF")] = "/nonexistent/path/image.tiff" + loader.file_index[(0, 0, 0, "BF")] = ("/nonexistent/path/image.tiff", 0) result = loader.load(0, 0, 0, "BF") @@ -508,7 +510,7 @@ def test_load_single_plane_loads_valid_tiff(self): test_image = np.random.randint(0, 65535, (50, 50), dtype=np.uint16) tf.imwrite(temp_path, test_image) - loader.file_index[(0, 0, 0, "BF")] = temp_path + loader.file_index[(0, 0, 0, "BF")] = (temp_path, 0) result = loader.load(0, 0, 0, "BF") assert result.shape == (50, 50) @@ -522,6 +524,191 @@ def test_load_single_plane_loads_valid_tiff(self): finally: os.unlink(temp_path) + def test_load_single_plane_loads_specific_page(self): + """_load_single_plane reads the correct page from a multi-page TIFF.""" + import tifffile as tf + + loader = _PlaneLoader(height=50, width=50, load_from_disk=True) + + with tempfile.NamedTemporaryFile(suffix=".tiff", delete=False) as f: + temp_path = f.name + + try: + # Write a multi-page TIFF with 3 distinct pages + pages = [ + np.full((50, 50), fill_value=i * 1000, dtype=np.uint16) + for i in range(3) + ] + with tf.TiffWriter(temp_path) as tw: + for page in pages: + tw.write(page) + + # Read page 0 + loader.file_index[(0, 0, 0, "Ch0")] = (temp_path, 0) + assert np.array_equal(loader.load(0, 0, 0, "Ch0"), pages[0]) + + # Read page 2 + loader.file_index[(0, 0, 0, "Ch2")] = (temp_path, 2) + assert np.array_equal(loader.load(0, 0, 0, "Ch2"), pages[2]) + + # Read page 1 + loader.file_index[(0, 0, 0, "Ch1")] = (temp_path, 1) + assert np.array_equal(loader.load(0, 0, 0, "Ch1"), pages[1]) + finally: + os.unlink(temp_path) + + def test_load_single_plane_returns_zeros_on_out_of_range_page(self): + """_load_single_plane returns zeros when page_idx is out of range.""" + import tifffile as tf + + loader = _PlaneLoader(height=50, width=50, load_from_disk=True) + + with tempfile.NamedTemporaryFile(suffix=".tiff", delete=False) as f: + temp_path = f.name + + try: + # Write a single-page TIFF + test_image = np.ones((50, 50), dtype=np.uint16) * 42 + tf.imwrite(temp_path, test_image) + + # Request page 5 (out of range) + loader.file_index[(0, 0, 0, "BF")] = (temp_path, 5) + result = loader.load(0, 0, 0, "BF") + + assert result.shape == (50, 50) + assert result.dtype == np.uint16 + assert np.all(result == 0) + finally: + os.unlink(temp_path) + + def test_load_single_plane_page0_of_multipage_file(self): + """_load_single_plane correctly reads page 0 from a multi-page TIFF.""" + import tifffile as tf + + loader = _PlaneLoader(height=50, width=50, load_from_disk=True) + + with tempfile.NamedTemporaryFile(suffix=".tiff", delete=False) as f: + temp_path = f.name + + try: + pages = [ + np.full((50, 50), fill_value=i * 100, dtype=np.uint16) for i in range(4) + ] + with tf.TiffWriter(temp_path) as tw: + for page in pages: + tw.write(page) + + # page_idx=0 should still read the correct first page + loader.file_index[(0, 0, 0, "Ch0")] = (temp_path, 0) + result = loader.load(0, 0, 0, "Ch0") + assert np.array_equal(result, pages[0]) + finally: + os.unlink(temp_path) + + def test_load_single_plane_multiple_files_different_pages(self): + """_load_single_plane reads correct pages from different files.""" + import tifffile as tf + + loader = _PlaneLoader(height=50, width=50, load_from_disk=True) + temp_paths = [] + + try: + # Create two multi-page TIFFs with distinct data + for file_idx in range(2): + with tempfile.NamedTemporaryFile(suffix=".tiff", delete=False) as f: + temp_paths.append(f.name) + pages = [ + np.full( + (50, 50), fill_value=(file_idx + 1) * 1000 + i, dtype=np.uint16 + ) + for i in range(3) + ] + with tf.TiffWriter(temp_paths[-1]) as tw: + for page in pages: + tw.write(page) + + # Read page 2 from file 0, page 1 from file 1 + loader.file_index[(0, 0, 0, "Ch0")] = (temp_paths[0], 2) + loader.file_index[(0, 0, 0, "Ch1")] = (temp_paths[1], 1) + + r0 = loader.load(0, 0, 0, "Ch0") + r1 = loader.load(0, 0, 0, "Ch1") + + assert np.all(r0 == 1002) # file 0, page 2 + assert np.all(r1 == 2001) # file 1, page 1 + finally: + for p in temp_paths: + os.unlink(p) + + def test_negative_page_idx_rejected(self): + """Negative page_idx should be rejected by register_image(). + + Tests the validation added to register_image(). Requires the + version from this repo (not an older editable install). + """ + import inspect as _inspect + + from ndviewer_light.core import LightweightViewer + + sig = _inspect.signature(LightweightViewer.register_image) + assert "page_idx" in sig.parameters, ( + "LightweightViewer.register_image is missing 'page_idx' parameter. " + "Tests may be running against an outdated ndviewer_light install." + ) + + import pytest + + dummy = type( + "_D", (), {"_file_index": {}, "_file_index_lock": threading.Lock()} + )() + with pytest.raises((ValueError, TypeError)): + LightweightViewer.register_image(dummy, 0, 0, 0, "BF", "/fake.tiff", -1) + + def test_load_single_plane_concurrent_different_channels(self): + """_load_single_plane handles concurrent reads of different channels.""" + import tifffile as tf + + loader = _PlaneLoader(height=50, width=50, load_from_disk=True) + + with tempfile.NamedTemporaryFile(suffix=".tiff", delete=False) as f: + temp_path = f.name + + try: + pages = [ + np.full((50, 50), fill_value=i * 500, dtype=np.uint16) for i in range(6) + ] + with tf.TiffWriter(temp_path) as tw: + for page in pages: + tw.write(page) + + # Register all 6 channels pointing to same file, different pages + for i in range(6): + loader.file_index[(0, 0, 0, f"Ch{i}")] = (temp_path, i) + + # Load all concurrently from threads + results = [None] * 6 + errors = [] + + def load_channel(idx): + try: + results[idx] = loader.load(0, 0, 0, f"Ch{idx}") + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=load_channel, args=(i,)) for i in range(6) + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Concurrent loads failed: {errors}" + for i in range(6): + assert np.all(results[i] == i * 500), f"Channel {i} has wrong data" + finally: + os.unlink(temp_path) + def test_load_single_plane_uses_cache(self): """_load_single_plane returns cached data without disk access.""" loader = _PlaneLoader(height=50, width=50) @@ -536,6 +723,143 @@ def test_load_single_plane_uses_cache(self): assert loader.disk_reads == 0 # No disk read occurred +class TestLRUHandleCache: + """Tests for the TiffFile handle LRU cache.""" + + def test_cache_stays_bounded_under_stress(self): + """Handle cache does not exceed max size with many files.""" + import tifffile as tf + + from ndviewer_light.core import LightweightViewer + + viewer = LightweightViewer.__new__(LightweightViewer) + viewer._tiff_handles = __import__("collections").OrderedDict() + viewer._tiff_handles_lock = threading.Lock() + viewer._tiff_handles_max = 8 # Small limit for fast test + viewer._file_index = {} + viewer._file_index_lock = threading.Lock() + viewer._plane_cache = __import__( + "ndviewer_light", fromlist=["MemoryBoundedLRUCache"] + ).MemoryBoundedLRUCache(max_memory_bytes=64 * 1024 * 1024) + viewer._image_height = 32 + viewer._image_width = 32 + + tmpdir = tempfile.mkdtemp() + try: + # Create 20 multi-page TIFFs (more than cache max of 8) + for fov in range(20): + path = os.path.join(tmpdir, f"fov{fov}.tiff") + with tf.TiffWriter(path) as tw: + for c in range(3): + tw.write( + np.full((32, 32), fill_value=fov * 100 + c, dtype=np.uint16) + ) + for c in range(3): + viewer.register_image( + t=0, + fov_idx=fov, + z=0, + channel=f"Ch{c}", + filepath=path, + page_idx=c, + ) + + # Force loading planes from all 20 files + for fov in range(20): + for c in range(3): + viewer._load_single_plane(0, fov, 0, f"Ch{c}") + + assert ( + len(viewer._tiff_handles) <= viewer._tiff_handles_max + ), f"Cache size {len(viewer._tiff_handles)} exceeds max {viewer._tiff_handles_max}" + finally: + # Clean up handles + for tif, _ in viewer._tiff_handles.values(): + try: + tif.close() + except Exception: + pass + viewer._tiff_handles.clear() + import shutil + + shutil.rmtree(tmpdir) + + def test_concurrent_register_and_load(self): + """Concurrent registration and loading does not corrupt state.""" + import tifffile as tf + + from ndviewer_light.core import LightweightViewer + + viewer = LightweightViewer.__new__(LightweightViewer) + viewer._tiff_handles = __import__("collections").OrderedDict() + viewer._tiff_handles_lock = threading.Lock() + viewer._tiff_handles_max = 128 + viewer._file_index = {} + viewer._file_index_lock = threading.Lock() + viewer._plane_cache = __import__( + "ndviewer_light", fromlist=["MemoryBoundedLRUCache"] + ).MemoryBoundedLRUCache(max_memory_bytes=64 * 1024 * 1024) + viewer._image_height = 32 + viewer._image_width = 32 + + n_fovs = 20 + n_channels = 4 + tmpdir = tempfile.mkdtemp() + errors = [] + + try: + # Pre-create files + paths = [] + for fov in range(n_fovs): + path = os.path.join(tmpdir, f"fov{fov}.tiff") + paths.append(path) + with tf.TiffWriter(path) as tw: + for c in range(n_channels): + tw.write( + np.full((32, 32), fill_value=fov * 100 + c, dtype=np.uint16) + ) + + def worker(fov_start, fov_end): + try: + for fov in range(fov_start, fov_end): + for c in range(n_channels): + viewer.register_image( + t=0, + fov_idx=fov, + z=0, + channel=f"Ch{c}", + filepath=paths[fov], + page_idx=c, + ) + viewer._load_single_plane(0, fov, 0, f"Ch{c}") + except Exception as e: + errors.append(e) + + threads = [] + chunk = n_fovs // 4 + for i in range(4): + start = i * chunk + end = start + chunk if i < 3 else n_fovs + t = threading.Thread(target=worker, args=(start, end)) + threads.append(t) + t.start() + for t in threads: + t.join() + + assert not errors, f"Concurrent ops failed: {errors}" + assert len(viewer._file_index) == n_fovs * n_channels + finally: + for tif, _ in viewer._tiff_handles.values(): + try: + tif.close() + except Exception: + pass + viewer._tiff_handles.clear() + import shutil + + shutil.rmtree(tmpdir) + + def _format_fov_label(fov_labels, value): """Format FOV label text for the given slider value.