Skip to content

Commit 3d96914

Browse files
committed
fix(rivetkit-core): chunk apply_batch puts/deletes at 128
1 parent aac9634 commit 3d96914

2 files changed

Lines changed: 21 additions & 5 deletions

File tree

rivetkit-rust/packages/rivetkit-core/src/actor/kv.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ use rivet_envoy_client::handle::EnvoyHandle;
1616
use crate::error::ActorRuntime;
1717
use crate::types::ListOpts;
1818

19+
/// Maximum keys per `apply_batch` put or delete list. Mirrors the engine-side
20+
/// `MAX_KEYS` limit in `engine/packages/pegboard/src/actor_kv/mod.rs`; the
21+
/// envoy backend rejects requests above this.
22+
pub(crate) const APPLY_BATCH_CHUNK_SIZE: usize = 128;
23+
1924
#[derive(Clone)]
2025
pub struct Kv {
2126
backend: KvBackend,

rivetkit-rust/packages/rivetkit-core/src/actor/state.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tracing::Instrument;
1717

1818
use crate::actor::context::ActorContext;
1919
use crate::actor::keys::{LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY, make_connection_key};
20+
use crate::actor::kv::APPLY_BATCH_CHUNK_SIZE;
2021
use crate::actor::messages::StateDelta;
2122
use crate::actor::persist::{
2223
decode_latest_with_embedded_version, encode_latest_with_embedded_version,
@@ -339,11 +340,21 @@ impl ActorContext {
339340
(puts, deletes, next_state, revision, self.begin_write())
340341
};
341342

342-
self.0
343-
.kv
344-
.apply_batch(&puts, &deletes)
345-
.await
346-
.context("persist actor state deltas to kv")?;
343+
// TODO: make this atomic; store in sqlite
344+
let mut put_chunks = puts.chunks(APPLY_BATCH_CHUNK_SIZE);
345+
let mut delete_chunks = deletes.chunks(APPLY_BATCH_CHUNK_SIZE);
346+
loop {
347+
let put_chunk = put_chunks.next().unwrap_or(&[]);
348+
let delete_chunk = delete_chunks.next().unwrap_or(&[]);
349+
if put_chunk.is_empty() && delete_chunk.is_empty() {
350+
break;
351+
}
352+
self.0
353+
.kv
354+
.apply_batch(put_chunk, delete_chunk)
355+
.await
356+
.context("persist actor state deltas to kv")?;
357+
}
347358

348359
if let Some(state) = next_state {
349360
*self.0.current_state.write() = state;

0 commit comments

Comments
 (0)