diff --git a/rivetkit-rust/packages/actor-persist/src/versioned.rs b/rivetkit-rust/packages/actor-persist/src/versioned.rs index c4887f21e4..8b59de1a4e 100644 --- a/rivetkit-rust/packages/actor-persist/src/versioned.rs +++ b/rivetkit-rust/packages/actor-persist/src/versioned.rs @@ -29,7 +29,9 @@ impl OwnedVersionedData for Actor { 1 => Ok(Self::V1(serde_bare::from_slice(payload)?)), 2 => Ok(Self::V2(serde_bare::from_slice(payload)?)), 3 => Ok(Self::V3(serde_bare::from_slice(payload)?)), - 4 => Ok(Self::V4(serde_bare::from_slice(payload)?)), + 4 => Ok(Self::V4(Self::decode_v4_with_legacy_raw_args_fallback( + payload, + )?)), _ => bail!("invalid actor persist version: {version}"), } } @@ -54,6 +56,58 @@ impl OwnedVersionedData for Actor { } impl Actor { + fn decode_v4_with_legacy_raw_args_fallback(payload: &[u8]) -> Result { + #[derive(serde::Deserialize)] + struct LegacyRawV4Actor { + input: Option>, + has_initialized: bool, + state: Vec, + scheduled_events: Vec, + } + + #[derive(serde::Deserialize)] + struct LegacyRawV4ScheduleEvent { + event_id: String, + timestamp_ms: i64, + action: String, + args: Vec, + } + + // serde_bare accepts any nonzero bool as true, so legacy raw args can + // sometimes decode as current v4. Only accept canonical current-v4 bytes. + let current_error = match serde_bare::from_slice::(payload) { + Ok(actor) => { + if serde_bare::to_vec(&actor).is_ok_and(|encoded| encoded == payload) { + return Ok(actor); + } + None + } + Err(error) => Some(error), + }; + + // A short-lived v4 writer stored schedule args as raw `data` while + // the fixed v4 schema expects `optional`. Decode that reused + // version so actors persisted by the bad writer can restart. + match serde_bare::from_slice::(payload) { + Ok(actor) => Ok(v4::Actor { + input: actor.input, + has_initialized: actor.has_initialized, + state: actor.state, + scheduled_events: actor + .scheduled_events + .into_iter() + .map(|event| v4::ScheduleEvent { + event_id: event.event_id, + timestamp: event.timestamp_ms, + action: event.action, + args: (!event.args.is_empty()).then_some(event.args), + }) + .collect(), + }), + Err(legacy_error) => Err(current_error.unwrap_or(legacy_error).into()), + } + } + fn v1_to_v2(self) -> Result { let Self::V1(data) = self else { bail!("expected actor persist v1 Actor"); diff --git a/rivetkit-rust/packages/actor-persist/tests/versioned.rs b/rivetkit-rust/packages/actor-persist/tests/versioned.rs new file mode 100644 index 0000000000..c8750590a1 --- /dev/null +++ b/rivetkit-rust/packages/actor-persist/tests/versioned.rs @@ -0,0 +1,29 @@ +use rivetkit_actor_persist::versioned; +use vbare::OwnedVersionedData; + +#[test] +fn actor_decodes_legacy_raw_v4_schedule_args() { + let encoded = b"\x04\x00\x01\x05input\x01\x05state\x01\x07event-1\x2a\x00\x00\x00\x00\x00\x00\x00\x0fhandleTurnTimer\x01\x80"; + + let decoded = versioned::Actor::deserialize_with_embedded_version(encoded) + .expect("legacy raw v4 actor should decode"); + + assert_eq!(decoded.input, Some(b"input".to_vec())); + assert!(decoded.has_initialized); + assert_eq!(decoded.state, b"state"); + assert_eq!(decoded.scheduled_events.len(), 1); + assert_eq!(decoded.scheduled_events[0].event_id, "event-1"); + assert_eq!(decoded.scheduled_events[0].timestamp, 42); + assert_eq!(decoded.scheduled_events[0].action, "handleTurnTimer"); + assert_eq!(decoded.scheduled_events[0].args, Some(vec![0x80])); +} + +#[test] +fn actor_decodes_legacy_raw_v4_when_current_v4_accepts_bytes() { + let encoded = b"\x04\x00\x00\x01\x05state\x01\x07event-1\x2a\x00\x00\x00\x00\x00\x00\x00\x0fhandleTurnTimer\x02\x01\x99"; + + let decoded = versioned::Actor::deserialize_with_embedded_version(encoded) + .expect("legacy raw v4 actor accepted by current v4 should decode"); + + assert_eq!(decoded.scheduled_events[0].args, Some(vec![0x01, 0x99])); +}