Skip to content

Commit 6c11a03

Browse files
dav1do and smrz2001 authored
fix: aggregator restart missing events (#737)
* chore: deal with rustc 1.89 mismatched_lifetime_syntaxes warnings * fix: aggregator idempotency restart from the beginning of conclusion events to avoid skipping on restarts. Need to figure out why memory table contains conclusion_event_order that is less than the data written to event_states still, and hopefully restart where we left off. For now, we just ignore things we've seen before. * fix: ensure conclusion event batch is written and flushed from cache as a whole. We split the batch into models/mids and were writing both pieces to disk separately. The entire batch is ordered by conclusion_event_order, but each group could overlap. So we could flush the cache for the first write and then we'd have a cache that was "going backward" in conclusion_event_order. On restart, those events were missed and events failed to aggregate correctly * test: fix comment and modify test to better replicate ordering bug * chore(ci): add fmt/clippy * chore(ci): use newer jq image * chore(ci): use alpine image for jq * chore(ci): install bash in init container --------- Co-authored-by: Mohsin Zaidi <mohsinrzaidi@gmail.com>
1 parent 85d101a commit 6c11a03

11 files changed

Lines changed: 467 additions & 100 deletions

File tree

.github/workflows/build-and-test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ jobs:
101101
with:
102102
package_json_file: tests/suite/package.json
103103
- uses: dtolnay/rust-toolchain@stable
104+
with:
105+
components: rustfmt, clippy
104106
- uses: Swatinem/rust-cache@v2
105107
with:
106108
# The prefix cache key, this can be changed to start a new cache manually.

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

anchor-service/src/anchor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl MerkleNodes {
5757
}
5858

5959
/// Return an iterator over the MerkleNodes
60-
pub fn iter(&self) -> indexmap::map::Iter<Cid, MerkleNode> {
60+
pub fn iter(&self) -> indexmap::map::Iter<'_, Cid, MerkleNode> {
6161
self.nodes.iter()
6262
}
6363

core/src/peer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ impl Builder<WithExpiration> {
224224
PeerKey(format!("{:0>11}", self.state.expiration))
225225
}
226226
/// Set the peer id. Note, a NodeKey is required so the [`PeerEntry`] can be signed.
227-
pub fn with_id(self, id: &NodeKey) -> Builder<WithId> {
227+
pub fn with_id(self, id: &NodeKey) -> Builder<WithId<'_>> {
228228
Builder {
229229
state: WithId {
230230
node_key: id,

flight/tests/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ async fn event_states_simple() -> Result<()> {
397397
data::varchar as data,
398398
before,
399399
chain_id
400-
FROM event_states LIMIT 4"#
400+
FROM event_states order by event_state_order LIMIT 4"#
401401
.to_string(),
402402
None,
403403
)

pipeline/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,5 @@ url.workspace = true
5252
mockall.workspace = true
5353
test-log.workspace = true
5454
tracing-subscriber.workspace = true
55+
rand.workspace = true
56+
multihash-codetable.workspace = true

pipeline/src/aggregator/mod.rs

Lines changed: 452 additions & 93 deletions
Large diffs are not rendered by default.

pipeline/src/aggregator/model_instance_patch.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
226226
}
227227
Err(err) => {
228228
warn!(%err, event_cid=?Cid::read_bytes(event_cids.value(i)), "failed to apply patch to model instance event");
229+
tracing::debug!(%previous_height, %num_rows, patch=?String::from_utf8_lossy(patches.value(i)), previous_state=?String::from_utf8_lossy(previous_state), "failed to apply patch to model instance event");
229230
new_states.append_null();
230231
model_versions.append_null();
231232
}

pipeline/src/aggregator/model_patch.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
176176
}
177177
Err(err) => {
178178
warn!(%err, event_cid=?Cid::read_bytes(event_cids.value(i)), "failed to apply patch to model event");
179+
tracing::debug!(%previous_height, %num_rows, patch=?String::from_utf8_lossy(patches.value(i)), previous_state=?String::from_utf8_lossy(previous_state), "failed to apply patch to model event");
179180
new_states.append_null();
180181
}
181182
};

sql/src/sqlite.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl SqlitePool {
192192

193193
/// Begin a transaction. The transaction must be committed by calling `commit_tx`.
194194
/// Will be rolled back on drop if not committed.
195-
pub async fn begin_tx(&self) -> Result<SqliteTransaction> {
195+
pub async fn begin_tx(&self) -> Result<SqliteTransaction<'_>> {
196196
let tx = self.writer.begin().await?;
197197
Ok(SqliteTransaction { tx })
198198
}

0 commit comments

Comments
 (0)