Skip to content

Commit 35f0ec0

Browse files
committed
fix: AsyncRedisSaver cannot be constructed outside an async context (#179)
1 parent e2a9448 commit 35f0ec0

4 files changed

Lines changed: 204 additions & 2 deletions

File tree

langgraph/checkpoint/redis/aio.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ def __init__(
101101
checkpoint_prefix=checkpoint_prefix,
102102
checkpoint_write_prefix=checkpoint_write_prefix,
103103
)
104-
self.loop = asyncio.get_running_loop()
104+
# Deferred: the event loop is captured in asetup() so that the saver can
105+
# be constructed outside an async context (Issue #179).
106+
self.loop: Optional[asyncio.AbstractEventLoop] = None
105107

106108
# Instance-level cache for frequently used keys (limited size to prevent memory issues)
107109
self._key_cache: Dict[str, str] = {}
@@ -243,6 +245,13 @@ async def __aexit__(
243245

244246
async def asetup(self) -> None:
245247
"""Set up the checkpoint saver."""
248+
# Capture the running event loop here so that sync wrapper methods
249+
# (get_tuple, put, put_writes, …) can dispatch coroutines to it via
250+
# asyncio.run_coroutine_threadsafe. Deferring this to asetup() instead
251+
# of __init__ lets callers construct the saver outside an async context
252+
# (Issue #179).
253+
self.loop = asyncio.get_running_loop()
254+
246255
self.create_indexes()
247256
await self.checkpoints_index.create(overwrite=False)
248257
await self.checkpoint_writes_index.create(overwrite=False)
@@ -1307,6 +1316,11 @@ def put_writes(
13071316
task_id (str): Identifier for the task creating the writes.
13081317
task_path (str): Path of the task creating the writes.
13091318
"""
1319+
if self.loop is None:
1320+
raise RuntimeError(
1321+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1322+
"Call `await saver.asetup()` or use `async with saver:` first."
1323+
)
13101324
return asyncio.run_coroutine_threadsafe(
13111325
self.aput_writes(config, writes, task_id), self.loop
13121326
).result()
@@ -1315,6 +1329,11 @@ def get_channel_values(
13151329
self, thread_id: str, checkpoint_ns: str = "", checkpoint_id: str = ""
13161330
) -> Dict[str, Any]:
13171331
"""Retrieve channel_values using efficient FT.SEARCH with checkpoint_id (sync wrapper)."""
1332+
if self.loop is None:
1333+
raise RuntimeError(
1334+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1335+
"Call `await saver.asetup()` or use `async with saver:` first."
1336+
)
13181337
try:
13191338
if asyncio.get_running_loop() is self.loop:
13201339
raise asyncio.InvalidStateError(
@@ -1345,6 +1364,11 @@ def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
13451364
Raises:
13461365
asyncio.InvalidStateError: If called from the wrong thread/event loop
13471366
"""
1367+
if self.loop is None:
1368+
raise RuntimeError(
1369+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1370+
"Call `await saver.asetup()` or use `async with saver:` first."
1371+
)
13481372
try:
13491373
# check if we are in the main thread, only bg threads can block
13501374
if asyncio.get_running_loop() is self.loop:
@@ -1381,6 +1405,11 @@ def put(
13811405
Raises:
13821406
asyncio.InvalidStateError: If called from the wrong thread/event loop
13831407
"""
1408+
if self.loop is None:
1409+
raise RuntimeError(
1410+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1411+
"Call `await saver.asetup()` or use `async with saver:` first."
1412+
)
13841413
try:
13851414
# check if we are in the main thread, only bg threads can block
13861415
if asyncio.get_running_loop() is self.loop:

langgraph/checkpoint/redis/ashallow.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ def __init__(
7777
checkpoint_prefix=checkpoint_prefix,
7878
checkpoint_write_prefix=checkpoint_write_prefix,
7979
)
80-
self.loop = asyncio.get_running_loop()
80+
# Deferred: the event loop is captured in asetup() so that the saver can
81+
# be constructed outside an async context (Issue #179).
82+
self.loop: Optional[asyncio.AbstractEventLoop] = None
8183

8284
# Instance-level cache for frequently used keys (limited size to prevent memory issues)
8385
self._key_cache: Dict[str, str] = {}
@@ -139,6 +141,13 @@ async def from_conn_string(
139141

140142
async def asetup(self) -> None:
141143
"""Initialize Redis indexes asynchronously."""
144+
# Capture the running event loop here so that sync wrapper methods
145+
# (get_tuple, put, put_writes, …) can dispatch coroutines to it via
146+
# asyncio.run_coroutine_threadsafe. Deferring this to asetup() instead
147+
# of __init__ lets callers construct the saver outside an async context
148+
# (Issue #179).
149+
self.loop = asyncio.get_running_loop()
150+
142151
await self.checkpoints_index.create(overwrite=False)
143152
await self.checkpoint_writes_index.create(overwrite=False)
144153

@@ -725,6 +734,11 @@ def create_indexes(self) -> None:
725734

726735
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
727736
"""Retrieve a checkpoint tuple from Redis synchronously."""
737+
if self.loop is None:
738+
raise RuntimeError(
739+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
740+
"Call `await saver.asetup()` or use `async with saver:` first."
741+
)
728742
try:
729743
if asyncio.get_running_loop() is self.loop:
730744
raise asyncio.InvalidStateError(
@@ -747,6 +761,11 @@ def put(
747761
new_versions: ChannelVersions,
748762
) -> RunnableConfig:
749763
"""Store only the latest checkpoint synchronously."""
764+
if self.loop is None:
765+
raise RuntimeError(
766+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
767+
"Call `await saver.asetup()` or use `async with saver:` first."
768+
)
750769
return asyncio.run_coroutine_threadsafe(
751770
self.aput(config, checkpoint, metadata, new_versions), self.loop
752771
).result()
@@ -759,6 +778,11 @@ def put_writes(
759778
task_path: str = "",
760779
) -> None:
761780
"""Store intermediate writes synchronously."""
781+
if self.loop is None:
782+
raise RuntimeError(
783+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
784+
"Call `await saver.asetup()` or use `async with saver:` first."
785+
)
762786
return asyncio.run_coroutine_threadsafe(
763787
self.aput_writes(config, writes, task_id), self.loop
764788
).result()
@@ -771,6 +795,11 @@ def get_channel_values(
771795
channel_versions: Optional[Dict[str, Any]] = None,
772796
) -> dict[str, Any]:
773797
"""Retrieve channel_values dictionary with properly constructed message objects (sync wrapper)."""
798+
if self.loop is None:
799+
raise RuntimeError(
800+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
801+
"Call `await saver.asetup()` or use `async with saver:` first."
802+
)
774803
try:
775804
if asyncio.get_running_loop() is self.loop:
776805
raise asyncio.InvalidStateError(

tests/test_async.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,3 +848,77 @@ async def test_root_graph_checkpoint(
848848
checkpoints = [c async for c in checkpointer.alist(config)]
849849
assert len(checkpoints) > 0
850850
assert checkpoints[-1].checkpoint["id"] == latest["id"]
851+
852+
853+
# --- Issue #179: AsyncRedisSaver construction outside async context ---
854+
855+
856+
def test_async_redis_saver_construction_outside_event_loop(redis_url: str) -> None:
857+
"""AsyncRedisSaver should be constructable outside an async context (Issue #179).
858+
859+
Previously, AsyncRedisSaver.__init__ called asyncio.get_running_loop() which
860+
raised RuntimeError when no event loop was running.
861+
"""
862+
# This must not raise RuntimeError even when there is no running event loop
863+
saver = AsyncRedisSaver(redis_url)
864+
assert saver is not None
865+
# Loop should be None until asetup() is called
866+
assert saver.loop is None
867+
868+
869+
def test_async_redis_saver_construction_with_client_outside_event_loop(
870+
redis_url: str,
871+
) -> None:
872+
"""AsyncRedisSaver should accept a pre-built client without a running loop (Issue #179).
873+
874+
The typical use-case from the issue: constructing the saver synchronously,
875+
then setting up (and using it) later inside an async lifespan handler.
876+
"""
877+
from redis.asyncio import Redis as AsyncRedis
878+
879+
client = AsyncRedis.from_url(redis_url)
880+
try:
881+
saver = AsyncRedisSaver(redis_client=client)
882+
assert saver is not None
883+
assert saver.loop is None
884+
finally:
885+
asyncio.run(client.aclose())
886+
887+
888+
@pytest.mark.asyncio
889+
async def test_async_redis_saver_loop_captured_in_asetup(redis_url: str) -> None:
890+
"""asetup() must capture the running event loop so sync wrappers work (Issue #179)."""
891+
saver = AsyncRedisSaver(redis_url)
892+
assert saver.loop is None # not yet set
893+
894+
await saver.asetup()
895+
896+
# After asetup the loop attribute must point to the current running loop
897+
assert saver.loop is not None
898+
assert saver.loop is asyncio.get_running_loop()
899+
900+
await saver._redis.aclose()
901+
902+
903+
@pytest.mark.asyncio
904+
async def test_async_redis_saver_context_manager_after_sync_construction(
905+
redis_url: str,
906+
) -> None:
907+
"""Saver built outside an event loop must still work as an async context manager."""
908+
# Simulate sync-land construction (no running event loop at this point in an
909+
# async test, but we at least verify the object is usable end-to-end).
910+
saver = AsyncRedisSaver(redis_url)
911+
912+
async with saver:
913+
# After entering the context the loop must be set
914+
assert saver.loop is asyncio.get_running_loop()
915+
916+
# Basic functional smoke test
917+
config: RunnableConfig = {
918+
"configurable": {"thread_id": "issue-179-test", "checkpoint_ns": ""}
919+
}
920+
chk: Checkpoint = empty_checkpoint()
921+
meta: CheckpointMetadata = {"source": "input", "step": 0, "writes": {}}
922+
await saver.aput(config, chk, meta, {})
923+
result = await saver.aget_tuple(config)
924+
assert result is not None

tests/test_shallow_async.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from typing import Any, AsyncGenerator, Dict
23

34
import pytest
@@ -494,3 +495,72 @@ async def test_shallow_redis_saver_inline_storage(redis_url: str) -> None:
494495
# Clean up test data
495496
await redis_client.flushdb()
496497
await redis_client.aclose()
498+
499+
500+
# --- Issue #179: AsyncShallowRedisSaver construction outside async context ---
501+
502+
503+
def test_async_shallow_redis_saver_construction_outside_event_loop(
504+
redis_url: str,
505+
) -> None:
506+
"""AsyncShallowRedisSaver should be constructable outside an async context (Issue #179).
507+
508+
Previously, AsyncShallowRedisSaver.__init__ called asyncio.get_running_loop() which
509+
raised RuntimeError when no event loop was running.
510+
"""
511+
# This must not raise RuntimeError even when there is no running event loop
512+
saver = AsyncShallowRedisSaver(redis_url)
513+
assert saver is not None
514+
# Loop should be None until asetup() is called
515+
assert saver.loop is None
516+
517+
518+
def test_async_shallow_redis_saver_construction_with_client_outside_event_loop(
519+
redis_url: str,
520+
) -> None:
521+
"""AsyncShallowRedisSaver accepts a pre-built client without a running loop (Issue #179)."""
522+
from redis.asyncio import Redis as AsyncRedis
523+
524+
client = AsyncRedis.from_url(redis_url)
525+
try:
526+
saver = AsyncShallowRedisSaver(redis_client=client)
527+
assert saver is not None
528+
assert saver.loop is None
529+
finally:
530+
asyncio.run(client.aclose())
531+
532+
533+
@pytest.mark.asyncio
534+
async def test_async_shallow_redis_saver_loop_captured_in_asetup(
535+
redis_url: str,
536+
) -> None:
537+
"""asetup() must capture the running event loop so sync wrappers work (Issue #179)."""
538+
saver = AsyncShallowRedisSaver(redis_url)
539+
assert saver.loop is None # not yet set
540+
541+
await saver.asetup()
542+
543+
assert saver.loop is not None
544+
assert saver.loop is asyncio.get_running_loop()
545+
546+
await saver._redis.aclose()
547+
548+
549+
@pytest.mark.asyncio
550+
async def test_async_shallow_redis_saver_context_manager_after_sync_construction(
551+
redis_url: str,
552+
) -> None:
553+
"""Saver built outside an event loop must still work as an async context manager."""
554+
saver = AsyncShallowRedisSaver(redis_url)
555+
556+
async with saver:
557+
assert saver.loop is asyncio.get_running_loop()
558+
559+
config: RunnableConfig = {
560+
"configurable": {"thread_id": "issue-179-shallow-test", "checkpoint_ns": ""}
561+
}
562+
chk: Checkpoint = empty_checkpoint()
563+
meta: CheckpointMetadata = {"source": "input", "step": 0, "writes": {}}
564+
await saver.aput(config, chk, meta, {})
565+
result = await saver.aget_tuple(config)
566+
assert result is not None

0 commit comments

Comments
 (0)