Skip to content

Commit 41f02ff

Browse files
committed
(improvement) serializers: add Cython-optimized serialization for VectorType
Add cassandra/serializers.pyx and cassandra/serializers.pxd implementing Cython-optimized serialization that mirrors the deserializers.pyx architecture. Implements type-specialized serializers for the three subtypes commonly used in vector columns: - SerFloatType: 4-byte big-endian IEEE 754 float - SerDoubleType: 8-byte big-endian double - SerInt32Type: 4-byte big-endian signed int32 SerVectorType pre-allocates a contiguous buffer and uses C-level byte swapping for float/double/int32 vectors, with a generic fallback for other subtypes. GenericSerializer delegates to the Python-level cqltype.serialize() classmethod. Factory functions find_serializer() and make_serializers() allow easy lookup and batch creation of serializers for column types. Benchmarks show ~30x speedup over the current io.BytesIO baseline and ~3x speedup over Python struct.pack for Vector<float, 1536> serialization. No setup.py changes needed - the existing cassandra/*.pyx glob already picks up new .pyx files.
1 parent caa98b6 commit 41f02ff

2 files changed

Lines changed: 360 additions & 0 deletions

File tree

cassandra/serializers.pxd

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
cdef class Serializer:
17+
# The cqltypes._CassandraType corresponding to this serializer
18+
cdef object cqltype
19+
20+
cpdef bytes serialize(self, object value, int protocol_version)

cassandra/serializers.pyx

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
# Copyright ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Cython-optimized serializers for CQL types.
17+
18+
Mirrors the architecture of deserializers.pyx. Currently implements
19+
optimized serialization for:
20+
- FloatType (4-byte big-endian float)
21+
- DoubleType (8-byte big-endian double)
22+
- Int32Type (4-byte big-endian signed int)
23+
- VectorType (type-specialized for float/double/int32, generic fallback)
24+
25+
For all other types, GenericSerializer delegates to the Python-level
26+
cqltype.serialize() classmethod.
27+
"""
28+
29+
from libc.stdint cimport int32_t, uint32_t
30+
from libc.string cimport memcpy
31+
from libc.stdlib cimport malloc, free
32+
from cpython.bytes cimport PyBytes_FromStringAndSize
33+
34+
from cassandra import cqltypes
35+
36+
cdef bint is_little_endian
37+
from cassandra.util import is_little_endian
38+
39+
40+
# ---------------------------------------------------------------------------
41+
# Base class
42+
# ---------------------------------------------------------------------------
43+
44+
cdef class Serializer:
45+
"""Cython-based serializer class for a cqltype"""
46+
47+
def __init__(self, cqltype):
48+
self.cqltype = cqltype
49+
50+
cpdef bytes serialize(self, object value, int protocol_version):
51+
raise NotImplementedError
52+
53+
54+
# ---------------------------------------------------------------------------
55+
# Scalar serializers
56+
# ---------------------------------------------------------------------------
57+
58+
cdef class SerFloatType(Serializer):
59+
"""Serialize a Python float to 4-byte big-endian IEEE 754."""
60+
61+
cpdef bytes serialize(self, object value, int protocol_version):
62+
cdef float val = <float>value
63+
cdef char out[4]
64+
cdef char *src = <char *>&val
65+
66+
if is_little_endian:
67+
out[0] = src[3]
68+
out[1] = src[2]
69+
out[2] = src[1]
70+
out[3] = src[0]
71+
else:
72+
memcpy(out, src, 4)
73+
74+
return PyBytes_FromStringAndSize(out, 4)
75+
76+
77+
cdef class SerDoubleType(Serializer):
78+
"""Serialize a Python float to 8-byte big-endian IEEE 754."""
79+
80+
cpdef bytes serialize(self, object value, int protocol_version):
81+
cdef double val = <double>value
82+
cdef char out[8]
83+
cdef char *src = <char *>&val
84+
85+
if is_little_endian:
86+
out[0] = src[7]
87+
out[1] = src[6]
88+
out[2] = src[5]
89+
out[3] = src[4]
90+
out[4] = src[3]
91+
out[5] = src[2]
92+
out[6] = src[1]
93+
out[7] = src[0]
94+
else:
95+
memcpy(out, src, 8)
96+
97+
return PyBytes_FromStringAndSize(out, 8)
98+
99+
100+
cdef class SerInt32Type(Serializer):
101+
"""Serialize a Python int to 4-byte big-endian signed int32."""
102+
103+
cpdef bytes serialize(self, object value, int protocol_version):
104+
cdef int32_t val = <int32_t>value
105+
cdef char out[4]
106+
cdef char *src = <char *>&val
107+
108+
if is_little_endian:
109+
out[0] = src[3]
110+
out[1] = src[2]
111+
out[2] = src[1]
112+
out[3] = src[0]
113+
else:
114+
memcpy(out, src, 4)
115+
116+
return PyBytes_FromStringAndSize(out, 4)
117+
118+
119+
# ---------------------------------------------------------------------------
120+
# Type detection helpers
121+
# ---------------------------------------------------------------------------
122+
123+
cdef inline bint _is_float_type(object subtype):
124+
return subtype is cqltypes.FloatType or issubclass(subtype, cqltypes.FloatType)
125+
126+
cdef inline bint _is_double_type(object subtype):
127+
return subtype is cqltypes.DoubleType or issubclass(subtype, cqltypes.DoubleType)
128+
129+
cdef inline bint _is_int32_type(object subtype):
130+
return subtype is cqltypes.Int32Type or issubclass(subtype, cqltypes.Int32Type)
131+
132+
133+
# ---------------------------------------------------------------------------
134+
# VectorType serializer
135+
# ---------------------------------------------------------------------------
136+
137+
cdef class SerVectorType(Serializer):
138+
"""
139+
Optimized Cython serializer for VectorType.
140+
141+
For float, double, and int32 vectors, pre-allocates a contiguous buffer
142+
and uses C-level byte swapping. For other subtypes, falls back to
143+
per-element Python serialization.
144+
"""
145+
146+
cdef int vector_size
147+
cdef object subtype
148+
# 0 = generic, 1 = float, 2 = double, 3 = int32
149+
cdef int type_code
150+
151+
def __init__(self, cqltype):
152+
super().__init__(cqltype)
153+
self.vector_size = cqltype.vector_size
154+
self.subtype = cqltype.subtype
155+
156+
if _is_float_type(self.subtype):
157+
self.type_code = 1
158+
elif _is_double_type(self.subtype):
159+
self.type_code = 2
160+
elif _is_int32_type(self.subtype):
161+
self.type_code = 3
162+
else:
163+
self.type_code = 0
164+
165+
cpdef bytes serialize(self, object value, int protocol_version):
166+
cdef int v_length = len(value)
167+
if v_length != self.vector_size:
168+
raise ValueError(
169+
"Expected sequence of size %d for vector of type %s and "
170+
"dimension %d, observed sequence of length %d" % (
171+
self.vector_size, self.subtype.typename,
172+
self.vector_size, v_length))
173+
174+
if self.type_code == 1:
175+
return self._serialize_float(value)
176+
elif self.type_code == 2:
177+
return self._serialize_double(value)
178+
elif self.type_code == 3:
179+
return self._serialize_int32(value)
180+
else:
181+
return self._serialize_generic(value, protocol_version)
182+
183+
cdef inline bytes _serialize_float(self, object values):
184+
"""Serialize a list of floats into a contiguous big-endian buffer."""
185+
cdef Py_ssize_t i
186+
cdef Py_ssize_t buf_size = self.vector_size * 4
187+
cdef char *buf = <char *>malloc(buf_size)
188+
if buf == NULL:
189+
raise MemoryError("Failed to allocate %d bytes for vector serialization" % buf_size)
190+
191+
cdef float val
192+
cdef char *src
193+
cdef char *dst
194+
195+
try:
196+
for i in range(self.vector_size):
197+
val = <float>values[i]
198+
src = <char *>&val
199+
dst = buf + i * 4
200+
201+
if is_little_endian:
202+
dst[0] = src[3]
203+
dst[1] = src[2]
204+
dst[2] = src[1]
205+
dst[3] = src[0]
206+
else:
207+
memcpy(dst, src, 4)
208+
209+
return PyBytes_FromStringAndSize(buf, buf_size)
210+
finally:
211+
free(buf)
212+
213+
cdef inline bytes _serialize_double(self, object values):
214+
"""Serialize a list of doubles into a contiguous big-endian buffer."""
215+
cdef Py_ssize_t i
216+
cdef Py_ssize_t buf_size = self.vector_size * 8
217+
cdef char *buf = <char *>malloc(buf_size)
218+
if buf == NULL:
219+
raise MemoryError("Failed to allocate %d bytes for vector serialization" % buf_size)
220+
221+
cdef double val
222+
cdef char *src
223+
cdef char *dst
224+
225+
try:
226+
for i in range(self.vector_size):
227+
val = <double>values[i]
228+
src = <char *>&val
229+
dst = buf + i * 8
230+
231+
if is_little_endian:
232+
dst[0] = src[7]
233+
dst[1] = src[6]
234+
dst[2] = src[5]
235+
dst[3] = src[4]
236+
dst[4] = src[3]
237+
dst[5] = src[2]
238+
dst[6] = src[1]
239+
dst[7] = src[0]
240+
else:
241+
memcpy(dst, src, 8)
242+
243+
return PyBytes_FromStringAndSize(buf, buf_size)
244+
finally:
245+
free(buf)
246+
247+
cdef inline bytes _serialize_int32(self, object values):
248+
"""Serialize a list of int32 values into a contiguous big-endian buffer."""
249+
cdef Py_ssize_t i
250+
cdef Py_ssize_t buf_size = self.vector_size * 4
251+
cdef char *buf = <char *>malloc(buf_size)
252+
if buf == NULL:
253+
raise MemoryError("Failed to allocate %d bytes for vector serialization" % buf_size)
254+
255+
cdef int32_t val
256+
cdef char *src
257+
cdef char *dst
258+
259+
try:
260+
for i in range(self.vector_size):
261+
val = <int32_t>values[i]
262+
src = <char *>&val
263+
dst = buf + i * 4
264+
265+
if is_little_endian:
266+
dst[0] = src[3]
267+
dst[1] = src[2]
268+
dst[2] = src[1]
269+
dst[3] = src[0]
270+
else:
271+
memcpy(dst, src, 4)
272+
273+
return PyBytes_FromStringAndSize(buf, buf_size)
274+
finally:
275+
free(buf)
276+
277+
cdef inline bytes _serialize_generic(self, object values, int protocol_version):
278+
"""Fallback: element-by-element Python serialization for non-optimized types."""
279+
import io
280+
from cassandra.marshal import uvint_pack
281+
282+
serialized_size = self.subtype.serial_size()
283+
buf = io.BytesIO()
284+
for item in values:
285+
item_bytes = self.subtype.serialize(item, protocol_version)
286+
if serialized_size is None:
287+
buf.write(uvint_pack(len(item_bytes)))
288+
buf.write(item_bytes)
289+
return buf.getvalue()
290+
291+
292+
# ---------------------------------------------------------------------------
293+
# Generic serializer (fallback for all other types)
294+
# ---------------------------------------------------------------------------
295+
296+
cdef class GenericSerializer(Serializer):
297+
"""
298+
Wraps a generic cqltype for serialization, delegating to the Python-level
299+
cqltype.serialize() classmethod.
300+
"""
301+
302+
cpdef bytes serialize(self, object value, int protocol_version):
303+
return self.cqltype.serialize(value, protocol_version)
304+
305+
def __repr__(self):
306+
return "GenericSerializer(%s)" % (self.cqltype,)
307+
308+
309+
# ---------------------------------------------------------------------------
310+
# Lookup and factory
311+
# ---------------------------------------------------------------------------
312+
313+
cdef dict _ser_classes = {}
314+
315+
cpdef Serializer find_serializer(cqltype):
316+
"""Find a serializer for a cqltype."""
317+
318+
# For VectorType, always use SerVectorType (it handles generic subtypes internally)
319+
if issubclass(cqltype, cqltypes.VectorType):
320+
return SerVectorType(cqltype)
321+
322+
# For scalar types with dedicated serializers, look up by name
323+
name = 'Ser' + cqltype.__name__
324+
cls = _ser_classes.get(name)
325+
if cls is not None:
326+
return cls(cqltype)
327+
328+
# Fallback to generic
329+
return GenericSerializer(cqltype)
330+
331+
332+
def make_serializers(cqltypes_list):
333+
"""Create a list of Serializer objects for each given cqltype."""
334+
return [find_serializer(ct) for ct in cqltypes_list]
335+
336+
337+
# Build the lookup dict for scalar serializers at module load time
338+
_ser_classes['SerFloatType'] = SerFloatType
339+
_ser_classes['SerDoubleType'] = SerDoubleType
340+
_ser_classes['SerInt32Type'] = SerInt32Type

0 commit comments

Comments
 (0)