Skip to content

Commit 6a0f9ac

Browse files
Aegeanerz-wenlin
authored andcommitted
Make sendControlMessage to retry when interrupted (#13371)
`sendControlMessage()` method has no retry attempts, this refactor abstracted a `sendto` system call wrapper with retry enabled. Co-authored-by: zwenlin <zwenlin@vmware.com>
1 parent bcc7749 commit 6a0f9ac

2 files changed

Lines changed: 72 additions & 44 deletions

File tree

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 69 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,8 @@ static inline void logPkt(char *prefix, icpkthdr *pkt);
849849
static void aggregateStatistics(ChunkTransportStateEntry *pChunkEntry);
850850

851851
static inline bool pollAcks(ChunkTransportState *transportStates, int fd, int timeout);
852+
853+
static ssize_t sendtoWithRetry(int socket, const void *message, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len, int retry, const char *errDetail);
852854
static TupleChunkListItem receiveChunksUDPIFCLoop(ChunkTransportState *pTransportStates, ChunkTransportStateEntry *pEntry,
853855
int16 *srcRoute, MotionConn *conn, WaitEventSet *waitset, int nevent);
854856

@@ -2225,9 +2227,6 @@ destroyConnHashTable(ConnHashTable *ht)
22252227
/*
22262228
* sendControlMessage
22272229
* Helper function to send a control message.
2228-
*
2229-
* It is different from sendOnce which retries on interrupts...
2230-
* Here, we leave it to retransmit logic to handle these cases.
22312230
*/
22322231
static inline void
22332232
sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerLen)
@@ -2248,13 +2247,10 @@ sendControlMessage(icpkthdr *pkt, int fd, struct sockaddr *addr, socklen_t peerL
22482247
if (gp_interconnect_full_crc)
22492248
addCRC(pkt);
22502249

2251-
n = sendto(fd, (const char *) pkt, pkt->len, 0, addr, peerLen);
2252-
2253-
/*
2254-
* No need to handle EAGAIN here: no-space just means that we dropped the
2255-
* packet: our ordinary retransmit mechanism will handle that case
2256-
*/
2257-
2250+
char errDetail[100];
2251+
snprintf(errDetail, sizeof(errDetail), "Send control message: got error with seq %u", pkt->seq);
2252+
/* Retry for infinite times since we have no retransmit mechanism for control message */
2253+
n = sendtoWithRetry(fd, (const char *) pkt, pkt->len, 0, addr, peerLen, -1, errDetail);
22582254
if (n < pkt->len)
22592255
write_log("sendcontrolmessage: got error %d errno %d seq %d", n, errno, pkt->seq);
22602256
}
@@ -5353,43 +5349,41 @@ prepareXmit(MotionConn *mConn)
53535349
}
53545350

53555351
/*
5356-
* sendOnce
5357-
* Send a packet.
5352+
* sendtoWithRetry
5353+
* Retry sendto logic and send the packets.
53585354
*/
5359-
static void
5360-
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn)
5355+
static ssize_t
5356+
sendtoWithRetry(int socket, const void *message, size_t length,
5357+
int flags, const struct sockaddr *dest_addr,
5358+
socklen_t dest_len, int retry, const char *errDetail)
53615359
{
53625360
int32 n;
5363-
ChunkTransportStateEntryUDP *pEntry = NULL;
5364-
MotionConnUDP *conn = NULL;
5365-
5366-
pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry);
5367-
Assert(pEntry);
5368-
5369-
conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);
5370-
5371-
#ifdef USE_ASSERT_CHECKING
5372-
if (testmode_inject_fault(gp_udpic_dropxmit_percent))
5373-
{
5374-
#ifdef AMS_VERBOSE_LOGGING
5375-
write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
5376-
#endif
5377-
return;
5378-
}
5379-
#endif
5361+
int count = 0;
53805362

53815363
xmit_retry:
5382-
n = sendto(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
5383-
(struct sockaddr *) &conn->peer, conn->peer_len);
5364+
/*
5365+
* If given retry count is positive, retry up to the limited times.
5366+
* Otherwise, retry for unlimited times until succeed.
5367+
*/
5368+
if (retry > 0 && ++count > retry)
5369+
return n;
5370+
n = sendto(socket, message, length, flags, dest_addr, dest_len);
53845371
if (n < 0)
53855372
{
53865373
int save_errno = errno;
53875374

53885375
if (errno == EINTR)
53895376
goto xmit_retry;
53905377

5391-
if (errno == EAGAIN) /* no space ? not an error. */
5392-
return;
5378+
/*
5379+
* EAGAIN: no space ? not an error.
5380+
*
5381+
* EFAULT: In Linux system call, it only happens when copying a socket
5382+
* address into kernel space failed, which is less likely to happen,
5383+
* but mocked heavily by our fault injection in regression tests.
5384+
*/
5385+
if (errno == EAGAIN || errno == EFAULT)
5386+
return n;
53935387

53945388
/*
53955389
* If Linux iptables (nf_conntrack?) drops an outgoing packet, it may
@@ -5401,20 +5395,52 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE
54015395
ereport(LOG,
54025396
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
54035397
errmsg("Interconnect error writing an outgoing packet: %m"),
5404-
errdetail("error during sendto() for Remote Connection: contentId=%d at %s",
5405-
conn->mConn.remoteContentId, conn->mConn.remoteHostAndPort)));
5406-
return;
5398+
errdetail("error during sendto() %s", errDetail)));
5399+
return n;
54075400
}
54085401

54095402
ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
54105403
errmsg("Interconnect error writing an outgoing packet: %m"),
54115404
errdetail("error during sendto() call (error:%d).\n"
5412-
"For Remote Connection: contentId=%d at %s",
5413-
save_errno, conn->mConn.remoteContentId,
5414-
conn->mConn.remoteHostAndPort)));
5405+
"%s", save_errno, errDetail)));
54155406
/* not reached */
54165407
}
54175408

