From 1fea7d57257029e72b84d87d8cf5b47757463e0e Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Wed, 7 May 2025 13:19:25 +0530 Subject: [PATCH 1/5] feat: expose methods to clear queue and get event count --- packages/core/src/analytics.ts | 25 +++++++++++++++++++ packages/core/src/client.tsx | 2 ++ packages/core/src/plugin.ts | 9 +++++++ .../core/src/plugins/QueueFlushingPlugin.ts | 22 ++++++++++++++++ .../core/src/plugins/SegmentDestination.ts | 9 +++++++ packages/core/src/types.ts | 2 ++ 6 files changed, 69 insertions(+) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index e6caf7816..23367bb74 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -982,4 +982,29 @@ export class SegmentClient { userId: userInfo.userId, }; }; + /* Method for clearing flush queue */ + clearFlushQueue() { + const plugins = this.getPlugins(); + plugins.forEach(async (plugin) => { + if (plugin.type === PluginType.destination) { + await plugin?.clearFlushQueue(); + this.flushPolicyExecuter.reset(); + } + }); + } + + /** + * Method to get count of events in flush queue. + */ + async getFlushQueueCount() { + const plugins = this.getPlugins(); + let totalEventsCount = 0; + for (let i = 0; i <= plugins.length; i++) { + if (plugins[i]?.type === PluginType.destination) { + const eventsCount = await plugins[i]?.getQueueCount(); + totalEventsCount += eventsCount; + } + } + return totalEventsCount; + } } diff --git a/packages/core/src/client.tsx b/packages/core/src/client.tsx index 4acae0024..0c3f7fd1b 100644 --- a/packages/core/src/client.tsx +++ b/packages/core/src/client.tsx @@ -71,6 +71,8 @@ export const useAnalytics = (): ClientMethods => { group: async (...args) => client?.group(...args), alias: async (...args) => client?.alias(...args), reset: async (...args) => client?.reset(...args), + clearFlushQueue: async () => client?.clearFlushQueue(), + getFlushQueueCount: async () => client?.getFlushQueueCount(), }; }, [client]); }; diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d0328d19f..ab8ce33d0 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -36,6 +36,15 @@ export class Plugin { shutdown() { // do nothing by default, user can override. } + + async clearFlushQueue() { + // Overridden in Segment Destination + } + + async getQueueCount() { + // Overridden in Segment Destination + return 0; + } } export class EventPlugin extends Plugin { diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 4ca749aac..feb8229f6 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -130,4 +130,26 @@ export class QueueFlushingPlugin extends UtilityPlugin { return { events: filteredEvents }; }); } + /** + * Clear all events from the queue + */ + async clearQueue() { + await this.queueStore?.dispatch(() => { + return { events: [] }; + }); + } + + /** + * * Returns the count of items in the queue + */ + async getQueueCount() { + const state = await this.queueStore?.getState(); + // const eventsCount = state?.events.length || 0; + // return eventsCount; + if (!state || !Array.isArray(state.events)) { + return 0; + } + const count = state.events.length; + return Number.isFinite(count) ? count : 0; + } } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index cc7e911e6..210a1f4df 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -150,4 +150,13 @@ export class SegmentDestination extends DestinationPlugin { // Wait until the queue is done restoring before flushing return this.queuePlugin.flush(); } + async clearFlushQueue() { + //Wait until clearing current Flush queue + return this.queuePlugin.clearQueue(); + } + async getQueueCount() { + // Wait until getting the count of queue + const eventsCount = await this.queuePlugin.getQueueCount(); + return eventsCount; + } } diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index b0d4e9570..770169010 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -177,6 +177,8 @@ export type ClientMethods = { ) => Promise; alias: (newUserId: string, enrichment?: EnrichmentClosure) => Promise; reset: (resetAnonymousId?: boolean) => Promise; + clearFlushQueue: () => Promise; + getFlushQueueCount: () => Promise; }; type ContextApp = { From d81dccbea8f08a73919f35edec617b360d3e7f45 Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Thu, 8 May 2025 12:53:36 +0530 Subject: [PATCH 2/5] feat: added test cases for clear queue and get count --- .../__tests__/QueueFlushingPlugin.test.ts | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index 81eabac7b..7a2246eac 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -77,4 +77,67 @@ describe('QueueFlushingPlugin', () => { // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); }); + it('should clear all events from the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(2); + await queuePlugin.clearQueue(); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); + }); + it('should return the count of items in the queue', async () => { + const onFlush = jest.fn().mockResolvedValue(undefined); + const queuePlugin = setupQueuePlugin(onFlush, 10); + + const event1: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test1', + properties: { + test: 'test1', + }, + }; + + const event2: SegmentEvent = { + type: EventType.TrackEvent, + event: 'test2', + properties: { + test: 'test2', + }, + }; + + await queuePlugin.execute(event1); + await queuePlugin.execute(event2); + + let eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(2); + + await queuePlugin.dequeue(event1); + + eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(1); + + await queuePlugin.clearQueue(); + + eventsCount = await queuePlugin.getQueueCount(); + expect(eventsCount).toBe(0); + }); }); From 0aff0b95c534285fbca40bdda673bc9e8894d23a Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Mon, 12 May 2025 12:31:18 +0530 Subject: [PATCH 3/5] refactor: refactored method names and some logic --- packages/core/src/analytics.ts | 8 ++++---- packages/core/src/client.tsx | 2 -- packages/core/src/plugin.ts | 4 ++-- packages/core/src/plugins/QueueFlushingPlugin.ts | 14 ++++---------- packages/core/src/plugins/SegmentDestination.ts | 8 ++++---- packages/core/src/storage/types.ts | 1 - packages/core/src/types.ts | 2 -- 7 files changed, 14 insertions(+), 25 deletions(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 23367bb74..0f334f70e 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -983,11 +983,11 @@ export class SegmentClient { }; }; /* Method for clearing flush queue */ - clearFlushQueue() { + clear() { const plugins = this.getPlugins(); plugins.forEach(async (plugin) => { if (plugin.type === PluginType.destination) { - await plugin?.clearFlushQueue(); + await plugin?.clear(); this.flushPolicyExecuter.reset(); } }); @@ -996,12 +996,12 @@ export class SegmentClient { /** * Method to get count of events in flush queue. */ - async getFlushQueueCount() { + async pendingEvents() { const plugins = this.getPlugins(); let totalEventsCount = 0; for (let i = 0; i <= plugins.length; i++) { if (plugins[i]?.type === PluginType.destination) { - const eventsCount = await plugins[i]?.getQueueCount(); + const eventsCount = await plugins[i]?.pendingEvents(); totalEventsCount += eventsCount; } } diff --git a/packages/core/src/client.tsx b/packages/core/src/client.tsx index 0c3f7fd1b..4acae0024 100644 --- a/packages/core/src/client.tsx +++ b/packages/core/src/client.tsx @@ -71,8 +71,6 @@ export const useAnalytics = (): ClientMethods => { group: async (...args) => client?.group(...args), alias: async (...args) => client?.alias(...args), reset: async (...args) => client?.reset(...args), - clearFlushQueue: async () => client?.clearFlushQueue(), - getFlushQueueCount: async () => client?.getFlushQueueCount(), }; }, [client]); }; diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index ab8ce33d0..128bae49c 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -37,11 +37,11 @@ export class Plugin { // do nothing by default, user can override. } - async clearFlushQueue() { + async clear() { // Overridden in Segment Destination } - async getQueueCount() { + async pendingEvents() { // Overridden in Segment Destination return 0; } diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index feb8229f6..1580ee288 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -133,7 +133,7 @@ export class QueueFlushingPlugin extends UtilityPlugin { /** * Clear all events from the queue */ - async clearQueue() { + async dequeueEvents() { await this.queueStore?.dispatch(() => { return { events: [] }; }); @@ -142,14 +142,8 @@ export class QueueFlushingPlugin extends UtilityPlugin { /** * * Returns the count of items in the queue */ - async getQueueCount() { - const state = await this.queueStore?.getState(); - // const eventsCount = state?.events.length || 0; - // return eventsCount; - if (!state || !Array.isArray(state.events)) { - return 0; - } - const count = state.events.length; - return Number.isFinite(count) ? count : 0; + async pendingEvents() { + const events = (await this.queueStore?.getState(true))?.events ?? []; + return events.length; } } diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index 210a1f4df..d2986f4a3 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -150,13 +150,13 @@ export class SegmentDestination extends DestinationPlugin { // Wait until the queue is done restoring before flushing return this.queuePlugin.flush(); } - async clearFlushQueue() { + async clear() { //Wait until clearing current Flush queue - return this.queuePlugin.clearQueue(); + return this.queuePlugin.dequeueEvents(); } - async getQueueCount() { + async pendingEvents() { // Wait until getting the count of queue - const eventsCount = await this.queuePlugin.getQueueCount(); + const eventsCount = await this.queuePlugin.pendingEvents(); return eventsCount; } } diff --git a/packages/core/src/storage/types.ts b/packages/core/src/storage/types.ts index 162398ee6..91955b715 100644 --- a/packages/core/src/storage/types.ts +++ b/packages/core/src/storage/types.ts @@ -92,7 +92,6 @@ export interface Storage { readonly pendingEvents: Watchable & Settable & Queue; - readonly enabled: Watchable & Settable; } export type DeepLinkData = { diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 770169010..b0d4e9570 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -177,8 +177,6 @@ export type ClientMethods = { ) => Promise; alias: (newUserId: string, enrichment?: EnrichmentClosure) => Promise; reset: (resetAnonymousId?: boolean) => Promise; - clearFlushQueue: () => Promise; - getFlushQueueCount: () => Promise; }; type ContextApp = { From c8cd7a07cbe51e1f17933f1190b9cb6c5a008f89 Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Tue, 13 May 2025 13:46:05 +0530 Subject: [PATCH 4/5] fix: fixed test cases --- .../src/plugins/__tests__/QueueFlushingPlugin.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index 7a2246eac..b3b02d802 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -99,7 +99,7 @@ describe('QueueFlushingPlugin', () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(2); - await queuePlugin.clearQueue(); + await queuePlugin.dequeueEvents(); // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); @@ -127,17 +127,17 @@ describe('QueueFlushingPlugin', () => { await queuePlugin.execute(event1); await queuePlugin.execute(event2); - let eventsCount = await queuePlugin.getQueueCount(); + let eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(2); await queuePlugin.dequeue(event1); - eventsCount = await queuePlugin.getQueueCount(); + eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(1); - await queuePlugin.clearQueue(); + await queuePlugin.dequeueEvents(); - eventsCount = await queuePlugin.getQueueCount(); + eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(0); }); }); From eee758245ed63b42cb32d0db7be83f634d3e7ab1 Mon Sep 17 00:00:00 2001 From: Sunita Prajapati Date: Thu, 15 May 2025 13:03:13 +0530 Subject: [PATCH 5/5] feat: changed the logic to touch minimum file changes --- packages/core/src/analytics.ts | 32 +++++++++++++++---- packages/core/src/plugin.ts | 9 ------ .../core/src/plugins/SegmentDestination.ts | 9 ------ 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 0f334f70e..9f4ec7315 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -71,6 +71,7 @@ import { SegmentError, translateHTTPError, } from './errors'; +import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; type OnPluginAddedCallback = (plugin: Plugin) => void; @@ -985,12 +986,20 @@ export class SegmentClient { /* Method for clearing flush queue */ clear() { const plugins = this.getPlugins(); + plugins.forEach(async (plugin) => { - if (plugin.type === PluginType.destination) { - await plugin?.clear(); - this.flushPolicyExecuter.reset(); + if (plugin instanceof SegmentDestination) { + const timelinePlugins = plugin.timeline?.plugins?.after ?? []; + + for (const subPlugin of timelinePlugins) { + if (subPlugin instanceof QueueFlushingPlugin) { + await subPlugin.dequeueEvents(); + } + } } }); + + this.flushPolicyExecuter.reset(); } /** @@ -999,12 +1008,21 @@ export class SegmentClient { async pendingEvents() { const plugins = this.getPlugins(); let totalEventsCount = 0; - for (let i = 0; i <= plugins.length; i++) { - if (plugins[i]?.type === PluginType.destination) { - const eventsCount = await plugins[i]?.pendingEvents(); - totalEventsCount += eventsCount; + + for (const plugin of plugins) { + // We're looking inside SegmentDestination's `after` plugins + if (plugin instanceof SegmentDestination) { + const timelinePlugins = plugin.timeline?.plugins?.after ?? []; + + for (const subPlugin of timelinePlugins) { + if (subPlugin instanceof QueueFlushingPlugin) { + const eventsCount = await subPlugin.pendingEvents(); + totalEventsCount += eventsCount; + } + } } } + return totalEventsCount; } } diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index 128bae49c..d0328d19f 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -36,15 +36,6 @@ export class Plugin { shutdown() { // do nothing by default, user can override. } - - async clear() { - // Overridden in Segment Destination - } - - async pendingEvents() { - // Overridden in Segment Destination - return 0; - } } export class EventPlugin extends Plugin { diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index d2986f4a3..cc7e911e6 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -150,13 +150,4 @@ export class SegmentDestination extends DestinationPlugin { // Wait until the queue is done restoring before flushing return this.queuePlugin.flush(); } - async clear() { - //Wait until clearing current Flush queue - return this.queuePlugin.dequeueEvents(); - } - async pendingEvents() { - // Wait until getting the count of queue - const eventsCount = await this.queuePlugin.pendingEvents(); - return eventsCount; - } }