Skip to content

Commit 7ec6fbe

Browse files
authored
Merge pull request #6 from Couchbase-Ecosystem/DA-1283/Update-Langgraph
DA-1283 update: dependencies and version
2 parents 9ad9950 + e373293 commit 7ec6fbe

4 files changed

Lines changed: 81 additions & 54 deletions

File tree

langgraph_checkpointer_couchbase/async_cb_saver.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222

2323
logger = logging.getLogger(__name__)
2424

25+
# Default timeout for database operations
26+
DEFAULT_TIMEOUT = timedelta(seconds=5)
27+
# Default serialization type for metadata (for backward compatibility)
28+
DEFAULT_METADATA_TYPE = "json"
29+
2530
class AsyncCouchbaseSaver(BaseCheckpointSaver):
2631
"""A checkpoint saver that stores checkpoints in a Couchbase database."""
2732

@@ -94,10 +99,6 @@ async def from_conn_info(
9499
auth = PasswordAuthenticator(cb_username, cb_password)
95100
options = ClusterOptions(auth)
96101
cluster = await ACluster.connect(cb_conn_str, options)
97-
98-
cls.cluster = cluster
99-
cls.bucket_name = bucket_name
100-
cls.scope_name = scope_name
101102

102103
bucket = cluster.bucket(bucket_name)
103104
await bucket.on_connect()
@@ -211,7 +212,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
211212
return CheckpointTuple(
212213
{"configurable": config_values},
213214
checkpoint,
214-
self.serde.loads(_decode_binary(doc["metadata"])),
215+
self.serde.loads_typed((doc.get("metadata_type", DEFAULT_METADATA_TYPE), _decode_binary(doc["metadata"]))) if doc.get("metadata") else None,
215216
(
216217
{
217218
"configurable": {
@@ -226,6 +227,8 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
226227
pending_writes,
227228
)
228229

230+
return None
231+
229232
async def alist(
230233
self,
231234
config: Optional[RunnableConfig],
@@ -285,7 +288,7 @@ async def alist(
285288
}
286289
},
287290
checkpoint,
288-
self.serde.loads(_decode_binary(doc["metadata"])),
291+
self.serde.loads_typed((doc.get("metadata_type", DEFAULT_METADATA_TYPE), _decode_binary(doc["metadata"]))) if doc.get("metadata") else None,
289292
(
290293
{
291294
"configurable": {
@@ -327,15 +330,16 @@ async def aput(
327330
if serialized_checkpoint:
328331
serialized_checkpoint = _encode_binary(serialized_checkpoint)
329332

330-
metadata = self.serde.dumps(metadata)
331-
if metadata:
332-
metadata = _encode_binary(metadata)
333+
# Serialize and encode metadata
334+
metadata_type, metadata_bytes = self.serde.dumps_typed(metadata)
335+
serialized_metadata = _encode_binary(metadata_bytes) if metadata_bytes else None
333336

334337
doc = {
335338
"parent_checkpoint_id": config["configurable"].get("checkpoint_id"),
336339
"type": type_,
337340
"checkpoint": serialized_checkpoint,
338-
"metadata": metadata,
341+
"metadata": serialized_metadata,
342+
"metadata_type": metadata_type,
339343
"thread_id" : thread_id,
340344
"checkpoint_ns": checkpoint_ns,
341345
"checkpoint_id": checkpoint_id,
@@ -346,7 +350,7 @@ async def aput(
346350
# ensure bucket connected (idempotent)
347351
await self.bucket.on_connect()
348352
collection = self.bucket.scope(self.scope_name).collection(self.checkpoints_collection_name)
349-
await collection.upsert(upsert_key, (doc), UpsertOptions(timeout=timedelta(seconds=5)))
353+
await collection.upsert(upsert_key, (doc), UpsertOptions(timeout=DEFAULT_TIMEOUT))
350354

351355
return {
352356
"configurable": {
@@ -394,4 +398,4 @@ async def aput_writes(
394398
"type": type_,
395399
"value": serialized_value,
396400
}
397-
await collection.upsert(upsert_key, (doc), UpsertOptions(timeout=timedelta(seconds=5)))
401+
await collection.upsert(upsert_key, (doc), UpsertOptions(timeout=DEFAULT_TIMEOUT))

langgraph_checkpointer_couchbase/couchbase_saver.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
from .utils import _encode_binary, _decode_binary
2323

2424
logger = logging.getLogger(__name__)
25+
26+
# Default timeout for database operations
27+
DEFAULT_TIMEOUT = timedelta(seconds=5)
28+
# Default serialization type for metadata (for backward compatibility)
29+
DEFAULT_METADATA_TYPE = "json"
2530
class CouchbaseSaver(BaseCheckpointSaver):
2631
"""A checkpoint saver that stores checkpoints in a Couchbase database.
2732
@@ -77,11 +82,7 @@ def from_conn_info(
7782
auth = PasswordAuthenticator(cb_username, cb_password)
7883
options = ClusterOptions(auth)
7984
cluster = Cluster(cb_conn_str, options)
80-
cluster.wait_until_ready(timedelta(seconds=5))
81-
82-
cls.cluster = cluster
83-
cls.bucket_name = bucket_name
84-
cls.scope_name = scope_name
85+
cluster.wait_until_ready(DEFAULT_TIMEOUT)
8586

8687
saver = CouchbaseSaver(cluster, bucket_name, scope_name, checkpoints_collection_name, checkpoint_writes_collection_name)
8788

@@ -197,8 +198,8 @@ def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
197198
)
198199

199200
# Decode and deserialize metadata
200-
metadata = _decode_binary(doc["metadata"])
201-
metadata = self.serde.loads(metadata)
201+
metadata_data = _decode_binary(doc["metadata"])
202+
metadata = self.serde.loads_typed((doc.get("metadata_type", DEFAULT_METADATA_TYPE), metadata_data))
202203

203204
return CheckpointTuple(
204205
{"configurable": config_values},
@@ -218,6 +219,8 @@ def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
218219
pending_writes,
219220
)
220221

222+
return None
223+
221224
def list(
222225
self,
223226
config: Optional[RunnableConfig],
@@ -277,7 +280,7 @@ def list(
277280
}
278281
},
279282
checkpoint,
280-
self.serde.loads(_decode_binary(doc["metadata"])),
283+
self.serde.loads_typed((doc.get("metadata_type", DEFAULT_METADATA_TYPE), _decode_binary(doc["metadata"]))),
281284
(
282285
{
283286
"configurable": {
@@ -322,23 +325,23 @@ def put(
322325
serialized_checkpoint = _encode_binary(serialized_checkpoint)
323326

324327
# Serialize and encode metadata
325-
metadata_bytes = self.serde.dumps(metadata)
326-
if metadata_bytes:
327-
metadata = _encode_binary(metadata_bytes)
328+
metadata_type, metadata_bytes = self.serde.dumps_typed(metadata)
329+
serialized_metadata = _encode_binary(metadata_bytes) if metadata_bytes else None
328330

329331
doc = {
330332
"parent_checkpoint_id": config["configurable"].get("checkpoint_id"),
331333
"type": type_,
332334
"checkpoint": serialized_checkpoint,
333-
"metadata": metadata,
335+
"metadata": serialized_metadata,
336+
"metadata_type": metadata_type,
334337
"thread_id" : thread_id,
335338
"checkpoint_ns": checkpoint_ns,
336339
"checkpoint_id": checkpoint_id,
337340
}
338341
upsert_key = f"{thread_id}::{checkpoint_ns}::{checkpoint_id}"
339342

340343
collection = self.checkpoints_collection
341-
collection.upsert(upsert_key, (doc), UpsertOptions(timeout=timedelta(seconds=5)))
344+
collection.upsert(upsert_key, (doc), UpsertOptions(timeout=DEFAULT_TIMEOUT))
342345

343346
return {
344347
"configurable": {
@@ -387,4 +390,4 @@ def put_writes(
387390
"type": type_,
388391
"value": serialized_value,
389392
}
390-
collection.upsert(upsert_key, (doc), UpsertOptions(timeout=timedelta(seconds=5)))
393+
collection.upsert(upsert_key, doc, UpsertOptions(timeout=DEFAULT_TIMEOUT))

pyproject.toml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "langgraph-checkpointer-couchbase"
7-
version = "1.0.7"
7+
version = "1.0.8"
88
description = ''
99
readme = "README.md"
1010
requires-python = ">=3.8"
@@ -25,11 +25,13 @@ classifiers = [
2525
"Programming Language :: Python :: Implementation :: PyPy",
2626
]
2727
dependencies = [
28-
"couchbase>=4.3.5",
29-
"langgraph>=0.3.22",
30-
"langchain-openai>=0.3.11",
31-
"pydantic>=2.11.1",
32-
"typing_extensions>=4.13.0"
28+
"couchbase>=4.5.0",
29+
"langgraph>=1.0.5",
30+
"langchain-openai>=1.1.3",
31+
"pydantic>=2.12.5",
32+
"typing_extensions>=4.15.0",
33+
"langchain>=1.1.3",
34+
"python-dotenv>=1.0.0",
3335
]
3436

3537
[project.urls]

tests/agent_e2e_test.py

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import asyncio
22
from typing import Literal
3-
from langchain_core.runnables import ConfigurableField
43
from langchain_core.tools import tool
54
from langchain_openai import ChatOpenAI
6-
from langgraph.prebuilt import create_react_agent
5+
from langchain.agents import create_agent
76
from langgraph_checkpointer_couchbase import CouchbaseSaver, AsyncCouchbaseSaver
87
from dotenv import load_dotenv
98
import os
9+
1010
load_dotenv()
1111

1212
@tool
@@ -21,37 +21,49 @@ def get_weather(city: Literal["nyc", "sf"]):
2121

2222

2323
tools = [get_weather]
24-
model = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)
24+
model = ChatOpenAI(model="gpt-5-mini", temperature=0)
25+
2526

2627
def syncTest():
2728
with CouchbaseSaver.from_conn_info(
28-
cb_conn_str=os.getenv("CB_CLUSTER") or "couchbase://localhost",
29-
cb_username=os.getenv("CB_USERNAME") or "Administrator",
30-
cb_password=os.getenv("CB_PASSWORD") or "password",
31-
bucket_name=os.getenv("CB_BUCKET") or "test",
32-
scope_name=os.getenv("CB_SCOPE") or "langgraph",
29+
cb_conn_str=os.getenv("CB_CLUSTER"),
30+
cb_username=os.getenv("CB_USERNAME"),
31+
cb_password=os.getenv("CB_PASSWORD"),
32+
bucket_name=os.getenv("CB_BUCKET"),
33+
scope_name=os.getenv("CB_SCOPE"),
3334
) as checkpointer:
34-
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
35+
graph = create_agent(
36+
model,
37+
tools=tools,
38+
checkpointer=checkpointer,
39+
)
3540
config = {"configurable": {"thread_id": "1"}}
3641
res = graph.invoke({"messages": [("human", "what's the weather in sf")]}, config)
3742

3843
latest_checkpoint = checkpointer.get(config)
3944
latest_checkpoint_tuple = checkpointer.get_tuple(config)
4045
checkpoint_tuples = list(checkpointer.list(config))
4146

42-
print(latest_checkpoint)
43-
print(latest_checkpoint_tuple)
44-
print(checkpoint_tuples)
47+
print("=== Sync Test Results ===")
48+
print(f"Response: {res}")
49+
print(f"Latest checkpoint: {latest_checkpoint}")
50+
print(f"Latest checkpoint tuple: {latest_checkpoint_tuple}")
51+
print(f"All checkpoint tuples: {checkpoint_tuples}")
52+
4553

4654
async def asyncTest():
4755
async with AsyncCouchbaseSaver.from_conn_info(
48-
cb_conn_str=os.getenv("CB_CLUSTER") or "couchbase://localhost",
49-
cb_username=os.getenv("CB_USERNAME") or "Administrator",
50-
cb_password=os.getenv("CB_PASSWORD") or "password",
51-
bucket_name=os.getenv("CB_BUCKET") or "test",
52-
scope_name=os.getenv("CB_SCOPE") or "langgraph",
56+
cb_conn_str=os.getenv("CB_CLUSTER"),
57+
cb_username=os.getenv("CB_USERNAME"),
58+
cb_password=os.getenv("CB_PASSWORD"),
59+
bucket_name=os.getenv("CB_BUCKET"),
60+
scope_name=os.getenv("CB_SCOPE"),
5361
) as checkpointer:
54-
graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
62+
graph = create_agent(
63+
model,
64+
tools=tools,
65+
checkpointer=checkpointer,
66+
)
5567
config = {"configurable": {"thread_id": "2"}}
5668
res = await graph.ainvoke(
5769
{"messages": [("human", "what's the weather in nyc")]}, config
@@ -61,10 +73,16 @@ async def asyncTest():
6173
latest_checkpoint_tuple = await checkpointer.aget_tuple(config)
6274
checkpoint_tuples = [c async for c in checkpointer.alist(config)]
6375

64-
print(latest_checkpoint)
65-
print(latest_checkpoint_tuple)
66-
print(checkpoint_tuples)
76+
print("=== Async Test Results ===")
77+
print(f"Response: {res}")
78+
print(f"Latest checkpoint: {latest_checkpoint}")
79+
print(f"Latest checkpoint tuple: {latest_checkpoint_tuple}")
80+
print(f"All checkpoint tuples: {checkpoint_tuples}")
81+
6782

6883
if __name__ == "__main__":
84+
print("Running sync test...")
6985
syncTest()
70-
asyncio.run(asyncTest())
86+
print("\nRunning async test...")
87+
asyncio.run(asyncTest())
88+
print("\nAll tests completed!")

0 commit comments

Comments
 (0)