diff --git a/emitter-sync-pattern.ts b/emitter-sync-pattern.ts index ff0dafa..228578a 100644 --- a/emitter-sync-pattern.ts +++ b/emitter-sync-pattern.ts @@ -1,7 +1,7 @@ /* Check the comments first */ import { EventEmitter } from "./emitter"; -import { EventDelayedRepository } from "./event-repository"; +import { EventDelayedRepository, EventRepositoryError } from "./event-repository"; import { EventStatistics } from "./event-statistics"; import { ResultsTester } from "./results-tester"; import { triggerRandomly } from "./utils"; @@ -59,29 +59,145 @@ function init() { */ class EventHandler extends EventStatistics { - // Feel free to edit this class - repository: EventRepository; constructor(emitter: EventEmitter, repository: EventRepository) { super(); this.repository = repository; - emitter.subscribe(EventName.EventA, () => - this.repository.saveEventData(EventName.EventA, 1) - ); + EVENT_NAMES.forEach((eventName) => { + emitter.subscribe(eventName, () => { + const currentLocalValue = this.getStats(eventName) + 1; + + this.setStats(eventName, currentLocalValue) + + this.repository.saveEventData(eventName, currentLocalValue) + }); + }) } } +class Throttler { + private func: (...args: unknown[]) => void; + private limit: number; + private lastFunc: ReturnType | null = null; + private lastRan: number | null = null; + + constructor(func: (...args: unknown[]) => void, limit: number) { + this.func = func; + this.limit = limit; + } + + public throttle(...args: unknown[]): void { + const context = this; + + if (this.lastRan === null) { + this.func.apply(context, args); + this.lastRan = Date.now(); + } else { + if (this.lastFunc !== null) { + clearTimeout(this.lastFunc); + } + + this.lastFunc = setTimeout(() => { + if ((Date.now() - (context.lastRan as number)) >= context.limit) { + context.func.apply(context, args); + context.lastRan = Date.now(); + } + }, this.limit - (Date.now() - (this.lastRan as number))); + } + } +} + +type TUpdatingThread = { + previousSyncedValue: number; + isUpdating: boolean; +}; + +const threadNotFoundError = new Error('Thread not found'); +const REQUEST_REMOTE_UPDATE_THROTTLE = 150; + class EventRepository extends EventDelayedRepository { - // Feel free to edit this class - - async saveEventData(eventName: EventName, _: number) { - try { - await this.updateEventStatsBy(eventName, 1); - } catch (e) { - // const _error = e as EventRepositoryError; - // console.warn(error); + /* + I split the synchronization of events into threads so that updating one does not block the other. + Blocking the threads is needed to avoid race conditions. + */ + eventUpdatingThreads = new Map(); + requestRemoteUpdateTrhrottled: Throttler + + constructor() { + super(); + + this.requestRemoteUpdateTrhrottled = new Throttler(this.requestRemoteUpdate, REQUEST_REMOTE_UPDATE_THROTTLE); + } + + public saveEventData = async (eventName: EventName, currentLocalValue: number) => { + this.requestRemoteUpdateTrhrottled.throttle(eventName, currentLocalValue); + } + + private lockThread = (eventName: EventName) => { + const thread = this.eventUpdatingThreads.get(eventName) + + if(!thread) { + throw threadNotFoundError + } + + this.eventUpdatingThreads.set(eventName, { + ...thread, + isUpdating: true + }) + } + + private unlockThread = (eventName: EventName) => { + const thread = this.eventUpdatingThreads.get(eventName) + + if(!thread) { + throw threadNotFoundError + } + + this.eventUpdatingThreads.set(eventName, { + ...thread, + isUpdating: false + }) + } + + private unlockAndUpdateThread = (eventName: EventName, currentSyncedValue: number) => { + this.eventUpdatingThreads.set(eventName, { + previousSyncedValue: currentSyncedValue, + isUpdating: false + }) + } + + private requestRemoteUpdate = async (eventName: EventName, currentLocalValue: number) => { + if(!this.eventUpdatingThreads.has(eventName)) { + this.eventUpdatingThreads.set(eventName, { + previousSyncedValue: 0, + isUpdating: false + }) + } + + const { isUpdating, previousSyncedValue } = this.eventUpdatingThreads.get(eventName) as TUpdatingThread + + if (isUpdating) { + return; + } + + try { + this.lockThread(eventName); + + await this.updateEventStatsBy(eventName, currentLocalValue - previousSyncedValue) + + this.unlockAndUpdateThread(eventName, currentLocalValue) + } catch (error) { + if(error.message === threadNotFoundError.message){ + throw error + } + + if (error === EventRepositoryError.RESPONSE_FAIL) { + this.unlockAndUpdateThread(eventName, currentLocalValue) + } else { + this.unlockThread(eventName) + } } } } diff --git a/event-repository.ts b/event-repository.ts index 64b7a6f..7d61dfa 100644 --- a/event-repository.ts +++ b/event-repository.ts @@ -14,7 +14,7 @@ import { awaitTimeout, randomTo } from "./utils"; const EVENT_SAVE_DELAY_MS = 3 * 100; -enum EventRepositoryError { +export enum EventRepositoryError { TOO_MANY = "Too many requests", RESPONSE_FAIL = "Response delivery fail", REQUEST_FAIL = "Request fail",