| title | Server-Sent Events (SSE) |
|---|---|
| description | Implementing real-time server-to-client communication using Server-Sent Events in DeployStack Backend. |
DeployStack Backend includes @fastify/sse for Server-Sent Events support. SSE provides a simple, unidirectional communication channel from server to client over HTTP - ideal for live updates, notifications, and streaming data.
The plugin is globally registered with a 30-second heartbeat interval to keep connections alive.
All SSE endpoints MUST follow this URL pattern:
- REST endpoint:
/api/{resource}/{action} - SSE endpoint:
/api/{resource}/{action}/stream
# Client Activity
GET /api/users/me/mcp/client-activity # REST API (polling)
GET /api/users/me/mcp/client-activity/stream # SSE stream
# Notifications
GET /api/teams/:teamId/notifications # REST API (polling)
GET /api/teams/:teamId/notifications/stream # SSE stream
# Metrics
GET /api/satellites/:satelliteId/metrics # REST API (polling)
GET /api/satellites/:satelliteId/metrics/stream # SSE streamEvery SSE endpoint should have a corresponding REST endpoint:
- Same Query Parameters: Both endpoints accept identical query parameters
- Same Data Structure: Both return the same data format
- Consistent Behavior: Both apply the same filters and limits
- Fallback Support: REST endpoint serves as fallback for clients without SSE support
- Industry Standard: Used by GitHub, Stripe, and Twitter APIs
- RESTful: Treats streaming as a sub-resource
- Technology Agnostic: Works for SSE, WebSockets, or any streaming protocol
- Clear Intent: Immediately indicates real-time streaming capability
Add the sse: true option to any route definition:
server.get('/events', { sse: true }, async (request, reply) => {
// SSE methods available on reply.sse
})For route-specific configuration:
server.get('/events', {
sse: {
heartbeat: false, // Disable heartbeat for this route
serializer: (data) => JSON.stringify(data) // Custom serializer
}
}, handler)reply.sse.send({ data: 'Hello world' })reply.sse.send({
id: '123',
event: 'user_update',
data: { userId: 'abc', status: 'online' },
retry: 5000 // Client retry interval in ms
})async function* generateUpdates() {
for (let i = 0; i < 10; i++) {
yield { data: { count: i } }
await new Promise(r => setTimeout(r, 1000))
}
}
reply.sse.send(generateUpdates())By default, the connection closes after the handler completes. To keep it open:
reply.sse.keepAlive()reply.sse.onClose(() => {
// Cleanup logic when client disconnects
server.log.info('Client disconnected')
})reply.sse.close()Handle reconnecting clients using the Last-Event-ID header:
reply.sse.replay(async (lastEventId) => {
// Fetch and send missed events since lastEventId
const missedEvents = await getMissedEvents(lastEventId)
for (const event of missedEvents) {
reply.sse.send(event)
}
})Access the last event ID directly:
const lastId = reply.sse.lastEventIdif (reply.sse.isConnected) {
reply.sse.send({ data: 'still connected' })
}import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'
export default async function sseRoute(server: FastifyInstance) {
server.get('/notifications/stream', {
sse: true,
preValidation: requirePermission('notifications.read'),
schema: {
tags: ['Notifications'],
summary: 'Stream notifications',
description: 'Real-time notification stream via SSE',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
// Handle client reconnection
reply.sse.replay(async (lastEventId) => {
const missed = await notificationService.getMissedNotifications(userId, lastEventId)
for (const notification of missed) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})
// Keep connection open
reply.sse.keepAlive()
// Subscribe to new notifications
const unsubscribe = notificationService.subscribe(userId, (notification) => {
if (reply.sse.isConnected) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})
// Cleanup on disconnect
reply.sse.onClose(() => {
unsubscribe()
server.log.debug({ userId }, 'SSE connection closed')
})
})
}When using setInterval with async database queries or API calls, you must check connection state after the async operation completes to prevent crashes.
// ❌ WRONG: Captures oldest timestamp after reverse
const newItems = await db.query().orderBy(desc(created_at))
for (const item of newItems.reverse()) { // reverse() MUTATES the array
reply.sse.send({ event: 'item', data: item })
}
lastSentTimestamp = newItems[0].created_at // BUG: Now points to OLDEST item!
// ✅ CORRECT: Capture newest timestamp before reversing
const newItems = await db.query().orderBy(desc(created_at))
const newestTimestamp = newItems[0].created_at // Capture FIRST (newest)
for (const item of newItems.reverse()) {
reply.sse.send({ event: 'item', data: item })
}
lastSentTimestamp = newestTimestamp // Use captured newest timestampWhy this matters:
- Query returns items in descending order (newest first):
[newest, ..., oldest] array.reverse()mutates the array:[oldest, ..., newest]- After reversal,
array[0]points to the oldest item - Using
array[0].created_atafter reversal setslastSentTimestampto the oldest timestamp - Next poll finds the same items again → infinite duplicate stream
The Fix:
Always capture the newest timestamp from array[0] before calling reverse().
import { type FastifyInstance } from 'fastify'
import { getDb, getSchema } from '../../../db'
import { eq, and, desc, gt } from 'drizzle-orm'
export default async function pollingStreamRoute(server: FastifyInstance) {
server.get('/logs/stream', {
sse: true,
preValidation: requirePermission('logs.view'),
schema: {
tags: ['Logs'],
summary: 'Stream logs via SSE with polling',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id
let pollInterval: NodeJS.Timeout | null = null
const db = getDb()
const { logs } = getSchema()
// Keep connection open
reply.sse.keepAlive()
// Send initial snapshot
const initialLogs = await db
.select()
.from(logs)
.where(eq(logs.user_id, userId))
.orderBy(desc(logs.created_at))
.limit(50)
reply.sse.send({
event: 'snapshot',
data: { logs: initialLogs }
})
// Track last sent timestamp for polling
let lastSentTimestamp = initialLogs[0]?.created_at || new Date(0)
// Poll for new logs every 3 seconds
pollInterval = setInterval(async () => {
// Check #1: Before starting async work
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
try {
// Query for new logs (newest first)
const newLogs = await db
.select()
.from(logs)
.where(and(
eq(logs.user_id, userId),
gt(logs.created_at, lastSentTimestamp)
))
.orderBy(desc(logs.created_at))
.limit(100)
// ⚠️ CRITICAL: Check #2 after async operation completes
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
if (newLogs.length > 0) {
// ⚠️ CRITICAL: Capture newest timestamp BEFORE reversing
const newestTimestamp = newLogs[0].created_at
// Send logs in chronological order (oldest first)
for (const log of newLogs.reverse()) {
// Check before each send
if (!reply.sse.isConnected) {
if (pollInterval) clearInterval(pollInterval)
return
}
reply.sse.send({
id: log.id,
event: 'log',
data: log
})
}
// Update to newest timestamp (captured before reversal)
lastSentTimestamp = newestTimestamp
}
} catch (error) {
server.log.error(error, 'Failed to poll for new logs')
// Don't crash - just log the error
}
}, 3000)
// Cleanup on disconnect
reply.sse.onClose(() => {
if (pollInterval) {
clearInterval(pollInterval)
pollInterval = null
}
server.log.debug({ userId }, 'Log stream closed')
})
})
}Without the second check after async operations:
Time 0ms: setInterval fires → Check isConnected ✅ (connected)
Time 5ms: Start database query (async)
Time 50ms: Client disconnects → isConnected = false
Time 100ms: Query completes → Call send() → CRASH! ❌
With proper checks:
Time 0ms: setInterval fires → Check #1 ✅ (connected)
Time 5ms: Start database query (async)
Time 50ms: Client disconnects → isConnected = false
Time 100ms: Query completes → Check #2 ✅ (disconnected) → Return early ✅
The client can disconnect during async operations, so checking only at the start of the interval is insufficient.
When implementing SSE with polling:
- ✅ Always check
reply.sse.isConnectedafter async operations - Client can disconnect during queries - ✅ Capture timestamps before array mutations -
array.reverse()mutates the array - ✅ Use
gt()(greater than) for timestamp filtering - Prevents re-sending same items - ✅ Clear interval on disconnect - Prevent memory leaks and unnecessary queries
- ✅ Send in chronological order - Oldest first for natural reading experience
- ✅ Wrap polling logic in try-catch - Don't crash on database errors
Common Pitfalls:
- ❌ Using
lastSentTimestamp = array[0]afterarray.reverse() - ❌ Only checking
isConnectedbefore async operations - ❌ Forgetting to clear interval in
onClosehandler - ❌ Not handling database errors gracefully
const eventSource = new EventSource('/api/notifications/stream', {
withCredentials: true // Include cookies for authentication
})
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data)
console.log('New notification:', data)
})
eventSource.onerror = () => {
// Browser automatically reconnects
console.log('Connection lost, reconnecting...')
}Import types from the package:
import type { SSEMessage } from '@fastify/sse'
const message: SSEMessage = {
id: '123',
event: 'update',
data: { status: 'active' }
}- API Documentation Generation - General API development patterns
- API Security - Authorization middleware for protected SSE endpoints