Skip to content

Commit 3b48f7b

Browse files
committed
KafkaProtocol: validate network frame size (backport of #3019)
1 parent aeca021 commit 3b48f7b

7 files changed

Lines changed: 35 additions & 8 deletions

File tree

kafka/admin/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ class KafkaAdminClient(object):
8787
max_in_flight_requests_per_connection (int): Requests are pipelined
8888
to kafka brokers up to this number of maximum requests per
8989
broker connection. Default: 5.
90+
receive_message_max_bytes (int): Maximum allowed network frame size.
91+
Used to avoid OOM when decoding malformed network message header.
92+
Default: 1000000.
9093
receive_buffer_bytes (int): The size of the TCP receive buffer
9194
(SO_RCVBUF) to use when reading data. Default: None (relies on
9295
system defaults). Java client defaults to 32768.
@@ -163,11 +166,10 @@ class KafkaAdminClient(object):
163166
'reconnect_backoff_ms': 50,
164167
'reconnect_backoff_max_ms': 30000,
165168
'max_in_flight_requests_per_connection': 5,
169+
'receive_message_max_bytes': 1000000,
166170
'receive_buffer_bytes': None,
167171
'send_buffer_bytes': None,
168172
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
169-
'sock_chunk_bytes': 4096, # undocumented experimental option
170-
'sock_chunk_buffer_count': 1000, # undocumented experimental option
171173
'retry_backoff_ms': 100,
172174
'metadata_max_age_ms': 300000,
173175
'security_protocol': 'PLAINTEXT',

kafka/client_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ class KafkaClient(object):
8989
max_in_flight_requests_per_connection (int): Requests are pipelined
9090
to kafka brokers up to this number of maximum requests per
9191
broker connection. Default: 5.
92+
receive_message_max_bytes (int): Maximum allowed network frame size.
93+
Used to avoid OOM when decoding malformed network message header.
94+
Default: 1000000.
9295
receive_buffer_bytes (int): The size of the TCP receive buffer
9396
(SO_RCVBUF) to use when reading data. Default: None (relies on
9497
system defaults). Java client defaults to 32768.
@@ -186,6 +189,7 @@ class KafkaClient(object):
186189
'reconnect_backoff_ms': 50,
187190
'reconnect_backoff_max_ms': 30000,
188191
'max_in_flight_requests_per_connection': 5,
192+
'receive_message_max_bytes': 1000000,
189193
'receive_buffer_bytes': None,
190194
'send_buffer_bytes': None,
191195
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],

kafka/conn.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ class BrokerConnection(object):
122122
max_in_flight_requests_per_connection (int): Requests are pipelined
123123
to kafka brokers up to this number of maximum requests per
124124
broker connection. Default: 5.
125+
receive_message_max_bytes (int): Maximum allowed network frame size.
126+
Used to avoid OOM when decoding malformed network message header.
127+
Default: 1000000.
125128
receive_buffer_bytes (int): The size of the TCP receive buffer
126129
(SO_RCVBUF) to use when reading data. Default: None (relies on
127130
system defaults). Java client defaults to 32768.
@@ -202,6 +205,7 @@ class BrokerConnection(object):
202205
'reconnect_backoff_ms': 50,
203206
'reconnect_backoff_max_ms': 30000,
204207
'max_in_flight_requests_per_connection': 5,
208+
'receive_message_max_bytes': 1000000,
205209
'receive_buffer_bytes': None,
206210
'send_buffer_bytes': None,
207211
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
@@ -292,7 +296,8 @@ def __init__(self, host, port, afi, **configs):
292296

293297
self._protocol = KafkaProtocol(
294298
client_id=self.config['client_id'],
295-
api_version=self.config['api_version'])
299+
api_version=self.config['api_version'],
300+
max_frame_size=self.config['receive_message_max_bytes'])
296301
self.state = ConnectionStates.DISCONNECTED
297302
self._reset_reconnect_backoff()
298303
self._sock = None

