Skip to content

Commit e47725a

Browse files
authored
Add readinto to StreamingBody (boto#2746)
This adds a readinto implementation to StreamingBody, following the contract outlined in the Python io package. The primary motivation for this is that it allows consumers to wrap the StreamingBody in an io.BufferedReader. This provides a nice performance boost for parsing streaming binary data formats, where you want to be able to read small numbers of bytes in your parsing algorithm, but benefit from reading large chunks of data from upstream.
1 parent a4c8c09 commit e47725a

4 files changed

Lines changed: 115 additions & 0 deletions

File tree

botocore/httpchecksum.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,17 @@ def read(self, amt=None):
244244
self._validate_checksum()
245245
return chunk
246246

247+
def readinto(self, b):
248+
amount_read = super().readinto(b)
249+
if amount_read == len(b):
250+
view = b
251+
else:
252+
view = memoryview(b)[:amount_read]
253+
self._checksum.update(view)
254+
if amount_read == 0 and len(b) > 0:
255+
self._validate_checksum()
256+
return amount_read
257+
247258
def _validate_checksum(self):
248259
if self._checksum.digest() != base64.b64decode(self._expected):
249260
error_msg = (

botocore/response.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,22 @@ def read(self, amt=None):
110110
self._verify_content_length()
111111
return chunk
112112

113+
def readinto(self, b):
114+
"""Read bytes into a pre-allocated, writable bytes-like object b, and return the number of bytes read."""
115+
try:
116+
amount_read = self._raw_stream.readinto(b)
117+
except URLLib3ReadTimeoutError as e:
118+
# TODO: the url will be None as urllib3 isn't setting it yet
119+
raise ReadTimeoutError(endpoint_url=e.url, error=e)
120+
except URLLib3ProtocolError as e:
121+
raise ResponseStreamingError(error=e)
122+
self._amount_read += amount_read
123+
if amount_read == 0 and len(b) > 0:
124+
# If the server sends empty contents then we know we need to verify
125+
# the content length.
126+
self._verify_content_length()
127+
return amount_read
128+
113129
def readlines(self):
114130
return self._raw_stream.readlines()
115131

tests/unit/test_httpchecksum.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,3 +878,38 @@ def test_iter_raises_error(self):
878878
with self.assertRaises(FlexibleChecksumError):
879879
for chunk in self.wrapper:
880880
pass
881+
882+
def test_readinto_good(self):
883+
chunk = bytearray(6)
884+
self.assertEqual(6, self.wrapper.readinto(chunk))
885+
self.assertEqual(chunk, bytearray(b"hello "))
886+
self.assertEqual(5, self.wrapper.readinto(chunk))
887+
# Note the trailing space here comes from the fact we've only got 5
888+
# bytes left to read from the stream into a 6 byte buffer, so it leaves
889+
# the last byte untouched, which is the space character from the
890+
# previous read.
891+
self.assertEqual(chunk, bytearray(b"world "))
892+
# Whole body has been read, next read signals the end of the stream and
893+
# validates the checksum of the body contents read
894+
self.wrapper.readinto(chunk)
895+
896+
def test_readinto_bad(self):
897+
self._make_wrapper("duorhq==")
898+
chunk = bytearray(6)
899+
self.assertEqual(6, self.wrapper.readinto(chunk))
900+
self.assertEqual(chunk, bytearray(b"hello "))
901+
self.assertEqual(5, self.wrapper.readinto(chunk))
902+
self.assertEqual(chunk, bytearray(b"world "))
903+
# Whole body has been read, next read signals the end of the stream and
904+
# validates the checksum of the body contents read
905+
with self.assertRaises(FlexibleChecksumError):
906+
self.wrapper.readinto(chunk)
907+
908+
def test_readinto_zero_bytes(self):
909+
# Test that readinto returns 0 when 0 bytes are requested
910+
chunk = bytearray(0)
911+
self.assertEqual(0, self.wrapper.readinto(chunk))
912+
self.assertEqual(chunk, bytearray(b""))
913+
# Whole body has been read, next read signals the end of the stream and
914+
# validates the checksum of the body contents read
915+
self.wrapper.readinto(chunk)

tests/unit/test_response.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,59 @@ def test_streaming_body_readlines(self):
110110
chunks = [b'1234567890\n', b'1234567890\n', b'12345']
111111
self.assertEqual(stream.readlines(), chunks)
112112

113+
def test_streaming_body_readinto(self):
114+
body = BytesIO(b"123456789")
115+
stream = response.StreamingBody(body, content_length=9)
116+
chunk = bytearray(b"\x00\x00\x00\x00\x00")
117+
self.assertEqual(5, stream.readinto(chunk))
118+
self.assertEqual(chunk, bytearray(b"\x31\x32\x33\x34\x35"))
119+
self.assertEqual(4, stream.readinto(chunk))
120+
self.assertEqual(chunk, bytearray(b"\x36\x37\x38\x39\x35"))
121+
122+
def test_streaming_body_readinto_with_invalid_length(self):
123+
body = BytesIO(b"12")
124+
stream = response.StreamingBody(body, content_length=9)
125+
chunk = bytearray(b"\xde\xad\xbe\xef")
126+
self.assertEqual(2, stream.readinto(chunk))
127+
self.assertEqual(chunk, bytearray(b"\x31\x32\xbe\xef"))
128+
with self.assertRaises(IncompleteReadError):
129+
stream.readinto(chunk)
130+
131+
def test_streaming_body_readinto_with_empty_buffer(self):
132+
body = BytesIO(b"12")
133+
stream = response.StreamingBody(body, content_length=9)
134+
chunk = bytearray(b"")
135+
# Does not raise IncompleteReadError
136+
self.assertEqual(0, stream.readinto(chunk))
137+
138+
def test_streaming_body_readinto_catches_urllib3_read_timeout(self):
139+
class TimeoutBody:
140+
def readinto(*args, **kwargs):
141+
raise URLLib3ReadTimeoutError(None, None, None)
142+
143+
def geturl(*args, **kwargs):
144+
return "http://example.com"
145+
146+
stream = response.StreamingBody(TimeoutBody(), content_length=None)
147+
with self.assertRaises(ReadTimeoutError):
148+
chunk = bytearray(b"\x00\x00\x00\x00\x00")
149+
stream.readinto(chunk)
150+
151+
def test_streaming_body_readinto_catches_urllib3_protocol_error(self):
152+
class ProtocolErrorBody:
153+
def readinto(*args, **kwargs):
154+
raise URLLib3ProtocolError(None, None, None)
155+
156+
def geturl(*args, **kwargs):
157+
return "http://example.com"
158+
159+
stream = response.StreamingBody(
160+
ProtocolErrorBody(), content_length=None
161+
)
162+
with self.assertRaises(ResponseStreamingError):
163+
chunk = bytearray(b"\x00\x00\x00\x00\x00")
164+
stream.readinto(chunk)
165+
113166
def test_streaming_body_tell(self):
114167
body = BytesIO(b'1234567890')
115168
stream = response.StreamingBody(body, content_length=10)

0 commit comments

Comments
 (0)