Skip to content

Commit 737d738

Browse files
committed
feat: add support for message update, delete, and append operations
- Introduced `_send_update` method to handle message operations (update, delete, append) on the channel. - Added `update_message`, `delete_message`, and `append_message` methods to enable respective operations via the API. - Implemented `MessageOperation`, `PublishResult`, and `UpdateDeleteResult` classes for handling operation metadata and results. - Bumped `api_version` to 5.
1 parent 048fbd1 commit 737d738

10 files changed

Lines changed: 753 additions & 24 deletions

ably/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from ably.types.capability import Capability
88
from ably.types.channelsubscription import PushChannelSubscription
99
from ably.types.device import DeviceDetails
10+
from ably.types.message import MessageAction, MessageVersion
11+
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
1012
from ably.types.options import Options, VCDiffDecoder
1113
from ably.util.crypto import CipherParams
1214
from ably.util.exceptions import AblyAuthException, AblyException, IncompatibleClientIdException
@@ -15,5 +17,5 @@
1517
logger = logging.getLogger(__name__)
1618
logger.addHandler(logging.NullHandler())
1719

18-
api_version = '4'
20+
api_version = '5'
1921
lib_version = '2.1.3'

ably/rest/channel.py

Lines changed: 203 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,21 @@
1010

1111
from ably.http.paginatedresult import PaginatedResult, format_params
1212
from ably.types.channeldetails import ChannelDetails
13-
from ably.types.message import Message, make_message_response_handler
13+
from ably.types.message import (
14+
Message,
15+
MessageAction,
16+
MessageVersion,
17+
make_message_response_handler,
18+
make_single_message_response_handler,
19+
)
20+
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
1421
from ably.types.presence import Presence
1522
from ably.util.crypto import get_cipher
16-
from ably.util.exceptions import IncompatibleClientIdException, catch_all
23+
from ably.util.exceptions import (
24+
AblyException,
25+
IncompatibleClientIdException,
26+
catch_all,
27+
)
1728

1829
log = logging.getLogger(__name__)
1930

@@ -99,7 +110,17 @@ async def publish_messages(self, messages, params=None, timeout=None):
99110
if params:
100111
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
101112
path += '?' + parse.urlencode(params)
102-
return await self.ably.http.post(path, body=request_body, timeout=timeout)
113+
response = await self.ably.http.post(path, body=request_body, timeout=timeout)
114+
115+
# Parse response to extract serials
116+
try:
117+
result_data = response.to_native()
118+
if result_data and isinstance(result_data, dict):
119+
return PublishResult.from_dict(result_data)
120+
return PublishResult()
121+
except Exception:
122+
# If response parsing fails, return empty PublishResult for backwards compatibility
123+
return PublishResult()
103124

104125
async def publish_name_data(self, name, data, timeout=None):
105126
messages = [Message(name, data)]
@@ -141,6 +162,185 @@ async def status(self):
141162
obj = response.to_native()
142163
return ChannelDetails.from_dict(obj)
143164

