Skip to content

Commit d70c0b2

Browse files
committed
cluster: replay up events queued during down handling
Previously, an on_up() signal that arrived while on_down_potentially_blocking() was still notifying policies, sessions, and listeners could be dropped. That left the host down even though the latest observed state was up. Queue a single pending up replay while down handling is active, then replay it after the down path clears. The replay is guarded by _node_down_event_revision so a newer down or remove event invalidates the older up instead of letting it mark the host up later. Reserve down handling before submitting executor work so an up arriving between submit and worker execution is queued deterministically. Use handling revisions to prevent stale async callbacks from clearing newer in-flight work. Duplicate down signals still avoid duplicate down notification and reconnector work. A later down only invalidates pending up replay. Unknown hosts from auth failures remain no-op unless the caller explicitly passes expect_host_to_be_down.
1 parent e2af311 commit d70c0b2

1 file changed

Lines changed: 197 additions & 20 deletions

File tree

cassandra/cluster.py

Lines changed: 197 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ def new_f(self, *args, **kwargs):
234234
try:
235235
future = self.executor.submit(f, self, *args, **kwargs)
236236
future.add_done_callback(_future_completed)
237+
return future
237238
except Exception:
238239
log.exception("Failed to submit task to executor")
239240

@@ -1870,7 +1871,61 @@ def _cleanup_failed_on_up_handling(self, host):
18701871

18711872
self._start_reconnector(host, is_host_addition=False)
18721873

1873-
def _on_up_future_completed(self, host, futures, results, lock, finished_future):
1874+
def _up_handling_was_superseded(self, host, up_handling_revision):
1875+
return (getattr(host, "_node_up_handling_revision", None) != up_handling_revision or
1876+
host._node_down_event_revision != up_handling_revision)
1877+
1878+
def _clear_up_handling(self, host, up_handling_revision=None):
1879+
if (up_handling_revision is not None and
1880+
getattr(host, "_node_up_handling_revision", None) != up_handling_revision):
1881+
return False
1882+
host._currently_handling_node_up = False
1883+
host._node_up_handling_revision = None
1884+
return True
1885+
1886+
def _cleanup_superseded_up_handling(self, host):
1887+
for session in tuple(self.sessions):
1888+
session.remove_pool(host)
1889+
1890+
def _pop_pending_node_up_if_ready(self, host):
1891+
if not host._pending_node_up:
1892+
return None
1893+
if host.is_up:
1894+
host._pending_node_up = False
1895+
host._pending_node_up_revision = None
1896+
return None
1897+
if host._currently_handling_node_up or host._currently_handling_node_down:
1898+
return None
1899+
1900+
pending_up_revision = host._pending_node_up_revision
1901+
# Leave the pending marker in place until on_up() reacquires host.lock so
1902+
# a newer down signal can still invalidate this replay.
1903+
return pending_up_revision
1904+
1905+
def _handle_pending_node_up(self, host, pending_up_revision):
1906+
if pending_up_revision is not None:
1907+
log.debug("Handling queued up status of node %s", host)
1908+
self.on_up(host, expected_down_event_revision=pending_up_revision)
1909+
1910+
def _clear_down_handling(self, host, down_event_revision=None):
1911+
if (down_event_revision is not None and
1912+
getattr(host, "_node_down_handling_revision", None) != down_event_revision):
1913+
return False
1914+
host._currently_handling_node_down = False
1915+
host._node_down_handling_revision = None
1916+
return True
1917+
1918+
def _finish_superseded_up_handling(self, host, up_handling_revision):
1919+
self._cleanup_superseded_up_handling(host)
1920+
1921+
pending_up_revision = None
1922+
with host.lock:
1923+
if self._clear_up_handling(host, up_handling_revision):
1924+
pending_up_revision = self._pop_pending_node_up_if_ready(host)
1925+
1926+
self._handle_pending_node_up(host, pending_up_revision)
1927+
1928+
def _on_up_future_completed(self, host, up_handling_revision, futures, results, lock, finished_future):
18741929
with lock:
18751930
futures.discard(finished_future)
18761931

@@ -1896,18 +1951,31 @@ def _on_up_future_completed(self, host, futures, results, lock, finished_future)
18961951

