Skip to content

Commit 3ac9f9a

Browse files
dav1dosmrz2001
andauthored
fix: random things like feed queries, query CLI, migration logging, deps (#708)
* chore: update flight sql deps * fix: check migration model filter earlier this avoids lots of errors e.g. "fatal error: finding capability block" for events we don't want to review * fix: adjust query CLI to display streaming results querying the _feed tables without a limit will now print all the data retrieved and then wait for updates. previously it would just hang doing nothing. * test: modify tests so that events pass validation * fix: index -> conclusion_event_order this column was renamed but the predicate was not correctly updated so the feed queries always started from the beginning * fix: revert predicate pushdown change * fix: update ordering and push down filters we now pass predicates down into the pipeline so that _feed queries work better. if conclusion_event_order is included in the predicate, we will skip reading all events and start from the offset (we could potentially push the expr down but it's more involved). in general, WHERE clause predicates should be respected by _feed queries now. I had to adjust the ordering used as we were conflating conclusion_event_order and event_state_order, so we now use the conclusion_event_order as the offset, and apply predicates afterward to filter the stream data returned. afaict, the adjustment to use >= instead of > is required now, and doesn't cause issues as we expect we may get the same event in both streams and reduce to one correctly. * chore: point to right sdk docs * fix: improve stream/feed queries to support filters previously, we only supported highwater mark and tried to filter after the fact. now, we create two streams, the first using the filters to find all existing events, the second is all new events that are filtered before being pushed to the consumers. * chore: Option vec instead of allocating empty vec for filters * chore: fix clippy lints * fix: return error instead of simply logging when expression can't be evaluated * chore: update sccache version * fix: use SessionContext in feed to use execution props * chore: clippy * fix: remove TODO --------- Co-authored-by: Mohsin Zaidi <mohsinrzaidi@gmail.com>
1 parent ff912d4 commit 3ac9f9a

30 files changed

Lines changed: 749 additions & 521 deletions

File tree

.github/workflows/build-and-test.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
prefix-key: v0
2525
# Cache only the cargo registry
2626
cache-targets: false
27-
- uses: mozilla-actions/sccache-action@v0.0.3
27+
- uses: mozilla-actions/sccache-action@v0.0.9
2828
- name: git file permission config
2929
run: git config --global --add safe.directory '*'
3030
- name: Check fmt
@@ -52,7 +52,7 @@ jobs:
5252
prefix-key: v0
5353
# Cache only the cargo registry
5454
cache-targets: false
55-
- uses: mozilla-actions/sccache-action@v0.0.3
55+
- uses: mozilla-actions/sccache-action@v0.0.9
5656
- name: Run tests
5757
run: make test
5858
build:
@@ -69,7 +69,7 @@ jobs:
6969
prefix-key: v0
7070
# Cache only the cargo registry
7171
cache-targets: false
72-
- uses: mozilla-actions/sccache-action@v0.0.3
72+
- uses: mozilla-actions/sccache-action@v0.0.9
7373
-
7474
name: Set up Docker Buildx
7575
uses: docker/setup-buildx-action@v2
@@ -108,7 +108,7 @@ jobs:
108108
prefix-key: v0
109109
# Cache only the cargo registry
110110
cache-targets: false
111-
- uses: mozilla-actions/sccache-action@v0.0.3
111+
- uses: mozilla-actions/sccache-action@v0.0.9
112112
- name: Check fmt
113113
run: make -C tests check-fmt
114114
- name: Check clippy
@@ -171,7 +171,7 @@ jobs:
171171
prefix-key: v0
172172
# Cache only the cargo registry
173173
cache-targets: false
174-
- uses: mozilla-actions/sccache-action@v0.0.3
174+
- uses: mozilla-actions/sccache-action@v0.0.9
175175
- name: Build Tester
176176
run: make -C tests BUILD_PROFILE=release driver
177177
- uses: actions/upload-artifact@master

Cargo.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,3 @@ repository = "https://github.com/3box/rust-ceramic"
269269
inherits = "release"
270270
debug = true
271271
strip = "none"
272-
273-
[patch."https://github.com/datafusion-contrib/datafusion-flight-sql-server.git"]
274-
datafusion-flight-sql-server = { git = "https://github.com/nathanielc/datafusion-flight-sql-server.git", branch = "chore/datafusion-45" }
275-
276-
[patch.crates-io]
277-
datafusion-federation = { git = "https://github.com/nathanielc/datafusion-federation.git", branch = "chore/datafusion-45" }

