@@ -3,6 +3,7 @@ import cors from 'cors'
33import helmet from 'helmet'
44import bunyanMiddleware from 'bunyan-middleware'
55import { PostHog } from 'posthog-node'
6+ import * as http from 'http'
67import { authMiddleware } from '../middlewares/authMiddleware'
78import { tenantMiddleware } from '../middlewares/tenantMiddleware'
89import { databaseMiddleware } from '../middlewares/databaseMiddleware'
@@ -16,16 +17,41 @@ import { responseHandlerMiddleware } from '../middlewares/responseHandlerMiddlew
1617import { errorMiddleware } from '../middlewares/errorMiddleware'
1718import { passportStrategyMiddleware } from '../middlewares/passportStrategyMiddleware'
1819import { redisMiddleware } from '../middlewares/redisMiddleware'
19- import { createRedisClient } from '../utils/redis'
2020import { POSTHOG_CONFIG } from '../config'
21+ import { createRedisClient , createRedisPubSubPair } from '../utils/redis'
22+ import WebSockets from './websockets'
23+ import RedisPubSubReceiver from '../utils/redis/pubSubReceiver'
24+ import { ApiWebsocketMessage } from '../types/mq/apiWebsocketMessage'
2125
2226const serviceLogger = createServiceLogger ( )
2327
2428const app = express ( )
2529
30+ const server = http . createServer ( app )
31+
2632setImmediate ( async ( ) => {
2733 const redis = await createRedisClient ( true )
2834
35+ const redisPubSubPair = await createRedisPubSubPair ( )
36+ const userNamespace = await WebSockets . initialize ( server )
37+
38+ const pubSubReceiver = new RedisPubSubReceiver ( 'api-pubsub' , redisPubSubPair . subClient , ( err ) => {
39+ serviceLogger . error ( err , 'Error while listening to Redis Pub/Sub api-ws channel!' )
40+ process . exit ( 1 )
41+ } )
42+
43+ pubSubReceiver . subscribe ( 'user' , async ( message ) => {
44+ const data = message as ApiWebsocketMessage
45+
46+ if ( data . tenantId ) {
47+ await userNamespace . emitForTenant ( data . tenantId , data . event , data . data )
48+ } else if ( data . userId ) {
49+ userNamespace . emitToUserRoom ( data . userId , data . event , data . data )
50+ } else {
51+ serviceLogger . error ( { type : data . type } , 'Received invalid websocket message!' )
52+ }
53+ } )
54+
2955 let posthog = null
3056
3157 if ( POSTHOG_CONFIG . apiKey ) {
@@ -151,4 +177,4 @@ setImmediate(async () => {
151177 app . use ( io . expressErrorHandler ( ) )
152178} )
153179
154- export default app
180+ export default server
0 commit comments