Skip to content

Commit a9e0328

Browse files
committed
perf: inline row decoding and eliminate closures in recv_results_rows
Split recv_results_rows into fast path (no column encryption) and slow path (column encryption enabled): Fast path (common case): - Reads raw column bytes and decodes types in a single pass per row via _decode_row_inline(), eliminating the intermediate list-of-lists - Skips ColDesc namedtuple creation entirely (only needed for CE) - No closure allocation per call Slow path (column encryption): - Preserves full CE logic with ColDesc creation - Moves decode_val/decode_row closures to module-level functions (_decode_val_ce, _decode_row_ce) to avoid per-call closure overhead Note: This PR modifies the same method as PR scylladb#630 (which also splits recv_results_rows into CE/non-CE branches). There will be a merge conflict that needs manual resolution if both PRs are accepted.
1 parent 153c913 commit a9e0328

1 file changed

Lines changed: 63 additions & 22 deletions

File tree

cassandra/protocol.py

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -717,31 +717,42 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata,
717717
self.recv_results_metadata(f, user_type_map)
718718
column_metadata = self.column_metadata or result_metadata
719719
rowcount = read_int(f)
720-
rows = [self.recv_row(f, len(column_metadata)) for _ in range(rowcount)]
721720
self.column_names = [c[2] for c in column_metadata]
722721
self.column_types = [c[3] for c in column_metadata]
723-
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]
724722

725-
def decode_val(val, col_md, col_desc):
726-
uses_ce = column_encryption_policy and column_encryption_policy.contains_column(col_desc)
727-
col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3]
728-
raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val
729-
return col_type.from_binary(raw_bytes, protocol_version)
730-
731-
def decode_row(row):
732-
return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs))
733-
734-
try:
735-
self.parsed_rows = [decode_row(row) for row in rows]
736-
except Exception:
737-
for row in rows:
738-
for val, col_md, col_desc in zip(row, column_metadata, col_descs):
739-
try:
740-
decode_val(val, col_md, col_desc)
741-
except Exception as e:
742-
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2],
743-
col_md[3].cql_parameterized_type(),
744-
str(e)))
723+
if not column_encryption_policy:
724+
# Fast path: no column encryption — decode inline, skip ColDesc creation
725+
col_types = self.column_types
726+
colcount = len(col_types)
727+
try:
728+
self.parsed_rows = [
729+
_decode_row_inline(f, colcount, col_types, protocol_version)
730+
for _ in range(rowcount)
731+
]
732+
except Exception:
733+
# Re-read is not possible since we consumed the buffer.
734+
# This path should be extremely rare (type mismatch in server response).
735+
raise
736+
else:
737+
# Slow path: column encryption enabled — need ColDesc and per-column CE check
738+
rows = [self.recv_row(f, len(column_metadata)) for _ in range(rowcount)]
739+
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]
740+
try:
741+
self.parsed_rows = [
742+
_decode_row_ce(row, column_metadata, col_descs,
743+
column_encryption_policy, protocol_version)
744+
for row in rows
745+
]
746+
except Exception:
747+
for row in rows:
748+
for val, col_md, col_desc in zip(row, column_metadata, col_descs):
749+
try:
750+
_decode_val_ce(val, col_md, col_desc,
751+
column_encryption_policy, protocol_version)
752+
except Exception as e:
753+
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2],
754+
col_md[3].cql_parameterized_type(),
755+
str(e)))
745756

746757
def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map):
747758
self.query_id = read_binary_string(f)
@@ -1424,6 +1435,36 @@ def read_error_code_map(f):
14241435
return error_code_map
14251436

14261437

1438+
1439+
def _decode_row_inline(f, colcount, col_types, protocol_version):
1440+
"""Decode a single row directly from the buffer (no column encryption)."""
1441+
row = []
1442+
for i in range(colcount):
1443+
size = read_int(f)
1444+
if size < 0:
1445+
row.append(None)
1446+
else:
1447+
val = f.read(size)
1448+
row.append(col_types[i].from_binary(val, protocol_version))
1449+
return tuple(row)
1450+
1451+
1452+
def _decode_val_ce(val, col_md, col_desc, column_encryption_policy, protocol_version):
1453+
"""Decode a single column value with column encryption support."""
1454+
uses_ce = column_encryption_policy.contains_column(col_desc)
1455+
col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3]
1456+
raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val
1457+
return col_type.from_binary(raw_bytes, protocol_version)
1458+
1459+
1460+
def _decode_row_ce(row, column_metadata, col_descs, column_encryption_policy, protocol_version):
1461+
"""Decode a full row with column encryption support."""
1462+
return tuple(
1463+
_decode_val_ce(val, col_md, col_desc, column_encryption_policy, protocol_version)
1464+
for val, col_md, col_desc in zip(row, column_metadata, col_descs)
1465+
)
1466+
1467+
14271468
def read_value(f):
14281469
size = read_int(f)
14291470
if size < 0:

0 commit comments

Comments
 (0)