Skip to content

Commit 473ca3d

Browse files
committed
Route requests to the tablet leader for strongly-consistent tables
With TABLETS_ROUTING_V2 the server returns, on a tablet_version mismatch, the tablet's replica set with the Raft leader first plus the new tablet_version. Use this to route strongly-consistent reads and writes straight to the leader: * Every EXECUTE on a V2 connection carries a tablet_version_block computed from the cached version (or a random byte on cold start and for non-single-partition requests). The block is attached per connection, keyed off the connection that actually serves the request, so a rolling upgrade that mixes v1/v2 connections never desyncs the frame. * On the response, the routing payload is parsed according to what the serving connection negotiated; the v2 tuple additionally carries the tablet_version, which is stored back on the tablet. * TokenAwarePolicy yields the leader (the first replica) first, but only for strongly-consistent keyspaces -- a tablet_version is assigned to eventually-consistent tablet tables too and must not be mistaken for a leader hint. To keep this hot path cheap, the ring token is memoized once per statement (Statement.routing_token) and reused by the load balancing policy, the shard selection in the pool, and the tablet_version_block computation, instead of re-hashing the routing key three times.
1 parent a77a737 commit 473ca3d

6 files changed

Lines changed: 527 additions & 44 deletions

File tree

cassandra/cluster.py

Lines changed: 120 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
8585
HostTargetingStatement)
8686
from cassandra.marshal import int64_pack
87-
from cassandra.tablets import Tablet
87+
from cassandra.tablets import Tablet, Tablets, choose_tablet_version_block, random_tablet_version_block
8888
from cassandra.timestamps import MonotonicTimestampGenerator
8989
from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query
9090

@@ -3058,6 +3058,8 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30583058
continuous_paging_options, statement_keyspace)
30593059
elif isinstance(query, BoundStatement):
30603060
prepared_statement = query.prepared_statement
3061+
# The tablet_version_block is filled in per-target-host at send time
3062+
# (see ResponseFuture._query), because V2 is negotiated per connection.
30613063
message = ExecuteMessage(
30623064
prepared_statement.query_id, query.values, cl,
30633065
serial_cl, fetch_size, paging_state, timestamp,
@@ -3092,6 +3094,42 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30923094
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
30933095
continuous_paging_state=None, host=host)
30943096

