Skip to content

Commit ce6457b

Browse files
cursoragentarul28
andcommitted
Fix CRR PK retrofit corruption, sync schema skew ACK, and failed brain connect wipe
- Skip primary-key retrofit for tables with __crsql_clock companions (same guard as FK retrofit) - Reject unknown sync tables instead of silently skipping; use applyChanges appliedCount in ACKs - Defer device registry clear until connectToBrain succeeds so failed connects keep local pairing data Co-authored-by: Arul Sharma <arul28@users.noreply.github.com>
1 parent c04e0b3 commit ce6457b

7 files changed

Lines changed: 158 additions & 19 deletions

File tree

apps/ade-cli/src/services/sync/syncHostService.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2846,8 +2846,8 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
28462846
try {
28472847
let appliedCount = 0;
28482848
if (changes.length > 0) {
2849-
args.db.sync.applyChanges(changes);
2850-
appliedCount = changes.length;
2849+
const applyResult = args.db.sync.applyChanges(changes);
2850+
appliedCount = applyResult.appliedCount;
28512851
peer.lastAppliedAt = nowIso();
28522852
lastBroadcastAt = nowIso();
28532853
args.onStateChanged?.();

apps/ade-cli/src/services/sync/syncPeerService.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,13 +313,15 @@ export function createSyncPeerService(args: SyncPeerServiceArgs) {
313313
const payload = (envelope.payload ?? {}) as SyncChangesetBatchPayload;
314314
const changes = Array.isArray(payload.changes) ? payload.changes : [];
315315
try {
316+
let appliedCount = 0;
316317
if (changes.length) {
317-
args.db.sync.applyChanges(changes);
318+
const applyResult = args.db.sync.applyChanges(changes);
319+
appliedCount = applyResult.appliedCount;
318320
args.onRemoteChangesApplied?.();
319321
}
320322
latestRemoteDbVersion = Math.max(latestRemoteDbVersion, Math.floor(payload.toDbVersion ?? latestRemoteDbVersion));
321323
if (connectionDraft) connectionDraft.lastRemoteDbVersion = latestRemoteDbVersion;
322-
sendChangesetAck(payload, true, args.db.sync.getDbVersion(), changes.length);
324+
sendChangesetAck(payload, true, args.db.sync.getDbVersion(), appliedCount);
323325
emitStatus();
324326
} catch (error) {
325327
sendChangesetAck(payload, false, args.db.sync.getDbVersion(), 0, error);
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import fs from "node:fs";
2+
import net from "node:net";
3+
import os from "node:os";
4+
import path from "node:path";
5+
import { afterEach, describe, expect, it, vi } from "vitest";
6+
import { openKvDb, type AdeDb } from "../../../../desktop/src/main/services/state/kvDb";
7+
import { createSyncService, type SyncService } from "./syncService";
8+
9+
function createLogger() {
10+
return {
11+
debug: vi.fn(),
12+
info: vi.fn(),
13+
warn: vi.fn(),
14+
error: vi.fn(),
15+
};
16+
}
17+
18+
function makeTempRoot(prefix: string): string {
19+
return fs.mkdtempSync(path.join(os.tmpdir(), prefix));
20+
}
21+
22+
async function getUnusedPort(): Promise<number> {
23+
const server = net.createServer();
24+
await new Promise<void>((resolve) => server.listen(0, "127.0.0.1", resolve));
25+
const address = server.address();
26+
const port = typeof address === "object" && address ? address.port : 0;
27+
await new Promise<void>((resolve, reject) => {
28+
server.close((error) => (error ? reject(error) : resolve()));
29+
});
30+
return port;
31+
}
32+
33+
function createService(db: AdeDb, projectRoot: string): SyncService {
34+
return createSyncService({
35+
db,
36+
logger: createLogger() as any,
37+
projectRoot,
38+
hostStartupEnabled: false,
39+
localDeviceIdPath: path.join(projectRoot, ".ade", "secrets", "sync-device-id"),
40+
phonePairingStateDir: path.join(projectRoot, ".ade", "secrets", "sync"),
41+
fileService: {} as any,
42+
laneService: {
43+
list: vi.fn(async () => []),
44+
} as any,
45+
prService: {} as any,
46+
sessionService: {
47+
list: vi.fn(() => []),
48+
get: vi.fn(() => null),
49+
readTranscriptTail: vi.fn(async () => ""),
50+
} as any,
51+
ptyService: {
52+
readTranscriptTail: vi.fn(async () => ""),
53+
enrichSessions: vi.fn((rows: unknown[]) => rows),
54+
} as any,
55+
computerUseArtifactBrokerService: {
56+
listArtifacts: vi.fn(() => []),
57+
} as any,
58+
agentChatService: {
59+
listSessions: vi.fn(async () => []),
60+
} as any,
61+
processService: {
62+
listRuntime: vi.fn(() => []),
63+
} as any,
64+
});
65+
}
66+
67+
describe("createSyncService", () => {
68+
const cleanupRoots: string[] = [];
69+
70+
afterEach(() => {
71+
for (const root of cleanupRoots.splice(0)) {
72+
fs.rmSync(root, { recursive: true, force: true });
73+
}
74+
});
75+
76+
it("keeps the local device registry when connectToBrain fails before handshake", async () => {
77+
const projectRoot = makeTempRoot("ade-sync-service-connect-fail-");
78+
cleanupRoots.push(projectRoot);
79+
const db = await openKvDb(path.join(projectRoot, ".ade", "kv.sqlite"), createLogger() as any);
80+
(db.sync as { isAvailable?: () => boolean }).isAvailable = () => true;
81+
const service = createService(db, projectRoot);
82+
const registry = service.getDeviceRegistryService();
83+
registry.upsertPeerMetadata({
84+
deviceId: "peer-old",
85+
deviceName: "Previous host",
86+
platform: "macOS",
87+
deviceType: "desktop",
88+
siteId: "peer-site",
89+
dbVersion: 12,
90+
});
91+
92+
expect(registry.getDevice("peer-old")?.name).toBe("Previous host");
93+
94+
try {
95+
await expect(service.connectToBrain({
96+
host: "127.0.0.1",
97+
port: await getUnusedPort(),
98+
token: "bad-token",
99+
})).rejects.toThrow();
100+
101+
expect(registry.getDevice("peer-old")?.name).toBe("Previous host");
102+
} finally {
103+
await service.dispose();
104+
db.close();
105+
}
106+
});
107+
});

apps/ade-cli/src/services/sync/syncService.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1018,11 +1018,11 @@ export function createSyncService(args: SyncServiceArgs) {
10181018
throw new Error("Machine sync is unavailable because the CRDT database extension is not loaded.");
10191019
}
10201020
await stopHostIfRunning();
1021-
deviceRegistryService.clearClusterRegistryForViewerJoin();
10221021
writeSavedDraft(draft);
10231022
syncPeerService.setSavedDraft(draft);
10241023
try {
10251024
await syncPeerService.connect(draft);
1025+
deviceRegistryService.clearClusterRegistryForViewerJoin();
10261026
deviceRegistryService.touchLocalDevice({ lastSeenAt: nowIso() });
10271027
syncPeerService.flushLocalChanges();
10281028
await sleep(150);

apps/desktop/src/main/services/state/kvDb.migrations.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { createRequire } from "node:module";
55
import { describe, expect, it } from "vitest";
66
import { createLaneWorktreeLockService } from "../lanes/laneWorktreeLockService";
77
import { openKvDb } from "./kvDb";
8+
import { isCrsqliteAvailable } from "./crsqliteExtension";
89

910
const require = createRequire(import.meta.url);
1011

@@ -156,6 +157,25 @@ describe("kvDb migrations - legacy upgrade paths", () => {
156157
}
157158
});
158159

160+
it.skipIf(!isCrsqliteAvailable())("skips primary-key retrofit for tables that already have __crsql_clock companions", async () => {
161+
const dbPath = makeDbPath("ade-kvdb-pk-retrofit-skip-crr-");
162+
const first = await openKvDb(dbPath, createLogger());
163+
first.run("create unique index if not exists temp_ade_pk_retrofit_probe on lanes(project_id, name)");
164+
first.close();
165+
166+
const reopened = await openKvDb(dbPath, createLogger());
167+
try {
168+
expect(
169+
reopened.get<{ name: string }>(
170+
"select name from sqlite_master where type = 'table' and name = 'lanes__crsql_clock' limit 1",
171+
)?.name,
172+
).toBe("lanes__crsql_clock");
173+
reopened.run("drop index if exists temp_ade_pk_retrofit_probe");
174+
} finally {
175+
reopened.close();
176+
}
177+
});
178+
159179
it("coalesces duplicate lane_linear_issue_links rows during migrate", async () => {
160180
const dbPath = makeDbPath("ade-kvdb-linear-links-dedupe-");
161181
fs.mkdirSync(path.dirname(dbPath), { recursive: true });

apps/desktop/src/main/services/state/kvDb.sync.test.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -278,28 +278,28 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => {
278278

279279
it("ignores CRDT changes for legacy unified_memories tables removed in #329", async () => {
280280
const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-mem-skip-"), createLogger() as any);
281-
const legacyChange = {
282-
table: "unified_memories",
283-
pk: Buffer.from([0x01, 0x06, 0, 0, 0, 0, 0, 1]).toString("base64"),
281+
const legacyChanges = ["unified_memories", "unified_memories_fts_content"].map((table, index) => ({
282+
table,
283+
pk: Buffer.from([0x01, 0x06, 0, 0, 0, 0, 0, index + 1]).toString("base64"),
284284
cid: "id",
285285
val: null,
286286
col_version: 1,
287-
db_version: 1,
287+
db_version: index + 1,
288288
site_id: "a".repeat(32),
289289
cl: 1,
290-
seq: 1,
291-
};
290+
seq: index,
291+
}));
292292

293293
const beforeVersion = db2.sync.getDbVersion();
294-
const result = db2.sync.applyChanges([legacyChange as any]);
294+
const result = db2.sync.applyChanges(legacyChanges as any);
295295
expect(result.appliedCount).toBe(0);
296296
expect(result.touchedTables).toEqual([]);
297297
expect(db2.sync.getDbVersion()).toBe(beforeVersion);
298298

299299
db2.close();
300300
});
301301

302-
it("silently skips CRDT changes for unknown future tables", async () => {
302+
it("rejects CRDT changes for unknown future tables", async () => {
303303
const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-future-table-"), createLogger() as any);
304304
const futureChange = {
305305
table: "missing_future_table",
@@ -314,9 +314,7 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => {
314314
};
315315

316316
const beforeVersion = db2.sync.getDbVersion();
317-
const result = db2.sync.applyChanges([futureChange as any]);
318-
expect(result.appliedCount).toBe(0);
319-
expect(result.touchedTables).toEqual([]);
317+
expect(() => db2.sync.applyChanges([futureChange as any])).toThrow(/unknown_sync_table:missing_future_table/);
320318
expect(db2.sync.getDbVersion()).toBe(beforeVersion);
321319

322320
db2.close();

apps/desktop/src/main/services/state/kvDb.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ import type { ApplyRemoteChangesResult, CrsqlChangeRow, SyncScalar } from "../..
1111

1212
type DatabaseSyncConstructor = new (dbPath: string, options?: { allowExtension?: boolean }) => DatabaseSyncType;
1313

14+
/** CRDT tables removed from the schema; inbound tombstones are ignored. */
15+
const SYNC_RETIRED_TABLES = new Set(["unified_memories", "unified_memories_fts"]);
16+
17+
function isRetiredIncomingSyncTable(tableName: string): boolean {
18+
return SYNC_RETIRED_TABLES.has(tableName) || tableName.startsWith("unified_memories_");
19+
}
20+
1421
// Anchor createRequire to a synthetic CJS file so builtin resolution follows the active runtime.
1522
const require = createRequire(path.join(process.cwd(), "ade-runtime.cjs"));
1623
const { DatabaseSync } = require("node:sqlite") as { DatabaseSync: DatabaseSyncConstructor };
@@ -208,6 +215,10 @@ function retrofitLegacyPrimaryKeyNotNullSchema(db: DatabaseSyncType): boolean {
208215
runStatement(db, "pragma foreign_keys = off");
209216
try {
210217
for (const table of tables) {
218+
// CRR tables must be altered via crsql_begin_alter/commit_alter or rebuilt
219+
// with rebuildCrrTableWithBackfill — never DROP/rename wholesale.
220+
if (rawHasTable(db, `${table.name}__crsql_clock`)) continue;
221+
211222
const tableInfo = allRows<{
212223
name: string;
213224
type: string;
@@ -3038,9 +3049,10 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
30383049
try {
30393050
for (const rawChange of changes) {
30403051
if (isLocalOnlyQueueWipeMarkerChange(rawChange)) continue;
3041-
// Skip changes for tables that no longer exist in the schema
3042-
// (e.g. unified_memories removed in #329).
3043-
if (!rawHasTable(db, rawChange.table)) continue;
3052+
if (!rawHasTable(db, rawChange.table)) {
3053+
if (isRetiredIncomingSyncTable(rawChange.table)) continue;
3054+
throw new Error(`unknown_sync_table:${rawChange.table}`);
3055+
}
30443056
const change = normalizeIncomingCrsqlChange(db, rawChange);
30453057
const result = runStatement(
30463058
db,

0 commit comments

Comments
 (0)