Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.message.QueryInboxCloseMessage;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
Expand Down Expand Up @@ -165,7 +165,7 @@ public void queryRegistry(QueryRegistry qryRegistry) {

/** {@inheritDoc} */
@Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException {
messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId, exchangeId));
messageService().send(nodeId, new QueryInboxCloseMessage(qryId, fragmentId, exchangeId));
}

/** {@inheritDoc} */
Expand All @@ -188,8 +188,8 @@ public void queryRegistry(QueryRegistry qryRegistry) {

/** {@inheritDoc} */
@Override public void init() {
messageService().register((n, m) -> onMessage(n, (InboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE);
messageService().register((n, m) -> onMessage(n, (QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE);
}
Expand Down Expand Up @@ -221,7 +221,7 @@ public void queryRegistry(QueryRegistry qryRegistry) {
}

/** */
protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
protected void onMessage(UUID nodeId, QueryInboxCloseMessage msg) {
Collection<Inbox<?>> inboxes = mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());

if (!F.isEmpty(inboxes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
*
*/
public interface CalciteMarshalableMessage extends CalciteMessage {
/** A Calcite engine related message which requires marshalling with context. */
public interface CalciteContextMarshallableMessage extends Message {
/**
* Prepares the message before sending.
*
Expand All @@ -35,6 +34,7 @@ public interface CalciteMarshalableMessage extends CalciteMessage {
* Prepares the message before processing.
*
* @param ctx Cache shared context.
* @param clsLdr Class loader.
*/
void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException;
void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, ClassLoader clsLdr) throws IgniteCheckedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import org.apache.ignite.internal.managers.communication.ErrorMessage;

/**
*
*/
public class CalciteErrorMessage implements CalciteMarshalableMessage {
/** */
public class CalciteErrorMessage extends ErrorMessage {
/** */
@Order(0)
UUID qryId;
Expand All @@ -37,62 +31,28 @@ public class CalciteErrorMessage implements CalciteMarshalableMessage {
@Order(1)
long fragmentId;

/** Error bytes. */
@Order(2)
@GridToStringExclude
@Nullable public byte[] errBytes;

/** Error. */
private @Nullable Throwable err;

/** */
public CalciteErrorMessage() {
// No-op.
}

/** */
public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
super(err);

assert err != null;

this.qryId = qryId;
this.fragmentId = fragmentId;
this.err = err;
}

/**
* @return Query ID.
*/
/** @return Query ID. */
public UUID queryId() {
return qryId;
}

/**
* @return Fragment ID.
*/
/** @return Fragment ID. */
public long fragmentId() {
return fragmentId;
}

/** */
public @Nullable Throwable error() {
return err;
}

/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.QUERY_ERROR_MESSAGE;
}


/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (err != null)
errBytes = U.marshal(ctx.marshaller(), err);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (errBytes != null)
err = U.unmarshal(ctx.marshaller(), errBytes, U.resolveClassLoader(ctx.cache().context().gridConfig()));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,21 @@

package org.apache.ignite.internal.processors.query.calcite.message;

import java.util.function.Supplier;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;

/**
* Message factory.
*/
public class CalciteMessageFactory implements MessageFactoryProvider {
public class CalciteMessageFactory extends AbstractMarshallableMessageFactoryProvider {
/** {@inheritDoc} */
@SuppressWarnings({"unchecked", "rawtypes"})
@Override public void registerAll(MessageFactory factory) {
for (MessageType type : MessageType.values())
factory.register(type.directType(), (Supplier)type.factory(), type.serializer());
}

/**
* Produces a value message.
*/
public static ValueMessage asMessage(Object val) {
if (val == null)
return null;

return new GenericValueMessage(val);
for (MessageType type : MessageType.values()) {
if (MarshallableMessage.class.isAssignableFrom(type.messageClass()))
register(factory, type.messageClass(), type.directType(), schemaAwareMarsh, resolvedClsLdr);
else
register(factory, type.messageClass(), type.directType(), dfltMarsh, dftlClsLdr);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import java.util.UUID;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
* Execution context is used to determine a stripe where to process a message.
*/
public interface ExecutionContextAware extends CalciteMessage {
public interface ExecutionContextAware extends Message {
/**
* @return Query ID.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;

/**
*
*/
public final class GenericValueMessage implements ValueMessage {
/** */
public final class GenericValueMessage implements MarshallableMessage {
/** */
private Object val;

Expand All @@ -35,33 +34,30 @@ public final class GenericValueMessage implements ValueMessage {

/** */
public GenericValueMessage() {

// No-op.
}

/** */
public GenericValueMessage(Object val) {
this.val = val;
}

/** {@inheritDoc} */
@Override public Object value() {
/** */
public Object value() {
return val;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (val != null && serialized == null)
serialized = U.marshal(ctx, val);
serialized = U.marshal(marsh, val);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (serialized != null && val == null)
val = U.unmarshal(ctx, serialized, U.resolveClassLoader(ctx.gridConfig()));
}
val = U.unmarshal(marsh, serialized, clsLdr);

/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.GENERIC_VALUE_MESSAGE;
serialized = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import java.util.UUID;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
*
Expand All @@ -27,5 +28,5 @@ public interface MessageListener {
* @param nodeId Sender node ID.
* @param msg Message.
*/
void onMessage(UUID nodeId, CalciteMessage msg);
void onMessage(UUID nodeId, Message msg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
*
Expand All @@ -31,7 +32,7 @@ public interface MessageService extends Service {
* @param nodeId Node ID.
* @param msg Message.
*/
void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException;
void send(UUID nodeId, Message msg) throws IgniteCheckedException;

/**
* Checks whether a node with given ID is alive.
Expand Down
Loading
Loading