@@ -16,6 +16,7 @@ const MAX_RETRIES = 3
// NOTE(review): presumably the number of rows requested per DB fetch — usage is outside this excerpt; confirm
1616const DB_FETCH_SIZE = 2000
// NOTE(review): presumably the size / age thresholds that trigger a WriteBuffer flush — usage is outside this excerpt; confirm
1717const WRITE_FLUSH_SIZE = 500
1818const WRITE_FLUSH_MS = 5000
// Number of consecutive failed flushes after which WriteBuffer.flush() drops the batch it was trying to write
19+ const MAX_FLUSH_FAILURES = 3
1920
2021// ─── Token selection ─────────────────────────────────────────────────────────
2122
@@ -94,6 +95,7 @@ class WriteBuffer {
9495 private skipUrls : string [ ] = [ ]
9596 private lastFlushAt = Date . now ( )
9697 private flushing = false
98+ private flushFailures = 0
9799
98100 constructor ( private readonly qx : QueryExecutor ) { }
99101
@@ -118,26 +120,56 @@ class WriteBuffer {
118120 )
119121 }
120122
// Removes the first `resultCount` / `snapshotCount` / `skipCount` entries from the front of the
// results, snapshots and skipUrls buffers, then resets the consecutive-failure counter to 0.
// Called by flush() after a successful write and also when a batch is dropped after repeated failures.
// Only the leading entries are spliced off, so anything appended behind them stays buffered.
123+ private clearBatch ( resultCount : number , snapshotCount : number , skipCount : number ) : void {
124+ this . results . splice ( 0 , resultCount )
125+ this . snapshots . splice ( 0 , snapshotCount )
126+ this . skipUrls . splice ( 0 , skipCount )
// Reset here so both the success path and the drop path start the next batch with a clean count
127+ this . flushFailures = 0
128+ }
129+
// flush(): writes the buffered results, skip URLs and snapshots to the database.
// This span is a diff: lines marked "-" are the previous Promise.all implementation, lines marked "+"
// are the transactional implementation that replaces it.
//
// Returns the number of results written (batch.length) on success, and 0 when all three buffers are
// empty or when the write fails. Errors from the write are caught and logged here; they are not
// rethrown to the caller.
//
// NOTE(review): nothing in this method checks `this.flushing` before starting — if callers do not
// guard on it, two overlapping flush() calls would snapshot and write the same entries; confirm.
121130 async flush ( ) : Promise < number > {
// lastFlushAt is refreshed on every call, including the empty-buffer early return below
131+ this . lastFlushAt = Date . now ( )
132+ if ( this . results . length === 0 && this . snapshots . length === 0 && this . skipUrls . length === 0 ) {
133+ return 0
134+ }
135+
// Shallow copies of the buffers: their lengths are what clearBatch later removes from the live arrays
122136 const batch = [ ...this . results ]
123137 const snapshotBatch = [ ...this . snapshots ]
124138 const skips = [ ...this . skipUrls ]
125- this . lastFlushAt = Date . now ( )
// Set for the duration of the write; always reset in the finally block
126139 this . flushing = true
127140 try {
128- await Promise . all ( [
129- bulkUpdateEnrichedRepos ( this . qx , batch ) ,
130- bulkUpsertRepoActivitySnapshot ( this . qx , snapshotBatch ) ,
131- markReposSkipped ( this . qx , skips ) ,
132- ] )
133- // Clear only after all writes succeed — preserves items if the DB call throws
134- this . results . splice ( 0 , batch . length )
135- this . snapshots . splice ( 0 , snapshotBatch . length )
136- this . skipUrls . splice ( 0 , skips . length )
141+ // The snapshot upsert also updates repos rows — run in one transaction to avoid
142+ // deadlocking with bulkUpdateEnrichedRepos on overlapping rows (40P01)
// The three writes are awaited one after another inside the tx callback (previously run in parallel)
143+ await this . qx . tx ( async ( tx ) => {
144+ await bulkUpdateEnrichedRepos ( tx , batch )
145+ await markReposSkipped ( tx , skips )
146+ await bulkUpsertRepoActivitySnapshot ( tx , snapshotBatch )
147+ } )
// Reached only when the transaction callback did not throw: drop the written entries from the buffers
148+ this . clearBatch ( batch . length , snapshotBatch . length , skips . length )
149+ return batch . length
150+ } catch ( err ) {
// Counts consecutive failures; clearBatch resets it to 0 on success or on drop
151+ this . flushFailures ++
152+ // Dropping is safe: the rolled-back repos stay unsynced, so the next sweep retries them
153+ const dropBatch = this . flushFailures >= MAX_FLUSH_FAILURES
// Error fields are logged individually (code, name, message, stack) rather than as the raw err object
154+ log . error (
155+ {
156+ errCode : ( err as { code ?: string } ) . code ,
157+ errName : ( err as Error ) . name ,
158+ errMsg : ( err as Error ) . message ,
159+ errStack : ( err as Error ) . stack ,
160+ flushFailures : this . flushFailures ,
// NOTE(review): this is the current length of the whole results buffer, not batch.length
161+ bufferedResults : this . results . length ,
162+ dropBatch,
163+ } ,
164+ dropBatch
165+ ? 'Flush failed repeatedly — dropping batch, repos will be re-enriched next sweep'
166+ : 'Flush failed — will retry on next cycle' ,
167+ )
// On the MAX_FLUSH_FAILURES-th consecutive failure the captured batch is discarded; otherwise it stays buffered
168+ if ( dropBatch ) this . clearBatch ( batch . length , snapshotBatch . length , skips . length )
169+ return 0
137170 } finally {
138171 this . flushing = false
139172 }
140- return batch . length
141173 }
142174}
143175
@@ -202,7 +234,7 @@ async function runStreamingPool(
202234 } )
203235 . catch ( ( err ) => {
204236 pendingFetch = null
205- log . warn ( { err } , 'DB batch fetch failed, will retry' )
237+ log . warn ( { errMsg : ( err as Error ) . message } , 'DB batch fetch failed, will retry' )
206238 } )
207239 }
208240
@@ -306,7 +338,15 @@ async function runStreamingPool(
306338 )
307339 queue . unshift ( row )
308340 } else {
309- log . error ( { url : row . url , err } , 'Unexpected error' )
341+ log . error (
342+ {
343+ url : row . url ,
344+ errName : ( err as Error ) . name ,
345+ errMsg : ( err as Error ) . message ,
346+ errStack : ( err as Error ) . stack ,
347+ } ,
348+ 'Unexpected error while enriching repo' ,
349+ )
310350 }
311351 }
312352
0 commit comments