Skip to content

Commit bcc7749

Browse files
Chibintuhaihe
authored andcommitted
Accommodate for AF_INET6 when doing a motion layer IPC teardown
Previously on commit 70306db18e2, we removed pg_getaddrinfo_all for signal handlers. However, in doing so, the capability of supporting both AF_INET6 and AF_INET was lost; this responsibility must now be handled by us. The commit mentioned above fixed the issue for AF_INET (IPv4), but not for AF_INET6 (IPv6). This commit addresses the situation for both AF_INET and AF_INET6. Reviewed-by: Soumyadeep Chakraborty <soumyadeep2007@gmail.com>
1 parent 865b2e5 commit bcc7749

1 file changed

Lines changed: 100 additions & 58 deletions

File tree

contrib/interconnect/udp/ic_udpifc.c

Lines changed: 100 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -706,8 +706,8 @@ typedef struct ICStatistics
706706
/* Statistics for UDP interconnect. */
707707
static ICStatistics ic_statistics;
708708

709-
static struct addrinfo udp_dummy_packet_addrinfo;
710-
static struct sockaddr udp_dummy_packet_sockaddr;
709+
/* Cached sockaddr of the listening udp socket */
710+
static struct sockaddr_storage udp_dummy_packet_sockaddr;
711711

712712
/* UDP listen fd */
713713
int UDP_listenerFd;
@@ -736,10 +736,15 @@ static void setRxThreadError(int eno);
736736
static void resetRxThreadError(void);
737737
static void SendDummyPacket(void);
738738

739+
static void ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len);
740+
#if defined(__darwin__)
741+
#define s6_addr32 __u6_addr.__u6_addr32
742+
static void ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest);
743+
#endif
739744
static void getSockAddr(struct sockaddr_storage *peer, socklen_t *peer_len, const char *listenerAddr, int listenerPort);
740745
static uint32 setUDPSocketBufferSize(int ic_socket, int buffer_type);
741746
static void setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort,
742-
int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr);
747+
int *txFamily, struct sockaddr_storage *listenerSockaddr);
743748
static ChunkTransportStateEntry *startOutgoingUDPConnections(ChunkTransportState *transportStates,
744749
ExecSlice *sendSlice,
745750
int *pOutgoingCount);
@@ -1577,7 +1582,7 @@ resetRxThreadError()
15771582
* Setup udp listening socket.
15781583
*/
15791584
static void
1580-
setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct addrinfo *listenerAddrinfo, struct sockaddr *listenerSockaddr)
1585+
setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamily, struct sockaddr_storage *listenerSockaddr)
15811586
{
15821587
struct addrinfo *addrs = NULL;
15831588
struct addrinfo *addr;
@@ -1698,16 +1703,6 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
16981703
if (!addr || ic_socket == PGINVALID_SOCKET)
16991704
goto startup_failed;
17001705

1701-
/*
1702-
* cache the successful addrinfo and sockaddr of the listening socket, so
1703-
* we can use this information to connect to the listening socket.
1704-
*/
1705-
if (listenerAddrinfo != NULL && listenerSockaddr != NULL )
1706-
{
1707-
memcpy(listenerAddrinfo, addr, sizeof(udp_dummy_packet_addrinfo));
1708-
memcpy(listenerSockaddr, addr->ai_addr, sizeof(udp_dummy_packet_sockaddr));
1709-
}
1710-
17111706
/* Memorize the socket fd, kernel assigned port and address family */
17121707
*listenerSocketFd = ic_socket;
17131708
if (listenerAddr.ss_family == AF_INET6)
@@ -1721,6 +1716,13 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
17211716
*txFamily = AF_INET;
17221717
}
17231718

1719+
/*
1720+
* cache the successful sockaddr of the listening socket, so
1721+
* we can use this information to connect to the listening socket.
1722+
*/
1723+
if (listenerSockaddr != NULL)
1724+
memcpy(listenerSockaddr, &listenerAddr, sizeof(struct sockaddr_storage));
1725+
17241726
/* Set up socket non-blocking mode */
17251727
if (!pg_set_noblock(ic_socket))
17261728
{
@@ -1853,9 +1855,8 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
18531855
/*
18541856
* setup listening socket and sending socket for Interconnect.
18551857
*/
1856-
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily,
1857-
&udp_dummy_packet_addrinfo, &udp_dummy_packet_sockaddr);
1858-
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL, NULL);
1858+
setupUDPListeningSocket(listenerSocketFd, listenerPort, &txFamily, &udp_dummy_packet_sockaddr);
1859+
setupUDPListeningSocket(&ICSenderSocket, &ICSenderPort, &ICSenderFamily, NULL);
18591860

