@@ -18,7 +18,6 @@ use std::process;
1818use std:: process:: Command ;
1919use std:: time:: Duration ;
2020use thiserror:: Error ;
21- use tokio:: runtime:: Handle ;
2221use tokio_cron_scheduler:: { Job , JobScheduler } ;
2322
2423#[ allow( dead_code) ]
@@ -59,7 +58,7 @@ async fn main() -> Result<(), anyhow::Error> {
5958
6059 env_logger:: Builder :: from_env ( Env :: default ( ) . default_filter_or ( "info" ) ) . init ( ) ;
6160 let host_dir = env:: var ( "HOST_DIR" ) . unwrap_or_else ( |_| DEFAULT_BASE_DIR . to_string ( ) ) ;
62- let core_dir = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
61+ let core_dir_command = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
6362 let suid = env:: var ( "SUID_DUMPABLE" ) . unwrap_or_else ( |_| DEFAULT_SUID_DUMPABLE . to_string ( ) ) ;
6463 let deploy_crio_config = env:: var ( "DEPLOY_CRIO_CONFIG" )
6564 . unwrap_or_else ( |_| "false" . to_string ( ) )
@@ -94,9 +93,8 @@ async fn main() -> Result<(), anyhow::Error> {
9493 info ! ( "Uploading {}" , file) ;
9594 process_file ( p, & bucket) . await ;
9695 } else {
97- let core_store = core_dir. clone ( ) ;
98- info ! ( "Uploading all content in {}" , core_store) ;
99- run_polling_agent ( core_store. as_str ( ) ) . await ;
96+ info ! ( "Uploading all content in {}" , core_dir_command) ;
97+ run_polling_agent ( ) . await ;
10098 }
10199 process:: exit ( 0 ) ;
102100 }
@@ -119,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
119117 format ! ( "{}/core_pattern.bak" , host_location) . as_str ( ) ,
120118 format ! (
121119 "|{}/{} -c=%c -e=%e -p=%p -s=%s -t=%t -d={} -h=%h -E=%E" ,
122- host_location, CDC_NAME , core_dir
120+ host_location, CDC_NAME , core_dir_command
123121 )
124122 . as_str ( ) ,
125123 ) ?;
@@ -135,8 +133,6 @@ async fn main() -> Result<(), anyhow::Error> {
135133 & suid,
136134 ) ?;
137135
138- let core_location = core_dir. clone ( ) ;
139-
140136 create_env_file ( host_location) ?;
141137 // Run polling agent on startup to clean up files.
142138
@@ -155,7 +151,7 @@ async fn main() -> Result<(), anyhow::Error> {
155151 std:: thread:: sleep ( Duration :: from_millis ( 1000 ) ) ;
156152 }
157153 } else {
158- run_polling_agent ( core_location . as_str ( ) ) . await ;
154+ run_polling_agent ( ) . await ;
159155 }
160156
161157 if !interval. is_empty ( ) && !schedule. is_empty ( ) {
@@ -180,7 +176,6 @@ async fn main() -> Result<(), anyhow::Error> {
180176 }
181177 }
182178
183- let notify_location = core_location. clone ( ) ;
184179 if !schedule. is_empty ( ) {
185180 info ! ( "Schedule Initialising with: {}" , schedule) ;
186181 let sched = match JobScheduler :: new ( ) . await {
@@ -190,12 +185,18 @@ async fn main() -> Result<(), anyhow::Error> {
190185 panic ! ( "Schedule Creation Failed with {}" , e)
191186 }
192187 } ;
193- let s_job = match Job :: new ( schedule. as_str ( ) , move |_uuid, _l| {
194- let handle = Handle :: current ( ) ;
195- let core_str = core_location. clone ( ) ;
196- handle. spawn ( async move {
197- run_polling_agent ( & core_str) . await ;
198- } ) ;
188+
189+ let s_job = match Job :: new_async ( schedule. as_str ( ) , move |uuid, mut l| {
190+ Box :: pin ( async move {
191+ let next_tick = l. next_tick_for_job ( uuid) . await ;
192+ match next_tick {
193+ Ok ( Some ( ts) ) => {
194+ info ! ( "Next scheduled run {:?}" , ts) ;
195+ run_polling_agent ( ) . await ;
196+ }
197+ _ => warn ! ( "Could not get next tick for job" ) ,
198+ }
199+ } )
199200 } ) {
200201 Ok ( v) => v,
201202 Err ( e) => {
@@ -218,6 +219,9 @@ async fn main() -> Result<(), anyhow::Error> {
218219 panic ! ( "Schedule Start failed, {:#?}" , e) ;
219220 }
220221 } ;
222+ loop {
223+ std:: thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
224+ }
221225 }
222226
223227 if use_inotify == "true" {
@@ -231,14 +235,14 @@ async fn main() -> Result<(), anyhow::Error> {
231235 }
232236 } ;
233237 info ! ( "INotify Initialised..." ) ;
234- match inotify. add_watch ( & notify_location , WatchMask :: CLOSE ) {
238+ match inotify. add_watch ( & core_dir_command , WatchMask :: CLOSE ) {
235239 Ok ( _) => { }
236240 Err ( e) => {
237241 error ! ( "Add watch failed: {}" , e) ;
238242 panic ! ( "Add watch failed: {}" , e)
239243 }
240244 } ;
241- info ! ( "INotify watching : {}" , notify_location ) ;
245+ info ! ( "INotify watching : {}" , core_dir_command ) ;
242246 let mut buffer = [ 0 ; 4096 ] ;
243247 loop {
244248 let events = match inotify. read_events_blocking ( & mut buffer) {
@@ -264,7 +268,7 @@ async fn main() -> Result<(), anyhow::Error> {
264268 Some ( s) => {
265269 let file = format ! (
266270 "{}/{}" ,
267- notify_location ,
271+ core_dir_command ,
268272 s. to_str( ) . unwrap_or_default( )
269273 ) ;
270274 let p = Path :: new ( & file) ;
@@ -389,7 +393,8 @@ fn get_bucket() -> Result<Bucket, anyhow::Error> {
389393 Ok ( Bucket :: new ( & s3. bucket , s3. region , s3. credentials ) ?. with_path_style ( ) )
390394}
391395
392- async fn run_polling_agent ( core_location : & str ) {
396+ async fn run_polling_agent ( ) {
397+ let core_location = env:: var ( "CORE_DIR" ) . unwrap_or_else ( |_| DEFAULT_CORE_DIR . to_string ( ) ) ;
393398 info ! ( "Executing Agent with location : {}" , core_location) ;
394399
395400 let bucket = match get_bucket ( ) {
0 commit comments