Skip to content

Commit a2fe887

Browse files
committed
store: Create contract_address index before call cache eviction
Add catalog::table_has_index() to check for index existence, and use it in clear_stale_call_cache to ensure a btree index on contract_address exists before running deletion queries. The index is created concurrently if missing, avoiding sequential scans on large call_cache tables.
1 parent 2e46573 commit a2fe887

2 files changed

Lines changed: 81 additions & 0 deletions

File tree

store/postgres/src/catalog.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,32 @@ pub(crate) async fn indexes_for_table(
913913
Ok(results.into_iter().map(|i| i.def).collect())
914914
}
915915

916+
pub(crate) async fn table_has_index(
917+
conn: &mut AsyncPgConnection,
918+
schema_name: &str,
919+
index_name: &str,
920+
) -> Result<bool, StoreError> {
921+
#[derive(QueryableByName)]
922+
#[allow(dead_code)]
923+
struct Exists {
924+
#[diesel(sql_type = diesel::sql_types::Integer)]
925+
exists: i32,
926+
}
927+
928+
let exists = sql_query(
929+
"SELECT 1 AS exists FROM pg_indexes \
930+
WHERE schemaname = $1 AND indexname = $2",
931+
)
932+
.bind::<Text, _>(schema_name)
933+
.bind::<Text, _>(index_name)
934+
.get_result::<Exists>(conn)
935+
.await
936+
.optional()
937+
.map_err::<StoreError, _>(Into::into)?;
938+
939+
Ok(exists.is_some())
940+
}
941+
916942
pub(crate) async fn drop_index(
917943
conn: &mut AsyncPgConnection,
918944
schema_name: &str,

store/postgres/src/chain_store.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1643,13 +1643,68 @@ mod data {
16431643
}
16441644
}
16451645

1646+
const CALL_CACHE_CONTRACT_ADDRESS_INDEX: &str = "call_cache_contract_address";
1647+
1648+
/// Ensure that an index on `contract_address` exists on the
1649+
/// call_cache table to speed up deletion queries. If the index does
1650+
/// not exist, create it concurrently.
1651+
async fn ensure_contract_address_index(
1652+
&self,
1653+
conn: &mut AsyncPgConnection,
1654+
logger: &Logger,
1655+
) -> Result<(), Error> {
1656+
let (schema_name, table_qname) = match self {
1657+
Storage::Shared => ("public", "public.eth_call_cache".to_string()),
1658+
Storage::Private(Schema {
1659+
name, call_cache, ..
1660+
}) => (name.as_str(), call_cache.qname.clone()),
1661+
};
1662+
1663+
let has_index = crate::catalog::table_has_index(
1664+
conn,
1665+
schema_name,
1666+
Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
1667+
)
1668+
.await?;
1669+
1670+
if !has_index {
1671+
let start = Instant::now();
1672+
info!(
1673+
logger,
1674+
"Creating index {} on {}.contract_address; \
1675+
this may take a long time",
1676+
Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
1677+
table_qname
1678+
);
1679+
conn.batch_execute(&format!(
1680+
"create index concurrently if not exists {} \
1681+
on {}(contract_address)",
1682+
Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
1683+
table_qname
1684+
))
1685+
.await?;
1686+
let duration = start.elapsed();
1687+
info!(
1688+
logger,
1689+
"Finished creating index {} on {}.contract_address in {:?}",
1690+
Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
1691+
table_qname,
1692+
duration
1693+
);
1694+
}
1695+
1696+
Ok(())
1697+
}
1698+
16461699
pub async fn clear_stale_call_cache(
16471700
&self,
16481701
conn: &mut AsyncPgConnection,
16491702
logger: &Logger,
16501703
ttl_days: i32,
16511704
ttl_max_contracts: Option<i64>,
16521705
) -> Result<(), Error> {
1706+
self.ensure_contract_address_index(conn, logger).await?;
1707+
16531708
let mut total_calls: usize = 0;
16541709
let mut total_contracts: i64 = 0;
16551710
// We process contracts in batches to avoid loading too many

0 commit comments

Comments
 (0)