3097+
def _compute_tablet_version_block(self, query):
3098+
"""
3099+
Compute the tablet_version_block byte for a BoundStatement.
3100+
3101+
Always returns an int in [0, 255]. When no cached tablet is known for the
3102+
routing key (unknown keyspace/table, vnode table, cold cache, or a
3103+
missing token map) a random block is returned; the server treats that as
3104+
a version miss and replies with fresh routing info. Callers invoke this
3105+
only for connections that negotiated TABLETS_ROUTING_V2.
3106+
"""
3107+
routing_key = query.routing_key
3108+
if routing_key is None:
3109+
return random_tablet_version_block()
3110+
3111+
keyspace = query.keyspace or self.keyspace
3112+
table = query.table
3113+
if not keyspace or not table:
3114+
return random_tablet_version_block()
3115+
3116+
# Skip the Murmur3 token hash + tablet lookup when we have no cached
3117+
# tablets for this table (vnode tables, or tablet tables on cold start);
3118+
# both correctly fall back to a random block below.
3119+
if not self.cluster.metadata._tablets.table_has_tablets(keyspace, table):
3120+
return random_tablet_version_block()
3121+
3122+
token_map = self.cluster.metadata.token_map
3123+
if token_map is None:
3124+
return random_tablet_version_block()
3125+
3126+
t = query.routing_token(token_map.token_class)
3127+
tablet = self.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t)
3128+
if tablet is None or tablet.tablet_version is None:
3129+
return random_tablet_version_block()
3130+
3131+
return choose_tablet_version_block(tablet.tablet_version)
3132+
30953133
def get_execution_profile(self, name):
30963134
"""
30973135
Returns the execution profile associated with the provided ``name``.
@@ -3768,7 +3806,6 @@ class PeersQueryType(object):
37683806
_schema_meta_page_size = 1000
37693807

37703808
_uses_peers_v2 = True
3771-
_tablets_routing_v1 = False
37723809

37733810
# for testing purposes
37743811
_time = time
@@ -3902,8 +3939,6 @@ def _try_connect(self, endpoint):
39023939
self._metadata_request_timeout = None if connection.features.sharding_info is None or not self._cluster.metadata_request_timeout \
39033940
else datetime.timedelta(seconds=self._cluster.metadata_request_timeout)
39043941

3905-
self._tablets_routing_v1 = connection.features.tablets_routing_v1
3906-
39073942
# use weak references in both directions
39083943
# _clear_watcher will be called when this ControlConnection is about to be finalized
39093944
# _watch_callback will get the actual callback from the Connection and relay it to
@@ -4717,6 +4752,7 @@ class ResponseFuture(object):
47174752
_host = None
47184753
_control_connection_query_attempted = False
47194754
_TABLET_ROUTING_CTYPE = None
4755+
_TABLET_ROUTING_V2_CTYPE = None
47204756

47214757
_warned_timeout = False
47224758

@@ -4934,6 +4970,36 @@ def _handle_control_connection_response(self, connection, cb, response):
49344970
connection.in_flight -= 1
49354971
cb(response)
49364972

4973+
def _prepare_message_for_connection(self, message, connection):
4974+
"""
4975+
Return the message to send on ``connection``, attaching the
4976+
tablet_version_block to ExecuteMessages according to the capability that
4977+
*this specific connection* negotiated.
4978+
4979+
Keying off the borrowed connection (already in hand at every call site)
4980+
is both necessary and sufficient: a connection that negotiated
4981+
TABLETS_ROUTING_V2 always gets the block -- even if the pool was created,
4982+
and any cached flag latched, before the cluster feature was enabled
4983+
(e.g. mid rolling-upgrade) -- while a non-V2 connection never gets one,
4984+
even if a sibling shard connection in the same pool already negotiated
4985+
V2. The server reads the trailing byte only on V2 connections, so
4986+
attaching it to a non-V2 connection would leave an unread trailing byte
4987+
and desync the frame; a pool-level flag cannot get this right.
4988+
4989+
ExecuteMessage is copied per send because ``self.message`` is shared
4990+
across speculative executions and retries that may run concurrently on
4991+
different threads; mutating tablet_version_block on the shared instance
4992+
would race with another in-flight send encoding the same object.
4993+
"""
4994+
if not isinstance(message, ExecuteMessage):
4995+
return message
4996+
message = copy(message)
4997+
if connection.features.tablets_routing_v2:
4998+
message.tablet_version_block = self.session._compute_tablet_version_block(self.query)
4999+
else:
5000+
message.tablet_version_block = None
5001+
return message
5002+
49375003
def _query_control_connection(self, message=None, cb=None, connection=None, host=None):
49385004
self._control_connection_query_attempted = True
49395005

@@ -4961,6 +5027,9 @@ def _query_control_connection(self, message=None, cb=None, connection=None, host
49615027
cb = partial(self._set_result, host, connection, None)
49625028
cb = partial(self._handle_control_connection_response, connection, cb)
49635029

5030+
# The control connection may also be a V2 connection, so it needs the
5031+
# trailing tablet_version_block byte just like a pooled send.
5032+
message = self._prepare_message_for_connection(message, connection)
49645033
log.debug("No usable node pools; falling back to control connection for host %s", host)
49655034
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
49665035
encoder=self._protocol_handler.encode_message,
@@ -5006,7 +5075,12 @@ def _query(self, host, message=None, cb=None):
50065075
try:
50075076
# TODO get connectTimeout from cluster settings
50085077
if self.query:
5009-
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
5078+
# Pass the statement so the pool can reuse the ring token it
5079+
# memoized for this request instead of re-hashing the routing key.
5080+
connection, request_id = pool.borrow_connection(
5081+
timeout=2.0, routing_key=self.query.routing_key,
5082+
keyspace=self.query.keyspace, table=self.query.table,
5083+
query=self.query)
50105084
else:
50115085
connection, request_id = pool.borrow_connection(timeout=2.0)
50125086
self._connection = connection
@@ -5015,6 +5089,8 @@ def _query(self, host, message=None, cb=None):
50155089
if cb is None:
50165090
cb = partial(self._set_result, host, connection, pool)
50175091

5092+
message = self._prepare_message_for_connection(message, connection)
5093+
50185094
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
50195095
encoder=self._protocol_handler.encode_message,
50205096
decoder=self._protocol_handler.decode_message,
@@ -5117,6 +5193,27 @@ def _reprepare(self, prepare_message, host, connection, pool):
51175193
# try to submit the original prepared statement on some other host
51185194
self.send_request()
51195195

5196+
def _cache_tablet_from_payload(self, payload_key, ctype):
5197+
"""
5198+
Parse a tablets-routing ``custom_payload`` entry and cache the Tablet.
5199+
5200+
``ctype`` is the tuple type for the negotiated extension. The V1 and V2
5201+
layouts differ only by a trailing ``tablet_version`` field, and
5202+
``Tablet.from_row`` accepts that as an optional final argument, so
5203+
unpacking the decoded tuple positionally serves both. The tablet is
5204+
cached under the effective keyspace (the statement's, else the
5205+
session's) so a prepared statement executed in a session keyspace lands
5206+
under the same key ``_compute_tablet_version_block`` looks it up by;
5207+
otherwise that lookup always misses.
5208+
"""
5209+
info = self._custom_payload.get(payload_key)
5210+
protocol = self.session.cluster.protocol_version
5211+
tablet = Tablet.from_row(*ctype.from_binary(info, protocol))
5212+
keyspace = self.query.keyspace or self.session.keyspace
5213+
table = self.query.table
5214+
if tablet and keyspace and table:
5215+
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
5216+
51205217
def _set_result(self, host, connection, pool, response):
51215218
try:
51225219
self.coordinator_host = host
@@ -5132,21 +5229,23 @@ def _set_result(self, host, connection, pool, response):
51325229
self._warnings = getattr(response, 'warnings', None)
51335230
self._custom_payload = getattr(response, 'custom_payload', None)
51345231

5135-
if self._custom_payload and self.session.cluster.control_connection._tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
5136-
protocol = self.session.cluster.protocol_version
5137-
info = self._custom_payload.get('tablets-routing-v1')
5138-
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
5139-
if ctype is None:
5140-
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
5141-
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
5142-
tablet_routing_info = ctype.from_binary(info, protocol)
5143-
first_token = tablet_routing_info[0]
5144-
last_token = tablet_routing_info[1]
5145-
tablet_replicas = tablet_routing_info[2]
5146-
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
5147-
keyspace = self.query.keyspace
5148-
table = self.query.table
5149-
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
5232+
if self._custom_payload and connection is not None:
5233+
# Parse the routing payload according to what the connection that
5234+
# *served this request* negotiated, not the control connection:
5235+
# during a rolling upgrade connections may differ, and each
5236+
# payload key matches the extension its own connection negotiated.
5237+
if connection.features.tablets_routing_v2 and 'tablets-routing-v2' in self._custom_payload:
5238+
ctype = ResponseFuture._TABLET_ROUTING_V2_CTYPE
5239+
if ctype is None:
5240+
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)')
5241+
ResponseFuture._TABLET_ROUTING_V2_CTYPE = ctype
5242+
self._cache_tablet_from_payload('tablets-routing-v2', ctype)
5243+
elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
5244+
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
5245+
if ctype is None:
5246+
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
5247+
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
5248+
self._cache_tablet_from_payload('tablets-routing-v1', ctype)
51505249

51515250
if isinstance(response, ResultMessage):
51525251
if response.kind == RESULT_KIND_SET_KEYSPACE:
@@ -5560,7 +5659,7 @@ def add_callback(self, fn, *args, **kwargs):
55605659
55615660
Note: in the case that the result is not available when the callback is added,
55625661
the callback is executed by IO event thread. This means that the callback
5563-
should not block or attempt further synchronous requests, because no further
5662+
should not block or attempt further synchronous requests because no further
55645663
IO will be processed until the callback returns.
55655664
55665665
**Important**: if the callback you attach results in an exception being

cassandra/policies.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -503,14 +503,33 @@ def make_query_plan(self, working_keyspace=None, query=None):
503503
return
504504

505505
replicas = []
506-
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
507-
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
506+
leader_host = None
507+
token = query.routing_token(self._cluster_metadata.token_map.token_class)
508+
tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, token)
508509

509510
if tablet is not None:
510511
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
511512
child_plan = child.make_query_plan(keyspace, query)
512513

513514
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
515+
516+
# The leader concept only exists for strongly-consistent keyspaces.
517+
# TABLETS_ROUTING_V2 assigns a tablet_version to *every* tablet table
518+
# (eventually- and strongly-consistent alike), so the version alone
519+
# must not be used to infer a leader. Conversely, replicas[0] is only
520+
# leader-ordered for a tablet that came from a V2 payload, so a
521+
# versionless tablet (V1-sourced, or stale across a consistency flip)
522+
# must not be treated as a leader hint either. Require both a
523+
# strongly-consistent keyspace and a versioned tablet; otherwise keep
524+
# normal token-aware/shuffled ordering.
525+
ks_meta = self._cluster_metadata.keyspaces.get(keyspace)
526+
if (ks_meta is not None and ks_meta.strongly_consistent
527+
and tablet.tablet_version is not None and tablet.replicas):
528+
leader_host_id = tablet.replicas[0][0]
529+
for host in replicas:
530+
if host.host_id == leader_host_id:
531+
leader_host = host
532+
break
514533
else:
515534
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)
516535

@@ -523,10 +542,21 @@ def yield_in_order(hosts):
523542
if replica.is_up and child.distance(replica) == distance:
524543
yield replica
525544

526-
# yield replicas: local_rack, local, remote
527-
yield from yield_in_order(replicas)
545+
# If we have a leader hint, yield it first -- but respect the child
546+
# policy's own filter: never front-run a host the child policy would
547+
# exclude (e.g. one a custom policy reports as IGNORED).
548+
if (leader_host is not None and leader_host.is_up
549+
and child.distance(leader_host) != HostDistance.IGNORED):
550+
yield leader_host
551+
552+
# yield replicas: local_rack, local, remote (skipping leader already yielded)
553+
for host in yield_in_order(replicas):
554+
if host is not leader_host:
555+
yield host
528556
# yield rest of the cluster: local_rack, local, remote
529-
yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas])
557+
for host in yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]):
558+
if host is not leader_host:
559+
yield host
530560

531561
def on_up(self, *args, **kwargs):
532562
return self._child_policy.on_up(*args, **kwargs)

0 commit comments

Comments
 (0)