Similarly to #1582 and #1583 , we need a similar solution for the FlowAgent.
It's however a bit more involved.
The main loop has explicit waiting similarly:
async fn run(&self) -> Result<(), InternalError> {
// Main scanning loop
loop {
// Run scheduling for current time slot
self.tick_current_timeslot()
.instrument(tracing::debug_span!("FlowAgent::tick"))
.await?;
self.time_source
.sleep(self.agent_config.awaiting_step)
.await;
}
}
and each N seconds we are checking for the "nearest flow activation" moment in the database:
#[transactional_method]
async fn tick_current_timeslot(&self) -> Result<(), InternalError> {
let flow_event_store = transaction_catalog.get_one::<dyn FlowEventStore>().unwrap();
// Do we have a timeslot scheduled?
let Some(nearest_flow_activation_moment) =
flow_event_store.nearest_flow_activation_moment().await?
else {
return Ok(());
};
// Is it time to execute it yet?
let current_time = self.time_source.now();
if nearest_flow_activation_moment > current_time {
return Ok(());
}
...
However, we don't just activate when there is data, we also have to activate when current time is not earlier than this activation moment.
The query to the database looks like the following:
SELECT f.scheduled_for_activation_at as activation_time
FROM flows f
WHERE
f.scheduled_for_activation_at IS NOT NULL AND
(f.flow_status = 'waiting'::flow_status_type OR f.flow_status = 'retrying'::flow_status_type)
ORDER BY f.scheduled_for_activation_at
LIMIT 1
This could be replaced with NOTIFY/LISTEN channel, which is triggered whenever there is a change to scheduled_for_activation_at or flow_status fields.
However, we need to couple it with some kind of managed timers at a higher level.
So, the NOTIFY part will tell us the most up-to-date moment when the flow agent needs to activate.
This should be used somehow to wakeup the timeslot process.
However, if another flow is scheduled or this flow is cancelled, we must be able to cancel / reschedule that timeslot process.
Similarly to #1582 and #1583 , we need a similar solution for the
FlowAgent.It's however a bit more involved.
The main loop has explicit waiting similarly:
and each N seconds we are checking for the "nearest flow activation" moment in the database:
However, we don't just activate when there is data, we also have to activate when current time is not earlier than this activation moment.
The query to the database looks like the following:
This could be replaced with NOTIFY/LISTEN channel, which is triggered whenever there is a change to
scheduled_for_activation_atorflow_statusfields.However, we need to couple it with some kind of managed timers at a higher level.
So, the NOTIFY part will tell us the most up-to-date moment when the flow agent needs to activate.
This should be used somehow to wakeup the timeslot process.
However, if another flow is scheduled or this flow is cancelled, we must be able to cancel / reschedule that timeslot process.