Skip to content

Commit bca5991

Browse files
committed
[argus] checkpointer: add AsyncPostgresSaver.aprune
The langgraph postgres checkpointer ships adelete_thread but no aprune. langgraph_api warns about unbounded checkpoint growth when no prune method exists, and operators have no in-process way to keep history bounded short of dropping the whole thread. This adds an aprune method on import: - keep_latest (default) — for each (thread_id, checkpoint_ns) keep only the most recent checkpoint_id (ULID, lexicographically monotonic) and cascade to checkpoint_writes. checkpoint_blobs are left alone because blobs are content-addressed and reused across checkpoints; per-checkpoint blob cleanup risks dropping rows the surviving checkpoint still references, mirroring how the upstream adelete_thread is the only path that touches blobs. - delete_all — bulk delete across all three tables. Implemented as a small monkey-patch module rather than a subclass so deer-flow code that builds AsyncPostgresSaver via from_conn_string gets the new method automatically. Idempotent: no-op if the library ever ships a native aprune (the patch then becomes safe to delete). Activated by an explicit import in the postgres branch of async_provider.py — no .pth tricks, no hidden side effects. Tests skip when langgraph-checkpoint-postgres is not installed (the package is not in upstream deer-flow's default deps). They run in environments that install it (e.g. our argus overlay image). PR-candidate: yes Upstream-issue: none Reason: Fills an obvious gap in the postgres checkpointer story. PR could move the patch into langgraph-checkpoint-postgres itself rather than deer-flow, but having it here keeps deer-flow useful in the meantime.
1 parent 2f114e5 commit bca5991

3 files changed

Lines changed: 238 additions & 0 deletions

File tree

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""Add ``aprune`` to ``langgraph.checkpoint.postgres.aio.AsyncPostgresSaver``.
2+
3+
LangGraph's postgres checkpointer (as of langgraph-checkpoint-postgres 3.0.x)
4+
ships ``adelete_thread`` but not ``aprune``. ``langgraph_api`` warns about
5+
unbounded checkpoint growth when no prune method exists, and operators have no
6+
in-process way to keep history bounded short of dropping the whole thread.
7+
8+
This module installs an ``aprune`` method on the class at import time. Idempotent:
9+
no-op if the library already provides one (so a future upstream release that adds
10+
``aprune`` natively wins automatically and this file becomes safe to delete).
11+
12+
Two strategies:
13+
14+
- ``keep_latest`` (default) — for each ``(thread_id, checkpoint_ns)`` keep only
15+
the most recent ``checkpoint_id`` (ULID, lexicographically monotonic) and
16+
cascade the deletion to ``checkpoint_writes``. ``checkpoint_blobs`` is left
17+
alone because blobs are content-addressed and reused across checkpoints; per-
18+
checkpoint blob cleanup risks dropping rows the surviving checkpoint still
19+
references, mirroring how the upstream ``adelete_thread`` is the only path
20+
that touches blobs.
21+
- ``delete_all`` — bulk delete across all three tables for the given threads.
22+
23+
Activated by importing this module from the checkpointer providers (see
24+
``provider.py`` and ``async_provider.py``).
25+
"""
26+
27+
from __future__ import annotations
28+
29+
from collections.abc import Sequence
30+
from typing import Any
31+
32+
try:
33+
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
34+
except ImportError: # pragma: no cover — package not installed in this env
35+
AsyncPostgresSaver = None # type: ignore[assignment]
36+
37+
38+
_PRUNE_SQL = """
39+
WITH keep AS (
40+
SELECT thread_id, checkpoint_ns, MAX(checkpoint_id) AS keep_id
41+
FROM {table}
42+
WHERE thread_id = ANY(%s)
43+
GROUP BY thread_id, checkpoint_ns
44+
)
45+
DELETE FROM {table} t
46+
USING keep k
47+
WHERE t.thread_id = k.thread_id
48+
AND t.checkpoint_ns = k.checkpoint_ns
49+
AND t.checkpoint_id <> k.keep_id
50+
"""
51+
52+
53+
async def _aprune(
54+
self: Any,
55+
thread_ids: Sequence[str],
56+
*,
57+
strategy: str = "keep_latest",
58+
) -> None:
59+
if not thread_ids:
60+
return
61+
62+
ids: list[str] = [str(t) for t in thread_ids]
63+
64+
if strategy == "delete_all":
65+
async with self._cursor(pipeline=True) as cur:
66+
for table in ("checkpoints", "checkpoint_blobs", "checkpoint_writes"):
67+
await cur.execute(
68+
f"DELETE FROM {table} WHERE thread_id = ANY(%s)",
69+
(ids,),
70+
)
71+
return
72+
73+
if strategy != "keep_latest":
74+
raise ValueError(f"unknown aprune strategy: {strategy!r}")
75+
76+
async with self._cursor(pipeline=True) as cur:
77+
for table in ("checkpoints", "checkpoint_writes"):
78+
await cur.execute(_PRUNE_SQL.format(table=table), (ids,))
79+
80+
81+
def _install() -> None:
82+
"""Install ``aprune`` on AsyncPostgresSaver if the library doesn't already provide one."""
83+
if AsyncPostgresSaver is None:
84+
return
85+
if "aprune" in AsyncPostgresSaver.__dict__:
86+
return
87+
AsyncPostgresSaver.aprune = _aprune # type: ignore[attr-defined]
88+
89+
90+
_install()

backend/packages/harness/deerflow/agents/checkpointer/async_provider.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ async def _async_checkpointer(config) -> AsyncIterator[Checkpointer]:
6464
if config.type == "postgres":
6565
try:
6666
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
67+
68+
# Install AsyncPostgresSaver.aprune until upstream ships one natively.
69+
# See _postgres_aprune.py for rationale.
70+
from deerflow.agents.checkpointer import _postgres_aprune # noqa: F401
6771
except ImportError as exc:
6872
raise ImportError(POSTGRES_INSTALL) from exc
6973

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
"""Tests for AsyncPostgresSaver.aprune install and behavior.
2+
3+
The patch lives in ``deerflow.agents.checkpointer._postgres_aprune`` and is
4+
activated by importing it from ``async_provider.py``. These tests verify:
5+
6+
- ``_install()`` is idempotent.
7+
- ``_install()`` does not override a native ``aprune`` if upstream ever
8+
ships one (then the patch becomes a no-op and is safe to delete).
9+
- ``aprune`` issues the expected SQL for both strategies.
10+
- Unknown strategy raises ValueError.
11+
- Empty thread list is a no-op.
12+
"""
13+
14+
from __future__ import annotations
15+
16+
from contextlib import asynccontextmanager
17+
18+
import pytest
19+
20+
# Skip the whole module when langgraph-checkpoint-postgres is not installed
21+
# (e.g. in upstream's default test environment, which only depends on the
22+
# sqlite + memory checkpointers). Our Argus fork installs the postgres
23+
# checkpointer at image build time, so the test runs in our CI.
24+
pytest.importorskip("langgraph.checkpoint.postgres.aio")
25+
26+
from deerflow.agents.checkpointer import _postgres_aprune # noqa: E402
27+
28+
29+
# ---------------------------------------------------------------------------
30+
# Install logic
31+
# ---------------------------------------------------------------------------
32+
33+
34+
class TestInstall:
35+
def test_install_is_idempotent(self):
36+
"""Calling _install twice should not double-wrap or error."""
37+
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
38+
39+
before = AsyncPostgresSaver.aprune
40+
_postgres_aprune._install()
41+
_postgres_aprune._install()
42+
after = AsyncPostgresSaver.aprune
43+
assert before is after
44+
45+
def test_install_skips_when_native_method_exists(self, monkeypatch):
46+
"""If the library ever ships a native aprune, ours must not overwrite it."""
47+
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
48+
49+
sentinel = object()
50+
# Place a fake native method directly in the class __dict__
51+
original = AsyncPostgresSaver.__dict__.get("aprune")
52+
try:
53+
AsyncPostgresSaver.aprune = sentinel # type: ignore[assignment]
54+
_postgres_aprune._install()
55+
assert AsyncPostgresSaver.aprune is sentinel
56+
finally:
57+
if original is None:
58+
# Restore our installed method
59+
_postgres_aprune._install()
60+
else:
61+
AsyncPostgresSaver.aprune = original # type: ignore[assignment]
62+
63+
64+
# ---------------------------------------------------------------------------
65+
# aprune behavior — exercised against a fake saver with a mocked cursor
66+
# ---------------------------------------------------------------------------
67+
68+
69+
class _FakeCursor:
70+
"""Captures executed SQL + parameters for assertions."""
71+
72+
def __init__(self):
73+
self.calls: list[tuple[str, tuple]] = []
74+
75+
async def execute(self, sql: str, params: tuple) -> None:
76+
self.calls.append((sql, params))
77+
78+
79+
class _FakeSaver:
80+
"""Just enough surface to satisfy ``aprune``: a ``_cursor`` async ctx manager."""
81+
82+
def __init__(self):
83+
self.cursor = _FakeCursor()
84+
85+
@asynccontextmanager
86+
async def _cursor(self, pipeline: bool = False):
87+
yield self.cursor
88+
89+
90+
@pytest.mark.anyio
91+
class TestApruneBehavior:
92+
async def test_empty_thread_list_is_noop(self):
93+
saver = _FakeSaver()
94+
await _postgres_aprune._aprune(saver, [])
95+
assert saver.cursor.calls == []
96+
97+
async def test_keep_latest_issues_two_deletes(self):
98+
"""keep_latest must issue DELETEs for checkpoints and checkpoint_writes,
99+
but NOT for checkpoint_blobs (blobs are content-addressed and shared)."""
100+
saver = _FakeSaver()
101+
await _postgres_aprune._aprune(saver, ["t1", "t2"], strategy="keep_latest")
102+
103+
assert len(saver.cursor.calls) == 2
104+
tables_touched = [
105+
"checkpoints" if "FROM checkpoints" in sql else "checkpoint_writes"
106+
for sql, _ in saver.cursor.calls
107+
]
108+
assert set(tables_touched) == {"checkpoints", "checkpoint_writes"}
109+
# Blobs must not be touched
110+
for sql, _ in saver.cursor.calls:
111+
assert "checkpoint_blobs" not in sql
112+
113+
# Each DELETE must scope to the provided thread ids
114+
for _, params in saver.cursor.calls:
115+
assert params == (["t1", "t2"],)
116+
117+
async def test_delete_all_touches_all_three_tables(self):
118+
saver = _FakeSaver()
119+
await _postgres_aprune._aprune(saver, ["t1"], strategy="delete_all")
120+
121+
assert len(saver.cursor.calls) == 3
122+
tables = {
123+
"checkpoints" if "FROM checkpoints " in sql else
124+
"checkpoint_blobs" if "FROM checkpoint_blobs " in sql else
125+
"checkpoint_writes"
126+
for sql, _ in saver.cursor.calls
127+
}
128+
assert tables == {"checkpoints", "checkpoint_blobs", "checkpoint_writes"}
129+
130+
async def test_unknown_strategy_raises(self):
131+
saver = _FakeSaver()
132+
with pytest.raises(ValueError, match="unknown aprune strategy"):
133+
await _postgres_aprune._aprune(saver, ["t1"], strategy="bogus")
134+
135+
async def test_thread_ids_are_stringified(self):
136+
"""UUID objects (or anything else) should be coerced to str before the query."""
137+
import uuid
138+
139+
saver = _FakeSaver()
140+
tid = uuid.uuid4()
141+
await _postgres_aprune._aprune(saver, [tid], strategy="keep_latest")
142+
143+
for _, params in saver.cursor.calls:
144+
assert params == ([str(tid)],)

0 commit comments

Comments
 (0)