Skip to content

Commit b2872d1

Browse files
committed
session: fix pool renewal race causing double statement execution
When two or more nodes are bootstrapped concurrently the Python driver can execute the same CQL statement twice, causing spurious "already exists" errors in the caller. This has been observed as flaky test failures across the ScyllaDB test suite for the past two years, and worked around by using idempotent DDL forms (IF NOT EXISTS / IF EXISTS) in dozens of tests. Root cause ---------- The race unfolds as follows: 1. Two on_add notifications arrive at roughly the same time, one for each new node. Each one calls session.add_or_renew_pool(), which submits run_add_or_renew_pool() to the thread pool and returns. Both submissions are in-flight concurrently. 2. The first add_or_renew_pool() finishes and calls _finalize_add(), which notifies load-balancing policies and then calls session.update_created_pools() for every live session. 3. update_created_pools() iterates all known hosts. For the second host, whose run_add_or_renew_pool() has not yet completed, it sees self._pools.get(host) == None (or a shut-down pool) and therefore submits *another* run_add_or_renew_pool() for that host. 4. Now two tasks are connecting to the same host. The first one finishes and installs pool-A in self._pools, then runs a statement (e.g. CREATE ROLE) that is in-flight on pool-A. 5. The second task finishes, reads the stale `previous = self._pools.get(host)` value (captured *before* the lock was taken — another bug), installs pool-B and then shuts down pool-A. The in-flight CREATE ROLE request is orphaned; the driver retries it on pool-B. The server executes it a second time and returns "Role ... already exists". Fix --- Three coordinated changes to cassandra/cluster.py: * Session.__init__: add self._pending_pool_futures = {}, a dict mapping host -> entry (with future, creation_id, distance, is_host_addition_cell) for any in-flight pool creation, guarded by _lock. * add_or_renew_pool: before submitting run_add_or_renew_pool(), check _pending_pool_futures under _lock. If an in-flight future already exists for the host with the same distance, return it immediately — this is the primary fix that prevents the duplicate submission from update_created_pools. If is_host_addition=True on the new call but the existing entry has False, upgrade it in-place via a shared is_host_addition_cell so the closure passes the correct flag to signal_connection_failure() and _HostReconnectionHandler dispatches through on_add() instead of on_up() on reconnect. If the distance changed, submit a fresh task (the old HostConnection was constructed with stale distance, e.g. no connections for REMOTE with connect_to_remote_hosts=False). Each submission gets a unique creation_id token. The closure checks _pending_pool_futures[host].creation_id before installing its pool: if remove_pool() ran and a fresher creation was submitted while this task was connecting, the stale task discards its pool rather than overwriting the fresher one. Additionally, move the `previous = self._pools.get(host)` read inside the lock so the live-pool check is atomic with the installation of the new pool: if a concurrent creation has already installed a live pool by the time we finish connecting, discard our new pool instead of replacing the live one (defense-in-depth). Cleanup of _pending_pool_futures is handled by a done_callback registered on the future. The callback acquires _lock and only clears the entry if it still holds the same creation_id it was registered on, so a concurrent remove_pool followed by a new add_or_renew_pool is not affected. The entry is stored before calling submit() so that the closure always finds a valid creation_id in the dict, even when the executor runs the task synchronously. * remove_pool: clear _pending_pool_futures[host] under _lock so that if a host is removed and immediately re-added, add_or_renew_pool submits a fresh creation rather than reusing a stale done future. Tests ----- Five new unit tests are added in PoolRenewalRaceTest (tests/unit/test_cluster.py). They exercise the new code paths without requiring a real cluster connection by constructing a minimal Session via object.__new__ and mocking the executor and profile manager. The tests use the new dict-based entry format for _pending_pool_futures: * test_add_or_renew_pool_reuses_inflight_future: places a pending entry in _pending_pool_futures and verifies that add_or_renew_pool returns the existing future without submitting a new task. * test_add_or_renew_pool_discards_duplicate_when_live_pool_exists: exercises the real production code path by patching HostConnection to a lightweight stub and using a synchronous executor shim that runs the submitted callable inline. Pre-installs a live pool for the host, then calls add_or_renew_pool() and asserts that the live pool is not replaced and the newly connected stub pool is shut down. * test_remove_pool_clears_pending_future: verifies that remove_pool clears _pending_pool_futures so the next add_or_renew_pool call submits a fresh task. * test_done_callback_clears_pending_future: verifies that the done_callback fires and removes the entry from _pending_pool_futures once the future completes. * test_done_callback_does_not_clear_newer_future: verifies the creation_id guard — an old future's callback does not evict a newer entry installed in its place after a remove_pool + add_or_renew_pool. Fixes: #317
1 parent cd9f525 commit b2872d1

