Skip to content
Merged
6 changes: 6 additions & 0 deletions .changeset/eleven-garlics-work.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@tanstack/devtools-event-client': patch
'@tanstack/devtools-event-bus': patch
---

add queued events to event bus
6 changes: 6 additions & 0 deletions examples/react/start/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ class QueryDevtoolsClient extends EventClient<EventMap> {
constructor() {
super({
pluginId: 'query-devtools',
debug: true,
})
}
}

export const queryPlugin = new QueryDevtoolsClient()
// this should be queued and emitted when bus is available
queryPlugin.emit('test', {
title: 'Query Devtools',
description: 'A plugin for query debugging',
})
73 changes: 72 additions & 1 deletion packages/event-bus-client/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@ export class EventClient<
#pluginId: TPluginId
#eventTarget: () => EventTarget
#debug: boolean
#queuedEvents: Array<TanStackDevtoolsEvent<string, any>>
#connected: boolean
#connectIntervalId: number | null
#connectEveryMs: number
#retryCount = 0
#maxRetries = 5
#onConnected = () => {
this.debugLog('Connected to event bus')
this.#connected = true
this.debugLog('Emitting queued events', this.#queuedEvents)
this.#queuedEvents.forEach((event) => this.emitEventToBus(event))
this.#queuedEvents = []
this.stopConnectLoop()
this.#eventTarget().removeEventListener(
'tanstack-connect-success',
this.#onConnected,
)
}
#connectFunction = () => {
if (this.#retryCount < this.#maxRetries) {
this.#retryCount++
this.#eventTarget().dispatchEvent(new CustomEvent('tanstack-connect'))
return
}
this.#eventTarget().removeEventListener(
'tanstack-connect',
this.#connectFunction,
)
this.debugLog('Max retries reached, giving up on connection')
this.stopConnectLoop()
}

