Skip to content

Commit e349ec5

Browse files
Aegeanerz-wenlin
authored andcommitted
Make sendControlMessage to retry when interrupted (#13371)
`sendControlMessage()` method has no retry attempts, this refactor abstracted a `sendto` system call wrapper with retry enabled. Co-authored-by: zwenlin <zwenlin@vmware.com>
1 parent fc8adb0 commit e349ec5

2 files changed

Lines changed: 72 additions & 44 deletions

File tree

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 69 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,8 @@ static inline void logPkt(char *prefix, icpkthdr *pkt);
781781
static void aggregateStatistics(ChunkTransportStateEntry *pChunkEntry);
782782

783783
static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);
784+
785+
static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail);
784786
static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
785787
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent);
786788

@@ -1814,9 +1816,6 @@ destroyConnHashTable(ConnHashTable *ht)
18141816
/*
18151817
* sendControlMessage
18161818
* Helper function to send a control message.
1817-
*
1818-
* It is different from sendOnce which retries on interrupts...
1819-
* Here, we leave it to retransmit logic to handle these cases.
18201819
*/
18211820
static inline void
18221821
sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen)
@@ -1837,13 +1836,10 @@ sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerL
18371836
if (gp_interconnect_full_crc)
18381837
addCRC(pkt);
18391838

1840-
n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen);
1841-
1842-
/*
1843-
* No need to handle EAGAIN here: no-space just means that we dropped the
1844-
* packet: our ordinary retransmit mechanism will handle that case
1845-
*/
1846-
1839+
char errDetail[100];
1840+
snprintf(errDetail, sizeof(errDetail), "Send control message: got error with seq %u", pkt->seq);
1841+
/* Retry for infinite times since we have no retransmit mechanism for control message */
1842+
n = sendtoWithRetry(fd, (const char *) pkt, pkt->len, 0, addr, peerLen, -1, errDetail);
18471843
if (n < pkt->len)
18481844
write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq);
18491845
}
@@ -4831,43 +4827,41 @@ prepareXmit(MotionConn *mConn)
48314827
}
48324828

48334829
/*
4834-
* sendOnce
4835-
* Send a packet.
4830+
* sendtoWithRetry
4831+
* Retry sendto logic and send the packets.
48364832
*/
4837-
static void
4838-
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn)
4833+
static ssize_t
4834+
sendtoWithRetry(int socket, const void *message, size_t length,
4835+
int flags, const struct sockaddr *dest_addr,
4836+
socklen_t dest_len, int retry, const char *errDetail)
48394837
{
48404838
int32 n;
4841-
ChunkTransportStateEntryUDP *pEntry = NULL;
4842-
MotionConnUDP *conn = NULL;
4843-
4844-
pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry);
4845-
Assert(pEntry);
4846-
4847-
conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);
4848-
4849-
#ifdef USE_ASSERT_CHECKING
4850-
if (testmode_inject_fault(gp_udpic_dropxmit_percent))
4851-
{
4852-
#ifdef AMS_VERBOSE_LOGGING
4853-
write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
4854-
#endif
4855-
return;
4856-
}
4857-
#endif
4839+
int count = 0;
48584840

