Skip to content

Commit 93ede64

Browse files
committed
(improvement) Further optimize load-balancing query plans and tests
Use index-based loops and excluded fast-paths in DC/Rack-aware policies to speed up TokenAware replica selection and exclusion handling. Snapshot `self._remote_hosts` to local variables in make_query_plan and make_query_plan_with_exclusion for GIL-free Python 3.13+ safety. Removed hot-path try/except and avoid eager list conversion. Updated TokenAware tests/mocks for token_map and deterministic ordering. Benchmark (100K queries, 45-node/5-DC topology, Python 3.14, median of 5 runs): Policy | Kops/s | vs master | delta | Mem KB ----------------------------------------------------------------- DCAware | 201 | +90% | +26% | 1.5 RackAware | 163 | +140% | +27% | 2.0 TokenAware(DCAware) | 96 | +433% | +19% | 1.7 TokenAware(RackAware) | 88 | +418% | +38% | 2.2 Default(DCAware) | 137 | +51% | -31% | 1.6 HostFilter(DCAware) | 65 | +23% | +16% | 1.7 Index-based loops add +19-38% to TokenAware, +26-27% to DC/RackAware. Cumulative: DCAware ~2x, RackAware ~2.4x, TokenAware ~5x vs master. Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent 14cfa3c commit 93ede64

2 files changed

Lines changed: 126 additions & 41 deletions

File tree

cassandra/policies.py

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import random
1515

1616
from collections import namedtuple
17-
from itertools import islice, cycle, groupby, repeat, chain
17+
from itertools import islice, cycle, groupby, repeat
1818
import logging
1919
from random import randint, shuffle
2020
from threading import Lock
@@ -320,11 +320,14 @@ def make_query_plan(self, working_keyspace=None, query=None):
320320
self._position += 1
321321

322322
local_live = self._dc_live_hosts.get(self.local_dc, ())
323-
pos = (pos % len(local_live)) if local_live else 0
324-
for host in islice(cycle(local_live), pos, pos + len(local_live)):
325-
yield host
323+
length = len(local_live)
324+
if length:
325+
pos %= length
326+
for i in range(length):
327+
yield local_live[(pos + i) % length]
326328

327-
for host in self._remote_hosts:
329+
remote_hosts = self._remote_hosts
330+
for host in remote_hosts:
328331
yield host
329332

