Skip to content

Commit 3adf208

Browse files
fix: checkpointers , db issue and tool resp
Co-Authored-By: vizzyfreezy <36118637+vizzyfreezy@users.noreply.github.com>
1 parent 53e1a20 commit 3adf208

File tree

3 files changed

+79
-121
lines changed

3 files changed

+79
-121
lines changed

src/pages/aiAssistant/assistant.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { isAIMessageChunk } from "@langchain/core/messages";
1+
import { isAIMessageChunk, isToolMessageChunk } from "@langchain/core/messages";
22
import { ChatGoogleGenerativeAI } from "@langchain/google-genai";
33
import { createReactAgent } from "@langchain/langgraph/prebuilt";
44
import confirm from "dialogs/confirm";
@@ -39,9 +39,6 @@ export default function openAIAssistantPage() {
3939

4040
const GEMINI_API_KEY = ""; // Replace
4141

42-
const searchTool = {
43-
googleSearch: {},
44-
};
4542
const agentCheckpointer = new CordovaSqliteSaver();
4643
const model = new ChatGoogleGenerativeAI({
4744
model: "gemini-2.0-flash",
@@ -644,7 +641,9 @@ export default function openAIAssistantPage() {
644641

645642
let chunkText = "";
646643
if (messageChunkPayload) {
647-
if (
644+
if (isToolMessageChunk(messageChunkPayload)) {
645+
console.log("tool call", messageChunkPayload);
646+
} else if (
648647
isAIMessageChunk(messageChunkPayload) &&
649648
messageChunkPayload.tool_call_chunks?.length
650649
) {

src/pages/aiAssistant/db.js

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ async function initializeTables(dbInstance) {
2929
return new Promise((resolve, reject) => {
3030
dbInstance.transaction(
3131
(tx) => {
32-
// Conversations Table
3332
tx.executeSql(`
3433
CREATE TABLE IF NOT EXISTS conversations (
3534
id TEXT PRIMARY KEY,
@@ -39,7 +38,6 @@ async function initializeTables(dbInstance) {
3938
profile TEXT
4039
)
4140
`);
42-
// Messages Table
4341
tx.executeSql(`
4442
CREATE TABLE IF NOT EXISTS messages (
4543
id TEXT PRIMARY KEY,
@@ -50,39 +48,52 @@ async function initializeTables(dbInstance) {
5048
FOREIGN KEY (conversationId) REFERENCES conversations(id) ON DELETE CASCADE
5149
)
5250
`);
53-
// LangGraph Checkpoints Table
5451
tx.executeSql(`
55-
CREATE TABLE IF NOT EXISTS langgraph_checkpoints (
56-
thread_id TEXT NOT NULL,
57-
checkpoint_id TEXT NOT NULL,
58-
parent_checkpoint_id TEXT,
59-
checkpoint TEXT, -- Store serialized CheckpointTuple as JSON string
60-
updated_at INTEGER, -- Unix timestamp (seconds or milliseconds)
61-
PRIMARY KEY (thread_id, checkpoint_id)
62-
)
63-
`);
52+
CREATE TABLE IF NOT EXISTS langgraph_checkpoints (
53+
thread_id TEXT NOT NULL,
54+
checkpoint_ns TEXT NOT NULL DEFAULT '',
55+
checkpoint_id TEXT NOT NULL,
56+
parent_checkpoint_id TEXT,
57+
checkpoint TEXT,
58+
metadata TEXT,
59+
updated_at INTEGER,
60+
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
61+
)
62+
`);
63+
tx.executeSql(`
64+
CREATE TABLE IF NOT EXISTS writes (
65+
thread_id TEXT NOT NULL,
66+
checkpoint_ns TEXT NOT NULL DEFAULT '',
67+
checkpoint_id TEXT NOT NULL,
68+
task_id TEXT NOT NULL,
69+
idx INTEGER NOT NULL,
70+
channel TEXT NOT NULL,
71+
value TEXT,
72+
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel)
73+
)
74+
`);
6475

65-
// Indexes
6676
tx.executeSql(
6777
`CREATE INDEX IF NOT EXISTS idx_messages_conversationId_timestamp ON messages (conversationId, timestamp ASC)`,
6878
);
6979
tx.executeSql(
7080
`CREATE INDEX IF NOT EXISTS idx_conversations_lastModifiedAt ON conversations (lastModifiedAt DESC)`,
7181
);
7282
tx.executeSql(
73-
`CREATE INDEX IF NOT EXISTS idx_lg_checkpoints_thread_id_updated_at ON langgraph_checkpoints (thread_id, updated_at DESC)`,
83+
`CREATE INDEX IF NOT EXISTS idx_lg_checkpoints_main ON langgraph_checkpoints (thread_id, checkpoint_ns, checkpoint_id DESC)`,
84+
);
85+
tx.executeSql(
86+
`CREATE INDEX IF NOT EXISTS idx_writes_main ON writes (thread_id, checkpoint_ns, checkpoint_id, task_id, channel)`,
7487
);
7588
},
7689
(transactionError) => {
77-
// Error callback for the transaction
7890
console.error(
7991
"[DB] Transaction error during table initialization:",
8092
transactionError,
8193
);
8294
reject(transactionError);
8395
},
8496
() => {
85-
// Success callback for the transaction
8697
resolve();
8798
},
8899
);

src/pages/aiAssistant/memory.js

Lines changed: 48 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@ import {
55
copyCheckpoint,
66
getCheckpointId,
77
} from "@langchain/langgraph-checkpoint";
8-
import { decode } from "utils/encodings";
98
import { executeSqlAsync, openDB } from "./db";
9+
1010
const checkpointMetadataKeys = ["source", "step", "writes", "parents"];
1111
function validateKeys(keys) {
1212
return keys;
1313
}
1414
const validCheckpointMetadataKeys = validateKeys(checkpointMetadataKeys);
1515

1616
// Helper to convert Uint8Array or other forms to a UTF-8 string for DB storage
17-
async function ensureStringForDB(serializedData) {
17+
function ensureStringForDB(serializedData) {
1818
if (typeof serializedData === "string") {
1919
return serializedData;
2020
}
2121
if (serializedData instanceof Uint8Array) {
2222
try {
23-
return await decode(serializedData.buffer, "UTF-8");
23+
return new TextDecoder().decode(serializedData);
2424
} catch (e) {
2525
console.error(
2626
"TextDecoder failed for Uint8Array, falling back to JSON.stringify:",
@@ -108,8 +108,8 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
108108
const fetchDataPromise = new Promise((resolveData, rejectData) => {
109109
this.db.readTransaction(
110110
(tx) => {
111-
let mainSql = `SELECT checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata
112-
FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = ?`;
111+
let mainSql = `SELECT checkpoint_id, parent_checkpoint_id, checkpoint, metadata
112+
FROM langgraph_checkpoints WHERE thread_id = ? AND checkpoint_ns = ?`;
113113
const mainParams = [
114114
thread_id,
115115
config.configurable?.checkpoint_ns ?? "",
@@ -132,7 +132,7 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
132132
const mainRowData = mainResultSet.rows.item(0);
133133
const actual_checkpoint_id = mainRowData.checkpoint_id;
134134

135-
const writesSql = `SELECT task_id, channel, type, value FROM writes
135+
const writesSql = `SELECT task_id, channel, value FROM writes
136136
WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ?`;
137137
tx.executeSql(
138138
writesSql,
@@ -148,7 +148,7 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
148148
}
149149

150150
if (mainRowData.parent_checkpoint_id) {
151-
const sendsSql = `SELECT type, value FROM writes
151+
const sendsSql = `SELECT value FROM writes
152152
WHERE thread_id = ? AND checkpoint_ns = ? AND checkpoint_id = ? AND channel = ?
153153
ORDER BY idx`;
154154
tx.executeSql(
@@ -286,25 +286,22 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
286286
const [typeMd, rawSerializedMetadata] = this.serde.dumpsTyped(metadata);
287287

288288
// Ensure strings for DB
289-
const finalSerializedCheckpoint = await ensureStringForDB(
289+
const finalSerializedCheckpoint = ensureStringForDB(
290290
rawSerializedCheckpoint,
291291
);
292-
const finalSerializedMetadata = await ensureStringForDB(
293-
rawSerializedMetadata,
294-
);
292+
const finalSerializedMetadata = ensureStringForDB(rawSerializedMetadata);
295293

296294
return new Promise((resolve, reject) => {
297295
this.db.transaction((tx) => {
298296
executeSqlAsync(
299297
tx,
300-
`INSERT OR REPLACE INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata)
301-
VALUES (?, ?, ?, ?, ?, ?, ?)`,
298+
`INSERT OR REPLACE INTO langgraph_checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, checkpoint, metadata)
299+
VALUES ( ?, ?, ?, ?, ?, ?)`,
302300
[
303301
thread_id,
304302
checkpoint_ns,
305303
new_checkpoint_id,
306304
parent_checkpoint_id,
307-
typeCp,
308305
finalSerializedCheckpoint,
309306
finalSerializedMetadata,
310307
],
@@ -339,89 +336,41 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
339336

340337
if (!thread_id || !checkpoint_id) {
341338
throw new Error(
342-
"[CSCS] Missing thread_id or checkpoint_id in config for putWrites.",
339+
"Missing thread_id or checkpoint_id in config for putWrites.",
343340
);
344341
}
345342
if (!writes || writes.length === 0) {
346-
return; // Nothing to write
343+
return Promise.resolve();
347344
}
348345

349-
// Stage 1: Prepare all data for writing
350-
let preparedWrites;
351-
try {
352-
preparedWrites = await Promise.all(
353-
writes.map(async (writeTuple, idx) => {
346+
return new Promise((resolve, reject) => {
347+
this.db.transaction((tx) => {
348+
const writePromises = writes.map((writeTuple, idx) => {
354349
const channel = writeTuple[0];
355350
const value = writeTuple[1];
356351
const [type, rawSerializedValue] = this.serde.dumpsTyped(value);
357-
const finalSerializedValue =
358-
await ensureStringForDB(rawSerializedValue);
359-
const dbIdx =
360-
WRITES_IDX_MAP[channel] !== undefined
361-
? WRITES_IDX_MAP[channel]
362-
: idx;
363-
return { channel, type, finalSerializedValue, dbIdx };
364-
}),
365-
);
366-
} catch (serializationError) {
367-
console.error(
368-
"[CSCS] Error during putWrites serialization phase:",
369-
serializationError,
370-
);
371-
throw serializationError;
372-
}
373-
374-
// Stage 2: Execute all SQL writes sequentially within a single transaction using callbacks
375-
return new Promise((resolve, reject) => {
376-
this.db.transaction(
377-
(tx) => {
378-
let pending = preparedWrites.length;
379-
let hasError = false;
380-
381-
preparedWrites.forEach(
382-
({ channel, type, finalSerializedValue, dbIdx }) => {
383-
tx.executeSql(
384-
`INSERT OR REPLACE INTO writes (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
385-
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
386-
[
387-
thread_id,
388-
checkpoint_ns,
389-
checkpoint_id,
390-
taskId,
391-
dbIdx,
392-
channel,
393-
type,
394-
finalSerializedValue,
395-
],
396-
() => {
397-
if (--pending === 0 && !hasError) {
398-
resolve();
399-
}
400-
},
401-
(tx, error) => {
402-
if (!hasError) {
403-
hasError = true;
404-
console.error("[CSCS] putWrites SQL error:", error);
405-
reject(error);
406-
}
407-
return true; // still try remaining queries
408-
},
409-
);
410-
},
352+
const finalSerializedValue = ensureStringForDB(rawSerializedValue); // Ensure string
353+
354+
return executeSqlAsync(
355+
tx,
356+
`INSERT OR REPLACE INTO writes (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, value)
357+
VALUES (?, ?, ?, ?, ?, ?, ?)`,
358+
[
359+
thread_id,
360+
checkpoint_ns,
361+
checkpoint_id,
362+
taskId,
363+
WRITES_IDX_MAP[channel] || idx,
364+
channel,
365+
finalSerializedValue,
366+
],
411367
);
368+
});
412369

413-
if (pending === 0) {
414-
resolve();
415-
}
416-
},
417-
(transactionError) => {
418-
console.error(
419-
"[CSCS] putWrites Transaction failed:",
420-
transactionError,
421-
);
422-
reject(transactionError);
423-
},
424-
);
370+
Promise.all(writePromises)
371+
.then(() => resolve())
372+
.catch(reject); // Will catch first error from Promise.all
373+
}, reject); // Transaction errors
425374
});
426375
}
427376

