diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 5658e31cc..af8d12143 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -540,10 +540,6 @@ def _send_join_group_request(self): # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) - member_metadata = [ - (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) - for protocol, metadata in self.group_protocols() - ] version = self._client.api_version(JoinGroupRequest, max_version=5) if version == 0: request = JoinGroupRequest[version]( @@ -551,7 +547,7 @@ def _send_join_group_request(self): self.config['session_timeout_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + self.group_protocols()) elif version <= 4: request = JoinGroupRequest[version]( self.group_id, @@ -559,7 +555,7 @@ def _send_join_group_request(self): self.config['max_poll_interval_ms'], self._generation.member_id, self.protocol_type(), - member_metadata) + self.group_protocols()) else: request = JoinGroupRequest[version]( self.group_id, @@ -568,7 +564,7 @@ def _send_join_group_request(self): self._generation.member_id, self.group_instance_id, self.protocol_type(), - member_metadata) + self.group_protocols()) # create the request for the coordinator log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) @@ -706,10 +702,6 @@ def _on_join_leader(self, response): group_assignment = self._perform_assignment(response.leader_id, response.group_protocol, members) - for member_id, assignment in group_assignment.items(): - if not isinstance(assignment, bytes): - group_assignment[member_id] = assignment.encode() - except Exception as e: return Future().failure(e) diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py index 28369f29e..574b0f327 100644 --- a/kafka/protocol/types.py +++ b/kafka/protocol/types.py @@ -141,8 +141,9 @@ class Bytes(AbstractType): def encode(cls, value): if value is None: return Int32.encode(-1) - else: - return Int32.encode(len(value)) + value + elif not isinstance(value, bytes): + value = value.encode() + return Int32.encode(len(value)) + value @classmethod def decode(cls, data): diff --git a/test/protocol/test_schema.py b/test/protocol/test_schema.py index 4ad8d0d2f..85850f43b 100644 --- a/test/protocol/test_schema.py +++ b/test/protocol/test_schema.py @@ -3,7 +3,7 @@ import pytest from kafka.protocol.struct import Struct -from kafka.protocol.types import Schema, Int32, String, TaggedFields +from kafka.protocol.types import Schema, Int32, String, TaggedFields, Bytes def test_schema_type(): @@ -52,3 +52,12 @@ def test_struct(args, kwargs): data = struct(*args, **kwargs) assert data.encode() == encoded assert struct.decode(encoded) == data + + +def test_bytes_struct(): + schema = Schema(('f1', Int32), ('f2', String())) + struct = type('TestStruct', (Struct,), {'SCHEMA': schema}) + data = struct(f1=123, f2="bar") + bytes_encoded = Bytes.encode(data) + assert bytes_encoded[4:] == data.encode() + assert bytes_encoded[:4] == Int32.encode(len(data.encode()))