Skip to content

Commit 8ff2552

Browse files
committed
feat: add support for message update, delete, and append operations
- Introduced `_send_update` method to handle message operations (update, delete, append) on the channel. - Added `update_message`, `delete_message`, and `append_message` methods to enable respective operations via the API. - Implemented `MessageOperation`, `PublishResult`, and `UpdateDeleteResult` classes for handling operation metadata and results. - Bumped `api_version` to 5.
1 parent 048fbd1 commit 8ff2552

10 files changed

Lines changed: 744 additions & 24 deletions

ably/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from ably.types.capability import Capability
88
from ably.types.channelsubscription import PushChannelSubscription
99
from ably.types.device import DeviceDetails
10+
from ably.types.message import MessageAction, MessageVersion
11+
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
1012
from ably.types.options import Options, VCDiffDecoder
1113
from ably.util.crypto import CipherParams
1214
from ably.util.exceptions import AblyAuthException, AblyException, IncompatibleClientIdException
@@ -15,5 +17,5 @@
1517
logger = logging.getLogger(__name__)
1618
logger.addHandler(logging.NullHandler())
1719

18-
api_version = '4'
20+
api_version = '5'
1921
lib_version = '2.1.3'

ably/rest/channel.py

Lines changed: 208 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,21 @@
1010

1111
from ably.http.paginatedresult import PaginatedResult, format_params
1212
from ably.types.channeldetails import ChannelDetails
13-
from ably.types.message import Message, make_message_response_handler
13+
from ably.types.message import (
14+
Message,
15+
MessageAction,
16+
MessageVersion,
17+
make_message_response_handler,
18+
make_single_message_response_handler,
19+
)
20+
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
1421
from ably.types.presence import Presence
1522
from ably.util.crypto import get_cipher
16-
from ably.util.exceptions import IncompatibleClientIdException, catch_all
23+
from ably.util.exceptions import (
24+
AblyException,
25+
IncompatibleClientIdException,
26+
catch_all,
27+
)
1728

1829
log = logging.getLogger(__name__)
1930

@@ -99,7 +110,17 @@ async def publish_messages(self, messages, params=None, timeout=None):
99110
if params:
100111
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
101112
path += '?' + parse.urlencode(params)
102-
return await self.ably.http.post(path, body=request_body, timeout=timeout)
113+
response = await self.ably.http.post(path, body=request_body, timeout=timeout)
114+
115+
# Parse response to extract serials
116+
try:
117+
result_data = response.to_native()
118+
if result_data and isinstance(result_data, dict):
119+
return PublishResult.from_dict(result_data)
120+
return PublishResult()
121+
except Exception:
122+
# If response parsing fails, return empty PublishResult for backwards compatibility
123+
return PublishResult()
103124

104125
async def publish_name_data(self, name, data, timeout=None):
105126
messages = [Message(name, data)]
@@ -141,6 +162,190 @@ async def status(self):
141162
obj = response.to_native()
142163
return ChannelDetails.from_dict(obj)
143164

