Skip to content

Commit 16c7ca6

Browse files
committed
node: fix graphman rewind to start block
1 parent e434138 commit 16c7ca6

2 files changed

Lines changed: 51 additions & 26 deletions

File tree

node/src/manager/commands/rewind.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use graph::anyhow::bail;
99
use graph::components::store::{BlockStore as _, ChainStore as _, DeploymentLocator};
1010
use graph::env::ENV_VARS;
1111
use graph::prelude::{anyhow, BlockNumber, BlockPtr};
12-
use graph_store_postgres::command_support::catalog::{self as store_catalog};
1312
use graph_store_postgres::{BlockStore, NotificationSender};
1413
use graph_store_postgres::{ConnectionPool, Store};
1514

@@ -78,8 +77,6 @@ pub async fn run(
7877
if !start_block && (block_hash.is_none() || block_number.is_none()) {
7978
bail!("--block-hash and --block-number must be specified when --start-block is not set");
8079
}
81-
let pconn = primary.get()?;
82-
let mut conn = store_catalog::Connection::new(pconn);
8380

8481
let subgraph_store = store.subgraph_store();
8582
let block_store = store.block_store();
@@ -126,14 +123,19 @@ pub async fn run(
126123

127124
println!("Checking if its safe to rewind deployments");
128125
for (_, locator) in &locators {
129-
let site = conn
130-
.locate_site(locator.clone())?
131-
.ok_or_else(|| anyhow!("failed to locate site for {locator}"))?;
132-
let deployment_store = subgraph_store.for_site(&site)?;
133-
let deployment_details = deployment_store.deployment_details_for_id(locator)?;
134-
let block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);
135-
136-
if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold() {
126+
let deployment_details = subgraph_store.load_deployment_by_id(locator.clone().into())?;
127+
let mut block_number_to = block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0);
128+
129+
if start_block {
130+
block_number_to = match deployment_details.start_block {
131+
Some(block) => block.number,
132+
None => 0,
133+
};
134+
}
135+
136+
if block_number_to < deployment_details.earliest_block_number + ENV_VARS.reorg_threshold()
137+
&& !start_block
138+
{
137139
bail!(
138140
"The block number {} is not safe to rewind to for deployment {}. The earliest block number of this deployment is {}. You can only safely rewind to block number {}",
139141
block_ptr_to.as_ref().map(|b| b.number).unwrap_or(0),

store/postgres/src/deployment.rs

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -581,22 +581,45 @@ pub fn revert_block_ptr(
581581
) -> Result<(), StoreError> {
582582
use deployment as d;
583583
use head as h;
584+
use subgraph_manifest as m;
584585

585-
// Intention is to revert to a block lower than the reorg threshold, on the other
586-
// hand the earliest we can possibly go is genesys block, so go to genesys even
587-
// if it's within the reorg threshold.
588-
let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0);
589-
let affected_rows = update(
590-
d::table
591-
.filter(d::id.eq(site.id))
592-
.filter(d::earliest_block_number.le(earliest_block)),
593-
)
594-
.set((
595-
d::reorg_count.eq(d::reorg_count + 1),
596-
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
597-
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
598-
))
599-
.execute(conn)?;
586+
// Check if ptr is equal to the start block of the deployment
587+
let (start_block_hash, start_block_number): (Option<Vec<u8>>, Option<BlockNumber>) = m::table
588+
.filter(m::id.eq(site.id))
589+
.select((m::start_block_hash, m::start_block_number))
590+
.first(conn)?;
591+
592+
let is_start_block = match (start_block_hash, start_block_number) {
593+
(Some(hash), Some(number)) => ptr.number == number && ptr.hash_slice() == hash.as_slice(),
594+
_ => false,
595+
};
596+
597+
let affected_rows = if is_start_block {
598+
update(d::table.filter(d::id.eq(site.id)))
599+
.set((
600+
d::reorg_count.eq(d::reorg_count + 1),
601+
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
602+
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
603+
d::earliest_block_number.eq(ptr.number),
604+
))
605+
.execute(conn)?
606+
} else {
607+
// Intention is to revert to a block lower than the reorg threshold, on the other
608+
// hand the earliest we can possibly go is genesis block, so go to genesis even
609+
// if it's within the reorg threshold.
610+
let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0);
611+
update(
612+
d::table
613+
.filter(d::id.eq(site.id))
614+
.filter(d::earliest_block_number.le(earliest_block)),
615+
)
616+
.set((
617+
d::reorg_count.eq(d::reorg_count + 1),
618+
d::current_reorg_depth.eq(d::current_reorg_depth + 1),
619+
d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")),
620+
))
621+
.execute(conn)?
622+
};
600623

601624
update(h::table.filter(h::id.eq(site.id)))
602625
.set((

0 commit comments

Comments
 (0)