Skip to content

Commit 1642fc2

Browse files
committed
feat: implement AdminForthCheckpointSaver and integrate with agent system
1 parent 448e8bf commit 1642fc2

6 files changed

Lines changed: 419 additions & 15 deletions

File tree

agent/checkpointer.ts

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
import type { RunnableConfig } from "@langchain/core/runnables";
2+
import {
3+
BaseCheckpointSaver,
4+
type Checkpoint,
5+
type CheckpointPendingWrite,
6+
type CheckpointMetadata,
7+
type CheckpointTuple,
8+
type PendingWrite,
9+
WRITES_IDX_MAP,
10+
} from "@langchain/langgraph-checkpoint";
11+
import type { PluginOptions } from "../types.js";
12+
import { Filters } from "adminforth";
13+
14+
const ROOT_CHECKPOINT_NAMESPACE = "__root__";
15+
16+
export class AdminForthCheckpointSaver extends BaseCheckpointSaver {
17+
constructor(
18+
private readonly adminforth: any,
19+
private readonly pluginOptions: PluginOptions,
20+
) {
21+
super();
22+
}
23+
24+
private get resourceConfig() {
25+
const resource = this.pluginOptions.checkpointResource;
26+
if (!resource) {
27+
throw new Error("checkpointResource is not configured");
28+
}
29+
return resource;
30+
}
31+
32+
private resource() {
33+
return this.adminforth.resource(this.resourceConfig.resourceId);
34+
}
35+
36+
private serialize(value: unknown): string | null {
37+
if (value === undefined || value === null) return null;
38+
return JSON.stringify(value);
39+
}
40+
41+
private deserialize<T>(value: unknown): T | null {
42+
if (value === undefined || value === null) return null;
43+
if (typeof value === "string") {
44+
return JSON.parse(value) as T;
45+
}
46+
return value as T;
47+
}
48+
49+
private now(): string {
50+
return new Date().toISOString();
51+
}
52+
53+
private encodeCheckpointNamespace(checkpointNs: string): string {
54+
return checkpointNs === "" ? ROOT_CHECKPOINT_NAMESPACE : checkpointNs;
55+
}
56+
57+
private decodeCheckpointNamespace(checkpointNs: unknown): string {
58+
return checkpointNs === ROOT_CHECKPOINT_NAMESPACE ? "" : String(checkpointNs ?? "");
59+
}
60+
61+
private getConfigValues(config: RunnableConfig) {
62+
const configurable = (config.configurable ?? {}) as Record<string, unknown>;
63+
64+
return {
65+
threadId: String(configurable.thread_id ?? ""),
66+
checkpointNs: String(configurable.checkpoint_ns ?? ""),
67+
checkpointId: configurable.checkpoint_id
68+
? String(configurable.checkpoint_id)
69+
: null,
70+
};
71+
}
72+
73+
private buildConfig(threadId: string, checkpointNs: string, checkpointId: string): RunnableConfig {
74+
return {
75+
configurable: {
76+
thread_id: threadId,
77+
checkpoint_ns: checkpointNs,
78+
checkpoint_id: checkpointId,
79+
},
80+
};
81+
}
82+
83+
private buildCheckpointRowId(threadId: string, checkpointNs: string, checkpointId: string) {
84+
return `cp:${threadId}:${checkpointNs}:${checkpointId}`;
85+
}
86+
87+
private buildWritesRowId(
88+
threadId: string,
89+
checkpointNs: string,
90+
checkpointId: string,
91+
taskId: string,
92+
seq: number,
93+
) {
94+
return `wr:${threadId}:${checkpointNs}:${checkpointId}:${taskId}:${seq}`;
95+
}
96+
97+
private getWriteIndex(channel: string, index: number): number {
98+
return WRITES_IDX_MAP[channel] ?? index;
99+
}
100+
101+
private isDuplicateCheckpointWriteError(error: unknown): boolean {
102+
return error instanceof Error && error.message.includes("UNIQUE constraint failed");
103+
}
104+
105+
async put(
106+
config: RunnableConfig,
107+
checkpoint: Checkpoint,
108+
metadata: CheckpointMetadata,
109+
_newVersions: Record<string, unknown>,
110+
): Promise<RunnableConfig> {
111+
const r = this.resourceConfig;
112+
const { threadId, checkpointNs } = this.getConfigValues(config);
113+
const checkpointId = String((checkpoint as any).id);
114+
const storedCheckpointNs = this.encodeCheckpointNamespace(checkpointNs);
115+
116+
const parentCheckpointId = this.getConfigValues(config).checkpointId;
117+
const createdAt = this.now();
118+
119+
await this.resource().create({
120+
[r.idField]: this.buildCheckpointRowId(threadId, storedCheckpointNs, checkpointId),
121+
[r.threadIdField]: threadId,
122+
[r.checkpointNamespaceField]: storedCheckpointNs,
123+
[r.checkpointIdField]: checkpointId,
124+
[r.parentCheckpointIdField]: parentCheckpointId,
125+
[r.rowKindField]: "checkpoint",
126+
[r.taskIdField]: null,
127+
[r.sequenceField]: 0,
128+
[r.createdAtField]: createdAt,
129+
[r.checkpointPayloadField]: this.serialize(checkpoint),
130+
[r.metadataPayloadField]: this.serialize(metadata),
131+
[r.writesPayloadField]: null,
132+
[r.schemaVersionField]: 1,
133+
});
134+
135+
return this.buildConfig(threadId, checkpointNs, checkpointId);
136+
}
137+
138+
async putWrites(
139+
config: RunnableConfig,
140+
writes: PendingWrite[],
141+
taskId: string,
142+
): Promise<void> {
143+
const r = this.resourceConfig;
144+
const { threadId, checkpointNs, checkpointId } = this.getConfigValues(config);
145+
const storedCheckpointNs = this.encodeCheckpointNamespace(checkpointNs);
146+
147+
if (!checkpointId) {
148+
throw new Error("putWrites requires checkpoint_id in config");
149+
}
150+
151+
const createdAt = this.now();
152+
153+
await Promise.all(
154+
writes.map(async ([channel, value], index) => {
155+
const writeIndex = this.getWriteIndex(channel, index);
156+
157+
try {
158+
await this.resource().create({
159+
[r.idField]: this.buildWritesRowId(
160+
threadId,
161+
storedCheckpointNs,
162+
checkpointId,
163+
taskId,
164+
writeIndex,
165+
),
166+
[r.threadIdField]: threadId,
167+
[r.checkpointNamespaceField]: storedCheckpointNs,
168+
[r.checkpointIdField]: checkpointId,
169+
[r.parentCheckpointIdField]: null,
170+
[r.rowKindField]: "writes",
171+
[r.taskIdField]: taskId,
172+
[r.sequenceField]: writeIndex,
173+
[r.createdAtField]: createdAt,
174+
[r.checkpointPayloadField]: null,
175+
[r.metadataPayloadField]: null,
176+
[r.writesPayloadField]: this.serialize([channel, value] satisfies PendingWrite),
177+
[r.schemaVersionField]: 1,
178+
});
179+
} catch (error) {
180+
if (!this.isDuplicateCheckpointWriteError(error)) {
181+
throw error;
182+
}
183+
}
184+
}),
185+
);
186+
}
187+
188+
async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
189+
const r = this.resourceConfig;
190+
const { threadId, checkpointNs, checkpointId } = this.getConfigValues(config);
191+
const storedCheckpointNs = this.encodeCheckpointNamespace(checkpointNs);
192+
193+
const checkpointRows = await this.resource().list(
194+
checkpointId
195+
? Filters.AND(
196+
Filters.EQ(r.threadIdField, threadId),
197+
Filters.EQ(r.checkpointNamespaceField, storedCheckpointNs),
198+
Filters.EQ(r.checkpointIdField, checkpointId),
199+
Filters.EQ(r.rowKindField, "checkpoint"),
200+
)
201+
: Filters.AND(
202+
Filters.EQ(r.threadIdField, threadId),
203+
Filters.EQ(r.checkpointNamespaceField, storedCheckpointNs),
204+
Filters.EQ(r.rowKindField, "checkpoint"),
205+
),
206+
1,
207+
undefined,
208+
[{ field: r.checkpointIdField, direction: "desc" }],
209+
);
210+
211+
const checkpointRow = checkpointRows[0];
212+
if (!checkpointRow) {
213+
return undefined;
214+
}
215+
216+
const resolvedCheckpointId = String(checkpointRow[r.checkpointIdField]);
217+
218+
const writesRows = await this.resource().list(
219+
Filters.AND(
220+
Filters.EQ(r.threadIdField, threadId),
221+
Filters.EQ(r.checkpointNamespaceField, storedCheckpointNs),
222+
Filters.EQ(r.checkpointIdField, resolvedCheckpointId),
223+
Filters.EQ(r.rowKindField, "writes"),
224+
),
225+
undefined,
226+
undefined,
227+
[{ field: r.sequenceField, direction: "asc" }],
228+
);
229+
230+
const pendingWrites: CheckpointPendingWrite[] = writesRows.flatMap((row) => {
231+
const taskId = String(row[r.taskIdField] ?? "");
232+
const write = this.deserialize<PendingWrite>(row[r.writesPayloadField]);
233+
if (!write) {
234+
return [];
235+
}
236+
237+
const [channel, value] = write;
238+
return [[taskId, channel, value]];
239+
});
240+
241+
const parentCheckpointId = checkpointRow[r.parentCheckpointIdField]
242+
? String(checkpointRow[r.parentCheckpointIdField])
243+
: null;
244+
245+
const tuple: CheckpointTuple = {
246+
config: this.buildConfig(threadId, checkpointNs, resolvedCheckpointId),
247+
checkpoint: this.deserialize<Checkpoint>(
248+
checkpointRow[r.checkpointPayloadField],
249+
) as Checkpoint,
250+
metadata: (this.deserialize<CheckpointMetadata>(
251+
checkpointRow[r.metadataPayloadField],
252+
) ?? {}) as CheckpointMetadata,
253+
parentConfig: parentCheckpointId
254+
? this.buildConfig(threadId, checkpointNs, parentCheckpointId)
255+
: undefined,
256+
pendingWrites,
257+
};
258+
259+
return tuple;
260+
}
261+
262+
async *list(
263+
config: RunnableConfig,
264+
options?: {
265+
before?: RunnableConfig;
266+
limit?: number;
267+
},
268+
): AsyncGenerator<CheckpointTuple> {
269+
const r = this.resourceConfig;
270+
const { threadId, checkpointNs } = this.getConfigValues(config);
271+
const storedCheckpointNs = this.encodeCheckpointNamespace(checkpointNs);
272+
const beforeCheckpointId = options?.before
273+
? this.getConfigValues(options.before).checkpointId
274+
: null;
275+
276+
const filters: Filters[] = [
277+
Filters.EQ(r.rowKindField, "checkpoint"),
278+
Filters.EQ(r.threadIdField, threadId),
279+
Filters.EQ(r.checkpointNamespaceField, storedCheckpointNs),
280+
];
281+
282+
if (beforeCheckpointId) {
283+
filters.push(Filters.LT(r.checkpointIdField, beforeCheckpointId));
284+
}
285+
286+
const rows = await this.resource().list(
287+
Filters.AND(...filters),
288+
options?.limit,
289+
undefined,
290+
[{ field: r.checkpointIdField, direction: "desc" }],
291+
);
292+
293+
for (const row of rows) {
294+
const tuple = await this.getTuple(
295+
this.buildConfig(
296+
String(row[r.threadIdField]),
297+
this.decodeCheckpointNamespace(row[r.checkpointNamespaceField]),
298+
String(row[r.checkpointIdField]),
299+
),
300+
);
301+
302+
if (tuple) {
303+
yield tuple;
304+
}
305+
}
306+
}
307+
308+
async deleteThread(threadId: string, checkpointNs = ""): Promise<void> {
309+
const r = this.resourceConfig;
310+
const storedCheckpointNs = this.encodeCheckpointNamespace(checkpointNs);
311+
312+
const rows = await this.resource().list(
313+
Filters.AND(
314+
Filters.EQ(r.threadIdField, threadId),
315+
Filters.EQ(r.checkpointNamespaceField, storedCheckpointNs),
316+
),
317+
undefined,
318+
undefined,
319+
[{ field: r.createdAtField, direction: "desc" }],
320+
);
321+
322+
for (const row of rows) {
323+
await this.adminforth
324+
.resource(this.pluginOptions.checkpointResource!.resourceId)
325+
.delete(row[r.idField]);
326+
}
327+
}
328+
}

