Skip to content

Commit 8316ecb

Browse files
committed
store: Create postponed indexes in the background
The runner used to block block processing while postponed indexes were created. Since these are built with CREATE INDEX CONCURRENTLY and can take a long time, spawn the work in a background task so indexing continues. Errors are only logged; the postponed_indexes_created flag is set only on success, so a failed run is retried the next time the deployment starts.
1 parent 3bd36c9 commit 8316ecb

2 files changed

Lines changed: 46 additions & 19 deletions

File tree

store/postgres/src/deployment_store.rs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use graph::internal_error;
4242
use graph::prelude::{
4343
AttributeNames, BlockNumber, BlockPtr, CheapClone, DeploymentHash, DeploymentState, ENV_VARS,
4444
Entity, EntityQuery, Error, Logger, QueryExecutionError, StopwatchMetrics, StoreError,
45-
UnfailOutcome, Value, anyhow, debug, info, o, warn,
45+
UnfailOutcome, Value, anyhow, debug, error, info, o, warn,
4646
};
4747
use graph::schema::{ApiSchema, EntityKey, EntityType, InputSchema};
4848

@@ -1686,28 +1686,55 @@ impl DeploymentStore {
16861686
/// this is a one-shot operation: indexes are created exactly once,
16871687
/// and we never recreate them — even if an external process removes
16881688
/// indexes that it considers unused.
1689-
pub(crate) async fn create_postponed_indexes(&self, site: Arc<Site>) -> Result<(), StoreError> {
1690-
let layout = self.find_layout(site.cheap_clone()).await?;
1691-
let creat = layout.index_creator(true, true);
1692-
let mut conn = self.pool.get_permitted().await?;
1689+
///
1690+
/// The actual index creation runs in the background so that callers are
1691+
/// not blocked while the (potentially long-running) `CREATE INDEX
1692+
/// CONCURRENTLY` statements execute. Because of that, errors are only
1693+
/// logged and not returned to the caller. The `postponed_indexes_created`
1694+
/// flag is only set once all indexes have been created successfully, so a
1695+
/// failed run is retried the next time `graph-node` starts the
1696+
/// deployment.
1697+
pub(crate) fn create_postponed_indexes(&self, site: Arc<Site>) {
1698+
async fn run(store: DeploymentStore, site: Arc<Site>) -> Result<(), StoreError> {
1699+
let layout = store.find_layout(site.cheap_clone()).await?;
1700+
let creat = layout.index_creator(true, true);
1701+
let mut conn = store.pool.get_permitted().await?;
16931702

1694-
if deployment::postponed_indexes_created(&mut conn, &site).await? {
1695-
return Ok(());
1696-
}
1703+
if deployment::postponed_indexes_created(&mut conn, &site).await? {
1704+
return Ok(());
1705+
}
16971706

1698-
for table in layout.tables.values() {
1699-
let indexes = table.indexes(&layout.input_schema).map_err(|e| {
1700-
StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e))
1701-
})?;
1702-
for idx in indexes {
1703-
if idx.to_postpone() {
1704-
IndexCreator::execute(&creat, &mut conn, &idx).await?;
1707+
for table in layout.tables.values() {
1708+
let indexes = table.indexes(&layout.input_schema).map_err(|e| {
1709+
StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e))
1710+
})?;
1711+
for idx in indexes {
1712+
if idx.to_postpone() {
1713+
IndexCreator::execute(&creat, &mut conn, &idx).await?;
1714+
}
17051715
}
17061716
}
1717+
1718+
deployment::set_postponed_indexes_created(&mut conn, &site).await?;
1719+
Ok(())
17071720
}
17081721

1709-
deployment::set_postponed_indexes_created(&mut conn, &site).await?;
1710-
Ok(())
1722+
let store = self.cheap_clone();
1723+
let logger = Logger::new(&self.logger, o!("component" => "IndexCreation"));
1724+
graph::spawn(async move {
1725+
let logger2 = logger.cheap_clone();
1726+
let res = retry::forever(&logger2, "create_postponed_indexes", || {
1727+
let store = store.cheap_clone();
1728+
let site = site.cheap_clone();
1729+
async move { run(store, site).await }
1730+
})
1731+
.await;
1732+
match res {
1733+
Ok(()) => debug!(logger, "Created postponed indexes"),
1734+
Err(e) => error!(logger, "Failed to create postponed indexes";
1735+
"error" => e.to_string()),
1736+
}
1737+
});
17111738
}
17121739

17131740
// If the current block of the deployment is the same as the fatal error,

store/postgres/src/writable.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,8 +495,8 @@ impl SyncStore {
495495

496496
async fn create_postponed_indexes(&self) -> Result<(), StoreError> {
497497
self.writable
498-
.create_postponed_indexes(self.site.cheap_clone())
499-
.await
498+
.create_postponed_indexes(self.site.cheap_clone());
499+
Ok(())
500500
}
501501

502502
fn input_schema(&self) -> InputSchema {

0 commit comments

Comments
 (0)