Skip to content

Commit a08b923

Browse files
committed
IGNITE-28594 Use Message DTO for DiscoveryDataBag#JoiningNodeDiscoveryData
1 parent 44963ba commit a08b923

28 files changed

Lines changed: 387 additions & 201 deletions

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
4040
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
4141
import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
42+
import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys;
4243
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
4344
import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
4445
import org.apache.ignite.internal.processors.authentication.User;
@@ -68,6 +69,7 @@
6869
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
6970
import org.apache.ignite.internal.processors.cache.WalStateProposeMessage;
7071
import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionInfo;
72+
import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionsData;
7173
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessage;
7274
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage;
7375
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
@@ -186,15 +188,18 @@
186188
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
187189
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
188190
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
191+
import org.apache.ignite.internal.processors.marshaller.MappedName;
189192
import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
190193
import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
191194
import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
195+
import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData;
192196
import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
193197
import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
194198
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage;
195199
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage;
196200
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage;
197201
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage;
202+
import org.apache.ignite.internal.processors.query.InlineSizesData;
198203
import org.apache.ignite.internal.processors.query.QueryField;
199204
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
200205
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -243,6 +248,7 @@
243248
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
244249
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
245250
import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
251+
import org.apache.ignite.spi.discovery.ObjectData;
246252
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
247253
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
248254
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
@@ -343,6 +349,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
343349
withNoSchema(CacheVersionedValue.class);
344350
withNoSchema(GridCacheVersion.class);
345351
withNoSchema(GridCacheVersionEx.class);
352+
withNoSchemaResolvedClassLoader(ObjectData.class);
346353

347354
// [5700 - 5900]: Discovery originated messages.
348355
msgIdx = 5700;
@@ -560,6 +567,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
560567
withNoSchema(StatisticsResponse.class);
561568
withNoSchema(CacheContinuousQueryBatchAck.class);
562569
withSchema(CacheContinuousQueryEntry.class);
570+
withNoSchema(InlineSizesData.class);
563571

564572
// [11200 - 11300]: Compute, distributed process messages.
565573
msgIdx = 11200;
@@ -625,13 +633,17 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
625633
withNoSchema(MetadataResponseMessage.class);
626634
withNoSchema(MarshallerMappingItem.class);
627635
withNoSchema(BinaryMetadataVersionInfo.class);
636+
withNoSchema(BinaryMetadataVersionsData.class);
637+
withNoSchema(MappedName.class);
638+
withNoSchema(MarshallerMappingsData.class);
628639

629640
// [12400 - 12500]: Encryption messages.
630641
msgIdx = 12400;
631642
withNoSchema(GenerateEncryptionKeyRequest.class);
632643
withNoSchema(GenerateEncryptionKeyResponse.class);
633644
withNoSchema(ChangeCacheEncryptionRequest.class);
634645
withNoSchema(MasterKeyChangeRequest.class);
646+
withNoSchema(NodeEncryptionKeys.class);
635647

636648
// [13000 - 13300]: Control, configuration, diagnostincs and other messages.
637649
msgIdx = 13000;

modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
2929
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
3030
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
31+
import org.apache.ignite.spi.discovery.ObjectData;
3132
import org.jetbrains.annotations.Nullable;
3233

3334
/**
@@ -115,7 +116,7 @@ public PluginProvider plugin() {
115116
@Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node,
116117
JoiningNodeDiscoveryData discoData) {
117118
try {
118-
Map<String, Serializable> map = (Map<String, Serializable>)discoData.joiningNodeData();
119+
Map<String, Serializable> map = ObjectData.unwrap(discoData.joiningNodeData());
119120

120121
if (map != null)
121122
plugin.validateNewNode(node, map.get(plugin.name()));

modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -431,9 +431,9 @@ public void onLocalJoin() {
431431
"Cache group key change is in progress! Node join is rejected.");
432432
}
433433

434-
NodeEncryptionKeys nodeEncKeys = (NodeEncryptionKeys)discoData.joiningNodeData();
434+
NodeEncryptionKeys nodeEncKeys = discoData.joiningNodeData();
435435

436-
if (!discoData.hasJoiningNodeData() || nodeEncKeys == null) {
436+
if (nodeEncKeys == null) {
437437
return new IgniteNodeValidationResult(ctx.localNodeId(),
438438
"Joining node doesn't have encryption data [node=" + node.id() + "]",
439439
"Joining node doesn't have encryption data.");
@@ -522,7 +522,7 @@ public void onLocalJoin() {
522522

523523
/** {@inheritDoc} */
524524
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
525-
NodeEncryptionKeys nodeEncryptionKeys = (NodeEncryptionKeys)data.joiningNodeData();
525+
NodeEncryptionKeys nodeEncryptionKeys = data.joiningNodeData();
526526

