Skip to content

Commit 14cfa3c

Browse files
committed
(improvement)TokenAware round robin policy and others - improved query planning.
Optimize TokenAwarePolicy query plan generation This patch significantly improves the performance of TokenAwarePolicy by reducing overhead in list materialization and distance calculation. Key changes: 1. Introduced `make_query_plan_with_exclusion()` to the LoadBalancingPolicy interface. - This allows a parent policy (like TokenAware) to request a plan from a child policy while efficiently skipping a set of already-yielded hosts (replicas). - Implemented optimized versions in `DCAwareRoundRobinPolicy` and `RackAwareRoundRobinPolicy`. These implementations integrate the exclusion check directly into their generation loops, avoiding the need for inefficient external filtering or full list materialization. 2. Optimized `TokenAwarePolicy.make_query_plan`: - Removed list materialization of the child query plan. - Replaced multiple passes over replicas (checking `child.distance` each time) with a single pass that buckets replicas into local/remote lists. - Utilizes `make_query_plan_with_exclusion` to yield the remainder of the plan. - Added `__slots__` to reduce memory overhead and attribute access cost. Benchmark (100K queries, 45-node/5-DC topology, Python 3.14, median of 5 runs): Policy | Kops/s | vs master | delta | Mem KB ----------------------------------------------------------------- DCAware | 159 | +50% | | 1.5 RackAware | 128 | +88% | | 2.0 TokenAware(DCAware) | 81 | +350% | +305% | 2.9 TokenAware(RackAware) | 64 | +276% | +237% | 3.6 Default(DCAware) | 199 | +119% | +62% | 1.6 HostFilter(DCAware) | 56 | +6% | | 1.7 TokenAware throughput increases ~4x: DCAware-wrapped goes from 18 to 81 Kops/s (+350%), RackAware-wrapped from 17 to 64 Kops/s (+276%). DefaultPolicy also benefits strongly (+119%) from the new exclusion interface. Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent f962afe commit 14cfa3c

2 files changed

Lines changed: 862 additions & 289 deletions

File tree

cassandra/policies.py

Lines changed: 147 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import random
1515

1616
from collections import namedtuple
17-
from itertools import islice, cycle, groupby, repeat
17+
from itertools import islice, cycle, groupby, repeat, chain
1818
import logging
1919
from random import randint, shuffle
2020
from threading import Lock
@@ -157,6 +157,20 @@ def make_query_plan(self, working_keyspace=None, query=None):
157157
"""
158158
raise NotImplementedError()
159159

160+
def make_query_plan_with_exclusion(
161+
self, working_keyspace=None, query=None, excluded=()
162+
):
163+
"""
164+
Same as :meth:`make_query_plan`, but with an additional `excluded` parameter.
165+
`excluded` should be a container (set, list, etc.) of hosts to skip.
166+
167+
The default implementation simply delegates to `make_query_plan` and filters the result.
168+
Subclasses may override this for performance.
169+
"""
170+
for host in self.make_query_plan(working_keyspace, query):
171+
if host not in excluded:
172+
yield host
173+
160174
def check_supported(self):
161175
"""
162176
This will be called after the cluster Metadata has been initialized.
@@ -199,6 +213,22 @@ def make_query_plan(self, working_keyspace=None, query=None):
199213
else:
200214
return []
201215

216+
def make_query_plan_with_exclusion(
217+
self, working_keyspace=None, query=None, excluded=()
218+
):
219+
pos = self._position
220+
self._position += 1
221+
222+
hosts = self._live_hosts
223+
length = len(hosts)
224+
if length:
225+
pos %= length
226+
for host in islice(cycle(hosts), pos, pos + length):
227+
if host not in excluded:
228+
yield host
229+
else:
230+
return
231+
202232
def on_up(self, host):
203233
with self._hosts_lock:
204234
self._live_hosts = self._live_hosts.union((host,))
@@ -297,6 +327,26 @@ def make_query_plan(self, working_keyspace=None, query=None):
297327
for host in self._remote_hosts:
298328
yield host
299329

330+
def make_query_plan_with_exclusion(
331+
self, working_keyspace=None, query=None, excluded=()
332+
):
333+
# not thread-safe, but we don't care much about lost increments
334+
# for the purposes of load balancing
335+
pos = self._position
336+
self._position += 1
337+
338+
local_live = self._dc_live_hosts.get(self.local_dc, ())
339+
pos = (pos % len(local_live)) if local_live else 0
340+
for host in islice(cycle(local_live), pos, pos + len(local_live)):
341+
if excluded and host in excluded:
342+
continue
343+
yield host
344+
345+
for host in self._remote_hosts:
346+
if excluded and host in excluded:
347+
continue
348+
yield host
349+
300350
def on_up(self, host):
301351
# not worrying about threads because this will happen during
302352
# control connection startup/refresh
@@ -449,6 +499,35 @@ def make_query_plan(self, working_keyspace=None, query=None):
449499
for host in self._remote_hosts:
450500
yield host
451501

