Skip to content

Commit d0d812f

Browse files
committed
store: lenient store-layer enforcement for skip_duplicates
Added logger parameter to Layout::update() and Layout::delete() in relational.rs. When table.immutable && table.skip_duplicates, these methods now log a warning and return Ok(0) instead of returning an error. Default immutable behavior (skip_duplicates=false) is preserved. Updated all callers including deployment_store.rs and test files. Added 4 unit tests with SkipDupMink entity type to verify both skip_duplicates and default immutable behavior.
1 parent 7b0b0cc commit d0d812f

4 files changed

Lines changed: 133 additions & 11 deletions

File tree

store/postgres/src/deployment_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ impl DeploymentStore {
360360
// Clamp entities before inserting them to avoid having versions
361361
// with overlapping block ranges
362362
let section = stopwatch.start_section("apply_entity_modifications_delete");
363-
layout.delete(conn, group, stopwatch).await?;
363+
layout.delete(logger, conn, group, stopwatch).await?;
364364
section.end();
365365

366366
let section = stopwatch.start_section("check_interface_entity_uniqueness");

store/postgres/src/relational.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,12 +944,20 @@ impl Layout {
944944

945945
pub async fn update<'a>(
946946
&'a self,
947+
logger: &Logger,
947948
conn: &mut AsyncPgConnection,
948949
group: &'a RowGroup,
949950
stopwatch: &StopwatchMetrics,
950951
) -> Result<usize, StoreError> {
951952
let table = self.table_for_entity(&group.entity_type)?;
952953
if table.immutable && group.has_clamps() {
954+
if table.skip_duplicates {
955+
let ids = group.ids().join(", ");
956+
warn!(logger, "Skipping immutable entity update in store layer";
957+
"entity_type" => group.entity_type.to_string(),
958+
"ids" => ids);
959+
return Ok(0);
960+
}
953961
let ids = group
954962
.ids()
955963
.map(|id| id.to_string())
@@ -991,6 +999,7 @@ impl Layout {
991999

9921000
pub async fn delete(
9931001
&self,
1002+
logger: &Logger,
9941003
conn: &mut AsyncPgConnection,
9951004
group: &RowGroup,
9961005
stopwatch: &StopwatchMetrics,
@@ -1015,6 +1024,13 @@ impl Layout {
10151024

10161025
let table = self.table_for_entity(&group.entity_type)?;
10171026
if table.immutable {
1027+
if table.skip_duplicates {
1028+
let ids = group.ids().join(", ");
1029+
warn!(logger, "Skipping immutable entity delete in store layer";
1030+
"entity_type" => group.entity_type.to_string(),
1031+
"ids" => ids);
1032+
return Ok(0);
1033+
}
10181034
return Err(internal_error!(
10191035
"entities of type `{}` can not be deleted since they are immutable. Entity ids are [{}]",
10201036
table.object, group.ids().join(", ")

store/test-store/tests/postgres/relational.rs

Lines changed: 113 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ const THINGS_GQL: &str = r#"
128128
order: Int,
129129
}
130130
131+
type SkipDupMink @entity(immutable: true, skipDuplicates: true) {
132+
id: ID!,
133+
order: Int,
134+
}
135+
131136
type User @entity {
132137
id: ID!,
133138
name: String!,
@@ -228,6 +233,7 @@ lazy_static! {
228233
static ref CAT_TYPE: EntityType = THINGS_SCHEMA.entity_type("Cat").unwrap();
229234
static ref FERRET_TYPE: EntityType = THINGS_SCHEMA.entity_type("Ferret").unwrap();
230235
static ref MINK_TYPE: EntityType = THINGS_SCHEMA.entity_type("Mink").unwrap();
236+
static ref SKIP_DUP_MINK_TYPE: EntityType = THINGS_SCHEMA.entity_type("SkipDupMink").unwrap();
231237
static ref CHAIR_TYPE: EntityType = THINGS_SCHEMA.entity_type("Chair").unwrap();
232238
static ref NULLABLE_STRINGS_TYPE: EntityType =
233239
THINGS_SCHEMA.entity_type("NullableStrings").unwrap();
@@ -315,7 +321,7 @@ async fn update_entity_at(
315321
);
316322
let group = row_group_update(entity_type, block, entities_with_keys_owned.clone());
317323
let updated = layout
318-
.update(conn, &group, &MOCK_STOPWATCH)
324+
.update(&LOGGER, conn, &group, &MOCK_STOPWATCH)
319325
.await
320326
.expect(&errmsg);
321327
assert_eq!(updated, entities_with_keys_owned.len());
@@ -643,7 +649,7 @@ async fn update() {
643649
let entities = vec![(key, entity.clone())];
644650
let group = row_group_update(&entity_type, 0, entities);
645651
layout
646-
.update(conn, &group, &MOCK_STOPWATCH)
652+
.update(&LOGGER, conn, &group, &MOCK_STOPWATCH)
647653
.await
648654
.expect("Failed to update");
649655

@@ -708,7 +714,7 @@ async fn update_many() {
708714
let entities: Vec<_> = keys.into_iter().zip(entities_vec.into_iter()).collect();
709715
let group = row_group_update(&entity_type, 0, entities);
710716
layout
711-
.update(conn, &group, &MOCK_STOPWATCH)
717+
.update(&LOGGER, conn, &group, &MOCK_STOPWATCH)
712718
.await
713719
.expect("Failed to update");
714720

@@ -780,7 +786,7 @@ async fn serialize_bigdecimal() {
780786
let entities = vec![(key, entity.clone())];
781787
let group = row_group_update(&entity_type, 0, entities);
782788
layout
783-
.update(conn, &group, &MOCK_STOPWATCH)
789+
.update(&LOGGER, conn, &group, &MOCK_STOPWATCH)
784790
.await
785791
.expect("Failed to update");
786792

@@ -870,7 +876,7 @@ async fn delete() {
870876
let mut entity_keys = vec![key];
871877
let group = row_group_delete(&entity_type, 1, entity_keys.clone());
872878
let count = layout
873-
.delete(conn, &group, &MOCK_STOPWATCH)
879+
.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH)
874880
.await
875881
.expect("Failed to delete");
876882
assert_eq!(0, count);
@@ -884,7 +890,7 @@ async fn delete() {
884890

885891
let group = row_group_delete(&entity_type, 1, entity_keys);
886892
let count = layout
887-
.delete(conn, &group, &MOCK_STOPWATCH)
893+
.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH)
888894
.await
889895
.expect("Failed to delete");
890896
assert_eq!(1, count);
@@ -915,7 +921,7 @@ async fn insert_many_and_delete_many() {
915921
.collect();
916922
let group = row_group_delete(&SCALAR_TYPE, 1, entity_keys);
917923
let num_removed = layout
918-
.delete(conn, &group, &MOCK_STOPWATCH)
924+
.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH)
919925
.await
920926
.expect("Failed to delete");
921927
assert_eq!(2, num_removed);
@@ -2131,3 +2137,103 @@ async fn check_filters() {
21312137
})
21322138
.await;
21332139
}
2140+
2141+
/// Create a RowGroup with Overwrite modifications for testing store-layer
2142+
/// immutability enforcement. Uses immutable=false on the RowGroup to bypass
2143+
/// write-batch validation (the store layer checks the Table's immutability).
2144+
fn store_layer_row_group_update(
2145+
entity_type: &EntityType,
2146+
block: BlockNumber,
2147+
data: impl IntoIterator<Item = (EntityKey, Entity)>,
2148+
) -> RowGroup {
2149+
let mut group = RowGroup::new(
2150+
entity_type.clone(),
2151+
false,
2152+
false,
2153+
slog::Logger::root(slog::Discard, slog::o!()),
2154+
);
2155+
for (key, data) in data {
2156+
group
2157+
.push(EntityModification::overwrite(key, data, block), block)
2158+
.unwrap();
2159+
}
2160+
group
2161+
}
2162+
2163+
/// Create a RowGroup with Remove modifications for testing store-layer
2164+
/// immutability enforcement. Uses immutable=false on the RowGroup to bypass
2165+
/// write-batch validation (the store layer checks the Table's immutability).
2166+
fn store_layer_row_group_delete(
2167+
entity_type: &EntityType,
2168+
block: BlockNumber,
2169+
data: impl IntoIterator<Item = EntityKey>,
2170+
) -> RowGroup {
2171+
let mut group = RowGroup::new(
2172+
entity_type.clone(),
2173+
false,
2174+
false,
2175+
slog::Logger::root(slog::Discard, slog::o!()),
2176+
);
2177+
for key in data {
2178+
group
2179+
.push(EntityModification::remove(key, block), block)
2180+
.unwrap();
2181+
}
2182+
group
2183+
}
2184+
2185+
#[graph::test]
2186+
async fn skip_duplicates_update_returns_ok() {
2187+
run_test(async |conn, layout| {
2188+
let entity = entity! { layout.input_schema =>
2189+
id: "sd1",
2190+
order: 1,
2191+
vid: 0i64
2192+
};
2193+
let key = SKIP_DUP_MINK_TYPE.key(entity.id());
2194+
let entities = vec![(key, entity)];
2195+
let group = store_layer_row_group_update(&SKIP_DUP_MINK_TYPE, 1, entities);
2196+
let result = layout.update(&LOGGER, conn, &group, &MOCK_STOPWATCH).await;
2197+
assert_eq!(result.unwrap(), 0);
2198+
})
2199+
.await;
2200+
}
2201+
2202+
#[graph::test]
2203+
async fn skip_duplicates_delete_returns_ok() {
2204+
run_test(async |conn, layout| {
2205+
let key = SKIP_DUP_MINK_TYPE.parse_key("sd1").unwrap();
2206+
let group = store_layer_row_group_delete(&SKIP_DUP_MINK_TYPE, 1, vec![key]);
2207+
let result = layout.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH).await;
2208+
assert_eq!(result.unwrap(), 0);
2209+
})
2210+
.await;
2211+
}
2212+
2213+
#[graph::test]
2214+
async fn default_immutable_update_still_errors() {
2215+
run_test(async |conn, layout| {
2216+
let entity = entity! { layout.input_schema =>
2217+
id: "m1",
2218+
order: 1,
2219+
vid: 0i64
2220+
};
2221+
let key = MINK_TYPE.key(entity.id());
2222+
let entities = vec![(key, entity)];
2223+
let group = store_layer_row_group_update(&MINK_TYPE, 1, entities);
2224+
let result = layout.update(&LOGGER, conn, &group, &MOCK_STOPWATCH).await;
2225+
assert!(result.is_err());
2226+
})
2227+
.await;
2228+
}
2229+
2230+
#[graph::test]
2231+
async fn default_immutable_delete_still_errors() {
2232+
run_test(async |conn, layout| {
2233+
let key = MINK_TYPE.parse_key("m1").unwrap();
2234+
let group = store_layer_row_group_delete(&MINK_TYPE, 1, vec![key]);
2235+
let result = layout.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH).await;
2236+
assert!(result.is_err());
2237+
})
2238+
.await;
2239+
}

store/test-store/tests/postgres/relational_bytes.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ async fn update() {
365365
let entities = vec![(key, entity.clone())];
366366
let group = row_group_update(&entity_type, 1, entities);
367367
layout
368-
.update(conn, &group, &MOCK_STOPWATCH)
368+
.update(&LOGGER, conn, &group, &MOCK_STOPWATCH)
369369
.await
370370
.expect("Failed to update");
371371

@@ -397,7 +397,7 @@ async fn delete() {
397397
let mut entity_keys = vec![key.clone()];
398398
let group = row_group_delete(&entity_type, 1, entity_keys.clone());
399399
let count = layout
400-
.delete(conn, &group, &MOCK_STOPWATCH)
400+
.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH)
401401
.await
402402
.expect("Failed to delete");
403403
assert_eq!(0, count);
@@ -409,7 +409,7 @@ async fn delete() {
409409
.expect("Failed to update entity types");
410410
let group = row_group_delete(&entity_type, 1, entity_keys);
411411
let count = layout
412-
.delete(conn, &group, &MOCK_STOPWATCH)
412+
.delete(&LOGGER, conn, &group, &MOCK_STOPWATCH)
413413
.await
414414
.expect("Failed to delete");
415415
assert_eq!(1, count);

0 commit comments

Comments
 (0)