Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,10 @@ impl ProxyService {
.body(Full::new(req_body.clone()))
.map_err(|err| errors::RequestBuildError(err.to_string()).build())?;

// Send the request with timeout
let res = timeout(timeout_duration, self.state.client.request(proxied_req))
.await
.map_err(|_| {
// Send the request with timeout
let res = timeout(timeout_duration, self.state.client.request(proxied_req))
.await
.map_err(|_| {
errors::RequestTimeout {
timeout_seconds: timeout_duration.as_secs(),
}
Expand Down
4 changes: 1 addition & 3 deletions engine/packages/guard-core/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ pub async fn run_server(
metrics::TCP_CONNECTION_PENDING.inc();
metrics::TCP_CONNECTION_TOTAL.inc();

if tcp_nodelay
&& let Err(err) = tcp_stream.set_nodelay(true)
{
if tcp_nodelay && let Err(err) = tcp_stream.set_nodelay(true) {
tracing::debug!(?err, "failed to enable tcp nodelay");
}

Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard/src/workflows/actor2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ async fn process_signal(
&input,
state,
metrics_workflow_id,
runtime::StoppedVariant::Normal {
runtime::StoppedVariant::Stopped {
code: code.clone(),
message: message.clone(),
},
Expand Down
188 changes: 86 additions & 102 deletions engine/packages/pegboard/src/workflows/actor2/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ pub async fn reschedule_actor(
#[derive(Debug)]
pub enum StoppedVariant {
FailedAllocation,
Normal {
Stopped {
code: protocol::StopCode,
message: Option<String>,
},
Expand All @@ -468,11 +468,11 @@ pub async fn handle_stopped(
// Save error to state
match &variant {
StoppedVariant::FailedAllocation => {}
StoppedVariant::Normal {
StoppedVariant::Stopped {
code: protocol::StopCode::Ok,
..
} => {}
StoppedVariant::Normal {
StoppedVariant::Stopped {
code: protocol::StopCode::Error,
message,
} => {
Expand Down Expand Up @@ -527,122 +527,106 @@ pub async fn handle_stopped(
.await?;
}

let (try_reallocate, going_away) = match state.transition {
Transition::SleepIntent {
rewake_after_stop, ..
} => (rewake_after_stop, false),
Transition::GoingAway { .. } => (true, true),
Transition::Destroying { .. } => return Ok(StoppedResult::Destroy),
_ => (true, false),
};

// Always immediately reallocate if going away
let stopped_res = if going_away {
let allocate_res = ctx.activity(AllocateInput {}).await?;

if let Some(allocation) = allocate_res.allocation {
state.generation += 1;

ctx.activity(SendOutboundInput {
generation: state.generation,
input: input.input.clone(),
allocation,
})
.await?;

// Transition to allocating
state.transition = Transition::Allocating {
destroy_after_start: false,
lost_timeout_ts: allocate_res.now
+ ctx.config().pegboard().actor_allocation_threshold(),
};
} else {
// Transition to retry backoff
state.transition = Transition::Reallocating {
since_ts: allocate_res.now,
};
}
enum Decision {
Reallocate,
Backoff,
Sleep,
Destroy,
}

StoppedResult::Continue
} else if try_reallocate {
let decision = match (&state.transition, input.crash_policy, variant) {
(
Transition::SleepIntent {
rewake_after_stop: true,
..
},
_,
_,
) => Decision::Reallocate,
(
Transition::SleepIntent {
rewake_after_stop: false,
..
},
_,
_,
) => Decision::Reallocate,
(Transition::GoingAway { .. }, _, _) => Decision::Reallocate,
(Transition::Destroying { .. }, _, _) => Decision::Destroy,
(_, _, StoppedVariant::FailedAllocation) => Decision::Backoff,
// An actor stopping with `StopCode::Ok` indicates a graceful exit
let graceful_exit = matches!(
variant,
StoppedVariant::Normal {
(
_,
_,
StoppedVariant::Stopped {
code: protocol::StopCode::Ok,
..
}
);
},
) => Decision::Destroy,
(_, CrashPolicy::Restart, _) => Decision::Reallocate,
(_, CrashPolicy::Sleep, _) => Decision::Sleep,
(_, CrashPolicy::Destroy, _) => Decision::Destroy,
};

match (input.crash_policy, graceful_exit) {
(CrashPolicy::Restart, false) => {
let allocate_res = ctx.activity(AllocateInput {}).await?;

if let Some(allocation) = allocate_res.allocation {
state.generation += 1;

ctx.activity(SendOutboundInput {
generation: state.generation,
input: input.input.clone(),
allocation,
})
.await?;

// Transition to allocating
state.transition = Transition::Allocating {
destroy_after_start: false,
lost_timeout_ts: allocate_res.now
+ ctx.config().pegboard().actor_allocation_threshold(),
};
} else {
// Transition to retry backoff
state.transition = Transition::Reallocating {
since_ts: allocate_res.now,
};
}
let stopped_res = match decision {
Decision::Reallocate => {
let allocate_res = ctx.activity(AllocateInput {}).await?;

StoppedResult::Continue
}
(CrashPolicy::Sleep, false) => {
tracing::debug!(actor_id=?input.actor_id, "actor sleeping due to ungraceful exit");
if let Some(allocation) = allocate_res.allocation {
state.generation += 1;

// Clear alarm
if let Some(alarm_ts) = state.alarm_ts {
let now = ctx.activity(GetTsInput {}).await?;
ctx.activity(SendOutboundInput {
generation: state.generation,
input: input.input.clone(),
allocation,
})
.await?;

if now >= alarm_ts {
state.alarm_ts = None;
}
}
// Transition to allocating
state.transition = Transition::Allocating {
destroy_after_start: false,
lost_timeout_ts: allocate_res.now
+ ctx.config().pegboard().actor_allocation_threshold(),
};
} else {
// Transition to retry backoff
state.transition = Transition::Reallocating {
since_ts: allocate_res.now,
};
}

// Transition to sleeping
state.transition = Transition::Sleeping;
StoppedResult::Continue
}
Decision::Backoff => {
let now = ctx.activity(GetTsInput {}).await?;

StoppedResult::Continue
}
_ => {
let now = ctx.activity(GetTsInput {}).await?;
state.transition = Transition::Reallocating { since_ts: now };

// Don't destroy on failed allocation, retry instead
if let StoppedVariant::FailedAllocation { .. } = &variant {
// Transition to retry backoff
state.transition = Transition::Reallocating { since_ts: now };
StoppedResult::Continue
}
Decision::Sleep => {
// Clear alarm
if let Some(alarm_ts) = state.alarm_ts {
let now = ctx.activity(GetTsInput {}).await?;

StoppedResult::Continue
} else {
StoppedResult::Destroy
if now >= alarm_ts {
state.alarm_ts = None;
}
}
}
} else {
// Transition to sleeping
state.transition = Transition::Sleeping;

StoppedResult::Continue
// Transition to sleeping
state.transition = Transition::Sleeping;

StoppedResult::Continue
}
Decision::Destroy => StoppedResult::Destroy,
};

if let Transition::Sleeping = state.transition {
ctx.activity(SetSleepingInput {}).await?;
match state.transition {
Transition::Sleeping | Transition::Reallocating { .. } => {
ctx.activity(SetSleepingInput {}).await?;
}
_ => {}
}

ctx.msg(Stopped {})
Expand Down
22 changes: 10 additions & 12 deletions engine/sdks/typescript/test-envoy/src/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading