Skip to content

Commit eee4d33

Browse files
committed
[AIT-316] feat: enhance annotation handling and protocol integration
- Added support for updating annotation fields (`id`, `connectionId`, `timestamp`) from protocol messages. - Introduced validation for required annotation fields in `RestAnnotations`. - Enabled idempotent annotation publishing with auto-generated IDs. - Improved error handling for annotation processing in `RealtimeChannel`. - Allowed unsubscribing all annotation listeners when no arguments are provided.
1 parent 42c0fd4 commit eee4d33

5 files changed

Lines changed: 87 additions & 10 deletions

File tree

ably/realtime/annotations.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,16 +193,17 @@ def unsubscribe(self, *args):
193193
Unsubscribe from all annotations on the channel
194194
195195
When no type is provided, arg1 is used as the listener.
196+
When no arguments are provided, unsubscribes all annotation listeners (RTAN5).
196197
197198
Raises
198199
------
199200
ValueError
200-
If no valid unsubscribe arguments are passed
201+
If invalid unsubscribe arguments are passed
201202
"""
203+
# RTAN5: Support no arguments to unsubscribe all annotation listeners
202204
if len(args) == 0:
203-
raise ValueError("annotations.unsubscribe called without arguments")
204-
205-
if len(args) >= 2 and isinstance(args[0], str):
205+
self.__subscriptions.off()
206+
elif len(args) >= 2 and isinstance(args[0], str):
206207
annotation_type = args[0]
207208
listener = args[1]
208209
self.__subscriptions.off(annotation_type, listener)

ably/realtime/channel.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,11 +758,15 @@ def _on_message(self, proto_msg: dict) -> None:
758758
self.__presence.set_presence(decoded_presence, is_sync=True, sync_channel_serial=sync_channel_serial)
759759
elif action == ProtocolMessageAction.ANNOTATION:
760760
# Handle ANNOTATION messages
761+
# RTAN4b: Populate annotation fields from protocol message
762+
Annotation.update_inner_annotation_fields(proto_msg)
761763
annotation_data = proto_msg.get('annotations', [])
762764
try:
763765
annotations = Annotation.from_encoded_array(annotation_data, cipher=self.cipher)
764766
# Process annotations through the annotations handler
765767
self.annotations._process_incoming(annotations)
768+
# RTL15b: Update channel serial for ANNOTATION messages
769+
self.__channel_serial = channel_serial
766770
except Exception as e:
767771
log.error(f"Annotation processing error {e}. Skip annotations {annotation_data}")
768772
elif action == ProtocolMessageAction.ERROR:

ably/rest/annotations.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import json
44
import logging
5+
import uuid
56
from urllib import parse
67

78
import msgpack
@@ -13,6 +14,7 @@
1314
make_annotation_response_handler,
1415
)
1516
from ably.types.message import Message
17+
from ably.types.options import Options
1618
from ably.util.exceptions import AblyException
1719

1820
log = logging.getLogger(__name__)
@@ -73,6 +75,14 @@ def construct_validate_annotation(msg_or_serial, annotation: Annotation) -> Anno
7375
code=40003,
7476
)
7577

78+
# RSAN1a3: Validate that annotation type is specified
79+
if not annotation.type:
80+
raise AblyException(
81+
message='Annotation type must be specified',
82+
status_code=400,
83+
code=40000,
84+
)
85+
7686
return annotation._copy_with(
7787
message_serial=message_serial,
7888
)
@@ -83,6 +93,8 @@ class RestAnnotations:
8393
Provides REST API methods for managing annotations on messages.
8494
"""
8595

96+
__client_options: Options
97+
8698
def __init__(self, channel):
8799
"""
88100
Initialize RestAnnotations.
@@ -91,6 +103,7 @@ def __init__(self, channel):
91103
channel: The REST Channel this annotations instance belongs to
92104
"""
93105
self.__channel = channel
106+
self.__client_options = channel.ably.options
94107

