Skip to content

Commit ffd9202

Browse files
committed
fix(pegboard): stabilize actor2 validation and lifecycle
1 parent 6521792 commit ffd9202

5 files changed

Lines changed: 130 additions & 16 deletions

File tree

engine/packages/pegboard/src/ops/actor/create.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<
6363
name: input.name.clone(),
6464
pool_name: input.runner_name_selector.clone(),
6565
key: input.key.clone(),
66+
crash_policy: input.crash_policy,
6667
namespace_id: input.namespace_id,
6768
input: input.input.clone(),
6869
from_v1: false,

engine/packages/pegboard/src/ops/actor/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use gas::db::WorkflowData;
22
use gas::prelude::*;
3-
use rivet_types::actors::{Actor, CrashPolicy};
3+
use rivet_types::actors::Actor;
44
use std::collections::{HashMap, HashSet};
55

66
use crate::workflows::actor::FailureReason as WorkflowFailureReason;
@@ -212,7 +212,7 @@ pub async fn build_actors_from_workflows(
212212
namespace_id: s.namespace_id,
213213
datacenter: dc_name.to_string(),
214214
runner_name_selector: s.pool_name,
215-
crash_policy: CrashPolicy::Sleep,
215+
crash_policy: s.crash_policy,
216216

217217
create_ts: s.create_ts,
218218
start_ts: s.start_ts,

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
230230
name: input.name.clone(),
231231
pool_name: input.runner_name_selector.clone(),
232232
key: input.key.clone(),
233+
crash_policy: input.crash_policy,
233234
namespace_id: input.namespace_id,
234235
input: input.input.clone(),
235236
from_v1: true,
@@ -865,6 +866,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
865866
name: input.name.clone(),
866867
pool_name: input.runner_name_selector.clone(),
867868
key: input.key.clone(),
869+
crash_policy: input.crash_policy,
868870
namespace_id: input.namespace_id,
869871
input: input.input.clone(),
870872
from_v1: true,

engine/packages/pegboard/src/workflows/actor2/mod.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use futures_util::FutureExt;
22
use gas::prelude::*;
3+
use rivet_data::converted::ActorNameKeyData;
34
use rivet_data::converted::ActorByKeyKeyData;
45
use rivet_envoy_protocol as protocol;
6+
use rivet_types::actors::CrashPolicy;
57
use universaldb::prelude::*;
68

79
use crate::errors;
@@ -14,6 +16,9 @@ use runtime::{StoppedResult, Transition};
1416

1517
/// Batch size of how many events to ack.
1618
const EVENT_ACK_BATCH_SIZE: i64 = 250;
19+
pub const SQLITE_SCHEMA_VERSION_V1: u32 = 1;
20+
pub const SQLITE_SCHEMA_VERSION_V2: u32 = 2;
21+
const MAX_INPUT_SIZE: usize = util::size::mebibytes(4) as usize;
1722

1823
// NOTE: Assumes input is validated.
1924
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
@@ -22,6 +27,7 @@ pub struct Input {
2227
pub name: String,
2328
pub pool_name: String,
2429
pub key: Option<String>,
30+
pub crash_policy: CrashPolicy,
2531

2632
pub namespace_id: Id,
2733

@@ -36,6 +42,8 @@ pub struct State {
3642
pub name: String,
3743
pub pool_name: String,
3844
pub key: Option<String>,
45+
#[serde(default)]
46+
pub crash_policy: CrashPolicy,
3947
pub namespace_id: Id,
4048

4149
pub acquired_slot: bool,
@@ -68,6 +76,7 @@ impl State {
6876
name: String,
6977
pool_name: String,
7078
key: Option<String>,
79+
crash_policy: CrashPolicy,
7180
namespace_id: Id,
7281
create_ts: i64,
7382
) -> Self {
@@ -76,6 +85,7 @@ impl State {
7685
name,
7786
pool_name,
7887
key,
88+
crash_policy,
7989
namespace_id,
8090

8191
acquired_slot: false,
@@ -111,11 +121,30 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
111121
// we added to indexes before Epoxy validation, actors could appear in lists with duplicate
112122
// key (since reservation wasn't confirmed yet).
113123

124+
let validation_res = ctx
125+
.activity(ValidateInput {
126+
name: input.name.clone(),
127+
key: input.key.clone(),
128+
namespace_id: input.namespace_id,
129+
input: input.input.clone(),
130+
})
131+
.await?;
132+
133+
if let Err(error) = validation_res {
134+
ctx.msg(Failed { error })
135+
.topic(("actor_id", input.actor_id))
136+
.send()
137+
.await?;
138+
139+
return Ok(());
140+
}
141+
114142
ctx.activity(InitStateAndUdbInput {
115143
actor_id: input.actor_id,
116144
name: input.name.clone(),
117145
pool_name: input.pool_name.clone(),
118146
key: input.key.clone(),
147+
crash_policy: input.crash_policy,
119148
namespace_id: input.namespace_id,
120149
create_ts: ctx.create_ts(),
121150
from_v1: input.from_v1,
@@ -224,13 +253,63 @@ pub async fn pegboard_actor2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
224253
destroy(ctx, input).await
225254
}
226255

256+
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
257+
pub struct ValidateInput {
258+
pub namespace_id: Id,
259+
pub name: String,
260+
pub key: Option<String>,
261+
pub input: Option<String>,
262+
}
263+
264+
#[activity(Validate)]
265+
pub async fn validate(
266+
ctx: &ActivityCtx,
267+
input: &ValidateInput,
268+
) -> Result<std::result::Result<(), errors::Actor>> {
269+
let ns_res = ctx
270+
.op(namespace::ops::get_global::Input {
271+
namespace_ids: vec![input.namespace_id],
272+
})
273+
.await?;
274+
275+
if ns_res.is_empty() {
276+
return Ok(Err(errors::Actor::NamespaceNotFound));
277+
};
278+
279+
if input
280+
.input
281+
.as_ref()
282+
.map(|x| x.len() > MAX_INPUT_SIZE)
283+
.unwrap_or_default()
284+
{
285+
return Ok(Err(errors::Actor::InputTooLarge {
286+
max_size: MAX_INPUT_SIZE,
287+
}));
288+
}
289+
290+
if let Some(k) = &input.key {
291+
if k.is_empty() {
292+
return Ok(Err(errors::Actor::EmptyKey));
293+
}
294+
if k.len() > 1024 {
295+
return Ok(Err(errors::Actor::KeyTooLarge {
296+
max_size: 1024,
297+
key_preview: util::safe_slice(k, 0, 1024).to_string(),
298+
}));
299+
}
300+
}
301+
302+
Ok(Ok(()))
303+
}
304+
227305
#[derive(Debug, Clone, Serialize, Deserialize, Hash)]
228306
pub struct InitStateAndUdbInput {
229307
pub actor_id: Id,
230308
pub name: String,
231309
pub key: Option<String>,
232310
pub namespace_id: Id,
233311
pub pool_name: String,
312+
pub crash_policy: CrashPolicy,
234313
pub create_ts: i64,
235314
pub from_v1: bool,
236315
}
@@ -244,6 +323,7 @@ pub async fn insert_state_and_db(ctx: &ActivityCtx, input: &InitStateAndUdbInput
244323
input.name.clone(),
245324
input.pool_name.clone(),
246325
input.key.clone(),
326+
input.crash_policy,
247327
input.namespace_id,
248328
input.create_ts,
249329
));
@@ -343,6 +423,17 @@ pub async fn populate_indexes(ctx: &ActivityCtx, input: &PopulateIndexesInput) -
343423
ctx.workflow_id(),
344424
)?;
345425

426+
// Write the name into the namespace actor names list with empty metadata if it does not already exist.
427+
let name_key = crate::keys::ns::ActorNameKey::new(namespace_id, name.clone());
428+
if !tx.exists(&name_key, Serializable).await? {
429+
tx.write(
430+
&name_key,
431+
ActorNameKeyData {
432+
metadata: serde_json::Map::new(),
433+
},
434+
)?;
435+
}
436+
346437
// NOTE: keys::ns::ActorByKeyKey is written in actor_keys.rs when reserved by epoxy
347438

348439
Ok(())

engine/packages/pegboard/src/workflows/actor2/runtime.rs

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use futures_util::TryStreamExt;
44
use gas::prelude::*;
55
use rand::prelude::SliceRandom;
66
use rivet_envoy_protocol::{self as protocol, PROTOCOL_VERSION, versioned};
7+
use rivet_types::actors::CrashPolicy;
78
use rivet_types::runner_configs::RunnerConfigKind;
89
use universaldb::prelude::*;
910
use universalpubsub::PublishOpts;
@@ -299,6 +300,8 @@ pub async fn allocate(ctx: &ActivityCtx, input: &AllocateInput) -> Result<Alloca
299300
state.acquired_slot = acquired_slot;
300301
state.error = error;
301302
state.envoy_last_command_idx = 0;
303+
state.sleep_ts = None;
304+
state.reschedule_ts = None;
302305

303306
Ok(AllocateOutput {
304307
allocation,
@@ -608,7 +611,11 @@ pub async fn handle_stopped(
608611
code: protocol::StopCode::Error,
609612
..
610613
}
611-
| StoppedVariant::Lost { .. } => Decision::Sleep,
614+
| StoppedVariant::Lost { .. } => match input.crash_policy {
615+
CrashPolicy::Restart => Decision::Backoff,
616+
CrashPolicy::Sleep => Decision::Sleep,
617+
CrashPolicy::Destroy => Decision::Destroy,
618+
},
612619
},
613620
};
614621

@@ -617,14 +624,20 @@ pub async fn handle_stopped(
617624
let allocate_res = ctx.activity(AllocateInput {}).await?;
618625

619626
if let Some(allocation) = allocate_res.allocation {
620-
start_new_generation(ctx, input, state, allocation).await?;
621-
622-
// Transition to allocating
623-
state.transition = Transition::Allocating {
624-
destroy_after_start: false,
625-
lost_timeout_ts: allocate_res.now
626-
+ ctx.config().pegboard().actor_allocation_threshold(),
627+
state.transition = match &allocation {
628+
Allocation::Serverless => Transition::Allocating {
629+
destroy_after_start: false,
630+
lost_timeout_ts: allocate_res.now
631+
+ ctx.config().pegboard().actor_allocation_threshold(),
632+
},
633+
Allocation::Serverful { .. } => Transition::Starting {
634+
destroy_after_start: false,
635+
lost_timeout_ts: allocate_res.now
636+
+ ctx.config().pegboard().actor_start_threshold(),
637+
},
627638
};
639+
640+
start_new_generation(ctx, input, state, allocation).await?;
628641
} else {
629642
// Transition to retry backoff
630643
state.transition = Transition::Reallocating {
@@ -658,13 +671,20 @@ pub async fn handle_stopped(
658671
let allocate_res = ctx.activity(AllocateInput {}).await?;
659672

660673
if let Some(allocation) = allocate_res.allocation {
661-
start_new_generation(ctx, input, state, allocation).await?;
662-
663-
state.transition = Transition::Allocating {
664-
destroy_after_start: false,
665-
lost_timeout_ts: allocate_res.now
666-
+ ctx.config().pegboard().actor_allocation_threshold(),
674+
state.transition = match &allocation {
675+
Allocation::Serverless => Transition::Allocating {
676+
destroy_after_start: false,
677+
lost_timeout_ts: allocate_res.now
678+
+ ctx.config().pegboard().actor_allocation_threshold(),
679+
},
680+
Allocation::Serverful { .. } => Transition::Starting {
681+
destroy_after_start: false,
682+
lost_timeout_ts: allocate_res.now
683+
+ ctx.config().pegboard().actor_start_threshold(),
684+
},
667685
};
686+
687+
start_new_generation(ctx, input, state, allocation).await?;
668688
} else {
669689
state.transition = Transition::Reallocating {
670690
since_ts: allocate_res.now,

0 commit comments

Comments
 (0)