Skip to content

Commit b41869a

Browse files
committed
fix(pb): clean up actor stop decision handling
1 parent ef85f3e commit b41869a

File tree

5 files changed

+102
-122
lines changed

5 files changed

+102
-122
lines changed

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -750,10 +750,10 @@ impl ProxyService {
750750
.body(Full::new(req_body.clone()))
751751
.map_err(|err| errors::RequestBuildError(err.to_string()).build())?;
752752

753-
// Send the request with timeout
754-
let res = timeout(timeout_duration, self.state.client.request(proxied_req))
755-
.await
756-
.map_err(|_| {
753+
// Send the request with timeout
754+
let res = timeout(timeout_duration, self.state.client.request(proxied_req))
755+
.await
756+
.map_err(|_| {
757757
errors::RequestTimeout {
758758
timeout_seconds: timeout_duration.as_secs(),
759759
}

engine/packages/guard-core/src/server.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,7 @@ pub async fn run_server(
9696
metrics::TCP_CONNECTION_PENDING.inc();
9797
metrics::TCP_CONNECTION_TOTAL.inc();
9898

99-
if tcp_nodelay
100-
&& let Err(err) = tcp_stream.set_nodelay(true)
101-
{
99+
if tcp_nodelay && let Err(err) = tcp_stream.set_nodelay(true) {
102100
tracing::debug!(?err, "failed to enable tcp nodelay");
103101
}
104102

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ async fn process_signal(
720720
&input,
721721
state,
722722
metrics_workflow_id,
723-
runtime::StoppedVariant::Normal {
723+
runtime::StoppedVariant::Stopped {
724724
code: code.clone(),
725725
message: message.clone(),
726726
},

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 86 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ pub async fn reschedule_actor(
441441
#[derive(Debug)]
442442
pub enum StoppedVariant {
443443
FailedAllocation,
444-
Normal {
444+
Stopped {
445445
code: protocol::StopCode,
446446
message: Option<String>,
447447
},
@@ -468,11 +468,11 @@ pub async fn handle_stopped(
468468
// Save error to state
469469
match &variant {
470470
StoppedVariant::FailedAllocation => {}
471-
StoppedVariant::Normal {
471+
StoppedVariant::Stopped {
472472
code: protocol::StopCode::Ok,
473473
..
474474
} => {}
475-
StoppedVariant::Normal {
475+
StoppedVariant::Stopped {
476476
code: protocol::StopCode::Error,
477477
message,
478478
} => {
@@ -527,122 +527,106 @@ pub async fn handle_stopped(
527527
.await?;
528528
}
529529

530-
let (try_reallocate, going_away) = match state.transition {
531-
Transition::SleepIntent {
532-
rewake_after_stop, ..
533-
} => (rewake_after_stop, false),
534-
Transition::GoingAway { .. } => (true, true),
535-
Transition::Destroying { .. } => return Ok(StoppedResult::Destroy),
536-
_ => (true, false),
537-
};
538-
539-
// Always immediately reallocate if going away
540-
let stopped_res = if going_away {
541-
let allocate_res = ctx.activity(AllocateInput {}).await?;
542-
543-
if let Some(allocation) = allocate_res.allocation {
544-
state.generation += 1;
545-
546-
ctx.activity(SendOutboundInput {
547-
generation: state.generation,
548-
input: input.input.clone(),
549-
allocation,
550-
})
551-
.await?;
552-
553-
// Transition to allocating
554-
state.transition = Transition::Allocating {
555-
destroy_after_start: false,
556-
lost_timeout_ts: allocate_res.now
557-
+ ctx.config().pegboard().actor_allocation_threshold(),
558-
};
559-
} else {
560-
// Transition to retry backoff
561-
state.transition = Transition::Reallocating {
562-
since_ts: allocate_res.now,
563-
};
564-
}
530+
enum Decision {
531+
Reallocate,
532+
Backoff,
533+
Sleep,
534+
Destroy,
535+
}
565536

566-
StoppedResult::Continue
567-
} else if try_reallocate {
537+
let decision = match (&state.transition, input.crash_policy, variant) {
538+
(
539+
Transition::SleepIntent {
540+
rewake_after_stop: true,
541+
..
542+
},
543+
_,
544+
_,
545+
) => Decision::Reallocate,
546+
(
547+
Transition::SleepIntent {
548+
rewake_after_stop: false,
549+
..
550+
},
551+
_,
552+
_,
553+
) => Decision::Reallocate,
554+
(Transition::GoingAway { .. }, _, _) => Decision::Reallocate,
555+
(Transition::Destroying { .. }, _, _) => Decision::Destroy,
556+
(_, _, StoppedVariant::FailedAllocation) => Decision::Backoff,
568557
// An actor stopping with `StopCode::Ok` indicates a graceful exit, so it is destroyed rather than handled by the crash policy
569-
let graceful_exit = matches!(
570-
variant,
571-
StoppedVariant::Normal {
558+
(
559+
_,
560+
_,
561+
StoppedVariant::Stopped {
572562
code: protocol::StopCode::Ok,
573563
..
574-
}
575-
);
564+
},
565+
) => Decision::Destroy,
566+
(_, CrashPolicy::Restart, _) => Decision::Reallocate,
567+
(_, CrashPolicy::Sleep, _) => Decision::Sleep,
568+
(_, CrashPolicy::Destroy, _) => Decision::Destroy,
569+
};
576570

577-
match (input.crash_policy, graceful_exit) {
578-
(CrashPolicy::Restart, false) => {
579-
let allocate_res = ctx.activity(AllocateInput {}).await?;
580-
581-
if let Some(allocation) = allocate_res.allocation {
582-
state.generation += 1;
583-
584-
ctx.activity(SendOutboundInput {
585-
generation: state.generation,
586-
input: input.input.clone(),
587-
allocation,
588-
})
589-
.await?;
590-
591-
// Transition to allocating
592-
state.transition = Transition::Allocating {
593-
destroy_after_start: false,
594-
lost_timeout_ts: allocate_res.now
595-
+ ctx.config().pegboard().actor_allocation_threshold(),
596-
};
597-
} else {
598-
// Transition to retry backoff
599-
state.transition = Transition::Reallocating {
600-
since_ts: allocate_res.now,
601-
};
602-
}
571+
let stopped_res = match decision {
572+
Decision::Reallocate => {
573+
let allocate_res = ctx.activity(AllocateInput {}).await?;
603574

604-
StoppedResult::Continue
605-
}
606-
(CrashPolicy::Sleep, false) => {
607-
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to ungraceful exit");
575+
if let Some(allocation) = allocate_res.allocation {
576+
state.generation += 1;
608577

609-
// Clear alarm
610-
if let Some(alarm_ts) = state.alarm_ts {
611-
let now = ctx.activity(GetTsInput {}).await?;
578+
ctx.activity(SendOutboundInput {
579+
generation: state.generation,
580+
input: input.input.clone(),
581+
allocation,
582+
})
583+
.await?;
612584

613-
if now >= alarm_ts {
614-
state.alarm_ts = None;
615-
}
616-
}
585+
// Transition to allocating
586+
state.transition = Transition::Allocating {
587+
destroy_after_start: false,
588+
lost_timeout_ts: allocate_res.now
589+
+ ctx.config().pegboard().actor_allocation_threshold(),
590+
};
591+
} else {
592+
// Transition to retry backoff
593+
state.transition = Transition::Reallocating {
594+
since_ts: allocate_res.now,
595+
};
596+
}
617597

618-
// Transition to sleeping
619-
state.transition = Transition::Sleeping;
598+
StoppedResult::Continue
599+
}
600+
Decision::Backoff => {
601+
let now = ctx.activity(GetTsInput {}).await?;
620602

621-
StoppedResult::Continue
622-
}
623-
_ => {
624-
let now = ctx.activity(GetTsInput {}).await?;
603+
state.transition = Transition::Reallocating { since_ts: now };
625604

626-
// Don't destroy on failed allocation, retry instead
627-
if let StoppedVariant::FailedAllocation { .. } = &variant {
628-
// Transition to retry backoff
629-
state.transition = Transition::Reallocating { since_ts: now };
605+
StoppedResult::Continue
606+
}
607+
Decision::Sleep => {
608+
// Clear the alarm only if it has already elapsed
609+
if let Some(alarm_ts) = state.alarm_ts {
610+
let now = ctx.activity(GetTsInput {}).await?;
630611

631-
StoppedResult::Continue
632-
} else {
633-
StoppedResult::Destroy
612+
if now >= alarm_ts {
613+
state.alarm_ts = None;
634614
}
635615
}
636-
}
637-
} else {
638-
// Transition to sleeping
639-
state.transition = Transition::Sleeping;
640616

641-
StoppedResult::Continue
617+
// Transition to sleeping
618+
state.transition = Transition::Sleeping;
619+
620+
StoppedResult::Continue
621+
}
622+
Decision::Destroy => StoppedResult::Destroy,
642623
};
643624

644-
if let Transition::Sleeping = state.transition {
645-
ctx.activity(SetSleepingInput {}).await?;
625+
match state.transition {
626+
Transition::Sleeping | Transition::Reallocating { .. } => {
627+
ctx.activity(SetSleepingInput {}).await?;
628+
}
629+
_ => {}
646630
}
647631

648632
ctx.msg(Stopped {})

engine/sdks/typescript/test-envoy/src/index.ts

Lines changed: 10 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)