Skip to content

Commit af88909

Browse files
SimonWoolfQuintin Willison
authored andcommitted
Rest.publishBatch: support overloaded method that takes params
1 parent ff72dde commit af88909

2 files changed

Lines changed: 59 additions & 4 deletions

File tree

lib/src/main/java/io/ably/lib/rest/AblyBase.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,25 @@ public void requestAsync(String method, String path, Param[] params, HttpCore.Re
239239
*/
240240
@Experimental
241241
public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException {
242-
return publishBatchImpl(pubSpecs, channelOptions).sync();
242+
return publishBatchImpl(pubSpecs, channelOptions, null).sync();
243+
}
244+
245+
@Experimental
246+
public PublishResponse[] publishBatch(Message.Batch[] pubSpecs, ChannelOptions channelOptions, Param[] params) throws AblyException {
247+
return publishBatchImpl(pubSpecs, channelOptions, params).sync();
243248
}
244249

245250
@Experimental
246251
public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Callback<PublishResponse[]> callback) throws AblyException {
247-
publishBatchImpl(pubSpecs, channelOptions).async(callback);
252+
publishBatchImpl(pubSpecs, channelOptions, null).async(callback);
253+
}
254+
255+
@Experimental
256+
public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOptions, Param[] params, final Callback<PublishResponse[]> callback) throws AblyException {
257+
publishBatchImpl(pubSpecs, channelOptions, null).async(callback);
248258
}
249259

250-
private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions) throws AblyException {
260+
private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Param[] params) throws AblyException {
251261
boolean hasClientSuppliedId = false;
252262
for(Message.Batch spec : pubSpecs) {
253263
for(Message message : spec.messages) {
@@ -270,7 +280,7 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
270280
@Override
271281
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
272282
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
273-
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), null, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
283+
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
274284
@Override
275285
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
276286
if(error != null && error.code != 40020) {

lib/src/test/java/io/ably/lib/test/rest/RestChannelBulkPublishTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,51 @@ public void bulk_publish_multiple_channels_simple() {
6969
}
7070
}
7171

72+
/**
73+
* As above but with the param method
74+
*/
75+
@Test
76+
public void bulk_publish_multiple_channels_param() {
77+
try {
78+
/* setup library instance */
79+
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
80+
AblyRest ably = new AblyRest(opts);
81+
82+
/* first, publish some messages */
83+
int channelCount = 5;
84+
ArrayList<String> channelIds = new ArrayList<String>();
85+
for(int i = 0; i < channelCount; i++) {
86+
channelIds.add("persisted:" + randomString());
87+
}
88+
89+
Message message = new Message(null, "bulk_publish_multiple_channels_param");
90+
String messageId = message.id = randomString();
91+
Message.Batch payload = new Message.Batch(channelIds, Collections.singleton(message));
92+
93+
Param[] params = new Param[] { new Param("quickAck", "true") };
94+
95+
PublishResponse[] result = ably.publishBatch(new Message.Batch[] { payload }, null, params);
96+
for(PublishResponse response : result) {
97+
assertEquals("Verify expected response id", response.messageId, messageId);
98+
assertTrue("Verify expected channel name", channelIds.contains(response.channelId));
99+
assertNull("Verify no publish error", response.error);
100+
}
101+
102+
/* get the history for this channel */
103+
for(String channel : channelIds) {
104+
PaginatedResult<Message> messages = ably.channels.get(channel).history(null);
105+
assertNotNull("Expected non-null messages", messages);
106+
assertEquals("Expected 1 message", messages.items().length, 1);
107+
/* verify message contents */
108+
assertEquals("Expect message data to be expected String", messages.items()[0].data, message.data);
109+
}
110+
} catch (AblyException e) {
111+
e.printStackTrace();
112+
fail("bulk_publish_multiple_channels_param: Unexpected exception");
113+
return;
114+
}
115+
}
116+
72117
/**
73118
* Publish a multiple messages on multiple channels
74119
*

0 commit comments

Comments
 (0)