From 96539eb98ed4371844dbba1d87a7edb42abe5892 Mon Sep 17 00:00:00 2001 From: Pascal Tomecek Date: Fri, 17 Apr 2026 10:27:45 -0400 Subject: [PATCH 1/5] feat: behavior hashing for cache key invalidation Add compute_behavior_token() which produces a SHA-256 fingerprint of a class's method bytecode. Decorator chains (@Flow.call, etc.) are automatically unwrapped via inspect.unwrap so the hash reflects the user's implementation, not the wrapper. Key design: - Walks MRO with override semantics (subclass overrides parent) - Supports __ccflow_tokenizer_deps__ for extra standalone functions - Dependencies sorted by qualname (order-insensitive) - Cached per-class in __behavior_token_cache__ (not inherited) - Returns None for classes with no hashable methods Integration: cache_key() now includes behavior tokens for the model and any non-transparent evaluators, so code changes invalidate the cache without requiring a config change. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Pascal Tomecek --- ccflow/evaluators/common.py | 23 +- ccflow/tests/utils/test_behavior_hash.py | 406 +++++++++++++++++++++++ ccflow/utils/__init__.py | 2 +- ccflow/utils/tokenize.py | 186 +++++++++++ 4 files changed, 614 insertions(+), 3 deletions(-) create mode 100644 ccflow/tests/utils/test_behavior_hash.py diff --git a/ccflow/evaluators/common.py b/ccflow/evaluators/common.py index 7dd22f3..75f33e2 100644 --- a/ccflow/evaluators/common.py +++ b/ccflow/evaluators/common.py @@ -224,9 +224,14 @@ def cache_key(flow_obj: Union[ModelEvaluationContext, ContextBase, CallableModel only on the underlying model, context, fn, options, and any non-transparent evaluators in the chain. + When the underlying model has callable methods, a behavior token (SHA-256 of + method bytecode) is included so that code changes invalidate the cache. + Args: flow_obj: The object to be tokenized to form the cache key. 
""" + from ..utils.tokenize import compute_behavior_token + if isinstance(flow_obj, ModelEvaluationContext): fn = flow_obj.fn non_transparent = [] @@ -238,10 +243,24 @@ def cache_key(flow_obj: Union[ModelEvaluationContext, ContextBase, CallableModel d = flow_obj.model_dump(mode="python") d["fn"] = fn if fn != "__call__" else flow_obj.fn if non_transparent: - d["_evaluators"] = [e.model_dump(mode="python") for e in non_transparent] + evaluator_data = [] + for e in non_transparent: + ed = e.model_dump(mode="python") + eb = compute_behavior_token(type(e)) + if eb is not None: + ed["_behavior"] = eb + evaluator_data.append(ed) + d["_evaluators"] = evaluator_data + behavior = compute_behavior_token(type(flow_obj.model)) + if behavior is not None: + d["_behavior"] = behavior return dask.base.tokenize(d).encode("utf-8") elif isinstance(flow_obj, (ContextBase, CallableModel)): - return dask.base.tokenize(flow_obj.model_dump(mode="python")).encode("utf-8") + d = flow_obj.model_dump(mode="python") + behavior = compute_behavior_token(type(flow_obj)) + if behavior is not None: + d["_behavior"] = behavior + return dask.base.tokenize(d).encode("utf-8") else: raise TypeError(f"object of type {type(flow_obj)} cannot be serialized by this function!") diff --git a/ccflow/tests/utils/test_behavior_hash.py b/ccflow/tests/utils/test_behavior_hash.py new file mode 100644 index 0000000..7c33041 --- /dev/null +++ b/ccflow/tests/utils/test_behavior_hash.py @@ -0,0 +1,406 @@ +"""Tests for behavior hashing (compute_behavior_token).""" + +from ccflow.callable import CallableModel, ContextBase +from ccflow.context import NullContext +from ccflow.evaluators.common import cache_key +from ccflow.result import GenericResult +from ccflow.utils.tokenize import compute_behavior_token + +# --------------------------------------------------------------------------- +# Basic behavior +# --------------------------------------------------------------------------- + + +class TestComputeBehaviorToken: + def 
test_returns_sha256_hex(self): + class M: + def f(self): + return 1 + + token = compute_behavior_token(M) + assert isinstance(token, str) + assert len(token) == 64 + + def test_deterministic(self): + class M: + def f(self): + return 1 + + assert compute_behavior_token(M) == compute_behavior_token(M) + + def test_different_bytecode_different_token(self): + class A: + def f(self): + return 1 + + class B: + def f(self): + return 2 + + assert compute_behavior_token(A) != compute_behavior_token(B) + + def test_same_logic_same_token(self): + class A: + def f(self): + return 42 + + class B: + def f(self): + return 42 + + assert compute_behavior_token(A) == compute_behavior_token(B) + + def test_none_for_no_methods(self): + class Empty: + x = 1 + + assert compute_behavior_token(Empty) is None + + def test_cached_on_class(self): + class M: + def f(self): + return 1 + + token = compute_behavior_token(M) + assert M.__behavior_token_cache__ == token + # Second call returns cached value + assert compute_behavior_token(M) is token + + def test_docstring_ignored(self): + class A: + def f(self): + """This is a docstring.""" + return 1 + + class B: + def f(self): + """Different docstring.""" + return 1 + + assert compute_behavior_token(A) == compute_behavior_token(B) + + def test_comments_ignored(self): + """Comments don't exist in bytecode, so they're inherently ignored.""" + + class A: + def f(self): + # a comment + return 1 + + class B: + def f(self): + # different comment + return 1 + + assert compute_behavior_token(A) == compute_behavior_token(B) + + def test_constants_matter(self): + class A: + def f(self): + return "hello" + + class B: + def f(self): + return "world" + + assert compute_behavior_token(A) != compute_behavior_token(B) + + +# --------------------------------------------------------------------------- +# Method collection +# --------------------------------------------------------------------------- + + +class TestMethodCollection: + def 
test_includes_regular_methods(self): + class M: + def method_a(self): + return 1 + + def method_b(self): + return 2 + + token = compute_behavior_token(M) + assert token is not None + + def test_includes_staticmethod(self): + class A: + @staticmethod + def helper(): + return 1 + + class B: + @staticmethod + def helper(): + return 2 + + assert compute_behavior_token(A) != compute_behavior_token(B) + + def test_includes_classmethod(self): + class A: + @classmethod + def factory(cls): + return cls() + + class B: + @classmethod + def factory(cls): + return None + + assert compute_behavior_token(A) != compute_behavior_token(B) + + def test_method_order_insensitive(self): + """Alphabetical sorting means definition order doesn't matter.""" + + class A: + def beta(self): + return 2 + + def alpha(self): + return 1 + + class B: + def alpha(self): + return 1 + + def beta(self): + return 2 + + assert compute_behavior_token(A) == compute_behavior_token(B) + + def test_skips_ccflow_internal_attrs(self): + """Attributes starting with __ccflow_ are skipped.""" + + class A: + __ccflow_tokenizer_deps__ = [] + + def f(self): + return 1 + + class B: + def f(self): + return 1 + + assert compute_behavior_token(A) == compute_behavior_token(B) + + +# --------------------------------------------------------------------------- +# Dependencies (__ccflow_tokenizer_deps__) +# --------------------------------------------------------------------------- + + +def _helper_add(x): + return x + 1 + + +def _helper_mul(x): + return x * 2 + + +class TestDeps: + def test_deps_included(self): + class NoDeps: + def f(self): + return 1 + + class WithDeps: + __ccflow_tokenizer_deps__ = [_helper_add] + + def f(self): + return 1 + + assert compute_behavior_token(NoDeps) != compute_behavior_token(WithDeps) + + def test_dep_order_insensitive(self): + class A: + __ccflow_tokenizer_deps__ = [_helper_add, _helper_mul] + + def f(self): + return 1 + + class B: + __ccflow_tokenizer_deps__ = [_helper_mul, _helper_add] + 
+ def f(self): + return 1 + + assert compute_behavior_token(A) == compute_behavior_token(B) + + def test_dep_change_changes_token(self): + def v1(): + return 1 + + def v2(): + return 2 + + class A: + __ccflow_tokenizer_deps__ = [v1] + + def f(self): + return 1 + + class B: + __ccflow_tokenizer_deps__ = [v2] + + def f(self): + return 1 + + assert compute_behavior_token(A) != compute_behavior_token(B) + + +# --------------------------------------------------------------------------- +# Integration with cache_key() +# --------------------------------------------------------------------------- + + +class TestCacheKeyIntegration: + def test_callable_model_includes_behavior(self): + """cache_key for a CallableModel includes the behavior hash.""" + from ccflow import Flow + + class MyModel(CallableModel): + x: int = 1 + + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=self.x + 1) + + class MyModelV2(CallableModel): + x: int = 1 + + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=self.x + 2) + + key1 = cache_key(MyModel(x=1)) + key2 = cache_key(MyModelV2(x=1)) + # Same config, different __call__ → different cache key + assert key1 != key2 + + def test_same_callable_same_key(self): + from ccflow import Flow + + class MyModel(CallableModel): + x: int = 1 + + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=self.x) + + key1 = cache_key(MyModel(x=1)) + key2 = cache_key(MyModel(x=1)) + assert key1 == key2 + + def test_different_config_different_key(self): + from ccflow import Flow + + class MyModel(CallableModel): + x: int = 1 + + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=self.x) + + key1 = cache_key(MyModel(x=1)) + key2 = cache_key(MyModel(x=2)) + assert key1 != key2 + + def test_context_base_no_behavior(self): + """ContextBase with no custom methods has no 
behavior token — still works.""" + + class MyContext(ContextBase): + value: int = 1 + + key = cache_key(MyContext(value=1)) + assert isinstance(key, bytes) + + +# --------------------------------------------------------------------------- +# Decorator unwrapping (Flow.call, etc.) +# --------------------------------------------------------------------------- + + +class TestDecoratorUnwrapping: + def test_flow_call_different_impls_differ(self): + """@Flow.call wrappers are unwrapped — different implementations hash differently.""" + from ccflow import Flow + + class A(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=1) + + class B(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=2) + + assert compute_behavior_token(A) != compute_behavior_token(B) + + def test_flow_call_same_impl_same(self): + """Same @Flow.call implementation produces the same token.""" + from ccflow import Flow + + class A(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=42) + + class B(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=42) + + assert compute_behavior_token(A) == compute_behavior_token(B) + + +# --------------------------------------------------------------------------- +# MRO / inherited methods +# --------------------------------------------------------------------------- + + +class TestInheritedMethods: + def test_inherited_call_included(self): + """Subclass that inherits __call__ from parent picks up parent's method.""" + + class Base: + def __call__(self): + return 1 + + class Sub(Base): + pass + + # Sub inherits __call__, so it should have a behavior token + assert compute_behavior_token(Sub) is not None + assert compute_behavior_token(Sub) == compute_behavior_token(Base) + + def 
test_override_changes_token(self): + """Subclass overriding a method gets a different token.""" + + class Base: + def __call__(self): + return 1 + + class Sub(Base): + def __call__(self): + return 2 + + assert compute_behavior_token(Sub) != compute_behavior_token(Base) + + def test_subclass_cache_independent(self): + """Parent and subclass don't share __behavior_token_cache__.""" + + class Base: + def f(self): + return 1 + + class Sub(Base): + def g(self): + return 2 + + t_base = compute_behavior_token(Base) + t_sub = compute_behavior_token(Sub) + # Sub has additional method g, so tokens differ + assert t_base != t_sub + # But base's cached token is unaffected + assert compute_behavior_token(Base) == t_base diff --git a/ccflow/utils/__init__.py b/ccflow/utils/__init__.py index e1b3188..4f9da2d 100644 --- a/ccflow/utils/__init__.py +++ b/ccflow/utils/__init__.py @@ -1,4 +1,4 @@ from .chunker import * from .core import * from .logging import * -from .tokenize import normalize_token, tokenize +from .tokenize import compute_behavior_token, normalize_token, tokenize diff --git a/ccflow/utils/tokenize.py b/ccflow/utils/tokenize.py index 20b7161..0f56040 100644 --- a/ccflow/utils/tokenize.py +++ b/ccflow/utils/tokenize.py @@ -1,2 +1,188 @@ # ruff: noqa: F401 +"""Tokenization utilities for ccflow models. + +Re-exports ``normalize_token`` and ``tokenize`` from dask for data hashing. +Adds behavior hashing: deterministic SHA-256 fingerprints of class method +bytecode, useful for cache-key invalidation when callable logic changes. 
+""" + +import hashlib +import inspect +import logging +from typing import Callable, List, Optional, Tuple + from dask.base import normalize_token, tokenize + +__all__ = [ + "normalize_token", + "tokenize", + "compute_behavior_token", +] + +logger = logging.getLogger(__name__) + +# Methods that are never behavior-relevant (pydantic/python internals) +_SKIPPED_METHODS = frozenset( + { + "__eq__", + "__hash__", + "__repr__", + "__str__", + "__init__", + "__init_subclass__", + "__class_getitem__", + "__get_pydantic_core_schema__", + "__get_pydantic_json_schema__", + "model_post_init", + } +) + + +# --------------------------------------------------------------------------- +# Behavior hashing — bytecode-based fingerprinting of class methods +# --------------------------------------------------------------------------- + + +def _unwrap_function(func: object) -> Optional[Callable]: + """Unwrap descriptors and decorator chains to get the underlying function. + + Handles ``classmethod``, ``staticmethod``, ``property`` (fget), and + ``functools.wraps``-style ``__wrapped__`` chains (e.g. ``@Flow.call``). + Returns ``None`` if the underlying object has no ``__code__``. + """ + if isinstance(func, classmethod): + func = func.__func__ + elif isinstance(func, staticmethod): + func = func.__func__ + elif isinstance(func, property): + func = func.fget + if func is None: + return None + + # Unwrap decorator chains (e.g. @Flow.call sets __wrapped__) + try: + func = inspect.unwrap(func) + except (TypeError, ValueError): + pass + + if not callable(func) or not hasattr(func, "__code__"): + return None + return func + + +def _hash_function_bytecode(func: Callable) -> Optional[str]: + """Return a SHA-256 hex digest of a function's bytecode and constants. + + The function is first unwrapped through any decorator chains, so that + e.g. ``@Flow.call`` wrappers do not mask the real implementation. + + Returns ``None`` for objects without ``__code__`` (C builtins, etc.). 
+ """ + unwrapped = _unwrap_function(func) + if unwrapped is None: + return None + code = unwrapped.__code__ + h = hashlib.sha256(code.co_code) + # Include constants (skip first if it's the docstring) + consts = code.co_consts + if consts and isinstance(consts[0], str): + consts = consts[1:] + h.update(repr(consts).encode("utf-8")) + return h.hexdigest() + + +def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: + """Collect callable methods from *cls* (walking MRO) plus ``__ccflow_tokenizer_deps__``. + + Methods are collected with MRO override semantics: for each method name, + the first definition found in the MRO wins. This means a subclass's + ``__call__`` overrides the base class's even if the subclass doesn't + redefine every method. + + Own methods are sorted alphabetically. Dependencies are sorted by + qualified name so that declaration order does not affect the hash. + + Internal framework attributes (``__ccflow_*``) and pydantic/python + boilerplate methods are skipped. 
+ """ + # Walk MRO to collect methods with override semantics + seen_names = set() + methods = [] + for klass in cls.__mro__: + if klass is object: + break + for name, value in klass.__dict__.items(): + if name in seen_names: + continue + seen_names.add(name) + if name.startswith("__ccflow_"): + continue + if name in _SKIPPED_METHODS: + continue + if isinstance(value, (classmethod, staticmethod, property)) or callable(value): + methods.append((name, value)) + + methods.sort(key=lambda pair: pair[0]) + + # Collect __ccflow_tokenizer_deps__ (also walk MRO, first definition wins) + extra_deps = None + for klass in cls.__mro__: + if "__ccflow_tokenizer_deps__" in klass.__dict__: + extra_deps = klass.__dict__["__ccflow_tokenizer_deps__"] + break + + if extra_deps is not None: + deps = [] + for func in extra_deps: + unwrapped = _unwrap_function(func) or func + if callable(unwrapped): + func_id = getattr(unwrapped, "__qualname__", getattr(unwrapped, "__name__", repr(unwrapped))) + deps.append((f"__dep__:{func_id}", func)) + deps.sort(key=lambda pair: pair[0]) + methods.extend(deps) + + return methods + + +def compute_behavior_token(cls: type) -> Optional[str]: + """Compute a SHA-256 behavior token for *cls* based on its method bytecode. + + The token captures bytecode (``co_code``) and constants (``co_consts``, + minus docstrings) of every method in *cls*'s MRO (with standard override + semantics), plus any standalone functions listed in + ``cls.__ccflow_tokenizer_deps__``. + + Decorator chains (e.g. ``@Flow.call``) are automatically unwrapped so + that the hash reflects the user's implementation, not the wrapper. + + Results are cached on the class in ``cls.__behavior_token_cache__``. + The cache is stored directly on the class (not inherited), so subclass + tokens are independent of parent tokens. + + Returns ``None`` if the class has no hashable methods. + + .. 
note:: + + Monkey-patching methods on an existing class after its behavior token + has been computed will **not** invalidate the cached token. Redefining + the class (e.g. in Jupyter) creates a new class object and works fine. + """ + # Check cache on cls itself (not inherited) + cache = cls.__dict__.get("__behavior_token_cache__") + if cache is not None: + return cache + + methods = _collect_methods(cls) + method_hashes = [(name, h) for name, func in methods if (h := _hash_function_bytecode(func)) is not None] + + if not method_hashes: + return None + + token = hashlib.sha256(repr(method_hashes).encode("utf-8")).hexdigest() + + try: + cls.__behavior_token_cache__ = token + except (TypeError, AttributeError): + pass # e.g. C extension types that don't allow attribute setting + + return token From e65f2f90fa0e9c57fb8449f600f7c94caf7fe199 Mon Sep 17 00:00:00 2001 From: Pascal Tomecek Date: Thu, 23 Apr 2026 13:09:58 -0400 Subject: [PATCH 2/5] refactor: simplify cache key token assembly Add compute_data_token() as the single wrapper around dask tokenization and refactor cache_key() to combine precomputed data and behavior tokens instead of mutating one nested payload dict. This makes cache_key() mostly orchestration: - flatten the evaluation context chain - collect data/behavior tokens for the underlying model - collect data/behavior tokens for non-transparent evaluators - combine those tokens into one final cache key Also adds tests for compute_data_token() and opaque evaluator behavior. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Pascal Tomecek --- ccflow/evaluators/common.py | 64 +++++++++++++----------- ccflow/tests/utils/test_behavior_hash.py | 47 +++++++++++++++-- ccflow/utils/__init__.py | 2 +- ccflow/utils/tokenize.py | 15 ++++-- 4 files changed, 92 insertions(+), 36 deletions(-) diff --git a/ccflow/evaluators/common.py b/ccflow/evaluators/common.py index 75f33e2..978085f 100644 --- a/ccflow/evaluators/common.py +++ b/ccflow/evaluators/common.py @@ -7,7 +7,6 @@ from types import MappingProxyType from typing import Any, Callable, Dict, List, Optional, Set, Union -import dask.base from pydantic import Field, PrivateAttr, field_validator from typing_extensions import override @@ -59,6 +58,33 @@ def combine_evaluators(first: Optional[EvaluatorBase], second: Optional[Evaluato return MultiEvaluator(evaluators=[first, second]) +def _model_tokens(model: BaseModel, *, include_data: bool = True) -> List[str]: + from ..utils.tokenize import compute_behavior_token, compute_data_token + + tokens = [compute_data_token(model.model_dump(mode="python"))] if include_data else [] + behavior = compute_behavior_token(type(model)) + if behavior is not None: + tokens.append(behavior) + return tokens + + +def _combine_tokens(tokens: List[str]) -> bytes: + from ..utils.tokenize import compute_data_token + + return compute_data_token(tuple(tokens)).encode("utf-8") + + +def _flatten_cache_key_context(flow_obj: ModelEvaluationContext) -> tuple[ModelEvaluationContext, str, List[EvaluatorBase]]: + fn = flow_obj.fn + non_transparent: List[EvaluatorBase] = [] + while isinstance(flow_obj.context, ModelEvaluationContext): + fn = flow_obj.fn if flow_obj.fn != "__call__" else fn + if not isinstance(flow_obj, TransparentModelEvaluationContext): + non_transparent.append(flow_obj.model) + flow_obj = flow_obj.context + return flow_obj, fn if fn != "__call__" else flow_obj.fn, non_transparent + + class MultiEvaluator(EvaluatorBase): """An 
evaluator that combines multiple evaluators. @@ -230,37 +256,17 @@ def cache_key(flow_obj: Union[ModelEvaluationContext, ContextBase, CallableModel Args: flow_obj: The object to be tokenized to form the cache key. """ - from ..utils.tokenize import compute_behavior_token + from ..utils.tokenize import compute_data_token if isinstance(flow_obj, ModelEvaluationContext): - fn = flow_obj.fn - non_transparent = [] - while isinstance(flow_obj.context, ModelEvaluationContext): - fn = flow_obj.fn if flow_obj.fn != "__call__" else fn - if not isinstance(flow_obj, TransparentModelEvaluationContext): - non_transparent.append(flow_obj.model) - flow_obj = flow_obj.context - d = flow_obj.model_dump(mode="python") - d["fn"] = fn if fn != "__call__" else flow_obj.fn - if non_transparent: - evaluator_data = [] - for e in non_transparent: - ed = e.model_dump(mode="python") - eb = compute_behavior_token(type(e)) - if eb is not None: - ed["_behavior"] = eb - evaluator_data.append(ed) - d["_evaluators"] = evaluator_data - behavior = compute_behavior_token(type(flow_obj.model)) - if behavior is not None: - d["_behavior"] = behavior - return dask.base.tokenize(d).encode("utf-8") + flow_obj, fn, non_transparent = _flatten_cache_key_context(flow_obj) + tokens = [compute_data_token({**flow_obj.model_dump(mode="python"), "fn": fn})] + tokens.extend(_model_tokens(flow_obj.model, include_data=False)) + for evaluator in non_transparent: + tokens.extend(_model_tokens(evaluator)) + return _combine_tokens(tokens) elif isinstance(flow_obj, (ContextBase, CallableModel)): - d = flow_obj.model_dump(mode="python") - behavior = compute_behavior_token(type(flow_obj)) - if behavior is not None: - d["_behavior"] = behavior - return dask.base.tokenize(d).encode("utf-8") + return _combine_tokens(_model_tokens(flow_obj)) else: raise TypeError(f"object of type {type(flow_obj)} cannot be serialized by this function!") diff --git a/ccflow/tests/utils/test_behavior_hash.py 
b/ccflow/tests/utils/test_behavior_hash.py index 7c33041..474f24a 100644 --- a/ccflow/tests/utils/test_behavior_hash.py +++ b/ccflow/tests/utils/test_behavior_hash.py @@ -1,10 +1,25 @@ -"""Tests for behavior hashing (compute_behavior_token).""" +"""Tests for tokenize helpers used by cache_key().""" -from ccflow.callable import CallableModel, ContextBase +from ccflow.callable import CallableModel, ContextBase, EvaluatorBase, ModelEvaluationContext from ccflow.context import NullContext from ccflow.evaluators.common import cache_key from ccflow.result import GenericResult -from ccflow.utils.tokenize import compute_behavior_token +from ccflow.utils.tokenize import compute_behavior_token, compute_data_token + +# --------------------------------------------------------------------------- +# Data token +# --------------------------------------------------------------------------- + + +class TestComputeDataToken: + def test_deterministic(self): + value = {"a": [1, 2], "b": ("x", 3)} + + assert compute_data_token(value) == compute_data_token(value) + + def test_different_values_different_tokens(self): + assert compute_data_token({"x": 1}) != compute_data_token({"x": 2}) + # --------------------------------------------------------------------------- # Basic behavior @@ -314,6 +329,32 @@ class MyContext(ContextBase): key = cache_key(MyContext(value=1)) assert isinstance(key, bytes) + def test_opaque_evaluator_behavior_changes_key(self): + from ccflow import Flow + + class MyModel(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=1) + + class OpaqueA(EvaluatorBase): + tag: str = "same" + + def __call__(self, context: ModelEvaluationContext): + return context() + + class OpaqueB(EvaluatorBase): + tag: str = "same" + + def __call__(self, context: ModelEvaluationContext): + result = context() + return result + + inner = ModelEvaluationContext(model=MyModel(), context=NullContext()) + key1 = 
cache_key(ModelEvaluationContext(model=OpaqueA(), context=inner)) + key2 = cache_key(ModelEvaluationContext(model=OpaqueB(), context=inner)) + assert key1 != key2 + # --------------------------------------------------------------------------- # Decorator unwrapping (Flow.call, etc.) diff --git a/ccflow/utils/__init__.py b/ccflow/utils/__init__.py index 4f9da2d..1b428c8 100644 --- a/ccflow/utils/__init__.py +++ b/ccflow/utils/__init__.py @@ -1,4 +1,4 @@ from .chunker import * from .core import * from .logging import * -from .tokenize import compute_behavior_token, normalize_token, tokenize +from .tokenize import compute_behavior_token, compute_data_token, normalize_token, tokenize diff --git a/ccflow/utils/tokenize.py b/ccflow/utils/tokenize.py index 0f56040..cf6d9bc 100644 --- a/ccflow/utils/tokenize.py +++ b/ccflow/utils/tokenize.py @@ -2,18 +2,20 @@ """Tokenization utilities for ccflow models. Re-exports ``normalize_token`` and ``tokenize`` from dask for data hashing. -Adds behavior hashing: deterministic SHA-256 fingerprints of class method -bytecode, useful for cache-key invalidation when callable logic changes. +Adds thin wrappers around dask-based data hashing and ccflow-specific +behavior hashing, useful for cache-key invalidation when callable logic +changes. 
""" import hashlib import inspect import logging -from typing import Callable, List, Optional, Tuple +from typing import Any, Callable, List, Optional, Tuple from dask.base import normalize_token, tokenize __all__ = [ + "compute_data_token", "normalize_token", "tokenize", "compute_behavior_token", @@ -21,6 +23,13 @@ logger = logging.getLogger(__name__) + +def compute_data_token(value: Any) -> str: + """Compute a deterministic data token using dask's tokenization.""" + + return tokenize(value) + + # Methods that are never behavior-relevant (pydantic/python internals) _SKIPPED_METHODS = frozenset( { From f3b21d1d80b17eeb5fadf4add33820f14efd3811 Mon Sep 17 00:00:00 2001 From: Pascal Tomecek Date: Thu, 23 Apr 2026 13:26:10 -0400 Subject: [PATCH 3/5] fix: cover defaults closures and inherited deps Update behavior hashing so function defaults, keyword-only defaults, and closure cell contents contribute to compute_behavior_token(). This closes a cache-key correctness gap where semantic changes could leave behavior tokens unchanged. Also merge __ccflow_tokenizer_deps__ across the full MRO instead of first-definition-wins, with deterministic deduping so subclasses can add deps without dropping inherited ones. Add regression tests for defaults, kwdefaults, closures, inherited deps, and a cache_key integration check for helper default changes. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Pascal Tomecek --- ccflow/tests/utils/test_behavior_hash.py | 83 ++++++++++++++++++++++ ccflow/utils/tokenize.py | 87 ++++++++++++++++++------ 2 files changed, 150 insertions(+), 20 deletions(-) diff --git a/ccflow/tests/utils/test_behavior_hash.py b/ccflow/tests/utils/test_behavior_hash.py index 474f24a..e40cd72 100644 --- a/ccflow/tests/utils/test_behavior_hash.py +++ b/ccflow/tests/utils/test_behavior_hash.py @@ -120,6 +120,38 @@ def f(self): assert compute_behavior_token(A) != compute_behavior_token(B) + def test_defaults_matter(self): + class A: + def f(self, x=1): + return x + + class B: + def f(self, x=2): + return x + + assert compute_behavior_token(A) != compute_behavior_token(B) + + def test_kwdefaults_matter(self): + class A: + def f(self, *, x=1): + return x + + class B: + def f(self, *, x=2): + return x + + assert compute_behavior_token(A) != compute_behavior_token(B) + + def test_closure_values_matter(self): + def make_model(value): + class M: + def f(self): + return value + + return M + + assert compute_behavior_token(make_model(1)) != compute_behavior_token(make_model(2)) + # --------------------------------------------------------------------------- # Method collection @@ -262,6 +294,36 @@ def f(self): assert compute_behavior_token(A) != compute_behavior_token(B) + def test_subclass_deps_extend_inherited_deps(self): + def base_a(): + return 1 + + def base_b(): + return 2 + + def sub_dep(): + return 3 + + class BaseA: + __ccflow_tokenizer_deps__ = [base_a] + + def f(self): + return 1 + + class BaseB: + __ccflow_tokenizer_deps__ = [base_b] + + def f(self): + return 1 + + class SubA(BaseA): + __ccflow_tokenizer_deps__ = [sub_dep] + + class SubB(BaseB): + __ccflow_tokenizer_deps__ = [sub_dep] + + assert compute_behavior_token(SubA) != compute_behavior_token(SubB) + # --------------------------------------------------------------------------- # Integration with 
cache_key() @@ -329,6 +391,27 @@ class MyContext(ContextBase): key = cache_key(MyContext(value=1)) assert isinstance(key, bytes) + def test_helper_default_arg_changes_key(self): + from ccflow import Flow + + class A(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=self.helper()) + + def helper(self, x=1): + return x + + class B(CallableModel): + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=self.helper()) + + def helper(self, x=2): + return x + + assert cache_key(A()) != cache_key(B()) + def test_opaque_evaluator_behavior_changes_key(self): from ccflow import Flow diff --git a/ccflow/utils/tokenize.py b/ccflow/utils/tokenize.py index cf6d9bc..24433b1 100644 --- a/ccflow/utils/tokenize.py +++ b/ccflow/utils/tokenize.py @@ -79,12 +79,40 @@ def _unwrap_function(func: object) -> Optional[Callable]: return func +def _function_state(func: Callable) -> Tuple[Any, Any, Tuple[Tuple[str, bool, Any], ...] | None]: + """Return defaults/kwdefaults/closure state that affects runtime behavior.""" + + kwdefaults = getattr(func, "__kwdefaults__", None) + if kwdefaults is not None: + kwdefaults = tuple(sorted(kwdefaults.items())) + + closure = getattr(func, "__closure__", None) + closure_state = None + if closure: + closure_state = [] + for name, cell in zip(func.__code__.co_freevars, closure): + try: + closure_state.append((name, True, cell.cell_contents)) + except ValueError: + closure_state.append((name, False, None)) + closure_state = tuple(closure_state) + + return getattr(func, "__defaults__", None), kwdefaults, closure_state + + def _hash_function_bytecode(func: Callable) -> Optional[str]: - """Return a SHA-256 hex digest of a function's bytecode and constants. + """Return a SHA-256 hex digest of a function's behavior-relevant state. The function is first unwrapped through any decorator chains, so that e.g. 
``@Flow.call`` wrappers do not mask the real implementation. + In addition to ``co_code`` and ``co_consts``, this includes: + - positional defaults (``__defaults__``) + - keyword-only defaults (``__kwdefaults__``) + - closure cell contents + so that behavior changes that do not affect bytecode alone still change + the token. + Returns ``None`` for objects without ``__code__`` (C builtins, etc.). """ unwrapped = _unwrap_function(func) @@ -97,9 +125,20 @@ def _hash_function_bytecode(func: Callable) -> Optional[str]: if consts and isinstance(consts[0], str): consts = consts[1:] h.update(repr(consts).encode("utf-8")) + h.update(compute_data_token(_function_state(unwrapped)).encode("utf-8")) return h.hexdigest() +def _dependency_sort_key(func: Callable) -> Tuple[str, str, str]: + """Return a deterministic identity for dependency sorting/deduping.""" + + unwrapped = _unwrap_function(func) or func + module = getattr(unwrapped, "__module__", "") + qualname = getattr(unwrapped, "__qualname__", getattr(unwrapped, "__name__", repr(unwrapped))) + behavior = _hash_function_bytecode(unwrapped) or "" + return module, qualname, behavior + + def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: """Collect callable methods from *cls* (walking MRO) plus ``__ccflow_tokenizer_deps__``. @@ -108,8 +147,9 @@ def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: ``__call__`` overrides the base class's even if the subclass doesn't redefine every method. - Own methods are sorted alphabetically. Dependencies are sorted by - qualified name so that declaration order does not affect the hash. + Own methods are sorted alphabetically. Dependencies are merged across the + MRO, deduplicated, and sorted deterministically so that declaration order + does not affect the hash. Internal framework attributes (``__ccflow_*``) and pydantic/python boilerplate methods are skipped. 
@@ -133,22 +173,26 @@ def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: methods.sort(key=lambda pair: pair[0]) - # Collect __ccflow_tokenizer_deps__ (also walk MRO, first definition wins) - extra_deps = None + # Collect __ccflow_tokenizer_deps__ from the full MRO. Subclasses may add + # deps without losing inherited ones. + deps = [] + seen_dep_keys = set() for klass in cls.__mro__: - if "__ccflow_tokenizer_deps__" in klass.__dict__: - extra_deps = klass.__dict__["__ccflow_tokenizer_deps__"] - break - - if extra_deps is not None: - deps = [] + extra_deps = klass.__dict__.get("__ccflow_tokenizer_deps__") + if extra_deps is None: + continue for func in extra_deps: unwrapped = _unwrap_function(func) or func - if callable(unwrapped): - func_id = getattr(unwrapped, "__qualname__", getattr(unwrapped, "__name__", repr(unwrapped))) - deps.append((f"__dep__:{func_id}", func)) - deps.sort(key=lambda pair: pair[0]) - methods.extend(deps) + if not callable(unwrapped): + continue + dep_key = _dependency_sort_key(func) + if dep_key in seen_dep_keys: + continue + seen_dep_keys.add(dep_key) + deps.append((dep_key, func)) + + deps.sort(key=lambda pair: pair[0]) + methods.extend((f"__dep__:{dep_key[1]}", func) for dep_key, func in deps) return methods @@ -156,14 +200,17 @@ def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: def compute_behavior_token(cls: type) -> Optional[str]: """Compute a SHA-256 behavior token for *cls* based on its method bytecode. - The token captures bytecode (``co_code``) and constants (``co_consts``, - minus docstrings) of every method in *cls*'s MRO (with standard override - semantics), plus any standalone functions listed in - ``cls.__ccflow_tokenizer_deps__``. + The token captures behavior-relevant state for every method in *cls*'s MRO + (with standard override semantics): bytecode, constants (minus docstrings), + defaults, keyword-only defaults, and closure cell contents. 
It also + includes any standalone functions listed in ``cls.__ccflow_tokenizer_deps__``. Decorator chains (e.g. ``@Flow.call``) are automatically unwrapped so that the hash reflects the user's implementation, not the wrapper. + ``__ccflow_tokenizer_deps__`` values are merged across the full MRO, so + subclasses can add dependencies without dropping inherited ones. + Results are cached on the class in ``cls.__behavior_token_cache__``. The cache is stored directly on the class (not inherited), so subclass tokens are independent of parent tokens. From d1e692426980259dd4b92e826973a3e78d95ab70 Mon Sep 17 00:00:00 2001 From: Pascal Tomecek Date: Thu, 23 Apr 2026 14:35:50 -0400 Subject: [PATCH 4/5] feat: extend tokenizer cache APIs Add compute_cache_token() alongside compute_data_token() and compute_behavior_token(), refactor cache_key() to delegate to it, and rename the cached class attribute to __ccflow_tokenizer_cache__ so it matches __ccflow_tokenizer_deps__. This commit also keeps class support in __ccflow_tokenizer_deps__, including recursive class-dependency detection, and adds regression coverage for combined cache tokens and cache-key integration. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Pascal Tomecek --- ccflow/evaluators/common.py | 35 +++---- ccflow/tests/utils/test_behavior_hash.py | 106 +++++++++++++++++++++- ccflow/utils/__init__.py | 2 +- ccflow/utils/tokenize.py | 111 +++++++++++++++-------- 4 files changed, 191 insertions(+), 63 deletions(-) diff --git a/ccflow/evaluators/common.py b/ccflow/evaluators/common.py index 978085f..2cab984 100644 --- a/ccflow/evaluators/common.py +++ b/ccflow/evaluators/common.py @@ -58,22 +58,6 @@ def combine_evaluators(first: Optional[EvaluatorBase], second: Optional[Evaluato return MultiEvaluator(evaluators=[first, second]) -def _model_tokens(model: BaseModel, *, include_data: bool = True) -> List[str]: - from ..utils.tokenize import compute_behavior_token, compute_data_token - - tokens = [compute_data_token(model.model_dump(mode="python"))] if include_data else [] - behavior = compute_behavior_token(type(model)) - if behavior is not None: - tokens.append(behavior) - return tokens - - -def _combine_tokens(tokens: List[str]) -> bytes: - from ..utils.tokenize import compute_data_token - - return compute_data_token(tuple(tokens)).encode("utf-8") - - def _flatten_cache_key_context(flow_obj: ModelEvaluationContext) -> tuple[ModelEvaluationContext, str, List[EvaluatorBase]]: fn = flow_obj.fn non_transparent: List[EvaluatorBase] = [] @@ -256,17 +240,22 @@ def cache_key(flow_obj: Union[ModelEvaluationContext, ContextBase, CallableModel Args: flow_obj: The object to be tokenized to form the cache key. 
""" - from ..utils.tokenize import compute_data_token + from ..utils.tokenize import compute_cache_token if isinstance(flow_obj, ModelEvaluationContext): flow_obj, fn, non_transparent = _flatten_cache_key_context(flow_obj) - tokens = [compute_data_token({**flow_obj.model_dump(mode="python"), "fn": fn})] - tokens.extend(_model_tokens(flow_obj.model, include_data=False)) - for evaluator in non_transparent: - tokens.extend(_model_tokens(evaluator)) - return _combine_tokens(tokens) + return compute_cache_token( + data_values=[ + {**flow_obj.model_dump(mode="python"), "fn": fn}, + *(evaluator.model_dump(mode="python") for evaluator in non_transparent), + ], + behavior_classes=[type(flow_obj.model), *(type(evaluator) for evaluator in non_transparent)], + ).encode("utf-8") elif isinstance(flow_obj, (ContextBase, CallableModel)): - return _combine_tokens(_model_tokens(flow_obj)) + return compute_cache_token( + data_values=[flow_obj.model_dump(mode="python")], + behavior_classes=[type(flow_obj)], + ).encode("utf-8") else: raise TypeError(f"object of type {type(flow_obj)} cannot be serialized by this function!") diff --git a/ccflow/tests/utils/test_behavior_hash.py b/ccflow/tests/utils/test_behavior_hash.py index e40cd72..5bcb562 100644 --- a/ccflow/tests/utils/test_behavior_hash.py +++ b/ccflow/tests/utils/test_behavior_hash.py @@ -1,10 +1,12 @@ """Tests for tokenize helpers used by cache_key().""" +import pytest + from ccflow.callable import CallableModel, ContextBase, EvaluatorBase, ModelEvaluationContext from ccflow.context import NullContext from ccflow.evaluators.common import cache_key from ccflow.result import GenericResult -from ccflow.utils.tokenize import compute_behavior_token, compute_data_token +from ccflow.utils.tokenize import compute_behavior_token, compute_cache_token, compute_data_token # --------------------------------------------------------------------------- # Data token @@ -21,6 +23,39 @@ def test_different_values_different_tokens(self): assert 
compute_data_token({"x": 1}) != compute_data_token({"x": 2}) +class TestComputeCacheToken: + def test_deterministic(self): + class Helper: + def f(self): + return 1 + + token1 = compute_cache_token(data_values=[{"x": 1}], behavior_classes=[Helper]) + token2 = compute_cache_token(data_values=[{"x": 1}], behavior_classes=[Helper]) + assert token1 == token2 + + def test_data_changes_token(self): + class Helper: + def f(self): + return 1 + + token1 = compute_cache_token(data_values=[{"x": 1}], behavior_classes=[Helper]) + token2 = compute_cache_token(data_values=[{"x": 2}], behavior_classes=[Helper]) + assert token1 != token2 + + def test_behavior_changes_token(self): + class HelperA: + def f(self): + return 1 + + class HelperB: + def f(self): + return 2 + + token1 = compute_cache_token(data_values=[{"x": 1}], behavior_classes=[HelperA]) + token2 = compute_cache_token(data_values=[{"x": 1}], behavior_classes=[HelperB]) + assert token1 != token2 + + # --------------------------------------------------------------------------- # Basic behavior # --------------------------------------------------------------------------- @@ -77,7 +112,7 @@ def f(self): return 1 token = compute_behavior_token(M) - assert M.__behavior_token_cache__ == token + assert M.__ccflow_tokenizer_cache__ == token # Second call returns cached value assert compute_behavior_token(M) is token @@ -294,6 +329,29 @@ def f(self): assert compute_behavior_token(A) != compute_behavior_token(B) + def test_class_dep_included(self): + class HelperA: + def f(self): + return 1 + + class HelperB: + def f(self): + return 2 + + class A: + __ccflow_tokenizer_deps__ = [HelperA] + + def f(self): + return 1 + + class B: + __ccflow_tokenizer_deps__ = [HelperB] + + def f(self): + return 1 + + assert compute_behavior_token(A) != compute_behavior_token(B) + def test_subclass_deps_extend_inherited_deps(self): def base_a(): return 1 @@ -324,6 +382,21 @@ class SubB(BaseB): assert compute_behavior_token(SubA) != 
compute_behavior_token(SubB) + def test_recursive_class_deps_raise(self): + class A: + def f(self): + return 1 + + class B: + def g(self): + return 2 + + A.__ccflow_tokenizer_deps__ = [B] + B.__ccflow_tokenizer_deps__ = [A] + + with pytest.raises(TypeError, match="Recursive __ccflow_tokenizer_deps__ class dependency"): + compute_behavior_token(A) + # --------------------------------------------------------------------------- # Integration with cache_key() @@ -412,6 +485,33 @@ def helper(self, x=2): assert cache_key(A()) != cache_key(B()) + def test_class_dep_changes_key(self): + from ccflow import Flow + + class HelperA: + def f(self): + return 1 + + class HelperB: + def f(self): + return 2 + + class A(CallableModel): + __ccflow_tokenizer_deps__ = [HelperA] + + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=1) + + class B(CallableModel): + __ccflow_tokenizer_deps__ = [HelperB] + + @Flow.call + def __call__(self, context: NullContext) -> GenericResult: + return GenericResult(value=1) + + assert cache_key(A()) != cache_key(B()) + def test_opaque_evaluator_behavior_changes_key(self): from ccflow import Flow @@ -512,7 +612,7 @@ def __call__(self): assert compute_behavior_token(Sub) != compute_behavior_token(Base) def test_subclass_cache_independent(self): - """Parent and subclass don't share __behavior_token_cache__.""" + """Parent and subclass don't share __ccflow_tokenizer_cache__.""" class Base: def f(self): diff --git a/ccflow/utils/__init__.py b/ccflow/utils/__init__.py index 1b428c8..7e9fa23 100644 --- a/ccflow/utils/__init__.py +++ b/ccflow/utils/__init__.py @@ -1,4 +1,4 @@ from .chunker import * from .core import * from .logging import * -from .tokenize import compute_behavior_token, compute_data_token, normalize_token, tokenize +from .tokenize import compute_behavior_token, compute_cache_token, compute_data_token, normalize_token, tokenize diff --git a/ccflow/utils/tokenize.py b/ccflow/utils/tokenize.py 
index 24433b1..473e2bb 100644 --- a/ccflow/utils/tokenize.py +++ b/ccflow/utils/tokenize.py @@ -2,26 +2,24 @@ """Tokenization utilities for ccflow models. Re-exports ``normalize_token`` and ``tokenize`` from dask for data hashing. -Adds thin wrappers around dask-based data hashing and ccflow-specific -behavior hashing, useful for cache-key invalidation when callable logic -changes. +Adds helpers for dask-based data hashing, ccflow-specific behavior hashing, +and combined cache-token hashing, useful for cache-key invalidation when +callable logic changes. """ import hashlib import inspect -import logging -from typing import Any, Callable, List, Optional, Tuple +from typing import Any, Callable, Iterable, List, Optional, Tuple from dask.base import normalize_token, tokenize -__all__ = [ +__all__ = ( + "compute_behavior_token", + "compute_cache_token", "compute_data_token", "normalize_token", "tokenize", - "compute_behavior_token", -] - -logger = logging.getLogger(__name__) +) def compute_data_token(value: Any) -> str: @@ -30,6 +28,22 @@ def compute_data_token(value: Any) -> str: return tokenize(value) +def compute_cache_token(*, data_values: Iterable[Any] = (), behavior_classes: Iterable[type] = ()) -> str: + """Compute a cache token by combining data and behavior tokens. + + Args: + data_values: Values whose serialized data should affect the cache key. + behavior_classes: Classes whose behavior tokens should affect the cache key. 
+ """ + + tokens = [compute_data_token(value) for value in data_values] + for cls in behavior_classes: + behavior = compute_behavior_token(cls) + if behavior is not None: + tokens.append(behavior) + return compute_data_token(tuple(tokens)) + + # Methods that are never behavior-relevant (pydantic/python internals) _SKIPPED_METHODS = frozenset( { @@ -129,27 +143,37 @@ def _hash_function_bytecode(func: Callable) -> Optional[str]: return h.hexdigest() -def _dependency_sort_key(func: Callable) -> Tuple[str, str, str]: - """Return a deterministic identity for dependency sorting/deduping.""" +def _dependency_info(dep: object, *, _visited: Tuple[type, ...]) -> Optional[Tuple[Tuple[str, str, str, str], str, str]]: + """Return deterministic identity, name, and token for a dependency entry.""" - unwrapped = _unwrap_function(func) or func + if isinstance(dep, type): + module = getattr(dep, "__module__", "") + qualname = getattr(dep, "__qualname__", getattr(dep, "__name__", repr(dep))) + behavior = compute_behavior_token(dep, _visited=_visited) + if behavior is None: + return None + return ("class", module, qualname, behavior), f"__dep_class__:{qualname}", behavior + + unwrapped = _unwrap_function(dep) + if unwrapped is None: + return None module = getattr(unwrapped, "__module__", "") qualname = getattr(unwrapped, "__qualname__", getattr(unwrapped, "__name__", repr(unwrapped))) - behavior = _hash_function_bytecode(unwrapped) or "" - return module, qualname, behavior + behavior = _hash_function_bytecode(unwrapped) + if behavior is None: + return None + return ("callable", module, qualname, behavior), f"__dep__:{qualname}", behavior def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: - """Collect callable methods from *cls* (walking MRO) plus ``__ccflow_tokenizer_deps__``. + """Collect callable methods from *cls* (walking MRO). Methods are collected with MRO override semantics: for each method name, the first definition found in the MRO wins. 
This means a subclass's ``__call__`` overrides the base class's even if the subclass doesn't redefine every method. - Own methods are sorted alphabetically. Dependencies are merged across the - MRO, deduplicated, and sorted deterministically so that declaration order - does not affect the hash. + Own methods are sorted alphabetically. Internal framework attributes (``__ccflow_*``) and pydantic/python boilerplate methods are skipped. @@ -173,45 +197,55 @@ def _collect_methods(cls: type) -> List[Tuple[str, Callable]]: methods.sort(key=lambda pair: pair[0]) - # Collect __ccflow_tokenizer_deps__ from the full MRO. Subclasses may add - # deps without losing inherited ones. + return methods + + +def _collect_dependency_hashes(cls: type, *, _visited: Tuple[type, ...]) -> List[Tuple[str, str]]: + """Collect hashed ``__ccflow_tokenizer_deps__`` entries from the full MRO. + + Dependencies are merged across the MRO, deduplicated, and sorted + deterministically so that declaration order does not affect the hash. 
+ + Dependency entries may be either: + - function-like objects hashable via ``_hash_function_bytecode()`` + - classes, in which case ``compute_behavior_token(dep_class)`` is included + """ deps = [] seen_dep_keys = set() for klass in cls.__mro__: extra_deps = klass.__dict__.get("__ccflow_tokenizer_deps__") if extra_deps is None: continue - for func in extra_deps: - unwrapped = _unwrap_function(func) or func - if not callable(unwrapped): + for dep in extra_deps: + dep_info = _dependency_info(dep, _visited=_visited) + if dep_info is None: continue - dep_key = _dependency_sort_key(func) + dep_key, dep_name, dep_token = dep_info if dep_key in seen_dep_keys: continue seen_dep_keys.add(dep_key) - deps.append((dep_key, func)) + deps.append((dep_key, dep_name, dep_token)) - deps.sort(key=lambda pair: pair[0]) - methods.extend((f"__dep__:{dep_key[1]}", func) for dep_key, func in deps) + deps.sort(key=lambda item: item[0]) + return [(dep_name, dep_token) for _, dep_name, dep_token in deps] - return methods - -def compute_behavior_token(cls: type) -> Optional[str]: +def compute_behavior_token(cls: type, *, _visited: Tuple[type, ...] = ()) -> Optional[str]: """Compute a SHA-256 behavior token for *cls* based on its method bytecode. The token captures behavior-relevant state for every method in *cls*'s MRO (with standard override semantics): bytecode, constants (minus docstrings), defaults, keyword-only defaults, and closure cell contents. It also - includes any standalone functions listed in ``cls.__ccflow_tokenizer_deps__``. + includes any functions or classes listed in ``cls.__ccflow_tokenizer_deps__``. Decorator chains (e.g. ``@Flow.call``) are automatically unwrapped so that the hash reflects the user's implementation, not the wrapper. ``__ccflow_tokenizer_deps__`` values are merged across the full MRO, so - subclasses can add dependencies without dropping inherited ones. + subclasses can add dependencies without dropping inherited ones. 
Class + entries contribute their own ``compute_behavior_token()`` recursively. - Results are cached on the class in ``cls.__behavior_token_cache__``. + Results are cached on the class in ``cls.__ccflow_tokenizer_cache__``. The cache is stored directly on the class (not inherited), so subclass tokens are independent of parent tokens. @@ -223,13 +257,18 @@ def compute_behavior_token(cls: type) -> Optional[str]: has been computed will **not** invalidate the cached token. Redefining the class (e.g. in Jupyter) creates a new class object and works fine. """ + if cls in _visited: + raise TypeError(f"Recursive __ccflow_tokenizer_deps__ class dependency detected for {cls.__module__}.{cls.__qualname__}") + # Check cache on cls itself (not inherited) - cache = cls.__dict__.get("__behavior_token_cache__") + cache = cls.__dict__.get("__ccflow_tokenizer_cache__") if cache is not None: return cache + visited = _visited + (cls,) methods = _collect_methods(cls) method_hashes = [(name, h) for name, func in methods if (h := _hash_function_bytecode(func)) is not None] + method_hashes.extend(_collect_dependency_hashes(cls, _visited=visited)) if not method_hashes: return None @@ -237,7 +276,7 @@ def compute_behavior_token(cls: type) -> Optional[str]: token = hashlib.sha256(repr(method_hashes).encode("utf-8")).hexdigest() try: - cls.__behavior_token_cache__ = token + cls.__ccflow_tokenizer_cache__ = token except (TypeError, AttributeError): pass # e.g. C extension types that don't allow attribute setting From 66059da0ea21ae4adde0d6229a01a50c8a5196de Mon Sep 17 00:00:00 2001 From: Pascal Tomecek Date: Thu, 23 Apr 2026 17:53:22 -0400 Subject: [PATCH 5/5] refactor: document and centralize cache token hashing Add a private SHA-256 helper in ccflow.utils.tokenize so the hash algorithm is defined in one place, rename the tokenize tests to ccflow/tests/utils/test_tokenize.py to match the module name, and document how MemoryCacheEvaluator cache keys are built. 
The docs now describe how data tokens, behavior tokens, transparent vs non-transparent evaluators, and __ccflow_tokenizer_deps__ all feed into compute_cache_token(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Pascal Tomecek --- ...test_behavior_hash.py => test_tokenize.py} | 2 +- ccflow/utils/tokenize.py | 22 +++++++--- docs/wiki/Workflows.md | 40 +++++++++++++++++++ 3 files changed, 58 insertions(+), 6 deletions(-) rename ccflow/tests/utils/{test_behavior_hash.py => test_tokenize.py} (99%) diff --git a/ccflow/tests/utils/test_behavior_hash.py b/ccflow/tests/utils/test_tokenize.py similarity index 99% rename from ccflow/tests/utils/test_behavior_hash.py rename to ccflow/tests/utils/test_tokenize.py index 5bcb562..0166894 100644 --- a/ccflow/tests/utils/test_behavior_hash.py +++ b/ccflow/tests/utils/test_tokenize.py @@ -1,4 +1,4 @@ -"""Tests for tokenize helpers used by cache_key().""" +"""Tests for tokenize helpers used by cache keys.""" import pytest diff --git a/ccflow/utils/tokenize.py b/ccflow/utils/tokenize.py index 473e2bb..528c5ed 100644 --- a/ccflow/utils/tokenize.py +++ b/ccflow/utils/tokenize.py @@ -22,6 +22,17 @@ ) +def _sha256_hexdigest(*parts: bytes | str) -> str: + """Return a SHA-256 hex digest for one or more byte/string parts.""" + + hasher = hashlib.sha256() + for part in parts: + if isinstance(part, str): + part = part.encode("utf-8") + hasher.update(part) + return hasher.hexdigest() + + def compute_data_token(value: Any) -> str: """Compute a deterministic data token using dask's tokenization.""" @@ -133,14 +144,15 @@ def _hash_function_bytecode(func: Callable) -> Optional[str]: if unwrapped is None: return None code = unwrapped.__code__ - h = hashlib.sha256(code.co_code) # Include constants (skip first if it's the docstring) consts = code.co_consts if consts and isinstance(consts[0], str): consts = consts[1:] - h.update(repr(consts).encode("utf-8")) - 
h.update(compute_data_token(_function_state(unwrapped)).encode("utf-8")) - return h.hexdigest() + return _sha256_hexdigest( + code.co_code, + repr(consts), + compute_data_token(_function_state(unwrapped)), + ) def _dependency_info(dep: object, *, _visited: Tuple[type, ...]) -> Optional[Tuple[Tuple[str, str, str, str], str, str]]: @@ -273,7 +285,7 @@ def compute_behavior_token(cls: type, *, _visited: Tuple[type, ...] = ()) -> Opt if not method_hashes: return None - token = hashlib.sha256(repr(method_hashes).encode("utf-8")).hexdigest() + token = _sha256_hexdigest(repr(method_hashes)) try: cls.__ccflow_tokenizer_cache__ = token diff --git a/docs/wiki/Workflows.md b/docs/wiki/Workflows.md index 526a957..6789d79 100644 --- a/docs/wiki/Workflows.md +++ b/docs/wiki/Workflows.md @@ -528,6 +528,46 @@ with FlowOptionsOverride(options={"cacheable":True, "evaluator": evaluator}): #> GenericResult[int](value=1) ``` +#### How cache keys are built + +`MemoryCacheEvaluator` uses `ccflow.evaluators.cache_key()`, which now delegates to `ccflow.utils.compute_cache_token()`. + +At a high level, the cache key combines: + +- a **data token** for the serialized model/context payload +- a **behavior token** for each callable/evaluator class that can affect the result + +For a direct `CallableModel` call, the cache key depends on: + +- `model.model_dump(mode="python")` +- `compute_behavior_token(type(model))` + +For a `ModelEvaluationContext`, the cache key depends on: + +- the underlying context payload plus the function name being evaluated (`__call__` vs `__deps__`) +- the behavior token of the underlying model class +- the data and behavior tokens of any **non-transparent** evaluators in the chain + +Transparent evaluators are skipped, so wrapping a model with logging or other pass-through evaluators does not change its cache identity. 
+ +`compute_behavior_token()` hashes the class's Python-defined methods and also consults `__ccflow_tokenizer_deps__` for behavior that lives outside the class body. This is useful when the callable depends on module-level helpers or shared helper classes that should also invalidate the cache key when they change. + +```python +def helper(x): + return x + 1 + + +class SharedLogic: + def transform(self, x): + return x * 2 + + +class MyModel(CallableModel): + __ccflow_tokenizer_deps__ = [helper, SharedLogic] +``` + +Entries in `__ccflow_tokenizer_deps__` may be functions or classes. Class dependencies are tokenized recursively via `compute_behavior_token()`. Recursive class dependency graphs are rejected with a `TypeError`. + ### Graph Evaluation Dependencies between tasks/steps in a workflow is one of the defining characteristics of a workflow orchestration framework. Earlier we covered how dependencies are defined (via composition) between configuration objects.