Skip to content

Commit 511d55f

Browse files
committed
Fix encryption and zookeeper
1 parent 3657a4a commit 511d55f

6 files changed

Lines changed: 26 additions & 18 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest;
3939
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
4040
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
41+
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
4142
import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
4243
import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys;
4344
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
@@ -644,6 +645,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
644645
withNoSchema(ChangeCacheEncryptionRequest.class);
645646
withNoSchema(MasterKeyChangeRequest.class);
646647
withNoSchema(NodeEncryptionKeys.class);
648+
withNoSchema(GroupKeyEncrypted.class);
647649

648650
// [13000 - 13300]: Control, configuration, diagnostincs and other messages.
649651
msgIdx = 13000;

modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyEncrypted.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,26 @@
1818
package org.apache.ignite.internal.managers.encryption;
1919

2020
import java.io.Serializable;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
2123

2224
/**
2325
* Cache group encryption key with identifier. Key is encrypted.
2426
*/
25-
public class GroupKeyEncrypted implements Serializable {
27+
public class GroupKeyEncrypted implements Serializable, Message {
2628
/** Serial version UID. */
2729
private static final long serialVersionUID = 0L;
2830

2931
/** Encryption key ID. */
30-
private final int id;
32+
@Order(0)
33+
int id;
3134

3235
/** Encryption key. */
33-
private final byte[] key;
36+
@Order(1)
37+
byte[] key;
38+
39+
/** */
40+
public GroupKeyEncrypted() {}
3441

3542
/**
3643
* @param id Encryption key ID.

modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class NodeEncryptionKeys implements Serializable, Message {
4444
byte[] masterKeyDigest;
4545

4646
/** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */
47+
@Order(3)
4748
Map<Integer, List<GroupKeyEncrypted>> knownKeysWithIds;
4849

4950
/** */

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3232
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
3333
import org.apache.ignite.spi.IgniteSpiException;
34-
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
3534

3635
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
3736

@@ -51,11 +50,11 @@ public DiscoveryMessageParser(MessageFactory msgFactory) {
5150
}
5251

5352
/** Marshals discovery message to bytes array. */
54-
public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
53+
public byte[] marshalZip(Message msg) {
5554
ByteArrayOutputStream baos = new ByteArrayOutputStream();
5655

5756
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
58-
serializeMessage((Message)msg, out);
57+
serializeMessage(msg, out);
5958
}
6059
catch (Exception e) {
6160
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
@@ -65,12 +64,12 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
6564
}
6665

6766
/** Unmarshals discovery message from bytes array. */
68-
public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
67+
public <T extends Message> T unmarshalZip(byte[] bytes) {
6968
try (
7069
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
7170
InflaterInputStream in = new InflaterInputStream(bais)
7271
) {
73-
return (DiscoverySpiCustomMessage)deserializeMessage(in);
72+
return deserializeMessage(in);
7473
}
7574
catch (Exception e) {
7675
throw new IgniteSpiException("Failed to deserialize message.", e);
@@ -99,7 +98,7 @@ private void serializeMessage(Message m, OutputStream out) throws IOException {
9998
}
10099

101100
/** */
102-
private Message deserializeMessage(InputStream in) throws IOException {
101+
private <T extends Message> T deserializeMessage(InputStream in) throws IOException {
103102
DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null);
104103
ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
105104

@@ -127,6 +126,6 @@ private Message deserializeMessage(InputStream in) throws IOException {
127126
}
128127
while (!finished);
129128

130-
return msg;
129+
return (T)msg;
131130
}
132131
}

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ZkJoiningNodeData implements Serializable {
3838

3939
/** */
4040
@GridToStringInclude
41-
private Map<Integer, Serializable> discoData;
41+
private Map<Integer, byte[]> discoData;
4242

4343
/**
4444
* @param partCnt Number of parts in multi-parts message.
@@ -51,7 +51,7 @@ class ZkJoiningNodeData implements Serializable {
5151
* @param node Node.
5252
* @param discoData Discovery data.
5353
*/
54-
ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) {
54+
ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, byte[]> discoData) {
5555
assert node != null && node.id() != null : node;
5656
assert discoData != null;
5757

@@ -76,7 +76,7 @@ ZookeeperClusterNode node() {
7676
/**
7777
* @return Discovery data.
7878
*/
79-
Map<Integer, Serializable> discoveryData() {
79+
Map<Integer, byte[]> discoveryData() {
8080
return discoData;
8181
}
8282

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@
8989
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
9090
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
9191
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
92-
import org.apache.ignite.spi.discovery.ObjectData;
9392
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
9493
import org.apache.zookeeper.AsyncCallback;
9594
import org.apache.zookeeper.CreateMode;
@@ -807,7 +806,7 @@ private void joinTopology(@Nullable ZkRuntimeState prevState) throws Interrupted
807806
exchange.collect(discoDataBag);
808807

809808
ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode,
810-
F.viewReadOnly(discoDataBag.joiningNodeData(), ObjectData::unwrap));
809+
new HashMap<>(F.viewReadOnly(discoDataBag.joiningNodeData(), msgParser::marshalZip)));
811810

812811
byte[] joinDataBytes;
813812

@@ -2073,7 +2072,7 @@ private ZkNodeValidateResult validateJoiningNode(ZkJoiningNodeData joiningNodeDa
20732072
if (err == null) {
20742073
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(node.id(), joiningNodeData.node().isClient());
20752074

2076-
joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), ObjectData::new));
2075+
joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), msgParser::unmarshalZip));
20772076

20782077
err = spi.getSpiContext().validateNode(node, joiningNodeBag);
20792078
}
@@ -2240,7 +2239,7 @@ private void addJoinedNode(
22402239

22412240
DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId, joiningNodeData.node().isClient());
22422241

2243-
joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), ObjectData::new));
2242+
joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), msgParser::unmarshalZip));
22442243

22452244
exchange.onExchange(joiningNodeBag);
22462245

@@ -2876,7 +2875,7 @@ private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJ
28762875

28772876
DiscoveryDataBag dataBag = new DiscoveryDataBag(joinedEvtData.nodeId, joiningData.node().isClient());
28782877

2879-
dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(), ObjectData::new));
2878+
dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(), msgParser::unmarshalZip));
28802879

28812880
exchange.onExchange(dataBag);
28822881
}

0 commit comments

Comments
 (0)