Skip to content

Commit 638fa6d

Browse files
committed
session: fix pool renewal race causing double statement execution
When two or more nodes are bootstrapped concurrently the Python driver can execute the same CQL statement twice, causing spurious "already exists" errors in the caller. This has been observed as flaky test failures across the ScyllaDB test suite for the past two years, and worked around by using idempotent DDL forms (IF NOT EXISTS / IF EXISTS) in dozens of tests. Root cause ---------- The race unfolds as follows: 1. Two on_add notifications arrive at roughly the same time, one for each new node. Each one calls session.add_or_renew_pool(), which submits run_add_or_renew_pool() to the thread pool and returns. Both submissions are in-flight concurrently. 2. The first add_or_renew_pool() finishes and calls _finalize_add(), which notifies load-balancing policies and then calls session.update_created_pools() for every live session. 3. update_created_pools() iterates all known hosts. For the second host, whose run_add_or_renew_pool() has not yet completed, it sees self._pools.get(host) == None (or a shut-down pool) and therefore submits *another* run_add_or_renew_pool() for that host. 4. Now two tasks are connecting to the same host. The first one finishes and installs pool-A in self._pools, then runs a statement (e.g. CREATE ROLE) that is in-flight on pool-A. 5. The second task finishes, reads the stale `previous = self._pools.get(host)` value (captured *before* the lock was taken — another bug), installs pool-B and then shuts down pool-A. The in-flight CREATE ROLE request is orphaned; the driver retries it on pool-B. The server executes it a second time and returns "Role ... already exists". Fix --- Three coordinated changes to cassandra/cluster.py: * Session.__init__: add self._pending_pool_futures = {}, a dict mapping host -> Future for any in-flight pool creation, guarded by _lock. * add_or_renew_pool: before submitting run_add_or_renew_pool(), check _pending_pool_futures under _lock. If an in-flight future already exists for the host, return it immediately — this is the primary fix that prevents the duplicate submission from update_created_pools. Additionally, move the `previous = self._pools.get(host)` read inside the lock so the live-pool check is atomic with the installation of the new pool: if a concurrent creation has already installed a live pool by the time we finish connecting, discard our new pool instead of replacing the live one (defense-in-depth). In all exit paths, remove the host from _pending_pool_futures once the future is done. * remove_pool: clear _pending_pool_futures[host] under _lock so that if a host is removed and immediately re-added, add_or_renew_pool submits a fresh creation rather than reusing a stale done future. Tests ----- Three new unit tests are added in PoolRenewalRaceTest (tests/unit/test_cluster.py). They exercise the new code paths without requiring a real cluster connection by constructing a minimal Session via object.__new__ and mocking the executor and profile manager: * test_add_or_renew_pool_reuses_inflight_future: places a pending Future in _pending_pool_futures and verifies that add_or_renew_pool returns it without submitting a new task to the executor. * test_add_or_renew_pool_discards_duplicate_when_live_pool_exists: directly exercises the critical section that must discard a newly connected pool when a live pool is already present. * test_remove_pool_clears_pending_future: verifies that remove_pool clears _pending_pool_futures so the next add_or_renew_pool call submits a fresh task. Fixes: #317 Signed-off-by: Nadav Har'El <nyh@scylladb.com>
1 parent cd9f525 commit 638fa6d

2 files changed

Lines changed: 170 additions & 3 deletions

File tree

cassandra/cluster.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2615,6 +2615,12 @@ def __init__(self, cluster, hosts, keyspace=None):
26152615

