Skip to content

Commit 70b0e36

Browse files
authored
IGNITE-26349 Refactoring setting socket timeout for TcpDiscoverySpi#writeToSocket (#12305)
1 parent 2189b3e commit 70b0e36

1 file changed

Lines changed: 25 additions & 69 deletions

File tree

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java

Lines changed: 25 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1674,18 +1674,11 @@ Socket createSocket() throws IOException {
16741674
* @param timeout Socket write timeout.
16751675
* @throws IOException If IO failed or write timed out.
16761676
*/
1677-
@SuppressWarnings("ThrowFromFinallyBlock")
16781677
protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException {
16791678
assert sock != null;
16801679
assert data != null;
16811680

1682-
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
1683-
1684-
addTimeoutObject(obj);
1685-
1686-
IOException err = null;
1687-
1688-
try {
1681+
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
16891682
OutputStream out = sock.getOutputStream();
16901683

16911684
out.write(data);
@@ -1695,20 +1688,7 @@ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[
16951688
catch (IOException e) {
16961689
SSLException sslEx = checkSslException(sock, e);
16971690

1698-
err = sslEx == null ? e : sslEx;
1699-
}
1700-
finally {
1701-
boolean cancelled = obj.cancel();
1702-
1703-
if (cancelled)
1704-
removeTimeoutObject(obj);
1705-
1706-
// Throw original exception.
1707-
if (err != null)
1708-
throw err;
1709-
1710-
if (!cancelled)
1711-
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
1691+
throw sslEx == null ? e : sslEx;
17121692
}
17131693
}
17141694

@@ -1744,41 +1724,20 @@ protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
17441724
* @throws IOException If IO failed or write timed out.
17451725
* @throws IgniteCheckedException If marshalling failed.
17461726
*/
1747-
@SuppressWarnings("ThrowFromFinallyBlock")
17481727
protected void writeToSocket(Socket sock,
17491728
OutputStream out,
17501729
TcpDiscoveryAbstractMessage msg,
17511730
long timeout) throws IOException, IgniteCheckedException {
17521731
assert sock != null;
17531732
assert msg != null;
1754-
assert out != null;
17551733

1756-
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
1757-
1758-
addTimeoutObject(obj);
1759-
1760-
IgniteCheckedException err = null;
1761-
1762-
try {
1734+
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
17631735
U.marshal(marshaller(), msg, out);
17641736
}
17651737
catch (IgniteCheckedException e) {
17661738
SSLException sslEx = checkSslException(sock, e);
17671739

1768-
err = sslEx == null ? e : new IgniteCheckedException(sslEx);
1769-
}
1770-
finally {
1771-
boolean cancelled = obj.cancel();
1772-
1773-
if (cancelled)
1774-
removeTimeoutObject(obj);
1775-
1776-
// Throw original exception.
1777-
if (err != null)
1778-
throw err;
1779-
1780-
if (!cancelled)
1781-
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
1740+
throw sslEx == null ? e : new IgniteCheckedException(sslEx);
17821741
}
17831742
}
17841743

@@ -1791,41 +1750,21 @@ protected void writeToSocket(Socket sock,
17911750
* @param timeout Socket timeout.
17921751
* @throws IOException If IO failed or write timed out.
17931752
*/
1794-
@SuppressWarnings("ThrowFromFinallyBlock")
17951753
protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
17961754
throws IOException {
17971755
assert sock != null;
17981756

1799-
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
1800-
1801-
addTimeoutObject(obj);
1802-
1803-
OutputStream out = sock.getOutputStream();
1804-
1805-
IOException err = null;
1757+
try (SocketTimeoutObject ignored = startTimer(sock, timeout)) {
1758+
OutputStream out = sock.getOutputStream();
18061759

1807-
try {
18081760
out.write(res);
18091761

18101762
out.flush();
18111763
}
18121764
catch (IOException e) {
18131765
SSLException sslEx = checkSslException(sock, e);
18141766

1815-
err = sslEx == null ? e : sslEx;
1816-
}
1817-
finally {
1818-
boolean cancelled = obj.cancel();
1819-
1820-
if (cancelled)
1821-
removeTimeoutObject(obj);
1822-
1823-
// Throw original exception.
1824-
if (err != null)
1825-
throw err;
1826-
1827-
if (!cancelled)
1828-
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
1767+
throw (sslEx == null) ? e : sslEx;
18291768
}
18301769
}
18311770

@@ -2489,10 +2428,19 @@ protected Marshaller marshaller() {
24892428
return S.toString(TcpDiscoverySpi.class, this);
24902429
}
24912430

2431+
/** Starts a timer for a socket operation. */
2432+
private SocketTimeoutObject startTimer(Socket sock, long timeout) {
2433+
SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
2434+
2435+
addTimeoutObject(obj);
2436+
2437+
return obj;
2438+
}
2439+
24922440
/**
24932441
* Socket timeout object.
24942442
*/
2495-
private class SocketTimeoutObject implements IgniteSpiTimeoutObject {
2443+
private class SocketTimeoutObject implements IgniteSpiTimeoutObject, AutoCloseable {
24962444
/** */
24972445
private final IgniteUuid id = IgniteUuid.randomUuid();
24982446

@@ -2550,6 +2498,14 @@ boolean cancel() {
25502498
return id;
25512499
}
25522500

2501+
/** */
2502+
@Override public void close() throws SocketTimeoutException {
2503+
if (cancel())
2504+
removeTimeoutObject(this);
2505+
else
2506+
throw new SocketTimeoutException("Write timed out (socket was concurrently closed).");
2507+
}
2508+
25532509
/** {@inheritDoc} */
25542510
@Override public String toString() {
25552511
return S.toString(SocketTimeoutObject.class, this);

0 commit comments

Comments
 (0)