Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,24 @@

package org.apache.ignite.internal;

import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;

/**
* Job siblings response.
*/
public class GridJobSiblingsResponse implements Message {
/** */
@GridDirectTransient
private Collection<ComputeJobSibling> siblings;
private @Nullable Collection<ComputeJobSibling> siblings;

/** */
@Order(0)
private byte[] siblingsBytes;

/**
Expand All @@ -49,70 +46,56 @@ public GridJobSiblingsResponse() {

/**
* @param siblings Siblings.
* @param siblingsBytes Serialized siblings.
*/
public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling> siblings, @Nullable byte[] siblingsBytes) {
public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling> siblings) {
this.siblings = siblings;
this.siblingsBytes = siblingsBytes;
}

/**
* @return Job siblings.
*/
public Collection<ComputeJobSibling> jobSiblings() {
public @Nullable Collection<ComputeJobSibling> jobSiblings() {
return siblings;
}

/**
* @param marsh Marshaller.
* @throws IgniteCheckedException In case of error.
* @return Serialized siblings.
*/
public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException {
assert marsh != null;

if (siblingsBytes != null)
siblings = U.unmarshal(marsh, siblingsBytes, null);
public byte[] siblingsBytes() {
return siblingsBytes;
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 0:
if (!writer.writeByteArray(siblingsBytes))
return false;

writer.incrementState();

}

return true;
/**
* @param siblingsBytes Serialized siblings.
*/
public void siblingsBytes(byte[] siblingsBytes) {
this.siblingsBytes = siblingsBytes;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

switch (reader.state()) {
case 0:
siblingsBytes = reader.readByteArray();
/**
* Marshals siblings to byte array.
*
* @param marsh Marshaller.
* @throws IgniteCheckedException In case of error.
*/
public void marshalSiblings(Marshaller marsh) throws IgniteCheckedException {
siblingsBytes = U.marshal(marsh, siblings);
}

if (!reader.isLastRead())
return false;
/**
* Unmarshals siblings from byte array.
*
* @param marsh Marshaller.
* @throws IgniteCheckedException In case of error.
*/
public void unmarshalSiblings(Marshaller marsh) throws IgniteCheckedException {
assert marsh != null;

reader.incrementState();
if (siblingsBytes != null) {
siblings = U.unmarshal(marsh, siblingsBytes, null);

siblingsBytes = null;
}

return true;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer;
import org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateRequestSerializer;
Expand Down Expand Up @@ -313,7 +314,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)1, GridJobExecuteRequest::new);
factory.register((short)2, GridJobExecuteResponse::new, new GridJobExecuteResponseSerializer());
factory.register((short)3, GridJobSiblingsRequest::new, new GridJobSiblingsRequestSerializer());
factory.register((short)4, GridJobSiblingsResponse::new);
factory.register((short)4, GridJobSiblingsResponse::new, new GridJobSiblingsResponseSerializer());
factory.register((short)5, GridTaskCancelRequest::new, new GridTaskCancelRequestSerializer());
factory.register((short)6, GridTaskSessionRequest::new, new GridTaskSessionRequestSerializer());
factory.register((short)7, GridCheckpointRequest::new, new GridCheckpointRequestSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,11 +1412,12 @@ private class JobSiblingsMessageListener implements GridMessageListener {

boolean loc = ctx.localNodeId().equals(nodeId);

ctx.io().sendToCustomTopic(nodeId, topic,
new GridJobSiblingsResponse(
loc ? siblings : null,
loc ? null : U.marshal(marsh, siblings)),
SYSTEM_POOL);
GridJobSiblingsResponse resp = new GridJobSiblingsResponse(siblings);

if (!loc)
resp.marshalSiblings(marsh);

ctx.io().sendToCustomTopic(nodeId, topic, resp, SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send job sibling response.", e);
Expand Down
Loading