diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 08aa21a9c..18023f54e 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -101,6 +101,8 @@ jobs: with: package_json_file: tests/suite/package.json - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy - uses: Swatinem/rust-cache@v2 with: # The prefix cache key, this can be changed to start a new cache manually. diff --git a/Cargo.lock b/Cargo.lock index bc06cea3e..d59c51681 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2686,10 +2686,12 @@ dependencies = [ "lru 0.10.1", "mockall", "multibase 0.9.1", + "multihash-codetable", "object_store", "parking_lot", "paste", "prometheus-client", + "rand 0.8.5", "schemars 1.0.0-alpha.17", "serde", "serde_json", diff --git a/anchor-service/src/anchor.rs b/anchor-service/src/anchor.rs index 794b3cfd9..1ccf12001 100644 --- a/anchor-service/src/anchor.rs +++ b/anchor-service/src/anchor.rs @@ -57,7 +57,7 @@ impl MerkleNodes { } /// Return an iterator over the MerkleNodes - pub fn iter(&self) -> indexmap::map::Iter { + pub fn iter(&self) -> indexmap::map::Iter<'_, Cid, MerkleNode> { self.nodes.iter() } diff --git a/core/src/peer.rs b/core/src/peer.rs index a300a6137..f7db0e11d 100644 --- a/core/src/peer.rs +++ b/core/src/peer.rs @@ -224,7 +224,7 @@ impl Builder { PeerKey(format!("{:0>11}", self.state.expiration)) } /// Set the peer id. Note, a NodeKey is required so the [`PeerEntry`] can be signed. - pub fn with_id(self, id: &NodeKey) -> Builder { + pub fn with_id(self, id: &NodeKey) -> Builder> { Builder { state: WithId { node_key: id, diff --git a/flight/tests/server.rs b/flight/tests/server.rs index f267c0201..a932f1d35 100644 --- a/flight/tests/server.rs +++ b/flight/tests/server.rs @@ -397,7 +397,7 @@ async fn event_states_simple() -> Result<()> { data::varchar as data, before, chain_id - FROM event_states LIMIT 4"# + FROM event_states order by event_state_order LIMIT 4"# .to_string(), None, ) diff --git a/pipeline/Cargo.toml b/pipeline/Cargo.toml index ac572b6bc..672faafca 100644 --- a/pipeline/Cargo.toml +++ b/pipeline/Cargo.toml @@ -52,3 +52,5 @@ url.workspace = true mockall.workspace = true test-log.workspace = true tracing-subscriber.workspace = true +rand.workspace = true +multihash-codetable.workspace = true \ No newline at end of file diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index d3d11510f..c22996352 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -226,6 +226,12 @@ impl Aggregator { }) }); + debug!( + ?max_conclusion_event_order, + ?max_event_state_order, + "aggregator highwater marks" + ); + // Spawn actor let (broadcast_tx, _broadcast_rx) = broadcast::channel(1_000); let aggregator = Aggregator { @@ -292,7 +298,7 @@ impl Aggregator { let models = self.patch_models(models).await.context("patching models")?; let models = self.validate_models(models).context("validating models")?; let mut models = self - .store_event_states(models) + .store_event_states(models, false) .await .context("storing models")?; @@ -423,7 +429,7 @@ impl Aggregator { .await .context("validating mids")?; let mut mids = self - .store_event_states(mids) + .store_event_states(mids, true) .await .context("storing mids")?; let mut ordered_events = Vec::with_capacity(models.len() + mids.len()); @@ -453,7 +459,7 @@ impl Aggregator { .with_column_renamed("data", "previous_data")? .with_column_renamed("event_height", "previous_height")?; - Ok(conclusion_events + let conclusion_events = conclusion_events // MID only ever use the first previous, so we can optimize the join by selecting the // first element of the previous array. .select(vec![ @@ -471,7 +477,25 @@ impl Aggregator { col("before"), col("chain_id"), cid_part(col("event_cid")).alias("event_cid_partition"), - ])? + ]) + .context("selecting conclusion events")? + .alias("conclusion_events")?; + + // remove events we've already seen + let conclusion_events = conclusion_events + .join_on( + event_states.clone(), + JoinType::LeftAnti, + vec![ + col("conclusion_events.event_cid_partition").eq(col("ecp")), + col("conclusion_events.event_cid") + .eq(table_col(EVENT_STATES_TABLE, "event_cid")), + ], + ) + .context("anti join")?; + + // join all new conclusion_events with known history to apply patches + let conclusion_events = conclusion_events .join_on( event_states, JoinType::Left, @@ -483,20 +507,23 @@ impl Aggregator { .context("setup join")? .select(vec![ col("conclusion_event_order"), - anon_col("stream_cid").alias("stream_cid"), - anon_col("stream_type").alias("stream_type"), - anon_col("controller").alias("controller"), - anon_col("dimensions").alias("dimensions"), - anon_col("event_cid").alias("event_cid"), - anon_col("event_type").alias("event_type"), + col("conclusion_events.stream_cid").alias("stream_cid"), + col("conclusion_events.stream_type").alias("stream_type"), + col("conclusion_events.controller").alias("controller"), + col("conclusion_events.dimensions").alias("dimensions"), + col("conclusion_events.event_cid").alias("event_cid"), + col("conclusion_events.event_type").alias("event_type"), + col("conclusion_events.data").alias("data"), col("previous"), col("previous_data"), col("previous_height"), - anon_col("data").alias("data"), - anon_col("before").alias("before"), - anon_col("chain_id").alias("chain_id"), + col("conclusion_events.before").alias("before"), + col("conclusion_events.chain_id").alias("chain_id"), col("event_cid_partition"), - ])?) + ]) + .context("select joined conclusion events")?; + + Ok(conclusion_events) } // Retrieves all pending events and joins them with the models if they are now available. @@ -743,7 +770,11 @@ impl Aggregator { } #[instrument(skip_all)] - async fn store_event_states(&mut self, event_states: DataFrame) -> Result> { + async fn store_event_states( + &mut self, + event_states: DataFrame, + allow_cache_flush: bool, + ) -> Result> { let row_count = event_states.clone().count().await?; if row_count == 0 { return Ok(vec![RecordBatch::new_empty( @@ -802,10 +833,14 @@ impl Aggregator { .await .context("writing to mem table data")?; - let count = self.count_cache().await?; - // If we have enough data cached in memory write it out to persistent store - if count >= self.max_cached_rows { + if allow_cache_flush { self.flush_cache().await?; + let count = self.count_cache().await?; + tracing::debug!(%count, will_flush = %count >= self.max_cached_rows, "counts for mem table"); + // If we have enough data cached in memory write it out to persistent store + if count >= self.max_cached_rows { + self.flush_cache().await?; + } } ordered.collect().await.context("collecting ordered events") } @@ -993,6 +1028,7 @@ impl Handler for Aggregator { ); let batch = self.process_conclusion_events_batch(message.events).await?; if batch.num_rows() > 0 { + tracing::trace!(event_count = %batch.num_rows(), "sending events to subscribers"); let _ = self.broadcast_tx.send(batch); } Ok(()) @@ -1243,6 +1279,22 @@ mod tests { ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime, }; + async fn init_with_cache( + max_cached_rows: Option, + ) -> anyhow::Result> { + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .once() + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::empty(), + ))) + }); + init_with_concluder(mock_concluder, max_cached_rows).await + } + async fn init() -> anyhow::Result> { let mut mock_concluder = MockConcluder::new(); mock_concluder @@ -1254,13 +1306,14 @@ mod tests { stream::empty(), ))) }); - init_with_concluder(mock_concluder).await + init_with_concluder(mock_concluder, None).await } async fn init_with_concluder( mock_concluder: MockConcluder, + max_cached_rows: Option, ) -> anyhow::Result> { - init_with_object_store(mock_concluder, Arc::new(InMemory::new()), None).await + init_with_object_store(mock_concluder, Arc::new(InMemory::new()), max_cached_rows).await } async fn init_with_object_store( mock_concluder: MockConcluder, @@ -1929,7 +1982,7 @@ mod tests { ))) }); - let ctx = init_with_concluder(mock_concluder).await.unwrap(); + let ctx = init_with_concluder(mock_concluder, None).await.unwrap(); let event_states = do_pass(ctx.actor_handle.clone(), None, None).await.unwrap(); expect![[r#" +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ @@ -1944,93 +1997,95 @@ mod tests { } #[test(tokio::test)] - async fn sub_concluder_preexisting() { - // This test ensures that the logic that polls the concluder actor starts at the correct - // offset when there is preexisting data. - - let events = model_and_mids_events(); - // Use the same object store in both instances of the actor as that is where data is - // physically stored. - let object_store = Arc::new(InMemory::new()); - { - let first_events = conclusion_events_to_record_batch(&events[0..2]); + async fn sub_concluder_preexisting_from_beginning() { + // wrap it so it times out and we see logs if expectations are wrong + async fn run_test() { + // This test ensures that the logic that polls the concluder actor starts at the correct + // offset when there is preexisting data. + + let events = model_and_mids_events(); + // Use the same object store in both instances of the actor as that is where data is + // physically stored. + let object_store = Arc::new(InMemory::new()); + { + let first_events = conclusion_events_to_record_batch(&events[0..2]); + + // Setup mock that returns the conclusion_events + let mut mock_concluder = MockConcluder::new(); + mock_concluder + .expect_handle_subscribe_since() + .once() + .with(predicate::eq(SubscribeSinceMsg { + projection: None, + filters: None, + limit: None, + })) + .return_once(|_msg| { + Ok(Box::pin(RecordBatchStreamAdapter::new( + schemas::conclusion_events(), + stream::iter(first_events.into_iter().map(Ok)), + ))) + }); + + let ctx = init_with_object_store( + mock_concluder, + object_store.clone(), + // Set the max_cached_rows to zero so that all rows are written to the object store + Some(0), + ) + .await + .unwrap(); + let event_states = do_pass(ctx.actor_handle.clone(), None, None).await.unwrap(); + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+"#]].assert_eq(&event_states.to_string()); + ctx.shutdown().await.unwrap(); + } - // Setup mock that returns the conclusion_events + // Now we have existing data, start a new aggregator actor and ensure that it starts + // where it left off. + // Setup mock that returns a second batch of conclusion_events + let second_events = conclusion_events_to_record_batch(&events[2..]); let mut mock_concluder = MockConcluder::new(); mock_concluder .expect_handle_subscribe_since() .once() .with(predicate::eq(SubscribeSinceMsg { projection: None, - filters: None, + filters: Some(vec![gt_expression("conclusion_event_order", 1)]), limit: None, })) .return_once(|_msg| { Ok(Box::pin(RecordBatchStreamAdapter::new( schemas::conclusion_events(), - stream::iter(first_events.into_iter().map(Ok)), + stream::iter(second_events.into_iter().map(Ok)), ))) }); - let ctx = init_with_object_store( - mock_concluder, - object_store.clone(), - // Set the max_cached_rows to zero so that all rows are written to the object store - Some(0), - ) - .await - .unwrap(); - let event_states = do_pass(ctx.actor_handle.clone(), None, None).await.unwrap(); - expect![[r#" - +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ - | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | - +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+ - | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | - | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | - +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------+----------+"#]].assert_eq(&event_states.to_string()); - ctx.shutdown().await.unwrap(); - } - - // Now we have existing data, start a new aggregator actor and ensure that it starts - // where it left off. - // Setup mock that returns a second batch of conclusion_events - let second_events = conclusion_events_to_record_batch(&events[2..]); - let mut mock_concluder = MockConcluder::new(); - mock_concluder - .expect_handle_subscribe_since() - .once() - .with(predicate::eq(SubscribeSinceMsg { - projection: None, - filters: Some(vec![gt_expression("conclusion_event_order", 1)]), - limit: None, - })) - .return_once(|_msg| { - Ok(Box::pin(RecordBatchStreamAdapter::new( - schemas::conclusion_events(), - stream::iter(second_events.into_iter().map(Ok)), - ))) - }); - - let ctx = init_with_object_store(mock_concluder, object_store, None) - .await - .unwrap(); + let ctx = init_with_object_store(mock_concluder, object_store, None) + .await + .unwrap(); - let mut subscription = ctx - .actor_handle - .send(SubscribeSinceMsg { - projection: None, - filters: None, - limit: Some(4), - }) - .await - .unwrap() - .unwrap(); - let mut batches = Vec::new(); - while let Some(batch) = subscription.try_next().await.unwrap() { - batches.push(batch) - } - let event_states = pretty_event_states(batches).await.unwrap(); - expect![[r#" + let mut subscription = ctx + .actor_handle + .send(SubscribeSinceMsg { + projection: None, + filters: None, + limit: Some(4), + }) + .await + .unwrap() + .unwrap(); + let mut batches = Vec::new(); + while let Some(batch) = subscription.try_next().await.unwrap() { + batches.push(batch) + } + let event_states = pretty_event_states(batches).await.unwrap(); + expect![[r#" +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ @@ -2039,7 +2094,12 @@ mod tests { | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"#]].assert_eq(&event_states.to_string()); - ctx.shutdown().await.unwrap(); + ctx.shutdown().await.unwrap(); + } + + tokio::time::timeout(std::time::Duration::from_secs(3), run_test()) + .await + .unwrap(); } #[test(tokio::test)] @@ -3503,4 +3563,303 @@ mod tests { +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# ]].assert_eq(&phase2_result.to_string()); } + + // Tests for idempotent patch application + #[tokio::test] + async fn patch_idempotency() { + // Test idempotency by processing same events twice - should never have duplicate events in one pass + let events = &[&model_and_mids_events()[0..4]].concat(); + let conclusion_events = conclusion_events_to_record_batch(events).unwrap(); + + let ctx = init().await.unwrap(); + + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events.clone()), + ) + .await + .unwrap(); + + ctx.shutdown().await.unwrap(); + + let ctx = init().await.unwrap(); + + let result2 = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events.clone()), + ) + .await + .unwrap(); + + ctx.shutdown().await.unwrap(); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&result.to_string()); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&result2.to_string()); + } + + #[tokio::test] + async fn patch_idempotency_at_batch_boundaries() { + let events = &model_and_mids_events()[0..4]; + let ctx = init().await.unwrap(); + + // First processing: process events normally + let first_result = { + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events_to_record_batch(events).unwrap()), + ) + .await + .unwrap(); + // ctx.shutdown().await.unwrap(); + result + }; + + // Second processing: process same events again in a new batch + let second_result = { + // let ctx = init().await.unwrap(); + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events_to_record_batch(events).unwrap()), + ) + .await + .unwrap(); + ctx.shutdown().await.unwrap(); + result + }; + + println!("first_result: {}", first_result.to_string()); + println!("second_result: {}", second_result.to_string()); + + expect![[r#" + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | event_state_order | stream_cid | stream_type | controller | dimensions | event_cid | event_type | event_height | data | validation_errors | before | chain_id | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+ + | 1 | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: ce01040171710b0009686d6f64656c2d7631} | baeabeieatrkhby3dlzev6wuyin66mfvwmew2rm3vh7bo4nfigjflnbmf7u | 0 | 0 | {"content":{"accountRelation":{"type":"list"},"implements":[],"interface":false,"name":"TestSmallModel","relations":{},"schema":{"$schema":"https://json-schema.org/draft/2020-12/schema","additionalProperties":false,"properties":{"blue":{"format":"int32","type":"integer"},"creator":{"type":"string"},"green":{"format":"int32","type":"integer"},"red":{"format":"int32","type":"integer"}},"required":["creator","red","green","blue"],"title":"SmallModel","type":"object"},"version":"2.0","views":{}},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 2 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | 0 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | | | + | 3 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | 1 | {"content":{"blue":255,"creator":"alice","green":255,"red":255},"metadata":{"foo":1,"shouldIndex":true}} | [] | 1744383131980 | test:chain | + | 4 | baeabeicdwdrilh6gazn6a7eruxbt5q46cquzimxsk52vcwobfvjhlndafm | 3 | did:key:alice | {controller: 6469643a6b65793a616c696365, model: ce010201001220809c5470e3635e495f5a98437de616b6612da8b3753fc2ee34a8324ab68585fd, unique: 77676b3533} | baeabeibrtuyyqwd6y4aa62qxaimjhafielf7fc22fa5b7i7vptcu5263em | 0 | 2 | {"metadata":{"foo":2,"shouldIndex":true},"content":{"blue":255,"creator":"alice","green":255,"red":0}} | [] | | | + +-------------------+-------------------------------------------------------------+-------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+------------+"# + ]].assert_eq(&first_result.to_string()); + + // Results should be identical - this is the core idempotency test + assert_eq!(second_result.to_string(), first_result.to_string()); + } + + fn random_cid() -> Cid { + use multihash_codetable::{Code, MultihashDigest}; + use rand::{thread_rng, Rng}; + + let mut data = [0u8; 8]; + thread_rng().fill(&mut data); + let hash = MultihashDigest::digest(&Code::Sha2_256, &data); + Cid::new_v1(0x00, hash) + } + + fn test_model() -> (StreamId, ConclusionEvent) { + let model_def = ModelDefinition::new_v2::( + "TestSmallModel".to_owned(), + None, + false, + None, + None, + ModelAccountRelationV2::List, + ) + .unwrap(); + + let model_stream_id = + StreamId::from_str("k2t6wz4yhfp1pc9l42mm6vh20xmhm9ac7cznnpu4xcxe4jds13l9sjknm1accd") + .unwrap(); + ( + model_stream_id.clone(), + ConclusionEvent::Data(ConclusionData { + order: 0, + event_cid: model_stream_id.cid, + init: ConclusionInit { + stream_cid: model_stream_id.cid, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_owned(), + dimensions: vec![ + ("controller".to_owned(), b"did:key:bob".to_vec()), + ("model".to_owned(), METAMODEL_STREAM_ID.to_vec()), + ], + }, + previous: vec![], + data: serde_json::to_string(&json!({ + "metadata":{ + "foo":1, + "shouldIndex":true + }, + "content": model_def, + })) + .unwrap() + .into(), + }), + ) + } + + /// WARNING: the order used here MUST BE GLOBAL for your events, so if you call this multiple times + /// you must correct the order manually. + fn n_mid_events(to_add: u64, model_stream_id: &StreamId) -> Vec { + // NOTE: These CIDs and StreamIDs are fake and do not represent the actual hash of the data. + // This makes testing easier as changing the contents does not mean you need to update all of + // the cids. + + let unique = random_cid().to_bytes(); + let instance_stream_id = StreamId::document(random_cid()); + let stream_init = ConclusionInit { + stream_cid: instance_stream_id.cid, + stream_type: StreamIdType::ModelInstanceDocument as u8, + controller: "did:key:alice".to_owned(), + dimensions: vec![ + ("controller".to_owned(), b"did:key:alice".to_vec()), + ("model".to_owned(), model_stream_id.to_vec()), + ("unique".to_owned(), unique), + ], + }; + + let mut cids = Vec::with_capacity(to_add as usize); + let mut events = Vec::with_capacity(to_add as usize); + for i in 0..to_add { + let event_cid = random_cid(); + cids.push(event_cid); + let event = match i { + 0 => ConclusionEvent::Data(ConclusionData { + order: 1, + event_cid, + init: stream_init.clone(), + previous: vec![], + data: serde_json::to_string(&json!({ + "metadata":{ + "foo":1, + "shouldIndex":true + }, + "content":{ + "creator":"alice", + "red":255, + "green":255, + "blue":255 + }})) + .unwrap() + .into(), + }), + _ => ConclusionEvent::Data(ConclusionData { + order: i + 1, + event_cid, + init: stream_init.clone(), + previous: vec![cids[i as usize - 1]], + data: serde_json::to_string(&json!({ + "metadata":{"foo":2}, + "content":[{ + "op":"replace", + "path": "/red", + "value":0 + }]})) + .unwrap() + .into(), + }), + }; + events.push(event); + } + events + } + + #[test_log::test(tokio::test)] + async fn interspersed_events() { + let (model_stream_id, model) = test_model(); + let mid1_events = n_mid_events(10, &model_stream_id); + let mid2_events = n_mid_events(10, &model_stream_id); + let mid3_events = n_mid_events(10, &model_stream_id); + let mut events = mid1_events[0..5] + .iter() + .cloned() + .chain(mid2_events[0..5].iter().cloned()) + .chain(mid3_events[0..5].iter().cloned()) + .chain(mid2_events[5..].iter().cloned()) + .chain(mid1_events[5..].iter().cloned()) + .chain([model]) + .chain(mid3_events[5..].iter().cloned()) + .collect::>(); + // events must come after their previous and the order number has to incrememt globally. + // we ensured condition 1, now we rewrite the order + events + .iter_mut() + .enumerate() + .for_each(|(i, event)| match event { + ConclusionEvent::Data(data) => data.order = i as u64, + ConclusionEvent::Time(time) => time.order = i as u64, + }); + + let events = &events; + + // First processing: process events normally + let first_result = { + let ctx = init_with_cache(Some(7)).await.unwrap(); + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events_to_record_batch(events).unwrap()), + ) + .await + .unwrap(); + ctx.shutdown().await.unwrap(); + result + }; + let partial_result = { + let ctx = init_with_cache(Some(7)).await.unwrap(); + let result = do_pass( + ctx.actor_handle.clone(), + Some(20), + Some(conclusion_events_to_record_batch(events).unwrap()), + ) + .await + .unwrap(); + ctx.shutdown().await.unwrap(); + result + }; + let third_result = { + let ctx = init_with_cache(Some(7)).await.unwrap(); + let result = do_pass( + ctx.actor_handle.clone(), + None, + Some(conclusion_events_to_record_batch(events).unwrap()), + ) + .await + .unwrap(); + ctx.shutdown().await.unwrap(); + result + }; + let res = first_result.to_string(); + let res2 = partial_result.to_string(); + let res3 = third_result.to_string(); + // Results should be identical - this is the core idempotency test + tracing::info!("first_result: {}", res); + tracing::info!("partial_result: {}", res2); + tracing::info!("third_result: {}", res3); + assert_eq!(res, res3); + assert!(!res.contains("cannot validate")); + assert!(!res2.contains("cannot validate")); + } } diff --git a/pipeline/src/aggregator/model_instance_patch.rs b/pipeline/src/aggregator/model_instance_patch.rs index ab57b4d43..f33433989 100644 --- a/pipeline/src/aggregator/model_instance_patch.rs +++ b/pipeline/src/aggregator/model_instance_patch.rs @@ -226,6 +226,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator { } Err(err) => { warn!(%err, event_cid=?Cid::read_bytes(event_cids.value(i)), "failed to apply patch to model instance event"); + tracing::debug!(%previous_height, %num_rows, patch=?String::from_utf8_lossy(patches.value(i)), previous_state=?String::from_utf8_lossy(previous_state), "failed to apply patch to model instance event"); new_states.append_null(); model_versions.append_null(); } diff --git a/pipeline/src/aggregator/model_patch.rs b/pipeline/src/aggregator/model_patch.rs index 5cea00299..dc8cf2891 100644 --- a/pipeline/src/aggregator/model_patch.rs +++ b/pipeline/src/aggregator/model_patch.rs @@ -176,6 +176,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator { } Err(err) => { warn!(%err, event_cid=?Cid::read_bytes(event_cids.value(i)), "failed to apply patch to model event"); + tracing::debug!(%previous_height, %num_rows, patch=?String::from_utf8_lossy(patches.value(i)), previous_state=?String::from_utf8_lossy(previous_state), "failed to apply patch to model event"); new_states.append_null(); } }; diff --git a/sql/src/sqlite.rs b/sql/src/sqlite.rs index 6fb0d3e82..3498d1ab1 100644 --- a/sql/src/sqlite.rs +++ b/sql/src/sqlite.rs @@ -192,7 +192,7 @@ impl SqlitePool { /// Begin a transaction. The transaction must be committed by calling `commit_tx`. /// Will be rolled back on drop if not committed. - pub async fn begin_tx(&self) -> Result { + pub async fn begin_tx(&self) -> Result> { let tx = self.writer.begin().await?; Ok(SqliteTransaction { tx }) } diff --git a/tests/hermetic/src/cli/tester.rs b/tests/hermetic/src/cli/tester.rs index a34e1cfc3..afc4eb9de 100644 --- a/tests/hermetic/src/cli/tester.rs +++ b/tests/hermetic/src/cli/tester.rs @@ -786,12 +786,12 @@ fn job_volumes() -> Vec { fn job_init() -> Container { Container { name: "init-tests".to_owned(), - image: Some("stedolan/jq".to_owned()), + image: Some("alpine:latest".to_owned()), image_pull_policy: Some("IfNotPresent".to_owned()), command: Some(vec![ - "/bin/bash".to_owned(), + "/bin/sh".to_owned(), "-c".to_owned(), - "/network/process-peers.sh".to_owned(), + "apk add --no-cache bash jq curl && /network/process-peers.sh".to_owned(), ]), env: Some(vec![ EnvVar {