Skip to content

Commit cbdf160

Browse files
committed
IGNITE-28058 Revise custom serialization in DistributedMetaStorage* messages
1 parent d0a1ae0 commit cbdf160

File tree

3 files changed

+80
-32
lines changed

3 files changed

+80
-32
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,25 @@
1717

1818
package org.apache.ignite.internal.processors.metastorage.persistence;
1919

20+
import java.io.Serializable;
2021
import java.util.UUID;
22+
import org.apache.ignite.IgniteCheckedException;
2123
import org.apache.ignite.internal.Order;
2224
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2325
import org.apache.ignite.internal.util.typedef.internal.S;
26+
import org.apache.ignite.internal.util.typedef.internal.U;
27+
import org.apache.ignite.marshaller.Marshaller;
2428
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2529
import org.jetbrains.annotations.Nullable;
2630

2731
/** */
2832
public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
29-
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
33+
/** */
34+
private @Nullable Serializable expVal;
35+
36+
/** */
3037
@Order(0)
31-
byte[] expectedVal;
38+
byte[] expValBytes;
3239

3340
/** */
3441
@Order(1)
@@ -40,16 +47,16 @@ public DistributedMetaStorageCasMessage() {
4047
}
4148

4249
/** */
43-
public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
44-
super(reqId, key, valBytes);
50+
public DistributedMetaStorageCasMessage(UUID reqId, String key, @Nullable Serializable expVal, @Nullable Serializable val) {
51+
super(reqId, key, val);
4552

46-
expectedVal = expValBytes;
53+
this.expVal = expVal;
4754
matches = true;
4855
}
4956

5057
/** */
51-
public byte[] expectedValue() {
52-
return expectedVal;
58+
public Serializable expectedValue() {
59+
return expVal;
5360
}
5461

5562
/** */
@@ -64,7 +71,23 @@ public boolean matches() {
6471

6572
/** {@inheritDoc} */
6673
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
67-
return new DistributedMetaStorageCasAckMessage(requestId(), matches);
74+
return new DistributedMetaStorageCasAckMessage(reqId, matches);
75+
}
76+
77+
/** {@inheritDoc} */
78+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
79+
super.prepareMarshal(marsh);
80+
81+
if (expVal != null && expValBytes == null)
82+
expValBytes = U.marshal(marsh, expVal);
83+
}
84+
85+
/** {@inheritDoc} */
86+
@Override public void finishUnmarshal(Marshaller marsh) throws IgniteCheckedException {
87+
super.finishUnmarshal(marsh);
88+
89+
if (expValBytes != null && expVal == null)
90+
expVal = U.unmarshal(marsh, expValBytes, U.gridClassLoader());
6891
}
6992

7093
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.ignite.internal.IgniteInternalFuture;
4545
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
4646
import org.apache.ignite.internal.NodeStoppingException;
47-
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
4847
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
4948
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
5049
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -84,7 +83,6 @@
8483
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
8584
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix;
8685
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer;
87-
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal;
8886
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
8987
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION;
9088
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -479,7 +477,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
479477
@Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
480478
assert val != null : key;
481479

482-
startWrite(key, marshal(marshaller, val)).get();
480+
startWrite(key, val).get();
483481
}
484482

485483
/** {@inheritDoc} */
@@ -489,7 +487,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
489487
) throws IgniteCheckedException {
490488
assert val != null : key;
491489

492-
return startWrite(key, marshal(marshaller, val));
490+
return startWrite(key, val);
493491
}
494492

495493
/** {@inheritDoc} */
@@ -521,7 +519,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
521519
) throws IgniteCheckedException {
522520
assert newVal != null : key;
523521

524-
return startCas(key, marshal(marshaller, expVal), marshal(marshaller, newVal));
522+
return startCas(key, expVal, newVal);
525523
}
526524

527525
/** {@inheritDoc} */
@@ -531,7 +529,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
531529
) throws IgniteCheckedException {
532530
assert expVal != null : key;
533531

534-
return startCas(key, marshal(marshaller, expVal), null).get();
532+
return startCas(key, expVal, null).get();
535533
}
536534

537535
/** {@inheritDoc} */
@@ -1046,28 +1044,30 @@ else if (!isClient && ver.id() > 0) {
10461044
* for operation to be completed.
10471045
*
10481046
* @param key The key.
1049-
* @param valBytes Value bytes to write. Null if value needs to be removed.
1047+
* @param val Value to write. Null if value needs to be removed.
10501048
* @throws IgniteCheckedException If there was an error while sending discovery message.
10511049
*/
1052-
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
1050+
private GridFutureAdapter<?> startWrite(String key, @Nullable Serializable val) throws IgniteCheckedException {
10531051
UUID reqId = UUID.randomUUID();
10541052

10551053
GridFutureAdapter<?> fut = prepareWriteFuture(reqId);
10561054

10571055
if (fut.isDone())
10581056
return fut;
10591057

1060-
DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
1058+
DistributedMetaStorageUpdateMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, val);
1059+
1060+
msg.prepareMarshal(marshaller);
10611061

10621062
ctx.discovery().sendCustomEvent(msg);
10631063

10641064
return fut;
10651065
}
10661066