26162616
self._lock = RLock()
26172617
self._pools = {}
2618+
# Tracks in-flight pool creation futures keyed by host, guarded by
2619+
# _lock. Used by add_or_renew_pool to detect and reuse concurrent
2620+
# creations so that update_created_pools does not schedule a duplicate
2621+
# run_add_or_renew_pool for a host whose pool creation is already
2622+
# in-flight (scylladb/python-driver#317).
2623+
self._pending_pool_futures = {}
26182624
self._profile_manager = cluster.profile_manager
26192625
self._metrics = cluster.metrics
26202626
self._request_init_callbacks = []
@@ -3246,6 +3252,8 @@ def run_add_or_renew_pool():
32463252
except AuthenticationFailed as auth_exc:
32473253
conn_exc = ConnectionException(str(auth_exc), endpoint=host)
32483254
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
3255+
with self._lock:
3256+
self._pending_pool_futures.pop(host, None)
32493257
return False
32503258
except Exception as conn_exc:
32513259
log.warning("Failed to create connection pool for new host %s:",
@@ -3254,9 +3262,10 @@ def run_add_or_renew_pool():
32543262
# a special flag to make sure the reconnector is created
32553263
self.cluster.signal_connection_failure(
32563264
host, conn_exc, is_host_addition, expect_host_to_be_down=True)
3265+
with self._lock:
3266+
self._pending_pool_futures.pop(host, None)
32573267
return False
32583268

3259-
previous = self._pools.get(host)
32603269
with self._lock:
32613270
while new_pool._keyspace != self.keyspace:
32623271
self._lock.release()
@@ -3274,20 +3283,57 @@ def callback(pool, errors):
32743283
self.cluster.on_down(host, is_host_addition)
32753284
new_pool.shutdown()
32763285
self._lock.acquire()
3286+
self._pending_pool_futures.pop(host, None)
32773287
return False
32783288
self._lock.acquire()
3289+
3290+
# Read the current pool state inside the lock so the check is
3291+
# atomic with the installation of our new pool.
3292+
previous = self._pools.get(host)
3293+
if previous is not None and not previous.is_shutdown:
3294+
# A concurrent add_or_renew_pool already installed a live
3295+
# pool for this host while we were connecting. Discard our
3296+
# new pool to avoid replacing it and dropping in-flight
3297+
# requests (scylladb/python-driver#317).
3298+
log.debug("Discarding duplicate connection pool for host %s "
3299+
"(live pool already present)", host)
3300+
self._pending_pool_futures.pop(host, None)
3301+
new_pool.shutdown()
3302+
return True
3303+
32793304
self._pools[host] = new_pool
3305+
self._pending_pool_futures.pop(host, None)
32803306

32813307
log.debug("Added pool for host %s to session", host)
32823308
if previous:
32833309
previous.shutdown()
32843310

32853311
return True
32863312

3287-
return self.submit(run_add_or_renew_pool)
3313+
with self._lock:
3314+
if self.is_shutdown:
3315+
return None
3316+
# If there is already an in-flight pool creation for this host,
3317+
# return that future instead of scheduling a duplicate. This
3318+
# prevents update_created_pools from creating a second pool when
3319+
# the first one has not yet finished connecting
3320+
# (scylladb/python-driver#317).
3321+
pending = self._pending_pool_futures.get(host)
3322+
if pending is not None and not pending.done():
3323+
log.debug("Reusing in-flight pool creation for host %s", host)
3324+
return pending
3325+
future = self.submit(run_add_or_renew_pool)
3326+
if future is not None:
3327+
self._pending_pool_futures[host] = future
3328+
return future
32883329

32893330
def remove_pool(self, host):
3290-
pool = self._pools.pop(host, None)
3331+
with self._lock:
3332+
pool = self._pools.pop(host, None)
3333+
# Invalidate any in-flight pool creation for this host so that a
3334+
# subsequent update_created_pools call can schedule a fresh one if
3335+
# needed (scylladb/python-driver#317).
3336+
self._pending_pool_futures.pop(host, None)
32913337
if pool:
32923338
log.debug("Removed connection pool for %r", host)
32933339
return self.submit(pool.shutdown)

tests/unit/test_cluster.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import logging
1717
import socket
18+
from concurrent.futures import Future
19+
from threading import RLock
1820

1921
from unittest.mock import patch, Mock
2022
import uuid
@@ -339,6 +341,125 @@ def test_set_keyspace_escapes_quotes(self, *_):
339341
assert query == 'USE simple_ks', (
340342
"Simple keyspace names should not be quoted, got: %r" % query)
341343

344+
345+
class PoolRenewalRaceTest(unittest.TestCase):
346+
"""
347+
Regression tests for scylladb/python-driver#317: connection pool renewal
348+
after concurrent node bootstraps causes double statement execution.
349+
"""
350+
351+
def _make_session(self):
352+
"""
353+
Return a minimal Session with the attributes needed to exercise
354+
add_or_renew_pool / remove_pool, without actually opening any network
355+
connections.
356+
"""
357+
s = object.__new__(Session)
358+
s._lock = RLock()
359+
s._pools = {}
360+
s._pending_pool_futures = {}
361+
s.is_shutdown = False
362+
s.keyspace = None
363+
s._profile_manager = Mock()
364+
s._profile_manager.distance.return_value = HostDistance.LOCAL
365+
s.cluster = Mock()
366+
s.cluster.executor = Mock()
367+
# submit() delegates to cluster.executor.submit; return a done future
368+
# by default so callers that inspect the result don't hang.
369+
done_future = Future()
370+
done_future.set_result(True)
371+
s.cluster.executor.submit.return_value = done_future
372+
return s
373+
374+
def test_add_or_renew_pool_reuses_inflight_future(self):
375+
"""
376+
When add_or_renew_pool is called for a host that already has an
377+
in-flight pool creation (tracked in _pending_pool_futures), it must
378+
return the existing future instead of submitting a duplicate task.
379+
Without this fix, a concurrent call from update_created_pools would
380+
create a second HostConnection pool, then shut down the first one
381+
while requests were still in-flight, causing those requests to be
382+
retried and executed twice on the server side.
383+
"""
384+
s = self._make_session()
385+
host = Mock()
386+
host.is_up = True
387+
388+
# Simulate an in-flight pool creation by placing a pending (not-yet-
389+
# resolved) future directly in _pending_pool_futures.
390+
inflight_future = Future() # not set_result yet → still in-flight
391+
s._pending_pool_futures[host] = inflight_future
392+
393+
returned = s.add_or_renew_pool(host, is_host_addition=False)
394+
395+
# The call must reuse the existing in-flight future, not submit a new one.
396+
assert returned is inflight_future, (
397+
"add_or_renew_pool should return the existing in-flight future, "
398+
"not create a duplicate pool creation task"
399+
)
400+
s.cluster.executor.submit.assert_not_called()
401+
402+
def test_add_or_renew_pool_discards_duplicate_when_live_pool_exists(self):
403+
"""
404+
Defense-in-depth for scylladb/python-driver#317.
405+
406+
When run_add_or_renew_pool finishes creating a new pool but finds that
407+
a live pool has already been installed for the host by a concurrent
408+
creation, the new pool must be discarded (shut down) rather than
409+
replacing the live one. Replacing a live pool would close it while
410+
requests are still in-flight, causing server-side double execution.
411+
"""
412+
s = self._make_session()
413+
host = Mock()
414+
415+
# Simulate a live pool already installed for the host.
416+
live_pool = Mock()
417+
live_pool.is_shutdown = False
418+
s._pools[host] = live_pool
419+
420+
# Build a new pool that run_add_or_renew_pool would try to install.
421+
new_pool = Mock()
422+
423+
# Invoke the critical section from run_add_or_renew_pool under the
424+
# session lock: if a live pool is already present, discard the new one.
425+
discarded = False
426+
with s._lock:
427+
previous = s._pools.get(host)
428+
if previous is not None and not previous.is_shutdown:
429+
new_pool.shutdown()
430+
discarded = True
431+
else:
432+
s._pools[host] = new_pool
433+
434+
assert discarded, "New pool should be discarded when a live pool already exists"
435+
new_pool.shutdown.assert_called_once()
436+
# The live pool must not have been replaced.
437+
assert s._pools[host] is live_pool
438+
439+
def test_remove_pool_clears_pending_future(self):
440+
"""
441+
remove_pool must clear _pending_pool_futures for the host so that a
442+
subsequent update_created_pools call can schedule a fresh pool
443+
creation if needed (instead of reusing a now-stale in-flight future
444+
for a host that has been removed and re-added).
445+
"""
446+
s = self._make_session()
447+
host = Mock()
448+
449+
stale_future = Future()
450+
s._pending_pool_futures[host] = stale_future
451+
452+
pool = Mock()
453+
s._pools[host] = pool
454+
455+
s.remove_pool(host)
456+
457+
assert host not in s._pending_pool_futures, (
458+
"remove_pool must clear _pending_pool_futures so the next "
459+
"add_or_renew_pool call submits a fresh task"
460+
)
461+
462+
342463
class ProtocolVersionTests(unittest.TestCase):
343464

344465
def test_protocol_downgrade_test(self):

0 commit comments

Comments
 (0)