Skip to content

Commit 7cd377c

Browse files
author
Brian Sam-Bodden
authored
fix(checkpoint): make TTL operations best-effort to prevent write loss (#174)
* fix(checkpoint): make all TTL operations best-effort to prevent write loss

  EXPIRE commands mixed into critical write pipelines caused complete pipeline failure on Redis Enterprise proxy when TTL commands were routed to different shards than the JSON module commands. This lost interrupt writes silently, breaking human-in-the-loop workflows.

  Changes:
  - Remove EXPIRE from all put_writes pipelines (aio, sync, shallow)
  - Use raise_on_error=False for pipeline execution with per-result inspection to preserve JSON.MERGE fallback logic
  - Apply TTL individually per key after pipeline success, wrapped in try/except so failures only log warnings
  - Wrap all remaining bare EXPIRE/PERSIST calls across the codebase (put, get_tuple, key_registry, _apply_ttl_to_keys) in try/except
  - Simplify _apply_ttl_to_keys to use individual calls instead of building EXPIRE-only pipelines that share the same failure mode

* test(checkpoint): add HitL+TTL regression tests and update mocked TTL tests

  - Add test_hitl_ttl_regression.py with 9 tests covering:
    - HitL+TTL integration guards on real Redis (5 tests)
    - Pipeline no longer contains EXPIRE commands
    - Writes survive simulated TTL failure
    - TTL failure logs warning via caplog
    - TTL applied separately after pipeline succeeds
  - Update test_base_client_info_and_ttl.py to expect individual expire calls instead of pipeline-based TTL
  - Update test_cluster_mode.py to remove cluster/non-cluster branching since both modes now use individual expire calls

* fix(checkpoint): address PR review comments on TTL warning diagnostics

  - Add exc_info=True and key names to all TTL warning logs for diagnosability (aio.py, shallow.py, ashallow.py, key_registry.py)
  - Remove redundant outer try/except around _key_registry.apply_ttl() in __init__.py since it's already best-effort internally
  - Remove dead cluster_mode variable in base.py _apply_ttl_to_keys
  - Remove unused imports in test_hitl_ttl_regression.py
  - Remove xfail marker from test_sync_hitl_with_ttl (was masking a local environment issue, not a real bug)

* fix(serializer): support both old and new Interrupt field layouts

  The Interrupt dataclass changed between langgraph versions:
  - langgraph <=1.0.x: Interrupt(value, id)
  - langgraph >=1.1.x: Interrupt(value, resumable, ns, when)

  The serializer now introspects the installed Interrupt class at runtime via dataclasses.fields() and serializes/deserializes whichever fields are present. This ensures checkpoints written by either version can be read by either version without data loss.

* fix(test): use state.next for sync interrupt detection

  Sync get_state() in langgraph 1.0.x does not populate task.interrupts (only aget_state does). Use state.next to verify the graph is paused at the interrupt node instead. Also reorder the sync test before async tests to avoid any potential event loop contamination.
1 parent 5832fe5 commit 7cd377c

11 files changed

Lines changed: 1113 additions & 383 deletions

langgraph/checkpoint/redis/__init__.py

Lines changed: 54 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -581,9 +581,15 @@ def put(
581581
)
582582
self._redis.set(latest_pointer_key, checkpoint_key)
583583

584-
# Apply TTL to latest pointer key as well
584+
# Apply TTL to latest pointer key as well (best-effort)
585585
if ttl_seconds is not None:
586-
self._redis.expire(latest_pointer_key, ttl_seconds)
586+
try:
587+
self._redis.expire(latest_pointer_key, ttl_seconds)
588+
except Exception:
589+
logger.warning(
590+
"Failed to apply TTL to latest pointer key: %s",
591+
latest_pointer_key,
592+
)
587593

588594
return next_config
589595

@@ -618,11 +624,16 @@ def put_writes(
618624
}
619625
writes_objects.append(write_obj)
620626

621-
with self._redis.pipeline(transaction=False) as pipeline:
622-
# Keep track of keys we're creating
623-
created_keys = []
624-
write_keys = []
627+
# IMPORTANT: Only critical commands (JSON.SET, JSON.MERGE) go in the pipeline.
628+
# EXPIRE (TTL) commands are applied separately afterward to avoid pipeline
629+
# failures on Redis Enterprise proxy, where mixed JSON module + native commands
630+
# in a single pipeline can cause EXPIRE to fail, aborting the entire pipeline
631+
# and losing interrupt writes.
632+
write_keys: list[str] = []
633+
checkpoint_key = ""
634+
merge_failed = False
625635

636+
with self._redis.pipeline(transaction=False) as pipeline:
626637
for write_obj in writes_objects:
627638
idx_value = write_obj["idx"]
628639
assert isinstance(idx_value, int)
@@ -635,80 +646,58 @@ def put_writes(
635646
)
636647
write_keys.append(key)
637648
pipeline.json().set(key, "$", cast(Any, write_obj))
638-
created_keys.append(key)
639649

640-
# Add TTL operations to the pipeline if configured
641-
if created_keys and self.ttl_config and "default_ttl" in self.ttl_config:
642-
ttl_seconds = int(self.ttl_config["default_ttl"] * 60)
643-
for key in created_keys:
644-
pipeline.expire(key, ttl_seconds)
645-
646-
# Update checkpoint to indicate it has writes
650+
# Update checkpoint to indicate it has writes (critical)
647651
if writes_objects:
648652
checkpoint_key = self._make_redis_checkpoint_key_cached(
649653
thread_id, checkpoint_ns, checkpoint_id
650654
)
651-
# Use merge to update existing document
652655
pipeline.json().merge(checkpoint_key, "$", {"has_writes": True})
653656

657+
# Execute critical commands with raise_on_error=False
658+
results = pipeline.execute(raise_on_error=False)
659+
660+
# Check results for critical command failures
661+
for result in results:
662+
if isinstance(result, Exception):
663+
err_str = str(result)
664+
if "JSON.MERGE" in err_str or "merge" in err_str.lower():
665+
merge_failed = True
666+
else:
667+
raise result
668+
669+
# Handle JSON.MERGE fallback for older Redis versions
670+
if merge_failed and checkpoint_key:
654671
try:
655-
pipeline.execute()
656-
except Exception as e:
657-
# Check if JSON.MERGE failed (older Redis versions)
658-
if "JSON.MERGE" in str(e) or "merge" in str(e).lower():
659-
# Retry without JSON.MERGE for older Redis versions
660-
with self._redis.pipeline(transaction=False) as fallback_pipeline:
661-
# Re-add all the write operations
662-
for write_obj in writes_objects:
663-
idx_value = write_obj["idx"]
664-
assert isinstance(idx_value, int)
665-
key = self._make_redis_checkpoint_writes_key_cached(
666-
thread_id,
667-
checkpoint_ns,
668-
checkpoint_id,
669-
task_id,
670-
idx_value,
671-
)
672-
fallback_pipeline.json().set(key, "$", cast(Any, write_obj))
673-
674-
# Add TTL operations if configured
675-
if (
676-
created_keys
677-
and self.ttl_config
678-
and "default_ttl" in self.ttl_config
679-
):
680-
ttl_seconds = int(self.ttl_config["default_ttl"] * 60)
681-
for key in created_keys:
682-
fallback_pipeline.expire(key, ttl_seconds)
683-
684-
# Execute the fallback pipeline
685-
fallback_pipeline.execute()
686-
687-
# Update has_writes flag separately for older Redis
688-
if checkpoint_key:
689-
try:
690-
checkpoint_data = self._redis.json().get(checkpoint_key)
691-
if isinstance(
692-
checkpoint_data, dict
693-
) and not checkpoint_data.get("has_writes"):
694-
checkpoint_data["has_writes"] = True
695-
self._redis.json().set(
696-
checkpoint_key, "$", checkpoint_data
697-
)
698-
except Exception:
699-
# If this fails, it's not critical - the writes are still saved
700-
pass
701-
else:
702-
# Re-raise other exceptions
703-
raise
672+
checkpoint_data = self._redis.json().get(checkpoint_key)
673+
if isinstance(checkpoint_data, dict) and not checkpoint_data.get(
674+
"has_writes"
675+
):
676+
checkpoint_data["has_writes"] = True
677+
self._redis.json().set(checkpoint_key, "$", checkpoint_data)
678+
except Exception:
679+
pass
680+
681+
# Apply TTL separately (best-effort — failures here don't lose writes).
682+
# Individual calls ensure partial success: if one key's EXPIRE fails
683+
# on RE proxy, the others still get TTL applied.
684+
if write_keys and self.ttl_config and "default_ttl" in self.ttl_config:
685+
ttl_seconds = int(self.ttl_config["default_ttl"] * 60)
686+
for key in write_keys:
687+
try:
688+
self._redis.expire(key, ttl_seconds)
689+
except Exception:
690+
logger.warning(
691+
"Failed to apply TTL to checkpoint write key: %s", key
692+
)
704693

705694
# Update key registry with the write keys
706695
if self._key_registry and write_keys:
707696
self._key_registry.register_write_keys_batch(
708697
thread_id, checkpoint_ns, checkpoint_id, write_keys
709698
)
710699

711-
# Apply TTL to registry key if configured
700+
# Apply TTL to registry key (already best-effort inside apply_ttl)
712701
if self.ttl_config and "default_ttl" in self.ttl_config:
713702
ttl_seconds = int(self.ttl_config["default_ttl"] * 60)
714703
self._key_registry.apply_ttl(

0 commit comments

Comments
 (0)