1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: fmt;
1516use std:: time:: Duration ;
1617
1718use anyhow:: Context ;
@@ -216,6 +217,26 @@ impl<A: Actor + Default> SpawnBuilder<A> {
216217 }
217218}
218219
220+ enum ActorExitPhase {
221+ Initializing ,
222+ Handling { message : & ' static str } ,
223+ Running ,
224+ OnDrainedMessaged ,
225+ Completed ,
226+ }
227+
228+ impl fmt:: Debug for ActorExitPhase {
229+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
230+ match self {
231+ ActorExitPhase :: Initializing => write ! ( f, "initializing" ) ,
232+ ActorExitPhase :: Handling { message } => write ! ( f, "handling({message})" ) ,
233+ ActorExitPhase :: Running => write ! ( f, "running" ) ,
234+ ActorExitPhase :: OnDrainedMessaged => write ! ( f, "on_drained_messages" ) ,
235+ ActorExitPhase :: Completed => write ! ( f, "completed" ) ,
236+ }
237+ }
238+ }
239+
219240/// Receives an envelope from either the high priority queue or the low priority queue.
220241///
221242/// In the paused state, the actor will only attempt to receive high priority messages.
@@ -250,41 +271,46 @@ impl<A: Actor> ActorExecutionEnv<A> {
250271 self . actor . get_mut ( ) . initialize ( & self . ctx ) . await
251272 }
252273
253- async fn process_messages ( & mut self ) -> ActorExitStatus {
274+ async fn process_messages ( & mut self ) -> ( ActorExitStatus , ActorExitPhase ) {
254275 loop {
255- if let Err ( exit_status) = self . process_all_available_messages ( ) . await {
256- return exit_status;
276+ if let Err ( ( exit_status, exit_phase ) ) = self . process_all_available_messages ( ) . await {
277+ return ( exit_status, exit_phase ) ;
257278 }
258279 }
259280 }
260281
261282 async fn process_one_message (
262283 & mut self ,
263284 mut envelope : Envelope < A > ,
264- ) -> Result < ( ) , ActorExitStatus > {
285+ ) -> Result < ( ) , ( ActorExitStatus , ActorExitPhase ) > {
265286 self . yield_and_check_if_killed ( ) . await ?;
266287 envelope
267288 . handle_message ( self . actor . get_mut ( ) , & self . ctx )
268- . await ?;
289+ . await
290+ . map_err ( |( exit_status, message) | {
291+ ( exit_status, ActorExitPhase :: Handling { message } )
292+ } ) ?;
269293 Ok ( ( ) )
270294 }
271295
272- async fn yield_and_check_if_killed ( & mut self ) -> Result < ( ) , ActorExitStatus > {
296+ async fn yield_and_check_if_killed ( & mut self ) -> Result < ( ) , ( ActorExitStatus , ActorExitPhase ) > {
273297 if self . ctx . kill_switch ( ) . is_dead ( ) {
274- return Err ( ActorExitStatus :: Killed ) ;
298+ return Err ( ( ActorExitStatus :: Killed , ActorExitPhase :: Running ) ) ;
275299 }
276300 if self . actor . get_mut ( ) . yield_after_each_message ( ) {
277301 self . ctx . yield_now ( ) . await ;
278302 if self . ctx . kill_switch ( ) . is_dead ( ) {
279- return Err ( ActorExitStatus :: Killed ) ;
303+ return Err ( ( ActorExitStatus :: Killed , ActorExitPhase :: Running ) ) ;
280304 }
281305 } else {
282306 self . ctx . record_progress ( ) ;
283307 }
284308 Ok ( ( ) )
285309 }
286310
287- async fn process_all_available_messages ( & mut self ) -> Result < ( ) , ActorExitStatus > {
311+ async fn process_all_available_messages (
312+ & mut self ,
313+ ) -> Result < ( ) , ( ActorExitStatus , ActorExitPhase ) > {
288314 self . yield_and_check_if_killed ( ) . await ?;
289315 let envelope = recv_envelope ( & mut self . inbox , & self . ctx ) . await ;
290316 self . process_one_message ( envelope) . await ?;
@@ -304,7 +330,11 @@ impl<A: Actor> ActorExecutionEnv<A> {
304330 break ;
305331 }
306332 }
307- self . actor . get_mut ( ) . on_drained_messages ( & self . ctx ) . await ?;
333+ self . actor
334+ . get_mut ( )
335+ . on_drained_messages ( & self . ctx )
336+ . await
337+ . map_err ( |exit_status| ( exit_status, ActorExitPhase :: OnDrainedMessaged ) ) ?;
308338 }
309339 if self . ctx . mailbox ( ) . is_last_mailbox ( ) {
310340 // We double check here that the mailbox does not contain any messages,
@@ -314,8 +344,7 @@ impl<A: Actor> ActorExecutionEnv<A> {
314344 if self . inbox . is_empty ( ) {
315345 // No one will be able to send us more messages.
316346 // We can exit the actor.
317- info ! ( actor = self . ctx. actor_instance_id( ) , "no more messages" ) ;
318- return Err ( ActorExitStatus :: Success ) ;
347+ return Err ( ( ActorExitStatus :: Success , ActorExitPhase :: Completed ) ) ;
319348 }
320349 }
321350
@@ -340,23 +369,6 @@ impl<A: Actor> ActorExecutionEnv<A> {
340369 }
341370 exit_status
342371 }
343-
344- fn process_exit_status ( & self , exit_status : & ActorExitStatus ) {
345- match & exit_status {
346- ActorExitStatus :: Success
347- | ActorExitStatus :: Quit
348- | ActorExitStatus :: DownstreamClosed
349- | ActorExitStatus :: Killed => { }
350- ActorExitStatus :: Failure ( err) => {
351- error ! ( cause=?err, exit_status=?exit_status, "actor-failure" ) ;
352- }
353- ActorExitStatus :: Panicked => {
354- error ! ( exit_status=?exit_status, "actor-failure" ) ;
355- }
356- }
357- info ! ( actor_id = %self . ctx. actor_instance_id( ) , exit_status = %exit_status, "actor-exit" ) ;
358- self . ctx . exit ( exit_status) ;
359- }
360372}
361373
362374impl < A : Actor > Drop for ActorExecutionEnv < A > {
@@ -382,19 +394,32 @@ async fn actor_loop<A: Actor>(
382394 let initialize_exit_status_res: Result < ( ) , ActorExitStatus > = actor_env. initialize ( ) . await ;
383395 drop ( no_advance_time_guard) ;
384396
385- let after_process_exit_status = if let Err ( initialize_exit_status) = initialize_exit_status_res
386- {
387- // We do not process messages if initialize yield an error.
388- // We still call finalize however!
389- initialize_exit_status
390- } else {
391- actor_env. process_messages ( ) . await
397+ let ( after_process_exit_status, exit_phase) =
398+ if let Err ( initialize_exit_status) = initialize_exit_status_res {
399+ // We do not process messages if initialize yield an error.
400+ // We still call finalize however!
401+ ( initialize_exit_status, ActorExitPhase :: Initializing )
402+ } else {
403+ actor_env. process_messages ( ) . await
404+ } ;
405+
406+ let actor_id = actor_env. ctx . actor_instance_id ( ) ;
407+ match after_process_exit_status {
408+ ActorExitStatus :: Success
409+ | ActorExitStatus :: Quit
410+ | ActorExitStatus :: DownstreamClosed
411+ | ActorExitStatus :: Killed => {
412+ info ! ( actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit" ) ;
413+ }
414+ ActorExitStatus :: Failure ( _) | ActorExitStatus :: Panicked => {
415+ error ! ( actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit" ) ;
416+ }
392417 } ;
393418
394419 // TODO the no advance time guard for finalize has a race condition. Ideally we would
395420 // like to have the guard before we drop the last envelope.
396421 let final_exit_status = actor_env. finalize ( after_process_exit_status) . await ;
397422 // The last observation is collected on `ActorExecutionEnv::Drop`.
398- actor_env. process_exit_status ( & final_exit_status) ;
423+ actor_env. ctx . exit ( & final_exit_status) ;
399424 final_exit_status
400425}
0 commit comments