Skip to content

Commit 2189b3e

Browse files
authored
IGNITE-26350 Enforce explicit use of OutputStream for TcpDiscoverySpi#writeToSocket (#12306)
1 parent d20774c commit 2189b3e

13 files changed

Lines changed: 15 additions & 130 deletions

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.io.InputStream;
2323
import java.io.InterruptedIOException;
24+
import java.io.OutputStream;
2425
import java.io.StreamCorruptedException;
2526
import java.net.InetSocketAddress;
2627
import java.net.Socket;
@@ -718,19 +719,21 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
718719
boolean openSock = false;
719720

720721
Socket sock = null;
722+
OutputStream out;
721723

722724
try {
723725
long tsNanos = System.nanoTime();
724726

725727
sock = spi.openSocket(addr, timeoutHelper);
728+
out = spi.socketStream(sock);
726729

727730
openSock = true;
728731

729732
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
730733

731734
req.client(true);
732735

733-
spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
736+
spi.writeToSocket(sock, out, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
734737

735738
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, ackTimeout0);
736739

@@ -785,7 +788,7 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws
785788
if (msg instanceof TraceableMessage)
786789
tracing.messages().beforeSend((TraceableMessage)msg);
787790

788-
spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
791+
spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
789792

790793
spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
791794

@@ -1386,6 +1389,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
13861389
try {
13871390
spi.writeToSocket(
13881391
sock,
1392+
spi.socketStream(sock),
13891393
msg,
13901394
sockTimeout);
13911395
}
@@ -1432,6 +1436,7 @@ void ackReceived(TcpDiscoveryClientAckResponse res) {
14321436

14331437
spi.writeToSocket(
14341438
sock,
1439+
spi.socketStream(sock),
14351440
msg,
14361441
sockTimeout);
14371442

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -877,7 +877,7 @@ private boolean pingNode(TcpDiscoveryNode node) {
877877

878878
openedSock = true;
879879

880-
spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
880+
spi.writeToSocket(sock, spi.socketStream(sock), new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
881881
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
882882

883883
TcpDiscoveryPingResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
@@ -1475,7 +1475,7 @@ else if (U.millisSinceNanos(joinStartNanos) > spi.joinTimeout)
14751475
TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
14761476

14771477
// Handshake.
1478-
spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
1478+
spi.writeToSocket(sock, spi.socketStream(sock), req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
14791479

14801480
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
14811481
ackTimeout0));
@@ -1510,7 +1510,7 @@ else if (U.millisSinceNanos(joinStartNanos) > spi.joinTimeout)
15101510
// Send message.
15111511
tsNanos = System.nanoTime();
15121512

1513-
spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
1513+
spi.writeToSocket(sock, spi.socketStream(sock), msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
15141514

15151515
long tsNanos0 = System.nanoTime();
15161516

@@ -6769,7 +6769,7 @@ private class SocketReader extends IgniteSpiThread {
67696769
res.clientExists(clientWorker.ping(timeoutHelper));
67706770
}
67716771

6772-
spi.writeToSocket(sock, res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
6772+
spi.writeToSocket(sock, spi.socketStream(sock), res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
67736773

67746774
if (!(sock instanceof SSLSocket))
67756775
sock.shutdownOutput();
@@ -6862,7 +6862,7 @@ else if (req.changeTopology()) {
68626862
spi.getEffectiveSocketTimeout(srvSock) + " to " + rmtAddr + ":" + sock.getPort());
68636863
}
68646864

6865-
spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock));
6865+
spi.writeToSocket(sock, spi.socketStream(sock), res, spi.getEffectiveSocketTimeout(srvSock));
68666866

68676867
// It can happen if a remote node is stopped and it has a loopback address in the list of addresses,
68686868
// the local node sends a handshake request message on the loopback address, so we get here.

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,20 +1726,6 @@ void configureSocketOptions(Socket sock) throws SocketException {
17261726
sock.setKeepAlive(true);
17271727
}
17281728

1729-
/**
1730-
* Writes message to the socket.
1731-
*
1732-
* @param sock Socket.
1733-
* @param msg Message.
1734-
* @param timeout Socket write timeout.
1735-
* @throws IOException If IO failed or write timed out.
1736-
* @throws IgniteCheckedException If marshalling failed.
1737-
*/
1738-
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
1739-
IgniteCheckedException {
1740-
writeToSocket(sock, socketStream(sock), msg, timeout);
1741-
}
1742-
17431729
/**
17441730
* @param msg Message.
17451731
*/

modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -362,15 +362,6 @@ private class DiscoverySpi extends TcpDiscoverySpi {
362362
super.writeToSocket(sock, msg, data, timeout);
363363
}
364364

365-
/** {@inheritDoc} */
366-
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
367-
long timeout) throws IOException, IgniteCheckedException {
368-
if (blockAll || block && sock.getPort() == 47500)
369-
throw new SocketException("Test discovery exception");
370-
371-
super.writeToSocket(sock, msg, timeout);
372-
}
373-
374365
/** {@inheritDoc} */
375366
@Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
376367
long timeout) throws IOException, IgniteCheckedException {

modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -312,17 +312,6 @@ private class FailDiscoverySpi extends TcpDiscoverySpi {
312312
super.writeToSocket(sock, msg, data, timeout);
313313
}
314314

315-
/** {@inheritDoc} */
316-
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
317-
long timeout) throws IOException, IgniteCheckedException {
318-
assertNotFailedNode(sock);
319-
320-
if (isDrop(msg))
321-
return;
322-
323-
super.writeToSocket(sock, msg, timeout);
324-
}
325-
326315
/** {@inheritDoc} */
327316
@Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
328317
long timeout) throws IOException, IgniteCheckedException {

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed;
1919

20-
import java.io.IOException;
21-
import java.io.OutputStream;
22-
import java.net.Socket;
2320
import java.util.ArrayList;
2421
import java.util.Collection;
2522
import java.util.List;
2623
import java.util.concurrent.CyclicBarrier;
2724
import org.apache.ignite.Ignite;
2825
import org.apache.ignite.IgniteCache;
29-
import org.apache.ignite.IgniteCheckedException;
3026
import org.apache.ignite.IgniteException;
3127
import org.apache.ignite.IgniteSystemProperties;
3228
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -62,15 +58,6 @@ public class CacheStartOnJoinTest extends GridCommonAbstractTest {
6258
/** */
6359
private boolean delay = true;
6460

65-
@Override protected void writeToSocket(
66-
Socket sock,
67-
OutputStream out,
68-
TcpDiscoveryAbstractMessage msg,
69-
long timeout
70-
) throws IOException, IgniteCheckedException {
71-
super.writeToSocket(sock, out, msg, timeout);
72-
}
73-
7461
@Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
7562
if (getTestIgniteInstanceName(0).equals(ignite.name())) {
7663
if (msg instanceof TcpDiscoveryJoinRequestMessage) {

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -237,17 +237,6 @@ protected void checkSegmented(InetSocketAddress sockAddr, long timeout) throws S
237237
super.writeToSocket(sock, out, msg, timeout);
238238
}
239239

240-
/** {@inheritDoc} */
241-
@Override protected void writeToSocket(
242-
Socket sock,
243-
TcpDiscoveryAbstractMessage msg,
244-
long timeout
245-
) throws IOException, IgniteCheckedException {
246-
checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), timeout);
247-
248-
super.writeToSocket(sock, msg, timeout);
249-
}
250-
251240
/** {@inheritDoc} */
252241
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
253242
long timeout) throws IOException {

modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -207,17 +207,6 @@ private class TestDiscoverySpi extends TcpDiscoverySpi {
207207
super.writeToSocket(sock, msg, data, timeout);
208208
}
209209

210-
/** {@inheritDoc} */
211-
@Override protected void writeToSocket(
212-
Socket sock,
213-
TcpDiscoveryAbstractMessage msg,
214-
long timeout
215-
) throws IOException, IgniteCheckedException {
216-
awaitLatch();
217-
218-
super.writeToSocket(sock, msg, timeout);
219-
}
220-
221210
/** {@inheritDoc} */
222211
@Override protected void writeToSocket(
223212
Socket sock,

modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.io.InputStream;
22+
import java.io.OutputStream;
2223
import java.net.Socket;
2324
import java.net.SocketTimeoutException;
2425
import java.util.Set;
@@ -292,15 +293,15 @@ class CustomDiscoverySpi extends TcpDiscoverySpi {
292293
}
293294

294295
/** {@inheritDoc} */
295-
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
296+
@Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
296297
long timeout) throws IOException, IgniteCheckedException {
297298
if (netDisabled) {
298299
netDisabledLatch.countDown();
299300

300301
throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled.");
301302
}
302303
else
303-
super.writeToSocket(sock, msg, timeout);
304+
super.writeToSocket(sock, out, msg, timeout);
304305
}
305306

306307
/**

modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -496,29 +496,6 @@ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi {
496496
throw new SocketTimeoutException("Write to socket delay timeout exception.");
497497
}
498498

499-
/** */
500-
@Override protected void writeToSocket(
501-
Socket sock,
502-
TcpDiscoveryAbstractMessage msg,
503-
long timeout
504-
) throws IOException, IgniteCheckedException {
505-
if (writeToSocketDelay > 0) {
506-
try {
507-
U.dumpStack(log, "Before sleep [msg=" + msg + ']');
508-
509-
Thread.sleep(writeToSocketDelay);
510-
}
511-
catch (InterruptedException ignore) {
512-
// Nothing to do.
513-
}
514-
}
515-
516-
if (sock.getSoTimeout() >= writeToSocketDelay)
517-
super.writeToSocket(sock, msg, timeout);
518-
else
519-
throw new SocketTimeoutException("Write to socket delay timeout exception.");
520-
}
521-
522499
/** */
523500
@Override protected void writeToSocket(
524501
TcpDiscoveryAbstractMessage msg,

0 commit comments

Comments
 (0)