527527
if (nodeEncryptionKeys == null || nodeEncryptionKeys.newKeys == null || ctx.clientNode())
528528
return;
@@ -1748,45 +1748,6 @@ private String decryptKeyName(byte[] data) {
17481748
});
17491749
}
17501750

1751-
/** */
1752-
protected static class NodeEncryptionKeys implements Serializable {
1753-
/** */
1754-
private static final long serialVersionUID = 0L;
1755-
1756-
/** */
1757-
NodeEncryptionKeys(
1758-
HashMap<Integer, List<GroupKeyEncrypted>> knownKeysWithIds,
1759-
Map<Integer, byte[]> newKeys,
1760-
byte[] masterKeyDigest
1761-
) {
1762-
this.newKeys = newKeys;
1763-
this.masterKeyDigest = masterKeyDigest;
1764-
1765-
if (F.isEmpty(knownKeysWithIds))
1766-
return;
1767-
1768-
// To be able to join the old cluster.
1769-
knownKeys = U.newHashMap(knownKeysWithIds.size());
1770-
1771-
for (Map.Entry<Integer, List<GroupKeyEncrypted>> entry : knownKeysWithIds.entrySet())
1772-
knownKeys.put(entry.getKey(), entry.getValue().get(0).key());
1773-
1774-
this.knownKeysWithIds = knownKeysWithIds;
1775-
}
1776-
1777-
/** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in compatible format). */
1778-
Map<Integer, byte[]> knownKeys;
1779-
1780-
/** New keys i.e. keys for a local statically configured caches. */
1781-
Map<Integer, byte[]> newKeys;
1782-
1783-
/** Master key digest. */
1784-
byte[] masterKeyDigest;
1785-
1786-
/** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */
1787-
Map<Integer, List<GroupKeyEncrypted>> knownKeysWithIds;
1788-
}
1789-
17901751
/** */
17911752
private class GenerateEncryptionKeyFuture extends GridFutureAdapter<T2<Collection<byte[]>, byte[]>> {
17921753
/** */
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.managers.encryption;
19+
20+
import java.io.Serializable;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import org.apache.ignite.internal.Order;
25+
import org.apache.ignite.internal.util.typedef.F;
26+
import org.apache.ignite.internal.util.typedef.internal.U;
27+
import org.apache.ignite.plugin.extensions.communication.Message;
28+
29+
/** */
30+
public class NodeEncryptionKeys implements Serializable, Message {
31+
/** */
32+
private static final long serialVersionUID = 0L;
33+
34+
/** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in compatible format). */
35+
@Order(0)
36+
Map<Integer, byte[]> knownKeys;
37+
38+
/** New keys i.e. keys for a local statically configured caches. */
39+
@Order(1)
40+
Map<Integer, byte[]> newKeys;
41+
42+
/** Master key digest. */
43+
@Order(2)
44+
byte[] masterKeyDigest;
45+
46+
/** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */
47+
Map<Integer, List<GroupKeyEncrypted>> knownKeysWithIds;
48+
49+
/** */
50+
public NodeEncryptionKeys() {}
51+
52+
/** */
53+
NodeEncryptionKeys(
54+
HashMap<Integer, List<GroupKeyEncrypted>> knownKeysWithIds,
55+
Map<Integer, byte[]> newKeys,
56+
byte[] masterKeyDigest
57+
) {
58+
this.newKeys = newKeys;
59+
this.masterKeyDigest = masterKeyDigest;
60+
61+
if (F.isEmpty(knownKeysWithIds))
62+
return;
63+
64+
// To be able to join the old cluster.
65+
knownKeys = U.newHashMap(knownKeysWithIds.size());
66+
67+
for (Map.Entry<Integer, List<GroupKeyEncrypted>> entry : knownKeysWithIds.entrySet())
68+
knownKeys.put(entry.getKey(), entry.getValue().get(0).key());
69+
70+
this.knownKeysWithIds = knownKeysWithIds;
71+
}
72+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.ignite.plugin.CachePluginProvider;
8484
import org.apache.ignite.plugin.PluginProvider;
8585
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
86+
import org.apache.ignite.spi.discovery.ObjectData;
8687
import org.apache.ignite.spi.systemview.view.CacheGroupView;
8788
import org.apache.ignite.spi.systemview.view.CacheView;
8889
import org.jetbrains.annotations.Nullable;
@@ -1257,7 +1258,7 @@ private boolean validateStartNewCache(
12571258
* @param dataBag Discovery data bag.
12581259
*/
12591260
void collectJoiningNodeData(DiscoveryDataBag dataBag) {
1260-
dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
1261+
dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), new ObjectData(joinDiscoveryData()));
12611262
}
12621263

12631264
/**
@@ -2038,7 +2039,7 @@ public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, Affini
20382039
*/
20392040
public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
20402041
if (data.hasJoiningNodeData()) {
2041-
Serializable joiningNodeData = data.joiningNodeData();
2042+
Serializable joiningNodeData = ObjectData.unwrap(data.joiningNodeData());
20422043

20432044
if (joiningNodeData instanceof CacheClientReconnectDiscoveryData) {
20442045
if (disconnectedState()) {
@@ -2062,7 +2063,7 @@ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
20622063
*/
20632064
public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData data, boolean joiningNodeClient) {
20642065
if (data.hasJoiningNodeData()) {
2065-
Serializable joiningNodeData = data.joiningNodeData();
2066+
Serializable joiningNodeData = ObjectData.unwrap(data.joiningNodeData());
20662067

20672068
if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
20682069
CacheJoinNodeDiscoveryData joinData = (CacheJoinNodeDiscoveryData)joiningNodeData;

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.processors.cache;
1919

20+
import java.io.Serializable;
2021
import java.util.ArrayList;
2122
import java.util.Arrays;
2223
import java.util.Collection;
@@ -60,6 +61,7 @@
6061
import org.apache.ignite.plugin.security.SecurityException;
6162
import org.apache.ignite.spi.IgniteNodeValidationResult;
6263
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
64+
import org.apache.ignite.spi.discovery.ObjectData;
6365
import org.apache.ignite.spi.encryption.EncryptionSpi;
6466
import org.jetbrains.annotations.Nullable;
6567

@@ -123,8 +125,10 @@ public class ValidationOnNodeJoinUtils {
123125
GridKernalContext ctx,
124126
Function<String, DynamicCacheDescriptor> cacheDescProvider
125127
) {
126-
if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) {
127-
CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData();
128+
Serializable joiningNodeData = ObjectData.unwrap(discoData.joiningNodeData());
129+
130+
if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
131+
CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)joiningNodeData;
128132

129133
boolean isGridActive = ctx.state().clusterState().active();
130134

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.binary;
19+
20+
import java.util.Map;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
23+
24+
/** */
25+
public class BinaryMetadataVersionsData implements Message {
26+
/** */
27+
@Order(0)
28+
Map<Integer, BinaryMetadataVersionInfo> data;
29+
30+
/** */
31+
public BinaryMetadataVersionsData() {}
32+
33+
/**
34+
* @param data Data.
35+
*/
36+
public BinaryMetadataVersionsData(Map<Integer, BinaryMetadataVersionInfo> data) {
37+
this.data = Map.copyOf(data);
38+
}
39+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,7 +1394,7 @@ private int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, O
13941394
if ((res = validateBinaryConfiguration(rmtNode)) != null)
13951395
return res;
13961396

1397-
return validateBinaryMetadata(rmtNode.id(), (Map<Integer, BinaryMetadataVersionInfo>)discoData.joiningNodeData());
1397+
return validateBinaryMetadata(rmtNode.id(), discoData.joiningNodeData());
13981398
}
13991399

14001400
/** */
@@ -1418,11 +1418,11 @@ private IgniteNodeValidationResult validateBinaryConfiguration(ClusterNode rmtNo
14181418
}
14191419

14201420
/** */
1421-
private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map<Integer, BinaryMetadataVersionInfo> newNodeMeta) {
1421+
private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, BinaryMetadataVersionsData newNodeMeta) {
14221422
if (newNodeMeta == null)
14231423
return null;
14241424

1425-
for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry : newNodeMeta.entrySet()) {
1425+
for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry : newNodeMeta.data.entrySet()) {
14261426
if (!metadataLocCache.containsKey(metaEntry.getKey()))
14271427
continue;
14281428

@@ -1470,24 +1470,19 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map<In
14701470

14711471
/** {@inheritDoc} */
14721472
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
1473-
Map<Integer, BinaryMetadataVersionInfo> res = U.newHashMap(metadataLocCache.size());
1474-
1475-
for (Map.Entry<Integer, BinaryMetadataVersionInfo> e : metadataLocCache.entrySet())
1476-
res.put(e.getKey(), e.getValue());
1477-
1478-
dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), (Serializable)res);
1473+
dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), new BinaryMetadataVersionsData(metadataLocCache));
14791474
}
14801475

14811476
/** {@inheritDoc} */
14821477
@Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
1483-
Map<Integer, BinaryMetadataVersionInfo> newNodeMeta = (Map<Integer, BinaryMetadataVersionInfo>)data.joiningNodeData();
1478+
BinaryMetadataVersionsData newNodeMeta = data.joiningNodeData();
14841479

14851480
if (newNodeMeta == null)
14861481
return;
14871482

14881483
UUID joiningNode = data.joiningNodeId();
14891484

1490-
for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry : newNodeMeta.entrySet()) {
1485+
for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry : newNodeMeta.data.entrySet()) {
14911486
if (metadataLocCache.containsKey(metaEntry.getKey())) {
14921487
BinaryMetadataVersionInfo locMetaVerInfo = metadataLocCache.get(metaEntry.getKey());
14931488

0 commit comments

Comments
 (0)