Skip to content

Commit 541b631

Browse files
authored
fix(#162): remove dead blob infrastructure — channel values are stored inline (#164)
* test(#162): add integration tests proving no orphaned blob keys after prune 7 integration tests verify that channel values are stored inline in all saver implementations and that prune()/aprune() leaves no orphaned data. * refactor(#162): remove dead blob infrastructure — channel values are inline All 4 saver implementations store channel values inline within checkpoint documents. The blob index, schema, key generators, and related params were never used in production. Removing them eliminates the confusion that led to issue #162. Removed from source: - CHECKPOINT_BLOB_PREFIX constant, blobs_schema, checkpoint_blobs_index - _load_blobs(), _make_redis_checkpoint_blob_key() - _make_shallow_redis_checkpoint_blob_key_pattern/cached() - checkpoint_blob_prefix constructor/factory params - Blob search blocks in delete_thread()/adelete_thread() Updated prune()/aprune() docstrings to document inline storage. * test(#162): add explicit asetup() call in async test helper
1 parent d21ffba commit 541b631

14 files changed

Lines changed: 358 additions & 377 deletions

langgraph/checkpoint/redis/__init__.py

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
3030
from langgraph.checkpoint.redis.ashallow import AsyncShallowRedisSaver
3131
from langgraph.checkpoint.redis.base import (
32-
CHECKPOINT_BLOB_PREFIX,
3332
CHECKPOINT_PREFIX,
3433
CHECKPOINT_WRITE_PREFIX,
3534
REDIS_KEY_SEPARATOR,
@@ -69,7 +68,6 @@ def __init__(
6968
connection_args: Optional[Dict[str, Any]] = None,
7069
ttl: Optional[Dict[str, Any]] = None,
7170
checkpoint_prefix: str = CHECKPOINT_PREFIX,
72-
checkpoint_blob_prefix: str = CHECKPOINT_BLOB_PREFIX,
7371
checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
7472
) -> None:
7573
super().__init__(
@@ -78,7 +76,6 @@ def __init__(
7876
connection_args=connection_args,
7977
ttl=ttl,
8078
checkpoint_prefix=checkpoint_prefix,
81-
checkpoint_blob_prefix=checkpoint_blob_prefix,
8279
checkpoint_write_prefix=checkpoint_write_prefix,
8380
)
8481
# Prefixes are now set in BaseRedisSaver.__init__
@@ -122,9 +119,6 @@ def create_indexes(self) -> None:
122119
self.checkpoints_index = SearchIndex.from_dict(
123120
self.checkpoints_schema, redis_client=self._redis
124121
)
125-
self.checkpoint_blobs_index = SearchIndex.from_dict(
126-
self.blobs_schema, redis_client=self._redis
127-
)
128122
self.checkpoint_writes_index = SearchIndex.from_dict(
129123
self.writes_schema, redis_client=self._redis
130124
)
@@ -480,7 +474,7 @@ def put(
480474
metadata: CheckpointMetadata,
481475
new_versions: ChannelVersions,
482476
) -> RunnableConfig:
483-
"""Store a checkpoint to Redis with separate blob storage."""
477+
"""Store a checkpoint to Redis with inline channel value storage."""
484478
configurable = config["configurable"].copy()
485479

486480
run_id = configurable.pop("run_id", metadata.get("run_id"))
@@ -975,12 +969,6 @@ def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
975969
# TTL states: -2 = key doesn't exist, -1 = key exists but no TTL, 0 = expired, >0 = seconds remaining
976970
if current_ttl > 0:
977971
# Note: We don't refresh TTL for keys with no expiry (TTL = -1)
978-
# Get all blob keys related to this checkpoint
979-
from langgraph.checkpoint.redis.base import (
980-
CHECKPOINT_BLOB_PREFIX,
981-
CHECKPOINT_WRITE_PREFIX,
982-
)
983-
984972
# Get write keys - use key registry if available, otherwise fall back to search
985973
write_keys = []
986974

@@ -1132,7 +1120,6 @@ def from_conn_string(
11321120
connection_args: Optional[Dict[str, Any]] = None,
11331121
ttl: Optional[Dict[str, Any]] = None,
11341122
checkpoint_prefix: str = CHECKPOINT_PREFIX,
1135-
checkpoint_blob_prefix: str = CHECKPOINT_BLOB_PREFIX,
11361123
checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
11371124
) -> Iterator[RedisSaver]:
11381125
"""Create a new RedisSaver instance."""
@@ -1144,7 +1131,6 @@ def from_conn_string(
11441131
connection_args=connection_args,
11451132
ttl=ttl,
11461133
checkpoint_prefix=checkpoint_prefix,
1147-
checkpoint_blob_prefix=checkpoint_blob_prefix,
11481134
checkpoint_write_prefix=checkpoint_write_prefix,
11491135
)
11501136

@@ -1619,24 +1605,7 @@ def delete_thread(self, thread_id: str) -> None:
16191605
latest_pointer_key = f"checkpoint_latest:{storage_safe_thread_id}:{to_storage_safe_str(checkpoint_ns)}"
16201606
keys_to_delete.append(latest_pointer_key)
16211607

1622-
# Delete all blobs for this thread
1623-
blob_query = FilterQuery(
1624-
filter_expression=Tag("thread_id") == storage_safe_thread_id,
1625-
return_fields=["checkpoint_ns", "channel", "version"],
1626-
num_results=10000,
1627-
)
1628-
1629-
blob_results = self.checkpoint_blobs_index.search(blob_query)
1630-
1631-
for doc in blob_results.docs:
1632-
checkpoint_ns = getattr(doc, "checkpoint_ns", "")
1633-
channel = getattr(doc, "channel", "")
1634-
version = getattr(doc, "version", "")
1635-
1636-
blob_key = self._make_redis_checkpoint_blob_key(
1637-
storage_safe_thread_id, checkpoint_ns, channel, version
1638-
)
1639-
keys_to_delete.append(blob_key)
1608+
# Channel values are stored inline — no separate blob keys to clean up.
16401609

16411610
# Delete all writes for this thread
16421611
writes_query = FilterQuery(
@@ -1702,10 +1671,9 @@ def prune(
17021671
17031672
Each namespace (root ``""`` and any subgraph namespaces) is treated as
17041673
an independent checkpoint chain, so ``keep_last`` is applied separately
1705-
within each namespace. Checkpoint blobs are intentionally *not* deleted
1706-
because blob keys are shared across checkpoints via channel versioning —
1707-
the same blob version may be referenced by both kept and evicted
1708-
checkpoints.
1674+
within each namespace. Channel values are stored inline within each
1675+
checkpoint document, so they are automatically removed when the
1676+
checkpoint document is deleted.
17091677
17101678
Args:
17111679
thread_ids: Thread IDs whose old checkpoints should be pruned.

langgraph/checkpoint/redis/aio.py

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
from ulid import ULID
4343

4444
from langgraph.checkpoint.redis.base import (
45-
CHECKPOINT_BLOB_PREFIX,
4645
CHECKPOINT_PREFIX,
4746
CHECKPOINT_WRITE_PREFIX,
4847
REDIS_KEY_SEPARATOR,
@@ -69,7 +68,6 @@ class AsyncRedisSaver(
6968

7069
_redis_url: str
7170
checkpoints_index: AsyncSearchIndex
72-
checkpoint_blobs_index: AsyncSearchIndex
7371
checkpoint_writes_index: AsyncSearchIndex
7472

7573
_redis: Union[
@@ -89,7 +87,6 @@ def __init__(
8987
connection_args: Optional[Dict[str, Any]] = None,
9088
ttl: Optional[Dict[str, Any]] = None,
9189
checkpoint_prefix: str = CHECKPOINT_PREFIX,
92-
checkpoint_blob_prefix: str = CHECKPOINT_BLOB_PREFIX,
9390
checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
9491
) -> None:
9592
super().__init__(
@@ -98,7 +95,6 @@ def __init__(
9895
connection_args=connection_args,
9996
ttl=ttl,
10097
checkpoint_prefix=checkpoint_prefix,
101-
checkpoint_blob_prefix=checkpoint_blob_prefix,
10298
checkpoint_write_prefix=checkpoint_write_prefix,
10399
)
104100
self.loop = asyncio.get_running_loop()
@@ -132,9 +128,6 @@ def create_indexes(self) -> None:
132128
self.checkpoints_index = AsyncSearchIndex.from_dict(
133129
self.checkpoints_schema, redis_client=self._redis
134130
)
135-
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(
136-
self.blobs_schema, redis_client=self._redis
137-
)
138131
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
139132
self.writes_schema, redis_client=self._redis
140133
)
@@ -240,14 +233,12 @@ async def __aexit__(
240233
# Prevent RedisVL from attempting to close the client
241234
# on an event loop in a separate thread.
242235
self.checkpoints_index._redis_client = None
243-
self.checkpoint_blobs_index._redis_client = None
244236
self.checkpoint_writes_index._redis_client = None
245237

246238
async def asetup(self) -> None:
247239
"""Set up the checkpoint saver."""
248240
self.create_indexes()
249241
await self.checkpoints_index.create(overwrite=False)
250-
await self.checkpoint_blobs_index.create(overwrite=False)
251242
await self.checkpoint_writes_index.create(overwrite=False)
252243

253244
# Detect cluster mode if not explicitly set
@@ -491,12 +482,6 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
491482
# Only refresh if key exists and has TTL (skip keys with no expiry)
492483
# TTL states: -2 = key doesn't exist, -1 = key exists but no TTL, 0 = expired, >0 = seconds remaining
493484
if current_ttl > 0:
494-
# Get all blob keys related to this checkpoint
495-
from langgraph.checkpoint.redis.base import (
496-
CHECKPOINT_BLOB_PREFIX,
497-
CHECKPOINT_WRITE_PREFIX,
498-
)
499-
500485
# Get write keys from registry instead of SCAN
501486
write_keys = []
502487

@@ -1455,7 +1440,6 @@ async def from_conn_string(
14551440
connection_args: Optional[Dict[str, Any]] = None,
14561441
ttl: Optional[Dict[str, Any]] = None,
14571442
checkpoint_prefix: str = CHECKPOINT_PREFIX,
1458-
checkpoint_blob_prefix: str = CHECKPOINT_BLOB_PREFIX,
14591443
checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
14601444
) -> AsyncIterator[AsyncRedisSaver]:
14611445
async with cls(
@@ -1464,7 +1448,6 @@ async def from_conn_string(
14641448
connection_args=connection_args,
14651449
ttl=ttl,
14661450
checkpoint_prefix=checkpoint_prefix,
1467-
checkpoint_blob_prefix=checkpoint_blob_prefix,
14681451
checkpoint_write_prefix=checkpoint_write_prefix,
14691452
) as saver:
14701453
yield saver
@@ -2009,24 +1992,7 @@ async def adelete_thread(self, thread_id: str) -> None:
20091992
latest_pointer_key = f"checkpoint_latest:{storage_safe_thread_id}:{to_storage_safe_str(checkpoint_ns)}"
20101993
keys_to_delete.append(latest_pointer_key)
20111994

2012-
# Delete all blobs for this thread
2013-
blob_query = FilterQuery(
2014-
filter_expression=Tag("thread_id") == storage_safe_thread_id,
2015-
return_fields=["checkpoint_ns", "channel", "version"],
2016-
num_results=10000,
2017-
)
2018-
2019-
blob_results = await self.checkpoint_blobs_index.search(blob_query)
2020-
2021-
for doc in blob_results.docs:
2022-
checkpoint_ns = getattr(doc, "checkpoint_ns", "")
2023-
channel = getattr(doc, "channel", "")
2024-
version = getattr(doc, "version", "")
2025-
2026-
blob_key = self._make_redis_checkpoint_blob_key(
2027-
storage_safe_thread_id, checkpoint_ns, channel, version
2028-
)
2029-
keys_to_delete.append(blob_key)
1995+
# Channel values are stored inline — no separate blob keys to clean up.
20301996

20311997
# Delete all writes for this thread
20321998
writes_query = FilterQuery(
@@ -2092,10 +2058,9 @@ async def aprune(
20922058
20932059
Each namespace (root ``""`` and any subgraph namespaces) is treated as
20942060
an independent checkpoint chain, so ``keep_last`` is applied separately
2095-
within each namespace. Checkpoint blobs are intentionally *not* deleted
2096-
because blob keys are shared across checkpoints via channel versioning —
2097-
the same blob version may be referenced by both kept and evicted
2098-
checkpoints.
2061+
within each namespace. Channel values are stored inline within each
2062+
checkpoint document, so they are automatically removed when the
2063+
checkpoint document is deleted.
20992064
21002065
Args:
21012066
thread_ids: Thread IDs whose old checkpoints should be pruned.

langgraph/checkpoint/redis/ashallow.py

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from ulid import ULID
3030

3131
from langgraph.checkpoint.redis.base import (
32-
CHECKPOINT_BLOB_PREFIX,
3332
CHECKPOINT_PREFIX,
3433
CHECKPOINT_WRITE_PREFIX,
3534
REDIS_KEY_SEPARATOR,
@@ -49,7 +48,6 @@ class AsyncShallowRedisSaver(BaseRedisSaver[AsyncRedis, AsyncSearchIndex]):
4948

5049
_redis_url: str
5150
checkpoints_index: AsyncSearchIndex
52-
checkpoint_blobs_index: AsyncSearchIndex
5351
checkpoint_writes_index: AsyncSearchIndex
5452

5553
_redis: AsyncRedis # Override the type from the base class
@@ -62,7 +60,6 @@ def __init__(
6260
connection_args: Optional[dict[str, Any]] = None,
6361
ttl: Optional[dict[str, Any]] = None,
6462
checkpoint_prefix: str = CHECKPOINT_PREFIX,
65-
checkpoint_blob_prefix: str = CHECKPOINT_BLOB_PREFIX,
6663
checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
6764
) -> None:
6865
super().__init__(
@@ -71,7 +68,6 @@ def __init__(
7168
connection_args=connection_args,
7269
ttl=ttl,
7370
checkpoint_prefix=checkpoint_prefix,
74-
checkpoint_blob_prefix=checkpoint_blob_prefix,
7571
checkpoint_write_prefix=checkpoint_write_prefix,
7672
)
7773
self.loop = asyncio.get_running_loop()
@@ -109,7 +105,6 @@ async def __aexit__(
109105
# Prevent RedisVL from attempting to close the client
110106
# on an event loop in a separate thread.
111107
self.checkpoints_index._redis_client = None
112-
self.checkpoint_blobs_index._redis_client = None
113108
self.checkpoint_writes_index._redis_client = None
114109

115110
@classmethod
@@ -122,7 +117,6 @@ async def from_conn_string(
122117
connection_args: Optional[dict[str, Any]] = None,
123118
ttl: Optional[dict[str, Any]] = None,
124119
checkpoint_prefix: str = CHECKPOINT_PREFIX,
125-
checkpoint_blob_prefix: str = CHECKPOINT_BLOB_PREFIX,
126120
checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
127121
) -> AsyncIterator[AsyncShallowRedisSaver]:
128122
"""Create a new AsyncShallowRedisSaver instance."""
@@ -132,16 +126,13 @@ async def from_conn_string(
132126
connection_args=connection_args,
133127
ttl=ttl,
134128
checkpoint_prefix=checkpoint_prefix,
135-
checkpoint_blob_prefix=checkpoint_blob_prefix,
136129
checkpoint_write_prefix=checkpoint_write_prefix,
137130
) as saver:
138131
yield saver
139132

140133
async def asetup(self) -> None:
141-
"""Initialize Redis indexes asynchronously (skip blob index for shallow implementation)."""
142-
# Create only the indexes we actually use
134+
"""Initialize Redis indexes asynchronously."""
143135
await self.checkpoints_index.create(overwrite=False)
144-
# Skip creating blob index since shallow doesn't use separate blobs
145136
await self.checkpoint_writes_index.create(overwrite=False)
146137

147138
async def setup(self) -> None: # type: ignore[override]
@@ -700,10 +691,6 @@ def create_indexes(self) -> None:
700691
self.checkpoints_index = AsyncSearchIndex.from_dict(
701692
self.checkpoints_schema, redis_client=self._redis
702693
)
703-
# Shallow implementation doesn't use blobs, but base class requires the attribute
704-
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(
705-
self.blobs_schema, redis_client=self._redis
706-
)
707694
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
708695
self.writes_schema, redis_client=self._redis
709696
)
@@ -822,36 +809,6 @@ def _make_shallow_redis_checkpoint_writes_key_pattern(
822809
+ ":*"
823810
)
824811

825-
@staticmethod
826-
def _make_shallow_redis_checkpoint_blob_key_pattern(
827-
thread_id: str, checkpoint_ns: str
828-
) -> str:
829-
"""Create a pattern to match all blob keys for a thread and namespace."""
830-
return (
831-
REDIS_KEY_SEPARATOR.join(
832-
[
833-
CHECKPOINT_BLOB_PREFIX,
834-
str(to_storage_safe_id(thread_id)),
835-
to_storage_safe_str(checkpoint_ns),
836-
]
837-
)
838-
+ ":*"
839-
)
840-
841-
def _make_shallow_redis_checkpoint_blob_key_cached(
842-
self, thread_id: str, checkpoint_ns: str, channel: str, version: str
843-
) -> str:
844-
"""Create a cached key for checkpoint blobs."""
845-
cache_key = f"shallow_blob:{thread_id}:{checkpoint_ns}:{channel}:{version}"
846-
if cache_key not in self._key_cache:
847-
if len(self._key_cache) >= self._key_cache_max_size:
848-
# Remove oldest entry when cache is full
849-
self._key_cache.pop(next(iter(self._key_cache)))
850-
self._key_cache[cache_key] = self._make_redis_checkpoint_blob_key(
851-
thread_id, checkpoint_ns, channel, version
852-
)
853-
return self._key_cache[cache_key]
854-
855812
def _extract_fallback_timestamp(self, checkpoint: Checkpoint) -> float:
856813
"""Extract timestamp from checkpoint's ts field or use current time.
857814

0 commit comments

Comments
 (0)