165+
async def _send_update(
166+
self,
167+
message: Message,
168+
action: MessageAction,
169+
operation: MessageOperation = None,
170+
params: dict = None,
171+
):
172+
"""Internal method to send update/delete/append operations."""
173+
if not message.serial:
174+
raise AblyException(
175+
"Message serial is required for update/delete/append operations",
176+
400,
177+
40000
178+
)
179+
180+
if not operation:
181+
version = None
182+
else:
183+
version = MessageVersion(
184+
client_id=operation.client_id,
185+
description=operation.description,
186+
metadata=operation.metadata
187+
)
188+
189+
# Create a new message with the operation fields
190+
update_message = Message(
191+
name=message.name,
192+
data=message.data,
193+
client_id=message.client_id,
194+
serial=message.serial,
195+
action=action,
196+
version=version,
197+
)
198+
199+
# Encrypt if needed
200+
if self.cipher:
201+
update_message.encrypt(self.__cipher)
202+
203+
# Serialize the message
204+
request_body = update_message.as_dict(binary=self.ably.options.use_binary_protocol)
205+
206+
if not self.ably.options.use_binary_protocol:
207+
request_body = json.dumps(request_body, separators=(',', ':'))
208+
else:
209+
request_body = msgpack.packb(request_body, use_bin_type=True)
210+
211+
# Build path with params
212+
path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':'))
213+
if params:
214+
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
215+
path += '?' + parse.urlencode(params)
216+
217+
# Send request
218+
response = await self.ably.http.patch(path, body=request_body)
219+
220+
# Parse response
221+
try:
222+
result_data = response.to_native()
223+
if result_data and isinstance(result_data, dict):
224+
return UpdateDeleteResult.from_dict(result_data)
225+
return UpdateDeleteResult()
226+
except Exception:
227+
return UpdateDeleteResult()
228+
229+
async def update_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
230+
"""Updates an existing message on this channel.
231+
232+
Parameters:
233+
- message: Message object to update. Must have a serial field.
234+
- operation: Optional MessageOperation containing description and metadata for the update.
235+
- params: Optional dict of query parameters.
236+
237+
Returns:
238+
- UpdateDeleteResult containing the version serial of the updated message.
239+
"""
240+
return await self._send_update(message, MessageAction.MESSAGE_UPDATE, operation, params)
241+
242+
async def delete_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
243+
"""Deletes a message on this channel.
244+
245+
Parameters:
246+
- message: Message object to delete. Must have a serial field.
247+
- operation: Optional MessageOperation containing description and metadata for the delete.
248+
- params: Optional dict of query parameters.
249+
250+
Returns:
251+
- UpdateDeleteResult containing the version serial of the deleted message.
252+
"""
253+
return await self._send_update(message, MessageAction.MESSAGE_DELETE, operation, params)
254+
255+
async def append_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
256+
"""Appends data to an existing message on this channel.
257+
258+
Parameters:
259+
- message: Message object with data to append. Must have a serial field.
260+
- operation: Optional MessageOperation containing description and metadata for the append.
261+
- params: Optional dict of query parameters.
262+
263+
Returns:
264+
- UpdateDeleteResult containing the version serial of the appended message.
265+
"""
266+
return await self._send_update(message, MessageAction.MESSAGE_APPEND, operation, params)
267+
268+
async def get_message(self, serial_or_message, timeout=None):
269+
"""Retrieves a single message by its serial.
270+
271+
Parameters:
272+
- serial_or_message: Either a string serial or a Message object with a serial field.
273+
274+
Returns:
275+
- Message object for the requested serial.
276+
277+
Raises:
278+
- AblyException: If the serial is missing or the message cannot be retrieved.
279+
"""
280+
# Extract serial from string or Message object
281+
if isinstance(serial_or_message, str):
282+
serial = serial_or_message
283+
elif isinstance(serial_or_message, Message):
284+
serial = serial_or_message.serial
285+
else:
286+
serial = None
287+
288+
if not serial:
289+
raise AblyException(
290+
'This message lacks a serial. Make sure you have enabled "Message annotations, '
291+
'updates, and deletes" in channel settings on your dashboard.',
292+
400,
293+
40003
294+
)
295+
296+
# Build the path
297+
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':')
298+
299+
# Make the request
300+
response = await self.ably.http.get(path, timeout=timeout)
301+
302+
# Create Message from the response
303+
message_handler = make_single_message_response_handler(self.__cipher)
304+
return message_handler(response)
305+
306+
async def get_message_versions(self, serial_or_message, params=None):
307+
"""Retrieves version history for a message.
308+
309+
Parameters:
310+
- serial_or_message: Either a string serial or a Message object with a serial field.
311+
- params: Optional dict of query parameters for pagination (e.g., limit, start, end, direction).
312+
313+
Returns:
314+
- PaginatedResult containing Message objects representing each version.
315+
316+
Raises:
317+
- AblyException: If the serial is missing or versions cannot be retrieved.
318+
"""
319+
# Extract serial from string or Message object
320+
if isinstance(serial_or_message, str):
321+
serial = serial_or_message
322+
elif isinstance(serial_or_message, Message):
323+
serial = serial_or_message.serial
324+
else:
325+
serial = None
326+
327+
if not serial:
328+
raise AblyException(
329+
'This message lacks a serial. Make sure you have enabled "Message annotations, '
330+
'updates, and deletes" in channel settings on your dashboard.',
331+
400,
332+
40003
333+
)
334+
335+
# Build the path
336+
params_str = format_params({}, **params) if params else ''
337+
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/versions' + params_str
338+
339+
# Create message handler for decoding
340+
message_handler = make_message_response_handler(self.__cipher)
341+
342+
# Return paginated result
343+
return await PaginatedResult.paginated_query(
344+
self.ably.http,
345+
url=path,
346+
response_processor=message_handler
347+
)
348+
144349
@property
145350
def ably(self):
146351
return self.__ably

