diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java new file mode 100644 index 0000000000000..c30286a909a59 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; + +/** Message factory for discovery messages. */ +public class DiscoveryMessageFactory implements MessageFactoryProvider { + /** {@inheritDoc} */ + @Override public void registerAll(MessageFactory factory) { + factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 643d4d3fb25f1..caf8a530ffa3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -17,11 +17,8 @@ package org.apache.ignite.spi.discovery.tcp; -import java.io.BufferedInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.InterruptedIOException; -import java.io.OutputStream; import java.io.StreamCorruptedException; import java.net.InetSocketAddress; import java.net.Socket; @@ -719,13 +716,13 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws boolean openSock = false; Socket sock = null; - OutputStream out; try { long tsNanos = System.nanoTime(); sock = spi.openSocket(addr, timeoutHelper); - out = spi.socketStream(sock); + + TcpDiscoveryIoSession ses = createSession(sock); openSock = true; @@ -733,9 +730,9 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws req.client(true); - spi.writeToSocket(sock, out, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0); + TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, ackTimeout0); UUID rmtNodeId = res.creatorNodeId(); @@ -788,7 +785,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeMessage(ses, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos)); @@ -1179,7 +1176,7 @@ private void forceStopRead() throws InterruptedException { + ":" + sockStream.sock.getPort()); try { - InputStream in = sockStream.stream(); + TcpDiscoveryIoSession ses = createSession(sock); assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" + " KeepAlive " + sock.getKeepAlive() + @@ -1189,7 +1186,7 @@ private void forceStopRead() throws InterruptedException { TcpDiscoveryAbstractMessage msg; try { - msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration())); + msg = spi.readMessage(ses, sock.getSoTimeout()); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -1266,6 +1263,9 @@ private class SocketWriter extends IgniteSpiThread { /** */ private Socket sock; + /** */ + private TcpDiscoveryIoSession ses; + /** */ private boolean clientAck; @@ -1333,6 +1333,8 @@ private void setSocket(Socket sock, boolean clientAck) { synchronized (mux) { this.sock = sock; + ses = createSession(sock); + this.clientAck = clientAck; unackedMsg = null; @@ -1387,11 +1389,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { msg.client(true); try { - spi.writeToSocket( - sock, - spi.socketStream(sock), - msg, - sockTimeout); + spi.writeMessage(ses, msg, sockTimeout); } catch (IOException | IgniteCheckedException e) { if (log.isDebugEnabled()) { @@ -1434,11 +1432,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) { } } - spi.writeToSocket( - sock, - spi.socketStream(sock), - msg, - sockTimeout); + spi.writeMessage(ses, msg, sockTimeout); IgniteUuid latencyCheckId = msg instanceof TcpDiscoveryRingLatencyCheckMessage ? msg.id() : null; @@ -1601,6 +1595,7 @@ public void cancel() { clientAck = joinRes.get2(); Socket sock = sockStream.socket(); + TcpDiscoveryIoSession ses = createSession(sock); if (isInterrupted()) throw new InterruptedException(); @@ -1612,8 +1607,6 @@ public void cancel() { sock.setSoTimeout((int)spi.netTimeout); - InputStream in = sockStream.stream(); - assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" + " KeepAlive " + sock.getKeepAlive() + " TcpNoDelay " + sock.getTcpNoDelay(); @@ -1621,8 +1614,7 @@ public void cancel() { List msgs = null; while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in, - U.resolveClassLoader(spi.ignite().configuration())); + TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, sock.getSoTimeout()); if (msg instanceof TcpDiscoveryClientReconnectMessage) { TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; @@ -2769,9 +2761,6 @@ private static class SocketStream { /** */ private final Socket sock; - /** */ - private final InputStream in; - /** * @param sock Socket. * @throws IOException If failed to create stream. @@ -2780,8 +2769,6 @@ public SocketStream(Socket sock) throws IOException { assert sock != null; this.sock = sock; - - this.in = new BufferedInputStream(sock.getInputStream()); } /** @@ -2792,13 +2779,6 @@ Socket socket() { } - /** - * @return Socket input stream. - */ - InputStream stream() { - return in; - } - /** {@inheritDoc} */ @Override public String toString() { return sock.toString(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 7905a5a8ac02b..8b5bdadbebdde 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -17,11 +17,8 @@ package org.apache.ignite.spi.discovery.tcp; -import java.io.BufferedInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.ObjectStreamException; -import java.io.OutputStream; import java.io.Serializable; import java.io.StreamCorruptedException; import java.net.ConnectException; @@ -877,11 +874,12 @@ private boolean pingNode(TcpDiscoveryNode node) { openedSock = true; - spi.writeToSocket(sock, spi.socketStream(sock), new TcpDiscoveryPingRequest(locNodeId, clientNodeId), + TcpDiscoveryIoSession ses = createSession(sock); + + spi.writeMessage(ses, new TcpDiscoveryPingRequest(locNodeId, clientNodeId), timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( - spi.getAckTimeout())); + TcpDiscoveryPingResponse res = spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(spi.getAckTimeout())); if (locNodeId.equals(res.creatorNodeId())) { if (log.isDebugEnabled()) @@ -1469,16 +1467,16 @@ else if (U.millisSinceNanos(joinStartNanos) > spi.joinTimeout) long tsNanos = System.nanoTime(); sock = spi.openSocket(addr, timeoutHelper); + TcpDiscoveryIoSession ses = createSession(sock); openSock = true; TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); // Handshake. - spi.writeToSocket(sock, spi.socketStream(sock), req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeMessage(ses, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( - ackTimeout0)); + TcpDiscoveryHandshakeResponse res = spi.readMessage(ses, timeoutHelper.nextTimeoutChunk(ackTimeout0)); if (msg instanceof TcpDiscoveryJoinRequestMessage) { boolean ignore = false; @@ -1510,7 +1508,7 @@ else if (U.millisSinceNanos(joinStartNanos) > spi.joinTimeout) // Send message. tsNanos = System.nanoTime(); - spi.writeToSocket(sock, spi.socketStream(sock), msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeMessage(ses, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); long tsNanos0 = System.nanoTime(); @@ -2957,8 +2955,8 @@ private class RingMessageWorker extends MessageWorker connLsnr : spi.incomeConnLsnrs) connLsnr.apply(sock); - int rcvBufSize = sock.getReceiveBufferSize(); - - in = new BufferedInputStream(sock.getInputStream(), rcvBufSize > 0 ? rcvBufSize : 8192); - byte[] buf = new byte[4]; int read = 0; while (read < buf.length) { - int r = in.read(buf, read, buf.length - read); + int r = sock.getInputStream().read(buf, read, buf.length - read); if (r >= 0) read += r; @@ -6787,7 +6777,7 @@ private class SocketReader extends IgniteSpiThread { // Restore timeout. sock.setSoTimeout(timeout); - TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout); + TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, spi.netTimeout); // Ping. if (msg instanceof TcpDiscoveryPingRequest) { @@ -6812,7 +6802,7 @@ private class SocketReader extends IgniteSpiThread { res.clientExists(clientWorker.ping(timeoutHelper)); } - spi.writeToSocket(sock, spi.socketStream(sock), res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeMessage(ses, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); if (!(sock instanceof SSLSocket)) sock.shutdownOutput(); @@ -6905,7 +6895,7 @@ else if (req.changeTopology()) { spi.getEffectiveSocketTimeout(srvSock) + " to " + rmtAddr + ":" + sock.getPort()); } - spi.writeToSocket(sock, spi.socketStream(sock), res, spi.getEffectiveSocketTimeout(srvSock)); + spi.writeMessage(ses, res, spi.getEffectiveSocketTimeout(srvSock)); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -7028,8 +7018,8 @@ else if (e.hasCause(ObjectStreamException.class) || try { SecurityUtils.serializeVersion(1); - TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in, - U.resolveClassLoader(spi.ignite().configuration())); + // Use inifinite timeout for accepting new messages. + TcpDiscoveryAbstractMessage msg = spi.readMessage(ses, 0); msg.senderNodeId(nodeId); @@ -7713,6 +7703,9 @@ private class ClientMessageWorker extends MessageWorker toOrderedList(Collection addrs) return res; } + /** + * Instantiates IO session for exchanging discovery messages with remote node. + * + * @param sock Socket to remote node. + * @return IO session for writing and reading {@link TcpDiscoveryAbstractMessage}. + */ + TcpDiscoveryIoSession createSession(Socket sock) { + return new TcpDiscoveryIoSession(sock, spi); + } + /** * @param msg Message. * @return Message logger. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java new file mode 100644 index 0000000000000..1bf398c978089 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StreamCorruptedException; +import java.net.Socket; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.direct.DirectMessageReader; +import org.apache.ignite.internal.direct.DirectMessageWriter; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; + +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; + +/** + * Handles I/O operations between discovery nodes in the cluster. This class encapsulates the socket connection used + * by the {@link TcpDiscoverySpi} to exchange discovery protocol messages between nodes. + *

+ * Currently, there are two modes for message serialization: + *

    + *
  • Using {@link MessageSerializer} for messages implementing the {@link Message} interface.
  • + *
  • Deprecated: Using {@link JdkMarshaller} for messages that have not yet been refactored.
  • + *
+ * A leading byte is used to distinguish between the modes. The byte will be removed in future. + */ +public class TcpDiscoveryIoSession { + /** Default size of buffer used for buffering socket in/out. */ + private static final int DFLT_SOCK_BUFFER_SIZE = 8192; + + /** Size for an intermediate buffer for serializing discovery messages. */ + private static final int MSG_BUFFER_SIZE = 100; + + /** Leading byte for messages use {@link JdkMarshaller} for serialization. */ + // TODO: remove these flags after refactoring all discovery messages. + static final byte JAVA_SERIALIZATION = (byte)1; + + /** Leading byte for messages use {@link MessageSerializer} for serialization. */ + static final byte MESSAGE_SERIALIZATION = (byte)2; + + /** */ + private final TcpDiscoverySpi spi; + + /** Loads discovery messages classes during java deserialization. */ + private final ClassLoader clsLdr; + + /** */ + private final Socket sock; + + /** */ + private final DirectMessageWriter msgWriter; + + /** */ + private final DirectMessageReader msgReader; + + /** Buffered socket output stream. */ + private final OutputStream out; + + /** Buffered socket input stream. */ + private final InputStream in; + + /** Intermediate buffer for serializing discovery messages. */ + private final ByteBuffer msgBuf; + + /** + * Creates a new discovery I/O session bound to the given socket. + * + * @param sock Socket connected to a remote discovery node. + * @param spi Discovery SPI instance owning this session. + * @throws IgniteException If an I/O error occurs while initializing buffers. + */ + TcpDiscoveryIoSession(Socket sock, TcpDiscoverySpi spi) { + this.sock = sock; + this.spi = spi; + + clsLdr = U.resolveClassLoader(spi.ignite().configuration()); + + msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + + msgWriter = new DirectMessageWriter(spi.messageFactory()); + msgReader = new DirectMessageReader(spi.messageFactory(), null); + + try { + int sendBufSize = sock.getSendBufferSize() > 0 ? sock.getSendBufferSize() : DFLT_SOCK_BUFFER_SIZE; + int rcvBufSize = sock.getReceiveBufferSize() > 0 ? sock.getReceiveBufferSize() : DFLT_SOCK_BUFFER_SIZE; + + out = new BufferedOutputStream(sock.getOutputStream(), sendBufSize); + in = new BufferedInputStream(sock.getInputStream(), rcvBufSize); + } + catch (IOException e) { + throw new IgniteException(e); + } + } + + /** + * Writes a discovery message to the underlying socket output stream. + * + * @param msg Message to send to the remote node. + * @throws IgniteCheckedException If serialization fails. + */ + void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { + if (!(msg instanceof Message)) { + out.write(JAVA_SERIALIZATION); + + U.marshal(spi.marshaller(), msg, out); + + return; + } + + try { + out.write(MESSAGE_SERIALIZATION); + + serializeMessage((Message)msg, out); + + out.flush(); + } + catch (Exception e) { + // Keep logic similar to `U.marshal(...)`. + if (e instanceof IgniteCheckedException) + throw (IgniteCheckedException)e; + + throw new IgniteCheckedException(e); + } + } + + /** + * Reads the next discovery message from the socket input stream. + * + * @param Type of the expected message. + * @return Deserialized message instance. + * @throws IgniteCheckedException If deserialization fails. + */ + T readMessage() throws IgniteCheckedException, IOException { + byte serMode = (byte)in.read(); + + if (JAVA_SERIALIZATION == serMode) + return U.unmarshal(spi.marshaller(), in, clsLdr); + + try { + if (MESSAGE_SERIALIZATION != serMode) { + detectSslAlert(serMode, in); + + throw new IgniteCheckedException("Received unexpected byte while reading discovery message: " + serMode); + } + + byte b0 = (byte)in.read(); + byte b1 = (byte)in.read(); + + Message msg = spi.messageFactory().create(makeMessageType(b0, b1)); + + msgReader.reset(); + msgReader.setBuffer(msgBuf); + + MessageSerializer msgSer = spi.messageFactory().serializer(msg.directType()); + + boolean finished; + + do { + msgBuf.clear(); + + int read = in.read(msgBuf.array(), 0, msgBuf.limit()); + + if (read == -1) + throw new EOFException("Connection closed before message was fully read."); + + msgBuf.limit(read); + + finished = msgSer.readFrom(msg, msgReader); + } while (!finished); + + return (T)msg; + } + catch (Exception e) { + // Keep logic similar to `U.marshal(...)`. + if (e instanceof IgniteCheckedException) + throw (IgniteCheckedException)e; + + throw new IgniteCheckedException(e); + } + } + + /** + * Serializes a discovery message into a byte array. + * + * @param msg Discovery message to serialize. + * @return Serialized byte array containing the message data. + * @throws IgniteCheckedException If serialization fails. + * @throws IOException If serialization fails. + */ + byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { + if (!(msg instanceof Message)) + return U.marshal(spi.marshaller(), msg); + + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + serializeMessage((Message)msg, out); + + return out.toByteArray(); + } + } + + /** @return Socket. */ + public Socket socket() { + return sock; + } + + /** + * Serializes a discovery message into given output stream. + * + * @param m Discovery message to serialize. + * @param out Output stream to write serialized message. + * @throws IOException If serialization fails. + */ + private void serializeMessage(Message m, OutputStream out) throws IOException { + MessageSerializer msgSer = spi.messageFactory().serializer(m.directType()); + + msgWriter.reset(); + msgWriter.setBuffer(msgBuf); + + boolean finished; + + do { + finished = msgSer.writeTo(m, msgWriter); + + out.write(msgBuf.array(), 0, msgBuf.position()); + + msgBuf.clear(); + } + while (!finished); + } + + /** + * Checks wheter input stream contains SSL alert. + * See handling {@code StreamCorruptedException} in {@link #readMessage()}. + * Keeps logic similar to {@link java.io.ObjectInputStream#readStreamHeader}. + */ + private void detectSslAlert(byte firstByte, InputStream in) throws IOException { + byte[] hdr = new byte[4]; + hdr[0] = firstByte; + int read = in.readNBytes(hdr, 1, 3); + + if (read < 3) + throw new EOFException(); + + String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], hdr[3]); + + if (hex.matches("15....00")) + throw new StreamCorruptedException("invalid stream header: " + hex); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 6df10d88a0f40..2ed8e77e99f89 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -17,10 +17,8 @@ package org.apache.ignite.spi.discovery.tcp; -import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.io.StreamCorruptedException; @@ -58,6 +56,8 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; @@ -73,6 +73,9 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiAdapter; @@ -449,6 +452,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery /** */ protected IgniteSpiContext spiCtx; + /** Discovery messages factory. */ + private MessageFactory msgFactory; + /** For test purposes. */ private boolean skipAddrsRandomization = false; @@ -1102,6 +1108,11 @@ public void setConnectionRecoveryTimeout(long connRecoveryTimeout) { locNodeVer = ver; } + /** @return Discovery messages factory. */ + public MessageFactory messageFactory() { + return msgFactory; + } + /** * Gets ID of the local node. * @@ -1554,18 +1565,6 @@ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeou return openSocket(createSocket(), sockAddr, timeoutHelper); } - /** - * @param sock Socket. - * @return Buffered stream wrapping socket stream. - * @throws IOException If failed. - */ - final BufferedOutputStream socketStream(Socket sock) throws IOException { - int bufSize = sock.getSendBufferSize(); - - return bufSize > 0 ? new BufferedOutputStream(sock.getOutputStream(), bufSize) : - new BufferedOutputStream(sock.getOutputStream()); - } - /** * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established. * @@ -1681,6 +1680,13 @@ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[ try (SocketTimeoutObject ignored = startTimer(sock, timeout)) { OutputStream out = sock.getOutputStream(); + // Write Ignite header without leading byte. + if (msg != null) { + byte mode = msg instanceof Message ? TcpDiscoveryIoSession.MESSAGE_SERIALIZATION : TcpDiscoveryIoSession.JAVA_SERIALIZATION; + + out.write(mode); + } + out.write(data); out.flush(); @@ -1717,22 +1723,24 @@ protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { /** * Writes message to the socket. * - * @param sock Socket. - * @param out Stream to write to. + * @param ses IO session. * @param msg Message. * @param timeout Timeout. * @throws IOException If IO failed or write timed out. * @throws IgniteCheckedException If marshalling failed. */ - protected void writeToSocket(Socket sock, - OutputStream out, + protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, - long timeout) throws IOException, IgniteCheckedException { + long timeout + ) throws IOException, IgniteCheckedException { + Socket sock = ses.socket(); + assert sock != null; assert msg != null; try (SocketTimeoutObject ignored = startTimer(sock, timeout)) { - U.marshal(marshaller(), msg, out); + ses.writeMessage(msg); } catch (IgniteCheckedException e) { SSLException sslEx = checkSslException(sock, e); @@ -1771,15 +1779,16 @@ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int r /** * Reads message from the socket limiting read time. * - * @param sock Socket. - * @param in Input stream (in case socket stream was wrapped). + * @param ses IO session. * @param timeout Socket timeout for this operation. * @return Message. * @throws IOException If IO failed or read timed out. * @throws IgniteCheckedException If unmarshalling failed. */ - protected T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, + protected T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException, IgniteCheckedException { + Socket sock = ses.socket(); + assert sock != null; int oldTimeout = sock.getSoTimeout(); @@ -1787,10 +1796,7 @@ protected T readMessage(Socket sock, @Nullable InputStream in, long timeout) try { sock.setSoTimeout((int)timeout); - T res = U.unmarshal(marshaller(), in == null ? sock.getInputStream() : in, - U.resolveClassLoader(ignite.configuration())); - - return res; + return ses.readMessage(); } catch (IOException | IgniteCheckedException e) { if (X.hasCause(e, SocketTimeoutException.class)) @@ -2098,6 +2104,9 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) { registerMBean(igniteInstanceName, new TcpDiscoverySpiMBeanImpl(this), TcpDiscoverySpiMBean.class); + msgFactory = new IgniteMessageFactoryImpl( + new MessageFactoryProvider[] { new DiscoveryMessageFactory() }); + impl.spiStart(igniteInstanceName); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 93d8bcf9e5e1e..505980bef3dc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -58,6 +59,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { private transient UUID sndNodeId; /** Message ID. */ + @Order(0) private IgniteUuid id; /** @@ -135,6 +137,15 @@ public IgniteUuid id() { return id; } + /** + * Sets message ID. + * + * @param id Message ID. + */ + public void id(IgniteUuid id) { + this.id = id; + } + /** * Gets sender node ID. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java index e282410f40e7e..c65db903e0f87 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCheckFailedMessage.java @@ -18,17 +18,24 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Message telling joining node that it failed coordinator's validation check. */ -public class TcpDiscoveryCheckFailedMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryCheckFailedMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** Coordinator version. */ - private final String err; + @Order(value = 1, method = "error") + private String err; + + /** */ + public TcpDiscoveryCheckFailedMessage() { + } /** * Constructor. @@ -49,6 +56,18 @@ public String error() { return err; } + /** + * @param err message from coordinator. + */ + public void error(String err) { + this.err = err; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 0; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryCheckFailedMessage.class, this, "super", super.toString()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java index 285a70efc7460..b29a8171fbce4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; @@ -45,6 +44,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.testframework.GridTestUtils; @@ -363,12 +363,12 @@ private class DiscoverySpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - if (blockAll || block && sock.getPort() == 47500) + if (blockAll || block && ses.socket().getPort() == 47500) throw new SocketException("Test discovery exception"); - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java index 304991e46df00..b69d65446ff1b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Arrays; @@ -33,6 +32,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -313,14 +313,14 @@ private class FailDiscoverySpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - assertNotFailedNode(sock); + assertNotFailedNode(ses.socket()); if (isDrop(msg)) return; - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java index 75cab20d4acb5..4dfa5f43da590 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -40,6 +38,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -75,9 +74,8 @@ public class CacheClientsConcurrentStartTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(gridName); TcpDiscoverySpi testSpi = new TcpDiscoverySpi() { - @Override protected void writeToSocket( - Socket sock, - OutputStream out, + @Override protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout ) throws IOException, IgniteCheckedException { @@ -92,7 +90,7 @@ public class CacheClientsConcurrentStartTest extends GridCommonAbstractTest { } } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } }; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java index 2c055f2dd99f4..fdfb3b90cd3f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -38,6 +37,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -228,13 +228,12 @@ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws S } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, - OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout); + checkSegmented((InetSocketAddress)ses.socket().getRemoteSocketAddress(), timeout); - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java index 0ba28ca3e0190..aaae9d6b9e270 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java @@ -18,8 +18,6 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental; import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; @@ -39,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -219,9 +218,8 @@ private void checkRestoredSnapshotIsEmpty() throws Exception { /** */ private static class ClientBlockingDiscoverySpi extends TcpDiscoverySpi { /** {@inheritDoc} */ - @Override protected void writeToSocket( - Socket sock, - OutputStream out, + @Override protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout ) throws IOException, IgniteCheckedException { @@ -231,7 +229,7 @@ private static class ClientBlockingDiscoverySpi extends TcpDiscoverySpi { U.awaitQuiet(unblockClientJoinReq); } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java index 8bce7675f88cf..ab8022dbdd22b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.communication.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.BindException; import java.net.InetSocketAddress; import java.net.Socket; @@ -37,6 +36,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -208,15 +208,14 @@ private class TestDiscoverySpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket( - Socket sock, - OutputStream out, + @Override protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout ) throws IOException, IgniteCheckedException { awaitLatch(); - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java index 0e26ff95b7645..aeba1e4d7c136 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java @@ -18,9 +18,6 @@ package org.apache.ignite.spi.communication.tcp; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; import java.net.SocketTimeoutException; import java.util.Set; import java.util.UUID; @@ -45,10 +42,10 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import org.junit.Test; /** @@ -281,7 +278,7 @@ class CustomDiscoverySpi extends TcpDiscoverySpi { private final CountDownLatch netDisabledLatch = new CountDownLatch(1); /** {@inheritDoc} */ - @Override protected T readMessage(Socket sock, @Nullable InputStream in, + @Override protected T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException, IgniteCheckedException { if (netDisabled) { U.sleep(timeout); @@ -289,11 +286,11 @@ class CustomDiscoverySpi extends TcpDiscoverySpi { throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled."); } else - return super.readMessage(sock, in, timeout); + return super.readMessage(ses, timeout); } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (netDisabled) { netDisabledLatch.countDown(); @@ -301,7 +298,7 @@ class CustomDiscoverySpi extends TcpDiscoverySpi { throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled."); } else - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java index 8b0c912eed549..3a02b54260dea 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/LongClientConnectToClusterTest.java @@ -18,8 +18,6 @@ package org.apache.ignite.spi.discovery; import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; import java.util.Collections; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; @@ -28,6 +26,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIoSession; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -147,7 +146,7 @@ private static class DelayedTcpDiscoverySpi extends TcpDiscoverySpi { public static final int DELAY_MSG_PERIOD_MILLIS = 2_000; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddFinishedMessage && msg.topologyVersion() == 3) { log.info("Catched discovery message: " + msg); @@ -162,7 +161,7 @@ private static class DelayedTcpDiscoverySpi extends TcpDiscoverySpi { } } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java index 65688863a8436..f72eac9999460 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; @@ -88,13 +87,12 @@ private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage ms } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, - OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (spiCtx != null) apply(spiCtx.localNode(), msg); - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java index a184cd42fe40d..b8a82d52cc90b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java @@ -175,6 +175,7 @@ private void attack(byte[] data) throws IOException { OutputStream oos = new BufferedOutputStream(sock.getOutputStream()) ) { oos.write(U.IGNITE_HEADER); + oos.write((byte)1); // Flag for java serialization. oos.write(data); } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java index 472682c19966b..ca31620403736 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java @@ -18,10 +18,8 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.Socket; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -211,9 +209,8 @@ public void testClientConnectToBigTopology0() throws Exception { */ class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** {@inheritDoc} */ - @Override protected void writeToSocket( - Socket sock, - OutputStream out, + @Override protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout ) throws IOException, IgniteCheckedException { @@ -228,10 +225,10 @@ class TestTcpDiscoverySpi extends TcpDiscoverySpi { fail("Unexpected interrupt on nodeAddFinishedDelay"); } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } else - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java index 2708b606007ad..7eef5a2739b74 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -18,8 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayList; @@ -42,7 +40,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; -import org.jetbrains.annotations.Nullable; import org.junit.Test; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; @@ -475,8 +472,7 @@ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi { } /** */ - @Override protected void writeToSocket(Socket sock, - OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (writeToSocketDelay > 0) { @@ -490,8 +486,8 @@ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi { } } - if (sock.getSoTimeout() >= writeToSocketDelay) - super.writeToSocket(sock, out, msg, timeout); + if (ses.socket().getSoTimeout() >= writeToSocketDelay) + super.writeMessage(ses, msg, timeout); else throw new SocketTimeoutException("Write to socket delay timeout exception."); } @@ -521,14 +517,14 @@ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected T readMessage(Socket sock, @Nullable InputStream in, long timeout) + @Override protected T readMessage(TcpDiscoveryIoSession ses, long timeout) throws IOException, IgniteCheckedException { long currTimeout = getLocalNode().isClient() ? clientFailureDetectionTimeout() : failureDetectionTimeout(); if (readDelay < currTimeout) { try { - return super.readMessage(sock, in, timeout); + return super.readMessage(ses, timeout); } catch (Exception e) { err = e; @@ -537,7 +533,7 @@ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi { } } else { - T msg = super.readMessage(sock, in, timeout); + T msg = super.readMessage(ses, timeout); if (msg instanceof TcpDiscoveryPingRequest) { try { diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index f3d45f9f09769..6ca4782c65c4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; @@ -2573,19 +2572,18 @@ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, - OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { waitFor(writeLock); - if (!onMessage(sock, msg)) + if (!onMessage(ses.socket(), msg)) return; - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); if (afterWrite != null) - afterWrite.apply(msg, sock); + afterWrite.apply(msg, ses.socket()); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java index 10c6aa14242fd..f6b7024474451 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Collections; @@ -302,9 +301,8 @@ public void awaitDrop() throws InterruptedException, IgniteCheckedException { } /** {@inheritDoc} */ - @Override protected void writeToSocket( - Socket sock, - OutputStream out, + @Override protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout ) throws IOException, IgniteCheckedException { @@ -314,7 +312,7 @@ public void awaitDrop() throws InterruptedException, IgniteCheckedException { msg = new TcpDiscoveryConnectionCheckMessage(locNode); } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java index 481eb24a0fb06..e1e647c01e60f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -199,10 +198,10 @@ private static class DropTcpDiscoverySpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - if (sock.getPort() != FAIL_PORT) - super.writeToSocket(sock, out, msg, timeout); + if (ses.socket().getPort() != FAIL_PORT) + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java index 7162083af9cd2..3b2c925804430 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -538,9 +537,9 @@ private void breakDiscoConnectionToNext(IgniteEx ig) throws Exception { Object spis = GridTestUtils.getFieldValue(disco, GridManagerAdapter.class, "spis"); - OutputStream out = GridTestUtils.getFieldValue(((Object[])spis)[0], "impl", "msgWorker", "out"); + TcpDiscoveryIoSession ses = GridTestUtils.getFieldValue(((Object[])spis)[0], "impl", "msgWorker", "ses"); - out.close(); + ses.socket().getOutputStream().close(); } /** @@ -627,21 +626,21 @@ private boolean dropMsg(Socket sock) { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { BiConsumer hsRqLsnr; BiConsumer hsRespLsnr; if (msg instanceof TcpDiscoveryHandshakeRequest && (hsRqLsnr = this.hsRqLsnr.get()) != null) - hsRqLsnr.accept(sock, (TcpDiscoveryHandshakeRequest)msg); + hsRqLsnr.accept(ses.socket(), (TcpDiscoveryHandshakeRequest)msg); if (msg instanceof TcpDiscoveryHandshakeResponse && (hsRespLsnr = this.hsRespLsnr.get()) != null) - hsRespLsnr.accept(sock, (TcpDiscoveryHandshakeResponse)msg); + hsRespLsnr.accept(ses.socket(), (TcpDiscoveryHandshakeResponse)msg); - if (dropMsg(sock)) + if (dropMsg(ses.socket())) return; - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java index 0bdf0a0558a4e..f449be13c8e85 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeJoinAndFailureTest.java @@ -167,8 +167,8 @@ Test reproduces the needed behavior (two nodes in CONNECTING state) doing the fo if (nodeId.equals(node2Id)) { Object workerObj = GridTestUtils.getFieldValue(impl, "msgWorker"); - - OutputStream out = GridTestUtils.getFieldValue(workerObj, "out"); + TcpDiscoveryIoSession ses = GridTestUtils.getFieldValue(workerObj, "ses"); + OutputStream out = GridTestUtils.getFieldValue(ses, "out"); try { out.close(); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java index ddb52738db5c7..15cb1f903827d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.Socket; import java.util.Set; import org.apache.ignite.Ignite; @@ -265,10 +264,10 @@ private class DyingDiscoverySpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (!blockMsgs) - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index c01fc8a5cbc14..3195284ba9262 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -19,9 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.Socket; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; @@ -2448,9 +2446,8 @@ private static class TestRestoreConnectedSpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket( - Socket sock, - OutputStream out, + @Override protected void writeMessage( + TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { // Test relies on an error in this thread only. @@ -2459,14 +2456,14 @@ private static class TestRestoreConnectedSpi extends TcpDiscoverySpi { if (startTest && !(msg instanceof TcpDiscoveryConnectionCheckMessage) && ringMsgWorkerThread) { int errPort = errPortSupplier.get(); - if (sock.getPort() == errPort) { + if (ses.socket().getPort() == errPort) { log.info("Fail write on message send [port=" + errPort + ", msg=" + msg + ']'); throw new SocketTimeoutException(); } else if (locNode.discoveryPort() == errPort) { if (sleepEndTime == 0) { - errNextPort = sock.getPort(); + errNextPort = ses.socket().getPort(); sleepEndTime = System.currentTimeMillis() + 3000; } @@ -2487,8 +2484,8 @@ else if (locNode.discoveryPort() == errPort) { log.info("Stop sleep on message send: " + msg); - if (sock.getPort() == errNextPort) { - log.info("Fail write after sleep [port=" + sock.getPort() + ", msg=" + msg + ']'); + if (ses.socket().getPort() == errNextPort) { + log.info("Fail write after sleep [port=" + ses.socket().getPort() + ", msg=" + msg + ']'); throw new SocketTimeoutException(); } @@ -2496,7 +2493,7 @@ else if (locNode.discoveryPort() == errPort) { } } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2514,7 +2511,7 @@ private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi { static volatile boolean checkClientNodeAddFinished; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddedMessage) { @@ -2536,7 +2533,7 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { } } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** @@ -2580,7 +2577,7 @@ private static class TestEventDiscardSpi extends TcpDiscoverySpi { private volatile boolean failed; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { boolean add = msgIds.add(msg.id()); @@ -2591,7 +2588,7 @@ private static class TestEventDiscardSpi extends TcpDiscoverySpi { failed = true; } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2603,8 +2600,7 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { private volatile boolean stopBeforeSndAck; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, - OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (stopBeforeSndAck) { @@ -2616,7 +2612,7 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { if (custMsg instanceof StartRoutineAckDiscoveryMessage) { log.info("Skip message send and stop node: " + msg); - sock.close(); + ses.socket().close(); GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { @@ -2635,7 +2631,7 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { } } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2666,8 +2662,7 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, - OutputStream out, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (stop) @@ -2678,7 +2673,7 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); - sock.close(); + ses.socket().close(); throw new SocketTimeoutException(); } @@ -2688,7 +2683,7 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { failMsg.compareAndSet(false, true)) { log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); - sock.close(); + ses.socket().close(); throw new SocketTimeoutException(); } @@ -2700,7 +2695,7 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']'); - sock.close(); + ses.socket().close(); GridTestUtils.runAsync(new Callable() { @Override public Object call() throws Exception { @@ -2713,7 +2708,7 @@ private static class TestFailedNodesSpi extends TcpDiscoverySpi { return; } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2728,7 +2723,7 @@ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySp private boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryCustomEventMessage && latch != null) { log.info("Stop node on custom event: " + msg); @@ -2741,7 +2736,7 @@ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySp if (stop) return; - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2762,7 +2757,7 @@ private static class TestCustomEventRaceSpi extends TcpDiscoverySpi { private boolean debug; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryNodeAddedMessage) { if (nodeAdded1 != null) { @@ -2790,7 +2785,7 @@ private static class TestCustomEventRaceSpi extends TcpDiscoverySpi { if (debug && msg instanceof TcpDiscoveryCustomEventMessage) log.info("--- Send custom event: " + msg); - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2818,7 +2813,7 @@ public TestMessageWorkerFailureSpi1(int failureMode) { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (stop) { @@ -2835,7 +2830,7 @@ public TestMessageWorkerFailureSpi1(int failureMode) { } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } } @@ -2847,12 +2842,12 @@ private static class TestMessageWorkerFailureSpi2 extends TcpDiscoverySpi { private volatile boolean stop; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (stop) throw new RuntimeException("Failing ring message worker explicitly"); - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); if (msg instanceof TcpDiscoveryNodeAddedMessage) stop = true; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java index 2368cb6d19b58..55f83d72fc603 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; @@ -303,13 +302,13 @@ else if (openSockTimeoutWait) { } /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (!(msg instanceof TcpDiscoveryPingRequest)) { if (cntConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage) connCheckStatusMsgCntSent++; - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); return; } @@ -329,7 +328,7 @@ else if (openSockTimeoutWait) { } } else - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java index e06733a3810cb..5a0726e7a60f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; import java.net.Socket; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -400,13 +399,13 @@ private static class FailingTcpDiscoverySpi extends TcpDiscoverySpi { private final AtomicInteger failReconReq = new AtomicInteger(); /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - if (!onMessage(sock, msg)) + if (!onMessage(ses.socket(), msg)) return; - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java index 4c876d14b0a28..dcf9931ef9236 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java @@ -18,9 +18,7 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.InputStream; import java.io.StreamCorruptedException; -import java.net.Socket; import java.util.concurrent.Callable; import javax.net.ssl.SSLException; import org.apache.ignite.IgniteCheckedException; @@ -28,7 +26,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; import org.junit.Test; /** @@ -177,7 +174,7 @@ private FailDiscoverySpi(final boolean plain) { } /** {@inheritDoc} */ - @Override protected T readMessage(final Socket sock, @Nullable final InputStream in, + @Override protected T readMessage(final TcpDiscoveryIoSession ses, final long timeout) throws IOException, IgniteCheckedException { if (cnt-- > 0) { if (plain) @@ -186,7 +183,7 @@ private FailDiscoverySpi(final boolean plain) { throw new SSLException("Test SSL exception"); } - return super.readMessage(sock, in, timeout); + return super.readMessage(ses, timeout); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index 669b3112fd8bd..e3038bf5dc6a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -18,8 +18,6 @@ package org.apache.ignite.spi.discovery.tcp; import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; @@ -48,7 +46,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi implements IgniteDiscov private IgniteDiscoverySpiInternalListener internalLsnr; /** {@inheritDoc} */ - @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, + @Override protected void writeMessage(TcpDiscoveryIoSession ses, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse) return; @@ -61,7 +59,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi implements IgniteDiscov internalLsnr.beforeReconnect(locNode, log); } - super.writeToSocket(sock, out, msg, timeout); + super.writeMessage(ses, msg, timeout); } /** {@inheritDoc} */