Skip to content

Commit b1ea5cc

Browse files
Chibinzhangyue1818
authored andcommitted
Accommodate for AF_INET6 when doing a motion layer IPC teardown
Previously on commit 70306db18e2, we removed pg_getaddrinfo_all for signal handlers. However, in doing so, the capability of supporting both AF_INET6 and AF_INET was lost; this responsibility must now be handled by us. The commit mentioned above fixed the issue for AF_INET (IPv4), but not for AF_INET6 (IPv6). This commit addresses the situation for both AF_INET and AF_INET6. Reviewed-by: Soumyadeep Chakraborty <soumyadeep2007@gmail.com>
1 parent 39c4f43 commit b1ea5cc

3 files changed

Lines changed: 445 additions & 58 deletions

File tree

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 100 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -640,8 +640,8 @@ typedef struct ICStatistics
640640
/* Statistics for UDP interconnect. */
641641
static ICStatistics ic_statistics;
642642

643-
static struct addrinfo udp_dummy_packet_addrinfo;
644-
static struct sockaddr udp_dummy_packet_sockaddr;
643+
/* Cached sockaddr of the listening udp socket */
644+
static struct sockaddr_storage udp_dummy_packet_sockaddr;
645645

646646
/* UDP listen fd */
647647
int UDP_listenerFd;
@@ -670,10 +670,15 @@ static void setRxThreadError(int eno);
670670
static void resetRxThreadError(void);
671671
static void SendDummyPacket(void);
672672

673+
static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len);
674+
#if defined(__darwin__)
675+
#define s6_addr32 __u6_addr.__u6_addr32
676+
static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest);
677+
#endif
673678
static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort);
674679
static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type);
675680
static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort,
676-
int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr);
681+
int *txFamily, struct sockaddr_storage *listenerSockaddr);
677682
static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates,
678683
ExecSlice *sendSlice,
679684
int *pOutgoingCount);
@@ -1166,7 +1171,7 @@ resetRxThreadError()
11661171
* Setup udp listening socket.
11671172
*/
11681173
static void
1169-
setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr)
1174+
setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct sockaddr_storage *listenerSockaddr)
11701175
{
11711176
struct addrinfo *addrs = NULL;
11721177
struct addrinfo *addr;
@@ -1287,16 +1292,6 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
12871292
if (!addr || ic_socket == PGINVALID_SOCKET)
12881293
goto startup_failed;
12891294

1290-
/*
1291-
* cache the successful addrinfo and sockaddr of the listening socket, so
1292-
* we can use this information to connect to the listening socket.
1293-
*/
1294-
if (listenerAddrinfo != NULL && listenerSockaddr != NULL )
1295-
{
1296-
memcpy(listenerAddrinfo, addr, sizeof(udp_dummy_packet_addrinfo));
1297-
memcpy(listenerSockaddr, addr->ai_addr, sizeof(udp_dummy_packet_sockaddr));
1298-
}
1299-
13001295
/* Memorize the socket fd, kernel assigned port and address family */
13011296
*listenerSocketFd = ic_socket;
13021297
if (listenerAddr.ss_family == AF_INET6)
@@ -1310,6 +1305,13 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
13101305
*txFamily = AF_INET;
13111306
}
13121307

1308+
/*
1309+
* cache the successful sockaddr of the listening socket, so
1310+
* we can use this information to connect to the listening socket.
1311+
*/
1312+
if (listenerSockaddr != NULL)
1313+
memcpy(listenerSockaddr, &listenerAddr, sizeof(struct sockaddr_storage));
1314+
13131315
/* Set up socket non-blocking mode */
13141316
if (!pg_set_noblock(ic_socket))
13151317
{
@@ -1442,9 +1444,8 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
14421444
/*
14431445
* setup listening socket and sending socket for Interconnect.
14441446
*/
1445-
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily,
1446-
&udp_dummy_packet_addrinfo, &udp_dummy_packet_sockaddr);
1447-
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL, NULL);
1447+
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, &udp_dummy_packet_sockaddr);
1448+
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL);
14481449

14491450
/* Initialize receive control data. */
14501451
resetMainThreadWaiting(&rx_control_info.mainWaitingState);
@@ -1552,7 +1553,6 @@ static inline void CleanupMotionUDPIFC(void)
15521553
ICSenderPort = 0;
15531554
ICSenderFamily = 0;
15541555