18971952
log.info("Connection pools established for node %s", host)
18981953
# mark the host as up and notify all listeners
1899-
host.set_up()
1954+
superseded = False
1955+
with host.lock:
1956+
if self._up_handling_was_superseded(host, up_handling_revision):
1957+
log.debug("Ignoring superseded up handling for node %s", host)
1958+
superseded = True
1959+
else:
1960+
host.set_up()
1961+
self._clear_up_handling(host, up_handling_revision)
1962+
if superseded:
1963+
self._finish_superseded_up_handling(host, up_handling_revision)
1964+
return
19001965
for listener in self.listeners:
19011966
listener.on_up(host)
19021967
finally:
1968+
pending_up_revision = None
19031969
with host.lock:
1904-
host._currently_handling_node_up = False
1970+
if self._clear_up_handling(host, up_handling_revision):
1971+
pending_up_revision = self._pop_pending_node_up_if_ready(host)
1972+
self._handle_pending_node_up(host, pending_up_revision)
19051973

19061974
# see if there are any pools to add or remove now that the host is marked up
19071975
for session in tuple(self.sessions):
19081976
session.update_created_pools()
19091977

1910-
def on_up(self, host):
1978+
def on_up(self, host, expected_down_event_revision=None):
19111979
"""
19121980
Intended for internal use only.
19131981
"""
@@ -1916,14 +1984,38 @@ def on_up(self, host):
19161984

19171985
log.debug("Waiting to acquire lock for handling up status of node %s", host)
19181986
with host.lock:
1987+
if (expected_down_event_revision is not None and
1988+
host._node_down_event_revision != expected_down_event_revision):
1989+
log.debug("Ignoring stale queued up handling for node %s", host)
1990+
return
1991+
1992+
if host._currently_handling_node_down:
1993+
log.debug("Down status is being handled for node %s; queueing up handling", host)
1994+
host._pending_node_up = True
1995+
host._pending_node_up_revision = host._node_down_event_revision
1996+
return
1997+
19191998
if host._currently_handling_node_up:
1920-
log.debug("Another thread is already handling up status of node %s", host)
1999+
up_handling_revision = getattr(host, "_node_up_handling_revision", None)
2000+
if self._up_handling_was_superseded(host, up_handling_revision):
2001+
log.debug("Superseded up handling is still finishing for node %s; "
2002+
"queueing up handling", host)
2003+
host._pending_node_up = True
2004+
host._pending_node_up_revision = host._node_down_event_revision
2005+
else:
2006+
log.debug("Another thread is already handling up status of node %s", host)
19212007
return
19222008

19232009
if host.is_up:
19242010
log.debug("Host %s was already marked up", host)
2011+
host._pending_node_up = False
2012+
host._pending_node_up_revision = None
19252013
return
19262014

2015+
host._pending_node_up = False
2016+
host._pending_node_up_revision = None
2017+
up_handling_revision = host._node_down_event_revision
2018+
host._node_up_handling_revision = up_handling_revision
19272019
host._currently_handling_node_up = True
19282020
log.debug("Starting to handle up status of node %s", host)
19292021

@@ -1953,7 +2045,8 @@ def on_up(self, host):
19532045
log.debug("Attempting to open new connection pools for host %s", host)
19542046
futures_lock = Lock()
19552047
futures_results = []
1956-
callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
2048+
callback = partial(self._on_up_future_completed, host, up_handling_revision,
2049+
futures, futures_results, futures_lock)
19572050
for session in tuple(self.sessions):
19582051
future = session.add_or_renew_pool(host, is_host_addition=False)
19592052
if future is not None:
@@ -1967,14 +2060,24 @@ def on_up(self, host):
19672060

19682061
self._cleanup_failed_on_up_handling(host)
19692062

2063+
pending_up_revision = None
19702064
with host.lock:
1971-
host._currently_handling_node_up = False
2065+
if self._clear_up_handling(host, up_handling_revision):
2066+
pending_up_revision = self._pop_pending_node_up_if_ready(host)
2067+
self._handle_pending_node_up(host, pending_up_revision)
19722068
raise
19732069
else:
19742070
if not have_future:
2071+
superseded = False
19752072
with host.lock:
1976-
host.set_up()
1977-
host._currently_handling_node_up = False
2073+
if self._up_handling_was_superseded(host, up_handling_revision):
2074+
log.debug("Ignoring superseded up handling for node %s", host)
2075+
superseded = True
2076+
else:
2077+
host.set_up()
2078+
self._clear_up_handling(host, up_handling_revision)
2079+
if superseded:
2080+
self._finish_superseded_up_handling(host, up_handling_revision)
19782081

19792082
# for testing purposes
19802083
return futures
@@ -2004,16 +2107,59 @@ def _start_reconnector(self, host, is_host_addition):
20042107
reconnector.start()
20052108

