Skip to content

Commit 03c9080

Browse files
committed
IGNITE-28058 Revise custom serialization in DistributedMetaStorage* messages
1 parent 6377254 commit 03c9080

3 files changed

Lines changed: 68 additions & 35 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717

1818
package org.apache.ignite.internal.processors.metastorage.persistence;
1919

20+
import java.io.Serializable;
2021
import java.util.UUID;
22+
import org.apache.ignite.IgniteCheckedException;
2123
import org.apache.ignite.internal.Order;
2224
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2325
import org.apache.ignite.internal.util.typedef.internal.S;
26+
import org.apache.ignite.internal.util.typedef.internal.U;
27+
import org.apache.ignite.marshaller.Marshaller;
2428
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2529
import org.jetbrains.annotations.Nullable;
2630

@@ -29,9 +33,12 @@ public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpda
2933
/** */
3034
private static final long serialVersionUID = 0L;
3135

32-
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
36+
/** */
37+
private @Nullable Serializable expVal;
38+
39+
/** */
3340
@Order(0)
34-
byte[] expectedVal;
41+
byte[] expValBytes;
3542

3643
/** */
3744
@Order(1)
@@ -43,16 +50,16 @@ public DistributedMetaStorageCasMessage() {
4350
}
4451

4552
/** */
46-
public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
47-
super(reqId, key, valBytes);
53+
public DistributedMetaStorageCasMessage(UUID reqId, String key, @Nullable Serializable expVal, @Nullable Serializable val) {
54+
super(reqId, key, val);
4855

49-
expectedVal = expValBytes;
56+
this.expVal = expVal;
5057
matches = true;
5158
}
5259

5360
/** */
54-
public byte[] expectedValue() {
55-
return expectedVal;
61+
public Serializable expectedValue() {
62+
return expVal;
5663
}
5764

5865
/** */
@@ -67,9 +74,24 @@ public boolean matches() {
6774

6875
/** {@inheritDoc} */
6976
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
70-
return new DistributedMetaStorageCasAckMessage(requestId(), matches);
77+
return new DistributedMetaStorageCasAckMessage(reqId, matches);
78+
}
79+
80+
/** {@inheritDoc} */
81+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
82+
super.prepareMarshal(marsh);
83+
84+
if (expVal != null)
85+
expValBytes = U.marshal(marsh, expVal);
7186
}
7287

88+
/** {@inheritDoc} */
89+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
90+
super.finishUnmarshal(marsh, clsLdr);
91+
92+
if (expValBytes != null)
93+
expVal = U.unmarshal(marsh, expValBytes, clsLdr);
94+
}
7395

7496
/** {@inheritDoc} */
7597
@Override public String toString() {

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
479479
@Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
480480
assert val != null : key;
481481

482-
startWrite(key, marshal(marshaller, val)).get();
482+
startWrite(key, val).get();
483483
}
484484

485485
/** {@inheritDoc} */
@@ -489,7 +489,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
489489
) throws IgniteCheckedException {
490490
assert val != null : key;
491491

492-
return startWrite(key, marshal(marshaller, val));
492+
return startWrite(key, val);
493493
}
494494

495495
/** {@inheritDoc} */
@@ -521,7 +521,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
521521
) throws IgniteCheckedException {
522522
assert newVal != null : key;
523523

524-
return startCas(key, marshal(marshaller, expVal), marshal(marshaller, newVal));
524+
return startCas(key, expVal, newVal);
525525
}
526526

527527
/** {@inheritDoc} */
@@ -531,7 +531,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
531531
) throws IgniteCheckedException {
532532
assert expVal != null : key;
533533

534-
return startCas(key, marshal(marshaller, expVal), null).get();
534+
return startCas(key, expVal, null).get();
535535
}
536536

537537
/** {@inheritDoc} */
@@ -1046,28 +1046,28 @@ else if (!isClient && ver.id() > 0) {
10461046
* for operation to be completed.
10471047
*
10481048
* @param key The key.
1049-
* @param valBytes Value bytes to write. Null if value needs to be removed.
1049+
* @param val Value to write. Null if value needs to be removed.
10501050
* @throws IgniteCheckedException If there was an error while sending discovery message.
10511051
*/
1052-
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
1052+
private GridFutureAdapter<?> startWrite(String key, @Nullable Serializable val) throws IgniteCheckedException {
10531053
UUID reqId = UUID.randomUUID();
10541054

10551055
GridFutureAdapter<?> fut = prepareWriteFuture(reqId);
10561056

10571057
if (fut.isDone())
10581058
return fut;
10591059

1060-
DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
1060+
DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, val);
10611061

10621062
ctx.discovery().sendCustomEvent(msg);
10631063

10641064
return fut;
10651065
}
10661066

