Skip to content

Commit a05a610

Browse files
committed
Route requests to the tablet leader for strongly-consistent tables
With TABLETS_ROUTING_V2 the server returns, on a tablet_version mismatch, the tablet's replica set with the Raft leader first plus the new tablet_version. Use this to route strongly-consistent reads and writes straight to the leader: * Every EXECUTE on a V2 connection carries a tablet_version_block computed from the cached version (or a random byte on cold start and for non-single-partition requests). The block is attached per connection, keyed off the connection that actually serves the request, so a rolling upgrade that mixes v1/v2 connections never desyncs the frame. * On the response, the routing payload is parsed according to what the serving connection negotiated; the v2 tuple additionally carries the tablet_version, which is stored back on the tablet. * TokenAwarePolicy yields the leader (the first replica) first, but only for strongly-consistent keyspaces -- a tablet_version is assigned to eventually-consistent tablet tables too and must not be mistaken for a leader hint. To keep this hot path cheap, the ring token is memoized once per statement (Statement.routing_token) and reused by the load balancing policy, the shard selection in the pool, and the tablet_version_block computation, instead of re-hashing the routing key three times.
1 parent 6503944 commit a05a610

6 files changed

Lines changed: 527 additions & 44 deletions

File tree

cassandra/cluster.py

Lines changed: 120 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET,
8686
HostTargetingStatement)
8787
from cassandra.marshal import int64_pack
88-
from cassandra.tablets import Tablet, Tablets
88+
from cassandra.tablets import Tablet, Tablets, choose_tablet_version_block, random_tablet_version_block
8989
from cassandra.timestamps import MonotonicTimestampGenerator
9090
from cassandra.util import _resolve_contact_points_to_string_map, Version, maybe_add_timeout_to_query
9191

@@ -3059,6 +3059,8 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30593059
continuous_paging_options, statement_keyspace)
30603060
elif isinstance(query, BoundStatement):
30613061
prepared_statement = query.prepared_statement
3062+
# The tablet_version_block is filled in per-target-host at send time
3063+
# (see ResponseFuture._query), because V2 is negotiated per connection.
30623064
message = ExecuteMessage(
30633065
prepared_statement.query_id, query.values, cl,
30643066
serial_cl, fetch_size, paging_state, timestamp,
@@ -3093,6 +3095,42 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
30933095
load_balancer=load_balancing_policy, start_time=start_time, speculative_execution_plan=spec_exec_plan,
30943096
continuous_paging_state=None, host=host)
30953097

