Skip to content

Commit 51a568c

Browse files
committed
fix(pegboard): revise actor rescheduling algorithm, add client metrics
1 parent 31b6aa7 commit 51a568c

4 files changed

Lines changed: 93 additions & 48 deletions

File tree

packages/edge/services/pegboard/src/metrics.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@ lazy_static::lazy_static! {
88
*REGISTRY
99
).unwrap();
1010

11-
pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!(
12-
"pegboard_client_cpu_allocated",
13-
"Total millicores of cpu allocated on a client.",
11+
pub static ref CLIENT_MEMORY_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
12+
"pegboard_client_memory_total",
13+
"Total MiB of memory available on a client.",
14+
&["client_id", "flavor"],
15+
*REGISTRY
16+
).unwrap();
17+
18+
pub static ref CLIENT_CPU_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!(
19+
"pegboard_client_cpu_total",
20+
"Total millicores of cpu available on a client.",
1421
&["client_id", "flavor"],
1522
*REGISTRY
1623
).unwrap();
@@ -22,6 +29,13 @@ lazy_static::lazy_static! {
2229
*REGISTRY
2330
).unwrap();
2431

32+
pub static ref CLIENT_CPU_ALLOCATED: IntGaugeVec = register_int_gauge_vec_with_registry!(
33+
"pegboard_client_cpu_allocated",
34+
"Total millicores of cpu allocated on a client.",
35+
&["client_id", "flavor"],
36+
*REGISTRY
37+
).unwrap();
38+
2539
pub static ref ACTOR_ALLOCATE_DURATION: HistogramVec = register_histogram_vec_with_registry!(
2640
"pegboard_actor_allocate_duration",
2741
"Total duration to reserve resources for an actor.",
@@ -38,16 +52,16 @@ lazy_static::lazy_static! {
3852
*REGISTRY,
3953
).unwrap();
4054

41-
pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
42-
"pegboard_env_cpu_usage",
43-
"Total millicores used by an environment.",
55+
pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
56+
"pegboard_env_memory_usage",
57+
"Total MiB of memory used by an environment.",
4458
&["env_id", "flavor"],
4559
*REGISTRY,
4660
).unwrap();
4761

48-
pub static ref ENV_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
49-
"pegboard_env_memory_usage",
50-
"Total MiB of memory used by an environment.",
62+
pub static ref ENV_CPU_USAGE: IntGaugeVec = register_int_gauge_vec_with_registry!(
63+
"pegboard_env_cpu_usage",
64+
"Total millicores used by an environment.",
5165
&["env_id", "flavor"],
5266
*REGISTRY,
5367
).unwrap();

packages/edge/services/pegboard/src/workflows/actor/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
2626
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
2727
/// How long to wait after stopped and not receiving an exit state before setting actor as lost.
2828
const ACTOR_EXIT_THRESHOLD_MS: i64 = util::duration::seconds(5);
29+
/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its
30+
/// backoff to 0.
31+
const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10);
2932

