Skip to content

Commit 5feee85

Browse files
Aegeanerz-wenlin
authored andcommitted
Make sendControlMessage to retry when interrupted (#13371)
`sendControlMessage()` method has no retry attempts, this refactor abstracted a `sendto` system call wrapper with retry enabled. Co-authored-by: zwenlin <zwenlin@vmware.com>
1 parent da24730 commit 5feee85

File tree

2 files changed

+72
-44
lines changed

2 files changed

+72
-44
lines changed

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 69 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,8 @@ static inline void logPkt(char *prefix, icpkthdr *pkt);
848848
static void aggregateStatistics(ChunkTransportStateEntry *pChunkEntry);
849849

850850
static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);
851+
852+
static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail);
851853
static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
852854
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent);
853855

@@ -2224,9 +2226,6 @@ destroyConnHashTable(ConnHashTable *ht)
22242226
/*
22252227
* sendControlMessage
22262228
* Helper function to send a control message.
2227-
*
2228-
* It is different from sendOnce which retries on interrupts...
2229-
* Here, we leave it to retransmit logic to handle these cases.
22302229
*/
22312230
static inline void
22322231
sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen)
@@ -2247,13 +2246,10 @@ sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerL
22472246
if (gp_interconnect_full_crc)
22482247
addCRC(pkt);
22492248

2250-
n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen);
2251-
2252-
/*
2253-
* No need to handle EAGAIN here: no-space just means that we dropped the
2254-
* packet: our ordinary retransmit mechanism will handle that case
2255-
*/
2256-
2249+
char errDetail[100];
2250+
snprintf(errDetail, sizeof(errDetail), "Send control message: got error with seq %u", pkt->seq);
2251+
/* Retry for infinite times since we have no retransmit mechanism for control message */
2252+
n = sendtoWithRetry(fd, (const char *) pkt, pkt->len, 0, addr, peerLen, -1, errDetail);
22572253
if (n < pkt->len)
22582254
write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq);
22592255
}
@@ -5346,43 +5342,41 @@ prepareXmit(MotionConn *mConn)
53465342
}
53475343

53485344
/*
5349-
* sendOnce
5350-
* Send a packet.
5345+
* sendtoWithRetry
5346+
* Retry sendto logic and send the packets.
53515347
*/
5352-
static void
5353-
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn)
5348+
static ssize_t
5349+
sendtoWithRetry(int socket, const void *message, size_t length,
5350+
int flags, const struct sockaddr *dest_addr,
5351+
socklen_t dest_len, int retry, const char *errDetail)
53545352
{
53555353
int32 n;
5356-
ChunkTransportStateEntryUDP *pEntry = NULL;
5357-
MotionConnUDP *conn = NULL;
5358-
5359-
pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry);
5360-
Assert(pEntry);
5361-
5362-
conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);
5363-
5364-
#ifdef USE_ASSERT_CHECKING
5365-
if (testmode_inject_fault(gp_udpic_dropxmit_percent))
5366-
{
5367-
#ifdef AMS_VERBOSE_LOGGING
5368-
write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
5369-
#endif
5370-
return;
5371-
}
5372-
#endif
5354+
int count = 0;
53735355

