Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ export type Unsubscriber = () => void
* Subscribes to the event and automatically unsubscribes after receiving it.
*/
export type OnceFunction = {
<T = unknown>(key: string, event: QueryEvent): Promise<CustomEventInit<T>>
<T = unknown>(key: string, event: QueryEvent, signal?: AbortSignal): Promise<CustomEventInit<T>>
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/query/query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -805,8 +805,8 @@ describe.concurrent('query', function () {

const { items, resolvers } = caches()

expect(items).toBe(items)
expect(resolvers).toBe(resolvers)
expect(items).toBe(itemsCache)
expect(resolvers).toBe(resolversCache)
})

it('respects fresh option from configure()', async ({ expect }) => {
Expand Down
87 changes: 72 additions & 15 deletions src/query/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ export function createQuery(instanceOptions?: Configuration): Query {
case 'resolved':
case 'hydrated':
case 'forgotten':
broadcast?.postMessage({ event: `${event}:${key}`, detail })
try {
broadcast?.postMessage({ event: `${event}:${key}`, detail })
} catch {
// Silently ignore DataCloneError or other postMessage failures
// (e.g. when the detail is not structurally cloneable).
}
}
}

Expand Down Expand Up @@ -271,7 +276,14 @@ export function createQuery(instanceOptions?: Configuration): Query {

if (item !== undefined) {
itemsCache.delete(key)
emit(key, 'forgotten', await item.item)

// Wrap in try-catch so that rejected or pending-then-rejected
// promises don't prevent the rest of the keys from being forgotten.
try {
emit(key, 'forgotten', await item.item)
} catch {
emit(key, 'forgotten', undefined)
}
}
}
}
Expand Down Expand Up @@ -368,6 +380,14 @@ export function createQuery(instanceOptions?: Configuration): Query {
// Awaits the fetching to get the result item.
const item = await result

// If the signal was aborted after the fetch resolved but
// before we write to the cache, bail out to avoid writing
// stale data that contradicts the abort.
if (controller.signal.aborted) {
reject(controller.signal.reason as Error)
return
}

const promise =
(resolversCache.get(key)?.item as Promise<T> | undefined) ?? Promise.resolve(item)

Expand Down Expand Up @@ -486,14 +506,19 @@ export function createQuery(instanceOptions?: Configuration): Query {
* context.
*/
function subscribeBroadcast(): Unsubscriber {
// Capture the current broadcast reference so that the unsubscriber
// always targets the same channel that was subscribed to, even if
// configure() replaces the broadcast channel later.
const currentBroadcast = broadcast

function onBroadcastMessage(message: MessageEvent<BroadcastPayload>) {
events.dispatchEvent(new CustomEvent(message.data.event, { detail: message.data.detail }))
}

broadcast?.addEventListener('message', onBroadcastMessage)
currentBroadcast?.addEventListener('message', onBroadcastMessage)

return function () {
broadcast?.removeEventListener('message', onBroadcastMessage)
currentBroadcast?.removeEventListener('message', onBroadcastMessage)
}
}

Expand All @@ -508,22 +533,25 @@ export function createQuery(instanceOptions?: Configuration): Query {
* @param keys - A single key, array of keys, or object mapping names to keys.
* @returns A promise that resolves with the fetched value(s).
*/
async function next<T = unknown>(keys: string | { [K in keyof T]: string }): Promise<T> {
async function next<T = unknown>(
keys: string | { [K in keyof T]: string },
signal?: AbortSignal
): Promise<T> {
if (typeof keys === 'string') {
const event = await once(keys, 'refetching')
const event = await once(keys, 'refetching', signal)
return (await (event.detail as Promise<T>)) as T
}

if (Array.isArray(keys)) {
const promises = keys.map((key) => once(key, 'refetching'))
const promises = keys.map((key) => once(key, 'refetching', signal))
const events = await Promise.all(promises)
const details = events.map((event) => event.detail as Promise<T>)
return (await Promise.all(details)) as T
}

const objectKeys = keys as Record<string, string>
const entries = Object.entries(objectKeys)
const promises = entries.map(([, key]) => once(key, 'refetching'))
const promises = entries.map(([, key]) => once(key, 'refetching', signal))
const events = await Promise.all(promises)
const details = await Promise.all(events.map((event) => event.detail as Promise<unknown>))
const result = Object.fromEntries(entries.map(([name], i) => [name, details[i]]))
Expand All @@ -538,8 +566,14 @@ export function createQuery(instanceOptions?: Configuration): Query {
* @yields The resolved value(s) each time a refetch completes.
*/
async function* stream<T = unknown>(keys: string | { [K in keyof T]: string }) {
for (;;) {
yield await next<T>(keys)
const controller = new AbortController()

try {
for (;;) {
yield await next<T>(keys, controller.signal)
}
} finally {
controller.abort()
}
}

Expand All @@ -551,12 +585,29 @@ export function createQuery(instanceOptions?: Configuration): Query {
* @param event - The type of event to wait for.
* @returns A promise that resolves with the event details.
*/
function once<T = unknown>(key: string, event: QueryEvent) {
return new Promise<CustomEventInit<T>>(function (resolve) {
function once<T = unknown>(key: string, event: QueryEvent, signal?: AbortSignal) {
return new Promise<CustomEventInit<T>>(function (resolve, reject) {
const unsubscribe = subscribe<T>(key, event, function (event) {
resolve(event)
unsubscribe()
cleanup()
})

function cleanup() {
unsubscribe()
signal?.removeEventListener('abort', onAbort)
}

function onAbort() {
cleanup()
reject(signal!.reason)
}

signal?.addEventListener('abort', onAbort)

// If the signal is already aborted, clean up immediately.
if (signal?.aborted) {
onAbort()
}
})
}

Expand All @@ -570,8 +621,14 @@ export function createQuery(instanceOptions?: Configuration): Query {
* @yields The event details each time the event occurs.
*/
async function* sequence<T = unknown>(key: string, event: QueryEvent) {
for (;;) {
yield await once<T>(key, event)
const controller = new AbortController()

try {
for (;;) {
yield await once<T>(key, event, controller.signal)
}
} finally {
controller.abort()
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/react/components/QueryProvider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ export function QueryProvider({

useEffect(
function () {
// Guard against environments where BroadcastChannel is unavailable
// (e.g. certain edge runtimes or older server-side environments).
if (typeof BroadcastChannel === 'undefined') {
return
}

const broadcast = new BroadcastChannel('query')

localQuery.configure({ broadcast })
Expand Down
Loading