2929from itertools import groupby , count , chain
3030import json
3131import logging
32- from typing import Any , Dict , Optional , Union
32+ from typing import Any , Dict , NamedTuple , Optional , Union
3333from warnings import warn
3434from random import random
3535import re
@@ -234,6 +234,7 @@ def new_f(self, *args, **kwargs):
234234 try :
235235 future = self .executor .submit (f , self , * args , ** kwargs )
236236 future .add_done_callback (_future_completed )
237+ return future
237238 except Exception :
238239 log .exception ("Failed to submit task to executor" )
239240
@@ -251,6 +252,15 @@ def _discard_cluster_shutdown(cluster):
251252 _clusters_for_shutdown .discard (cluster )
252253
253254
class _StaleDownProtection(NamedTuple):
    """
    Snapshot captured at the moment a down event is scheduled.

    Delayed executor handling compares this snapshot with the host's current
    state so that work superseded by newer host state can be ignored.
    """

    # Value of the host's state revision when the down event was scheduled.
    host_state_revision: int

    # Revision of the scheduling down event.  It is None only for unreserved
    # direct worker calls; in that case the worker must not clear a handling
    # slot owned by a newer event.
    down_event_revision: Optional[int]
262+
263+
254264def _shutdown_clusters ():
255265 clusters = _clusters_for_shutdown .copy () # copy because shutdown modifies the global set "discard"
256266 for cluster in clusters :
@@ -1870,7 +1880,61 @@ def _cleanup_failed_on_up_handling(self, host):
18701880
18711881 self ._start_reconnector (host , is_host_addition = False )
18721882
1873- def _on_up_future_completed (self , host , futures , results , lock , finished_future ):
def _up_handling_was_superseded(self, host, up_handling_revision):
    """
    Tell whether the up handling identified by ``up_handling_revision`` is
    no longer the current one for ``host``.

    Returns True when the host's recorded up-handling revision differs from
    ``up_handling_revision`` (a missing attribute counts as ``None``), or when
    the host's down-event revision has moved away from it; False otherwise.
    """
    active_revision = getattr(host, "_node_up_handling_revision", None)
    if active_revision != up_handling_revision:
        # Another up handling (or none at all) owns the slot now.
        return True
    # Same owner: superseded only if the down-event revision has changed.
    return host._node_down_event_revision != up_handling_revision
1886+
def _clear_up_handling(self, host, up_handling_revision=None):
    """
    Release the "currently handling node up" slot on ``host``.

    When ``up_handling_revision`` is given, the slot is released only if the
    host's recorded up-handling revision matches it; otherwise nothing is
    changed.  When it is ``None`` the slot is released unconditionally.

    Returns True if the slot was released, False if it was left untouched.
    """
    if up_handling_revision is not None:
        slot_owner = getattr(host, "_node_up_handling_revision", None)
        if slot_owner != up_handling_revision:
            # The slot belongs to a different up handling; leave it alone.
            return False

    host._currently_handling_node_up = False
    host._node_up_handling_revision = None
    return True
1894+
def _cleanup_superseded_up_handling(self, host):
    """
    Ask every session of this cluster to remove its pool for ``host``.

    Used when an up handling was superseded and its result must be discarded.
    """
    # Iterate over a snapshot of the sessions (presumably so the collection
    # may change while pools are being removed).
    sessions_snapshot = tuple(self.sessions)
    for active_session in sessions_snapshot:
        active_session.remove_pool(host)
1898+
def _pop_pending_node_up_if_ready(self, host):
    """
    Decide whether a queued "node up" replay for ``host`` may run now.

    Returns the queued revision when a replay is pending, the host is not
    marked up, and neither up nor down handling is in progress.  Returns
    ``None`` in every other case.  If the host is already up, the pending
    marker is dropped because there is nothing left to replay.
    """
    if not host._pending_node_up:
        return None

    if host.is_up:
        # Already up: discard the queued replay.
        host._pending_node_up = False
        host._pending_node_up_revision = None
        return None

    handler_active = (host._currently_handling_node_up or
                      host._currently_handling_node_down)
    if handler_active:
        return None

    # Leave the pending marker in place until on_up() reacquires host.lock so
    # a newer down signal can still invalidate this replay.
    return host._pending_node_up_revision
