@@ -348,22 +348,37 @@ public void testBrokerZeroPrefetchConfig() throws Exception {
348348 public void testBrokerZeroPrefetchConfigWithConsumerControl () throws Exception {
349349 Session session = connection .createSession (false , Session .AUTO_ACKNOWLEDGE );
350350
351- ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer ) session .createConsumer (brokerZeroQueue );
351+ final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer ) session .createConsumer (brokerZeroQueue );
352+
353+ // Wait for broker subscription to be created and policy applied
354+ final ActiveMQDestination transformedDest = ActiveMQDestination .transform (brokerZeroQueue );
355+ org .apache .activemq .util .Wait .waitFor (new org .apache .activemq .util .Wait .Condition () {
356+ @ Override
357+ public boolean isSatisified () throws Exception {
358+ return broker .getRegionBroker ().getDestinationMap ().get (transformedDest ) != null
359+ && !broker .getRegionBroker ().getDestinationMap ().get (transformedDest ).getConsumers ().isEmpty ();
360+ }
361+ }, 5000 , 100 );
362+
352363 assertEquals ("broker config prefetch in effect" , 0 , consumer .info .getCurrentPrefetchSize ());
353364
354365 // verify sub view broker
355366 Subscription sub =
356- broker .getRegionBroker ().getDestinationMap ().get (ActiveMQDestination . transform ( brokerZeroQueue ) ).getConsumers ().get (0 );
367+ broker .getRegionBroker ().getDestinationMap ().get (transformedDest ).getConsumers ().get (0 );
357368 assertEquals ("broker sub prefetch is correct" , 0 , sub .getConsumerInfo ().getCurrentPrefetchSize ());
358369
359370 // manipulate Prefetch (like failover and stomp)
360371 ConsumerControl consumerControl = new ConsumerControl ();
361372 consumerControl .setConsumerId (consumer .info .getConsumerId ());
362- consumerControl .setDestination (ActiveMQDestination . transform ( brokerZeroQueue ) );
373+ consumerControl .setDestination (transformedDest );
363374 consumerControl .setPrefetch (1000 ); // default for a q
364375
365376 Object reply = ((ActiveMQConnection ) connection ).getTransport ().request (consumerControl );
366377 assertTrue ("good request" , !(reply instanceof ExceptionResponse ));
378+
379+ // Wait for the ConsumerControl to be processed
380+ Thread .sleep (500 );
381+
367382 assertEquals ("broker config prefetch in effect" , 0 , consumer .info .getCurrentPrefetchSize ());
368383 assertEquals ("broker sub prefetch is correct" , 0 , sub .getConsumerInfo ().getCurrentPrefetchSize ());
369384 }
0 commit comments