Skip to content

Commit 8492729

Browse files
committed
feat(eth-bridge): make database persistance explicit
1 parent 3f5a971 commit 8492729

5 files changed

Lines changed: 62 additions & 28 deletions

File tree

bridges/centralized-ethereum/src/actors/dr_database.rs

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use actix::prelude::*;
22
use serde::{Deserialize, Serialize};
3-
use std::{cmp, collections::HashMap, collections::hash_map::Entry, fmt, future::Future};
3+
use std::{cmp, collections::HashMap, collections::hash_map::Entry, fmt};
44
use web3::{ethabi::Bytes, types::U256};
55
use witnet_data_structures::chain::Hash;
66
use witnet_node::{storage_mngr, utils::stop_system_if_panicking};
@@ -13,6 +13,9 @@ const BRIDGE_DB_KEY: &[u8] = b"bridge_db_key";
1313
pub struct DrDatabase {
1414
dr: HashMap<DrId, DrInfoBridge>,
1515
max_dr_id: DrId,
16+
/// In-memory only: set by writes, cleared after a successful persist.
17+
#[serde(skip)]
18+
dirty: bool,
1619
}
1720

1821
impl Drop for DrDatabase {
@@ -22,20 +25,6 @@ impl Drop for DrDatabase {
2225
}
2326
}
2427

25-
impl DrDatabase {
26-
// Persist Data Request Database
27-
fn persist(&mut self) -> impl Future<Output = ()> + use<> {
28-
let f = storage_mngr::put(&BRIDGE_DB_KEY, self);
29-
30-
async move {
31-
match f.await {
32-
Ok(_) => log::debug!("Bridge database successfully persisted"),
33-
Err(e) => log::error!("Bridge database error during persistence: {e}"),
34-
}
35-
}
36-
}
37-
}
38-
3928
/// Data request ID, as set in the ethereum contract
4029
pub type DrId = U256;
4130

@@ -186,19 +175,24 @@ impl Message for CountDrsPerState {
186175
type Result = Result<(u32, u32, u32, u32), ()>;
187176
}
188177