ably/types/message.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,8 @@ def __init__(self,
135135
self.__timestamp = timestamp
136136
self.__extras = extras
137137
self.__serial = serial
138-
# Handle action - can be MessageAction enum, int, or None
139-
if action is not None:
140-
try:
141-
self.__action = MessageAction(action)
142-
except ValueError:
143-
# If it's not a valid action value, store as None
144-
self.__action = None
145-
if isinstance(version, MessageVersion):
146-
self.__version = version
147-
else:
148-
self.__version = MessageVersion.from_dict(version)
149-
138+
self.__action = action
139+
self.__version = version
150140

151141
def __eq__(self, other):
152142
if isinstance(other, Message):
@@ -315,6 +305,20 @@ def from_encoded(obj, cipher=None, context=None):
315305

316306
decoded_data = Message.decode(data, encoding, cipher, context)
317307

308+
if action is not None:
309+
try:
310+
action = MessageAction(action)
311+
except ValueError:
312+
# If it's not a valid action value, store as None
313+
action = None
314+
else:
315+
action = None
316+
317+
if version is not None:
318+
version = MessageVersion.from_dict(version)
319+
else:
320+
version = MessageVersion(serial=serial, timestamp=timestamp)
321+
318322
return Message(
319323
id=id,
320324
name=name,
@@ -359,3 +363,9 @@ def encrypted_message_response_handler(response):
359363
messages = response.to_native()
360364
return Message.from_encoded_array(messages, cipher=cipher)
361365
return encrypted_message_response_handler
366+
367+
def make_single_message_response_handler(cipher):
368+
def encrypted_message_response_handler(response):
369+
message = response.to_native()
370+
return Message.from_encoded(message, cipher=cipher)
371+
return encrypted_message_response_handler

ably/types/operations.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
class MessageOperation:
2+
"""Metadata for message update/delete/append operations."""
3+
4+
def __init__(self, client_id=None, description=None, metadata=None):
5+
"""
6+
Args:
7+
description: Optional description of the operation.
8+
metadata: Optional dict of metadata key-value pairs associated with the operation.
9+
"""
10+
self.__client_id = client_id
11+
self.__description = description
12+
self.__metadata = metadata
13+
14+
@property
15+
def client_id(self):
16+
return self.__client_id
17+
18+
@property
19+
def description(self):
20+
return self.__description
21+
22+
@property
23+
def metadata(self):
24+
return self.__metadata
25+
26+
def as_dict(self):
27+
"""Convert MessageOperation to dictionary format."""
28+
result = {
29+
'clientId': self.client_id,
30+
'description': self.description,
31+
'metadata': self.metadata,
32+
}
33+
# Remove None values
34+
return {k: v for k, v in result.items() if v is not None}
35+
36+
@staticmethod
37+
def from_dict(obj):
38+
"""Create MessageOperation from dictionary."""
39+
if obj is None:
40+
return None
41+
return MessageOperation(
42+
client_id=obj.get('clientId'),
43+
description=obj.get('description'),
44+
metadata=obj.get('metadata'),
45+
)
46+
47+
48+
class PublishResult:
49+
"""Result of a publish operation containing message serials."""
50+
51+
def __init__(self, serials=None):
52+
"""
53+
Args:
54+
serials: List of message serials (strings or None) in 1:1 correspondence with published messages.
55+
"""
56+
self.__serials = serials or []
57+
58+
@property
59+
def serials(self):
60+
return self.__serials
61+
62+
@staticmethod
63+
def from_dict(obj):
64+
"""Create PublishResult from dictionary."""
65+
if obj is None:
66+
return PublishResult()
67+
return PublishResult(serials=obj.get('serials', []))
68+
69+
70+
class UpdateDeleteResult:
71+
"""Result of an update or delete operation containing version serial."""
72+
73+
def __init__(self, version_serial=None):
74+
"""
75+
Args:
76+
version_serial: The serial of the resulting message version after the operation.
77+
"""
78+
self.__version_serial = version_serial
79+
80+
@property
81+
def version_serial(self):
82+
return self.__version_serial
83+
84+
@staticmethod
85+
def from_dict(obj):
86+
"""Create UpdateDeleteResult from dictionary."""
87+
if obj is None:
88+
return UpdateDeleteResult()
89+
return UpdateDeleteResult(version_serial=obj.get('versionSerial'))

0 commit comments

Comments
 (0)