Skip to content

Commit 434b956

Browse files
committed
Add ClusterFlags for update notifier
1 parent 62e207d commit 434b956

4 files changed

Lines changed: 67 additions & 29 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@
183183
import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
184184
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
185185
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
186+
import org.apache.ignite.internal.processors.cluster.ClusterFlags;
187+
import org.apache.ignite.internal.processors.cluster.ClusterIdAndTag;
186188
import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
187189
import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage;
188190
import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
@@ -669,6 +671,8 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
669671
withNoSchemaResolvedClassLoader(DynamicCacheChangeRequest.class);
670672
withNoSchema(PartitionHashRecord.class);
671673
withNoSchema(TransactionsHashRecord.class);
674+
withNoSchema(ClusterIdAndTag.class);
675+
withNoSchema(ClusterFlags.class);
672676

673677
assert msgIdx <= MAX_MESSAGE_ID;
674678
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cluster;
19+
20+
import org.apache.ignite.internal.Order;
21+
import org.apache.ignite.plugin.extensions.communication.Message;
22+
23+
/** */
24+
public class ClusterFlags implements Message {
25+
/** Update notifier enabled flag. */
26+
@Order(0)
27+
boolean updateNotifierEnabled;
28+
29+
/** */
30+
public ClusterFlags() { }
31+
32+
/**
33+
* @param updateNotifierEnabled Update notifier enabled flag.
34+
*/
35+
public ClusterFlags(boolean updateNotifierEnabled) {
36+
this.updateNotifierEnabled = updateNotifierEnabled;
37+
}
38+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,27 @@
2020
import java.io.Serializable;
2121
import java.util.Objects;
2222
import java.util.UUID;
23+
import org.apache.ignite.internal.Order;
2324
import org.apache.ignite.internal.util.typedef.internal.S;
25+
import org.apache.ignite.plugin.extensions.communication.Message;
2426

2527
/**
2628
* Container class to send cluster ID and tag in disco data and to write them atomically to metastorage.
2729
*/
28-
public class ClusterIdAndTag implements Serializable {
30+
public class ClusterIdAndTag implements Serializable, Message {
2931
/** */
3032
private static final long serialVersionUID = 0L;
3133

3234
/** */
33-
private final UUID id;
35+
@Order(0)
36+
UUID id;
3437

3538
/** */
36-
private final String tag;
39+
@Order(1)
40+
String tag;
41+
42+
/** */
43+
public ClusterIdAndTag() { }
3744

3845
/**
3946
* @param id Cluster ID.

modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.Serializable;
2121
import java.util.Collection;
22-
import java.util.HashMap;
2322
import java.util.Map;
2423
import java.util.Set;
2524
import java.util.Timer;
@@ -76,6 +75,7 @@
7675
import org.apache.ignite.lang.IgniteUuid;
7776
import org.apache.ignite.metric.MetricRegistry;
7877
import org.apache.ignite.mxbean.IgniteClusterMXBean;
78+
import org.apache.ignite.plugin.extensions.communication.Message;
7979
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
8080
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
8181
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
@@ -465,33 +465,22 @@ public IgniteFuture<?> clientReconnectFuture() {
465465

466466
/** {@inheritDoc} */
467467
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
468-
dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData());
468+
dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), new ClusterFlags(notifyEnabled.get()));
469469
}
470470

471471
/** {@inheritDoc} */
472472
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
473-
dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData());
473+
dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), new ClusterFlags(notifyEnabled.get()));
474474

475-
dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ClusterIdAndTag(cluster.id(), cluster.tag()));
476-
}
477-
478-
/**
479-
* @return Discovery data.
480-
*/
481-
private Serializable getDiscoveryData() {
482-
HashMap<String, Object> map = new HashMap<>(2);
483-
484-
map.put(ATTR_UPDATE_NOTIFIER_STATUS, notifyEnabled.get());
485-
486-
return map;
475+
dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), (Message)new ClusterIdAndTag(cluster.id(), cluster.tag()));
487476
}
488477

489478
/** {@inheritDoc} */
490479
@Override public void onGridDataReceived(GridDiscoveryData data) {
491-
Map<UUID, Map<String, Boolean>> nodeSpecData = data.nodeSpecificData();
480+
Map<UUID, ClusterFlags> nodeSpecData = data.nodeSpecificData();
492481

493482
if (nodeSpecData != null) {
494-
Boolean lstFlag = findLastFlag(nodeSpecData.values());
483+
Boolean lstFlag = findLastUpdateNotifierFlag(nodeSpecData.values());
495484

496485
if (lstFlag != null)
497486
notifyEnabled.set(lstFlag);
@@ -500,7 +489,7 @@ private Serializable getDiscoveryData() {
500489
ClusterIdAndTag commonData = data.commonData();
501490

502491
if (commonData != null) {
503-
Serializable remoteClusterId = commonData.id();
492+
UUID remoteClusterId = commonData.id();
504493

505494
if (remoteClusterId != null) {
506495
if (locClusterId != null && !locClusterId.equals(remoteClusterId)) {
@@ -510,7 +499,7 @@ private Serializable getDiscoveryData() {
510499
", local cluster ID: " + locClusterId);
511500
}
512501

513-
locClusterId = (UUID)remoteClusterId;
502+
locClusterId = remoteClusterId;
514503
}
515504

516505
String remoteClusterTag = commonData.tag();
@@ -521,15 +510,15 @@ private Serializable getDiscoveryData() {
521510
}
522511

523512
/**
524-
* @param vals collection to seek through.
513+
* @param flags Flags collection to seek through.
525514
*/
526-
private Boolean findLastFlag(Collection<Map<String, Boolean>> vals) {
527-
for (Map<String, Boolean> map : vals) {
528-
if (map != null && map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
529-
return map.get(ATTR_UPDATE_NOTIFIER_STATUS);
530-
}
515+
private Boolean findLastUpdateNotifierFlag(Collection<ClusterFlags> flags) {
516+
Boolean notifierFlag = null;
531517

532-
return null;
518+
for (ClusterFlags flag : flags)
519+
notifierFlag = flag.updateNotifierEnabled;
520+
521+
return notifierFlag;
533522
}
534523

535524
/** {@inheritDoc} */

0 commit comments

Comments
 (0)