Skip to content

Commit f2706d0

Browse files
committed
fix(server): skip local echo on schema cache V2 clear (#2617)
- Cross-node schema cache invalidation broke when the publisher cleared its own fresh cache. Tag events with a per-JVM UUID and skip self.
1 parent 78f47ad commit f2706d0

6 files changed

Lines changed: 259 additions & 15 deletions

File tree

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.Set;
24+
import java.util.UUID;
2425
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.function.Consumer;
@@ -35,6 +36,7 @@
3536
import org.apache.hugegraph.event.EventListener;
3637
import org.apache.hugegraph.meta.MetaDriver;
3738
import org.apache.hugegraph.meta.MetaManager;
39+
import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent;
3840
import org.apache.hugegraph.perf.PerfUtil;
3941
import org.apache.hugegraph.schema.SchemaElement;
4042
import org.apache.hugegraph.type.HugeType;
@@ -58,6 +60,15 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {
5860

5961
private static final Object META_LISTENER_LOCK = new Object();
6062

63+
/**
64+
* Per-JVM identifier emitted with every schema-cache-clear meta event so
65+
* the listener can skip its own echo. Lifecycle: generated once per
66+
* classloader at class init, never reused, regenerated on JVM restart.
67+
* This is not a stable node identity, only a local self-echo filter.
68+
*/
69+
private static final String SCHEMA_CACHE_CLEAR_SOURCE =
70+
UUID.randomUUID().toString();
71+
6172
private final Cache<Id, Object> idCache;
6273
private final Cache<Id, Object> nameCache;
6374

@@ -222,14 +233,18 @@ private static void listenSchemaCacheClear() {
222233
* depending on a live etcd/PD watch.
223234
*/
224235
static <T> void handleSchemaCacheClearEvent(T response) {
225-
List<String> graphNames =
236+
List<SchemaCacheClearEvent> events =
226237
MetaManager.instance()
227-
.extractSchemaCacheClearGraphNamesFromResponse(
238+
.extractSchemaCacheClearEventsFromResponse(
228239
response);
229-
if (graphNames == null) {
240+
if (events == null) {
230241
return;
231242
}
232-
for (String graphName : graphNames) {
243+
for (SchemaCacheClearEvent event : events) {
244+
if (SCHEMA_CACHE_CLEAR_SOURCE.equals(event.source())) {
245+
continue;
246+
}
247+
String graphName = event.graph();
233248
LOG.debug("Graph {} clear schema cache on meta event", graphName);
234249
clearSchemaCache(graphName);
235250
}
@@ -249,7 +264,7 @@ static <T> void handleSchemaCacheClearEvent(T response) {
249264
*/
250265
public static void resetMetaListenerForReconnect() {
251266
if (metaEventListenerRegistered.compareAndSet(true, false)) {
252-
LOG.warn("Schema cache clear meta listener lost on reconnect " +
267+
LOG.warn("Schema cache clear meta listener lost on reconnect - " +
253268
"will re-register on next schema operation.");
254269
}
255270
}
@@ -327,9 +342,7 @@ protected void addSchema(SchemaElement schema) {
327342

328343
// Schema additions must always propagate to remote nodes regardless
329344
// of TASK_SYNC_DELETION (which only gates removal flows).
330-
MetaManager.instance()
331-
.notifySchemaCacheClear(this.graph().graphSpace(),
332-
this.graph().name());
345+
this.notifySchemaCacheClear();
333346
}
334347

335348
private void updateCache(SchemaElement schema) {
@@ -361,12 +374,17 @@ private void maybeNotifySchemaCacheClear() {
361374
// TASK_SYNC_DELETION=true: the caller propagates cache invalidation
362375
// synchronously, so the meta-event broadcast would be redundant.
363376
if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
364-
MetaManager.instance()
365-
.notifySchemaCacheClear(this.graph().graphSpace(),
366-
this.graph().name());
377+
this.notifySchemaCacheClear();
367378
}
368379
}
369380

381+
private void notifySchemaCacheClear() {
382+
MetaManager.instance()
383+
.notifySchemaCacheClear(this.graph().graphSpace(),
384+
this.graph().name(),
385+
SCHEMA_CACHE_CLEAR_SOURCE);
386+
}
387+
370388
@Override
371389
@SuppressWarnings("unchecked")
372390
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,16 @@
5757
import org.apache.hugegraph.space.SchemaTemplate;
5858
import org.apache.hugegraph.space.Service;
5959
import org.apache.hugegraph.util.E;
60+
import org.apache.hugegraph.util.JsonUtil;
61+
import org.apache.hugegraph.util.Log;
62+
import org.slf4j.Logger;
6063

6164
import com.google.common.collect.ImmutableMap;
6265

6366
public class MetaManager {
6467

68+
private static final Logger LOG = Log.logger(MetaManager.class);
69+
6570
public static final String META_PATH_DELIMITER = "/";
6671
public static final String META_PATH_JOIN = "-";
6772

@@ -119,6 +124,8 @@ public class MetaManager {
119124
public static final long LOCK_DEFAULT_LEASE = 30L;
120125
public static final long LOCK_DEFAULT_TIMEOUT = 10L;
121126
public static final int RANDOM_USER_ID = 100;
127+
private static final String SCHEMA_CACHE_CLEAR_GRAPH_KEY = "graph";
128+
private static final String SCHEMA_CACHE_CLEAR_SOURCE_KEY = "source";
122129
private static final String META_PATH_URLS = "URLS";
123130
private static final String META_PATH_PD_PEERS = "HSTORE_PD_PEERS";
124131
private static final MetaManager INSTANCE = new MetaManager();
@@ -380,9 +387,21 @@ public <T> List<String> extractGraphsFromResponse(T response) {
380387
return this.metaDriver.extractValuesFromResponse(response);
381388
}
382389

383-
public <T> List<String> extractSchemaCacheClearGraphNamesFromResponse(
390+
public <T> List<SchemaCacheClearEvent> extractSchemaCacheClearEventsFromResponse(
384391
T response) {
385-
return this.metaDriver.extractValuesFromResponse(response);
392+
List<String> values = this.metaDriver.extractValuesFromResponse(response);
393+
if (values == null) {
394+
return null;
395+
}
396+
397+
List<SchemaCacheClearEvent> events = new ArrayList<>(values.size());
398+
for (String value : values) {
399+
SchemaCacheClearEvent event = SchemaCacheClearEvent.fromValue(value);
400+
if (event != null) {
401+
events.add(event);
402+
}
403+
}
404+
return events;
386405
}
387406

388407
public <T> Map<String, String> extractKVFromResponse(T response) {
@@ -504,7 +523,12 @@ public void notifyGraphClear(String graphSpace, String graph) {
504523
}
505524

506525
public void notifySchemaCacheClear(String graphSpace, String graph) {
507-
this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph);
526+
this.notifySchemaCacheClear(graphSpace, graph, null);
527+
}
528+
529+
public void notifySchemaCacheClear(String graphSpace, String graph,
530+
String source) {
531+
this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph, source);
508532
}
509533

510534
public void notifyGraphCacheClear(String graphSpace, String graph) {
@@ -1292,6 +1316,66 @@ public void setWhiteIpStatus(boolean status) {
12921316
this.metaDriver.put(key, ((Boolean) status).toString());
12931317
}
12941318

1319+
public static String schemaCacheClearEventValue(String graph,
1320+
String source) {
1321+
if (StringUtils.isEmpty(source)) {
1322+
return graph;
1323+
}
1324+
return JsonUtil.toJson(ImmutableMap.of(SCHEMA_CACHE_CLEAR_GRAPH_KEY,
1325+
graph,
1326+
SCHEMA_CACHE_CLEAR_SOURCE_KEY,
1327+
source));
1328+
}
1329+
1330+
public static final class SchemaCacheClearEvent {
1331+
1332+
private final String graph;
1333+
private final String source;
1334+
1335+
private SchemaCacheClearEvent(String graph, String source) {
1336+
this.graph = graph;
1337+
this.source = source;
1338+
}
1339+
1340+
public String graph() {
1341+
return this.graph;
1342+
}
1343+
1344+
public String source() {
1345+
return this.source;
1346+
}
1347+
1348+
@SuppressWarnings("unchecked")
1349+
static SchemaCacheClearEvent fromValue(String value) {
1350+
if (StringUtils.isEmpty(value)) {
1351+
return null;
1352+
}
1353+
if (value.charAt(0) != '{') {
1354+
return new SchemaCacheClearEvent(value, null);
1355+
}
1356+
1357+
Map<String, Object> payload;
1358+
try {
1359+
payload = JsonUtil.fromJson(value, Map.class);
1360+
} catch (RuntimeException e) {
1361+
LOG.debug("Malformed schema-cache-clear payload, ignoring: {}",
1362+
value, e);
1363+
return null;
1364+
}
1365+
1366+
Object graph = payload.get(SCHEMA_CACHE_CLEAR_GRAPH_KEY);
1367+
if (graph == null) {
1368+
LOG.debug("Schema-cache-clear payload missing '{}' field: {}",
1369+
SCHEMA_CACHE_CLEAR_GRAPH_KEY, value);
1370+
return null;
1371+
}
1372+
1373+
Object source = payload.get(SCHEMA_CACHE_CLEAR_SOURCE_KEY);
1374+
String sourceValue = source == null ? null : source.toString();
1375+
return new SchemaCacheClearEvent(graph.toString(), sourceValue);
1376+
}
1377+
}
1378+
12951379
public enum MetaDriverType {
12961380
ETCD,
12971381
PD

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static org.apache.hugegraph.meta.MetaManager.META_PATH_SYS_GRAPH_CONF;
3434
import static org.apache.hugegraph.meta.MetaManager.META_PATH_UPDATE;
3535
import static org.apache.hugegraph.meta.MetaManager.META_PATH_VERTEX_LABEL;
36+
import static org.apache.hugegraph.meta.MetaManager.schemaCacheClearEventValue;
3637

3738
import java.util.Map;
3839
import java.util.function.Consumer;
@@ -94,8 +95,14 @@ public void notifyGraphClear(String graphSpace, String graph) {
9495
}
9596

9697
public void notifySchemaCacheClear(String graphSpace, String graph) {
98+
this.notifySchemaCacheClear(graphSpace, graph, null);
99+
}
100+
101+
public void notifySchemaCacheClear(String graphSpace, String graph,
102+
String source) {
97103
this.metaDriver.put(this.schemaCacheClearKey(),
98-
graphName(graphSpace, graph));
104+
schemaCacheClearEventValue(
105+
graphName(graphSpace, graph), source));
99106
}
100107

101108
public void notifyGraphCacheClear(String graphSpace, String graph) {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the
7+
* License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.meta;
19+
20+
import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent;
21+
import org.apache.hugegraph.testutil.Assert;
22+
import org.junit.Test;
23+
24+
public class MetaManagerSchemaCacheClearEventTest {
25+
26+
@Test
27+
public void testFromValueReturnsNullForEmptyPayload() {
28+
Assert.assertNull(SchemaCacheClearEvent.fromValue(null));
29+
Assert.assertNull(SchemaCacheClearEvent.fromValue(""));
30+
}
31+
32+
@Test
33+
public void testFromValueParsesLegacyPlainGraphName() {
34+
SchemaCacheClearEvent event =
35+
SchemaCacheClearEvent.fromValue("DEFAULT-graph1");
36+
37+
assertEvent(event, "DEFAULT-graph1", null);
38+
}
39+
40+
@Test
41+
public void testFromValueIgnoresMalformedJson() {
42+
Assert.assertNull(SchemaCacheClearEvent.fromValue("{not-json"));
43+
}
44+
45+
@Test
46+
public void testFromValueParsesJsonWithSource() {
47+
String value = MetaManager.schemaCacheClearEventValue("g", "u");
48+
SchemaCacheClearEvent event = SchemaCacheClearEvent.fromValue(value);
49+
50+
assertEvent(event, "g", "u");
51+
}
52+
53+
@Test
54+
public void testFromValueParsesJsonWithoutSource() {
55+
SchemaCacheClearEvent event =
56+
SchemaCacheClearEvent.fromValue("{\"graph\":\"g\"}");
57+
58+
assertEvent(event, "g", null);
59+
}
60+
61+
@Test
62+
public void testFromValueIgnoresJsonWithoutGraph() {
63+
Assert.assertNull(
64+
SchemaCacheClearEvent.fromValue("{\"source\":\"u\"}"));
65+
}
66+
67+
private static void assertEvent(SchemaCacheClearEvent event,
68+
String graph,
69+
String source) {
70+
Assert.assertNotNull(event);
71+
Assert.assertEquals(graph, event.graph());
72+
Assert.assertEquals(source, event.source());
73+
}
74+
}

hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hugegraph.unit;
1919

2020
import org.apache.hugegraph.core.RoleElectionStateMachineTest;
21+
import org.apache.hugegraph.meta.MetaManagerSchemaCacheClearEventTest;
2122
import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest;
2223
import org.apache.hugegraph.unit.api.filter.PathFilterTest;
2324
import org.apache.hugegraph.unit.api.gremlin.GremlinQueryAPITest;
@@ -92,6 +93,7 @@
9293
CacheTest.OffheapCacheTest.class,
9394
CacheTest.LevelCacheTest.class,
9495
CachedSchemaTransactionTest.class,
96+
MetaManagerSchemaCacheClearEventTest.class,
9597
CachedGraphTransactionTest.class,
9698
CacheManagerTest.class,
9799
RamTableTest.class,

0 commit comments

Comments
 (0)