@@ -640,6 +640,9 @@ typedef struct ICStatistics
640640/* Statistics for UDP interconnect. */
641641static ICStatistics ic_statistics ;
642642
643+ static struct addrinfo udp_dummy_packet_addrinfo ;
644+ static struct sockaddr udp_dummy_packet_sockaddr ;
645+
643646/* UDP listen fd */
644647int UDP_listenerFd ;
645648
@@ -669,7 +672,8 @@ static void SendDummyPacket(void);
669672
670673static void getSockAddr (struct sockaddr_storage * peer , socklen_t * peer_len , const char * listenerAddr , int listenerPort );
671674static uint32 setUDPSocketBufferSize (int ic_socket , int buffer_type );
672- static void setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort , int * txFamily );
675+ static void setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort ,
676+ int * txFamily , struct addrinfo * listenerAddrinfo , struct sockaddr * listenerSockaddr );
673677static ChunkTransportStateEntry * startOutgoingUDPConnections (ChunkTransportState * transportStates ,
674678 ExecSlice * sendSlice ,
675679 int * pOutgoingCount );
@@ -1157,13 +1161,12 @@ resetRxThreadError()
11571161 pg_atomic_write_u32 (& ic_control_info .eno , 0 );
11581162}
11591163
1160-
11611164/*
11621165 * setupUDPListeningSocket
11631166 * Setup udp listening socket.
11641167 */
11651168static void
1166- setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort , int * txFamily )
1169+ setupUDPListeningSocket (int * listenerSocketFd , int32 * listenerPort , int * txFamily , struct addrinfo * listenerAddrinfo , struct sockaddr * listenerSockaddr )
11671170{
11681171 struct addrinfo * addrs = NULL ;
11691172 struct addrinfo * addr ;
@@ -1284,6 +1287,16 @@ setupUDPListeningSocket(int *listenerSocketFd, int32 *listenerPort, int *txFamil
12841287 if (!addr || ic_socket == PGINVALID_SOCKET )
12851288 goto startup_failed ;
12861289
1290+ /*
1291+ * cache the successful addrinfo and sockaddr of the listening socket, so
1292+ * we can use this information to connect to the listening socket.
1293+ */
1294+ if (listenerAddrinfo != NULL && listenerSockaddr != NULL )
1295+ {
1296+ memcpy (listenerAddrinfo , addr , sizeof (udp_dummy_packet_addrinfo ));
1297+ memcpy (listenerSockaddr , addr -> ai_addr , sizeof (udp_dummy_packet_sockaddr ));
1298+ }
1299+
12871300 /* Memorize the socket fd, kernel assigned port and address family */
12881301 * listenerSocketFd = ic_socket ;
12891302 if (listenerAddr .ss_family == AF_INET6 )
@@ -1429,8 +1442,9 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
14291442 /*
14301443 * setup listening socket and sending socket for Interconnect.
14311444 */
1432- setupUDPListeningSocket (listenerSocketFd , listenerPort , & txFamily );
1433- setupUDPListeningSocket (& ICSenderSocket , & ICSenderPort , & ICSenderFamily );
1445+ setupUDPListeningSocket (listenerSocketFd , listenerPort , & txFamily ,
1446+ & udp_dummy_packet_addrinfo , & udp_dummy_packet_sockaddr );
1447+ setupUDPListeningSocket (& ICSenderSocket , & ICSenderPort , & ICSenderFamily , NULL , NULL );
14341448
14351449 /* Initialize receive control data. */
14361450 resetMainThreadWaiting (& rx_control_info .mainWaitingState );
@@ -1538,6 +1552,9 @@ static inline void CleanupMotionUDPIFC(void)
15381552 ICSenderPort = 0 ;
15391553 ICSenderFamily = 0 ;
15401554
1555+ memset (& udp_dummy_packet_addrinfo , 0 , sizeof (udp_dummy_packet_addrinfo ));
1556+ memset (& udp_dummy_packet_sockaddr , 0 , sizeof (udp_dummy_packet_sockaddr ));
1557+
15411558#ifdef USE_ASSERT_CHECKING
15421559
15431560 /*
@@ -7226,104 +7243,50 @@ WaitInterconnectQuitUDPIFC(void)
72267243static void
72277244SendDummyPacket (void )
72287245{
7229- int sockfd = -1 ;
7230- int ret ;
7231- struct addrinfo * addrs = NULL ;
7232- struct addrinfo * rp ;
7233- struct addrinfo hint ;
7234- uint16 udp_listener ;
7235- char port_str [32 ] = {0 };
7236- char * dummy_pkt = "stop it" ;
7237- int counter ;
7238-
7246+ int ret ;
7247+ in_port_t udp_listener_port ;
7248+ char * dummy_pkt = "stop it" ;
7249+ int counter ;
7250+ struct sockaddr_in * addr_in = NULL ;
7251+ struct sockaddr_in dest_addr ;
72397252 /*
72407253 * Get address info from interconnect udp listener port
72417254 */
7242- udp_listener = GetListenPortUDP ();
7243- snprintf (port_str , sizeof (port_str ), "%d" , udp_listener );
7244-
7245- MemSet (& hint , 0 , sizeof (hint ));
7246- hint .ai_socktype = SOCK_DGRAM ;
7247- hint .ai_family = AF_UNSPEC ; /* Allow for IPv4 or IPv6 */
7248-
7249- /* Never do name resolution */
7250- #ifdef AI_NUMERICSERV
7251- hint .ai_flags = AI_NUMERICHOST | AI_NUMERICSERV ;
7252- #else
7253- hint .ai_flags = AI_NUMERICHOST ;
7254- #endif
7255-
7256- ret = pg_getaddrinfo_all (interconnect_address , port_str , & hint , & addrs );
7257- if (ret || !addrs )
7258- {
7259- elog (LOG , "send dummy packet failed, pg_getaddrinfo_all(): %m" );
7260- goto send_error ;
7261- }
7262-
7263- for (rp = addrs ; rp != NULL ; rp = rp -> ai_next )
7264- {
7265- /* Create socket according to pg_getaddrinfo_all() */
7266- sockfd = socket (rp -> ai_family , rp -> ai_socktype , rp -> ai_protocol );
7267- if (sockfd < 0 )
7268- continue ;
7255+ udp_listener_port = (Gp_listener_port >> 16 ) & 0x0ffff ;
72697256
7270- if (!pg_set_noblock (sockfd ))
7271- {
7272- if (sockfd >= 0 )
7273- {
7274- closesocket (sockfd );
7275- sockfd = -1 ;
7276- }
7277- continue ;
7278- }
7279- break ;
7280- }
7281-
7282- if (rp == NULL )
7283- {
7284- elog (LOG , "send dummy packet failed, create socket failed: %m" );
7285- goto send_error ;
7286- }
7257+ addr_in = (struct sockaddr_in * ) & udp_dummy_packet_sockaddr ;
7258+ memset (& dest_addr , 0 , sizeof (dest_addr ));
7259+ dest_addr .sin_family = addr_in -> sin_family ;
7260+ dest_addr .sin_port = htons (udp_listener_port );
7261+ dest_addr .sin_addr .s_addr = addr_in -> sin_addr .s_addr ;
72877262
72887263 /*
7289- * Send a dummy package to the interconnect listener, try 10 times
7264+ * Send a dummy package to the interconnect listener, try 10 times.
7265+ * We don't want to close the socket at the end of this function, since
7266+ * the socket will eventually close during the motion layer cleanup.
72907267 */
7291-
72927268 counter = 0 ;
72937269 while (counter < 10 )
72947270 {
72957271 counter ++ ;
7296- ret = sendto (sockfd , dummy_pkt , strlen (dummy_pkt ), 0 , rp -> ai_addr , rp -> ai_addrlen );
7272+ ret = sendto (ICSenderSocket , dummy_pkt , strlen (dummy_pkt ), 0 , ( struct sockaddr * ) & dest_addr , sizeof ( dest_addr ) );
72977273 if (ret < 0 )
72987274 {
72997275 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK )
73007276 continue ;
73017277 else
73027278 {
73037279 elog (LOG , "send dummy packet failed, sendto failed: %m" );
7304- goto send_error ;
7280+ return ;
73057281 }
73067282 }
73077283 break ;
73087284 }
73097285
73107286 if (counter >= 10 )
73117287 {
7312- elog (LOG , "send dummy packet failed, sendto failed: %m" );
7313- goto send_error ;
7288+ elog (LOG , "send dummy packet failed, sendto failed with 10 times: %m" );
73147289 }
7315-
7316- pg_freeaddrinfo_all (hint .ai_family , addrs );
7317- closesocket (sockfd );
7318- return ;
7319-
7320- send_error :
7321-
7322- if (addrs )
7323- pg_freeaddrinfo_all (hint .ai_family , addrs );
7324- if (sockfd != -1 )
7325- closesocket (sockfd );
7326- return ;
73277290}
73287291
73297292void logChunkParseDetails (MotionConn * conn , uint32 ic_instance_id )
0 commit comments