10671067
/**
1068-
* Basically the same as {@link #startWrite(String, byte[])} but for CAS operations.
1068+
* Basically the same as {@link #startWrite(String, Serializable)} but for CAS operations.
10691069
*/
1070-
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
1070+
private GridFutureAdapter<Boolean> startCas(String key, @Nullable Serializable expVal, @Nullable Serializable newVal)
10711071
throws IgniteCheckedException {
10721072
UUID reqId = UUID.randomUUID();
10731073

@@ -1076,7 +1076,9 @@ private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte
10761076
if (fut.isDone())
10771077
return fut;
10781078

1079-
DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
1079+
DistributedMetaStorageCasMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expVal, newVal);
1080+
1081+
msg.prepareMarshal(marshaller);
10801082

10811083
ctx.discovery().sendCustomEvent(msg);
10821084

@@ -1134,7 +1136,7 @@ private void onUpdateMessage(
11341136
if (msg instanceof DistributedMetaStorageCasMessage)
11351137
completeCas((DistributedMetaStorageCasMessage)msg);
11361138
else
1137-
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
1139+
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.valueBytes()));
11381140
}
11391141
catch (IgniteInterruptedCheckedException e) {
11401142
throw U.convertException(e);
@@ -1318,16 +1320,16 @@ private void completeCas(
13181320

13191321
Serializable oldVal = bridge.read(msg.key());
13201322

1321-
Serializable expVal = unmarshal(marshaller, msg.expectedValue());
1323+
msg.finishUnmarshal(marshaller);
13221324

1323-
if (!Objects.deepEquals(oldVal, expVal)) {
1325+
if (!Objects.deepEquals(oldVal, msg.expectedValue())) {
13241326
msg.setMatches(false);
13251327

13261328
// Do nothing if expected value doesn't match with the actual one.
13271329
return;
13281330
}
13291331

1330-
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
1332+
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.valueBytes()));
13311333
}
13321334

13331335
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
package org.apache.ignite.internal.processors.metastorage.persistence;
1919

20+
import java.io.Serializable;
2021
import java.util.UUID;
22+
import org.apache.ignite.IgniteCheckedException;
2123
import org.apache.ignite.internal.Order;
2224
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2325
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2426
import org.apache.ignite.internal.util.typedef.internal.S;
27+
import org.apache.ignite.internal.util.typedef.internal.U;
2528
import org.apache.ignite.lang.IgniteUuid;
29+
import org.apache.ignite.marshaller.Marshaller;
2630
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2731
import org.jetbrains.annotations.Nullable;
2832

@@ -42,7 +46,10 @@ public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessa
4246
@Order(2)
4347
String key;
4448

45-
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
49+
/** */
50+
private @Nullable Serializable val;
51+
52+
/** */
4653
@Order(3)
4754
byte[] valBytes;
4855

@@ -52,12 +59,12 @@ public DistributedMetaStorageUpdateMessage() {
5259
}
5360

5461
/** */
55-
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
62+
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, @Nullable Serializable val) {
5663
id = IgniteUuid.randomUuid();
5764

5865
this.reqId = reqId;
5966
this.key = key;
60-
this.valBytes = valBytes;
67+
this.val = val;
6168
}
6269

6370
/** {@inheritDoc} */
@@ -66,17 +73,17 @@ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valByt
6673
}
6774

6875
/** */
69-
public UUID requestId() {
70-
return reqId;
76+
public String key() {
77+
return key;
7178
}
7279

7380
/** */
74-
public String key() {
75-
return key;
81+
public Serializable value() {
82+
return val;
7683
}
7784

7885
/** */
79-
public byte[] value() {
86+
public byte[] valueBytes() {
8087
return valBytes;
8188
}
8289

@@ -90,6 +97,22 @@ public byte[] value() {
9097
return true;
9198
}
9299

100+
/**
101+
* @param marsh Marshaller.
102+
*/
103+
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
104+
if (val != null && valBytes == null)
105+
valBytes = U.marshal(marsh, val);
106+
}
107+
108+
/**
109+
* @param marsh Marshaller.
110+
*/
111+
public void finishUnmarshal(Marshaller marsh) throws IgniteCheckedException {
112+
if (valBytes != null && val == null)
113+
val = U.unmarshal(marsh, valBytes, U.gridClassLoader());
114+
}
115+
93116
/** {@inheritDoc} */
94117
@Override public String toString() {
95118
return S.toString(DistributedMetaStorageUpdateMessage.class, this);

0 commit comments

Comments
 (0)