Skip to content

Commit 9c3b11f

Browse files
committed
Optimize column_encryption_policy checks in Cython parsers
Split TupleRowParser.unpack_row() into unpack_plain_row() and unpack_col_encrypted_row(), branching once in the callers (ListParser, LazyParser, row_parser error path) rather than per value. - Add boundscheck/wraparound Cython optimizations on both methods - Extract try/except outside the per-column loop - NumpyParser.parse_rows() raises NotImplementedError when column encryption is configured, since it has no decryption support - Update parsing.pxd declarations to match
1 parent 9c7ed99 commit 9c3b11f

5 files changed

Lines changed: 76 additions & 27 deletions

File tree

cassandra/numpy_parser.pyx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ cdef class NumpyParser(ColumnParser):
8181
cdef ArrDesc[::1] array_descs
8282
cdef ArrDesc *arrs
8383

84+
if desc.column_encryption_policy:
85+
raise NotImplementedError(
86+
"NumpyParser does not support column encryption")
87+
8488
rowcount = read_int(reader)
8589
array_descs, arrays = make_arrays(desc, rowcount)
8690
arrs = &array_descs[0]
@@ -97,7 +101,7 @@ cdef _parse_rows(BytesIOReader reader, ParseDesc desc,
97101
cdef Py_ssize_t i
98102

99103
for i in range(rowcount):
100-
unpack_row(reader, desc, arrs)
104+
unpack_plain_row(reader, desc, arrs)
101105

102106

103107
### Helper functions to create NumPy arrays and array descriptors
@@ -144,7 +148,7 @@ def make_array(coltype, array_size):
144148

145149
@cython.boundscheck(False)
146150
@cython.wraparound(False)
147-
cdef inline int unpack_row(
151+
cdef inline int unpack_plain_row(
148152
BytesIOReader reader, ParseDesc desc, ArrDesc *arrays) except -1:
149153
cdef Buffer buf
150154
cdef Py_ssize_t i, rowsize = desc.rowsize

cassandra/obj_parser.pyx

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ from cassandra.parsing cimport ParseDesc, ColumnParser, RowParser
2222
from cassandra.tuple cimport tuple_new, tuple_set
2323

2424
from cpython.bytes cimport PyBytes_AsStringAndSize
25+
cimport cython
2526

2627

2728
cdef class ListParser(ColumnParser):
@@ -31,7 +32,10 @@ cdef class ListParser(ColumnParser):
3132
cdef Py_ssize_t i, rowcount
3233
rowcount = read_int(reader)
3334
cdef RowParser rowparser = TupleRowParser()
34-
return [rowparser.unpack_row(reader, desc) for i in range(rowcount)]
35+
if desc.column_encryption_policy:
36+
return [rowparser.unpack_col_encrypted_row(reader, desc) for i in range(rowcount)]
37+
else:
38+
return [rowparser.unpack_plain_row(reader, desc) for i in range(rowcount)]
3539

3640

3741
cdef class LazyParser(ColumnParser):
@@ -47,48 +51,78 @@ def parse_rows_lazy(BytesIOReader reader, ParseDesc desc):
4751
cdef Py_ssize_t i, rowcount
4852
rowcount = read_int(reader)
4953
cdef RowParser rowparser = TupleRowParser()
50-
return (rowparser.unpack_row(reader, desc) for i in range(rowcount))
54+
if desc.column_encryption_policy:
55+
return (rowparser.unpack_col_encrypted_row(reader, desc) for i in range(rowcount))
56+
else:
57+
return (rowparser.unpack_plain_row(reader, desc) for i in range(rowcount))
5158

5259

5360
cdef class TupleRowParser(RowParser):
5461
"""
5562
Parse a single returned row into a tuple of objects:
5663
5764
(obj1, ..., objN)
65+
If CE (Column encryption) policy is enabled - use unpack_col_encrypted_row(),
66+
otherwise use unpack_plain_row()
5867
"""
5968

60-
cpdef unpack_row(self, BytesIOReader reader, ParseDesc desc):
61-
assert desc.rowsize >= 0
62-
69+
@cython.boundscheck(False)
70+
@cython.wraparound(False)
71+
cpdef unpack_col_encrypted_row(self, BytesIOReader reader, ParseDesc desc):
6372
cdef Buffer buf
6473
cdef Buffer newbuf
6574
cdef Py_ssize_t i, rowsize = desc.rowsize
6675
cdef Deserializer deserializer
6776
cdef tuple res = tuple_new(desc.rowsize)
6877

6978
ce_policy = desc.column_encryption_policy
70-
for i in range(rowsize):
71-
# Read the next few bytes
72-
get_buf(reader, &buf)
73-
74-
# Deserialize bytes to python object
75-
deserializer = desc.deserializers[i]
76-
coldesc = desc.coldescs[i]
77-
uses_ce = ce_policy and ce_policy.contains_column(coldesc)
78-
try:
79+
try:
80+
for i in range(rowsize):
81+
# Read the next few bytes
82+
get_buf(reader, &buf)
83+
84+
# Deserialize bytes to python object
85+
deserializer = desc.deserializers[i]
86+
coldesc = desc.coldescs[i]
87+
uses_ce = ce_policy.contains_column(coldesc)
7988
if uses_ce:
8089
col_type = ce_policy.column_type(coldesc)
8190
decrypted_bytes = ce_policy.decrypt(coldesc, to_bytes(&buf))
8291
PyBytes_AsStringAndSize(decrypted_bytes, &newbuf.ptr, &newbuf.size)
83-
deserializer = find_deserializer(ce_policy.column_type(coldesc))
92+
deserializer = find_deserializer(col_type)
8493
val = from_binary(deserializer, &newbuf, desc.protocol_version)
8594
else:
8695
val = from_binary(deserializer, &buf, desc.protocol_version)
87-
except Exception as e:
88-
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (desc.colnames[i],
89-
desc.coltypes[i].cql_parameterized_type(),
90-
str(e)))
91-
# Insert new object into tuple
92-
tuple_set(res, i, val)
96+
# Insert new object into tuple
97+
tuple_set(res, i, val)
98+
except Exception as e:
99+
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (desc.colnames[i],
100+
desc.coltypes[i].cql_parameterized_type(),
101+
str(e)))
102+
103+
return res
104+
105+
@cython.boundscheck(False)
106+
@cython.wraparound(False)
107+
cpdef unpack_plain_row(self, BytesIOReader reader, ParseDesc desc):
108+
cdef Buffer buf
109+
cdef Py_ssize_t i, rowsize = desc.rowsize
110+
cdef Deserializer deserializer
111+
cdef tuple res = tuple_new(desc.rowsize)
112+
113+
try:
114+
for i in range(rowsize):
115+
# Read the next few bytes
116+
get_buf(reader, &buf)
117+
118+
# Deserialize bytes to python object
119+
deserializer = desc.deserializers[i]
120+
val = from_binary(deserializer, &buf, desc.protocol_version)
121+
# Insert new object into tuple
122+
tuple_set(res, i, val)
123+
except Exception as e:
124+
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (desc.colnames[i],
125+
desc.coltypes[i].cql_parameterized_type(),
126+
str(e)))
93127

