Skip to content

Commit b32ddd9

Browse files
committed
Annotation review fixes: spec compliance and code cleanup
- Refactor publish/delete to use shared __send_annotation() with explicit action setting per RSAN1c1/RSAN2a/RTAN1a/RTAN2a - RTAN4e: Change subscribe mode check from exception to warning per spec; guard against empty modes when server doesn't send flags - RTAN4c/RTAN5a: Support array of types in subscribe/unsubscribe - RSAN1c4: Fix idempotent ID generation to use base64(9 random bytes):0 - Export Annotation, AnnotationAction, ChannelMode, ChannelOptions from ably - Use isinstance() consistently for bool checks across channel modules
1 parent eee4d33 commit b32ddd9

6 files changed

Lines changed: 124 additions & 89 deletions

File tree

ably/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
from ably.rest.auth import Auth
55
from ably.rest.push import Push
66
from ably.rest.rest import AblyRest
7+
from ably.types.annotation import Annotation, AnnotationAction
78
from ably.types.capability import Capability
9+
from ably.types.channelmode import ChannelMode
10+
from ably.types.channeloptions import ChannelOptions
811
from ably.types.channelsubscription import PushChannelSubscription
912
from ably.types.device import DeviceDetails
1013
from ably.types.message import MessageAction, MessageVersion

