Skip to content

Commit 4c49a8f

Browse files
committed
Bug fix; removed async operations from db transactions
1 parent 47b2022 commit 4c49a8f

File tree

1 file changed

+159
-127
lines changed

1 file changed

+159
-127
lines changed

src/pages/aiAssistant/memory.js

Lines changed: 159 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ import {
66
getCheckpointId,
77
} from "@langchain/langgraph-checkpoint";
88
import { executeSqlAsync, openDB } from "./db";
9-
9+
import { decode } from "utils/encodings";
1010
const checkpointMetadataKeys = ["source", "step", "writes", "parents"];
1111
function validateKeys(keys) {
1212
return keys;
1313
}
1414
const validCheckpointMetadataKeys = validateKeys(checkpointMetadataKeys);
1515

1616
// Helper to convert Uint8Array or other forms to a UTF-8 string for DB storage
17-
function ensureStringForDB(serializedData) {
17+
async function ensureStringForDB(serializedData) {
1818
if (typeof serializedData === "string") {
1919
return serializedData;
2020
}
2121
if (serializedData instanceof Uint8Array) {
2222
try {
23-
return new TextDecoder().decode(serializedData);
23+
return await decode(serializedData.buffer, "UTF-8");
2424
} catch (e) {
2525
console.error(
2626
"TextDecoder failed for Uint8Array, falling back to JSON.stringify:",
@@ -286,10 +286,10 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
286286
const [typeMd, rawSerializedMetadata] = this.serde.dumpsTyped(metadata);
287287

288288
// Ensure strings for DB
289-
const finalSerializedCheckpoint = ensureStringForDB(
289+
const finalSerializedCheckpoint = await ensureStringForDB(
290290
rawSerializedCheckpoint,
291291
);
292-
const finalSerializedMetadata = ensureStringForDB(rawSerializedMetadata);
292+
const finalSerializedMetadata = await ensureStringForDB(rawSerializedMetadata);
293293

294294
return new Promise((resolve, reject) => {
295295
this.db.transaction((tx) => {
@@ -329,52 +329,85 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
329329
* @returns {Promise<void>} A promise that resolves when writes are stored.
330330
*/
331331
async putWrites(config, writes, taskId) {
332-
if (!this.isSetup) await this.setup();
333-
334-
const thread_id = config.configurable?.thread_id;
335-
const checkpoint_ns = config.configurable?.checkpoint_ns ?? "";
336-
const checkpoint_id = config.configurable?.checkpoint_id;
337-
338-
if (!thread_id || !checkpoint_id) {
339-
throw new Error(
340-
"Missing thread_id or checkpoint_id in config for putWrites.",
341-
);
342-
}
343-
if (!writes || writes.length === 0) {
344-
return Promise.resolve();
345-
}
346-
347-
return new Promise((resolve, reject) => {
348-
this.db.transaction((tx) => {
349-
const writePromises = writes.map((writeTuple, idx) => {
350-
const channel = writeTuple[0];
351-
const value = writeTuple[1];
352-
const [type, rawSerializedValue] = this.serde.dumpsTyped(value);
353-
const finalSerializedValue = ensureStringForDB(rawSerializedValue); // Ensure string
354-
355-
return executeSqlAsync(
356-
tx,
357-
`INSERT OR REPLACE INTO writes (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
332+
if (!this.isSetup) await this.setup();
333+
334+
const thread_id = config.configurable?.thread_id;
335+
const checkpoint_ns = config.configurable?.checkpoint_ns ?? "";
336+
const checkpoint_id = config.configurable?.checkpoint_id;
337+
338+
if (!thread_id || !checkpoint_id) {
339+
throw new Error("[CSCS] Missing thread_id or checkpoint_id in config for putWrites.");
340+
}
341+
if (!writes || writes.length === 0) {
342+
return; // Nothing to write
343+
}
344+
345+
// Stage 1: Prepare all data for writing
346+
let preparedWrites;
347+
try {
348+
preparedWrites = await Promise.all(
349+
writes.map(async (writeTuple, idx) => {
350+
const channel = writeTuple[0];
351+
const value = writeTuple[1];
352+
const [type, rawSerializedValue] = this.serde.dumpsTyped(value);
353+
const finalSerializedValue = await ensureStringForDB(rawSerializedValue);
354+
const dbIdx = WRITES_IDX_MAP[channel] !== undefined ? WRITES_IDX_MAP[channel] : idx;
355+
return { channel, type, finalSerializedValue, dbIdx };
356+
})
357+
);
358+
} catch (serializationError) {
359+
console.error("[CSCS] Error during putWrites serialization phase:", serializationError);
360+
throw serializationError;
361+
}
362+
363+
// Stage 2: Execute all SQL writes sequentially within a single transaction using callbacks
364+
return new Promise((resolve, reject) => {
365+
this.db.transaction(
366+
(tx) => {
367+
let pending = preparedWrites.length;
368+
let hasError = false;
369+
370+
preparedWrites.forEach(({ channel, type, finalSerializedValue, dbIdx }) => {
371+
tx.executeSql(
372+
`INSERT OR REPLACE INTO writes (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
358373
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
359-
[
360-
thread_id,
361-
checkpoint_ns,
362-
checkpoint_id,
363-
taskId,
364-
WRITES_IDX_MAP[channel] || idx,
365-
channel,
366-
type,
367-
finalSerializedValue,
368-
],
369-
);
370-
});
371-
372-
Promise.all(writePromises)
373-
.then(() => resolve())
374-
.catch(reject); // Will catch first error from Promise.all
375-
}, reject); // Transaction errors
376-
});
377-
}
374+
[
375+
thread_id,
376+
checkpoint_ns,
377+
checkpoint_id,
378+
taskId,
379+
dbIdx,
380+
channel,
381+
type,
382+
finalSerializedValue,
383+
],
384+
() => {
385+
if (--pending === 0 && !hasError) {
386+
resolve();
387+
}
388+
},
389+
(tx, error) => {
390+
if (!hasError) {
391+
hasError = true;
392+
console.error("[CSCS] putWrites SQL error:", error);
393+
reject(error);
394+
}
395+
return true; // still try remaining queries
396+
}
397+
);
398+
});
399+
400+
if (pending === 0) {
401+
resolve();
402+
}
403+
},
404+
(transactionError) => {
405+
console.error("[CSCS] putWrites Transaction failed:", transactionError);
406+
reject(transactionError);
407+
}
408+
);
409+
});
410+
}
378411

379412
/**
380413
* Asynchronously lists checkpoints for a given thread.
@@ -383,81 +416,80 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
383416
* @yields {object} A checkpoint tuple.
384417
*/
385418
async *list(config, options) {
386-
if (!this.isSetup) await this.setup();
387-
388-
const { limit, before, filter } = options ?? {};
389-
const thread_id = config.configurable?.thread_id;
390-
const checkpoint_ns = config.configurable?.checkpoint_ns ?? "";
391-
392-
if (!thread_id) {
393-
return;
394-
}
395-
396-
let checkpointIdRows = []; // To store { checkpoint_id: string }
397-
try {
398-
await new Promise((resolveOuter, rejectOuter) => {
399-
this.db.readTransaction((tx) => {
400-
let sql = `SELECT checkpoint_id FROM checkpoints`; // Only fetch ID initially
401-
const params = [];
402-
const whereClauses = ["thread_id = ?", "checkpoint_ns = ?"];
403-
params.push(thread_id, checkpoint_ns);
404-
405-
if (before?.configurable?.checkpoint_id) {
406-
whereClauses.push("checkpoint_id < ?");
407-
params.push(before.configurable.checkpoint_id);
408-
}
409-
if (filter && Object.keys(filter).length > 0) {
410-
}
411-
if (whereClauses.length > 0) {
412-
sql += ` WHERE ${whereClauses.join(" AND ")}`;
413-
}
414-
sql += ` ORDER BY checkpoint_id DESC`;
415-
if (limit) {
416-
// This limit is applied before JS filtering, so fetch more if JS filtering is heavy
417-
sql += ` LIMIT ${Number.parseInt(limit, 10) * (filter ? 5 : 1)}`; // Fetch more if filtering
418-
}
419-
420-
executeSqlAsync(tx, sql, params)
421-
.then((resultSet) => {
422-
for (let i = 0; i < resultSet.rows.length; i++) {
423-
checkpointIdRows.push(resultSet.rows.item(i));
424-
}
425-
resolveOuter();
426-
})
427-
.catch(rejectOuter);
428-
}, rejectOuter);
429-
});
430-
431-
let yieldedCount = 0;
432-
for (const idRow of checkpointIdRows) {
433-
const tupleConfig = {
434-
configurable: {
435-
thread_id,
436-
checkpoint_ns,
437-
checkpoint_id: idRow.checkpoint_id,
438-
},
439-
};
440-
const fullTuple = await this.getTuple(tupleConfig);
441-
if (fullTuple) {
442-
if (
443-
filter &&
444-
fullTuple.metadata &&
445-
!Object.entries(filter).every(
446-
([key, value]) => fullTuple.metadata[key] === value,
447-
)
448-
) {
449-
continue; // Skip if JS filter doesn't match
450-
}
451-
yield fullTuple;
452-
yieldedCount++;
453-
if (limit !== undefined && yieldedCount >= limit) break;
454-
}
455-
}
456-
} catch (error) {
457-
console.error(
458-
`list: Error fetching/processing for thread ${thread_id}:`,
459-
error,
460-
);
461-
}
462-
}
419+
if (!this.isSetup) await this.setup();
420+
421+
const { limit, before, filter } = options ?? {};
422+
const thread_id = config.configurable?.thread_id;
423+
const checkpoint_ns = config.configurable?.checkpoint_ns ?? "";
424+
425+
if (!thread_id) return;
426+
427+
let checkpointIdRows = [];
428+
429+
try {
430+
await new Promise((resolveOuter, rejectOuter) => {
431+
this.db.readTransaction((tx) => {
432+
let sql = `SELECT checkpoint_id FROM checkpoints`;
433+
const params = [];
434+
const whereClauses = ["thread_id = ?", "checkpoint_ns = ?"];
435+
params.push(thread_id, checkpoint_ns);
436+
437+
if (before?.configurable?.checkpoint_id) {
438+
whereClauses.push("checkpoint_id < ?");
439+
params.push(before.configurable.checkpoint_id);
440+
}
441+
442+
if (whereClauses.length > 0) {
443+
sql += ` WHERE ${whereClauses.join(" AND ")}`;
444+
}
445+
446+
sql += ` ORDER BY checkpoint_id DESC`;
447+
448+
if (limit) {
449+
sql += ` LIMIT ${parseInt(limit, 10) * (filter ? 5 : 1)}`;
450+
}
451+
452+
executeSqlAsync(tx, sql, params)
453+
.then((resultSet) => {
454+
for (let i = 0; i < resultSet.rows.length; i++) {
455+
checkpointIdRows.push(resultSet.rows.item(i));
456+
}
457+
resolveOuter();
458+
})
459+
.catch(rejectOuter);
460+
}, rejectOuter); // <- If the transaction itself fails
461+
});
462+
463+
let yieldedCount = 0;
464+
for (const idRow of checkpointIdRows) {
465+
const tupleConfig = {
466+
configurable: {
467+
thread_id,
468+
checkpoint_ns,
469+
checkpoint_id: idRow.checkpoint_id,
470+
},
471+
};
472+
473+
const fullTuple = await this.getTuple(tupleConfig);
474+
475+
if (fullTuple) {
476+
if (
477+
filter &&
478+
fullTuple.metadata &&
479+
!Object.entries(filter).every(
480+
([key, value]) => fullTuple.metadata[key] === value,
481+
)
482+
) {
483+
continue;
484+
}
485+
486+
yield fullTuple;
487+
yieldedCount++;
488+
if (limit !== undefined && yieldedCount >= limit) break;
489+
}
490+
}
491+
} catch (error) {
492+
console.error(`list: Error fetching/processing for thread ${thread_id}:`, error);
493+
}
494+
}
463495
}

0 commit comments

Comments
 (0)