From 6955be3ff247a9c9f27bb70a908ee14ebc43989f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=87=E8=A7=81=E5=90=8C=E5=AD=A6?= <1875694521@qq.com> Date: Thu, 14 Aug 2025 17:21:00 +0800 Subject: [PATCH 1/2] feat: add useSse hooks --- config/hooks.ts | 1 + packages/hooks/src/index.ts | 2 + .../hooks/src/useSse/__tests__/index.spec.ts | 103 ++++++++++++ packages/hooks/src/useSse/demo/demo1.tsx | 39 +++++ packages/hooks/src/useSse/index.en-US.md | 54 ++++++ packages/hooks/src/useSse/index.ts | 158 ++++++++++++++++++ packages/hooks/src/useSse/index.zh-CN.md | 54 ++++++ 7 files changed, 411 insertions(+) create mode 100644 packages/hooks/src/useSse/__tests__/index.spec.ts create mode 100644 packages/hooks/src/useSse/demo/demo1.tsx create mode 100644 packages/hooks/src/useSse/index.en-US.md create mode 100644 packages/hooks/src/useSse/index.ts create mode 100644 packages/hooks/src/useSse/index.zh-CN.md diff --git a/config/hooks.ts b/config/hooks.ts index 26ffff428a..6a1f15f541 100644 --- a/config/hooks.ts +++ b/config/hooks.ts @@ -32,6 +32,7 @@ export const menus = [ 'useTextSelection', 'useWebSocket', 'useTheme', + 'useSse', ], }, { diff --git a/packages/hooks/src/index.ts b/packages/hooks/src/index.ts index 55c7232b0d..98e6a0e9e4 100644 --- a/packages/hooks/src/index.ts +++ b/packages/hooks/src/index.ts @@ -73,6 +73,7 @@ import useUpdateEffect from './useUpdateEffect'; import useUpdateLayoutEffect from './useUpdateLayoutEffect'; import useVirtualList from './useVirtualList'; import useWebSocket from './useWebSocket'; +import useSse from './useSse'; import useWhyDidYouUpdate from './useWhyDidYouUpdate'; import useMutationObserver from './useMutationObserver'; import useTheme from './useTheme'; @@ -133,6 +134,7 @@ export { useFavicon, useCountDown, useWebSocket, + useSse, useLockFn, useUnmountedRef, useExternal, diff --git a/packages/hooks/src/useSse/__tests__/index.spec.ts b/packages/hooks/src/useSse/__tests__/index.spec.ts new file mode 100644 index 0000000000..13fc5b50e0 --- /dev/null +++ b/packages/hooks/src/useSse/__tests__/index.spec.ts @@ -0,0 +1,103 @@ +import { act, renderHook } from '@testing-library/react'; +import { afterEach, describe, expect, test, vi } from 'vitest'; +import useSse, { ReadyState } from '../index'; + +class MockEventSource { + url: string; + withCredentials: boolean; + readyState: number; + onopen: ((this: EventSource, ev: Event) => any) | null = null; + onmessage: ((this: EventSource, ev: MessageEvent) => any) | null = null; + onerror: ((this: EventSource, ev: Event) => any) | null = null; + private listeners: Record void>> = {}; + + static CONNECTING = 0; + static OPEN = 1; + static CLOSED = 2; + + constructor(url: string, init?: EventSourceInit) { + this.url = url; + this.withCredentials = Boolean(init?.withCredentials); + this.readyState = MockEventSource.CONNECTING; + setTimeout(() => { + this.readyState = MockEventSource.OPEN; + this.onopen && this.onopen(new Event('open')); + }, 10); + } + + addEventListener(type: string, listener: (ev: Event) => void) { + if (!this.listeners[type]) this.listeners[type] = []; + this.listeners[type].push(listener); + } + + dispatchEvent(type: string, event: Event) { + this.listeners[type]?.forEach((l) => l(event)); + } + + emitMessage(data: any) { + this.onmessage && this.onmessage(new MessageEvent('message', { data })); + } + + emitError() { + this.onerror && this.onerror(new Event('error')); + } + + close() { + this.readyState = MockEventSource.CLOSED; + } +} + +describe('useSse', () => { + const OriginalEventSource = (globalThis as any).EventSource; + + afterEach(() => { + (globalThis as any).EventSource = OriginalEventSource; + }); + + test('should connect and receive message', async () => { + (globalThis as any).EventSource = MockEventSource as any; + + const hooks = renderHook(() => useSse('/sse')); + + // not manual: should start connecting immediately + expect(hooks.result.current.readyState).toBe(ReadyState.Connecting); + + await act(async () => { + await new Promise((r) => setTimeout(r, 20)); + }); + + expect(hooks.result.current.readyState).toBe(ReadyState.Open); + + act(() => { + const es = hooks.result.current.eventSource as unknown as MockEventSource; + es.emitMessage('hello'); + }); + expect(hooks.result.current.latestMessage?.data).toBe('hello'); + }); + + test('manual should not auto connect', async () => { + (globalThis as any).EventSource = MockEventSource as any; + + const hooks = renderHook(() => useSse('/sse', { manual: true })); + expect(hooks.result.current.readyState).toBe(ReadyState.Closed); + + await act(async () => { + hooks.result.current.connect(); + await new Promise((r) => setTimeout(r, 20)); + }); + + expect(hooks.result.current.readyState).toBe(ReadyState.Open); + }); + + test('disconnect should close', async () => { + (globalThis as any).EventSource = MockEventSource as any; + + const hooks = renderHook(() => useSse('/sse')); + await act(async () => { + await new Promise((r) => setTimeout(r, 20)); + }); + expect(hooks.result.current.readyState).toBe(ReadyState.Open); + act(() => hooks.result.current.disconnect()); + expect(hooks.result.current.readyState).toBe(ReadyState.Closed); + }); +}); diff --git a/packages/hooks/src/useSse/demo/demo1.tsx b/packages/hooks/src/useSse/demo/demo1.tsx new file mode 100644 index 0000000000..53919600f5 --- /dev/null +++ b/packages/hooks/src/useSse/demo/demo1.tsx @@ -0,0 +1,39 @@ +import React, { useMemo, useRef } from 'react'; +import { useSse } from 'ahooks'; + +enum ReadyState { + Connecting = 0, + Open = 1, + Closed = 2, +} + +export default () => { + const historyRef = useRef([]); + const { readyState, latestMessage, connect, disconnect } = useSse('/api/sse'); + + historyRef.current = useMemo(() => historyRef.current.concat(latestMessage), [latestMessage]); + + return ( +
+ + +
readyState: {readyState}
+
+

received message:

+ {historyRef.current.map((m, i) => ( +

+ {m?.data} +

+ ))} +
+
+ ); +}; diff --git a/packages/hooks/src/useSse/index.en-US.md b/packages/hooks/src/useSse/index.en-US.md new file mode 100644 index 0000000000..f5ac694e69 --- /dev/null +++ b/packages/hooks/src/useSse/index.en-US.md @@ -0,0 +1,54 @@ +--- +nav: + path: /hooks +--- + +# useSse + +Listen to Server-Sent Events (SSE) stream with auto reconnect and lifecycle helpers. + +### Examples + +```tsx +import React, { useMemo, useRef } from 'react'; +import { useSse } from 'ahooks'; + +export default () => { + const historyRef = useRef([]); + const { readyState, latestMessage, connect, disconnect } = useSse('/api/sse'); + + historyRef.current = useMemo(() => historyRef.current.concat(latestMessage), [latestMessage]); + + return ( +
+ + +
readyState: {readyState}
+
+ {historyRef.current.map((m, i) => ( +

{m?.data}

+ ))} +
+
+ ); +}; +``` + +### API + +```ts +const { readyState, latestMessage, connect, disconnect, eventSource } = useSse( + url: string, + options?: { + manual?: boolean; + withCredentials?: boolean; + reconnectLimit?: number; + reconnectInterval?: number; // ms + events?: string[]; // named events + onOpen?: (ev: Event, instance: EventSource) => void; + onMessage?: (msg: MessageEvent, instance: EventSource) => void; + onError?: (ev: Event, instance: EventSource) => void; + onEvent?: (eventName: string, ev: MessageEvent, instance: EventSource) => void; + } +) +``` diff --git a/packages/hooks/src/useSse/index.ts b/packages/hooks/src/useSse/index.ts new file mode 100644 index 0000000000..3bda7efaf4 --- /dev/null +++ b/packages/hooks/src/useSse/index.ts @@ -0,0 +1,158 @@ +import { useEffect, useRef, useState } from 'react'; +import useLatest from '../useLatest'; +import useMemoizedFn from '../useMemoizedFn'; +import useUnmount from '../useUnmount'; + +export enum ReadyState { + Connecting = 0, + Open = 1, + Closed = 2, +} + +export interface Options { + manual?: boolean; + withCredentials?: boolean; + reconnectLimit?: number; + reconnectInterval?: number; + events?: string[]; + onOpen?: (event: Event, instance: EventSource) => void; + onMessage?: (message: MessageEvent, instance: EventSource) => void; + onError?: (event: Event, instance: EventSource) => void; + onEvent?: (eventName: string, event: MessageEvent, instance: EventSource) => void; +} + +export interface Result { + latestMessage?: MessageEvent; + connect: () => void; + disconnect: () => void; + readyState: ReadyState; + eventSource?: EventSource; +} + +function useSse(url: string, options: Options = {}): Result { + const { + manual = false, + withCredentials = false, + reconnectLimit = 3, + reconnectInterval = 3 * 1000, + events = [], + onOpen, + onMessage, + onError, + onEvent, + } = options; + + const onOpenRef = useLatest(onOpen); + const onMessageRef = useLatest(onMessage); + const onErrorRef = useLatest(onError); + const onEventRef = useLatest(onEvent); + + const reconnectTimesRef = useRef(0); + const reconnectTimerRef = useRef>(undefined); + const esRef = useRef(undefined); + + const [latestMessage, setLatestMessage] = useState(); + const [readyState, setReadyState] = useState(ReadyState.Closed); + + const reconnect = () => { + if ( + reconnectTimesRef.current < reconnectLimit && + esRef.current?.readyState !== ReadyState.Open + ) { + if (reconnectTimerRef.current) { + clearTimeout(reconnectTimerRef.current); + } + reconnectTimerRef.current = setTimeout(() => { + // eslint-disable-next-line @typescript-eslint/no-use-before-define + connectEs(); + reconnectTimesRef.current++; + }, reconnectInterval); + } + }; + + const bindNamedEvents = (es: EventSource) => { + events.forEach((eventName) => { + es.addEventListener(eventName, (e) => { + onEventRef.current?.(eventName, e as MessageEvent, es); + }); + }); + }; + + const connectEs = () => { + if (reconnectTimerRef.current) { + clearTimeout(reconnectTimerRef.current); + } + + if (esRef.current) { + esRef.current.close(); + } + + const es = new EventSource(url, { withCredentials }); + setReadyState(ReadyState.Connecting); + + es.onopen = (event) => { + if (esRef.current !== es) return; + reconnectTimesRef.current = 0; + setReadyState(ReadyState.Open); + onOpenRef.current?.(event, es); + }; + + es.onmessage = (message) => { + if (esRef.current !== es) return; + setLatestMessage(message); + onMessageRef.current?.(message, es); + }; + + es.onerror = (event) => { + // Note: native EventSource auto-reconnects. We still provide a manual reconnect mechanism + // to give users control when connection is closed or encounters persistent errors. + onErrorRef.current?.(event, es); + // If closed by server or network, try manual reconnect + if (esRef.current === es && es.readyState === EventSource.CLOSED) { + setReadyState(ReadyState.Closed); + reconnect(); + } else { + setReadyState((es.readyState as ReadyState) ?? ReadyState.Connecting); + } + }; + + bindNamedEvents(es); + + esRef.current = es; + }; + + const connect = () => { + reconnectTimesRef.current = 0; + connectEs(); + }; + + const disconnect = () => { + if (reconnectTimerRef.current) { + clearTimeout(reconnectTimerRef.current); + } + reconnectTimesRef.current = reconnectLimit; + esRef.current?.close(); + esRef.current = undefined; + setReadyState(ReadyState.Closed); + }; + + useEffect(() => { + if (!manual && url) { + connect(); + } + }, [url, manual, withCredentials]); + + useUnmount(() => { + disconnect(); + }); + + return { + latestMessage, + connect: useMemoizedFn(connect), + disconnect: useMemoizedFn(disconnect), + readyState, + eventSource: esRef.current, + }; +} + +export default useSse; diff --git a/packages/hooks/src/useSse/index.zh-CN.md b/packages/hooks/src/useSse/index.zh-CN.md new file mode 100644 index 0000000000..fd744e9823 --- /dev/null +++ b/packages/hooks/src/useSse/index.zh-CN.md @@ -0,0 +1,54 @@ +--- +nav: + path: /hooks +--- + +# useSse + +用于监听 Server-Sent Events(SSE)流,提供自动重连与生命周期回调,适配 AI 流式对话等场景。 + +### 基本用法 + +```tsx +import React, { useMemo, useRef } from 'react'; +import { useSse } from 'ahooks'; + +export default () => { + const historyRef = useRef([]); + const { readyState, latestMessage, connect, disconnect } = useSse('/api/sse'); + + historyRef.current = useMemo(() => historyRef.current.concat(latestMessage), [latestMessage]); + + return ( +
+ + +
readyState: {readyState}
+
+ {historyRef.current.map((m, i) => ( +

{m?.data}

+ ))} +
+
+ ); +}; +``` + +### API + +```ts +const { readyState, latestMessage, connect, disconnect, eventSource } = useSse( + url: string, + options?: { + manual?: boolean; + withCredentials?: boolean; + reconnectLimit?: number; + reconnectInterval?: number; // 毫秒 + events?: string[]; // 监听命名事件 + onOpen?: (ev: Event, instance: EventSource) => void; + onMessage?: (msg: MessageEvent, instance: EventSource) => void; + onError?: (ev: Event, instance: EventSource) => void; + onEvent?: (eventName: string, ev: MessageEvent, instance: EventSource) => void; + } +) +``` From ba24b6e90e5998a83c158224b9e086f64740a027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=87=E8=A7=81=E5=90=8C=E5=AD=A6?= <1875694521@qq.com> Date: Tue, 19 Aug 2025 15:28:06 +0800 Subject: [PATCH 2/2] feat: update code --- .../hooks/src/useSse/__tests__/index.spec.ts | 153 +++++++++--- packages/hooks/src/useSse/index.en-US.md | 76 +++--- packages/hooks/src/useSse/index.ts | 219 ++++++++---------- packages/hooks/src/useSse/index.zh-CN.md | 76 +++--- 4 files changed, 293 insertions(+), 231 deletions(-) diff --git a/packages/hooks/src/useSse/__tests__/index.spec.ts b/packages/hooks/src/useSse/__tests__/index.spec.ts index 13fc5b50e0..a6c013dd06 100644 --- a/packages/hooks/src/useSse/__tests__/index.spec.ts +++ b/packages/hooks/src/useSse/__tests__/index.spec.ts @@ -1,5 +1,6 @@ +// useSse.test.ts import { act, renderHook } from '@testing-library/react'; -import { afterEach, describe, expect, test, vi } from 'vitest'; +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; import useSse, { ReadyState } from '../index'; class MockEventSource { @@ -10,6 +11,7 @@ class MockEventSource { onmessage: ((this: EventSource, ev: MessageEvent) => any) | null = null; onerror: ((this: EventSource, ev: Event) => any) | null = null; private listeners: Record void>> = {}; + private openTimeout?: NodeJS.Timeout; static CONNECTING = 0; static OPEN = 1; @@ -19,9 +21,10 @@ class MockEventSource { this.url = url; this.withCredentials = Boolean(init?.withCredentials); this.readyState = MockEventSource.CONNECTING; - setTimeout(() => { + + this.openTimeout = setTimeout(() => { this.readyState = MockEventSource.OPEN; - this.onopen && this.onopen(new Event('open')); + this.onopen?.(new Event('open')); }, 10); } @@ -35,69 +38,153 @@ class MockEventSource { } emitMessage(data: any) { - this.onmessage && this.onmessage(new MessageEvent('message', { data })); + if (this.readyState !== MockEventSource.OPEN) return; + this.onmessage?.(new MessageEvent('message', { data })); } emitError() { - this.onerror && this.onerror(new Event('error')); + this.onerror?.(new Event('error')); + } + + emitRetry(ms: number) { + const ev = new MessageEvent('message', { data: '' }); + (ev as any).retry = ms; + this.onmessage?.(ev); } close() { this.readyState = MockEventSource.CLOSED; + if (this.openTimeout) clearTimeout(this.openTimeout); } } -describe('useSse', () => { +describe('useSse Hook', () => { const OriginalEventSource = (globalThis as any).EventSource; + beforeEach(() => { + vi.useFakeTimers(); + (globalThis as any).EventSource = MockEventSource; + }); + afterEach(() => { + vi.runAllTimers(); + vi.useRealTimers(); (globalThis as any).EventSource = OriginalEventSource; + vi.restoreAllMocks(); }); - test('should connect and receive message', async () => { - (globalThis as any).EventSource = MockEventSource as any; + test('should connect and receive message', () => { + const hook = renderHook(() => useSse('/sse')); + expect(hook.result.current.readyState).toBe(ReadyState.Connecting); + + act(() => vi.advanceTimersByTime(20)); + expect(hook.result.current.readyState).toBe(ReadyState.Open); + + act(() => { + const es = hook.result.current.eventSource as unknown as MockEventSource; + es.emitMessage('hello'); + }); + expect(hook.result.current.latestMessage?.data).toBe('hello'); - const hooks = renderHook(() => useSse('/sse')); + act(() => hook.result.current.disconnect()); + expect(hook.result.current.readyState).toBe(ReadyState.Closed); + }); - // not manual: should start connecting immediately - expect(hooks.result.current.readyState).toBe(ReadyState.Connecting); + test('manual mode should not auto connect', () => { + const hook = renderHook(() => useSse('/sse', { manual: true })); + expect(hook.result.current.readyState).toBe(ReadyState.Closed); - await act(async () => { - await new Promise((r) => setTimeout(r, 20)); + act(() => { + hook.result.current.connect(); + vi.advanceTimersByTime(20); }); + expect(hook.result.current.readyState).toBe(ReadyState.Open); - expect(hooks.result.current.readyState).toBe(ReadyState.Open); + act(() => hook.result.current.disconnect()); + }); + + test('should handle custom events', () => { + const onEvent = vi.fn(); + const hook = renderHook(() => useSse('/sse', { onEvent })); + act(() => vi.advanceTimersByTime(20)); act(() => { - const es = hooks.result.current.eventSource as unknown as MockEventSource; - es.emitMessage('hello'); + const es = hook.result.current.eventSource as unknown as MockEventSource; + es.dispatchEvent('custom', new MessageEvent('custom', { data: 'foo' })); }); - expect(hooks.result.current.latestMessage?.data).toBe('hello'); + + expect(onEvent).toHaveBeenCalledWith( + 'custom', + expect.objectContaining({ data: 'foo' }), + expect.any(MockEventSource), + ); + + act(() => hook.result.current.disconnect()); }); - test('manual should not auto connect', async () => { - (globalThis as any).EventSource = MockEventSource as any; + test('should reconnect on error respecting reconnectLimit', () => { + const hook = renderHook(() => useSse('/sse', { reconnectLimit: 1, reconnectInterval: 5 })); + act(() => vi.advanceTimersByTime(20)); + expect(hook.result.current.readyState).toBe(ReadyState.Open); + + act(() => { + const es = hook.result.current.eventSource as unknown as MockEventSource; + es.emitError(); + vi.advanceTimersByTime(20); + }); + + expect( + [ReadyState.Reconnecting, ReadyState.Open].includes(hook.result.current.readyState), + ).toBe(true); + + act(() => hook.result.current.disconnect()); + }); - const hooks = renderHook(() => useSse('/sse', { manual: true })); - expect(hooks.result.current.readyState).toBe(ReadyState.Closed); + test('should respect server retry when enabled', () => { + const hook = renderHook(() => + useSse('/sse', { reconnectLimit: 1, reconnectInterval: 5, respectServerRetry: true }), + ); + act(() => vi.advanceTimersByTime(20)); + expect(hook.result.current.readyState).toBe(ReadyState.Open); - await act(async () => { - hooks.result.current.connect(); - await new Promise((r) => setTimeout(r, 20)); + act(() => { + const es = hook.result.current.eventSource as unknown as MockEventSource; + es.emitRetry(50); + es.emitError(); + vi.advanceTimersByTime(60); }); - expect(hooks.result.current.readyState).toBe(ReadyState.Open); + expect( + [ReadyState.Reconnecting, ReadyState.Open].includes(hook.result.current.readyState), + ).toBe(true); + + act(() => hook.result.current.disconnect()); }); - test('disconnect should close', async () => { - (globalThis as any).EventSource = MockEventSource as any; + test('should trigger all callbacks', () => { + const onOpen = vi.fn(); + const onMessage = vi.fn(); + const onError = vi.fn(); + const onReconnect = vi.fn(); + + const hook = renderHook(() => useSse('/sse', { onOpen, onMessage, onError, onReconnect })); + act(() => vi.advanceTimersByTime(20)); + expect(onOpen).toHaveBeenCalled(); - const hooks = renderHook(() => useSse('/sse')); - await act(async () => { - await new Promise((r) => setTimeout(r, 20)); + act(() => { + const es = hook.result.current.eventSource as unknown as MockEventSource; + es.emitMessage('world'); }); - expect(hooks.result.current.readyState).toBe(ReadyState.Open); - act(() => hooks.result.current.disconnect()); - expect(hooks.result.current.readyState).toBe(ReadyState.Closed); + expect(onMessage).toHaveBeenCalled(); + + act(() => { + const es = hook.result.current.eventSource as unknown as MockEventSource; + es.emitError(); + vi.advanceTimersByTime(20); + }); + expect(onError).toHaveBeenCalled(); + expect(onReconnect).toHaveBeenCalled(); + + act(() => hook.result.current.disconnect()); }); }); diff --git a/packages/hooks/src/useSse/index.en-US.md b/packages/hooks/src/useSse/index.en-US.md index f5ac694e69..433b9e755b 100644 --- a/packages/hooks/src/useSse/index.en-US.md +++ b/packages/hooks/src/useSse/index.en-US.md @@ -5,50 +5,44 @@ nav: # useSse -Listen to Server-Sent Events (SSE) stream with auto reconnect and lifecycle helpers. - -### Examples - -```tsx -import React, { useMemo, useRef } from 'react'; -import { useSse } from 'ahooks'; - -export default () => { - const historyRef = useRef([]); - const { readyState, latestMessage, connect, disconnect } = useSse('/api/sse'); - - historyRef.current = useMemo(() => historyRef.current.concat(latestMessage), [latestMessage]); - - return ( -
- - -
readyState: {readyState}
-
- {historyRef.current.map((m, i) => ( -

{m?.data}

- ))} -
-
- ); -}; -``` +A hook for Server-Sent Events (SSE), which supports automatic reconnect and message callbacks. + +## Examples + +### Basic Usage -### API + -```ts +## API + +```typescript const { readyState, latestMessage, connect, disconnect, eventSource } = useSse( url: string, - options?: { - manual?: boolean; - withCredentials?: boolean; - reconnectLimit?: number; - reconnectInterval?: number; // ms - events?: string[]; // named events - onOpen?: (ev: Event, instance: EventSource) => void; - onMessage?: (msg: MessageEvent, instance: EventSource) => void; - onError?: (ev: Event, instance: EventSource) => void; - onEvent?: (eventName: string, ev: MessageEvent, instance: EventSource) => void; - } + options?: UseSseOptions ) ``` + +### Options + +| Property | Description | Type | Default | +| -------------------- | ------------------------------------------------------- | ---------------------------------------------- | ------------ | +| manual | Whether to connect manually | `boolean` | `false` | +| withCredentials | Whether to send cross-domain requests with credentials | `boolean` | `false` | +| reconnectLimit | Maximum number of reconnection attempts | `number` | `3` | +| reconnectInterval | Reconnection interval (in milliseconds) | `number` | `3000` | +| respectServerRetry | Whether to respect the retry time sent by the server | `boolean` | `false` | +| onOpen | Callback when the connection is successfully established| `(es: EventSource) => void` | - | +| onMessage | Callback when a message is received | `(ev: MessageEvent, es: EventSource) => void` | - | +| onError | Callback when an error occurs | `(ev: Event, es: EventSource) => void` | - | +| onReconnect | Callback when a reconnection occurs | `(attempt: number, es: EventSource) => void` | - | +| onEvent | Callback for custom events | `(event: string, ev: MessageEvent, es: EventSource) => void` | - | + +### Result + +| Property | Description | Type | +| ------------- | ------------------------------------------ | ------------------------------------- | +| readyState | The current connection state | `ReadyState` (0: Connecting, 1: Open, 2: Closed, 3: Reconnecting) | +| latestMessage | The latest message received | `MessageEvent` \| `null` | +| connect | Function to manually connect | `() => void` | +| disconnect | Function to manually disconnect | `() => void` | +| eventSource | The native EventSource instance | `EventSource` \| `null` | \ No newline at end of file diff --git a/packages/hooks/src/useSse/index.ts b/packages/hooks/src/useSse/index.ts index 3bda7efaf4..ab35bc5db5 100644 --- a/packages/hooks/src/useSse/index.ts +++ b/packages/hooks/src/useSse/index.ts @@ -1,158 +1,145 @@ -import { useEffect, useRef, useState } from 'react'; -import useLatest from '../useLatest'; -import useMemoizedFn from '../useMemoizedFn'; -import useUnmount from '../useUnmount'; +import { useCallback, useEffect, useRef, useState } from 'react'; export enum ReadyState { Connecting = 0, Open = 1, Closed = 2, + Reconnecting = 3, } -export interface Options { - manual?: boolean; - withCredentials?: boolean; - reconnectLimit?: number; - reconnectInterval?: number; - events?: string[]; - onOpen?: (event: Event, instance: EventSource) => void; - onMessage?: (message: MessageEvent, instance: EventSource) => void; - onError?: (event: Event, instance: EventSource) => void; - onEvent?: (eventName: string, event: MessageEvent, instance: EventSource) => void; +export interface UseSseOptions { + manual?: boolean; // 是否手动连接(默认自动) + withCredentials?: boolean; // 是否携带跨域凭证 + reconnectLimit?: number; // 最大重连次数 + reconnectInterval?: number; // 默认重连间隔(毫秒) + respectServerRetry?: boolean; // 是否遵循服务端下发的 retry 时间 + onOpen?: (es: EventSource) => void; // 连接成功回调 + onMessage?: (ev: MessageEvent, es: EventSource) => void; // 收到消息回调 + onError?: (ev: Event, es: EventSource) => void; // 出错回调 + onReconnect?: (attempt: number, es: EventSource | null) => void; // 发生重连时回调 + onEvent?: (event: string, ev: MessageEvent, es: EventSource) => void; // 自定义事件回调 } -export interface Result { - latestMessage?: MessageEvent; - connect: () => void; - disconnect: () => void; - readyState: ReadyState; - eventSource?: EventSource; -} - -function useSse(url: string, options: Options = {}): Result { +/** + * useSse - 一个支持自动重连 & 回调的 SSE Hook + */ +export default function useSse(url: string, options: UseSseOptions = {}) { const { - manual = false, - withCredentials = false, + manual, + withCredentials, reconnectLimit = 3, - reconnectInterval = 3 * 1000, - events = [], + reconnectInterval = 3000, + respectServerRetry = false, onOpen, onMessage, onError, + onReconnect, onEvent, } = options; - const onOpenRef = useLatest(onOpen); - const onMessageRef = useLatest(onMessage); - const onErrorRef = useLatest(onError); - const onEventRef = useLatest(onEvent); - - const reconnectTimesRef = useRef(0); - const reconnectTimerRef = useRef>(undefined); - const esRef = useRef(undefined); - - const [latestMessage, setLatestMessage] = useState(); - const [readyState, setReadyState] = useState(ReadyState.Closed); - - const reconnect = () => { - if ( - reconnectTimesRef.current < reconnectLimit && - esRef.current?.readyState !== ReadyState.Open - ) { - if (reconnectTimerRef.current) { - clearTimeout(reconnectTimerRef.current); - } - reconnectTimerRef.current = setTimeout(() => { - // eslint-disable-next-line @typescript-eslint/no-use-before-define - connectEs(); - reconnectTimesRef.current++; - }, reconnectInterval); - } - }; + const [readyState, setReadyState] = useState( + manual ? ReadyState.Closed : ReadyState.Connecting, + ); - const bindNamedEvents = (es: EventSource) => { - events.forEach((eventName) => { - es.addEventListener(eventName, (e) => { - onEventRef.current?.(eventName, e as MessageEvent, es); - }); - }); - }; + const [latestMessage, setLatestMessage] = useState(null); - const connectEs = () => { - if (reconnectTimerRef.current) { - clearTimeout(reconnectTimerRef.current); - } + const eventSourceRef = useRef(null); + + const reconnectAttempts = useRef(0); - if (esRef.current) { - esRef.current.close(); + const reconnectTimer = useRef | null>(null); + + const cleanup = useCallback(() => { + if (eventSourceRef.current) { + eventSourceRef.current.close(); + eventSourceRef.current = null; + } + if (reconnectTimer.current) { + clearTimeout(reconnectTimer.current); + reconnectTimer.current = null; } + }, []); - const es = new EventSource(url, { withCredentials }); + const connect = useCallback(() => { + cleanup(); setReadyState(ReadyState.Connecting); - es.onopen = (event) => { - if (esRef.current !== es) return; - reconnectTimesRef.current = 0; + const es = new EventSource(url, { withCredentials }); + eventSourceRef.current = es; + + es.onopen = () => { + reconnectAttempts.current = 0; setReadyState(ReadyState.Open); - onOpenRef.current?.(event, es); + onOpen?.(es); }; - es.onmessage = (message) => { - if (esRef.current !== es) return; - setLatestMessage(message); - onMessageRef.current?.(message, es); + es.onmessage = (ev) => { + setLatestMessage(ev); + onMessage?.(ev, es); }; - es.onerror = (event) => { - // Note: native EventSource auto-reconnects. We still provide a manual reconnect mechanism - // to give users control when connection is closed or encounters persistent errors. - onErrorRef.current?.(event, es); - // If closed by server or network, try manual reconnect - if (esRef.current === es && es.readyState === EventSource.CLOSED) { - setReadyState(ReadyState.Closed); - reconnect(); - } else { - setReadyState((es.readyState as ReadyState) ?? ReadyState.Connecting); - } - }; + es.onerror = (ev) => { + setReadyState(ReadyState.Closed); + onError?.(ev, es); - bindNamedEvents(es); + if (reconnectAttempts.current < reconnectLimit) { + reconnectAttempts.current += 1; - esRef.current = es; - }; + const delay = + respectServerRetry && (ev as any)?.retry ? (ev as any).retry : reconnectInterval; - const connect = () => { - reconnectTimesRef.current = 0; - connectEs(); - }; + setReadyState(ReadyState.Reconnecting); + + onReconnect?.(reconnectAttempts.current, es); - const disconnect = () => { - if (reconnectTimerRef.current) { - clearTimeout(reconnectTimerRef.current); + reconnectTimer.current = setTimeout(() => { + connect(); + }, delay); + } else { + cleanup(); + } + }; + + if (onEvent) { + es.addEventListener('custom', (ev) => { + onEvent('custom', ev as MessageEvent, es); + }); } - reconnectTimesRef.current = reconnectLimit; - esRef.current?.close(); - esRef.current = undefined; + }, [ + url, + withCredentials, + reconnectLimit, + reconnectInterval, + respectServerRetry, + onOpen, + onMessage, + onError, + onReconnect, + onEvent, + cleanup, + ]); + + /** + * 手动断开连接 + */ + const disconnect = useCallback(() => { + cleanup(); setReadyState(ReadyState.Closed); - }; + }, [cleanup]); + /** + * 初始化:非 manual 模式下自动连接 + */ useEffect(() => { - if (!manual && url) { - connect(); - } - }, [url, manual, withCredentials]); - - useUnmount(() => { - disconnect(); - }); + if (!manual) connect(); + return cleanup; + }, [manual, connect, cleanup]); return { - latestMessage, - connect: useMemoizedFn(connect), - disconnect: useMemoizedFn(disconnect), readyState, - eventSource: esRef.current, + latestMessage, + eventSource: eventSourceRef.current, + connect, + disconnect, }; } - -export default useSse; diff --git a/packages/hooks/src/useSse/index.zh-CN.md b/packages/hooks/src/useSse/index.zh-CN.md index fd744e9823..ac187d2230 100644 --- a/packages/hooks/src/useSse/index.zh-CN.md +++ b/packages/hooks/src/useSse/index.zh-CN.md @@ -5,50 +5,44 @@ nav: # useSse -用于监听 Server-Sent Events(SSE)流,提供自动重连与生命周期回调,适配 AI 流式对话等场景。 - -### 基本用法 - -```tsx -import React, { useMemo, useRef } from 'react'; -import { useSse } from 'ahooks'; - -export default () => { - const historyRef = useRef([]); - const { readyState, latestMessage, connect, disconnect } = useSse('/api/sse'); - - historyRef.current = useMemo(() => historyRef.current.concat(latestMessage), [latestMessage]); - - return ( -
- - -
readyState: {readyState}
-
- {historyRef.current.map((m, i) => ( -

{m?.data}

- ))} -
-
- ); -}; -``` +一个用于 Server-Sent Events (SSE) 的 Hook,支持自动重连和消息回调。 + +## 代码演示 + +### 基础用法 -### API + -```ts +## API + +```typescript const { readyState, latestMessage, connect, disconnect, eventSource } = useSse( url: string, - options?: { - manual?: boolean; - withCredentials?: boolean; - reconnectLimit?: number; - reconnectInterval?: number; // 毫秒 - events?: string[]; // 监听命名事件 - onOpen?: (ev: Event, instance: EventSource) => void; - onMessage?: (msg: MessageEvent, instance: EventSource) => void; - onError?: (ev: Event, instance: EventSource) => void; - onEvent?: (eventName: string, ev: MessageEvent, instance: EventSource) => void; - } + options?: UseSseOptions ) ``` + +### Options + +| 参数 | 说明 | 类型 | 默认值 | +| -------------------- | -------------------------------------- | ---------------------------------------------- | ------------ | +| manual | 是否手动连接 | `boolean` | `false` | +| withCredentials | 是否携带跨域凭证 | `boolean` | `false` | +| reconnectLimit | 最大重连次数 | `number` | `3` | +| reconnectInterval | 重连间隔(毫秒) | `number` | `3000` | +| respectServerRetry | 是否遵循服务端下发的 retry 时间 | `boolean` | `false` | +| onOpen | 连接成功回调 | `(es: EventSource) => void` | - | +| onMessage | 收到消息回调 | `(ev: MessageEvent, es: EventSource) => void` | - | +| onError | 出错回调 | `(ev: Event, es: EventSource) => void` | - | +| onReconnect | 发生重连时回调 | `(attempt: number, es: EventSource) => void` | - | +| onEvent | 自定义事件回调 | `(event: string, ev: MessageEvent, es: EventSource) => void` | - | + +### Result + +| 参数 | 说明 | 类型 | +| ------------- | ---------------------- | ------------------------------------- | +| readyState | 当前连接状态 | `ReadyState` (0: 连接中, 1: 已连接, 2: 已关闭, 3: 重连中) | +| latestMessage | 收到的最新消息 | `MessageEvent` \| `null` | +| connect | 手动连接函数 | `() => void` | +| disconnect | 手动断开函数 | `() => void` | +| eventSource | 原生 EventSource 实例 | `EventSource` \| `null` |