53745356
xmit_retry:
5375-
n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
5376-
(struct sockaddr *) &conn->peer, conn->peer_len);
5357+
/*
5358+
* If given retry count is positive, retry up to the limited times.
5359+
* Otherwise, retry for unlimited times until succeed.
5360+
*/
5361+
if (retry > 0 && ++count > retry)
5362+
return n;
5363+
n = sendto(socket, message, length, flags, dest_addr, dest_len);
53775364
if (n < 0)
53785365
{
53795366
int save_errno = errno;
53805367

53815368
if (errno == EINTR)
53825369
goto xmit_retry;
53835370

5384-
if (errno == EAGAIN) /* no space ? not an error. */
5385-
return;
5371+
/*
5372+
* EAGAIN: no space ? not an error.
5373+
*
5374+
* EFAULT: In Linux system call, it only happens when copying a socket
5375+
* address into kernel space failed, which is less likely to happen,
5376+
* but mocked heavily by our fault injection in regression tests.
5377+
*/
5378+
if (errno == EAGAIN || errno == EFAULT)
5379+
return n;
53865380

53875381
/*
53885382
* If Linux iptables (nf_conntrack?) drops an outgoing packet, it may
@@ -5394,20 +5388,52 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE
53945388
ereport(LOG,
53955389
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
53965390
errmsg("Interconnect error writing an outgoing packet: %m"),
5397-
errdetail("error during sendto() for Remote Connection: contentId=%d at %s",
5398-
conn->mConn.remoteContentId, conn->mConn.remoteHostAndPort)));
5399-
return;
5391+
errdetail("error during sendto() %s", errDetail)));
5392+
return n;
54005393
}
54015394

54025395
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
54035396
errmsg("Interconnect error writing an outgoing packet: %m"),
54045397
errdetail("error during sendto() call (error:%d).\n"
5405-
"For Remote Connection: contentId=%d at %s",
5406-
save_errno, conn->mConn.remoteContentId,
5407-
conn->mConn.remoteHostAndPort)));
5398+
"%s", save_errno, errDetail)));
54085399
/* not reached */
54095400
}
54105401

5402+
return n;
5403+
}
5404+
5405+
/*
5406+
* sendOnce
5407+
* Send a packet.
5408+
*/
5409+
static void
5410+
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn)
5411+
{
5412+
int32 n;
5413+
ChunkTransportStateEntryUDP *pEntry = NULL;
5414+
MotionConnUDP *conn = NULL;
5415+
5416+
pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry);
5417+
Assert(pEntry);
5418+
5419+
conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);
5420+
5421+
#ifdef USE_ASSERT_CHECKING
5422+
if (testmode_inject_fault(gp_udpic_dropxmit_percent))
5423+
{
5424+
#ifdef AMS_VERBOSE_LOGGING
5425+
write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
5426+
#endif
5427+
return;
5428+
}
5429+
#endif
5430+
5431+
char errDetail[100];
5432+
snprintf(errDetail, sizeof(errDetail), "For Remote Connection: contentId=%d at %s",
5433+
conn->mConn.remoteContentId,
5434+
conn->mConn.remoteHostAndPort);
5435+
n = sendtoWithRetry(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
5436+
(struct sockaddr *) &conn->peer, conn->peer_len, -1, errDetail);
54115437
if (n != buf->pkt->len)
54125438
{
54135439
if (DEBUG1 >= log_min_messages)
@@ -5419,7 +5445,6 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE
54195445
logPkt("PKT DETAILS ", buf->pkt);
54205446
#endif
54215447
}
5422-
54235448
return;
54245449
}
54255450

@@ -5903,7 +5928,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
59035928
ereport(ERROR,
59045929
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
59055930
errmsg("interconnect encountered a network error, please check your network"),
5906-
errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds.",
5931+
errdetail("Failed to send packet (seq %u) to %s (pid %d cid %d) after %u retries in %d seconds.",
59075932
buf->pkt->seq, buf->conn->remoteHostAndPort,
59085933
buf->pkt->dstPid, buf->pkt->dstContentId,
59095934
buf->nRetry, Gp_interconnect_transmit_timeout)));
@@ -5924,7 +5949,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
59245949
{
59255950
ereport(WARNING,
59265951
(errmsg("interconnect may encountered a network error, please check your network"),
5927-
errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries.",
5952+
errdetail("Failing to send packet (seq %u) to %s (pid %d cid %d) after %u retries.",
59285953
buf->pkt->seq, buf->conn->remoteHostAndPort,
59295954
buf->pkt->dstPid, buf->pkt->dstContentId,
59305955
buf->nRetry)));

src/test/regress/init_file

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ m/^WARNING: table ".*" contains rows in segment .*, which is outside the # of s
8888
# The following output is generated by \d on foreign tables, so ignore it.
8989
m/Distributed by: \(.*\)/
9090
m/Distributed randomly/
91+
# The following output is an interconnect network warning, but still not error out, so ignore it.
92+
m/WARNING: interconnect may encountered a network error, please check your network/
93+
m/Failing to send packet/
9194

9295
# directory_table test output is sensitive to the user running the tests
9396
m/^NOTICE:.*storage user mapping for .* does not exist for storage server/

0 commit comments

Comments
 (0)