Skip to content

Commit fabffed

Browse files
committed
Add __slots__ to protocol message classes for memory optimization
Add __slots__ to _MessageType base class and all protocol message classes to reduce memory overhead. Each message instance saves approximately 280-300 bytes by eliminating the per-instance __dict__. Changes: - _MessageType: Added __slots__ with 'custom_payload' and 'tracing' - _MessageType: Added __init__ to initialize attributes with proper defaults - _DecodableMessageType: Removed duplicate 'custom_payload' from __slots__ - All message subclasses: Added super().__init__() calls for proper initialization Key attributes must be in __slots__ because: - custom_payload: Accessed by encode_message() for ALL message types (line 1127) - tracing: Set on message instances in cluster.py (line 2972) Without these in __slots__, attempting to set them raises AttributeError, causing connection failures with: 'OptionsMessage' object has no attribute 'custom_payload' Message classes covered: - Outgoing (_MessageType): StartupMessage, OptionsMessage, QueryMessage, ExecuteMessage, PrepareMessage, BatchMessage, RegisterMessage, etc. - Incoming (_DecodableMessageType): ResultMessage, EventMessage, AuthenticateMessage, SupportedMessage, etc. Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent da64595 commit fabffed

1 file changed

Lines changed: 101 additions & 43 deletions

File tree

cassandra/protocol.py

Lines changed: 101 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,11 @@ def __init__(cls, name, bases, dct):
8585

8686

8787
class _MessageType(object, metaclass=_RegisterMessageType):
88+
__slots__ = ('custom_payload', 'tracing')
8889

89-
tracing = False
90-
custom_payload = None
91-
warnings = None
90+
def __init__(self):
91+
self.custom_payload = None
92+
self.tracing = False
9293

9394
def update_custom_payload(self, other):
9495
if other:
@@ -102,6 +103,11 @@ def __repr__(self):
102103
return '<%s(%s)>' % (self.__class__.__name__, ', '.join('%s=%r' % i for i in _get_params(self)))
103104

104105

106+
class _DecodableMessageType(_MessageType):
107+
"""Base class for messages that can be decoded and receive protocol attributes"""
108+
__slots__ = ('stream_id', 'trace_id', 'warnings')
109+
110+
105111
def _get_params(message_obj):
106112
base_attrs = dir(_MessageType)
107113
return (
@@ -113,7 +119,7 @@ def _get_params(message_obj):
113119
error_classes = {}
114120

115121

116-
class ErrorMessage(_MessageType, Exception):
122+
class ErrorMessage(Exception):
117123
opcode = 0x00
118124
name = 'ERROR'
119125
summary = 'Unknown'
@@ -407,6 +413,10 @@ class ClientWriteError(RequestExecutionException):
407413
error_code = 0x8000
408414

409415

416+
# Manually register ErrorMessage since it doesn't use _RegisterMessageType metaclass
417+
register_class(ErrorMessage)
418+
419+
410420
class StartupMessage(_MessageType):
411421
opcode = 0x01
412422
name = 'STARTUP'
@@ -418,6 +428,7 @@ class StartupMessage(_MessageType):
418428
))
419429

420430
def __init__(self, cqlversion, options):
431+
super().__init__()
421432
self.cqlversion = cqlversion
422433
self.options = options
423434

@@ -427,7 +438,9 @@ def send_body(self, f, protocol_version):
427438
write_stringmap(f, optmap)
428439

429440

430-
class ReadyMessage(_MessageType):
441+
class ReadyMessage(_DecodableMessageType):
442+
__slots__ = ()
443+
431444
opcode = 0x02
432445
name = 'READY'
433446

@@ -436,11 +449,14 @@ def recv_body(cls, *args):
436449
return cls()
437450

438451

439-
class AuthenticateMessage(_MessageType):
452+
class AuthenticateMessage(_DecodableMessageType):
453+
__slots__ = ('authenticator',)
454+
440455
opcode = 0x03
441456
name = 'AUTHENTICATE'
442457

443458
def __init__(self, authenticator):
459+
super().__init__()
444460
self.authenticator = authenticator
445461

446462
@classmethod
@@ -454,6 +470,7 @@ class CredentialsMessage(_MessageType):
454470
name = 'CREDENTIALS'
455471

456472
def __init__(self, creds):
473+
super().__init__()
457474
self.creds = creds
458475

459476
def send_body(self, f, protocol_version):
@@ -468,11 +485,14 @@ def send_body(self, f, protocol_version):
468485
write_string(f, credval)
469486

470487

471-
class AuthChallengeMessage(_MessageType):
488+
class AuthChallengeMessage(_DecodableMessageType):
489+
__slots__ = ('challenge',)
490+
472491
opcode = 0x0E
473492
name = 'AUTH_CHALLENGE'
474493

475494
def __init__(self, challenge):
495+
super().__init__()
476496
self.challenge = challenge
477497

