@@ -500,7 +500,7 @@ def test_streaming_pull_ack_deadline(
500500 )
501501
502502 # publish some messages and wait for completion
503- self . _publish_messages (publisher , topic_path , batch_sizes = [2 ])
503+ _publish_messages (publisher , topic_path , batch_sizes = [2 ])
504504
505505 # subscribe to the topic
506506 callback = StreamingPullCallback (
@@ -543,7 +543,7 @@ def test_streaming_pull_max_messages(
543543 subscriber .create_subscription (subscription_path , topic_path )
544544
545545 batch_sizes = (7 , 4 , 8 , 2 , 10 , 1 , 3 , 8 , 6 , 1 ) # total: 50
546- self . _publish_messages (publisher , topic_path , batch_sizes = batch_sizes )
546+ _publish_messages (publisher , topic_path , batch_sizes = batch_sizes )
547547
548548 # now subscribe and do the main part, check for max pending messages
549549 total_messages = sum (batch_sizes )
@@ -585,10 +585,12 @@ def test_streaming_pull_max_messages(
585585 finally :
586586 subscription_future .cancel () # trigger clean shutdown
587587
588- @pytest .mark .skipif (
589- "KOKORO_GFILE_DIR" not in os .environ ,
590- reason = "Requires Kokoro environment with a service account with limited role." ,
591- )
588+
589+ @pytest .mark .skipif (
590+ "KOKORO_GFILE_DIR" not in os .environ ,
591+ reason = "Requires Kokoro environment with a service account with limited role." ,
592+ )
593+ class TestBasicRBAC (object ):
592594 def test_streaming_pull_subscriber_permissions_sufficient (
593595 self , publisher , topic_path , subscriber , subscription_path , cleanup
594596 ):
@@ -611,7 +613,7 @@ def test_streaming_pull_subscriber_permissions_sufficient(
611613 # successfully pulls and processes it.
612614 callback = StreamingPullCallback (processing_time = 0.01 , resolve_at_msg_count = 1 )
613615 future = streaming_pull_subscriber .subscribe (subscription_path , callback )
614- self . _publish_messages (publisher , topic_path , batch_sizes = [1 ])
616+ _publish_messages (publisher , topic_path , batch_sizes = [1 ])
615617
616618 try :
617619 callback .done_future .result (timeout = 10 )
@@ -624,10 +626,6 @@ def test_streaming_pull_subscriber_permissions_sufficient(
624626 finally :
625627 future .cancel ()
626628
627- @pytest .mark .skipif (
628- "KOKORO_GFILE_DIR" not in os .environ ,
629- reason = "Requires Kokoro environment with a service account with limited role." ,
630- )
631629 def test_publisher_role_can_publish_messages (
632630 self , publisher , topic_path , subscriber , subscription_path , cleanup
633631 ):
@@ -646,18 +644,14 @@ def test_publisher_role_can_publish_messages(
646644 )
647645 publisher_only_client = type (publisher ).from_service_account_file (filename )
648646
649- self . _publish_messages (publisher_only_client , topic_path , batch_sizes = [2 ])
647+ _publish_messages (publisher_only_client , topic_path , batch_sizes = [2 ])
650648
651649 response = subscriber .pull (subscription_path , max_messages = 2 )
652650 assert len (response .received_messages ) == 2
653651
654652 @pytest .mark .skip (
655653 "Snapshot creation is not instant on the backend, causing test falkiness."
656654 )
657- @pytest .mark .skipif (
658- "KOKORO_GFILE_DIR" not in os .environ ,
659- reason = "Requires Kokoro environment with a service account with limited role." ,
660- )
661655 def test_snapshot_seek_subscriber_permissions_sufficient (
662656 self , project , publisher , topic_path , subscriber , subscription_path , cleanup
663657 ):
@@ -682,13 +676,13 @@ def test_snapshot_seek_subscriber_permissions_sufficient(
682676 subscriber_only_client = type (subscriber ).from_service_account_file (filename )
683677
684678 # Publish two messages and create a snapshot inbetween.
685- self . _publish_messages (publisher , topic_path , batch_sizes = [1 ])
679+ _publish_messages (publisher , topic_path , batch_sizes = [1 ])
686680 response = subscriber .pull (subscription_path , max_messages = 10 )
687681 assert len (response .received_messages ) == 1
688682
689683 subscriber .create_snapshot (snapshot_path , subscription_path )
690684
691- self . _publish_messages (publisher , topic_path , batch_sizes = [1 ])
685+ _publish_messages (publisher , topic_path , batch_sizes = [1 ])
692686 response = subscriber .pull (subscription_path , max_messages = 10 )
693687 assert len (response .received_messages ) == 1
694688
@@ -699,10 +693,6 @@ def test_snapshot_seek_subscriber_permissions_sufficient(
699693 response = subscriber .pull (subscription_path , max_messages = 10 )
700694 assert len (response .received_messages ) == 1
701695
702- @pytest .mark .skipif (
703- "KOKORO_GFILE_DIR" not in os .environ ,
704- reason = "Requires Kokoro environment with a service account with limited role." ,
705- )
706696 def test_viewer_role_can_list_resources (
707697 self , project , publisher , topic_path , subscriber , cleanup
708698 ):
@@ -727,10 +717,6 @@ def test_viewer_role_can_list_resources(
727717 next (iter (viewer_only_subscriber .list_subscriptions (project_path )), None )
728718 next (iter (viewer_only_subscriber .list_snapshots (project_path )), None )
729719
730- @pytest .mark .skipif (
731- "KOKORO_GFILE_DIR" not in os .environ ,
732- reason = "Requires Kokoro environment with a service account with limited role." ,
733- )
734720 def test_editor_role_can_create_resources (
735721 self , project , publisher , topic_path , subscriber , subscription_path , cleanup
736722 ):
@@ -754,30 +740,29 @@ def test_editor_role_can_create_resources(
754740 editor_subscriber .create_subscription (subscription_path , topic_path )
755741 editor_subscriber .create_snapshot (snapshot_path , subscription_path )
756742
757- def _publish_messages (self , publisher , topic_path , batch_sizes ):
758- """Publish ``count`` messages in batches and wait until completion."""
759- publish_futures = []
760- msg_counter = itertools .count (start = 1 )
761743
762- for batch_size in batch_sizes :
763- msg_batch = self ._make_messages (count = batch_size )
764- for msg in msg_batch :
765- future = publisher .publish (
766- topic_path , msg , seq_num = str (next (msg_counter ))
767- )
768- publish_futures .append (future )
769- time .sleep (0.1 )
744+ def _publish_messages (publisher , topic_path , batch_sizes ):
745+ """Publish ``count`` messages in batches and wait until completion."""
746+ publish_futures = []
747+ msg_counter = itertools .count (start = 1 )
770748
771- # wait untill all messages have been successfully published
772- for future in publish_futures :
773- future .result (timeout = 30 )
749+ for batch_size in batch_sizes :
750+ msg_batch = _make_messages (count = batch_size )
751+ for msg in msg_batch :
752+ future = publisher .publish (topic_path , msg , seq_num = str (next (msg_counter )))
753+ publish_futures .append (future )
754+ time .sleep (0.1 )
774755
775- def _make_messages (self , count ):
776- messages = [
777- u"message {}/{}" .format (i , count ).encode ("utf-8" )
778- for i in range (1 , count + 1 )
779- ]
780- return messages
756+ # wait untill all messages have been successfully published
757+ for future in publish_futures :
758+ future .result (timeout = 30 )
759+
760+
761+ def _make_messages (count ):
762+ messages = [
763+ u"message {}/{}" .format (i , count ).encode ("utf-8" ) for i in range (1 , count + 1 )
764+ ]
765+ return messages
781766
782767
783768class AckCallback (object ):
0 commit comments