From a8e1a42e9c9f413f45c9f538a53b56745a85f92b Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 23 Apr 2026 16:42:17 +0300 Subject: [PATCH 1/3] IGNITE-28594 Use Message DTO for DiscoveryDataBag#JoiningNodeDiscoveryData --- .../ignite/internal/CoreMessagesProvider.java | 12 +++ .../ignite/internal/GridPluginComponent.java | 3 +- .../encryption/GridEncryptionManager.java | 45 +-------- .../encryption/NodeEncryptionKeys.java | 72 ++++++++++++++ .../processors/cache/ClusterCachesInfo.java | 7 +- .../cache/ValidationOnNodeJoinUtils.java | 8 +- .../binary/BinaryMetadataVersionsData.java | 39 ++++++++ .../CacheObjectBinaryProcessorImpl.java | 17 ++-- .../processors/cluster/ClusterProcessor.java | 3 +- .../cluster/GridClusterStateProcessor.java | 38 +------- .../continuous/ContinuousRoutinesInfo.java | 5 +- .../continuous/GridContinuousProcessor.java | 8 +- .../GridMarshallerMappingProcessor.java | 6 +- .../processors/marshaller/MappedName.java | 13 ++- .../marshaller/MarshallerMappingsData.java | 40 ++++++++ .../DistributedMetaStorageImpl.java | 30 ++---- .../plugin/IgnitePluginProcessor.java | 5 +- .../processors/query/GridQueryProcessor.java | 22 ++--- .../processors/query/InlineSizesData.java | 39 ++++++++ .../service/IgniteServiceProcessor.java | 10 +- .../spi/discovery/DiscoveryDataBag.java | 22 +++-- .../ignite/spi/discovery/ObjectData.java | 82 ++++++++++++++++ .../ignite/spi/discovery/tcp/ServerImpl.java | 17 +--- .../spi/discovery/tcp/TcpDiscoverySpi.java | 8 +- .../tcp/internal/DiscoveryDataPacket.java | 93 +++---------------- .../TcpDiscoveryNodeAddedMessage.java | 9 -- .../resources/META-INF/classnames.properties | 2 +- .../DistributedMetaStoragePersistentTest.java | 12 +-- .../zk/internal/ZookeeperDiscoveryImpl.java | 11 ++- 29 files changed, 400 insertions(+), 278 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index ddab45f84d3d1..c5384a48fb6c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest; +import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; import org.apache.ignite.internal.processors.authentication.User; @@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.WalStateFinishMessage; import org.apache.ignite.internal.processors.cache.WalStateProposeMessage; import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionInfo; +import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionsData; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; @@ -186,15 +188,18 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; +import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage; import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem; +import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData; import org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage; import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage; +import org.apache.ignite.internal.processors.query.InlineSizesData; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; @@ -243,6 +248,7 @@ import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -343,6 +349,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(CacheVersionedValue.class); withNoSchema(GridCacheVersion.class); withNoSchema(GridCacheVersionEx.class); + withNoSchemaResolvedClassLoader(ObjectData.class); // [5700 - 5900]: Discovery originated messages. msgIdx = 5700; @@ -560,6 +567,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(StatisticsResponse.class); withNoSchema(CacheContinuousQueryBatchAck.class); withSchema(CacheContinuousQueryEntry.class); + withNoSchema(InlineSizesData.class); // [11200 - 11300]: Compute, distributed process messages. msgIdx = 11200; @@ -625,6 +633,9 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(MetadataResponseMessage.class); withNoSchema(MarshallerMappingItem.class); withNoSchema(BinaryMetadataVersionInfo.class); + withNoSchema(BinaryMetadataVersionsData.class); + withNoSchema(MappedName.class); + withNoSchema(MarshallerMappingsData.class); // [12400 - 12500]: Encryption messages. msgIdx = 12400; @@ -632,6 +643,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GenerateEncryptionKeyResponse.class); withNoSchema(ChangeCacheEncryptionRequest.class); withNoSchema(MasterKeyChangeRequest.class); + withNoSchema(NodeEncryptionKeys.class); // [13000 - 13300]: Control, configuration, diagnostincs and other messages. msgIdx = 13000; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index 56f292f407b4d..2d0dbebcde824 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -28,6 +28,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.apache.ignite.spi.discovery.ObjectData; import org.jetbrains.annotations.Nullable; /** @@ -115,7 +116,7 @@ public PluginProvider plugin() { @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, JoiningNodeDiscoveryData discoData) { try { - Map map = (Map)discoData.joiningNodeData(); + Map map = ObjectData.unwrap(discoData.joiningNodeData()); if (map != null) plugin.validateNewNode(node, map.get(plugin.name())); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index e20b1effd18bc..19c58d5d7b8cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -431,9 +431,9 @@ public void onLocalJoin() { "Cache group key change is in progress! Node join is rejected."); } - NodeEncryptionKeys nodeEncKeys = (NodeEncryptionKeys)discoData.joiningNodeData(); + NodeEncryptionKeys nodeEncKeys = discoData.joiningNodeData(); - if (!discoData.hasJoiningNodeData() || nodeEncKeys == null) { + if (nodeEncKeys == null) { return new IgniteNodeValidationResult(ctx.localNodeId(), "Joining node doesn't have encryption data [node=" + node.id() + "]", "Joining node doesn't have encryption data."); @@ -522,7 +522,7 @@ public void onLocalJoin() { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - NodeEncryptionKeys nodeEncryptionKeys = (NodeEncryptionKeys)data.joiningNodeData(); + NodeEncryptionKeys nodeEncryptionKeys = data.joiningNodeData(); if (nodeEncryptionKeys == null || nodeEncryptionKeys.newKeys == null || ctx.clientNode()) return; @@ -1748,45 +1748,6 @@ private String decryptKeyName(byte[] data) { }); } - /** */ - protected static class NodeEncryptionKeys implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - NodeEncryptionKeys( - HashMap> knownKeysWithIds, - Map newKeys, - byte[] masterKeyDigest - ) { - this.newKeys = newKeys; - this.masterKeyDigest = masterKeyDigest; - - if (F.isEmpty(knownKeysWithIds)) - return; - - // To be able to join the old cluster. - knownKeys = U.newHashMap(knownKeysWithIds.size()); - - for (Map.Entry> entry : knownKeysWithIds.entrySet()) - knownKeys.put(entry.getKey(), entry.getValue().get(0).key()); - - this.knownKeysWithIds = knownKeysWithIds; - } - - /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in compatible format). */ - Map knownKeys; - - /** New keys i.e. keys for a local statically configured caches. */ - Map newKeys; - - /** Master key digest. */ - byte[] masterKeyDigest; - - /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */ - Map> knownKeysWithIds; - } - /** */ private class GenerateEncryptionKeyFuture extends GridFutureAdapter, byte[]>> { /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java new file mode 100644 index 0000000000000..b8ba29880baa3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class NodeEncryptionKeys implements Serializable, Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node (in compatible format). */ + @Order(0) + Map knownKeys; + + /** New keys i.e. keys for a local statically configured caches. */ + @Order(1) + Map newKeys; + + /** Master key digest. */ + @Order(2) + byte[] masterKeyDigest; + + /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */ + Map> knownKeysWithIds; + + /** */ + public NodeEncryptionKeys() {} + + /** */ + NodeEncryptionKeys( + HashMap> knownKeysWithIds, + Map newKeys, + byte[] masterKeyDigest + ) { + this.newKeys = newKeys; + this.masterKeyDigest = masterKeyDigest; + + if (F.isEmpty(knownKeysWithIds)) + return; + + // To be able to join the old cluster. + knownKeys = U.newHashMap(knownKeysWithIds.size()); + + for (Map.Entry> entry : knownKeysWithIds.entrySet()) + knownKeys.put(entry.getKey(), entry.getValue().get(0).key()); + + this.knownKeysWithIds = knownKeysWithIds; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 987be04ea2fe8..8cde6f58f6ae9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -83,6 +83,7 @@ import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.systemview.view.CacheGroupView; import org.apache.ignite.spi.systemview.view.CacheView; import org.jetbrains.annotations.Nullable; @@ -1257,7 +1258,7 @@ private boolean validateStartNewCache( * @param dataBag Discovery data bag. */ void collectJoiningNodeData(DiscoveryDataBag dataBag) { - dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData()); + dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), new ObjectData(joinDiscoveryData())); } /** @@ -2038,7 +2039,7 @@ public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, Affini */ public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { if (data.hasJoiningNodeData()) { - Serializable joiningNodeData = data.joiningNodeData(); + Serializable joiningNodeData = ObjectData.unwrap(data.joiningNodeData()); if (joiningNodeData instanceof CacheClientReconnectDiscoveryData) { if (disconnectedState()) { @@ -2062,7 +2063,7 @@ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) */ public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData data, boolean joiningNodeClient) { if (data.hasJoiningNodeData()) { - Serializable joiningNodeData = data.joiningNodeData(); + Serializable joiningNodeData = ObjectData.unwrap(data.joiningNodeData()); if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) { CacheJoinNodeDiscoveryData joinData = (CacheJoinNodeDiscoveryData)joiningNodeData; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index 4f72dc259d8a8..efbc586e3cc9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -60,6 +61,7 @@ import org.apache.ignite.plugin.security.SecurityException; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.jetbrains.annotations.Nullable; @@ -123,8 +125,10 @@ public class ValidationOnNodeJoinUtils { GridKernalContext ctx, Function cacheDescProvider ) { - if (discoData.hasJoiningNodeData() && discoData.joiningNodeData() instanceof CacheJoinNodeDiscoveryData) { - CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)discoData.joiningNodeData(); + Serializable joiningNodeData = ObjectData.unwrap(discoData.joiningNodeData()); + + if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) { + CacheJoinNodeDiscoveryData nodeData = (CacheJoinNodeDiscoveryData)joiningNodeData; boolean isGridActive = ctx.state().clusterState().active(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java new file mode 100644 index 0000000000000..706d2c26f1caa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.binary; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class BinaryMetadataVersionsData implements Message { + /** */ + @Order(0) + Map data; + + /** */ + public BinaryMetadataVersionsData() {} + + /** + * @param data Data. + */ + public BinaryMetadataVersionsData(Map data) { + this.data = Map.copyOf(data); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index a0682d41b4174..8867e576ec8af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -1394,7 +1394,7 @@ private int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, O if ((res = validateBinaryConfiguration(rmtNode)) != null) return res; - return validateBinaryMetadata(rmtNode.id(), (Map)discoData.joiningNodeData()); + return validateBinaryMetadata(rmtNode.id(), discoData.joiningNodeData()); } /** */ @@ -1418,11 +1418,11 @@ private IgniteNodeValidationResult validateBinaryConfiguration(ClusterNode rmtNo } /** */ - private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map newNodeMeta) { + private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, BinaryMetadataVersionsData newNodeMeta) { if (newNodeMeta == null) return null; - for (Map.Entry metaEntry : newNodeMeta.entrySet()) { + for (Map.Entry metaEntry : newNodeMeta.data.entrySet()) { if (!metadataLocCache.containsKey(metaEntry.getKey())) continue; @@ -1470,24 +1470,19 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map res = U.newHashMap(metadataLocCache.size()); - - for (Map.Entry e : metadataLocCache.entrySet()) - res.put(e.getKey(), e.getValue()); - - dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), (Serializable)res); + dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), new BinaryMetadataVersionsData(metadataLocCache)); } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { - Map newNodeMeta = (Map)data.joiningNodeData(); + BinaryMetadataVersionsData newNodeMeta = data.joiningNodeData(); if (newNodeMeta == null) return; UUID joiningNode = data.joiningNodeId(); - for (Map.Entry metaEntry : newNodeMeta.entrySet()) { + for (Map.Entry metaEntry : newNodeMeta.data.entrySet()) { if (metadataLocCache.containsKey(metaEntry.getKey())) { BinaryMetadataVersionInfo locMetaVerInfo = metadataLocCache.get(metaEntry.getKey()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index b7d78a906a5b7..834fb3439784f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -79,6 +79,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.jetbrains.annotations.Nullable; @@ -465,7 +466,7 @@ public IgniteFuture clientReconnectFuture() { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData()); + dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), new ObjectData(getDiscoveryData())); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 4640faf6d8e35..002c28bb68867 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -99,6 +99,7 @@ import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.systemview.view.BaselineNodeAttributeView; import org.apache.ignite.spi.systemview.view.BaselineNodeView; import org.jetbrains.annotations.Nullable; @@ -925,14 +926,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - try { - byte[] marshalledState = marsh.marshal(globalState); - - dataBag.addJoiningNodeData(discoveryDataType().ordinal(), marshalledState); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + dataBag.addJoiningNodeData(discoveryDataType().ordinal(), new ObjectData(globalState)); } /** {@inheritDoc} */ @@ -953,20 +947,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, return; } - DiscoveryDataClusterState joiningNodeState = null; - - try { - if (joiningNodeData.joiningNodeData() != null) - joiningNodeState = marsh.unmarshal( - (byte[])joiningNodeData.joiningNodeData(), - U.resolveClassLoader(ctx.config()) - ); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal disco data from joining node: " + joiningNodeData.joiningNodeId()); - - return; - } + DiscoveryDataClusterState joiningNodeState = ObjectData.unwrap(joiningNodeData.joiningNodeData()); BaselineTopologyHistory historyToSend = null; @@ -1251,18 +1232,7 @@ public IgniteInternalFuture changeGlobalState( return null; } - DiscoveryDataClusterState joiningNodeState; - - try { - joiningNodeState = marsh.unmarshal((byte[])discoData.joiningNodeData(), U.resolveClassLoader(ctx.config())); - } - catch (IgniteCheckedException e) { - String msg = "Error on unmarshalling discovery data " + - "from node " + node.consistentId() + ": " + e.getMessage() + - "; node is not allowed to join"; - - return new IgniteNodeValidationResult(node.id(), msg); - } + DiscoveryDataClusterState joiningNodeState = ObjectData.unwrap(discoData.joiningNodeData()); if (joiningNodeState == null || joiningNodeState.baselineTopology() == null) return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java index ad24ff1805387..b8109f5e36541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.discovery.ObjectData; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; @@ -56,8 +57,8 @@ void collectJoiningNodeData(DiscoveryDataBag dataBag) { info.sourceNodeId(dataBag.joiningNodeId()); } - dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), - new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values()))); + dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), new ObjectData( + new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())))); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1786bdf400317..44b350511f1af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -97,6 +97,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.systemview.view.ContinuousQueryView; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -417,7 +418,7 @@ public void unlockStopping() { Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) - dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data); + dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), new ObjectData(data)); } /** {@inheritDoc} */ @@ -521,8 +522,7 @@ private Map copyLocalInfos(Map l if (discoProtoVer == 2) { if (data.hasJoiningNodeData()) { - ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) - data.joiningNodeData(); + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = ObjectData.unwrap(data.joiningNodeData()); for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { routinesInfo.addRoutineInfo(routineInfo); @@ -533,7 +533,7 @@ private Map copyLocalInfos(Map l } else { if (data.hasJoiningNodeData()) - onDiscoveryDataReceivedV1((DiscoveryData)data.joiningNodeData()); + onDiscoveryDataReceivedV1(ObjectData.unwrap(data.joiningNodeData())); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index f6895734545a5..8946672364edb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -326,7 +326,7 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = (List>)data.joiningNodeData(); + MarshallerMappingsData mappingsData = data.joiningNodeData(); - processIncomingMappings(mappings); + processIncomingMappings(mappingsData.mappings); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java index eae07d2d01b8b..4a000fadc231e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java @@ -19,20 +19,27 @@ import java.io.Serializable; import java.util.Objects; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Contains mapped class name and boolean flag showing whether this mapping was accepted by other nodes or not. */ -public final class MappedName implements Serializable { +public final class MappedName implements Serializable, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final String clsName; + @Order(0) + String clsName; /** */ - private final boolean accepted; + @Order(1) + boolean accepted; + + /** */ + public MappedName() {} /** * @param clsName Class name. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java new file mode 100644 index 0000000000000..2207b1c21f47c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.marshaller; + +import java.util.List; +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class MarshallerMappingsData implements Message { + /** */ + @Order(0) + List> mappings; + + /** */ + public MarshallerMappingsData() {} + + /** + * @param mappings Mappings. + */ + public MarshallerMappingsData(List> mappings) { + this.mappings = mappings; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 2e24715108b7b..66739cfc7d8d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -71,6 +71,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.systemview.view.MetastorageView; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -571,14 +572,9 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { EMPTY_ARRAY ); - try { - dataBag.addJoiningNodeData(COMPONENT_ID, marshaller.marshal(data)); + dataBag.addJoiningNodeData(COMPONENT_ID, new ObjectData(data)); - return; - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + return; } Serializable data = new DistributedMetaStorageJoiningNodeData( @@ -587,12 +583,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { histCache.toArray() ); - try { - dataBag.addJoiningNodeData(COMPONENT_ID, marshaller.marshal(data)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + dataBag.addJoiningNodeData(COMPONENT_ID, new ObjectData(data)); } finally { lock.readLock().unlock(); @@ -889,18 +880,9 @@ private String validatePayload(DistributedMetaStorageJoiningNodeData joiningData @Nullable private DistributedMetaStorageJoiningNodeData getJoiningNodeData( JoiningNodeDiscoveryData discoData ) { - byte[] data = (byte[])discoData.joiningNodeData(); + assert discoData.joiningNodeData() != null; - assert data != null; - - try { - return marshaller.unmarshal(data, U.gridClassLoader()); - } - catch (IgniteCheckedException e) { - log.error("Unable to unmarshal joinging node data for distributed metastorage component.", e); - - return null; - } + return ObjectData.unwrap(discoData.joiningNodeData()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 6a8fbf2962ebb..65ec86d3567c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -42,6 +42,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.apache.ignite.spi.discovery.ObjectData; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.PLUGIN; @@ -160,7 +161,7 @@ public T createComponent(Class cls) { Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); if (pluginsData != null) - dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsData); + dataBag.addJoiningNodeData(PLUGIN.ordinal(), new ObjectData(pluginsData)); } /** {@inheritDoc} */ @@ -194,7 +195,7 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (data.hasJoiningNodeData()) { - Map pluginsData = (Map)data.joiningNodeData(); + Map pluginsData = ObjectData.unwrap(data.joiningNodeData()); applyPluginsData(data.joiningNodeId(), pluginsData); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 6df09f83c413c..f34d93f8ac258 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -152,6 +152,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.session.SessionContext; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.indexing.IndexingQueryFilter; @@ -496,28 +497,19 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData() && data.joiningNodeData() instanceof Map) { - Map nodeSpecificDataMap = (Map)data.joiningNodeData(); + Message joiningNodeData = data.joiningNodeData(); - if (nodeSpecificDataMap.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) { - Serializable serializable = nodeSpecificDataMap.get(INLINE_SIZES_DISCO_BAG_KEY); + if (joiningNodeData instanceof InlineSizesData) { + Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes; - assert serializable instanceof Map : serializable; - - Map joiningNodeIndexesInlineSize = (Map)serializable; - - checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); - } + checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); } } /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - HashMap dataMap = new HashMap<>(); - - dataMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); - - dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), dataMap); + dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), + new InlineSizesData(secondaryIndexesInlineSize())); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java new file mode 100644 index 0000000000000..eb3813501f670 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class InlineSizesData implements Message { + /** */ + @Order(0) + Map sizes; + + /** */ + public InlineSizesData() {} + + /** + * @param sizes Inline sizes. + */ + public InlineSizesData(Map sizes) { + this.sizes = sizes; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index ed9153b4969be..63e61a9c37e2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -93,6 +93,7 @@ import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; import org.apache.ignite.spi.systemview.view.ServiceView; @@ -407,7 +408,8 @@ private void cancelDeployedServices() { @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { ArrayList staticServicesInfo = staticallyConfiguredServices(true); - dataBag.addJoiningNodeData(SERVICE_PROC.ordinal(), new ServiceProcessorJoinNodeDiscoveryData(staticServicesInfo)); + dataBag.addJoiningNodeData(SERVICE_PROC.ordinal(), new ObjectData( + new ServiceProcessorJoinNodeDiscoveryData(staticServicesInfo))); } /** {@inheritDoc} */ @@ -418,7 +420,9 @@ private void cancelDeployedServices() { if (data.joiningNodeData() == null) return null; - List svcs = ((ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData()).services(); + ServiceProcessorJoinNodeDiscoveryData srvcProcData = ObjectData.unwrap(data.joiningNodeData()); + + List svcs = srvcProcData.services(); if (ctx.security().enabled()) { SecurityException err = checkDeployPermissionDuringJoin(node, svcs); @@ -445,7 +449,7 @@ private void cancelDeployedServices() { if (data.joiningNodeData() == null) return; - ServiceProcessorJoinNodeDiscoveryData joinData = (ServiceProcessorJoinNodeDiscoveryData)data.joiningNodeData(); + ServiceProcessorJoinNodeDiscoveryData joinData = ObjectData.unwrap(data.joiningNodeData()); for (ServiceInfo desc : joinData.services()) { assert desc.topologySnapshot().isEmpty(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 28fc5f1a7b841..2ffdd4e7e25e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** @@ -44,8 +45,12 @@ public interface JoiningNodeDiscoveryData { /** @return Whether joining node provided discovery data. */ boolean hasJoiningNodeData(); - /** @return Joining node data. */ - Serializable joiningNodeData(); + /** + * @param Type of message. + * + * @return Joining node data. + */ + T joiningNodeData(); } /** @@ -80,8 +85,8 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery } /** {@inheritDoc} */ - @Override @Nullable public Serializable joiningNodeData() { - return joiningNodeData.get(cmpId); + @Override @Nullable public T joiningNodeData() { + return (T)joiningNodeData.get(cmpId); } /** @@ -158,7 +163,7 @@ private void reinitNodeSpecData(int cmpId) { private Set cmnDataInitializedCmps; /** */ - private Map joiningNodeData = new HashMap<>(); + private Map joiningNodeData = new HashMap<>(); /** */ private Map commonData = new HashMap<>(); @@ -238,8 +243,9 @@ public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) { /** * @param cmpId Component ID. * @param data Data. + * @param Type of message. */ - public void addJoiningNodeData(Integer cmpId, Serializable data) { + public void addJoiningNodeData(Integer cmpId, T data) { joiningNodeData.put(cmpId, data); } @@ -275,7 +281,7 @@ public boolean commonDataCollectedFor(Integer cmpId) { /** * @param joinNodeData Joining node data. */ - public void joiningNodeData(Map joinNodeData) { + public void joiningNodeData(Map joinNodeData) { joiningNodeData.putAll(joinNodeData); } @@ -294,7 +300,7 @@ public void nodeSpecificData(Map> nodeSpecData) } /** @return Discovery data for each Ignite component that is sent to the cluster nodes by joining node. */ - public Map joiningNodeData() { + public Map joiningNodeData() { return joiningNodeData; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java new file mode 100644 index 0000000000000..7321a67d49a3d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import java.io.Serializable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** Wrapper message for serializable data. */ +public class ObjectData implements MarshallableMessage { + /** */ + @GridToStringInclude + private Serializable data; + + /** */ + @GridToStringExclude + @Order(0) + byte[] dataBytes; + + /** */ + public ObjectData() {} + + /** + * @param data Original data. + */ + public ObjectData(Serializable data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (data != null) + dataBytes = U.marshal(marsh, data); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (dataBytes != null) { + data = U.unmarshal(marsh, dataBytes, clsLdr); + + dataBytes = null; + } + } + + /** + * @param msg Message. + * @param Type of data. + * + * @return Original data unwrapped from a message. + */ + public static T unwrap(@Nullable Message msg) { + return msg != null ? (T)(((ObjectData)msg).data) : null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ObjectData.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 17bda88dcbc0b..e0fb821495479 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1915,7 +1915,6 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { nodeAddedMsg.topology(null); nodeAddedMsg.topologyHistory(null); nodeAddedMsg.messages(null); - nodeAddedMsg.clearUnmarshalledDiscoveryData(); } } @@ -4337,19 +4336,9 @@ else if (log.isDebugEnabled()) err = spi.getSpiContext().validateNode(node); if (err == null) { - try { - DiscoveryDataBag data = msg.gridDiscoveryData().unmarshalJoiningNodeData( - spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration()), - false, - log - ); - - err = spi.getSpiContext().validateNode(node, data); - } - catch (IgniteCheckedException e) { - err = new IgniteNodeValidationResult(node.id(), e.getMessage()); - } + DiscoveryDataBag data = msg.gridDiscoveryData().bagForJoiningNodeData(); + + err = spi.getSpiContext().validateNode(node, data); } if (err != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 903524c8aa96b..b9b69a488d754 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2073,11 +2073,7 @@ DiscoveryDataPacket collectExchangeData(DiscoveryDataPacket dataPacket) { //marshall collected bag into packet, return packet if (dataPacket.joiningNodeId().equals(locNode.id())) - dataPacket.marshalJoiningNodeData( - dataBag, - marshaller(), - ignite.configuration().getNetworkCompressionLevel(), - log); + dataPacket.addJoiningNodeData(dataBag); else dataPacket.marshalGridNodeData( dataBag, @@ -2114,7 +2110,7 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) { } } else - dataBag = dataPacket.unmarshalJoiningNodeDataSilently(marshaller(), clsLdr, locNode.clientRouterNodeId() != null, log); + dataBag = dataPacket.bagForJoiningNodeData(); exchange.onExchange(dataBag); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 4922e7c0e86c3..98523a5c6bfbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -24,10 +24,11 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; @@ -52,11 +53,9 @@ public class DiscoveryDataPacket implements Serializable, Message { UUID joiningNodeId; /** */ + @Compress @Order(1) - Map joiningNodeData = new HashMap<>(); - - /** */ - private transient Map unmarshalledJoiningNodeData; + Map joiningNodeData = new HashMap<>(); /** */ @Order(2) @@ -114,12 +113,10 @@ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller ma /** * @param bag Bag. - * @param marsh Marsh. - * @param log Logger. */ - public void marshalJoiningNodeData(DiscoveryDataBag bag, Marshaller marsh, - int compressionLevel, IgniteLogger log) { - marshalData(bag.joiningNodeData(), joiningNodeData, marsh, compressionLevel, log); + public void addJoiningNodeData(DiscoveryDataBag bag) { + if (!F.isEmpty(bag.joiningNodeData())) + joiningNodeData.putAll(bag.joiningNodeData()); } /** @@ -161,67 +158,13 @@ public DiscoveryDataBag unmarshalGridData( } /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @throws IgniteCheckedException If unmarshalling failed. + * @return Data bag with joining node data. */ - public DiscoveryDataBag unmarshalJoiningNodeData( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log - ) throws IgniteCheckedException { - return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, true); - } - - /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - */ - public DiscoveryDataBag unmarshalJoiningNodeDataSilently( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log - ) { - try { - return unmarshalJoiningNodeData(marsh, clsLdr, clientNode, log, false); - } - catch (IgniteCheckedException impossible) { - assert false : impossible; - - log.error("Failed to unmarshal joining node data", impossible); - - throw new IgniteException(impossible); - } - } - - /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@code true} and unmarshalling failed. - */ - private DiscoveryDataBag unmarshalJoiningNodeData( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log, - boolean panic - ) throws IgniteCheckedException { + public DiscoveryDataBag bagForJoiningNodeData() { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient); - if (joiningNodeData != null && !joiningNodeData.isEmpty()) { - unmarshalledJoiningNodeData = unmarshalData(joiningNodeData, marsh, clsLdr, clientNode, log, panic); - - dataBag.joiningNodeData(unmarshalledJoiningNodeData); - } + if (!F.isEmpty(joiningNodeData)) + dataBag.joiningNodeData(joiningNodeData); return dataBag; } @@ -230,7 +173,7 @@ private DiscoveryDataBag unmarshalJoiningNodeData( * */ public boolean hasJoiningNodeData() { - return joiningNodeData != null && !joiningNodeData.isEmpty(); + return !F.isEmpty(joiningNodeData); } /** @@ -443,8 +386,8 @@ private void filterDuplicatedData(Map discoData) { public DiscoveryDataBag bagForDataCollection() { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, commonData.keySet(), joiningNodeClient); - if (unmarshalledJoiningNodeData != null) - dataBag.joiningNodeData(unmarshalledJoiningNodeData); + if (joiningNodeData != null) + dataBag.joiningNodeData(joiningNodeData); return dataBag; } @@ -455,12 +398,4 @@ public DiscoveryDataBag bagForDataCollection() { public void joiningNodeClient(boolean joiningNodeClient) { this.joiningNodeClient = joiningNodeClient; } - - /** - * Clears {@link #unmarshalledJoiningNodeData} - */ - public void clearUnmarshalledJoiningNodeData() { - unmarshalledJoiningNodeData = null; - } - } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index ed717fcbbc070..b810ac6b32220 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -195,15 +195,6 @@ public void clearDiscoveryData() { dataPacket = null; } - /** - * Clears unmarshalled discovery data to minimize message size. - * These data are used only on "collect" stage and are not part of persistent state. - */ - public void clearUnmarshalledDiscoveryData() { - if (dataPacket != null) - dataPacket.clearUnmarshalledJoiningNodeData(); - } - /** @return First grid node start time. */ public long gridStartTime() { return gridStartTime; diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 977f9ddbf684b..34b3f3e3820cb 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -737,7 +737,7 @@ org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse org.apache.ignite.internal.managers.encryption.GridEncryptionManager$EmptyResult org.apache.ignite.internal.managers.encryption.GridEncryptionManager$MasterKeyChangeRequest -org.apache.ignite.internal.managers.encryption.GridEncryptionManager$NodeEncryptionKeys +org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage org.apache.ignite.internal.managers.indexing.GridIndexingManager$1 diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java index 965e75e26cb8d..6d1fa471cffbc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.metastorage; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,10 +25,11 @@ import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; -import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; @@ -537,7 +537,7 @@ public void testVerFromDiscoveryClusterData() throws Exception { DiscoverySpiDataExchange exchange = GridTestUtils.getFieldValue(spi, TcpDiscoverySpi.class, "exchange"); - List> dataBags = new ArrayList<>(); + List> dataBags = new ArrayList<>(); spi.setDataExchange(new DiscoverySpiDataExchange() { @Override public DiscoveryDataBag collect(DiscoveryDataBag dataBag) { @@ -555,11 +555,9 @@ public void testVerFromDiscoveryClusterData() throws Exception { assertEquals(1, dataBags.size()); - byte[] joiningNodeDataMarshalled = (byte[])dataBags.get(0).get(META_STORAGE.ordinal()); + Object joiningNodeData = ObjectData.unwrap(dataBags.get(0).get(META_STORAGE.ordinal())); - assertNotNull(joiningNodeDataMarshalled); - - Object joiningNodeData = TEST_JDK_MARSHALLER.unmarshal(joiningNodeDataMarshalled, U.gridClassLoader()); + assertNotNull(joiningNodeData); Object[] hist = GridTestUtils.getFieldValue(joiningNodeData, "hist"); diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 7abe3ddf1ed20..0d09119d205a9 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -88,6 +89,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; +import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; @@ -804,7 +806,8 @@ private void joinTopology(@Nullable ZkRuntimeState prevState) throws Interrupted exchange.collect(discoDataBag); - ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, discoDataBag.joiningNodeData()); + ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, + F.viewReadOnly(discoDataBag.joiningNodeData(), ObjectData::unwrap)); byte[] joinDataBytes; @@ -2070,7 +2073,7 @@ private ZkNodeValidateResult validateJoiningNode(ZkJoiningNodeData joiningNodeDa if (err == null) { DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(node.id(), joiningNodeData.node().isClient()); - joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData()); + joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), ObjectData::new)); err = spi.getSpiContext().validateNode(node, joiningNodeBag); } @@ -2237,7 +2240,7 @@ private void addJoinedNode( DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId, joiningNodeData.node().isClient()); - joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData()); + joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), ObjectData::new)); exchange.onExchange(joiningNodeBag); @@ -2873,7 +2876,7 @@ private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJ DiscoveryDataBag dataBag = new DiscoveryDataBag(joinedEvtData.nodeId, joiningData.node().isClient()); - dataBag.joiningNodeData(joiningData.discoveryData()); + dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(), ObjectData::new)); exchange.onExchange(dataBag); } From 3657a4a61eb48502ad479baa91d2cf7544b248ff Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Fri, 24 Apr 2026 22:19:32 +0300 Subject: [PATCH 2/3] Fix binary --- .../ignite/internal/CoreMessagesProvider.java | 7 ++++- .../cache/binary/BinaryMetadataTransport.java | 20 ++---------- .../binary/BinaryMetadataVersionInfo.java | 31 +++++++------------ 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index c5384a48fb6c0..fa6e0526e1db0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -632,7 +632,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(MetadataRequestMessage.class); withNoSchema(MetadataResponseMessage.class); withNoSchema(MarshallerMappingItem.class); - withNoSchema(BinaryMetadataVersionInfo.class); + withSchemaResolvedClassLoader(BinaryMetadataVersionInfo.class); withNoSchema(BinaryMetadataVersionsData.class); withNoSchema(MappedName.class); withNoSchema(MarshallerMappingsData.class); @@ -674,6 +674,11 @@ private void withNoSchemaResolvedClassLoader(Class cls) { register(cls, dfltMarsh, resolvedClsLdr); } + /** Registers message using {@link #schemaAwareMarsh} and {@link #resolvedClsLdr}. */ + private void withSchemaResolvedClassLoader(Class cls) { + register(cls, schemaAwareMarsh, resolvedClsLdr); + } + /** Registers message using incrementing {@link #msgIdx} as the message id/type. */ private void register(Class cls, Marshaller marsh, ClassLoader clsLrd) { register(factory, cls, msgIdx++, marsh, clsLrd); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java index 0cfb2d96768b9..5b36542f1d6c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java @@ -841,15 +841,6 @@ private final class MetadataRequestListener implements GridMessageListener { MetadataResponseMessage resp = new MetadataResponseMessage(typeId); - if (metaVerInfo != null) { - try { - metaVerInfo.marshalMetadata(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal binary metadata for [typeId=" + typeId + ']', e); - } - } - resp.metadataVersionInfo(metaVerInfo); try { @@ -890,16 +881,9 @@ private final class MetadataResponseListener implements GridMessageListener { return; } - try { - metaVerInfo.unmarshalMetadata(); - - casBinaryMetadata(typeId, metaVerInfo); + casBinaryMetadata(typeId, metaVerInfo); - fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); - } - catch (IgniteCheckedException e) { - fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e))); - } + fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java index 1c71892fd1795..9fe6d19da4421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionInfo.java @@ -18,12 +18,11 @@ import java.io.Serializable; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; - -import static org.apache.ignite.marshaller.Marshallers.jdk; +import org.apache.ignite.marshaller.Marshaller; /** * Wrapper for {@link BinaryMetadata} which is stored in metadata local cache on each node. @@ -31,7 +30,7 @@ * The version refers solely to the internal protocol for updating BinaryMetadata and is unknown externally. * It can be updated dynamically from different nodes and threads on the same node. */ -public final class BinaryMetadataVersionInfo implements Serializable, Message { +public final class BinaryMetadataVersionInfo implements Serializable, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -130,24 +129,16 @@ boolean removing() { return removing; } - /** - * Marshals binary metadata to byte array. - * - * @throws IgniteCheckedException If failed. - */ - public void marshalMetadata() throws IgniteCheckedException { - if (metadataBytes == null) - metadataBytes = U.marshal(jdk(), metadata); + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (metadata != null) + metadataBytes = U.marshal(marsh, metadata); } - /** - * Unmarshals binary metadata from byte array. - * - * @throws IgniteCheckedException If failed. - */ - public void unmarshalMetadata() throws IgniteCheckedException { - if (metadata == null && metadataBytes != null) { - metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader()); + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (metadataBytes != null) { + metadata = U.unmarshal(marsh, metadataBytes, clsLdr); // It is not required anymore. metadataBytes = null; From 511d55f5b7f22a6be4b7aa4397a9c2e592cf380b Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Sun, 26 Apr 2026 17:40:01 +0300 Subject: [PATCH 3/3] Fix encryption and zookeeper --- .../ignite/internal/CoreMessagesProvider.java | 2 ++ .../managers/encryption/GroupKeyEncrypted.java | 13 ++++++++++--- .../managers/encryption/NodeEncryptionKeys.java | 1 + .../zk/internal/DiscoveryMessageParser.java | 13 ++++++------- .../discovery/zk/internal/ZkJoiningNodeData.java | 6 +++--- .../zk/internal/ZookeeperDiscoveryImpl.java | 9 ++++----- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index fa6e0526e1db0..1888931ba2d7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; +import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest; import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; @@ -644,6 +645,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(ChangeCacheEncryptionRequest.class); withNoSchema(MasterKeyChangeRequest.class); withNoSchema(NodeEncryptionKeys.class); + withNoSchema(GroupKeyEncrypted.class); // [13000 - 13300]: Control, configuration, diagnostincs and other messages. msgIdx = 13000; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyEncrypted.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyEncrypted.java index 6b2ed0543038b..ee02a8b36acda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyEncrypted.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyEncrypted.java @@ -18,19 +18,26 @@ package org.apache.ignite.internal.managers.encryption; import java.io.Serializable; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Cache group encryption key with identifier. Key is encrypted. */ -public class GroupKeyEncrypted implements Serializable { +public class GroupKeyEncrypted implements Serializable, Message { /** Serial version UID. */ private static final long serialVersionUID = 0L; /** Encryption key ID. */ - private final int id; + @Order(0) + int id; /** Encryption key. */ - private final byte[] key; + @Order(1) + byte[] key; + + /** */ + public GroupKeyEncrypted() {} /** * @param id Encryption key ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java index b8ba29880baa3..624bf08382bb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/NodeEncryptionKeys.java @@ -44,6 +44,7 @@ public class NodeEncryptionKeys implements Serializable, Message { byte[] masterKeyDigest; /** Known i.e. stored in {@code ReadWriteMetastorage} keys from node. */ + @Order(3) Map> knownKeysWithIds; /** */ diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java index 6512c2ac9a42b..d29a73f7039f4 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java @@ -31,7 +31,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; @@ -51,11 +50,11 @@ public DiscoveryMessageParser(MessageFactory msgFactory) { } /** Marshals discovery message to bytes array. */ - public byte[] marshalZip(DiscoverySpiCustomMessage msg) { + public byte[] marshalZip(Message msg) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) { - serializeMessage((Message)msg, out); + serializeMessage(msg, out); } catch (Exception e) { throw new IgniteSpiException("Failed to serialize message: " + msg, e); @@ -65,12 +64,12 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) { } /** Unmarshals discovery message from bytes array. */ - public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) { + public T unmarshalZip(byte[] bytes) { try ( ByteArrayInputStream bais = new ByteArrayInputStream(bytes); InflaterInputStream in = new InflaterInputStream(bais) ) { - return (DiscoverySpiCustomMessage)deserializeMessage(in); + return deserializeMessage(in); } catch (Exception e) { throw new IgniteSpiException("Failed to deserialize message.", e); @@ -99,7 +98,7 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { } /** */ - private Message deserializeMessage(InputStream in) throws IOException { + private T deserializeMessage(InputStream in) throws IOException { DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null); ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); @@ -127,6 +126,6 @@ private Message deserializeMessage(InputStream in) throws IOException { } while (!finished); - return msg; + return (T)msg; } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java index ff8311d071ba8..f2b5ac463685d 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkJoiningNodeData.java @@ -38,7 +38,7 @@ class ZkJoiningNodeData implements Serializable { /** */ @GridToStringInclude - private Map discoData; + private Map discoData; /** * @param partCnt Number of parts in multi-parts message. @@ -51,7 +51,7 @@ class ZkJoiningNodeData implements Serializable { * @param node Node. * @param discoData Discovery data. */ - ZkJoiningNodeData(ZookeeperClusterNode node, Map discoData) { + ZkJoiningNodeData(ZookeeperClusterNode node, Map discoData) { assert node != null && node.id() != null : node; assert discoData != null; @@ -76,7 +76,7 @@ ZookeeperClusterNode node() { /** * @return Discovery data. */ - Map discoveryData() { + Map discoveryData() { return discoData; } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 0d09119d205a9..0d0531d1a9a2c 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -89,7 +89,6 @@ import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; -import org.apache.ignite.spi.discovery.ObjectData; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; @@ -807,7 +806,7 @@ private void joinTopology(@Nullable ZkRuntimeState prevState) throws Interrupted exchange.collect(discoDataBag); ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, - F.viewReadOnly(discoDataBag.joiningNodeData(), ObjectData::unwrap)); + new HashMap<>(F.viewReadOnly(discoDataBag.joiningNodeData(), msgParser::marshalZip))); byte[] joinDataBytes; @@ -2073,7 +2072,7 @@ private ZkNodeValidateResult validateJoiningNode(ZkJoiningNodeData joiningNodeDa if (err == null) { DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(node.id(), joiningNodeData.node().isClient()); - joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), ObjectData::new)); + joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), msgParser::unmarshalZip)); err = spi.getSpiContext().validateNode(node, joiningNodeBag); } @@ -2240,7 +2239,7 @@ private void addJoinedNode( DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId, joiningNodeData.node().isClient()); - joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), ObjectData::new)); + joiningNodeBag.joiningNodeData(F.viewReadOnly(joiningNodeData.discoveryData(), msgParser::unmarshalZip)); exchange.onExchange(joiningNodeBag); @@ -2876,7 +2875,7 @@ private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJ DiscoveryDataBag dataBag = new DiscoveryDataBag(joinedEvtData.nodeId, joiningData.node().isClient()); - dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(), ObjectData::new)); + dataBag.joiningNodeData(F.viewReadOnly(joiningData.discoveryData(), msgParser::unmarshalZip)); exchange.onExchange(dataBag); }