Skip to content

Commit d00c2b0

Browse files
authored
chore(pegboard): cache actor kv info (#4617)
1 parent be09242 commit d00c2b0

1 file changed

Lines changed: 35 additions & 20 deletions

File tree

engine/packages/pegboard/src/ops/actor/get_for_kv.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,51 @@ pub struct Input {
88
pub actor_id: Id,
99
}
1010

11-
#[derive(Debug)]
11+
#[derive(Debug, Serialize, Deserialize)]
1212
pub struct Output {
1313
pub name: String,
1414
pub namespace_id: Id,
1515
}
1616

17-
// TODO: Add cache (remember to purge cache when runner changes)
1817
#[operation]
1918
pub async fn pegboard_actor_get_for_kv(
2019
ctx: &OperationCtx,
2120
input: &Input,
2221
) -> Result<Option<Output>> {
23-
ctx.udb()?
24-
.run(|tx| async move {
25-
let tx = tx.with_subspace(keys::subspace());
26-
27-
let name_key = keys::actor::NameKey::new(input.actor_id);
28-
let namespace_id_key = keys::actor::NamespaceIdKey::new(input.actor_id);
29-
30-
let (name_entry, namespace_id_entry) = tokio::try_join!(
31-
tx.read_opt(&name_key, Serializable),
32-
tx.read_opt(&namespace_id_key, Serializable),
33-
)?;
34-
35-
let (Some(name), Some(namespace_id)) = (name_entry, namespace_id_entry) else {
36-
return Ok(None);
37-
};
38-
39-
Ok(Some(Output { name, namespace_id }))
22+
ctx.cache()
23+
.clone()
24+
.request()
25+
.fetch_one_json("actor.actor_get_for_kv", input.actor_id, {
26+
move |mut cache, key| async move {
27+
let output = ctx
28+
.udb()?
29+
.run(|tx| async move {
30+
let tx = tx.with_subspace(keys::subspace());
31+
32+
let name_key = keys::actor::NameKey::new(input.actor_id);
33+
let namespace_id_key = keys::actor::NamespaceIdKey::new(input.actor_id);
34+
35+
let (name_entry, namespace_id_entry) = tokio::try_join!(
36+
tx.read_opt(&name_key, Serializable),
37+
tx.read_opt(&namespace_id_key, Serializable),
38+
)?;
39+
40+
let (Some(name), Some(namespace_id)) = (name_entry, namespace_id_entry)
41+
else {
42+
return Ok(None);
43+
};
44+
45+
Ok(Some(Output { name, namespace_id }))
46+
})
47+
.custom_instrument(tracing::info_span!("actor_get_for_kv_tx"))
48+
.await?;
49+
50+
if let Some(output) = output {
51+
cache.resolve(&key, output);
52+
}
53+
54+
Ok(cache)
55+
}
4056
})
41-
.custom_instrument(tracing::info_span!("actor_get_for_kv_tx"))
4257
.await
4358
}

0 commit comments

Comments
 (0)