95108
def __base_path_for_serial(self, serial):
96109
"""
@@ -127,6 +140,10 @@ async def publish(
127140
"""
128141
annotation = construct_validate_annotation(msg_or_serial, annotation)
129142

143+
# RSAN1c4: Generate random ID if not provided (for idempotent publishing)
144+
if not annotation.id and self.__client_options.idempotent_rest_publishing:
145+
annotation = annotation._copy_with(id=str(uuid.uuid4()))
146+
130147
# Convert to wire format
131148
request_body = annotation.as_dict(binary=self.__channel.ably.options.use_binary_protocol)
132149

@@ -142,7 +159,7 @@ async def publish(
142159
# Build path
143160
path = self.__base_path_for_serial(annotation.message_serial)
144161
if params:
145-
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
162+
params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
146163
path += '?' + parse.urlencode(params)
147164

148165
# Send request

ably/types/annotation.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ def __init__(self,
3434
count=None,
3535
data=None,
3636
encoding='',
37+
id=None,
3738
client_id=None,
39+
connection_id=None,
3840
timestamp=None,
3941
extras=None):
4042
"""
@@ -47,7 +49,9 @@ def __init__(self,
4749
count: Count associated with the annotation
4850
data: Optional data payload for the annotation
4951
encoding: Encoding format for the data
52+
id: (TAN2a) A unique identifier for this annotation
5053
client_id: The client ID that created this annotation
54+
connection_id: The connection ID that created this annotation
5155
timestamp: Timestamp of the annotation
5256
extras: Additional metadata
5357
"""
@@ -60,18 +64,25 @@ def __init__(self,
6064
self.__action = action if action is not None else AnnotationAction.ANNOTATION_CREATE
6165
self.__count = count
6266
self.__data = data
67+
self.__id = to_text(id) if id is not None else None
6368
self.__client_id = to_text(client_id) if client_id is not None else None
69+
self.__connection_id = to_text(connection_id) if connection_id is not None else None
6470
self.__timestamp = timestamp
6571
self.__extras = extras
6672
self.__encoding = encoding
6773

6874
def __eq__(self, other):
6975
if isinstance(other, Annotation):
70-
return (self.serial == other.serial
71-
and self.message_serial == other.message_serial
76+
# TAN2i: serial is the unique identifier for the annotation
77+
# If both have serials, use serial for comparison
78+
if self.serial is not None and other.serial is not None:
79+
return self.serial == other.serial
80+
# Otherwise fall back to comparing multiple fields
81+
return (self.message_serial == other.message_serial
7282
and self.type == other.type
7383
and self.name == other.name
74-
and self.action == other.action)
84+
and self.action == other.action
85+
and self.client_id == other.client_id)
7586
return NotImplemented
7687

7788
def __ne__(self, other):
@@ -121,6 +132,14 @@ def timestamp(self):
121132
def extras(self):
122133
return self.__extras
123134

135+
@property
136+
def id(self):
137+
return self.__id
138+
139+
@property
140+
def connection_id(self):
141+
return self.__connection_id
142+
124143
def as_dict(self, binary=False):
125144
"""
126145
Convert annotation to dictionary format for API communication.
@@ -134,7 +153,9 @@ def as_dict(self, binary=False):
134153
'type': self.type, # Annotation type (not data type)
135154
'name': self.name,
136155
'count': self.count,
156+
'id': self.id or None,
137157
'clientId': self.client_id or None,
158+
'connectionId': self.connection_id or None,
138159
'timestamp': self.timestamp or None,
139160
'extras': self.extras,
140161
**encode_data(self.data, self._encoding_array, binary)
@@ -160,7 +181,9 @@ def from_encoded(obj, cipher=None, context=None):
160181
count = obj.get('count')
161182
data = obj.get('data')
162183
encoding = obj.get('encoding', '')
184+
id = obj.get('id')
163185
client_id = obj.get('clientId')
186+
connection_id = obj.get('connectionId')
164187
timestamp = obj.get('timestamp')
165188
extras = obj.get('extras', None)
166189

@@ -184,7 +207,9 @@ def from_encoded(obj, cipher=None, context=None):
184207
type=type_val,
185208
name=name,
186209
count=count,
210+
id=id,
187211
client_id=client_id,
212+
connection_id=connection_id,
188213
timestamp=timestamp,
189214
extras=extras,
190215
**decoded_data
@@ -200,6 +225,31 @@ def from_values(values):
200225
"""Create an Annotation from a dict of values"""
201226
return Annotation(**values)
202227

228+
@staticmethod
229+
def __update_empty_fields(proto_msg: dict, annotation: dict, annotation_index: int):
230+
"""Update empty annotation fields with values from protocol message"""
231+
if annotation.get("id") is None or annotation.get("id") == '':
232+
annotation['id'] = f"{proto_msg.get('id')}:{annotation_index}"
233+
if annotation.get("connectionId") is None or annotation.get("connectionId") == '':
234+
annotation['connectionId'] = proto_msg.get('connectionId')
235+
if annotation.get("timestamp") is None or annotation.get("timestamp") == 0:
236+
annotation['timestamp'] = proto_msg.get('timestamp')
237+
238+
@staticmethod
239+
def update_inner_annotation_fields(proto_msg: dict):
240+
"""
241+
Update inner annotation fields with protocol message data (RTAN4b).
242+
243+
Populates empty id, connectionId, and timestamp fields in annotations
244+
from the protocol message values.
245+
"""
246+
annotations: list[dict] = proto_msg.get('annotations')
247+
if annotations is not None:
248+
annotation_index = 0
249+
for annotation in annotations:
250+
Annotation.__update_empty_fields(proto_msg, annotation, annotation_index)
251+
annotation_index = annotation_index + 1
252+
203253
def __str__(self):
204254
return (
205255
f"Annotation(action={self.action}, messageSerial={self.message_serial}, "
@@ -218,7 +268,9 @@ def _copy_with(self,
218268
count=_UNSET,
219269
data=_UNSET,
220270
encoding=_UNSET,
271+
id=_UNSET,
221272
client_id=_UNSET,
273+
connection_id=_UNSET,
222274
timestamp=_UNSET,
223275
extras=_UNSET):
224276
"""
@@ -236,7 +288,9 @@ def _copy_with(self,
236288
count: Override the count (or None to clear it)
237289
data: Override the data payload (or None to clear it)
238290
encoding: Override the encoding format (or None to clear it)
291+
id: Override the ID (or None to clear it)
239292
client_id: Override the client ID (or None to clear it)
293+
connection_id: Override the connection ID (or None to clear it)
240294
timestamp: Override the timestamp (or None to clear it)
241295
extras: Override the extras metadata (or None to clear it)
242296
@@ -260,7 +314,9 @@ def _copy_with(self,
260314
count=self.__count if count is _UNSET else count,
261315
data=self.__data if data is _UNSET else data,
262316
encoding=self.__encoding if encoding is _UNSET else encoding,
317+
id=self.__id if id is _UNSET else id,
263318
client_id=self.__client_id if client_id is _UNSET else client_id,
319+
connection_id=self.__connection_id if connection_id is _UNSET else connection_id,
264320
timestamp=self.__timestamp if timestamp is _UNSET else timestamp,
265321
extras=self.__extras if extras is _UNSET else extras,
266322
)

ably/util/encoding.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ def encode_data(data: Any, encoding_array: list, binary: bool = False):
1111

1212
if isinstance(data, (dict, list)):
1313
encoding.append('json')
14-
data = json.dumps(data)
15-
data = str(data)
14+
data = json.dumps(data) # json.dumps already returns str
1615
elif isinstance(data, str) and not binary:
1716
pass
1817
elif not binary and isinstance(data, (bytearray, bytes)):

0 commit comments

Comments
 (0)