Skip to content

Commit 55da346

Browse files
committed
feat: implement protocol v4 support for enhanced message annotations and versioning
Introduce `MessageAnnotations` and `MessageVersion` classes for protocol v4. Replace `summary` with `annotations` in `Message`. Enhance version tracking with detailed metadata, ensuring compatibility with the new protocol. Update tests and protocol version to `4`.
1 parent 3f9d007 commit 55da346

16 files changed

Lines changed: 570 additions & 287 deletions

File tree

lib/src/main/java/io/ably/lib/http/HttpCore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,10 @@ private Map<String, String> collectRequestHeaders(URL url, String method, Param[
333333
requestHeaders.put(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON);
334334
}
335335

336-
/* pass required headers */
337-
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
336+
if (!requestHeaders.containsKey(Defaults.ABLY_PROTOCOL_VERSION_HEADER)) {
337+
requestHeaders.put(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7e
338+
}
339+
338340
Map<String, String> additionalAgents = new HashMap<>();
339341
if (options.agents != null) additionalAgents.putAll(options.agents);
340342
if (dynamicAgents != null) additionalAgents.putAll(dynamicAgents);

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@
2929
import io.ably.lib.types.DeltaExtras;
3030
import io.ably.lib.types.ErrorInfo;
3131
import io.ably.lib.types.Message;
32-
import io.ably.lib.types.MessageAction;
32+
import io.ably.lib.types.MessageAnnotations;
3333
import io.ably.lib.types.MessageDecodeException;
3434
import io.ably.lib.types.MessageSerializer;
35+
import io.ably.lib.types.MessageVersion;
3536
import io.ably.lib.types.PaginatedResult;
3637
import io.ably.lib.types.Param;
3738
import io.ably.lib.types.PresenceMessage;
3839
import io.ably.lib.types.ProtocolMessage;
3940
import io.ably.lib.types.ProtocolMessage.Action;
4041
import io.ably.lib.types.ProtocolMessage.Flag;
42+
import io.ably.lib.types.Summary;
4143
import io.ably.lib.util.CollectionUtils;
4244
import io.ably.lib.util.EventEmitter;
4345
import io.ably.lib.util.Log;
@@ -901,10 +903,16 @@ private void onMessage(final ProtocolMessage protocolMessage) {
901903
if(msg.connectionId == null) msg.connectionId = protocolMessage.connectionId;
902904
if(msg.timestamp == 0) msg.timestamp = protocolMessage.timestamp;
903905
if(msg.id == null) msg.id = protocolMessage.id + ':' + i;
904-
// (TM2k)
905-
if(msg.serial == null && msg.version != null && msg.action == MessageAction.MESSAGE_CREATE) msg.serial = msg.version;
906-
// (TM2o)
907-
if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp;
906+
// (TM2s)
907+
if(msg.version == null) msg.version = new MessageVersion(msg.serial, msg.timestamp);
908+
// (TM2s1)
909+
if(msg.version.serial == null) msg.version.serial = msg.serial;
910+
// (TM2s2)
911+
if(msg.version.timestamp == 0) msg.version.timestamp = msg.timestamp;
912+
// (TM2u)
913+
if(msg.annotations == null) msg.annotations = new MessageAnnotations();
914+
// (TM8a)
915+
if(msg.annotations.summary == null) msg.annotations.summary = new Summary(new HashMap<>());
908916

909917
try {
910918
if (msg.data != null) msg.decode(options, decodingContext);

lib/src/main/java/io/ably/lib/rest/AblyBase.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.ably.lib.platform.Platform;
1515
import io.ably.lib.push.Push;
1616
import io.ably.lib.realtime.Connection;
17+
import io.ably.lib.transport.Defaults;
1718
import io.ably.lib.types.AblyException;
1819
import io.ably.lib.types.AsyncHttpPaginatedResponse;
1920
import io.ably.lib.types.AsyncPaginatedResult;
@@ -249,7 +250,14 @@ public PaginatedResult<Stats> stats(Param[] params) throws AblyException {
249250
}
250251

251252
PaginatedResult<Stats> stats(Http http, Param[] params) throws AblyException {
252-
return new PaginatedQuery<>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler).get();
253+
return new PaginatedQuery<>(
254+
http,
255+
"/stats",
256+
// Stats api uses protocol v2 format for now
257+
Param.set(HttpUtils.defaultAcceptHeaders(false), new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)),
258+
params,
259+
StatsReader.statsResponseHandler
260+
).get();
253261
}
254262

255263
/**
@@ -276,8 +284,15 @@ public void statsAsync(Param[] params, Callback<AsyncPaginatedResult<Stats>> cal
276284
statsAsync(http, params, callback);
277285
}
278286

279-
void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
280-
(new AsyncPaginatedQuery<Stats>(http, "/stats", HttpUtils.defaultAcceptHeaders(false), params, StatsReader.statsResponseHandler)).get(callback);
287+
void statsAsync(Http http, Param[] params, Callback<AsyncPaginatedResult<Stats>> callback) {
288+
(new AsyncPaginatedQuery<Stats>(
289+
http,
290+
"/stats",
291+
// Stats api uses protocol v2 format for now
292+
Param.set(HttpUtils.defaultAcceptHeaders(false), new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)),
293+
params,
294+
StatsReader.statsResponseHandler
295+
)).get(callback);
281296
}
282297

283298
/**
@@ -433,7 +448,12 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
433448
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
434449
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
435450
final Param[] params = options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams ; // RSC7c
436-
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
451+
// This method uses an old batch format from protocol v2
452+
Param[] headers = Param.set(
453+
HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol),
454+
new Param(Defaults.ABLY_PROTOCOL_VERSION_HEADER, 2)
455+
);
456+
http.post("/messages", headers, params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
437457
@Override
438458
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
439459
if(error != null && error.code != 40020) {
@@ -446,11 +466,6 @@ public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo er
446466
});
447467
}
448468

449-
/**
450-
* Authentication token has changed. waitForResult is true if there is a need to
451-
* wait for server response to auth request
452-
*/
453-
454469
/**
455470
* Override this method in AblyRealtime and pass updated token to ConnectionManager
456471
* @param token new token

lib/src/main/java/io/ably/lib/transport/Defaults.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class Defaults {
1212
* spec: G4
1313
* </p>
1414
*/
15-
public static final String ABLY_PROTOCOL_VERSION = "2";
15+
public static final String ABLY_PROTOCOL_VERSION = "4";
1616

1717
public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION);
1818

0 commit comments

Comments
 (0)