-
-
Notifications
You must be signed in to change notification settings - Fork 24.2k
Expand file tree
/
Copy pathPredictionQueue.ts
More file actions
105 lines (94 loc) · 4.22 KB
/
PredictionQueue.ts
File metadata and controls
105 lines (94 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import { DataSource } from 'typeorm'
import { executeFlow } from '../utils/buildChatflow'
import { IComponentNodes, IExecuteFlowParams } from '../Interface'
import { Telemetry } from '../utils/telemetry'
import { CachePool } from '../CachePool'
import { RedisEventPublisher } from './RedisEventPublisher'
import { AbortControllerPool } from '../AbortControllerPool'
import { BaseQueue } from './BaseQueue'
import { RedisOptions } from 'bullmq'
import { UsageCacheManager } from '../UsageCacheManager'
import logger from '../utils/logger'
import { generateAgentflowv2 as generateAgentflowv2_json } from 'flowise-components'
import { databaseEntities } from '../utils'
import { executeCustomNodeFunction } from '../utils/executeCustomNodeFunction'
interface PredictionQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
cachePool: CachePool
componentNodes: IComponentNodes
abortControllerPool: AbortControllerPool
usageCacheManager: UsageCacheManager
}
interface IGenerateAgentflowv2Params extends IExecuteFlowParams {
prompt: string
componentNodes: IComponentNodes
toolNodes: IComponentNodes
selectedChatModel: Record<string, any>
question: string
isAgentFlowGenerator: boolean
}
export class PredictionQueue extends BaseQueue {
private componentNodes: IComponentNodes
private telemetry: Telemetry
private cachePool: CachePool
private appDataSource: DataSource
private abortControllerPool: AbortControllerPool
private usageCacheManager: UsageCacheManager
private redisPublisher: RedisEventPublisher
private queueName: string
constructor(name: string, connection: RedisOptions, options: PredictionQueueOptions) {
super(name, connection)
this.queueName = name
this.componentNodes = options.componentNodes || {}
this.telemetry = options.telemetry
this.cachePool = options.cachePool
this.appDataSource = options.appDataSource
this.abortControllerPool = options.abortControllerPool
this.usageCacheManager = options.usageCacheManager
this.redisPublisher = new RedisEventPublisher()
this.redisPublisher.connect()
}
public getQueueName() {
return this.queueName
}
public getQueue() {
return this.queue
}
async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
if (this.appDataSource) data.appDataSource = this.appDataSource
if (this.telemetry) data.telemetry = this.telemetry
if (this.cachePool) data.cachePool = this.cachePool
if (this.usageCacheManager) data.usageCacheManager = this.usageCacheManager
if (this.componentNodes) data.componentNodes = this.componentNodes
if (this.redisPublisher) data.sseStreamer = this.redisPublisher
if (Object.prototype.hasOwnProperty.call(data, 'isAgentFlowGenerator')) {
logger.info(`Generating Agentflow...`)
const { prompt, componentNodes, toolNodes, selectedChatModel, question } = data as IGenerateAgentflowv2Params
const options: Record<string, any> = {
appDataSource: this.appDataSource,
databaseEntities: databaseEntities,
logger: logger
}
return await generateAgentflowv2_json({ prompt, componentNodes, toolNodes, selectedChatModel }, question, options)
}
if (Object.prototype.hasOwnProperty.call(data, 'isExecuteCustomFunction')) {
const executeCustomFunctionData = data as any
logger.info(`[${executeCustomFunctionData.orgId}]: Executing Custom Function...`)
return await executeCustomNodeFunction({
appDataSource: this.appDataSource,
componentNodes: this.componentNodes,
data: executeCustomFunctionData.data,
workspaceId: executeCustomFunctionData.workspaceId,
orgId: executeCustomFunctionData.orgId
})
}
if (this.abortControllerPool) {
const abortControllerId = `${data.chatflow.id}_${data.chatId}`
const signal = new AbortController()
this.abortControllerPool.add(abortControllerId, signal)
data.signal = signal
}
return await executeFlow(data)
}
}