20062109
@run_in_executor
2007-
def on_down_potentially_blocking(self, host, is_host_addition):
2008-
self.profile_manager.on_down(host)
2009-
self.control_connection.on_down(host)
2010-
for session in tuple(self.sessions):
2011-
session.on_down(host)
2110+
def on_down_potentially_blocking(self, host, is_host_addition, down_revision=None, down_event_revision=None):
2111+
handle_stale_down_event = False
2112+
pending_up_revision = None
2113+
with host.lock:
2114+
down_handling_revision = getattr(host, "_node_down_handling_revision", None)
2115+
owns_reserved_down_handling = (
2116+
down_event_revision is not None and
2117+
host._currently_handling_node_down and
2118+
down_handling_revision == down_event_revision
2119+
)
2120+
node_up_handling_revision = getattr(host, "_node_up_handling_revision", None)
2121+
stale_down_event = (
2122+
host.is_up or
2123+
(host._currently_handling_node_up and
2124+
(down_event_revision is None or
2125+
node_up_handling_revision is None or
2126+
down_event_revision <= node_up_handling_revision)) or
2127+
(down_revision is not None and host._state_revision != down_revision)
2128+
)
2129+
if stale_down_event:
2130+
log.debug("Ignoring stale down handling for host %s", host)
2131+
if owns_reserved_down_handling and self._clear_down_handling(host, down_event_revision):
2132+
pending_up_revision = self._pop_pending_node_up_if_ready(host)
2133+
handle_stale_down_event = True
2134+
elif host._currently_handling_node_down:
2135+
if not owns_reserved_down_handling:
2136+
log.debug("Another thread is already handling down status of node %s", host)
2137+
return
2138+
else:
2139+
host._currently_handling_node_down = True
2140+
host._node_down_handling_revision = down_event_revision
20122141

2013-
for listener in self.listeners:
2014-
listener.on_down(host)
2142+
if handle_stale_down_event:
2143+
self._handle_pending_node_up(host, pending_up_revision)
2144+
return
20152145

2016-
self._start_reconnector(host, is_host_addition)
2146+
try:
2147+
self.profile_manager.on_down(host)
2148+
self.control_connection.on_down(host)
2149+
for session in tuple(self.sessions):
2150+
session.on_down(host)
2151+
2152+
for listener in self.listeners:
2153+
listener.on_down(host)
2154+
2155+
self._start_reconnector(host, is_host_addition)
2156+
finally:
2157+
pending_up_revision = None
2158+
with host.lock:
2159+
if self._clear_down_handling(host, down_event_revision):
2160+
pending_up_revision = self._pop_pending_node_up_if_ready(host)
2161+
2162+
self._handle_pending_node_up(host, pending_up_revision)
20172163

20182164
def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20192165
"""
@@ -2037,12 +2183,39 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20372183
if connected:
20382184
return
20392185

2186+
if not expect_host_to_be_down:
2187+
if was_up is False:
2188+
if host._pending_node_up:
2189+
host._node_down_event_revision += 1
2190+
host._pending_node_up = False
2191+
host._pending_node_up_revision = None
2192+
host.set_down()
2193+
return
2194+
if was_up is None:
2195+
return
2196+
2197+
host._node_down_event_revision += 1
2198+
host._pending_node_up = False
2199+
host._pending_node_up_revision = None
20402200
host.set_down()
2041-
if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
2201+
down_revision = host._state_revision
2202+
down_event_revision = host._node_down_event_revision
2203+
if host.is_currently_reconnecting():
2204+
return
2205+
if host._currently_handling_node_down:
20422206
return
2207+
host._currently_handling_node_down = True
2208+
host._node_down_handling_revision = down_event_revision
20432209
log.warning("Host %s has been marked down", host)
20442210

2045-
self.on_down_potentially_blocking(host, is_host_addition)
2211+
future = self.on_down_potentially_blocking(
2212+
host, is_host_addition, down_revision, down_event_revision)
2213+
if future is None:
2214+
pending_up_revision = None
2215+
with host.lock:
2216+
if self._clear_down_handling(host, down_event_revision):
2217+
pending_up_revision = self._pop_pending_node_up_if_ready(host)
2218+
self._handle_pending_node_up(host, pending_up_revision)
20462219

20472220
def on_add(self, host, refresh_nodes=True):
20482221
if self.is_shutdown:
@@ -2119,7 +2292,11 @@ def on_remove(self, host):
21192292
return
21202293

21212294
log.debug("[cluster] Removing host %s", host)
2122-
host.set_down()
2295+
with host.lock:
2296+
host._node_down_event_revision += 1
2297+
host._pending_node_up = False
2298+
host._pending_node_up_revision = None
2299+
host.set_down()
21232300
self.profile_manager.on_remove(host)
21242301
for session in tuple(self.sessions):
21252302
session.on_remove(host)

0 commit comments

Comments
 (0)