1913+
def _handle_pending_node_up(self, host, pending_up_revision):
    """
    Replay a queued "node up" notification for ``host``, if there is one.

    ``pending_up_revision`` is the revision the replay was queued under;
    ``None`` means nothing is queued and the call is a no-op.  Otherwise
    on_up() is invoked with that revision as the expected down-event
    revision.
    """
    if pending_up_revision is None:
        return

    log.debug("Handling queued up status of node %s", host)
    self.on_up(host, expected_down_event_revision=pending_up_revision)
1918+
def _clear_down_handling(self, host, down_event_revision=None):
    """
    Release the "currently handling node down" slot on ``host``.

    When ``down_event_revision`` is given, the slot is released only if the
    host's recorded down-handling revision matches it; otherwise nothing is
    changed.  When it is ``None`` the slot is released unconditionally.

    Returns True if the slot was released, False if it was left untouched.
    """
    if down_event_revision is not None:
        slot_owner = getattr(host, "_node_down_handling_revision", None)
        if slot_owner != down_event_revision:
            # The slot belongs to a different down event; leave it alone.
            return False

    host._currently_handling_node_down = False
    host._node_down_handling_revision = None
    return True
1926+
def _finish_superseded_up_handling(self, host, up_handling_revision):
    """
    Wind down an up handling for ``host`` that was superseded.

    The pools opened for the host are removed first.  Then, under host.lock,
    the up-handling slot is released if this revision still owns it, and a
    queued "node up" replay is looked up.  Any replay found is run after the
    lock has been released.
    """
    self._cleanup_superseded_up_handling(host)

    with host.lock:
        slot_released = self._clear_up_handling(host, up_handling_revision)
        # Only the handling that actually released the slot may pick up a
        # queued replay.
        queued_revision = (
            self._pop_pending_node_up_if_ready(host) if slot_released else None)

    self._handle_pending_node_up(host, queued_revision)