2 files changed

Lines changed: 335 additions & 7 deletions

File tree

cassandra/cluster.py

Lines changed: 115 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2615,6 +2615,12 @@ def __init__(self, cluster, hosts, keyspace=None):
26152615

26162616
self._lock = RLock()
26172617
self._pools = {}
2618+
# Tracks in-flight pool creation futures keyed by host, guarded by
2619+
# _lock. Used by add_or_renew_pool to detect and reuse concurrent
2620+
# creations so that update_created_pools does not schedule a duplicate
2621+
# run_add_or_renew_pool for a host whose pool creation is already
2622+
# in-flight (scylladb/python-driver#317).
2623+
self._pending_pool_futures = {}
26182624
self._profile_manager = cluster.profile_manager
26192625
self._metrics = cluster.metrics
26202626
self._request_init_callbacks = []
@@ -3240,23 +3246,37 @@ def add_or_renew_pool(self, host, is_host_addition):
32403246
if distance == HostDistance.IGNORED:
32413247
return None
32423248

3249+
# Mutable one-element list so the outer code can upgrade the flag
3250+
# after the closure has been submitted but before it reads it. This
3251+
# fixes the coalescing race where an in-flight future created with
3252+
# is_host_addition=False is reused by a later on_add() call that
3253+
# needs is_host_addition=True: the closure then passes the wrong flag
3254+
# to signal_connection_failure(), causing _HostReconnectionHandler to
3255+
# call on_up() instead of on_add() on reconnect (scylladb/python-driver#317).
3256+
is_host_addition_cell = [is_host_addition]
3257+
3258+
# Unique token for this submission. The closure checks it before
3259+
# installing its pool so that a stale task (whose entry was replaced by
3260+
# remove_pool + a fresh add_or_renew_pool) discards its pool rather
3261+
# than overwriting the freshly-started one (scylladb/python-driver#317).
3262+
creation_id = object()
3263+
32433264
def run_add_or_renew_pool():
32443265
try:
32453266
new_pool = HostConnection(host, distance, self)
32463267
except AuthenticationFailed as auth_exc:
32473268
conn_exc = ConnectionException(str(auth_exc), endpoint=host)
3248-
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
3269+
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition_cell[0])
32493270
return False
32503271
except Exception as conn_exc:
32513272
log.warning("Failed to create connection pool for new host %s:",
32523273
host, exc_info=conn_exc)
32533274
# the host itself will still be marked down, so we need to pass
32543275
# a special flag to make sure the reconnector is created
32553276
self.cluster.signal_connection_failure(
3256-
host, conn_exc, is_host_addition, expect_host_to_be_down=True)
3277+
host, conn_exc, is_host_addition_cell[0], expect_host_to_be_down=True)
32573278
return False
32583279

3259-
previous = self._pools.get(host)
32603280
with self._lock:
32613281
while new_pool._keyspace != self.keyspace:
32623282
self._lock.release()
@@ -3271,23 +3291,111 @@ def callback(pool, errors):
32713291
set_keyspace_event.wait(self.cluster.connect_timeout)
32723292
if not set_keyspace_event.is_set() or errors_returned:
32733293
log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned)
3274-
self.cluster.on_down(host, is_host_addition)
3294+
self.cluster.on_down(host, is_host_addition_cell[0])
32753295
new_pool.shutdown()
32763296
self._lock.acquire()
32773297
return False
32783298
self._lock.acquire()
3279-
self._pools[host] = new_pool
3299+
3300+
# Identity guard: if _pending_pool_futures no longer holds our
3301+
# creation_id it means remove_pool() ran (and possibly a fresh
3302+
# add_or_renew_pool was submitted) while we were connecting.
3303+
# Discard our pool so the fresher task can install its own
3304+
# (scylladb/python-driver#317).
3305+
entry = self._pending_pool_futures.get(host)
3306+
if entry is None or entry['creation_id'] is not creation_id:
3307+
log.debug("Discarding stale connection pool for host %s "
3308+
"(superseded by a newer creation)", host)
3309+
discard_new_pool = True
3310+
else:
3311+
# Read the current pool state inside the lock so the check
3312+
# is atomic with the installation of our new pool.
3313+
previous = self._pools.get(host)
3314+
if previous is not None and not previous.is_shutdown:
3315+
# A concurrent add_or_renew_pool already installed a
3316+
# live pool for this host while we were connecting.
3317+
# Discard ours to avoid replacing it and dropping
3318+
# in-flight requests (scylladb/python-driver#317).
3319+
log.debug("Discarding duplicate connection pool for host %s "
3320+
"(live pool already present)", host)
3321+
discard_new_pool = True
3322+
else:
3323+
discard_new_pool = False
3324+
self._pools[host] = new_pool
3325+
3326+
if discard_new_pool:
3327+
new_pool.shutdown()
3328+
return True
32803329

