Skip to content

Commit 4cedb86

Browse files
committed
Fix encryption adn zookeeper
1 parent 3657a4a commit 4cedb86

11 files changed

Lines changed: 93 additions & 48 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest;
3939
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
4040
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
41+
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
4142
import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
4243
import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys;
4344
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
@@ -644,6 +645,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
644645
withNoSchema(ChangeCacheEncryptionRequest.class);
645646
withNoSchema(MasterKeyChangeRequest.class);
646647
withNoSchema(NodeEncryptionKeys.class);
648+
withNoSchema(GroupKeyEncrypted.class);
647649

648650
// [13000 - 13300]: Control, configuration, diagnostincs and other messages.
649651
msgIdx = 13000;

modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyEncrypted.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,26 @@
1818
package org.apache.ignite.internal.managers.encryption;
1919

2020
import java.io.Serializable;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
2123

2224
/**
2325
* Cache group encryption key with identifier. Key is encrypted.
2426
*/
25-
public class GroupKeyEncrypted implements Serializable {
27+
public class GroupKeyEncrypted implements Serializable, Message {
2628
/** Serial version UID. */
2729
private static final long serialVersionUID = 0L;
2830

2931
/** Encryption key ID. */
30-
private final int id;
32+
@Order(0)
33+
int id;
3134

3235
/** Encryption key. */
33-
private final byte[] key;
36+
@Order(1)
37+
byte[] key;
38+
39+
/** */
40+
public GroupKeyEncrypted() {}
3441

3542
/**
3643
* @param id Encryption key ID.

modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class NodeEncryptionKeys implements Serializable, Message {
4444
byte[] masterKeyDigest;
4545

4646
/** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */
47+
@Order(3)
4748
Map<Integer, List<GroupKeyEncrypted>> knownKeysWithIds;
4849

4950
/** */

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements IgniteDis
143143
/** */
144144
private final ZookeeperDiscoveryStatistics stats = new ZookeeperDiscoveryStatistics();
145145

146+
/** */
147+
private final ZkMessageFactory factory = new ZkMessageFactory();
148+
146149
/**
147150
* @return Base path in ZK for znodes created by SPI.
148151
*/
@@ -468,6 +471,9 @@ public DiscoverySpiNodeAuthenticator getAuthenticator() {
468471

469472
registerMBean(igniteInstanceName, new ZookeeperDiscoverySpiMBeanImpl(this), ZookeeperDiscoverySpiMBean.class);
470473

474+
// factory.init(((IgniteEx)ignite).context().marshallerContext().jdkMarshaller(),
475+
// U.resolveClassLoader(ignite.configuration()));
476+
471477
try {
472478
impl.startJoinAndWait();
473479
}
@@ -558,7 +564,7 @@ private ZookeeperClusterNode initLocalNode() {
558564

559565
/** {@inheritDoc} */
560566
@Override public MessageFactoryProvider messageFactoryProvider() {
561-
return new ZkMessageFactory();
567+
return factory;
562568
}
563569

564570
/** {@inheritDoc} */

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3232
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
3333
import org.apache.ignite.spi.IgniteSpiException;
34-
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
3534

3635
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;
3736

@@ -51,11 +50,11 @@ public DiscoveryMessageParser(MessageFactory msgFactory) {
5150
}
5251

5352
/** Marshals discovery message to bytes array. */
54-
public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
53+
public byte[] marshalZip(Message msg) {
5554
ByteArrayOutputStream baos = new ByteArrayOutputStream();
5655

5756
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
58-
serializeMessage((Message)msg, out);
57+
serializeMessage(msg, out);
5958
}
6059
catch (Exception e) {
6160
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
@@ -65,12 +64,12 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
6564
}
6665

6766
/** Unmarshals discovery message from bytes array. */
68-
public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
67+
public <T extends Message> T unmarshalZip(byte[] bytes) {
6968
try (
7069
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
7170
InflaterInputStream in = new InflaterInputStream(bais)
7271
) {
73-
return (DiscoverySpiCustomMessage)deserializeMessage(in);
72+
return deserializeMessage(in);
7473
}
7574
catch (Exception e) {
7675
throw new IgniteSpiException("Failed to deserialize message.", e);
@@ -99,7 +98,7 @@ private void serializeMessage(Message m, OutputStream out) throws IOException {
9998
}
10099

101100
/** */
102-
private Message deserializeMessage(InputStream in) throws IOException {
101+
private <T extends Message> T deserializeMessage(InputStream in) throws IOException {
103102
DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null);
104103
ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);
105104

@@ -127,6 +126,6 @@ private Message deserializeMessage(InputStream in) throws IOException {
127126
}
128127
while (!finished);
129128

130-
return msg;
129+
return (T)msg;
131130
}
132131
}

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
/**
2828
*
2929
*/
30-
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
30+
public class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
3131
/** */
3232
@Order(0)
3333
UUID futId;

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,25 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.io.Serializable;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
2123

2224
/**
2325
*
2426
*/
25-
class ZkInternalJoinErrorMessage implements ZkInternalMessage, Serializable {
27+
public class ZkInternalJoinErrorMessage implements ZkInternalMessage, Serializable, Message {
2628
/** */
2729
private static final long serialVersionUID = 0L;
2830

2931
/** */
3032
transient boolean notifyNode = true;
3133

3234
/** */
33-
final String err;
35+
@Order(0)
36+
String err;
37+
38+
/** */
39+
public ZkInternalJoinErrorMessage() {}
3440

3541
/**
3642
* @param nodeInternalId Joining node internal ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,45 @@
1919

2020
import java.io.Serializable;
2121
import java.util.Map;
22+
import org.apache.ignite.IgniteCheckedException;
23+
import org.apache.ignite.internal.Compress;
24+
import org.apache.ignite.internal.MarshallableMessage;
25+
import org.apache.ignite.internal.Order;
26+
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
2227
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2328
import org.apache.ignite.internal.util.typedef.internal.S;
29+
import org.apache.ignite.internal.util.typedef.internal.U;
30+
import org.apache.ignite.marshaller.Marshaller;
31+
import org.apache.ignite.plugin.extensions.communication.Message;
2432

2533
/**
2634
*
2735
*/
28-
class ZkJoiningNodeData implements Serializable {
36+
public class ZkJoiningNodeData implements Serializable, MarshallableMessage {
2937
/** */
3038
private static final long serialVersionUID = 0L;
3139

3240
/** */
33-
private int partCnt;
41+
@Order(0)
42+
int partCnt;
3443

3544
/** */
3645
@GridToStringInclude
3746
private ZookeeperClusterNode node;
3847

3948
/** */
49+
@Order(1)
50+
@GridToStringExclude
51+
transient byte[] nodeBytes;
52+
53+
/** */
54+
@Order(2)
55+
@Compress
4056
@GridToStringInclude
41-
private Map<Integer, Serializable> discoData;
57+
Map<Integer, Message> discoData;
58+
59+
/** */
60+
public ZkJoiningNodeData() {}
4261

4362
/**
4463
* @param partCnt Number of parts in multi-parts message.
@@ -51,7 +70,7 @@ class ZkJoiningNodeData implements Serializable {
5170
* @param node Node.
5271
* @param discoData Discovery data.
5372
*/
54-
ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Serializable> discoData) {
73+
ZkJoiningNodeData(ZookeeperClusterNode node, Map<Integer, Message> discoData) {
5574
assert node != null && node.id() != null : node;
5675
assert discoData != null;
5776

@@ -76,12 +95,24 @@ ZookeeperClusterNode node() {
7695
/**
7796
* @return Discovery data.
7897
*/
79-
Map<Integer, Serializable> discoveryData() {
98+
Map<Integer, Message> discoveryData() {
8099
return discoData;
81100
}
82101

83102
/** {@inheritDoc} */
84103
@Override public String toString() {
85104
return S.toString(ZkJoiningNodeData.class, this);
86105
}
106+
107+
/** {@inheritDoc} */
108+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
109+
if (node != null)
110+
nodeBytes = U.marshal(marsh, node);
111+
}
112+
113+
/** {@inheritDoc} */
114+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
115+
if (nodeBytes != null)
116+
node = U.unmarshal(marsh, nodeBytes, clsLdr);
117+
}
87118
}

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20+
import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
2021
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
21-
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2222

2323
/** */
24-
public class ZkMessageFactory implements MessageFactoryProvider {
24+
public class ZkMessageFactory extends AbstractMarshallableMessageFactoryProvider {
2525
/** {@inheritDoc} */
2626
@Override public void registerAll(MessageFactory factory) {
27-
factory.register(400, ZkCommunicationErrorResolveFinishMessage::new, new ZkCommunicationErrorResolveFinishMessageSerializer());
28-
factory.register(401, ZkCommunicationErrorResolveStartMessage::new, new ZkCommunicationErrorResolveStartMessageSerializer());
29-
factory.register(402, ZkForceNodeFailMessage::new, new ZkForceNodeFailMessageSerializer());
30-
factory.register(403, ZkNoServersMessage::new, new ZkNoServersMessageSerializer());
27+
register(factory, ZkCommunicationErrorResolveFinishMessage.class, (short)400, dfltMarsh, resolvedClsLdr);
28+
register(factory, ZkCommunicationErrorResolveStartMessage.class, (short)401, dfltMarsh, resolvedClsLdr);
29+
register(factory, ZkForceNodeFailMessage.class, (short)402, dfltMarsh, resolvedClsLdr);
30+
register(factory, ZkNoServersMessage.class, (short)403, dfltMarsh, resolvedClsLdr);
31+
register(factory, ZkJoiningNodeData.class, (short)404, dfltMarsh, resolvedClsLdr);
32+
register(factory, ZkInternalJoinErrorMessage.class, (short)405, dfltMarsh, resolvedClsLdr);
3133
}
3234
}

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
/**
2525
*
2626
*/
27-
class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
27+
public class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
2828
/** {@inheritDoc} */
2929
@Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
3030
return null;

0 commit comments

Comments
 (0)