Skip to content

Commit 1723f5d

Browse files
committed
feat: add support for message update, delete, and append operations
- Introduced `_send_update` method to handle message operations (update, delete, append) on the channel. - Added `update_message`, `delete_message`, and `append_message` methods to enable respective operations via the API. - Implemented `MessageOperation`, `PublishResult`, and `UpdateDeleteResult` classes for handling operation metadata and results. - Bumped `api_version` to 5.
1 parent 048fbd1 commit 1723f5d

10 files changed

Lines changed: 739 additions & 25 deletions

ably/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from ably.types.capability import Capability
88
from ably.types.channelsubscription import PushChannelSubscription
99
from ably.types.device import DeviceDetails
10+
from ably.types.message import MessageAction, MessageVersion
11+
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
1012
from ably.types.options import Options, VCDiffDecoder
1113
from ably.util.crypto import CipherParams
1214
from ably.util.exceptions import AblyAuthException, AblyException, IncompatibleClientIdException
@@ -15,5 +17,5 @@
1517
logger = logging.getLogger(__name__)
1618
logger.addHandler(logging.NullHandler())
1719

18-
api_version = '4'
20+
api_version = '5'
1921
lib_version = '2.1.3'

ably/rest/channel.py

Lines changed: 202 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,28 @@
33
import logging
44
import os
55
from collections import OrderedDict
6-
from typing import Iterator
6+
from typing import Iterator, Optional
77
from urllib import parse
88

99
import msgpack
1010

1111
from ably.http.paginatedresult import PaginatedResult, format_params
1212
from ably.types.channeldetails import ChannelDetails
13-
from ably.types.message import Message, make_message_response_handler
13+
from ably.types.message import (
14+
Message,
15+
MessageAction,
16+
MessageVersion,
17+
make_message_response_handler,
18+
make_single_message_response_handler,
19+
)
20+
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
1421
from ably.types.presence import Presence
1522
from ably.util.crypto import get_cipher
16-
from ably.util.exceptions import IncompatibleClientIdException, catch_all
23+
from ably.util.exceptions import (
24+
AblyException,
25+
IncompatibleClientIdException,
26+
catch_all,
27+
)
1728

1829
log = logging.getLogger(__name__)
1930

@@ -99,7 +110,13 @@ async def publish_messages(self, messages, params=None, timeout=None):
99110
if params:
100111
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
101112
path += '?' + parse.urlencode(params)
102-
return await self.ably.http.post(path, body=request_body, timeout=timeout)
113+
response = await self.ably.http.post(path, body=request_body, timeout=timeout)
114+
115+
# Parse response to extract serials
116+
result_data = response.to_native()
117+
if result_data and isinstance(result_data, dict):
118+
return PublishResult.from_dict(result_data)
119+
return PublishResult()
103120

104121
async def publish_name_data(self, name, data, timeout=None):
105122
messages = [Message(name, data)]
@@ -141,6 +158,187 @@ async def status(self):
141158
obj = response.to_native()
142159
return ChannelDetails.from_dict(obj)
143160

