Skip to content

Commit f42e225

Browse files
committed
DRIVER-153: negotiate and implement SCYLLA_USE_METADATA_ID extension
Scylla's SCYLLA_USE_METADATA_ID protocol extension (backport of CQL v5 prepared-statement metadata IDs to earlier protocol versions) allows the driver to skip sending full result metadata on every EXECUTE request. The server notifies the driver via the METADATA_CHANGED flag whenever the result schema changes, at which point the driver updates its cached metadata before deserialising the response. Changes: - protocol_features.py: parse SCYLLA_USE_METADATA_ID from SUPPORTED and include it in the STARTUP frame when negotiated - protocol.py: * fix _write_query_params to actually write _SKIP_METADATA_FLAG on the wire (it was stored on the message but never sent — dead code before) * recv_results_prepared: read result_metadata_id for Scylla extension (pre-v5) in addition to standard protocol v5+ * ExecuteMessage.send_body: send result_metadata_id for Scylla extension (pre-v5) when it is set - cluster.py: * ExecuteMessage is built with safe defaults (skip_meta=False, result_metadata_id=None); both are set in _query() after borrowing the connection, gated on connection.features.use_metadata_id and on the prepared statement actually having a result_metadata_id (so a statement prepared before the extension was available, or on a node that doesn't support it, never gets skip_meta=True with no id) * _set_result: update prepared_statement.result_metadata and result_metadata_id when the server signals METADATA_CHANGED in an EXECUTE response, keeping the driver's cached metadata in sync; uses getattr to safely handle FastResultMessage (Cython decoder)
1 parent 153c913 commit f42e225

5 files changed

Lines changed: 156 additions & 7 deletions

File tree

cassandra/cluster.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2998,9 +2998,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload,
29982998
message = ExecuteMessage(
29992999
prepared_statement.query_id, query.values, cl,
30003000
serial_cl, fetch_size, paging_state, timestamp,
3001-
skip_meta=bool(prepared_statement.result_metadata),
3002-
continuous_paging_options=continuous_paging_options,
3003-
result_metadata_id=prepared_statement.result_metadata_id)
3001+
continuous_paging_options=continuous_paging_options)
30043002
elif isinstance(query, BatchStatement):
30053003
if self._protocol_version < 2:
30063004
raise UnsupportedOperation(
@@ -4618,6 +4616,15 @@ def _query(self, host, message=None, cb=None):
46184616
self._connection = connection
46194617
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
46204618

4619+
if self.prepared_statement and isinstance(message, ExecuteMessage):
4620+
has_result_metadata_id = self.prepared_statement.result_metadata_id is not None
4621+
use_metadata_id = has_result_metadata_id and (
4622+
ProtocolVersion.uses_prepared_metadata(connection.protocol_version)
4623+
or connection.features.use_metadata_id
4624+
)
4625+
message.skip_meta = use_metadata_id
4626+
message.result_metadata_id = self.prepared_statement.result_metadata_id if use_metadata_id else None
4627+
46214628
if cb is None:
46224629
cb = partial(self._set_result, host, connection, pool)
46234630

@@ -4774,6 +4781,11 @@ def _set_result(self, host, connection, pool, response):
47744781
self._paging_state = response.paging_state
47754782
self._col_names = response.column_names
47764783
self._col_types = response.column_types
4784+
new_result_metadata_id = getattr(response, 'result_metadata_id', None)
4785+
if self.prepared_statement and new_result_metadata_id is not None:
4786+
if response.column_metadata:
4787+
self.prepared_statement.result_metadata = response.column_metadata
4788+
self.prepared_statement.result_metadata_id = new_result_metadata_id
47774789
if getattr(self.message, 'continuous_paging_options', None):
47784790
self._handle_continuous_paging_first_response(connection, response)
47794791
else:

cassandra/protocol.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,9 @@ def _write_query_params(self, f, protocol_version):
573573
if self.timestamp is not None:
574574
flags |= _PROTOCOL_TIMESTAMP_FLAG
575575

576+
if self.skip_meta:
577+
flags |= _SKIP_METADATA_FLAG
578+
576579
if self.keyspace is not None:
577580
if ProtocolVersion.uses_keyspace_flag(protocol_version):
578581
flags |= _WITH_KEYSPACE_FLAG
@@ -642,6 +645,8 @@ def send_body(self, f, protocol_version):
642645
write_string(f, self.query_id)
643646
if ProtocolVersion.uses_prepared_metadata(protocol_version):
644647
write_string(f, self.result_metadata_id)
648+
elif self.result_metadata_id is not None:
649+
write_string(f, self.result_metadata_id)
645650
self._write_query_params(f, protocol_version)
646651

647652

@@ -745,7 +750,7 @@ def decode_row(row):
745750

746751
def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map):
747752
self.query_id = read_binary_string(f)
748-
if ProtocolVersion.uses_prepared_metadata(protocol_version):
753+
if ProtocolVersion.uses_prepared_metadata(protocol_version) or protocol_features.use_metadata_id:
749754
self.result_metadata_id = read_binary_string(f)
750755
else:
751756
self.result_metadata_id = None

