Skip to content

Commit 6c5fad7

Browse files
committed
IGNITE-28681 Fixed naming and made DiscoveryThread static.
1 parent 9b63f1e commit 6c5fad7

24 files changed

Lines changed: 205 additions & 208 deletions

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 47 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@
116116
import org.apache.ignite.internal.util.typedef.internal.SB;
117117
import org.apache.ignite.internal.util.typedef.internal.U;
118118
import org.apache.ignite.internal.util.worker.GridWorker;
119-
import org.apache.ignite.internal.util.worker.IgniteLinkedBlockingQueueProcessor;
119+
import org.apache.ignite.internal.util.worker.queue.IgniteObjectAsyncHandler;
120120
import org.apache.ignite.lang.IgniteClosure;
121121
import org.apache.ignite.lang.IgniteFuture;
122122
import org.apache.ignite.lang.IgniteInClosure;
@@ -231,14 +231,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
231231
/** */
232232
private final Object discoEvtMux = new Object();
233233

234-
/** Discovery event worker. */
235-
private final DiscoveryWorker discoWrk = new DiscoveryWorker();
234+
/** Handles discovery events received from {@link DiscoverySpi} implementation. */
235+
private final DiscoveryEventHandler discoEvtHnd = new DiscoveryEventHandler();
236236

237-
/** Discovery event notifier worker. */
238-
private final DiscoveryMessageNotifierWorker discoNtfWrk = new DiscoveryMessageNotifierWorker();
237+
/** Notifies Ignite components about new topology change event or {@link DiscoveryCustomMessage}. */
238+
private final DiscoveryMessageNotifier discoMsgNotifier = new DiscoveryMessageNotifier();
239239

240-
/** Network segment check worker. */
241-
private SegmentCheckWorker segChkWrk;
240+
/** Network segment checker. */
241+
private SegmentChecker segChecker;
242242