330333
def make_query_plan_with_exclusion(
@@ -336,14 +339,30 @@ def make_query_plan_with_exclusion(
336339
self._position += 1
337340

338341
local_live = self._dc_live_hosts.get(self.local_dc, ())
339-
pos = (pos % len(local_live)) if local_live else 0
340-
for host in islice(cycle(local_live), pos, pos + len(local_live)):
341-
if excluded and host in excluded:
342-
continue
343-
yield host
342+
length = len(local_live)
343+
remote_hosts = self._remote_hosts
344+
if not excluded:
345+
if length:
346+
pos %= length
347+
for i in range(length):
348+
yield local_live[(pos + i) % length]
349+
for host in remote_hosts:
350+
yield host
351+
return
352+
353+
if not isinstance(excluded, set):
354+
excluded = set(excluded)
355+
356+
if length:
357+
pos %= length
358+
for i in range(length):
359+
host = local_live[(pos + i) % length]
360+
if host in excluded:
361+
continue
362+
yield host
344363

345-
for host in self._remote_hosts:
346-
if excluded and host in excluded:
364+
for host in remote_hosts:
365+
if host in excluded:
347366
continue
348367
yield host
349368

@@ -486,17 +505,18 @@ def make_query_plan(self, working_keyspace=None, query=None):
486505
length = len(local_rack_live)
487506
if length:
488507
p = pos % length
489-
for host in islice(cycle(local_rack_live), p, p + length):
490-
yield host
508+
for i in range(length):
509+
yield local_rack_live[(p + i) % length]
491510

492511
local_non_rack = self._non_local_rack_hosts
493512
length = len(local_non_rack)
494513
if length:
495514
p = pos % length
496-
for host in islice(cycle(local_non_rack), p, p + length):
497-
yield host
515+
for i in range(length):
516+
yield local_non_rack[(p + i) % length]
498517

499-
for host in self._remote_hosts:
518+
remote_hosts = self._remote_hosts
519+
for host in remote_hosts:
500520
yield host
501521

502522
def make_query_plan_with_exclusion(
@@ -507,24 +527,47 @@ def make_query_plan_with_exclusion(
507527

508528
local_rack_live = self._live_hosts.get((self.local_dc, self.local_rack), ())
509529
length = len(local_rack_live)
530+
remote_hosts = self._remote_hosts
531+
if not excluded:
532+
if length:
533+
p = pos % length
534+
for i in range(length):
535+
yield local_rack_live[(p + i) % length]
536+
537+
local_non_rack = self._non_local_rack_hosts
538+
length = len(local_non_rack)
539+
if length:
540+
p = pos % length
541+
for i in range(length):
542+
yield local_non_rack[(p + i) % length]
543+
544+
for host in remote_hosts:
545+
yield host
546+
return
547+
548+
if not isinstance(excluded, set):
549+
excluded = set(excluded)
550+
510551
if length:
511552
p = pos % length
512-
for host in islice(cycle(local_rack_live), p, p + length):
513-
if excluded and host in excluded:
553+
for i in range(length):
554+
host = local_rack_live[(p + i) % length]
555+
if host in excluded:
514556
continue
515557
yield host
516558

517559
local_non_rack = self._non_local_rack_hosts
518560
length = len(local_non_rack)
519561
if length:
520562
p = pos % length
521-
for host in islice(cycle(local_non_rack), p, p + length):
522-
if excluded and host in excluded:
563+
for i in range(length):
564+
host = local_non_rack[(p + i) % length]
565+
if host in excluded:
523566
continue
524567
yield host
525568

526-
for host in self._remote_hosts:
527-
if excluded and host in excluded:
569+
for host in remote_hosts:
570+
if host in excluded:
528571
continue
529572
yield host
530573

@@ -644,7 +687,7 @@ def make_query_plan(self, working_keyspace=None, query=None):
644687
]
645688
else:
646689
try:
647-
replicas = list(token_map.get_replicas(keyspace, token))
690+
replicas = token_map.get_replicas(keyspace, token)
648691
except Exception:
649692
log.debug(
650693
"Failed to get replicas from token_map, falling back to cluster metadata"
@@ -660,6 +703,7 @@ def make_query_plan(self, working_keyspace=None, query=None):
660703
)
661704

662705
if self.shuffle_replicas and not query.is_lwt():
706+
replicas = list(replicas)
663707
shuffle(replicas)
664708

665709
local_rack = []
@@ -678,22 +722,23 @@ def make_query_plan(self, working_keyspace=None, query=None):
678722
elif d == HostDistance.REMOTE:
679723
remote.append(replica)
680724

681-
yielded_sequence = tuple(chain(local_rack, local, remote))
725+
if local_rack or local or remote:
726+
yielded = set()
682727

683-
if yielded_sequence:
684-
yield from yielded_sequence
728+
for replica in local_rack:
729+
yielded.add(replica)
730+
yield replica
685731

686-
yielded = set(yielded_sequence)
732+
for replica in local:
733+
yielded.add(replica)
734+
yield replica
735+
736+
for replica in remote:
737+
yielded.add(replica)
738+
yield replica
687739

688740
# yield rest of the cluster
689-
try:
690-
yield from child.make_query_plan_with_exclusion(
691-
keyspace, query, yielded
692-
)
693-
except (AttributeError, TypeError):
694-
for host in child.make_query_plan(keyspace, query):
695-
if host not in yielded:
696-
yield host
741+
yield from child.make_query_plan_with_exclusion(keyspace, query, yielded)
697742
else:
698743
yield from child.make_query_plan(keyspace, query)
699744

tests/unit/test_policies.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,8 @@ def test_wrap_round_robin(self):
749749
cluster.metadata = Mock(spec=Metadata)
750750
cluster.metadata._tablets = Mock(spec=Tablets)
751751
cluster.metadata._tablets.get_tablet_for_key.return_value = None
752+
cluster.metadata.token_map = Mock()
753+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
752754
hosts = [
753755
Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4())
754756
for i in range(4)
@@ -761,8 +763,11 @@ def get_replicas(keyspace, packed_key):
761763
return list(islice(cycle(hosts), index, index + 2))
762764

763765
cluster.metadata.get_replicas.side_effect = get_replicas
766+
cluster.metadata.token_map.get_replicas.side_effect = (
767+
cluster.metadata.get_replicas
768+
)
764769

765-
policy = TokenAwarePolicy(RoundRobinPolicy())
770+
policy = TokenAwarePolicy(RoundRobinPolicy(), shuffle_replicas=False)
766771
policy.populate(cluster, hosts)
767772

768773
for i in range(4):
@@ -787,6 +792,8 @@ def test_wrap_dc_aware(self):
787792
cluster.metadata = Mock(spec=Metadata)
788793
cluster.metadata._tablets = Mock(spec=Tablets)
789794
cluster.metadata._tablets.get_tablet_for_key.return_value = None
795+
cluster.metadata.token_map = Mock()
796+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
790797
hosts = [
791798
Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4())
792799
for i in range(4)
@@ -807,9 +814,13 @@ def get_replicas(keyspace, packed_key):
807814
return [hosts[1], hosts[3]]
808815

809816
cluster.metadata.get_replicas.side_effect = get_replicas
817+
cluster.metadata.token_map.get_replicas.side_effect = (
818+
cluster.metadata.get_replicas
819+
)
810820

811821
policy = TokenAwarePolicy(
812-
DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=2)
822+
DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=2),
823+
shuffle_replicas=False,
813824
)
814825
policy.populate(cluster, hosts)
815826

@@ -843,6 +854,8 @@ def test_wrap_rack_aware(self):
843854
cluster.metadata = Mock(spec=Metadata)
844855
cluster.metadata._tablets = Mock(spec=Tablets)
845856
cluster.metadata._tablets.get_tablet_for_key.return_value = None
857+
cluster.metadata.token_map = Mock()
858+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
846859
hosts = [
847860
Host(DefaultEndPoint(str(i)), SimpleConvictionPolicy, host_id=uuid.uuid4())
848861
for i in range(8)
@@ -867,9 +880,13 @@ def get_replicas(keyspace, packed_key):
867880
return [hosts[4], hosts[5], hosts[6], hosts[7]]
868881

869882
cluster.metadata.get_replicas.side_effect = get_replicas
883+
cluster.metadata.token_map.get_replicas.side_effect = (
884+
cluster.metadata.get_replicas
885+
)
870886

871887
policy = TokenAwarePolicy(
872-
RackAwareRoundRobinPolicy("dc1", "rack1", used_hosts_per_remote_dc=4)
888+
RackAwareRoundRobinPolicy("dc1", "rack1", used_hosts_per_remote_dc=4),
889+
shuffle_replicas=False,
873890
)
874891
policy.populate(cluster, hosts)
875892

@@ -1017,12 +1034,20 @@ def test_statement_keyspace(self):
10171034
replicas = hosts[2:]
10181035
cluster.metadata.get_replicas.return_value = replicas
10191036
cluster.metadata._tablets.get_tablet_for_key.return_value = None
1037+
cluster.metadata.token_map = Mock()
1038+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
1039+
cluster.metadata.token_map.get_replicas.side_effect = (
1040+
cluster.metadata.get_replicas
1041+
)
10201042

10211043
child_policy = Mock()
10221044
child_policy.make_query_plan.return_value = hosts
1045+
child_policy.make_query_plan_with_exclusion.side_effect = lambda k, q, e: [
1046+
h for h in hosts if h not in e
1047+
]
10231048
child_policy.distance.return_value = HostDistance.LOCAL
10241049

1025-
policy = TokenAwarePolicy(child_policy)
1050+
policy = TokenAwarePolicy(child_policy, shuffle_replicas=False)
10261051
policy.populate(cluster, hosts)
10271052

10281053
# no keyspace, child policy is called
@@ -1141,6 +1166,11 @@ def _prepare_cluster_with_vnodes(self):
11411166
cluster.metadata.all_hosts.return_value = hosts
11421167
cluster.metadata.get_replicas.return_value = hosts[2:]
11431168
cluster.metadata._tablets.get_tablet_for_key.return_value = None
1169+
cluster.metadata.token_map = Mock()
1170+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
1171+
cluster.metadata.token_map.get_replicas.side_effect = (
1172+
cluster.metadata.get_replicas
1173+
)
11441174
return cluster
11451175

11461176
def _prepare_cluster_with_tablets(self):
@@ -1158,6 +1188,11 @@ def _prepare_cluster_with_tablets(self):
11581188
cluster.metadata._tablets.get_tablet_for_key.return_value = Tablet(
11591189
replicas=[(h.host_id, 0) for h in hosts[2:]]
11601190
)
1191+
cluster.metadata.token_map = Mock()
1192+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
1193+
cluster.metadata.token_map.get_replicas.side_effect = (
1194+
cluster.metadata.get_replicas
1195+
)
11611196
return cluster
11621197

11631198
@patch("cassandra.policies.shuffle")
@@ -2075,8 +2110,13 @@ def get_replicas(keyspace, packed_key):
20752110
cluster.metadata.get_replicas.side_effect = get_replicas
20762111
cluster.metadata._tablets = Mock(spec=Tablets)
20772112
cluster.metadata._tablets.get_tablet_for_key.return_value = None
2113+
cluster.metadata.token_map = Mock()
2114+
cluster.metadata.token_map.token_class.from_key.side_effect = lambda key: key
2115+
cluster.metadata.token_map.get_replicas.side_effect = (
2116+
cluster.metadata.get_replicas
2117+
)
20782118

2079-
child_policy = TokenAwarePolicy(RoundRobinPolicy())
2119+
child_policy = TokenAwarePolicy(RoundRobinPolicy(), shuffle_replicas=False)
20802120

20812121
hfp = HostFilterPolicy(
20822122
child_policy=child_policy,

0 commit comments

Comments
 (0)