-
-
Notifications
You must be signed in to change notification settings - Fork 24.3k
Expand file tree
/
Copy pathindex.ts
More file actions
400 lines (348 loc) · 17.4 KB
/
index.ts
File metadata and controls
400 lines (348 loc) · 17.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
import { ExpressAdapter } from '@bull-board/express'
import cookieParser from 'cookie-parser'
import cors from 'cors'
import express, { Request, Response } from 'express'
import 'global-agent/bootstrap'
import http from 'http'
import path from 'path'
import { DataSource } from 'typeorm'
import { AbortControllerPool } from './AbortControllerPool'
import { CachePool } from './CachePool'
import { ChatFlow } from './database/entities/ChatFlow'
import { getDataSource } from './DataSource'
import { Organization } from './enterprise/database/entities/organization.entity'
import { Workspace } from './enterprise/database/entities/workspace.entity'
import { LoggedInUser } from './enterprise/Interface.Enterprise'
import { initializeJwtCookieMiddleware, verifyToken, verifyTokenForBullMQDashboard } from './enterprise/middleware/passport'
import { initAuthSecrets } from './enterprise/utils/authSecrets'
import { IdentityManager } from './IdentityManager'
import { MODE, Platform } from './Interface'
import { IMetricsProvider } from './Interface.Metrics'
import { OpenTelemetry } from './metrics/OpenTelemetry'
import { Prometheus } from './metrics/Prometheus'
import errorHandlerMiddleware from './middlewares/errors'
import { NodesPool } from './NodesPool'
import { QueueManager } from './queue/QueueManager'
import { RedisEventSubscriber } from './queue/RedisEventSubscriber'
import flowiseApiV1Router from './routes'
import { UsageCacheManager } from './UsageCacheManager'
import { getEncryptionKey, getNodeModulesPackagePath } from './utils'
import { API_KEY_BLACKLIST_URLS, WHITELIST_URLS } from './utils/constants'
import logger, { expressRequestLogger } from './utils/logger'
import { RateLimiterManager } from './utils/rateLimit'
import { SSEStreamer } from './utils/SSEStreamer'
import { Telemetry } from './utils/telemetry'
import { validateAPIKey } from './utils/validateKey'
import { getAllowedIframeOrigins, getCorsOptions, sanitizeMiddleware } from './utils/XSS'
declare global {
namespace Express {
interface User extends LoggedInUser {}
interface Request {
user?: LoggedInUser
}
namespace Multer {
interface File {
bucket: string
key: string
acl: string
contentType: string
contentDisposition: null
storageClass: string
serverSideEncryption: null
metadata: any
location: string
etag: string
}
}
}
}
export class App {
app: express.Application
nodesPool: NodesPool
abortControllerPool: AbortControllerPool
cachePool: CachePool
telemetry: Telemetry
rateLimiterManager: RateLimiterManager
AppDataSource: DataSource = getDataSource()
sseStreamer: SSEStreamer
identityManager: IdentityManager
metricsProvider: IMetricsProvider
queueManager: QueueManager
redisSubscriber: RedisEventSubscriber
usageCacheManager: UsageCacheManager
sessionStore: any
constructor() {
this.app = express()
}
async initDatabase() {
// Initialize database
try {
await this.AppDataSource.initialize()
logger.info('📦 [server]: Data Source initialized successfully')
// Run Migrations Scripts
await this.AppDataSource.runMigrations({ transaction: 'each' })
logger.info('🔄 [server]: Database migrations completed successfully')
// Initialize Identity Manager
this.identityManager = await IdentityManager.getInstance()
logger.info('🔐 [server]: Identity Manager initialized successfully')
// Initialize nodes pool
this.nodesPool = new NodesPool()
await this.nodesPool.initialize()
logger.info('🔧 [server]: Nodes pool initialized successfully')
// Initialize abort controllers pool
this.abortControllerPool = new AbortControllerPool()
logger.info('⏹️ [server]: Abort controllers pool initialized successfully')
// Initialize encryption key
await getEncryptionKey()
logger.info('🔑 [server]: Encryption key initialized successfully')
// Initialize auth secrets (env → AWS Secrets Manager → filesystem)
await initAuthSecrets()
logger.info('🔐 [server]: Auth initialized successfully')
// Initialize Rate Limit
this.rateLimiterManager = RateLimiterManager.getInstance()
await this.rateLimiterManager.initializeRateLimiters(await getDataSource().getRepository(ChatFlow).find())
logger.info('🚦 [server]: Rate limiters initialized successfully')
// Initialize cache pool
this.cachePool = new CachePool()
logger.info('💾 [server]: Cache pool initialized successfully')
// Initialize usage cache manager
this.usageCacheManager = await UsageCacheManager.getInstance()
logger.info('📊 [server]: Usage cache manager initialized successfully')
// Initialize telemetry
this.telemetry = new Telemetry()
logger.info('📈 [server]: Telemetry initialized successfully')
// Initialize SSE Streamer
this.sseStreamer = new SSEStreamer()
this.sseStreamer.startHeartbeat()
logger.info('🌊 [server]: SSE Streamer initialized successfully')
// Init Queues
if (process.env.MODE === MODE.QUEUE) {
this.queueManager = QueueManager.getInstance()
const serverAdapter = new ExpressAdapter()
serverAdapter.setBasePath('/admin/queues')
this.queueManager.setupAllQueues({
componentNodes: this.nodesPool.componentNodes,
telemetry: this.telemetry,
cachePool: this.cachePool,
appDataSource: this.AppDataSource,
abortControllerPool: this.abortControllerPool,
usageCacheManager: this.usageCacheManager,
serverAdapter
})
logger.info('✅ [Queue]: All queues setup successfully')
this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
await this.redisSubscriber.connect()
this.redisSubscriber.startPeriodicCleanup()
logger.info('🔗 [server]: Redis event subscriber connected successfully')
}
logger.info('🎉 [server]: All initialization steps completed successfully!')
} catch (error) {
logger.error('❌ [server]: Error during Data Source initialization:', error)
}
}
async config() {
// Limit is needed to allow sending/receiving base64 encoded string
const flowise_file_size_limit = process.env.FLOWISE_FILE_SIZE_LIMIT || '50mb'
this.app.use(express.json({ limit: flowise_file_size_limit }))
this.app.use(express.urlencoded({ limit: flowise_file_size_limit, extended: true }))
// Enhanced trust proxy settings for load balancer
let trustProxy: string | boolean | number | undefined = process.env.TRUST_PROXY
if (typeof trustProxy === 'undefined' || trustProxy.trim() === '' || trustProxy === 'true') {
// Default to trust all proxies
trustProxy = true
} else if (trustProxy === 'false') {
// Disable trust proxy
trustProxy = false
} else if (!isNaN(Number(trustProxy))) {
// Number: Trust specific number of proxies
trustProxy = Number(trustProxy)
}
this.app.set('trust proxy', trustProxy)
// Allow access from specified domains
this.app.use(cors(getCorsOptions()))
// Parse cookies
this.app.use(cookieParser())
// Allow embedding from specified domains.
this.app.use((req, res, next) => {
const allowedOrigins = getAllowedIframeOrigins()
if (allowedOrigins === '*') {
// Explicitly allow all origins (only when user opts in)
res.setHeader('Content-Security-Policy', 'frame-ancestors *')
} else {
const csp = `frame-ancestors ${allowedOrigins}`
res.setHeader('Content-Security-Policy', csp)
// X-Frame-Options for legacy browser support
if (allowedOrigins === "'self'") {
res.setHeader('X-Frame-Options', 'SAMEORIGIN')
} else {
res.setHeader('X-Frame-Options', 'DENY')
}
}
next()
})
// Switch off the default 'X-Powered-By: Express' header
this.app.disable('x-powered-by')
// Add the expressRequestLogger middleware to log all requests
this.app.use(expressRequestLogger)
// Add the sanitizeMiddleware to guard against XSS
this.app.use(sanitizeMiddleware)
const denylistURLs = process.env.DENYLIST_URLS ? process.env.DENYLIST_URLS.split(',') : []
const whitelistURLs = WHITELIST_URLS.filter((url) => !denylistURLs.includes(url))
const URL_CASE_INSENSITIVE_REGEX: RegExp = /\/api\/v1\//i
const URL_CASE_SENSITIVE_REGEX: RegExp = /\/api\/v1\//
await initializeJwtCookieMiddleware(this.app, this.identityManager)
this.app.use(async (req, res, next) => {
// Step 1: Check if the req path contains /api/v1 regardless of case
if (URL_CASE_INSENSITIVE_REGEX.test(req.path)) {
// Step 2: Check if the req path is casesensitive
if (URL_CASE_SENSITIVE_REGEX.test(req.path)) {
// Step 3: Check if the req path is in the whitelist
const isWhitelisted = whitelistURLs.some((url) => req.path.startsWith(url))
if (isWhitelisted) {
next()
} else if (req.headers['x-request-from'] === 'internal') {
verifyToken(req, res, next)
} else {
const isAPIKeyBlacklistedURLS = API_KEY_BLACKLIST_URLS.some((url) => req.path.startsWith(url))
if (isAPIKeyBlacklistedURLS) {
return res.status(401).json({ error: 'Unauthorized Access' })
}
// Only check license validity for non-open-source platforms
if (this.identityManager.getPlatformType() !== Platform.OPEN_SOURCE) {
if (!this.identityManager.isLicenseValid()) {
return res.status(401).json({ error: 'Unauthorized Access' })
}
}
const { isValid, apiKey } = await validateAPIKey(req)
if (!isValid || !apiKey) {
return res.status(401).json({ error: 'Unauthorized Access' })
}
// Find workspace
const workspace = await this.AppDataSource.getRepository(Workspace).findOne({
where: { id: apiKey.workspaceId }
})
if (!workspace) {
return res.status(401).json({ error: 'Unauthorized Access' })
}
// Find organization
const activeOrganizationId = workspace.organizationId as string
const org = await this.AppDataSource.getRepository(Organization).findOne({
where: { id: activeOrganizationId }
})
if (!org) {
return res.status(401).json({ error: 'Unauthorized Access' })
}
const subscriptionId = org.subscriptionId as string
const customerId = org.customerId as string
const features = await this.identityManager.getFeaturesByPlan(subscriptionId)
const productId = await this.identityManager.getProductIdFromSubscription(subscriptionId)
// @ts-ignore
req.user = {
permissions: apiKey.permissions,
features,
activeOrganizationId: activeOrganizationId,
activeOrganizationSubscriptionId: subscriptionId,
activeOrganizationCustomerId: customerId,
activeOrganizationProductId: productId,
isOrganizationAdmin: false,
activeWorkspaceId: workspace.id,
activeWorkspace: workspace.name
}
next()
}
} else {
return res.status(401).json({ error: 'Unauthorized Access' })
}
} else {
// If the req path does not contain /api/v1, then allow the request to pass through, example: /assets, /canvas
next()
}
})
// this is for SSO and must be after the JWT cookie middleware
await this.identityManager.initializeSSO(this.app)
if (process.env.ENABLE_METRICS === 'true') {
switch (process.env.METRICS_PROVIDER) {
// default to prometheus
case 'prometheus':
case undefined:
this.metricsProvider = new Prometheus(this.app)
break
case 'open_telemetry':
this.metricsProvider = new OpenTelemetry(this.app)
break
// add more cases for other metrics providers here
}
if (this.metricsProvider) {
await this.metricsProvider.initializeCounters()
logger.info(`📊 [server]: Metrics Provider [${this.metricsProvider.getName()}] has been initialized!`)
} else {
logger.error(
"❌ [server]: Metrics collection is enabled, but failed to initialize provider (valid values are 'prometheus' or 'open_telemetry'."
)
}
}
this.app.use('/api/v1', flowiseApiV1Router)
// ----------------------------------------
// Configure number of proxies in Host Environment
// ----------------------------------------
this.app.get('/api/v1/ip', (request, response) => {
response.send({
ip: request.ip,
msg: 'Check returned IP address in the response. If it matches your current IP address ( which you can get by going to http://ip.nfriedly.com/ or https://api.ipify.org/ ), then the number of proxies is correct and the rate limiter should now work correctly. If not, increase the number of proxies by 1 and restart Cloud-Hosted Flowise until the IP address matches your own. Visit https://docs.flowiseai.com/configuration/rate-limit#cloud-hosted-rate-limit-setup-guide for more information.'
})
})
if (process.env.MODE === MODE.QUEUE && process.env.ENABLE_BULLMQ_DASHBOARD === 'true' && !this.identityManager.isCloud()) {
// Initialize admin queues rate limiter
const id = 'bullmq_admin_dashboard'
await this.rateLimiterManager.addRateLimiter(
id,
60,
100,
process.env.ADMIN_RATE_LIMIT_MESSAGE || 'Too many requests to admin dashboard, please try again later.'
)
const rateLimiter = this.rateLimiterManager.getRateLimiterById(id)
this.app.use('/admin/queues', rateLimiter, verifyTokenForBullMQDashboard, this.queueManager.getBullBoardRouter())
}
// ----------------------------------------
// Serve UI static
// ----------------------------------------
const packagePath = getNodeModulesPackagePath('flowise-ui')
const uiBuildPath = path.join(packagePath, 'build')
const uiHtmlPath = path.join(packagePath, 'build', 'index.html')
this.app.use('/', express.static(uiBuildPath))
// All other requests not handled will return React app
this.app.use((req: Request, res: Response) => {
res.sendFile(uiHtmlPath)
})
// Error handling
this.app.use(errorHandlerMiddleware)
}
async stopApp() {
try {
this.sseStreamer.stopHeartbeat()
const removePromises: any[] = []
removePromises.push(this.telemetry.flush())
if (this.queueManager) {
removePromises.push(this.redisSubscriber.disconnect())
}
await Promise.all(removePromises)
} catch (e) {
logger.error(`❌[server]: Flowise Server shut down error: ${e}`)
}
}
}
let serverApp: App | undefined
export async function start(): Promise<void> {
serverApp = new App()
const host = process.env.HOST
const port = parseInt(process.env.PORT || '', 10) || 3000
const server = http.createServer(serverApp.app)
await serverApp.initDatabase()
await serverApp.config()
server.listen(port, host, () => {
logger.info(`⚡️ [server]: Flowise Server is listening at ${host ? 'http://' + host : ''}:${port}`)
})
}
export function getInstance(): App | undefined {
return serverApp
}