1936+
1937+ def _on_up_future_completed (self , host , up_handling_revision , futures , results , lock , finished_future ):
18741938 with lock :
18751939 futures .discard (finished_future )
18761940
@@ -1896,18 +1960,32 @@ def _on_up_future_completed(self, host, futures, results, lock, finished_future)
18961960
18971961 log .info ("Connection pools established for node %s" , host )
18981962 # mark the host as up and notify all listeners
1899- host .set_up ()
1963+ superseded = False
1964+ with host .lock :
1965+ if self ._up_handling_was_superseded (host , up_handling_revision ):
1966+ log .debug ("Ignoring superseded up handling for node %s" , host )
1967+ superseded = True
1968+ else :
1969+ host .set_up ()
1970+ self ._clear_up_handling (host , up_handling_revision )
1971+ if superseded :
1972+ self ._finish_superseded_up_handling (host , up_handling_revision )
1973+ return
19001974 for listener in self .listeners :
19011975 listener .on_up (host )
19021976 finally :
1977+ pending_up_revision = None
19031978 with host .lock :
1904- host ._currently_handling_node_up = False
1979+ if self ._clear_up_handling (host , up_handling_revision ):
1980+ pending_up_revision = self ._pop_pending_node_up_if_ready (host )
1981+ self ._handle_pending_node_up (host , pending_up_revision )
19051982
19061983 # see if there are any pools to add or remove now that the host is marked up
19071984 for session in tuple (self .sessions ):
19081985 session .update_created_pools ()
1986+ return
19091987
1910- def on_up (self , host ):
1988+ def on_up (self , host , expected_down_event_revision = None ):
19111989 """
19121990 Intended for internal use only.
19131991 """
@@ -1916,14 +1994,38 @@ def on_up(self, host):
19161994
19171995 log .debug ("Waiting to acquire lock for handling up status of node %s" , host )
19181996 with host .lock :
1997+ if (expected_down_event_revision is not None and
1998+ host ._node_down_event_revision != expected_down_event_revision ):
1999+ log .debug ("Ignoring stale queued up handling for node %s" , host )
2000+ return
2001+
2002+ if host ._currently_handling_node_down :
2003+ log .debug ("Down status is being handled for node %s; queueing up handling" , host )
2004+ host ._pending_node_up = True
2005+ host ._pending_node_up_revision = host ._node_down_event_revision
2006+ return
2007+
19192008 if host ._currently_handling_node_up :
1920- log .debug ("Another thread is already handling up status of node %s" , host )
2009+ up_handling_revision = getattr (host , "_node_up_handling_revision" , None )
2010+ if self ._up_handling_was_superseded (host , up_handling_revision ):
2011+ log .debug ("Superseded up handling is still finishing for node %s; "
2012+ "queueing up handling" , host )
2013+ host ._pending_node_up = True
2014+ host ._pending_node_up_revision = host ._node_down_event_revision
2015+ else :
2016+ log .debug ("Another thread is already handling up status of node %s" , host )
19212017 return
19222018
19232019 if host .is_up :
19242020 log .debug ("Host %s was already marked up" , host )
2021+ host ._pending_node_up = False
2022+ host ._pending_node_up_revision = None
19252023 return
19262024
2025+ host ._pending_node_up = False
2026+ host ._pending_node_up_revision = None
2027+ up_handling_revision = host ._node_down_event_revision
2028+ host ._node_up_handling_revision = up_handling_revision
19272029 host ._currently_handling_node_up = True
19282030 log .debug ("Starting to handle up status of node %s" , host )
19292031
@@ -1953,7 +2055,8 @@ def on_up(self, host):
19532055 log .debug ("Attempting to open new connection pools for host %s" , host )
19542056 futures_lock = Lock ()
19552057 futures_results = []
1956- callback = partial (self ._on_up_future_completed , host , futures , futures_results , futures_lock )
2058+ callback = partial (self ._on_up_future_completed , host , up_handling_revision ,
2059+ futures , futures_results , futures_lock )
19572060 for session in tuple (self .sessions ):
19582061 future = session .add_or_renew_pool (host , is_host_addition = False )
19592062 if future is not None :
@@ -1967,14 +2070,24 @@ def on_up(self, host):
19672070
19682071 self ._cleanup_failed_on_up_handling (host )
19692072
2073+ pending_up_revision = None
19702074 with host .lock :
1971- host ._currently_handling_node_up = False
2075+ if self ._clear_up_handling (host , up_handling_revision ):
2076+ pending_up_revision = self ._pop_pending_node_up_if_ready (host )
2077+ self ._handle_pending_node_up (host , pending_up_revision )
19722078 raise
19732079 else :
19742080 if not have_future :
2081+ superseded = False
19752082 with host .lock :
1976- host .set_up ()
1977- host ._currently_handling_node_up = False
2083+ if self ._up_handling_was_superseded (host , up_handling_revision ):
2084+ log .debug ("Ignoring superseded up handling for node %s" , host )
2085+ superseded = True
2086+ else :
2087+ host .set_up ()
2088+ self ._clear_up_handling (host , up_handling_revision )
2089+ if superseded :
2090+ self ._finish_superseded_up_handling (host , up_handling_revision )
19782091
19792092 # for testing purposes
19802093 return futures
@@ -2004,16 +2117,65 @@ def _start_reconnector(self, host, is_host_addition):
20042117 reconnector .start ()
20052118
@run_in_executor
def on_down_potentially_blocking(
        self, host: Host, is_host_addition: bool,
        stale_down_protection: Optional[_StaleDownProtection] = None) -> Any:
    """
    Perform the potentially blocking part of handling a host going down.

    Notifies the profile manager, the control connection, every session and
    every listener that ``host`` is down, then starts a reconnector.  Before
    doing so it checks, under host.lock, whether this down event is stale
    (the host is up, a newer up handling is running, or the host state
    revision moved past the ``stale_down_protection`` snapshot); stale events
    are ignored.  If another down handling already owns the handling slot the
    call returns without doing anything.

    Whenever this call releases the down-handling slot, a queued "node up"
    replay is run afterwards, outside host.lock.
    """
    down_event_revision = (
        stale_down_protection.down_event_revision
        if stale_down_protection is not None else None)
    skip_as_stale = False
    queued_up_revision = None

    with host.lock:
        reserved_revision = getattr(host, "_node_down_handling_revision", None)
        # True when the scheduling down event reserved the handling slot for
        # exactly this revision.
        owns_reserved_down_handling = (
            down_event_revision is not None and
            host._currently_handling_node_down and
            reserved_revision == down_event_revision)
        up_revision = getattr(host, "_node_up_handling_revision", None)

        if host.is_up:
            is_stale = True
        elif host._currently_handling_node_up and (
                down_event_revision is None or
                up_revision is None or
                down_event_revision <= up_revision):
            # An up handling that is not older than this down event is active.
            is_stale = True
        else:
            is_stale = (
                stale_down_protection is not None and
                host._state_revision != stale_down_protection.host_state_revision)

        if is_stale:
            log.debug("Ignoring stale down handling for host %s", host)
            if owns_reserved_down_handling and self._clear_down_handling(host, down_event_revision):
                queued_up_revision = self._pop_pending_node_up_if_ready(host)
            skip_as_stale = True
        elif not host._currently_handling_node_down:
            # Nobody reserved the slot yet: claim it for this call.
            host._currently_handling_node_down = True
            host._node_down_handling_revision = down_event_revision
        elif not owns_reserved_down_handling:
            log.debug("Another thread is already handling down status of node %s", host)
            return

    if skip_as_stale:
        self._handle_pending_node_up(host, queued_up_revision)
        return

    try:
        self.profile_manager.on_down(host)
        self.control_connection.on_down(host)
        for session in tuple(self.sessions):
            session.on_down(host)

        for listener in self.listeners:
            listener.on_down(host)

        self._start_reconnector(host, is_host_addition)
    finally:
        # Always release the slot, even if a notification raised, and replay
        # an up event that was queued while the down handling was running.
        queued_up_revision = None
        with host.lock:
            if self._clear_down_handling(host, down_event_revision):
                queued_up_revision = self._pop_pending_node_up_if_ready(host)

        self._handle_pending_node_up(host, queued_up_revision)