10671067
/**
1068-
* Basically the same as {@link #startWrite(String, byte[])} but for CAS operations.
1068+
* Basically the same as {@link #startWrite(String, Serializable)} but for CAS operations.
10691069
*/
1070-
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
1070+
private GridFutureAdapter<Boolean> startCas(String key, @Nullable Serializable expVal, @Nullable Serializable newVal)
10711071
throws IgniteCheckedException {
10721072
UUID reqId = UUID.randomUUID();
10731073

@@ -1076,7 +1076,7 @@ private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte
10761076
if (fut.isDone())
10771077
return fut;
10781078

1079-
DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
1079+
DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expVal, newVal);
10801080

10811081
ctx.discovery().sendCustomEvent(msg);
10821082

@@ -1134,7 +1134,7 @@ private void onUpdateMessage(
11341134
if (msg instanceof DistributedMetaStorageCasMessage)
11351135
completeCas((DistributedMetaStorageCasMessage)msg);
11361136
else
1137-
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
1137+
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), marshal(marshaller, msg.value())));
11381138
}
11391139
catch (IgniteInterruptedCheckedException e) {
11401140
throw U.convertException(e);
@@ -1318,16 +1318,14 @@ private void completeCas(
13181318

13191319
Serializable oldVal = bridge.read(msg.key());
13201320

1321-
Serializable expVal = unmarshal(marshaller, msg.expectedValue());
1322-
1323-
if (!Objects.deepEquals(oldVal, expVal)) {
1321+
if (!Objects.deepEquals(oldVal, msg.expectedValue())) {
13241322
msg.setMatches(false);
13251323

13261324
// Do nothing if expected value doesn't match with the actual one.
13271325
return;
13281326
}
13291327

1330-
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
1328+
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), marshal(marshaller, msg.value())));
13311329
}
13321330

13331331
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717

1818
package org.apache.ignite.internal.processors.metastorage.persistence;
1919

20+
import java.io.Serializable;
2021
import java.util.UUID;
22+
import org.apache.ignite.IgniteCheckedException;
23+
import org.apache.ignite.internal.MarshallableMessage;
2124
import org.apache.ignite.internal.Order;
2225
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2326
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2427
import org.apache.ignite.internal.util.typedef.internal.S;
28+
import org.apache.ignite.internal.util.typedef.internal.U;
2529
import org.apache.ignite.lang.IgniteUuid;
26-
import org.apache.ignite.plugin.extensions.communication.Message;
30+
import org.apache.ignite.marshaller.Marshaller;
2731
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2832
import org.jetbrains.annotations.Nullable;
2933

3034
/** */
31-
public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, Message {
35+
public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, MarshallableMessage {
3236
/** */
3337
private static final long serialVersionUID = 0L;
3438

@@ -46,7 +50,10 @@ public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessa
4650
@Order(2)
4751
String key;
4852

49-
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
53+
/** */
54+
private @Nullable Serializable val;
55+
56+
/** */
5057
@Order(3)
5158
byte[] valBytes;
5259

@@ -56,32 +63,27 @@ public DistributedMetaStorageUpdateMessage() {
5663
}
5764

5865
/** */
59-
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
66+
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, @Nullable Serializable val) {
6067
id = IgniteUuid.randomUuid();
6168

6269
this.reqId = reqId;
6370
this.key = key;
64-
this.valBytes = valBytes;
71+
this.val = val;
6572
}
6673

6774
/** {@inheritDoc} */
6875
@Override public IgniteUuid id() {
6976
return id;
7077
}
7178

72-
/** */
73-
public UUID requestId() {
74-
return reqId;
75-
}
76-
7779
/** */
7880
public String key() {
7981
return key;
8082
}
8183

8284
/** */
83-
public byte[] value() {
84-
return valBytes;
85+
public Serializable value() {
86+
return val;
8587
}
8688

8789
/** {@inheritDoc} */
@@ -94,6 +96,17 @@ public byte[] value() {
9496
return true;
9597
}
9698

99+
/** {@inheritDoc} */
100+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
101+
if (val != null)
102+
valBytes = U.marshal(marsh, val);
103+
}
104+
105+
/** {@inheritDoc} */
106+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
107+
if (valBytes != null)
108+
val = U.unmarshal(marsh, valBytes, clsLdr);
109+
}
97110

98111
/** {@inheritDoc} */
99112
@Override public String toString() {

0 commit comments

Comments
 (0)