@@ -553,10 +553,40 @@ function deleteAllRowsWithoutCrrReplication(
553553
554554const QUEUE_OVERHAUL_WIPE_MARKER = "queue_landing_state.wiped_for_stacked_overhaul.v1" ;
555555
556+ function rawCrsqlPrimaryKeyMatchesText ( value : unknown , text : string ) : boolean {
557+ if ( value === text ) return true ;
558+ if ( ! ( value instanceof Uint8Array ) ) return false ;
559+
560+ const packed = packedCrsqlPrimaryKey ( text ) ;
561+ return isSyncScalarBytes ( packed )
562+ && Buffer . from ( value ) . equals ( Buffer . from ( packed . base64 , "base64" ) ) ;
563+ }
564+
565+ function syncScalarPrimaryKeyMatchesText ( value : SyncScalar , text : string ) : boolean {
566+ if ( value === text ) return true ;
567+
568+ const packed = packedCrsqlPrimaryKey ( text ) ;
569+ return isSyncScalarBytes ( value )
570+ && isSyncScalarBytes ( packed )
571+ && value . base64 === packed . base64 ;
572+ }
573+
574+ function isLocalOnlyQueueWipeMarkerRawChange ( change : { table_name : string ; pk : unknown } ) : boolean {
575+ return change . table_name === "kv"
576+ && rawCrsqlPrimaryKeyMatchesText ( change . pk , QUEUE_OVERHAUL_WIPE_MARKER ) ;
577+ }
578+
579+ function isLocalOnlyQueueWipeMarkerChange ( change : CrsqlChangeRow ) : boolean {
580+ return change . table === "kv"
581+ && syncScalarPrimaryKeyMatchesText ( change . pk , QUEUE_OVERHAUL_WIPE_MARKER ) ;
582+ }
583+
556584/**
557585 * One-shot local wipe of legacy queue_landing_state on upgrade to the stacked-PR
558- * queue overhaul. Must run after migrations and before ensureCrrTables so deletes
559- * do not replicate to peers that have not upgraded yet.
586+ * queue overhaul. Must run after migrations and before ensureCrrTables so queue
587+ * deletes do not replicate to peers that have not upgraded yet. The marker is
588+ * stored in kv for compatibility with the original migration, but filtered from
589+ * CRDT import/export because it records local upgrade work, not shared state.
560590 */
561591function wipeQueueLandingStateForStackedOverhaulIfNeeded ( db : DatabaseSyncType , logger ?: Logger ) : void {
562592 try {
@@ -2986,17 +3016,19 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
29863016 [ version ]
29873017 ) ;
29883018
2989- return rows . map ( ( row ) => ( {
2990- table : row . table_name ,
2991- pk : encodeSyncScalar ( row . pk ) ,
2992- cid : row . cid ,
2993- val : encodeSyncScalar ( row . val ) ,
2994- col_version : Number ( row . col_version ) ,
2995- db_version : Number ( row . db_version ) ,
2996- site_id : Buffer . from ( row . site_id ) . toString ( "hex" ) ,
2997- cl : Number ( row . cl ) ,
2998- seq : Number ( row . seq ) ,
2999- } ) ) ;
3019+ return rows
3020+ . filter ( ( row ) => ! isLocalOnlyQueueWipeMarkerRawChange ( row ) )
3021+ . map ( ( row ) => ( {
3022+ table : row . table_name ,
3023+ pk : encodeSyncScalar ( row . pk ) ,
3024+ cid : row . cid ,
3025+ val : encodeSyncScalar ( row . val ) ,
3026+ col_version : Number ( row . col_version ) ,
3027+ db_version : Number ( row . db_version ) ,
3028+ site_id : Buffer . from ( row . site_id ) . toString ( "hex" ) ,
3029+ cl : Number ( row . cl ) ,
3030+ seq : Number ( row . seq ) ,
3031+ } ) ) ;
30003032 } ,
30013033 applyChanges : ( changes : CrsqlChangeRow [ ] ) => {
30023034 if ( ! crsqliteLoaded ) return { appliedCount : 0 , dbVersion : 0 , touchedTables : [ ] , rebuiltFts : false } ;
@@ -3005,6 +3037,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
30053037 runStatement ( db , "begin" ) ;
30063038 try {
30073039 for ( const rawChange of changes ) {
3040+ if ( isLocalOnlyQueueWipeMarkerChange ( rawChange ) ) continue ;
30083041 // Skip changes for tables that no longer exist in the schema
30093042 // (e.g. unified_memories removed in #329).
30103043 if ( ! rawHasTable ( db , rawChange . table ) ) continue ;
0 commit comments