diff --git a/CHANGELOG.md b/CHANGELOG.md index e265bb743a..7b17558948 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## 44.8.0 [#1376](https://github.com/openfisca/openfisca-core/pull/1376) + +#### New features + +- Introduce `_FactualStore` and `_CounterFactualStore` + - Allows for swappable variable storing mechanism supporting counter-factual variable holders +- Introduce `_LRUCache` and `_FIFOCache` + - Allows for swappable variable caching eviction policy, used notably by counter-factual calculations for O(k) reads + ## 44.7.0 [#1357](https://github.com/openfisca/openfisca-core/pull/1357) #### New features diff --git a/openfisca_core/commons/misc.py b/openfisca_core/commons/misc.py index 33f2e6eb8d..91f1364b74 100644 --- a/openfisca_core/commons/misc.py +++ b/openfisca_core/commons/misc.py @@ -1,18 +1,22 @@ from __future__ import annotations +from typing import TypeVar, cast + import numexpr from openfisca_core import types as t +_T = TypeVar("_T") + -def empty_clone(original: object) -> object: +def empty_clone(original: _T) -> _T: """Create an empty instance of the same class of the original object. Args: original: An object to clone. Returns: - object: The cloned, empty, object. + _T: The cloned, empty, object. Examples: >>> Foo = type("Foo", (list,), {}) @@ -31,15 +35,11 @@ def empty_clone(original: object) -> object: def __init__(_: object) -> None: ... - Dummy = type( - "Dummy", - (original.__class__,), - {"__init__": __init__}, - ) + dummy = type("Dummy", (type(original),), {"__init__": __init__}) + new = dummy() + new.__class__ = type(original) - new = Dummy() - new.__class__ = original.__class__ - return new + return cast(_T, new) def stringify_array(array: None | t.VarArray) -> str: diff --git a/openfisca_core/data_storage/__init__.py b/openfisca_core/data_storage/__init__.py index 4dbbb89543..77f1cd2310 100644 --- a/openfisca_core/data_storage/__init__.py +++ b/openfisca_core/data_storage/__init__.py @@ -1,7 +1,7 @@ """Different storage backends for the data of a simulation.""" -from . import types +from ._cache import StorageCache from .in_memory_storage import InMemoryStorage from .on_disk_storage import OnDiskStorage -__all__ = ["InMemoryStorage", "OnDiskStorage", "types"] +__all__ = ["StorageCache", "InMemoryStorage", "OnDiskStorage"] diff --git a/openfisca_core/data_storage/_cache.py b/openfisca_core/data_storage/_cache.py new file mode 100644 index 0000000000..4246403da3 --- /dev/null +++ b/openfisca_core/data_storage/_cache.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable + +from openfisca_core import periods, types as t + + +def _day_period(instant: t.Instant) -> t.Period: + return periods.Period((periods.DAY, instant, 1)) + + +class StorageCache(t.Cache[t.Instant, t.Snapshot]): + """Cache adapter mapping Instant keys onto InMemoryStorage or OnDiskStorage. + + Instants are stored as single-day Periods so both storage backends work + transparently. Passing an OnDiskStorage lets snapshot arrays be offloaded + to disk under memory pressure. + """ + + def __init__(self, storage: t.Storage) -> None: + self._storage = storage + self._patch_idxs: dict[t.Instant, int] = {} + + def put(self, key: t.Instant, value: t.Snapshot) -> None: + """Store a snapshot for the given instant.""" + array, patch_idx = value + self._storage.put(array, _day_period(key)) + self._patch_idxs[key] = patch_idx + + def get(self, key: t.Instant) -> t.Snapshot | None: + """Return the snapshot for the given instant, or None if not cached.""" + if key not in self._patch_idxs: + return None + + array = self._storage.get(_day_period(key)) + + if array is None: + return None + + return (array, self._patch_idxs[key]) + + def __contains__(self, key: object) -> bool: + return key in self._patch_idxs + + def items(self) -> Iterable[tuple[t.Instant, t.Snapshot]]: + """Yield all (instant, snapshot) pairs currently in the cache.""" + for instant, patch_idx in self._patch_idxs.items(): + array = self._storage.get(_day_period(instant)) + + if array is not None: + yield instant, (array, patch_idx) + + def evict(self, predicate: Callable[[t.Instant], bool]) -> None: + """Remove all entries whose instant satisfies predicate.""" + to_evict = [k for k in self._patch_idxs if predicate(k)] + + for k in to_evict: + self._storage.delete(_day_period(k)) + del self._patch_idxs[k] + + def clear(self) -> None: + """Remove all entries from the cache and the backing storage.""" + for k in list(self._patch_idxs): + self._storage.delete(_day_period(k)) + + self._patch_idxs.clear() diff --git a/openfisca_core/data_storage/in_memory_storage.py b/openfisca_core/data_storage/in_memory_storage.py index d4d5240c92..fb6b85fd66 100644 --- a/openfisca_core/data_storage/in_memory_storage.py +++ b/openfisca_core/data_storage/in_memory_storage.py @@ -4,13 +4,11 @@ import numpy -from openfisca_core import periods +from openfisca_core import periods, types as t from openfisca_core.periods import DateUnit -from . import types as t - -class InMemoryStorage: +class InMemoryStorage(t.InMemoryStorage): """Storing and retrieving calculated vectors in memory. Args: diff --git a/openfisca_core/data_storage/on_disk_storage.py b/openfisca_core/data_storage/on_disk_storage.py index 731a8dcf7e..b5fbf7952e 100644 --- a/openfisca_core/data_storage/on_disk_storage.py +++ b/openfisca_core/data_storage/on_disk_storage.py @@ -7,14 +7,12 @@ import numpy -from openfisca_core import periods +from openfisca_core import periods, types as t from openfisca_core.indexed_enums import EnumArray from openfisca_core.periods import DateUnit -from . import types as t - -class OnDiskStorage: +class OnDiskStorage(t.OnDiskStorage): """Storing and retrieving calculated vectors on disk. Args: diff --git a/openfisca_core/data_storage/tests/__init__.py b/openfisca_core/data_storage/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openfisca_core/data_storage/tests/test_in_memory_storage.py b/openfisca_core/data_storage/tests/test_in_memory_storage.py new file mode 100644 index 0000000000..c9ccf72e7a --- /dev/null +++ b/openfisca_core/data_storage/tests/test_in_memory_storage.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import numpy + +from openfisca_core import data_storage, periods, types as t + + +def period(s: str) -> t.Period: + return periods.period(s) + + +def arr(*values: float) -> t.VarArray: + return numpy.array(values, dtype=numpy.float32) + + +def test_get_unknown_period_returns_none() -> None: + storage = data_storage.InMemoryStorage() + assert storage.get(period("2020")) is None + + +def test_put_and_get_returns_value() -> None: + storage = data_storage.InMemoryStorage() + storage.put(arr(1.0, 2.0), period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2020")), arr(1.0, 2.0)) + + +def test_get_known_periods_empty() -> None: + storage = data_storage.InMemoryStorage() + assert list(storage.get_known_periods()) == [] + + +def test_get_known_periods_after_put() -> None: + storage = data_storage.InMemoryStorage() + p = period("2020") + storage.put(arr(1.0), p) + assert p in storage.get_known_periods() + + +def test_delete_specific_period() -> None: + storage = data_storage.InMemoryStorage() + storage.put(arr(1.0), period("2020")) + storage.delete(period("2020")) + assert storage.get(period("2020")) is None + + +def test_delete_all() -> None: + storage = data_storage.InMemoryStorage() + storage.put(arr(1.0), period("2020")) + storage.put(arr(2.0), period("2021")) + storage.delete() + assert storage.get(period("2020")) is None + assert storage.get(period("2021")) is None + + +def test_delete_year_removes_contained_months() -> None: + storage = data_storage.InMemoryStorage() + p_month = period("2020-03") + storage.put(arr(1.0), p_month) + storage.delete(period("2020")) + assert storage.get(p_month) is None + + +def test_delete_year_keeps_other_year() -> None: + storage = data_storage.InMemoryStorage() + storage.put(arr(1.0), period("2020")) + storage.put(arr(2.0), period("2021")) + storage.delete(period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2021")), arr(2.0)) + + +def test_is_eternal_stores_under_eternity_key() -> None: + storage = data_storage.InMemoryStorage(is_eternal=True) + storage.put(arr(1.0), period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2021")), arr(1.0)) + + +def test_get_memory_usage_empty() -> None: + storage = data_storage.InMemoryStorage() + usage = storage.get_memory_usage() + assert usage["nb_arrays"] == 0 + assert usage["total_nb_bytes"] == 0 + + +def test_get_memory_usage_after_put() -> None: + storage = data_storage.InMemoryStorage() + storage.put(arr(1.0, 2.0), period("2020")) + usage = storage.get_memory_usage() + assert usage["nb_arrays"] == 1 + assert usage["total_nb_bytes"] > 0 + + +def test_overwrite_period_replaces_value() -> None: + storage = data_storage.InMemoryStorage() + storage.put(arr(1.0), period("2020")) + storage.put(arr(9.0), period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2020")), arr(9.0)) diff --git a/openfisca_core/data_storage/tests/test_on_disk_storage.py b/openfisca_core/data_storage/tests/test_on_disk_storage.py new file mode 100644 index 0000000000..23f3a38bd6 --- /dev/null +++ b/openfisca_core/data_storage/tests/test_on_disk_storage.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import numpy +import pytest + +from openfisca_core import data_storage, indexed_enums, periods, types as t + + +def period(s: str) -> t.Period: + return periods.period(s) + + +def arr(*values: float) -> t.VarArray: + return numpy.array(values, dtype=numpy.float32) + + +class Housing(indexed_enums.Enum): + OWNER = "Owner" + TENANT = "Tenant" + + +def test_get_unknown_period_returns_none(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + assert storage.get(period("2020")) is None + + +def test_put_and_get_returns_value(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + storage.put(arr(1.0, 2.0), period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2020")), arr(1.0, 2.0)) + + +def test_get_known_periods_empty(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + assert list(storage.get_known_periods()) == [] + + +def test_get_known_periods_after_put(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + p = period("2020") + storage.put(arr(1.0), p) + assert p in storage.get_known_periods() + + +def test_delete_specific_period(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + storage.put(arr(1.0), period("2020")) + storage.delete(period("2020")) + assert storage.get(period("2020")) is None + + +def test_delete_all(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + storage.put(arr(1.0), period("2020")) + storage.put(arr(2.0), period("2021")) + storage.delete() + assert storage.get(period("2020")) is None + assert storage.get(period("2021")) is None + + +def test_delete_year_removes_contained_months(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + p_month = period("2020-03") + storage.put(arr(1.0), p_month) + storage.delete(period("2020")) + assert storage.get(p_month) is None + + +def test_is_eternal_stores_under_eternity_key(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage( + str(tmp_path), is_eternal=True, preserve_storage_dir=True + ) + storage.put(arr(1.0), period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2021")), arr(1.0)) + + +def test_overwrite_period_replaces_value(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + storage.put(arr(1.0), period("2020")) + storage.put(arr(9.0), period("2020")) + numpy.testing.assert_array_equal(storage.get(period("2020")), arr(9.0)) + + +def test_restore_reloads_files(tmp_path: pytest.TempPathFactory) -> None: + p = period("2020") + storage1 = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + storage1.put(arr(1.0, 2.0), p) + storage2 = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + assert storage2.get(p) is None + storage2.restore() + numpy.testing.assert_array_equal(storage2.get(p), arr(1.0, 2.0)) + + +def test_enum_array_round_trip(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + p = period("2020") + value = indexed_enums.EnumArray(numpy.array([1], dtype=numpy.int32), Housing) + storage.put(value, p) + result = storage.get(p) + assert isinstance(result, indexed_enums.EnumArray) + assert result.possible_values is Housing diff --git a/openfisca_core/data_storage/tests/test_storage_cache.py b/openfisca_core/data_storage/tests/test_storage_cache.py new file mode 100644 index 0000000000..c6aa2d17c8 --- /dev/null +++ b/openfisca_core/data_storage/tests/test_storage_cache.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import numpy +import pytest + +from openfisca_core import data_storage, periods, types as t + + +def instant(s: str) -> t.Instant: + return periods.period(s).start + + +def arr(*values: float) -> t.VarArray: + return numpy.array(values, dtype=numpy.float32) + + +def make_cache() -> data_storage.StorageCache: + return data_storage.StorageCache(data_storage.InMemoryStorage()) + + +def test_get_unknown_key_returns_none() -> None: + cache = make_cache() + assert cache.get(instant("2020")) is None + + +def test_put_and_get_returns_snapshot() -> None: + cache = make_cache() + snap: t.Snapshot = (arr(1.0, 2.0), 0) + cache.put(instant("2020"), snap) + result = cache.get(instant("2020")) + assert result is not None + array, patch_idx = result + numpy.testing.assert_array_equal(array, arr(1.0, 2.0)) + assert patch_idx == 0 + + +def test_patch_idx_is_preserved() -> None: + cache = make_cache() + cache.put(instant("2020"), (arr(1.0), 5)) + result = cache.get(instant("2020")) + assert result is not None + _, patch_idx = result + assert patch_idx == 5 + + +def test_contains_after_put() -> None: + cache = make_cache() + key = instant("2020") + assert key not in cache + cache.put(key, (arr(1.0), 0)) + assert key in cache + + +def test_items_yields_stored_entries() -> None: + cache = make_cache() + k1, k2 = instant("2020"), instant("2021") + cache.put(k1, (arr(1.0), 0)) + cache.put(k2, (arr(2.0), 1)) + keys = {k for k, _ in cache.items()} + assert keys == {k1, k2} + + +def test_evict_removes_matching_keys() -> None: + cache = make_cache() + cache.put(instant("2020"), (arr(1.0), 0)) + cache.put(instant("2021"), (arr(2.0), 1)) + cache.evict(lambda k: k >= instant("2021")) + assert cache.get(instant("2021")) is None + assert cache.get(instant("2020")) is not None + + +def test_evict_keeps_nonmatching_keys() -> None: + cache = make_cache() + cache.put(instant("2020"), (arr(1.0), 0)) + cache.put(instant("2021"), (arr(2.0), 1)) + cache.evict(lambda k: k >= instant("2022")) + assert cache.get(instant("2020")) is not None + assert cache.get(instant("2021")) is not None + + +def test_clear_removes_all() -> None: + cache = make_cache() + cache.put(instant("2020"), (arr(1.0), 0)) + cache.put(instant("2021"), (arr(2.0), 1)) + cache.clear() + assert cache.get(instant("2020")) is None + assert cache.get(instant("2021")) is None + + +def test_evict_removes_from_underlying_storage() -> None: + storage = data_storage.InMemoryStorage() + cache = data_storage.StorageCache(storage) + key = instant("2020") + cache.put(key, (arr(1.0), 0)) + cache.evict(lambda k: True) + assert len(list(storage.get_known_periods())) == 0 + + +def test_clear_removes_from_underlying_storage() -> None: + storage = data_storage.InMemoryStorage() + cache = data_storage.StorageCache(storage) + cache.put(instant("2020"), (arr(1.0), 0)) + cache.put(instant("2021"), (arr(2.0), 1)) + cache.clear() + assert len(list(storage.get_known_periods())) == 0 + + +def test_overwrite_key_updates_snapshot() -> None: + cache = make_cache() + key = instant("2020") + cache.put(key, (arr(1.0), 0)) + cache.put(key, (arr(9.0), 3)) + result = cache.get(key) + assert result is not None + array, patch_idx = result + numpy.testing.assert_array_equal(array, arr(9.0)) + assert patch_idx == 3 + + +def test_with_on_disk_storage(tmp_path: pytest.TempPathFactory) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + cache = data_storage.StorageCache(storage) + key = instant("2020") + cache.put(key, (arr(1.0, 2.0), 0)) + result = cache.get(key) + assert result is not None + numpy.testing.assert_array_equal(result[0], arr(1.0, 2.0)) diff --git a/openfisca_core/data_storage/types.py b/openfisca_core/data_storage/types.py deleted file mode 100644 index db71abbf57..0000000000 --- a/openfisca_core/data_storage/types.py +++ /dev/null @@ -1,14 +0,0 @@ -from typing_extensions import TypedDict - -from openfisca_core.types import Array, DTypeGeneric, Enum, Period - - -class MemoryUsage(TypedDict, total=True): - """Memory usage information.""" - - cell_size: float - nb_arrays: int - total_nb_bytes: int - - -__all__ = ["Array", "DTypeGeneric", "Enum", "Period"] diff --git a/openfisca_core/experimental/_memory_config.py b/openfisca_core/experimental/_memory_config.py index 6fba790e90..5faa187222 100644 --- a/openfisca_core/experimental/_memory_config.py +++ b/openfisca_core/experimental/_memory_config.py @@ -4,15 +4,23 @@ import warnings +from openfisca_core import types as t + from ._errors import MemoryConfigWarning -class MemoryConfig: +class MemoryConfig(t.MemoryConfig): """Experimental memory configuration.""" #: Maximum memory occupation allowed. max_memory_occupation: float + #: Maximum memory occupation percentage. + max_memory_occupation_pc: float + + #: Maximum number of as_of snapshots to keep. + asof_max_snapshots: int + #: Priority variables. priority_variables: frozenset[str] @@ -24,6 +32,7 @@ def __init__( max_memory_occupation: str | float, priority_variables: Iterable[str] = frozenset(), variables_to_drop: Iterable[str] = frozenset(), + asof_max_snapshots: int = 3, ) -> None: message = [ "Memory configuration is a feature that is still currently under " @@ -38,5 +47,6 @@ def __init__( msg = "max_memory_occupation must be <= 1" raise ValueError(msg) self.max_memory_occupation_pc = self.max_memory_occupation * 100 + self.asof_max_snapshots = asof_max_snapshots self.priority_variables = frozenset(priority_variables) self.variables_to_drop = frozenset(variables_to_drop) diff --git a/openfisca_core/holders/__init__.py b/openfisca_core/holders/__init__.py index a120a671b9..a1b69b27c1 100644 --- a/openfisca_core/holders/__init__.py +++ b/openfisca_core/holders/__init__.py @@ -21,7 +21,6 @@ # # See: https://www.python.org/dev/peps/pep-0008/#imports -from . import types from .helpers import set_input_dispatch_by_period, set_input_divide_by_period from .holder import Holder @@ -29,5 +28,4 @@ "Holder", "set_input_dispatch_by_period", "set_input_divide_by_period", - "types", ] diff --git a/openfisca_core/holders/_cache.py b/openfisca_core/holders/_cache.py new file mode 100644 index 0000000000..c509a664d7 --- /dev/null +++ b/openfisca_core/holders/_cache.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable +from typing import Generic, TypeVar + +from collections import OrderedDict + +from openfisca_core import types as t + +_K = TypeVar("_K") +_V = TypeVar("_V") + + +class _FIFOCache(t.Cache[_K, _V], Generic[_K, _V]): + """Bounded FIFO cache: the oldest entry is evicted when capacity is reached.""" + + def __init__(self, maxsize: int) -> None: + self._store: OrderedDict[_K, _V] = OrderedDict() + self._maxsize = maxsize + + def put(self, key: _K, value: _V) -> None: + self._store[key] = value + + if len(self._store) > self._maxsize: + self._store.popitem(last=False) + + def get(self, key: _K) -> _V | None: + return self._store.get(key) + + def __contains__(self, key: object) -> bool: + return key in self._store + + def items(self) -> Iterable[tuple[_K, _V]]: + return self._store.items() + + def evict(self, predicate: Callable[[_K], bool]) -> None: + to_evict = [k for k in self._store if predicate(k)] + + for k in to_evict: + del self._store[k] + + def clear(self) -> None: + self._store.clear() + + +class _LRUCache(t.Cache[_K, _V], Generic[_K, _V]): + """Bounded LRU cache: the least recently used entry is evicted when capacity is reached.""" + + def __init__(self, maxsize: int) -> None: + self._store: OrderedDict[_K, _V] = OrderedDict() + self._maxsize = maxsize + + def put(self, key: _K, value: _V) -> None: + if key in self._store: + self._store.move_to_end(key) + + self._store[key] = value + + if len(self._store) > self._maxsize: + self._store.popitem(last=False) + + def get(self, key: _K) -> _V | None: + if key not in self._store: + return None + + self._store.move_to_end(key) + + return self._store[key] + + def __contains__(self, key: object) -> bool: + return key in self._store + + def items(self) -> Iterable[tuple[_K, _V]]: + return self._store.items() + + def evict(self, predicate: Callable[[_K], bool]) -> None: + to_evict = [k for k in self._store if predicate(k)] + + for k in to_evict: + del self._store[k] + + def clear(self) -> None: + self._store.clear() diff --git a/openfisca_core/holders/_store.py b/openfisca_core/holders/_store.py new file mode 100644 index 0000000000..7f67b9c785 --- /dev/null +++ b/openfisca_core/holders/_store.py @@ -0,0 +1,231 @@ +from __future__ import annotations + +import bisect + +import numpy + +from openfisca_core import data_storage, types as t + +from ._cache import _FIFOCache + + +class _TemporalStore(t.Store): + """Shared temporal event-sourced store logic. + + Stores a read-only base array and a sorted list of sparse patches. + Reconstruction at any instant is O(k) patches, aided by an injected + snapshot cache whose backend determines the eviction policy. + """ + + def __init__(self, snapshots: t.Cache[t.Instant, t.Snapshot]) -> None: + self._base: t.VarArray | None = None + self._base_instant: t.Instant | None = None + self._patches: list[tuple[t.Instant, t.IntArray, t.VarArray]] = [] + self._patch_instants: list[t.Instant] = [] + self.snapshots = snapshots + + def put(self, instant: t.Instant, array: t.VarArray) -> None: + """Set the full array at instant. + + First call establishes the immutable base. + Subsequent calls store only the diff as a sparse patch. + """ + if self._base is None: + self._base = array.copy() + self._base.flags.writeable = False + self._base_instant = instant + self.snapshots.put(instant, (self._base, -1)) + return + + prev = self._reconstruct(instant) + changed = array != prev + + if not changed.any(): + return + + idx = numpy.where(changed)[0].astype(numpy.int32) + vals = array[idx].copy() + self._insert_patch(instant, idx, vals, full_array=array) + + def put_sparse(self, instant: t.Instant, idx: t.IntArray, vals: t.VarArray) -> None: + """Store a sparse patch directly without a full N-element array. + + Requires the base to be established via put() first. + """ + if self._base is None: + raise ValueError( + "Cannot call put_sparse before the base is established. " + "Call put() first for the initial instant." + ) + + if len(idx) == 0: + return + + self._insert_patch(instant, idx.astype(numpy.int32), vals.copy()) + + def get(self, instant: t.Instant) -> t.VarArray | None: + """Return the reconstructed array as-of instant, or None if not yet set.""" + return self._reconstruct(instant) + + def _insert_patch( + self, + instant: t.Instant, + idx: t.IntArray, + vals: t.VarArray, + full_array: t.VarArray | None = None, + ) -> None: + pos = bisect.bisect_right(self._patch_instants, instant) + self._patches.insert(pos, (instant, idx, vals)) + self._patch_instants.insert(pos, instant) + new_patch_idx = len(self._patches) - 1 + + if pos == new_patch_idx: + if full_array is not None: + new_snap = full_array.copy() + new_snap.flags.writeable = False + self.snapshots.put(instant, (new_snap, new_patch_idx)) + else: + snap = self._build_snapshot_up_to(instant, new_patch_idx, idx, vals) + + if snap is not None: + self.snapshots.put(instant, (snap, new_patch_idx)) + else: + self.snapshots.evict(lambda k: k >= instant) + + def _build_snapshot_up_to( + self, + instant: t.Instant, + new_patch_idx: int, + last_idx: t.IntArray, + last_vals: t.VarArray, + ) -> t.VarArray | None: + best_instant = None + best_array = None + best_snap_idx = None + + for snap_instant, (snap_array, snap_idx) in self.snapshots.items(): + if snap_instant <= instant: + if best_instant is None or snap_instant > best_instant: + best_instant = snap_instant + best_array = snap_array + best_snap_idx = snap_idx + + if best_array is None or best_snap_idx is None: + return None + + new_snap = best_array.copy() + + for i in range(best_snap_idx + 1, new_patch_idx): + _, pidx, pvals = self._patches[i] + new_snap[pidx] = pvals + + new_snap[last_idx] = last_vals + new_snap.flags.writeable = False + + return new_snap + + def _reconstruct(self, instant: t.Instant) -> t.VarArray | None: + if self._base is None or self._base_instant is None: + return None + + if instant < self._base_instant: + return None + + pos = bisect.bisect_right(self._patch_instants, instant) + last_patch_idx = pos - 1 + cached = self.snapshots.get(instant) + + if cached is not None: + array, _ = cached + return array + + best_instant = None + best_array = None + best_patch_idx = None + + for snap_instant, (snap_array, snap_idx) in self.snapshots.items(): + if snap_instant < instant: + if best_instant is None or snap_instant > best_instant: + best_instant = snap_instant + best_array = snap_array + best_patch_idx = snap_idx + + if best_array is not None and best_patch_idx is not None: + result = best_array + + for i in range(best_patch_idx + 1, last_patch_idx + 1): + _, idx, vals = self._patches[i] + + if result is best_array: + result = result.copy() + + result[idx] = vals + + if result is not best_array: + result.flags.writeable = False + elif last_patch_idx == -1: + result = self._base + else: + result = self._base.copy() + + for i in range(last_patch_idx + 1): + _, idx, vals = self._patches[i] + result[idx] = vals + + result.flags.writeable = False + + self.snapshots.put(instant, (result, last_patch_idx)) + + return result + + def _copy_state_to(self, new: _TemporalStore) -> None: + new._base = self._base + new._base_instant = self._base_instant + new._patches = list(self._patches) + new._patch_instants = list(self._patch_instants) + + +class _CounterFactualStore(_TemporalStore): + """Temporal store with a bounded in-memory snapshot cache. + + The cache class is injectable — use _FIFOCache (default) for + forward-sequential simulations, _LRUCache for mixed-access patterns. + """ + + def __init__( + self, + maxsize: int = 3, + cache: type = _FIFOCache, + ) -> None: + super().__init__(snapshots=cache(maxsize=maxsize)) + self._maxsize = maxsize + self._cache_class: type = cache + + def clone(self) -> _CounterFactualStore: + new = _CounterFactualStore(maxsize=self._maxsize, cache=self._cache_class) + self._copy_state_to(new) + return new + + +class _FactualStore(_TemporalStore): + """Temporal store backed by InMemoryStorage or OnDiskStorage for snapshots. + + Snapshots are kept in the injected storage backend — unbounded by default + (InMemoryStorage). Pass an OnDiskStorage to offload snapshot arrays to + disk under memory pressure. + """ + + def __init__( + self, + maxsize: int = 3, + storage: t.Storage | None = None, + ) -> None: + _storage = storage if storage is not None else data_storage.InMemoryStorage() + super().__init__(snapshots=data_storage.StorageCache(_storage)) + self._maxsize = maxsize + self._backing_storage = _storage + + def clone(self) -> _FactualStore: + new = _FactualStore(maxsize=self._maxsize) + self._copy_state_to(new) + return new diff --git a/openfisca_core/holders/helpers.py b/openfisca_core/holders/helpers.py index fcc6563c79..20fc3af150 100644 --- a/openfisca_core/holders/helpers.py +++ b/openfisca_core/holders/helpers.py @@ -1,13 +1,17 @@ +from __future__ import annotations + import logging import numpy -from openfisca_core import periods +from openfisca_core import periods, types as t log = logging.getLogger(__name__) -def set_input_dispatch_by_period(holder, period, array) -> None: +def set_input_dispatch_by_period( + holder: t.Holder, period: t.Period, array: t.VarArray +) -> None: """This function can be declared as a ``set_input`` attribute of a variable. In this case, the variable will accept inputs on larger periods that its definition period, and the value for the larger period will be applied to all its subperiods. @@ -31,19 +35,24 @@ def set_input_dispatch_by_period(holder, period, array) -> None: after_instant = period.start.offset(period_size, period_unit) # Cache the input data, skipping the existing cached months - sub_period = periods.Period((cached_period_unit, period.start, 1)) + sub_period: t.Period = periods.Period((cached_period_unit, period.start, 1)) + while sub_period.start < after_instant: existing_array = holder.get_array(sub_period) + if existing_array is None: holder._set(sub_period, array) else: # The array of the current sub-period is reused for the next ones. # TODO: refactor or document this behavior array = existing_array + sub_period = sub_period.offset(1) -def set_input_divide_by_period(holder, period, array) -> None: +def set_input_divide_by_period( + holder: t.Holder, period: t.Period, array: t.VarArray +) -> None: """This function can be declared as a ``set_input`` attribute of a variable. In this case, the variable will accept inputs on larger periods that its definition period, and the value for the larger period will be divided between its subperiods. @@ -68,14 +77,17 @@ def set_input_divide_by_period(holder, period, array) -> None: # Count the number of elementary periods to change, and the difference with what is already known. remaining_array = array.copy() - sub_period = periods.Period((cached_period_unit, period.start, 1)) + sub_period: t.Period = periods.Period((cached_period_unit, period.start, 1)) sub_periods_count = 0 + while sub_period.start < after_instant: existing_array = holder.get_array(sub_period) + if existing_array is not None: remaining_array -= existing_array else: sub_periods_count += 1 + sub_period = sub_period.offset(1) # Cache the input data diff --git a/openfisca_core/holders/holder.py b/openfisca_core/holders/holder.py index f60d92f70b..a9989139fa 100644 --- a/openfisca_core/holders/holder.py +++ b/openfisca_core/holders/holder.py @@ -1,7 +1,6 @@ from __future__ import annotations from collections.abc import Sequence -from typing import Any import os import warnings @@ -15,21 +14,39 @@ errors, indexed_enums as enums, periods, - types, + types as t, ) -from . import types as t +from ._store import _CounterFactualStore -class Holder: +class Holder(t.Holder): """A holder keeps tracks of a variable values after they have been calculated, or set as an input.""" - def __init__(self, variable, population) -> None: + def __init__(self, variable: t.Variable, population: t.CorePopulation) -> None: self.population = population self.variable = variable self.simulation = population.simulation self._eternal = self.variable.definition_period == periods.DateUnit.ETERNITY + self._as_of = getattr(self.variable, "as_of", False) self._memory_storage = storage.InMemoryStorage(is_eternal=self._eternal) + if self._as_of: + # Resolution order: variable.snapshot_count > MemoryConfig.asof_max_snapshots > 3 + _mc = self.simulation.memory_config if self.simulation else None + _maxsize: int = next( + ( + v + for v in [ + getattr(self.variable, "snapshot_count", None), + _mc.asof_max_snapshots if _mc is not None else None, + ] + if v is not None + ), + 3, + ) + self._as_of_store = _CounterFactualStore(maxsize=_maxsize) + # Instants for which transition_formula has already been applied. + self._as_of_transition_computed: set[t.Instant] = set() # By default, do not activate on-disk storage, or variable dropping self._disk_storage = None @@ -54,12 +71,20 @@ def clone(self, population: t.CorePopulation) -> t.Holder: if key not in ("population", "formula", "simulation"): new_dict[key] = value + if self._as_of: + new_dict["_as_of_store"] = self._as_of_store.clone() + new_dict["_as_of_transition_computed"] = set( + self._as_of_transition_computed + ) + new_dict["population"] = population new_dict["simulation"] = population.simulation return new - def create_disk_storage(self, directory=None, preserve=False): + def create_disk_storage( + self, directory: str | None = None, preserve: bool = False + ) -> storage.OnDiskStorage: if directory is None: directory = self.simulation.data_storage_dir storage_dir = os.path.join(directory, self.variable.name) @@ -71,7 +96,7 @@ def create_disk_storage(self, directory=None, preserve=False): preserve_storage_dir=preserve, ) - def delete_arrays(self, period=None) -> None: + def delete_arrays(self, period: t.Period | None = None) -> None: """If ``period`` is ``None``, remove all known values of the variable. If ``period`` is not ``None``, only remove all values for any period included in period (e.g. if period is "2017", values for "2017-01", "2017-07", etc. would be removed) @@ -80,13 +105,16 @@ def delete_arrays(self, period=None) -> None: if self._disk_storage: self._disk_storage.delete(period) - def get_array(self, period): + def get_array(self, period: t.Period) -> t.VarArray | None: """Get the value of the variable for the given period. If the value is not known, return ``None``. """ if self.variable.is_neutralized: return self.default_array() + if self._as_of: + # Patch-based storage: bypass _memory_storage entirely. + return self._get_as_of(period) value = self._memory_storage.get(period) if value is not None: return value @@ -94,6 +122,36 @@ def get_array(self, period): return self._disk_storage.get(period) return None + def _get_as_of(self, period: t.Period) -> t.VarArray | None: + """Return the reconstructed array as-of the reference instant of period.""" + target = period.start if self._as_of == "start" else period.stop + return self._as_of_store.get(target) + + def set_input_sparse( + self, + period: t.Period | t.PeriodStr | t.PeriodInt, + idx: t.IntArray, + vals: t.VarArray, + ) -> None: + """Set new values for only the specified individuals. + + Unlike set_input(), the caller provides the diff directly: + - idx : array of person indices that changed (int) + - vals : their new values + + This avoids O(N) diff computation when only k << N individuals change. + Requires that set_input() was called at least once to establish the base. + """ + if not self._as_of: + raise ValueError( + f"set_input_sparse is only valid for as_of variables. " + f'"{self.variable.name}" does not declare as_of.' + ) + _period: t.Period = periods.period(period) + idx = numpy.asarray(idx, dtype=numpy.int32) + vals = numpy.asarray(vals, dtype=self.variable.dtype) + self._as_of_store.put_sparse(_period.start, idx, vals) + def get_memory_usage(self) -> t.MemoryUsage: """Get data about the virtual memory usage of the Holder. @@ -136,6 +194,7 @@ def get_memory_usage(self) -> t.MemoryUsage: """ usage = t.MemoryUsage( + total_nb_bytes=0, nb_cells_by_array=self.population.count, dtype=self.variable.dtype, ) @@ -157,7 +216,7 @@ def get_memory_usage(self) -> t.MemoryUsage: return usage - def get_known_periods(self): + def get_known_periods(self) -> list[t.Period]: """Get the list of periods the variable value is known for.""" return list(self._memory_storage.get_known_periods()) + list( self._disk_storage.get_known_periods() if self._disk_storage else [], @@ -165,9 +224,9 @@ def get_known_periods(self): def set_input( self, - period: types.Period, - array: numpy.ndarray | Sequence[Any], - ) -> numpy.ndarray | None: + period: t.Period, + array: numpy.ndarray | Sequence[object], + ) -> None: """Set a Variable's array of values of a given Period. Args: @@ -232,14 +291,16 @@ def set_input( ) if self.variable.is_neutralized: warning_message = f"You cannot set a value for the variable {self.variable.name}, as it has been neutralized. The value you provided ({array}) will be ignored." - return warnings.warn(warning_message, Warning, stacklevel=2) + warnings.warn(warning_message, Warning, stacklevel=2) + return if self.variable.value_type in (float, int) and isinstance(array, str): array = commons.eval_expression(array) if self.variable.set_input: - return self.variable.set_input(self, period, array) - return self._set(period, array) + self.variable.set_input(self, period, array) + return + self._set(period, array) - def _to_array(self, value): + def _to_array(self, value: t.VarArray | Sequence[object]) -> t.VarArray: if not isinstance(value, numpy.ndarray): value = numpy.asarray(value) if value.ndim == 0: @@ -250,7 +311,10 @@ def _to_array(self, value): raise ValueError( msg, ) - if self.variable.value_type == enums.Enum: + if ( + self.variable.value_type == enums.Enum + and self.variable.possible_values is not None + ): value = self.variable.possible_values.encode(value) if value.dtype != self.variable.dtype: try: @@ -262,17 +326,9 @@ def _to_array(self, value): ) return value - def _set(self, period, value) -> None: + def _set(self, period: t.Period, value: t.VarArray | Sequence[object]) -> None: value = self._to_array(value) if not self._eternal: - if period is None: - msg = ( - f"A period must be specified to set values, except for variables with " - f"{periods.DateUnit.ETERNITY.upper()} as as period_definition." - ) - raise ValueError( - msg, - ) if self.variable.definition_period != period.unit or period.size > 1: name = self.variable.name period_size_adj = ( @@ -295,19 +351,25 @@ def _set(self, period, value) -> None: error_message, ) - should_store_on_disk = ( - self._on_disk_storable - and self._memory_storage.get(period) is None - and psutil.virtual_memory().percent # If there is already a value in memory, replace it and don't put a new value in the disk storage - >= self.simulation.memory_config.max_memory_occupation_pc - ) + if self._as_of: + self._as_of_store.put(period.start, value) + return - if should_store_on_disk: - self._disk_storage.put(value, period) - else: - self._memory_storage.put(value, period) + if self._on_disk_storable: + # Invariant: _on_disk_storable is only True when simulation and memory_config are set. + _mc = self.simulation.memory_config + if ( + _mc is not None + and self._disk_storage is not None + and self._memory_storage.get(period) is None + # Don't offload if the period is already in memory — replace in place. + and psutil.virtual_memory().percent >= _mc.max_memory_occupation_pc + ): + self._disk_storage.put(value, period) + return + self._memory_storage.put(value, period) - def put_in_cache(self, value, period) -> None: + def put_in_cache(self, value: t.VarArray, period: t.Period) -> None: if self._do_not_store: return @@ -320,6 +382,6 @@ def put_in_cache(self, value, period) -> None: self._set(period, value) - def default_array(self): + def default_array(self) -> t.VarArray: """Return a new array of the appropriate length for the entity, filled with the variable default values.""" return self.variable.default_array(self.population.count) diff --git a/openfisca_core/holders/tests/cache/__init__.py b/openfisca_core/holders/tests/cache/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openfisca_core/holders/tests/cache/test_fifo_cache.py b/openfisca_core/holders/tests/cache/test_fifo_cache.py new file mode 100644 index 0000000000..b11f26dbb8 --- /dev/null +++ b/openfisca_core/holders/tests/cache/test_fifo_cache.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import pytest + +from ..._cache import _FIFOCache as Cache + + +@pytest.fixture +def cache() -> Cache[int, str]: + return Cache(maxsize=3) + + +def test_put_and_get(cache: Cache[int, str]) -> None: + cache.put(1, "one") + assert cache.get(1) == "one" + + +def test_get_missing_returns_none(cache: Cache[int, str]) -> None: + assert cache.get(99) is None + + +def test_contains(cache: Cache[int, str]) -> None: + cache.put(1, "one") + assert 1 in cache + assert 2 not in cache + + +def test_put_overwrites_existing(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(1, "ONE") + assert cache.get(1) == "ONE" + assert len(list(cache.items())) == 1 + + +def test_fifo_eviction_drops_oldest() -> None: + cache: Cache[int, str] = Cache(maxsize=2) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + assert cache.get(1) is None + assert cache.get(2) == "two" + assert cache.get(3) == "three" + + +def test_fifo_eviction_respects_maxsize() -> None: + cache: Cache[int, str] = Cache(maxsize=1) + cache.put(1, "one") + cache.put(2, "two") + assert cache.get(1) is None + assert cache.get(2) == "two" + + +def test_items_returns_insertion_order(cache: Cache[int, str]) -> None: + cache.put(10, "ten") + cache.put(20, "twenty") + cache.put(30, "thirty") + assert list(cache.items()) == [(10, "ten"), (20, "twenty"), (30, "thirty")] + + +def test_evict_removes_matching(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + cache.evict(lambda k: k >= 2) + assert cache.get(1) == "one" + assert cache.get(2) is None + assert cache.get(3) is None + + +def test_evict_no_match_leaves_cache_intact(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.evict(lambda k: k > 100) + assert list(cache.items()) == [(1, "one"), (2, "two")] + + +def test_evict_all_removes_everything(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.evict(lambda _: True) + assert list(cache.items()) == [] + + +def test_clear_empties_cache(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.clear() + assert cache.get(1) is None + assert list(cache.items()) == [] + + +def test_maxsize_zero_never_stores() -> None: + cache: Cache[int, str] = Cache(maxsize=0) + cache.put(1, "one") + assert cache.get(1) is None + + +def test_put_after_clear_works(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.clear() + cache.put(2, "two") + assert cache.get(2) == "two" diff --git a/openfisca_core/holders/tests/cache/test_lru_cache.py b/openfisca_core/holders/tests/cache/test_lru_cache.py new file mode 100644 index 0000000000..820179adfd --- /dev/null +++ b/openfisca_core/holders/tests/cache/test_lru_cache.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import pytest + +from ..._cache import _LRUCache as Cache + + +@pytest.fixture +def cache() -> Cache[int, str]: + return Cache(maxsize=3) + + +def test_put_and_get(cache: Cache[int, str]) -> None: + cache.put(1, "one") + assert cache.get(1) == "one" + + +def test_get_missing_returns_none(cache: Cache[int, str]) -> None: + assert cache.get(99) is None + + +def test_contains(cache: Cache[int, str]) -> None: + cache.put(1, "one") + assert 1 in cache + assert 2 not in cache + + +def test_put_overwrites_existing(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(1, "ONE") + assert cache.get(1) == "ONE" + assert len(list(cache.items())) == 1 + + +def test_fifo_eviction_drops_oldest() -> None: + cache: Cache[int, str] = Cache(maxsize=2) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + assert cache.get(1) is None + assert cache.get(2) == "two" + assert cache.get(3) == "three" + + +def test_fifo_eviction_respects_maxsize() -> None: + cache: Cache[int, str] = Cache(maxsize=1) + cache.put(1, "one") + cache.put(2, "two") + assert cache.get(1) is None + assert cache.get(2) == "two" + + +def test_items_returns_insertion_order(cache: Cache[int, str]) -> None: + cache.put(10, "ten") + cache.put(20, "twenty") + cache.put(30, "thirty") + assert list(cache.items()) == [(10, "ten"), (20, "twenty"), (30, "thirty")] + + +def test_evict_removes_matching(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + cache.evict(lambda k: k >= 2) + assert cache.get(1) == "one" + assert cache.get(2) is None + assert cache.get(3) is None + + +def test_evict_no_match_leaves_cache_intact(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.evict(lambda k: k > 100) + assert list(cache.items()) == [(1, "one"), (2, "two")] + + +def test_evict_all_removes_everything(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.evict(lambda _: True) + assert list(cache.items()) == [] + + +def test_clear_empties_cache(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.put(2, "two") + cache.clear() + assert cache.get(1) is None + assert list(cache.items()) == [] + + +def test_maxsize_zero_never_stores() -> None: + cache: Cache[int, str] = Cache(maxsize=0) + cache.put(1, "one") + assert cache.get(1) is None + + +def test_put_after_clear_works(cache: Cache[int, str]) -> None: + cache.put(1, "one") + cache.clear() + cache.put(2, "two") + assert cache.get(2) == "two" diff --git a/openfisca_core/holders/tests/store/__init__.py b/openfisca_core/holders/tests/store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openfisca_core/holders/tests/store/test_counter_factual_store.py b/openfisca_core/holders/tests/store/test_counter_factual_store.py new file mode 100644 index 0000000000..5e7092abad --- /dev/null +++ b/openfisca_core/holders/tests/store/test_counter_factual_store.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import numpy + +from openfisca_core import periods, types as t + +from ..._store import _CounterFactualStore as Store + + +def instant(s: str) -> t.Instant: + return periods.period(s).start + + +def arr(*values: float) -> t.VarArray: + return numpy.array(values, dtype=numpy.float32) + + +def test_clone_returns_same_values() -> None: + store = Store() + store.put(instant("2020"), arr(1.0, 2.0)) + store.put(instant("2021"), arr(3.0, 2.0)) + clone = store.clone() + numpy.testing.assert_array_equal(clone.get(instant("2020")), arr(1.0, 2.0)) + numpy.testing.assert_array_equal(clone.get(instant("2021")), arr(3.0, 2.0)) + + +def test_clone_patches_are_independent() -> None: + store = Store() + store.put(instant("2020"), arr(1.0, 2.0)) + clone = store.clone() + clone.put(instant("2021"), arr(9.0, 2.0)) + assert store.get(instant("2021")) is not None + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 2.0)) + + +def test_clone_shares_base_array() -> None: + store = Store() + store.put(instant("2020"), arr(1.0, 2.0)) + clone = store.clone() + assert clone.get(instant("2020")) is store.get(instant("2020")) + + +def test_cache_eviction_still_reconstructs_old_instant() -> None: + store = Store(maxsize=1) + store.put(instant("2020"), arr(1.0)) + store.put(instant("2021"), arr(2.0)) + store.put(instant("2022"), arr(3.0)) + numpy.testing.assert_array_equal(store.get(instant("2020")), arr(1.0)) + numpy.testing.assert_array_equal(store.get(instant("2022")), arr(3.0)) diff --git a/openfisca_core/holders/tests/store/test_factual_store.py b/openfisca_core/holders/tests/store/test_factual_store.py new file mode 100644 index 0000000000..a5f686d886 --- /dev/null +++ b/openfisca_core/holders/tests/store/test_factual_store.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import numpy + +from openfisca_core import data_storage, periods, types as t + +from ..._store import _FactualStore as Store + + +def instant(s: str) -> t.Instant: + return periods.period(s).start + + +def arr(*values: float) -> t.VarArray: + return numpy.array(values, dtype=numpy.float32) + + +def test_clone_returns_same_values() -> None: + store = Store() + store.put(instant("2020"), arr(1.0, 2.0)) + store.put(instant("2021"), arr(3.0, 2.0)) + clone = store.clone() + numpy.testing.assert_array_equal(clone.get(instant("2020")), arr(1.0, 2.0)) + numpy.testing.assert_array_equal(clone.get(instant("2021")), arr(3.0, 2.0)) + + +def test_clone_patches_are_independent() -> None: + store = Store() + store.put(instant("2020"), arr(1.0, 2.0)) + clone = store.clone() + clone.put(instant("2021"), arr(9.0, 2.0)) + assert store.get(instant("2021")) is not None + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 2.0)) + + +def test_clone_shares_base_array() -> None: + store = Store() + store.put(instant("2020"), arr(1.0, 2.0)) + clone = store.clone() + assert clone.get(instant("2020")) is store.get(instant("2020")) + + +def test_injectable_on_disk_storage(tmp_path: str) -> None: + storage = data_storage.OnDiskStorage(str(tmp_path), preserve_storage_dir=True) + store = Store(storage=storage) + store.put(instant("2020"), arr(1.0, 2.0)) + store.put(instant("2021"), arr(3.0, 2.0)) + numpy.testing.assert_array_equal(store.get(instant("2020")), arr(1.0, 2.0)) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(3.0, 2.0)) diff --git a/openfisca_core/holders/tests/store/test_temporal_store.py b/openfisca_core/holders/tests/store/test_temporal_store.py new file mode 100644 index 0000000000..cb23edccea --- /dev/null +++ b/openfisca_core/holders/tests/store/test_temporal_store.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import numpy +import pytest + +from openfisca_core import periods, types as t + +from ..._cache import _FIFOCache +from ..._store import _TemporalStore + + +def instant(s: str) -> t.Instant: + return periods.period(s).start + + +def arr(*values: float) -> t.VarArray: + return numpy.array(values, dtype=numpy.float32) + + +def make_store() -> _TemporalStore: + return _TemporalStore(snapshots=_FIFOCache(maxsize=10)) + + +def test_get_before_put_returns_none() -> None: + store = make_store() + assert store.get(instant("2020")) is None + + +def test_first_put_establishes_base() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0)) + result = store.get(instant("2020")) + assert result is not None + numpy.testing.assert_array_equal(result, arr(1.0, 2.0)) + + +def test_get_before_base_instant_returns_none() -> None: + store = make_store() + store.put(instant("2021"), arr(1.0, 2.0)) + assert store.get(instant("2020")) is None + + +def test_get_at_base_instant_after_later_puts_returns_base() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0)) + store.put(instant("2021"), arr(9.0, 2.0)) + numpy.testing.assert_array_equal(store.get(instant("2020")), arr(1.0, 2.0)) + + +def test_sequential_put_reflects_latest_values() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0)) + store.put(instant("2021"), arr(3.0, 2.0)) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(3.0, 2.0)) + + +def test_get_between_puts_interpolates_correctly() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 1.0)) + store.put(instant("2022"), arr(9.0, 1.0)) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 1.0)) + + +def test_no_patch_stored_when_values_unchanged() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0)) + store.put(instant("2021"), arr(1.0, 2.0)) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 2.0)) + + +def test_out_of_order_put_corrects_earlier_instant() -> None: + store = make_store() + store.put(instant("2020"), arr(0.0, 0.0)) + store.put(instant("2022"), arr(5.0, 0.0)) + store.put(instant("2021"), arr(1.0, 0.0)) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 0.0)) + numpy.testing.assert_array_equal(store.get(instant("2022")), arr(5.0, 0.0)) + + +def test_put_sparse_before_base_raises() -> None: + store = make_store() + with pytest.raises(ValueError): + store.put_sparse(instant("2020"), numpy.array([0], dtype=numpy.int32), arr(9.0)) + + +def test_put_sparse_empty_idx_is_noop() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0)) + store.put_sparse( + instant("2021"), + numpy.array([], dtype=numpy.int32), + numpy.array([], dtype=numpy.float32), + ) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 2.0)) + + +def test_put_sparse_applies_patch() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0, 3.0)) + store.put_sparse( + instant("2021"), + numpy.array([1], dtype=numpy.int32), + arr(9.0), + ) + numpy.testing.assert_array_equal(store.get(instant("2021")), arr(1.0, 9.0, 3.0)) + + +def test_put_sparse_does_not_change_earlier_instant() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0, 3.0)) + store.put_sparse( + instant("2021"), + numpy.array([1], dtype=numpy.int32), + arr(9.0), + ) + numpy.testing.assert_array_equal(store.get(instant("2020")), arr(1.0, 2.0, 3.0)) + + +def test_returned_array_is_read_only() -> None: + store = make_store() + store.put(instant("2020"), arr(1.0, 2.0)) + result = store.get(instant("2020")) + assert result is not None + assert not result.flags.writeable diff --git a/openfisca_core/holders/types.py b/openfisca_core/holders/types.py deleted file mode 100644 index 7137b86483..0000000000 --- a/openfisca_core/holders/types.py +++ /dev/null @@ -1,3 +0,0 @@ -from openfisca_core.types import CorePopulation, Holder, MemoryUsage - -__all__ = ["CorePopulation", "Holder", "MemoryUsage"] diff --git a/openfisca_core/indexed_enums/__init__.py b/openfisca_core/indexed_enums/__init__.py index 494601fc8d..abdfea2848 100644 --- a/openfisca_core/indexed_enums/__init__.py +++ b/openfisca_core/indexed_enums/__init__.py @@ -1,6 +1,5 @@ """Enumerations for variables with a limited set of possible values.""" -from . import types from ._enum_type import EnumType from ._errors import EnumEncodingError, EnumMemberNotFoundError from .config import ENUM_ARRAY_DTYPE @@ -14,5 +13,4 @@ "EnumEncodingError", "EnumMemberNotFoundError", "EnumType", - "types", ] diff --git a/openfisca_core/indexed_enums/_enum_type.py b/openfisca_core/indexed_enums/_enum_type.py index c4e9521026..de9e36c1cb 100644 --- a/openfisca_core/indexed_enums/_enum_type.py +++ b/openfisca_core/indexed_enums/_enum_type.py @@ -4,7 +4,7 @@ import numpy -from . import types as t +from openfisca_core import types as t @final @@ -26,7 +26,7 @@ class EnumType(t.EnumType): ... TENANT = "Tenant" >>> Housing.indices - array([0, 1], dtype=int16) + array([0, 1], dtype=uint8) >>> Housing.names array(['OWNER', 'TENANT'], dtype=' t.IndexArray: @@ -39,22 +39,22 @@ def _enum_to_index(value: t.ObjArray | t.ArrayLike[t.Enum]) -> t.IndexArray: TypeError: 'Road' object is not iterable >>> _enum_to_index([Road.AVENUE]) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _enum_to_index(numpy.array(Road.AVENUE)) Traceback (most recent call last): TypeError: iteration over a 0-d array >>> _enum_to_index(numpy.array([Road.AVENUE])) - array([1], dtype=int16) + array([1], dtype=uint8) >>> value = numpy.array([Road.STREET, Road.AVENUE, Road.STREET]) >>> _enum_to_index(value) - array([0, 1, 0], dtype=int16) + array([0, 1, 0], dtype=uint8) >>> value = numpy.array([Road.AVENUE, Road.AVENUE, Rogue.BOULEVARD]) >>> _enum_to_index(value) - array([1, 1, 0], dtype=int16) + array([1, 1, 0], dtype=uint8) """ return numpy.array([enum.index for enum in value], t.EnumDType) @@ -92,28 +92,28 @@ def _int_to_index( ... ) >>> _int_to_index(Road, 1) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _int_to_index(Road, [1]) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _int_to_index(Road, array("B", [1])) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _int_to_index(Road, memoryview(array("B", [1]))) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _int_to_index(Road, numpy.array(1)) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _int_to_index(Road, numpy.array([1])) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _int_to_index(Road, numpy.array([0, 1, 0])) - array([0, 1, 0], dtype=int16) + array([0, 1, 0], dtype=uint8) >>> _int_to_index(Road, numpy.array([1, 1, 2])) - array([1, 1], dtype=int16) + array([1, 1], dtype=uint8) """ indices = enum_class.indices @@ -151,22 +151,22 @@ def _str_to_index( ... ) >>> _str_to_index(Road, "AVENUE") - array([1], dtype=int16) + array([1], dtype=uint8) >>> _str_to_index(Road, ["AVENUE"]) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _str_to_index(Road, numpy.array("AVENUE")) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _str_to_index(Road, numpy.array(["AVENUE"])) - array([1], dtype=int16) + array([1], dtype=uint8) >>> _str_to_index(Road, numpy.array(["STREET", "AVENUE", "STREET"])) - array([0, 1, 0], dtype=int16) + array([0, 1, 0], dtype=uint8) >>> _str_to_index(Road, numpy.array(["AVENUE", "AVENUE", "BOULEVARD"])) - array([1, 1], dtype=int16) + array([1, 1], dtype=uint8) """ values = numpy.asarray(value) diff --git a/openfisca_core/indexed_enums/enum.py b/openfisca_core/indexed_enums/enum.py index 43a893e859..3a8001e030 100644 --- a/openfisca_core/indexed_enums/enum.py +++ b/openfisca_core/indexed_enums/enum.py @@ -4,7 +4,8 @@ import numpy -from . import types as t +from openfisca_core import types as t + from ._enum_type import EnumType from ._errors import EnumEncodingError, EnumMemberNotFoundError from ._guards import ( diff --git a/openfisca_core/indexed_enums/enum_array.py b/openfisca_core/indexed_enums/enum_array.py index 65bc209a79..b0f0e4b527 100644 --- a/openfisca_core/indexed_enums/enum_array.py +++ b/openfisca_core/indexed_enums/enum_array.py @@ -5,7 +5,7 @@ import numpy -from . import types as t +from openfisca_core import types as t class EnumArray(t.EnumArray): diff --git a/openfisca_core/indexed_enums/types.py b/openfisca_core/indexed_enums/types.py deleted file mode 100644 index fcf0ce5a4c..0000000000 --- a/openfisca_core/indexed_enums/types.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing_extensions import TypeAlias - -from openfisca_core.types import Array, ArrayLike, DTypeLike, Enum, EnumArray, EnumType - -from enum import _EnumDict as EnumDict # noqa: PLC2701 - -from numpy import ( - bool_ as BoolDType, - generic as VarDType, - int32 as IntDType, - object_ as ObjDType, - str_ as StrDType, -) - -from .config import ENUM_ARRAY_DTYPE as EnumDType - -#: Type for enum indices arrays. -IndexArray: TypeAlias = Array[EnumDType] - -#: Type for boolean arrays. -BoolArray: TypeAlias = Array[BoolDType] - -#: Type for int arrays. -IntArray: TypeAlias = Array[IntDType] - -#: Type for str arrays. -StrArray: TypeAlias = Array[StrDType] - -#: Type for object arrays. -ObjArray: TypeAlias = Array[ObjDType] - -#: Type for generic arrays. -VarArray: TypeAlias = Array[VarDType] - -__all__ = [ - "ArrayLike", - "DTypeLike", - "Enum", - "EnumArray", - "EnumDict", - "EnumType", -] diff --git a/openfisca_core/types.py b/openfisca_core/types.py index 3a4c0a4682..3eb3cb0be6 100644 --- a/openfisca_core/types.py +++ b/openfisca_core/types.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import Iterable, Iterator, Sequence, Sized +from collections.abc import Callable, Iterable, Iterator, Sequence, Sized from numpy.typing import DTypeLike, NDArray from typing import NewType, TypeVar, Union from typing_extensions import Protocol, Required, Self, TypeAlias, TypedDict @@ -8,6 +8,7 @@ import abc import enum import re +from enum import _EnumDict as EnumDict import numpy import pendulum @@ -26,10 +27,12 @@ #: Generic covariant type var. _T_co = TypeVar("_T_co", covariant=True) + # Arrays + #: Type var for numpy arrays. -_N_co = TypeVar("_N_co", covariant=True, bound="DTypeGeneric") +_N_co = TypeVar("_N_co", covariant=True, bound=numpy.generic) #: Type representing an numpy array. Array: TypeAlias = NDArray[_N_co] @@ -61,11 +64,19 @@ #: Type alias for an array of generic objects. VarArray: TypeAlias = Array[VarDType] + # Arrays-like + #: Type var for array-like objects. _L = TypeVar("_L") +#: Type var for cache keys. +_K = TypeVar("_K") + +#: Type var for cache values. +_V = TypeVar("_V") + #: Type representing an array-like object. ArrayLike: TypeAlias = Sequence[_L] @@ -109,11 +120,47 @@ def __instancecheck__(self, arg: object, /) -> bool: ) -class SeqInt(list[int], metaclass=_SeqIntMeta): ... # type: ignore[misc] +class SeqInt(list[int], metaclass=_SeqIntMeta): ... + + +# Shared + + +Snapshot: TypeAlias = tuple[VarArray, int] + + +class Cache(Protocol[_K, _V]): + def put(self, key: _K, value: _V, /) -> None: ... + + def get(self, key: _K, /) -> _V | None: ... + + def __contains__(self, key: object, /) -> bool: ... + + def items(self, /) -> Iterable[tuple[_K, _V]]: ... + + def evict(self, predicate: Callable[[_K], bool], /) -> None: ... + + def clear(self, /) -> None: ... + + +# Data Storage + + +class Storage(Protocol): + def get(self, period: Period | None = ..., /) -> Array[DTypeGeneric] | None: ... + def put(self, value: Array[DTypeGeneric], period: Period | None, /) -> None: ... + def delete(self, period: Period | None = ..., /) -> None: ... + + +class InMemoryStorage(Storage, Protocol): ... + + +class OnDiskStorage(Storage, Protocol): ... # Entities + #: For example "person". EntityKey = NewType("EntityKey", str) @@ -168,6 +215,10 @@ def plural(self, /) -> None | RolePlural: ... # Indexed enums +class PossibleValues(Protocol): + def encode(self, array: VarArray | ArrayLike[object], /) -> EnumArray: ... + + class EnumType(enum.EnumMeta): indices: Array[DTypeEnum] names: Array[DTypeStr] @@ -179,7 +230,9 @@ class Enum(enum.Enum, metaclass=EnumType): _member_names_: list[str] -class EnumArray(Array[DTypeEnum], metaclass=abc.ABCMeta): +class EnumArray( + numpy.ndarray[tuple[int, ...], numpy.dtype[EnumDType]], metaclass=abc.ABCMeta +): possible_values: None | type[Enum] @abc.abstractmethod @@ -191,24 +244,36 @@ def __new__( # Holders +class Store(Protocol): ... + + class Holder(Protocol): + variable: Variable + def clone(self, population: CorePopulation, /) -> Holder: ... def get_memory_usage(self, /) -> MemoryUsage: ... + def get_array(self, period: Period, /) -> VarArray | None: ... + + def _to_array(self, value: VarArray, /) -> VarArray: ... + + def _set(self, period: Period, value: VarArray, /) -> None: ... + class MemoryUsage(TypedDict, total=False): - cell_size: int + cell_size: float dtype: DTypeLike nb_arrays: int nb_cells_by_array: int nb_requests: int - nb_requests_by_array: int + nb_requests_by_array: float total_nb_bytes: Required[int] # Parameters + #: A type representing a node of parameters. ParameterNode: TypeAlias = Union[ "ParameterNodeAtInstant", "VectorialParameterNodeAtInstant" @@ -240,6 +305,7 @@ def __getitem__( # Periods + #: Matches "2015", "2015-01", "2015-01-01" but not "2015-13", "2015-12-32". iso_format = re.compile(r"^\d{4}(-(?:0[1-9]|1[0-2])(-(?:0[1-9]|[12]\d|3[01]))?)?$") @@ -258,7 +324,7 @@ def __instancecheck__(self, arg: object) -> bool: return isinstance(arg, (ISOFormatStr, ISOCalendarStr)) -class InstantStr(str, metaclass=_InstantStrMeta): # type: ignore[misc] +class InstantStr(str, metaclass=_InstantStrMeta): __slots__ = () @@ -267,7 +333,7 @@ def __instancecheck__(self, arg: object) -> bool: return isinstance(arg, str) and bool(iso_format.match(arg)) -class ISOFormatStr(str, metaclass=_ISOFormatStrMeta): # type: ignore[misc] +class ISOFormatStr(str, metaclass=_ISOFormatStrMeta): __slots__ = () @@ -276,7 +342,7 @@ def __instancecheck__(self, arg: object) -> bool: return isinstance(arg, str) and bool(iso_calendar.match(arg)) -class ISOCalendarStr(str, metaclass=_ISOCalendarStrMeta): # type: ignore[misc] +class ISOCalendarStr(str, metaclass=_ISOCalendarStrMeta): __slots__ = () @@ -289,7 +355,7 @@ def __instancecheck__(self, arg: object) -> bool: ) -class PeriodStr(str, metaclass=_PeriodStrMeta): # type: ignore[misc] +class PeriodStr(str, metaclass=_PeriodStrMeta): __slots__ = () @@ -348,10 +414,14 @@ def offset( #: Type alias for a period-like object. PeriodLike: TypeAlias = Union[Period, PeriodStr, PeriodInt] + # Populations -class CorePopulation(Protocol): ... +class CorePopulation(Protocol): + count: int + entity: CoreEntity + simulation: Simulation class SinglePopulation(CorePopulation, Protocol): @@ -366,7 +436,21 @@ class GroupPopulation(CorePopulation, Protocol): ... # Simulations +class MemoryConfig(Protocol): + asof_max_snapshots: int + priority_variables: Container[str] + variables_to_drop: Container[str] + max_memory_occupation_pc: float + + class Simulation(Protocol): + memory_config: MemoryConfig | None + data_storage_dir: str + trace: bool + tracer: FullTracer + opt_out_cache: bool + tax_benefit_system: TaxBenefitSystem + def calculate( self, variable_name: VariableName, period: Period, / ) -> Array[DTypeGeneric]: ... @@ -387,6 +471,7 @@ def get_population(self, plural: None | str, /) -> CorePopulation: ... class TaxBenefitSystem(Protocol): person_entity: SingleEntity + cache_blacklist: Container[str] | None def get_variable( self, @@ -398,6 +483,7 @@ def get_variable( # Tracers + #: A type representing a unit time. Time: TypeAlias = float @@ -503,8 +589,10 @@ def append_child(self, __node: TraceNode, /) -> None: ... #: A stack of simple traces. SimpleStack: TypeAlias = list[SimpleTraceMap] + # Variables + #: For example "salary". VariableName = NewType("VariableName", str) @@ -512,6 +600,14 @@ def append_child(self, __node: TraceNode, /) -> None: ... class Variable(Protocol): entity: CoreEntity name: VariableName + definition_period: DateUnit + is_neutralized: bool + dtype: DTypeLike + value_type: type + possible_values: PossibleValues | None + set_input: Callable[[Holder, Period, VarArray | Sequence[object]], None] | None + + def default_array(self, count: int, /) -> VarArray: ... class Formula(Protocol): @@ -528,4 +624,4 @@ class Params(Protocol): def __call__(self, instant: Instant, /) -> ParameterNodeAtInstant: ... -__all__ = ["DTypeLike"] +__all__ = ["BoolDType", "DTypeLike", "EnumDict", "EnumDType", "ObjDType", "StrDType"] diff --git a/setup.cfg b/setup.cfg index 127e09f709..90139fa475 100644 --- a/setup.cfg +++ b/setup.cfg @@ -89,7 +89,6 @@ implicit_reexport = false install_types = true mypy_path = stubs non_interactive = true -plugins = numpy.typing.mypy_plugin pretty = true python_version = 3.10 strict = false diff --git a/setup.py b/setup.py index 774f3f4702..5e1094668e 100644 --- a/setup.py +++ b/setup.py @@ -71,7 +71,7 @@ "flake8-rst-docstrings >=0.3.0, <0.4.0", "idna >=3.10, <4.0", "isort >=5.13.2, <6.0", - "mypy >=1.11.2, <2.0", + "mypy >=1.11.2, <1.18.2", "openapi-spec-validator >=0.7.1, <0.8.0", "pylint >=3.3.1, <4.0", "pylint-per-file-ignores >=1.3.2, <2.0", @@ -84,7 +84,7 @@ setup( name="OpenFisca-Core", - version="44.7.0", + version="44.8.0", author="OpenFisca Team", author_email="contact@openfisca.org", classifiers=[ diff --git a/tasks/lint.mk b/tasks/lint.mk index 532518dc7e..29ce15ea05 100644 --- a/tasks/lint.mk +++ b/tasks/lint.mk @@ -47,6 +47,7 @@ check-types: openfisca_core/data_storage \ openfisca_core/experimental \ openfisca_core/entities \ + openfisca_core/holders \ openfisca_core/indexed_enums \ openfisca_core/periods \ openfisca_core/types.py