5409+
return n;
5410+
}
5411+
5412+
/*
5413+
* sendOnce
5414+
* Send a packet.
5415+
*/
5416+
static void
5417+
sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkEntry, ICBuffer *buf, MotionConn *mConn)
5418+
{
5419+
int32 n;
5420+
ChunkTransportStateEntryUDP *pEntry = NULL;
5421+
MotionConnUDP *conn = NULL;
5422+
5423+
pEntry = CONTAINER_OF(pChunkEntry, ChunkTransportStateEntryUDP, entry);
5424+
Assert(pEntry);
5425+
5426+
conn = CONTAINER_OF(mConn, MotionConnUDP, mConn);
5427+
5428+
#ifdef USE_ASSERT_CHECKING
5429+
if (testmode_inject_fault(gp_udpic_dropxmit_percent))
5430+
{
5431+
#ifdef AMS_VERBOSE_LOGGING
5432+
write_log("THROW PKT with seq %d srcpid %d despid %d", buf->pkt->seq, buf->pkt->srcPid, buf->pkt->dstPid);
5433+
#endif
5434+
return;
5435+
}
5436+
#endif
5437+
5438+
char errDetail[100];
5439+
snprintf(errDetail, sizeof(errDetail), "For Remote Connection: contentId=%d at %s",
5440+
conn->mConn.remoteContentId,
5441+
conn->mConn.remoteHostAndPort);
5442+
n = sendtoWithRetry(pEntry->txfd, buf->pkt, buf->pkt->len, 0,
5443+
(struct sockaddr *) &conn->peer, conn->peer_len, -1, errDetail);
54185444
if (n != buf->pkt->len)
54195445
{
54205446
if (DEBUG1 >= log_min_messages)
@@ -5426,7 +5452,6 @@ sendOnce(ChunkTransportState *transportStates, ChunkTransportStateEntry *pChunkE
54265452
logPkt("PKT DETAILS ", buf->pkt);
54275453
#endif
54285454
}
5429-
54305455
return;
54315456
}
54325457

@@ -5910,7 +5935,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
59105935
ereport(ERROR,
59115936
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
59125937
errmsg("interconnect encountered a network error, please check your network"),
5913-
errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries in %d seconds.",
5938+
errdetail("Failed to send packet (seq %u) to %s (pid %d cid %d) after %u retries in %d seconds.",
59145939
buf->pkt->seq, buf->conn->remoteHostAndPort,
59155940
buf->pkt->dstPid, buf->pkt->dstContentId,
59165941
buf->nRetry, Gp_interconnect_transmit_timeout)));
@@ -5931,7 +5956,7 @@ checkNetworkTimeout(ICBuffer *buf, uint64 now, bool *networkTimeoutIsLogged)
59315956
{
59325957
ereport(WARNING,
59335958
(errmsg("interconnect may encountered a network error, please check your network"),
5934-
errdetail("Failed to send packet (seq %d) to %s (pid %d cid %d) after %d retries.",
5959+
errdetail("Failing to send packet (seq %u) to %s (pid %d cid %d) after %u retries.",
59355960
buf->pkt->seq, buf->conn->remoteHostAndPort,
59365961
buf->pkt->dstPid, buf->pkt->dstContentId,
59375962
buf->nRetry)));

src/test/regress/init_file

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ m/^WARNING: table ".*" contains rows in segment .*, which is outside the # of s
8888
# The following output is generated by \d on foreign tables, so ignore it.
8989
m/Distributed by: \(.*\)/
9090
m/Distributed randomly/
91+
# The following output is an interconnect network warning, but still not error out, so ignore it.
92+
m/WARNING: interconnect may encountered a network error, please check your network/
93+
m/Failing to send packet/
9194

9295
# directory_table test output is sensitive to the user running the tests
9396
m/^NOTICE:.*storage user mapping for .* does not exist for storage server/

0 commit comments

Comments
 (0)