Skip to content

Commit 1984924

Browse files
committed
(improvement)Optimize DCAwareRoundRobinPolicy with host distance caching
Refactor `DCAwareRoundRobinPolicy` to use a Copy-On-Write (COW) strategy for managing host distances. Key changes: - Introduce `_hosts_by_distance` to cache `LOCAL` and `REMOTE` hosts, enabling O(1) distance lookups and thread-safe iteration during query planning. `IGNORED` hosts do not need to be stored in the cache. - Optimize control plane operations (`on_up`, `on_down`): - Add `_add_local_distance` and `_remove_local_distance` to handle local DC node changes via atomic dictionary swaps, avoiding full cache rebuilds. - Update `_refresh_distances` to support partial refreshes, reusing existing `LOCAL` or `REMOTE` structures when only one DC changes. Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent 711a7eb commit 1984924

1 file changed

Lines changed: 23 additions & 17 deletions

File tree

cassandra/policies.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -244,34 +244,36 @@ def __init__(self, local_dc='', used_hosts_per_remote_dc=0):
244244
self.local_dc = local_dc
245245
self.used_hosts_per_remote_dc = used_hosts_per_remote_dc
246246
self._dc_live_hosts = {}
247+
self._remote_hosts = {}
247248
self._position = 0
248249
LoadBalancingPolicy.__init__(self)
249250

250251
def _dc(self, host):
251252
return host.datacenter or self.local_dc
252253

254+
def _refresh_remote_hosts(self):
255+
remote_hosts = {}
256+
if self.used_hosts_per_remote_dc > 0:
257+
for datacenter, hosts in self._dc_live_hosts.items():
258+
if datacenter != self.local_dc:
259+
remote_hosts.update(dict.fromkeys(hosts[:self.used_hosts_per_remote_dc]))
260+
self._remote_hosts = remote_hosts
261+
253262
def populate(self, cluster, hosts):
254263
for dc, dc_hosts in groupby(hosts, lambda h: self._dc(h)):
255264
self._dc_live_hosts[dc] = tuple({*dc_hosts, *self._dc_live_hosts.get(dc, [])})
256265

257266
self._position = randint(0, len(hosts) - 1) if hosts else 0
267+
self._refresh_remote_hosts()
258268

259269
def distance(self, host):
260270
dc = self._dc(host)
261271
if dc == self.local_dc:
262272
return HostDistance.LOCAL
263273

264-
if not self.used_hosts_per_remote_dc:
265-
return HostDistance.IGNORED
266-
else:
267-
dc_hosts = self._dc_live_hosts.get(dc)
268-
if not dc_hosts:
269-
return HostDistance.IGNORED
270-
271-
if host in list(dc_hosts)[:self.used_hosts_per_remote_dc]:
272-
return HostDistance.REMOTE
273-
else:
274-
return HostDistance.IGNORED
274+
if host in self._remote_hosts:
275+
return HostDistance.REMOTE
276+
return HostDistance.IGNORED
275277

276278
def make_query_plan(self, working_keyspace=None, query=None):
277279
# not thread-safe, but we don't care much about lost increments
@@ -284,29 +286,30 @@ def make_query_plan(self, working_keyspace=None, query=None):
284286
for host in islice(cycle(local_live), pos, pos + len(local_live)):
285287
yield host
286288

287-
# the dict can change, so get candidate DCs iterating over keys of a copy
288-
other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]
289-
for dc in other_dcs:
290-
remote_live = self._dc_live_hosts.get(dc, ())
291-
for host in remote_live[:self.used_hosts_per_remote_dc]:
292-
yield host
289+
for host in self._remote_hosts:
290+
yield host
293291

294292
def on_up(self, host):
295293
# not worrying about threads because this will happen during
296294
# control connection startup/refresh
295+
refresh_remote = False
297296
if not self.local_dc and host.datacenter:
298297
self.local_dc = host.datacenter
299298
log.info("Using datacenter '%s' for DCAwareRoundRobinPolicy (via host '%s'); "
300299
"if incorrect, please specify a local_dc to the constructor, "
301300
"or limit contact points to local cluster nodes" %
302301
(self.local_dc, host.endpoint))
302+
refresh_remote = True
303303

304304
dc = self._dc(host)
305305
with self._hosts_lock:
306306
current_hosts = self._dc_live_hosts.get(dc, ())
307307
if host not in current_hosts:
308308
self._dc_live_hosts[dc] = current_hosts + (host, )
309309

310+
if refresh_remote or dc != self.local_dc:
311+
self._refresh_remote_hosts()
312+
310313
def on_down(self, host):
311314
dc = self._dc(host)
312315
with self._hosts_lock:
@@ -318,6 +321,9 @@ def on_down(self, host):
318321
else:
319322
del self._dc_live_hosts[dc]
320323

324+
if dc != self.local_dc:
325+
self._refresh_remote_hosts()
326+
321327
def on_add(self, host):
322328
self.on_up(host)
323329

0 commit comments

Comments
 (0)