kafka/consumer/group.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ class KafkaConsumer(six.Iterator):
163163
should be set no higher than 1/3 of that value. It can be
164164
adjusted even lower to control the expected time for normal
165165
rebalances. Default: 3000
166+
receive_message_max_bytes (int): Maximum allowed network frame size.
167+
Used to avoid OOM when decoding malformed network message header.
168+
Default: 1000000.
166169
receive_buffer_bytes (int): The size of the TCP receive buffer
167170
(SO_RCVBUF) to use when reading data. Default: None (relies on
168171
system defaults). The java client defaults to 32768.
@@ -303,9 +306,8 @@ class KafkaConsumer(six.Iterator):
303306
'heartbeat_interval_ms': 3000,
304307
'receive_buffer_bytes': None,
305308
'send_buffer_bytes': None,
309+
'receive_message_max_bytes': 1000000,
306310
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
307-
'sock_chunk_bytes': 4096, # undocumented experimental option
308-
'sock_chunk_buffer_count': 1000, # undocumented experimental option
309311
'consumer_timeout_ms': float('inf'),
310312
'security_protocol': 'PLAINTEXT',
311313
'ssl_context': None,

kafka/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ class CorrelationIdError(KafkaProtocolError):
5757
retriable = True
5858

5959

60+
class InvalidReceiveError(KafkaProtocolError):
61+
pass
62+
63+
6064
class KafkaTimeoutError(KafkaError):
6165
retriable = True
6266

kafka/producer/kafka.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@ class KafkaProducer(object):
261261
errors. Default: 100.
262262
request_timeout_ms (int): Client request timeout in milliseconds.
263263
Default: 30000.
264+
receive_message_max_bytes (int): Maximum allowed network frame size.
265+
Used to avoid OOM when decoding malformed network message header.
266+
Default: 1000000.
264267
receive_buffer_bytes (int): The size of the TCP receive buffer
265268
(SO_RCVBUF) to use when reading data. Default: None (relies on
266269
system defaults). Java client defaults to 32768.
@@ -392,11 +395,10 @@ class KafkaProducer(object):
392395
'metadata_max_age_ms': 300000,
393396
'retry_backoff_ms': 100,
394397
'request_timeout_ms': 30000,
398+
'receive_message_max_bytes': 1000000,
395399
'receive_buffer_bytes': None,
396400
'send_buffer_bytes': None,
397401
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
398-
'sock_chunk_bytes': 4096, # undocumented experimental option
399-
'sock_chunk_buffer_count': 1000, # undocumented experimental option
400402
'reconnect_backoff_ms': 50,
401403
'reconnect_backoff_max_ms': 30000,
402404
'max_in_flight_requests_per_connection': 5,

kafka/protocol/parser.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ class KafkaProtocol(object):
2323
api_version (tuple): Optional tuple to specify api_version to use.
2424
Currently only used to check for 0.8.2 protocol quirks, but
2525
may be used for more in the future.
26+
max_frame_size (int): Maximum allowed message frame size.
27+
Default: 100000000 (100MB).
2628
"""
27-
def __init__(self, client_id=None, api_version=None):
29+
def __init__(self, client_id=None, api_version=None, max_frame_size=100000000):
2830
if client_id is None:
2931
client_id = self._gen_client_id()
3032
self._client_id = client_id
3133
self._api_version = api_version
34+
self._max_frame_size = max_frame_size
3235
self._correlation_id = 0
3336
self._header = KafkaBytes(4)
3437
self._rbuffer = None
@@ -105,6 +108,7 @@ def receive_bytes(self, data):
105108
if self._header.tell() == 4:
106109
self._header.seek(0)
107110
nbytes = Int32.decode(self._header)
111+
self._validate_frame_size(nbytes)
108112
# reset buffer and switch state to receiving payload bytes
109113
self._rbuffer = KafkaBytes(nbytes)
110114
self._receiving = True
@@ -132,6 +136,10 @@ def receive_bytes(self, data):
132136
self._reset_buffer()
133137
return responses
134138

139+
def _validate_frame_size(self, nbytes):
140+
if nbytes < 0 or nbytes > self._max_frame_size:
141+
raise Errors.InvalidReceiveError('Invalid frame length: %d' % nbytes)
142+
135143
def _process_response(self, read_buffer):
136144
if not self.in_flight_requests:
137145
raise Errors.CorrelationIdError('No in-flight-request found for server response')

0 commit comments

Comments
 (0)