Skip to content

Commit 231b158

Browse files
committed
[AIT-98] feat: realtime edits and deletes
1 parent 6563568 commit 231b158

11 files changed

Lines changed: 790 additions & 94 deletions

File tree

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 100 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.ably.lib.types.DeltaExtras;
3232
import io.ably.lib.types.ErrorInfo;
3333
import io.ably.lib.types.Message;
34+
import io.ably.lib.types.MessageAction;
3435
import io.ably.lib.types.MessageAnnotations;
3536
import io.ably.lib.types.MessageDecodeException;
3637
import io.ably.lib.types.MessageOperation;
@@ -46,6 +47,7 @@
4647
import io.ably.lib.types.UpdateDeleteResult;
4748
import io.ably.lib.util.CollectionUtils;
4849
import io.ably.lib.util.EventEmitter;
50+
import io.ably.lib.util.Listeners;
4951
import io.ably.lib.util.Log;
5052
import io.ably.lib.util.ReconnectionStrategy;
5153
import io.ably.lib.util.StringUtils;
@@ -1125,7 +1127,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
11251127
case suspended:
11261128
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
11271129
default:
1128-
connectionManager.send(msg, queueMessages, listener);
1130+
connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener));
11291131
}
11301132
}
11311133

@@ -1208,103 +1210,89 @@ public void getMessageAsync(String serial, Callback<Message> callback) {
12081210
}
12091211

