Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

package org.apache.ignite.internal.processors.metastorage.persistence;

import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;

/** */
public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
/** */
private @Nullable Serializable expVal;

/** */
@Order(0)
byte[] expectedVal;
byte[] expValBytes;

/** */
@Order(1)
Expand All @@ -40,16 +47,16 @@ public DistributedMetaStorageCasMessage() {
}

/** */
public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
super(reqId, key, valBytes);
public DistributedMetaStorageCasMessage(UUID reqId, String key, @Nullable Serializable expVal, @Nullable Serializable val) {
super(reqId, key, val);

expectedVal = expValBytes;
this.expVal = expVal;
matches = true;
}

/** */
public byte[] expectedValue() {
return expectedVal;
public Serializable expectedValue() {
return expVal;
}

/** */
Expand All @@ -64,7 +71,23 @@ public boolean matches() {

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return new DistributedMetaStorageCasAckMessage(requestId(), matches);
return new DistributedMetaStorageCasAckMessage(reqId, matches);
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
super.prepareMarshal(marsh);

if (expVal != null && expValBytes == null)
expValBytes = U.marshal(marsh, expVal);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh) throws IgniteCheckedException {
super.finishUnmarshal(marsh);

if (expValBytes != null && expVal == null)
expVal = U.unmarshal(marsh, expValBytes, U.gridClassLoader());
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
Expand Down Expand Up @@ -84,7 +83,6 @@
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
Expand Down Expand Up @@ -479,7 +477,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
@Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
assert val != null : key;

startWrite(key, marshal(marshaller, val)).get();
startWrite(key, val).get();
}

/** {@inheritDoc} */
Expand All @@ -489,7 +487,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
) throws IgniteCheckedException {
assert val != null : key;

return startWrite(key, marshal(marshaller, val));
return startWrite(key, val);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -521,7 +519,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
) throws IgniteCheckedException {
assert newVal != null : key;

return startCas(key, marshal(marshaller, expVal), marshal(marshaller, newVal));
return startCas(key, expVal, newVal);
}

/** {@inheritDoc} */
Expand All @@ -531,7 +529,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
) throws IgniteCheckedException {
assert expVal != null : key;

return startCas(key, marshal(marshaller, expVal), null).get();
return startCas(key, expVal, null).get();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1046,28 +1044,30 @@ else if (!isClient && ver.id() > 0) {
* for operation to be completed.
*
* @param key The key.
* @param valBytes Value bytes to write. Null if value needs to be removed.
* @param val Value to write. Null if value needs to be removed.
* @throws IgniteCheckedException If there was an error while sending discovery message.
*/
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
private GridFutureAdapter<?> startWrite(String key, @Nullable Serializable val) throws IgniteCheckedException {
UUID reqId = UUID.randomUUID();

GridFutureAdapter<?> fut = prepareWriteFuture(reqId);

if (fut.isDone())
return fut;

DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
DistributedMetaStorageUpdateMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, val);

msg.prepareMarshal(marshaller);

ctx.discovery().sendCustomEvent(msg);

return fut;
}

/**
* Basically the same as {@link #startWrite(String, byte[])} but for CAS operations.
* Basically the same as {@link #startWrite(String, Serializable)} but for CAS operations.
*/
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
private GridFutureAdapter<Boolean> startCas(String key, @Nullable Serializable expVal, @Nullable Serializable newVal)
throws IgniteCheckedException {
UUID reqId = UUID.randomUUID();

Expand All @@ -1076,7 +1076,9 @@ private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte
if (fut.isDone())
return fut;

DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
DistributedMetaStorageCasMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expVal, newVal);

msg.prepareMarshal(marshaller);

ctx.discovery().sendCustomEvent(msg);

Expand Down Expand Up @@ -1134,7 +1136,7 @@ private void onUpdateMessage(
if (msg instanceof DistributedMetaStorageCasMessage)
completeCas((DistributedMetaStorageCasMessage)msg);
else
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.valueBytes()));
}
catch (IgniteInterruptedCheckedException e) {
throw U.convertException(e);
Expand Down Expand Up @@ -1318,16 +1320,16 @@ private void completeCas(

Serializable oldVal = bridge.read(msg.key());

Serializable expVal = unmarshal(marshaller, msg.expectedValue());
msg.finishUnmarshal(marshaller);

if (!Objects.deepEquals(oldVal, expVal)) {
if (!Objects.deepEquals(oldVal, msg.expectedValue())) {
msg.setMatches(false);

// Do nothing if expected value doesn't match with the actual one.
return;
}

completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.valueBytes()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package org.apache.ignite.internal.processors.metastorage.persistence;

import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;

Expand All @@ -42,7 +46,10 @@ public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessa
@Order(2)
String key;

/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
/** */
private @Nullable Serializable val;

/** */
@Order(3)
byte[] valBytes;

Expand All @@ -52,12 +59,12 @@ public DistributedMetaStorageUpdateMessage() {
}

/** */
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, @Nullable Serializable val) {
id = IgniteUuid.randomUuid();

this.reqId = reqId;
this.key = key;
this.valBytes = valBytes;
this.val = val;
}

/** {@inheritDoc} */
Expand All @@ -66,17 +73,17 @@ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valByt
}

/** */
public UUID requestId() {
return reqId;
public String key() {
return key;
}

/** */
public String key() {
return key;
public Serializable value() {
return val;
}

/** */
public byte[] value() {
public byte[] valueBytes() {
return valBytes;
}

Expand All @@ -90,6 +97,22 @@ public byte[] value() {
return true;
}

/**
* @param marsh Marshaller.
*/
public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (val != null && valBytes == null)
valBytes = U.marshal(marsh, val);
}

/**
* @param marsh Marshaller.
*/
public void finishUnmarshal(Marshaller marsh) throws IgniteCheckedException {
if (valBytes != null && val == null)
val = U.unmarshal(marsh, valBytes, U.gridClassLoader());
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DistributedMetaStorageUpdateMessage.class, this);
Expand Down
Loading