Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
import org.apache.ignite.internal.codegen.TransactionAttributesAwareRequestSerializer;
import org.apache.ignite.internal.codegen.TransactionIsolationMessageSerializer;
import org.apache.ignite.internal.codegen.TxEntryValueHolderSerializer;
import org.apache.ignite.internal.codegen.TxLockListSerializer;
import org.apache.ignite.internal.codegen.TxLockSerializer;
import org.apache.ignite.internal.codegen.TxLocksRequestSerializer;
Expand Down Expand Up @@ -399,7 +400,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)97, CacheEvictionEntry::new, new CacheEvictionEntrySerializer());
factory.register((short)98, CacheEntryPredicateAdapter::new, new CacheEntryPredicateAdapterSerializer());
factory.register((short)100, IgniteTxEntry::new);
factory.register((short)101, TxEntryValueHolder::new);
factory.register((short)101, TxEntryValueHolder::new, new TxEntryValueHolderSerializer());
factory.register((short)102, CacheVersionedValue::new, new CacheVersionedValueSerializer());
factory.register((short)103, GridCacheRawVersionedEntry::new);
factory.register((short)104, GridCacheVersionEx::new, new GridCacheVersionExSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public IgniteTxEntry(GridCacheContext<?, ?> ctx,

this.ctx = ctx;
this.tx = tx;
this.val.value(op, val, false, false);
this.val.updateValue(op, val, false, false);
this.entry = entry;
this.ttl = ttl;
this.conflictExpireTime = conflictExpireTime;
Expand Down Expand Up @@ -321,7 +321,7 @@ public IgniteTxEntry(GridCacheContext<?, ?> ctx,

this.ctx = ctx;
this.tx = tx;
this.val.value(op, val, false, false);
this.val.updateValue(op, val, false, false);
this.entry = entry;
this.ttl = ttl;
this.filters = filters;
Expand Down Expand Up @@ -382,7 +382,7 @@ public IgniteTxEntry cleanCopy(GridCacheContext<?, ?> ctx) {
cp.val = new TxEntryValueHolder();

cp.filters = filters;
cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
cp.val.updateValue(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
cp.entryProcessorsCol = entryProcessorsCol;
cp.ttl = ttl;
cp.conflictExpireTime = conflictExpireTime;
Expand Down Expand Up @@ -475,7 +475,7 @@ void setAndMarkValid(GridCacheOperation op, CacheObject val) {
* @param hasWriteVal Has write value flag.
*/
void setAndMarkValid(GridCacheOperation op, CacheObject val, boolean hasWriteVal, boolean hasReadVal) {
this.val.value(op, val, hasWriteVal, hasReadVal);
this.val.updateValue(op, val, hasWriteVal, hasReadVal);

markValid();
}
Expand All @@ -485,7 +485,7 @@ void setAndMarkValid(GridCacheOperation op, CacheObject val, boolean hasWriteVal
* to further peek operations.
*/
public void markValid() {
prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
prevVal.updateValue(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
}

/**
Expand Down Expand Up @@ -671,7 +671,7 @@ public void oldValue(CacheObject oldVal) {
if (this.oldVal == null)
this.oldVal = new TxEntryValueHolder();

this.oldVal.value(op(), oldVal, true, true);
this.oldVal.updateValue(op(), oldVal, true, true);
}

/**
Expand Down Expand Up @@ -757,7 +757,7 @@ public void conflictExpireTime(long conflictExpireTime) {
* @param readVal Read value flag.
*/
public void value(@Nullable CacheObject val, boolean writeVal, boolean readVal) {
this.val.value(this.val.op(), val, writeVal, readVal);
this.val.updateValue(this.val.op(), val, writeVal, readVal);
}

/**
Expand All @@ -766,7 +766,7 @@ public void value(@Nullable CacheObject val, boolean writeVal, boolean readVal)
* @param val Read value to set.
*/
public void readValue(@Nullable CacheObject val) {
this.val.value(this.val.op(), val, false, true);
this.val.updateValue(this.val.op(), val, false, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.ignite.internal.processors.cache.transactions;

import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.GridCacheOperationMessage;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
Expand All @@ -29,8 +28,6 @@
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
Expand All @@ -42,23 +39,28 @@
/**
* Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes.
*/
@IgniteCodeGeneratingFail // Need to handle 'hasWriteVal' flag during write.
public class TxEntryValueHolder implements Message {
/** */
/** Stored value. */
@Order(value = 0, method = "value")
@GridToStringInclude(sensitive = true)
private CacheObject val;
private @Nullable CacheObject val;

/** */
@GridToStringInclude
private GridCacheOperation op = NOOP;

/** Cache operation stub for serialization. */
@Order(value = 1, method = "operationMessage")
@SuppressWarnings("unused")
private GridCacheOperationMessage opMsg;

/** Flag indicating that value has been set for write. */
@Order(value = 2, method = "hasWriteValue")
@GridToStringExclude
private boolean hasWriteVal;

/** Flag indicating that value has been set for read. */
@GridToStringExclude
@GridDirectTransient
private boolean hasReadVal;

/**
Expand All @@ -67,7 +69,7 @@ public class TxEntryValueHolder implements Message {
* @param hasWriteVal Write value presence flag.
* @param hasReadVal Read value presence flag.
*/
public void value(GridCacheOperation op, CacheObject val, boolean hasWriteVal, boolean hasReadVal) {
public void updateValue(GridCacheOperation op, CacheObject val, boolean hasWriteVal, boolean hasReadVal) {
if (hasReadVal && this.hasWriteVal)
return;

Expand All @@ -86,16 +88,14 @@ public boolean hasValue() {
}

/**
* Gets stored value.
*
* @return Value.
* @return Stored value.
*/
public CacheObject value() {
return val;
}

/**
* @param val Stored value.
* @param val New stored value.
*/
public void value(@Nullable CacheObject val) {
this.val = val;
Expand All @@ -120,19 +120,40 @@ public void op(GridCacheOperation op) {
}

/**
* @return {@code True} if write value was set.
* @return Flag indicating that value has been set for write.
*/
public boolean hasWriteValue() {
return hasWriteVal;
}

/**
* @param hasWriteVal New flag indicating that value has been set for write.
*/
public void hasWriteValue(boolean hasWriteVal) {
this.hasWriteVal = hasWriteVal;
}

/**
* @return {@code True} if read value was set.
*/
public boolean hasReadValue() {
return hasReadVal;
}

/**
* @return Cache operation message.
*/
public GridCacheOperationMessage operationMessage() {
return new GridCacheOperationMessage(op);
}

/**
* @param opMsg New cache operation message.
*/
public void operationMessage(GridCacheOperationMessage opMsg) {
op = opMsg != null ? opMsg.value() : null;
}

/**
* @param ctx Cache context.
* @throws org.apache.ignite.IgniteCheckedException If marshaling failed.
Expand All @@ -158,79 +179,6 @@ public void unmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws Ignit
return S.toString(TxEntryValueHolder.class, this);
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 0:
if (!writer.writeBoolean(hasWriteVal))
return false;

writer.incrementState();

case 1:
if (!writer.writeByte(op != null ? (byte)op.ordinal() : -1))
return false;

writer.incrementState();

case 2:
if (!writer.writeCacheObject(hasWriteVal ? val : null))
return false;

writer.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

switch (reader.state()) {
case 0:
hasWriteVal = reader.readBoolean();

if (!reader.isLastRead())
return false;

reader.incrementState();

case 1:
byte opOrd;

opOrd = reader.readByte();

if (!reader.isLastRead())
return false;

op = GridCacheOperation.fromOrdinal(opOrd);

reader.incrementState();

case 2:
val = reader.readCacheObject();

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public short directType() {
return 101;
Expand Down
Loading