Skip to content

Commit 30d3a44

Browse files
committed
perf: inline row decoding and eliminate closures in recv_results_rows
Split recv_results_rows into a fast path (no column encryption) and a slow path (column encryption enabled).

Fast path (common case):
- Reads raw column bytes and decodes types in a single pass per row via _decode_row_inline(), eliminating the intermediate list-of-lists
- Skips ColDesc namedtuple creation entirely (only needed for CE)
- No closure allocation per call
- Wraps decode errors with column name/type info for diagnostics

Slow path (column encryption):
- Preserves full CE logic with ColDesc creation
- Moves the decode_val/decode_row closures to module-level functions (_decode_val_ce, _decode_row_ce) to avoid per-call closure overhead

Note: This PR modifies the same method as PR #630 (which also splits recv_results_rows into CE/non-CE branches). There will be a merge conflict that needs manual resolution if both PRs are accepted.
1 parent caa98b6 commit 30d3a44

2 files changed

Lines changed: 81 additions & 25 deletions

File tree

cassandra/protocol.py

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -717,31 +717,35 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata,
717717
self.recv_results_metadata(f, user_type_map)
718718
column_metadata = self.column_metadata or result_metadata
719719
rowcount = read_int(f)
720-
rows = [self.recv_row(f, len(column_metadata)) for _ in range(rowcount)]
721720
self.column_names = [c[2] for c in column_metadata]
722721
self.column_types = [c[3] for c in column_metadata]
723-
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]
724722

725-
def decode_val(val, col_md, col_desc):
726-
uses_ce = column_encryption_policy and column_encryption_policy.contains_column(col_desc)
727-
col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3]
728-
raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val
729-
return col_type.from_binary(raw_bytes, protocol_version)
730-
731-
def decode_row(row):
732-
return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs))
733-
734-
try:
735-
self.parsed_rows = [decode_row(row) for row in rows]
736-
except Exception:
737-
for row in rows:
738-
for val, col_md, col_desc in zip(row, column_metadata, col_descs):
739-
try:
740-
decode_val(val, col_md, col_desc)
741-
except Exception as e:
742-
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2],
743-
col_md[3].cql_parameterized_type(),
744-
str(e)))
723+
if not column_encryption_policy:
724+
# Fast path: no column encryption — decode inline, skip ColDesc creation
725+
self.parsed_rows = [
726+
_decode_row_inline(f, column_metadata, protocol_version)
727+
for _ in range(rowcount)
728+
]
729+
else:
730+
# Slow path: column encryption enabled — need ColDesc and per-column CE check
731+
rows = [self.recv_row(f, len(column_metadata)) for _ in range(rowcount)]
732+
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]
733+
try:
734+
self.parsed_rows = [
735+
_decode_row_ce(row, column_metadata, col_descs,
736+
column_encryption_policy, protocol_version)
737+
for row in rows
738+
]
739+
except Exception:
740+
for row in rows:
741+
for val, col_md, col_desc in zip(row, column_metadata, col_descs):
742+
try:
743+
_decode_val_ce(val, col_md, col_desc,
744+
column_encryption_policy, protocol_version)
745+
except Exception as e:
746+
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2],
747+
col_md[3].cql_parameterized_type(),
748+
str(e)))
745749

746750
def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map):
747751
self.query_id = read_binary_string(f)
@@ -1424,6 +1428,41 @@ def read_error_code_map(f):
14241428
return error_code_map
14251429

14261430

1431+
1432+
def _decode_row_inline(f, column_metadata, protocol_version):
1433+
"""Decode a single row directly from the buffer (no column encryption)."""
1434+
row = []
1435+
for col_md in column_metadata:
1436+
size = read_int(f)
1437+
if size < 0:
1438+
row.append(None)
1439+
else:
1440+
val = f.read(size)
1441+
try:
1442+
row.append(col_md[3].from_binary(val, protocol_version))
1443+
except Exception as e:
1444+
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2],
1445+
col_md[3].cql_parameterized_type(),
1446+
str(e)))
1447+
return tuple(row)
1448+
1449+
1450+
def _decode_val_ce(val, col_md, col_desc, column_encryption_policy, protocol_version):
1451+
"""Decode a single column value with column encryption support."""
1452+
uses_ce = column_encryption_policy.contains_column(col_desc)
1453+
col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3]
1454+
raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val
1455+
return col_type.from_binary(raw_bytes, protocol_version)
1456+
1457+
1458+
def _decode_row_ce(row, column_metadata, col_descs, column_encryption_policy, protocol_version):
1459+
"""Decode a full row with column encryption support."""
1460+
return tuple(
1461+
_decode_val_ce(val, col_md, col_desc, column_encryption_policy, protocol_version)
1462+
for val, col_md, col_desc in zip(row, column_metadata, col_descs)
1463+
)
1464+
1465+
14271466
def read_value(f):
14281467
size = read_int(f)
14291468
if size < 0:

tests/unit/test_protocol.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,17 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import io
1516
import unittest
1617

1718
from unittest.mock import Mock
1819

19-
from cassandra import ProtocolVersion, UnsupportedOperation
20+
from cassandra import DriverException, ProtocolVersion, UnsupportedOperation, type_codes
2021
from cassandra.protocol import (
21-
PrepareMessage, QueryMessage, ExecuteMessage, UnsupportedOperation,
22+
PrepareMessage, QueryMessage, ExecuteMessage, ResultMessage, UnsupportedOperation,
2223
_PAGING_OPTIONS_FLAG, _WITH_SERIAL_CONSISTENCY_FLAG,
2324
_PAGE_SIZE_FLAG, _WITH_PAGING_STATE_FLAG,
24-
BatchMessage
25+
BatchMessage, RESULT_KIND_ROWS, write_int, write_short, write_string
2526
)
2627
from cassandra.query import BatchType
2728
from cassandra.marshal import uint32_unpack
@@ -31,6 +32,22 @@
3132

3233
class MessageTest(unittest.TestCase):
3334

35+
def test_result_message_wraps_inline_decode_errors(self):
36+
body = io.BytesIO()
37+
write_int(body, RESULT_KIND_ROWS)
38+
write_int(body, 0)
39+
write_int(body, 1)
40+
write_string(body, "ks")
41+
write_string(body, "tbl")
42+
write_string(body, "v")
43+
write_short(body, type_codes.DateType)
44+
write_int(body, 1)
45+
write_int(body, 1)
46+
body.write(b"\x00")
47+
48+
with pytest.raises(DriverException, match='Failed decoding result column "v"'):
49+
ResultMessage.recv_body(io.BytesIO(body.getvalue()), ProtocolVersion.V4, 0, {}, None, None)
50+
3451
def test_prepare_message(self):
3552
"""
3653
Test to check the appropriate calls are made

0 commit comments

Comments
 (0)