161+
async def _send_update(
162+
self,
163+
message: Message,
164+
action: MessageAction,
165+
operation: Optional[MessageOperation] = None,
166+
params: Optional[dict] = None,
167+
):
168+
"""Internal method to send update/delete/append operations."""
169+
if not message.serial:
170+
raise AblyException(
171+
"Message serial is required for update/delete/append operations",
172+
400,
173+
40003
174+
)
175+
176+
if not operation:
177+
version = None
178+
else:
179+
version = MessageVersion(
180+
client_id=operation.client_id,
181+
description=operation.description,
182+
metadata=operation.metadata
183+
)
184+
185+
# Create a new message with the operation fields
186+
update_message = Message(
187+
name=message.name,
188+
data=message.data,
189+
client_id=message.client_id,
190+
serial=message.serial,
191+
action=action,
192+
version=version,
193+
)
194+
195+
# Encrypt if needed
196+
if self.cipher:
197+
update_message.encrypt(self.__cipher)
198+
199+
# Serialize the message
200+
request_body = update_message.as_dict(binary=self.ably.options.use_binary_protocol)
201+
202+
if not self.ably.options.use_binary_protocol:
203+
request_body = json.dumps(request_body, separators=(',', ':'))
204+
else:
205+
request_body = msgpack.packb(request_body, use_bin_type=True)
206+
207+
# Build path with params
208+
path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':'))
209+
if params:
210+
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
211+
path += '?' + parse.urlencode(params)
212+
213+
# Send request
214+
response = await self.ably.http.patch(path, body=request_body)
215+
216+
# Parse response
217+
result_data = response.to_native()
218+
if result_data and isinstance(result_data, dict):
219+
return UpdateDeleteResult.from_dict(result_data)
220+
return UpdateDeleteResult()
221+
222+
async def update_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
223+
"""Updates an existing message on this channel.
224+
225+
Parameters:
226+
- message: Message object to update. Must have a serial field.
227+
- operation: Optional MessageOperation containing description and metadata for the update.
228+
- params: Optional dict of query parameters.
229+
230+
Returns:
231+
- UpdateDeleteResult containing the version serial of the updated message.
232+
"""
233+
return await self._send_update(message, MessageAction.MESSAGE_UPDATE, operation, params)
234+
235+
async def delete_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
236+
"""Deletes a message on this channel.
237+
238+
Parameters:
239+
- message: Message object to delete. Must have a serial field.
240+
- operation: Optional MessageOperation containing description and metadata for the delete.
241+
- params: Optional dict of query parameters.
242+
243+
Returns:
244+
- UpdateDeleteResult containing the version serial of the deleted message.
245+
"""
246+
return await self._send_update(message, MessageAction.MESSAGE_DELETE, operation, params)
247+
248+
async def append_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
249+
"""Appends data to an existing message on this channel.
250+
251+
Parameters:
252+
- message: Message object with data to append. Must have a serial field.
253+
- operation: Optional MessageOperation containing description and metadata for the append.
254+
- params: Optional dict of query parameters.
255+
256+
Returns:
257+
- UpdateDeleteResult containing the version serial of the appended message.
258+
"""
259+
return await self._send_update(message, MessageAction.MESSAGE_APPEND, operation, params)
260+
261+
async def get_message(self, serial_or_message, timeout=None):
262+
"""Retrieves a single message by its serial.
263+
264+
Parameters:
265+
- serial_or_message: Either a string serial or a Message object with a serial field.
266+
267+
Returns:
268+
- Message object for the requested serial.
269+
270+
Raises:
271+
- AblyException: If the serial is missing or the message cannot be retrieved.
272+
"""
273+
# Extract serial from string or Message object
274+
if isinstance(serial_or_message, str):
275+
serial = serial_or_message
276+
elif isinstance(serial_or_message, Message):
277+
serial = serial_or_message.serial
278+
else:
279+
serial = None
280+
281+
if not serial:
282+
raise AblyException(
283+
'This message lacks a serial. Make sure you have enabled "Message annotations, '
284+
'updates, and deletes" in channel settings on your dashboard.',
285+
400,
286+
40003
287+
)
288+
289+
# Build the path
290+
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':')
291+
292+
# Make the request
293+
response = await self.ably.http.get(path, timeout=timeout)
294+
295+
# Create Message from the response
296+
message_handler = make_single_message_response_handler(self.__cipher)
297+
return message_handler(response)
298+
299+
async def get_message_versions(self, serial_or_message, params=None):
300+
"""Retrieves version history for a message.
301+
302+
Parameters:
303+
- serial_or_message: Either a string serial or a Message object with a serial field.
304+
- params: Optional dict of query parameters for pagination (e.g., limit, start, end, direction).
305+
306+
Returns:
307+
- PaginatedResult containing Message objects representing each version.
308+
309+
Raises:
310+
- AblyException: If the serial is missing or versions cannot be retrieved.
311+
"""
312+
# Extract serial from string or Message object
313+
if isinstance(serial_or_message, str):
314+
serial = serial_or_message
315+
elif isinstance(serial_or_message, Message):
316+
serial = serial_or_message.serial
317+
else:
318+
serial = None
319+
320+
if not serial:
321+
raise AblyException(
322+
'This message lacks a serial. Make sure you have enabled "Message annotations, '
323+
'updates, and deletes" in channel settings on your dashboard.',
324+
400,
325+
40003
326+
)
327+
328+
# Build the path
329+
params_str = format_params({}, **params) if params else ''
330+
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/versions' + params_str
331+
332+
# Create message handler for decoding
333+
message_handler = make_message_response_handler(self.__cipher)
334+
335+
# Return paginated result
336+
return await PaginatedResult.paginated_query(
337+
self.ably.http,
338+
url=path,
339+
response_processor=message_handler
340+
)
341+
144342
@property
145343
def ably(self):
146344
return self.__ably

