11use std:: {
22 result:: Result :: { Err , Ok } ,
33 sync:: Arc ,
4- time:: Duration ,
4+ time:: { Duration , Instant } ,
55} ;
66
77use anyhow:: * ;
88use indoc:: indoc;
99use nix:: sys:: signal:: Signal ;
1010use pegboard:: protocol;
1111use pegboard_config:: runner_protocol;
12+ use sqlx:: Acquire ;
1213use tokio:: { fs, sync:: Mutex } ;
1314use uuid:: Uuid ;
1415
@@ -75,16 +76,18 @@ impl Actor {
7576 actor_id,
7677 generation,
7778 config,
78- start_ts
79+ start_ts,
80+ image_id
7981 )
80- VALUES (?1, ?2, ?3, ?4)
82+ VALUES (?1, ?2, ?3, ?4, ?5 )
8183 ON CONFLICT (actor_id, generation) DO NOTHING
8284 " ,
8385 ) )
8486 . bind ( self . actor_id )
8587 . bind ( self . generation as i64 )
8688 . bind ( & config_json)
8789 . bind ( utils:: now ( ) )
90+ . bind ( self . config . image . id )
8891 . execute ( & mut * ctx. sql ( ) . await ?)
8992 . await
9093 } )
@@ -128,7 +131,7 @@ impl Actor {
128131 self : & Arc < Self > ,
129132 ctx : & Arc < Ctx > ,
130133 ) -> Result < protocol:: HashableMap < String , protocol:: ProxiedPort > > {
131- let setup_timer = std :: time :: Instant :: now ( ) ;
134+ let setup_timer = Instant :: now ( ) ;
132135 tracing:: info!( actor_id=?self . actor_id, generation=?self . generation, "setting up actor" ) ;
133136
134137 let actor_path = ctx. actor_path ( self . actor_id , self . generation ) ;
@@ -146,19 +149,13 @@ impl Actor {
146149 protocol:: ImageKind :: DockerImage | protocol:: ImageKind :: OciBundle
147150 ) && matches ! ( self . config. network_mode, protocol:: NetworkMode :: Bridge ) ;
148151
149- // Parallelize two independent jobs:
150- //
151- // - `download_image` takes a long time to download. `download_image` is dependent on
152- // `make_fs`
153- // - `setup_cni_network` takes a long time. `setup_cni_network` is dependent on
154- // `bind_ports`.
155152 tracing:: info!( actor_id=?self . actor_id, generation=?self . generation, "starting parallel setup tasks" ) ;
156- let parallel_timer = std :: time :: Instant :: now ( ) ;
153+ let parallel_timer = Instant :: now ( ) ;
157154
158155 let ( _, ports) = tokio:: try_join!(
159156 async {
160- self . make_fs( & ctx) . await ?;
161157 self . download_image( & ctx) . await ?;
158+ self . make_fs( & ctx) . await ?;
162159 Result :: <( ) , anyhow:: Error >:: Ok ( ( ) )
163160 } ,
164161 async {
@@ -205,7 +202,12 @@ impl Actor {
205202 let mut runner_env = vec ! [
206203 (
207204 "ROOT_USER_ENABLED" ,
208- if self . config. root_user_enabled { "1" } else { "0" } . to_string( ) ,
205+ if self . config. root_user_enabled {
206+ "1"
207+ } else {
208+ "0"
209+ }
210+ . to_string( ) ,
209211 ) ,
210212 ( "ACTOR_ID" , self . actor_id. to_string( ) ) ,
211213 ] ;
@@ -443,7 +445,10 @@ impl Actor {
443445 // Update stop_ts
444446 if matches ! ( signal, Signal :: SIGTERM | Signal :: SIGKILL ) || !has_runner {
445447 let stop_ts_set = utils:: sql:: query ( || async {
446- sqlx:: query_as :: < _ , ( bool , ) > ( indoc ! (
448+ let mut conn = ctx. sql ( ) . await ?;
449+ let mut tx = conn. begin ( ) . await ?;
450+
451+ let res = sqlx:: query_as :: < _ , ( bool , ) > ( indoc ! (
447452 "
448453 UPDATE actors
449454 SET stop_ts = ?3
@@ -457,11 +462,27 @@ impl Actor {
457462 . bind ( self . actor_id )
458463 . bind ( self . generation as i64 )
459464 . bind ( utils:: now ( ) )
460- . fetch_optional ( & mut * ctx. sql ( ) . await ?)
461- . await
465+ . fetch_optional ( & mut * tx)
466+ . await ?;
467+
468+ // Update LRU cache
469+ sqlx:: query ( indoc ! (
470+ "
471+ UPDATE images_cache
472+ SET last_used_ts = ?2
473+ WHERE image_id = ?1
474+ " ,
475+ ) )
476+ . bind ( self . config . image . id )
477+ . bind ( utils:: now ( ) )
478+ . execute ( & mut * tx)
479+ . await ?;
480+
481+ tx. commit ( ) . await ?;
482+
483+ Ok ( res. is_some ( ) )
462484 } )
463- . await ?
464- . is_some ( ) ;
485+ . await ?;
465486
466487 // Emit event if not stopped before
467488 if stop_ts_set {
0 commit comments