event-svc/src/event/migration.rs

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use recon::ReconItem;
1414
use serde::Deserialize;
1515
use thiserror::Error;
1616
use tokio::{fs::File, io::AsyncWriteExt as _};
17-
use tracing::{debug, error, info, instrument, Level};
17+
use tracing::{debug, error, info, instrument, trace, Level};
1818

1919
use crate::{
2020
event::{BlockStore, DeliverableRequirement},
@@ -217,6 +217,13 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
217217
payload.header().controllers()[0].clone(),
218218
cid,
219219
);
220+
if self.is_filtered_out(&event_builder) {
221+
trace!(
222+
event_cid=%event_builder.event_cid,
223+
"skipping unreferenced init payload due to model filter"
224+
);
225+
return Ok(());
226+
}
220227
let event = unvalidated::init::Event::new(payload);
221228
let event: unvalidated::Event<Ipld> = unvalidated::Event::from(Box::new(event));
222229
self.validate_build_and_push(event_builder, event, &model)
@@ -284,6 +291,11 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
284291
}
285292
};
286293

294+
if self.is_filtered_out(&event_builder) {
295+
trace!(event_cid=%event_builder.event_cid, "skipping signed event due to model filter");
296+
return Ok(());
297+
}
298+
287299
let model = ModelContext::from(event_builder.sep.as_slice());
288300