ably/realtime/annotations.py

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -40,30 +40,21 @@ def __init__(self, channel: RealtimeChannel, connection_manager: ConnectionManag
4040
self.__subscriptions = EventEmitter()
4141
self.__rest_annotations = RestAnnotations(channel)
4242

43-
async def publish(self, msg_or_serial, annotation: Annotation, params: dict | None = None):
43+
async def __send_annotation(self, annotation: Annotation, params: dict | None = None):
4444
"""
45-
Publish an annotation on a message via the realtime connection.
45+
Internal method to send an annotation via the realtime connection.
4646
4747
Args:
48-
msg_or_serial: Either a message serial (string) or a Message object
49-
annotation: Annotation object
48+
annotation: Validated Annotation object with action and message_serial set
5049
params: Optional dict of query parameters
51-
52-
Returns:
53-
None
54-
55-
Raises:
56-
AblyException: If the request fails, inputs are invalid, or channel is in unpublishable state
5750
"""
58-
annotation = construct_validate_annotation(msg_or_serial, annotation)
59-
6051
# Check if channel and connection are in publishable state
6152
self.__channel._throw_if_unpublishable_state()
6253

6354
log.info(
64-
f'RealtimeAnnotations.publish(), channelName = {self.__channel.name}, '
65-
f'sending annotation with messageSerial = {annotation.message_serial}, '
66-
f'type = {annotation.type}'
55+
f'RealtimeAnnotations: sending annotation, channelName = {self.__channel.name}, '
56+
f'messageSerial = {annotation.message_serial}, '
57+
f'type = {annotation.type}, action = {annotation.action}'
6758
)
6859

6960
# Convert to wire format (array of annotations)
@@ -84,6 +75,28 @@ async def publish(self, msg_or_serial, annotation: Annotation, params: dict | No
8475
# Send via WebSocket
8576
await self.__connection_manager.send_protocol_message(protocol_message)
8677

78+
async def publish(self, msg_or_serial, annotation: Annotation, params: dict | None = None):
79+
"""
80+
Publish an annotation on a message via the realtime connection.
81+
82+
Args:
83+
msg_or_serial: Either a message serial (string) or a Message object
84+
annotation: Annotation object
85+
params: Optional dict of query parameters
86+
87+
Returns:
88+
None
89+
90+
Raises:
91+
AblyException: If the request fails, inputs are invalid, or channel is in unpublishable state
92+
"""
93+
annotation = construct_validate_annotation(msg_or_serial, annotation)
94+
95+
# RSAN1c1/RTAN1a: Explicitly set action to ANNOTATION_CREATE
96+
annotation = annotation._copy_with(action=AnnotationAction.ANNOTATION_CREATE)
97+
98+
await self.__send_annotation(annotation, params)
99+
87100
async def delete(
88101
self,
89102
msg_or_serial,
@@ -93,9 +106,6 @@ async def delete(
93106
"""
94107
Delete an annotation on a message.
95108
96-
This is a convenience method that sets the action to 'annotation.delete'
97-
and calls publish().
98-
99109
Args:
100110
msg_or_serial: Either a message serial (string) or a Message object
101111
annotation: Annotation containing annotation properties
@@ -107,23 +117,24 @@ async def delete(
107117
Raises:
108118
AblyException: If the request fails or inputs are invalid
109119
"""
110-
return await self.publish(
111-
msg_or_serial,
112-
annotation._copy_with(action=AnnotationAction.ANNOTATION_DELETE),
113-
params,
114-
)
120+
annotation = construct_validate_annotation(msg_or_serial, annotation)
121+
122+
# RSAN2a/RTAN2a: Explicitly set action to ANNOTATION_DELETE
123+
annotation = annotation._copy_with(action=AnnotationAction.ANNOTATION_DELETE)
124+
125+
await self.__send_annotation(annotation, params)
115126

116127
async def subscribe(self, *args):
117128
"""
118129
Subscribe to annotation events on this channel.
119130
120131
Parameters
121132
----------
122-
*args: type, listener
123-
Subscribe type and listener
133+
*args: type_or_types, listener
134+
Subscribe type(s) and listener
124135
125-
arg1(type): str, optional
126-
Subscribe to annotations of the given type
136+
arg1(type_or_types): str or list[str], optional
137+
Subscribe to annotations of the given type or types (RTAN4c)
127138
128139
arg2(listener): callable
129140
Subscribe to all annotations on the channel
@@ -132,62 +143,63 @@ async def subscribe(self, *args):
132143
133144
Raises
134145
------
135-
AblyException
136-
If unable to subscribe due to invalid channel state or missing ANNOTATION_SUBSCRIBE mode
137146
ValueError
138147
If no valid subscribe arguments are passed
139148
"""
140149
# Parse arguments similar to channel.subscribe
141150
if len(args) == 0:
142151
raise ValueError("annotations.subscribe called without arguments")
143152

144-
if len(args) >= 2 and isinstance(args[0], str):
145-
annotation_type = args[0]
153+
annotation_types = None
154+
155+
# RTAN4c: Support string or list of strings as first argument
156+
if len(args) >= 2 and isinstance(args[0], (str, list)):
157+
if isinstance(args[0], list):
158+
annotation_types = args[0]
159+
else:
160+
annotation_types = [args[0]]
146161
if not args[1]:
147162
raise ValueError("annotations.subscribe called without listener")
148163
if not is_callable_or_coroutine(args[1]):
149164
raise ValueError("subscribe listener must be function or coroutine function")
150165
listener = args[1]
151166
elif is_callable_or_coroutine(args[0]):
152167
listener = args[0]
153-
annotation_type = None
154168
else:
155169
raise ValueError('invalid subscribe arguments')
156170

157-
# Register subscription
158-
if annotation_type is not None:
159-
self.__subscriptions.on(annotation_type, listener)
160-
else:
161-
self.__subscriptions.on(listener)
162-
171+
# RTAN4d: Implicitly attach channel on subscribe
163172
await self.__channel.attach()
164173

165-
# Check if ANNOTATION_SUBSCRIBE mode is enabled
166-
if self.__channel.state == ChannelState.ATTACHED:
174+
# RTAN4e: Check if ANNOTATION_SUBSCRIBE mode is enabled (log warning per spec),
175+
# only when server explicitly sent modes (non-empty list)
176+
if self.__channel.state == ChannelState.ATTACHED and self.__channel.modes:
167177
if ChannelMode.ANNOTATION_SUBSCRIBE not in self.__channel.modes:
168-
if annotation_type is not None:
169-
self.__subscriptions.off(annotation_type, listener)
170-
else:
171-
self.__subscriptions.off(listener)
172-
raise AblyException(
173-
message="You are trying to add an annotation listener, but you haven't requested the "
174-
"annotation_subscribe channel mode in ChannelOptions, so this won't do anything "
175-
"(we only deliver annotations to clients who have explicitly requested them)",
176-
code=93001,
177-
status_code=400,
178+
log.warning(
179+
"You are trying to add an annotation listener, but the "
180+
"ANNOTATION_SUBSCRIBE channel mode was not included in the ATTACHED flags. "
181+
"This subscription may not receive annotations. Ensure you request the "
182+
"annotation_subscribe channel mode in ChannelOptions."
178183
)
179184

185+
# Register subscription after successful attach
186+
if annotation_types is not None:
187+
for t in annotation_types:
188+
self.__subscriptions.on(t, listener)
189+
else:
190+
self.__subscriptions.on(listener)
191+
180192
def unsubscribe(self, *args):
181193
"""
182194
Unsubscribe from annotation events on this channel.
183195
184196
Parameters
185197
----------
186-
*args: type, listener
187-
Unsubscribe type and listener
198+
*args: type_or_types, listener
199+
Unsubscribe type(s) and listener
188200
189-
arg1(type): str, optional
190-
Unsubscribe from annotations of the given type
201+
arg1(type_or_types): str or list[str], optional
202+
Unsubscribe from annotations of the given type or types
191203
192204
arg2(listener): callable
193205
Unsubscribe from all annotations on the channel
@@ -203,10 +215,15 @@ def unsubscribe(self, *args):
203215
# RTAN5: Support no arguments to unsubscribe all annotation listeners
204216
if len(args) == 0:
205217
self.__subscriptions.off()
206-
elif len(args) >= 2 and isinstance(args[0], str):
207-
annotation_type = args[0]
218+
elif len(args) >= 2 and isinstance(args[0], (str, list)):
219+
# RTAN5a: Support string or list of strings for type(s)
220+
if isinstance(args[0], list):
221+
annotation_types = args[0]
222+
else:
223+
annotation_types = [args[0]]
208224
listener = args[1]
209-
self.__subscriptions.off(annotation_type, listener)
225+
for t in annotation_types:
226+
self.__subscriptions.off(t, listener)
210227
elif is_callable_or_coroutine(args[0]):
211228
listener = args[0]
212229
self.__subscriptions.off(listener)

ably/realtime/channel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ async def _send_update(
540540
f'channel = {self.name}, state = {self.state}, serial = {message.serial}'
541541
)
542542

543-
stringified_params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()} \
543+
stringified_params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()} \
544544
if params else None
545545

546546
# Send protocol message

ably/rest/annotations.py

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from __future__ import annotations
22

3+
import base64
34
import json
45
import logging
5-
import uuid
6+
import os
67
from urllib import parse
78

89
import msgpack
@@ -118,31 +119,19 @@ def __base_path_for_serial(self, serial):
118119
channel_path = '/channels/{}/'.format(parse.quote_plus(self.__channel.name, safe=':'))
119120
return channel_path + 'messages/' + parse.quote_plus(serial, safe=':') + '/annotations'
120121

121-
async def publish(
122-
self,
123-
msg_or_serial,
124-
annotation: Annotation,
125-
params: dict | None = None,
126-
):
122+
async def __send_annotation(self, annotation: Annotation, params: dict | None = None):
127123
"""
128-
Publish an annotation on a message.
124+
Internal method to send an annotation to the API.
129125
130126
Args:
131-
msg_or_serial: Either a message serial (string) or a Message object
132-
annotation: Annotation object
127+
annotation: Validated Annotation object with action and message_serial set
133128
params: Optional dict of query parameters
134-
135-
Returns:
136-
None
137-
138-
Raises:
139-
AblyException: If the request fails or inputs are invalid
140129
"""
141-
annotation = construct_validate_annotation(msg_or_serial, annotation)
142-
143130
# RSAN1c4: Generate random ID if not provided (for idempotent publishing)
131+
# Spec: base64-encode at least 9 random bytes, append ':0'
144132
if not annotation.id and self.__client_options.idempotent_rest_publishing:
145-
annotation = annotation._copy_with(id=str(uuid.uuid4()))
133+
random_id = base64.b64encode(os.urandom(9)).decode('ascii') + ':0'
134+
annotation = annotation._copy_with(id=random_id)
146135

147136
# Convert to wire format
148137
request_body = annotation.as_dict(binary=self.__channel.ably.options.use_binary_protocol)
@@ -165,6 +154,33 @@ async def publish(
165154
# Send request
166155
await self.__channel.ably.http.post(path, body=request_body)
167156

157+
async def publish(
158+
self,
159+
msg_or_serial,
160+
annotation: Annotation,
161+
params: dict | None = None,
162+
):
163+
"""
164+
Publish an annotation on a message.
165+
166+
Args:
167+
msg_or_serial: Either a message serial (string) or a Message object
168+
annotation: Annotation object
169+
params: Optional dict of query parameters
170+
171+
Returns:
172+
None
173+
174+
Raises:
175+
AblyException: If the request fails or inputs are invalid
176+
"""
177+
annotation = construct_validate_annotation(msg_or_serial, annotation)
178+
179+
# RSAN1c1: Explicitly set action to ANNOTATION_CREATE
180+
annotation = annotation._copy_with(action=AnnotationAction.ANNOTATION_CREATE)
181+
182+
await self.__send_annotation(annotation, params)
183+
168184
async def delete(
169185
self,
170186
msg_or_serial,
@@ -188,11 +204,12 @@ async def delete(
188204
Raises:
189205
AblyException: If the request fails or inputs are invalid
190206
"""
191-
return await self.publish(
192-
msg_or_serial,
193-
annotation._copy_with(action=AnnotationAction.ANNOTATION_DELETE),
194-
params,
195-
)
207+
annotation = construct_validate_annotation(msg_or_serial, annotation)
208+
209+
# RSAN2a: Explicitly set action to ANNOTATION_DELETE
210+
annotation = annotation._copy_with(action=AnnotationAction.ANNOTATION_DELETE)
211+
212+
return await self.__send_annotation(annotation, params)
196213

197214
async def get(self, msg_or_serial, params: dict | None = None):
198215
"""

ably/rest/channel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ async def publish_messages(self, messages, params=None, timeout=None):
112112

113113
path = self.__base_path + 'messages'
114114
if params:
115-
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
115+
params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
116116
path += '?' + parse.urlencode(params)
117117
response = await self.ably.http.post(path, body=request_body, timeout=timeout)
118118

@@ -211,7 +211,7 @@ async def _send_update(
211211
# Build path with params
212212
path = self.__base_path + 'messages/{}'.format(parse.quote_plus(message.serial, safe=':'))
213213
if params:
214-
params = {k: str(v).lower() if type(v) is bool else v for k, v in params.items()}
214+
params = {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
215215
path += '?' + parse.urlencode(params)
216216

217217
# Send request

ably/types/annotation.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ def from_encoded(obj, cipher=None, context=None):
187187
timestamp = obj.get('timestamp')
188188
extras = obj.get('extras', None)
189189

190-
# Decode data if present
191-
decoded_data = Annotation.decode(data, encoding, cipher, context) if data is not None else {}
190+
# Decode data if present, passing data=None explicitly when absent
191+
decoded_data = Annotation.decode(data, encoding, cipher, context) if data is not None else {'data': None}
192192

193193
# Convert action from int to enum
194194
if action is not None:
@@ -245,10 +245,8 @@ def update_inner_annotation_fields(proto_msg: dict):
245245
"""
246246
annotations: list[dict] = proto_msg.get('annotations')
247247
if annotations is not None:
248-
annotation_index = 0
249-
for annotation in annotations:
248+
for annotation_index, annotation in enumerate(annotations):
250249
Annotation.__update_empty_fields(proto_msg, annotation, annotation_index)
251-
annotation_index = annotation_index + 1
252250

253251
def __str__(self):
254252
return (

0 commit comments

Comments
 (0)