243243
/** Last logged topology. */
244244
private final GridAtomicLong lastLoggedTop = new GridAtomicLong();
@@ -558,7 +558,7 @@ private void updateClientNodes(UUID leftNodeId) {
558558
@Override public IgniteFuture<?> onDiscovery(DiscoveryNotification notification) {
559559
GridFutureAdapter<?> notificationFut = new GridFutureAdapter<>();
560560

561-
discoNtfWrk.submit(notificationFut, ctx.security().enabled()
561+
discoMsgNotifier.submit(notificationFut, ctx.security().enabled()
562562
? new SecurityAwareNotificationTask(notification)
563563
: new NotificationTask(notification));
564564

@@ -779,7 +779,7 @@ else if (customMsg instanceof ChangeGlobalStateMessage) {
779779
if (notification.getSpanContainer() != null)
780780
discoEvt.span(notification.getSpanContainer().span());
781781

782-
discoWrk.discoCache = discoCache;
782+
discoEvtHnd.discoCache = discoCache;
783783

784784
if (!ctx.clientDisconnected()) {
785785
// The security processor must be notified first, since {@link IgniteSecurity#onLocalJoin}
@@ -862,7 +862,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
862862
try {
863863
fut.get();
864864

865-
discoWrk.addEvent(
865+
discoEvtHnd.addEvent(
866866
new NotificationEvent(
867867
EVT_CLIENT_NODE_RECONNECTED,
868868
nextTopVer,
@@ -884,7 +884,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
884884
}
885885

886886
if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected())
887-
discoWrk.addEvent(
887+
discoEvtHnd.addEvent(
888888
new NotificationEvent(
889889
type,
890890
nextTopVer,
@@ -896,7 +896,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
896896
);
897897

898898
if (stateFinishMsg != null)
899-
discoWrk.addEvent(
899+
discoEvtHnd.addEvent(
900900
new NotificationEvent(
901901
EVT_DISCOVERY_CUSTOM_EVT,
902902
nextTopVer,
@@ -909,7 +909,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED) {
909909
);
910910

911911
if (type == EVT_CLIENT_NODE_DISCONNECTED)
912-
discoWrk.awaitDisconnectEvent();
912+
discoEvtHnd.awaitDisconnectEvent();
913913
}
914914

915915
/**
@@ -1053,7 +1053,7 @@ private void waitForLastStateChangeEventFuture() {
10531053
}
10541054
});
10551055

1056-
discoNtfWrk.start();
1056+
discoMsgNotifier.start();
10571057

10581058
startSpi();
10591059

@@ -1066,19 +1066,19 @@ private void waitForLastStateChangeEventFuture() {
10661066
throw new IgniteCheckedException("Failed to start discovery manager (thread has been interrupted).", e);
10671067
}
10681068

1069-
// Start segment check worker only if frequency is greater than 0.
1069+
// Start segment checker only if frequency is greater than 0.
10701070
if (hasRslvrs && segChkFreq > 0) {
1071-
segChkWrk = new SegmentCheckWorker();
1071+
segChecker = new SegmentChecker();
10721072

1073-
segChkWrk.start();
1073+
segChecker.start();
10741074
}
10751075

10761076
locNode = spi.getLocalNode();
10771077

10781078
checkAttributes(discoCache().remoteNodes());
10791079

1080-
// Start discovery worker.
1081-
discoWrk.start();
1080+
// Start discovery event handler.
1081+
discoEvtHnd.start();
10821082

10831083
if (log.isDebugEnabled())
10841084
log.debug(startInfo());
@@ -1719,11 +1719,11 @@ private static String nodeDescription(ClusterNode node) {
17191719
@Override public void onKernalStop0(boolean cancel) {
17201720
startLatch.countDown();
17211721

1722-
// Stop segment check worker.
1723-
if (segChkWrk != null) {
1724-
segChkWrk.cancel();
1722+
// Stop segment checker.
1723+
if (segChecker != null) {
1724+
segChecker.cancel();
17251725

1726-
U.join(segChkWrk, log);
1726+
U.join(segChecker, log);
17271727
}
17281728

17291729
if (!locJoin.isDone())
@@ -1737,13 +1737,13 @@ private static String nodeDescription(ClusterNode node) {
17371737
// Stop receiving notifications.
17381738
getSpi().setListener(null);
17391739

1740-
U.cancel(discoWrk);
1740+
U.cancel(discoEvtHnd);
17411741

1742-
U.join(discoWrk, log);
1742+
U.join(discoEvtHnd, log);
17431743

1744-
U.cancel(discoNtfWrk);
1744+
U.cancel(discoMsgNotifier);
17451745

1746-
U.join(discoNtfWrk, log);
1746+
U.join(discoMsgNotifier, log);
17471747

17481748
// Stop SPI itself.
17491749
stopSpi();
@@ -2349,9 +2349,9 @@ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedExce
23492349
public void clientCacheStartEvent(UUID reqId,
23502350
@Nullable Map<String, DynamicCacheChangeRequest> startReqs,
23512351
@Nullable Set<String> cachesToClose) {
2352-
// Prevent race when discovery message was processed, but was passed to discoWrk.
2352+
// Prevent race when discovery message was processed, but was passed to discovery event handler.
23532353
synchronized (discoEvtMux) {
2354-
discoWrk.addEvent(
2354+
discoEvtHnd.addEvent(
23552355
new NotificationEvent(
23562356
EVT_DISCOVERY_CUSTOM_EVT,
23572357
AffinityTopologyVersion.NONE,
@@ -2366,13 +2366,13 @@ public void clientCacheStartEvent(UUID reqId,
23662366
}
23672367

23682368
/**
2369-
* Adds metrics update event to discovery worker queue.
2369+
* Adds metrics update event to discovery event handler.
23702370
*
23712371
* @param discoCache Discovery cache.
23722372
* @param node Event node.
23732373
*/
23742374
public void metricsUpdateEvent(DiscoCache discoCache, ClusterNode node) {
2375-
discoWrk.addEvent(
2375+
discoEvtHnd.addEvent(
23762376
new NotificationEvent(
23772377
EVT_NODE_METRICS_UPDATED,
23782378
discoCache.version(),
@@ -2714,10 +2714,10 @@ public ClusterNode historicalNode(UUID nodeId) {
27142714
return null;
27152715
}
27162716

2717-
/** Worker for network segment checks. */
2718-
private class SegmentCheckWorker extends IgniteLinkedBlockingQueueProcessor<Object> {
2717+
/** Network segments checker. */
2718+
private class SegmentChecker extends IgniteObjectAsyncHandler<Object> {
27192719
/** */
2720-
private SegmentCheckWorker() {
2720+
private SegmentChecker() {
27212721
super(ctx.igniteInstanceName(), "disco-net-seg-chk-worker", GridDiscoveryManager.this.log, ctx.workersRegistry());
27222722

27232723
assert hasRslvrs;
@@ -2768,7 +2768,7 @@ public void scheduleSegmentCheck() {
27682768

27692769
Collection<ClusterNode> locNodeOnlyTop = Collections.singleton(node);
27702770

2771-
discoWrk.addEvent(
2771+
discoEvtHnd.addEvent(
27722772
new NotificationEvent(
27732773
EVT_NODE_SEGMENTED,
27742774
AffinityTopologyVersion.NONE,
@@ -2796,12 +2796,12 @@ public void scheduleSegmentCheck() {
27962796

27972797
/** {@inheritDoc} */
27982798
@Override public String toString() {
2799-
return S.toString(SegmentCheckWorker.class, this);
2799+
return S.toString(SegmentChecker.class, this);
28002800
}
28012801
}
28022802

28032803
/** */
2804-
private class DiscoveryMessageNotifierThread extends IgniteThread implements IgniteDiscoveryThread {
2804+
private static class DiscoveryMessageNotifierThread extends IgniteThread implements IgniteDiscoveryThread {
28052805
/** */
28062806
private final GridWorker worker;
28072807

@@ -2819,9 +2819,9 @@ public DiscoveryMessageNotifierThread(GridWorker worker) {
28192819
}
28202820

28212821
/** */
2822-
private class DiscoveryMessageNotifierWorker extends IgniteLinkedBlockingQueueProcessor<T2<GridFutureAdapter<?>, Runnable>> {
2822+
private class DiscoveryMessageNotifier extends IgniteObjectAsyncHandler<T2<GridFutureAdapter<?>, Runnable>> {
28232823
/** Default constructor. */
2824-
protected DiscoveryMessageNotifierWorker() {
2824+
protected DiscoveryMessageNotifier() {
28252825
super(ctx.igniteInstanceName(), "disco-notifier-worker", GridDiscoveryManager.this.log, ctx.workersRegistry());
28262826
}
28272827

@@ -2943,8 +2943,8 @@ public NotificationEvent(
29432943
}
29442944
}
29452945

2946-
/** Worker for discovery events. */
2947-
private class DiscoveryWorker extends IgniteLinkedBlockingQueueProcessor<NotificationEvent> {
2946+
/** Handler for discovery events received from the {@link DiscoverySpi} implementation. */
2947+
private class DiscoveryEventHandler extends IgniteObjectAsyncHandler<NotificationEvent> {
29482948
/** */
29492949
private DiscoCache discoCache;
29502950

@@ -2965,10 +2965,8 @@ private class DiscoveryWorker extends IgniteLinkedBlockingQueueProcessor<Notific
29652965
*/
29662966
private volatile GridFutureAdapter disconnectEvtFut;
29672967

2968-
/**
2969-
*
2970-
*/
2971-
private DiscoveryWorker() {
2968+
/** */
2969+
private DiscoveryEventHandler() {
29722970
super(ctx.igniteInstanceName(), "disco-event-worker", GridDiscoveryManager.this.log, ctx.workersRegistry());
29732971
}
29742972

@@ -3036,7 +3034,7 @@ void addEvent(NotificationEvent notificationEvt) {
30363034
assert notificationEvt.node != null : notificationEvt.customMsg;
30373035

30383036
if (notificationEvt.type == EVT_CLIENT_NODE_DISCONNECTED)
3039-
discoWrk.disconnectEvtFut = new GridFutureAdapter();
3037+
discoEvtHnd.disconnectEvtFut = new GridFutureAdapter();
30403038

30413039
addToQueue(notificationEvt);
30423040
}
@@ -3111,7 +3109,7 @@ private void body0() throws InterruptedException {
31113109
case EVT_NODE_LEFT: {
31123110
// Check only if resolvers were configured.
31133111
if (hasRslvrs)
3114-
segChkWrk.scheduleSegmentCheck();
3112+
segChecker.scheduleSegmentCheck();
31153113

31163114
if (log.isInfoEnabled())
31173115
log.info("Node left topology: " + node);
@@ -3139,7 +3137,7 @@ private void body0() throws InterruptedException {
31393137
case EVT_NODE_FAILED: {
31403138
// Check only if resolvers were configured.
31413139
if (hasRslvrs)
3142-
segChkWrk.scheduleSegmentCheck();
3140+
segChecker.scheduleSegmentCheck();
31433141

31443142
U.warn(log, "Node FAILED: " + node);
31453143

@@ -3259,7 +3257,7 @@ private void awaitDisconnectEvent() {
32593257

32603258
/** {@inheritDoc} */
32613259
@Override public String toString() {
3262-
return S.toString(DiscoveryWorker.class, this);
3260+
return S.toString(DiscoveryEventHandler.class, this);
32633261
}
32643262
}
32653263

0 commit comments

Comments
 (0)