Skip to content

Commit cf69d03

Browse files
committed
feat: validate time events and propagate proofs through aggregator
1 parent 784ea68 commit cf69d03

11 files changed

Lines changed: 179 additions & 138 deletions

File tree

event-svc/src/event/service.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use super::{
1212
};
1313
use async_trait::async_trait;
1414
use ceramic_core::{EventId, Network, NodeId, SerializeExt};
15-
use ceramic_pipeline::{concluder::TimeProof, ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime};
15+
use ceramic_pipeline::{
16+
concluder::TimeProof, ConclusionData, ConclusionEvent, ConclusionInit, ConclusionTime,
17+
};
1618
use ceramic_sql::sqlite::SqlitePool;
1719
use cid::Cid;
1820
use futures::stream::BoxStream;
@@ -402,30 +404,22 @@ impl EventService {
402404

403405
match event {
404406
ceramic_event::unvalidated::Event::Time(time_event) => {
405-
let proof = match self.discover_chain_proof(&time_event).await {
406-
Ok(proof) => Some(proof),
407-
Err(error) => {
408-
tracing::warn!(
409-
?event_cid,
410-
?error,
411-
"Failed to discover chain proof for time event"
412-
);
413-
None
414-
}
415-
};
407+
let proof = self.discover_chain_proof(&time_event).await.map_err(|e| {
408+
Error::new_app(anyhow::anyhow!("Failed to discover chain proof: {:?}", e))
409+
})?;
416410

417411
Ok(ConclusionEvent::Time(ConclusionTime {
418412
event_cid,
419413
init,
420414
previous: vec![*time_event.prev()],
421415
order: delivered as u64,
422-
time_proof: proof.map(|p| TimeProof {
423-
before: p
416+
time_proof: TimeProof {
417+
before: proof
424418
.timestamp
425419
.try_into()
426420
.expect("conclusion timestamp overflow"),
427-
chain_id: p.chain_id,
428-
}),
421+
chain_id: proof.chain_id,
422+
},
429423
}))
430424
}
431425
ceramic_event::unvalidated::Event::Signed(signed_event) => {
@@ -638,12 +632,6 @@ pub enum ValidationError {
638632
key: EventId,
639633
reason: String,
640634
},
641-
/// 'Soft error' -> should not kill recon conversation but should not be persisted
642-
/// A time event could not be validated because no RPC provider was available
643-
SoftError {
644-
key: EventId,
645-
reason: String,
646-
},
647635
}
648636

649637
#[derive(Debug, PartialEq, Eq, Default)]

event-svc/src/event/validator/event.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use ceramic_core::{Cid, EventId, NodeId};
44
use ceramic_event::unvalidated;
55
use ipld_core::ipld::Ipld;
66
use recon::ReconItem;
7-
use tokio::try_join;
87

98
use crate::{
109
blockchain::eth_rpc,
@@ -117,6 +116,7 @@ impl ValidatedEvents {
117116
self.valid.extend(other.valid);
118117
self.invalid.extend(other.invalid);
119118
self.unvalidated.extend(other.unvalidated);
119+
self.proofs.extend(other.proofs);
120120
}
121121
}
122122

@@ -150,38 +150,34 @@ impl EventValidator {
150150
}
151151

152152
/// Validates the events with the given validation requirement
153-
/// If the [`ValidationRequirement`] is None, it just returns every event as valid
153+
/// Regardless of the validation requirement, time events are always validated.
154+
/// If the [`ValidationRequirement`] is None, it just returns every data event as valid.
154155
pub(crate) async fn validate_events(
155156
&self,
156157
validation_req: Option<&ValidationRequirement>,
157158
parsed_events: Vec<UnvalidatedEvent>,
158159
) -> Result<ValidatedEvents> {
159-
let validation_req = if let Some(req) = validation_req {
160-
req
161-
} else {
162-
// we don't validate so we just return done
163-
return Ok(ValidatedEvents {
164-
valid: parsed_events
165-
.into_iter()
166-
.map(ValidatedEvent::from_unvalidated_unchecked)
167-
.collect(),
168-
unvalidated: Vec::new(),
169-
invalid: Vec::new(),
170-
proofs: Vec::new(),
171-
});
172-
};
173-
174160
let mut validated = ValidatedEvents::new_with_expected_valid(parsed_events.len());
175-
// partition the events by type of validation needed and delegate to validators
161+
162+
// Partition the events by type of validation needed and delegate to validators
176163
let grouped = GroupedEvents::from(parsed_events);
177164

178-
let (validated_signed, validated_time) = try_join!(
179-
self.validate_signed_events(grouped.signed_batch, validation_req),
180-
self.validate_time_events(grouped.time_batch)
181-
)?;
182-
validated.extend_with(validated_signed);
165+
// Time events are always validated
166+
let validated_time = self.validate_time_events(grouped.time_batch).await?;
183167
validated.extend_with(validated_time);
184168

169+
if let Some(req) = validation_req {
170+
let validated_signed = self
171+
.validate_signed_events(grouped.signed_batch, req)
172+
.await?;
173+
validated.extend_with(validated_signed);
174+
} else {
175+
// Return all data events as valid
176+
validated
177+
.valid
178+
.extend(Vec::<ValidatedEvent>::from(grouped.signed_batch));
179+
};
180+
185181
if !validated.invalid.is_empty() {
186182
tracing::warn!(count=%validated.invalid.len(), "invalid events discovered");
187183
}

event-svc/src/event/validator/grouped.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,22 @@ pub struct TimeValidationBatch(pub(crate) Vec<Time>);
104104

105105
#[derive(Debug)]
106106
pub struct SignedValidationBatch {
107-
pub data: Vec<SignedData>,
108-
pub init: Vec<SignedInit>,
107+
pub(crate) data: Vec<SignedData>,
108+
pub(crate) init: Vec<SignedInit>,
109109
/// Possibly needed to verify data event controllers as these
110110
/// don't yet exist on disk, but the signatures aren't checked.
111-
pub unsigned: Vec<Unsigned>,
111+
pub(crate) unsigned: Vec<Unsigned>,
112+
}
113+
114+
impl From<SignedValidationBatch> for Vec<ValidatedEvent> {
115+
fn from(value: SignedValidationBatch) -> Self {
116+
let mut events =
117+
Vec::with_capacity(value.data.len() + value.init.len() + value.unsigned.len());
118+
events.extend(value.data.into_iter().map(Into::into));
119+
events.extend(value.init.into_iter().map(Into::into));
120+
events.extend(value.unsigned.into_iter().map(Into::into));
121+
events
122+
}
112123
}
113124

114125
impl From<Vec<UnvalidatedEvent>> for GroupedEvents {

one/src/daemon.rs

Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use anyhow::{anyhow, bail, Result};
88
use ceramic_anchor_remote::RemoteCas;
99
use ceramic_anchor_service::AnchorService;
1010
use ceramic_core::NodeKey;
11-
use ceramic_event_svc::eth_rpc::HttpEthRpc;
12-
use ceramic_event_svc::{ChainInclusionProvider, EventService};
11+
use ceramic_event_svc::EventService;
1312
use ceramic_interest_svc::InterestService;
1413
use ceramic_kubo_rpc::Multiaddr;
1514
use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
@@ -276,56 +275,6 @@ pub struct DaemonOpts {
276275
object_store_url: url::Url,
277276
}
278277

279-
async fn get_eth_rpc_providers(
280-
ethereum_rpc_urls: Vec<String>,
281-
network: &Network,
282-
) -> Result<Vec<ChainInclusionProvider>> {
283-
let ethereum_rpc_urls = if ethereum_rpc_urls.is_empty() {
284-
network.default_rpc_urls()?
285-
} else {
286-
ethereum_rpc_urls
287-
};
288-
289-
let mut providers = Vec::new();
290-
for url in ethereum_rpc_urls {
291-
match HttpEthRpc::try_new(&url).await {
292-
Ok(provider) => {
293-
let provider_chain = provider.chain_id();
294-
if network
295-
.supported_chain_ids()
296-
.is_none_or(|ids| ids.contains(provider_chain))
297-
{
298-
info!(
299-
"Using ethereum rpc provider for chain: {} with url: {}",
300-
provider.chain_id(),
301-
provider.url()
302-
);
303-
let provider: ChainInclusionProvider = Arc::new(provider);
304-
providers.push(provider);
305-
} else {
306-
warn!("Eth RPC provider {} uses chainid {} which isn't supported by Ceramic network {:?}", url, provider_chain,network);
307-
}
308-
}
309-
Err(err) => {
310-
warn!("failed to create RPC client with url: '{url}': {:?}", err);
311-
}
312-
}
313-
}
314-
315-
if providers.is_empty() {
316-
if *network == Network::Local || *network == Network::InMemory {
317-
warn!("No usable ethereum RPC provided for network {}. All TimeEvent validation will fail", network.name());
318-
} else {
319-
bail!(
320-
"No usable ethereum RPC configured for network {}",
321-
network.name()
322-
);
323-
}
324-
}
325-
326-
Ok(providers)
327-
}
328-
329278
fn spawn_database_optimizer(
330279
sqlite_pool: SqlitePool,
331280
mut shutdown: ShutdownSignal,
@@ -422,7 +371,10 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
422371
None
423372
};
424373

425-
let rpc_providers = get_eth_rpc_providers(opts.ethereum_rpc_urls, &opts.network).await?;
374+
let rpc_providers = opts
375+
.network
376+
.get_eth_rpc_providers(opts.ethereum_rpc_urls)
377+
.await?;
426378

427379
// Construct services from pool
428380
let peer_svc = Arc::new(PeerService::new(sqlite_pool.clone()));

one/src/lib.rs

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ mod migrations;
99
mod network;
1010
mod query;
1111

12-
use anyhow::{anyhow, Result};
12+
use anyhow::{anyhow, bail, Result};
1313
use ceramic_core::ssi::caip2::ChainId;
14+
use ceramic_event_svc::{eth_rpc::HttpEthRpc, ChainInclusionProvider};
1415
use ceramic_metrics::config::Config as MetricsConfig;
1516
use ceramic_sql::sqlite::{SqliteOpts, SqlitePool};
1617
use clap::{Args, Parser, Subcommand, ValueEnum};
@@ -22,8 +23,7 @@ use multihash_codetable::Code;
2223
use multihash_derive::Hasher;
2324
use shutdown::Shutdown;
2425
use signal_hook_tokio::Signals;
25-
use std::str::FromStr;
26-
use std::{env, path::PathBuf};
26+
use std::{env, path::PathBuf, str::FromStr, sync::Arc};
2727
use tokio::io::AsyncReadExt;
2828
use tracing::{debug, error, info, warn};
2929

@@ -121,7 +121,7 @@ impl Network {
121121
}
122122

123123
/// Return the default ethereum rpc providers for each network.
124-
pub fn default_rpc_urls(&self) -> Result<Vec<String>> {
124+
fn default_rpc_urls(&self) -> Result<Vec<String>> {
125125
match self {
126126
Network::Mainnet => {
127127
anyhow::bail!("no Ethereum RPC URLs specified for Mainnet")
@@ -151,7 +151,7 @@ impl Network {
151151
}
152152

153153
/// return the allowed chain ids for this network. or None for any
154-
pub fn supported_chain_ids(&self) -> Option<Vec<ChainId>> {
154+
fn supported_chain_ids(&self) -> Option<Vec<ChainId>> {
155155
match self {
156156
Network::Mainnet => Some(vec![
157157
ChainId::from_str("eip155:1").expect("eip155:1 is a valid chain")
@@ -168,7 +168,7 @@ impl Network {
168168
}
169169

170170
/// Get the network as a unique name.
171-
pub fn name(&self) -> String {
171+
fn name(&self) -> String {
172172
match self {
173173
Network::Mainnet => "mainnet".to_owned(),
174174
Network::TestnetClay => "testnet-clay".to_owned(),
@@ -177,6 +177,56 @@ impl Network {
177177
Network::InMemory => "inmemory".to_owned(),
178178
}
179179
}
180+
181+
pub async fn get_eth_rpc_providers(
182+
&self,
183+
ethereum_rpc_urls: Vec<String>,
184+
) -> Result<Vec<ChainInclusionProvider>> {
185+
let ethereum_rpc_urls = if ethereum_rpc_urls.is_empty() {
186+
self.default_rpc_urls()?
187+
} else {
188+
ethereum_rpc_urls
189+
};
190+
191+
let mut providers = Vec::new();
192+
for url in ethereum_rpc_urls {
193+
match HttpEthRpc::try_new(&url).await {
194+
Ok(provider) => {
195+
let provider_chain = provider.chain_id();
196+
if self
197+
.supported_chain_ids()
198+
.is_none_or(|ids| ids.contains(provider_chain))
199+
{
200+
info!(
201+
"Using ethereum rpc provider for chain: {} with url: {}",
202+
provider.chain_id(),
203+
provider.url()
204+
);
205+
let provider: ChainInclusionProvider = Arc::new(provider);
206+
providers.push(provider);
207+
} else {
208+
warn!("Eth RPC provider {} uses chainid {} which isn't supported by Ceramic network {:?}", url, provider_chain, self);
209+
}
210+
}
211+
Err(err) => {
212+
warn!("failed to create RPC client with url: '{url}': {:?}", err);
213+
}
214+
}
215+
}
216+
217+
if providers.is_empty() {
218+
if *self == Network::Local || *self == Network::InMemory {
219+
warn!("No usable ethereum RPC provided for network {}. All TimeEvent validation will fail", self.name());
220+
} else {
221+
bail!(
222+
"No usable ethereum RPC configured for network {}",
223+
self.name()
224+
);
225+
}
226+
}
227+
228+
Ok(providers)
229+
}
180230
}
181231

182232
/// The default storage directory to use if none is provided. In order:

one/src/migrations.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@ pub struct FromIpfsOpts {
123123
/// For example, on mainnet, events are expected to have a chain id of 'eip155:1'.
124124
#[clap(long, env = "CERAMIC_ONE_VALIDATE_CHAIN")]
125125
validate_chain: bool,
126+
127+
/// Ethereum RPC URLs used for time events validation. Required when connecting to mainnet and uses fallback URLs if not specified for other networks.
128+
#[arg(
129+
long,
130+
use_value_delimiter = true,
131+
value_delimiter = ',',
132+
env = "CERAMIC_ONE_ETHEREUM_RPC_URLS"
133+
)]
134+
ethereum_rpc_urls: Vec<String>,
126135
}
127136

128137
impl From<&FromIpfsOpts> for DBOpts {
@@ -165,13 +174,18 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> {
165174
let network = opts.network.to_network(&opts.local_network_id)?;
166175
let db_opts: DBOpts = (&opts).into();
167176
let sqlite_pool = db_opts.get_sqlite_pool(SqliteOpts::default()).await?;
177+
let rpc_providers = opts
178+
.network
179+
.get_eth_rpc_providers(opts.ethereum_rpc_urls)
180+
.await?;
181+
168182
// TODO: feature flags here? or just remove this entirely when enabling
169183
let event_svc = Arc::new(
170184
EventService::try_new(
171185
sqlite_pool,
172186
ceramic_event_svc::UndeliveredEventReview::Skip,
173187
false,
174-
vec![],
188+
rpc_providers,
175189
)
176190
.await?,
177191
);

0 commit comments

Comments
 (0)