@@ -106,6 +106,7 @@ const PEER_BACKPRESSURE_BYTES = 4 * 1024 * 1024;
106106const MOBILE_COMMAND_RESULT_CACHE_TTL_MS = 30 * 60 * 1000 ;
107107const MOBILE_COMMAND_RESULT_CACHE_MAX_ENTRIES = 512 ;
108108const CHANGESET_ACK_TIMEOUT_MS = 10_000 ;
109+ const MAX_CHANGESET_ACK_RETRIES = 6 ;
109110const LANE_PRESENCE_TTL_MS = 60_000 ;
110111const SYNC_MDNS_SERVICE_TYPE = "ade-sync" ;
111112export const SYNC_TAILNET_DISCOVERY_SERVICE_NAME = "svc:ade-sync" ;
@@ -185,8 +186,6 @@ type PersistedMobileCommand = {
185186 completedAtMs : number ;
186187} ;
187188
188- const mobileCommandResultCache = new Map < string , CachedMobileCommand > ( ) ;
189-
190189function stableJsonValue ( value : unknown ) : unknown {
191190 if ( value == null ) return value ;
192191 if ( Array . isArray ( value ) ) return value . map ( stableJsonValue ) ;
@@ -214,31 +213,6 @@ function addMobileCommandWaiter(record: CachedMobileCommand, peer: PeerState, re
214213 record . waiters . push ( { peer, requestId } ) ;
215214}
216215
217- function pruneMobileCommandResultCache ( nowMs = Date . now ( ) ) : void {
218- for ( const [ key , record ] of mobileCommandResultCache ) {
219- const referenceMs = record . completedAtMs ?? record . acceptedAtMs ;
220- if ( nowMs - referenceMs > MOBILE_COMMAND_RESULT_CACHE_TTL_MS ) {
221- mobileCommandResultCache . delete ( key ) ;
222- }
223- }
224- if ( mobileCommandResultCache . size <= MOBILE_COMMAND_RESULT_CACHE_MAX_ENTRIES ) return ;
225-
226- const completed = [ ...mobileCommandResultCache . entries ( ) ]
227- . filter ( ( [ , record ] ) => record . completedAtMs != null )
228- . sort ( ( [ , left ] , [ , right ] ) => ( left . completedAtMs ?? left . acceptedAtMs ) - ( right . completedAtMs ?? right . acceptedAtMs ) ) ;
229- for ( const [ key ] of completed ) {
230- if ( mobileCommandResultCache . size <= MOBILE_COMMAND_RESULT_CACHE_MAX_ENTRIES ) break ;
231- mobileCommandResultCache . delete ( key ) ;
232- }
233- const inFlight = [ ...mobileCommandResultCache . entries ( ) ]
234- . filter ( ( [ , record ] ) => record . completedAtMs == null )
235- . sort ( ( [ , left ] , [ , right ] ) => left . acceptedAtMs - right . acceptedAtMs ) ;
236- for ( const [ key ] of inFlight ) {
237- if ( mobileCommandResultCache . size <= MOBILE_COMMAND_RESULT_CACHE_MAX_ENTRIES ) break ;
238- mobileCommandResultCache . delete ( key ) ;
239- }
240- }
241-
242216type SyncHostServiceArgs = {
243217 db : AdeDb ;
244218 logger : Logger ;
@@ -523,11 +497,30 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
523497 } ;
524498
525499 const peers = new Set < PeerState > ( ) ;
500+ const mobileCommandResultCache = new Map < string , CachedMobileCommand > ( ) ;
526501 let commandReplayCount = 0 ;
527502 let commandConflictCount = 0 ;
528503 let lastCommandResultLatencyMs : number | null = null ;
529504 let lastChangesetAckLatencyMs : number | null = null ;
530505
506+ const pruneMobileCommandResultCache = ( nowMs = Date . now ( ) ) : void => {
507+ for ( const [ key , record ] of mobileCommandResultCache ) {
508+ if ( record . completedAtMs == null ) continue ;
509+ if ( nowMs - record . completedAtMs > MOBILE_COMMAND_RESULT_CACHE_TTL_MS ) {
510+ mobileCommandResultCache . delete ( key ) ;
511+ }
512+ }
513+ if ( mobileCommandResultCache . size <= MOBILE_COMMAND_RESULT_CACHE_MAX_ENTRIES ) return ;
514+
515+ const completed = [ ...mobileCommandResultCache . entries ( ) ]
516+ . filter ( ( [ , record ] ) => record . completedAtMs != null )
517+ . sort ( ( [ , left ] , [ , right ] ) => ( left . completedAtMs ?? left . acceptedAtMs ) - ( right . completedAtMs ?? right . acceptedAtMs ) ) ;
518+ for ( const [ key ] of completed ) {
519+ if ( mobileCommandResultCache . size <= MOBILE_COMMAND_RESULT_CACHE_MAX_ENTRIES ) break ;
520+ mobileCommandResultCache . delete ( key ) ;
521+ }
522+ } ;
523+
531524 const readPersistedCommandLedger = ( ) : PersistedMobileCommand [ ] => {
532525 try {
533526 if ( ! fs . existsSync ( commandLedgerPath ) ) return [ ] ;
@@ -1698,6 +1691,21 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
16981691 if ( peer . pendingChangesetBatch ) {
16991692 if ( nowMs - peer . pendingChangesetBatch . sentAtMs >= CHANGESET_ACK_TIMEOUT_MS ) {
17001693 const pending = peer . pendingChangesetBatch ;
1694+ if ( pending . retryCount >= MAX_CHANGESET_ACK_RETRIES ) {
1695+ args . logger . warn ( "sync_host.changeset_ack_timeout" , {
1696+ peerDeviceId : peer . metadata . deviceId ,
1697+ batchId : pending . batchId ,
1698+ fromDbVersion : pending . fromDbVersion ,
1699+ toDbVersion : pending . toDbVersion ,
1700+ retryCount : pending . retryCount ,
1701+ } ) ;
1702+ try {
1703+ peer . ws . close ( 4000 , "Changeset acknowledgement timed out" ) ;
1704+ } catch {
1705+ // ignore close failures
1706+ }
1707+ continue ;
1708+ }
17011709 const resent = resendPendingChangesetBatch ( peer ) ;
17021710 args . logger . debug ( "sync_host.changeset_ack_retry" , {
17031711 peerDeviceId : peer . metadata . deviceId ,
@@ -1736,19 +1744,28 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
17361744 args . logger . debug ( "sync_host.changeset_ack_ignored" , {
17371745 peerDeviceId : peer . metadata ?. deviceId ?? null ,
17381746 expectedBatchId : pending . batchId ,
1739- receivedBatchId : payload . batchId ?? null ,
1747+ receivedBatchId : payload . batchId ,
17401748 } ) ;
17411749 return ;
17421750 }
17431751 if ( ! payload . ok ) {
1752+ pending . retryCount += 1 ;
17441753 pending . sentAtMs = Date . now ( ) ;
17451754 args . logger . warn ( "sync_host.changeset_ack_failed" , {
17461755 peerDeviceId : peer . metadata ?. deviceId ?? null ,
17471756 batchId : pending . batchId ,
17481757 fromDbVersion : pending . fromDbVersion ,
17491758 toDbVersion : pending . toDbVersion ,
1759+ retryCount : pending . retryCount ,
17501760 error : payload . error ?. message ?? "Changeset apply failed." ,
17511761 } ) ;
1762+ if ( pending . retryCount >= MAX_CHANGESET_ACK_RETRIES ) {
1763+ try {
1764+ peer . ws . close ( 4000 , "Changeset apply failed repeatedly" ) ;
1765+ } catch {
1766+ // ignore close failures
1767+ }
1768+ }
17521769 return ;
17531770 }
17541771 if ( payload . toDbVersion < pending . toDbVersion ) return ;
@@ -2306,6 +2323,7 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
23062323 }
23072324 case "changeset_batch" : {
23082325 const payload = ( envelope . payload ?? { } ) as SyncChangesetBatchPayload ;
2326+ const batchId = payload . batchId || envelope . requestId || "" ;
23092327 const changes = Array . isArray ( payload . changes ) ? payload . changes as CrsqlChangeRow [ ] : [ ] ;
23102328 try {
23112329 let appliedCount = 0 ;
@@ -2318,7 +2336,7 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
23182336 broadcastBrainStatus ( ) ;
23192337 }
23202338 sendRequired ( peer , "changeset_ack" , {
2321- batchId : payload . batchId ?? null ,
2339+ batchId,
23222340 fromDbVersion : Number ( payload . fromDbVersion ?? 0 ) ,
23232341 toDbVersion : Number ( payload . toDbVersion ?? 0 ) ,
23242342 appliedDbVersion : args . db . sync . getDbVersion ( ) ,
@@ -2327,7 +2345,7 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
23272345 } satisfies SyncChangesetAckPayload , envelope . requestId ) ;
23282346 } catch ( error ) {
23292347 sendRequired ( peer , "changeset_ack" , {
2330- batchId : payload . batchId ?? null ,
2348+ batchId,
23312349 fromDbVersion : Number ( payload . fromDbVersion ?? 0 ) ,
23322350 toDbVersion : Number ( payload . toDbVersion ?? 0 ) ,
23332351 appliedDbVersion : args . db . sync . getDbVersion ( ) ,
0 commit comments