Skip to content

Commit f30efb8

Browse files
authored
split runner build and start (#4126)
1 parent a3cc5b9 commit f30efb8

28 files changed

Lines changed: 561 additions & 139 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,7 @@ mod subgraph;
66

77
pub use crate::link_resolver::LinkResolver;
88
pub use crate::metrics::MetricsRegistry;
9-
pub use crate::subgraph::{SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar};
9+
pub use crate::subgraph::{
10+
SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar, SubgraphRunner,
11+
SubgraphTriggerProcessor,
12+
};

core/src/subgraph/context.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGua
3030
// Currently most of the changes are applied in `runner.rs`, but ideally more of that would be
3131
// refactored into the context so it wouldn't need `pub` fields. The entity cache should probably
3232
// also be moved here.
33-
pub(crate) struct IndexingContext<C, T>
33+
pub struct IndexingContext<C, T>
3434
where
3535
T: RuntimeHostBuilder<C>,
3636
C: Blockchain,
@@ -111,15 +111,22 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
111111
.await
112112
}
113113

114-
// Removes data sources hosts with a creation block greater or equal to `reverted_block`, so
115-
// that they are no longer candidates for `process_trigger`.
116-
//
117-
// This does not currently affect the `offchain_monitor` or the `filter`, so they will continue
118-
// to include data sources that have been reverted. This is not ideal for performance, but it
119-
// does not affect correctness since triggers that have no matching host will be ignored by
120-
// `process_trigger`.
121-
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) {
122-
self.instance.revert_data_sources(reverted_block)
114+
/// Removes data sources hosts with a creation block greater or equal to `reverted_block`, so
115+
/// that they are no longer candidates for `process_trigger`.
116+
///
117+
/// This does not currently affect the `offchain_monitor` or the `filter`, so they will continue
118+
/// to include data sources that have been reverted. This is not ideal for performance, but it
119+
/// does not affect correctness since triggers that have no matching host will be ignored by
120+
/// `process_trigger`.
121+
///
122+
/// File data sources that have been marked not done during this process will get re-queued
123+
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) -> Result<(), Error> {
124+
let removed = self.instance.revert_data_sources(reverted_block);
125+
126+
removed
127+
.into_iter()
128+
.map(|source| self.offchain_monitor.add_source(source))
129+
.collect()
123130
}
124131

125132
pub fn add_dynamic_data_source(
@@ -142,9 +149,14 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
142149
pub fn causality_region_next_value(&mut self) -> CausalityRegion {
143150
self.instance.causality_region_next_value()
144151
}
152+
153+
#[cfg(debug_assertions)]
154+
pub fn instance(&self) -> &SubgraphInstance<C, T> {
155+
&self.instance
156+
}
145157
}
146158

147-
pub(crate) struct OffchainMonitor {
159+
pub struct OffchainMonitor {
148160
ipfs_monitor: PollingMonitor<CidFile>,
149161
ipfs_monitor_rx: mpsc::Receiver<(CidFile, Bytes)>,
150162
}

core/src/subgraph/context/instance.rs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ use futures01::sync::mpsc::Sender;
22
use graph::{
33
blockchain::Blockchain,
44
data_source::{
5-
causality_region::CausalityRegionSeq, CausalityRegion, DataSource, DataSourceTemplate,
5+
causality_region::CausalityRegionSeq, offchain, CausalityRegion, DataSource,
6+
DataSourceTemplate,
67
},
78
prelude::*,
89
};
910
use std::collections::HashMap;
1011

1112
use super::OffchainMonitor;
1213

13-
pub(crate) struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
14+
pub struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
1415
subgraph_id: DeploymentHash,
1516
network: String,
1617
host_builder: T,
@@ -155,7 +156,42 @@ where
155156
})
156157
}
157158