48594841
xmit_retry:
4860-
n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
4861-
(struct sockaddr *) &conn->peer, conn->peer_len);
4842+
/*
4843+
* If given retry count is positive, retry up to the limited times.
4844+
* Otherwise, retry for unlimited times until succeed.
4845+
*/
4846+
if (retry > 0 && ++count > retry)
4847+
return n;
4848+
n = sendto(socket, message, length, flags, dest_addr, dest_len);
48624849
if (n < 0)
48634850
{
48644851
int save_errno = errno;
48654852

48664853
if (errno == EINTR)
48674854
goto xmit_retry;
48684855

4869-
if (errno == EAGAIN) /* no space ? not an error. */
4870-
return;
4856+
/*
4857+
* EAGAIN: no space ? not an error.
4858+
*
4859+
* EFAULT: In Linux system call, it only happens when copying a socket
4860+
* address into kernel space failed, which is less likely to happen,
4861+
* but mocked heavily by our fault injection in regression tests.
4862+
*/
4863+
if (errno == EAGAIN || errno == EFAULT)
4864+
return n;
48714865

48724866
/*
48734867
* If Linux iptables (nf_conntrack?) drops an outgoing packet, it may
@@ -4879,20 +4873,52 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE
48794873
ereport(LOG,
48804874
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
48814875
errmsg("Interconnect error writing an outgoing packet: %m"),
4882-
errdetail("error during sendto() for Remote Connection: contentId=%d at %s",
4883-
conn->mConn.remoteContentId, conn->mConn.remoteHostAndPort)));
4884-
return;
4876+
errdetail("error during sendto() %s", errDetail)));
4877+
return n;
48854878
}
48864879

48874880
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
48884881
errmsg("Interconnect error writing an outgoing packet: %m"),
48894882
errdetail("error during sendto() call (error:%d).\n"
4890-
"For Remote Connection: contentId=%d at %s",
4891-
save_errno, conn->mConn.remoteContentId,
4892-
conn->mConn.remoteHostAndPort)));
4883+
"%s", save_errno, errDetail)));
48934884
/* not reached */
48944885
}
48954886

4887+
return n;
4888+
}
4889+
4890+
/*
4891+
* sendOnce
4892+
* Send a packet.
4893+
*/
4894+
static void
4895+
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn)
4896+
{
4897+
int32 n;
4898+
ChunkTransportStateEntryUDP *pEntry = NULL;
4899+
MotionConnUDP *conn = NULL;
4900+
4901+
pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry);
4902+
Assert(pEntry);
4903+
4904+
conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);
4905+
4906+
#ifdef USE_ASSERT_CHECKING
4907+
if (testmode_inject_fault(gp_udpic_dropxmit_percent))
4908+
{
4909+
#ifdef AMS_VERBOSE_LOGGING
4910+
write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
4911+
#endif
4912+
return;
4913+
}
4914+
#endif
4915+
4916+
char errDetail[100];
4917+
snprintf(errDetail, sizeof(errDetail), "For Remote Connection: contentId=%d at %s",
4918+
conn->mConn.remoteContentId,
4919+
conn->mConn.remoteHostAndPort);
4920+
n = sendtoWithRetry(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
4921+
(struct sockaddr *) &conn->peer, conn->peer_len, -1, errDetail);
48964922
if (n != buf->pkt->len)
48974923
{
48984924
if (DEBUG1 >= log_min_messages)
@@ -4904,7 +4930,6 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE
49044930
logPkt("PKT DETAILS ", buf->pkt);
49054931
#endif
49064932
}
4907-
49084933
return;
49094934
}
49104935

@@ -5384,7 +5409,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
53845409
ereport(ERROR,
53855410
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
53865411
errmsg("interconnect encountered a network error, please check your network"),
5387-
errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds.",
5412+
errdetail("Failed to send packet (seq %u) to %s (pid %d cid %d) after %u retries in %d seconds.",
53885413
buf->pkt->seq, buf->conn->remoteHostAndPort,
53895414
buf->pkt->dstPid, buf->pkt->dstContentId,
53905415
buf->nRetry, Gp_interconnect_transmit_timeout)));
@@ -5405,7 +5430,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
54055430
{
54065431
ereport(WARNING,
54075432
(errmsg("interconnect may encountered a network error, please check your network"),
5408-
errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries.",
5433+
errdetail("Failing to send packet (seq %u) to %s (pid %d cid %d) after %u retries.",
54095434
buf->pkt->seq, buf->conn->remoteHostAndPort,
54105435
buf->pkt->dstPid, buf->pkt->dstContentId,
54115436
buf->nRetry)));

src/test/regress/init_file

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ m/^WARNING: table ".*" contains rows in segment .*, which is outside the # of s
8888
# The following output is generated by \d on foreign tables, so ignore it.
8989
m/Distributed by: \(.*\)/
9090
m/Distributed randomly/
91+
# The following output is an interconnect network warning, but still not error out, so ignore it.
92+
m/WARNING: interconnect may encountered a network error, please check your network/
93+
m/Failing to send packet/
9194

9295
# directory_table test output is sensitive to the user running the tests
9396
m/^NOTICE:.*storage user mapping for .* does not exist for storage server/

0 commit comments

Comments
 (0)