Skip to content

Commit eb6fae7

Browse files
committed
store: Make sure we do not run setup twice for the same pool
With the previous code, we would run setup initially when creating all pools, but they would not be marked as set up. On the first access to the pool, we would then try to run setup again, which was unnecessary. This change makes it so that we remember that setup ran successfully when the pools were created.
1 parent 6710835 commit eb6fae7

1 file changed

Lines changed: 84 additions & 48 deletions

File tree

store/postgres/src/connection_pool.rs

Lines changed: 84 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -373,22 +373,27 @@ impl PoolState {
373373
/// Get a connection pool that is ready, i.e., has been through setup
374374
/// and running migrations
375375
fn get_ready(&self) -> Result<Arc<PoolInner>, StoreError> {
376-
let mut guard = self.inner.lock(&self.logger);
376+
// We have to be careful here that we do not hold a lock when we
377+
// call `setup_bg`, otherwise we will deadlock
378+
let (pool, coord) = {
379+
let guard = self.inner.lock(&self.logger);
380+
381+
use PoolStateInner::*;
382+
match &*guard {
383+
Created(pool, coord) => (pool.cheap_clone(), coord.cheap_clone()),
384+
Ready(pool) => return Ok(pool.clone()),
385+
}
386+
};
377387

378-
use PoolStateInner::*;
379-
match &*guard {
380-
Created(pool, coord) => {
381-
let migrated = coord.cheap_clone().setup_bg(pool.cheap_clone())?;
388+
// self is `Created` and needs to have setup run
389+
coord.setup_bg(self.cheap_clone())?;
382390

383-
if migrated {
384-
let pool2 = pool.cheap_clone();
385-
*guard = Ready(pool.cheap_clone());
386-
Ok(pool2)
387-
} else {
388-
Err(StoreError::DatabaseUnavailable)
389-
}
390-
}
391-
Ready(pool) => Ok(pool.clone()),
391+
// We just tried to set up the pool; if it is still not set up and
392+
// we didn't have an error, it means the database is not available
393+
if self.needs_setup() {
394+
return Err(StoreError::DatabaseUnavailable);
395+
} else {
396+
Ok(pool)
392397
}
393398
}
394399

@@ -401,6 +406,16 @@ impl PoolState {
401406
Created(pool, _) | Ready(pool) => pool.cheap_clone(),
402407
}
403408
}
409+
410+
fn needs_setup(&self) -> bool {
411+
let guard = self.inner.lock(&self.logger);
412+
413+
use PoolStateInner::*;
414+
match &*guard {
415+
Created(_, _) => true,
416+
Ready(_) => false,
417+
}
418+
}
404419
}
405420
#[derive(Clone)]
406421
pub struct ConnectionPool {
@@ -1186,7 +1201,7 @@ impl PoolInner {
11861201
async fn migrate(
11871202
self: Arc<Self>,
11881203
servers: &[ForeignServer],
1189-
) -> Result<(Arc<Self>, MigrationCount), StoreError> {
1204+
) -> Result<MigrationCount, StoreError> {
11901205
self.configure_fdw(servers)?;
11911206
let mut conn = self.get()?;
11921207
let (this, count) = conn.transaction(|conn| -> Result<_, StoreError> {
@@ -1196,7 +1211,7 @@ impl PoolInner {
11961211

11971212
this.locale_check(&this.logger, conn)?;
11981213

1199-
Ok((this, count))
1214+
Ok(count)
12001215
}
12011216

12021217
/// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP`
@@ -1379,7 +1394,7 @@ fn migrate_schema(logger: &Logger, conn: &mut PgConnection) -> Result<MigrationC
13791394
/// of tables imported from that shard
13801395
pub struct PoolCoordinator {
13811396
logger: Logger,
1382-
pools: Mutex<HashMap<Shard, Arc<PoolInner>>>,
1397+
pools: Mutex<HashMap<Shard, PoolState>>,
13831398
servers: Arc<Vec<ForeignServer>>,
13841399
}
13851400

@@ -1419,16 +1434,12 @@ impl PoolCoordinator {
14191434
// Ignore non-writable pools (replicas), there is no need (and no
14201435
// way) to coordinate schema changes with them
14211436
if is_writable {
1422-
// It is safe to take this lock here since nobody has seen the pool
1423-
// yet. We remember the `PoolInner` so that later, when we have to
1424-
// call `remap()`, we do not have to take this lock as that will be
1425-
// already held in `get_ready()`
1426-
let inner = pool.inner.get_unready();
14271437
self.pools
14281438
.lock()
14291439
.unwrap()
1430-
.insert(pool.shard.clone(), inner.clone());
1440+
.insert(pool.shard.clone(), pool.inner.cheap_clone());
14311441
}
1442+
14321443
pool
14331444
}
14341445

@@ -1460,6 +1471,7 @@ impl PoolCoordinator {
14601471
if count.had_migrations() {
14611472
let server = self.server(&pool.shard)?;
14621473
for pool in self.pools.lock().unwrap().values() {
1474+
let pool = pool.get_unready();
14631475
let remap_res = pool.remap(server);
14641476
if let Err(e) = remap_res {
14651477
error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string());
@@ -1470,8 +1482,15 @@ impl PoolCoordinator {
14701482
Ok(())
14711483
}
14721484

1485+
/// Return a list of all pools, regardless of whether they are ready or
1486+
/// not.
14731487
pub fn pools(&self) -> Vec<Arc<PoolInner>> {
1474-
self.pools.lock().unwrap().values().cloned().collect()
1488+
self.pools
1489+
.lock()
1490+
.unwrap()
1491+
.values()
1492+
.map(|state| state.get_unready())
1493+
.collect::<Vec<_>>()
14751494
}
14761495

14771496
pub fn servers(&self) -> Arc<Vec<ForeignServer>> {
@@ -1486,14 +1505,12 @@ impl PoolCoordinator {
14861505
}
14871506

14881507
fn primary(&self) -> Result<Arc<PoolInner>, StoreError> {
1489-
self.pools
1490-
.lock()
1491-
.unwrap()
1492-
.get(&*PRIMARY_SHARD)
1493-
.cloned()
1494-
.ok_or_else(|| {
1495-
constraint_violation!("internal error: primary shard not found in pool coordinator")
1496-
})
1508+
let map = self.pools.lock().unwrap();
1509+
let pool_state = map.get(&*&PRIMARY_SHARD).ok_or_else(|| {
1510+
constraint_violation!("internal error: primary shard not found in pool coordinator")
1511+
})?;
1512+
1513+
Ok(pool_state.get_unready())
14971514
}
14981515

14991516
/// Setup all pools the coordinator knows about and return the number of
@@ -1528,7 +1545,7 @@ impl PoolCoordinator {
15281545

15291546
/// A helper to call `setup` from a non-async context. Returns `true` if
15301547
/// the setup was actually run, i.e. if `pool` was available
1531-
fn setup_bg(self: Arc<Self>, pool: Arc<PoolInner>) -> Result<bool, StoreError> {
1548+
fn setup_bg(self: Arc<Self>, pool: PoolState) -> Result<bool, StoreError> {
15321549
let migrated = graph::spawn_thread("database-setup", move || {
15331550
graph::block_on(self.setup(vec![pool.clone()]))
15341551
})
@@ -1555,37 +1572,43 @@ impl PoolCoordinator {
15551572
/// This method tolerates databases that are not available and will
15561573
/// simply ignore them. The returned count is the number of pools that
15571574
/// were successfully set up.
1558-
async fn setup(&self, pools: Vec<Arc<PoolInner>>) -> Result<usize, StoreError> {
1559-
type MigrationCounts = Vec<(Arc<PoolInner>, MigrationCount)>;
1575+
///
1576+
/// When this method returns, the entries from `states` that were
1577+
/// successfully set up will be marked as ready. The method returns the
1578+
/// number of pools that were set up
1579+
async fn setup(&self, states: Vec<PoolState>) -> Result<usize, StoreError> {
1580+
type MigrationCounts = Vec<(PoolState, MigrationCount)>;
15601581

15611582
/// Filter out pools that are not available. We don't want to fail
15621583
/// because one of the pools is not available. We will just ignore
15631584
/// them and continue with the others.
15641585
fn filter_unavailable<T>(
1565-
(pool, res): (Arc<PoolInner>, Result<T, StoreError>),
1566-
) -> Option<Result<T, StoreError>> {
1586+
(state, res): (PoolState, Result<T, StoreError>),
1587+
) -> Option<Result<(PoolState, T), StoreError>> {
15671588
if let Err(StoreError::DatabaseUnavailable) = res {
15681589
error!(
1569-
pool.logger,
1590+
state.logger,
15701591
"migrations failed because database was unavailable"
15711592
);
15721593
None
15731594
} else {
1574-
Some(res)
1595+
Some(res.map(|count| (state, count)))
15751596
}
15761597
}
15771598

15781599
/// Migrate all pools in parallel
15791600
async fn migrate(
1580-
pools: &[Arc<PoolInner>],
1601+
pools: &[PoolState],
15811602
servers: &[ForeignServer],
15821603
) -> Result<MigrationCounts, StoreError> {
15831604
let futures = pools
15841605
.iter()
1585-
.map(|pool| {
1586-
pool.cheap_clone()
1606+
.map(|state| {
1607+
state
1608+
.get_unready()
1609+
.cheap_clone()
15871610
.migrate(servers)
1588-
.map(|res| (pool.cheap_clone(), res))
1611+
.map(|res| (state.cheap_clone(), res))
15891612
})
15901613
.collect::<Vec<_>>();
15911614
join_all(futures)
@@ -1599,26 +1622,32 @@ impl PoolCoordinator {
15991622
async fn propagate(
16001623
this: &PoolCoordinator,
16011624
migrated: MigrationCounts,
1602-
) -> Result<usize, StoreError> {
1625+
) -> Result<Vec<PoolState>, StoreError> {
16031626
let futures = migrated
16041627
.into_iter()
1605-
.map(|(pool, count)| async move {
1628+
.map(|(state, count)| async move {
1629+
let pool = state.get_unready();
16061630
let res = this.propagate(&pool, count);
1607-
(pool.cheap_clone(), res)
1631+
(state.cheap_clone(), res)
16081632
})
16091633
.collect::<Vec<_>>();
16101634
join_all(futures)
16111635
.await
16121636
.into_iter()
16131637
.filter_map(filter_unavailable)
1638+
.map(|res| res.map(|(state, ())| state))
16141639
.collect::<Result<Vec<_>, _>>()
1615-
.map(|v| v.len())
16161640
}
16171641

16181642
let primary = self.primary()?;
16191643

16201644
let mut pconn = primary.get().map_err(|_| StoreError::DatabaseUnavailable)?;
16211645

1646+
let pools: Vec<_> = states
1647+
.into_iter()
1648+
.filter(|pool| pool.needs_setup())
1649+
.collect();
1650+
16221651
// Everything here happens under the migration lock. Anything called
16231652
// from here should not try to get that lock, otherwise the process
16241653
// will deadlock
@@ -1636,6 +1665,13 @@ impl PoolCoordinator {
16361665
})
16371666
.await;
16381667
debug!(self.logger, "Database setup finished");
1639-
res
1668+
1669+
// Mark all pool states that we set up completely as ready
1670+
res.map(|states| {
1671+
for state in &states {
1672+
state.set_ready();
1673+
}
1674+
states.len()
1675+
})
16401676
}
16411677
}

0 commit comments

Comments
 (0)