Skip to content

Commit 0ed1e93

Browse files
committed
fix: AsyncRedisSaver cannot be constructed outside an async context (#179)
1 parent e2a9448 commit 0ed1e93

4 files changed

Lines changed: 234 additions & 4 deletions

File tree

langgraph/checkpoint/redis/aio.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ def __init__(
101101
checkpoint_prefix=checkpoint_prefix,
102102
checkpoint_write_prefix=checkpoint_write_prefix,
103103
)
104-
self.loop = asyncio.get_running_loop()
104+
# Deferred: the event loop is captured in asetup() so that the saver can
105+
# be constructed outside an async context (Issue #179).
106+
self.loop: Optional[asyncio.AbstractEventLoop] = None
105107

106108
# Instance-level cache for frequently used keys (limited size to prevent memory issues)
107109
self._key_cache: Dict[str, str] = {}
@@ -243,6 +245,13 @@ async def __aexit__(
243245

244246
async def asetup(self) -> None:
245247
"""Set up the checkpoint saver."""
248+
# Capture the running event loop here so that sync wrapper methods
249+
# (get_tuple, put, put_writes, …) can dispatch coroutines to it via
250+
# asyncio.run_coroutine_threadsafe. Deferring this to asetup() instead
251+
# of __init__ lets callers construct the saver outside an async context
252+
# (Issue #179).
253+
self.loop = asyncio.get_running_loop()
254+
246255
self.create_indexes()
247256
await self.checkpoints_index.create(overwrite=False)
248257
await self.checkpoint_writes_index.create(overwrite=False)
@@ -1307,6 +1316,20 @@ def put_writes(
13071316
task_id (str): Identifier for the task creating the writes.
13081317
task_path (str): Path of the task creating the writes.
13091318
"""
1319+
if self.loop is None:
1320+
raise RuntimeError(
1321+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1322+
"Call `await saver.asetup()` or use `async with saver:` first."
1323+
)
1324+
try:
1325+
if asyncio.get_running_loop() is self.loop:
1326+
raise asyncio.InvalidStateError(
1327+
"Synchronous calls to AsyncRedisSaver are only allowed from a "
1328+
"different thread. From the main thread, use the async interface. "
1329+
"For example, use `await checkpointer.aput_writes(...)`."
1330+
)
1331+
except RuntimeError:
1332+
pass
13101333
return asyncio.run_coroutine_threadsafe(
13111334
self.aput_writes(config, writes, task_id), self.loop
13121335
).result()
@@ -1315,12 +1338,17 @@ def get_channel_values(
13151338
self, thread_id: str, checkpoint_ns: str = "", checkpoint_id: str = ""
13161339
) -> Dict[str, Any]:
13171340
"""Retrieve channel_values using efficient FT.SEARCH with checkpoint_id (sync wrapper)."""
1341+
if self.loop is None:
1342+
raise RuntimeError(
1343+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1344+
"Call `await saver.asetup()` or use `async with saver:` first."
1345+
)
13181346
try:
13191347
if asyncio.get_running_loop() is self.loop:
13201348
raise asyncio.InvalidStateError(
13211349
"Synchronous calls to AsyncRedisSaver are only allowed from a "
1322-
"different thread. From the main thread, use the async interface."
1323-
"For example, use `await checkpointer.get_channel_values(...)`."
1350+
"different thread. From the main thread, use the async interface. "
1351+
"For example, use `await checkpointer.aget_channel_values(...)`."
13241352
)
13251353
except RuntimeError:
13261354
pass
@@ -1345,6 +1373,11 @@ def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
13451373
Raises:
13461374
asyncio.InvalidStateError: If called from the wrong thread/event loop
13471375
"""
1376+
if self.loop is None:
1377+
raise RuntimeError(
1378+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1379+
"Call `await saver.asetup()` or use `async with saver:` first."
1380+
)
13481381
try:
13491382
# check if we are in the main thread, only bg threads can block
13501383
if asyncio.get_running_loop() is self.loop:
@@ -1381,6 +1414,11 @@ def put(
13811414
Raises:
13821415
asyncio.InvalidStateError: If called from the wrong thread/event loop
13831416
"""
1417+
if self.loop is None:
1418+
raise RuntimeError(
1419+
"AsyncRedisSaver must be set up before calling synchronous methods. "
1420+
"Call `await saver.asetup()` or use `async with saver:` first."
1421+
)
13841422
try:
13851423
# check if we are in the main thread, only bg threads can block
13861424
if asyncio.get_running_loop() is self.loop:

langgraph/checkpoint/redis/ashallow.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ def __init__(
7777
checkpoint_prefix=checkpoint_prefix,
7878
checkpoint_write_prefix=checkpoint_write_prefix,
7979
)
80-
self.loop = asyncio.get_running_loop()
80+
# Deferred: the event loop is captured in asetup() so that the saver can
81+
# be constructed outside an async context (Issue #179).
82+
self.loop: Optional[asyncio.AbstractEventLoop] = None
8183

8284
# Instance-level cache for frequently used keys (limited size to prevent memory issues)
8385
self._key_cache: Dict[str, str] = {}
@@ -139,6 +141,13 @@ async def from_conn_string(
139141

140142
async def asetup(self) -> None:
141143
"""Initialize Redis indexes asynchronously."""
144+
# Capture the running event loop here so that sync wrapper methods
145+
# (get_tuple, put, put_writes, …) can dispatch coroutines to it via
146+
# asyncio.run_coroutine_threadsafe. Deferring this to asetup() instead
147+
# of __init__ lets callers construct the saver outside an async context
148+
# (Issue #179).
149+
self.loop = asyncio.get_running_loop()
150+
142151
await self.checkpoints_index.create(overwrite=False)
143152
await self.checkpoint_writes_index.create(overwrite=False)
144153

@@ -725,6 +734,11 @@ def create_indexes(self) -> None:
725734

726735
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
727736
"""Retrieve a checkpoint tuple from Redis synchronously."""
737+
if self.loop is None:
738+
raise RuntimeError(
739+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
740+
"Call `await saver.asetup()` or use `async with saver:` first."
741+
)
728742
try:
729743
if asyncio.get_running_loop() is self.loop:
730744
raise asyncio.InvalidStateError(
@@ -747,6 +761,20 @@ def put(
747761
new_versions: ChannelVersions,
748762
) -> RunnableConfig:
749763
"""Store only the latest checkpoint synchronously."""
764+
if self.loop is None:
765+
raise RuntimeError(
766+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
767+
"Call `await saver.asetup()` or use `async with saver:` first."
768+
)
769+
try:
770+
if asyncio.get_running_loop() is self.loop:
771+
raise asyncio.InvalidStateError(
772+
"Synchronous calls to AsyncShallowRedisSaver are only allowed from a "
773+
"different thread. From the main thread, use the async interface. "
774+
"For example, use `await checkpointer.aput(...)`."
775+
)
776+
except RuntimeError:
777+
pass
750778
return asyncio.run_coroutine_threadsafe(
751779
self.aput(config, checkpoint, metadata, new_versions), self.loop
752780
).result()
@@ -759,6 +787,20 @@ def put_writes(
759787
task_path: str = "",
760788
) -> None:
761789
"""Store intermediate writes synchronously."""
790+
if self.loop is None:
791+
raise RuntimeError(
792+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
793+
"Call `await saver.asetup()` or use `async with saver:` first."
794+
)
795+
try:
796+
if asyncio.get_running_loop() is self.loop:
797+
raise asyncio.InvalidStateError(
798+
"Synchronous calls to AsyncShallowRedisSaver are only allowed from a "
799+
"different thread. From the main thread, use the async interface. "
800+
"For example, use `await checkpointer.aput_writes(...)`."
801+
)
802+
except RuntimeError:
803+
pass
762804
return asyncio.run_coroutine_threadsafe(
763805
self.aput_writes(config, writes, task_id), self.loop
764806
).result()
@@ -771,6 +813,11 @@ def get_channel_values(
771813
channel_versions: Optional[Dict[str, Any]] = None,
772814
) -> dict[str, Any]:
773815
"""Retrieve channel_values dictionary with properly constructed message objects (sync wrapper)."""
816+
if self.loop is None:
817+
raise RuntimeError(
818+
"AsyncShallowRedisSaver must be set up before calling synchronous methods. "
819+
"Call `await saver.asetup()` or use `async with saver:` first."
820+
)
774821
try:
775822
if asyncio.get_running_loop() is self.loop:
776823
raise asyncio.InvalidStateError(

tests/test_async.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,3 +848,78 @@ async def test_root_graph_checkpoint(
848848
checkpoints = [c async for c in checkpointer.alist(config)]
849849
assert len(checkpoints) > 0
850850
assert checkpoints[-1].checkpoint["id"] == latest["id"]
851+
852+
853+
# --- Issue #179: AsyncRedisSaver construction outside async context ---
854+
855+
856+
def test_async_redis_saver_construction_outside_event_loop(redis_url: str) -> None:
857+
"""AsyncRedisSaver should be constructable outside an async context (Issue #179).
858+
859+
Previously, AsyncRedisSaver.__init__ called asyncio.get_running_loop() which
860+
raised RuntimeError when no event loop was running.
861+
"""
862+
# This must not raise RuntimeError even when there is no running event loop
863+
saver = AsyncRedisSaver(redis_url)
864+
assert saver is not None
865+
# Loop should be None until asetup() is called
866+
assert saver.loop is None
867+
868+
869+
def test_async_redis_saver_construction_with_client_outside_event_loop(
870+
redis_url: str,
871+
) -> None:
872+
"""AsyncRedisSaver should accept a pre-built client without a running loop (Issue #179).
873+
874+
The typical use-case from the issue: constructing the saver synchronously,
875+
then setting up (and using it) later inside an async lifespan handler.
876+
"""
877+
from redis.asyncio import Redis as AsyncRedis
878+
879+
client = AsyncRedis.from_url(redis_url)
880+
try:
881+
saver = AsyncRedisSaver(redis_client=client)
882+
assert saver is not None
883+
assert saver.loop is None
884+
finally:
885+
asyncio.run(client.aclose())
886+
887+
888+
@pytest.mark.asyncio
889+
async def test_async_redis_saver_loop_captured_in_asetup(redis_url: str) -> None:
890+
"""asetup() must capture the running event loop so sync wrappers work (Issue #179)."""
891+
saver = AsyncRedisSaver(redis_url)
892+
assert saver.loop is None # not yet set
893+
894+
await saver.asetup()
895+
896+
# After asetup the loop attribute must point to the current running loop
897+
assert saver.loop is not None
898+
assert saver.loop is asyncio.get_running_loop()
899+
900+
await saver._redis.aclose()
901+
902+
903+
@pytest.mark.asyncio
904+
async def test_async_redis_saver_context_manager_after_sync_construction(
905+
redis_url: str,
906+
) -> None:
907+
"""Saver built before entering the async context manager must still work."""
908+
# Construct before entering `async with`; in this async test a loop is already
909+
# running, but this still verifies the saver is usable end-to-end once setup
910+
# happens on context-manager entry.
911+
saver = AsyncRedisSaver(redis_url)
912+
913+
async with saver:
914+
# After entering the context the loop must be set
915+
assert saver.loop is asyncio.get_running_loop()
916+
917+
# Basic functional smoke test
918+
config: RunnableConfig = {
919+
"configurable": {"thread_id": "issue-179-test", "checkpoint_ns": ""}
920+
}
921+
chk: Checkpoint = empty_checkpoint()
922+
meta: CheckpointMetadata = {"source": "input", "step": 0, "writes": {}}
923+
await saver.aput(config, chk, meta, {})
924+
result = await saver.aget_tuple(config)
925+
assert result is not None

tests/test_shallow_async.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from typing import Any, AsyncGenerator, Dict
23

34
import pytest
@@ -494,3 +495,72 @@ async def test_shallow_redis_saver_inline_storage(redis_url: str) -> None:
494495
# Clean up test data
495496
await redis_client.flushdb()
496497
await redis_client.aclose()
498+
499+
500+
# --- Issue #179: AsyncShallowRedisSaver construction outside async context ---
501+
502+
503+
def test_async_shallow_redis_saver_construction_outside_event_loop(
504+
redis_url: str,
505+
) -> None:
506+
"""AsyncShallowRedisSaver should be constructable outside an async context (Issue #179).
507+
508+
Previously, AsyncShallowRedisSaver.__init__ called asyncio.get_running_loop() which
509+
raised RuntimeError when no event loop was running.
510+
"""
511+
# This must not raise RuntimeError even when there is no running event loop
512+
saver = AsyncShallowRedisSaver(redis_url)
513+
assert saver is not None
514+
# Loop should be None until asetup() is called
515+
assert saver.loop is None
516+
517+
518+
def test_async_shallow_redis_saver_construction_with_client_outside_event_loop(
519+
redis_url: str,
520+
) -> None:
521+
"""AsyncShallowRedisSaver accepts a pre-built client without a running loop (Issue #179)."""
522+
from redis.asyncio import Redis as AsyncRedis
523+
524+
client = AsyncRedis.from_url(redis_url)
525+
try:
526+
saver = AsyncShallowRedisSaver(redis_client=client)
527+
assert saver is not None
528+
assert saver.loop is None
529+
finally:
530+
asyncio.run(client.aclose())
531+
532+
533+
@pytest.mark.asyncio
534+
async def test_async_shallow_redis_saver_loop_captured_in_asetup(
535+
redis_url: str,
536+
) -> None:
537+
"""asetup() must capture the running event loop so sync wrappers work (Issue #179)."""
538+
saver = AsyncShallowRedisSaver(redis_url)
539+
assert saver.loop is None # not yet set
540+
541+
await saver.asetup()
542+
543+
assert saver.loop is not None
544+
assert saver.loop is asyncio.get_running_loop()
545+
546+
await saver._redis.aclose()
547+
548+
549+
@pytest.mark.asyncio
550+
async def test_async_shallow_redis_saver_context_manager_after_sync_construction(
551+
redis_url: str,
552+
) -> None:
553+
"""Saver constructed before entering the async context manager must still work."""
554+
saver = AsyncShallowRedisSaver(redis_url)
555+
556+
async with saver:
557+
assert saver.loop is asyncio.get_running_loop()
558+
559+
config: RunnableConfig = {
560+
"configurable": {"thread_id": "issue-179-shallow-test", "checkpoint_ns": ""}
561+
}
562+
chk: Checkpoint = empty_checkpoint()
563+
meta: CheckpointMetadata = {"source": "input", "step": 0, "writes": {}}
564+
await saver.aput(config, chk, meta, {})
565+
result = await saver.aget_tuple(config)
566+
assert result is not None

0 commit comments

Comments
 (0)