18601861
/* Initialize receive control data. */
18611862
resetMainThreadWaiting(&rx_control_info.mainWaitingState);
@@ -1963,7 +1964,6 @@ static inline void CleanupMotionUDPIFC(void)
19631964
ICSenderPort = 0;
19641965
ICSenderFamily = 0;
19651966

1966-
memset(&udp_dummy_packet_addrinfo, 0, sizeof(udp_dummy_packet_addrinfo));
19671967
memset(&udp_dummy_packet_sockaddr, 0, sizeof(udp_dummy_packet_sockaddr));
19681968

19691969
#ifdef USE_ASSERT_CHECKING
@@ -3270,30 +3270,8 @@ setupOutgoingUDPConnection(ChunkTransportState *transportStates, ChunkTransportS
32703270
*/
32713271
if (pEntry->txfd_family == AF_INET6)
32723272
{
3273-
struct sockaddr_storage temp;
3274-
const struct sockaddr_in *in = (const struct sockaddr_in *) &conn->peer;
3275-
struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;
3276-
3277-
memset(&temp, 0, sizeof(temp));
3278-
32793273
elog(DEBUG1, "We are inet6, remote is inet. Converting to v4 mapped address.");
3280-
3281-
/* Construct a V4-to-6 mapped address. */
3282-
temp.ss_family = AF_INET6;
3283-
in6_new->sin6_family = AF_INET6;
3284-
in6_new->sin6_port = in->sin_port;
3285-
in6_new->sin6_flowinfo = 0;
3286-
3287-
memset(&in6_new->sin6_addr, '\0', sizeof(in6_new->sin6_addr));
3288-
/* in6_new->sin6_addr.s6_addr16[5] = 0xffff; */
3289-
((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
3290-
/* in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr; */
3291-
memcpy(((char *) &in6_new->sin6_addr) + 12, &(in->sin_addr), 4);
3292-
in6_new->sin6_scope_id = 0;
3293-
3294-
/* copy it back */
3295-
memcpy(&conn->peer, &temp, sizeof(struct sockaddr_in6));
3296-
conn->peer_len = sizeof(struct sockaddr_in6);
3274+
ConvertToIPv4MappedAddr(&conn->peer, &conn->peer_len);
32973275
}
32983276
else
32993277
{
@@ -8044,28 +8022,94 @@ WaitInterconnectQuitUDPIFC(void)
80448022
ic_control_info.threadCreated = false;
80458023
}
80468024

8025+
/*
8026+
* If the socket was created AF_INET6, but the address we want to
8027+
* send to is IPv4 (AF_INET), we need to change the address
8028+
* format. On Linux, this is not necessary: glibc automatically
8029+
* handles this. But on MAC OSX and Solaris, we need to convert
8030+
* the IPv4 address to IPv4-mapped IPv6 address in AF_INET6 format.
8031+
*
8032+
* The comment above relies on getaddrinfo() via function getSockAddr to get
8033+
* the correct V4-mapped address. We need to be careful here as we need to
8034+
* ensure that the platform we are using is POSIX 1003-2001 compliant.
8035+
* Just to be on the safeside, we'll be keeping this function for
8036+
* now to be used for all platforms and not rely on POSIX.
8037+
*
8038+
* Since this can be called in a signal handler, we avoid the use of
8039+
* async-signal unsafe functions such as memset/memcpy
8040+
*/
8041+
static void
8042+
ConvertToIPv4MappedAddr(struct sockaddr_storage *sockaddr, socklen_t *o_len)
8043+
{
8044+
const struct sockaddr_in *in = (const struct sockaddr_in *) sockaddr;
8045+
struct sockaddr_storage temp = {0};
8046+
struct sockaddr_in6 *in6_new = (struct sockaddr_in6 *) &temp;
8047+
8048+
/* Construct a IPv4-to-IPv6 mapped address. */
8049+
temp.ss_family = AF_INET6;
8050+
in6_new->sin6_family = AF_INET6;
8051+
in6_new->sin6_port = in->sin_port;
8052+
in6_new->sin6_flowinfo = 0;
8053+
8054+
((uint16 *) &in6_new->sin6_addr)[5] = 0xffff;
8055+
8056+
in6_new->sin6_addr.s6_addr32[3] = in->sin_addr.s_addr;
8057+
in6_new->sin6_scope_id = 0;
8058+
8059+
/* copy it back */
8060+
*sockaddr = temp;
8061+
*o_len = sizeof(struct sockaddr_in6);
8062+
}
8063+
8064+
#if defined(__darwin__)
8065+
/* macos does not accept :: as the destination, we will need to covert this to the IPv6 loopback */
8066+
static void
8067+
ConvertIPv6WildcardToLoopback(struct sockaddr_storage* dest)
8068+
{
8069+
char address[INET6_ADDRSTRLEN];
8070+
/* we want to terminate our own process, so this should be local */
8071+
const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &udp_dummy_packet_sockaddr;
8072+
inet_ntop(AF_INET6, &in6->sin6_addr, address, sizeof(address));
8073+
if (strcmp("::", address) == 0)
8074+
((struct sockaddr_in6 *)dest)->sin6_addr = in6addr_loopback;
8075+
}
8076+
#endif
8077+
80478078
/*
80488079
* Send a dummy packet to interconnect thread to exit poll() immediately
80498080
*/
80508081
static void
80518082
SendDummyPacket(void)
80528083
{
80538084
int ret;
8054-
in_port_t udp_listener_port;
80558085
char *dummy_pkt = "stop it";
80568086
int counter;
8057-
struct sockaddr_in *addr_in = NULL;
8058-
struct sockaddr_in dest_addr;
8059-
/*
8060-
* Get address info from interconnect udp listener port
8061-
*/
8062-
udp_listener_port = (Gp_listener_port >> 16) & 0x0ffff;
8087+
struct sockaddr_storage dest;
8088+
socklen_t dest_len;
80638089

8064-
addr_in = (struct sockaddr_in *) &udp_dummy_packet_sockaddr;
8065-
memset(&dest_addr, 0, sizeof(dest_addr));
8066-
dest_addr.sin_family = addr_in->sin_family;
8067-
dest_addr.sin_port = htons(udp_listener_port);
8068-
dest_addr.sin_addr.s_addr = addr_in->sin_addr.s_addr;
8090+
Assert(udp_dummy_packet_sockaddr.ss_family == AF_INET || udp_dummy_packet_sockaddr.ss_family == AF_INET6);
8091+
Assert(ICSenderFamily == AF_INET || ICSenderFamily == AF_INET6);
8092+
8093+
dest = udp_dummy_packet_sockaddr;
8094+
dest_len = (ICSenderFamily == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
8095+
8096+
if (ICSenderFamily == AF_INET6)
8097+
{
8098+
#if defined(__darwin__)
8099+
if (udp_dummy_packet_sockaddr.ss_family == AF_INET6)
8100+
ConvertIPv6WildcardToLoopback(&dest);
8101+
#endif
8102+
if (udp_dummy_packet_sockaddr.ss_family == AF_INET)
8103+
ConvertToIPv4MappedAddr(&dest, &dest_len);
8104+
}
8105+
8106+
if (ICSenderFamily == AF_INET && udp_dummy_packet_sockaddr.ss_family == AF_INET6)
8107+
{
8108+
/* the size of AF_INET6 is bigger than the side of IPv4, so
8109+
* converting from IPv6 to IPv4 may potentially not work. */
8110+
ereport(LOG, errmsg("sending dummy packet failed: cannot send from AF_INET to receiving on AF_INET6"));
8111+
return;
8112+
}
80698113

80708114
/*
80718115
* Send a dummy package to the interconnect listener, try 10 times.
@@ -8076,24 +8120,22 @@ SendDummyPacket(void)
80768120
while (counter < 10)
80778121
{
80788122
counter++;
8079-
ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest_addr, sizeof(dest_addr));
8123+
ret = sendto(ICSenderSocket, dummy_pkt, strlen(dummy_pkt), 0, (struct sockaddr *) &dest, dest_len);
80808124
if (ret < 0)
80818125
{
80828126
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
80838127
continue;
80848128
else
80858129
{
8086-
elog(LOG, "send dummy packet failed, sendto failed: %m");
8130+
ereport(LOG, errmsg("send dummy packet failed, sendto failed: %m"));
80878131
return;
80888132
}
80898133
}
80908134
break;
80918135
}
80928136

80938137
if (counter >= 10)
8094-
{
8095-
elog(LOG, "send dummy packet failed, sendto failed with 10 times: %m");
8096-
}
8138+
ereport(LOG, errmsg("send dummy packet failed, sendto failed with 10 times: %m"));
80978139
}
80988140

80998141
void logChunkParseDetails(MotionConn *conn, uint32 ic_instance_id)

0 commit comments

Comments
 (0)