cassandra/protocol_features.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,34 @@
1010
LWT_OPTIMIZATION_META_BIT_MASK = "LWT_OPTIMIZATION_META_BIT_MASK"
1111
RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR"
1212
TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1"
13+
USE_METADATA_ID = "SCYLLA_USE_METADATA_ID"
1314

1415
class ProtocolFeatures(object):
1516
rate_limit_error = None
1617
shard_id = 0
1718
sharding_info = None
1819
tablets_routing_v1 = False
1920
lwt_info = None
21+
use_metadata_id = False
2022

21-
def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None):
23+
def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, lwt_info=None,
24+
use_metadata_id=False):
2225
self.rate_limit_error = rate_limit_error
2326
self.shard_id = shard_id
2427
self.sharding_info = sharding_info
2528
self.tablets_routing_v1 = tablets_routing_v1
2629
self.lwt_info = lwt_info
30+
self.use_metadata_id = use_metadata_id
2731

2832
@staticmethod
2933
def parse_from_supported(supported):
3034
rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported)
3135
shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported)
3236
tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported)
3337
lwt_info = ProtocolFeatures.parse_lwt_info(supported)
34-
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info)
38+
use_metadata_id = ProtocolFeatures.parse_use_metadata_id(supported)
39+
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, lwt_info,
40+
use_metadata_id)
3541

3642
@staticmethod
3743
def maybe_parse_rate_limit_error(supported):
@@ -57,6 +63,8 @@ def add_startup_options(self, options):
5763
options[TABLETS_ROUTING_V1] = ""
5864
if self.lwt_info is not None:
5965
options[LWT_ADD_METADATA_MARK] = str(self.lwt_info.lwt_meta_bit_mask)
66+
if self.use_metadata_id:
67+
options[USE_METADATA_ID] = ""
6068

6169
@staticmethod
6270
def parse_sharding_info(options):
@@ -81,6 +89,10 @@ def parse_sharding_info(options):
8189
def parse_tablets_info(options):
8290
return TABLETS_ROUTING_V1 in options
8391

92+
@staticmethod
93+
def parse_use_metadata_id(options):
94+
return USE_METADATA_ID in options
95+
8496
@staticmethod
8597
def parse_lwt_info(options):
8698
value_list = options.get(LWT_ADD_METADATA_MARK, [None])

tests/unit/test_protocol.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import io
16+
import struct
1517
import unittest
1618

1719
from unittest.mock import Mock
@@ -21,8 +23,10 @@
2123
PrepareMessage, QueryMessage, ExecuteMessage, UnsupportedOperation,
2224
_PAGING_OPTIONS_FLAG, _WITH_SERIAL_CONSISTENCY_FLAG,
2325
_PAGE_SIZE_FLAG, _WITH_PAGING_STATE_FLAG,
24-
BatchMessage
26+
_SKIP_METADATA_FLAG,
27+
BatchMessage, ResultMessage
2528
)
29+
from cassandra.protocol_features import ProtocolFeatures
2630
from cassandra.query import BatchType
2731
from cassandra.marshal import uint32_unpack
2832
from cassandra.cluster import ContinuousPagingOptions
@@ -68,6 +72,87 @@ def test_execute_message(self):
6872
(b'\x00\x04',),
6973
(b'\x00\x00\x00\x01',), (b'\x00\x00',)])
7074

