Skip to content

Commit f268b35

Browse files
committed
fix(rivetkit-core): chunk apply_batch puts/deletes at 128
1 parent 6208017 commit f268b35

2 files changed

Lines changed: 21 additions & 5 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/kv.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ use rivet_envoy_client::handle::EnvoyHandle;
1616
use crate::error::ActorRuntime;
1717
use crate::types::ListOpts;
1818

19+
/// Maximum keys per `apply_batch` put or delete list. Mirrors the engine-side
20+
/// `MAX_KEYS` limit in `engine/packages/pegboard/src/actor_kv/mod.rs`; the
21+
/// envoy backend rejects requests above this.
22+
pub(crate) const APPLY_BATCH_CHUNK_SIZE: usize = 128;
23+
1924
#[derive(Clone)]
2025
pub struct Kv {
2126
backend: KvBackend,

rivetkit-rust/packages/rivetkit-core/src/actor/state.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tracing::Instrument;
1717

1818
use crate::actor::context::ActorContext;
1919
use crate::actor::keys::{LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY, make_connection_key};
20+
use crate::actor::kv::APPLY_BATCH_CHUNK_SIZE;
2021
use crate::actor::messages::StateDelta;
2122
use crate::actor::persist::{
2223
decode_latest_with_embedded_version, encode_latest_with_embedded_version,
@@ -349,11 +350,21 @@ impl ActorContext {
349350
(puts, deletes, next_state, revision, self.begin_write())
350351
};
351352

352-
self.0
353-
.kv
354-
.apply_batch(&puts, &deletes)
355-
.await
356-
.context("persist actor state deltas to kv")?;
353+
// TODO: make this atomic; store in sqlite
354+
let mut put_chunks = puts.chunks(APPLY_BATCH_CHUNK_SIZE);
355+
let mut delete_chunks = deletes.chunks(APPLY_BATCH_CHUNK_SIZE);
356+
loop {
357+
let put_chunk = put_chunks.next().unwrap_or(&[]);
358+
let delete_chunk = delete_chunks.next().unwrap_or(&[]);
359+
if put_chunk.is_empty() && delete_chunk.is_empty() {
360+
break;
361+
}
362+
self.0
363+
.kv
364+
.apply_batch(put_chunk, delete_chunk)
365+
.await
366+
.context("persist actor state deltas to kv")?;
367+
}
357368

358369
if let Some(state) = next_state {
359370
*self.0.current_state.write() = state;

0 commit comments

Comments
 (0)