diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java index 314e00d4b9cf0..9aeb780795fd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal; -import java.nio.ByteBuffer; import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.compute.ComputeJobSibling; @@ -25,8 +24,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -34,10 +31,10 @@ */ public class GridJobSiblingsResponse implements Message { /** */ - @GridDirectTransient - private Collection siblings; + private @Nullable Collection siblings; /** */ + @Order(0) private byte[] siblingsBytes; /** @@ -49,70 +46,56 @@ public GridJobSiblingsResponse() { /** * @param siblings Siblings. - * @param siblingsBytes Serialized siblings. */ - public GridJobSiblingsResponse(@Nullable Collection siblings, @Nullable byte[] siblingsBytes) { + public GridJobSiblingsResponse(@Nullable Collection siblings) { this.siblings = siblings; - this.siblingsBytes = siblingsBytes; } /** * @return Job siblings. */ - public Collection jobSiblings() { + public @Nullable Collection jobSiblings() { return siblings; } /** - * @param marsh Marshaller. - * @throws IgniteCheckedException In case of error. + * @return Serialized siblings. */ - public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - if (siblingsBytes != null) - siblings = U.unmarshal(marsh, siblingsBytes, null); + public byte[] siblingsBytes() { + return siblingsBytes; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray(siblingsBytes)) - return false; - - writer.incrementState(); - - } - - return true; + /** + * @param siblingsBytes Serialized siblings. + */ + public void siblingsBytes(byte[] siblingsBytes) { + this.siblingsBytes = siblingsBytes; } - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - siblingsBytes = reader.readByteArray(); + /** + * Marshals siblings to byte array. + * + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. + */ + public void marshalSiblings(Marshaller marsh) throws IgniteCheckedException { + siblingsBytes = U.marshal(marsh, siblings); + } - if (!reader.isLastRead()) - return false; + /** + * Unmarshals siblings from byte array. + * + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. + */ + public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException { + assert marsh != null; - reader.incrementState(); + if (siblingsBytes != null) { + siblings = U.unmarshal(marsh, siblingsBytes, null); + siblingsBytes = null; } - - return true; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index cf07a3fcfb8c0..c3804b5beaee2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -78,6 +78,7 @@ import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer; import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer; import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer; +import org.apache.ignite.internal.codegen.GridJobSiblingsResponseSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer; import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateRequestSerializer; @@ -313,7 +314,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)1, GridJobExecuteRequest::new); factory.register((short)2, GridJobExecuteResponse::new, new GridJobExecuteResponseSerializer()); factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer()); - factory.register((short)4, GridJobSiblingsResponse::new); + factory.register((short)4, GridJobSiblingsResponse::new, new GridJobSiblingsResponseSerializer()); factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer()); factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer()); factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index b6411017fc9c4..54775dcb63a54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -1412,11 +1412,12 @@ private class JobSiblingsMessageListener implements GridMessageListener { boolean loc = ctx.localNodeId().equals(nodeId); - ctx.io().sendToCustomTopic(nodeId, topic, - new GridJobSiblingsResponse( - loc ? siblings : null, - loc ? null : U.marshal(marsh, siblings)), - SYSTEM_POOL); + GridJobSiblingsResponse resp = new GridJobSiblingsResponse(siblings); + + if (!loc) + resp.marshalSiblings(marsh); + + ctx.io().sendToCustomTopic(nodeId, topic, resp, SYSTEM_POOL); } catch (IgniteCheckedException e) { U.error(log, "Failed to send job sibling response.", e);