Skip to content

Commit c2275c9

Browse files
committed
cluster: fence up pool creation by endpoint
1 parent 788e878 commit c2275c9

2 files changed

Lines changed: 51 additions & 8 deletions

File tree

cassandra/cluster.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2455,7 +2455,9 @@ def _clear_stale_reconnector():
24552455
futures, futures_results, futures_lock)
24562456
for session in tuple(self.sessions):
24572457
future = session.add_or_renew_pool(
2458-
host, is_host_addition=False, allow_retry_after_auth_failure=True)
2458+
host, is_host_addition=False,
2459+
allow_retry_after_auth_failure=True,
2460+
expected_endpoint=up_handling_endpoint)
24592461
if future is not None:
24602462
have_future = True
24612463
futures.add(future)
@@ -4076,7 +4078,9 @@ def _invalidate_pool_creation(self, host, expected_endpoint=None):
40764078
return True
40774079
return False
40784080

4079-
def add_or_renew_pool(self, host, is_host_addition, allow_retry_after_auth_failure=False):
4081+
def add_or_renew_pool(self, host, is_host_addition,
4082+
allow_retry_after_auth_failure=False,
4083+
expected_endpoint=None):
40804084
"""
40814085
For internal use only.
40824086
"""
@@ -4248,20 +4252,24 @@ def callback(pool, errors):
42484252
return True
42494253

42504254
with self._lock:
4255+
with host.lock:
4256+
creation_endpoint = host.endpoint
4257+
if (expected_endpoint is not None and
4258+
not self._endpoints_match(
4259+
creation_endpoint, expected_endpoint)):
4260+
return None
4261+
42514262
state = self._get_pool_creation_state(host)
42524263
if state.creation_epoch is not None:
4253-
with host.lock:
4254-
endpoint_changed = not self._endpoints_match(
4255-
host.endpoint, state.endpoint)
4264+
endpoint_changed = not self._endpoints_match(
4265+
creation_endpoint, state.endpoint)
42564266
if not endpoint_changed:
42574267
return state.future
42584268
self._invalidate_pool_creation(
42594269
host, expected_endpoint=state.endpoint)
42604270

42614271
creation_epoch = state.advance()
42624272
state.creation_epoch = creation_epoch
4263-
with host.lock:
4264-
creation_endpoint = host.endpoint
42654273
state.endpoint = creation_endpoint
42664274
future = self.submit(run_add_or_renew_pool)
42674275
if future is None:

tests/unit/test_cluster.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,40 @@ def make_pool(host, distance, pool_session, endpoint=None):
579579
created_pools[1].shutdown.assert_not_called()
580580
assert session._pools[host] is created_pools[1]
581581

582+
def test_on_up_does_not_publish_replacement_endpoint_pool_after_endpoint_swap(self):
583+
host = self._make_host("127.0.0.1")
584+
host.set_down()
585+
old_endpoint = host.endpoint
586+
new_endpoint = DefaultEndPoint("127.0.0.2")
587+
cluster, session, executor = self._make_cluster_and_session([host])
588+
created_pools = []
589+
590+
def make_pool(host, distance, pool_session, endpoint=None):
591+
pool = self._make_pool(host, distance, pool_session, endpoint)
592+
created_pools.append(pool)
593+
return pool
594+
595+
original_add_or_renew_pool = Session.add_or_renew_pool.__get__(
596+
session, Session)
597+
598+
def add_after_endpoint_swap(host, *args, **kwargs):
599+
host.endpoint = new_endpoint
600+
return original_add_or_renew_pool(host, *args, **kwargs)
601+
602+
session.add_or_renew_pool = Mock(side_effect=add_after_endpoint_swap)
603+
604+
with patch("cassandra.cluster.HostConnection", side_effect=make_pool):
605+
Cluster.on_up(cluster, host)
606+
while executor.submissions:
607+
executor.run_next()
608+
609+
assert created_pools == []
610+
assert session._pools == {}
611+
session.add_or_renew_pool.assert_called_once_with(
612+
host, is_host_addition=False,
613+
allow_retry_after_auth_failure=True,
614+
expected_endpoint=old_endpoint)
615+
582616
def test_pool_creation_publishes_before_endpoint_lock_is_released(self):
583617
host = self._make_host("127.0.0.1")
584618
new_endpoint = DefaultEndPoint("127.0.0.2")
@@ -1750,7 +1784,8 @@ def queue_up_then_fail(h):
17501784
cluster.control_connection.on_up.assert_called_once_with(host)
17511785
session.add_or_renew_pool.assert_called_once_with(
17521786
host, is_host_addition=False,
1753-
allow_retry_after_auth_failure=True)
1787+
allow_retry_after_auth_failure=True,
1788+
expected_endpoint=host.endpoint)
17541789
assert host.is_up
17551790
assert self._state(cluster, host).up_epoch is None
17561791
assert self._state(cluster, host).pending_up_epoch is None

0 commit comments

Comments
 (0)