@@ -34,13 +34,15 @@ def create_client():
3434 return publisher .Client (credentials = creds )
3535
3636
37- def create_batch (autocommit = False , ** batch_settings ):
37+ def create_batch (autocommit = False , topic = "topic_name" , ** batch_settings ):
3838 """Return a batch object suitable for testing.
3939
4040 Args:
4141 autocommit (bool): Whether the batch should commit after
4242 ``max_latency`` seconds. By default, this is ``False``
4343 for unit testing.
44+ topic (str): The name of the topic the batch should publish
45+ the messages to.
4446 batch_settings (dict): Arguments passed on to the
4547 :class:``~.pubsub_v1.types.BatchSettings`` constructor.
4648
@@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings):
4951 """
5052 client = create_client ()
5153 settings = types .BatchSettings (** batch_settings )
52- return Batch (client , "topic_name" , settings , autocommit = autocommit )
54+ return Batch (client , topic , settings , autocommit = autocommit )
5355
5456
5557def test_init ():
@@ -299,8 +301,8 @@ def test_monitor_already_committed():
299301 assert batch ._status == status
300302
301303
302- def test_publish ():
303- batch = create_batch ()
304+ def test_publish_updating_batch_size ():
305+ batch = create_batch (topic = "topic_foo" )
304306 messages = (
305307 types .PubsubMessage (data = b"foobarbaz" ),
306308 types .PubsubMessage (data = b"spameggs" ),
@@ -314,22 +316,27 @@ def test_publish():
314316 assert len (batch .messages ) == 3
315317 assert batch ._futures == futures
316318
317- # The size should have been incremented by the sum of the size of the
318- # messages.
319- expected_size = sum ([message_pb .ByteSize () for message_pb in messages ])
320- assert batch .size == expected_size
319+ # The size should have been incremented by the sum of the size
320+ # contributions of each message to the PublishRequest.
321+ base_request_size = types .PublishRequest (topic = "topic_foo" ).ByteSize ()
322+ expected_request_size = base_request_size + sum (
323+ types .PublishRequest (messages = [msg ]).ByteSize () for msg in messages
324+ )
325+
326+ assert batch .size == expected_request_size
321327 assert batch .size > 0 # I do not always trust protobuf.
322328
323329
324330def test_publish_not_will_accept ():
325- batch = create_batch (max_messages = 0 )
331+ batch = create_batch (topic = "topic_foo" , max_messages = 0 )
332+ base_request_size = types .PublishRequest (topic = "topic_foo" ).ByteSize ()
326333
327334 # Publish the message.
328335 message = types .PubsubMessage (data = b"foobarbaz" )
329336 future = batch .publish (message )
330337
331338 assert future is None
332- assert batch .size == 0
339+ assert batch .size == base_request_size
333340 assert batch .messages == []
334341 assert batch ._futures == []
335342
@@ -361,6 +368,47 @@ def test_publish_exceed_max_messages():
361368 assert batch ._futures == futures
362369
363370
371+ @mock .patch .object (thread , "_SERVER_PUBLISH_MAX_BYTES" , 1000 )
372+ def test_publish_single_message_size_exceeds_server_size_limit ():
373+ batch = create_batch (
374+ topic = "topic_foo" ,
375+ max_messages = 1000 ,
376+ max_bytes = 1000 * 1000 , # way larger than (mocked) server side limit
377+ )
378+
379+ big_message = types .PubsubMessage (data = b"x" * 984 )
380+
381+ request_size = types .PublishRequest (
382+ topic = "topic_foo" , messages = [big_message ]
383+ ).ByteSize ()
384+ assert request_size == 1001 # sanity check, just above the (mocked) server limit
385+
386+ with pytest .raises (exceptions .MessageTooLargeError ):
387+ batch .publish (big_message )
388+
389+
390+ @mock .patch .object (thread , "_SERVER_PUBLISH_MAX_BYTES" , 1000 )
391+ def test_publish_total_messages_size_exceeds_server_size_limit ():
392+ batch = create_batch (topic = "topic_foo" , max_messages = 10 , max_bytes = 1500 )
393+
394+ messages = (
395+ types .PubsubMessage (data = b"x" * 500 ),
396+ types .PubsubMessage (data = b"x" * 600 ),
397+ )
398+
399+ # Sanity check - request size is still below BatchSettings.max_bytes,
400+ # but it exceeds the server-side size limit.
401+ request_size = types .PublishRequest (topic = "topic_foo" , messages = messages ).ByteSize ()
402+ assert 1000 < request_size < 1500
403+
404+ with mock .patch .object (batch , "commit" ) as fake_commit :
405+ batch .publish (messages [0 ])
406+ batch .publish (messages [1 ])
407+
408+ # The server side limit should kick in and cause a commit.
409+ fake_commit .assert_called_once ()
410+
411+
364412def test_publish_dict ():
365413 batch = create_batch ()
366414 future = batch .publish ({"data" : b"foobarbaz" , "attributes" : {"spam" : "eggs" }})
0 commit comments