1555-
memset(&udp_dummy_packet_addrinfo, 0, sizeof(udp_dummy_packet_addrinfo));
15561556
memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr));
15571557

15581558
#ifdef USE_ASSERT_CHECKING
@@ -2848,30 +2848,8 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS
28482848
*/
28492849
if (pEntry->txfd_family == AF_INET6)
28502850
{
2851-
struct sockaddr_storage temp;
2852-
const struct sockaddr_in *in = (const struct sockaddr_in *) &conn->peer;
2853-
struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;
2854-
2855-
memset(&temp, 0, sizeof(temp));
2856-
28572851
elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address.");
2858-
2859-
/* Construct a V4-to-6 mapped address. */
2860-
temp.ss_family = AF_INET6;
2861-
in6_new->sin6_family = AF_INET6;
2862-
in6_new->sin6_port = in->sin_port;
2863-
in6_new->sin6_flowinfo = 0;
2864-
2865-
memset(&in6_new->sin6_addr, '\0', sizeof(in6_new->sin6_addr));
2866-
/* in6_new->sin6_addr.s6_addr16[5] = 0xffff; */
2867-
((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
2868-
/* in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; */
2869-
memcpy(((char *) &in6_new->sin6_addr) + 12, &(in->sin_addr), 4);
2870-
in6_new->sin6_scope_id = 0;
2871-
2872-
/* copy it back */
2873-
memcpy(&conn->peer, &temp, sizeof(struct sockaddr_in6));
2874-
conn->peer_len = sizeof(struct sockaddr_in6);
2852+
ConvertToIPv4MappedAddr(&conn->peer, &conn->peer_len);
28752853
}
28762854
else
28772855
{
@@ -7237,28 +7215,94 @@ WaitInterconnectQuitUDPIFC(void)
72377215
ic_control_info.threadCreated = false;
72387216
}
72397217

7218+
/*
7219+
* If the socket was created AF_INET6, but the address we want to
7220+
* send to is IPv4 (AF_INET), we need to change the address
7221+
* format. On Linux, this is not necessary: glibc automatically
7222+
* handles this. But on MAC OSX and Solaris, we need to convert
7223+
* the IPv4 address to IPv4-mapped IPv6 address in AF_INET6 format.
7224+
*
7225+
* The comment above relies on getaddrinfo() via function getSockAddr to get
7226+
* the correct V4-mapped address. We need to be careful here as we need to
7227+
* ensure that the platform we are using is POSIX 1003-2001 compliant.
7228+
* Just to be on the safeside, we'll be keeping this function for
7229+
* now to be used for all platforms and not rely on POSIX.
7230+
*
7231+
* Since this can be called in a signal handler, we avoid the use of
7232+
* async-signal unsafe functions such as memset/memcpy
7233+
*/
7234+
static void
7235+
ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len)
7236+
{
7237+
const struct sockaddr_in *in = (const struct sockaddr_in *) sockaddr;
7238+
struct sockaddr_storage temp = {0};
7239+
struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;
7240+
7241+
/* Construct a IPv4-to-IPv6 mapped address. */
7242+
temp.ss_family = AF_INET6;
7243+
in6_new->sin6_family = AF_INET6;
7244+
in6_new->sin6_port = in->sin_port;
7245+
in6_new->sin6_flowinfo = 0;
7246+
7247+
((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
7248+
7249+
in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr;
7250+
in6_new->sin6_scope_id = 0;
7251+
7252+
/* copy it back */
7253+
*sockaddr = temp;
7254+
*o_len = sizeof(struct sockaddr_in6);
7255+
}
7256+
7257+
#if defined(__darwin__)
7258+
/* macos does not accept :: as the destination, we will need to covert this to the IPv6 loopback */
7259+
static void
7260+
ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest)
7261+
{
7262+
char address[INET6_ADDRSTRLEN];
7263+
/* we want to terminate our own process, so this should be local */
7264+
const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &udp_dummy_packet_sockaddr;
7265+
inet_ntop(AF_INET6, &in6->sin6_addr, address, sizeof(address));
7266+
if (strcmp("::", address) == 0)
7267+
((struct sockaddr_in6 *)dest)->sin6_addr = in6addr_loopback;
7268+
}
7269+
#endif
7270+
72407271
/*
72417272
* Send a dummy packet to interconnect thread to exit poll() immediately
72427273
*/
72437274
static void
72447275
SendDummyPacket(void)
72457276
{
72467277
int ret;
7247-
in_port_t udp_listener_port;
72487278
char *dummy_pkt = "stop it";
72497279
int counter;
7250-
struct sockaddr_in *addr_in = NULL;
7251-
struct sockaddr_in dest_addr;
7252-
/*
7253-
* Get address info from interconnect udp listener port
7254-
*/
7255-
udp_listener_port = (Gp_listener_port >> 16) & 0x0ffff;
7280+
struct sockaddr_storage dest;
7281+
socklen_t dest_len;
7282+
7283+
Assert(udp_dummy_packet_sockaddr.ss_family == AF_INET || udp_dummy_packet_sockaddr.ss_family == AF_INET6);
7284+
Assert(ICSenderFamily == AF_INET || ICSenderFamily == AF_INET6);
72567285

7257-
addr_in = (struct sockaddr_in *) &udp_dummy_packet_sockaddr;
7258-
memset(&dest_addr, 0, sizeof(dest_addr));
7259-
dest_addr.sin_family = addr_in->sin_family;
7260-
dest_addr.sin_port = htons(udp_listener_port);
7261-
dest_addr.sin_addr.s_addr = addr_in->sin_addr.s_addr;
7286+
dest = udp_dummy_packet_sockaddr;
7287+
dest_len = (ICSenderFamily == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
7288+
7289+
if (ICSenderFamily == AF_INET6)
7290+
{
7291+
#if defined(__darwin__)
7292+
if (udp_dummy_packet_sockaddr.ss_family == AF_INET6)
7293+
ConvertIPv6WildcardToLoopback(&dest);
7294+
#endif
7295+
if (udp_dummy_packet_sockaddr.ss_family == AF_INET)
7296+
ConvertToIPv4MappedAddr(&dest, &dest_len);
7297+
}
7298+
7299+
if (ICSenderFamily == AF_INET && udp_dummy_packet_sockaddr.ss_family == AF_INET6)
7300+
{
7301+
/* the size of AF_INET6 is bigger than the side of IPv4, so
7302+
* converting from IPv6 to IPv4 may potentially not work. */
7303+
ereport(LOG, errmsg("sending dummy packet failed: cannot send from AF_INET to receiving on AF_INET6"));
7304+
return;
7305+
}
72627306

72637307
/*
72647308
* Send a dummy package to the interconnect listener, try 10 times.
@@ -7269,24 +7313,22 @@ SendDummyPacket(void)
72697313
while (counter < 10)
72707314
{
72717315
counter++;
7272-
ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest_addr, sizeof(dest_addr));
7316+
ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest, dest_len);
72737317
if (ret < 0)
72747318
{
72757319
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
72767320
continue;
72777321
else
72787322
{
7279-
elog(LOG, "send dummy packet failed, sendto failed: %m");
7323+
ereport(LOG, errmsg("send dummy packet failed, sendto failed: %m"));
72807324
return;
72817325
}
72827326
}
72837327
break;
72847328
}
72857329

72867330
if (counter >= 10)
7287-
{
7288-
elog(LOG, "send dummy packet failed, sendto failed with 10 times: %m");
7289-
}
7331+
ereport(LOG, errmsg("send dummy packet failed, sendto failed with 10 times: %m"));
72907332
}
72917333

72927334
void logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
subdir=contrib/interconnect/udp
2+
top_builddir=../../../../..
3+
include $(top_builddir)/src/Makefile.global
4+
5+
TARGETS=cdbsenddummypacket
6+
7+
include $(top_builddir)/src/backend/mock.mk
8+
9+
cdbsenddummypacket.t: EXCL_OBJS += contrib/interconnect/udp/ic_udpifc.o
10+
cdbsenddummypacket.t: \
11+
$(MOCK_DIR)/backend/access/hash/hash_mock.o \
12+
$(MOCK_DIR)/backend/utils/fmgr/fmgr_mock.o

0 commit comments

Comments
 (0)