ably/types/message.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,8 @@ def __init__(self,
135135
self.__timestamp = timestamp
136136
self.__extras = extras
137137
self.__serial = serial
138-
# Handle action - can be MessageAction enum, int, or None
139-
if action is not None:
140-
try:
141-
self.__action = MessageAction(action)
142-
except ValueError:
143-
# If it's not a valid action value, store as None
144-
self.__action = None
145-
if isinstance(version, MessageVersion):
146-
self.__version = version
147-
else:
148-
self.__version = MessageVersion.from_dict(version)
149-
138+
self.__action = action
139+
self.__version = version
150140

151141
def __eq__(self, other):
152142
if isinstance(other, Message):
@@ -315,6 +305,21 @@ def from_encoded(obj, cipher=None, context=None):
315305

316306
decoded_data = Message.decode(data, encoding, cipher, context)
317307

308+
if action is not None:
309+
try:
310+
action = MessageAction(action)
311+
except ValueError:
312+
# If it's not a valid action value, store as None
313+
action = None
314+
else:
315+
action = None
316+
317+
if version is not None:
318+
version = MessageVersion.from_dict(version)
319+
else:
320+
# TM2s
321+
version = MessageVersion(serial=serial, timestamp=timestamp)
322+
318323
return Message(
319324
id=id,
320325
name=name,
@@ -359,3 +364,9 @@ def encrypted_message_response_handler(response):
359364
messages = response.to_native()
360365
return Message.from_encoded_array(messages, cipher=cipher)
361366
return encrypted_message_response_handler
367+
368+
def make_single_message_response_handler(cipher):
369+
def encrypted_message_response_handler(response):
370+
message = response.to_native()
371+
return Message.from_encoded(message, cipher=cipher)
372+
return encrypted_message_response_handler

ably/types/operations.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
class MessageOperation:
2+
"""Metadata for message update/delete/append operations."""
3+
4+
def __init__(self, client_id=None, description=None, metadata=None):
5+
"""
6+
Args:
7+
description: Optional description of the operation.
8+
metadata: Optional dict of metadata key-value pairs associated with the operation.
9+
"""
10+
self.__client_id = client_id
11+
self.__description = description
12+
self.__metadata = metadata
13+
14+
@property
15+
def client_id(self):
16+
return self.__client_id
17+
18+
@property
19+
def description(self):
20+
return self.__description
21+
22+
@property
23+
def metadata(self):
24+
return self.__metadata
25+
26+
def as_dict(self):
27+
"""Convert MessageOperation to dictionary format."""
28+
result = {
29+
'clientId': self.client_id,
30+
'description': self.description,
31+
'metadata': self.metadata,
32+
}
33+
# Remove None values
34+
return {k: v for k, v in result.items() if v is not None}
35+
36+
@staticmethod
37+
def from_dict(obj):
38+
"""Create MessageOperation from dictionary."""
39+
if obj is None:
40+
return None
41+
return MessageOperation(
42+
client_id=obj.get('clientId'),
43+
description=obj.get('description'),
44+
metadata=obj.get('metadata'),
45+
)
46+
47+
48+
class PublishResult:
49+
"""Result of a publish operation containing message serials."""
50+
51+
def __init__(self, serials=None):
52+
"""
53+
Args:
54+
serials: List of message serials (strings or None) in 1:1 correspondence with published messages.
55+
"""
56+
self.__serials = serials or []
57+
58+
@property
59+
def serials(self):
60+
return self.__serials
61+
62+
@staticmethod
63+
def from_dict(obj):
64+
"""Create PublishResult from dictionary."""
65+
if obj is None:
66+
return PublishResult()
67+
return PublishResult(serials=obj.get('serials', []))
68+
69+
70+
class UpdateDeleteResult:
71+
"""Result of an update or delete operation containing version serial."""
72+
73+
def __init__(self, version_serial=None):
74+
"""
75+
Args:
76+
version_serial: The serial of the resulting message version after the operation.
77+
"""
78+
self.__version_serial = version_serial
79+
80+
@property
81+
def version_serial(self):
82+
return self.__version_serial
83+
84+
@staticmethod
85+
def from_dict(obj):
86+
"""Create UpdateDeleteResult from dictionary."""
87+
if obj is None:
88+
return UpdateDeleteResult()
89+
return UpdateDeleteResult(version_serial=obj.get('versionSerial'))

0 commit comments

Comments
 (0)