@@ -438,14 +387,16 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
438387
const thread_id = config.configurable?.thread_id;
439388
const checkpoint_ns = config.configurable?.checkpoint_ns ?? "";
440389

441-
if (!thread_id) return;
442-
443-
let checkpointIdRows = [];
390+
if (!thread_id) {
391+
return;
392+
}
444393

394+
let checkpointIdRows = []; // To store { checkpoint_id: string }
445395
try {
446396
await new Promise((resolveOuter, rejectOuter) => {
447397
this.db.readTransaction((tx) => {
448-
let sql = `SELECT checkpoint_id FROM checkpoints`;
398+
let sql = `SELECT checkpoint_id FROM langgraph_checkpoints`;
399+
449400
const params = [];
450401
const whereClauses = ["thread_id = ?", "checkpoint_ns = ?"];
451402
params.push(thread_id, checkpoint_ns);
@@ -454,15 +405,15 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
454405
whereClauses.push("checkpoint_id < ?");
455406
params.push(before.configurable.checkpoint_id);
456407
}
457-
408+
if (filter && Object.keys(filter).length > 0) {
409+
}
458410
if (whereClauses.length > 0) {
459411
sql += ` WHERE ${whereClauses.join(" AND ")}`;
460412
}
461-
462413
sql += ` ORDER BY checkpoint_id DESC`;
463-
464414
if (limit) {
465-
sql += ` LIMIT ${Number.parseInt(limit, 10) * (filter ? 5 : 1)}`;
415+
// This limit is applied before JS filtering, so fetch more if JS filtering is heavy
416+
sql += ` LIMIT ${Number.parseInt(limit, 10) * (filter ? 5 : 1)}`; // Fetch more if filtering
466417
}
467418

468419
executeSqlAsync(tx, sql, params)
@@ -473,7 +424,7 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
473424
resolveOuter();
474425
})
475426
.catch(rejectOuter);
476-
}, rejectOuter); // <- If the transaction itself fails
427+
}, rejectOuter);
477428
});
478429

479430
let yieldedCount = 0;
@@ -485,9 +436,7 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
485436
checkpoint_id: idRow.checkpoint_id,
486437
},
487438
};
488-
489439
const fullTuple = await this.getTuple(tupleConfig);
490-
491440
if (fullTuple) {
492441
if (
493442
filter &&
@@ -496,9 +445,8 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
496445
([key, value]) => fullTuple.metadata[key] === value,
497446
)
498447
) {
499-
continue;
448+
continue; // Skip if JS filter doesn't match
500449
}
501-
502450
yield fullTuple;
503451
yieldedCount++;
504452
if (limit !== undefined && yieldedCount >= limit) break;

0 commit comments

Comments
 (0)