165+
async def _send_update(
166+
self,
167+
message: Message,
168+
action: MessageAction,
169+
operation: MessageOperation = None,
170+
params: dict = None,
171+
):
172+
"""Internal method to send update/delete/append operations."""
173+
if not message.serial:
174+
raise AblyException(
175+
"Message serial is required for update/delete/append operations",
176+
400,
177+
40000
178+
)
179+
180+
# Create a new message with the operation fields
181+
update_message = Message(
182+
name=message.name,
183+
data=message.data,
184+
client_id=message.client_id,
185+
serial=message.serial,
186+
action=action,
187+
version=MessageVersion(
188+
client_id=operation.client_id,
189+
description=operation.description,
190+
metadata=operation.metadata,
191+
),
192+
)
193+
194+
# Encrypt if needed
195+
if self.cipher:
196+
update_message.encrypt(self.__cipher)
197+
198+
# Serialize the message
199+
request_body = update_message.as_dict(binary=self.ably.options.use_binary_protocol)
200+
201+
if not self.ably.options.use_binary_protocol:
202+
request_body = json.dumps(request_body, separators=(',', ':'))
203+
else:
204+
request_body = msgpack.packb(request_body, use_bin_type=True)
205+
206+
# Build path with params
207+
path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':'))
208+
if params:
209+
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
210+
path += '?' + parse.urlencode(params)
211+
212+
# Send request
213+
response = await self.ably.http.patch(path, body=request_body)
214+
215+
# Parse response
216+
try:
217+
result_data = response.to_native()
218+
if result_data and isinstance(result_data, dict):
219+
return UpdateDeleteResult.from_dict(result_data)
220+
return UpdateDeleteResult()
221+
except Exception:
222+
return UpdateDeleteResult()
223+
224+
async def update_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
225+
"""Updates an existing message on this channel.
226+
227+
Parameters:
228+
- message: Message object to update. Must have a serial field.
229+
- operation: Optional MessageOperation containing description and metadata for the update.
230+
- params: Optional dict of query parameters.
231+
232+
Returns:
233+
- UpdateDeleteResult containing the version serial of the updated message.
234+
"""
235+
return await self._send_update(message, MessageAction.MESSAGE_UPDATE, operation, params)
236+
237+
async def delete_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
238+
"""Deletes a message on this channel.
239+
240+
Parameters:
241+
- message: Message object to delete. Must have a serial field.
242+
- operation: Optional MessageOperation containing description and metadata for the delete.
243+
- params: Optional dict of query parameters.
244+
245+
Returns:
246+
- UpdateDeleteResult containing the version serial of the deleted message.
247+
"""
248+
return await self._send_update(message, MessageAction.MESSAGE_DELETE, operation, params)
249+
250+
async def append_message(self, message: Message, operation: MessageOperation = None, params: dict = None):
251+
"""Appends data to an existing message on this channel.
252+
253+
Parameters:
254+
- message: Message object with data to append. Must have a serial field.
255+
- operation: Optional MessageOperation containing description and metadata for the append.
256+
- params: Optional dict of query parameters.
257+
258+
Returns:
259+
- UpdateDeleteResult containing the version serial of the appended message.
260+
"""
261+
return await self._send_update(message, MessageAction.MESSAGE_APPEND, operation, params)
262+
263+
async def get_message(self, serial_or_message, timeout=None):
264+
"""Retrieves a single message by its serial.
265+
266+
Parameters:
267+
- serial_or_message: Either a string serial or a Message object with a serial field.
268+
269+
Returns:
270+
- Message object for the requested serial.
271+
272+
Raises:
273+
- AblyException: If the serial is missing or the message cannot be retrieved.
274+
"""
275+
# Extract serial from string or Message object
276+
if isinstance(serial_or_message, str):
277+
serial = serial_or_message
278+
elif isinstance(serial_or_message, Message):
279+
serial = serial_or_message.serial
280+
else:
281+
serial = None
282+
283+
if not serial:
284+
raise AblyException(
285+
'This message lacks a serial. Make sure you have enabled "Message annotations, '
286+
'updates, and deletes" in channel settings on your dashboard.',
287+
400,
288+
40003
289+
)
290+
291+
# Build the path
292+
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':')
293+
294+
# Make the request
295+
response = await self.ably.http.get(path, timeout=timeout)
296+
297+
# Create Message from the response
298+
message_handler = make_single_message_response_handler(self.__cipher)
299+
return message_handler(response)
300+
301+
async def get_message_versions(self, serial_or_message, params=None):
302+
"""Retrieves version history for a message.
303+
304+
Parameters:
305+
- serial_or_message: Either a string serial or a Message object with a serial field.
306+
- params: Optional dict of query parameters for pagination (e.g., limit, start, end, direction).
307+
308+
Returns:
309+
- PaginatedResult containing Message objects representing each version.
310+
311+
Raises:
312+
- AblyException: If the serial is missing or versions cannot be retrieved.
313+
"""
314+
# Extract serial from string or Message object
315+
if isinstance(serial_or_message, str):
316+
serial = serial_or_message
317+
elif isinstance(serial_or_message, Message):
318+
serial = serial_or_message.serial
319+
else:
320+
serial = None
321+
322+
if not serial:
323+
raise AblyException(
324+
'This message lacks a serial. Make sure you have enabled "Message annotations, '
325+
'updates, and deletes" in channel settings on your dashboard.',
326+
400,
327+
40003
328+
)
329+
330+
# Build the path
331+
params_str = format_params({}, **params) if params else ''
332+
path = self.__base_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/versions' + params_str
333+
334+
# Create message handler for decoding
335+
message_handler = make_message_response_handler(self.__cipher)
336+
337+
# Return paginated result
338+
return await PaginatedResult.paginated_query(
339+
self.ably.http,
340+
url=path,
341+
response_processor=message_handler
342+
)
343+
144344
@property
145345
def ably(self):
146346
return self.__ably