94128
return res

cassandra/parsing.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,6 @@ cdef class ColumnParser:
2828
cpdef parse_rows(self, BytesIOReader reader, ParseDesc desc)
2929

3030
cdef class RowParser:
31-
cpdef unpack_row(self, BytesIOReader reader, ParseDesc desc)
31+
cpdef unpack_plain_row(self, BytesIOReader reader, ParseDesc desc)
32+
cpdef unpack_col_encrypted_row(self, BytesIOReader reader, ParseDesc desc)
3233

cassandra/parsing.pyx

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,14 @@ cdef class ColumnParser:
3939
cdef class RowParser:
4040
"""Parser for a single row"""
4141

42-
cpdef unpack_row(self, BytesIOReader reader, ParseDesc desc):
42+
cpdef unpack_plain_row(self, BytesIOReader reader, ParseDesc desc):
4343
"""
4444
Unpack a single row of data in a ResultMessage.
4545
"""
4646
raise NotImplementedError
47+
48+
cpdef unpack_col_encrypted_row(self, BytesIOReader reader, ParseDesc desc):
49+
"""
50+
Unpack a single row of data in a ResultMessage, with column encryption support.
51+
"""
52+
raise NotImplementedError

cassandra/row_parser.pyx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ def make_recv_results_rows(ColumnParser colparser):
4444
reader.buf_ptr = reader.buf
4545
reader.pos = 0
4646
rowcount = read_int(reader)
47-
for i in range(rowcount):
48-
rowparser.unpack_row(reader, desc)
47+
if desc.column_encryption_policy:
48+
for i in range(rowcount):
49+
rowparser.unpack_col_encrypted_row(reader, desc)
50+
else:
51+
for i in range(rowcount):
52+
rowparser.unpack_plain_row(reader, desc)
4953

5054
return recv_results_rows

0 commit comments

Comments
 (0)