@@ -521,6 +521,95 @@ function dropCrrTriggers(db: DatabaseSyncType, tableName: string, logger?: Logge
521521 return triggers . length ;
522522}
523523
524+ /** Strip CRR triggers/metadata so row deletes stay local (no replicated tombstones). */
525+ function deleteAllRowsWithoutCrrReplication (
526+ db : DatabaseSyncType ,
527+ tableName : string ,
528+ logger ?: Logger ,
529+ ) : void {
530+ if ( ! rawHasTable ( db , tableName ) ) return ;
531+
532+ const clockTableName = `${ tableName } __crsql_clock` ;
533+ const pksTableName = `${ tableName } __crsql_pks` ;
534+ if ( rawHasTable ( db , clockTableName ) || rawHasTable ( db , pksTableName ) || listCrrTriggers ( db , tableName ) . length > 0 ) {
535+ if ( rawHasTable ( db , "crsql_master" ) && rawHasColumn ( db , "crsql_master" , "tbl_name" ) ) {
536+ runStatement ( db , "delete from crsql_master where tbl_name = ?" , [ tableName ] ) ;
537+ }
538+ if ( rawHasTable ( db , "crsql_changes" ) && rawHasColumn ( db , "crsql_changes" , "table" ) ) {
539+ runStatement ( db , "delete from crsql_changes where [table] = ?" , [ tableName ] ) ;
540+ }
541+ try {
542+ getRow ( db , "select crsql_as_table(?) as ok" , [ tableName ] ) ;
543+ } catch {
544+ // Table may not be registered enough for crsql_as_table; shadow cleanup below still applies.
545+ }
546+ dropCrrTriggers ( db , tableName , logger ) ;
547+ runStatement ( db , `drop table if exists ${ quoteIdentifier ( clockTableName ) } ` ) ;
548+ runStatement ( db , `drop table if exists ${ quoteIdentifier ( pksTableName ) } ` ) ;
549+ }
550+
551+ runStatement ( db , `delete from ${ quoteIdentifier ( tableName ) } ` ) ;
552+ }
553+
554+ const QUEUE_OVERHAUL_WIPE_MARKER = "queue_landing_state.wiped_for_stacked_overhaul.v1" ;
555+
556+ function rawCrsqlPrimaryKeyMatchesText ( value : unknown , text : string ) : boolean {
557+ if ( value === text ) return true ;
558+ if ( ! ( value instanceof Uint8Array ) ) return false ;
559+
560+ const packed = packedCrsqlPrimaryKey ( text ) ;
561+ return isSyncScalarBytes ( packed )
562+ && Buffer . from ( value ) . equals ( Buffer . from ( packed . base64 , "base64" ) ) ;
563+ }
564+
565+ function syncScalarPrimaryKeyMatchesText ( value : SyncScalar , text : string ) : boolean {
566+ if ( value === text ) return true ;
567+
568+ const packed = packedCrsqlPrimaryKey ( text ) ;
569+ return isSyncScalarBytes ( value )
570+ && isSyncScalarBytes ( packed )
571+ && value . base64 === packed . base64 ;
572+ }
573+
574+ function isLocalOnlyQueueWipeMarkerRawChange ( change : { table_name : string ; pk : unknown } ) : boolean {
575+ return change . table_name === "kv"
576+ && rawCrsqlPrimaryKeyMatchesText ( change . pk , QUEUE_OVERHAUL_WIPE_MARKER ) ;
577+ }
578+
579+ function isLocalOnlyQueueWipeMarkerChange ( change : CrsqlChangeRow ) : boolean {
580+ return change . table === "kv"
581+ && syncScalarPrimaryKeyMatchesText ( change . pk , QUEUE_OVERHAUL_WIPE_MARKER ) ;
582+ }
583+
584+ /**
585+ * One-shot local wipe of legacy queue_landing_state on upgrade to the stacked-PR
586+ * queue overhaul. Must run after migrations and before ensureCrrTables so queue
587+ * deletes do not replicate to peers that have not upgraded yet. The marker is
588+ * stored in kv for compatibility with the original migration, but filtered from
589+ * CRDT import/export because it records local upgrade work, not shared state.
590+ */
591+ function wipeQueueLandingStateForStackedOverhaulIfNeeded ( db : DatabaseSyncType , logger ?: Logger ) : void {
592+ try {
593+ const row = getRow < { value : string } > (
594+ db ,
595+ "select value from kv where key = ?" ,
596+ [ QUEUE_OVERHAUL_WIPE_MARKER ] ,
597+ ) ;
598+ if ( row ) return ;
599+
600+ deleteAllRowsWithoutCrrReplication ( db , "queue_landing_state" , logger ) ;
601+ runStatement (
602+ db ,
603+ "insert into kv (key, value) values (?, ?) on conflict(key) do update set value = excluded.value" ,
604+ [ QUEUE_OVERHAUL_WIPE_MARKER , new Date ( ) . toISOString ( ) ] ,
605+ ) ;
606+ } catch {
607+ // Table may not exist on a brand-new DB; initialization will create both
608+ // tables and the next startup will record the marker. Skipping the wipe
609+ // on a fresh DB is correct (nothing to wipe).
610+ }
611+ }
612+
524613function removeExcludedCrrMetadata ( db : DatabaseSyncType , logger ?: Logger ) : void {
525614 for ( const tableName of LOCAL_ONLY_CRR_EXCLUDED_TABLES ) {
526615 const clockTableName = `${ tableName } __crsql_clock` ;
@@ -1821,32 +1910,6 @@ function migrate(db: MigrationDb) {
18211910 try { db . run ( "alter table queue_landing_state add column wait_reason text" ) ; } catch { }
18221911 try { db . run ( "alter table queue_landing_state add column updated_at text" ) ; } catch { }
18231912
1824- // One-shot wipe of legacy queue_landing_state on upgrade to the stacked-PR
1825- // queue overhaul. The new queue creates PRs with chain bases (PR_N's base =
1826- // previous lane's branch) instead of all-into-main, so any in-flight queue
1827- // from the old code path would be misinterpreted by the new landing loop.
1828- // Wiping rather than migrating is a deliberate choice — the user accepts
1829- // losing in-flight queues in exchange for not maintaining a translation
1830- // layer for every legacy field shape.
1831- const QUEUE_OVERHAUL_WIPE_MARKER = "queue_landing_state.wiped_for_stacked_overhaul.v1" ;
1832- try {
1833- const row = db . get < { value : string } > (
1834- "select value from kv where key = ?" ,
1835- [ QUEUE_OVERHAUL_WIPE_MARKER ] ,
1836- ) ;
1837- if ( ! row ) {
1838- db . run ( "delete from queue_landing_state" ) ;
1839- db . run (
1840- "insert into kv (key, value) values (?, ?) on conflict(key) do update set value = excluded.value" ,
1841- [ QUEUE_OVERHAUL_WIPE_MARKER , new Date ( ) . toISOString ( ) ] ,
1842- ) ;
1843- }
1844- } catch {
1845- // Table may not exist on a brand-new DB; initialization will create both
1846- // tables and the next startup will record the marker. Skipping the wipe
1847- // on a fresh DB is correct (nothing to wipe).
1848- }
1849-
18501913 // Rebase dismiss/defer persistence
18511914 db . run ( `
18521915 create table if not exists rebase_dismissed (
@@ -2849,6 +2912,8 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
28492912 removeExcludedCrrMetadata ( db , logger ) ;
28502913 }
28512914
2915+ wipeQueueLandingStateForStackedOverhaulIfNeeded ( db , logger ) ;
2916+
28522917 if ( crsqliteLoaded ) {
28532918 loadCrsqliteIfAvailable ( ) ;
28542919 ensureCrrTables ( db , logger ) ;
@@ -2951,17 +3016,19 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
29513016 [ version ]
29523017 ) ;
29533018
2954- return rows . map ( ( row ) => ( {
2955- table : row . table_name ,
2956- pk : encodeSyncScalar ( row . pk ) ,
2957- cid : row . cid ,
2958- val : encodeSyncScalar ( row . val ) ,
2959- col_version : Number ( row . col_version ) ,
2960- db_version : Number ( row . db_version ) ,
2961- site_id : Buffer . from ( row . site_id ) . toString ( "hex" ) ,
2962- cl : Number ( row . cl ) ,
2963- seq : Number ( row . seq ) ,
2964- } ) ) ;
3019+ return rows
3020+ . filter ( ( row ) => ! isLocalOnlyQueueWipeMarkerRawChange ( row ) )
3021+ . map ( ( row ) => ( {
3022+ table : row . table_name ,
3023+ pk : encodeSyncScalar ( row . pk ) ,
3024+ cid : row . cid ,
3025+ val : encodeSyncScalar ( row . val ) ,
3026+ col_version : Number ( row . col_version ) ,
3027+ db_version : Number ( row . db_version ) ,
3028+ site_id : Buffer . from ( row . site_id ) . toString ( "hex" ) ,
3029+ cl : Number ( row . cl ) ,
3030+ seq : Number ( row . seq ) ,
3031+ } ) ) ;
29653032 } ,
29663033 applyChanges : ( changes : CrsqlChangeRow [ ] ) => {
29673034 if ( ! crsqliteLoaded ) return { appliedCount : 0 , dbVersion : 0 , touchedTables : [ ] , rebuiltFts : false } ;
@@ -2970,6 +3037,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
29703037 runStatement ( db , "begin" ) ;
29713038 try {
29723039 for ( const rawChange of changes ) {
3040+ if ( isLocalOnlyQueueWipeMarkerChange ( rawChange ) ) continue ;
29733041 // Skip changes for tables that no longer exist in the schema
29743042 // (e.g. unified_memories removed in #329).
29753043 if ( ! rawHasTable ( db , rawChange . table ) ) continue ;
0 commit comments