@@ -706,6 +706,9 @@ typedef struct ICStatistics
706706/* Statistics for UDP interconnect. */
707707static ICStatistics ic_statistics ;
708708
709+ static struct addrinfo udp_dummy_packet_addrinfo ;
710+ static struct sockaddr udp_dummy_packet_sockaddr ;
711+
709712/* UDP listen fd */
710713int UDP_listenerFd ;
711714
@@ -735,7 +738,8 @@ static void SendDummyPacket(void);
735738
736739static void getSockAddr (struct sockaddr_storage * peer , socklen_t * peer_len , const char * listenerAddr , int listenerPort );
737740static uint32 setUDPSocketBufferSize (int ic_socket , int buffer_type );
738- static void setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort , int * txFamily );
741+ static void setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort ,
742+ int * txFamily , struct addrinfo * listenerAddrinfo , struct sockaddr * listenerSockaddr );
739743static ChunkTransportStateEntry * startOutgoingUDPConnections (ChunkTransportState * transportStates ,
740744 ExecSlice * sendSlice ,
741745 int * pOutgoingCount );
@@ -1568,13 +1572,12 @@ resetRxThreadError()
15681572 pg_atomic_write_u32 (& ic_control_info .eno , 0 );
15691573}
15701574
1571-
15721575/*
15731576 * setupUDPListeningSocket
15741577 * Setup udp listening socket.
15751578 */
15761579static void
1577- setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort , int * txFamily )
1580+ setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort , int * txFamily , struct addrinfo * listenerAddrinfo , struct sockaddr * listenerSockaddr )
15781581{
15791582 struct addrinfo * addrs = NULL ;
15801583 struct addrinfo * addr ;
@@ -1695,6 +1698,16 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
16951698 if (!addr || ic_socket == PGINVALID_SOCKET )
16961699 goto startup_failed ;
16971700
1701+ /*
1702+ * cache the successful addrinfo and sockaddr of the listening socket, so
1703+ * we can use this information to connect to the listening socket.
1704+ */
1705+ if (listenerAddrinfo != NULL && listenerSockaddr != NULL )
1706+ {
1707+ memcpy (listenerAddrinfo , addr , sizeof (udp_dummy_packet_addrinfo ));
1708+ memcpy (listenerSockaddr , addr -> ai_addr , sizeof (udp_dummy_packet_sockaddr ));
1709+ }
1710+
16981711 /* Memorize the socket fd, kernel assigned port and address family */
16991712 * listenerSocketFd = ic_socket ;
17001713 if (listenerAddr .ss_family == AF_INET6 )
@@ -1840,8 +1853,9 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
18401853 /*
18411854 * setup listening socket and sending socket for Interconnect.
18421855 */
1843- setupUDPListeningSocket (listenerSocketFd , listenerPort , & txFamily );
1844- setupUDPListeningSocket (& ICSenderSocket , & ICSenderPort , & ICSenderFamily );
1856+ setupUDPListeningSocket (listenerSocketFd , listenerPort , & txFamily ,
1857+ & udp_dummy_packet_addrinfo , & udp_dummy_packet_sockaddr );
1858+ setupUDPListeningSocket (& ICSenderSocket , & ICSenderPort , & ICSenderFamily , NULL , NULL );
18451859
18461860 /* Initialize receive control data. */
18471861 resetMainThreadWaiting (& rx_control_info .mainWaitingState );
@@ -1949,6 +1963,9 @@ static inline void CleanupMotionUDPIFC(void)
19491963 ICSenderPort = 0 ;
19501964 ICSenderFamily = 0 ;
19511965
1966+ memset (& udp_dummy_packet_addrinfo , 0 , sizeof (udp_dummy_packet_addrinfo ));
1967+ memset (& udp_dummy_packet_sockaddr , 0 , sizeof (udp_dummy_packet_sockaddr ));
1968+
19521969#ifdef USE_ASSERT_CHECKING
19531970
19541971 /*
@@ -8033,104 +8050,50 @@ WaitInterconnectQuitUDPIFC(void)
80338050static void
80348051SendDummyPacket (void )
80358052{
8036- int sockfd = -1 ;
8037- int ret ;
8038- struct addrinfo * addrs = NULL ;
8039- struct addrinfo * rp ;
8040- struct addrinfo hint ;
8041- uint16 udp_listener ;
8042- char port_str [32 ] = {0 };
8043- char * dummy_pkt = "stop it" ;
8044- int counter ;
8045-
8053+ int ret ;
8054+ in_port_t udp_listener_port ;
8055+ char * dummy_pkt = "stop it" ;
8056+ int counter ;
8057+ struct sockaddr_in * addr_in = NULL ;
8058+ struct sockaddr_in dest_addr ;
80468059 /*
80478060 * Get address info from interconnect udp listener port
80488061 */
8049- udp_listener = GetListenPortUDP ();
8050- snprintf (port_str , sizeof (port_str ), "%d" , udp_listener );
8051-
8052- MemSet (& hint , 0 , sizeof (hint ));
8053- hint .ai_socktype = SOCK_DGRAM ;
8054- hint .ai_family = AF_UNSPEC ; /* Allow for IPv4 or IPv6 */
8055-
8056- /* Never do name resolution */
8057- #ifdef AI_NUMERICSERV
8058- hint .ai_flags = AI_NUMERICHOST | AI_NUMERICSERV ;
8059- #else
8060- hint .ai_flags = AI_NUMERICHOST ;
8061- #endif
8062-
8063- ret = pg_getaddrinfo_all (interconnect_address , port_str , & hint , & addrs );
8064- if (ret || !addrs )
8065- {
8066- elog (LOG , "send dummy packet failed, pg_getaddrinfo_all(): %m" );
8067- goto send_error ;
8068- }
8069-
8070- for (rp = addrs ; rp != NULL ; rp = rp -> ai_next )
8071- {
8072- /* Create socket according to pg_getaddrinfo_all() */
8073- sockfd = socket (rp -> ai_family , rp -> ai_socktype , rp -> ai_protocol );
8074- if (sockfd < 0 )
8075- continue ;
8062+ udp_listener_port = (Gp_listener_port >> 16 ) & 0x0ffff ;
80768063
8077- if (!pg_set_noblock (sockfd ))
8078- {
8079- if (sockfd >= 0 )
8080- {
8081- closesocket (sockfd );
8082- sockfd = -1 ;
8083- }
8084- continue ;
8085- }
8086- break ;
8087- }
8088-
8089- if (rp == NULL )
8090- {
8091- elog (LOG , "send dummy packet failed, create socket failed: %m" );
8092- goto send_error ;
8093- }
8064+ addr_in = (struct sockaddr_in * ) & udp_dummy_packet_sockaddr ;
8065+ memset (& dest_addr , 0 , sizeof (dest_addr ));
8066+ dest_addr .sin_family = addr_in -> sin_family ;
8067+ dest_addr .sin_port = htons (udp_listener_port );
8068+ dest_addr .sin_addr .s_addr = addr_in -> sin_addr .s_addr ;
80948069
80958070 /*
8096- * Send a dummy package to the interconnect listener, try 10 times
8071+ * Send a dummy package to the interconnect listener, try 10 times.
8072+ * We don't want to close the socket at the end of this function, since
8073+ * the socket will eventually close during the motion layer cleanup.
80978074 */
8098-
80998075 counter = 0 ;
81008076 while (counter < 10 )
81018077 {
81028078 counter ++ ;
8103- ret = sendto (sockfd , dummy_pkt , strlen (dummy_pkt ), 0 , rp -> ai_addr , rp -> ai_addrlen );
8079+ ret = sendto (ICSenderSocket , dummy_pkt , strlen (dummy_pkt ), 0 , ( struct sockaddr * ) & dest_addr , sizeof ( dest_addr ) );
81048080 if (ret < 0 )
81058081 {
81068082 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK )
81078083 continue ;
81088084 else
81098085 {
81108086 elog (LOG , "send dummy packet failed, sendto failed: %m" );
8111- goto send_error ;
8087+ return ;
81128088 }
81138089 }
81148090 break ;
81158091 }
81168092
81178093 if (counter >= 10 )
81188094 {
8119- elog (LOG , "send dummy packet failed, sendto failed: %m" );
8120- goto send_error ;
8095+ elog (LOG , "send dummy packet failed, sendto failed with 10 times: %m" );
81218096 }
8122-
8123- pg_freeaddrinfo_all (hint .ai_family , addrs );
8124- closesocket (sockfd );
8125- return ;
8126-
8127- send_error :
8128-
8129- if (addrs )
8130- pg_freeaddrinfo_all (hint .ai_family , addrs );
8131- if (sockfd != -1 )
8132- closesocket (sockfd );
8133- return ;
81348097}
81358098
81368099void logChunkParseDetails (MotionConn * conn , uint32 ic_instance_id )
0 commit comments