Feat/valkey 4 storage#5703
Conversation
fef7e5e to
975887f
Compare
975887f to
986d128
Compare
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds Valkey-GLIDE backends and wiring: shared cache config, ValkeyCache, ValkeyStorage with vector search and sync/async bridging, UploadCache multi-backend abstraction, embedding normalization and async-safety, task/agent cache integration, unified-memory drain changes, comprehensive tests, and dependency metadata. ChangesValkey Backend Integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsStopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (5)
lib/crewai/src/crewai/memory/encoding_flow.py (1)
71-94: 💤 Low valueMutating passed-in
MemoryRecordobjects in validator may cause unexpected side effects.This validator modifies
record.embeddingin place (lines 86, 93) onMemoryRecordobjects that are passed intoItemState. If those records are referenced elsewhere (e.g., in storage layer caches or other data structures), this mutation could cause unexpected behavior.Consider creating a copy of the record with the converted embedding, or relying on
MemoryRecord's ownvalidate_embeddingvalidator to handle the conversion at construction time.♻️ Alternative: create new MemoryRecord instead of mutating
`@field_validator`("similar_records", "result_record", mode="before") `@classmethod` def ensure_embedding_is_list(cls, v: Any) -> Any: """Ensure MemoryRecord embeddings are list[float], not bytes.""" if v is None: return None if isinstance(v, list): # Process list of MemoryRecords - for record in v: - if isinstance(record, MemoryRecord) and isinstance( - record.embedding, bytes - ): - import numpy as np - - arr = np.frombuffer(record.embedding, dtype=np.float32) - record.embedding = [float(x) for x in arr] - return v + result = [] + for record in v: + if isinstance(record, MemoryRecord) and isinstance(record.embedding, bytes): + import numpy as np + arr = np.frombuffer(record.embedding, dtype=np.float32) + # Use model_copy to create new record with converted embedding + result.append(record.model_copy(update={"embedding": [float(x) for x in arr]})) + else: + result.append(record) + return result🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/encoding_flow.py` around lines 71 - 94, The validator ensure_embedding_is_list mutates MemoryRecord.embedding in-place (for fields similar_records and result_record), which can cause side effects; update it to avoid mutating inputs by creating and returning new MemoryRecord instances with converted embeddings (or convert entire list to new records) instead of assigning to record.embedding, or delegate conversion to MemoryRecord's own validator/constructor so ensure_embedding_is_list only returns new objects or primitives and leaves original MemoryRecord instances untouched; adjust handling for both list and single record paths to return copies with embeddings converted from bytes -> list[float].lib/crewai/tests/memory/storage/test_valkey_storage_scope.py (1)
7-7: 💤 Low valueUnused import:
uuid4The
uuid4import is not used anywhere in this test file.-from uuid import uuid4🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/memory/storage/test_valkey_storage_scope.py` at line 7, Remove the unused import uuid4 from the top of the test file; locate the import statement "from uuid import uuid4" and delete it (or if future use is intended, use it where needed in tests like in test functions or fixtures), ensuring no other code references uuid4.lib/crewai/src/crewai/memory/storage/valkey_cache.py (1)
132-160: 💤 Low value
json.dumpsmay raiseTypeErrorfor non-JSON-serializable values.If a caller passes a non-serializable value (e.g., a custom object, datetime, or bytes),
json.dumps(value)on line 148 will raise aTypeError. Consider either documenting this limitation clearly or adding error handling.🛡️ Suggested defensive handling
async def set( self, key: str, value: Any, ttl: int | None = None, ) -> None: """Set value in cache. Args: key: Cache key. - value: Value to cache (will be serialized to JSON). + value: Value to cache (must be JSON-serializable). ttl: TTL in seconds (None uses default_ttl, 0 = no expiration). + + Raises: + TypeError: If value is not JSON-serializable. """ from glide import ExpirySet, ExpiryType client = await self._get_client() - serialized = json.dumps(value) + try: + serialized = json.dumps(value) + except TypeError as e: + _logger.error(f"Failed to serialize value for key {key}: {e}") + raise🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/valkey_cache.py` around lines 132 - 160, The set method in valkey_cache.py currently calls json.dumps(value) which can raise TypeError for non-JSON-serializable objects; wrap the serialization in a try/except around json.dumps in the set method (ValKeyCache.set) and handle TypeError by either (a) re-raising a clear error (e.g., ValueError/TypeError) that includes the cache key and the value's type, or (b) falling back to a safe serializer (e.g., json.dumps(value, default=str) or using pickle) based on project conventions; ensure the chosen behavior is consistent and documented in the method docstring.lib/crewai/src/crewai/a2a/utils/agent_card.py (1)
48-58: 💤 Low valueConsider thread-safety for
_cache_configuredflag.The
_cache_configuredflag is not protected by a lock, creating a potential race condition if_ensure_cache_configured()is called concurrently from multiple threads. Whilecaches.set_config()is likely idempotent (making this benign), the pattern intask.py(lines 76-78) uses a threading lock for similar initialization.For consistency and to prevent duplicate initialization logs or wasted work, consider adding a lock:
♻️ Suggested improvement
+import threading + _cache_configured = False +_cache_config_lock = threading.Lock() def _ensure_cache_configured() -> None: """Configure aiocache on first use (lazy initialization).""" global _cache_configured if _cache_configured: return - caches.set_config(get_aiocache_config()) - _cache_configured = True + with _cache_config_lock: + if _cache_configured: + return + caches.set_config(get_aiocache_config()) + _cache_configured = True🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/a2a/utils/agent_card.py` around lines 48 - 58, The _cache_configured flag in _ensure_cache_configured() can race when called concurrently; protect it with a threading.Lock to ensure only one thread runs caches.set_config(get_aiocache_config()) and flips the flag. Add a module-level lock (e.g., _cache_configured_lock = threading.Lock()), acquire it at the start of _ensure_cache_configured(), re-check _cache_configured inside the lock (double-checked locking), call caches.set_config(...) and set _cache_configured = True while holding the lock to avoid duplicate initialization work or logs.lib/crewai/tests/utilities/test_cache_config.py (1)
89-98: ⚡ Quick winAdd explicit
REDIS_URLcoverage forget_aiocache_config().This class currently validates the no-URL and
VALKEY_URLpaths, but not theREDIS_URL-only path. Adding it would lock in backward-compatible behavior and prevent regressions.Proposed test addition
class TestGetAiocacheConfig: @@ def test_returns_redis_cache_when_url_set(self) -> None: with patch.dict( os.environ, {"VALKEY_URL": "redis://myhost:6380/2"}, clear=True ): config = get_aiocache_config() assert config["default"]["cache"] == "aiocache.RedisCache" assert config["default"]["endpoint"] == "myhost" assert config["default"]["port"] == 6380 assert config["default"]["db"] == 2 + + def test_returns_redis_cache_when_only_redis_url_set(self) -> None: + with patch.dict( + os.environ, {"REDIS_URL": "redis://redis-host:6379/0"}, clear=True + ): + config = get_aiocache_config() + assert config["default"]["cache"] == "aiocache.RedisCache" + assert config["default"]["endpoint"] == "redis-host" + assert config["default"]["port"] == 6379 + assert config["default"]["db"] == 0🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/utilities/test_cache_config.py` around lines 89 - 98, Add a new unit test for get_aiocache_config that covers the REDIS_URL-only path: create a test (e.g., test_returns_redis_cache_when_redis_url_set) that uses patch.dict(os.environ, {"REDIS_URL": "redis://myhost:6380/2"}, clear=True) to call get_aiocache_config(), then assert config["default"]["cache"] == "aiocache.RedisCache" and that endpoint == "myhost", port == 6380, and db == 2; place this alongside the existing test_returns_redis_cache_when_url_set in test_cache_config.py to ensure backward-compatible behavior when REDIS_URL is provided.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 778-779: The update to record.last_accessed uses datetime.now(),
causing a timezone mismatch with MemoryRecord's defaults which use
datetime.utcnow(); replace datetime.now() with datetime.utcnow() in the code
that updates MemoryRecord.last_accessed (the assignment in valkey_storage.py
where record.last_accessed is set) and ensure datetime.utcnow is imported/used
consistently throughout any related methods manipulating
created_at/last_accessed to maintain UTC timestamps.
In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 363-372: Update the timeout exception handling in the drain/writes
loop so it catches both the builtin TimeoutError and
concurrent.futures.TimeoutError: modify the except block in unified_memory.py
(the block that currently reads "except TimeoutError:" within the save/drain
logic) to catch a tuple including the concurrent.futures timeout class, and
ensure concurrent.futures.TimeoutError is imported or referenced (e.g., import
concurrent.futures or alias the futures TimeoutError) so Python 3.10 futures
timeouts trigger the same warning path (the _logger.warning call and
failed_saves increment).
In `@lib/crewai/src/crewai/utilities/cache_config.py`:
- Around line 14-33: The parse_cache_url function can raise ValueError when
converting parsed.path to int for non-numeric DB names; update parse_cache_url
to defensively handle parsed.path by validating or wrapping
int(parsed.path.lstrip("/")) in a try/except and fall back to 0 (or None) when
conversion fails, ensuring the returned dict (host, port, db, password) never
raises on non-numeric paths; reference parsed.path and the db key in the return
value when implementing this change.
---
Nitpick comments:
In `@lib/crewai/src/crewai/a2a/utils/agent_card.py`:
- Around line 48-58: The _cache_configured flag in _ensure_cache_configured()
can race when called concurrently; protect it with a threading.Lock to ensure
only one thread runs caches.set_config(get_aiocache_config()) and flips the
flag. Add a module-level lock (e.g., _cache_configured_lock = threading.Lock()),
acquire it at the start of _ensure_cache_configured(), re-check
_cache_configured inside the lock (double-checked locking), call
caches.set_config(...) and set _cache_configured = True while holding the lock
to avoid duplicate initialization work or logs.
In `@lib/crewai/src/crewai/memory/encoding_flow.py`:
- Around line 71-94: The validator ensure_embedding_is_list mutates
MemoryRecord.embedding in-place (for fields similar_records and result_record),
which can cause side effects; update it to avoid mutating inputs by creating and
returning new MemoryRecord instances with converted embeddings (or convert
entire list to new records) instead of assigning to record.embedding, or
delegate conversion to MemoryRecord's own validator/constructor so
ensure_embedding_is_list only returns new objects or primitives and leaves
original MemoryRecord instances untouched; adjust handling for both list and
single record paths to return copies with embeddings converted from bytes ->
list[float].
In `@lib/crewai/src/crewai/memory/storage/valkey_cache.py`:
- Around line 132-160: The set method in valkey_cache.py currently calls
json.dumps(value) which can raise TypeError for non-JSON-serializable objects;
wrap the serialization in a try/except around json.dumps in the set method
(ValKeyCache.set) and handle TypeError by either (a) re-raising a clear error
(e.g., ValueError/TypeError) that includes the cache key and the value's type,
or (b) falling back to a safe serializer (e.g., json.dumps(value, default=str)
or using pickle) based on project conventions; ensure the chosen behavior is
consistent and documented in the method docstring.
In `@lib/crewai/tests/memory/storage/test_valkey_storage_scope.py`:
- Line 7: Remove the unused import uuid4 from the top of the test file; locate
the import statement "from uuid import uuid4" and delete it (or if future use is
intended, use it where needed in tests like in test functions or fixtures),
ensuring no other code references uuid4.
In `@lib/crewai/tests/utilities/test_cache_config.py`:
- Around line 89-98: Add a new unit test for get_aiocache_config that covers the
REDIS_URL-only path: create a test (e.g.,
test_returns_redis_cache_when_redis_url_set) that uses patch.dict(os.environ,
{"REDIS_URL": "redis://myhost:6380/2"}, clear=True) to call
get_aiocache_config(), then assert config["default"]["cache"] ==
"aiocache.RedisCache" and that endpoint == "myhost", port == 6380, and db == 2;
place this alongside the existing test_returns_redis_cache_when_url_set in
test_cache_config.py to ensure backward-compatible behavior when REDIS_URL is
provided.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 79394168-72b2-49c2-ad41-99a802d771dd
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (18)
lib/crewai-files/src/crewai_files/cache/upload_cache.pylib/crewai/pyproject.tomllib/crewai/src/crewai/a2a/utils/agent_card.pylib/crewai/src/crewai/a2a/utils/task.pylib/crewai/src/crewai/memory/encoding_flow.pylib/crewai/src/crewai/memory/storage/valkey_cache.pylib/crewai/src/crewai/memory/storage/valkey_storage.pylib/crewai/src/crewai/memory/types.pylib/crewai/src/crewai/memory/unified_memory.pylib/crewai/src/crewai/utilities/cache_config.pylib/crewai/tests/memory/storage/test_valkey_cache.pylib/crewai/tests/memory/storage/test_valkey_storage.pylib/crewai/tests/memory/storage/test_valkey_storage_errors.pylib/crewai/tests/memory/storage/test_valkey_storage_scope.pylib/crewai/tests/memory/storage/test_valkey_storage_search.pylib/crewai/tests/memory/test_embedding_safety.pylib/crewai/tests/utilities/test_cache_config.pypyproject.toml
986d128 to
ba9bee1
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (4)
lib/crewai/tests/memory/storage/test_valkey_storage_search.py (2)
916-931: 💤 Low valueTest doesn't verify that limit=0 is passed to FT.SEARCH.
The test asserts empty results but doesn't check that the KNN clause contains
[KNN 0 ...]. If the implementation sent a higher limit and filtered client-side, this test would still pass.📝 Verify KNN limit in query
query_embedding = [0.1, 0.2, 0.3, 0.4] results = await valkey_storage.asearch(query_embedding, limit=0) + # Verify KNN limit in query + call_args = mock_ft_search.call_args + query = call_args[0][2] + assert "=>[KNN 0 `@embedding` $BLOB AS score]" in query + # Verify empty results assert len(results) == 0🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/memory/storage/test_valkey_storage_search.py` around lines 916 - 931, Update the test_search_with_zero_limit_returns_empty test to also verify the KNN limit is passed to the Redis FT.SEARCH call: after calling valkey_storage.asearch(..., limit=0) assert that the patched crewai.memory.storage.valkey_storage.ft.search mock was called and inspect its query argument (from mock_ft_search.call_args or call_args_list) to confirm the KNN clause contains "[KNN 0" (or otherwise includes "KNN 0" with the expected embedding placeholder), ensuring ValkeyStorage.asearch sends limit=0 to FT.SEARCH rather than filtering client-side.
254-254: ⚡ Quick winEscaping assertions are too permissive.
Tests check for escaped OR unescaped versions (e.g.,
"agent\\-1" in query or "agent-1" in query), which means they pass regardless of whether escaping is actually applied. This reduces test effectiveness in catching escaping regressions.🔒 Strengthen escaping verification
- assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query + assert "@agent_id:{agent\\-1}" in query, "Special characters should be escaped"Apply similar changes to lines 298, 483, and 514.
Also applies to: 298-298, 483-483, 514-514
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/memory/storage/test_valkey_storage_search.py` at line 254, Replace the permissive OR assertions that accept both escaped and unescaped forms with strict checks that require the escaped form; specifically change occurrences like assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query to assert "@agent_id:{agent\\-1}" in query (i.e., require the backslash-escaped hyphen) and do the same for the other similar assertions referenced (the ones currently allowing either "@agent_id:{...\\-...}" or unescaped "@agent_id:{...-...}" at the other locations), ensuring the tests fail if escaping is not applied.lib/crewai/src/crewai/memory/storage/valkey_storage.py (2)
252-274: 💤 Low valueBackground loop lifecycle: consider adding explicit shutdown method.
The class-level background event loop runs indefinitely as a daemon thread. While this works for most cases, providing an explicit shutdown mechanism (e.g.,
ValkeyStorage.shutdown_background_loop()) would allow clean resource cleanup in long-running processes that dynamically create/destroy storage instances.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py` around lines 252 - 274, Add an explicit classmethod (e.g., ValkeyStorage.shutdown_background_loop) to cleanly stop the background loop created by _get_or_create_loop: if _bg_loop exists and is running, use loop.call_soon_threadsafe(loop.stop) (and run_coroutine_threadsafe(loop.shutdown_asyncgens(), loop) if available) then join _bg_thread with a timeout, close the loop (or schedule loop.close() on the loop thread), and finally set _bg_loop and _bg_thread to None; protect the shutdown with the existing _bg_lock to avoid races.
481-490: ⚖️ Poor tradeoffConsider validating index schema when reusing existing index.
The code checks if the index exists but doesn't verify that its schema (vector dimensions, algorithm) matches the current configuration. A schema mismatch could lead to confusing runtime errors during search operations.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py` around lines 481 - 490, The check that only tests for the presence of "memory_index" should be extended to validate its schema: after detecting the index via ft.list(client) and the names set, fetch the index metadata (e.g., via the index info/describe call provided by your vector store client) for "memory_index" and compare its vector dimension, metric/algorithm and any relevant field names against the current storage configuration (use the class attributes like self._vector_dim / self._metric or whatever config variables exist in ValKeyStorage). If the schema matches, keep setting self._index_created = True; if it does not match, log a clear error and either recreate the index with the correct schema or raise an exception so callers know to resolve the mismatch. Ensure you use the same client object and the existing symbols (ft.list(client), "memory_index", existing, names, self._index_created) when locating and validating the index.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 252-274: Add an explicit classmethod (e.g.,
ValkeyStorage.shutdown_background_loop) to cleanly stop the background loop
created by _get_or_create_loop: if _bg_loop exists and is running, use
loop.call_soon_threadsafe(loop.stop) (and
run_coroutine_threadsafe(loop.shutdown_asyncgens(), loop) if available) then
join _bg_thread with a timeout, close the loop (or schedule loop.close() on the
loop thread), and finally set _bg_loop and _bg_thread to None; protect the
shutdown with the existing _bg_lock to avoid races.
- Around line 481-490: The check that only tests for the presence of
"memory_index" should be extended to validate its schema: after detecting the
index via ft.list(client) and the names set, fetch the index metadata (e.g., via
the index info/describe call provided by your vector store client) for
"memory_index" and compare its vector dimension, metric/algorithm and any
relevant field names against the current storage configuration (use the class
attributes like self._vector_dim / self._metric or whatever config variables
exist in ValKeyStorage). If the schema matches, keep setting self._index_created
= True; if it does not match, log a clear error and either recreate the index
with the correct schema or raise an exception so callers know to resolve the
mismatch. Ensure you use the same client object and the existing symbols
(ft.list(client), "memory_index", existing, names, self._index_created) when
locating and validating the index.
In `@lib/crewai/tests/memory/storage/test_valkey_storage_search.py`:
- Around line 916-931: Update the test_search_with_zero_limit_returns_empty test
to also verify the KNN limit is passed to the Redis FT.SEARCH call: after
calling valkey_storage.asearch(..., limit=0) assert that the patched
crewai.memory.storage.valkey_storage.ft.search mock was called and inspect its
query argument (from mock_ft_search.call_args or call_args_list) to confirm the
KNN clause contains "[KNN 0" (or otherwise includes "KNN 0" with the expected
embedding placeholder), ensuring ValkeyStorage.asearch sends limit=0 to
FT.SEARCH rather than filtering client-side.
- Line 254: Replace the permissive OR assertions that accept both escaped and
unescaped forms with strict checks that require the escaped form; specifically
change occurrences like assert "@agent_id:{agent\\-1}" in query or
"@agent_id:{agent-1}" in query to assert "@agent_id:{agent\\-1}" in query (i.e.,
require the backslash-escaped hyphen) and do the same for the other similar
assertions referenced (the ones currently allowing either
"@agent_id:{...\\-...}" or unescaped "@agent_id:{...-...}" at the other
locations), ensuring the tests fail if escaping is not applied.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 740ada73-a4ea-43e2-b0fb-2031ed8ed6c3
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (18)
lib/crewai-files/src/crewai_files/cache/upload_cache.pylib/crewai/pyproject.tomllib/crewai/src/crewai/a2a/utils/agent_card.pylib/crewai/src/crewai/a2a/utils/task.pylib/crewai/src/crewai/memory/encoding_flow.pylib/crewai/src/crewai/memory/storage/valkey_cache.pylib/crewai/src/crewai/memory/storage/valkey_storage.pylib/crewai/src/crewai/memory/types.pylib/crewai/src/crewai/memory/unified_memory.pylib/crewai/src/crewai/utilities/cache_config.pylib/crewai/tests/memory/storage/test_valkey_cache.pylib/crewai/tests/memory/storage/test_valkey_storage.pylib/crewai/tests/memory/storage/test_valkey_storage_errors.pylib/crewai/tests/memory/storage/test_valkey_storage_scope.pylib/crewai/tests/memory/storage/test_valkey_storage_search.pylib/crewai/tests/memory/test_embedding_safety.pylib/crewai/tests/utilities/test_cache_config.pypyproject.toml
✅ Files skipped from review due to trivial changes (3)
- lib/crewai/pyproject.toml
- pyproject.toml
- lib/crewai/tests/utilities/test_cache_config.py
🚧 Files skipped from review as they are similar to previous changes (11)
- lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
- lib/crewai/src/crewai/utilities/cache_config.py
- lib/crewai/tests/memory/storage/test_valkey_cache.py
- lib/crewai/src/crewai/memory/unified_memory.py
- lib/crewai/src/crewai/memory/types.py
- lib/crewai/tests/memory/test_embedding_safety.py
- lib/crewai/src/crewai/a2a/utils/agent_card.py
- lib/crewai/src/crewai/a2a/utils/task.py
- lib/crewai/src/crewai/memory/storage/valkey_cache.py
- lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
- lib/crewai-files/src/crewai_files/cache/upload_cache.py
ba9bee1 to
9430075
Compare
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (2)
lib/crewai/src/crewai/a2a/utils/task.py (2)
93-99: ⚡ Quick winConsider clarifying
_cache_initializedsemantics.The
_cache_initializedflag is set toTrue(line 113) for both Valkey and aiocache paths, but for the aiocache path, the function does nothing (returns early at line 95) since aiocache is already configured at module level (line 84). This makes the flag's meaning ambiguous: does it mean "cache backend is ready" or "this function has been called"?♻️ Suggested refactor to clarify intent
Option 1: Rename the flag to reflect its actual purpose:
-_cache_initialized = False +_lazy_init_complete = FalseOption 2: Skip setting the flag for the aiocache path and add a comment:
with _cache_init_lock: if _cache_initialized: return if use_valkey_cache(): from crewai.memory.storage.valkey_cache import ValkeyCache conn = parse_cache_url() or {} _task_cache = ValkeyCache( host=conn.get("host", "localhost"), port=conn.get("port", 6379), db=conn.get("db", 0), password=conn.get("password"), default_ttl=3600, ) - - _cache_initialized = True + _cache_initialized = True + # For aiocache, nothing to do here (already configured at module level)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/a2a/utils/task.py` around lines 93 - 99, The _cache_initialized flag is ambiguous because it's set for both the Valkey initialization and the aiocache path even though the aiocache backend is configured at module import, so change the semantics: either rename _cache_initialized to _init_called (and update all checks that use _cache_initialized and _cache_init_lock in init_task_cache/_init_task_cache to reflect that it only means "this init function executed") OR only set _cache_initialized when this function actually performs initialization for Valkey (leave it false for the aiocache branch) and add a clarifying comment on the aiocache branch stating aiocache is configured at module level; update the guard uses of _cache_initialized and _cache_init_lock accordingly so other code (e.g., callers of init_task_cache, the Valkey init path) get the correct meaning.
156-156: 💤 Low valueConsider making polling interval configurable.
The cancellation polling interval is hardcoded to 0.1 seconds in both Valkey and aiocache paths. For high-frequency task environments or slower networks, a configurable interval (via environment variable or module constant) might be beneficial.
♻️ Example configurable interval
+# Cancellation polling interval in seconds +_CANCEL_POLL_INTERVAL = float(os.getenv("CREWAI_CANCEL_POLL_INTERVAL", "0.1")) + async def poll_for_cancel_valkey() -> bool: """Poll ValkeyCache for cancellation flag.""" while True: if _task_cache is not None and await _task_cache.get( f"cancel:{task_id}" ): return True - await asyncio.sleep(0.1) + await asyncio.sleep(_CANCEL_POLL_INTERVAL) async def poll_for_cancel_aiocache() -> bool: """Poll aiocache for cancellation flag.""" cache = caches.get("default") while True: if await cache.get(f"cancel:{task_id}"): return True - await asyncio.sleep(0.1) + await asyncio.sleep(_CANCEL_POLL_INTERVAL)Also applies to: 164-164
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/a2a/utils/task.py` at line 156, Replace the hardcoded asyncio.sleep(0.1) used in both the Valkey and aiocache cancellation loops with a configurable poll interval: add a module-level constant like CANCELLATION_POLL_INTERVAL (default 0.1) or read from an env var (e.g., os.getenv("CANCELLATION_POLL_INTERVAL")) and use that value (or accept it as an optional parameter) in the functions handling the Valkey and aiocache polling paths (referencing the Valkey cancellation loop and the aiocache cancellation loop in task.py); update both occurrences (the sleep at line with Valkey and the sleep at the aiocache path) to use the new constant/parameter and validate/convert the env value to float.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/a2a/utils/task.py`:
- Around line 87-113: _wrap the ValkeyCache creation inside _ensure_task_cache
in a try/except to handle connection errors: when use_valkey_cache() is true and
before assigning to _task_cache, catch exceptions from ValkeyCache(...) (and
from parse_cache_url()) and log the error with context, then set _task_cache to
None (or a safe in-memory fallback) and still set _cache_initialized = True so
subsequent calls don’t crash; reference the symbols _ensure_task_cache,
ValkeyCache, parse_cache_url, use_valkey_cache, _task_cache, _cache_initialized,
and _cache_init_lock when making the change so the init is thread-safe and fails
gracefully.
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 513-519: The metadata_filter clauses built in _vector_search() are
not covered by the FT index because memory_index's schema only defines
VectorField("embedding"), TagField("scope"), TagField("categories"),
NumericField("created_at"), and NumericField("importance"), so metadata_filter
cannot actually restrict FT.SEARCH results; fix by either (A) adding the
necessary metadata fields to the schema (materialize each metadata key used in
metadata_filter as a TagField or NumericField depending on type) and updating
memory_index/schema construction where schema is defined, or (B) if you cannot
change the FT index, remove metadata clauses from the FT query in
_vector_search() and instead apply metadata_filter as a post-filter over the
FT.SEARCH results (filtering returned documents by metadata values before
returning final hits). Ensure the change addresses both the _vector_search()
flow and the other occurrences noted (around lines referenced ~1237-1245) so
metadata filters are actually applied.
- Around line 276-307: Add explicit synchronous and asynchronous close methods
on ValkeyStorage (e.g., close() and aclose()) that encapsulate the existing
cleanup logic currently in __aexit__ and __del__: aclose() should await
self._client.close() and set self._client = None; close() should synchronously
close the client (handling coroutine returns with asyncio.run when needed) and
set self._client = None. Update __aexit__ to call await self.aclose() and update
__del__ to call self.close() (or schedule it on _bg_loop as currently done) so
Memory.close() can deterministically release the Glide/Valkey client by calling
the new close()/aclose() API on ValkeyStorage.
- Around line 1220-1228: The current scope filter builds a Redis tag query like
f"@scope:{{{escaped_scope}*}}" which matches siblings (e.g., "/crew/a" matches
"/crew/ab"); update the logic that uses scope_prefix/escaped_scope/query_parts
so results are boundary-aware by either (A) switching to an
ancestor/explicit-tag representation for scopes, or (B) implementing overfetch +
post-filter: keep the existing relaxed query to fetch candidates, then in the
same method filter the returned items by verifying the stored scope string
strictly matches the requested subtree (e.g., for non-root ensure stored_scope
== scope_prefix or stored_scope startswith scope_prefix + "/" so "/crew/a" does
not match "/crew/ab"); modify the code paths that consume escaped_scope and
query_parts and ensure the final returned result list is the post-filtered
subset.
- Around line 975-983: Replace the raw startswith check with a boundary-safe
comparison: instead of if scope_path.startswith(scope_prefix), match either an
exact scope (scope_path == scope_prefix) or a child scope
(scope_path.startswith(scope_prefix + "/")), treating the root scope specially
(allow prefix "/" to match all). Update the check around scope_path/scope_prefix
before building scope_key and calling client.zrange/members_result so sibling
scopes (e.g., "/crew/a" vs "/crew/ab") no longer bleed into
delete/list_records/count/reset/get_scope_info.
In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 219-229: parse_cache_url currently returns host/port/db/password
but drops the URL scheme so unified_memory.py always instantiates ValkeyStorage
without TLS; update parse_cache_url to detect the scheme (e.g., set use_tls =
scheme.startswith("rediss")) and return that flag, then modify the ValkeyStorage
construction in UnifiedMemory (the block that calls parse_cache_url and
instantiates ValkeyStorage) to pass use_tls=conn.get("use_tls", False) (or the
returned flag) so ValkeyStorage receives the correct TLS setting.
---
Nitpick comments:
In `@lib/crewai/src/crewai/a2a/utils/task.py`:
- Around line 93-99: The _cache_initialized flag is ambiguous because it's set
for both the Valkey initialization and the aiocache path even though the
aiocache backend is configured at module import, so change the semantics: either
rename _cache_initialized to _init_called (and update all checks that use
_cache_initialized and _cache_init_lock in init_task_cache/_init_task_cache to
reflect that it only means "this init function executed") OR only set
_cache_initialized when this function actually performs initialization for
Valkey (leave it false for the aiocache branch) and add a clarifying comment on
the aiocache branch stating aiocache is configured at module level; update the
guard uses of _cache_initialized and _cache_init_lock accordingly so other code
(e.g., callers of init_task_cache, the Valkey init path) get the correct
meaning.
- Line 156: Replace the hardcoded asyncio.sleep(0.1) used in both the Valkey and
aiocache cancellation loops with a configurable poll interval: add a
module-level constant like CANCELLATION_POLL_INTERVAL (default 0.1) or read from
an env var (e.g., os.getenv("CANCELLATION_POLL_INTERVAL")) and use that value
(or accept it as an optional parameter) in the functions handling the Valkey and
aiocache polling paths (referencing the Valkey cancellation loop and the
aiocache cancellation loop in task.py); update both occurrences (the sleep at
line with Valkey and the sleep at the aiocache path) to use the new
constant/parameter and validate/convert the env value to float.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 30afcf2c-fac3-468d-9c15-93b2802269e5
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (18)
lib/crewai-files/src/crewai_files/cache/upload_cache.pylib/crewai/pyproject.tomllib/crewai/src/crewai/a2a/utils/agent_card.pylib/crewai/src/crewai/a2a/utils/task.pylib/crewai/src/crewai/memory/encoding_flow.pylib/crewai/src/crewai/memory/storage/valkey_cache.pylib/crewai/src/crewai/memory/storage/valkey_storage.pylib/crewai/src/crewai/memory/types.pylib/crewai/src/crewai/memory/unified_memory.pylib/crewai/src/crewai/utilities/cache_config.pylib/crewai/tests/memory/storage/test_valkey_cache.pylib/crewai/tests/memory/storage/test_valkey_storage.pylib/crewai/tests/memory/storage/test_valkey_storage_errors.pylib/crewai/tests/memory/storage/test_valkey_storage_scope.pylib/crewai/tests/memory/storage/test_valkey_storage_search.pylib/crewai/tests/memory/test_embedding_safety.pylib/crewai/tests/utilities/test_cache_config.pypyproject.toml
✅ Files skipped from review due to trivial changes (1)
- lib/crewai/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (12)
- lib/crewai/src/crewai/a2a/utils/agent_card.py
- pyproject.toml
- lib/crewai/tests/memory/storage/test_valkey_cache.py
- lib/crewai/tests/memory/test_embedding_safety.py
- lib/crewai/src/crewai/utilities/cache_config.py
- lib/crewai/src/crewai/memory/types.py
- lib/crewai/src/crewai/memory/storage/valkey_cache.py
- lib/crewai-files/src/crewai_files/cache/upload_cache.py
- lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
- lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
- lib/crewai/tests/utilities/test_cache_config.py
- lib/crewai/tests/memory/storage/test_valkey_storage_search.py
2e5f9bb to
c5a9a8d
Compare
b83ca28 to
aa9ff4e
Compare
|
@greysonlalonde hoping I can get a review on this. |
8dfe861 to
62bebab
Compare
62bebab to
a53fc6b
Compare
Extract duplicated Redis URL parsing into a shared cache_config utility. Introduce ValkeyCache as a lightweight async key/value cache using valkey-glide. Wire it into A2A task handling, agent card caching, and file upload caching. Part 1/4 of Valkey storage implementation. fix: async-safe embeddings and resilient drain_writes Add bytes→float validators on MemoryRecord and ItemState to handle Valkey returning embeddings as raw bytes. Make embed_texts() safe when called from an async context by using a thread pool. Improve drain_writes() with per-save timeouts and error logging instead of raising on failure. Part 3/4 of Valkey storage implementation. feat(valkey): ValkeyStorage vector memory backend Add ValkeyStorage, a distributed StorageBackend implementation using Valkey-GLIDE with Valkey Search for vector similarity. Wire it into Memory as the 'valkey' storage option. Pin scrapegraph-py<2 to fix unrelated upstream breakage. Part 4/4 of Valkey storage implementation. fix: use datetime.utcnow() for last_accessed consistency MemoryRecord defaults use utcnow() for created_at and last_accessed. Match that in ValkeyStorage.update_record() to avoid timezone inconsistency in recency scoring. feat(valkey): shared cache config + ValkeyCache for A2A and file uploads Extract duplicated Redis URL parsing into a shared cache_config utility. Introduce ValkeyCache as a lightweight async key/value cache using valkey-glide. Wire it into A2A task handling, agent card caching, and file upload caching. Part 1/4 of Valkey storage implementation. fix: handle non-numeric database path in cache URL parsing Extract _parse_db_from_path() helper that catches ValueError for paths like /mydb and defaults to 0 with a warning, instead of crashing. fix: async-safe embeddings and resilient drain_writes Add bytes→float validators on MemoryRecord and ItemState to handle Valkey returning embeddings as raw bytes. Make embed_texts() safe when called from an async context by using a thread pool. Improve drain_writes() with per-save timeouts and error logging instead of raising on failure. Part 3/4 of Valkey storage implementation. fix: catch concurrent.futures.TimeoutError for Python 3.10 compat In Python <3.11, concurrent.futures.TimeoutError is distinct from the builtin TimeoutError. Catch both so the timeout warning path works on all supported Python versions.
The FT.* Search module APIs used by ValkeyStorage require valkey-glide 2.x. Bumping to 2.4.0 picks up Valkey Search 1.2 support, client-side caching, context manager support, and performance improvements. Signed-off-by: Matthias Howell <matthias.howell@improving.com>
f149997 to
376b471
Compare
Title: feat(valkey): ValkeyStorage vector memory backend
Description:
Part 4/4 of adding Valkey as a storage backend for CrewAI. This PR adds the core vector storage implementation and wires it into the memory system. Depends on parts 1 and 3.
What changed:
valkey_storage.py (new) — Full StorageBackend implementation using Valkey-GLIDE. Supports vector similarity search via Valkey Search module (FLAT and HNSW indexes), scope/category/metadata filtering through tag and numeric indexes, and both sync and async interfaces. Handles lazy client and index initialization, automatic index creation, and batch save operations with JSON + binary embedding serialization.
unified_memory.py — Added "valkey" as a recognized storage option. When selected, reads connection details from VALKEY_URL/REDIS_URL via the cache_config utility from part 1 and instantiates ValkeyStorage.
pyproject.toml (root) — Pinned scrapegraph-py>=1.46.0,<2 to fix an unrelated upstream breakage where 2.x removed the Client class.
Testing:
test_valkey_storage.py — Core CRUD operations, batch saves, record retrieval, deletion, flushing, TTL, metadata handling, connection management
test_valkey_storage_errors.py — Connection failures, index creation errors, malformed data, graceful degradation
test_valkey_storage_scope.py — Hierarchical scope queries, scope listing, child scope enumeration, cross-scope isolation
test_valkey_storage_search.py — Vector similarity search, composite scoring, category filtering, limit/offset, empty results
All tests use mocked Valkey clients and do not require a running Valkey instance.
Summary by CodeRabbit
New Features
Improvements
Tests