@@ -3,15 +3,23 @@ use std::{
33 net:: { IpAddr , SocketAddr , SocketAddrV4 , SocketAddrV6 } ,
44 num:: NonZeroUsize ,
55 pin:: Pin ,
6- sync:: Arc ,
6+ sync:: { Arc , Mutex } ,
77 task:: { Context , Poll } ,
8+ time:: Duration ,
89} ;
910
1011use ipnet:: { Ipv4Net , Ipv6Net } ;
12+ use n0_future:: task:: AbortOnDropHandle ;
1113use n0_watcher:: Watchable ;
1214use netwatch:: { UdpSender , UdpSocket } ;
1315use pin_project:: pin_project;
14- use tracing:: { debug, info, trace} ;
16+ use tokio:: time;
17+ use tracing:: { debug, info, trace, warn} ;
18+
19+ /// Total budget for retrying a rebind that fails with `EADDRINUSE`.
20+ const REBIND_RETRY_ATTEMPTS : u32 = 12 ;
21+ /// Delay between rebind attempts that failed with `EADDRINUSE`.
22+ const REBIND_RETRY_DELAY : Duration = Duration :: from_millis ( 250 ) ;
1523
1624use super :: { Addr , Transmit } ;
1725use crate :: metrics:: { EndpointMetrics , SocketMetrics } ;
@@ -242,6 +250,7 @@ impl IpTransport {
242250 IpNetworkChangeSender {
243251 socket : self . socket . clone ( ) ,
244252 local_addr : self . local_addr . clone ( ) ,
253+ rebind_task : Default :: default ( ) ,
245254 }
246255 }
247256
@@ -259,17 +268,58 @@ impl IpTransport {
259268pub ( super ) struct IpNetworkChangeSender {
260269 socket : Arc < UdpSocket > ,
261270 local_addr : Watchable < SocketAddr > ,
271+ rebind_task : Mutex < Option < AbortOnDropHandle < ( ) > > > ,
262272}
263273
264274impl IpNetworkChangeSender {
265275 pub ( super ) fn rebind ( & self ) -> io:: Result < ( ) > {
266276 let old_addr = self . local_addr . get ( ) ;
267- self . socket . rebind ( ) ?;
268- let addr = self . socket . local_addr ( ) ?;
269- self . local_addr . set ( addr) . ok ( ) ;
270- trace ! ( "rebound from {} to {}" , old_addr, addr) ;
271-
272- Ok ( ( ) )
277+ // Clear any previous rebind task.
278+ let mut rebind_task = self . rebind_task . lock ( ) . expect ( "poisoned" ) ;
279+ * rebind_task = None ;
280+ // Try to rebind immediately.
281+ match self . socket . rebind ( ) {
282+ Ok ( ( ) ) => {
283+ let addr = self . socket . local_addr ( ) ?;
284+ self . local_addr . set ( addr) . ok ( ) ;
285+ trace ! ( "rebound from {} to {}" , old_addr, addr) ;
286+ Ok ( ( ) )
287+ }
288+ Err ( err) if err. kind ( ) == io:: ErrorKind :: AddrInUse => {
289+ let socket = self . socket . clone ( ) ;
290+ let local_addr = self . local_addr . clone ( ) ;
291+ let fut = async move {
292+ let mut attempt = 0 ;
293+ loop {
294+ match socket. rebind ( ) {
295+ Ok ( ( ) ) => break ,
296+ Err ( err)
297+ if err. kind ( ) == io:: ErrorKind :: AddrInUse
298+ && attempt < REBIND_RETRY_ATTEMPTS =>
299+ {
300+ attempt += 1 ;
301+ debug ! (
302+ ?err,
303+ attempt, "rebind hit EADDRINUSE on {old_addr}, retrying"
304+ ) ;
305+ time:: sleep ( REBIND_RETRY_DELAY ) . await ;
306+ }
307+ Err ( err) => {
308+ warn ! ( "rebinding IP transport failed: {err:#}" ) ;
309+ return ;
310+ }
311+ }
312+ }
313+ if let Ok ( addr) = socket. local_addr ( ) {
314+ local_addr. set ( addr) . ok ( ) ;
315+ trace ! ( "rebound from {} to {}" , old_addr, addr) ;
316+ }
317+ } ;
318+ * rebind_task = Some ( AbortOnDropHandle :: new ( n0_future:: task:: spawn ( fut) ) ) ;
319+ Ok ( ( ) )
320+ }
321+ Err ( err) => Err ( err) ,
322+ }
273323 }
274324
275325 pub ( super ) fn on_network_change ( & self , _info : & crate :: socket:: Report ) {
0 commit comments