178+
/// Persist the data request database to storage
179+
pub struct PersistDrDatabase;
180+
181+
impl Message for PersistDrDatabase {
182+
type Result = ();
183+
}
184+
189185
impl Handler<SetDrInfoBridge> for DrDatabase {
190186
type Result = ();
191187

192-
fn handle(&mut self, msg: SetDrInfoBridge, ctx: &mut Self::Context) -> Self::Result {
188+
fn handle(&mut self, msg: SetDrInfoBridge, _ctx: &mut Self::Context) -> Self::Result {
193189
let SetDrInfoBridge(dr_id, dr_info) = msg;
194190
let dr_state = dr_info.dr_state;
195191
self.dr.insert(dr_id, dr_info);
196192

197193
self.max_dr_id = cmp::max(self.max_dr_id, dr_id);
194+
self.dirty = true;
198195
log::debug!("Data request #{dr_id} inserted with state {dr_state}");
199-
200-
// Persist Data Request Database
201-
ctx.spawn(self.persist().into_actor(self));
202196
}
203197
}
204198

@@ -254,7 +248,7 @@ impl Handler<GetLastDrId> for DrDatabase {
254248
impl Handler<SetDrState> for DrDatabase {
255249
type Result = Result<(), ()>;
256250

257-
fn handle(&mut self, msg: SetDrState, ctx: &mut Self::Context) -> Self::Result {
251+
fn handle(&mut self, msg: SetDrState, _ctx: &mut Self::Context) -> Self::Result {
258252
let SetDrState { dr_id, dr_state } = msg;
259253
match self.dr.entry(dr_id) {
260254
Entry::Occupied(entry) => {
@@ -273,14 +267,44 @@ impl Handler<SetDrState> for DrDatabase {
273267
}
274268

275269
self.max_dr_id = cmp::max(self.max_dr_id, dr_id);
276-
277-
// Persist Data Request Database
278-
ctx.spawn(self.persist().into_actor(self));
270+
self.dirty = true;
279271

280272
Ok(())
281273
}
282274
}
283275

276+
impl Handler<PersistDrDatabase> for DrDatabase {
277+
type Result = ();
278+
279+
fn handle(&mut self, _msg: PersistDrDatabase, ctx: &mut Self::Context) -> Self::Result {
280+
if !self.dirty {
281+
return;
282+
}
283+
284+
let snapshot = self.clone();
285+
ctx.spawn(
286+
async move {
287+
match storage_mngr::put(&BRIDGE_DB_KEY, &snapshot).await {
288+
Ok(_) => {
289+
log::debug!("Bridge database successfully persisted");
290+
Ok(())
291+
}
292+
Err(e) => {
293+
log::error!("Bridge database error during persistence: {e}");
294+
Err(())
295+
}
296+
}
297+
}
298+
.into_actor(self)
299+
.map(|result, act, _| {
300+
if result.is_ok() {
301+
act.dirty = false;
302+
}
303+
}),
304+
);
305+
}
306+
}
307+
284308
impl Handler<CountDrsPerState> for DrDatabase {
285309
type Result = Result<(u32, u32, u32, u32), ()>;
286310

bridges/centralized-ethereum/src/actors/dr_reporter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
actors::dr_database::{DrDatabase, DrId, DrState, SetDrState},
2+
actors::dr_database::{DrDatabase, DrId, DrState, PersistDrDatabase, SetDrState},
33
config::Config,
44
handle_receipt,
55
};
@@ -295,6 +295,7 @@ impl Handler<DrReporterMsg> for DrReporter {
295295
.ok();
296296
}
297297
}
298+
dr_database_addr.do_send(PersistDrDatabase);
298299
}
299300
Err(()) => {
300301
log::error!(

bridges/centralized-ethereum/src/actors/dr_sender.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::{
22
actors::{
3-
dr_database::{DrDatabase, DrInfoBridge, DrState, GetAllNewDrs, SetDrInfoBridge},
3+
dr_database::{
4+
DrDatabase, DrInfoBridge, DrState, GetAllNewDrs, PersistDrDatabase, SetDrInfoBridge,
5+
},
46
dr_reporter::{DrReporter, DrReporterMsg, Report},
57
},
68
config::Config,
@@ -173,6 +175,7 @@ impl DrSender {
173175
log::error!("[{dr_id}] >< cannot broadcast dr_tx: {e}");
174176
// In this case, refrain from trying to send remaining data requests,
175177
// and let the dr_sender actor try again on next poll.
178+
dr_database_addr.do_send(PersistDrDatabase);
176179
return witnet_node_pkh;
177180
}
178181
}
@@ -200,6 +203,8 @@ impl DrSender {
200203
}
201204
}
202205

206+
dr_database_addr.do_send(PersistDrDatabase);
207+
203208
dr_reporter_addr
204209
.send(DrReporterMsg {
205210
reports: dr_reporter_msgs,

bridges/centralized-ethereum/src/actors/eth_poller.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
actors::dr_database::{
3-
DrDatabase, DrInfoBridge, DrState, GetLastDrId, SetDrInfoBridge, SetDrState,
4-
WitnetQueryStatus,
3+
DrDatabase, DrInfoBridge, DrState, GetLastDrId, PersistDrDatabase, SetDrInfoBridge,
4+
SetDrState, WitnetQueryStatus,
55
},
66
config::Config,
77
};
@@ -186,6 +186,7 @@ impl EthPoller {
186186
);
187187
}
188188
}
189+
dr_database_addr.do_send(PersistDrDatabase);
189190
} else {
190191
log::error!(
191192
"Fail to get status of queries #{} to #{}: {}",

bridges/centralized-ethereum/src/actors/wit_poller.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use witnet_util::timestamp::get_timestamp;
1313

1414
use crate::{
1515
actors::{
16-
dr_database::{DrDatabase, DrInfoBridge, DrState, GetAllPendingDrs, SetDrInfoBridge},
16+
dr_database::{
17+
DrDatabase, DrInfoBridge, DrState, GetAllPendingDrs, PersistDrDatabase, SetDrInfoBridge,
18+
},
1719
dr_reporter::{DrReporter, DrReporterMsg, Report},
1820
},
1921
config::Config,
@@ -189,6 +191,7 @@ impl WitPoller {
189191
.unwrap();
190192
}
191193
}
194+
dr_database_addr.do_send(PersistDrDatabase);
192195
}
193196

194197
dr_reporter_addr

0 commit comments

Comments
 (0)