From cbdf1603590a1ad753a39189cd6367b89cfef5fc Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 8 Apr 2026 17:22:04 +0500 Subject: [PATCH] IGNITE-28058 Revise custom serialization in DistributedMetaStorage* messages --- .../DistributedMetaStorageCasMessage.java | 39 +++++++++++++++---- .../DistributedMetaStorageImpl.java | 34 ++++++++-------- .../DistributedMetaStorageUpdateMessage.java | 39 +++++++++++++++---- 3 files changed, 80 insertions(+), 32 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java index 62c49f0f07bd9..d40661bbadcad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java @@ -17,18 +17,25 @@ package org.apache.ignite.internal.processors.metastorage.persistence; +import java.io.Serializable; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.jetbrains.annotations.Nullable; /** */ public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage { - /** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */ + /** */ + private @Nullable Serializable expVal; + + /** */ @Order(0) - byte[] expectedVal; + byte[] expValBytes; /** */ @Order(1) @@ -40,16 +47,16 @@ public DistributedMetaStorageCasMessage() { } /** */ - public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) { - super(reqId, key, valBytes); + public DistributedMetaStorageCasMessage(UUID reqId, String key, @Nullable Serializable expVal, @Nullable Serializable val) { + super(reqId, key, val); - expectedVal = expValBytes; + this.expVal = expVal; matches = true; } /** */ - public byte[] expectedValue() { - return expectedVal; + public Serializable expectedValue() { + return expVal; } /** */ @@ -64,7 +71,23 @@ public boolean matches() { /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageCasAckMessage(requestId(), matches); + return new DistributedMetaStorageCasAckMessage(reqId, matches); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + super.prepareMarshal(marsh); + + if (expVal != null && expValBytes == null) + expValBytes = U.marshal(marsh, expVal); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh) throws IgniteCheckedException { + super.finishUnmarshal(marsh); + + if (expValBytes != null && expVal == null) + expVal = U.unmarshal(marsh, expValBytes, U.gridClassLoader()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index c9c0d6c49f9a1..2e24715108b7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -44,7 +44,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.NodeStoppingException; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -84,7 +83,6 @@ import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY; import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix; import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer; -import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal; import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal; import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; @@ -479,7 +477,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { @Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException { assert val != null : key; - startWrite(key, marshal(marshaller, val)).get(); + startWrite(key, val).get(); } /** {@inheritDoc} */ @@ -489,7 +487,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { ) throws IgniteCheckedException { assert val != null : key; - return startWrite(key, marshal(marshaller, val)); + return startWrite(key, val); } /** {@inheritDoc} */ @@ -521,7 +519,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { ) throws IgniteCheckedException { assert newVal != null : key; - return startCas(key, marshal(marshaller, expVal), marshal(marshaller, newVal)); + return startCas(key, expVal, newVal); } /** {@inheritDoc} */ @@ -531,7 +529,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) { ) throws IgniteCheckedException { assert expVal != null : key; - return startCas(key, marshal(marshaller, expVal), null).get(); + return startCas(key, expVal, null).get(); } /** {@inheritDoc} */ @@ -1046,10 +1044,10 @@ else if (!isClient && ver.id() > 0) { * for operation to be completed. * * @param key The key. - * @param valBytes Value bytes to write. Null if value needs to be removed. + * @param val Value to write. Null if value needs to be removed. * @throws IgniteCheckedException If there was an error while sending discovery message. */ - private GridFutureAdapter startWrite(String key, byte[] valBytes) throws IgniteCheckedException { + private GridFutureAdapter startWrite(String key, @Nullable Serializable val) throws IgniteCheckedException { UUID reqId = UUID.randomUUID(); GridFutureAdapter fut = prepareWriteFuture(reqId); @@ -1057,7 +1055,9 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni if (fut.isDone()) return fut; - DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes); + DistributedMetaStorageUpdateMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, val); + + msg.prepareMarshal(marshaller); ctx.discovery().sendCustomEvent(msg); @@ -1065,9 +1065,9 @@ private GridFutureAdapter startWrite(String key, byte[] valBytes) throws Igni } /** - * Basically the same as {@link #startWrite(String, byte[])} but for CAS operations. + * Basically the same as {@link #startWrite(String, Serializable)} but for CAS operations. */ - private GridFutureAdapter startCas(String key, byte[] expValBytes, byte[] newValBytes) + private GridFutureAdapter startCas(String key, @Nullable Serializable expVal, @Nullable Serializable newVal) throws IgniteCheckedException { UUID reqId = UUID.randomUUID(); @@ -1076,7 +1076,9 @@ private GridFutureAdapter startCas(String key, byte[] expValBytes, byte if (fut.isDone()) return fut; - DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes); + DistributedMetaStorageCasMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expVal, newVal); + + msg.prepareMarshal(marshaller); ctx.discovery().sendCustomEvent(msg); @@ -1134,7 +1136,7 @@ private void onUpdateMessage( if (msg instanceof DistributedMetaStorageCasMessage) completeCas((DistributedMetaStorageCasMessage)msg); else - completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value())); + completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.valueBytes())); } catch (IgniteInterruptedCheckedException e) { throw U.convertException(e); @@ -1318,16 +1320,16 @@ private void completeCas( Serializable oldVal = bridge.read(msg.key()); - Serializable expVal = unmarshal(marshaller, msg.expectedValue()); + msg.finishUnmarshal(marshaller); - if (!Objects.deepEquals(oldVal, expVal)) { + if (!Objects.deepEquals(oldVal, msg.expectedValue())) { msg.setMatches(false); // Do nothing if expected value doesn't match with the actual one. return; } - completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value())); + completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.valueBytes())); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java index af4d7256bdba9..b8c06dbfdf244 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java @@ -17,12 +17,16 @@ package org.apache.ignite.internal.processors.metastorage.persistence; +import java.io.Serializable; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.jetbrains.annotations.Nullable; @@ -42,7 +46,10 @@ public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessa @Order(2) String key; - /** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */ + /** */ + private @Nullable Serializable val; + + /** */ @Order(3) byte[] valBytes; @@ -52,12 +59,12 @@ public DistributedMetaStorageUpdateMessage() { } /** */ - public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) { + public DistributedMetaStorageUpdateMessage(UUID reqId, String key, @Nullable Serializable val) { id = IgniteUuid.randomUuid(); this.reqId = reqId; this.key = key; - this.valBytes = valBytes; + this.val = val; } /** {@inheritDoc} */ @@ -66,17 +73,17 @@ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valByt } /** */ - public UUID requestId() { - return reqId; + public String key() { + return key; } /** */ - public String key() { - return key; + public Serializable value() { + return val; } /** */ - public byte[] value() { + public byte[] valueBytes() { return valBytes; } @@ -90,6 +97,22 @@ public byte[] value() { return true; } + /** + * @param marsh Marshaller. + */ + public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (val != null && valBytes == null) + valBytes = U.marshal(marsh, val); + } + + /** + * @param marsh Marshaller. + */ + public void finishUnmarshal(Marshaller marsh) throws IgniteCheckedException { + if (valBytes != null && val == null) + val = U.unmarshal(marsh, valBytes, U.gridClassLoader()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DistributedMetaStorageUpdateMessage.class, this);