478498
@classmethod
@@ -485,17 +505,21 @@ class AuthResponseMessage(_MessageType):
485505
name = 'AUTH_RESPONSE'
486506

487507
def __init__(self, response):
508+
super().__init__()
488509
self.response = response
489510

490511
def send_body(self, f, protocol_version):
491512
write_longstring(f, self.response)
492513

493514

494-
class AuthSuccessMessage(_MessageType):
515+
class AuthSuccessMessage(_DecodableMessageType):
516+
__slots__ = ('token',)
517+
495518
opcode = 0x10
496519
name = 'AUTH_SUCCESS'
497520

498521
def __init__(self, token):
522+
super().__init__()
499523
self.token = token
500524

501525
@classmethod
@@ -511,11 +535,14 @@ def send_body(self, f, protocol_version):
511535
pass
512536

513537

514-
class SupportedMessage(_MessageType):
538+
class SupportedMessage(_DecodableMessageType):
539+
__slots__ = ('cql_versions', 'options')
540+
515541
opcode = 0x06
516542
name = 'SUPPORTED'
517543

518544
def __init__(self, cql_versions, options):
545+
super().__init__()
519546
self.cql_versions = cql_versions
520547
self.options = options
521548

@@ -541,11 +568,15 @@ def recv_body(cls, f, *args):
541568

542569

543570
class _QueryMessage(_MessageType):
544-
545-
def __init__(self, query_params, consistency_level,
546-
serial_consistency_level=None, fetch_size=None,
547-
paging_state=None, timestamp=None, skip_meta=False,
548-
continuous_paging_options=None, keyspace=None):
571+
__slots__ = ('query_params', 'consistency_level', 'serial_consistency_level',
572+
'fetch_size', 'paging_state', 'skip_meta', 'timestamp', 'keyspace')
573+
574+
def __init__(self, query_params, consistency_level, serial_consistency_level=None,
575+
fetch_size=None, paging_state=None, skip_meta=False,
576+
timestamp=None, keyspace=None, continuous_paging_options=None):
577+
super().__init__()
578+
# Note: continuous_paging_options is accepted for backward compatibility
579+
# but is not currently implemented (not stored or used)
549580
self.query_params = query_params
550581
self.consistency_level = consistency_level
551582
self.serial_consistency_level = serial_consistency_level
@@ -607,32 +638,46 @@ def _write_paging_options(self, f, paging_options, protocol_version):
607638

608639

609640
class QueryMessage(_QueryMessage):
641+
__slots__ = ('query',)
642+
610643
opcode = 0x07
611644
name = 'QUERY'
612645

613646
def __init__(self, query, consistency_level, serial_consistency_level=None,
614647
fetch_size=None, paging_state=None, timestamp=None, continuous_paging_options=None, keyspace=None):
648+
# Note: continuous_paging_options is accepted for backward compatibility
649+
# but is not currently implemented (not stored or used)
615650
self.query = query
616-
super(QueryMessage, self).__init__(None, consistency_level, serial_consistency_level, fetch_size,
617-
paging_state, timestamp, False, continuous_paging_options, keyspace)
651+
super(QueryMessage, self).__init__(query_params=None, consistency_level=consistency_level,
652+
serial_consistency_level=serial_consistency_level,
653+
fetch_size=fetch_size, paging_state=paging_state,
654+
skip_meta=False, timestamp=timestamp, keyspace=keyspace,
655+
continuous_paging_options=continuous_paging_options)
618656

619657
def send_body(self, f, protocol_version):
620658
write_longstring(f, self.query)
621659
self._write_query_params(f, protocol_version)
622660

623661

624662
class ExecuteMessage(_QueryMessage):
663+
__slots__ = ('query_id', 'result_metadata_id')
664+
625665
opcode = 0x0A
626666
name = 'EXECUTE'
627667

628668
def __init__(self, query_id, query_params, consistency_level,
629669
serial_consistency_level=None, fetch_size=None,
630670
paging_state=None, timestamp=None, skip_meta=False,
631671
continuous_paging_options=None, result_metadata_id=None):
672+
# Note: continuous_paging_options is accepted for backward compatibility
673+
# but is not currently implemented (not stored or used)
632674
self.query_id = query_id
633675
self.result_metadata_id = result_metadata_id
634-
super(ExecuteMessage, self).__init__(query_params, consistency_level, serial_consistency_level, fetch_size,
635-
paging_state, timestamp, skip_meta, continuous_paging_options)
676+
super(ExecuteMessage, self).__init__(query_params=query_params, consistency_level=consistency_level,
677+
serial_consistency_level=serial_consistency_level,
678+
fetch_size=fetch_size, paging_state=paging_state,
679+
skip_meta=skip_meta, timestamp=timestamp, keyspace=None,
680+
continuous_paging_options=continuous_paging_options)
636681

637682
def _write_query_params(self, f, protocol_version):
638683
super(ExecuteMessage, self)._write_query_params(f, protocol_version)
@@ -653,14 +698,14 @@ def send_body(self, f, protocol_version):
653698
RESULT_KIND_SCHEMA_CHANGE = 0x0005
654699