289301
let mut capability = None;
@@ -370,6 +382,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
370382
unvalidated::RawEvent::Unsigned(payload) => Ok(payload),
371383
}
372384
}
385+
386+
/// Returns true if the event should be skipped due to the model (sep) filter included
387+
fn is_filtered_out(&self, event_builder: &EventBuilder) -> bool {
388+
!self.sep_filter.is_empty() && !self.sep_filter.contains(&event_builder.sep)
389+
}
390+
373391
// Find and add all blocks related to this time event
374392
#[instrument(skip(self, event), ret(level = Level::DEBUG))]
375393
async fn process_time_event(
@@ -386,6 +404,10 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
386404
init_payload.header().controllers()[0].clone(),
387405
init,
388406
);
407+
if self.is_filtered_out(&event_builder) {
408+
trace!(event_cid=%event_builder.event_cid, "skipping time event due to model filter");
409+
return Ok(());
410+
}
389411
let proof_id = event.proof();
390412
let data = self
391413
.load_block(&proof_id)
@@ -440,40 +462,38 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
440462
event: unvalidated::Event<Ipld>,
441463
model: &ModelContext,
442464
) -> Result<()> {
443-
if self.sep_filter.is_empty() || self.sep_filter.contains(&event_builder.sep) {
444-
match &event {
445-
unvalidated::Event::Time(event) => {
446-
if let Some(supported_chains) = &self.supported_chains {
447-
let chain_id = event.proof().chain_id().to_owned();
448-
if !supported_chains.contains(&chain_id) {
449-
return Err(anyhow!("event has unsupported chain: {chain_id}"))
450-
.with_model_context(model);
451-
}
465+
match &event {
466+
unvalidated::Event::Time(event) => {
467+
if let Some(supported_chains) = &self.supported_chains {
468+
let chain_id = event.proof().chain_id().to_owned();
469+
if !supported_chains.contains(&chain_id) {
470+
return Err(anyhow!("event has unsupported chain: {chain_id}"))
471+
.with_model_context(model);
452472
}
453473
}
454-
unvalidated::Event::Signed(event) => {
455-
if self.validate_signatures {
456-
event
457-
.verify_signature(
458-
Some(&event_builder.controller),
459-
&VerifyJwsOpts {
460-
at_time: AtTime::SkipTimeChecks,
461-
..Default::default()
462-
},
463-
)
464-
.await
465-
.with_model_context(model)?;
466-
}
474+
}
475+
unvalidated::Event::Signed(event) => {
476+
if self.validate_signatures {
477+
event
478+
.verify_signature(
479+
Some(&event_builder.controller),
480+
&VerifyJwsOpts {
481+
at_time: AtTime::SkipTimeChecks,
482+
..Default::default()
483+
},
484+
)
485+
.await
486+
.with_model_context(model)?;
467487
}
468-
unvalidated::Event::Unsigned(_) => {}
469-
};
470-
self.batch.push(
471-
event_builder
472-
.build(&self.network, event)
473-
.await
474-
.with_model_context(model)?,
475-
);
476-
}
488+
}
489+
unvalidated::Event::Unsigned(_) => {}
490+
};
491+
self.batch.push(
492+
event_builder
493+
.build(&self.network, event)
494+
.await
495+
.with_model_context(model)?,
496+
);
477497
Ok(())
478498
}
479499
}

event-svc/src/store/sql/access/event.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl EventAccess {
199199
I: Iterator<Item = &'a EventInsertable>,
200200
{
201201
let mut inserted = Vec::new();
202-
let mut tx = self.pool.begin_tx().await.map_err(Error::from)?;
202+
let mut tx = self.pool.begin_tx().await?;
203203

204204
for item in to_add {
205205
let new_key = self
@@ -224,7 +224,7 @@ impl EventAccess {
224224
self.mark_ready_to_deliver(&mut tx, item.cid()).await?;
225225
}
226226
}
227-
tx.commit().await.map_err(Error::from)?;
227+
tx.commit().await?;
228228
let res = InsertResult::new(inserted);
229229

230230
Ok(res)

event-svc/src/store/sql/entities/event.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ impl EventInsertable {
149149
let mut idx = 0;
150150
let mut blocks = vec![];
151151
while let Some((cid, data)) = reader.next_block().await.map_err(Error::new_app)? {
152-
let ebr = EventBlockRaw::try_new(&self.cid, idx, roots.contains(&cid), cid, data)
153-
.map_err(Error::from)?;
152+
let ebr = EventBlockRaw::try_new(&self.cid, idx, roots.contains(&cid), cid, data)?;
154153
blocks.push(ebr);
155154
idx += 1;
156155
}

flight/tests/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ fn events(start_index: u64, highwater_mark: u64, limit: usize) -> Vec<Conclusion
140140
};
141141
vec![
142142
ConclusionEvent::Data(ConclusionData {
143-
order: start_index + 0,
143+
order: start_index,
144144
event_cid: model_stream_id.cid,
145145
init: ConclusionInit {
146146
stream_cid: model_stream_id.cid,
@@ -364,7 +364,7 @@ async fn event_states_simple() -> Result<()> {
364364
let mut sub = aggregator
365365
.send(SubscribeSinceMsg {
366366
projection: None,
367-
offset: None,
367+
filters: None,
368368
// We know there are only four events, query all of them
369369
limit: Some(4),
370370
})

interest-svc/src/store/sql/access/interest.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ impl CeramicOneInterest {
5757
impl CeramicOneInterest {
5858
/// Insert a single interest into the database.
5959
pub async fn insert(pool: &SqlitePool, key: &Interest) -> Result<bool> {
60-
let mut tx = pool.begin_tx().await.map_err(Error::from)?;
60+
let mut tx = pool.begin_tx().await?;
6161
let new_key = CeramicOneInterest::insert_tx(&mut tx, key).await?;
62-
tx.commit().await.map_err(Error::from)?;
62+
tx.commit().await?;
6363
Ok(new_key)
6464
}
6565

@@ -72,14 +72,14 @@ impl CeramicOneInterest {
7272
0 => Ok(InsertResult::new(0)),
7373
_ => {
7474
let mut results = 0;
75-
let mut tx = pool.begin_tx().await.map_err(Error::from)?;
75+
let mut tx = pool.begin_tx().await?;
7676

7777
for item in items.iter() {
7878
CeramicOneInterest::insert_tx(&mut tx, item)
7979
.await?
8080
.then(|| results += 1);
8181
}
82-
tx.commit().await.map_err(Error::from)?;
82+
tx.commit().await?;
8383
Ok(InsertResult::new(results))
8484
}
8585
}

one/src/daemon.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ async fn get_eth_rpc_providers(
293293
let provider_chain = provider.chain_id();
294294
if network
295295
.supported_chain_ids()
296-
.map_or(true, |ids| ids.contains(provider_chain))
296+
.is_none_or(|ids| ids.contains(provider_chain))
297297
{
298298
info!(
299299
"Using ethereum rpc provider for chain: {} with url: {}",

0 commit comments

Comments
 (0)