agent/simpleAgent.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createAgent, summarizationMiddleware } from "langchain";
22
import { logger, type AdminUser, type CompletionAdapter } from "adminforth";
33
import { BaseCallbackHandler } from "@langchain/core/callbacks/base";
4-
import { MemorySaver, type Messages } from "@langchain/langgraph";
4+
import {type BaseCheckpointSaver, type Messages } from "@langchain/langgraph";
55
import type { LLMResult } from "@langchain/core/outputs";
66
import { z } from "zod";
77
import { ChatOpenAI } from "@langchain/openai";
@@ -15,8 +15,6 @@ import { createOpenAiResponsesContinuationMiddleware } from "./middleware/openAi
1515
import type { ApiBasedTool } from "../apiBasedTools.js";
1616
import type { ToolCallEventSink } from "./toolCallEvents.js";
1717

18-
const checkpointer = new MemorySaver();
19-
2018
export const contextSchema = z.object({
2119
adminUser: z.custom<AdminUser>(),
2220
userTimeZone: z.string(),
@@ -36,6 +34,8 @@ type OpenAIBackedCompletionAdapter = CompletionAdapter & {
3634
};
3735
};
3836

37+
type OpenAiReasoningConfig = Record<string, unknown>;
38+
3939
type LlmOutputTokenUsage = {
4040
promptTokens?: unknown;
4141
completionTokens?: unknown;
@@ -178,7 +178,14 @@ export function createAgentChatModel(params: {
178178

179179
const model = params.modelName ?? options.model ?? "gpt-5-nano";
180180
const baseURL = options.baseURL ?? options.baseUrl;
181-
const reasoning = options.extraRequestBodyParameters?.reasoning;
181+
const reasoning = options.extraRequestBodyParameters
182+
?.reasoning as OpenAiReasoningConfig | undefined;
183+
const reasoningConfig = reasoning
184+
? {
185+
...reasoning,
186+
summary: "auto",
187+
}
188+
: undefined;
182189

183190
// @ts-ignore
184191
return new ChatOpenAI({
@@ -192,7 +199,7 @@ export function createAgentChatModel(params: {
192199

193200
promptCacheRetention: "in_memory",
194201

195-
...(reasoning ? { reasoning } : {}),
202+
...(reasoningConfig ? { reasoning: reasoningConfig } : {}),
196203
...(typeof options.timeoutMs === "number"
197204
? { timeout: options.timeoutMs }
198205
: {}),
@@ -210,6 +217,7 @@ export async function callAgent(params: {
210217
name: string;
211218
model: ChatOpenAI;
212219
summaryModel: ChatOpenAI;
220+
checkpointer?: BaseCheckpointSaver;
213221
messages: Messages;
214222
adminUser: AdminUser;
215223
apiBasedTools: Record<string, ApiBasedTool>;
@@ -224,6 +232,7 @@ export async function callAgent(params: {
224232
name,
225233
model,
226234
summaryModel,
235+
checkpointer,
227236
messages,
228237
adminUser,
229238
apiBasedTools,

0 commit comments

Comments
 (0)