diff --git a/.changeset/eleven-garlics-work.md b/.changeset/eleven-garlics-work.md new file mode 100644 index 00000000..8f6950c5 --- /dev/null +++ b/.changeset/eleven-garlics-work.md @@ -0,0 +1,6 @@ +--- +'@tanstack/devtools-event-client': patch +'@tanstack/devtools-event-bus': patch +--- + +add queued events to event bus diff --git a/examples/react/start/src/plugin.ts b/examples/react/start/src/plugin.ts index 18b3f2fc..11a36753 100644 --- a/examples/react/start/src/plugin.ts +++ b/examples/react/start/src/plugin.ts @@ -19,8 +19,14 @@ class QueryDevtoolsClient extends EventClient { constructor() { super({ pluginId: 'query-devtools', + debug: true, }) } } export const queryPlugin = new QueryDevtoolsClient() +// this should be queued and emitted when bus is available +queryPlugin.emit('test', { + title: 'Query Devtools', + description: 'A plugin for query debugging', +}) diff --git a/packages/event-bus-client/src/plugin.ts b/packages/event-bus-client/src/plugin.ts index 192cb0dd..d914a762 100644 --- a/packages/event-bus-client/src/plugin.ts +++ b/packages/event-bus-client/src/plugin.ts @@ -23,6 +23,37 @@ export class EventClient< #pluginId: TPluginId #eventTarget: () => EventTarget #debug: boolean + #queuedEvents: Array> + #connected: boolean + #connectIntervalId: number | null + #connectEveryMs: number + #retryCount = 0 + #maxRetries = 5 + #onConnected = () => { + this.debugLog('Connected to event bus') + this.#connected = true + this.debugLog('Emitting queued events', this.#queuedEvents) + this.#queuedEvents.forEach((event) => this.emitEventToBus(event)) + this.#queuedEvents = [] + this.stopConnectLoop() + this.#eventTarget().removeEventListener( + 'tanstack-connect-success', + this.#onConnected, + ) + } + #connectFunction = () => { + if (this.#retryCount < this.#maxRetries) { + this.#retryCount++ + this.#eventTarget().dispatchEvent(new CustomEvent('tanstack-connect')) + return + } + this.#eventTarget().removeEventListener( + 'tanstack-connect', + this.#connectFunction, + ) + this.debugLog('Max retries reached, giving up on connection') + this.stopConnectLoop() + } constructor({ pluginId, @@ -35,6 +66,36 @@ export class EventClient< this.#eventTarget = this.getGlobalTarget this.#debug = debug this.debugLog(' Initializing event subscription for plugin', this.#pluginId) + this.#queuedEvents = [] + this.#connected = false + this.#connectIntervalId = null + this.#connectEveryMs = 500 + + this.#eventTarget().addEventListener( + 'tanstack-connect-success', + this.#onConnected, + ) + this.#connectFunction() + this.startConnectLoop() + } + + private startConnectLoop() { + if (this.#connectIntervalId !== null || this.#connected) return + this.debugLog(`Starting connect loop (every ${this.#connectEveryMs}ms)`) + + this.#connectIntervalId = setInterval( + this.#connectFunction, + this.#connectEveryMs, + ) as unknown as number + } + + private stopConnectLoop() { + if (this.#connectIntervalId === null) { + return + } + clearInterval(this.#connectIntervalId) + this.#connectIntervalId = null + this.debugLog('Stopped connect loop') } private debugLog(...args: Array) { @@ -84,7 +145,17 @@ export class EventClient< eventSuffix: TSuffix, payload: TEventMap[`${TPluginId & string}:${TSuffix}`], ) { - this.emitEventToBus({ + // wait to connect to the bus + if (!this.#connected) { + this.debugLog('Bus not available, will be pushed as soon as connected') + return this.#queuedEvents.push({ + type: `${this.#pluginId}:${eventSuffix}`, + payload, + pluginId: this.#pluginId, + }) + } + // emit right now + return this.emitEventToBus({ type: `${this.#pluginId}:${eventSuffix}`, payload, pluginId: this.#pluginId, diff --git a/packages/event-bus-client/tests/index.test.ts b/packages/event-bus-client/tests/index.test.ts index 37eb501c..048ffa4a 100644 --- a/packages/event-bus-client/tests/index.test.ts +++ b/packages/event-bus-client/tests/index.test.ts @@ -1,12 +1,12 @@ import { describe, expect, it, vi } from 'vitest' -import { EventClient } from '../src' import { ClientEventBus } from '@tanstack/devtools-event-bus/client' +import { EventClient } from '../src' // start the client bus for testing -new ClientEventBus().start() +const bus = new ClientEventBus() +bus.start() // client bus uses window to dispatch events const clientBusEmitTarget = window - describe('EventClient', () => { describe('debug config', () => { it('should emit logs when debug set to true and have the correct plugin name', () => { @@ -35,7 +35,19 @@ describe('EventClient', () => { describe('getGlobalTarget', () => { it('if the global target is set it should re-use it for emitting/listening/removing of events', () => { const target = new EventTarget() - globalThis.__TANSTACK_EVENT_TARGET__ = target + const handleSuccessConnection = vi.fn() + target.addEventListener('tanstack-connect', () => { + target.dispatchEvent(new CustomEvent('tanstack-connect-success')) + }) + globalThis.__TANSTACK_EVENT_TARGET__ = null + + vi.spyOn( + globalThis, + '__TANSTACK_EVENT_TARGET__', + 'get', + ).mockImplementation(() => { + return target + }) const client = new EventClient({ debug: false, pluginId: 'test', @@ -55,9 +67,9 @@ describe('EventClient', () => { expect.any(String), expect.any(Function), ) - globalThis.__TANSTACK_EVENT_TARGET__ = null + vi.resetAllMocks() + target.removeEventListener('tanstack-connect', handleSuccessConnection) }) - it('should use the window object if the globalTarget is not set for emitting/listening/removing of events', () => { const target = window const client = new EventClient({ @@ -183,6 +195,27 @@ describe('EventClient', () => { }) }) + describe('queued events', () => { + it('emits queued events when connected to the event bus', async () => { + bus.stop() + const client = new EventClient({ + debug: false, + pluginId: 'test', + }) + const eventHandler = vi.fn() + client.on('event', eventHandler) + client.emit('event', { foo: 'bar' }) + + bus.start() + // wait to connect to the bus + await new Promise((resolve) => setTimeout(resolve, 500)) + expect(eventHandler).toHaveBeenCalledWith({ + type: 'test:event', + payload: { foo: 'bar' }, + pluginId: 'test', + }) + }) + }) describe('onAllPluginEvents', () => { it('should listen to all events that come from the plugin', () => { const client = new EventClient({ diff --git a/packages/event-bus/src/client/client.ts b/packages/event-bus/src/client/client.ts index 0695b781..393522ce 100644 --- a/packages/event-bus/src/client/client.ts +++ b/packages/event-bus/src/client/client.ts @@ -30,11 +30,18 @@ export class ClientEventBus { #eventTarget: EventTarget #debug: boolean #connectToServerBus: boolean + #dispatcher = (e: Event) => { const event = (e as CustomEvent).detail this.emitToServer(event) this.emitToClients(event) } + #connectFunction = () => { + this.debugLog( + 'Connection request made to event-bus, replying back with success', + ) + this.#eventTarget.dispatchEvent(new CustomEvent('tanstack-connect-success')) + } constructor({ port = 42069, debug = false, @@ -46,6 +53,7 @@ export class ClientEventBus { this.#socket = null this.#connectToServerBus = connectToServerBus this.#eventTarget = this.getGlobalTarget() + this.debugLog('Initializing client event bus') } @@ -91,6 +99,10 @@ export class ClientEventBus { 'tanstack-dispatch-event', this.#dispatcher, ) + this.#eventTarget.addEventListener( + 'tanstack-connect', + this.#connectFunction, + ) } stop() { this.debugLog('Stopping client event bus') @@ -101,6 +113,10 @@ export class ClientEventBus { 'tanstack-dispatch-event', this.#dispatcher, ) + this.#eventTarget.removeEventListener( + 'tanstack-connect', + this.#connectFunction, + ) this.#eventSource?.close() this.#socket?.close() this.#socket = null diff --git a/packages/event-bus/src/server/server.ts b/packages/event-bus/src/server/server.ts index 932381f5..719c94f6 100644 --- a/packages/event-bus/src/server/server.ts +++ b/packages/event-bus/src/server/server.ts @@ -33,7 +33,9 @@ export class ServerEventBus { this.debugLog('Dispatching event from dispatcher, forwarding', event) this.emit(event) } - + #connectFunction = () => { + this.#eventTarget.dispatchEvent(new CustomEvent('tanstack-connect-success')) + } constructor({ port = 42069, debug = false } = {}) { this.#port = port this.#eventTarget = globalThis.__EVENT_TARGET__ ?? new EventTarget() @@ -165,6 +167,10 @@ export class ServerEventBus { 'tanstack-dispatch-event', this.#dispatcher, ) + this.#eventTarget.addEventListener( + 'tanstack-connect', + this.#connectFunction, + ) this.handleNewConnection(wss) // Handle connection upgrade for WebSocket @@ -200,6 +206,10 @@ export class ServerEventBus { 'tanstack-dispatch-event', this.#dispatcher, ) + this.#eventTarget.removeEventListener( + 'tanstack-connect', + this.#connectFunction, + ) this.debugLog('[tanstack-devtools] All connections cleared') } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 99a27403..22bec3af 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -240,6 +240,8 @@ importers: specifier: ^4.2.4 version: 4.2.4 + examples/react/start/generated/prisma: {} + examples/solid/basic: dependencies: '@tanstack/solid-devtools':