Skip to content

Commit 5d6fc8f

Browse files
committed
WIP: idempotent tests
1 parent 86c9121 commit 5d6fc8f

2 files changed

Lines changed: 231 additions & 0 deletions

File tree

pipeline/src/aggregator/mod.rs

Lines changed: 205 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3527,4 +3527,209 @@ mod tests {
35273527
+-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"#
35283528
]].assert_eq(&phase2_result.to_string());
35293529
}
3530+
3531+
#[tokio::test]
3532+
async fn patch_idempotency() {
3533+
// Test idempotency by processing same events twice
3534+
let events = &[
3535+
&model_and_mids_events()[0..3],
3536+
&model_and_mids_events()[0..3],
3537+
]
3538+
.concat();
3539+
let conclusion_events = conclusion_events_to_record_batch(events).unwrap();
3540+
3541+
let ctx = init().await.unwrap();
3542+
3543+
// First processing: process events normally
3544+
let result = do_pass(
3545+
ctx.actor_handle.clone(),
3546+
None,
3547+
Some(conclusion_events.clone()),
3548+
)
3549+
.await
3550+
.unwrap();
3551+
3552+
// // Second processing: send the SAME events again to the SAME context
3553+
// // This should trigger idempotency detection in join_event_states
3554+
// let _second_result = do_pass(
3555+
// ctx.actor_handle.clone(),
3556+
// None, // Continue from where we left off
3557+
// Some(conclusion_events.clone()), // Same events again!
3558+
// )
3559+
// .await
3560+
// .unwrap();
3561+
3562+
// Get final result to see if we have duplicates
3563+
// let final_result = do_pass(
3564+
// ctx.actor_handle.clone(),
3565+
// None, // Get all results
3566+
// None, // No new events
3567+
// )
3568+
// .await
3569+
// .unwrap();
3570+
3571+
ctx.shutdown().await.unwrap();
3572+
3573+
let result_str = result.to_string();
3574+
println!("pass result:\n{}", result_str);
3575+
3576+
// Count total rows - should only have 3 unique events, not 6 duplicates
3577+
let total_rows = result_str
3578+
.lines()
3579+
.filter(|line| line.starts_with("| ") && !line.contains("event_state_order"))
3580+
.count();
3581+
3582+
// With proper idempotency, should see only 3 processed events, not duplicates
3583+
assert!(
3584+
total_rows <= 4,
3585+
"Should not see duplicated events. Expected <= 4 rows, found {}",
3586+
total_rows
3587+
);
3588+
}
3589+
3590+
// #[tokio::test]
3591+
// async fn idempotency_mixed_duplicates() {
3592+
// // Test that processing duplicate events produces identical results
3593+
// let events = &model_and_mids_events()[0..3];
3594+
// let conclusion_events = conclusion_events_to_record_batch(events).unwrap();
3595+
3596+
// // First processing
3597+
// let first_result = {
3598+
// let ctx = init().await.unwrap();
3599+
// let result = do_pass(
3600+
// ctx.actor_handle.clone(),
3601+
// None,
3602+
// Some(conclusion_events.clone()),
3603+
// )
3604+
// .await
3605+
// .unwrap();
3606+
// ctx.shutdown().await.unwrap();
3607+
// result
3608+
// };
3609+
3610+
// // Second processing: same events (duplicates)
3611+
// let duplicate_result = {
3612+
// let ctx = init().await.unwrap();
3613+
// let result = do_pass(
3614+
// ctx.actor_handle.clone(),
3615+
// None,
3616+
// Some(conclusion_events.clone()),
3617+
// )
3618+
// .await
3619+
// .unwrap();
3620+
// ctx.shutdown().await.unwrap();
3621+
// result
3622+
// };
3623+
3624+
// // Results should be identical - this is the core idempotency test
3625+
// assert_eq!(first_result.to_string(), duplicate_result.to_string());
3626+
// }
3627+
3628+
// #[tokio::test]
3629+
// async fn idempotency_batch_boundaries() {
3630+
// let events = &model_and_mids_events()[0..2];
3631+
3632+
// // First processing: process events normally
3633+
// let first_result = {
3634+
// let ctx = init().await.unwrap();
3635+
// let result = do_pass(
3636+
// ctx.actor_handle.clone(),
3637+
// None,
3638+
// Some(conclusion_events_to_record_batch(events).unwrap()),
3639+
// )
3640+
// .await
3641+
// .unwrap();
3642+
// ctx.shutdown().await.unwrap();
3643+
// result
3644+
// };
3645+
3646+
// // Second processing: process same events again (duplicates at batch boundaries)
3647+
// let duplicate_result = {
3648+
// let ctx = init().await.unwrap();
3649+
// let result = do_pass(
3650+
// ctx.actor_handle.clone(),
3651+
// None,
3652+
// Some(conclusion_events_to_record_batch(events).unwrap()),
3653+
// )
3654+
// .await
3655+
// .unwrap();
3656+
// ctx.shutdown().await.unwrap();
3657+
// result
3658+
// };
3659+
3660+
// // Results should be identical - this is the core idempotency test
3661+
// assert_eq!(first_result.to_string(), duplicate_result.to_string());
3662+
// }
3663+
3664+
// #[tokio::test]
3665+
// async fn idempotency_error_recovery() {
3666+
// let events = &model_and_mids_events()[0..2];
3667+
3668+
// // First processing: process events normally
3669+
// let first_result = {
3670+
// let ctx = init().await.unwrap();
3671+
// let result = do_pass(
3672+
// ctx.actor_handle.clone(),
3673+
// None,
3674+
// Some(conclusion_events_to_record_batch(events).unwrap()),
3675+
// )
3676+
// .await
3677+
// .unwrap();
3678+
// ctx.shutdown().await.unwrap();
3679+
// result
3680+
// };
3681+
3682+
// // Second processing: process same events again (recovery scenario)
3683+
// let recovery_result = {
3684+
// let ctx = init().await.unwrap();
3685+
// let result = do_pass(
3686+
// ctx.actor_handle.clone(),
3687+
// None,
3688+
// Some(conclusion_events_to_record_batch(events).unwrap()),
3689+
// )
3690+
// .await
3691+
// .unwrap();
3692+
// ctx.shutdown().await.unwrap();
3693+
// result
3694+
// };
3695+
3696+
// // Results should be identical - this is the core idempotency test
3697+
// assert_eq!(first_result.to_string(), recovery_result.to_string());
3698+
// }
3699+
3700+
// #[tokio::test]
3701+
// async fn idempotency_cross_stream() {
3702+
// let events = &model_and_mids_events()[0..2];
3703+
3704+
// // First processing: process events normally
3705+
// let first_result = {
3706+
// let ctx = init().await.unwrap();
3707+
// let result = do_pass(
3708+
// ctx.actor_handle.clone(),
3709+
// None,
3710+
// Some(conclusion_events_to_record_batch(events).unwrap()),
3711+
// )
3712+
// .await
3713+
// .unwrap();
3714+
// ctx.shutdown().await.unwrap();
3715+
// result
3716+
// };
3717+
3718+
// // Second processing: process same events again (cross-stream scenario)
3719+
// let cross_stream_result = {
3720+
// let ctx = init().await.unwrap();
3721+
// let result = do_pass(
3722+
// ctx.actor_handle.clone(),
3723+
// None,
3724+
// Some(conclusion_events_to_record_batch(events).unwrap()),
3725+
// )
3726+
// .await
3727+
// .unwrap();
3728+
// ctx.shutdown().await.unwrap();
3729+
// result
3730+
// };
3731+
3732+
// // Results should be identical - this is the core idempotency test
3733+
// assert_eq!(first_result.to_string(), cross_stream_result.to_string());
3734+
// }
35303735
}

