Skip to content

Commit edd1989

Browse files
Merge pull request ably#604 from ably/publishBatch-params
Rest.publishBatch: support overloaded method that takes params
2 parents 19649e2 + 7976437 commit edd1989

2 files changed

Lines changed: 78 additions & 8 deletions

File tree

lib/src/main/java/io/ably/lib/rest/AblyBase.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,25 @@ public void requestAsync(String method, String path, Param[] params, HttpCore.Re
239239
*/
240240
@Experimental
241241
public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException {
242-
return publishBatchImpl(pubSpecs, channelOptions).sync();
242+
return publishBatchImpl(pubSpecs, channelOptions, null).sync();
243+
}
244+
245+
@Experimental
246+
public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions, Param[] params) throws AblyException {
247+
return publishBatchImpl(pubSpecs, channelOptions, params).sync();
243248
}
244249

245250
@Experimental
246251
public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback<PublishResponse[]> callback) throws AblyException {
247-
publishBatchImpl(pubSpecs, channelOptions).async(callback);
252+
publishBatchImpl(pubSpecs, channelOptions, null).async(callback);
253+
}
254+
255+
@Experimental
256+
public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, Param[] params, final Callback<PublishResponse[]> callback) throws AblyException {
257+
publishBatchImpl(pubSpecs, channelOptions, params).async(callback);
248258
}
249259

250-
private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException {
260+
private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Param[] params) throws AblyException {
251261
boolean hasClientSuppliedId = false;
252262
for(Message.Batch spec : pubSpecs) {
253263
for(Message message : spec.messages) {
@@ -270,7 +280,7 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
270280
@Override
271281
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
272282
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
273-
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), null, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
283+
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
274284
@Override
275285
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
276286
if(error != null && error.code != 40020) {

lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
import java.util.Random;
66

77
import io.ably.lib.test.common.ParameterizedTest;
8+
import io.ably.lib.test.common.Helpers.ChannelWaiter;
9+
import io.ably.lib.test.common.Helpers.MessageWaiter;
810
import io.ably.lib.types.*;
911

1012
import io.ably.lib.rest.AblyRest;
13+
import io.ably.lib.realtime.*;
1114
import org.junit.Before;
1215
import org.junit.Ignore;
1316
import org.junit.Test;
@@ -18,15 +21,15 @@ public class RestChannelBulkPublishTest extends ParameterizedTest {
1821

1922
/**
2023
* Publish a single message on multiple channels
21-
*
24+
*
2225
* The payload constructed has the form
2326
* [
2427
* {
2528
* channel: [ <channel 0>, <channel 1>, ... ],
2629
* message: [{ data: <message text> }]
2730
* }
2831
* ]
29-
*
32+
*
3033
* It publishes the given message on all of the given channels.
3134
*/
3235
@Test
@@ -69,9 +72,66 @@ public void bulk_publish_multiple_channels_simple() {
6972
}
7073
}
7174

75+
/**
76+
* As above but with the param method
77+
*/
78+
@Test
79+
public void bulk_publish_multiple_channels_param() {
80+
AblyRealtime rxAbly = null;
81+
try {
82+
/* setup library instance */
83+
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
84+
AblyRest ably = new AblyRest(opts);
85+
rxAbly = new AblyRealtime(opts);
86+
87+
/* first, publish some messages */
88+
int channelCount = 5;
89+
ArrayList<String> channelIds = new ArrayList<String>();
90+
ArrayList<MessageWaiter> rxWaiters = new ArrayList<MessageWaiter>();
91+
for (int i = 0; i < channelCount; i++) {
92+
String channelId = "persisted:" + randomString();
93+
channelIds.add(channelId);
94+
Channel rxChannel = rxAbly.channels.get(channelId);
95+
MessageWaiter messageWaiter = new MessageWaiter(rxChannel);
96+
rxWaiters.add(messageWaiter);
97+
new ChannelWaiter(rxChannel).waitFor(ChannelState.attached);
98+
}
99+
100+
Message message = new Message(null, "bulk_publish_multiple_channels_param");
101+
String messageId = message.id = randomString();
102+
Message.Batch payload = new Message.Batch(channelIds, Collections.singleton(message));
103+
104+
Param[] params = new Param[]{new Param("quickAck", "true")};
105+
106+
PublishResponse[] result = ably.publishBatch(new Message.Batch[]{payload}, null, params);
107+
for (PublishResponse response : result) {
108+
assertEquals("Verify expected response id", response.messageId, messageId);
109+
assertTrue("Verify expected channel name", channelIds.contains(response.channelId));
110+
assertNull("Verify no publish error", response.error);
111+
}
112+
113+
/* Wait to get a message on each channel -- since we're using
114+
* quickAck, getting an ack is no longer a guarantee that the
115+
* message has been processed, so getting history immediately after
116+
* the publish returns may fail */
117+
for (MessageWaiter messageWaiter : rxWaiters) {
118+
messageWaiter.waitFor(1);
119+
}
120+
} catch (AblyException e) {
121+
e.printStackTrace();
122+
fail("bulk_publish_multiple_channels_param: Unexpected exception");
123+
return;
124+
} finally {
125+
if(rxAbly != null) {
126+
rxAbly.close();
127+
}
128+
}
129+
130+
}
131+
72132
/**
73133
* Publish a multiple messages on multiple channels
74-
*
134+
*
75135
* The payload constructed has the form
76136
* [
77137
* {
@@ -94,7 +154,7 @@ public void bulk_publish_multiple_channels_simple() {
94154
* },
95155
* ...
96156
* ]
97-
*
157+
*
98158
* It publishes the given messages on the associated channels.
99159
*/
100160
@Test

0 commit comments

Comments
 (0)