20172179
20182180 def on_down (self , host , is_host_addition , expect_host_to_be_down = False ):
20192181 """
@@ -2037,12 +2199,40 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20372199 if connected :
20382200 return
20392201
2202+ if not expect_host_to_be_down :
2203+ if was_up is False :
2204+ if host ._pending_node_up :
2205+ host ._node_down_event_revision += 1
2206+ host ._pending_node_up = False
2207+ host ._pending_node_up_revision = None
2208+ host .set_down ()
2209+ return
2210+ if was_up is None :
2211+ return
2212+
2213+ host ._node_down_event_revision += 1
2214+ host ._pending_node_up = False
2215+ host ._pending_node_up_revision = None
20402216 host .set_down ()
2041- if (not was_up and not expect_host_to_be_down ) or host .is_currently_reconnecting ():
2217+ stale_down_protection = _StaleDownProtection (
2218+ host_state_revision = host ._state_revision ,
2219+ down_event_revision = host ._node_down_event_revision )
2220+ if host .is_currently_reconnecting ():
2221+ return
2222+ if host ._currently_handling_node_down :
20422223 return
2224+ host ._currently_handling_node_down = True
2225+ host ._node_down_handling_revision = stale_down_protection .down_event_revision
20432226 log .warning ("Host %s has been marked down" , host )
20442227
2045- self .on_down_potentially_blocking (host , is_host_addition )
2228+ future = self .on_down_potentially_blocking (
2229+ host , is_host_addition , stale_down_protection )
2230+ if future is None :
2231+ pending_up_revision = None
2232+ with host .lock :
2233+ if self ._clear_down_handling (host , stale_down_protection .down_event_revision ):
2234+ pending_up_revision = self ._pop_pending_node_up_if_ready (host )
2235+ self ._handle_pending_node_up (host , pending_up_revision )
20462236
20472237 def on_add (self , host , refresh_nodes = True ):
20482238 if self .is_shutdown :
@@ -2119,7 +2309,11 @@ def on_remove(self, host):
21192309 return
21202310
21212311 log .debug ("[cluster] Removing host %s" , host )
2122- host .set_down ()
2312+ with host .lock :
2313+ host ._node_down_event_revision += 1
2314+ host ._pending_node_up = False
2315+ host ._pending_node_up_revision = None
2316+ host .set_down ()
21232317 self .profile_manager .on_remove (host )
21242318 for session in tuple (self .sessions ):
21252319 session .on_remove (host )
0 commit comments