Skip to content

Commit d13799f

Browse files
feat: implement scheduling functionality for AgentflowV2
- Added ScheduleQueue class for managing scheduled jobs using BullMQ. - Introduced scheduleService for creating, updating, and disabling schedules. - Integrated schedule validation and cron expression handling. - Enhanced chatflow service to manage schedules for agentflows. - Added UI components for time and day selection (TimePicker, WeekDaysPicker, MonthDaysPicker). - Updated NodeInputHandler to support new input types for scheduling. - Improved validation for schedule configurations in agentflows.
1 parent e548b40 commit d13799f

27 files changed

Lines changed: 1879 additions & 11 deletions

packages/components/nodes/agentflow/Start/Start.ts

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class Start_Agentflow implements INode {
1818
constructor() {
1919
this.label = 'Start'
2020
this.name = 'startAgentflow'
21-
this.version = 1.1
21+
this.version = 1.2
2222
this.type = 'Start'
2323
this.category = 'Agent Flows'
2424
this.description = 'Starting point of the agentflow'
@@ -40,6 +40,11 @@ class Start_Agentflow implements INode {
4040
label: 'Form Input',
4141
name: 'formInput',
4242
description: 'Start the workflow with form inputs'
43+
},
44+
{
45+
label: 'Schedule Input',
46+
name: 'scheduleInput',
47+
description: 'Start the workflow on a recurring schedule (cron)'
4348
}
4449
],
4550
default: 'chatInput'
@@ -125,6 +130,135 @@ class Start_Agentflow implements INode {
125130
}
126131
]
127132
},
133+
{
134+
label: 'Schedule Type',
135+
name: 'scheduleType',
136+
type: 'options',
137+
options: [
138+
{
139+
label: 'Visual Picker',
140+
name: 'visualPicker',
141+
description: 'Use a visual picker to select schedule options'
142+
},
143+
{
144+
label: 'Cron Expression',
145+
name: 'cronExpression',
146+
description: 'Use a cron expression to define the schedule'
147+
}
148+
],
149+
show: {
150+
startInputType: 'scheduleInput'
151+
}
152+
},
153+
{
154+
label: 'Cron Expression',
155+
name: 'scheduleCronExpression',
156+
type: 'string',
157+
placeholder: '0 9 * * 1-5',
158+
description:
159+
'Standard 5-field cron expression (minute hour day month weekday). Example: "0 9 * * 1-5" runs at 09:00 every weekday.',
160+
show: {
161+
startInputType: 'scheduleInput',
162+
scheduleType: 'cronExpression'
163+
}
164+
},
165+
{
166+
label: 'Frequency',
167+
name: 'scheduleFrequency',
168+
type: 'options',
169+
options: [
170+
{
171+
label: 'Hourly',
172+
name: 'hourly',
173+
description: 'Run every hour at the specified time'
174+
},
175+
{
176+
label: 'Daily',
177+
name: 'daily',
178+
description: 'Run every day at the specified time'
179+
},
180+
{
181+
label: 'Weekly',
182+
name: 'weekly',
183+
description: 'Run every week on the specified day and time'
184+
},
185+
{
186+
label: 'Monthly',
187+
name: 'monthly',
188+
description: 'Run every month on the specified date and time'
189+
}
190+
],
191+
show: {
192+
startInputType: 'scheduleInput',
193+
scheduleType: 'visualPicker'
194+
}
195+
},
196+
{
197+
label: 'On Minute',
198+
name: 'scheduleOnMinute',
199+
type: 'number',
200+
placeholder: '30',
201+
description:
202+
'Minute of the hour when the schedule should run (0-59). For example, "30" means the schedule will run at the 30th minute of the hour.',
203+
show: {
204+
startInputType: 'scheduleInput',
205+
scheduleType: 'visualPicker',
206+
scheduleFrequency: 'hourly'
207+
}
208+
},
209+
{
210+
label: 'On Time',
211+
name: 'scheduleOnTime',
212+
type: 'timePicker',
213+
show: {
214+
startInputType: 'scheduleInput',
215+
scheduleType: 'visualPicker',
216+
scheduleFrequency: ['daily', 'weekly', 'monthly']
217+
}
218+
},
219+
{
220+
label: 'On Day of Week',
221+
name: 'scheduleOnDayOfWeek',
222+
type: 'weekDaysPicker',
223+
show: {
224+
startInputType: 'scheduleInput',
225+
scheduleType: 'visualPicker',
226+
scheduleFrequency: 'weekly'
227+
}
228+
},
229+
{
230+
label: 'On Day of Month',
231+
name: 'scheduleOnDayOfMonth',
232+
type: 'monthDaysPicker',
233+
show: {
234+
startInputType: 'scheduleInput',
235+
scheduleType: 'visualPicker',
236+
scheduleFrequency: 'monthly'
237+
}
238+
},
239+
{
240+
label: 'Timezone',
241+
name: 'scheduleTimezone',
242+
type: 'string',
243+
placeholder: 'UTC',
244+
description: 'IANA timezone name, e.g. America/New_York. Defaults to UTC.',
245+
optional: true,
246+
show: {
247+
startInputType: 'scheduleInput'
248+
}
249+
},
250+
{
251+
label: 'Default Input',
252+
name: 'scheduleDefaultInput',
253+
type: 'string',
254+
placeholder: 'Run the daily report',
255+
description: 'Default question/input passed to the flow when it is triggered by the scheduler.',
256+
acceptVariable: true,
257+
rows: 4,
258+
show: {
259+
startInputType: 'scheduleInput'
260+
}
261+
},
128262
{
129263
label: 'Ephemeral Memory',
130264
name: 'startEphemeralMemory',
@@ -213,6 +347,14 @@ class Start_Agentflow implements INode {
213347
outputData.form = form
214348
}
215349

350+
if (startInputType === 'scheduleInput') {
351+
const defaultInput = nodeData.inputs?.scheduleDefaultInput as string
352+
const effectiveInput = (typeof input === 'string' && input) || defaultInput || ''
353+
inputData.question = effectiveInput
354+
outputData.question = effectiveInput
355+
outputData.scheduledAt = options.agentflowRuntime?.scheduledAt ?? new Date().toISOString()
356+
}
357+
216358
if (startEphemeralMemory) {
217359
outputData.ephemeralMemory = true
218360
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { DataSource } from 'typeorm'
2+
import { IComponentNodes } from './Interface'
3+
import { Telemetry } from './utils/telemetry'
4+
import { CachePool } from './CachePool'
5+
import { UsageCacheManager } from './UsageCacheManager'
6+
7+
export interface IScheduleQueueAppServer {
8+
appDataSource: DataSource
9+
componentNodes: IComponentNodes
10+
telemetry: Telemetry
11+
cachePool: CachePool
12+
usageCacheManager: UsageCacheManager
13+
}
14+
15+
export interface IScheduleAgentflowJobData extends IScheduleQueueAppServer {
16+
scheduleRecordId: string
17+
targetId: string
18+
cronExpression: string
19+
timezone: string
20+
defaultInput?: string
21+
workspaceId: string
22+
scheduledAt: string // ISO string
23+
}

packages/server/src/commands/worker.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ interface CustomListener extends QueueEventsListener {
1616
export default class Worker extends BaseCommand {
1717
predictionWorkerId: string
1818
upsertionWorkerId: string
19+
scheduleWorkerId: string
1920

2021
async run(): Promise<void> {
2122
logger.info('Starting Flowise Worker...')
@@ -51,6 +52,12 @@ export default class Worker extends BaseCommand {
5152
this.upsertionWorkerId = upsertionWorker.id
5253
logger.info(`Upsertion Worker ${this.upsertionWorkerId} created`)
5354

55+
/** Schedule */
56+
const scheduleQueue = queueManager.getQueue('schedule')
57+
const scheduleWorker = scheduleQueue.createWorker()
58+
this.scheduleWorkerId = scheduleWorker.id
59+
logger.info(`Schedule Worker ${this.scheduleWorkerId} created`)
60+
5461
// Keep the process running
5562
process.stdin.resume()
5663
}
@@ -98,6 +105,10 @@ export default class Worker extends BaseCommand {
98105
const upsertWorker = queueManager.getQueue('upsert').getWorker()
99106
logger.info(`Shutting down Flowise Upsertion Worker ${this.upsertionWorkerId}...`)
100107
await upsertWorker.close()
108+
109+
const scheduleWorker = queueManager.getQueue('schedule').getWorker()
110+
logger.info(`Shutting down Flowise Schedule Worker...`)
111+
await scheduleWorker.close()
101112
} catch (error) {
102113
logger.error('There was an error shutting down Flowise Worker...', error)
103114
await this.failExit()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/* eslint-disable */
2+
import { Entity, Column, PrimaryGeneratedColumn, CreateDateColumn, UpdateDateColumn, Index } from 'typeorm'
3+
4+
export enum ScheduleTriggerType {
5+
AGENTFLOW = 'AGENTFLOW'
6+
}
7+
8+
@Entity()
9+
export class ScheduleRecord {
10+
@PrimaryGeneratedColumn('uuid')
11+
id: string
12+
13+
/** Discriminator: which entity type is being scheduled */
14+
@Column({ type: 'varchar', length: 32 })
15+
triggerType: ScheduleTriggerType
16+
17+
/** FK to the target entity (ChatFlow.id for AGENTFLOW) */
18+
@Index()
19+
@Column({ type: 'varchar' })
20+
targetId: string
21+
22+
/** Node ID within the flow (for traceability) */
23+
@Column({ nullable: true, type: 'text' })
24+
nodeId?: string
25+
26+
/** Standard 5 or 6 field cron expression */
27+
@Column({ type: 'text' })
28+
cronExpression: string
29+
30+
/** IANA timezone string, e.g. "UTC" or "America/New_York" */
31+
@Column({ type: 'varchar', length: 64, default: 'UTC' })
32+
timezone: string
33+
34+
/** Whether the schedule is active */
35+
@Column({ type: 'boolean', default: true })
36+
enabled: boolean
37+
38+
/** Optional static text sent as question when the flow fires */
39+
@Column({ nullable: true, type: 'text' })
40+
defaultInput?: string
41+
42+
@Column({ nullable: true, type: 'timestamp' })
43+
lastRunAt?: Date
44+
45+
@Column({ nullable: true, type: 'timestamp' })
46+
nextRunAt?: Date
47+
48+
@Column({ type: 'varchar' })
49+
workspaceId: string
50+
51+
@Column({ type: 'timestamp' })
52+
@CreateDateColumn()
53+
createdDate: Date
54+
55+
@Column({ type: 'timestamp' })
56+
@UpdateDateColumn()
57+
updatedDate: Date
58+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/* eslint-disable */
2+
import { Entity, Column, PrimaryGeneratedColumn, CreateDateColumn, Index } from 'typeorm'
3+
import { ScheduleTriggerType } from './ScheduleRecord'
4+
5+
export enum ScheduleTriggerStatus {
6+
QUEUED = 'QUEUED',
7+
RUNNING = 'RUNNING',
8+
SUCCEEDED = 'SUCCEEDED',
9+
FAILED = 'FAILED',
10+
SKIPPED = 'SKIPPED'
11+
}
12+
13+
@Entity()
14+
export class ScheduleTriggerLog {
15+
@PrimaryGeneratedColumn('uuid')
16+
id: string
17+
18+
@Index()
19+
@Column({ type: 'varchar' })
20+
scheduleRecordId: string
21+
22+
@Column({ type: 'varchar', length: 32 })
23+
triggerType: ScheduleTriggerType
24+
25+
@Index()
26+
@Column({ type: 'varchar' })
27+
targetId: string
28+
29+
/** Resulting execution/chatMessage ID (for agentflow triggers) */
30+
@Column({ nullable: true, type: 'varchar' })
31+
executionId?: string
32+
33+
@Column({ type: 'varchar', length: 32 })
34+
status: ScheduleTriggerStatus
35+
36+
@Column({ nullable: true, type: 'text' })
37+
error?: string
38+
39+
@Column({ nullable: true, type: 'integer' })
40+
elapsedTimeMs?: number
41+
42+
@Column({ type: 'timestamp' })
43+
scheduledAt: Date
44+
45+
@Column({ type: 'varchar' })
46+
workspaceId: string
47+
48+
@Column({ type: 'timestamp' })
49+
@CreateDateColumn()
50+
createdDate: Date
51+
}

packages/server/src/database/entities/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import { Workspace } from '../../enterprise/database/entities/workspace.entity'
2626
import { WorkspaceUser } from '../../enterprise/database/entities/workspace-user.entity'
2727
import { LoginMethod } from '../../enterprise/database/entities/login-method.entity'
2828
import { LoginSession } from '../../enterprise/database/entities/login-session.entity'
29+
import { ScheduleRecord } from './ScheduleRecord'
30+
import { ScheduleTriggerLog } from './ScheduleTriggerLog'
2931

3032
export const entities = {
3133
ChatFlow,
@@ -57,5 +59,7 @@ export const entities = {
5759
Workspace,
5860
WorkspaceUser,
5961
LoginMethod,
60-
LoginSession
62+
LoginSession,
63+
ScheduleRecord,
64+
ScheduleTriggerLog
6165
}

0 commit comments

Comments
 (0)