Skip to content

Commit a40bb30

Browse files
authored
Merge pull request #1163 from ably/ECO-5564/protocol-v4
feat: implement protocol v4 support for enhanced message annotations and versioning
2 parents 3f9d007 + d05b4dd commit a40bb30

16 files changed

Lines changed: 579 additions & 287 deletions

File tree

lib/src/main/java/io/ably/lib/http/HttpCore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,10 @@ private Map<String, String> collectRequestHeaders(URL url, String method, Param[
333333
requestHeaders.put(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON);
334334
}
335335

336-
/* pass required headers */
337-
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
336+
if (!requestHeaders.containsKey(Defaults.ABLY_PROTOCOL_VERSION_HEADER)) {
337+
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7e
338+
}
339+
338340
Map<String, String> additionalAgents = new HashMap<>();
339341
if (options.agents != null) additionalAgents.putAll(options.agents);
340342
if (dynamicAgents != null) additionalAgents.putAll(dynamicAgents);

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@
2929
import io.ably.lib.types.DeltaExtras;
3030
import io.ably.lib.types.ErrorInfo;
3131
import io.ably.lib.types.Message;
32-
import io.ably.lib.types.MessageAction;
32+
import io.ably.lib.types.MessageAnnotations;
3333
import io.ably.lib.types.MessageDecodeException;
3434
import io.ably.lib.types.MessageSerializer;
35+
import io.ably.lib.types.MessageVersion;
3536
import io.ably.lib.types.PaginatedResult;
3637
import io.ably.lib.types.Param;
3738
import io.ably.lib.types.PresenceMessage;
3839
import io.ably.lib.types.ProtocolMessage;
3940
import io.ably.lib.types.ProtocolMessage.Action;
4041
import io.ably.lib.types.ProtocolMessage.Flag;
42+
import io.ably.lib.types.Summary;
4143
import io.ably.lib.util.CollectionUtils;
4244
import io.ably.lib.util.EventEmitter;
4345
import io.ably.lib.util.Log;
@@ -901,10 +903,16 @@ private void onMessage(final ProtocolMessage protocolMessage) {
901903
if(msg.connectionId == null) msg.connectionId = protocolMessage.connectionId;
902904
if(msg.timestamp == 0) msg.timestamp = protocolMessage.timestamp;
903905
if(msg.id == null) msg.id = protocolMessage.id + ':' + i;
904-
// (TM2k)
905-
if(msg.serial == null && msg.version != null && msg.action == MessageAction.MESSAGE_CREATE) msg.serial = msg.version;
906-
// (TM2o)
907-
if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp;
906+
// (TM2s)
907+
if(msg.version == null) msg.version = new MessageVersion(msg.serial, msg.timestamp);
908+
// (TM2s1)
909+
if(msg.version.serial == null) msg.version.serial = msg.serial;
910+
// (TM2s2)
911+
if(msg.version.timestamp == 0) msg.version.timestamp = msg.timestamp;
912+
// (TM2u)
913+
if(msg.annotations == null) msg.annotations = new MessageAnnotations();
914+
// (TM8a)
915+
if(msg.annotations.summary == null) msg.annotations.summary = new Summary(new HashMap<>());
908916

909917
try {
910918
if (msg.data != null) msg.decode(options, decodingContext);

lib/src/main/java/io/ably/lib/rest/AblyBase.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.ably.lib.platform.Platform;
1515
import io.ably.lib.push.Push;
1616
import io.ably.lib.realtime.Connection;
17+
import io.ably.lib.transport.Defaults;
1718
import io.ably.lib.types.AblyException;
1819
import io.ably.lib.types.AsyncHttpPaginatedResponse;
1920
import io.ably.lib.types.AsyncPaginatedResult;
@@ -44,6 +45,16 @@
4445
*/
4546
public abstract class AblyBase implements AutoCloseable {
4647

48+
/**
49+
* Some REST endpoints (e.g., stats and batch) changed in protocol v3.
50+
* To preserve backward compatibility for those specific endpoints, we
51+
* explicitly request protocol v2 when calling them.
52+
* <p>
53+
* Use this only for legacy endpoints that must remain on v2; all other
54+
* calls should use the default protocol version.
55+
*/
56+
private static final int LEGACY_API_PROTOCOL_V2 = 2;
57+
4758
public final ClientOptions options;
4859
public final Http http;
4960
public final HttpCore httpCore;
@@ -249,7 +260,17 @@ public PaginatedResult<Stats> stats(Param[] params) throws AblyException {
249260
}
250261

251262
PaginatedResult<Stats> stats(Http http, Param[] params) throws AblyException {
252-
return new PaginatedQuery<>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
263+
return new PaginatedQuery<>(
264+
http,
265+
"/stats",
266+
// Stats api uses protocol v2 format for now
267+
Param.set(
268+
HttpUtils.defaultAcceptHeaders(false),
269+
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2)
270+
),
271+
params,
272+
StatsReader.statsResponseHandler
273+
).get();
253274
}
254275

255276
/**
@@ -276,8 +297,18 @@ public void statsAsync(Param[] params, Callback<AsyncPaginatedResult<Stats>> cal
276297
statsAsync(http, params, callback);
277298
}
278299

279-
void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
280-
(new AsyncPaginatedQuery<Stats>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler)).get(callback);
300+
void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
301+
(new AsyncPaginatedQuery<Stats>(
302+
http,
303+
"/stats",
304+
// Stats api uses protocol v2 format for now
305+
Param.set(
306+
HttpUtils.defaultAcceptHeaders(false),
307+
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2)
308+
),
309+
params,
310+
StatsReader.statsResponseHandler
311+
)).get(callback);
281312
}
282313

283314
/**
@@ -433,7 +464,12 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
433464
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
434465
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
435466
final Param[] params = options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams ; // RSC7c
436-
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
467+
// This method uses an old batch format from protocol v2
468+
Param[] headers = Param.set(
469+
HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol),
470+
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, LEGACY_API_PROTOCOL_V2)
471+
);
472+
http.post("/messages", headers, params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
437473
@Override
438474
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
439475
if(error != null && error.code != 40020) {
@@ -446,11 +482,6 @@ public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo er
446482
});
447483
}
448484

449-
/**
450-
* Authentication token has changed. waitForResult is true if there is a need to
451-
* wait for server response to auth request
452-
*/
453-
454485
/**
455486
* Override this method in AblyRealtime and pass updated token to ConnectionManager
456487
* @param token new token

lib/src/main/java/io/ably/lib/transport/Defaults.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class Defaults {
1212
* spec: G4
1313
* </p>
1414
*/
15-
public static final String ABLY_PROTOCOL_VERSION = "2";
15+
public static final String ABLY_PROTOCOL_VERSION = "4";
1616

1717
public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION);
1818

0 commit comments

Comments
 (0)