55 copyCheckpoint ,
66 getCheckpointId ,
77} from "@langchain/langgraph-checkpoint" ;
8- import { executeSqlAsync , openDB } from "./db" ;
98import { decode } from "utils/encodings" ;
9+ import { executeSqlAsync , openDB } from "./db" ;
1010const checkpointMetadataKeys = [ "source" , "step" , "writes" , "parents" ] ;
1111function validateKeys ( keys ) {
1212 return keys ;
@@ -289,7 +289,9 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
289289 const finalSerializedCheckpoint = await ensureStringForDB (
290290 rawSerializedCheckpoint ,
291291 ) ;
292- const finalSerializedMetadata = await ensureStringForDB ( rawSerializedMetadata ) ;
292+ const finalSerializedMetadata = await ensureStringForDB (
293+ rawSerializedMetadata ,
294+ ) ;
293295
294296 return new Promise ( ( resolve , reject ) => {
295297 this . db . transaction ( ( tx ) => {
@@ -329,85 +331,99 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
329331 * @returns {Promise<void> } A promise that resolves when writes are stored.
330332 */
331333 async putWrites ( config , writes , taskId ) {
332- if ( ! this . isSetup ) await this . setup ( ) ;
333-
334- const thread_id = config . configurable ?. thread_id ;
335- const checkpoint_ns = config . configurable ?. checkpoint_ns ?? "" ;
336- const checkpoint_id = config . configurable ?. checkpoint_id ;
337-
338- if ( ! thread_id || ! checkpoint_id ) {
339- throw new Error ( "[CSCS] Missing thread_id or checkpoint_id in config for putWrites." ) ;
340- }
341- if ( ! writes || writes . length === 0 ) {
342- return ; // Nothing to write
343- }
344-
345- // Stage 1: Prepare all data for writing
346- let preparedWrites ;
347- try {
348- preparedWrites = await Promise . all (
349- writes . map ( async ( writeTuple , idx ) => {
350- const channel = writeTuple [ 0 ] ;
351- const value = writeTuple [ 1 ] ;
352- const [ type , rawSerializedValue ] = this . serde . dumpsTyped ( value ) ;
353- const finalSerializedValue = await ensureStringForDB ( rawSerializedValue ) ;
354- const dbIdx = WRITES_IDX_MAP [ channel ] !== undefined ? WRITES_IDX_MAP [ channel ] : idx ;
355- return { channel, type, finalSerializedValue, dbIdx } ;
356- } )
357- ) ;
358- } catch ( serializationError ) {
359- console . error ( "[CSCS] Error during putWrites serialization phase:" , serializationError ) ;
360- throw serializationError ;
361- }
362-
363- // Stage 2: Execute all SQL writes sequentially within a single transaction using callbacks
364- return new Promise ( ( resolve , reject ) => {
365- this . db . transaction (
366- ( tx ) => {
367- let pending = preparedWrites . length ;
368- let hasError = false ;
369-
370- preparedWrites . forEach ( ( { channel, type, finalSerializedValue, dbIdx } ) => {
371- tx . executeSql (
372- `INSERT OR REPLACE INTO writes (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
334+ if ( ! this . isSetup ) await this . setup ( ) ;
335+
336+ const thread_id = config . configurable ?. thread_id ;
337+ const checkpoint_ns = config . configurable ?. checkpoint_ns ?? "" ;
338+ const checkpoint_id = config . configurable ?. checkpoint_id ;
339+
340+ if ( ! thread_id || ! checkpoint_id ) {
341+ throw new Error (
342+ "[CSCS] Missing thread_id or checkpoint_id in config for putWrites." ,
343+ ) ;
344+ }
345+ if ( ! writes || writes . length === 0 ) {
346+ return ; // Nothing to write
347+ }
348+
349+ // Stage 1: Prepare all data for writing
350+ let preparedWrites ;
351+ try {
352+ preparedWrites = await Promise . all (
353+ writes . map ( async ( writeTuple , idx ) => {
354+ const channel = writeTuple [ 0 ] ;
355+ const value = writeTuple [ 1 ] ;
356+ const [ type , rawSerializedValue ] = this . serde . dumpsTyped ( value ) ;
357+ const finalSerializedValue =
358+ await ensureStringForDB ( rawSerializedValue ) ;
359+ const dbIdx =
360+ WRITES_IDX_MAP [ channel ] !== undefined
361+ ? WRITES_IDX_MAP [ channel ]
362+ : idx ;
363+ return { channel, type, finalSerializedValue, dbIdx } ;
364+ } ) ,
365+ ) ;
366+ } catch ( serializationError ) {
367+ console . error (
368+ "[CSCS] Error during putWrites serialization phase:" ,
369+ serializationError ,
370+ ) ;
371+ throw serializationError ;
372+ }
373+
374+ // Stage 2: Execute all SQL writes sequentially within a single transaction using callbacks
375+ return new Promise ( ( resolve , reject ) => {
376+ this . db . transaction (
377+ ( tx ) => {
378+ let pending = preparedWrites . length ;
379+ let hasError = false ;
380+
381+ preparedWrites . forEach (
382+ ( { channel, type, finalSerializedValue, dbIdx } ) => {
383+ tx . executeSql (
384+ `INSERT OR REPLACE INTO writes (thread_id, checkpoint_ns, checkpoint_id, task_id, idx, channel, type, value)
373385 VALUES (?, ?, ?, ?, ?, ?, ?, ?)` ,
374- [
375- thread_id ,
376- checkpoint_ns ,
377- checkpoint_id ,
378- taskId ,
379- dbIdx ,
380- channel ,
381- type ,
382- finalSerializedValue ,
383- ] ,
384- ( ) => {
385- if ( -- pending === 0 && ! hasError ) {
386- resolve ( ) ;
387- }
388- } ,
389- ( tx , error ) => {
390- if ( ! hasError ) {
391- hasError = true ;
392- console . error ( "[CSCS] putWrites SQL error:" , error ) ;
393- reject ( error ) ;
394- }
395- return true ; // still try remaining queries
396- }
397- ) ;
398- } ) ;
399-
400- if ( pending === 0 ) {
401- resolve ( ) ;
402- }
403- } ,
404- ( transactionError ) => {
405- console . error ( "[CSCS] putWrites Transaction failed:" , transactionError ) ;
406- reject ( transactionError ) ;
407- }
408- ) ;
409- } ) ;
410- }
386+ [
387+ thread_id ,
388+ checkpoint_ns ,
389+ checkpoint_id ,
390+ taskId ,
391+ dbIdx ,
392+ channel ,
393+ type ,
394+ finalSerializedValue ,
395+ ] ,
396+ ( ) => {
397+ if ( -- pending === 0 && ! hasError ) {
398+ resolve ( ) ;
399+ }
400+ } ,
401+ ( tx , error ) => {
402+ if ( ! hasError ) {
403+ hasError = true ;
404+ console . error ( "[CSCS] putWrites SQL error:" , error ) ;
405+ reject ( error ) ;
406+ }
407+ return true ; // still try remaining queries
408+ } ,
409+ ) ;
410+ } ,
411+ ) ;
412+
413+ if ( pending === 0 ) {
414+ resolve ( ) ;
415+ }
416+ } ,
417+ ( transactionError ) => {
418+ console . error (
419+ "[CSCS] putWrites Transaction failed:" ,
420+ transactionError ,
421+ ) ;
422+ reject ( transactionError ) ;
423+ } ,
424+ ) ;
425+ } ) ;
426+ }
411427
412428 /**
413429 * Asynchronously lists checkpoints for a given thread.
@@ -416,80 +432,83 @@ export class CordovaSqliteSaver extends BaseCheckpointSaver {
416432 * @yields {object} A checkpoint tuple.
417433 */
418434 async * list ( config , options ) {
419- if ( ! this . isSetup ) await this . setup ( ) ;
420-
421- const { limit, before, filter } = options ?? { } ;
422- const thread_id = config . configurable ?. thread_id ;
423- const checkpoint_ns = config . configurable ?. checkpoint_ns ?? "" ;
424-
425- if ( ! thread_id ) return ;
426-
427- let checkpointIdRows = [ ] ;
428-
429- try {
430- await new Promise ( ( resolveOuter , rejectOuter ) => {
431- this . db . readTransaction ( ( tx ) => {
432- let sql = `SELECT checkpoint_id FROM checkpoints` ;
433- const params = [ ] ;
434- const whereClauses = [ "thread_id = ?" , "checkpoint_ns = ?" ] ;
435- params . push ( thread_id , checkpoint_ns ) ;
436-
437- if ( before ?. configurable ?. checkpoint_id ) {
438- whereClauses . push ( "checkpoint_id < ?" ) ;
439- params . push ( before . configurable . checkpoint_id ) ;
440- }
441-
442- if ( whereClauses . length > 0 ) {
443- sql += ` WHERE ${ whereClauses . join ( " AND " ) } ` ;
444- }
445-
446- sql += ` ORDER BY checkpoint_id DESC` ;
447-
448- if ( limit ) {
449- sql += ` LIMIT ${ parseInt ( limit , 10 ) * ( filter ? 5 : 1 ) } ` ;
450- }
451-
452- executeSqlAsync ( tx , sql , params )
453- . then ( ( resultSet ) => {
454- for ( let i = 0 ; i < resultSet . rows . length ; i ++ ) {
455- checkpointIdRows . push ( resultSet . rows . item ( i ) ) ;
456- }
457- resolveOuter ( ) ;
458- } )
459- . catch ( rejectOuter ) ;
460- } , rejectOuter ) ; // <- If the transaction itself fails
461- } ) ;
462-
463- let yieldedCount = 0 ;
464- for ( const idRow of checkpointIdRows ) {
465- const tupleConfig = {
466- configurable : {
467- thread_id,
468- checkpoint_ns,
469- checkpoint_id : idRow . checkpoint_id ,
470- } ,
471- } ;
472-
473- const fullTuple = await this . getTuple ( tupleConfig ) ;
474-
475- if ( fullTuple ) {
476- if (
477- filter &&
478- fullTuple . metadata &&
479- ! Object . entries ( filter ) . every (
480- ( [ key , value ] ) => fullTuple . metadata [ key ] === value ,
481- )
482- ) {
483- continue ;
484- }
485-
486- yield fullTuple ;
487- yieldedCount ++ ;
488- if ( limit !== undefined && yieldedCount >= limit ) break ;
489- }
490- }
491- } catch ( error ) {
492- console . error ( `list: Error fetching/processing for thread ${ thread_id } :` , error ) ;
493- }
494- }
435+ if ( ! this . isSetup ) await this . setup ( ) ;
436+
437+ const { limit, before, filter } = options ?? { } ;
438+ const thread_id = config . configurable ?. thread_id ;
439+ const checkpoint_ns = config . configurable ?. checkpoint_ns ?? "" ;
440+
441+ if ( ! thread_id ) return ;
442+
443+ let checkpointIdRows = [ ] ;
444+
445+ try {
446+ await new Promise ( ( resolveOuter , rejectOuter ) => {
447+ this . db . readTransaction ( ( tx ) => {
448+ let sql = `SELECT checkpoint_id FROM checkpoints` ;
449+ const params = [ ] ;
450+ const whereClauses = [ "thread_id = ?" , "checkpoint_ns = ?" ] ;
451+ params . push ( thread_id , checkpoint_ns ) ;
452+
453+ if ( before ?. configurable ?. checkpoint_id ) {
454+ whereClauses . push ( "checkpoint_id < ?" ) ;
455+ params . push ( before . configurable . checkpoint_id ) ;
456+ }
457+
458+ if ( whereClauses . length > 0 ) {
459+ sql += ` WHERE ${ whereClauses . join ( " AND " ) } ` ;
460+ }
461+
462+ sql += ` ORDER BY checkpoint_id DESC` ;
463+
464+ if ( limit ) {
465+ sql += ` LIMIT ${ Number . parseInt ( limit , 10 ) * ( filter ? 5 : 1 ) } ` ;
466+ }
467+
468+ executeSqlAsync ( tx , sql , params )
469+ . then ( ( resultSet ) => {
470+ for ( let i = 0 ; i < resultSet . rows . length ; i ++ ) {
471+ checkpointIdRows . push ( resultSet . rows . item ( i ) ) ;
472+ }
473+ resolveOuter ( ) ;
474+ } )
475+ . catch ( rejectOuter ) ;
476+ } , rejectOuter ) ; // <- If the transaction itself fails
477+ } ) ;
478+
479+ let yieldedCount = 0 ;
480+ for ( const idRow of checkpointIdRows ) {
481+ const tupleConfig = {
482+ configurable : {
483+ thread_id,
484+ checkpoint_ns,
485+ checkpoint_id : idRow . checkpoint_id ,
486+ } ,
487+ } ;
488+
489+ const fullTuple = await this . getTuple ( tupleConfig ) ;
490+
491+ if ( fullTuple ) {
492+ if (
493+ filter &&
494+ fullTuple . metadata &&
495+ ! Object . entries ( filter ) . every (
496+ ( [ key , value ] ) => fullTuple . metadata [ key ] === value ,
497+ )
498+ ) {
499+ continue ;
500+ }
501+
502+ yield fullTuple ;
503+ yieldedCount ++ ;
504+ if ( limit !== undefined && yieldedCount >= limit ) break ;
505+ }
506+ }
507+ } catch ( error ) {
508+ console . error (
509+ `list: Error fetching/processing for thread ${ thread_id } :` ,
510+ error ,
511+ ) ;
512+ }
513+ }
495514}
0 commit comments