Skip to content

Commit ba3cf1c

Browse files
committed
Handle DB duplicate key without recursion
Replace the previous recursive reprocessing on database duplicate-key errors with a safer flow: log the duplicate, invalidate the event cache, wait briefly, fetch the event inserted by the competing worker and treat it as a repetition. If the event cannot be read back, raise a DatabaseReadWriteError. This avoids recursive calls to handle(), ensures fresh DB reads (cache coherence), and adds logging for diagnostics; repetition processing is resumed using the fetched event.
1 parent 1ce34ea commit ba3cf1c

1 file changed

Lines changed: 34 additions & 4 deletions

File tree

workers/grouper/src/index.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,47 @@ export default class GrouperWorker extends Worker {
223223
} catch (e) {
224224
/**
225225
* If we caught Database duplication error, then another worker thread has already saved it to the database
226-
* and we need to process this event as repetition
226+
* Clear the cache and fetch the event that was just inserted, then process it as a repetition
227227
*/
228228
if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
229-
await this.handle(task);
229+
this.logger.info(`[handle] Duplicate key detected for groupHash=${uniqueEventHash}, fetching created event as repetition`);
230230

231-
return;
231+
const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
232+
233+
/**
234+
* Invalidate cache to force fresh fetch from database
235+
*/
236+
this.cache.del(eventCacheKey);
237+
238+
/**
239+
* Fetch the event that was just inserted by the competing worker
240+
* Add small delay to ensure the event is persisted
241+
*/
242+
await new Promise(resolve => setTimeout(resolve, 10));
243+
244+
existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
245+
246+
if (!existedEvent) {
247+
this.logger.error(`[handle] Event not found after duplicate key error for groupHash=${uniqueEventHash}`);
248+
throw new DatabaseReadWriteError('Event not found after duplicate key error');
249+
}
250+
251+
this.logger.info(`[handle] Successfully fetched event after duplicate key for groupHash=${uniqueEventHash}`);
252+
253+
/**
254+
* Now continue processing as if this was not the first occurrence
255+
* This avoids recursion and properly handles the event as a repetition
256+
*/
232257
} else {
233258
throw e;
234259
}
235260
}
236-
} else {
261+
}
262+
263+
/**
264+
* Handle repetition processing when duplicate key was detected
265+
*/
266+
if (!isFirstOccurrence && existedEvent) {
237267
const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent);
238268

239269
incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;

0 commit comments

Comments
 (0)