Skip to content

Commit c6f02fc

Browse files
committed
store: Make sure we always map the right set of foreign tables
1 parent 3bc2032 commit c6f02fc

2 files changed

Lines changed: 72 additions & 18 deletions

File tree

store/postgres/src/catalog.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,16 @@ pub fn drop_foreign_schema(conn: &mut PgConnection, src: &Site) -> Result<(), St
398398
Ok(())
399399
}
400400

401+
pub fn foreign_tables(conn: &mut PgConnection, nsp: &str) -> Result<Vec<String>, StoreError> {
402+
use foreign_tables as ft;
403+
404+
ft::table
405+
.filter(ft::foreign_table_schema.eq(nsp))
406+
.select(ft::foreign_table_name)
407+
.get_results::<String>(conn)
408+
.map_err(StoreError::from)
409+
}
410+
401411
/// Drop the schema `nsp` and all its contents if it exists, and create it
402412
/// again so that `nsp` is an empty schema
403413
pub fn recreate_schema(conn: &mut PgConnection, nsp: &str) -> Result<(), StoreError> {

store/postgres/src/connection_pool.rs

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
3737
use crate::{advisory_lock, catalog};
3838
use crate::{Shard, PRIMARY_SHARD};
3939

40+
/// Tables that we map from the primary into `primary_public` in each shard
41+
const PRIMARY_TABLES: [&str; 3] = ["deployment_schemas", "chains", "active_copies"];
42+
43+
/// Tables that we map from each shard into every other shard, under
44+
/// the `shard_<name>_subgraphs` namespace
4045
const SHARDED_TABLES: [(&str, &[&str]); 2] = [
4146
("public", &["ethereum_networks"]),
4247
(
@@ -209,7 +214,7 @@ impl ForeignServer {
209214
catalog::recreate_schema(conn, Self::PRIMARY_PUBLIC)?;
210215

211216
let mut query = String::new();
212-
for table_name in ["deployment_schemas", "chains", "active_copies"] {
217+
for table_name in PRIMARY_TABLES {
213218
let create_stmt = if shard == &*PRIMARY_SHARD {
214219
format!(
215220
"create view {nsp}.{table_name} as select * from public.{table_name};",
@@ -246,6 +251,33 @@ impl ForeignServer {
246251
}
247252
Ok(conn.batch_execute(&query)?)
248253
}
254+
255+
fn needs_remap(&self, conn: &mut PgConnection) -> Result<bool, StoreError> {
256+
fn different(mut existing: Vec<String>, mut needed: Vec<String>) -> bool {
257+
existing.sort();
258+
needed.sort();
259+
existing != needed
260+
}
261+
262+
if &self.shard == &*PRIMARY_SHARD {
263+
let existing = catalog::foreign_tables(conn, Self::PRIMARY_PUBLIC)?;
264+
let needed = PRIMARY_TABLES
265+
.into_iter()
266+
.map(String::from)
267+
.collect::<Vec<_>>();
268+
if different(existing, needed) {
269+
return Ok(true);
270+
}
271+
}
272+
273+
let existing = catalog::foreign_tables(conn, &Self::metadata_schema(&self.shard))?;
274+
let needed = SHARDED_TABLES
275+
.iter()
276+
.flat_map(|(_, tables)| *tables)
277+
.map(|table| table.to_string())
278+
.collect::<Vec<_>>();
279+
Ok(different(existing, needed))
280+
}
249281
}
250282

251283
/// How long to keep connections in the `fdw_pool` around before closing
@@ -1037,16 +1069,14 @@ impl PoolInner {
10371069
let result = pool
10381070
.configure_fdw(coord.servers.as_ref())
10391071
.and_then(|()| pool.drop_cross_shard_views())
1040-
.and_then(|()| migrate_schema(&pool.logger, &mut conn))
1041-
.and_then(|count| {
1042-
pool.create_cross_shard_views(coord.servers.as_ref())
1043-
.map(|()| count)
1044-
});
1072+
.and_then(|()| migrate_schema(&pool.logger, &mut conn));
10451073
debug!(&pool.logger, "Release migration lock");
10461074
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
10471075
die(&pool.logger, "failed to release migration lock", &err);
10481076
});
1049-
let result = result.and_then(|count| coord.propagate(&pool, count));
1077+
let result = result
1078+
.and_then(|count| coord.propagate(&pool, count))
1079+
.and_then(|()| pool.create_cross_shard_views(coord.servers.as_ref()));
10501080
result.unwrap_or_else(|err| die(&pool.logger, "migrations failed", &err));
10511081

10521082
// Locale check
@@ -1178,9 +1208,9 @@ impl PoolInner {
11781208
.await
11791209
}
11801210

1181-
// The foreign server `server` had schema changes, and we therefore need
1182-
// to remap anything that we are importing via fdw to make sure we are
1183-
// using this updated schema
1211+
/// The foreign server `server` had schema changes, and we therefore
1212+
/// need to remap anything that we are importing via fdw to make sure we
1213+
/// are using this updated schema
11841214
pub fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
11851215
if &server.shard == &*PRIMARY_SHARD {
11861216
info!(&self.logger, "Mapping primary");
@@ -1198,6 +1228,15 @@ impl PoolInner {
11981228
}
11991229
Ok(())
12001230
}
1231+
1232+
pub fn needs_remap(&self, server: &ForeignServer) -> Result<bool, StoreError> {
1233+
if &server.shard == &self.shard {
1234+
return Ok(false);
1235+
}
1236+
1237+
let mut conn = self.get()?;
1238+
server.needs_remap(&mut conn)
1239+
}
12011240
}
12021241

12031242
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
@@ -1211,10 +1250,6 @@ impl MigrationCount {
12111250
fn had_migrations(&self) -> bool {
12121251
self.old != self.new
12131252
}
1214-
1215-
fn is_new(&self) -> bool {
1216-
self.old == 0
1217-
}
12181253
}
12191254

12201255
/// Run all schema migrations.
@@ -1334,13 +1369,22 @@ impl PoolCoordinator {
13341369
/// code that does _not_ hold the migration lock as it will otherwise
13351370
/// deadlock
13361371
fn propagate(&self, pool: &PoolInner, count: MigrationCount) -> Result<(), StoreError> {
1337-
// pool is a new shard, map all other shards into it
1338-
if count.is_new() {
1339-
for server in self.servers.iter() {
1372+
// We need to remap all these servers into `pool` if the list of
1373+
// tables that are mapped has changed from the code of the previous
1374+
// version. Since dropping and recreating the foreign table
1375+
// definitions can slow the startup of other nodes down because of
1376+
// locking, we try to only do this when it is actually needed
1377+
for server in self.servers.iter() {
1378+
if pool.needs_remap(server)? {
13401379
pool.remap(server)?;
13411380
}
13421381
}
1343-
// pool had schema changes, refresh the import from pool into all other shards
1382+
1383+
// pool had schema changes, refresh the import from pool into all
1384+
// other shards. This makes sure that schema changes to
1385+
// already-mapped tables are propagated to all other shards. Since
1386+
// we run `propagate` after migrations have been applied to `pool`,
1387+
// we can be sure that these mappings use the correct schema
13441388
if count.had_migrations() {
13451389
let server = self.server(&pool.shard)?;
13461390
for pool in self.pools.lock().unwrap().values() {

0 commit comments

Comments
 (0)