1- use std:: sync:: Arc ;
2-
31use anyhow:: { Context , Result , ensure} ;
4- use futures_util:: { StreamExt , stream} ;
52use gas:: prelude:: { Id , StandaloneCtx , util:: timestamp} ;
63use rivet_envoy_protocol as protocol;
7- use sqlite_storage:: { engine :: SqliteEngine , open:: OpenConfig } ;
4+ use sqlite_storage:: open:: OpenConfig ;
85
96use crate :: { conn:: Conn , sqlite_runtime} ;
107
11- const SHUTDOWN_CLOSE_PARALLELISM : usize = 256 ;
12-
13- #[ derive( Debug , Clone , PartialEq , Eq ) ]
14- pub struct ActiveActor {
15- pub actor_generation : u32 ,
16- pub sqlite_generation : Option < u64 > ,
17- pub state : ActiveActorState ,
18- }
19-
20- #[ derive( Debug , Clone , PartialEq , Eq ) ]
21- pub enum ActiveActorState {
22- Starting ,
23- Running ,
24- Stopping ,
25- }
26-
27- struct StartActorGuard < ' a > {
28- sqlite_engine : Arc < SqliteEngine > ,
29- active_actors : & ' a scc:: HashMap < String , ActiveActor > ,
30- actor_id : String ,
31- sqlite_generation : Option < u64 > ,
32- armed : bool ,
33- }
34-
35- impl < ' a > StartActorGuard < ' a > {
36- fn new (
37- sqlite_engine : Arc < SqliteEngine > ,
38- active_actors : & ' a scc:: HashMap < String , ActiveActor > ,
39- actor_id : String ,
40- ) -> Self {
41- Self {
42- sqlite_engine,
43- active_actors,
44- actor_id,
45- sqlite_generation : None ,
46- armed : true ,
47- }
48- }
49-
50- fn set_sqlite_generation ( & mut self , generation : u64 ) {
51- self . sqlite_generation = Some ( generation) ;
52- }
53-
54- fn disarm ( & mut self ) {
55- self . armed = false ;
56- }
57- }
58-
59- impl < ' a > Drop for StartActorGuard < ' a > {
60- fn drop ( & mut self ) {
61- if !self . armed {
62- return ;
63- }
64-
65- self . active_actors . remove_sync ( & self . actor_id ) ;
66-
67- if let Some ( generation) = self . sqlite_generation {
68- let sqlite_engine = self . sqlite_engine . clone ( ) ;
69- let actor_id = std:: mem:: take ( & mut self . actor_id ) ;
70- tokio:: spawn ( async move {
71- if let Err ( err) = sqlite_engine. close ( & actor_id, generation) . await {
72- tracing:: debug!(
73- actor_id = %actor_id,
74- ?err,
75- "sqlite db was already taken over during start cancellation"
76- ) ;
77- }
78- } ) ;
79- }
80- }
81- }
82-
838pub async fn start_actor (
849 ctx : & StandaloneCtx ,
8510 conn : & Conn ,
@@ -89,246 +14,39 @@ pub async fn start_actor(
8914 let actor_id = Id :: parse ( & checkpoint. actor_id ) . context ( "invalid start actor id" ) ?;
9015 let actor_id_string = actor_id. to_string ( ) ;
9116
92- match conn
93- . active_actors
94- . entry_async ( actor_id_string. clone ( ) )
95- . await
96- {
97- scc:: hash_map:: Entry :: Occupied ( _) => {
98- ensure ! ( false , "actor already active on envoy connection" ) ;
99- }
100- scc:: hash_map:: Entry :: Vacant ( entry) => {
101- entry. insert_entry ( ActiveActor {
102- actor_generation : checkpoint. generation ,
103- sqlite_generation : None ,
104- state : ActiveActorState :: Starting ,
105- } ) ;
106- }
107- }
108-
109- let mut start_guard = StartActorGuard :: new (
110- conn. sqlite_engine . clone ( ) ,
111- & conn. active_actors ,
112- actor_id_string. clone ( ) ,
113- ) ;
114-
115- let result = async {
116- let sqlite_open = conn
117- . sqlite_engine
118- . open ( & actor_id_string, OpenConfig :: new ( timestamp:: now ( ) ) )
119- . await ?;
120- let sqlite_generation = sqlite_open. generation ;
121- start_guard. set_sqlite_generation ( sqlite_generation) ;
122-
123- let populate_res = async {
124- ensure ! ( start. sqlite_startup_data. is_none( ) ) ;
125- ensure ! ( start. preloaded_kv. is_none( ) ) ;
126-
127- let hibernating_requests = ctx
128- . op ( pegboard:: ops:: actor:: hibernating_request:: list:: Input { actor_id } )
129- . await ?;
130- start. hibernating_requests = hibernating_requests
131- . into_iter ( )
132- . map ( |x| protocol:: HibernatingRequest {
133- gateway_id : x. gateway_id ,
134- request_id : x. request_id ,
135- } )
136- . collect ( ) ;
137-
138- let db = ctx. udb ( ) ?;
139- start. preloaded_kv = pegboard:: actor_kv:: preload:: fetch_preloaded_kv (
140- & db,
141- ctx. config ( ) . pegboard ( ) ,
142- actor_id,
143- conn. namespace_id ,
144- & start. config . name ,
145- )
146- . await ?;
147-
148- start. sqlite_startup_data =
149- Some ( sqlite_runtime:: protocol_sqlite_startup_data ( sqlite_open) ) ;
150-
151- Ok ( ( ) )
152- }
153- . await ;
154-
155- // Close SQLite if start command population fails.
156- if let Err ( err) = populate_res {
157- if let Err ( close_err) = conn
158- . sqlite_engine
159- . close ( & actor_id_string, sqlite_generation)
160- . await
161- {
162- tracing:: warn!(
163- actor_id = %actor_id_string,
164- ?close_err,
165- "failed to close sqlite db after start population failed"
166- ) ;
167- }
168- return Err ( err) ;
169- }
170-
171- Ok ( sqlite_generation)
172- }
173- . await ;
174-
175- match result {
176- Ok ( sqlite_generation) => {
177- let update_result = conn
178- . active_actors
179- . update_async ( & actor_id_string, |_, active| {
180- active. actor_generation = checkpoint. generation ;
181- active. sqlite_generation = Some ( sqlite_generation) ;
182- active. state = ActiveActorState :: Running ;
183- } )
184- . await ;
185- if update_result. is_none ( ) {
186- if let Err ( close_err) = conn
187- . sqlite_engine
188- . close ( & actor_id_string, sqlite_generation)
189- . await
190- {
191- tracing:: warn!(
192- actor_id = %actor_id_string,
193- ?close_err,
194- "failed to close sqlite db after active state disappeared"
195- ) ;
196- }
197- ensure ! ( false , "actor active state missing after start" ) ;
198- }
199- start_guard. disarm ( ) ;
200- Ok ( ( ) )
201- }
202- Err ( err) => {
203- conn. active_actors . remove_async ( & actor_id_string) . await ;
204- start_guard. disarm ( ) ;
205- Err ( err)
206- }
207- }
208- }
209-
210- pub async fn stop_actor ( conn : & Conn , checkpoint : & protocol:: ActorCheckpoint ) -> Result < ( ) > {
211- let actor_id = checkpoint. actor_id . clone ( ) ;
212- let update_result = conn
213- . active_actors
214- . update_async ( & actor_id, |_, active| {
215- if active. actor_generation == checkpoint. generation {
216- active. state = ActiveActorState :: Stopping ;
217- Ok ( ( ) )
218- } else {
219- Err ( active. actor_generation )
220- }
17+ ensure ! ( start. sqlite_startup_data. is_none( ) ) ;
18+ ensure ! ( start. preloaded_kv. is_none( ) ) ;
19+
20+ let hibernating_requests = ctx
21+ . op ( pegboard:: ops:: actor:: hibernating_request:: list:: Input { actor_id } )
22+ . await ?;
23+ start. hibernating_requests = hibernating_requests
24+ . into_iter ( )
25+ . map ( |x| protocol:: HibernatingRequest {
26+ gateway_id : x. gateway_id ,
27+ request_id : x. request_id ,
22128 } )
222- . await
223- . context ( "actor is not active on envoy connection" ) ?;
224-
225- if let Err ( active_generation) = update_result {
226- ensure ! (
227- false ,
228- "stop actor generation {} did not match active generation {}" ,
229- checkpoint. generation,
230- active_generation
231- ) ;
232- }
233- Ok ( ( ) )
234- }
235-
236- pub async fn actor_stopped ( conn : & Conn , checkpoint : & protocol:: ActorCheckpoint ) -> Result < ( ) > {
237- let actor_id = checkpoint. actor_id . clone ( ) ;
238- let active = match conn
239- . active_actors
240- . get_async ( & actor_id)
241- . await
242- . map ( |entry| entry. get ( ) . clone ( ) )
243- {
244- Some ( active) => active,
245- None if conn. is_serverless => {
246- conn. sqlite_engine . force_close ( & actor_id) . await ;
247- conn. serverless_sqlite_actors . remove_async ( & actor_id) . await ;
248- return Ok ( ( ) ) ;
249- }
250- None => {
251- ensure ! ( false , "actor stopped without active sqlite state" ) ;
252- unreachable ! ( ) ;
253- }
254- } ;
255- ensure ! (
256- active. actor_generation == checkpoint. generation,
257- "stopped actor generation {} did not match active generation {}" ,
258- checkpoint. generation,
259- active. actor_generation
260- ) ;
261-
262- let sqlite_generation = active
263- . sqlite_generation
264- . context ( "actor stopped before sqlite finished opening" ) ?;
265- let close_res = conn
29+ . collect ( ) ;
30+
31+ let db = ctx. udb ( ) ?;
32+ start. preloaded_kv = pegboard:: actor_kv:: preload:: fetch_preloaded_kv (
33+ & db,
34+ ctx. config ( ) . pegboard ( ) ,
35+ actor_id,
36+ conn. namespace_id ,
37+ & start. config . name ,
38+ )
39+ . await ?;
40+
41+ // Open SQLite to produce startup data for the envoy. The open is
42+ // fire-and-forget from the connection's perspective. The SqliteEngine's
43+ // takeover path on next open and the lenient `ensure_local_open` cache
44+ // catch-up handle ownership transitions.
45+ let sqlite_open = conn
26646 . sqlite_engine
267- . close ( & actor_id, sqlite_generation)
268- . await ;
269- if let Err ( err) = & close_res {
270- tracing:: warn!(
271- %actor_id,
272- ?err,
273- "close failed in actor_stopped"
274- ) ;
275- }
276- // Generation-checked remove so a concurrent `start_actor` for a fresh
277- // generation between the `get_async` above and this point does not have
278- // its newly-inserted entry deleted by the stale stop.
279- conn. active_actors
280- . remove_if_async ( & actor_id, |entry| {
281- entry. actor_generation == checkpoint. generation
282- } )
283- . await ;
284-
285- close_res
286- }
47+ . open ( & actor_id_string, OpenConfig :: new ( timestamp:: now ( ) ) )
48+ . await ?;
49+ start. sqlite_startup_data = Some ( sqlite_runtime:: protocol_sqlite_startup_data ( sqlite_open) ) ;
28750
288- pub async fn shutdown_conn_actors ( conn : & Conn ) {
289- let mut active_actors = Vec :: new ( ) ;
290- conn. active_actors . retain_sync ( |actor_id, active| {
291- active_actors. push ( ( actor_id. clone ( ) , active. clone ( ) ) ) ;
292- false
293- } ) ;
294-
295- stream:: iter ( active_actors. into_iter ( ) . map ( |( actor_id, active) | {
296- let sqlite_engine = conn. sqlite_engine . clone ( ) ;
297- close_actor_on_shutdown ( sqlite_engine, actor_id, active. sqlite_generation )
298- } ) )
299- . buffer_unordered ( SHUTDOWN_CLOSE_PARALLELISM )
300- . for_each ( |_| async { } )
301- . await ;
302-
303- let mut serverless_sqlite_actors = Vec :: new ( ) ;
304- conn. serverless_sqlite_actors
305- . retain_sync ( |actor_id, _generation| {
306- serverless_sqlite_actors. push ( actor_id. clone ( ) ) ;
307- false
308- } ) ;
309- stream:: iter ( serverless_sqlite_actors. into_iter ( ) . map ( |actor_id| {
310- let sqlite_engine = conn. sqlite_engine . clone ( ) ;
311- async move {
312- sqlite_engine. force_close ( & actor_id) . await ;
313- }
314- } ) )
315- . buffer_unordered ( SHUTDOWN_CLOSE_PARALLELISM )
316- . for_each ( |_| async { } )
317- . await ;
318- }
319-
320- async fn close_actor_on_shutdown (
321- sqlite_engine : Arc < SqliteEngine > ,
322- actor_id : String ,
323- sqlite_generation : Option < u64 > ,
324- ) {
325- if let Some ( generation) = sqlite_generation {
326- if let Err ( err) = sqlite_engine. close ( & actor_id, generation) . await {
327- tracing:: warn!(
328- actor_id = %actor_id,
329- ?err,
330- "close failed during envoy shutdown"
331- ) ;
332- }
333- }
51+ Ok ( ( ) )
33452}
0 commit comments