| title | Server-Sent Events (SSE) |
|---|---|
| description | Implementing real-time server-to-client communication using Server-Sent Events in DeployStack Backend. |
DeployStack Backend includes @fastify/sse for Server-Sent Events support. SSE provides a simple, unidirectional communication channel from server to client over HTTP - ideal for live updates, notifications, and streaming data.
The plugin is globally registered with a 30-second heartbeat interval to keep connections alive.
All SSE endpoints MUST follow this URL pattern:
- REST endpoint:
/api/{resource}/{action} - SSE endpoint:
/api/{resource}/{action}/stream
# Client Activity
GET /api/users/me/mcp/client-activity # REST API (polling)
GET /api/users/me/mcp/client-activity/stream # SSE stream
# Notifications
GET /api/teams/:teamId/notifications # REST API (polling)
GET /api/teams/:teamId/notifications/stream # SSE stream
# Metrics
GET /api/satellites/:satelliteId/metrics # REST API (polling)
GET /api/satellites/:satelliteId/metrics/stream # SSE streamEvery SSE endpoint should have a corresponding REST endpoint:
- Same Query Parameters: Both endpoints accept identical query parameters
- Same Data Structure: Both return the same data format
- Consistent Behavior: Both apply the same filters and limits
- Fallback Support: REST endpoint serves as fallback for clients without SSE support
- Industry Standard: Used by GitHub, Stripe, and Twitter APIs
- RESTful: Treats streaming as a sub-resource
- Technology Agnostic: Works for SSE, WebSockets, or any streaming protocol
- Clear Intent: Immediately indicates real-time streaming capability
Add the sse: true option to any route definition:
server.get('/events', { sse: true }, async (request, reply) => {
// SSE methods available on reply.sse
})For route-specific configuration:
server.get('/events', {
sse: {
heartbeat: false, // Disable heartbeat for this route
serializer: (data) => JSON.stringify(data) // Custom serializer
}
}, handler)reply.sse.send({ data: 'Hello world' })reply.sse.send({
id: '123',
event: 'user_update',
data: { userId: 'abc', status: 'online' },
retry: 5000 // Client retry interval in ms
})async function* generateUpdates() {
for (let i = 0; i < 10; i++) {
yield { data: { count: i } }
await new Promise(r => setTimeout(r, 1000))
}
}
reply.sse.send(generateUpdates())By default, the connection closes after the handler completes. To keep it open:
reply.sse.keepAlive()reply.sse.onClose(() => {
// Cleanup logic when client disconnects
server.log.info('Client disconnected')
})reply.sse.close()Handle reconnecting clients using the Last-Event-ID header:
reply.sse.replay(async (lastEventId) => {
// Fetch and send missed events since lastEventId
const missedEvents = await getMissedEvents(lastEventId)
for (const event of missedEvents) {
reply.sse.send(event)
}
})Access the last event ID directly:
const lastId = reply.sse.lastEventIdif (reply.sse.isConnected) {
reply.sse.send({ data: 'still connected' })
}import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'
export default async function sseRoute(server: FastifyInstance) {
server.get('/notifications/stream', {
sse: true,
preValidation: requirePermission('notifications.read'),
schema: {
tags: ['Notifications'],
summary: 'Stream notifications',
description: 'Real-time notification stream via SSE',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
// Handle client reconnection
reply.sse.replay(async (lastEventId) => {
const missed = await notificationService.getMissedNotifications(userId, lastEventId)
for (const notification of missed) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})
// Keep connection open
reply.sse.keepAlive()
// Subscribe to new notifications
const unsubscribe = notificationService.subscribe(userId, (notification) => {
if (reply.sse.isConnected) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})
// Cleanup on disconnect
reply.sse.onClose(() => {
unsubscribe()
server.log.debug({ userId }, 'SSE connection closed')
})
})
}When using setInterval with async database queries or API calls, you must check connection state after the async operation completes to prevent crashes:
export default async function pollingStreamRoute(server: FastifyInstance) {
server.get('/metrics/stream', {
sse: true,
preValidation: requirePermission('metrics.view'),
schema: {
tags: ['Metrics'],
summary: 'Stream metrics via SSE with polling',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
let pollInterval: NodeJS.Timeout | null = null
// Keep connection open
reply.sse.keepAlive()
// Send initial snapshot
const initialData = await fetchMetrics(userId)
reply.sse.send({ event: 'snapshot', data: initialData })
// Poll for updates every 3 seconds
pollInterval = setInterval(async () => {
// Check #1: Before starting async work
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
try {
// Async database query or API call
const data = await fetchMetrics(userId)
// ⚠️ CRITICAL: Check #2 after async operation completes
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
// If sending multiple items in a loop, check before each send
for (const item of data) {
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
reply.sse.send({ event: 'update', data: item })
}
} catch (error) {
server.log.error(error, 'Failed to fetch metrics')
// Don't crash - just log the error
}
}, 3000)
// Cleanup on disconnect
reply.sse.onClose(() => {
if (pollInterval) {
clearInterval(pollInterval)
pollInterval = null
}
server.log.debug({ userId }, 'Metrics stream closed')
})
})
}Without the second check after async operations:
Time 0ms: setInterval fires → Check isConnected ✅ (connected)
Time 5ms: Start database query (async)
Time 50ms: Client disconnects → isConnected = false
Time 100ms: Query completes → Call send() → CRASH! ❌
With proper checks:
Time 0ms: setInterval fires → Check #1 ✅ (connected)
Time 5ms: Start database query (async)
Time 50ms: Client disconnects → isConnected = false
Time 100ms: Query completes → Check #2 ✅ (disconnected) → Return early ✅
The client can disconnect during async operations, so checking only at the start of the interval is insufficient.
const eventSource = new EventSource('/api/notifications/stream', {
withCredentials: true // Include cookies for authentication
})
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data)
console.log('New notification:', data)
})
eventSource.onerror = () => {
// Browser automatically reconnects
console.log('Connection lost, reconnecting...')
}Import types from the package:
import type { SSEMessage } from '@fastify/sse'
const message: SSEMessage = {
id: '123',
event: 'update',
data: { status: 'active' }
}- API Documentation Generation - General API development patterns
- API Security - Authorization middleware for protected SSE endpoints