Skip to content

Commit 016fb4b

Browse files
committed
fix(firo): make Spark anon-set sync resumable
1 parent 12d515f commit 016fb4b

5 files changed

Lines changed: 565 additions & 96 deletions

File tree

lib/db/sqlite/firo_cache_coordinator.dart

Lines changed: 224 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -84,72 +84,228 @@ abstract class FiroCacheCoordinator {
8484
});
8585
}
8686

87+
/// Sync the Spark anonymity set cache for `groupId` from the node.
88+
///
89+
/// Each sector the server returns is persisted to disk in its own SQLite
90+
/// transaction against an in-progress SparkSet row (`complete = 0`). The
91+
/// row and its coins are invisible to readers until
92+
/// [_markSparkAnonSetComplete] flips the flag after a strict integrity
93+
/// check (linked coin count == expected delta) passes.
94+
///
95+
/// Resumability: if a prior sync crashed mid-download this function
96+
/// picks up at the count of coins already linked to the in-progress
97+
/// row, never re-downloading sectors that committed. If the server's
98+
/// blockHash has shifted between attempts the partial row is discarded
99+
/// (orderKey indices no longer align at the new blockHash) and the
100+
/// delta is fetched fresh.
101+
///
102+
/// Integrity: the finalize step rolls back and leaves `complete = 0`
103+
/// if the link count doesn't match the expected delta — a partial or
104+
/// over-full cache never becomes the current set.
87105
static Future<void> runFetchAndUpdateSparkAnonSetCacheForGroupId(
88106
int groupId,
89107
ElectrumXClient client,
90108
CryptoCurrencyNetwork network,
91109
void Function(int countFetched, int totalCount)? progressUpdated,
92110
) async {
93111
await _setLocks[network]!.protect(() async {
94-
const sectorSize =
95-
1500; // chosen as a somewhat decent value. Could be changed in the future if wanted/needed
112+
const sectorSize = 1500;
113+
96114
final prevMeta = await FiroCacheCoordinator.getLatestSetInfoForGroupId(
97115
groupId,
98116
network,
99117
);
100-
101118
final prevSize = prevMeta?.size ?? 0;
102119

103120
final meta = await client.getSparkAnonymitySetMeta(coinGroupId: groupId);
104121

105-
progressUpdated?.call(prevSize, meta.size);
122+
void updateProgress(int fetchedDelta) {
123+
progressUpdated?.call(prevSize + fetchedDelta, meta.size);
124+
}
106125

107126
if (prevMeta?.blockHash == meta.blockHash) {
108-
Logging.instance.d("prevMeta?.blockHash == meta.blockHash");
127+
updateProgress(0);
128+
if (prevMeta?.size == meta.size) {
129+
Logging.instance.d(
130+
"Spark anon set groupId=$groupId already up to date "
131+
"(blockHash=${meta.blockHash}, size=${meta.size})",
132+
);
133+
return;
134+
}
135+
// Server reports a different size for the same blockHash. On
136+
// Firo's server this should be impossible (blockHash advances
137+
// whenever a coin is added), so this is treated as an anomaly.
138+
// Refuse to sync: appending coins to the existing finalized row
139+
// (what INSERT OR IGNORE in the writer would produce) would leak
140+
// unverified coins into a set whose setHash we've already
141+
// committed to.
142+
Logging.instance.w(
143+
"Spark anon set groupId=$groupId server reports different size "
144+
"(${prevMeta!.size} -> ${meta.size}) at same blockHash "
145+
"${meta.blockHash}; skipping sync to preserve cached state.",
146+
);
109147
return;
110148
}
111149

112150
final numberOfCoinsToFetch = meta.size - prevSize;
151+
if (numberOfCoinsToFetch < 0) {
152+
updateProgress(0);
153+
// Reorg-style shrink: server has fewer coins than our last
154+
// finalized set. Refuse to sync rather than invalidate cached data.
155+
Logging.instance.w(
156+
"Spark anon set groupId=$groupId appears to have shrunk "
157+
"($prevSize -> ${meta.size}); skipping sync to preserve "
158+
"cached data.",
159+
);
160+
return;
161+
}
162+
if (numberOfCoinsToFetch == 0) {
163+
updateProgress(0);
164+
// blockHash advanced but no new coins in this group's set. We do
165+
// not materialise a new SparkSet row for an empty delta — a
166+
// same-size row would create a tiebreaker ambiguity in
167+
// _getLatestSetInfoForGroupId. Drop any stray in-progress row so
168+
// it doesn't confuse the next resume attempt.
169+
final stale = await _Reader._getIncompleteSetForGroupId(
170+
groupId,
171+
db: _FiroCache.setCacheDB(network),
172+
);
173+
if (stale.isNotEmpty) {
174+
await _workers[network]!.runTask(
175+
FCTask(
176+
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
177+
data: groupId,
178+
),
179+
);
180+
}
181+
return;
182+
}
183+
184+
// Decide whether to resume an existing in-progress row or start
185+
// fresh. Cases:
186+
// * no in-progress row -> cursor = 0
187+
// * in-progress blockHash differs -> discard, cursor = 0
188+
// * in-progress linked > expected delta -> corrupt, discard,
189+
// cursor = 0
190+
// * in-progress blockHash matches -> cursor = linkedSoFar
191+
final incomplete = await _Reader._getIncompleteSetForGroupId(
192+
groupId,
193+
db: _FiroCache.setCacheDB(network),
194+
);
113195

114-
final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize;
115-
final remainder = numberOfCoinsToFetch % sectorSize;
196+
int cursor;
197+
if (incomplete.isEmpty) {
198+
cursor = 0;
199+
} else if (incomplete.length > 1) {
200+
Logging.instance.w(
201+
"Spark anon set groupId=$groupId has ${incomplete.length} "
202+
"in-progress rows; discarding ambiguous state.",
203+
);
204+
await _workers[network]!.runTask(
205+
FCTask(
206+
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
207+
data: groupId,
208+
),
209+
);
210+
cursor = 0;
211+
} else {
212+
final incBlockHash = incomplete.first["blockHash"] as String;
213+
final incSetHash = incomplete.first["setHash"] as String;
214+
final incSetId = incomplete.first["id"] as int;
215+
216+
// Discard the in-progress row if either blockHash or setHash
217+
// disagrees with the current meta. blockHash disagreement means
218+
// the server's indexing has shifted; setHash disagreement at the
219+
// same blockHash would indicate the in-progress row targets a
220+
// different set snapshot and resuming would mix coin contexts.
221+
if (incBlockHash != meta.blockHash ||
222+
incSetHash != meta.setHash) {
223+
Logging.instance.i(
224+
"Spark anon set groupId=$groupId in-progress "
225+
"(blockHash=$incBlockHash, setHash=$incSetHash) does not "
226+
"match meta (blockHash=${meta.blockHash}, "
227+
"setHash=${meta.setHash}); discarding in-flight row.",
228+
);
229+
await _workers[network]!.runTask(
230+
FCTask(
231+
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
232+
data: groupId,
233+
),
234+
);
235+
cursor = 0;
236+
} else {
237+
final linked = await _Reader._countSetCoins(
238+
incSetId,
239+
db: _FiroCache.setCacheDB(network),
240+
);
241+
if (linked > numberOfCoinsToFetch) {
242+
Logging.instance.w(
243+
"Spark anon set groupId=$groupId in-progress row has "
244+
"$linked linked coins but delta is only "
245+
"$numberOfCoinsToFetch; discarding in-flight row.",
246+
);
247+
await _workers[network]!.runTask(
248+
FCTask(
249+
func: FCFuncName._deleteIncompleteSparkSetsForGroup,
250+
data: groupId,
251+
),
252+
);
253+
cursor = 0;
254+
} else {
255+
cursor = linked;
256+
}
257+
}
258+
}
116259

117-
final List<dynamic> coins = [];
260+
updateProgress(cursor);
261+
262+
while (cursor < numberOfCoinsToFetch) {
263+
final endIndex = cursor + sectorSize <= numberOfCoinsToFetch
264+
? cursor + sectorSize
265+
: numberOfCoinsToFetch;
266+
final expected = endIndex - cursor;
118267

119-
for (int i = 0; i < fullSectorCount; i++) {
120-
final start = (i * sectorSize);
121268
final data = await client.getSparkAnonymitySetBySector(
122269
coinGroupId: groupId,
123270
latestBlock: meta.blockHash,
124-
startIndex: start,
125-
endIndex: start + sectorSize,
271+
startIndex: cursor,
272+
endIndex: endIndex,
126273
);
127-
progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch);
128274

129-
coins.addAll(data);
130-
}
275+
// Refuse to persist a sector whose size doesn't match the request:
276+
// a partial or over-full server response would break the finalize-
277+
// time integrity check and potentially skew the resume cursor.
278+
if (data.length != expected) {
279+
throw Exception(
280+
"Spark anon set sector size mismatch for groupId=$groupId: "
281+
"requested $expected coins in range [$cursor, $endIndex), "
282+
"server returned ${data.length}",
283+
);
284+
}
285+
286+
final sectorCoins = data
287+
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
288+
.toList();
131289

132-
if (remainder > 0) {
133-
final data = await client.getSparkAnonymitySetBySector(
134-
coinGroupId: groupId,
135-
latestBlock: meta.blockHash,
136-
startIndex: numberOfCoinsToFetch - remainder,
137-
endIndex: numberOfCoinsToFetch,
290+
await _workers[network]!.runTask(
291+
FCTask(
292+
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
293+
data: (meta, sectorCoins, cursor),
294+
),
138295
);
139-
progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch);
140296

141-
coins.addAll(data);
297+
cursor = endIndex;
298+
updateProgress(cursor);
142299
}
143300

144-
final result =
145-
coins
146-
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
147-
.toList();
148-
301+
// All sectors persisted. Flip `complete = 1` iff the link count
302+
// matches the expected delta. On failure the in-progress row stays
303+
// hidden and the error propagates; the next sync will observe the
304+
// over-linked row and reset.
149305
await _workers[network]!.runTask(
150306
FCTask(
151-
func: FCFuncName._updateSparkAnonSetCoinsWith,
152-
data: (meta, result),
307+
func: FCFuncName._markSparkAnonSetComplete,
308+
data: (meta, numberOfCoinsToFetch),
153309
),
154310
);
155311
});
@@ -250,6 +406,44 @@ abstract class FiroCacheCoordinator {
250406
.toList();
251407
}
252408

409+
static Future<({SparkAnonymitySetMeta meta, List<RawSparkCoin> coins})?>
410+
getSetCoinsAndLatestSetInfoForGroupId(
411+
int groupId,
412+
CryptoCurrencyNetwork network,
413+
) async {
414+
final resultSet = await _Reader._getSetCoinsAndLatestSetInfoForGroupId(
415+
groupId,
416+
db: _FiroCache.setCacheDB(network),
417+
);
418+
if (resultSet.isEmpty) {
419+
return null;
420+
}
421+
422+
final first = resultSet.first;
423+
final coins = resultSet
424+
.map(
425+
(row) => RawSparkCoin(
426+
serialized: row["serialized"] as String,
427+
txHash: row["txHash"] as String,
428+
context: row["context"] as String,
429+
groupId: groupId,
430+
),
431+
)
432+
.toList()
433+
.reversed
434+
.toList();
435+
436+
return (
437+
meta: SparkAnonymitySetMeta(
438+
coinGroupId: groupId,
439+
blockHash: first["blockHash"] as String,
440+
setHash: first["setHash"] as String,
441+
size: first["size"] as int,
442+
),
443+
coins: coins,
444+
);
445+
}
446+
253447
static Future<SparkAnonymitySetMeta?> getLatestSetInfoForGroupId(
254448
int groupId,
255449
CryptoCurrencyNetwork network,

0 commit comments

Comments
 (0)