constructor({
pluginId,
Expand All @@ -35,6 +66,36 @@ export class EventClient<
this.#eventTarget = this.getGlobalTarget
this.#debug = debug
this.debugLog(' Initializing event subscription for plugin', this.#pluginId)
this.#queuedEvents = []
this.#connected = false
this.#connectIntervalId = null
this.#connectEveryMs = 500

this.#eventTarget().addEventListener(
'tanstack-connect-success',
this.#onConnected,
)
this.#connectFunction()
this.startConnectLoop()
}

private startConnectLoop() {
if (this.#connectIntervalId !== null || this.#connected) return
this.debugLog(`Starting connect loop (every ${this.#connectEveryMs}ms)`)

this.#connectIntervalId = setInterval(
this.#connectFunction,
this.#connectEveryMs,
) as unknown as number
}

private stopConnectLoop() {
if (this.#connectIntervalId === null) {
return
}
clearInterval(this.#connectIntervalId)
this.#connectIntervalId = null
this.debugLog('Stopped connect loop')
}

private debugLog(...args: Array<any>) {
Expand Down Expand Up @@ -84,7 +145,17 @@ export class EventClient<
eventSuffix: TSuffix,
payload: TEventMap[`${TPluginId & string}:${TSuffix}`],
) {
this.emitEventToBus({
// wait to connect to the bus
if (!this.#connected) {
this.debugLog('Bus not available, will be pushed as soon as connected')
return this.#queuedEvents.push({
type: `${this.#pluginId}:${eventSuffix}`,
payload,
pluginId: this.#pluginId,
})
}
// emit right now
return this.emitEventToBus({
type: `${this.#pluginId}:${eventSuffix}`,
payload,
pluginId: this.#pluginId,
Expand Down
45 changes: 39 additions & 6 deletions packages/event-bus-client/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { describe, expect, it, vi } from 'vitest'
import { EventClient } from '../src'
import { ClientEventBus } from '@tanstack/devtools-event-bus/client'
import { EventClient } from '../src'

// start the client bus for testing
new ClientEventBus().start()
const bus = new ClientEventBus()
bus.start()
// client bus uses window to dispatch events
const clientBusEmitTarget = window

describe('EventClient', () => {
describe('debug config', () => {
it('should emit logs when debug set to true and have the correct plugin name', () => {
Expand Down Expand Up @@ -35,7 +35,19 @@ describe('EventClient', () => {
describe('getGlobalTarget', () => {
it('if the global target is set it should re-use it for emitting/listening/removing of events', () => {
const target = new EventTarget()
globalThis.__TANSTACK_EVENT_TARGET__ = target
const handleSuccessConnection = vi.fn()
target.addEventListener('tanstack-connect', () => {
target.dispatchEvent(new CustomEvent('tanstack-connect-success'))
})
globalThis.__TANSTACK_EVENT_TARGET__ = null

vi.spyOn(
globalThis,
'__TANSTACK_EVENT_TARGET__',
'get',
).mockImplementation(() => {
return target
})
const client = new EventClient({
debug: false,
pluginId: 'test',
Expand All @@ -55,9 +67,9 @@ describe('EventClient', () => {
expect.any(String),
expect.any(Function),
)
globalThis.__TANSTACK_EVENT_TARGET__ = null
vi.resetAllMocks()
target.removeEventListener('tanstack-connect', handleSuccessConnection)
})

it('should use the window object if the globalTarget is not set for emitting/listening/removing of events', () => {
const target = window
const client = new EventClient({
Expand Down Expand Up @@ -183,6 +195,27 @@ describe('EventClient', () => {
})
})

describe('queued events', () => {
it('emits queued events when connected to the event bus', async () => {
bus.stop()
const client = new EventClient({
debug: false,
pluginId: 'test',
})
const eventHandler = vi.fn()
client.on('event', eventHandler)
client.emit('event', { foo: 'bar' })

bus.start()
// wait to connect to the bus
await new Promise((resolve) => setTimeout(resolve, 500))
expect(eventHandler).toHaveBeenCalledWith({
type: 'test:event',
payload: { foo: 'bar' },
pluginId: 'test',
})
})
})
describe('onAllPluginEvents', () => {
it('should listen to all events that come from the plugin', () => {
const client = new EventClient({
Expand Down
16 changes: 16 additions & 0 deletions packages/event-bus/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ export class ClientEventBus {
#eventTarget: EventTarget
#debug: boolean
#connectToServerBus: boolean

#dispatcher = (e: Event) => {
const event = (e as CustomEvent).detail
this.emitToServer(event)
this.emitToClients(event)
}
#connectFunction = () => {
this.debugLog(
'Connection request made to event-bus, replying back with success',
)
this.#eventTarget.dispatchEvent(new CustomEvent('tanstack-connect-success'))
}
constructor({
port = 42069,
debug = false,
Expand All @@ -46,6 +53,7 @@ export class ClientEventBus {
this.#socket = null
this.#connectToServerBus = connectToServerBus
this.#eventTarget = this.getGlobalTarget()

this.debugLog('Initializing client event bus')
}

Expand Down Expand Up @@ -91,6 +99,10 @@ export class ClientEventBus {
'tanstack-dispatch-event',
this.#dispatcher,
)
this.#eventTarget.addEventListener(
'tanstack-connect',
this.#connectFunction,
)
}
stop() {
this.debugLog('Stopping client event bus')
Expand All @@ -101,6 +113,10 @@ export class ClientEventBus {
'tanstack-dispatch-event',
this.#dispatcher,
)
this.#eventTarget.removeEventListener(
'tanstack-connect',
this.#connectFunction,
)
this.#eventSource?.close()
this.#socket?.close()
this.#socket = null
Expand Down
12 changes: 11 additions & 1 deletion packages/event-bus/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ export class ServerEventBus {
this.debugLog('Dispatching event from dispatcher, forwarding', event)
this.emit(event)
}

#connectFunction = () => {
this.#eventTarget.dispatchEvent(new CustomEvent('tanstack-connect-success'))
}
constructor({ port = 42069, debug = false } = {}) {
this.#port = port
this.#eventTarget = globalThis.__EVENT_TARGET__ ?? new EventTarget()
Expand Down Expand Up @@ -165,6 +167,10 @@ export class ServerEventBus {
'tanstack-dispatch-event',
this.#dispatcher,
)
this.#eventTarget.addEventListener(
'tanstack-connect',
this.#connectFunction,
)
this.handleNewConnection(wss)

// Handle connection upgrade for WebSocket
Expand Down Expand Up @@ -200,6 +206,10 @@ export class ServerEventBus {
'tanstack-dispatch-event',
this.#dispatcher,
)
this.#eventTarget.removeEventListener(
'tanstack-connect',
this.#connectFunction,
)
this.debugLog('[tanstack-devtools] All connections cleared')
}
}
2 changes: 2 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.