Skip to content

Commit 37638fb

Browse files
committed
graph, store: lenient write-batch enforcement for skip_duplicates
Modify RowGroup::append_row() so that when immutable=true and skip_duplicates=true, cross-block duplicate inserts and Overwrite/Remove operations log warnings and return Ok(()) instead of failing. Same-block duplicates remain allowed. Default behavior (skip_duplicates=false) is preserved exactly. Added Logger field to RowGroup/RowGroups with CacheWeight impl, threaded through all construction sites. 5 unit tests covering all scenarios.
1 parent 3a189e9 commit 37638fb

6 files changed

Lines changed: 170 additions & 12 deletions

File tree

graph/examples/append_row.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,12 @@ pub fn main() -> anyhow::Result<()> {
104104
};
105105
mods.push(md);
106106
}
107-
let mut group = RowGroup::new(THING_TYPE.clone(), false, false);
107+
let mut group = RowGroup::new(
108+
THING_TYPE.clone(),
109+
false,
110+
false,
111+
slog::Logger::root(slog::Discard, slog::o!()),
112+
);
108113

109114
let start = Instant::now();
110115
for md in mods {

graph/src/components/store/write.rs

Lines changed: 133 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::{
44
sync::Arc,
55
};
66

7+
use slog::{warn, Logger};
8+
79
use crate::{
810
blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
911
cheap_clone::CheapClone,
@@ -345,19 +347,26 @@ pub struct RowGroup {
345347

346348
immutable: bool,
347349
skip_duplicates: bool,
350+
logger: Logger,
348351

349352
/// Map the `key.entity_id` of all entries in `rows` to the index with
350353
/// the most recent entry for that id to speed up lookups.
351354
last_mod: LastMod,
352355
}
353356

354357
impl RowGroup {
355-
pub fn new(entity_type: EntityType, immutable: bool, skip_duplicates: bool) -> Self {
358+
pub fn new(
359+
entity_type: EntityType,
360+
immutable: bool,
361+
skip_duplicates: bool,
362+
logger: Logger,
363+
) -> Self {
356364
Self {
357365
entity_type,
358366
rows: Vec::new(),
359367
immutable,
360368
skip_duplicates,
369+
logger,
361370
last_mod: LastMod::new(),
362371
}
363372
}
@@ -490,6 +499,13 @@ impl RowGroup {
490499
.and_then(|&idx| self.rows.get(idx))
491500
{
492501
Some(prev) if prev.block() != row.block() => {
502+
if self.skip_duplicates {
503+
warn!(self.logger, "Skipping duplicate insert for immutable entity";
504+
"entity" => row.key().to_string(),
505+
"block" => row.block(),
506+
"previous_block" => prev.block());
507+
return Ok(());
508+
}
493509
return Err(StoreError::Input(
494510
format!("entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}",
495511
row.key(), row.block(), prev.block())));
@@ -500,6 +516,12 @@ impl RowGroup {
500516
self.push_row(row);
501517
}
502518
EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => {
519+
if self.skip_duplicates {
520+
warn!(self.logger, "Skipping unsupported operation for immutable entity";
521+
"entity_type" => self.entity_type.to_string(),
522+
"operation" => format!("{:?}", row));
523+
return Ok(());
524+
}
503525
return Err(internal_error!(
504526
"immutable entity type {} only allows inserts, not {:?}",
505527
self.entity_type,
@@ -606,8 +628,18 @@ impl RowGroup {
606628
pub struct RowGroupForPerfTest(RowGroup);
607629

608630
impl RowGroupForPerfTest {
609-
pub fn new(entity_type: EntityType, immutable: bool, skip_duplicates: bool) -> Self {
610-
Self(RowGroup::new(entity_type, immutable, skip_duplicates))
631+
pub fn new(
632+
entity_type: EntityType,
633+
immutable: bool,
634+
skip_duplicates: bool,
635+
logger: Logger,
636+
) -> Self {
637+
Self(RowGroup::new(
638+
entity_type,
639+
immutable,
640+
skip_duplicates,
641+
logger,
642+
))
611643
}
612644

613645
pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> {
@@ -663,11 +695,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> {
663695
#[derive(Debug, CacheWeight)]
664696
pub struct RowGroups {
665697
pub groups: Vec<RowGroup>,
698+
logger: Logger,
666699
}
667700

668701
impl RowGroups {
669-
fn new() -> Self {
670-
Self { groups: Vec::new() }
702+
fn new(logger: Logger) -> Self {
703+
Self {
704+
groups: Vec::new(),
705+
logger,
706+
}
671707
}
672708

673709
fn group(&self, entity_type: &EntityType) -> Option<&RowGroup> {
@@ -692,6 +728,7 @@ impl RowGroups {
692728
entity_type.clone(),
693729
immutable,
694730
skip_duplicates,
731+
self.logger.clone(),
695732
));
696733
// unwrap: we just pushed an entry
697734
self.groups.last_mut().unwrap()
@@ -790,6 +827,7 @@ impl Batch {
790827
deterministic_errors: Vec<SubgraphError>,
791828
offchain_to_remove: Vec<StoredDynamicDataSource>,
792829
is_non_fatal_errors_active: bool,
830+
logger: Logger,
793831
) -> Result<Self, StoreError> {
794832
let block = block_ptr.number;
795833

@@ -803,7 +841,7 @@ impl Batch {
803841
EntityModification::Remove { .. } => 0,
804842
});
805843

806-
let mut mods = RowGroups::new();
844+
let mut mods = RowGroups::new(logger);
807845

808846
for m in raw_mods {
809847
mods.group_entry(&m.key().entity_type).push(m, block)?;
@@ -1054,6 +1092,8 @@ mod test {
10541092
};
10551093
use lazy_static::lazy_static;
10561094

1095+
use slog::Logger;
1096+
10571097
use super::{LastMod, RowGroup};
10581098

10591099
#[track_caller]
@@ -1087,6 +1127,7 @@ mod test {
10871127
rows,
10881128
immutable: false,
10891129
skip_duplicates: false,
1130+
logger: Logger::root(slog::Discard, slog::o!()),
10901131
last_mod,
10911132
};
10921133
let act = group
@@ -1127,6 +1168,8 @@ mod test {
11271168
type Thing @entity { id: ID!, count: Int! }
11281169
type RowGroup @entity { id: ID! }
11291170
type Entry @entity { id: ID! }
1171+
type ImmThing @entity(immutable: true) { id: ID!, count: Int! }
1172+
type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! }
11301173
"#;
11311174
lazy_static! {
11321175
static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap();
@@ -1135,6 +1178,8 @@ mod test {
11351178
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
11361179
static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap();
11371180
static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap();
1181+
static ref IMM_THING_TYPE: EntityType = SCHEMA.entity_type("ImmThing").unwrap();
1182+
static ref SKIP_DUP_THING_TYPE: EntityType = SCHEMA.entity_type("SkipDupThing").unwrap();
11381183
}
11391184

11401185
/// Convenient notation for changes to a fixed entity
@@ -1194,7 +1239,12 @@ mod test {
11941239
impl Group {
11951240
fn new() -> Self {
11961241
Self {
1197-
group: RowGroup::new(THING_TYPE.clone(), false, false),
1242+
group: RowGroup::new(
1243+
THING_TYPE.clone(),
1244+
false,
1245+
false,
1246+
Logger::root(slog::Discard, slog::o!()),
1247+
),
11981248
}
11991249
}
12001250

@@ -1299,4 +1349,80 @@ mod test {
12991349
let op = group.last_op(&key, 0);
13001350
assert_eq!(None, op);
13011351
}
1352+
1353+
fn make_insert(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
1354+
EntityModification::Insert {
1355+
key: entity_type.parse_key(id).unwrap(),
1356+
data: Arc::new(entity! { SCHEMA => id: id, count: block }),
1357+
block,
1358+
end: None,
1359+
}
1360+
}
1361+
1362+
fn make_overwrite(
1363+
entity_type: &EntityType,
1364+
id: &str,
1365+
block: BlockNumber,
1366+
) -> EntityModification {
1367+
EntityModification::Overwrite {
1368+
key: entity_type.parse_key(id).unwrap(),
1369+
data: Arc::new(entity! { SCHEMA => id: id, count: block }),
1370+
block,
1371+
end: None,
1372+
}
1373+
}
1374+
1375+
fn make_remove(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
1376+
EntityModification::Remove {
1377+
key: entity_type.parse_key(id).unwrap(),
1378+
block,
1379+
}
1380+
}
1381+
1382+
fn discard_logger() -> Logger {
1383+
Logger::root(slog::Discard, slog::o!())
1384+
}
1385+
1386+
#[test]
1387+
fn append_row_immutable_default_rejects_cross_block_duplicate() {
1388+
let mut group = RowGroup::new(IMM_THING_TYPE.clone(), true, false, discard_logger());
1389+
let res = group.push(make_insert(&IMM_THING_TYPE, "one", 1), 1);
1390+
assert!(res.is_ok());
1391+
let res = group.push(make_insert(&IMM_THING_TYPE, "one", 2), 2);
1392+
assert!(res.is_err());
1393+
}
1394+
1395+
#[test]
1396+
fn append_row_skip_duplicates_allows_cross_block_duplicate() {
1397+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
1398+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
1399+
assert!(res.is_ok());
1400+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 2), 2);
1401+
assert!(res.is_ok());
1402+
assert_eq!(group.row_count(), 1);
1403+
}
1404+
1405+
#[test]
1406+
fn append_row_skip_duplicates_allows_overwrite() {
1407+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
1408+
let res = group.append_row(make_overwrite(&SKIP_DUP_THING_TYPE, "one", 1));
1409+
assert!(res.is_ok());
1410+
}
1411+
1412+
#[test]
1413+
fn append_row_skip_duplicates_allows_remove() {
1414+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
1415+
let res = group.append_row(make_remove(&SKIP_DUP_THING_TYPE, "one", 1));
1416+
assert!(res.is_ok());
1417+
}
1418+
1419+
#[test]
1420+
fn append_row_skip_duplicates_same_block_still_pushes() {
1421+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone(), true, true, discard_logger());
1422+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
1423+
assert!(res.is_ok());
1424+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
1425+
assert!(res.is_ok());
1426+
assert_eq!(group.row_count(), 2);
1427+
}
13021428
}

graph/src/util/cache_weight.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@ impl CacheWeight for EntityType {
188188
}
189189
}
190190

191+
impl CacheWeight for slog::Logger {
192+
fn indirect_weight(&self) -> usize {
193+
0
194+
}
195+
}
196+
191197
impl CacheWeight for [u8; 32] {
192198
fn indirect_weight(&self) -> usize {
193199
0

store/postgres/src/writable.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,6 +1839,7 @@ impl WritableStoreTrait for WritableStore {
18391839
deterministic_errors,
18401840
processed_data_sources,
18411841
is_non_fatal_errors_active,
1842+
self.store.logger.clone(),
18421843
)?;
18431844
self.writer.write(batch, stopwatch).await?;
18441845

store/test-store/tests/postgres/relational.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,12 @@ async fn conflicting_entity() {
996996
let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone() };
997997
let fred = Arc::new(fred);
998998
let types: Vec<_> = types.into_iter().cloned().collect();
999-
let mut group = RowGroup::new(entity_type.clone(), false, false);
999+
let mut group = RowGroup::new(
1000+
entity_type.clone(),
1001+
false,
1002+
false,
1003+
slog::Logger::root(slog::Discard, slog::o!()),
1004+
);
10001005
group
10011006
.push(
10021007
EntityModification::Insert {

store/test-store/tests/postgres/relational_bytes.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ pub fn row_group_update(
8585
block: BlockNumber,
8686
data: impl IntoIterator<Item = (EntityKey, Entity)>,
8787
) -> RowGroup {
88-
let mut group = RowGroup::new(entity_type.clone(), false, false);
88+
let mut group = RowGroup::new(
89+
entity_type.clone(),
90+
false,
91+
false,
92+
slog::Logger::root(slog::Discard, slog::o!()),
93+
);
8994
for (key, data) in data {
9095
group
9196
.push(EntityModification::overwrite(key, data, block), block)
@@ -99,7 +104,12 @@ pub fn row_group_insert(
99104
block: BlockNumber,
100105
data: impl IntoIterator<Item = (EntityKey, Entity)>,
101106
) -> RowGroup {
102-
let mut group = RowGroup::new(entity_type.clone(), false, false);
107+
let mut group = RowGroup::new(
108+
entity_type.clone(),
109+
false,
110+
false,
111+
slog::Logger::root(slog::Discard, slog::o!()),
112+
);
103113
for (key, data) in data {
104114
group
105115
.push(EntityModification::insert(key, data, block), block)
@@ -113,7 +123,12 @@ pub fn row_group_delete(
113123
block: BlockNumber,
114124
data: impl IntoIterator<Item = EntityKey>,
115125
) -> RowGroup {
116-
let mut group = RowGroup::new(entity_type.clone(), false, false);
126+
let mut group = RowGroup::new(
127+
entity_type.clone(),
128+
false,
129+
false,
130+
slog::Logger::root(slog::Discard, slog::o!()),
131+
);
117132
for key in data {
118133
group
119134
.push(EntityModification::remove(key, block), block)

0 commit comments

Comments
 (0)