502+
def make_query_plan_with_exclusion(
503+
self, working_keyspace=None, query=None, excluded=()
504+
):
505+
pos = self._position
506+
self._position += 1
507+
508+
local_rack_live = self._live_hosts.get((self.local_dc, self.local_rack), ())
509+
length = len(local_rack_live)
510+
if length:
511+
p = pos % length
512+
for host in islice(cycle(local_rack_live), p, p + length):
513+
if excluded and host in excluded:
514+
continue
515+
yield host
516+
517+
local_non_rack = self._non_local_rack_hosts
518+
length = len(local_non_rack)
519+
if length:
520+
p = pos % length
521+
for host in islice(cycle(local_non_rack), p, p + length):
522+
if excluded and host in excluded:
523+
continue
524+
yield host
525+
526+
for host in self._remote_hosts:
527+
if excluded and host in excluded:
528+
continue
529+
yield host
530+
452531
def on_up(self, host):
453532
dc = self._dc(host)
454533
rack = self._rack(host)
@@ -515,16 +594,12 @@ class TokenAwarePolicy(LoadBalancingPolicy):
515594
policy's query plan will be used as is.
516595
"""
517596

518-
_child_policy = None
519-
_cluster_metadata = None
520-
shuffle_replicas = True
521-
"""
522-
Yield local replicas in a random order.
523-
"""
597+
__slots__ = ("_child_policy", "_cluster_metadata", "shuffle_replicas")
524598

525599
def __init__(self, child_policy, shuffle_replicas=True):
526600
self._child_policy = child_policy
527601
self.shuffle_replicas = shuffle_replicas
602+
self._cluster_metadata = None
528603

529604
def populate(self, cluster, hosts):
530605
self._cluster_metadata = cluster.metadata
@@ -548,48 +623,79 @@ def make_query_plan(self, working_keyspace=None, query=None):
548623

549624
child = self._child_policy
550625
if query is None or query.routing_key is None or keyspace is None:
551-
for host in child.make_query_plan(keyspace, query):
552-
yield host
626+
yield from child.make_query_plan(keyspace, query)
553627
return
554628

629+
cluster_metadata = self._cluster_metadata
630+
token_map = cluster_metadata.token_map
555631
replicas = []
556-
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
557-
keyspace,
558-
query.table,
559-
self._cluster_metadata.token_map.token_class.from_key(query.routing_key),
560-
)
561-
562-
if tablet is not None:
563-
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
564-
child_plan = child.make_query_plan(keyspace, query)
632+
if token_map:
633+
try:
634+
token = token_map.token_class.from_key(query.routing_key)
635+
tablet = cluster_metadata._tablets.get_tablet_for_key(
636+
keyspace, query.table, token
637+
)
565638

566-
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
567-
else:
568-
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)
639+
if tablet is not None:
640+
replicas_mapped = {r[0] for r in tablet.replicas}
641+
child_plan = child.make_query_plan(keyspace, query)
642+
replicas = [
643+
host for host in child_plan if host.host_id in replicas_mapped
644+
]
645+
else:
646+
try:
647+
replicas = list(token_map.get_replicas(keyspace, token))
648+
except Exception:
649+
log.debug(
650+
"Failed to get replicas from token_map, falling back to cluster metadata"
651+
)
652+
replicas = cluster_metadata.get_replicas(
653+
keyspace, query.routing_key
654+
)
655+
except Exception:
656+
log.debug(
657+
"Failed to resolve token or tablet for query plan, "
658+
"falling back to child policy",
659+
exc_info=True,
660+
)
569661

570662
if self.shuffle_replicas and not query.is_lwt():
571663
shuffle(replicas)
572664

573-
def yield_in_order(hosts):
574-
for distance in [
575-
HostDistance.LOCAL_RACK,
576-
HostDistance.LOCAL,
577-
HostDistance.REMOTE,
578-
]:
579-
for replica in hosts:
580-
if replica.is_up and child.distance(replica) == distance:
581-
yield replica
582-
583-
# yield replicas: local_rack, local, remote
584-
yield from yield_in_order(replicas)
585-
# yield rest of the cluster: local_rack, local, remote
586-
yield from yield_in_order(
587-
[
588-
host
589-
for host in child.make_query_plan(keyspace, query)
590-
if host not in replicas
591-
]
592-
)
665+
local_rack = []
666+
local = []
667+
remote = []
668+
669+
child_distance = child.distance
670+
671+
for replica in replicas:
672+
if replica.is_up:
673+
d = child_distance(replica)
674+
if d == HostDistance.LOCAL_RACK:
675+
local_rack.append(replica)
676+
elif d == HostDistance.LOCAL:
677+
local.append(replica)
678+
elif d == HostDistance.REMOTE:
679+
remote.append(replica)
680+
681+
yielded_sequence = tuple(chain(local_rack, local, remote))
682+
683+
if yielded_sequence:
684+
yield from yielded_sequence
685+
686+
yielded = set(yielded_sequence)
687+
688+
# yield rest of the cluster
689+
try:
690+
yield from child.make_query_plan_with_exclusion(
691+
keyspace, query, yielded
692+
)
693+
except (AttributeError, TypeError):
694+
for host in child.make_query_plan(keyspace, query):
695+
if host not in yielded:
696+
yield host
697+
else:
698+
yield from child.make_query_plan(keyspace, query)
593699

594700
def on_up(self, *args, **kwargs):
595701
return self._child_policy.on_up(*args, **kwargs)

0 commit comments

Comments
 (0)