Skip to content

Commit bc586b1

Browse files
committed
store: Make postponed index creation robust against concurrent builds
Acquire a fresh connection for each postponed index instead of holding one for the whole run, which can take hours. Before starting a `CREATE INDEX CONCURRENTLY`, wait for any index creation already running in the deployment's schema to finish, using `index_creation_is_running` to poll `pg_stat_progress_create_index`. Have CreateIndex::name return a string slice to avoid an allocation.
1 parent 01d6891 commit bc586b1

4 files changed

Lines changed: 122 additions & 27 deletions

File tree

store/postgres/src/catalog.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,36 @@ pub(crate) async fn check_index_is_valid(
882882
Ok(matches!(result, Some(true)))
883883
}
884884

885+
/// Check whether there is an index creation currently running for any index
886+
/// in the given schema. If there is, return the PID of the backend that is
887+
/// creating the index and the name of the index being created.
888+
pub(crate) async fn index_creation_is_running(
889+
conn: &mut AsyncPgConnection,
890+
schema_name: &str,
891+
) -> Result<Option<(i32, String)>, StoreError> {
892+
#[derive(Queryable, QueryableByName)]
893+
struct IndexCreationCheck {
894+
#[diesel(sql_type = Integer)]
895+
pid: i32,
896+
#[diesel(sql_type = Text)]
897+
index_name: String,
898+
}
899+
900+
let query = "
901+
select ci.pid, ci.index_relid::regclass as index_name \
902+
from pg_stat_progress_create_index ci \
903+
join pg_class c on ci.relid = c.oid \
904+
join pg_namespace n on c.relnamespace = n.oid \
905+
where n.nspname = $1";
906+
sql_query(query)
907+
.bind::<Text, _>(schema_name)
908+
.get_result::<IndexCreationCheck>(conn)
909+
.await
910+
.optional()
911+
.map(|check| check.map(|check| (check.pid, check.index_name)))
912+
.map_err::<StoreError, _>(Into::into)
913+
}
914+
885915
pub(crate) async fn indexes_for_table(
886916
conn: &mut AsyncPgConnection,
887917
schema_name: &str,

store/postgres/src/deployment_store.rs

Lines changed: 89 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use detail::DeploymentDetail;
22
use diesel::sql_query;
33
use diesel_async::{AsyncConnection as _, RunQueryDsl, SimpleAsyncConnection};
4+
use graph::util::backoff::ExponentialBackoff;
45
use tokio::task::JoinHandle;
56

67
use graph::anyhow::Context;
@@ -1696,17 +1697,88 @@ impl DeploymentStore {
16961697
/// deployment. A run that was interrupted (for example by a restart while
16971698
/// a `CREATE INDEX CONCURRENTLY` was still in flight) can leave an invalid
16981699
/// index behind; such remnants are dropped and rebuilt on the next run.
1699-
pub(crate) fn create_postponed_indexes(&self, site: Arc<Site>) {
1700-
async fn run(store: DeploymentStore, site: Arc<Site>) -> Result<(), StoreError> {
1701-
let layout = store.find_layout(site.cheap_clone()).await?;
1702-
let creat = layout.index_creator(true, true);
1700+
pub(crate) fn create_postponed_indexes(&self, logger: &Logger, site: Arc<Site>) {
1701+
async fn index_creation_is_running(
1702+
store: &DeploymentStore,
1703+
site: &Site,
1704+
) -> Result<Option<(i32, String)>, StoreError> {
17031705
let mut conn = store.pool.get_permitted().await?;
1706+
catalog::index_creation_is_running(&mut conn, site.namespace.as_str()).await
1707+
}
17041708

1705-
if deployment::postponed_indexes_created(&mut conn, &site).await? {
1706-
return Ok(());
1709+
async fn postponed_indexes_created(
1710+
store: &DeploymentStore,
1711+
site: &Site,
1712+
) -> Result<bool, StoreError> {
1713+
let mut conn = store.pool.get_permitted().await?;
1714+
deployment::postponed_indexes_created(&mut conn, site).await
1715+
}
1716+
1717+
async fn wait_for_concurrent_index_creation(
1718+
logger: &Logger,
1719+
store: &DeploymentStore,
1720+
site: &Site,
1721+
) -> Result<(), StoreError> {
1722+
let mut backoff =
1723+
ExponentialBackoff::new(Duration::from_secs(1), Duration::from_mins(5));
1724+
let mut last_log = Instant::now() - Duration::from_mins(2);
1725+
while let Some((pid, index_name)) = index_creation_is_running(store, site).await? {
1726+
if last_log.elapsed() > Duration::from_mins(1) {
1727+
info!(logger,
1728+
"Waiting for concurrent index creation to finish";
1729+
"pid" => pid,
1730+
"index_name" => index_name,
1731+
);
1732+
last_log = Instant::now();
1733+
}
1734+
backoff.sleep_async().await;
17071735
}
1736+
Ok(())
1737+
}
17081738

1739+
async fn create_index(
1740+
store: &DeploymentStore,
1741+
layout: &Layout,
1742+
site: &Site,
1743+
idx: &CreateIndex,
1744+
) -> Result<(), StoreError> {
1745+
let mut conn = store.pool.get_permitted().await?;
17091746
let schema_name = site.namespace.as_str();
1747+
1748+
// A previous run that was interrupted (e.g. by a node
1749+
// restart) can leave an invalid index behind. Since we
1750+
// create indexes with `if not exists`, such a leftover
1751+
// would be skipped and never rebuilt, so drop it first.
1752+
// `check_index_is_valid` returns `false` both when the
1753+
// index is missing and when it is invalid; the
1754+
// `drop index ... if exists` is a no-op in the former
1755+
// case and removes the invalid remnant in the latter.
1756+
if let Some(name) = idx.name()
1757+
&& !catalog::check_index_is_valid(&mut conn, schema_name, name).await?
1758+
{
1759+
let drop_sql = format!("drop index concurrently if exists {schema_name}.{name}");
1760+
sql_query(drop_sql).execute(&mut conn).await?;
1761+
}
1762+
1763+
let creat = layout.index_creator(true, true);
1764+
IndexCreator::execute(&creat, &mut conn, idx).await
1765+
}
1766+
1767+
async fn run(
1768+
logger: Logger,
1769+
store: DeploymentStore,
1770+
site: Arc<Site>,
1771+
) -> Result<(), StoreError> {
1772+
let layout = store.find_layout(site.cheap_clone()).await?;
1773+
1774+
// Since this entire run can take many hours, we avoid holding a
1775+
// connection for the whole time. Instead, we get a new
1776+
// connection for each index that we create.
1777+
1778+
if postponed_indexes_created(&store, &site).await? {
1779+
return Ok(());
1780+
}
1781+
17101782
for table in layout.tables.values() {
17111783
let indexes = table.indexes(&layout.input_schema).map_err(|e| {
17121784
StoreError::ConstraintViolation(format!("failed to generate indexes: {}", e))
@@ -1716,38 +1788,31 @@ impl DeploymentStore {
17161788
continue;
17171789
}
17181790

1719-
// A previous run that was interrupted (e.g. by a node
1720-
// restart) can leave an invalid index behind. Since we
1721-
// create indexes with `if not exists`, such a leftover
1722-
// would be skipped and never rebuilt, so drop it first.
1723-
// `check_index_is_valid` returns `false` both when the
1724-
// index is missing and when it is invalid; the
1725-
// `drop index ... if exists` is a no-op in the former
1726-
// case and removes the invalid remnant in the latter.
1727-
if let Some(name) = idx.name()
1728-
&& !catalog::check_index_is_valid(&mut conn, schema_name, &name).await?
1729-
{
1730-
let drop_sql =
1731-
format!("drop index concurrently if exists {schema_name}.{name}");
1732-
sql_query(drop_sql).execute(&mut conn).await?;
1733-
}
1791+
wait_for_concurrent_index_creation(&logger, &store, &site).await?;
1792+
1793+
create_index(&store, &layout, &site, &idx).await?;
17341794

1735-
IndexCreator::execute(&creat, &mut conn, &idx).await?;
1795+
debug!(logger, "Created index";
1796+
"index_name" => idx.name().unwrap_or("<unknown>"),
1797+
"table_name" => table.name.as_str(),
1798+
);
17361799
}
17371800
}
17381801

1802+
let mut conn = store.pool.get_permitted().await?;
17391803
deployment::set_postponed_indexes_created(&mut conn, &site).await?;
17401804
Ok(())
17411805
}
17421806

17431807
let store = self.cheap_clone();
1744-
let logger = Logger::new(&self.logger, o!("component" => "IndexCreation"));
1808+
let logger = Logger::new(logger, o!("component" => "IndexCreation"));
17451809
graph::spawn(async move {
17461810
let logger2 = logger.cheap_clone();
17471811
let res = retry::forever(&logger2, "create_postponed_indexes", || {
17481812
let store = store.cheap_clone();
17491813
let site = site.cheap_clone();
1750-
async move { run(store, site).await }
1814+
let logger = logger2.cheap_clone();
1815+
async move { run(logger, store, site).await }
17511816
})
17521817
.await;
17531818
match res {

store/postgres/src/relational/index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,10 +666,10 @@ impl CreateIndex {
666666
}
667667
}
668668

669-
pub fn name(&self) -> Option<String> {
669+
pub fn name(&self) -> Option<&str> {
670670
match self {
671671
CreateIndex::Unknown { .. } => None,
672-
CreateIndex::Parsed { name, .. } => Some(name.clone()),
672+
CreateIndex::Parsed { name, .. } => Some(name.as_str()),
673673
}
674674
}
675675

store/postgres/src/writable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ impl SyncStore {
495495

496496
async fn create_postponed_indexes(&self) -> Result<(), StoreError> {
497497
self.writable
498-
.create_postponed_indexes(self.site.cheap_clone());
498+
.create_postponed_indexes(&self.logger, self.site.cheap_clone());
499499
Ok(())
500500
}
501501

0 commit comments

Comments
 (0)