Skip to content

Feat/valkey 4 storage#5703

Open
MatthiasHowellYopp wants to merge 2 commits into
crewAIInc:mainfrom
MatthiasHowellYopp:feat/valkey-4-storage
Open

Feat/valkey 4 storage#5703
MatthiasHowellYopp wants to merge 2 commits into
crewAIInc:mainfrom
MatthiasHowellYopp:feat/valkey-4-storage

Conversation

@MatthiasHowellYopp
Copy link
Copy Markdown
Contributor

@MatthiasHowellYopp MatthiasHowellYopp commented May 4, 2026

Title: feat(valkey): ValkeyStorage vector memory backend

Description:

Part 4/4 of adding Valkey as a storage backend for CrewAI. This PR adds the core vector storage implementation and wires it into the memory system. Depends on parts 1 and 3.

What changed:

valkey_storage.py (new) — Full StorageBackend implementation using Valkey-GLIDE. Supports vector similarity search via Valkey Search module (FLAT and HNSW indexes), scope/category/metadata filtering through tag and numeric indexes, and both sync and async interfaces. Handles lazy client and index initialization, automatic index creation, and batch save operations with JSON + binary embedding serialization.

unified_memory.py — Added "valkey" as a recognized storage option. When selected, reads connection details from VALKEY_URL/REDIS_URL via the cache_config utility from part 1 and instantiates ValkeyStorage.

pyproject.toml (root) — Pinned scrapegraph-py>=1.46.0,<2 to fix an unrelated upstream breakage where 2.x removed the Client class.

Testing:

test_valkey_storage.py — Core CRUD operations, batch saves, record retrieval, deletion, flushing, TTL, metadata handling, connection management
test_valkey_storage_errors.py — Connection failures, index creation errors, malformed data, graceful degradation
test_valkey_storage_scope.py — Hierarchical scope queries, scope listing, child scope enumeration, cross-scope isolation
test_valkey_storage_search.py — Vector similarity search, composite scoring, category filtering, limit/offset, empty results
All tests use mocked Valkey clients and do not require a running Valkey instance.

Summary by CodeRabbit

  • New Features

    • Valkey-backed cache option and Valkey-based memory storage with server-side vector search.
    • Cached uploads now support JSON-compatible serialization for cross-backend storage.
    • Optional Valkey dependency added.
  • Improvements

    • Lazy cache initialization and backend-aware cancellation handling.
    • Embedding normalization (bytes/arrays → float lists) and safer async embedding execution.
    • Memory drain now timeout-aware with per-save logging; cache/storage TTL and cleanup consistency.
  • Tests

    • Extensive tests covering Valkey cache, storage, search, error cases, and embedding safety.

Review Change Stack

@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-4-storage branch 3 times, most recently from fef7e5e to 975887f Compare May 7, 2026 20:01
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds Valkey-GLIDE backends and wiring: shared cache config, ValkeyCache, ValkeyStorage with vector search and sync/async bridging, UploadCache multi-backend abstraction, embedding normalization and async-safety, task/agent cache integration, unified-memory drain changes, comprehensive tests, and dependency metadata.

Changes

Valkey Backend Integration

