Skip to content

Commit 0b35e02

Browse files
authored
IGNITE-28751 Refactored TCP Discovery SPI joining node validation (#13217)
1 parent a30f898 commit 0b35e02

11 files changed

Lines changed: 297 additions & 449 deletions

File tree

modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@
6262
* @see EventType#EVT_NODE_JOINED
6363
* @see EventType#EVT_NODE_LEFT
6464
* @see EventType#EVT_NODE_SEGMENTED
65+
* @see EventType#EVT_NODE_VALIDATION_FAILED
6566
* @see EventType#EVTS_DISCOVERY_ALL
6667
* @see EventType#EVTS_DISCOVERY
6768
*/

modules/core/src/main/java/org/apache/ignite/events/EventType.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1102,6 +1102,7 @@ public interface EventType {
11021102
EVT_NODE_JOINED,
11031103
EVT_NODE_LEFT,
11041104
EVT_NODE_FAILED,
1105+
EVT_NODE_VALIDATION_FAILED,
11051106
EVT_NODE_SEGMENTED,
11061107
EVT_CLIENT_NODE_DISCONNECTED,
11071108
EVT_CLIENT_NODE_RECONNECTED
@@ -1119,6 +1120,7 @@ public interface EventType {
11191120
EVT_NODE_LEFT,
11201121
EVT_NODE_FAILED,
11211122
EVT_NODE_SEGMENTED,
1123+
EVT_NODE_VALIDATION_FAILED,
11221124
EVT_NODE_METRICS_UPDATED,
11231125
EVT_CLIENT_NODE_DISCONNECTED,
11241126
EVT_CLIENT_NODE_RECONNECTED

modules/core/src/main/java/org/apache/ignite/events/NodeValidationFailedEvent.java

Lines changed: 2 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -31,13 +31,10 @@
3131
* @see EventType#EVT_NODE_VALIDATION_FAILED
3232
* @see GridComponent#validateNode
3333
*/
34-
public class NodeValidationFailedEvent extends EventAdapter {
34+
public class NodeValidationFailedEvent extends DiscoveryEvent {
3535
/** */
3636
private static final long serialVersionUID = 0L;
3737

38-
/** The node that attempted to join cluster. */
39-
private final ClusterNode evtNode;
40-
4138
/** Validation result. */
4239
private final IgniteNodeValidationResult res;
4340

@@ -49,17 +46,11 @@ public class NodeValidationFailedEvent extends EventAdapter {
4946
* @param res Joining node validation result.
5047
*/
5148
public NodeValidationFailedEvent(ClusterNode node, ClusterNode evtNode, IgniteNodeValidationResult res) {
52-
super(node, res.message(), EVT_NODE_VALIDATION_FAILED);
49+
super(node, res.message(), EVT_NODE_VALIDATION_FAILED, evtNode);
5350

54-
this.evtNode = evtNode;
5551
this.res = res;
5652
}
5753

58-
/** @return Node that couldn't join the topology due to a validation failure. */
59-
public ClusterNode eventNode() {
60-
return evtNode;
61-
}
62-
6354
/** @return Joining node validation result. */
6455
public IgniteNodeValidationResult validationResult() {
6556
return res;

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1895,6 +1895,16 @@ public IgniteInternalFuture<Long> topologyFuture(final long awaitVer) {
18951895
return fut;
18961896
}
18971897

1898+
/**
1899+
* Returns a collection of all remote nodes known to the underlying {@link DiscoverySpi} implementation.
1900+
*
1901+
* <p>Unlike {@link #remoteNodes()}, this method may include nodes that have successfully completed
1902+
* validation but have not yet completed their join routine.</p>
1903+
*/
1904+
public Collection<ClusterNode> discoverySpiRemoteNodes() {
1905+
return getSpi().getRemoteNodes();
1906+
}
1907+
18981908
/**
18991909
* Gets discovery collection cache from SPI safely guarding against "floating" collections.
19001910
*

modules/core/src/main/java/org/apache/ignite/internal/processors/rollingupgrade/RollingUpgradeProcessor.java

Lines changed: 30 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -18,15 +18,15 @@
1818
package org.apache.ignite.internal.processors.rollingupgrade;
1919

2020
import java.util.Objects;
21+
import java.util.SortedSet;
22+
import java.util.TreeSet;
2123
import java.util.UUID;
2224
import java.util.concurrent.CountDownLatch;
2325
import org.apache.ignite.IgniteCheckedException;
2426
import org.apache.ignite.IgniteException;
2527
import org.apache.ignite.cluster.ClusterNode;
2628
import org.apache.ignite.events.DiscoveryEvent;
27-
import org.apache.ignite.events.Event;
2829
import org.apache.ignite.internal.GridKernalContext;
29-
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
3030
import org.apache.ignite.internal.processors.GridProcessorAdapter;
3131
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
3232
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
@@ -39,14 +39,13 @@
3939
import org.apache.ignite.lang.IgniteProductVersion;
4040
import org.apache.ignite.plugin.security.SecurityPermission;
4141
import org.apache.ignite.spi.IgniteNodeValidationResult;
42-
import org.apache.ignite.spi.discovery.DiscoverySpi;
4342
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
44-
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
4543
import org.jetbrains.annotations.Nullable;
4644

4745
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
4846
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
4947
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
48+
import static org.apache.ignite.events.EventType.EVT_NODE_VALIDATION_FAILED;
5049
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
5150
import static org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage.IGNITE_INTERNAL_KEY_PREFIX;
5251

@@ -58,15 +57,9 @@ public class RollingUpgradeProcessor extends GridProcessorAdapter implements Dis
5857
/** Metastorage with the write access. */
5958
@Nullable private volatile DistributedMetaStorage metastorage;
6059

61-
/** TCP discovery nodes ring. */
62-
private TcpDiscoveryNodesRing ring;
63-
6460
/** Last joining node. */
6561
private ClusterNode lastJoiningNode;
6662

67-
/** Last joining node timestamp. */
68-
private long lastJoiningNodeTimestamp;
69-
7063
/** Lock for synchronization between tcp-disco-msg-worker thread and management operations. */
7164
private final Object lock = new Object();
7265

@@ -85,26 +78,25 @@ public RollingUpgradeProcessor(GridKernalContext ctx) {
8578

8679
/** {@inheritDoc} */
8780
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
88-
DiscoverySpi spi = ctx.config().getDiscoverySpi();
89-
90-
if (spi instanceof TcpDiscoverySpi)
91-
ring = ((TcpDiscoverySpi)spi).discoveryRing();
92-
9381
startLatch.countDown();
9482
}
9583

9684
/** {@inheritDoc} */
9785
@Override public void start() throws IgniteCheckedException {
98-
ctx.event().addLocalEventListener(new GridLocalEventListener() {
99-
@Override public void onEvent(Event evt) {
86+
ctx.event().addLocalEventListener(
87+
evt -> {
10088
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
10189

10290
synchronized (lock) {
10391
if (lastJoiningNode != null && lastJoiningNode.id().equals(nodeId))
10492
lastJoiningNode = null;
10593
}
106-
}
107-
}, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
94+
},
95+
EVT_NODE_JOINED,
96+
EVT_NODE_FAILED,
97+
EVT_NODE_LEFT,
98+
EVT_NODE_VALIDATION_FAILED
99+
);
108100

109101
ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() {
110102
@Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
@@ -131,8 +123,6 @@ public RollingUpgradeProcessor(GridKernalContext ctx) {
131123
@Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) {
132124
synchronized (lock) {
133125
lastJoiningNode = node;
134-
135-
lastJoiningNodeTimestamp = U.currentTimeMillis();
136126
}
137127

138128
ClusterNode locNode = ctx.discovery().localNode();
@@ -247,21 +237,14 @@ public void disable() throws IgniteCheckedException {
247237
if (rollUpVers == null)
248238
return;
249239

250-
IgnitePair<IgniteProductVersion> minMaxVerPair = ring.minMaxNodeVersions();
251-
252-
if (!minMaxVerPair.get1().equals(minMaxVerPair.get2()))
253-
throw new IgniteCheckedException("Can't disable rolling upgrade with different versions in cluster: "
254-
+ minMaxVerPair.get1() + ", " + minMaxVerPair.get2());
240+
IgnitePair<IgniteProductVersion> minMaxVerPair;
255241

256242
synchronized (lock) {
257-
if (lastJoiningNode != null) {
258-
// Use 3 * joinTimeout as an upper time bound for joining nodes that may drop during validation
259-
// without sending NODE_LEFT / NODE_FAILED events.
260-
long timeout = ((TcpDiscoverySpi)ctx.config().getDiscoverySpi()).getJoinTimeout() * 3;
243+
minMaxVerPair = resolveMinMaxNodeVersions();
261244

262-
if (ring.node(lastJoiningNode.id()) != null || (timeout > 0 && U.currentTimeMillis() - lastJoiningNodeTimestamp > timeout))
263-
lastJoiningNode = null;
264-
}
245+
if (!minMaxVerPair.get1().equals(minMaxVerPair.get2()))
246+
throw new IgniteCheckedException("Can't disable rolling upgrade with different versions in cluster: "
247+
+ minMaxVerPair.get1() + ", " + minMaxVerPair.get2());
265248

266249
if (lastJoiningNode != null) {
267250
IgniteProductVersion lastJoiningNodeVer = IgniteProductVersion.fromString(lastJoiningNode.attribute(ATTR_BUILD_VER));
@@ -280,6 +263,20 @@ public void disable() throws IgniteCheckedException {
280263
log.info("Rolling upgrade disabled. Current version of nodes in cluster: " + minMaxVerPair.get1());
281264
}
282265

266+
/** */
267+
private IgnitePair<IgniteProductVersion> resolveMinMaxNodeVersions() {
268+
assert Thread.holdsLock(lock);
269+
270+
SortedSet<IgniteProductVersion> clusterNodes = new TreeSet<>();
271+
272+
for (ClusterNode node : ctx.discovery().discoverySpiRemoteNodes())
273+
clusterNodes.add(node.version());
274+
275+
clusterNodes.add(ctx.discovery().localNode().version());
276+
277+
return new IgnitePair<>(clusterNodes.first(), clusterNodes.last());
278+
}
279+
283280
/**
284281
* Returns a pair containing the current and target versions of the cluster.
285282
* <p>

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -251,7 +251,7 @@ class ClientImpl extends TcpDiscoveryImpl {
251251

252252
/** {@inheritDoc} */
253253
@Override public void dumpRingStructure(IgniteLogger log) {
254-
ClusterNode[] serverNodes = getRemoteNodes().stream()
254+
ClusterNode[] serverNodes = remoteVisibleNodes().stream()
255255
.filter(node -> !node.isClient())
256256
.sorted(Comparator.comparingLong(ClusterNode::order))
257257
.toArray(ClusterNode[]::new);
@@ -371,7 +371,7 @@ class ClientImpl extends TcpDiscoveryImpl {
371371

372372
/** {@inheritDoc} */
373373
@Override public Collection<ClusterNode> getRemoteNodes() {
374-
return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
374+
return U.arrayList(rmtNodes.values());
375375
}
376376

377377
/** {@inheritDoc} */
@@ -465,7 +465,7 @@ else if (state == DISCONNECTED) {
465465

466466
spi.getSpiContext().deregisterPorts();
467467

468-
Collection<ClusterNode> rmts = getRemoteNodes();
468+
Collection<ClusterNode> rmts = remoteVisibleNodes();
469469

470470
// This is restart/disconnection and remote nodes are not empty.
471471
// We need to fire FAIL event for each.
@@ -1077,6 +1077,11 @@ private WorkersRegistry getWorkersRegistry() {
10771077
return ignite instanceof IgniteEx ? ((IgniteEx)ignite).context().workersRegistry() : null;
10781078
}
10791079

1080+
/** */
1081+
private Collection<ClusterNode> remoteVisibleNodes() {
1082+
return U.arrayList(rmtNodes.values(), TcpDiscoveryNodesRing.VISIBLE_NODES);
1083+
}
1084+
10801085
/**
10811086
* Metrics sender.
10821087
*/
@@ -2277,7 +2282,7 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
22772282
return;
22782283

22792284
if (log.isInfoEnabled()) {
2280-
for (ClusterNode node : getRemoteNodes()) {
2285+
for (ClusterNode node : remoteVisibleNodes()) {
22812286
if (node.id().equals(locNode.clientRouterNodeId())) {
22822287
if (log.isInfoEnabled())
22832288
log.info("Router node: " + node);

0 commit comments

Comments
 (0)