pipeline/src/aggregator/model_instance_patch.rs

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -181,7 +181,33 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
181181
let mut new_heights = UInt32Builder::new();
182182
// We need to keep the patch around for validation.
183183
let mut resolved_patches = BinaryBuilder::new();
184+
184185
for i in 0..num_rows {
186+
// Debug-only duplicate probe: compare this row's CID with the previous row's CID and print the result (adjacent rows only; nothing is skipped yet)
187+
let next = event_cids.value(i);
188+
if i > 0 {
189+
let last = event_cids.value(i - 1);
190+
println!("last event: {:?}", last);
191+
println!("next event: {:?}", next);
192+
println!("duplicate: {}", next == last);
193+
println!("---------------");
194+
}
195+
196+
// if i > 0
197+
// && event_cids.is_valid(i)
198+
// && event_cids.is_valid(i - 1)
199+
// && event_cids.value(i) == event_cids.value(i - 1)
200+
// {
201+
// println!(
202+
// "! Found duplicate CID {:?}, skipping...",
203+
// event_cids.value(i)
204+
// );
205+
// new_states.append_null();
206+
// model_versions.append_null();
207+
// resolved_patches.append_null();
208+
// new_heights.append_null();
209+
// continue;
210+
// }
185211
if previous_cids.is_valid(i) {
186212
if let Some((previous_state, previous_height)) = if previous_states.is_valid(i) {
187213
// We know the previous state already

0 commit comments

Comments
 (0)