55import java .util .Random ;
66
77import io .ably .lib .test .common .ParameterizedTest ;
8+ import io .ably .lib .test .common .Helpers .ChannelWaiter ;
9+ import io .ably .lib .test .common .Helpers .MessageWaiter ;
810import io .ably .lib .types .*;
911
1012import io .ably .lib .rest .AblyRest ;
13+ import io .ably .lib .realtime .*;
1114import org .junit .Before ;
1215import org .junit .Ignore ;
1316import org .junit .Test ;
@@ -74,16 +77,24 @@ public void bulk_publish_multiple_channels_simple() {
7477 */
7578 @ Test
7679 public void bulk_publish_multiple_channels_param () {
80+ AblyRealtime rxAbly = null ;
7781 try {
7882 /* setup library instance */
7983 ClientOptions opts = createOptions (testVars .keys [0 ].keyStr );
8084 AblyRest ably = new AblyRest (opts );
85+ rxAbly = new AblyRealtime (opts );
8186
8287 /* first, publish some messages */
8388 int channelCount = 5 ;
8489 ArrayList <String > channelIds = new ArrayList <String >();
90+ ArrayList <MessageWaiter > rxWaiters = new ArrayList <MessageWaiter >();
8591 for (int i = 0 ; i < channelCount ; i ++) {
86- channelIds .add ("persisted:" + randomString ());
92+ String channelId = "persisted:" + randomString ();
93+ channelIds .add (channelId );
94+ Channel rxChannel = rxAbly .channels .get (channelId );
95+ MessageWaiter messageWaiter = new MessageWaiter (rxChannel );
96+ rxWaiters .add (messageWaiter );
97+ new ChannelWaiter (rxChannel ).waitFor (ChannelState .attached );
8798 }
8899
89100 Message message = new Message (null , "bulk_publish_multiple_channels_param" );
@@ -99,19 +110,23 @@ public void bulk_publish_multiple_channels_param() {
99110 assertNull ("Verify no publish error" , response .error );
100111 }
101112
102- /* get the history for this channel */
103- for (String channel : channelIds ) {
104- PaginatedResult <Message > messages = ably .channels .get (channel ).history (null );
105- assertNotNull ("Expected non-null messages" , messages );
106- assertEquals ("Expected 1 message" , messages .items ().length , 1 );
107- /* verify message contents */
108- assertEquals ("Expect message data to be expected String" , messages .items ()[0 ].data , message .data );
113+ /* Wait to get a message on each channel -- since we're using
114+ * quickAck, getting an ack is no longer a guarantee that the
115+ * message has been processed, so getting history immediately after
116+ * the publish returns may fail */
117+ for (MessageWaiter messageWaiter : rxWaiters ) {
118+ messageWaiter .waitFor (1 );
109119 }
110120 } catch (AblyException e ) {
111121 e .printStackTrace ();
112122 fail ("bulk_publish_multiple_channels_param: Unexpected exception" );
113123 return ;
124+ } finally {
125+ if (rxAbly != null ) {
126+ rxAbly .close ();
127+ }
114128 }
129+
115130 }
116131
117132 /**
0 commit comments