655700

656-
class ResultMessage(_MessageType):
701+
class ResultMessage(_DecodableMessageType):
702+
__slots__ = ('kind', 'result_metadata_id', 'results', 'paging_state', 'column_names', 'column_types',
703+
'parsed_rows', 'continuous_paging_seq', 'continuous_paging_last', 'new_keyspace',
704+
'column_metadata', 'query_id', 'bind_metadata', 'pk_indexes', 'schema_change_event', 'is_lwt')
705+
657706
opcode = 0x08
658707
name = 'RESULT'
659708

660-
kind = None
661-
results = None
662-
paging_state = None
663-
664709
# Names match type name in module scope. Most are imported from cassandra.cqltypes (except CUSTOM_TYPE)
665710
type_codes = _cqltypes_by_code = dict((v, globals()[k]) for k, v in type_codes.__dict__.items() if not k.startswith('_'))
666711

@@ -671,25 +716,25 @@ class ResultMessage(_MessageType):
671716
_CONTINUOUS_PAGING_LAST_FLAG = 0x80000000
672717
_METADATA_ID_FLAG = 0x0008
673718

674-
kind = None
675-
676-
# These are all the things a result message might contain. They are populated according to 'kind'
677-
column_names = None
678-
column_types = None
679-
parsed_rows = None
680-
paging_state = None
681-
continuous_paging_seq = None
682-
continuous_paging_last = None
683-
new_keyspace = None
684-
column_metadata = None
685-
query_id = None
686-
bind_metadata = None
687-
pk_indexes = None
688-
schema_change_event = None
689-
is_lwt = False
690-
691719
def __init__(self, kind):
720+
super().__init__()
692721
self.kind = kind
722+
# Initialize all slot attributes to None
723+
self.result_metadata_id = None
724+
self.results = None
725+
self.paging_state = None
726+
self.column_names = None
727+
self.column_types = None
728+
self.parsed_rows = None
729+
self.continuous_paging_seq = None
730+
self.continuous_paging_last = None
731+
self.new_keyspace = None
732+
self.column_metadata = None
733+
self.query_id = None
734+
self.bind_metadata = None
735+
self.pk_indexes = None
736+
self.schema_change_event = None
737+
self.is_lwt = None
693738

694739
def recv(self, f, protocol_version, protocol_features, user_type_map, result_metadata, column_encryption_policy):
695740
if self.kind == RESULT_KIND_VOID:
@@ -859,10 +904,13 @@ def recv_row(f, colcount):
859904

860905

861906
class PrepareMessage(_MessageType):
907+
__slots__ = ('query', 'keyspace')
908+
862909
opcode = 0x09
863910
name = 'PREPARE'
864911

865912
def __init__(self, query, keyspace=None):
913+
super().__init__()
866914
self.query = query
867915
self.keyspace = keyspace
868916

@@ -897,12 +945,15 @@ def send_body(self, f, protocol_version):
897945

898946

899947
class BatchMessage(_MessageType):
948+
__slots__ = ('batch_type', 'queries', 'consistency_level', 'serial_consistency_level',
949+
'timestamp', 'keyspace')
950+
900951
opcode = 0x0D
901952
name = 'BATCH'
902953

903954
def __init__(self, batch_type, queries, consistency_level,
904-
serial_consistency_level=None, timestamp=None,
905-
keyspace=None):
955+
serial_consistency_level=None, timestamp=None, keyspace=None):
956+
super().__init__()
906957
self.batch_type = batch_type
907958
self.queries = queries
908959
self.consistency_level = consistency_level
@@ -962,21 +1013,27 @@ def send_body(self, f, protocol_version):
9621013

9631014

9641015
class RegisterMessage(_MessageType):
1016+
__slots__ = ('event_list',)
1017+
9651018
opcode = 0x0B
9661019
name = 'REGISTER'
9671020

9681021
def __init__(self, event_list):
1022+
super().__init__()
9691023
self.event_list = event_list
9701024

9711025
def send_body(self, f, protocol_version):
9721026
write_stringlist(f, self.event_list)
9731027

9741028

975-
class EventMessage(_MessageType):
1029+
class EventMessage(_DecodableMessageType):
1030+
__slots__ = ('event_type', 'event_args')
1031+
9761032
opcode = 0x0C
9771033
name = 'EVENT'
9781034

9791035
def __init__(self, event_type, event_args):
1036+
super().__init__()
9801037
self.event_type = event_type
9811038
self.event_args = event_args
9821039

@@ -1038,6 +1095,7 @@ class RevisionType(object):
10381095
name = 'REVISE_REQUEST'
10391096

10401097
def __init__(self, op_type, op_id, next_pages=0):
1098+
super().__init__()
10411099
self.op_type = op_type
10421100
self.op_id = op_id
10431101
self.next_pages = next_pages

0 commit comments

Comments
 (0)