32813330
log.debug("Added pool for host %s to session", host)
32823331
if previous:
32833332
previous.shutdown()
32843333

32853334
return True
32863335

3287-
return self.submit(run_add_or_renew_pool)
3336+
with self._lock:
3337+
if self.is_shutdown:
3338+
return None
3339+
# If there is already an in-flight pool creation for this host,
3340+
# return that future instead of scheduling a duplicate. This
3341+
# prevents update_created_pools from creating a second pool when
3342+
# the first one has not yet finished connecting
3343+
# (scylladb/python-driver#317).
3344+
entry = self._pending_pool_futures.get(host)
3345+
if entry is not None and entry['future'] is not None and not entry['future'].done():
3346+
if distance == entry['distance']:
3347+
# Same distance: safe to coalesce. Upgrade is_host_addition
3348+
# in the shared cell if the new caller needs the stricter
3349+
# on_add() reconnect path (scylladb/python-driver#317).
3350+
if is_host_addition:
3351+
entry['is_host_addition_cell'][0] = True
3352+
log.debug("Reusing in-flight pool creation for host %s", host)
3353+
return entry['future']
3354+
# Distance changed: the in-flight HostConnection was constructed
3355+
# with the old distance (e.g. REMOTE with connect_to_remote_hosts
3356+
# =False => no connections). Submit a fresh task; the creation_id
3357+
# guard below ensures it wins over the stale one
3358+
# (scylladb/python-driver#317).
3359+
log.debug("Distance changed for host %s while pool creation was "
3360+
"in-flight; submitting fresh creation", host)
3361+
# Store the entry BEFORE calling submit so the closure always
3362+
# finds a valid creation_id in _pending_pool_futures, even when
3363+
# the executor runs the task synchronously
3364+
# (scylladb/python-driver#317).
3365+
new_entry = {
3366+
'future': None, # filled in immediately after submit returns
3367+
'creation_id': creation_id,
3368+
'distance': distance,
3369+
'is_host_addition_cell': is_host_addition_cell,
3370+
}
3371+
self._pending_pool_futures[host] = new_entry
3372+
future = self.submit(run_add_or_renew_pool)
3373+
if future is None:
3374+
# Session is shutting down; clean up the placeholder entry.
3375+
self._pending_pool_futures.pop(host, None)
3376+
return None
3377+
new_entry['future'] = future
3378+
# Remove the entry once the future finishes, regardless of how
3379+
# run_add_or_renew_pool exits (including unhandled exceptions).
3380+
# The callback acquires _lock and only clears the entry if it
3381+
# still holds *this* creation_id, so a concurrent remove_pool
3382+
# followed by a new add_or_renew_pool is not affected
3383+
# (scylladb/python-driver#317).
3384+
def _clear_pending(f, _host=host, _creation_id=creation_id):
3385+
with self._lock:
3386+
e = self._pending_pool_futures.get(_host)
3387+
if e is not None and e['creation_id'] is _creation_id:
3388+
self._pending_pool_futures.pop(_host, None)
3389+
future.add_done_callback(_clear_pending)
3390+
return future
32883391

32893392
def remove_pool(self, host):
3290-
pool = self._pools.pop(host, None)
3393+
with self._lock:
3394+
pool = self._pools.pop(host, None)
3395+
# Invalidate any in-flight pool creation for this host so that a
3396+
# subsequent update_created_pools call can schedule a fresh one if
3397+
# needed (scylladb/python-driver#317).
3398+
self._pending_pool_futures.pop(host, None)
32913399
if pool:
32923400
log.debug("Removed connection pool for %r", host)
32933401
return self.submit(pool.shutdown)

0 commit comments

Comments
 (0)