ably/types/message.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,8 @@ def __init__(self,
135135
self.__timestamp = timestamp
136136
self.__extras = extras
137137
self.__serial = serial
138-
# Handle action - can be MessageAction enum, int, or None
139-
if action is not None:
140-
try:
141-
self.__action = MessageAction(action)
142-
except ValueError:
143-
# If it's not a valid action value, store as None
144-
self.__action = None
145-
if isinstance(version, MessageVersion):
146-
self.__version = version
147-
else:
148-
self.__version = MessageVersion.from_dict(version)
149-
138+
self.__action = action
139+
self.__version = version
150140

151141
def __eq__(self, other):
152142
if isinstance(other, Message):
@@ -315,6 +305,20 @@ def from_encoded(obj, cipher=None, context=None):
315305

316306
decoded_data = Message.decode(data, encoding, cipher, context)
317307

308+
if action is not None:
309+
try:
310+
action = MessageAction(action)
311+
except ValueError:
312+
# If it's not a valid action value, store as None
313+
action = None
314+
else:
315+
action = None
316+
317+
if version is not None:
318+
version = MessageVersion.from_dict(version)
319+
else:
320+
version = MessageVersion(serial=serial, timestamp=timestamp)
321+
318322
return Message(
319323
id=id,
320324
name=name,
@@ -359,3 +363,9 @@ def encrypted_message_response_handler(response):
359363
messages = response.to_native()
360364
return Message.from_encoded_array(messages, cipher=cipher)
361365
return encrypted_message_response_handler
366+
367+
def make_single_message_response_handler(cipher):
368+
def encrypted_message_response_handler(response):
369+
message = response.to_native()
370+
return Message.from_encoded(message, cipher=cipher)
371+
return encrypted_message_response_handler

ably/types/operations.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
class MessageOperation:
2+
"""Metadata for message update/delete/append operations."""
3+
4+
def __init__(self, client_id=None, description=None, metadata=None):
5+
"""
6+
Args:
7+
description: Optional description of the operation.
8+
metadata: Optional dict of metadata key-value pairs associated with the operation.
9+
"""
10+
self.__client_id = client_id
11+
self.__description = description
12+
self.__metadata = metadata
13+
14+
@property
15+
def client_id(self):
16+
return self.__client_id
17+
18+
@property
19+
def description(self):
20+
return self.__description
21+
22+
@property
23+
def metadata(self):
24+
return self.__metadata
25+
26+
def as_dict(self):
27+
"""Convert MessageOperation to dictionary format."""
28+
result = {
29+
'clientId': self.client_id,
30+
'description': self.description,
31+
'metadata': self.metadata,
32+
}
33+
# Remove None values
34+
return {k: v for k, v in result.items() if v is not None}
35+
36+
@staticmethod
37+
def from_dict(obj):
38+
"""Create MessageOperation from dictionary."""
39+
if obj is None:
40+
return None
41+
return MessageOperation(
42+
client_id=obj.get('clientId'),
43+
description=obj.get('description'),
44+
metadata=obj.get('metadata'),
45+
)
46+
47+
48+
class PublishResult:
49+
"""Result of a publish operation containing message serials."""
50+
51+
def __init__(self, serials=None):
52+
"""
53+
Args:
54+
serials: List of message serials (strings or None) in 1:1 correspondence with published messages.
55+
"""
56+
self.__serials = serials or []
57+
58+
@property
59+
def serials(self):
60+
return self.__serials
61+
62+
@staticmethod
63+
def from_dict(obj):
64+
"""Create PublishResult from dictionary."""
65+
if obj is None:
66+
return PublishResult()
67+
return PublishResult(serials=obj.get('serials', []))
68+
69+
70+
class UpdateDeleteResult:
71+
"""Result of an update or delete operation containing version serial."""
72+
73+
def __init__(self, version_serial=None):
74+
"""
75+
Args:
76+
version_serial: The serial of the resulting message version after the operation.
77+
"""
78+
self.__version_serial = version_serial
79+
80+
@property
81+
def version_serial(self):
82+
return self.__version_serial
83+
84+
@staticmethod
85+
def from_dict(obj):
86+
"""Create UpdateDeleteResult from dictionary."""
87+
if obj is None:
88+
return UpdateDeleteResult()
89+
return UpdateDeleteResult(version_serial=obj.get('versionSerial'))

0 commit comments

Comments
 (0)