158-
pub(super) fn revert_data_sources(&mut self, reverted_block: BlockNumber) {
159+
/// Reverts any DataSources that have been added from the block forwards (inclusively)
160+
/// This function also reverts the done_at status if it was 'done' on this block or later.
161+
/// It only returns the offchain::Source because we don't currently need to know which
162+
/// DataSources were removed, the source is used so that the offchain DDS can be found again.
163+
pub(super) fn revert_data_sources(
164+
&mut self,
165+
reverted_block: BlockNumber,
166+
) -> Vec<offchain::Source> {
167+
self.revert_hosts_cheap(reverted_block);
168+
169+
// The following code handles resetting offchain datasources so in most
170+
// cases this is enough processing.
171+
// At some point we prolly need to improve the linear search but for now this
172+
// should be fine. *IT'S FINE*
173+
//
174+
// Any File DataSources (Dynamic Data Sources), will have their own causality region
175+
// which currently is the next number of the sequence but that should be an internal detail.
176+
// Regardless of the sequence logic, if the current causality region is ONCHAIN then there are
177+
// no others and therefore the remaining code is a noop and we can just stop here.
178+
if self.causality_region_seq.0 == CausalityRegion::ONCHAIN {
179+
return vec![];
180+
}
181+
182+
self.hosts
183+
.iter()
184+
.filter(|host| matches!(host.done_at(), Some(done_at) if done_at >= reverted_block))
185+
.map(|host| {
186+
host.set_done_at(None);
187+
// Safe to call unwrap() because only offchain DataSources have done_at = Some
188+
host.data_source().as_offchain().unwrap().source.clone()
189+
})
190+
.collect()
191+
}
192+
193+
/// Because hosts are ordered, removing them based on creation block is cheap and simple.
194+
fn revert_hosts_cheap(&mut self, reverted_block: BlockNumber) {
159195
// `hosts` is ordered by the creation block.
160196
// See also 8f1bca33-d3b7-4035-affc-fd6161a12448.
161197
while self
@@ -168,7 +204,7 @@ where
168204
}
169205
}
170206

