Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@
* @param typeName Type name.
* @return Type ID.
*/
// TODO?

Check warning on line 933 in modules/binary/api/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZqlz-K2yRRzOhfYny6F&open=AZqlz-K2yRRzOhfYny6F&pullRequest=12524
public int typeId(String typeName) {
Integer id = predefinedTypeNames.get(SIMPLE_NAME_LOWER_CASE_MAPPER.typeName(typeName));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception {

String getExpr = (F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName) + "()";

if (field.getAnnotation(Order.class).user()) {
returnFalseIfWriteFailed(write, "writer.writeUserObject", getExpr);

return;
}

TypeMirror type = field.asType();

if (type.getKind().isPrimitive()) {
Expand Down Expand Up @@ -432,6 +438,12 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception {

String name = F.isEmpty(methodName) ? field.getSimpleName().toString() : methodName;

if (field.getAnnotation(Order.class).user()) {
returnFalseIfReadFailed(name, "reader.readUserObject");

return;
}

if (type.getKind().isPrimitive()) {
String typeName = capitalizeOnlyFirst(type.getKind().name());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@

/** @return Getter and setter name. */
String method() default "";

/** */
boolean user() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,8 @@
@Order(value = 3, method = "exceptionMessage")
private @Nullable ErrorMessage gridExMsg;

/** Job result serialization call holder. */
@Order(value = 4, method = "jobResultBytes")
private @Nullable byte[] resBytes;

/** */
@Order(value = 4, method = "jobResult", user = true)
private @Nullable Object res;

/** */
Expand Down Expand Up @@ -152,22 +149,17 @@
}

/**
* @return Serialized job result.
* @return Job result.
*/
@Nullable public byte[] jobResultBytes() {
return resBytes;
}

/** */
public void jobResultBytes(@Nullable byte[] resBytes) {
this.resBytes = resBytes;
@Nullable public Object jobResult() {
return res;
}

/**
* @return Job result.
* @param res Job result.
*/
@Nullable public Object getJobResult() {
return res;
public void jobResult(Object res) {
this.res = res;
}

/**
Expand Down Expand Up @@ -274,25 +266,25 @@

/**
* Serializes user data to byte[] with provided marshaller.
* Erases non-marshalled data like {@link #getJobAttributes()} or {@link #getJobResult()}.
* Erases non-marshalled data like {@link #getJobAttributes()}.
*/
public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log) {
if (res != null) {
try {
resBytes = U.marshal(marsh, res);
}
catch (IgniteCheckedException e) {
resBytes = null;

String msg = "Failed to serialize job response [nodeId=" + nodeId +
", ses=" + sesId + ", jobId=" + jobId +
", resCls=" + (res == null ? null : res.getClass()) + ']';

wrapSerializationError(e, msg, log);
}

res = null;
}
// if (res != null) {

Check warning on line 272 in modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This block of commented-out lines of code should be removed.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZqlz-H-yRRzOhfYny6D&open=AZqlz-H-yRRzOhfYny6D&pullRequest=12524
// try {
// resBytes = U.marshal(marsh, res);
// }
// catch (IgniteCheckedException e) {
// resBytes = null;
//
// String msg = "Failed to serialize job response [nodeId=" + nodeId +
// ", ses=" + sesId + ", jobId=" + jobId +
// ", resCls=" + (res == null ? null : res.getClass()) + ']';
//
// wrapSerializationError(e, msg, log);
// }
//
// res = null;
// }

if (!F.isEmpty(jobAttrs)) {
try {
Expand All @@ -314,7 +306,7 @@

/**
* Deserializes user data from byte[] with provided marshaller and class loader.
* Erases marshalled data like {@link #jobAttrubutesBytes()} or {@link #jobResultBytes()}.
* Erases marshalled data like {@link #jobAttrubutesBytes()}.
*/
public void unmarshallUserData(Marshaller marshaller, ClassLoader clsLdr) throws IgniteCheckedException {
if (jobAttrsBytes != null) {
Expand All @@ -323,11 +315,11 @@
jobAttrsBytes = null;
}

if (resBytes != null) {
res = U.unmarshal(marshaller, resBytes, clsLdr);

resBytes = null;
}
// if (resBytes != null) {

Check warning on line 318 in modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This block of commented-out lines of code should be removed.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZqlz-H-yRRzOhfYny6E&open=AZqlz-H-yRRzOhfYny6E&pullRequest=12524
// res = U.unmarshal(marshaller, resBytes, clsLdr);
//
// resBytes = null;
// }
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
Expand Down Expand Up @@ -58,10 +59,10 @@ public class DirectMessageReader implements MessageReader {
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
*/
public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
public DirectMessageReader(final MessageFactory msgFactory, BinaryMarshaller marsh, IgniteCacheObjectProcessor cacheObjProc) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory, cacheObjProc);
return new StateItem(msgFactory, marsh, cacheObjProc);
}
});
}
Expand Down Expand Up @@ -357,6 +358,17 @@ public ByteBuffer getBuffer() {
return ll;
}

/** {@inheritDoc} */
@Override public <T> T readUserObject() {
DirectByteBufferStream stream = state.item().stream;

T userObj = stream.readUserObject(this);

lastRead = stream.lastFinished();

return userObj;
}

/** {@inheritDoc} */
@Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls) {
DirectByteBufferStream stream = state.item().stream;
Expand Down Expand Up @@ -441,8 +453,8 @@ private static class StateItem implements DirectMessageStateItem {
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
*/
public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
stream = new DirectByteBufferStream(msgFactory, cacheObjProc);
public StateItem(MessageFactory msgFactory, BinaryMarshaller marsh, IgniteCacheObjectProcessor cacheObjProc) {
stream = new DirectByteBufferStream(msgFactory, marsh, cacheObjProc);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
Expand Down Expand Up @@ -51,10 +52,10 @@ public class DirectMessageWriter implements MessageWriter {
private ByteBuffer buf;

/** */
public DirectMessageWriter(final MessageFactory msgFactory) {
public DirectMessageWriter(final MessageFactory msgFactory, final BinaryMarshaller marsh) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory);
return new StateItem(msgFactory, marsh);
}
});
}
Expand Down Expand Up @@ -355,6 +356,15 @@ public ByteBuffer getBuffer() {
return stream.lastFinished();
}

/** {@inheritDoc} */
@Override public <T> boolean writeUserObject(T obj) {
DirectByteBufferStream stream = state.item().stream;

stream.writeUserObject(obj);

return stream.lastFinished();
}

/** {@inheritDoc} */
@Override public boolean isHeaderWritten() {
return state.item().hdrWritten;
Expand Down Expand Up @@ -410,8 +420,8 @@ private static class StateItem implements DirectMessageStateItem {
private boolean hdrWritten;

/** */
public StateItem(MessageFactory msgFactory) {
stream = new DirectByteBufferStream(msgFactory);
public StateItem(MessageFactory msgFactory, BinaryMarshaller marsh) {
stream = new DirectByteBufferStream(msgFactory, marsh);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
Expand Down Expand Up @@ -222,6 +223,9 @@
@GridToStringExclude
private final IgniteCacheObjectProcessor cacheObjProc;

/** Binary marshaller for marshalling user objects. */
private final BinaryMarshaller marsh;

/** */
@GridToStringExclude
protected ByteBuffer buf;
Expand Down Expand Up @@ -342,8 +346,9 @@
*
* @param msgFactory Message factory.
*/
public DirectByteBufferStream(MessageFactory msgFactory) {
public DirectByteBufferStream(MessageFactory msgFactory, BinaryMarshaller marsh) {
this.msgFactory = msgFactory;
this.marsh = marsh;

// Is not used while writing messages.
cacheObjProc = null;
Expand All @@ -355,8 +360,9 @@
* @param msgFactory Message factory.
* @param cacheObjProc Cache object processor.
*/
public DirectByteBufferStream(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) {
public DirectByteBufferStream(MessageFactory msgFactory, BinaryMarshaller marsh, IgniteCacheObjectProcessor cacheObjProc) {
this.msgFactory = msgFactory;
this.marsh = marsh;
this.cacheObjProc = cacheObjProc;
}

Expand Down Expand Up @@ -897,6 +903,16 @@
writeShort(Short.MIN_VALUE);
}

/** */
public <T> void writeUserObject(T obj) {
try {
writeByteArray(marsh.marshal(obj));
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/**
* @param arr Array.
* @param itemType Component type.
Expand Down Expand Up @@ -1557,6 +1573,18 @@
return null;
}

/** */
public <T> T readUserObject(MessageReader reader) {

Check warning on line 1577 in modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused method parameter "reader".

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZqlz-CmyRRzOhfYny6C&open=AZqlz-CmyRRzOhfYny6C&pullRequest=12524
try {
byte[] arr = readByteArray();

return marsh.unmarshal(arr, null);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/**
* @param itemType Item type.
* @param itemCls Item class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
Expand Down Expand Up @@ -562,6 +563,10 @@ protected final String stopInfo() {
return ctx.io().formatter();
}

@Override public BinaryMarshaller binaryMarshaller() {
return ctx.marshaller();
}

@Override public MessageFactory messageFactory() {
return ctx.io().messageFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,11 @@ public void resetMetrics() {
else {
formatter = new MessageFormatter() {
@Override public MessageWriter writer(MessageFactory msgFactory) {
return new DirectMessageWriter(msgFactory);
return new DirectMessageWriter(msgFactory, ctx.marshaller());
}

@Override public MessageReader reader(MessageFactory msgFactory) {
return new DirectMessageReader(msgFactory, ctx.cacheObjects());
return new DirectMessageReader(msgFactory, ctx.marshaller(), ctx.cacheObjects());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Consumer;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.lang.IgniteBiPredicate;
Expand Down Expand Up @@ -182,6 +183,11 @@ public class StandaloneSpiContext implements IgniteSpiContext {
return null;
}

/** {@inheritDoc} */
@Override public BinaryMarshaller binaryMarshaller() {
return null;
}

/** {@inheritDoc} */
@Override public MessageFactory messageFactory() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ void onResponse(GridJobExecuteResponse msg) {
if (!loc)
res.unmarshallUserData(marsh, U.resolveClassLoader(dep.classLoader(), ctx.config()));

jobRes.onResponse(res.getJobResult(), res.exception(), res.getJobAttributes(), res.cancelled());
jobRes.onResponse(res.jobResult(), res.exception(), res.getJobAttributes(), res.cancelled());

if (loc)
ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobAfterSend.class);
Expand Down
Loading
Loading