Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 78 additions & 10 deletions src/cachier/cores/pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
# Licensed under the MIT license:
# http://www.opensource.org/licenses/MIT-license
# Copyright (c) 2016, Shay Palachy <shaypal5@gmail.com>
import logging
import os
import pickle # for local caching
import time
from datetime import datetime
from typing import Any, Dict, Optional, Tuple, Union

Expand Down Expand Up @@ -51,12 +53,14 @@ def _check_calculation(self) -> None:
if not entry._processing:
# print('stopping observer!')
self.value = entry.value
self.observer.stop()
if self.observer is not None:
self.observer.stop()
# else:
# print('NOT stopping observer... :(')
except AttributeError: # catching entry being None
self.value = None
self.observer.stop()
if self.observer is not None:
self.observer.stop()

def on_created(self, event) -> None:
"""A Watchdog Event Handler method.""" # noqa: D401
Expand Down Expand Up @@ -256,29 +260,93 @@ def mark_entry_not_calculated(self, key: str) -> None:
cache[key]._processing = False
self._save_cache(cache)

def _create_observer(self) -> Observer:
    """Return a newly constructed ``Observer``.

    The observer is neither scheduled nor started here; that is left to
    the caller.
    """
    fresh_observer = Observer()
    return fresh_observer

def _cleanup_observer(self, observer: Observer) -> None:
    """Stop ``observer`` and wait briefly for it to finish (best effort).

    Parameters
    ----------
    observer
        The observer to shut down. If it is not alive, nothing is done.

    Notes
    -----
    Any exception raised while stopping or joining is logged at DEBUG
    level and swallowed, so cleanup never masks the caller's own result
    or exception.

    """
    try:
        if not observer.is_alive():
            return
        observer.stop()
        # Bounded wait: do not block the caller on a stuck observer.
        observer.join(timeout=1.0)
    except Exception as e:
        logging.debug("Observer cleanup failed: %s", e)

def wait_on_entry_calc(self, key: str) -> Any:
    """Wait for entry calculation to complete with inotify protection.

    Parameters
    ----------
    key
        Cache key of the entry being waited on.

    Returns
    -------
    Any
        The entry's value. It is returned immediately when the entry exists
        and is not being processed; otherwise it is whatever the
        inotify-based wait (or the polling fallback) returns.

    Raises
    ------
    OSError
        Re-raised from the inotify-based wait when its message does not
        contain ``"inotify instance limit reached"``.

    """
    if self.separate_files:
        entry = self._load_cache_by_key(key)
        filename = f"{self.cache_fname}_{key}"
    else:
        with self.lock:
            # Tolerant lookup only: a missing key must yield None (and fall
            # through to waiting), not raise KeyError.
            entry = self.get_cache_dict().get(key)
        filename = self.cache_fname

    # Fast path: the value is already available, so no watcher is needed.
    if entry and not entry._processing:
        return entry.value

    # Try to use inotify-based waiting
    try:
        return self._wait_with_inotify(key, filename)
    except OSError as e:
        if "inotify instance limit reached" in str(e):
            # Fall back to polling if inotify limit is reached
            return self._wait_with_polling(key)
        raise

def _wait_with_inotify(self, key: str, filename: str) -> Any:
    """Wait for calculation using inotify with proper cleanup.

    Parameters
    ----------
    key
        Cache key of the entry being waited on.
    filename
        Cache file name handed to the change handler.

    Returns
    -------
    Any
        ``event_handler.value`` as it stands when the wait loop ends.

    Notes
    -----
    Exactly one observer is created, and it is scheduled and started
    exactly once, inside the ``try`` block. A failure in ``schedule`` or
    ``start`` (or in the timeout check) therefore still reaches the
    ``finally`` clause that cleans the observer up.

    """
    event_handler = _PickleCore.CacheChangeHandler(
        filename=filename, core=self, key=key
    )

    observer = self._create_observer()
    event_handler.inject_observer(observer)

    try:
        observer.schedule(
            event_handler, path=self.cache_dir, recursive=True
        )
        observer.start()

        time_spent = 0
        while observer.is_alive():
            # Each join waits at most one second, so time_spent counts
            # loop iterations as an approximation of elapsed seconds.
            observer.join(timeout=1.0)
            time_spent += 1
            self.check_calc_timeout(time_spent)

            # Check if calculation is complete
            if event_handler.value is not None:
                break

        return event_handler.value
    finally:
        # Always cleanup the observer
        self._cleanup_observer(observer)

def _wait_with_polling(self, key: str, poll_interval: float = 1) -> Any:
    """Fallback method using polling instead of inotify.

    Parameters
    ----------
    key
        Cache key of the entry being waited on.
    poll_interval
        Seconds to sleep between polls. Defaults to 1 second (matching
        other cores).

    Returns
    -------
    Any
        The entry's value, once the entry exists and is no longer marked
        as processing.

    Notes
    -----
    ``self.check_calc_timeout`` is called after every unsuccessful poll,
    including polls where reading the cache raised ``FileNotFoundError``
    or ``EOFError``, so a persistently unreadable cache file cannot bypass
    it. NOTE(review): presumably ``check_calc_timeout`` raises once the
    configured wait limit is exceeded - confirm against its definition.

    """
    time_spent = 0
    while True:
        time.sleep(poll_interval)
        time_spent += poll_interval

        try:
            if self.separate_files:
                entry = self._load_cache_by_key(key)
            else:
                with self.lock:
                    entry = self.get_cache_dict().get(key)
        except (FileNotFoundError, EOFError):
            # Continue polling even if there are file errors
            entry = None

        if entry and not entry._processing:
            return entry.value

        # Outside the try so it also runs after a failed read.
        self.check_calc_timeout(time_spent)

def clear_cache(self) -> None:
if self.separate_files:
Expand Down
6 changes: 0 additions & 6 deletions tests/test_pickle_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,6 @@ def _params_with_dataframe(*args, **kwargs):
not sys.platform.startswith("linux"),
reason="inotify instance limit is only relevant on Linux",
)
@pytest.mark.xfail(
reason=(
"inotify instance limit issue not yet fixed - test will pass "
"when issue is resolved"
)
)
def test_inotify_instance_limit_reached():
"""Reproduces the inotify instance exhaustion issue (see Issue #24).

Expand Down
Loading