Layer / File(s) Summary
UploadCache backend and CachedUpload
lib/crewai-files/src/crewai_files/cache/upload_cache.py
Introduce CacheBackend protocol, AiocacheBackend and ValkeyCacheBackend; CachedUpload.to_dict()/from_dict(); route UploadCache get/set/delete/evict/clear/enumeration through backend while preserving TTL/expiry semantics.
Cache configuration helpers
lib/crewai/src/crewai/utilities/cache_config.py, lib/crewai/tests/utilities/test_cache_config.py
parse_cache_url(), _parse_db_from_path(), get_aiocache_config(), use_valkey_cache() to derive aiocache config from VALKEY_URL/REDIS_URL with tests for precedence and defaults.
ValkeyCache implementation
lib/crewai/src/crewai/memory/storage/valkey_cache.py, lib/crewai/tests/memory/storage/test_valkey_cache.py
ValkeyCache with lazy Glide client, async get/set/delete/exists/close, JSON (de)serialization, per-call/default TTL, 10s init timeout, and tests for TTL, JSON edge cases, lifecycle, auth, connection errors, concurrency, and non-serializable-value behavior.
ValkeyStorage core & lifecycle
lib/crewai/src/crewai/memory/storage/valkey_storage.py
ValkeyStorage with lazy client, sync-to-async bridge using a background event loop, write locking, async context manager lifecycle, close semantics, and module logging.
ValkeyStorage serialization & indexing
lib/crewai/src/crewai/memory/storage/valkey_storage.py
Record ↔ Valkey hash conversion including float32 embedding bytes and JSON metadata; category/metadata parsing; scope/category/metadata index maintenance and ensure-index logic.
ValkeyStorage CRUD & Search
lib/crewai/src/crewai/memory/storage/valkey_storage.py
Async batch save/update/delete with index cleanup; candidate discovery for deletions; server-side FT.SEARCH KNN vector search with scope/category/metadata filters, parameter binding, and similarity parsing; public wrappers.
Embedding validators & async-safe embedder
lib/crewai/src/crewai/memory/types.py, lib/crewai/src/crewai/memory/encoding_flow.py, lib/crewai/tests/memory/test_embedding_safety.py
Pydantic field_validators normalize embeddings (bytes → list[float] or None); embed_texts offloads to a ThreadPoolExecutor when called inside a running event loop with 30s timeout and warning fallback; tests validate normalization and async-safety.
Unified memory drain_writes
lib/crewai/src/crewai/memory/unified_memory.py
drain_writes(timeout_per_save: float = 60.0) iterates pending save futures with per-save timeouts, logs per-future outcomes, counts failures, and emits aggregate warning without raising.
Agent card & task integration
lib/crewai/src/crewai/a2a/utils/agent_card.py, lib/crewai/src/crewai/a2a/utils/task.py
Agent card lazily ensures aiocache config; task cancellation supports Valkey and aiocache backends with thread-safe lazy init, Redis pub/sub watcher with polling fallback, and backend-aware set/delete of cancel flags.
ValkeyStorage tests — errors/scope/search
lib/crewai/tests/memory/storage/*
Extensive tests for ValkeyStorage: serialization/deserialization error handling and fallbacks, scope listing/aggregation, category counting, count/reset, FT.SEARCH query/response parsing, robustness, and sync-wrapper parity.
Embedding safety & cache-config tests
lib/crewai/tests/memory/test_embedding_safety.py, lib/crewai/tests/utilities/test_cache_config.py
Tests for embedding normalization, embed_texts async-safety, embed_text behavior, and cache URL parsing/selection logic.
Optional dependencies & overrides
lib/crewai/pyproject.toml, pyproject.toml
Added optional valkey = ["valkey-glide>=1.3.0"] and pinned scrapegraph-py >=1.46.0,<2 with comment in pyproject overrides.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related issues

Poem

"🐰 I hopped into code with JSON bright,

embeddings snug and Valkey in sight,
Glide whirred softly, indices align,
uploads cached, searches refine —
the rabbit cheers: new backends shine!"

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 77.61% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Feat/valkey 4 storage' is related to the PR's main objective of adding a Valkey storage backend, but it is vague, uses non-standard formatting (lowercase 'feat', slash instead of colon), and lacks clarity about the specific feature being added. Use a clearer, more specific title following conventional commit format, e.g., 'feat: Add ValkeyStorage vector memory backend' or 'feat(memory): Add Valkey storage backend integration'.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

Review ran into problems

🔥 Problems

Stopped waiting for pipeline failures after 30000ms. One of your pipelines takes longer than our 30000ms fetch window to run, so review may not consider pipeline-failure results for inline comments if any failures occurred after the fetch window. Increase the timeout if you want to wait longer or run a @coderabbit review after the pipeline has finished.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (5)
lib/crewai/src/crewai/memory/encoding_flow.py (1)

71-94: 💤 Low value

Mutating passed-in MemoryRecord objects in validator may cause unexpected side effects.

This validator modifies record.embedding in place (lines 86, 93) on MemoryRecord objects that are passed into ItemState. If those records are referenced elsewhere (e.g., in storage layer caches or other data structures), this mutation could cause unexpected behavior.

Consider creating a copy of the record with the converted embedding, or relying on MemoryRecord's own validate_embedding validator to handle the conversion at construction time.

♻️ Alternative: create new MemoryRecord instead of mutating
 `@field_validator`("similar_records", "result_record", mode="before")
 `@classmethod`
 def ensure_embedding_is_list(cls, v: Any) -> Any:
     """Ensure MemoryRecord embeddings are list[float], not bytes."""
     if v is None:
         return None
     if isinstance(v, list):
         # Process list of MemoryRecords
-        for record in v:
-            if isinstance(record, MemoryRecord) and isinstance(
-                record.embedding, bytes
-            ):
-                import numpy as np
-
-                arr = np.frombuffer(record.embedding, dtype=np.float32)
-                record.embedding = [float(x) for x in arr]
-        return v
+        result = []
+        for record in v:
+            if isinstance(record, MemoryRecord) and isinstance(record.embedding, bytes):
+                import numpy as np
+                arr = np.frombuffer(record.embedding, dtype=np.float32)
+                # Use model_copy to create new record with converted embedding
+                result.append(record.model_copy(update={"embedding": [float(x) for x in arr]}))
+            else:
+                result.append(record)
+        return result
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/encoding_flow.py` around lines 71 - 94, The
validator ensure_embedding_is_list mutates MemoryRecord.embedding in-place (for
fields similar_records and result_record), which can cause side effects; update
it to avoid mutating inputs by creating and returning new MemoryRecord instances
with converted embeddings (or convert entire list to new records) instead of
assigning to record.embedding, or delegate conversion to MemoryRecord's own
validator/constructor so ensure_embedding_is_list only returns new objects or
primitives and leaves original MemoryRecord instances untouched; adjust handling
for both list and single record paths to return copies with embeddings converted
from bytes -> list[float].
lib/crewai/tests/memory/storage/test_valkey_storage_scope.py (1)

7-7: 💤 Low value

Unused import: uuid4

The uuid4 import is not used anywhere in this test file.

-from uuid import uuid4
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/tests/memory/storage/test_valkey_storage_scope.py` at line 7,
Remove the unused import uuid4 from the top of the test file; locate the import
statement "from uuid import uuid4" and delete it (or if future use is intended,
use it where needed in tests like in test functions or fixtures), ensuring no
other code references uuid4.
lib/crewai/src/crewai/memory/storage/valkey_cache.py (1)

132-160: 💤 Low value

json.dumps may raise TypeError for non-JSON-serializable values.

If a caller passes a non-serializable value (e.g., a custom object, datetime, or bytes), json.dumps(value) on line 148 will raise a TypeError. Consider either documenting this limitation clearly or adding error handling.

🛡️ Suggested defensive handling
     async def set(
         self,
         key: str,
         value: Any,
         ttl: int | None = None,
     ) -> None:
         """Set value in cache.

         Args:
             key: Cache key.
-            value: Value to cache (will be serialized to JSON).
+            value: Value to cache (must be JSON-serializable).
             ttl: TTL in seconds (None uses default_ttl, 0 = no expiration).
+
+        Raises:
+            TypeError: If value is not JSON-serializable.
         """
         from glide import ExpirySet, ExpiryType

         client = await self._get_client()
-        serialized = json.dumps(value)
+        try:
+            serialized = json.dumps(value)
+        except TypeError as e:
+            _logger.error(f"Failed to serialize value for key {key}: {e}")
+            raise
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/storage/valkey_cache.py` around lines 132 - 160,
The set method in valkey_cache.py currently calls json.dumps(value) which can
raise TypeError for non-JSON-serializable objects; wrap the serialization in a
try/except around json.dumps in the set method (ValKeyCache.set) and handle
TypeError by either (a) re-raising a clear error (e.g., ValueError/TypeError)
that includes the cache key and the value's type, or (b) falling back to a safe
serializer (e.g., json.dumps(value, default=str) or using pickle) based on
project conventions; ensure the chosen behavior is consistent and documented in
the method docstring.
lib/crewai/src/crewai/a2a/utils/agent_card.py (1)

48-58: 💤 Low value

Consider thread-safety for _cache_configured flag.

The _cache_configured flag is not protected by a lock, creating a potential race condition if _ensure_cache_configured() is called concurrently from multiple threads. While caches.set_config() is likely idempotent (making this benign), the pattern in task.py (lines 76-78) uses a threading lock for similar initialization.

For consistency and to prevent duplicate initialization logs or wasted work, consider adding a lock:

♻️ Suggested improvement
+import threading
+
 _cache_configured = False
+_cache_config_lock = threading.Lock()


 def _ensure_cache_configured() -> None:
     """Configure aiocache on first use (lazy initialization)."""
     global _cache_configured
     if _cache_configured:
         return
-    caches.set_config(get_aiocache_config())
-    _cache_configured = True
+    with _cache_config_lock:
+        if _cache_configured:
+            return
+        caches.set_config(get_aiocache_config())
+        _cache_configured = True
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/a2a/utils/agent_card.py` around lines 48 - 58, The
_cache_configured flag in _ensure_cache_configured() can race when called
concurrently; protect it with a threading.Lock to ensure only one thread runs
caches.set_config(get_aiocache_config()) and flips the flag. Add a module-level
lock (e.g., _cache_configured_lock = threading.Lock()), acquire it at the start
of _ensure_cache_configured(), re-check _cache_configured inside the lock
(double-checked locking), call caches.set_config(...) and set _cache_configured
= True while holding the lock to avoid duplicate initialization work or logs.
lib/crewai/tests/utilities/test_cache_config.py (1)

89-98: ⚡ Quick win

Add explicit REDIS_URL coverage for get_aiocache_config().

This class currently validates the no-URL and VALKEY_URL paths, but not the REDIS_URL-only path. Adding it would lock in backward-compatible behavior and prevent regressions.

Proposed test addition
 class TestGetAiocacheConfig:
@@
     def test_returns_redis_cache_when_url_set(self) -> None:
         with patch.dict(
             os.environ, {"VALKEY_URL": "redis://myhost:6380/2"}, clear=True
         ):
             config = get_aiocache_config()
             assert config["default"]["cache"] == "aiocache.RedisCache"
             assert config["default"]["endpoint"] == "myhost"
             assert config["default"]["port"] == 6380
             assert config["default"]["db"] == 2
+
+    def test_returns_redis_cache_when_only_redis_url_set(self) -> None:
+        with patch.dict(
+            os.environ, {"REDIS_URL": "redis://redis-host:6379/0"}, clear=True
+        ):
+            config = get_aiocache_config()
+            assert config["default"]["cache"] == "aiocache.RedisCache"
+            assert config["default"]["endpoint"] == "redis-host"
+            assert config["default"]["port"] == 6379
+            assert config["default"]["db"] == 0
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/tests/utilities/test_cache_config.py` around lines 89 - 98, Add a
new unit test for get_aiocache_config that covers the REDIS_URL-only path:
create a test (e.g., test_returns_redis_cache_when_redis_url_set) that uses
patch.dict(os.environ, {"REDIS_URL": "redis://myhost:6380/2"}, clear=True) to
call get_aiocache_config(), then assert config["default"]["cache"] ==
"aiocache.RedisCache" and that endpoint == "myhost", port == 6380, and db == 2;
place this alongside the existing test_returns_redis_cache_when_url_set in
test_cache_config.py to ensure backward-compatible behavior when REDIS_URL is
provided.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 778-779: The update to record.last_accessed uses datetime.now(),
causing a timezone mismatch with MemoryRecord's defaults which use
datetime.utcnow(); replace datetime.now() with datetime.utcnow() in the code
that updates MemoryRecord.last_accessed (the assignment in valkey_storage.py
where record.last_accessed is set) and ensure datetime.utcnow is imported/used
consistently throughout any related methods manipulating
created_at/last_accessed to maintain UTC timestamps.

In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 363-372: Update the timeout exception handling in the drain/writes
loop so it catches both the builtin TimeoutError and
concurrent.futures.TimeoutError: modify the except block in unified_memory.py
(the block that currently reads "except TimeoutError:" within the save/drain
logic) to catch a tuple including the concurrent.futures timeout class, and
ensure concurrent.futures.TimeoutError is imported or referenced (e.g., import
concurrent.futures or alias the futures TimeoutError) so Python 3.10 futures
timeouts trigger the same warning path (the _logger.warning call and
failed_saves increment).

In `@lib/crewai/src/crewai/utilities/cache_config.py`:
- Around line 14-33: The parse_cache_url function can raise ValueError when
converting parsed.path to int for non-numeric DB names; update parse_cache_url
to defensively handle parsed.path by validating or wrapping
int(parsed.path.lstrip("/")) in a try/except and fall back to 0 (or None) when
conversion fails, ensuring the returned dict (host, port, db, password) never
raises on non-numeric paths; reference parsed.path and the db key in the return
value when implementing this change.

---

Nitpick comments:
In `@lib/crewai/src/crewai/a2a/utils/agent_card.py`:
- Around line 48-58: The _cache_configured flag in _ensure_cache_configured()
can race when called concurrently; protect it with a threading.Lock to ensure
only one thread runs caches.set_config(get_aiocache_config()) and flips the
flag. Add a module-level lock (e.g., _cache_configured_lock = threading.Lock()),
acquire it at the start of _ensure_cache_configured(), re-check
_cache_configured inside the lock (double-checked locking), call
caches.set_config(...) and set _cache_configured = True while holding the lock
to avoid duplicate initialization work or logs.

In `@lib/crewai/src/crewai/memory/encoding_flow.py`:
- Around line 71-94: The validator ensure_embedding_is_list mutates
MemoryRecord.embedding in-place (for fields similar_records and result_record),
which can cause side effects; update it to avoid mutating inputs by creating and
returning new MemoryRecord instances with converted embeddings (or convert
entire list to new records) instead of assigning to record.embedding, or
delegate conversion to MemoryRecord's own validator/constructor so
ensure_embedding_is_list only returns new objects or primitives and leaves
original MemoryRecord instances untouched; adjust handling for both list and
single record paths to return copies with embeddings converted from bytes ->
list[float].

In `@lib/crewai/src/crewai/memory/storage/valkey_cache.py`:
- Around line 132-160: The set method in valkey_cache.py currently calls
json.dumps(value) which can raise TypeError for non-JSON-serializable objects;
wrap the serialization in a try/except around json.dumps in the set method
(ValKeyCache.set) and handle TypeError by either (a) re-raising a clear error
(e.g., ValueError/TypeError) that includes the cache key and the value's type,
or (b) falling back to a safe serializer (e.g., json.dumps(value, default=str)
or using pickle) based on project conventions; ensure the chosen behavior is
consistent and documented in the method docstring.

In `@lib/crewai/tests/memory/storage/test_valkey_storage_scope.py`:
- Line 7: Remove the unused import uuid4 from the top of the test file; locate
the import statement "from uuid import uuid4" and delete it (or if future use is
intended, use it where needed in tests like in test functions or fixtures),
ensuring no other code references uuid4.

In `@lib/crewai/tests/utilities/test_cache_config.py`:
- Around line 89-98: Add a new unit test for get_aiocache_config that covers the
REDIS_URL-only path: create a test (e.g.,
test_returns_redis_cache_when_redis_url_set) that uses patch.dict(os.environ,
{"REDIS_URL": "redis://myhost:6380/2"}, clear=True) to call
get_aiocache_config(), then assert config["default"]["cache"] ==
"aiocache.RedisCache" and that endpoint == "myhost", port == 6380, and db == 2;
place this alongside the existing test_returns_redis_cache_when_url_set in
test_cache_config.py to ensure backward-compatible behavior when REDIS_URL is
provided.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 79394168-72b2-49c2-ad41-99a802d771dd

📥 Commits

Reviewing files that changed from the base of the PR and between 63a9e7e and 986d128.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
  • lib/crewai-files/src/crewai_files/cache/upload_cache.py
  • lib/crewai/pyproject.toml
  • lib/crewai/src/crewai/a2a/utils/agent_card.py
  • lib/crewai/src/crewai/a2a/utils/task.py
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/src/crewai/memory/storage/valkey_cache.py
  • lib/crewai/src/crewai/memory/storage/valkey_storage.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/src/crewai/utilities/cache_config.py
  • lib/crewai/tests/memory/storage/test_valkey_cache.py
  • lib/crewai/tests/memory/storage/test_valkey_storage.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_search.py
  • lib/crewai/tests/memory/test_embedding_safety.py
  • lib/crewai/tests/utilities/test_cache_config.py
  • pyproject.toml

Comment thread lib/crewai/src/crewai/memory/storage/valkey_storage.py Outdated
Comment thread lib/crewai/src/crewai/memory/unified_memory.py Outdated
Comment thread lib/crewai/src/crewai/utilities/cache_config.py
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (4)
lib/crewai/tests/memory/storage/test_valkey_storage_search.py (2)

916-931: 💤 Low value

Test doesn't verify that limit=0 is passed to FT.SEARCH.

The test asserts empty results but doesn't check that the KNN clause contains [KNN 0 ...]. If the implementation sent a higher limit and filtered client-side, this test would still pass.

📝 Verify KNN limit in query
         query_embedding = [0.1, 0.2, 0.3, 0.4]
         results = await valkey_storage.asearch(query_embedding, limit=0)
 
+        # Verify KNN limit in query
+        call_args = mock_ft_search.call_args
+        query = call_args[0][2]
+        assert "=>[KNN 0 `@embedding` $BLOB AS score]" in query
+
         # Verify empty results
         assert len(results) == 0
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/tests/memory/storage/test_valkey_storage_search.py` around lines
916 - 931, Update the test_search_with_zero_limit_returns_empty test to also
verify the KNN limit is passed to the Redis FT.SEARCH call: after calling
valkey_storage.asearch(..., limit=0) assert that the patched
crewai.memory.storage.valkey_storage.ft.search mock was called and inspect its
query argument (from mock_ft_search.call_args or call_args_list) to confirm the
KNN clause contains "[KNN 0" (or otherwise includes "KNN 0" with the expected
embedding placeholder), ensuring ValkeyStorage.asearch sends limit=0 to
FT.SEARCH rather than filtering client-side.

254-254: ⚡ Quick win

Escaping assertions are too permissive.

Tests check for escaped OR unescaped versions (e.g., "agent\\-1" in query or "agent-1" in query), which means they pass regardless of whether escaping is actually applied. This reduces test effectiveness in catching escaping regressions.

🔒 Strengthen escaping verification
-        assert "@agent_id:{agent\\-1}" in query or "@agent_id:{agent-1}" in query
+        assert "@agent_id:{agent\\-1}" in query, "Special characters should be escaped"

Apply similar changes to lines 298, 483, and 514.

Also applies to: 298-298, 483-483, 514-514

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/tests/memory/storage/test_valkey_storage_search.py` at line 254,
Replace the permissive OR assertions that accept both escaped and unescaped
forms with strict checks that require the escaped form; specifically change
occurrences like assert "@agent_id:{agent\\-1}" in query or
"@agent_id:{agent-1}" in query to assert "@agent_id:{agent\\-1}" in query (i.e.,
require the backslash-escaped hyphen) and do the same for the other similar
assertions referenced (the ones currently allowing either
"@agent_id:{...\\-...}" or unescaped "@agent_id:{...-...}" at the other
locations), ensuring the tests fail if escaping is not applied.
lib/crewai/src/crewai/memory/storage/valkey_storage.py (2)

252-274: 💤 Low value

Background loop lifecycle: consider adding explicit shutdown method.

The class-level background event loop runs indefinitely as a daemon thread. While this works for most cases, providing an explicit shutdown mechanism (e.g., ValkeyStorage.shutdown_background_loop()) would allow clean resource cleanup in long-running processes that dynamically create/destroy storage instances.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py` around lines 252 -
274, Add an explicit classmethod (e.g., ValkeyStorage.shutdown_background_loop)
to cleanly stop the background loop created by _get_or_create_loop: if _bg_loop
exists and is running, use loop.call_soon_threadsafe(loop.stop) (and
run_coroutine_threadsafe(loop.shutdown_asyncgens(), loop) if available) then
join _bg_thread with a timeout, close the loop (or schedule loop.close() on the
loop thread), and finally set _bg_loop and _bg_thread to None; protect the
shutdown with the existing _bg_lock to avoid races.

481-490: ⚖️ Poor tradeoff

Consider validating index schema when reusing existing index.

The code checks if the index exists but doesn't verify that its schema (vector dimensions, algorithm) matches the current configuration. A schema mismatch could lead to confusing runtime errors during search operations.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py` around lines 481 -
490, The check that only tests for the presence of "memory_index" should be
extended to validate its schema: after detecting the index via ft.list(client)
and the names set, fetch the index metadata (e.g., via the index info/describe
call provided by your vector store client) for "memory_index" and compare its
vector dimension, metric/algorithm and any relevant field names against the
current storage configuration (use the class attributes like self._vector_dim /
self._metric or whatever config variables exist in ValKeyStorage). If the schema
matches, keep setting self._index_created = True; if it does not match, log a
clear error and either recreate the index with the correct schema or raise an
exception so callers know to resolve the mismatch. Ensure you use the same
client object and the existing symbols (ft.list(client), "memory_index",
existing, names, self._index_created) when locating and validating the index.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 252-274: Add an explicit classmethod (e.g.,
ValkeyStorage.shutdown_background_loop) to cleanly stop the background loop
created by _get_or_create_loop: if _bg_loop exists and is running, use
loop.call_soon_threadsafe(loop.stop) (and
run_coroutine_threadsafe(loop.shutdown_asyncgens(), loop) if available) then
join _bg_thread with a timeout, close the loop (or schedule loop.close() on the
loop thread), and finally set _bg_loop and _bg_thread to None; protect the
shutdown with the existing _bg_lock to avoid races.
- Around line 481-490: The check that only tests for the presence of
"memory_index" should be extended to validate its schema: after detecting the
index via ft.list(client) and the names set, fetch the index metadata (e.g., via
the index info/describe call provided by your vector store client) for
"memory_index" and compare its vector dimension, metric/algorithm and any
relevant field names against the current storage configuration (use the class
attributes like self._vector_dim / self._metric or whatever config variables
exist in ValKeyStorage). If the schema matches, keep setting self._index_created
= True; if it does not match, log a clear error and either recreate the index
with the correct schema or raise an exception so callers know to resolve the
mismatch. Ensure you use the same client object and the existing symbols
(ft.list(client), "memory_index", existing, names, self._index_created) when
locating and validating the index.

In `@lib/crewai/tests/memory/storage/test_valkey_storage_search.py`:
- Around line 916-931: Update the test_search_with_zero_limit_returns_empty test
to also verify the KNN limit is passed to the Redis FT.SEARCH call: after
calling valkey_storage.asearch(..., limit=0) assert that the patched
crewai.memory.storage.valkey_storage.ft.search mock was called and inspect its
query argument (from mock_ft_search.call_args or call_args_list) to confirm the
KNN clause contains "[KNN 0" (or otherwise includes "KNN 0" with the expected
embedding placeholder), ensuring ValkeyStorage.asearch sends limit=0 to
FT.SEARCH rather than filtering client-side.
- Line 254: Replace the permissive OR assertions that accept both escaped and
unescaped forms with strict checks that require the escaped form; specifically
change occurrences like assert "@agent_id:{agent\\-1}" in query or
"@agent_id:{agent-1}" in query to assert "@agent_id:{agent\\-1}" in query (i.e.,
require the backslash-escaped hyphen) and do the same for the other similar
assertions referenced (the ones currently allowing either
"@agent_id:{...\\-...}" or unescaped "@agent_id:{...-...}" at the other
locations), ensuring the tests fail if escaping is not applied.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 740ada73-a4ea-43e2-b0fb-2031ed8ed6c3

📥 Commits

Reviewing files that changed from the base of the PR and between 986d128 and ba9bee1.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
  • lib/crewai-files/src/crewai_files/cache/upload_cache.py
  • lib/crewai/pyproject.toml
  • lib/crewai/src/crewai/a2a/utils/agent_card.py
  • lib/crewai/src/crewai/a2a/utils/task.py
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/src/crewai/memory/storage/valkey_cache.py
  • lib/crewai/src/crewai/memory/storage/valkey_storage.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/src/crewai/utilities/cache_config.py
  • lib/crewai/tests/memory/storage/test_valkey_cache.py
  • lib/crewai/tests/memory/storage/test_valkey_storage.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_search.py
  • lib/crewai/tests/memory/test_embedding_safety.py
  • lib/crewai/tests/utilities/test_cache_config.py
  • pyproject.toml
✅ Files skipped from review due to trivial changes (3)
  • lib/crewai/pyproject.toml
  • pyproject.toml
  • lib/crewai/tests/utilities/test_cache_config.py
🚧 Files skipped from review as they are similar to previous changes (11)
  • lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
  • lib/crewai/src/crewai/utilities/cache_config.py
  • lib/crewai/tests/memory/storage/test_valkey_cache.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/tests/memory/test_embedding_safety.py
  • lib/crewai/src/crewai/a2a/utils/agent_card.py
  • lib/crewai/src/crewai/a2a/utils/task.py
  • lib/crewai/src/crewai/memory/storage/valkey_cache.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
  • lib/crewai-files/src/crewai_files/cache/upload_cache.py

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (2)
lib/crewai/src/crewai/a2a/utils/task.py (2)

93-99: ⚡ Quick win

Consider clarifying _cache_initialized semantics.

The _cache_initialized flag is set to True (line 113) for both Valkey and aiocache paths, but for the aiocache path, the function does nothing (returns early at line 95) since aiocache is already configured at module level (line 84). This makes the flag's meaning ambiguous: does it mean "cache backend is ready" or "this function has been called"?

♻️ Suggested refactor to clarify intent

Option 1: Rename the flag to reflect its actual purpose:

-_cache_initialized = False
+_lazy_init_complete = False

Option 2: Skip setting the flag for the aiocache path and add a comment:

     with _cache_init_lock:
         if _cache_initialized:
             return

         if use_valkey_cache():
             from crewai.memory.storage.valkey_cache import ValkeyCache

             conn = parse_cache_url() or {}
             _task_cache = ValkeyCache(
                 host=conn.get("host", "localhost"),
                 port=conn.get("port", 6379),
                 db=conn.get("db", 0),
                 password=conn.get("password"),
                 default_ttl=3600,
             )
-
-        _cache_initialized = True
+            _cache_initialized = True
+        # For aiocache, nothing to do here (already configured at module level)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/a2a/utils/task.py` around lines 93 - 99, The
_cache_initialized flag is ambiguous because it's set for both the Valkey
initialization and the aiocache path even though the aiocache backend is
configured at module import, so change the semantics: either rename
_cache_initialized to _init_called (and update all checks that use
_cache_initialized and _cache_init_lock in init_task_cache/_init_task_cache to
reflect that it only means "this init function executed") OR only set
_cache_initialized when this function actually performs initialization for
Valkey (leave it false for the aiocache branch) and add a clarifying comment on
the aiocache branch stating aiocache is configured at module level; update the
guard uses of _cache_initialized and _cache_init_lock accordingly so other code
(e.g., callers of init_task_cache, the Valkey init path) get the correct
meaning.

156-156: 💤 Low value

Consider making polling interval configurable.

The cancellation polling interval is hardcoded to 0.1 seconds in both Valkey and aiocache paths. For high-frequency task environments or slower networks, a configurable interval (via environment variable or module constant) might be beneficial.

♻️ Example configurable interval
+# Cancellation polling interval in seconds
+_CANCEL_POLL_INTERVAL = float(os.getenv("CREWAI_CANCEL_POLL_INTERVAL", "0.1"))
+
 async def poll_for_cancel_valkey() -> bool:
     """Poll ValkeyCache for cancellation flag."""
     while True:
         if _task_cache is not None and await _task_cache.get(
             f"cancel:{task_id}"
         ):
             return True
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(_CANCEL_POLL_INTERVAL)

 async def poll_for_cancel_aiocache() -> bool:
     """Poll aiocache for cancellation flag."""
     cache = caches.get("default")
     while True:
         if await cache.get(f"cancel:{task_id}"):
             return True
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(_CANCEL_POLL_INTERVAL)

Also applies to: 164-164

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/a2a/utils/task.py` at line 156, Replace the hardcoded
asyncio.sleep(0.1) used in both the Valkey and aiocache cancellation loops with
a configurable poll interval: add a module-level constant like
CANCELLATION_POLL_INTERVAL (default 0.1) or read from an env var (e.g.,
os.getenv("CANCELLATION_POLL_INTERVAL")) and use that value (or accept it as an
optional parameter) in the functions handling the Valkey and aiocache polling
paths (referencing the Valkey cancellation loop and the aiocache cancellation
loop in task.py); update both occurrences (the sleep at line with Valkey and the
sleep at the aiocache path) to use the new constant/parameter and
validate/convert the env value to float.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/crewai/src/crewai/a2a/utils/task.py`:
- Around line 87-113: _wrap the ValkeyCache creation inside _ensure_task_cache
in a try/except to handle connection errors: when use_valkey_cache() is true and
before assigning to _task_cache, catch exceptions from ValkeyCache(...) (and
from parse_cache_url()) and log the error with context, then set _task_cache to
None (or a safe in-memory fallback) and still set _cache_initialized = True so
subsequent calls don’t crash; reference the symbols _ensure_task_cache,
ValkeyCache, parse_cache_url, use_valkey_cache, _task_cache, _cache_initialized,
and _cache_init_lock when making the change so the init is thread-safe and fails
gracefully.

In `@lib/crewai/src/crewai/memory/storage/valkey_storage.py`:
- Around line 513-519: The metadata_filter clauses built in _vector_search() are
not covered by the FT index because memory_index's schema only defines
VectorField("embedding"), TagField("scope"), TagField("categories"),
NumericField("created_at"), and NumericField("importance"), so metadata_filter
cannot actually restrict FT.SEARCH results; fix by either (A) adding the
necessary metadata fields to the schema (materialize each metadata key used in
metadata_filter as a TagField or NumericField depending on type) and updating
memory_index/schema construction where schema is defined, or (B) if you cannot
change the FT index, remove metadata clauses from the FT query in
_vector_search() and instead apply metadata_filter as a post-filter over the
FT.SEARCH results (filtering returned documents by metadata values before
returning final hits). Ensure the change addresses both the _vector_search()
flow and the other occurrences noted (around lines referenced ~1237-1245) so
metadata filters are actually applied.
- Around line 276-307: Add explicit synchronous and asynchronous close methods
on ValkeyStorage (e.g., close() and aclose()) that encapsulate the existing
cleanup logic currently in __aexit__ and __del__: aclose() should await
self._client.close() and set self._client = None; close() should synchronously
close the client (handling coroutine returns with asyncio.run when needed) and
set self._client = None. Update __aexit__ to call await self.aclose() and update
__del__ to call self.close() (or schedule it on _bg_loop as currently done) so
Memory.close() can deterministically release the Glide/Valkey client by calling
the new close()/aclose() API on ValkeyStorage.
- Around line 1220-1228: The current scope filter builds a Redis tag query like
f"@scope:{{{escaped_scope}*}}" which matches siblings (e.g., "/crew/a" matches
"/crew/ab"); update the logic that uses scope_prefix/escaped_scope/query_parts
so results are boundary-aware by either (A) switching to an
ancestor/explicit-tag representation for scopes, or (B) implementing overfetch +
post-filter: keep the existing relaxed query to fetch candidates, then in the
same method filter the returned items by verifying the stored scope string
strictly matches the requested subtree (e.g., for non-root ensure stored_scope
== scope_prefix or stored_scope startswith scope_prefix + "/" so "/crew/a" does
not match "/crew/ab"); modify the code paths that consume escaped_scope and
query_parts and ensure the final returned result list is the post-filtered
subset.
- Around line 975-983: Replace the raw startswith check with a boundary-safe
comparison: instead of if scope_path.startswith(scope_prefix), match either an
exact scope (scope_path == scope_prefix) or a child scope
(scope_path.startswith(scope_prefix + "/")), treating the root scope specially
(allow prefix "/" to match all). Update the check around scope_path/scope_prefix
before building scope_key and calling client.zrange/members_result so sibling
scopes (e.g., "/crew/a" vs "/crew/ab") no longer bleed into
delete/list_records/count/reset/get_scope_info.

In `@lib/crewai/src/crewai/memory/unified_memory.py`:
- Around line 219-229: parse_cache_url currently returns host/port/db/password
but drops the URL scheme so unified_memory.py always instantiates ValkeyStorage
without TLS; update parse_cache_url to detect the scheme (e.g., set use_tls =
scheme.startswith("rediss")) and return that flag, then modify the ValkeyStorage
construction in UnifiedMemory (the block that calls parse_cache_url and
instantiates ValkeyStorage) to pass use_tls=conn.get("use_tls", False) (or the
returned flag) so ValkeyStorage receives the correct TLS setting.

---

Nitpick comments:
In `@lib/crewai/src/crewai/a2a/utils/task.py`:
- Around line 93-99: The _cache_initialized flag is ambiguous because it's set
for both the Valkey initialization and the aiocache path even though the
aiocache backend is configured at module import, so change the semantics: either
rename _cache_initialized to _init_called (and update all checks that use
_cache_initialized and _cache_init_lock in init_task_cache/_init_task_cache to
reflect that it only means "this init function executed") OR only set
_cache_initialized when this function actually performs initialization for
Valkey (leave it false for the aiocache branch) and add a clarifying comment on
the aiocache branch stating aiocache is configured at module level; update the
guard uses of _cache_initialized and _cache_init_lock accordingly so other code
(e.g., callers of init_task_cache, the Valkey init path) get the correct
meaning.
- Line 156: Replace the hardcoded asyncio.sleep(0.1) used in both the Valkey and
aiocache cancellation loops with a configurable poll interval: add a
module-level constant like CANCELLATION_POLL_INTERVAL (default 0.1) or read from
an env var (e.g., os.getenv("CANCELLATION_POLL_INTERVAL")) and use that value
(or accept it as an optional parameter) in the functions handling the Valkey and
aiocache polling paths (referencing the Valkey cancellation loop and the
aiocache cancellation loop in task.py); update both occurrences (the sleep at
line with Valkey and the sleep at the aiocache path) to use the new
constant/parameter and validate/convert the env value to float.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 30afcf2c-fac3-468d-9c15-93b2802269e5

📥 Commits

Reviewing files that changed from the base of the PR and between ba9bee1 and 9430075.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
  • lib/crewai-files/src/crewai_files/cache/upload_cache.py
  • lib/crewai/pyproject.toml
  • lib/crewai/src/crewai/a2a/utils/agent_card.py
  • lib/crewai/src/crewai/a2a/utils/task.py
  • lib/crewai/src/crewai/memory/encoding_flow.py
  • lib/crewai/src/crewai/memory/storage/valkey_cache.py
  • lib/crewai/src/crewai/memory/storage/valkey_storage.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/unified_memory.py
  • lib/crewai/src/crewai/utilities/cache_config.py
  • lib/crewai/tests/memory/storage/test_valkey_cache.py
  • lib/crewai/tests/memory/storage/test_valkey_storage.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_search.py
  • lib/crewai/tests/memory/test_embedding_safety.py
  • lib/crewai/tests/utilities/test_cache_config.py
  • pyproject.toml
✅ Files skipped from review due to trivial changes (1)
  • lib/crewai/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (12)
  • lib/crewai/src/crewai/a2a/utils/agent_card.py
  • pyproject.toml
  • lib/crewai/tests/memory/storage/test_valkey_cache.py
  • lib/crewai/tests/memory/test_embedding_safety.py
  • lib/crewai/src/crewai/utilities/cache_config.py
  • lib/crewai/src/crewai/memory/types.py
  • lib/crewai/src/crewai/memory/storage/valkey_cache.py
  • lib/crewai-files/src/crewai_files/cache/upload_cache.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_errors.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_scope.py
  • lib/crewai/tests/utilities/test_cache_config.py
  • lib/crewai/tests/memory/storage/test_valkey_storage_search.py

Comment thread lib/crewai/src/crewai/a2a/utils/task.py Outdated
Comment thread lib/crewai/src/crewai/memory/storage/valkey_storage.py
Comment thread lib/crewai/src/crewai/memory/storage/valkey_storage.py
Comment thread lib/crewai/src/crewai/memory/storage/valkey_storage.py
Comment thread lib/crewai/src/crewai/memory/storage/valkey_storage.py Outdated
Comment thread lib/crewai/src/crewai/memory/storage/valkey_storage.py
Comment thread lib/crewai/src/crewai/memory/unified_memory.py
@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-4-storage branch 2 times, most recently from 2e5f9bb to c5a9a8d Compare May 13, 2026 15:00
@MatthiasHowellYopp
Copy link
Copy Markdown
Contributor Author

@greysonlalonde hoping I can get a review on this.

@MatthiasHowellYopp MatthiasHowellYopp force-pushed the feat/valkey-4-storage branch 7 times, most recently from 8dfe861 to 62bebab Compare May 26, 2026 14:01
MatthiasHowellYopp and others added 2 commits May 28, 2026 16:20
Extract duplicated Redis URL parsing into a shared cache_config utility.
Introduce ValkeyCache as a lightweight async key/value cache using
valkey-glide. Wire it into A2A task handling, agent card caching, and
file upload caching.

Part 1/4 of Valkey storage implementation.

fix: async-safe embeddings and resilient drain_writes

Add bytes→float validators on MemoryRecord and ItemState to handle
Valkey returning embeddings as raw bytes. Make embed_texts() safe when
called from an async context by using a thread pool. Improve
drain_writes() with per-save timeouts and error logging instead of
raising on failure.

Part 3/4 of Valkey storage implementation.

feat(valkey): ValkeyStorage vector memory backend

Add ValkeyStorage, a distributed StorageBackend implementation using
Valkey-GLIDE with Valkey Search for vector similarity. Wire it into
Memory as the 'valkey' storage option. Pin scrapegraph-py<2 to fix
unrelated upstream breakage.

Part 4/4 of Valkey storage implementation.

fix: use datetime.utcnow() for last_accessed consistency

MemoryRecord defaults use utcnow() for created_at and last_accessed.
Match that in ValkeyStorage.update_record() to avoid timezone
inconsistency in recency scoring.

feat(valkey): shared cache config + ValkeyCache for A2A and file uploads

Extract duplicated Redis URL parsing into a shared cache_config utility.
Introduce ValkeyCache as a lightweight async key/value cache using
valkey-glide. Wire it into A2A task handling, agent card caching, and
file upload caching.

Part 1/4 of Valkey storage implementation.

fix: handle non-numeric database path in cache URL parsing

Extract _parse_db_from_path() helper that catches ValueError for
paths like /mydb and defaults to 0 with a warning, instead of
crashing.

fix: async-safe embeddings and resilient drain_writes

Add bytes→float validators on MemoryRecord and ItemState to handle
Valkey returning embeddings as raw bytes. Make embed_texts() safe when
called from an async context by using a thread pool. Improve
drain_writes() with per-save timeouts and error logging instead of
raising on failure.

Part 3/4 of Valkey storage implementation.

fix: catch concurrent.futures.TimeoutError for Python 3.10 compat

In Python <3.11, concurrent.futures.TimeoutError is distinct from the
builtin TimeoutError. Catch both so the timeout warning path works
on all supported Python versions.
The FT.* Search module APIs used by ValkeyStorage require valkey-glide
2.x. Bumping to 2.4.0 picks up Valkey Search 1.2 support, client-side
caching, context manager support, and performance improvements.

Signed-off-by: Matthias Howell <matthias.howell@improving.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant