diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java index 039b14ed9e920..ebf5c85c533c0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java @@ -32,12 +32,12 @@ import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage; -import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage; import org.apache.ignite.internal.processors.query.calcite.message.MessageService; import org.apache.ignite.internal.processors.query.calcite.message.MessageType; import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchAcknowledgeMessage; import org.apache.ignite.internal.processors.query.calcite.message.QueryBatchMessage; import org.apache.ignite.internal.processors.query.calcite.message.QueryCloseMessage; +import org.apache.ignite.internal.processors.query.calcite.message.QueryInboxCloseMessage; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext; import org.apache.ignite.internal.processors.query.calcite.util.AbstractService; @@ -165,7 +165,7 @@ public void queryRegistry(QueryRegistry qryRegistry) { /** {@inheritDoc} */ @Override public void closeInbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException { - messageService().send(nodeId, new InboxCloseMessage(qryId, fragmentId, exchangeId)); + messageService().send(nodeId, new QueryInboxCloseMessage(qryId, fragmentId, exchangeId)); } /** {@inheritDoc} */ @@ -188,8 +188,8 @@ public void queryRegistry(QueryRegistry qryRegistry) { /** {@inheritDoc} */ @Override public void init() { - messageService().register((n, m) -> onMessage(n, (InboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE); - messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_ACKNOWLEDGE_MESSAGE); + messageService().register((n, m) -> onMessage(n, (QueryInboxCloseMessage)m), MessageType.QUERY_INBOX_CANCEL_MESSAGE); + messageService().register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)m), MessageType.QUERY_BATCH_ACKNOWLEDGE_MESSAGE); messageService().register((n, m) -> onMessage(n, (QueryBatchMessage)m), MessageType.QUERY_BATCH_MESSAGE); messageService().register((n, m) -> onMessage(n, (QueryCloseMessage)m), MessageType.QUERY_CLOSE_MESSAGE); } @@ -221,7 +221,7 @@ public void queryRegistry(QueryRegistry qryRegistry) { } /** */ - protected void onMessage(UUID nodeId, InboxCloseMessage msg) { + protected void onMessage(UUID nodeId, QueryInboxCloseMessage msg) { Collection> inboxes = mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId()); if (!F.isEmpty(inboxes)) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java similarity index 78% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java index b671639ebd6c4..f6ba42de95933 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMarshalableMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteContextMarshallableMessage.java @@ -19,11 +19,10 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.plugin.extensions.communication.Message; -/** - * - */ -public interface CalciteMarshalableMessage extends CalciteMessage { +/** A Calcite engine related message which requires marshalling with context. */ +public interface CalciteContextMarshallableMessage extends Message { /** * Prepares the message before sending. * @@ -35,6 +34,7 @@ public interface CalciteMarshalableMessage extends CalciteMessage { * Prepares the message before processing. * * @param ctx Cache shared context. + * @param clsLdr Class loader. */ - void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException; + void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader clsLdr) throws IgniteCheckedException; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java index ba20b2526b5dc..9d62635cc2285 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteErrorMessage.java @@ -18,17 +18,11 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.managers.communication.ErrorMessage; -/** - * - */ -public class CalciteErrorMessage implements CalciteMarshalableMessage { +/** */ +public class CalciteErrorMessage extends ErrorMessage { /** */ @Order(0) UUID qryId; @@ -37,14 +31,6 @@ public class CalciteErrorMessage implements CalciteMarshalableMessage { @Order(1) long fragmentId; - /** Error bytes. */ - @Order(2) - @GridToStringExclude - @Nullable public byte[] errBytes; - - /** Error. */ - private @Nullable Throwable err; - /** */ public CalciteErrorMessage() { // No-op. @@ -52,47 +38,21 @@ public CalciteErrorMessage() { /** */ public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) { + super(err); + assert err != null; this.qryId = qryId; this.fragmentId = fragmentId; - this.err = err; } - /** - * @return Query ID. - */ + /** @return Query ID. */ public UUID queryId() { return qryId; } - /** - * @return Fragment ID. - */ + /** @return Fragment ID. */ public long fragmentId() { return fragmentId; } - - /** */ - public @Nullable Throwable error() { - return err; - } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_ERROR_MESSAGE; - } - - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (err != null) - errBytes = U.marshal(ctx.marshaller(), err); - } - - /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (errBytes != null) - err = U.unmarshal(ctx.marshaller(), errBytes, U.resolveClassLoader(ctx.cache().context().gridConfig())); - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java deleted file mode 100644 index d23affbb5c302..0000000000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessage.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.message; - -import org.apache.ignite.plugin.extensions.communication.Message; - -/** - * - */ -public interface CalciteMessage extends Message { - /** - * @return Message type. - */ - MessageType type(); -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java index 1d707cf0c8505..f3a9d6f431d3e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteMessageFactory.java @@ -17,28 +17,21 @@ package org.apache.ignite.internal.processors.query.calcite.message; -import java.util.function.Supplier; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; /** * Message factory. */ -public class CalciteMessageFactory implements MessageFactoryProvider { +public class CalciteMessageFactory extends AbstractMarshallableMessageFactoryProvider { /** {@inheritDoc} */ - @SuppressWarnings({"unchecked", "rawtypes"}) @Override public void registerAll(MessageFactory factory) { - for (MessageType type : MessageType.values()) - factory.register(type.directType(), (Supplier)type.factory(), type.serializer()); - } - - /** - * Produces a value message. - */ - public static ValueMessage asMessage(Object val) { - if (val == null) - return null; - - return new GenericValueMessage(val); + for (MessageType type : MessageType.values()) { + if (MarshallableMessage.class.isAssignableFrom(type.messageClass())) + register(factory, type.messageClass(), type.directType(), schemaAwareMarsh, resolvedClsLdr); + else + register(factory, type.messageClass(), type.directType(), dfltMarsh, dftlClsLdr); + } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java index 883f504af4755..5710a68fc703a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ExecutionContextAware.java @@ -18,11 +18,12 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.UUID; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Execution context is used to determine a stripe where to process a message. */ -public interface ExecutionContextAware extends CalciteMessage { +public interface ExecutionContextAware extends Message { /** * @return Query ID. */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java index b03659defa95f..40849bab6a73a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java @@ -18,14 +18,13 @@ package org.apache.ignite.internal.processors.query.calcite.message; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; -/** - * - */ -public final class GenericValueMessage implements ValueMessage { +/** */ +public final class GenericValueMessage implements MarshallableMessage { /** */ private Object val; @@ -35,7 +34,7 @@ public final class GenericValueMessage implements ValueMessage { /** */ public GenericValueMessage() { - + // No-op. } /** */ @@ -43,25 +42,22 @@ public GenericValueMessage(Object val) { this.val = val; } - /** {@inheritDoc} */ - @Override public Object value() { + /** */ + public Object value() { return val; } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (val != null && serialized == null) - serialized = U.marshal(ctx, val); + serialized = U.marshal(marsh, val); } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (serialized != null && val == null) - val = U.unmarshal(ctx, serialized, U.resolveClassLoader(ctx.gridConfig())); - } + val = U.unmarshal(marsh, serialized, clsLdr); - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.GENERIC_VALUE_MESSAGE; + serialized = null; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java index 61fc897715778..68e2d2710bd93 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageListener.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.message; import java.util.UUID; +import org.apache.ignite.plugin.extensions.communication.Message; /** * @@ -27,5 +28,5 @@ public interface MessageListener { * @param nodeId Sender node ID. * @param msg Message. */ - void onMessage(UUID nodeId, CalciteMessage msg); + void onMessage(UUID nodeId, Message msg); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java index 96b2aee7c2f96..caa1de990fe07 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java @@ -20,6 +20,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.util.Service; +import org.apache.ignite.plugin.extensions.communication.Message; /** * @@ -31,7 +32,7 @@ public interface MessageService extends Service { * @param nodeId Node ID. * @param msg Message. */ - void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException; + void send(UUID nodeId, Message msg) throws IgniteCheckedException; /** * Checks whether a node with given ID is alive. diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java index 2cea51aa5cfac..dd421bf1f6cc3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java @@ -17,7 +17,8 @@ package org.apache.ignite.internal.processors.query.calcite.message; -import java.util.EnumMap; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -49,6 +50,9 @@ public class MessageServiceImpl extends AbstractService implements MessageServic /** */ private final GridCacheSharedContext ctx; + /** */ + private final ClassLoader clsLdr; + /** */ private UUID localNodeId; @@ -62,14 +66,15 @@ public class MessageServiceImpl extends AbstractService implements MessageServic private FailureProcessor failureProcessor; /** */ - private EnumMap lsnrs; + private Map lsnrs; /** */ public MessageServiceImpl(GridKernalContext ctx) { super(ctx); this.ctx = ctx.cache().context(); - this.ioManager = ctx.io(); + clsLdr = U.resolveClassLoader(ctx.config()); + ioManager = ctx.io(); msgLsnr = this::onMessage; } @@ -146,7 +151,7 @@ public FailureProcessor failureProcessor() { } /** {@inheritDoc} */ - @Override public void send(UUID nodeId, CalciteMessage msg) throws IgniteCheckedException { + @Override public void send(UUID nodeId, Message msg) throws IgniteCheckedException { if (localNodeId().equals(nodeId)) onMessage(nodeId, msg); else { @@ -159,9 +164,9 @@ public FailureProcessor failureProcessor() { /** {@inheritDoc} */ @Override public void register(MessageListener lsnr, MessageType type) { if (lsnrs == null) - lsnrs = new EnumMap<>(MessageType.class); + lsnrs = new HashMap<>(); - MessageListener old = lsnrs.put(type, lsnr); + MessageListener old = lsnrs.put(type.directType(), lsnr); assert old == null : old; } @@ -179,8 +184,8 @@ public FailureProcessor failureProcessor() { /** */ protected void prepareMarshal(Message msg) throws IgniteCheckedException { try { - if (msg instanceof CalciteMarshalableMessage) - ((CalciteMarshalableMessage)msg).prepareMarshal(ctx); + if (msg instanceof CalciteContextMarshallableMessage) + ((CalciteContextMarshallableMessage)msg).prepareMarshal(ctx); } catch (Exception e) { failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); @@ -192,8 +197,8 @@ protected void prepareMarshal(Message msg) throws IgniteCheckedException { /** */ protected void prepareUnmarshal(Message msg) throws IgniteCheckedException { try { - if (msg instanceof CalciteMarshalableMessage) - ((CalciteMarshalableMessage)msg).prepareUnmarshal(ctx); + if (msg instanceof CalciteContextMarshallableMessage) + ((CalciteContextMarshallableMessage)msg).finishUnmarshal(ctx, clsLdr); } catch (Exception e) { failureProcessor().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); @@ -203,7 +208,7 @@ protected void prepareUnmarshal(Message msg) throws IgniteCheckedException { } /** */ - protected void onMessage(UUID nodeId, CalciteMessage msg) { + protected void onMessage(UUID nodeId, Message msg) { if (msg instanceof ExecutionContextAware) { ExecutionContextAware msg0 = (ExecutionContextAware)msg; taskExecutor().execute(msg0.queryId(), msg0.fragmentId(), () -> onMessageInternal(nodeId, msg)); @@ -218,16 +223,16 @@ protected void onMessage(UUID nodeId, CalciteMessage msg) { /** */ private void onMessage(UUID nodeId, Object msg, byte plc) { - if (msg instanceof CalciteMessage) - onMessage(nodeId, (CalciteMessage)msg); + if (msg instanceof Message && MessageType.isCalciteMessage((Message)msg)) + onMessage(nodeId, (Message)msg); } /** */ - private void onMessageInternal(UUID nodeId, CalciteMessage msg) { + private void onMessageInternal(UUID nodeId, Message msg) { try { prepareUnmarshal(msg); - MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.type())); + MessageListener lsnr = Objects.requireNonNull(lsnrs.get(msg.directType())); lsnr.onMessage(nodeId, msg); } catch (IgniteCheckedException e) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java index d0ce95c54daa0..045dac80f69c7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java @@ -17,93 +17,82 @@ package org.apache.ignite.internal.processors.query.calcite.message; -import java.util.function.Supplier; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; -import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroupSerializer; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; -import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescriptionSerializer; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping; -import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMappingSerializer; -import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.Message; -/** - * - */ +/** */ public enum MessageType { /** */ - QUERY_START_REQUEST(300, QueryStartRequest::new, new QueryStartRequestSerializer()), + QUERY_START_REQUEST(300, QueryStartRequest.class), /** */ - QUERY_START_RESPONSE(301, QueryStartResponse::new, new QueryStartResponseSerializer()), + QUERY_START_RESPONSE(301, QueryStartResponse.class), /** */ - QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageSerializer()), + QUERY_ERROR_MESSAGE(302, CalciteErrorMessage.class), /** */ - QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new QueryBatchMessageSerializer()), + QUERY_BATCH_MESSAGE(303, QueryBatchMessage.class), /** */ - QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new, new QueryBatchAcknowledgeMessageSerializer()), + QUERY_BATCH_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage.class), /** */ - QUERY_INBOX_CANCEL_MESSAGE(305, InboxCloseMessage::new, new InboxCloseMessageSerializer()), + QUERY_INBOX_CANCEL_MESSAGE(305, QueryInboxCloseMessage.class), /** */ - QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new, new QueryCloseMessageSerializer()), + QUERY_CLOSE_MESSAGE(306, QueryCloseMessage.class), /** */ - GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new, new GenericValueMessageSerializer()), + GENERIC_VALUE_MESSAGE(307, GenericValueMessage.class), /** */ - FRAGMENT_MAPPING(350, FragmentMapping::new, new FragmentMappingSerializer()), + FRAGMENT_MAPPING(350, FragmentMapping.class), /** */ - COLOCATION_GROUP(351, ColocationGroup::new, new ColocationGroupSerializer()), + COLOCATION_GROUP(351, ColocationGroup.class), /** */ - FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new FragmentDescriptionSerializer()), + FRAGMENT_DESCRIPTION(352, FragmentDescription.class), /** */ - QUERY_TX_ENTRY(353, QueryTxEntry::new, new QueryTxEntrySerializer()); + QUERY_TX_ENTRY(353, QueryTxEntry.class); /** */ - private final int directType; + private final short directType; /** */ - private final Supplier factory; - - /** */ - private final MessageSerializer serializer; + private final Class msgCls; /** * @param directType Direct type. - * @param factory Message factory. - * @param serializer Message serializer. */ - MessageType(int directType, Supplier factory, MessageSerializer serializer) { - this.directType = directType; - this.factory = factory; - this.serializer = serializer; + MessageType(int directType, Class msgCls) { + this.directType = (short)directType; + this.msgCls = msgCls; } - /** - * @return Message factory. - */ - public Supplier factory() { - return factory; + /** */ + static boolean isCalciteMessage(Message msg) { + MessageType[] values = values(); + short msgType = msg.directType(); + + return msgType >= values[0].directType() && msgType <= values[values.length - 1].directType(); } /** * @return Message direct type. */ public short directType() { - return (short)directType; + return directType; } /** - * @return Message serializer. + * @return Message direct type. */ - public MessageSerializer serializer() { - return serializer; + public Class messageClass() { + return msgCls; } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java index ec7b04e05adfb..acf2b31bedcb6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchAcknowledgeMessage.java @@ -76,9 +76,4 @@ public long exchangeId() { public int batchId() { return batchId; } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_ACKNOWLEDGE_MESSAGE; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java index c21297c3a5c7d..487baff9fdca1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java @@ -17,17 +17,13 @@ package org.apache.ignite.internal.processors.query.calcite.message; -import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; +import java.util.stream.Collectors; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -/** - * - */ -public class QueryBatchMessage implements CalciteMarshalableMessage, ExecutionContextAware { +/** */ +public class QueryBatchMessage implements ExecutionContextAware { /** */ @Order(0) UUID qryId; @@ -48,15 +44,13 @@ public class QueryBatchMessage implements CalciteMarshalableMessage, ExecutionCo @Order(4) boolean last; - /** */ - private List rows; - /** */ @Order(5) - List mRows; + List mRows; /** */ public QueryBatchMessage() { + // No-op. } /** */ @@ -66,7 +60,8 @@ public QueryBatchMessage(UUID qryId, long fragmentId, long exchangeId, int batch this.exchangeId = exchangeId; this.batchId = batchId; this.last = last; - this.rows = rows; + + mRows = rows.stream().map(o -> o == null ? null : new GenericValueMessage(o)).collect(Collectors.toList()); } /** {@inheritDoc} */ @@ -104,45 +99,6 @@ public boolean last() { * @return Rows. */ public List rows() { - return rows; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (mRows != null || rows == null) - return; - - mRows = new ArrayList<>(rows.size()); - - for (Object row : rows) { - ValueMessage mRow = CalciteMessageFactory.asMessage(row); - - assert mRow != null; - - mRow.prepareMarshal(ctx); - - mRows.add(mRow); - } - } - - /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (rows != null || mRows == null) - return; - - rows = new ArrayList<>(mRows.size()); - - for (ValueMessage mRow : mRows) { - assert mRow != null; - - mRow.prepareUnmarshal(ctx); - - rows.add(mRow.value()); - } - } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_BATCH_MESSAGE; + return mRows.stream().map(GenericValueMessage::value).collect(Collectors.toList()); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java index 6db451d285cc8..4972314aa0a41 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryCloseMessage.java @@ -19,11 +19,12 @@ import java.util.UUID; import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ -public class QueryCloseMessage implements CalciteMessage { +public class QueryCloseMessage implements Message { /** */ @Order(0) UUID qryId; @@ -44,9 +45,4 @@ public QueryCloseMessage(UUID qryId) { public UUID queryId() { return qryId; } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_CLOSE_MESSAGE; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java similarity index 83% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java index 19638738da0a4..532c024d6573d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/InboxCloseMessage.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryInboxCloseMessage.java @@ -19,11 +19,12 @@ import java.util.UUID; import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ -public class InboxCloseMessage implements CalciteMessage { +public class QueryInboxCloseMessage implements Message { /** */ @Order(0) UUID qryId; @@ -37,12 +38,12 @@ public class InboxCloseMessage implements CalciteMessage { long exchangeId; /** */ - public InboxCloseMessage() { + public QueryInboxCloseMessage() { // No-op. } /** */ - public InboxCloseMessage(UUID qryId, long fragmentId, long exchangeId) { + public QueryInboxCloseMessage(UUID qryId, long fragmentId, long exchangeId) { this.qryId = qryId; this.fragmentId = fragmentId; this.exchangeId = exchangeId; @@ -68,9 +69,4 @@ public long fragmentId() { public long exchangeId() { return exchangeId; } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_INBOX_CANCEL_MESSAGE; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java index bc0817f24736e..15d34e00fbbfe 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java @@ -21,17 +21,19 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; import org.jetbrains.annotations.Nullable; /** * */ -public class QueryStartRequest implements CalciteMarshalableMessage, ExecutionContextAware { +public class QueryStartRequest implements MarshallableMessage, CalciteContextMarshallableMessage, ExecutionContextAware { /** */ @Order(0) String schema; @@ -116,7 +118,9 @@ public QueryStartRequest( } /** */ - QueryStartRequest() {} + public QueryStartRequest() { + // No-op. + } /** * @return Schema name. @@ -211,12 +215,13 @@ public boolean keepBinaryMode() { } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (paramsBytes == null && params != null) - paramsBytes = U.marshal(ctx, params); - - fragmentDesc.prepareMarshal(ctx); + paramsBytes = U.marshal(marsh, params); + } + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { if (qryTxEntries != null) { for (QueryTxEntry e : qryTxEntries) e.prepareMarshal(ctx); @@ -224,22 +229,18 @@ public boolean keepBinaryMode() { } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - ClassLoader ldr = U.resolveClassLoader(ctx.gridConfig()); - + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (params == null && paramsBytes != null) - params = U.unmarshal(ctx, paramsBytes, ldr); + params = U.unmarshal(marsh, paramsBytes, clsLdr); - fragmentDesc.prepareUnmarshal(ctx); + paramsBytes = null; + } + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { if (qryTxEntries != null) { for (QueryTxEntry e : qryTxEntries) - e.prepareUnmarshal(ctx, ldr); + e.finishUnmarshal(ctx, clsLdr); } } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_START_REQUEST; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java index f908cb61383bf..afc9d7c78126e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java @@ -20,12 +20,13 @@ import java.util.UUID; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * */ -public class QueryStartResponse implements CalciteMessage { +public class QueryStartResponse implements Message { /** */ @Order(0) UUID qryId; @@ -70,9 +71,4 @@ public long fragmentId() { public @Nullable Throwable error() { return ErrorMessage.error(errMsg); } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_START_RESPONSE; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java index 7961f589d83ef..36f2d9dceee84 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java @@ -38,7 +38,7 @@ * @see ExecutionContext#transactionChanges(int, int[], Function, Comparator) * @see QueryStartRequest#queryTransactionEntries() */ -public class QueryTxEntry implements CalciteMessage { +public class QueryTxEntry implements CalciteContextMarshallableMessage { /** Cache id. */ @Order(0) int cacheId; @@ -95,8 +95,8 @@ public GridCacheVersion version() { return ver; } - /** */ - public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { CacheObjectContext coctx = ctx.cacheContext(cacheId).cacheObjectContext(); key.prepareMarshal(coctx); @@ -105,8 +105,8 @@ public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteChecke val.prepareMarshal(coctx); } - /** */ - public void prepareUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { CacheObjectContext coctx = ctx.cacheContext(cacheId).cacheObjectContext(); key.finishUnmarshal(coctx, ldr); @@ -114,9 +114,4 @@ public void prepareUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) if (val != null) val.finishUnmarshal(coctx, ldr); } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.QUERY_TX_ENTRY; - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java deleted file mode 100644 index 901bcbfafa456..0000000000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ValueMessage.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.message; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; - -/** */ -public interface ValueMessage extends CalciteMarshalableMessage { - /** - * @return Wrapped value. - */ - Object value(); - - /** {@inheritDoc} */ - @Override default void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - // No-op - } - - /** {@inheritDoc} */ - @Override default void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - // No-op - } -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java index 3e4f800158900..7981d736953e2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java @@ -30,19 +30,18 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage; -import org.apache.ignite.internal.processors.query.calcite.message.MessageType; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.GridIntIterator; import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; /** */ -public class ColocationGroup implements CalciteMarshalableMessage { +public class ColocationGroup implements MarshallableMessage { /** */ @Order(0) long[] srcIds; @@ -102,6 +101,7 @@ public ColocationGroup local(UUID nodeId) { /** */ public ColocationGroup() { + // No-op. } /** */ @@ -314,12 +314,7 @@ public int[] partitions(UUID nodeId) { } /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.COLOCATION_GROUP; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (assignments == null || primaryAssignment) return; @@ -348,7 +343,7 @@ public int[] partitions(UUID nodeId) { } /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (F.isEmpty(marshalledAssignments)) return; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java index 009a9c6c9e3ba..7560be273c184 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java @@ -21,13 +21,12 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage; -import org.apache.ignite.internal.processors.query.calcite.message.MessageType; +import org.apache.ignite.marshaller.Marshaller; /** */ -public class FragmentDescription implements CalciteMarshalableMessage { +public class FragmentDescription implements MarshallableMessage { /** */ @Order(0) long fragmentId; @@ -46,6 +45,7 @@ public class FragmentDescription implements CalciteMarshalableMessage { /** */ public FragmentDescription() { + // No-op. } /** */ @@ -97,27 +97,14 @@ public void mapping(FragmentMapping mapping) { this.mapping = mapping; } - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.FRAGMENT_DESCRIPTION; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (target != null) { + /** */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (target != null) target = target.explicitMapping(); - - target.prepareMarshal(ctx); - } - - if (mapping != null) - mapping.prepareMarshal(ctx); } - /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - target.prepareUnmarshal(ctx); - - mapping.prepareUnmarshal(ctx); + /** */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + // No-op. } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java index 910b990476b8b..db8368e351d1a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java @@ -23,26 +23,24 @@ import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.query.calcite.message.CalciteMarshalableMessage; -import org.apache.ignite.internal.processors.query.calcite.message.MessageType; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.NotNull; /** * */ -public class FragmentMapping implements CalciteMarshalableMessage { +public class FragmentMapping implements Message { /** */ @Order(0) List colocationGrps; /** */ public FragmentMapping() { + // No-op. } /** */ @@ -174,21 +172,4 @@ public FragmentMapping explicitMapping(Set srcIds) { return new FragmentMapping(Commons.transform(colocationGrps, g -> explicitMappingGrps.contains(g) ? g.explicitMapping() : g)); } - - /** {@inheritDoc} */ - @Override public MessageType type() { - return MessageType.FRAGMENT_MAPPING; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - for (ColocationGroup grp : colocationGrps) - grp.prepareMarshal(ctx); - } - - /** {@inheritDoc} */ - @Override public void prepareUnmarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - for (ColocationGroup grp : colocationGrps) - grp.prepareUnmarshal(ctx); - } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index 6cf79d4f5dfa3..e67cd18885ea8 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; -import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage; import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl; import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; @@ -389,7 +388,7 @@ private TestMessageServiceImpl(GridKernalContext kernal, TestIoManager mgr) { } /** {@inheritDoc} */ - @Override public void send(UUID nodeId, CalciteMessage msg) { + @Override public void send(UUID nodeId, Message msg) { mgr.send(localNodeId(), nodeId, msg); } @@ -397,16 +396,6 @@ private TestMessageServiceImpl(GridKernalContext kernal, TestIoManager mgr) { @Override public boolean alive(UUID nodeId) { return true; } - - /** {@inheritDoc} */ - @Override protected void prepareMarshal(Message msg) { - // No-op; - } - - /** {@inheritDoc} */ - @Override protected void prepareUnmarshal(Message msg) { - // No-op; - } } /** diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java index efd4bffc4ec72..af835b4f0adf6 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java @@ -27,11 +27,14 @@ import java.util.stream.Stream; import com.google.common.collect.ImmutableList; import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry; +import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessageFactory; import org.apache.ignite.internal.processors.query.calcite.trait.AllNodes; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.junit.Before; import org.junit.Test; import org.junit.runners.Parameterized; @@ -94,6 +97,9 @@ public static List data() { @Override public void setup() throws Exception { nodesCnt = remoteFragmentsCnt + 1; super.setup(); + + // Register messages in Message#REGISTRATIONS and avoids failure in Message#directType(). + new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CalciteMessageFactory()}); } /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java index 538b1f6426a9f..eaef5d2afe65b 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/TestIoManager.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.plugin.extensions.communication.Message; /** * @@ -29,7 +30,7 @@ public class TestIoManager { private final Map srvcMap = new ConcurrentHashMap<>(); /** */ - public void send(UUID senderId, UUID nodeId, CalciteMessage msg) { + public void send(UUID senderId, UUID nodeId, Message msg) { srvcMap.get(nodeId).onMessage(senderId, msg); } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java index b0310c799ce22..e91e894e7c277 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader; -import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage; import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl; import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; @@ -740,7 +739,7 @@ static class TestMessageServiceImpl extends MessageServiceImpl { } /** {@inheritDoc} */ - @Override public void send(UUID nodeId, CalciteMessage msg) { + @Override public void send(UUID nodeId, Message msg) { mgr.send(localNodeId(), nodeId, msg); } @@ -748,16 +747,6 @@ static class TestMessageServiceImpl extends MessageServiceImpl { @Override public boolean alive(UUID nodeId) { return true; } - - /** {@inheritDoc} */ - @Override protected void prepareMarshal(Message msg) { - // No-op; - } - - /** {@inheritDoc} */ - @Override protected void prepareUnmarshal(Message msg) { - // No-op; - } } /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java index 11483e7418161..75530ae030454 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlanExecutionTest.java @@ -26,6 +26,7 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.query.calcite.QueryRegistryImpl; @@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; +import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessageFactory; import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl; import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; @@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.security.NoOpIgniteSecurityProcessor; import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.junit.Assert; import org.junit.Test; @@ -75,6 +78,14 @@ */ @SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"}) public class PlanExecutionTest extends AbstractPlannerTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + // Register messages in Message#REGISTRATIONS and avoids failure in Message#directType(). + new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CalciteMessageFactory()}); + } + /** * @throws Exception If failed. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index cb076954ad8ef..aa5d1e210cbaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -17,8 +17,7 @@ package org.apache.ignite.internal; -import java.lang.reflect.Constructor; -import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta; import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; @@ -40,6 +39,7 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; +import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; import org.apache.ignite.internal.processors.authentication.User; import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage; @@ -230,14 +230,10 @@ import org.apache.ignite.internal.util.distributed.FullMessage; import org.apache.ignite.internal.util.distributed.InitMessage; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; -import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage; import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage; @@ -278,7 +274,7 @@ import org.jetbrains.annotations.Nullable; /** */ -public class CoreMessagesProvider implements MessageFactoryProvider { +public class CoreMessagesProvider extends AbstractMarshallableMessageFactoryProvider { /** Node ID message type. */ public static final short NODE_ID_MSG_TYPE = 11500; @@ -291,15 +287,6 @@ public class CoreMessagesProvider implements MessageFactoryProvider { /** */ public static final short MAX_MESSAGE_ID = 15_000; - /** Binary marshaller. */ - private final Marshaller schemaAwareMarhaller; - - /** Binary marshaller. */ - private final Marshaller schemaLessMarshaller; - - /** Resolved classloader. */ - private final ClassLoader resolvedClsLdr; - /** */ private short msgIdx; @@ -307,14 +294,22 @@ public class CoreMessagesProvider implements MessageFactoryProvider { private @Nullable MessageFactory factory; /** - * @param schemaAwareMarhaller Schema-aware marshaller like {@link BinaryMarshaller}. - * @param schemaLessMarshaller Pure, schemaless marshaller like {@link JdkMarshaller}. - * @param resolvedClsLdr Resolved classloader. + * Default plugin-purposes constructor. + * + * @see #init(Marshaller, ClassLoader) */ - public CoreMessagesProvider(Marshaller schemaAwareMarhaller, Marshaller schemaLessMarshaller, ClassLoader resolvedClsLdr) { - this.schemaAwareMarhaller = schemaAwareMarhaller; - this.schemaLessMarshaller = schemaLessMarshaller; - this.resolvedClsLdr = resolvedClsLdr; + public CoreMessagesProvider() { + // No-op. + } + + /** + * Constructor allowing not to call {@link #init(Marshaller, ClassLoader)}. + * + * @param schemaAwareMarsh Schema-aware marshaller like {@link BinaryMarshaller}. + * @param resolvedClsLdr Resolved (configured) class loader like {@link IgniteConfiguration#setClassLoader(ClassLoader)}. + */ + public CoreMessagesProvider(Marshaller schemaAwareMarsh, ClassLoader resolvedClsLdr) { + init(schemaAwareMarsh, resolvedClsLdr); } /** The order is important. If wish to remove a message, put 'msgIdx++' on its place. */ @@ -644,52 +639,23 @@ public CoreMessagesProvider(Marshaller schemaAwareMarhaller, Marshaller schemaLe assert msgIdx <= MAX_MESSAGE_ID; } - /** Registers message using {@link #schemaAwareMarhaller} and {@link U#gridClassLoader()}. */ - private void withSchema(Class cls) { - register(cls, schemaAwareMarhaller, U.gridClassLoader()); + /** Registers message using {@link #dfltMarsh} and {@link #dftlClsLdr}. */ + private void withNoSchema(Class cls) { + register(cls, dfltMarsh, dftlClsLdr); } - /** Registers message using {@link #schemaLessMarshaller} and {@link U#gridClassLoader()}. */ - private void withNoSchema(Class cls) { - register(cls, schemaLessMarshaller, U.gridClassLoader()); + /** Registers message using {@link #schemaAwareMarsh} and {@link #dftlClsLdr}. */ + private void withSchema(Class cls) { + register(cls, schemaAwareMarsh, dftlClsLdr); } - /** Registers message using {@link #schemaLessMarshaller} and {@link #resolvedClsLdr}. */ + /** Registers message using {@link #schemaAwareMarsh} and {@link #resolvedClsLdr}. */ private void withNoSchemaResolvedClassLoader(Class cls) { - register(cls, schemaLessMarshaller, resolvedClsLdr); + register(cls, dfltMarsh, resolvedClsLdr); } /** Registers message using incrementing {@link #msgIdx} as the message id/type. */ private void register(Class cls, Marshaller marsh, ClassLoader clsLrd) { - Constructor ctor; - MessageSerializer serializer; - - try { - ctor = cls.getConstructor(); - - boolean marshallable = MarshallableMessage.class.isAssignableFrom(cls); - - Class serCls = Class.forName(cls.getName() + (marshallable ? "MarshallableSerializer" : "Serializer")); - - serializer = marshallable - ? (MessageSerializer)serCls.getConstructor(Marshaller.class, ClassLoader.class).newInstance(marsh, clsLrd) - : (MessageSerializer)serCls.getConstructor().newInstance(); - } - catch (Exception e) { - throw new IgniteException("Failed to register message of type " + cls.getSimpleName(), e); - } - - factory.register( - msgIdx++, - () -> { - try { - return ctor.newInstance(); - } - catch (Exception e) { - throw new IgniteException("Failed to create message of type " + cls.getSimpleName(), e); - } - }, - serializer - ); + register(factory, cls, msgIdx++, marsh, clsLrd); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 315130fe56123..a27c0623edafc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -112,6 +112,7 @@ import org.apache.ignite.internal.managers.systemview.GridSystemViewManager; import org.apache.ignite.internal.managers.systemview.IgniteConfigurationIterable; import org.apache.ignite.internal.managers.tracing.GridTracingManager; +import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; import org.apache.ignite.internal.plugin.IgniteLogInfoProvider; import org.apache.ignite.internal.plugin.IgniteLogInfoProviderImpl; import org.apache.ignite.internal.processors.GridProcessor; @@ -1098,7 +1099,7 @@ public void start( startProcessor(new GridTaskProcessor(ctx)); startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); startProcessor(createComponent(IgniteRestProcessor.class, ctx)); - startProcessor(new DataStreamProcessor(ctx)); + startProcessor(new DataStreamProcessor<>(ctx)); startProcessor(new GridContinuousProcessor(ctx)); startProcessor(new DataStructuresProcessor(ctx)); startProcessor(createComponent(PlatformProcessor.class, ctx)); @@ -1114,7 +1115,7 @@ public void start( startTimer.finishGlobalStage("Start processors"); // Start plugins. - for (PluginProvider provider : ctx.plugins().allProviders()) { + for (PluginProvider provider : ctx.plugins().allProviders()) { ctx.add(new GridPluginComponent(provider)); provider.start(ctx.plugins().pluginContextForProvider(provider)); @@ -1255,7 +1256,7 @@ public void start( } // Start plugins. - for (PluginProvider provider : ctx.plugins().allProviders()) + for (PluginProvider provider : ctx.plugins().allProviders()) provider.onIgniteStart(); if (recon) @@ -1319,14 +1320,17 @@ private void initMessageFactory() throws IgniteCheckedException { List compMsgs = new ArrayList<>(); - compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), ctx.marshallerContext().jdkMarshaller(), - U.resolveClassLoader(ctx.config()))); + compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), U.resolveClassLoader(ctx.config()))); for (IgniteComponentType compType : IgniteComponentType.values()) { MessageFactoryProvider f = compType.messageFactory(); - if (f != null) + if (f != null) { + if (f instanceof AbstractMarshallableMessageFactoryProvider) + ((AbstractMarshallableMessageFactoryProvider)f).init(ctx.marshaller(), U.resolveClassLoader(ctx.config())); + compMsgs.add(f); + } } DiscoverySpi discoSpi = ctx.config().getDiscoverySpi(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index a9d0b45145820..ef79d00696910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -346,9 +346,6 @@ public class GridIoManager extends GridManagerAdapter> new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_TOPICS, MAX_CLOSED_TOPICS, 0.75f, 256, PER_SEGMENT_Q_OPTIMIZED_RMV); - /** */ - private MessageFactory msgFactory; - /** */ private MessageFormatter formatter; @@ -393,9 +390,9 @@ public GridIoManager(GridKernalContext ctx) { * @return Message factory. */ public MessageFactory messageFactory() { - assert msgFactory != null; + assert ctx.messageFactory() != null; - return msgFactory; + return ctx.messageFactory(); } /** @@ -437,8 +434,6 @@ public void resetMetrics() { }; } - msgFactory = ctx.messageFactory(); - CommunicationSpi spi = getSpi(); if ((CommunicationSpi)spi instanceof TcpCommunicationSpi) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java new file mode 100644 index 0000000000000..03a65b95b6e0c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/plugin/AbstractMarshallableMessageFactoryProvider.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.plugin; + +import java.lang.reflect.Constructor; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.Marshallers; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; + +/** + * An extension of {@link MessageFactoryProvider} allowing to use provided schema-aware marshaller + * and resolved class loader to register {@link MarshallableMessage}. + */ +public abstract class AbstractMarshallableMessageFactoryProvider implements MessageFactoryProvider { + /** Default schema-less marshaller. */ + protected final Marshaller dfltMarsh = Marshallers.jdk(); + + /** Default class loader. */ + protected final ClassLoader dftlClsLdr = U.gridClassLoader(); + + /** Schema-aware marshaller like {@link BinaryMarshaller}. */ + protected Marshaller schemaAwareMarsh; + + /** Resolved (configured) class loader like {@link IgniteConfiguration#setClassLoader(ClassLoader)}. */ + protected ClassLoader resolvedClsLdr; + + /** + * @param schemaAwareMarsh Schema-aware marshaller like {@link BinaryMarshaller}. + * @param resolvedClsLdr Resolved (configured) class loader like {@link IgniteConfiguration#setClassLoader(ClassLoader)}. + */ + public void init(Marshaller schemaAwareMarsh, ClassLoader resolvedClsLdr) { + this.schemaAwareMarsh = schemaAwareMarsh; + this.resolvedClsLdr = resolvedClsLdr; + } + + /** Registers message automatically generating message supplier and serializer. */ + protected static void register(MessageFactory factory, Class cls, short id, Marshaller marsh, + ClassLoader clsLrd) { + Constructor ctor; + MessageSerializer serializer; + + try { + ctor = cls.getConstructor(); + + boolean marshallable = MarshallableMessage.class.isAssignableFrom(cls); + + Class serCls = Class.forName(cls.getName() + (marshallable ? "MarshallableSerializer" : "Serializer")); + + serializer = marshallable + ? (MessageSerializer)serCls.getConstructor(Marshaller.class, ClassLoader.class).newInstance(marsh, clsLrd) + : (MessageSerializer)serCls.getConstructor().newInstance(); + } + catch (Exception e) { + throw new IgniteException("Failed to register message of type " + cls.getSimpleName(), e); + } + + factory.register( + id, + () -> { + try { + return ctor.newInstance(); + } + catch (Exception e) { + throw new IgniteException("Failed to create message of type " + cls.getSimpleName(), e); + } + }, + serializer + ); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java index ea37b6abe4545..a4c9164388911 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java @@ -46,7 +46,7 @@ public class DirectMarshallingMessagesTest extends GridCommonAbstractTest { /** Message factory. */ private final MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[] { - new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), + new CoreMessagesProvider(jdk(), U.gridClassLoader()), factory -> factory.register( TestNestedContainersMessage.TYPE, TestNestedContainersMessage::new, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java index 21f22330814af..74f1c90b9980f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteCoreMessagesSerializationTest.java @@ -34,7 +34,7 @@ public class IgniteCoreMessagesSerializationTest extends AbstractMessageSerializationTest { /** {@inheritDoc} */ @Override protected MessageFactoryProvider messageFactory() { - return new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()); + return new CoreMessagesProvider(jdk(), U.gridClassLoader()); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index 53fa342460b41..f124eb9822825 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -46,7 +46,7 @@ public class CompressedMessageTest { @Test public void testWriteReadHugeMessage() { MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{ - new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())}); + new CoreMessagesProvider(jdk(), U.gridClassLoader())}); DirectMessageWriter writer = new DirectMessageWriter(msgFactory); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index a41490c0e405f..15ddd54040c5a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -150,7 +150,7 @@ public void testCacheContinuousQueryEntrySerialization() { e0.markFiltered(); IgniteMessageFactoryImpl msgFactory = - new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader())}); + new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), U.gridClassLoader())}); ByteBuffer buf = ByteBuffer.allocate(4096); DirectMessageWriter writer = new DirectMessageWriter(msgFactory); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 1ca7217b67dca..f82c844bda973 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -154,7 +154,7 @@ private void startSpis() throws Exception { GridSpiTestContext ctx = initSpiContext(); ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[] { - new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), customMessageFactory()})); + new CoreMessagesProvider(jdk(), U.gridClassLoader()), customMessageFactory()})); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 997d5b59fd4e6..1bc2358cc7e2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -434,7 +434,7 @@ private void startSpis(MessageListener lsnr) throws Exception { GridSpiTestContext ctx = initSpiContext(); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) + new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java index 3f2d90924bf28..a604da0edb7f0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java @@ -252,7 +252,7 @@ private TcpCommunicationSpi initializeSpi(GridSpiTestContext ctx, node.setId(rsrcs.getNodeId()); ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{ - new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})); + new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY})); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index c043c068fc434..6a5d26eb26619 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -468,7 +468,7 @@ private int getSpiCount() { GridSpiTestContext ctx = initSpiContext(); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) + new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) ); ctx.timeoutProcessor(timeoutProcessor); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index 56c6daceca5d2..de46f4b674a88 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -400,7 +400,7 @@ private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Excep GridSpiTestContext ctx = initSpiContext(); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) + new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index d4f00e425a428..633bd332eaf28 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -729,7 +729,7 @@ private void startSpis() throws Exception { GridSpiTestContext ctx = initSpiContext(); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) + new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index 58a321e21c8a9..06d657a8a3bea 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -453,7 +453,7 @@ private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Excep GridSpiTestContext ctx = initSpiContext(); ctx.messageFactory(new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) + new MessageFactoryProvider[] {new CoreMessagesProvider(jdk(), U.gridClassLoader()), GRID_TEST_MESSAGE_FACTORY}) ); ctx.setLocalNode(node); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index 2aaabee1aeb71..decf2bf4b5c3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -121,7 +121,7 @@ public void messageFactory(MessageFactoryProvider msgFactoryProvider) { assert !started(); this.msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[] { - new CoreMessagesProvider(jdk(), jdk(), U.resolveClassLoader(ignite().configuration())), + new CoreMessagesProvider(jdk(), U.resolveClassLoader(ignite().configuration())), msgFactoryProvider }); } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 4eb5d79e0b6f9..b5c5035b7d030 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -554,7 +554,7 @@ public void triggerEvent(Event evt) { /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), jdk(), + factory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new CoreMessagesProvider(jdk(), U.gridClassLoader())}); return factory; diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index fe6becffafca7..e6638d4e1120d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -85,7 +85,6 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.marshaller.Marshallers; import org.apache.ignite.metric.IgniteMetrics; import org.apache.ignite.plugin.IgnitePlugin; import org.apache.ignite.plugin.PluginNotFoundException; @@ -156,7 +155,7 @@ public IgniteMock(String name, String locHost, UUID nodeId, Marshaller marshalle ClassLoader lrd = staticCfg == null ? U.gridClassLoader() : U.resolveClassLoader(staticCfg); msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[] { - new CoreMessagesProvider(marshaller, Marshallers.jdk(), lrd)}); + new CoreMessagesProvider(marshaller, lrd)}); try { kernalCtx = new StandaloneGridKernalContext(new GridTestLog4jLogger(), null) {