Skip to content

Commit 3e2c4f5

Browse files
committed
fix(pegboard): refresh runner config after envoy connect
1 parent db4224a commit 3e2c4f5

1 file changed

Lines changed: 61 additions & 41 deletions

File tree

  • engine/packages/pegboard-envoy/src

engine/packages/pegboard-envoy/src/conn.rs

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub async fn init_conn(
8686
.observe(start.elapsed().as_secs_f64());
8787

8888
let udb = ctx.udb()?;
89-
let (_, mut missed_commands) = tokio::try_join!(
89+
let (_, (mut missed_commands, runner_config_protocol_changed)) = tokio::try_join!(
9090
// Send init packet as soon as possible
9191
async {
9292
let pb = ctx.config().pegboard();
@@ -223,13 +223,11 @@ pub async fn init_conn(
223223
// the pool's protocol version is updated via the metadata_poller wf but that only runs for
224224
// serverless pools.
225225
let ns_tx = tx.with_subspace(namespace::keys::subspace());
226-
ns_tx.write(
227-
&pegboard::keys::runner_config::ProtocolVersionKey::new(
226+
let runner_config_protocol_version_key =
227+
pegboard::keys::runner_config::ProtocolVersionKey::new(
228228
namespace_id,
229229
pool_name.clone(),
230-
),
231-
protocol_version,
232-
)?;
230+
);
233231

234232
let envoy_actor_commands_subspace = pegboard::keys::subspace().subspace(
235233
&pegboard::keys::envoy::ActorCommandKey::subspace(
@@ -238,47 +236,69 @@ pub async fn init_conn(
238236
),
239237
);
240238

241-
// Read missed commands
242-
tx.get_ranges_keyvalues(
243-
RangeOption {
244-
mode: StreamingMode::WantAll,
245-
..(&envoy_actor_commands_subspace).into()
246-
},
247-
Serializable,
248-
)
249-
.map(|res| {
250-
let (key, command) =
251-
tx.read_entry::<pegboard::keys::envoy::ActorCommandKey>(&res?)?;
252-
match command {
253-
protocol::ActorCommandKeyData::CommandStartActor(x) => {
254-
Ok(protocol::CommandWrapper {
255-
checkpoint: protocol::ActorCheckpoint {
256-
actor_id: key.actor_id.to_string(),
257-
generation: key.generation,
258-
index: key.index,
259-
},
260-
inner: protocol::Command::CommandStartActor(x),
261-
})
262-
}
263-
protocol::ActorCommandKeyData::CommandStopActor(x) => {
264-
Ok(protocol::CommandWrapper {
265-
checkpoint: protocol::ActorCheckpoint {
266-
actor_id: key.actor_id.to_string(),
267-
generation: key.generation,
268-
index: key.index,
269-
},
270-
inner: protocol::Command::CommandStopActor(x),
271-
})
239+
let (existing_runner_config_protocol_version, missed_commands) = tokio::try_join!(
240+
ns_tx.read_opt(&runner_config_protocol_version_key, Serializable),
241+
// Read missed commands
242+
tx.get_ranges_keyvalues(
243+
RangeOption {
244+
mode: StreamingMode::WantAll,
245+
..(&envoy_actor_commands_subspace).into()
246+
},
247+
Serializable,
248+
)
249+
.map(|res| -> anyhow::Result<protocol::CommandWrapper> {
250+
let (key, command) =
251+
tx.read_entry::<pegboard::keys::envoy::ActorCommandKey>(&res?)?;
252+
match command {
253+
protocol::ActorCommandKeyData::CommandStartActor(x) => {
254+
Ok(protocol::CommandWrapper {
255+
checkpoint: protocol::ActorCheckpoint {
256+
actor_id: key.actor_id.to_string(),
257+
generation: key.generation,
258+
index: key.index,
259+
},
260+
inner: protocol::Command::CommandStartActor(x),
261+
})
262+
}
263+
protocol::ActorCommandKeyData::CommandStopActor(x) => {
264+
Ok(protocol::CommandWrapper {
265+
checkpoint: protocol::ActorCheckpoint {
266+
actor_id: key.actor_id.to_string(),
267+
generation: key.generation,
268+
index: key.index,
269+
},
270+
inner: protocol::Command::CommandStopActor(x),
271+
})
272+
}
272273
}
273-
}
274-
})
275-
.try_collect::<Vec<_>>()
276-
.await
274+
})
275+
.try_collect::<Vec<_>>(),
276+
)?;
277+
278+
let runner_config_protocol_changed =
279+
existing_runner_config_protocol_version != Some(protocol_version);
280+
if runner_config_protocol_changed {
281+
ns_tx.write(&runner_config_protocol_version_key, protocol_version)?;
282+
}
283+
284+
Ok((missed_commands, runner_config_protocol_changed))
277285
}
278286
})
279287
.custom_instrument(tracing::info_span!("envoy_init_tx")),
280288
)?;
281289

290+
if runner_config_protocol_changed {
291+
// This connection path updates the pool protocol version directly.
292+
// Purge runner-config caches so readers do not keep seeing stale
293+
// pre-mk2 data after the transaction commits.
294+
pegboard::utils::purge_runner_config_caches(
295+
ctx.cache(),
296+
namespace.namespace_id,
297+
&pool_name,
298+
)
299+
.await?;
300+
}
301+
282302
// Send missed commands (must be after init packet)
283303
if !missed_commands.is_empty() {
284304
let msg = {

0 commit comments

Comments
 (0)