-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathmapping.rs
More file actions
389 lines (348 loc) · 15.2 KB
/
mapping.rs
File metadata and controls
389 lines (348 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
use crate::gas_rules::GasRules;
use crate::module::{ExperimentalFeatures, ToAscPtr, WasmInstance, WasmInstanceData};
use graph::blockchain::{BlockTime, Blockchain, HostFn};
use graph::components::store::SubgraphFork;
use graph::components::subgraph::{MappingError, SharedProofOfIndexing};
use graph::data_source::{MappingTrigger, TriggerWithHandler};
use graph::futures01::sync::mpsc;
use graph::futures01::{Future as _, Stream as _};
use graph::futures03::channel::oneshot::Sender;
use graph::parking_lot::RwLock;
use graph::prelude::*;
use graph::runtime::IndexForAscTypeId;
use graph::runtime::gas::Gas;
use parity_wasm::elements::ExportEntry;
use std::collections::{BTreeMap, HashMap};
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{panic, thread};
/// Spawn a wasm module in its own thread.
pub fn spawn_module<C: Blockchain>(
raw_module: &[u8],
logger: Logger,
subgraph_id: DeploymentHash,
host_metrics: Arc<HostMetrics>,
runtime: tokio::runtime::Handle,
timeout: Option<Duration>,
experimental_features: ExperimentalFeatures,
) -> Result<mpsc::Sender<WasmRequest<C>>, anyhow::Error>
where
<C as Blockchain>::MappingTrigger: ToAscPtr,
{
static THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
let valid_module = Arc::new(ValidModule::new(&logger, raw_module, timeout)?);
// Create channel for event handling requests
let (mapping_request_sender, mapping_request_receiver) = mpsc::channel(100);
// It used to be that we had to create a dedicated thread since wasmtime
// instances were not `Send` and could therefore not be scheduled by the
// regular tokio executor. This isn't an issue anymore, but we still
// spawn a dedicated thread since running WASM code async can block and
// lock up the executor. See [the wasmtime
// docs](https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#execution-in-poll)
// on how this should be handled properly. As that is a fairly large
// change to how we use wasmtime, we keep the threading model for now.
// Once we are confident that things are working that way, we should
// revisit this and remove the dedicated thread.
//
// In case of failure, this thread may panic or simply terminate,
// dropping the `mapping_request_receiver` which ultimately causes the
// subgraph to fail the next time it tries to handle an event.
let next_id = THREAD_COUNT.fetch_add(1, Ordering::SeqCst);
let conf = thread::Builder::new().name(format!("mapping-{}-{:0>4}", &subgraph_id, next_id));
conf.spawn(move || {
let _runtime_guard = runtime.enter();
// Pass incoming triggers to the WASM module and return entity changes;
// Stop when canceled because all RuntimeHosts and their senders were dropped.
match mapping_request_receiver
.map_err(|()| unreachable!())
.for_each(move |request| {
let WasmRequest {
ctx,
inner,
result_sender,
} = request;
let logger = ctx.logger.clone();
let handle_fut = async {
let result = instantiate_module::<C>(
valid_module.cheap_clone(),
ctx,
host_metrics.cheap_clone(),
experimental_features,
)
.await;
match result {
Ok(module) => match inner {
WasmRequestInner::TriggerRequest(trigger) => {
handle_trigger(&logger, module, trigger, host_metrics.cheap_clone())
.await
}
},
Err(e) => Err(MappingError::Unknown(e)),
}
};
let result = panic::catch_unwind(AssertUnwindSafe(|| graph::block_on(handle_fut)));
let result = match result {
Ok(result) => result,
Err(panic_info) => {
let err_msg = if let Some(payload) = panic_info
.downcast_ref::<String>()
.map(String::as_str)
.or(panic_info.downcast_ref::<&str>().copied())
{
anyhow!("Subgraph panicked with message: {}", payload)
} else {
anyhow!("Subgraph panicked with an unknown payload.")
};
Err(MappingError::Unknown(err_msg))
}
};
result_sender
.send(result)
.map_err(|_| anyhow::anyhow!("WASM module result receiver dropped."))
})
.wait()
{
Ok(()) => debug!(logger, "Subgraph stopped, WASM runtime thread terminated"),
Err(e) => debug!(logger, "WASM runtime thread terminated abnormally";
"error" => e.to_string()),
}
})
.map(|_| ())
.context("Spawning WASM runtime thread failed")?;
Ok(mapping_request_sender)
}
/// Creates a fresh `WasmInstance` from the pre-linked `valid_module` for one request.
///
/// The work is timed under the `module_init` stopwatch section of `host_metrics`.
/// Any failure is wrapped with the context "module instantiation failed".
async fn instantiate_module<C: Blockchain>(
    valid_module: Arc<ValidModule>,
    ctx: MappingContext,
    host_metrics: Arc<HostMetrics>,
    experimental_features: ExperimentalFeatures,
) -> Result<WasmInstance, anyhow::Error>
where
    <C as Blockchain>::MappingTrigger: ToAscPtr,
{
    // The guard keeps the section open until the function returns.
    let _init_timer = host_metrics.stopwatch.start_section("module_init");
    let instance_metrics = host_metrics.cheap_clone();

    let instantiated = WasmInstance::from_valid_module_with_ctx(
        valid_module,
        ctx,
        instance_metrics,
        experimental_features,
    )
    .await;

    instantiated.context("module instantiation failed")
}
/// Runs the handler for `trigger` on an already-instantiated `module`.
///
/// The call is timed under the `run_handler` stopwatch section of `host_metrics`.
/// When `ENV_VARS.log_trigger_data` is set, the trigger is logged at debug level first.
///
/// Returns whatever `WasmInstance::handle_trigger` produces: the resulting `BlockState`
/// paired with a `Gas` value, or a `MappingError`.
async fn handle_trigger<C: Blockchain>(
    logger: &Logger,
    module: WasmInstance,
    trigger: TriggerWithHandler<MappingTrigger<C>>,
    host_metrics: Arc<HostMetrics>,
) -> Result<(BlockState, Gas), MappingError>
where
    <C as Blockchain>::MappingTrigger: ToAscPtr,
{
    let _section = host_metrics.stopwatch.start_section("run_handler");
    if ENV_VARS.log_trigger_data {
        // slog macros accept a borrowed logger, so no clone is needed here.
        debug!(logger, "trigger data: {:?}", trigger);
    }
    module.handle_trigger(trigger).await
}
/// A unit of work for the dedicated mapping thread started by `spawn_module`.
///
/// For each request, the thread instantiates a fresh module from `ctx` and executes `inner`.
/// It then sends the outcome back through `result_sender`.
pub struct WasmRequest<C: Blockchain> {
    /// Context used to instantiate the WASM module for this request.
    pub(crate) ctx: MappingContext,
    /// What the mapping thread should do once the module is instantiated.
    pub(crate) inner: WasmRequestInner<C>,
    /// One-shot channel on which the mapping thread reports the `(BlockState, Gas)` result.
    /// On failure it reports the `MappingError` instead; a caught panic is also reported as
    /// a `MappingError`.
    pub(crate) result_sender: Sender<Result<(BlockState, Gas), MappingError>>,
}
impl<C: Blockchain> WasmRequest<C> {
    /// Builds a request that asks the mapping thread to run the handler for `trigger`.
    ///
    /// The module is instantiated with `ctx`, and the outcome is delivered on `result_sender`.
    pub(crate) fn new_trigger(
        ctx: MappingContext,
        trigger: TriggerWithHandler<MappingTrigger<C>>,
        result_sender: Sender<Result<(BlockState, Gas), MappingError>>,
    ) -> Self {
        let inner = WasmRequestInner::TriggerRequest(trigger);
        Self {
            ctx,
            inner,
            result_sender,
        }
    }
}
/// The kinds of work a `WasmRequest` can carry to the mapping thread.
pub enum WasmRequestInner<C: Blockchain> {
    /// Run the handler attached to this trigger via `WasmInstance::handle_trigger`.
    TriggerRequest(TriggerWithHandler<MappingTrigger<C>>),
}
/// Everything needed to instantiate a WASM module and run a mapping handler for one request.
pub struct MappingContext {
    /// General-purpose logger. `spawn_module` clones it for per-trigger logging.
    pub logger: Logger,
    /// Host export implementations, shared via `Arc`.
    pub host_exports: Arc<crate::host_exports::HostExports>,
    /// The block this context refers to.
    pub block_ptr: BlockPtr,
    /// Block time associated with `block_ptr`.
    pub timestamp: BlockTime,
    /// Block state the mapping works against. `derive_with_empty_block_state` replaces it with
    /// a fresh state built on the same entity store.
    pub state: BlockState,
    /// Shared proof-of-indexing handle.
    pub proof_of_indexing: SharedProofOfIndexing,
    /// Host functions supplied by the blockchain implementation.
    pub host_fns: Arc<Vec<HostFn>>,
    /// Optional subgraph fork (`SubgraphFork`) used when debugging.
    pub debug_fork: Option<Arc<dyn SubgraphFork>>,
    /// Logger for messages coming from mappings
    pub mapping_logger: Logger,
    /// Whether to log details about host fn execution
    pub instrument: bool,
}
impl MappingContext {
    /// Returns a copy of this context with a fresh `BlockState`.
    ///
    /// All shared handles are cheaply cloned. The new state is built from the same entity store
    /// and `self.state.seq_gen()`, with a defaulted second argument. `mapping_logger` is not
    /// copied: it is re-derived from `logger` with `component = "UserMapping"`.
    pub fn derive_with_empty_block_state(&self) -> Self {
        // Values are computed in the same order the fields are listed below.
        let logger = self.logger.cheap_clone();
        let host_exports = self.host_exports.cheap_clone();
        let block_ptr = self.block_ptr.cheap_clone();
        let timestamp = self.timestamp;
        let store = self.state.entity_cache.store.clone();
        let state = BlockState::new(store, Default::default(), self.state.seq_gen());
        let proof_of_indexing = self.proof_of_indexing.cheap_clone();
        let host_fns = self.host_fns.cheap_clone();
        let debug_fork = self.debug_fork.cheap_clone();
        let mapping_logger = Logger::new(&self.logger, o!("component" => "UserMapping"));
        let instrument = self.instrument;

        Self {
            logger,
            host_exports,
            block_ptr,
            timestamp,
            state,
            proof_of_indexing,
            host_fns,
            debug_fork,
            mapping_logger,
            instrument,
        }
    }
}
// Export name under which `ValidModule::new` re-exports the module's start function (if any).
// The start section is cleared, so the function does not run at instantiation time but can be
// invoked later. See the comment on `ValidModule::start_function` for more information.
const GN_START_FUNCTION_NAME: &str = "gn::start";
/// A pre-processed and valid WASM module, ready to be instantiated as a `WasmInstance`.
///
/// Produced by `ValidModule::new`, which:
/// - injects gas metering,
/// - compiles the module with wasmtime,
/// - pre-links its imports.
///
/// `spawn_module` wraps it in an `Arc` and reuses it for every request on the mapping thread.
pub struct ValidModule {
    /// The gas-instrumented module, compiled by wasmtime.
    pub module: wasmtime::Module,
    /// Pre-linked instance template. Created once at module validation time and reused for every
    /// trigger instantiation, avoiding the cost of rebuilding the linker (~60 host function
    /// registrations) and resolving imports on each trigger.
    pub instance_pre: wasmtime::InstancePre<WasmInstanceData>,
    /// Due to our internal architecture we don't want to run the start function at instantiation time,
    /// so we track it separately so that we can run it at an appropriate time.
    /// Since the start function is not an export, we will also create an export for it.
    /// It's an option because start might not be present.
    pub start_function: Option<String>,
    /// A wasm import consists of a `module` and a `name`. AS will generate imports such that they
    /// have `module` set to the name of the file it is imported from and `name` set to the imported
    /// function name or `namespace.function` if inside a namespace. We'd rather not specify names of
    /// source files, so we consider that the import `name` uniquely identifies an import. Still we
    /// need to know the `module` to properly link it, so here we map import names to modules.
    ///
    /// AS now has an `@external("module", "name")` decorator which would make things cleaner, but
    /// the ship has sailed.
    pub import_name_to_modules: BTreeMap<String, Vec<String>>,
    /// The timeout for the module. When set, `ValidModule::new` spawns a task that increments
    /// the engine epoch every `timeout`; timeouts on instances trigger on epoch deltas.
    pub timeout: Option<Duration>,
    /// Abort handle for the epoch-counter task spawned by `ValidModule::new` when `timeout` is
    /// set. `Drop` aborts the task through this handle so it does not outlive the module.
    epoch_counter_abort_handle: Option<tokio::task::AbortHandle>,
    /// Cache for asc_type_id results. Maps IndexForAscTypeId to their WASM runtime
    /// type IDs. Populated lazily on first use; deterministic per compiled module.
    asc_type_id_cache: RwLock<HashMap<IndexForAscTypeId, u32>>,
}
impl ValidModule {
/// Pre-process and validate the module.
pub fn new(
logger: &Logger,
raw_module: &[u8],
timeout: Option<Duration>,
) -> Result<Self, anyhow::Error> {
// Add the gas calls here. Module name "gas" must match. See also
// e3f03e62-40e4-4f8c-b4a1-d0375cca0b76. We do this by round-tripping the module through
// parity - injecting gas then serializing again.
let parity_module = parity_wasm::elements::Module::from_bytes(raw_module)?;
let mut parity_module = match parity_module.parse_names() {
Ok(module) => module,
Err((errs, module)) => {
for (index, err) in errs {
warn!(
logger,
"unable to parse function name for index {}: {}",
index,
err.to_string()
);
}
module
}
};
let start_function = parity_module.start_section().map(|index| {
let name = GN_START_FUNCTION_NAME.to_string();
parity_module.clear_start_section();
parity_module
.export_section_mut()
.unwrap()
.entries_mut()
.push(ExportEntry::new(
name.clone(),
parity_wasm::elements::Internal::Function(index),
));
name
});
let backend = wasm_instrument::gas_metering::host_function::Injector::new("gas", "gas");
let parity_module =
wasm_instrument::gas_metering::inject(parity_module, backend, &GasRules)
.map_err(|_| anyhow!("Failed to inject gas counter"))?;
let raw_module = parity_module.into_bytes()?;
// We use Cranelift as a compilation engine. Cranelift is an optimizing compiler, but that
// should not cause determinism issues since it adheres to the Wasm spec and NaN
// canonicalization is enabled below. The optimization level is configurable via
// GRAPH_WASM_OPT_LEVEL (default: speed).
let mut config = wasmtime::Config::new();
config.strategy(wasmtime::Strategy::Cranelift);
config.epoch_interruption(true);
config.cranelift_nan_canonicalization(true); // For NaN determinism.
config.cranelift_opt_level(match ENV_VARS.mappings.wasm_opt_level {
graph::env::WasmOptLevel::None => wasmtime::OptLevel::None,
graph::env::WasmOptLevel::Speed => wasmtime::OptLevel::Speed,
graph::env::WasmOptLevel::SpeedAndSize => wasmtime::OptLevel::SpeedAndSize,
});
config.max_wasm_stack(ENV_VARS.mappings.max_stack_size);
config.async_support(true);
let engine = &wasmtime::Engine::new(&config)?;
let module = wasmtime::Module::from_binary(engine, &raw_module)?;
let mut import_name_to_modules: BTreeMap<String, Vec<String>> = BTreeMap::new();
// Unwrap: Module linking is disabled.
for (name, module) in module
.imports()
.map(|import| (import.name(), import.module()))
{
import_name_to_modules
.entry(name.to_string())
.or_default()
.push(module.to_string());
}
let mut epoch_counter_abort_handle = None;
if let Some(timeout) = timeout {
let engine = engine.clone();
// The epoch counter task will perpetually increment the epoch every `timeout` seconds.
// Timeouts on instantiated modules will trigger on epoch deltas.
// Note: The epoch is an u64 so it will never overflow.
// See also: runtime-timeouts
let epoch_counter = async move {
loop {
tokio::time::sleep(timeout).await;
engine.increment_epoch();
}
};
epoch_counter_abort_handle = Some(graph::spawn(epoch_counter).abort_handle());
}
let linker = crate::module::build_linker(engine, &import_name_to_modules)?;
let instance_pre = linker.instantiate_pre(&module)?;
Ok(ValidModule {
module,
instance_pre,
import_name_to_modules,
start_function,
timeout,
epoch_counter_abort_handle,
asc_type_id_cache: RwLock::new(HashMap::new()),
})
}
pub fn get_cached_type_id(&self, idx: IndexForAscTypeId) -> Option<u32> {
self.asc_type_id_cache.read().get(&idx).copied()
}
pub fn cache_type_id(&self, idx: IndexForAscTypeId, type_id: u32) {
self.asc_type_id_cache.write().insert(idx, type_id);
}
}
impl Drop for ValidModule {
    /// Aborts the epoch-counter task started by `ValidModule::new`, if there is one, so it
    /// stops advancing the engine epoch once the module is gone.
    fn drop(&mut self) {
        if let Some(epoch_task) = self.epoch_counter_abort_handle.as_ref() {
            epoch_task.abort();
        }
    }
}