@@ -4,6 +4,8 @@ use std::{
44 sync:: Arc ,
55} ;
66
7+ use slog:: { warn, Logger } ;
8+
79use crate :: {
810 blockchain:: { block_stream:: FirehoseCursor , BlockPtr , BlockTime } ,
911 cheap_clone:: CheapClone ,
@@ -345,19 +347,26 @@ pub struct RowGroup {
345347
346348 immutable : bool ,
347349 skip_duplicates : bool ,
350+ logger : Logger ,
348351
349352 /// Map the `key.entity_id` of all entries in `rows` to the index with
350353 /// the most recent entry for that id to speed up lookups.
351354 last_mod : LastMod ,
352355}
353356
354357impl RowGroup {
355- pub fn new ( entity_type : EntityType , immutable : bool , skip_duplicates : bool ) -> Self {
358+ pub fn new (
359+ entity_type : EntityType ,
360+ immutable : bool ,
361+ skip_duplicates : bool ,
362+ logger : Logger ,
363+ ) -> Self {
356364 Self {
357365 entity_type,
358366 rows : Vec :: new ( ) ,
359367 immutable,
360368 skip_duplicates,
369+ logger,
361370 last_mod : LastMod :: new ( ) ,
362371 }
363372 }
@@ -490,6 +499,13 @@ impl RowGroup {
490499 . and_then ( |& idx| self . rows . get ( idx) )
491500 {
492501 Some ( prev) if prev. block ( ) != row. block ( ) => {
502+ if self . skip_duplicates {
503+ warn ! ( self . logger, "Skipping duplicate insert for immutable entity" ;
504+ "entity" => row. key( ) . to_string( ) ,
505+ "block" => row. block( ) ,
506+ "previous_block" => prev. block( ) ) ;
507+ return Ok ( ( ) ) ;
508+ }
493509 return Err ( StoreError :: Input (
494510 format ! ( "entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}" ,
495511 row. key( ) , row. block( ) , prev. block( ) ) ) ) ;
@@ -500,6 +516,12 @@ impl RowGroup {
500516 self . push_row ( row) ;
501517 }
502518 EntityModification :: Overwrite { .. } | EntityModification :: Remove { .. } => {
519+ if self . skip_duplicates {
520+ warn ! ( self . logger, "Skipping unsupported operation for immutable entity" ;
521+ "entity_type" => self . entity_type. to_string( ) ,
522+ "operation" => format!( "{:?}" , row) ) ;
523+ return Ok ( ( ) ) ;
524+ }
503525 return Err ( internal_error ! (
504526 "immutable entity type {} only allows inserts, not {:?}" ,
505527 self . entity_type,
@@ -606,8 +628,18 @@ impl RowGroup {
606628pub struct RowGroupForPerfTest ( RowGroup ) ;
607629
608630impl RowGroupForPerfTest {
609- pub fn new ( entity_type : EntityType , immutable : bool , skip_duplicates : bool ) -> Self {
610- Self ( RowGroup :: new ( entity_type, immutable, skip_duplicates) )
631+ pub fn new (
632+ entity_type : EntityType ,
633+ immutable : bool ,
634+ skip_duplicates : bool ,
635+ logger : Logger ,
636+ ) -> Self {
637+ Self ( RowGroup :: new (
638+ entity_type,
639+ immutable,
640+ skip_duplicates,
641+ logger,
642+ ) )
611643 }
612644
613645 pub fn push ( & mut self , emod : EntityModification , block : BlockNumber ) -> Result < ( ) , StoreError > {
@@ -663,11 +695,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> {
663695#[ derive( Debug , CacheWeight ) ]
664696pub struct RowGroups {
665697 pub groups : Vec < RowGroup > ,
698+ logger : Logger ,
666699}
667700
668701impl RowGroups {
669702 fn new ( ) -> Self {
670- Self { groups : Vec :: new ( ) }
703+ Self {
704+ groups : Vec :: new ( ) ,
705+ logger : Logger :: root ( slog:: Discard , slog:: o!( ) ) ,
706+ }
671707 }
672708
673709 fn group ( & self , entity_type : & EntityType ) -> Option < & RowGroup > {
@@ -692,6 +728,7 @@ impl RowGroups {
692728 entity_type. clone ( ) ,
693729 immutable,
694730 skip_duplicates,
731+ self . logger . clone ( ) ,
695732 ) ) ;
696733 // unwrap: we just pushed an entry
697734 self . groups . last_mut ( ) . unwrap ( )
@@ -1054,6 +1091,8 @@ mod test {
10541091 } ;
10551092 use lazy_static:: lazy_static;
10561093
1094+ use slog:: Logger ;
1095+
10571096 use super :: { LastMod , RowGroup } ;
10581097
10591098 #[ track_caller]
@@ -1087,6 +1126,7 @@ mod test {
10871126 rows,
10881127 immutable : false ,
10891128 skip_duplicates : false ,
1129+ logger : Logger :: root ( slog:: Discard , slog:: o!( ) ) ,
10901130 last_mod,
10911131 } ;
10921132 let act = group
@@ -1127,6 +1167,8 @@ mod test {
11271167 type Thing @entity { id: ID!, count: Int! }
11281168 type RowGroup @entity { id: ID! }
11291169 type Entry @entity { id: ID! }
1170+ type ImmThing @entity(immutable: true) { id: ID!, count: Int! }
1171+ type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! }
11301172 "# ;
11311173 lazy_static ! {
11321174 static ref DEPLOYMENT : DeploymentHash = DeploymentHash :: new( "batchAppend" ) . unwrap( ) ;
@@ -1135,6 +1177,8 @@ mod test {
11351177 static ref THING_TYPE : EntityType = SCHEMA . entity_type( "Thing" ) . unwrap( ) ;
11361178 static ref ROW_GROUP_TYPE : EntityType = SCHEMA . entity_type( "RowGroup" ) . unwrap( ) ;
11371179 static ref ENTRY_TYPE : EntityType = SCHEMA . entity_type( "Entry" ) . unwrap( ) ;
1180+ static ref IMM_THING_TYPE : EntityType = SCHEMA . entity_type( "ImmThing" ) . unwrap( ) ;
1181+ static ref SKIP_DUP_THING_TYPE : EntityType = SCHEMA . entity_type( "SkipDupThing" ) . unwrap( ) ;
11381182 }
11391183
11401184 /// Convenient notation for changes to a fixed entity
@@ -1194,7 +1238,12 @@ mod test {
11941238 impl Group {
11951239 fn new ( ) -> Self {
11961240 Self {
1197- group : RowGroup :: new ( THING_TYPE . clone ( ) , false , false ) ,
1241+ group : RowGroup :: new (
1242+ THING_TYPE . clone ( ) ,
1243+ false ,
1244+ false ,
1245+ Logger :: root ( slog:: Discard , slog:: o!( ) ) ,
1246+ ) ,
11981247 }
11991248 }
12001249
@@ -1299,4 +1348,80 @@ mod test {
12991348 let op = group. last_op ( & key, 0 ) ;
13001349 assert_eq ! ( None , op) ;
13011350 }
1351+
1352+ fn make_insert ( entity_type : & EntityType , id : & str , block : BlockNumber ) -> EntityModification {
1353+ EntityModification :: Insert {
1354+ key : entity_type. parse_key ( id) . unwrap ( ) ,
1355+ data : Arc :: new ( entity ! { SCHEMA => id: id, count: block } ) ,
1356+ block,
1357+ end : None ,
1358+ }
1359+ }
1360+
1361+ fn make_overwrite (
1362+ entity_type : & EntityType ,
1363+ id : & str ,
1364+ block : BlockNumber ,
1365+ ) -> EntityModification {
1366+ EntityModification :: Overwrite {
1367+ key : entity_type. parse_key ( id) . unwrap ( ) ,
1368+ data : Arc :: new ( entity ! { SCHEMA => id: id, count: block } ) ,
1369+ block,
1370+ end : None ,
1371+ }
1372+ }
1373+
1374+ fn make_remove ( entity_type : & EntityType , id : & str , block : BlockNumber ) -> EntityModification {
1375+ EntityModification :: Remove {
1376+ key : entity_type. parse_key ( id) . unwrap ( ) ,
1377+ block,
1378+ }
1379+ }
1380+
1381+ fn discard_logger ( ) -> Logger {
1382+ Logger :: root ( slog:: Discard , slog:: o!( ) )
1383+ }
1384+
1385+ #[ test]
1386+ fn append_row_immutable_default_rejects_cross_block_duplicate ( ) {
1387+ let mut group = RowGroup :: new ( IMM_THING_TYPE . clone ( ) , true , false , discard_logger ( ) ) ;
1388+ let res = group. push ( make_insert ( & IMM_THING_TYPE , "one" , 1 ) , 1 ) ;
1389+ assert ! ( res. is_ok( ) ) ;
1390+ let res = group. push ( make_insert ( & IMM_THING_TYPE , "one" , 2 ) , 2 ) ;
1391+ assert ! ( res. is_err( ) ) ;
1392+ }
1393+
1394+ #[ test]
1395+ fn append_row_skip_duplicates_allows_cross_block_duplicate ( ) {
1396+ let mut group = RowGroup :: new ( SKIP_DUP_THING_TYPE . clone ( ) , true , true , discard_logger ( ) ) ;
1397+ let res = group. push ( make_insert ( & SKIP_DUP_THING_TYPE , "one" , 1 ) , 1 ) ;
1398+ assert ! ( res. is_ok( ) ) ;
1399+ let res = group. push ( make_insert ( & SKIP_DUP_THING_TYPE , "one" , 2 ) , 2 ) ;
1400+ assert ! ( res. is_ok( ) ) ;
1401+ assert_eq ! ( group. row_count( ) , 1 ) ;
1402+ }
1403+
1404+ #[ test]
1405+ fn append_row_skip_duplicates_allows_overwrite ( ) {
1406+ let mut group = RowGroup :: new ( SKIP_DUP_THING_TYPE . clone ( ) , true , true , discard_logger ( ) ) ;
1407+ let res = group. append_row ( make_overwrite ( & SKIP_DUP_THING_TYPE , "one" , 1 ) ) ;
1408+ assert ! ( res. is_ok( ) ) ;
1409+ }
1410+
1411+ #[ test]
1412+ fn append_row_skip_duplicates_allows_remove ( ) {
1413+ let mut group = RowGroup :: new ( SKIP_DUP_THING_TYPE . clone ( ) , true , true , discard_logger ( ) ) ;
1414+ let res = group. append_row ( make_remove ( & SKIP_DUP_THING_TYPE , "one" , 1 ) ) ;
1415+ assert ! ( res. is_ok( ) ) ;
1416+ }
1417+
1418+ #[ test]
1419+ fn append_row_skip_duplicates_same_block_still_pushes ( ) {
1420+ let mut group = RowGroup :: new ( SKIP_DUP_THING_TYPE . clone ( ) , true , true , discard_logger ( ) ) ;
1421+ let res = group. push ( make_insert ( & SKIP_DUP_THING_TYPE , "one" , 1 ) , 1 ) ;
1422+ assert ! ( res. is_ok( ) ) ;
1423+ let res = group. push ( make_insert ( & SKIP_DUP_THING_TYPE , "one" , 1 ) , 1 ) ;
1424+ assert ! ( res. is_ok( ) ) ;
1425+ assert_eq ! ( group. row_count( ) , 2 ) ;
1426+ }
13021427}
0 commit comments