diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs index e8e7f4ce992..243a7dc5a57 100644 --- a/store/postgres/src/dynds/private.rs +++ b/store/postgres/src/dynds/private.rs @@ -1,8 +1,9 @@ -use std::ops::Bound; +use std::{collections::HashMap, i32, ops::Bound}; use diesel::{ - pg::sql_types, + pg::{sql_types, Pg}, prelude::*, + query_builder::{AstPass, QueryFragment, QueryId}, sql_query, sql_types::{Binary, Bool, Integer, Jsonb, Nullable}, PgConnection, QueryDsl, RunQueryDsl, @@ -16,7 +17,7 @@ use graph::{ prelude::{serde_json, BlockNumber, StoreError}, }; -use crate::primary::Namespace; +use crate::{primary::Namespace, relational_queries::POSTGRES_MAX_PARAMETERS}; type DynTable = diesel_dynamic_schema::Table; type DynColumn = diesel_dynamic_schema::Column; @@ -226,16 +227,12 @@ impl DataSourcesTable { return Ok(count as usize); } - type Tuple = ( - (Bound, Bound), - i32, - Option>, - Option, - i32, - Option, - ); + let manifest_map = + ManifestIdxMap::new(src_manifest_idx_and_name, dst_manifest_idx_and_name); - let src_tuples = self + // Load all data sources that were created up to and including + // `target_block` and transform them ready for insertion + let dss: Vec<_> = self .table .clone() .filter( @@ -250,55 +247,18 @@ impl DataSourcesTable { &self.done_at, )) .order_by(&self.vid) - .load::(conn)?; + .load::(conn)? + .into_iter() + .map(|ds| ds.src_to_dst(target_block, &manifest_map, &self.namespace, &dst.namespace)) + .collect::>()?; + // Split all dss into chunks so that we never use more than + // `POSTGRES_MAX_PARAMETERS` bind variables per chunk + let chunk_size = POSTGRES_MAX_PARAMETERS / CopyDsQuery::BIND_PARAMS; let mut count = 0; - for (block_range, src_manifest_idx, param, context, causality_region, done_at) in src_tuples - { - let name = &src_manifest_idx_and_name - .iter() - .find(|(idx, _)| idx == &src_manifest_idx) - .with_context(|| { - anyhow!( - "the source {} does not have a template with index {}", - self.namespace, - src_manifest_idx - ) - })? - .1; - let dst_manifest_idx = dst_manifest_idx_and_name - .iter() - .find(|(_, n)| n == name) - .with_context(|| { - anyhow!( - "the destination {} is missing a template with name {}. The source {} created one at block {:?}", - dst.namespace, - name, self.namespace, block_range.0 - ) - })? - .0; - - let query = format!( - "\ - insert into {dst}(block_range, manifest_idx, param, context, causality_region, done_at) - values(case - when upper($2) <= $1 then $2 - else int4range(lower($2), null) - end, - $3, $4, $5, $6, $7) - ", - dst = dst.qname - ); - - count += sql_query(query) - .bind::(target_block) - .bind::, _>(block_range) - .bind::(dst_manifest_idx) - .bind::, _>(param) - .bind::, _>(context) - .bind::(causality_region) - .bind::, _>(done_at) - .execute(conn)?; + for chunk in dss.chunks(chunk_size) { + let query = CopyDsQuery::new(dst, chunk)?; + count += query.execute(conn)?; } // If the manifest idxes remained constant, we can test that both tables have the same @@ -361,3 +321,141 @@ impl DataSourcesTable { .optional()?) } } + +/// Map src manifest indexes to dst manifest indexes. If the +/// destination is missing an entry, put `None` as the value for the +/// source index +struct ManifestIdxMap { + map: HashMap, String)>, +} + +impl ManifestIdxMap { + fn new(src: &[(i32, String)], dst: &[(i32, String)]) -> Self { + let dst_idx_map: HashMap<&String, i32> = + HashMap::from_iter(dst.iter().map(|(idx, name)| (name, *idx))); + let map = src + .iter() + .map(|(src_idx, src_name)| { + ( + *src_idx, + (dst_idx_map.get(src_name).copied(), src_name.to_string()), + ) + }) + .collect(); + ManifestIdxMap { map } + } + + fn dst_idx( + &self, + src_idx: i32, + src_nsp: &Namespace, + src_created: BlockNumber, + dst_nsp: &Namespace, + ) -> Result { + let (dst_idx, name) = self.map.get(&src_idx).with_context(|| { + anyhow!( + "the source {src_nsp} does not have a template with \ + index {src_idx} but created one at block {src_created}" + ) + })?; + let dst_idx = dst_idx.with_context(|| { + anyhow!( + "the destination {dst_nsp} is missing a template with \ + name {name}. The source {src_nsp} created one at block {src_created}" + ) + })?; + Ok(dst_idx) + } +} + +#[derive(Queryable)] +struct DsForCopy { + block_range: (Bound, Bound), + idx: i32, + param: Option>, + context: Option, + causality_region: i32, + done_at: Option, +} + +impl DsForCopy { + fn src_to_dst( + mut self, + target_block: BlockNumber, + map: &ManifestIdxMap, + src_nsp: &Namespace, + dst_nsp: &Namespace, + ) -> Result { + // unclamp block range if it ends beyond target block + match self.block_range.1 { + Bound::Included(block) if block > target_block => self.block_range.1 = Bound::Unbounded, + Bound::Excluded(block) if block - 1 > target_block => { + self.block_range.1 = Bound::Unbounded + } + _ => { /* use block range as is */ } + } + // Translate manifest index + let src_created = match self.block_range.0 { + Bound::Included(block) => block, + Bound::Excluded(block) => block + 1, + Bound::Unbounded => 0, + }; + self.idx = map.dst_idx(self.idx, src_nsp, src_created, dst_nsp)?; + Ok(self) + } +} + +struct CopyDsQuery<'a> { + dst: &'a DataSourcesTable, + dss: &'a [DsForCopy], +} + +impl<'a> CopyDsQuery<'a> { + const BIND_PARAMS: usize = 6; + + fn new(dst: &'a DataSourcesTable, dss: &'a [DsForCopy]) -> Result { + Ok(CopyDsQuery { dst, dss }) + } +} + +impl<'a> QueryFragment for CopyDsQuery<'a> { + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + out.push_sql("insert into "); + out.push_sql(&self.dst.qname); + out.push_sql( + "(block_range, manifest_idx, param, context, causality_region, done_at) values ", + ); + let mut first = true; + for ds in self.dss.iter() { + if first { + first = false; + } else { + out.push_sql(", "); + } + out.push_sql("("); + out.push_bind_param::, _>(&ds.block_range)?; + out.push_sql(", "); + out.push_bind_param::(&ds.idx)?; + out.push_sql(", "); + out.push_bind_param::, _>(&ds.param)?; + out.push_sql(", "); + out.push_bind_param::, _>(&ds.context)?; + out.push_sql(", "); + out.push_bind_param::(&ds.causality_region)?; + out.push_sql(", "); + out.push_bind_param::, _>(&ds.done_at)?; + out.push_sql(")"); + } + + Ok(()) + } +} + +impl<'a> QueryId for CopyDsQuery<'a> { + type QueryId = (); + + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a, Conn> RunQueryDsl for CopyDsQuery<'a> {} diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index c6567c5d4f7..028f6044c34 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -53,7 +53,7 @@ use crate::{ const BASE_SQL_COLUMNS: [&str; 2] = ["id", "vid"]; /// The maximum number of bind variables that can be used in a query -const POSTGRES_MAX_PARAMETERS: usize = u16::MAX as usize; // 65535 +pub(crate) const POSTGRES_MAX_PARAMETERS: usize = u16::MAX as usize; // 65535 const SORT_KEY_COLUMN: &str = "sort_key$";