Skip to content

Commit 37a8146

Browse files
committed
pool: ignore failures from replaced pools
1 parent a2a26ca commit 37a8146

2 files changed

Lines changed: 87 additions & 17 deletions

File tree

cassandra/pool.py

Lines changed: 48 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -613,23 +613,8 @@ def return_connection(self, connection, stream_was_orphaned=False):
613613
if not connection.signaled_error:
614614
log.debug("Defunct or closed connection (%s) returned to pool, potentially "
615615
"marking host %s as down", id(connection), self.host)
616-
with self.host.lock:
617-
endpoint_matches = _endpoints_match(
618-
self._session.cluster, self.host.endpoint, self.endpoint)
619-
host_is_current = (endpoint_matches and
620-
_host_is_current_for_endpoint(
621-
self._session.cluster, self.host,
622-
self.endpoint))
623-
if not endpoint_matches:
624-
log.debug("Ignoring stale connection failure for host %s; endpoint changed from %s",
625-
self.host, self.endpoint)
626-
stale_endpoint_failure = True
627-
elif not host_is_current:
628-
log.debug("Ignoring stale connection failure for host %s; endpoint reassigned from %s",
629-
self.host, self.endpoint)
630-
stale_endpoint_failure = True
631-
else:
632-
is_down = self.host.signal_connection_failure(connection.last_error)
616+
is_down, stale_endpoint_failure = \
617+
self._signal_connection_failure_if_current(connection)
633618
connection.signaled_error = True
634619

635620
if stale_endpoint_failure:
@@ -683,6 +668,52 @@ def on_orphaned_stream_released(self):
683668
with self._stream_available_condition:
684669
self._stream_available_condition.notify()
685670

671+
def _is_current_pool_for_endpoint_locked(self, expected_endpoint):
    """
    Report whether this pool is still the one the session has registered
    for ``self.host`` and whether its endpoint matches ``expected_endpoint``.

    The ``_locked`` suffix reflects how the caller in this class uses it:
    it is invoked while the session lock and the host lock are held.

    Returns ``True`` when the session does not expose a ``dict`` in
    ``_pools`` (nothing to compare against), ``False`` when no entry maps
    this exact host object to this exact pool object, and otherwise the
    result of ``_endpoints_match`` for the registered endpoint.
    """
    registered = getattr(self._session, "_pools", None)
    if not isinstance(registered, dict):
        return True

    # Identity (not equality) is required on both the key and the value:
    # an equal-but-different Host or a replacement pool must not count.
    for key_host, candidate in registered.items():
        if key_host is self.host and candidate is self:
            registered_endpoint = getattr(candidate, 'endpoint', None)
            if registered_endpoint is None:
                # Fall back to the endpoint currently set on the pool's host.
                registered_endpoint = candidate.host.endpoint
            return _endpoints_match(
                self._session.cluster, registered_endpoint, expected_endpoint)
    return False
685+
686+
def _signal_connection_failure_if_current(self, connection):
    """
    Signal a connection failure to the host unless this pool is stale.

    Returns a ``(is_down, stale_endpoint_failure)`` tuple. When the
    failure is ignored because the pool was replaced, the tuple is
    ``(False, True)``; otherwise it is whatever
    ``_signal_connection_failure_locked`` returns.
    """
    lock = getattr(self._session, "_lock", None)
    if lock is not None:
        # Session lock first, then host lock; the pool-registration check
        # and the failure signal both happen while the two are held.
        with lock, self.host.lock:
            if self._is_current_pool_for_endpoint_locked(self.endpoint):
                return self._signal_connection_failure_locked(connection)
            log.debug("Ignoring stale connection failure for host %s; pool was replaced for %s",
                      self.host, self.endpoint)
            return False, True

    # The session exposes no lock: only the host lock is taken and the
    # pool-registration check is skipped.
    with self.host.lock:
        return self._signal_connection_failure_locked(connection)
699+
700+
def _signal_connection_failure_locked(self, connection):
    """
    Forward ``connection.last_error`` to the host if the pool's endpoint
    is still the host's current one.

    Callers in this class invoke it with ``self.host.lock`` held.

    Returns ``(is_down, stale_endpoint_failure)``: ``(False, True)`` when
    the failure is ignored because the host's endpoint no longer matches
    the pool's endpoint or the host is no longer current for it, and
    ``(host.signal_connection_failure(...), False)`` otherwise.
    """
    if not _endpoints_match(
            self._session.cluster, self.host.endpoint, self.endpoint):
        log.debug("Ignoring stale connection failure for host %s; endpoint changed from %s",
                  self.host, self.endpoint)
        return False, True

    # Only consulted once the endpoints are known to match.
    if not _host_is_current_for_endpoint(
            self._session.cluster, self.host, self.endpoint):
        log.debug("Ignoring stale connection failure for host %s; endpoint reassigned from %s",
                  self.host, self.endpoint)
        return False, True

    is_down = self.host.signal_connection_failure(connection.last_error)
    return is_down, False
716+
686717
def _remove_stale_pool(self, expected_endpoint):
687718
future = self._session.remove_pool(
688719
self.host, expected_host=self.host,

tests/unit/test_host_connection_pool.py

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -296,6 +296,45 @@ def test_return_defunct_connection_after_endpoint_reassignment_is_ignored(self):
296296
conn.close.assert_called_once_with()
297297
assert not pool.is_shutdown
298298

299+
def test_return_defunct_connection_from_removed_pool_after_endpoint_flip_back_is_ignored(self):
    """
    A defunct connection returned to a pool that is no longer the one
    registered in ``session._pools`` must not signal a host failure or
    mark the host down, even though the host's endpoint equals the
    pool's endpoint again.
    """
    endpoint = DefaultEndPoint('127.0.0.1')
    host = Host(endpoint, SimpleConvictionPolicy, host_id=uuid.uuid4())
    host.signal_connection_failure = Mock(return_value=True)

    connection = HashableMock(spec=Connection, in_flight=0, is_defunct=False,
                              is_closed=False, max_request_id=100,
                              signaled_error=False,
                              orphaned_threshold_reached=False)

    registered_pools = {}
    metadata = Metadata()
    metadata.add_or_return_host(host)

    session = self.make_session()
    session._lock = Lock()
    session._pools = registered_pools
    session.remove_pool.return_value = None
    session.cluster.metadata = metadata
    session.cluster._endpoints_match.side_effect = Cluster._endpoints_match
    session.cluster.connection_factory.return_value = connection

    stale_pool = self.PoolImpl(host, HostDistance.LOCAL, session)
    stale_pool.borrow_connection(timeout=0.01)

    # The old pool was already removed, then the host endpoint flipped back
    # to the same endpoint and a replacement pool became current.
    replacement_pool = Mock()
    replacement_pool.host = host
    replacement_pool.endpoint = endpoint
    registered_pools[host] = replacement_pool

    connection.is_defunct = True
    stale_pool.return_connection(connection)

    host.signal_connection_failure.assert_not_called()
    session.cluster.on_down.assert_not_called()
    session.remove_pool.assert_called_once_with(
        host, expected_host=host, expected_endpoint=endpoint,
        expected_pool=stale_pool)
    connection.close.assert_called_once_with()
    assert not stale_pool.is_shutdown
337+
299338
def test_host_instantiations(self):
300339
"""
301340
Ensure Host fails if not initialized properly

0 commit comments

Comments
 (0)