3098+
def _compute_tablet_version_block(self, query):
3099+
"""
3100+
Compute the tablet_version_block byte for a BoundStatement.
3101+
3102+
Always returns an int in [0, 255]. When no cached tablet is known for the
3103+
routing key (unknown keyspace/table, vnode table, cold cache, or a
3104+
missing token map) a random block is returned; the server treats that as
3105+
a version miss and replies with fresh routing info. Callers invoke this
3106+
only for connections that negotiated TABLETS_ROUTING_V2.
3107+
"""
3108+
routing_key = query.routing_key
3109+
if routing_key is None:
3110+
return random_tablet_version_block()
3111+
3112+
keyspace = query.keyspace or self.keyspace
3113+
table = query.table
3114+
if not keyspace or not table:
3115+
return random_tablet_version_block()
3116+
3117+
# Skip the Murmur3 token hash + tablet lookup when we have no cached
3118+
# tablets for this table (vnode tables, or tablet tables on cold start);
3119+
# both correctly fall back to a random block below.
3120+
if not self.cluster.metadata._tablets.table_has_tablets(keyspace, table):
3121+
return random_tablet_version_block()
3122+
3123+
token_map = self.cluster.metadata.token_map
3124+
if token_map is None:
3125+
return random_tablet_version_block()
3126+
3127+
t = query.routing_token(token_map.token_class)
3128+
tablet = self.cluster.metadata._tablets.get_tablet_for_key(keyspace, table, t)
3129+
if tablet is None or tablet.tablet_version is None:
3130+
return random_tablet_version_block()
3131+
3132+
return choose_tablet_version_block(tablet.tablet_version)
3133+
30963134
def get_execution_profile(self, name):
30973135
"""
30983136
Returns the execution profile associated with the provided ``name``.
@@ -3764,7 +3802,6 @@ class PeersQueryType(object):
37643802
_schema_meta_page_size = 1000
37653803

37663804
_uses_peers_v2 = True
3767-
_tablets_routing_v1 = False
37683805

37693806
# for testing purposes
37703807
_time = time
@@ -3898,8 +3935,6 @@ def _try_connect(self, endpoint):
38983935
self._metadata_request_timeout = None if connection.features.sharding_info is None or not self._cluster.metadata_request_timeout \
38993936
else datetime.timedelta(seconds=self._cluster.metadata_request_timeout)
39003937

3901-
self._tablets_routing_v1 = connection.features.tablets_routing_v1
3902-
39033938
# use weak references in both directions
39043939
# _clear_watcher will be called when this ControlConnection is about to be finalized
39053940
# _watch_callback will get the actual callback from the Connection and relay it to
@@ -4713,6 +4748,7 @@ class ResponseFuture(object):
47134748
_host = None
47144749
_control_connection_query_attempted = False
47154750
_TABLET_ROUTING_CTYPE = None
4751+
_TABLET_ROUTING_V2_CTYPE = None
47164752

47174753
_warned_timeout = False
47184754

@@ -4930,6 +4966,36 @@ def _handle_control_connection_response(self, connection, cb, response):
49304966
connection.in_flight -= 1
49314967
cb(response)
49324968

4969+
def _prepare_message_for_connection(self, message, connection):
4970+
"""
4971+
Return the message to send on ``connection``, attaching the
4972+
tablet_version_block to ExecuteMessages according to the capability that
4973+
*this specific connection* negotiated.
4974+
4975+
Keying off the borrowed connection (already in hand at every call site)
4976+
is both necessary and sufficient: a connection that negotiated
4977+
TABLETS_ROUTING_V2 always gets the block -- even if the pool was created,
4978+
and any cached flag latched, before the cluster feature was enabled
4979+
(e.g. mid rolling-upgrade) -- while a non-V2 connection never gets one,
4980+
even if a sibling shard connection in the same pool already negotiated
4981+
V2. The server reads the trailing byte only on V2 connections, so
4982+
attaching it to a non-V2 connection would leave an unread trailing byte
4983+
and desync the frame; a pool-level flag cannot get this right.
4984+
4985+
ExecuteMessage is copied per send because ``self.message`` is shared
4986+
across speculative executions and retries that may run concurrently on
4987+
different threads; mutating tablet_version_block on the shared instance
4988+
would race with another in-flight send encoding the same object.
4989+
"""
4990+
if not isinstance(message, ExecuteMessage):
4991+
return message
4992+
message = copy(message)
4993+
if connection.features.tablets_routing_v2:
4994+
message.tablet_version_block = self.session._compute_tablet_version_block(self.query)
4995+
else:
4996+
message.tablet_version_block = None
4997+
return message
4998+
49334999
def _query_control_connection(self, message=None, cb=None, connection=None, host=None):
49345000
self._control_connection_query_attempted = True
49355001

@@ -4957,6 +5023,9 @@ def _query_control_connection(self, message=None, cb=None, connection=None, host
49575023
cb = partial(self._set_result, host, connection, None)
49585024
cb = partial(self._handle_control_connection_response, connection, cb)
49595025

5026+
# The control connection may also be a V2 connection, so it needs the
5027+
# trailing tablet_version_block byte just like a pooled send.
5028+
message = self._prepare_message_for_connection(message, connection)
49605029
log.debug("No usable node pools; falling back to control connection for host %s", host)
49615030
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
49625031
encoder=self._protocol_handler.encode_message,
@@ -5002,7 +5071,12 @@ def _query(self, host, message=None, cb=None):
50025071
try:
50035072
# TODO get connectTimeout from cluster settings
50045073
if self.query:
5005-
connection, request_id = pool.borrow_connection(timeout=2.0, routing_key=self.query.routing_key, keyspace=self.query.keyspace, table=self.query.table)
5074+
# Pass the statement so the pool can reuse the ring token it
5075+
# memoized for this request instead of re-hashing the routing key.
5076+
connection, request_id = pool.borrow_connection(
5077+
timeout=2.0, routing_key=self.query.routing_key,
5078+
keyspace=self.query.keyspace, table=self.query.table,
5079+
query=self.query)
50065080
else:
50075081
connection, request_id = pool.borrow_connection(timeout=2.0)
50085082
self._connection = connection
@@ -5011,6 +5085,8 @@ def _query(self, host, message=None, cb=None):
50115085
if cb is None:
50125086
cb = partial(self._set_result, host, connection, pool)
50135087

5088+
message = self._prepare_message_for_connection(message, connection)
5089+
50145090
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
50155091
encoder=self._protocol_handler.encode_message,
50165092
decoder=self._protocol_handler.decode_message,
@@ -5113,6 +5189,27 @@ def _reprepare(self, prepare_message, host, connection, pool):
51135189
# try to submit the original prepared statement on some other host
51145190
self.send_request()
51155191

5192+
def _cache_tablet_from_payload(self, payload_key, ctype):
5193+
"""
5194+
Parse a tablets-routing ``custom_payload`` entry and cache the Tablet.
5195+
5196+
``ctype`` is the tuple type for the negotiated extension. The V1 and V2
5197+
layouts differ only by a trailing ``tablet_version`` field, and
5198+
``Tablet.from_row`` accepts that as an optional final argument, so
5199+
unpacking the decoded tuple positionally serves both. The tablet is
5200+
cached under the effective keyspace (the statement's, else the
5201+
session's) so a prepared statement executed in a session keyspace lands
5202+
under the same key ``_compute_tablet_version_block`` looks it up by;
5203+
otherwise that lookup always misses.
5204+
"""
5205+
info = self._custom_payload.get(payload_key)
5206+
protocol = self.session.cluster.protocol_version
5207+
tablet = Tablet.from_row(*ctype.from_binary(info, protocol))
5208+
keyspace = self.query.keyspace or self.session.keyspace
5209+
table = self.query.table
5210+
if tablet and keyspace and table:
5211+
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
5212+
51165213
def _set_result(self, host, connection, pool, response):
51175214
try:
51185215
self.coordinator_host = host
@@ -5128,21 +5225,23 @@ def _set_result(self, host, connection, pool, response):
51285225
self._warnings = getattr(response, 'warnings', None)
51295226
self._custom_payload = getattr(response, 'custom_payload', None)
51305227

5131-
if self._custom_payload and self.session.cluster.control_connection._tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
5132-
protocol = self.session.cluster.protocol_version
5133-
info = self._custom_payload.get('tablets-routing-v1')
5134-
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
5135-
if ctype is None:
5136-
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
5137-
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
5138-
tablet_routing_info = ctype.from_binary(info, protocol)
5139-
first_token = tablet_routing_info[0]
5140-
last_token = tablet_routing_info[1]
5141-
tablet_replicas = tablet_routing_info[2]
5142-
tablet = Tablet.from_row(first_token, last_token, tablet_replicas)
5143-
keyspace = self.query.keyspace
5144-
table = self.query.table
5145-
self.session.cluster.metadata._tablets.add_tablet(keyspace, table, tablet)
5228+
if self._custom_payload and connection is not None:
5229+
# Parse the routing payload according to what the connection that
5230+
# *served this request* negotiated, not the control connection:
5231+
# during a rolling upgrade connections may differ, and each
5232+
# payload key matches the extension its own connection negotiated.
5233+
if connection.features.tablets_routing_v2 and 'tablets-routing-v2' in self._custom_payload:
5234+
ctype = ResponseFuture._TABLET_ROUTING_V2_CTYPE
5235+
if ctype is None:
5236+
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)), LongType)')
5237+
ResponseFuture._TABLET_ROUTING_V2_CTYPE = ctype
5238+
self._cache_tablet_from_payload('tablets-routing-v2', ctype)
5239+
elif connection.features.tablets_routing_v1 and 'tablets-routing-v1' in self._custom_payload:
5240+
ctype = ResponseFuture._TABLET_ROUTING_CTYPE
5241+
if ctype is None:
5242+
ctype = types.lookup_casstype('TupleType(LongType, LongType, ListType(TupleType(UUIDType, Int32Type)))')
5243+
ResponseFuture._TABLET_ROUTING_CTYPE = ctype
5244+
self._cache_tablet_from_payload('tablets-routing-v1', ctype)
51465245

51475246
if isinstance(response, ResultMessage):
51485247
if response.kind == RESULT_KIND_SET_KEYSPACE:
@@ -5546,7 +5645,7 @@ def add_callback(self, fn, *args, **kwargs):
55465645
55475646
Note: in the case that the result is not available when the callback is added,
55485647
the callback is executed by IO event thread. This means that the callback
5549-
should not block or attempt further synchronous requests, because no further
5648+
should not block or attempt further synchronous requests because no further
55505649
IO will be processed until the callback returns.
55515650
55525651
**Important**: if the callback you attach results in an exception being

cassandra/policies.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -503,14 +503,33 @@ def make_query_plan(self, working_keyspace=None, query=None):
503503
return
504504

505505
replicas = []
506-
tablet = self._cluster_metadata._tablets.get_tablet_for_key(
507-
keyspace, query.table, self._cluster_metadata.token_map.token_class.from_key(query.routing_key))
506+
leader_host = None
507+
token = query.routing_token(self._cluster_metadata.token_map.token_class)
508+
tablet = self._cluster_metadata._tablets.get_tablet_for_key(keyspace, query.table, token)
508509

509510
if tablet is not None:
510511
replicas_mapped = set(map(lambda r: r[0], tablet.replicas))
511512
child_plan = child.make_query_plan(keyspace, query)
512513

513514
replicas = [host for host in child_plan if host.host_id in replicas_mapped]
515+
516+
# The leader concept only exists for strongly-consistent keyspaces.
517+
# TABLETS_ROUTING_V2 assigns a tablet_version to *every* tablet table
518+
# (eventually- and strongly-consistent alike), so the version alone
519+
# must not be used to infer a leader. Conversely, replicas[0] is only
520+
# leader-ordered for a tablet that came from a V2 payload, so a
521+
# versionless tablet (V1-sourced, or stale across a consistency flip)
522+
# must not be treated as a leader hint either. Require both a
523+
# strongly-consistent keyspace and a versioned tablet; otherwise keep
524+
# normal token-aware/shuffled ordering.
525+
ks_meta = self._cluster_metadata.keyspaces.get(keyspace)
526+
if (ks_meta is not None and ks_meta.strongly_consistent
527+
and tablet.tablet_version is not None and tablet.replicas):
528+
leader_host_id = tablet.replicas[0][0]
529+
for host in replicas:
530+
if host.host_id == leader_host_id:
531+
leader_host = host
532+
break
514533
else:
515534
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)
516535

@@ -523,10 +542,21 @@ def yield_in_order(hosts):
523542
if replica.is_up and child.distance(replica) == distance:
524543
yield replica
525544

526-
# yield replicas: local_rack, local, remote
527-
yield from yield_in_order(replicas)
545+
# If we have a leader hint, yield it first -- but respect the child
546+
# policy's own filter: never front-run a host the child policy would
547+
# exclude (e.g. one a custom policy reports as IGNORED).
548+
if (leader_host is not None and leader_host.is_up
549+
and child.distance(leader_host) != HostDistance.IGNORED):
550+
yield leader_host
551+
552+
# yield replicas: local_rack, local, remote (skipping leader already yielded)
553+
for host in yield_in_order(replicas):
554+
if host is not leader_host:
555+
yield host
528556
# yield rest of the cluster: local_rack, local, remote
529-
yield from yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas])
557+
for host in yield_in_order([host for host in child.make_query_plan(keyspace, query) if host not in replicas]):
558+
if host is not leader_host:
559+
yield host
530560

531561
def on_up(self, *args, **kwargs):
532562
return self._child_policy.on_up(*args, **kwargs)

0 commit comments

Comments
 (0)