12101212
/**
1211-
* Updates an existing message using patch semantics.
1212-
* <p>
1213-
* Non-null fields in the provided message (name, data, extras) will replace the corresponding
1214-
* fields in the existing message, while null fields will be left unchanged.
1213+
* Asynchronously updates an existing message.
12151214
*
12161215
* @param message A {@link Message} object containing the fields to update and the serial identifier.
1217-
* Only non-null fields will be applied to the existing message.
1218-
* @param operation operation metadata such as clientId, description, or metadata in the version field
1219-
* @throws AblyException If the update operation fails.
1220-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1216+
* <p>
1217+
* This callback is invoked on a background thread.
12211218
*/
1222-
public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException {
1223-
return messageEditsMixin.updateMessage(ably.http, message, operation);
1219+
public void updateMessage(Message message) throws AblyException {
1220+
updateMessage(message, null, null);
12241221
}
12251222

12261223
/**
1227-
* Updates an existing message using patch semantics.
1228-
* <p>
1229-
* Non-null fields in the provided message (name, data, extras) will replace the corresponding
1230-
* fields in the existing message, while null fields will be left unchanged.
1224+
* Asynchronously updates an existing message.
12311225
*
12321226
* @param message A {@link Message} object containing the fields to update and the serial identifier.
1233-
* Only non-null fields will be applied to the existing message.
1234-
* @throws AblyException If the update operation fails.
1235-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1227+
* @param operation operation metadata such as clientId, description, or metadata in the version field
1228+
* <p>
1229+
* This callback is invoked on a background thread.
12361230
*/
1237-
public UpdateDeleteResult updateMessage(Message message) throws AblyException {
1238-
return updateMessage(message, null);
1231+
public void updateMessage(Message message, MessageOperation operation) throws AblyException {
1232+
updateMessage(message, operation, null);
12391233
}
12401234

12411235
/**
12421236
* Asynchronously updates an existing message.
12431237
*
12441238
* @param message A {@link Message} object containing the fields to update and the serial identifier.
12451239
* @param operation operation metadata such as clientId, description, or metadata in the version field
1246-
* @param callback A callback to be notified of the outcome of this operation.
1240+
* @param listener A callback to be notified of the outcome of this operation.
12471241
* <p>
12481242
* This callback is invoked on a background thread.
12491243
*/
1250-
public void updateMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
1251-
messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback);
1244+
public void updateMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException {
1245+
Log.v(TAG, "updateMessage(Message); channel = " + this.name + "; serial = " + message.serial);
1246+
updateDeleteImpl(message, operation, MessageAction.MESSAGE_UPDATE, listener);
12521247
}
12531248

12541249
/**
12551250
* Asynchronously updates an existing message.
12561251
*
12571252
* @param message A {@link Message} object containing the fields to update and the serial identifier.
1258-
* @param callback A callback to be notified of the outcome of this operation.
1253+
* @param listener A callback to be notified of the outcome of this operation.
12591254
* <p>
12601255
* This callback is invoked on a background thread.
12611256
*/
1262-
public void updateMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
1263-
updateMessageAsync(message, null, callback);
1257+
public void updateMessage(Message message, Callback<UpdateDeleteResult> listener) throws AblyException {
1258+
updateMessage(message, null, listener);
12641259
}
12651260

12661261
/**
1267-
* Marks a message as deleted.
1268-
* <p>
1269-
* This operation does not remove the message from history; it marks it as deleted
1270-
* while preserving the full message history. The deleted message can still be
1271-
* retrieved and will have its action set to MESSAGE_DELETE.
1262+
* Asynchronously marks a message as deleted.
12721263
*
1273-
* @param message A {@link Message} message containing the serial identifier.
1274-
* @param operation operation metadata such as clientId, description, or metadata in the version field
1275-
* @throws AblyException If the delete operation fails.
1276-
* @return A {@link UpdateDeleteResult} containing the deleted message version serial.
1264+
* @param message A {@link Message} object containing the serial identifier and operation metadata.
1265+
* <p>
1266+
* This callback is invoked on a background thread.
12771267
*/
1278-
public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException {
1279-
return messageEditsMixin.deleteMessage(ably.http, message, operation);
1268+
public void deleteMessage(Message message) throws AblyException {
1269+
deleteMessage(message, null, null);
12801270
}
12811271

12821272
/**
1283-
* Marks a message as deleted.
1284-
* <p>
1285-
* This operation does not remove the message from history; it marks it as deleted
1286-
* while preserving the full message history. The deleted message can still be
1287-
* retrieved and will have its action set to MESSAGE_DELETE.
1273+
* Asynchronously marks a message as deleted.
12881274
*
1289-
* @param message A {@link Message} message containing the serial identifier.
1290-
* @throws AblyException If the delete operation fails.
1291-
* @return A {@link UpdateDeleteResult} containing the deleted message version serial.
1275+
* @param message A {@link Message} object containing the serial identifier and operation metadata.
1276+
* @param operation operation metadata such as clientId, description, or metadata in the version field
1277+
* <p>
1278+
* This callback is invoked on a background thread.
12921279
*/
1293-
public UpdateDeleteResult deleteMessage(Message message) throws AblyException {
1294-
return deleteMessage(message, null);
1280+
public void deleteMessage(Message message, MessageOperation operation) throws AblyException {
1281+
deleteMessage(message, operation, null);
12951282
}
12961283

12971284
/**
12981285
* Asynchronously marks a message as deleted.
12991286
*
13001287
* @param message A {@link Message} object containing the serial identifier and operation metadata.
13011288
* @param operation operation metadata such as clientId, description, or metadata in the version field
1302-
* @param callback A callback to be notified of the outcome of this operation.
1289+
* @param listener A callback to be notified of the outcome of this operation.
13031290
* <p>
13041291
* This callback is invoked on a background thread.
13051292
*/
1306-
public void deleteMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
1307-
messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback);
1293+
public void deleteMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException {
1294+
Log.v(TAG, "deleteMessage(Message); channel = " + this.name + "; serial = " + message.serial);
1295+
updateDeleteImpl(message, operation, MessageAction.MESSAGE_DELETE, listener);
13081296
}
13091297

13101298
/**
@@ -1315,44 +1303,45 @@ public void deleteMessageAsync(Message message, MessageOperation operation, Call
13151303
* <p>
13161304
* This callback is invoked on a background thread.
13171305
*/
1318-
public void deleteMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
1319-
deleteMessageAsync(message, null, callback);
1306+
public void deleteMessage(Message message, Callback<UpdateDeleteResult> callback) throws AblyException {
1307+
deleteMessage(message, null, callback);
13201308
}
13211309

13221310
/**
1323-
* Appends message text to the end of the message data.
1311+
* Asynchronously appends message text to the end of the message data.
13241312
*
13251313
* @param message A {@link Message} object containing the serial identifier and data to append.
1326-
* @param operation operation details such as clientId, description, or metadata
1327-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1328-
* @throws AblyException If the append operation fails.
1314+
* <p>
1315+
* This callback is invoked on a background thread.
13291316
*/
1330-
public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException {
1331-
return messageEditsMixin.appendMessage(ably.http, message, operation);
1317+
public void appendMessage(Message message) throws AblyException {
1318+
appendMessage(message, null, null);
13321319
}
13331320

13341321
/**
1335-
* Appends message text to the end of the message.
1322+
* Asynchronously appends message text to the end of the message.
13361323
*
13371324
* @param message A {@link Message} object containing the serial identifier and data to append.
1338-
* @return A {@link UpdateDeleteResult} containing the updated message version serial.
1339-
* @throws AblyException If the append operation fails.
1325+
* @param operation operation details such as clientId, description, or metadata
1326+
* <p>
1327+
* This callback is invoked on a background thread.
13401328
*/
1341-
public UpdateDeleteResult appendMessage(Message message) throws AblyException {
1342-
return appendMessage(message, null);
1329+
public void appendMessage(Message message, MessageOperation operation) throws AblyException {
1330+
appendMessage(message, operation, null);
13431331
}
13441332

13451333
/**
13461334
* Asynchronously appends message text to the end of the message.
13471335
*
13481336
* @param message A {@link Message} object containing the serial identifier and data to append.
13491337
* @param operation operation details such as clientId, description, or metadata
1350-
* @param callback A callback to be notified of the outcome of this operation.
1338+
* @param listener A callback to be notified of the outcome of this operation.
13511339
* <p>
13521340
* This callback is invoked on a background thread.
13531341
*/
1354-
public void appendMessageAsync(Message message, MessageOperation operation, Callback<UpdateDeleteResult> callback) {
1355-
messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback);
1342+
public void appendMessage(Message message, MessageOperation operation, Callback<UpdateDeleteResult> listener) throws AblyException {
1343+
Log.v(TAG, "appendMessage(Message); channel = " + this.name + "; serial = " + message.serial);
1344+
updateDeleteImpl(message, operation, MessageAction.MESSAGE_APPEND, listener);
13561345
}
13571346

13581347
/**
@@ -1363,8 +1352,50 @@ public void appendMessageAsync(Message message, MessageOperation operation, Call
13631352
* <p>
13641353
* This callback is invoked on a background thread.
13651354
*/
1366-
public void appendMessageAsync(Message message, Callback<UpdateDeleteResult> callback) {
1367-
appendMessageAsync(message, null, callback);
1355+
public void appendMessage(Message message, Callback<UpdateDeleteResult> callback) throws AblyException {
1356+
appendMessage(message, null, callback);
1357+
}
1358+
1359+
private void updateDeleteImpl(
1360+
Message message,
1361+
MessageOperation operation,
1362+
MessageAction action,
1363+
Callback<UpdateDeleteResult> listener
1364+
) throws AblyException {
1365+
if (message.serial == null || message.serial.isEmpty()) {
1366+
throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003));
1367+
}
1368+
ConnectionManager connectionManager = ably.connection.connectionManager;
1369+
ConnectionManager.State connectionState = connectionManager.getConnectionState();
1370+
boolean queueMessages = ably.options.queueMessages;
1371+
if (!connectionManager.isActive() || (connectionState.queueEvents && !queueMessages)) {
1372+
throw AblyException.fromErrorInfo(connectionState.defaultErrorInfo);
1373+
}
1374+
boolean connected = (connectionState.sendEvents);
1375+
1376+
Message updatedMessage = new Message(message.name, message.data, message.extras);
1377+
updatedMessage.serial = message.serial;
1378+
updatedMessage.action = action;
1379+
updatedMessage.version = new MessageVersion();
1380+
if (operation != null) {
1381+
updatedMessage.version.clientId = operation.clientId;
1382+
updatedMessage.version.description = operation.description;
1383+
updatedMessage.version.metadata = operation.metadata;
1384+
}
1385+
1386+
try {
1387+
ably.auth.checkClientId(message, true, connected);
1388+
updatedMessage.encode(options);
1389+
} catch (AblyException e) {
1390+
if (listener != null) {
1391+
listener.onError(e.errorInfo);
1392+
}
1393+
return;
1394+
}
1395+
1396+
ProtocolMessage msg = new ProtocolMessage(Action.message, this.name);
1397+
msg.messages = new Message[] { updatedMessage };
1398+
connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener));
13681399
}
13691400

13701401
/**
@@ -1685,7 +1716,7 @@ public void once(ChannelState state, ChannelStateListener listener) {
16851716
*/
16861717
public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException {
16871718
ConnectionManager connectionManager = ably.connection.connectionManager;
1688-
connectionManager.send(protocolMessage, ably.options.queueMessages, listener);
1719+
connectionManager.send(protocolMessage, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
16891720
}
16901721

16911722
private static final String TAG = Channel.class.getName();

lib/src/main/java/io/ably/lib/realtime/Presence.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import io.ably.lib.types.PresenceMessage;
1616
import io.ably.lib.types.PresenceSerializer;
1717
import io.ably.lib.types.ProtocolMessage;
18+
import io.ably.lib.types.PublishResult;
19+
import io.ably.lib.util.Listeners;
1820
import io.ably.lib.util.Log;
1921
import io.ably.lib.util.StringUtils;
2022

@@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws
120122
return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId));
121123
}
122124

123-
void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) {
125+
void addPendingPresence(PresenceMessage presenceMessage, Callback<PublishResult> listener) {
124126
synchronized(channel) {
125-
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener);
127+
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener));
126128
pendingPresence.add(queuedPresence);
127129
}
128130
}
@@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
763765
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
764766
message.presence = new PresenceMessage[] { msg };
765767
ConnectionManager connectionManager = ably.connection.connectionManager;
766-
connectionManager.send(message, ably.options.queueMessages, listener);
768+
connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
767769
break;
768770
default:
769771
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001));
@@ -892,7 +894,7 @@ private void sendQueuedMessages() {
892894
pendingPresence.clear();
893895

894896
try {
895-
connectionManager.send(message, queueMessages, listener);
897+
connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener));
896898
} catch(AblyException e) {
897899
Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e);
898900
if(listener != null)

0 commit comments

Comments
 (0)