3033
#[derive(Clone, Debug, Serialize, Deserialize)]
3134
pub struct Input {
@@ -124,9 +127,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
124127
.send()
125128
.await?;
126129

127-
let Some((client_id, client_workflow_id)) =
128-
runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await?
129-
else {
130+
let Some(res) = runtime::spawn_actor(ctx, input, &initial_actor_setup, 0).await? else {
130131
ctx.msg(Failed {
131132
message: "Failed to allocate (no availability).".into(),
132133
})
@@ -147,7 +148,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
147148

148149
let state_res = ctx
149150
.loope(
150-
runtime::State::new(client_id, client_workflow_id, input.image_id),
151+
runtime::State::new(res.client_id, res.client_workflow_id, input.image_id),
151152
|ctx, state| {
152153
let input = input.clone();
153154

@@ -229,7 +230,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
229230

230231
ctx.activity(runtime::UpdateFdbInput {
231232
actor_id: input.actor_id,
232-
client_id,
233+
client_id: state.client_id,
233234
state: sig.state.clone(),
234235
})
235236
.await?;

packages/edge/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use util::serde::AsHashableExt;
1414
use super::{
1515
destroy::{self, KillCtx},
1616
setup, Destroy, Input, ACTOR_START_THRESHOLD_MS, BASE_RETRY_TIMEOUT_MS,
17+
RETRY_RESET_DURATION_MS,
1718
};
1819
use crate::{
1920
keys, metrics,
@@ -26,11 +27,15 @@ use crate::{
2627
#[derive(Deserialize, Serialize)]
2728
pub struct State {
2829
pub generation: u32,
30+
2931
pub client_id: Uuid,
3032
pub client_workflow_id: Uuid,
3133
pub image_id: Option<Uuid>,
34+
3235
pub drain_timeout_ts: Option<i64>,
3336
pub gc_timeout_ts: Option<i64>,
37+
38+
reschedule_state: RescheduleState,
3439
}
3540

3641
impl State {
@@ -42,6 +47,7 @@ impl State {
4247
image_id: Some(image_id),
4348
drain_timeout_ts: None,
4449
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
50+
reschedule_state: RescheduleState::default(),
4551
}
4652
}
4753
}
@@ -51,6 +57,12 @@ pub struct StateRes {
5157
pub kill: Option<KillCtx>,
5258
}
5359

60+
#[derive(Serialize, Deserialize, Clone, Default)]
61+
struct RescheduleState {
62+
last_retry_ts: i64,
63+
retry_count: usize,
64+
}
65+
5466
#[derive(Debug, Serialize, Deserialize, Hash)]
5567
struct UpdateClientInput {
5668
client_id: Uuid,
@@ -224,9 +236,9 @@ struct AllocateActorInputV2 {
224236
}
225237

226238
#[derive(Debug, Serialize, Deserialize)]
227-
struct AllocateActorOutputV2 {
228-
client_id: Uuid,
229-
client_workflow_id: Uuid,
239+
pub struct AllocateActorOutputV2 {
240+
pub client_id: Uuid,
241+
pub client_workflow_id: Uuid,
230242
}
231243

232244
#[activity(AllocateActorV2)]
@@ -611,7 +623,7 @@ pub async fn spawn_actor(
611623
input: &Input,
612624
actor_setup: &setup::ActorSetupCtx,
613625
generation: u32,
614-
) -> GlobalResult<Option<(Uuid, Uuid)>> {
626+
) -> GlobalResult<Option<AllocateActorOutputV2>> {
615627
let res = match ctx.check_version(2).await? {
616628
1 => {
617629
ctx.activity(AllocateActorInputV1 {
@@ -744,7 +756,7 @@ pub async fn spawn_actor(
744756
.send()
745757
.await?;
746758

747-
Ok(Some((res.client_id, res.client_workflow_id)))
759+
Ok(Some(res))
748760
}
749761

750762
pub async fn reschedule_actor(
@@ -769,7 +781,7 @@ pub async fn reschedule_actor(
769781

770782
// Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate.
771783
let res = ctx
772-
.loope(RescheduleState::default(), |ctx, state| {
784+
.loope(state.reschedule_state.clone(), |ctx, state| {
773785
let input = input.clone();
774786
let actor_setup = actor_setup.clone();
775787

@@ -778,14 +790,13 @@ pub async fn reschedule_actor(
778790
let mut backoff =
779791
util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count);
780792

781-
// If the last retry ts is more than 2 * backoff ago, reset retry count to 0
793+
// If the last retry happened more than RETRY_RESET_DURATION_MS ago, reset retry count to 0
782794
let now = util::timestamp::now();
783-
state.retry_count =
784-
if state.last_retry_ts < now - i64::try_from(2 * backoff.current_duration())? {
785-
0
786-
} else {
787-
state.retry_count + 1
788-
};
795+
state.retry_count = if state.last_retry_ts < now - RETRY_RESET_DURATION_MS {
796+
0
797+
} else {
798+
state.retry_count + 1
799+
};
789800
state.last_retry_ts = now;
790801

791802
// Don't sleep for first retry
@@ -797,14 +808,14 @@ pub async fn reschedule_actor(
797808
.listen_with_timeout::<Destroy>(Instant::from(next) - Instant::now())
798809
.await?
799810
{
800-
tracing::debug!("destroying before actor start");
811+
tracing::debug!("destroying before actor reschedule");
801812

802813
return Ok(Loop::Break(Err(sig)));
803814
}
804815
}
805816

806817
if let Some(res) = spawn_actor(ctx, &input, &actor_setup, next_generation).await? {
807-
Ok(Loop::Break(Ok(res)))
818+
Ok(Loop::Break(Ok((state.clone(), res))))
808819
} else {
809820
tracing::debug!(actor_id=?input.actor_id, "failed to reschedule actor, retrying");
810821

@@ -817,10 +828,13 @@ pub async fn reschedule_actor(
817828

818829
// Update loop state
819830
match res {
820-
Ok((client_id, client_workflow_id)) => {
831+
Ok((reschedule_state, res)) => {
821832
state.generation = next_generation;
822-
state.client_id = client_id;
823-
state.client_workflow_id = client_workflow_id;
833+
state.client_id = res.client_id;
834+
state.client_workflow_id = res.client_workflow_id;
835+
836+
// Save reschedule state in global state
837+
state.reschedule_state = reschedule_state;
824838

825839
// Reset gc timeout once allocated
826840
state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS);
@@ -831,12 +845,6 @@ pub async fn reschedule_actor(
831845
}
832846
}
833847

834-
#[derive(Serialize, Deserialize, Default)]
835-
struct RescheduleState {
836-
last_retry_ts: i64,
837-
retry_count: usize,
838-
}
839-
840848
#[derive(Debug, Serialize, Deserialize, Hash)]
841849
struct ClearPortsAndResourcesInput {
842850
actor_id: Uuid,

packages/edge/services/pegboard/src/workflows/client/mod.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -938,9 +938,19 @@ struct UpdateMetricsInput {
938938

939939
#[activity(UpdateMetrics)]
940940
async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> GlobalResult<()> {
941-
let (memory, cpu) = if input.clear {
942-
(0, 0)
943-
} else {
941+
if input.clear {
942+
metrics::CLIENT_MEMORY_ALLOCATED
943+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
944+
.set(0);
945+
946+
metrics::CLIENT_CPU_ALLOCATED
947+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
948+
.set(0);
949+
950+
return Ok(());
951+
}
952+
953+
// Bind in the same order the transaction returns its tuple (total_mem, remaining_mem, total_cpu, remaining_cpu); binding them as (total_mem, total_cpu, ...) would swap remaining memory with total cpu.
let (total_mem, remaining_mem, total_cpu, remaining_cpu) =
944954
ctx.fdb()
945955
.await?
946956
.run(|tx, _mc| async move {
@@ -983,21 +993,33 @@ async fn update_metrics(ctx: &ActivityCtx, input: &UpdateMetricsInput) -> Global
983993
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
984994

985995
Ok((
986-
total_mem.saturating_sub(remaining_mem),
987-
total_cpu.saturating_sub(remaining_cpu),
996+
total_mem,
997+
remaining_mem,
998+
total_cpu,
999+
remaining_cpu,
9881000
))
9891001
})
9901002
.custom_instrument(tracing::info_span!("client_update_metrics_tx"))
991-
.await?
992-
};
1003+
.await?;
9931004

994-
metrics::CLIENT_CPU_ALLOCATED
1005+
metrics::CLIENT_MEMORY_TOTAL
1006+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
1007+
.set(total_mem.try_into()?);
1008+
1009+
metrics::CLIENT_CPU_TOTAL
9951010
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
996-
.set(cpu.try_into()?);
1011+
.set(total_cpu.try_into()?);
1012+
1013+
let alllocated_mem = total_mem.saturating_sub(remaining_mem);
1014+
let allocated_cpu = total_cpu.saturating_sub(remaining_cpu);
9971015

9981016
metrics::CLIENT_MEMORY_ALLOCATED
9991017
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
1000-
.set(memory.try_into()?);
1018+
.set(alllocated_mem.try_into()?);
1019+
1020+
metrics::CLIENT_CPU_ALLOCATED
1021+
.with_label_values(&[&input.client_id.to_string(), &input.flavor.to_string()])
1022+
.set(allocated_cpu.try_into()?);
10011023

10021024
Ok(())
10031025
}

0 commit comments

Comments
 (0)