171-
pub(super) fn hosts(&self) -> &[Arc<T::Host>] {
207+
pub fn hosts(&self) -> &[Arc<T::Host>] {
172208
&self.hosts
173209
}
174210

core/src/subgraph/instance_manager.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use graph::blockchain::{BlockchainKind, TriggerFilter};
1111
use graph::components::subgraph::ProofOfIndexingVersion;
1212
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
1313
use graph::data_source::causality_region::CausalityRegionSeq;
14+
use graph::env::EnvVars;
1415
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
1516
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
1617
use graph_runtime_wasm::module::ToAscPtr;
@@ -20,6 +21,7 @@ use tokio::task;
2021
use super::context::OffchainMonitor;
2122
use super::SubgraphTriggerProcessor;
2223

24+
#[derive(Clone)]
2325
pub struct SubgraphInstanceManager<S: SubgraphStore> {
2426
logger_factory: LoggerFactory,
2527
subgraph_store: Arc<S>,
@@ -30,6 +32,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
3032
link_resolver: Arc<dyn LinkResolver>,
3133
ipfs_service: IpfsService,
3234
static_filters: bool,
35+
env_vars: Arc<EnvVars>,
3336
}
3437

3538
#[async_trait]
@@ -51,6 +54,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
5154
let runner = instance_manager
5255
.build_subgraph_runner::<graph_chain_arweave::Chain>(
5356
logger.clone(),
57+
self.env_vars.cheap_clone(),
5458
loc.clone(),
5559
manifest,
5660
stop_block,
@@ -64,6 +68,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
6468
let runner = instance_manager
6569
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
6670
logger.clone(),
71+
self.env_vars.cheap_clone(),
6772
loc.clone(),
6873
manifest,
6974
stop_block,
@@ -77,6 +82,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
7782
let runner = instance_manager
7883
.build_subgraph_runner::<graph_chain_near::Chain>(
7984
logger.clone(),
85+
self.env_vars.cheap_clone(),
8086
loc.clone(),
8187
manifest,
8288
stop_block,
@@ -90,6 +96,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
9096
let runner = instance_manager
9197
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
9298
logger.clone(),
99+
self.env_vars.cheap_clone(),
93100
loc.clone(),
94101
manifest,
95102
stop_block,
@@ -103,6 +110,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
103110
let runner = instance_manager
104111
.build_subgraph_runner::<graph_chain_substreams::Chain>(
105112
logger.clone(),
113+
self.env_vars.cheap_clone(),
106114
loc.cheap_clone(),
107115
manifest,
108116
stop_block,
@@ -156,6 +164,7 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
156164
impl<S: SubgraphStore> SubgraphInstanceManager<S> {
157165
pub fn new(
158166
logger_factory: &LoggerFactory,
167+
env_vars: Arc<EnvVars>,
159168
subgraph_store: Arc<S>,
160169
chains: Arc<BlockchainMap>,
161170
metrics_registry: Arc<dyn MetricsRegistry>,
@@ -178,12 +187,14 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
178187
link_resolver,
179188
ipfs_service,
180189
static_filters,
190+
env_vars,
181191
}
182192
}
183193

184-
async fn build_subgraph_runner<C>(
194+
pub async fn build_subgraph_runner<C>(
185195
&self,
186196
logger: Logger,
197+
env_vars: Arc<EnvVars>,
187198
deployment: DeploymentLocator,
188199
manifest: serde_yaml::Mapping,
189200
stop_block: Option<BlockNumber>,
@@ -426,6 +437,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
426437
ctx,
427438
logger.cheap_clone(),
428439
metrics,
440+
env_vars,
429441
))
430442
}
431443

core/src/subgraph/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ mod trigger_processor;
1313
pub use self::instance_manager::SubgraphInstanceManager;
1414
pub use self::provider::SubgraphAssignmentProvider;
1515
pub use self::registrar::SubgraphRegistrar;
16+
pub use self::runner::SubgraphRunner;
1617
pub use self::trigger_processor::*;

core/src/subgraph/runner.rs

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use graph::data::subgraph::{
1919
use graph::data_source::{
2020
offchain, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
2121
};
22+
use graph::env::EnvVars;
2223
use graph::prelude::*;
2324
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
2425
use std::sync::Arc;
@@ -28,7 +29,7 @@ const MINUTE: Duration = Duration::from_secs(60);
2829

2930
const SKIP_PTR_UPDATES_THRESHOLD: Duration = Duration::from_secs(60 * 5);
3031

31-
pub(crate) struct SubgraphRunner<C, T>
32+
pub struct SubgraphRunner<C, T>
3233
where
3334
C: Blockchain,
3435
T: RuntimeHostBuilder<C>,
@@ -50,6 +51,7 @@ where
5051
ctx: IndexingContext<C, T>,
5152
logger: Logger,
5253
metrics: RunnerMetrics,
54+
env_vars: Arc<EnvVars>,
5355
) -> Self {
5456
Self {
5557
inputs: Arc::new(inputs),
@@ -59,8 +61,8 @@ where
5961
synced: false,
6062
skip_ptr_updates_timer: Instant::now(),
6163
backoff: ExponentialBackoff::new(
62-
(MINUTE * 2).min(ENV_VARS.subgraph_error_retry_ceil),
63-
ENV_VARS.subgraph_error_retry_ceil,
64+
(MINUTE * 2).min(env_vars.subgraph_error_retry_ceil),
65+
env_vars.subgraph_error_retry_ceil,
6466
),
6567
entity_lfu_cache: LfuCache::new(),
6668
},
@@ -69,7 +71,40 @@ where
6971
}
7072
}
7173

72-
pub async fn run(mut self) -> Result<(), Error> {
74+
/// Revert the state to a previous block. When handling revert operations
75+
/// or failed block processing, it is necessary to remove part of the existing
76+
/// in-memory state to keep it constent with DB changes.
77+
/// During block processing new dynamic data sources are added directly to the
78+
/// SubgraphInstance of the runner. This means that if, for whatever reason,
79+
/// the changes don;t complete then the remnants of that block processing must
80+
/// be removed. The same thing also applies to the block cache.
81+
/// This function must be called before continuing to process in order to avoid
82+
/// duplicated host insertion and POI issues with dirty entity changes.
83+
fn revert_state(&mut self, block_number: BlockNumber) -> Result<(), Error> {
84+
self.state.entity_lfu_cache = LfuCache::new();
85+
86+
// 1. Revert all hosts(created by DDS) up to block_number inclusively.
87+
// 2. Unmark any offchain data sources that were marked done on the blocks being removed.
88+
// When no offchain datasources are present, 2. should be a noop.
89+
self.ctx.revert_data_sources(block_number)?;
90+
Ok(())
91+
}
92+
93+
#[cfg(debug_assertions)]
94+
pub fn context(&self) -> &IndexingContext<C, T> {
95+
&self.ctx
96+
}
97+
98+
#[cfg(debug_assertions)]
99+
pub async fn run_for_test(self, break_on_restart: bool) -> Result<Self, Error> {
100+
self.run_inner(break_on_restart).await
101+
}
102+
103+
pub async fn run(self) -> Result<Self, Error> {
104+
self.run_inner(false).await
105+
}
106+
107+
async fn run_inner(mut self, break_on_restart: bool) -> Result<Self, Error> {
73108
// If a subgraph failed for deterministic reasons, before start indexing, we first
74109
// revert the deployment head. It should lead to the same result since the error was
75110
// deterministic.
@@ -134,7 +169,12 @@ where
134169
Action::Stop => {
135170
info!(self.logger, "Stopping subgraph");
136171
self.inputs.store.flush().await?;
137-
return Ok(());
172+
return Ok(self);
173+
}
174+
Action::Restart if break_on_restart => {
175+
info!(self.logger, "Stopping subgraph on break");
176+
self.inputs.store.flush().await?;
177+
return Ok(self);
138178
}
139179
Action::Restart => break,
140180
};
@@ -799,16 +839,8 @@ where
799839

800840
// Handle unexpected stream errors by marking the subgraph as failed.
801841
Err(e) => {
802-
// Clear entity cache when a subgraph fails.
803-
//
804-
// This is done to be safe and sure that there's no state that's
805-
// out of sync from the database.
806-
//
807-
// Without it, POI changes on failure would be kept in the entity cache
808-
// and be transacted incorrectly in the next run.
809-
self.state.entity_lfu_cache = LfuCache::new();
810-
811842
self.metrics.stream.deployment_failed.set(1.0);
843+
self.revert_state(block_ptr.block_number())?;
812844

813845
let message = format!("{:#}", e).replace("\n", "\t");
814846
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
@@ -920,11 +952,7 @@ where
920952
.deployment_head
921953
.set(subgraph_ptr.number as f64);
922954

923-
// Revert the in-memory state:
924-
// - Revert any dynamic data sources.
925-
// - Clear the entity cache.
926-
self.ctx.revert_data_sources(subgraph_ptr.number);
927-
self.state.entity_lfu_cache = LfuCache::new();
955+
self.revert_state(subgraph_ptr.number)?;
928956

929957
Ok(Action::Continue)
930958
}

graph/src/components/subgraph/host.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ pub trait RuntimeHost<C: Blockchain>: Send + Sync + 'static {
6868
/// Block number in which this host was created.
6969
/// Returns `None` for static data sources.
7070
fn creation_block_number(&self) -> Option<BlockNumber>;
71+
72+
/// Offchain data sources track done_at which is set once the
73+
/// trigger has been processed.
74+
fn done_at(&self) -> Option<BlockNumber>;
75+
76+
/// Convenience function to avoid leaking internal representation of
77+
/// mutable number. Calling this on OnChain Datasources is a noop.
78+
fn set_done_at(&self, block: Option<BlockNumber>);
7179
}
7280

7381
pub struct HostMetrics {

graph/src/data/subgraph/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ where
6868

6969
/// The IPFS hash used to identifiy a deployment externally, i.e., the
7070
/// `Qm..` string that `graph-cli` prints when deploying to a subgraph
71-
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
71+
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
7272
pub struct DeploymentHash(String);
7373

7474
impl stable_hash_legacy::StableHash for DeploymentHash {

0 commit comments

Comments
 (0)