75+
def test_execute_message_skip_meta_flag(self):
76+
"""skip_meta=True must set _SKIP_METADATA_FLAG (0x02) in the flags byte."""
77+
message = ExecuteMessage('1', [], 4, skip_meta=True)
78+
mock_io = Mock()
79+
80+
message.send_body(mock_io, 4)
81+
# flags byte should be VALUES_FLAG | SKIP_METADATA_FLAG = 0x01 | 0x02 = 0x03
82+
self._check_calls(mock_io, [(b'\x00\x01',), (b'1',), (b'\x00\x04',), (b'\x03',), (b'\x00\x00',)])
83+
84+
def test_execute_message_scylla_metadata_id_v4(self):
85+
"""result_metadata_id should be written on protocol v4 when set (Scylla extension)."""
86+
message = ExecuteMessage('1', [], 4)
87+
message.result_metadata_id = b'foo'
88+
mock_io = Mock()
89+
90+
message.send_body(mock_io, 4)
91+
# metadata_id written before query params (same position as v5)
92+
self._check_calls(mock_io, [(b'\x00\x01',), (b'1',),
93+
(b'\x00\x03',), (b'foo',),
94+
(b'\x00\x04',), (b'\x01',), (b'\x00\x00',)])
95+
96+
def test_recv_results_prepared_scylla_extension_reads_metadata_id(self):
97+
"""
98+
When use_metadata_id is True (Scylla extension), result_metadata_id must be
99+
read from the PREPARE response even for protocol v4.
100+
"""
101+
# Build a minimal valid PREPARE response binary (no bind/result columns):
102+
# query_id: short(2) + b'ab'
103+
# result_metadata_id: short(3) + b'xyz' <-- only present when extension active
104+
# prepared flags: int(1) = global_tables_spec
105+
# colcount: int(0)
106+
# num_pk_indexes: int(0)
107+
# ksname: short(2) + b'ks'
108+
# cfname: short(2) + b'tb'
109+
# result flags: int(4) = no_metadata
110+
# result colcount: int(0)
111+
buf = io.BytesIO(
112+
struct.pack('>H', 2) + b'ab' # query_id
113+
+ struct.pack('>H', 3) + b'xyz' # result_metadata_id
114+
+ struct.pack('>i', 1) # prepared flags: global_tables_spec
115+
+ struct.pack('>i', 0) # colcount = 0
116+
+ struct.pack('>i', 0) # num_pk_indexes = 0
117+
+ struct.pack('>H', 2) + b'ks' # ksname
118+
+ struct.pack('>H', 2) + b'tb' # cfname
119+
+ struct.pack('>i', 4) # result flags: no_metadata
120+
+ struct.pack('>i', 0) # result colcount = 0
121+
)
122+
123+
features_with_extension = ProtocolFeatures(use_metadata_id=True)
124+
msg = ResultMessage(kind=4) # RESULT_KIND_PREPARED = 4
125+
msg.recv_results_prepared(buf, protocol_version=4,
126+
protocol_features=features_with_extension,
127+
user_type_map={})
128+
assert msg.query_id == b'ab'
129+
assert msg.result_metadata_id == b'xyz'
130+
131+
def test_recv_results_prepared_no_extension_skips_metadata_id(self):
132+
"""
133+
Without use_metadata_id, result_metadata_id must NOT be read on protocol v4.
134+
The buffer must NOT contain a metadata_id field.
135+
"""
136+
buf = io.BytesIO(
137+
struct.pack('>H', 2) + b'ab' # query_id
138+
# no result_metadata_id
139+
+ struct.pack('>i', 1) # prepared flags: global_tables_spec
140+
+ struct.pack('>i', 0) # colcount = 0
141+
+ struct.pack('>i', 0) # num_pk_indexes = 0
142+
+ struct.pack('>H', 2) + b'ks' # ksname
143+
+ struct.pack('>H', 2) + b'tb' # cfname
144+
+ struct.pack('>i', 4) # result flags: no_metadata
145+
+ struct.pack('>i', 0) # result colcount = 0
146+
)
147+
148+
features_without_extension = ProtocolFeatures(use_metadata_id=False)
149+
msg = ResultMessage(kind=4)
150+
msg.recv_results_prepared(buf, protocol_version=4,
151+
protocol_features=features_without_extension,
152+
user_type_map={})
153+
assert msg.query_id == b'ab'
154+
assert msg.result_metadata_id is None
155+
71156
def test_query_message(self):
72157
"""
73158
Test to check the appropriate calls are made

tests/unit/test_protocol_features.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,38 @@ class OptionsHolder(object):
2222
assert protocol_features.rate_limit_error == 123
2323
assert protocol_features.shard_id == 0
2424
assert protocol_features.sharding_info is None
25+
26+
def test_use_metadata_id_parsing(self):
27+
"""
28+
Test that SCYLLA_USE_METADATA_ID is parsed from SUPPORTED options.
29+
"""
30+
options = {'SCYLLA_USE_METADATA_ID': ['']}
31+
protocol_features = ProtocolFeatures.parse_from_supported(options)
32+
assert protocol_features.use_metadata_id is True
33+
34+
def test_use_metadata_id_missing(self):
35+
"""
36+
Test that use_metadata_id is False when SCYLLA_USE_METADATA_ID is absent.
37+
"""
38+
options = {'SCYLLA_RATE_LIMIT_ERROR': ['ERROR_CODE=1']}
39+
protocol_features = ProtocolFeatures.parse_from_supported(options)
40+
assert protocol_features.use_metadata_id is False
41+
42+
def test_use_metadata_id_startup_options(self):
43+
"""
44+
Test that SCYLLA_USE_METADATA_ID is included in STARTUP options when negotiated.
45+
"""
46+
options = {'SCYLLA_USE_METADATA_ID': ['']}
47+
protocol_features = ProtocolFeatures.parse_from_supported(options)
48+
startup = {}
49+
protocol_features.add_startup_options(startup)
50+
assert 'SCYLLA_USE_METADATA_ID' in startup
51+
52+
def test_use_metadata_id_not_in_startup_when_not_negotiated(self):
53+
"""
54+
Test that SCYLLA_USE_METADATA_ID is NOT included in STARTUP when not negotiated.
55+
"""
56+
protocol_features = ProtocolFeatures.parse_from_supported({})
57+
startup = {}
58+
protocol_features.add_startup_options(startup)
59+
assert 'SCYLLA_USE_METADATA_ID' not in startup

0 commit comments

Comments
 (0)