-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathaio.py
More file actions
2199 lines (1899 loc) · 88.9 KB
/
aio.py
File metadata and controls
2199 lines (1899 loc) · 88.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Async implementation of Redis checkpoint saver."""
from __future__ import annotations
import asyncio
import json
import logging
from collections import defaultdict
from contextlib import asynccontextmanager
from types import TracebackType
from typing import (
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
import orjson
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
WRITES_IDX_MAP,
ChannelVersions,
Checkpoint,
CheckpointMetadata,
CheckpointTuple,
PendingWrite,
get_checkpoint_id,
)
from langgraph.constants import TASKS
from redis.asyncio import Redis as AsyncRedis
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
from redisvl.index import AsyncSearchIndex
from redisvl.query import FilterQuery
from redisvl.query.filter import Num, Tag
from redisvl.redis.connection import RedisConnectionFactory
from ulid import ULID
from langgraph.checkpoint.redis.base import (
CHECKPOINT_PREFIX,
CHECKPOINT_WRITE_PREFIX,
REDIS_KEY_SEPARATOR,
BaseRedisSaver,
)
from langgraph.checkpoint.redis.key_registry import (
AsyncCheckpointKeyRegistry as AsyncKeyRegistry,
)
from langgraph.checkpoint.redis.util import (
EMPTY_ID_SENTINEL,
from_storage_safe_id,
from_storage_safe_str,
to_storage_safe_id,
to_storage_safe_str,
)
# Module-level logger; cluster-mode detection and best-effort TTL failures are
# reported through it.
logger = logging.getLogger(__name__)
class AsyncRedisSaver(
BaseRedisSaver[Union[AsyncRedis, AsyncRedisCluster], AsyncSearchIndex]
):
"""Async Redis implementation for checkpoint saver.
Supports standard Redis URLs (redis://), SSL (rediss://), and
Sentinel URLs (redis+sentinel://host:26379/service_name/db).
"""
_redis_url: str
checkpoints_index: AsyncSearchIndex
checkpoint_writes_index: AsyncSearchIndex
_redis: Union[
AsyncRedis, AsyncRedisCluster
] # Support both standalone and cluster clients
# Whether to assume the Redis server is a cluster; None triggers auto-detection
cluster_mode: Optional[bool] = None
_key_registry: Optional[AsyncKeyRegistry] = None # Track keys to avoid SCAN/KEYS
# Instance-level cache (will be initialized in __init__)
def __init__(
    self,
    redis_url: Optional[str] = None,
    *,
    redis_client: Optional[Union[AsyncRedis, AsyncRedisCluster]] = None,
    connection_args: Optional[Dict[str, Any]] = None,
    ttl: Optional[Dict[str, Any]] = None,
    checkpoint_prefix: str = CHECKPOINT_PREFIX,
    checkpoint_write_prefix: str = CHECKPOINT_WRITE_PREFIX,
) -> None:
    """Create the saver without requiring a running event loop.

    Every argument is forwarded unchanged to ``BaseRedisSaver.__init__``.
    The event loop is deliberately *not* captured here; ``asetup()`` does
    that, so the saver can be constructed outside an async context
    (Issue #179).

    Args:
        redis_url: Connection URL (forwarded to the base class).
        redis_client: Pre-built async standalone or cluster client to reuse.
        connection_args: Extra connection keyword arguments (forwarded).
        ttl: TTL configuration (forwarded to the base class).
        checkpoint_prefix: Key prefix for checkpoint documents.
        checkpoint_write_prefix: Key prefix for pending-write documents.
    """
    super().__init__(
        checkpoint_prefix=checkpoint_prefix,
        checkpoint_write_prefix=checkpoint_write_prefix,
        redis_url=redis_url,
        redis_client=redis_client,
        connection_args=connection_args,
        ttl=ttl,
    )
    # Placeholder until asetup() records the running loop; sync wrapper
    # methods dispatch their coroutines onto it.
    self.loop: Optional[asyncio.AbstractEventLoop] = None
    # Memo of already-built Redis keys, shared by the *_key_cached helpers.
    # It is size-capped so it cannot grow without bound.
    self._key_cache: Dict[str, str] = {}
    self._key_cache_max_size = 1000
    self._separator = REDIS_KEY_SEPARATOR
def configure_client(
    self,
    redis_url: Optional[str] = None,
    redis_client: Optional[Union[AsyncRedis, AsyncRedisCluster]] = None,
    connection_args: Optional[Dict[str, Any]] = None,
) -> None:
    """Attach the async Redis client used by this saver.

    A caller-supplied ``redis_client`` is used as-is, and the saver does not
    take ownership of it. Otherwise a new client is opened from
    ``redis_url`` via ``RedisConnectionFactory`` and the saver owns it.
    ``__aexit__`` closes owned clients only.

    Supported URL schemes: standard ``redis://``, SSL ``rediss://`` and
    Sentinel ``redis+sentinel://host:26379/service_name/db``.
    """
    if redis_client is not None:
        # Borrowed client: the caller remains responsible for closing it.
        self._owns_its_client = False
        self._redis = redis_client
        return

    self._owns_its_client = True
    self._redis = RedisConnectionFactory.get_async_redis_connection(
        redis_url, **(connection_args or {})
    )
def create_indexes(self) -> None:
    """Build the two RedisVL index objects without contacting Redis.

    Both indexes are bound to ``self._redis``. They are only created on the
    server later, when ``asetup()`` calls ``create(overwrite=False)`` on them.
    """
    client = self._redis
    self.checkpoints_index = AsyncSearchIndex.from_dict(
        self.checkpoints_schema, redis_client=client
    )
    self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
        self.writes_schema, redis_client=client
    )
def _make_redis_checkpoint_key_cached(
    self, thread_id: str, checkpoint_ns: str, checkpoint_id: str
) -> str:
    """Build the Redis key of a checkpoint document, memoizing the result.

    The key is the checkpoint prefix, the storage-safe thread id, the
    storage-safe namespace and the storage-safe checkpoint id, joined with
    ``self._separator``.

    Results are stored in ``self._key_cache`` until it holds
    ``self._key_cache_max_size`` entries. After that, new keys are still
    computed but no longer cached.
    """
    # The cache key must be unambiguous. Joining the raw parts with ":"
    # collides whenever a thread id or namespace itself contains ":"
    # (("a:b", "c") vs ("a", "b:c")), and the cache would then hand back
    # another thread's checkpoint key. repr() of a tuple of str is
    # injective. The "ckpt" tag keeps these entries distinct from the
    # write-key entries that share the same cache dict.
    cache_key = repr(
        ("ckpt", str(thread_id), str(checkpoint_ns), str(checkpoint_id))
    )
    cached = self._key_cache.get(cache_key)
    if cached is not None:
        return cached

    key = self._separator.join(
        [
            self._checkpoint_prefix,
            str(to_storage_safe_id(thread_id)),
            to_storage_safe_str(checkpoint_ns),
            str(to_storage_safe_id(checkpoint_id)),
        ]
    )

    # Insert-only, size-capped cache (no eviction).
    if len(self._key_cache) < self._key_cache_max_size:
        self._key_cache[cache_key] = key
    return key
def _make_redis_checkpoint_writes_key_cached(
    self,
    thread_id: str,
    checkpoint_ns: str,
    checkpoint_id: str,
    task_id: str,
    idx: Optional[int],
) -> str:
    """Build the Redis key of a pending-write document, memoizing the result.

    The key joins, with ``self._separator``:
    - the write prefix;
    - the storage-safe thread id;
    - the storage-safe namespace;
    - the storage-safe checkpoint id;
    - ``task_id`` (used verbatim);
    - ``idx``, appended only when it is not ``None``.

    Results are cached in ``self._key_cache`` under the same size cap
    (``self._key_cache_max_size``) as the checkpoint-key helper that shares
    the dict.
    """
    # Unambiguous cache key. Joining the raw parts with ":" lets different
    # argument tuples (e.g. ids or namespaces containing ":") share one
    # cache slot and return another checkpoint's write key.
    # repr() of this tuple is injective.
    cache_key = repr(
        (
            "write",
            str(thread_id),
            str(checkpoint_ns),
            str(checkpoint_id),
            str(task_id),
            idx,
        )
    )
    cached = self._key_cache.get(cache_key)
    if cached is not None:
        return cached

    key_parts = [
        self._checkpoint_write_prefix,
        str(to_storage_safe_id(thread_id)),
        to_storage_safe_str(checkpoint_ns),
        str(to_storage_safe_id(checkpoint_id)),
        task_id,
    ]
    if idx is not None:
        key_parts.append(str(idx))
    key = self._separator.join(key_parts)

    # Use the configured cap (previously a hard-coded 10000 that disagreed
    # with the checkpoint-key helper sharing this cache).
    if len(self._key_cache) < self._key_cache_max_size:
        self._key_cache[cache_key] = key
    return key
async def __aenter__(self) -> AsyncRedisSaver:
    """Enter ``async with``.

    Runs ``asetup()`` (indexes, cluster detection, key registry), then
    reports client info once the connection is usable.
    """
    await self.asetup()
    # Client info can only be sent after setup has configured Redis.
    await self.aset_client_info()
    return self
async def __aexit__(
    self,
    _exc_type: Optional[Type[BaseException]],
    _exc_val: Optional[BaseException],
    _exc_tb: Optional[TracebackType],
) -> None:
    """Leave ``async with``.

    Closes the client (and disconnects its pool) only when the saver opened
    the client itself. In every case, it then detaches the client from both
    RedisVL indexes.
    """
    if self._owns_its_client:
        await self._redis.aclose()
        # Cluster clients expose no ``connection_pool`` attribute.
        pool = getattr(self._redis, "connection_pool", None)
        if pool:
            pending_disconnect = pool.disconnect()
            if pending_disconnect:
                await pending_disconnect

    # Detach the client so RedisVL never tries to close it later from an
    # event loop running in a different thread.
    for index in (self.checkpoints_index, self.checkpoint_writes_index):
        index._redis_client = None
async def asetup(self) -> None:
    """Prepare the saver: capture the loop, create indexes, detect topology.

    The running loop is recorded here rather than in ``__init__``. Sync
    wrappers (``get_tuple``, ``put``, ``put_writes``, ...) hand coroutines to
    it with ``asyncio.run_coroutine_threadsafe``, and deferring the capture
    lets the saver be constructed outside an async context (Issue #179).
    """
    self.loop = asyncio.get_running_loop()

    self.create_indexes()
    for index in (self.checkpoints_index, self.checkpoint_writes_index):
        await index.create(overwrite=False)

    # Only auto-detects when cluster_mode was not set explicitly.
    await self._detect_cluster_mode()

    # Registry of written keys, used instead of SCAN/KEYS lookups.
    self._key_registry = AsyncKeyRegistry(self._redis)
async def setup(self) -> None:  # type: ignore[override]
    """Asynchronously set up the checkpoint saver; call this before first use.

    This is a thin alias for ``asetup()``: it creates the Redis indices and
    detects cluster mode.

    It mirrors the async ``setup()`` of the PostgreSQL and SQLite
    checkpointers. The base class declares ``setup()`` as synchronous, so
    the ``type: ignore[override]`` is required.

    Usage: ``await checkpointer.setup()``
    """
    await self.asetup()
async def _detect_cluster_mode(self) -> None:
    """Set ``self.cluster_mode`` from the client's class unless already set.

    An explicit (non-``None``) ``cluster_mode`` is left untouched. Otherwise
    it becomes ``True`` for an ``AsyncRedisCluster`` client and ``False``
    for any other client.
    """
    if self.cluster_mode is not None:
        # Lazy %-style arguments: formatting only happens if the record is emitted.
        logger.info(
            "Redis cluster_mode explicitly set to %s, skipping detection.",
            self.cluster_mode,
        )
        return

    is_cluster = isinstance(self._redis, AsyncRedisCluster)
    logger.info(
        "Redis client is a cluster client"
        if is_cluster
        else "Redis client is a standalone client"
    )
    self.cluster_mode = is_cluster
async def _apply_ttl_to_keys(
    self,
    main_key: str,
    related_keys: Optional[list[str]] = None,
    ttl_minutes: Optional[float] = None,
) -> Any:
    """Apply (or remove) Redis native TTL on a group of keys, best effort.

    Args:
        main_key: The primary Redis key.
        related_keys: Additional keys that should expire at the same time.
        ttl_minutes: Time-to-live in minutes. When ``None``,
            ``ttl_config["default_ttl"]`` is used if present. ``-1`` removes
            the TTL (``PERSIST``) so the keys become persistent.

    Returns:
        ``True`` once every key has been attempted. ``None`` when no TTL is
        configured at all, in which case nothing is done.

    Each key is handled with its own command, so one failure (e.g. a MOVED
    reply from a Redis Enterprise proxy) does not stop the rest. Failures
    are logged together with their cause and never raised.
    """
    if ttl_minutes is None and self.ttl_config and "default_ttl" in self.ttl_config:
        ttl_minutes = self.ttl_config.get("default_ttl")
    if ttl_minutes is None:
        return None

    all_keys = [main_key, *(related_keys or [])]

    if ttl_minutes == -1:
        # Special case: -1 means "make persistent".
        for key in all_keys:
            try:
                await self._redis.persist(key)
            except Exception as exc:  # best effort: keep going with other keys
                logger.warning("Failed to remove TTL from key: %s (%s)", key, exc)
        return True

    ttl_seconds = int(ttl_minutes * 60)
    for key in all_keys:
        try:
            await self._redis.expire(key, ttl_seconds)
        except Exception as exc:  # best effort: keep going with other keys
            logger.warning("Failed to apply TTL to key: %s (%s)", key, exc)
    return True
async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
    """Get a checkpoint tuple from Redis asynchronously.

    The checkpoint document is located in one of two ways:
    - When ``config`` carries a checkpoint id, it is read directly by key
      (no FT.SEARCH).
    - Otherwise, the "latest checkpoint" pointer stored for the
      (thread, namespace) pair is dereferenced.

    Channel values, pending sends (only when the checkpoint has a parent)
    and pending writes are then loaded concurrently.

    When ``ttl_config["refresh_on_read"]`` is set and the checkpoint key
    currently has a positive TTL, the TTL is refreshed on:
    - the checkpoint key;
    - its registered write keys;
    - the registry entries.

    Args:
        config: Runnable config whose ``configurable`` section holds
            ``thread_id`` and optionally ``checkpoint_ns`` and a checkpoint id.

    Returns:
        The matching ``CheckpointTuple``, or ``None`` when no pointer or no
        checkpoint document exists.
    """
    thread_id = config["configurable"]["thread_id"]
    checkpoint_id = get_checkpoint_id(config)
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")

    # Values stored in Redis use a sentinel in place of empty strings.
    storage_safe_thread_id = to_storage_safe_id(thread_id)
    storage_safe_checkpoint_ns = to_storage_safe_str(checkpoint_ns)
    refresh_on_read = bool(
        self.ttl_config and self.ttl_config.get("refresh_on_read")
    )

    fallback_checkpoint_id: Optional[str]
    if checkpoint_id and checkpoint_id != EMPTY_ID_SENTINEL:
        # Known id: address the checkpoint document directly.
        fallback_checkpoint_id = to_storage_safe_id(checkpoint_id)
        checkpoint_key = self._make_redis_checkpoint_key(
            storage_safe_thread_id,
            storage_safe_checkpoint_ns,
            fallback_checkpoint_id,
        )
    else:
        # No id: follow the latest-checkpoint pointer for this thread/namespace.
        latest_pointer_key = self._make_redis_checkpoint_latest_key(
            thread_id, checkpoint_ns
        )
        checkpoint_key = await self._redis.get(latest_pointer_key)
        if not checkpoint_key:
            # No pointer means no checkpoints exist.
            return None
        fallback_checkpoint_id = None

    # Fetch the document (and, when needed, its remaining TTL) in one round trip.
    pipeline = self._redis.pipeline(transaction=False)
    pipeline.json().get(checkpoint_key)
    if refresh_on_read:
        pipeline.ttl(checkpoint_key)
    pipeline_results = await pipeline.execute()

    checkpoint_data = pipeline_results[0]
    if not checkpoint_data:
        # Missing document; on the pointer path, a dangling pointer.
        return None
    current_ttl = pipeline_results[1] if refresh_on_read else None

    doc_thread_id = from_storage_safe_id(
        checkpoint_data.get("thread_id", storage_safe_thread_id)
    )
    doc_checkpoint_ns = from_storage_safe_str(
        checkpoint_data.get("checkpoint_ns", storage_safe_checkpoint_ns)
    )
    doc_checkpoint_id = from_storage_safe_id(
        checkpoint_data.get("checkpoint_id", fallback_checkpoint_id)
    )
    # aput() stores "" when a checkpoint has no parent, so "" is the neutral
    # default. Never fall back to the checkpoint's own id: that would report
    # the checkpoint as its own parent.
    doc_parent_checkpoint_id = from_storage_safe_id(
        checkpoint_data.get("parent_checkpoint_id", "")
    )

    # Refresh TTLs only for keys that currently expire.
    # Redis TTL replies: -2 = key missing, -1 = no expiry, >0 = seconds left.
    if refresh_on_read and current_ttl is not None and current_ttl > 0:
        # Write keys come from the registry rather than a SCAN.
        write_keys = []
        if self._key_registry:
            write_keys = await self._key_registry.get_write_keys(
                doc_thread_id, doc_checkpoint_ns, doc_checkpoint_id
            )
        await self._apply_ttl_to_keys(checkpoint_key, write_keys or None)

        # Keep the registry's own entries alive alongside the checkpoint.
        if self._key_registry and self.ttl_config:
            ttl_minutes = self.ttl_config.get("default_ttl")
            if ttl_minutes is not None:
                await self._key_registry.apply_ttl(
                    doc_thread_id,
                    doc_checkpoint_ns,
                    doc_checkpoint_id,
                    int(ttl_minutes * 60),
                )

    # The stored checkpoint body is read directly. The JSON round-trip
    # previously used here only to reach channel_versions is unnecessary.
    checkpoint_body = checkpoint_data.get("checkpoint", {})
    if isinstance(checkpoint_body, str):
        checkpoint_body = json.loads(checkpoint_body)
    channel_versions = (
        checkpoint_body.get("channel_versions") if checkpoint_body else None
    )

    # Load channel values, pending sends (parent only) and pending writes
    # concurrently.
    channel_values_coro = self.aget_channel_values(
        thread_id=doc_thread_id,
        checkpoint_ns=doc_checkpoint_ns,
        checkpoint_id=doc_checkpoint_id,
        channel_versions=channel_versions,
    )
    pending_writes_coro = self._aload_pending_writes(
        thread_id, checkpoint_ns, doc_checkpoint_id
    )
    pending_sends: List[Tuple[str, Union[str, bytes]]]
    pending_writes: List[PendingWrite]
    if doc_parent_checkpoint_id:
        raw_channel_values, pending_sends, pending_writes = await asyncio.gather(
            channel_values_coro,
            self._aload_pending_sends(
                thread_id=thread_id,
                checkpoint_ns=doc_checkpoint_ns,
                parent_checkpoint_id=doc_parent_checkpoint_id,
            ),
            pending_writes_coro,
        )
    else:
        raw_channel_values, pending_writes = await asyncio.gather(
            channel_values_coro, pending_writes_coro
        )
        pending_sends = []
    channel_values: Dict[str, Any] = self._recursive_deserialize(raw_channel_values)

    # Metadata is normally stored as a JSON string. Strip NUL characters
    # from keys and string values.
    raw_metadata = checkpoint_data.get("metadata", "{}")
    metadata_dict = (
        json.loads(raw_metadata) if isinstance(raw_metadata, str) else raw_metadata
    )
    sanitized_metadata = {
        k.replace("\u0000", ""): (
            v.replace("\u0000", "") if isinstance(v, str) else v
        )
        for k, v in metadata_dict.items()
    }
    metadata = cast(CheckpointMetadata, sanitized_metadata)

    config_param: RunnableConfig = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": doc_checkpoint_id,
        }
    }

    # _load_checkpoint keeps receiving the JSON-string form it got before.
    checkpoint_param = self._load_checkpoint(
        json.dumps(checkpoint_body),
        channel_values,
        pending_sends,
    )

    parent_config: RunnableConfig | None = None
    if doc_parent_checkpoint_id:
        parent_config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": doc_parent_checkpoint_id,
            }
        }

    return CheckpointTuple(
        config=config_param,
        checkpoint=checkpoint_param,
        metadata=metadata,
        parent_config=parent_config,
        pending_writes=pending_writes,
    )
async def alist(
    self,
    config: Optional[RunnableConfig],
    *,
    filter: Optional[dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[CheckpointTuple]:
    """List checkpoints from Redis asynchronously, newest checkpoint id first.

    Args:
        config: Optional config restricting results by ``thread_id`` and,
            when present and truthy, by ``run_id``, ``checkpoint_ns`` and
            checkpoint id. An empty ``checkpoint_ns`` applies no namespace
            filter.
        filter: Extra equality filters. Supported keys: ``source``, ``step``,
            ``thread_id``, ``run_id``.
        before: If its checkpoint id parses as a ULID, only checkpoints whose
            stored ``checkpoint_ts`` is below that ULID's timestamp are
            returned. Otherwise the bound is ignored (logged at debug level).
        limit: Maximum number of results; 10000 when falsy.

    Yields:
        ``CheckpointTuple`` objects sorted by ``checkpoint_id`` descending.
        Pending sends and pending writes are loaded in batches up front.

    Raises:
        ValueError: If ``filter`` contains an unsupported key.
    """
    # ---- Build the RedisVL filter expression --------------------------------
    filter_expression = []
    if config:
        configurable = config["configurable"]
        filter_expression.append(
            Tag("thread_id") == to_storage_safe_id(configurable["thread_id"])
        )
        if run_id := configurable.get("run_id"):
            filter_expression.append(Tag("run_id") == to_storage_safe_id(run_id))
        # An empty namespace adds no filter, so checkpoints from every
        # namespace (including "") match. A checkpoint id only filters when
        # it has a value.
        if checkpoint_ns := configurable.get("checkpoint_ns"):
            filter_expression.append(
                Tag("checkpoint_ns") == to_storage_safe_str(checkpoint_ns)
            )
        if checkpoint_id := get_checkpoint_id(config):
            filter_expression.append(
                Tag("checkpoint_id") == to_storage_safe_id(checkpoint_id)
            )

    if filter:
        for k, v in filter.items():
            if k == "source":
                filter_expression.append(Tag("source") == v)
            elif k == "step":
                filter_expression.append(Num("step") == v)
            elif k == "thread_id":
                filter_expression.append(Tag("thread_id") == to_storage_safe_id(v))
            elif k == "run_id":
                filter_expression.append(Tag("run_id") == to_storage_safe_id(v))
            else:
                raise ValueError(f"Unsupported filter key: {k}")

    if before:
        before_checkpoint_id = get_checkpoint_id(before)
        if before_checkpoint_id:
            try:
                before_ts = ULID.from_str(before_checkpoint_id).timestamp
            except Exception:
                # Not a ULID: no timestamp to compare against, so the bound is
                # skipped (best effort), but leave a trace instead of failing
                # silently.
                logger.debug(
                    "Ignoring 'before' checkpoint id %r: not a valid ULID",
                    before_checkpoint_id,
                )
            else:
                # Numeric range query: checkpoint_ts < before_ts.
                filter_expression.append(Num("checkpoint_ts") < before_ts)

    combined_filter = filter_expression[0] if filter_expression else "*"
    for expr in filter_expression[1:]:
        combined_filter &= expr

    query = FilterQuery(
        filter_expression=combined_filter,
        return_fields=[
            "thread_id",
            "checkpoint_ns",
            "checkpoint_id",
            "parent_checkpoint_id",
            "$.checkpoint",
            "$.metadata",
            "has_writes",  # lets us skip pending_writes loads when False
        ],
        num_results=limit or 10000,
        sort_by=("checkpoint_id", "DESC"),
    )
    results = await self.checkpoints_index.search(query)

    # ---- First pass: decode docs and collect batch-lookup keys ---------------
    all_docs_data = []
    pending_sends_batch_keys = []
    pending_writes_batch_keys = []
    for doc in results.docs:
        doc_dict = doc.__dict__ if hasattr(doc, "__dict__") else {}
        thread_id = from_storage_safe_id(doc["thread_id"])
        checkpoint_ns = from_storage_safe_str(doc["checkpoint_ns"])
        checkpoint_id = from_storage_safe_id(doc["checkpoint_id"])
        parent_checkpoint_id = from_storage_safe_id(doc["parent_checkpoint_id"])
        # A parent id of the literal string "None" means "no parent". It was
        # already excluded from the pending-sends batch; normalizing it here
        # also keeps it from producing a bogus parent_config below.
        if parent_checkpoint_id == "None":
            parent_checkpoint_id = None

        # Channel values are inlined in the checkpoint returned by FT.SEARCH.
        checkpoint_data = doc_dict.get("$.checkpoint") or getattr(
            doc, "$.checkpoint", None
        )
        checkpoint_dict = None
        if checkpoint_data:
            if isinstance(checkpoint_data, list) and checkpoint_data:
                checkpoint_data = checkpoint_data[0]
            checkpoint_dict = (
                checkpoint_data
                if isinstance(checkpoint_data, dict)
                else orjson.loads(checkpoint_data)
            )
            channel_values = self._recursive_deserialize(
                checkpoint_dict.get("channel_values", {})
            )
        else:
            # Corrupted document without checkpoint data: no fallback lookup.
            channel_values = {}

        if parent_checkpoint_id:
            pending_sends_batch_keys.append(
                (thread_id, checkpoint_ns, parent_checkpoint_id)
            )

        # has_writes may come back as a string from the search index.
        has_writes = doc_dict.get("has_writes") or getattr(doc, "has_writes", False)
        if has_writes == "true":
            has_writes = True
        elif has_writes in ("false", "False"):
            has_writes = False
        if has_writes:
            pending_writes_batch_keys.append((thread_id, checkpoint_ns, checkpoint_id))

        all_docs_data.append(
            {
                "doc": doc,
                "doc_dict": doc_dict,
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
                "parent_checkpoint_id": parent_checkpoint_id,
                "checkpoint_data": checkpoint_data,
                "checkpoint_dict": checkpoint_dict if checkpoint_data else None,
                "channel_values": channel_values,
                "has_writes": has_writes,
            }
        )

    # ---- Batch loads ----------------------------------------------------------
    pending_sends_map = {}
    if pending_sends_batch_keys:
        pending_sends_map = await self._abatch_load_pending_sends(
            pending_sends_batch_keys
        )
    pending_writes_map = {}
    if pending_writes_batch_keys:
        pending_writes_map = await self._abatch_load_pending_writes(
            pending_writes_batch_keys
        )

    # ---- Second pass: assemble and yield tuples -------------------------------
    for doc_data in all_docs_data:
        thread_id = doc_data["thread_id"]
        checkpoint_ns = doc_data["checkpoint_ns"]
        checkpoint_id = doc_data["checkpoint_id"]
        parent_checkpoint_id = doc_data["parent_checkpoint_id"]

        pending_sends: List[Tuple[str, Union[str, bytes]]] = []
        if parent_checkpoint_id:
            pending_sends = pending_sends_map.get(
                (thread_id, checkpoint_ns, parent_checkpoint_id), []
            )

        doc_dict = doc_data["doc_dict"]
        raw_metadata = doc_dict.get("$.metadata") or getattr(
            doc_data["doc"], "$.metadata", "{}"
        )
        metadata_dict = (
            orjson.loads(raw_metadata)
            if isinstance(raw_metadata, str)
            else raw_metadata
        )
        # Sanitize only when a NUL character is present (rare). Check keys as
        # well as values, matching the key+value sanitization in aget_tuple.
        if any(
            "\u0000" in key or (isinstance(value, str) and "\u0000" in value)
            for key, value in metadata_dict.items()
        ):
            sanitized_metadata = {
                k.replace("\u0000", ""): (
                    v.replace("\u0000", "") if isinstance(v, str) else v
                )
                for k, v in metadata_dict.items()
            }
            metadata = cast(CheckpointMetadata, sanitized_metadata)
        else:
            metadata = cast(CheckpointMetadata, metadata_dict)

        config_param: RunnableConfig = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
            }
        }

        # Reuse the dict parsed in the first pass to avoid parsing it twice.
        checkpoint_param = self._load_checkpoint(
            (
                doc_data["checkpoint_dict"]
                if doc_data["checkpoint_data"]
                else doc_data["doc"]["$.checkpoint"]
            ),
            doc_data["channel_values"],
            pending_sends,
        )

        pending_writes: List[PendingWrite] = []
        if doc_data["has_writes"]:
            pending_writes = pending_writes_map.get(
                (thread_id, checkpoint_ns, checkpoint_id), []
            )

        parent_config: RunnableConfig | None = None
        if parent_checkpoint_id:
            parent_config = {
                "configurable": {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": parent_checkpoint_id,
                }
            }

        yield CheckpointTuple(
            config=config_param,
            checkpoint=checkpoint_param,
            metadata=metadata,
            parent_config=parent_config,
            pending_writes=pending_writes,
        )
async def aput(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
stream_mode: str = "values",
) -> RunnableConfig:
"""Store a checkpoint to Redis with proper transaction handling.
This method ensures that all Redis operations are performed atomically
using Redis transactions. In case of interruption (asyncio.CancelledError),
the transaction will be aborted, ensuring consistency.
Args:
config: The config to associate with the checkpoint
checkpoint: The checkpoint data to store
metadata: Additional metadata to save with the checkpoint
new_versions: New channel versions as of this write
stream_mode: The streaming mode being used (values, updates, etc.)
Returns:
Updated configuration after storing the checkpoint
Raises:
asyncio.CancelledError: If the operation is cancelled/interrupted
"""
configurable = config["configurable"].copy()
run_id = configurable.pop("run_id", metadata.get("run_id"))
thread_id = configurable.pop("thread_id")
checkpoint_ns = configurable.pop("checkpoint_ns")
# Get checkpoint_id from config - this will be parent if saving a child
config_checkpoint_id = configurable.pop("checkpoint_id", None)
# For backward compatibility with thread_ts
thread_ts = configurable.pop("thread_ts", "")
# Determine the checkpoint ID
# This follows the original logic but with clearer parent handling
checkpoint_id = config_checkpoint_id or thread_ts or checkpoint.get("id", "")
# If checkpoint has its own ID that's different from what we'd use,
# and we have a config checkpoint_id, then config checkpoint_id is the parent
parent_checkpoint_id = None
if (
checkpoint.get("id")
and config_checkpoint_id
and checkpoint.get("id") != config_checkpoint_id
):
parent_checkpoint_id = config_checkpoint_id
checkpoint_id = checkpoint["id"]
# For values we store in Redis, we need to convert empty strings to the
# sentinel value.
storage_safe_thread_id = to_storage_safe_id(thread_id)
storage_safe_checkpoint_ns = to_storage_safe_str(checkpoint_ns)
storage_safe_checkpoint_id = to_storage_safe_id(checkpoint_id)
copy = checkpoint.copy()
next_config = {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
}
# Store checkpoint data with cluster-aware handling
try:
# Store checkpoint data WITH inline channel values
# Extract timestamp from checkpoint_id (ULID)
checkpoint_ts = None
if checkpoint_id:
try:
from ulid import ULID
ulid_obj = ULID.from_str(checkpoint_id)
checkpoint_ts = ulid_obj.timestamp # milliseconds since epoch
except Exception:
# If not a valid ULID, use current time
import time
checkpoint_ts = time.time() * 1000
checkpoint_data = {
"thread_id": storage_safe_thread_id,
"run_id": to_storage_safe_id(run_id) if run_id else "",
"checkpoint_ns": storage_safe_checkpoint_ns,
"checkpoint_id": storage_safe_checkpoint_id,
"parent_checkpoint_id": (
to_storage_safe_id(parent_checkpoint_id)
if parent_checkpoint_id
else ""
),
"checkpoint_ts": checkpoint_ts,
"checkpoint": self._dump_checkpoint(copy),
"metadata": self._dump_metadata(metadata),
"has_writes": False, # Track if this checkpoint has pending writes
}
# store at top-level for filters in list()
if all(key in metadata for key in ["source", "step"]):
checkpoint_data["source"] = metadata["source"]
checkpoint_data["step"] = metadata["step"]
# Prepare checkpoint key
checkpoint_key = self._make_redis_checkpoint_key_cached(
thread_id,
checkpoint_ns,
checkpoint_id,
)
# Calculate TTL in seconds if configured
ttl_seconds = None
if self.ttl_config and "default_ttl" in self.ttl_config:
ttl_seconds = int(self.ttl_config["default_ttl"] * 60)
# Store checkpoint with TTL in a single operation using SearchIndex
await self.checkpoints_index.load(
[checkpoint_data],
keys=[checkpoint_key],
ttl=ttl_seconds, # RedisVL applies TTL in its internal pipeline
)
# For test compatibility: ensure TTL operations are visible to mocks
if (
self.cluster_mode
and self.ttl_config
and "default_ttl" in self.ttl_config