Skip to content

Commit f69ca66

Browse files
authored
fix(server): align cache event actions in legacy EventHub path (#3017)
- Align legacy cache invalidation producers and listeners on ACTION_INVALID and ACTION_CLEAR, removing the obsolete ACTION_INVALIDED/ACTION_CLEARED constants. - Add EventHub.notifyExcept(...) so cache transactions and the cache notifier bridge can avoid re-processing their own local listener while still delivering events to other listeners. - Track registered graph/schema cache listeners per graph so notifyExcept(...) uses the listener instance actually registered on the EventHub, including multi-transaction cases where later transactions reuse the first listener. - Update cache notifier forwarding to prevent local RPC bridge loops after action names are unified. - Add regression coverage for notifyExcept semantics, graph/schema action names, listener teardown/re-registration, and notifier no-loop behavior. - The holder keeps the EventHub listener registered while any transaction for the graph is alive, and unregisters/removes it only when the last transaction releases it. The registry update, ref-count decrement, and hub unlisten now run inside ConcurrentMap.compute() to avoid owner-closes-first invalidation gaps. Also add graph/schema regression coverage for owner-first close and last-close cleanup, including graph close/reopen handling for stale EventHub holders.
1 parent e108076 commit f69ca66

11 files changed

Lines changed: 727 additions & 53 deletions

File tree

hugegraph-commons/hugegraph-common/src/main/java/org/apache/hugegraph/event/EventHub.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,27 @@ public int unlisten(String event, EventListener listener) {
149149
return count;
150150
}
151151

152+
/**
153+
* Notify all registered listeners for {@code event} EXCEPT
154+
* {@code ignoredListener}. ANY_EVENT listeners are notified unless they
155+
* are the ignored one.
156+
*
157+
* @return a Future<Integer> resolving to the count of listeners actually
158+
* invoked (the ignored listener is NOT counted)
159+
*/
160+
public Future<Integer> notifyExcept(String event,
161+
EventListener ignoredListener,
162+
@Nullable Object... args) {
163+
return this.notify(event, ignoredListener, args);
164+
}
165+
152166
public Future<Integer> notify(String event, @Nullable Object... args) {
167+
return this.notify(event, null, args);
168+
}
169+
170+
private Future<Integer> notify(String event,
171+
EventListener ignoredListener,
172+
@Nullable Object... args) {
153173
@SuppressWarnings("resource")
154174
ExtendableIterator<EventListener> all = new ExtendableIterator<>();
155175

@@ -173,8 +193,12 @@ public Future<Integer> notify(String event, @Nullable Object... args) {
173193
int count = 0;
174194
// Notify all listeners, and ignore the results
175195
while (all.hasNext()) {
196+
EventListener listener = all.next();
197+
if (listener == ignoredListener) {
198+
continue;
199+
}
176200
try {
177-
all.next().event(ev);
201+
listener.event(ev);
178202
count++;
179203
} catch (Throwable e) {
180204
LOG.warn("Failed to handle event: {}", ev, e);

hugegraph-commons/hugegraph-common/src/test/java/org/apache/hugegraph/unit/event/EventHubTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,43 @@ public void testEventNotifyWithArg2() {
388388
Assert.assertEquals(1, count.get());
389389
}
390390

391+
@Test
392+
public void testNotifyExcept() throws Exception {
393+
final String notify = "event-notify";
394+
AtomicInteger listenerACount = new AtomicInteger();
395+
AtomicInteger listenerBCount = new AtomicInteger();
396+
AtomicInteger listenerCCount = new AtomicInteger();
397+
398+
EventListener listenerA = event -> {
399+
event.checkArgs(String.class);
400+
Assert.assertEquals("fake-arg", event.args()[0]);
401+
listenerACount.incrementAndGet();
402+
return true;
403+
};
404+
EventListener listenerB = event -> {
405+
listenerBCount.incrementAndGet();
406+
return true;
407+
};
408+
EventListener listenerC = event -> {
409+
event.checkArgs(String.class);
410+
Assert.assertEquals("fake-arg", event.args()[0]);
411+
listenerCCount.incrementAndGet();
412+
return true;
413+
};
414+
415+
this.eventHub.listen(notify, listenerA);
416+
this.eventHub.listen(notify, listenerB);
417+
this.eventHub.listen(EventHub.ANY_EVENT, listenerC);
418+
419+
Assert.assertEquals(2, (int) this.eventHub
420+
.notifyExcept(notify, listenerB,
421+
"fake-arg")
422+
.get());
423+
Assert.assertEquals(1, listenerACount.get());
424+
Assert.assertEquals(0, listenerBCount.get());
425+
Assert.assertEquals(1, listenerCCount.get());
426+
}
427+
391428
@Test
392429
public void testEventNotifyWithMultiThreads() throws InterruptedException {
393430
final String notify = "event-notify";

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,7 +1394,7 @@ public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
13941394
"Expect event action argument");
13951395
String action = (String) args[0];
13961396
LOG.debug("Event action: {}", action);
1397-
if (Cache.ACTION_INVALIDED.equals(action)) {
1397+
if (Cache.ACTION_INVALID.equals(action)) {
13981398
event.checkArgs(String.class, HugeType.class, Object.class);
13991399
HugeType type = (HugeType) args[1];
14001400
Object ids = args[2];
@@ -1410,7 +1410,7 @@ public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
14101410
E.checkArgument(false, "Unexpected argument: %s", ids);
14111411
}
14121412
return true;
1413-
} else if (Cache.ACTION_CLEARED.equals(action)) {
1413+
} else if (Cache.ACTION_CLEAR.equals(action)) {
14141414
event.checkArgs(String.class, HugeType.class);
14151415
HugeType type = (HugeType) args[1];
14161416
LOG.debug("Calling proxy.clear with type: {}", type);
@@ -1435,17 +1435,20 @@ public void close() {
14351435

14361436
@Override
14371437
public void invalid(HugeType type, Id id) {
1438-
this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id);
1438+
this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
1439+
Cache.ACTION_INVALID, type, id);
14391440
}
14401441

14411442
@Override
14421443
public void invalid2(HugeType type, Object[] ids) {
1443-
this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids);
1444+
this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
1445+
Cache.ACTION_INVALID, type, ids);
14441446
}
14451447

14461448
@Override
14471449
public void clear(HugeType type) {
1448-
this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
1450+
this.hub.notifyExcept(Events.CACHE, this.cacheEventListener,
1451+
Cache.ACTION_CLEAR, type);
14491452
}
14501453

14511454
@Override

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/Cache.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ public interface Cache<K, V> {
2424

2525
String ACTION_INVALID = "invalid";
2626
String ACTION_CLEAR = "clear";
27-
String ACTION_INVALIDED = "invalided";
28-
String ACTION_CLEARED = "cleared";
2927

3028
V get(K id);
3129

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hugegraph.backend.cache;
19+
20+
import org.apache.hugegraph.event.EventHub;
21+
import org.apache.hugegraph.event.EventListener;
22+
23+
/*
24+
* Listener lifetime must cover all active transactions for the graph.
25+
* The holder is removed from the registry and unregistered from EventHub
26+
* only when the last transaction releases it.
27+
*/
28+
final class CacheListenerHolder {
29+
30+
final EventListener listener;
31+
final EventHub hub;
32+
// Must only be read or written inside ConcurrentMap.compute() for the
33+
// enclosing registry; ConcurrentHashMap.compute() serialises per-key access.
34+
int refCount;
35+
36+
CacheListenerHolder(EventListener listener, EventHub hub) {
37+
this.listener = listener;
38+
this.hub = hub;
39+
this.refCount = 1;
40+
}
41+
}

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedGraphTransaction.java

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Iterator;
2525
import java.util.List;
2626
import java.util.Set;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.ConcurrentMap;
2729

2830
import org.apache.hugegraph.HugeGraphParams;
2931
import org.apache.hugegraph.backend.cache.CachedBackendStore.QueryId;
@@ -60,11 +62,20 @@ public final class CachedGraphTransaction extends GraphTransaction {
6062
private static final long AVG_VERTEX_ENTRY_SIZE = 40L;
6163
private static final long AVG_EDGE_ENTRY_SIZE = 100L;
6264

65+
/*
66+
* Listener lifetime must cover all active transactions for the graph.
67+
* The holder is removed from the registry and unregistered from EventHub
68+
* only when the last transaction releases it.
69+
*/
70+
private static final ConcurrentMap<String, CacheListenerHolder>
71+
GRAPH_CACHE_EVENT_LISTENERS = new ConcurrentHashMap<>();
72+
6373
private final Cache<Id, Object> verticesCache;
6474
private final Cache<Id, Object> edgesCache;
6575

6676
private EventListener storeEventListener;
6777
private EventListener cacheEventListener;
78+
private CacheListenerHolder holder;
6879

6980
public CachedGraphTransaction(HugeGraphParams graph, BackendStore store) {
7081
super(graph, store);
@@ -138,7 +149,7 @@ private void listenChanges() {
138149
}
139150

140151
// Listen cache event: "cache"(invalid cache item)
141-
this.cacheEventListener = event -> {
152+
EventListener listener = event -> {
142153
LOG.debug("Graph {} received graph cache event: {}",
143154
this.graph(), event);
144155
Object[] args = event.args();
@@ -184,31 +195,67 @@ private void listenChanges() {
184195
}
185196
return false;
186197
};
187-
if (graphCacheListenStatus.putIfAbsent(this.params().spaceGraphName(), true) == null) {
188-
EventHub graphEventHub = this.params().graphEventHub();
189-
graphEventHub.listen(Events.CACHE, this.cacheEventListener);
190-
}
198+
EventHub graphEventHub = this.params().graphEventHub();
199+
String graphName = this.params().spaceGraphName();
200+
CacheListenerHolder acquired = GRAPH_CACHE_EVENT_LISTENERS.compute(
201+
graphName, (key, existing) -> {
202+
if (existing == null || existing.hub != graphEventHub) {
203+
// Graph close/reopen creates a new EventHub for the
204+
// same graph name; replace the stale holder. Old
205+
// transactions skip decrement via identity check.
206+
if (existing != null) {
207+
existing.hub.unlisten(Events.CACHE,
208+
existing.listener);
209+
}
210+
graphEventHub.listen(Events.CACHE, listener);
211+
return new CacheListenerHolder(listener, graphEventHub);
212+
}
213+
existing.refCount++;
214+
return existing;
215+
});
216+
this.holder = acquired;
217+
this.cacheEventListener = acquired.listener;
191218
}
192219

193220
private void unlistenChanges() {
194221
String graphName = this.params().spaceGraphName();
195-
if (graphCacheListenStatus.remove(graphName) != null) {
196-
EventHub graphEventHub = this.params().graphEventHub();
197-
graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
222+
CacheListenerHolder ours = this.holder;
223+
if (ours != null) {
224+
GRAPH_CACHE_EVENT_LISTENERS.compute(graphName, (key, existing) -> {
225+
if (existing == null || existing != ours) {
226+
return existing;
227+
}
228+
existing.refCount--;
229+
if (existing.refCount == 0) {
230+
existing.hub.unlisten(Events.CACHE, existing.listener);
231+
return null;
232+
}
233+
return existing;
234+
});
235+
this.holder = null;
236+
this.cacheEventListener = null;
198237
}
238+
// TODO (follow-up): storeEventListenStatus has the same owner-first
239+
// close bug this PR fixes for GRAPH_CACHE_EVENT_LISTENERS. A non-owner
240+
// transaction can remove the tracking entry, unlisten its own
241+
// never-registered storeEventListener as a no-op, and leave the
242+
// original store listener registered but untracked. Apply the same
243+
// ref-counted holder pattern in a follow-up PR.
199244
if (storeEventListenStatus.remove(graphName) != null) {
200245
this.store().provider().unlisten(this.storeEventListener);
201246
}
202247
}
203248

204249
private void notifyChanges(String action, HugeType type, Id[] ids) {
205250
EventHub graphEventHub = this.params().graphEventHub();
206-
graphEventHub.notify(Events.CACHE, action, type, ids);
251+
graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
252+
action, type, ids);
207253
}
208254

209255
private void notifyChanges(String action, HugeType type) {
210256
EventHub graphEventHub = this.params().graphEventHub();
211-
graphEventHub.notify(Events.CACHE, action, type);
257+
graphEventHub.notifyExcept(Events.CACHE, this.cacheEventListener,
258+
action, type);
212259
}
213260

214261
public void clearCache(HugeType type, boolean notify) {
@@ -220,7 +267,7 @@ public void clearCache(HugeType type, boolean notify) {
220267
}
221268

222269
if (notify) {
223-
this.notifyChanges(Cache.ACTION_CLEARED, null);
270+
this.notifyChanges(Cache.ACTION_CLEAR, null);
224271
}
225272
}
226273

@@ -397,7 +444,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) {
397444
this.verticesCache.invalidate(vertex.id());
398445
}
399446
if (vertexOffset > 0) {
400-
this.notifyChanges(Cache.ACTION_INVALIDED,
447+
this.notifyChanges(Cache.ACTION_INVALID,
401448
HugeType.VERTEX, vertexIds);
402449
}
403450
}
@@ -411,7 +458,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) {
411458
if (invalidEdgesCache && this.enableCacheEdge()) {
412459
// TODO: Use a more precise strategy to update the edge cache
413460
this.edgesCache.clear();
414-
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
461+
this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
415462
}
416463
}
417464
}
@@ -425,7 +472,7 @@ public void removeIndex(IndexLabel indexLabel) {
425472
if (indexLabel.baseType() == HugeType.EDGE_LABEL) {
426473
// TODO: Use a more precise strategy to update the edge cache
427474
this.edgesCache.clear();
428-
this.notifyChanges(Cache.ACTION_CLEARED, HugeType.EDGE);
475+
this.notifyChanges(Cache.ACTION_CLEAR, HugeType.EDGE);
429476
}
430477
}
431478
}

0 commit comments

Comments
 (0)