Skip to content

Commit 8f52b9b

Browse files
committed
[AIT-96] feat: RealtimeChannel publish over WebSocket implementation
Implemented Spec points:

## Message Publishing Specifications (RTL6)

### RTL6c - Messages published on channels in specific states
- Messages published when channel is not **ATTACHED** should be published immediately

### RTL6c2 - Message queuing behavior
- Messages can be queued when connection/channel is not ready
- Relates to processing queued messages when connection becomes ready

### RTL6c3 - Publishing without implicit attach

### RTL6c4 - Behavior when queueMessages client option is false

### RTL6d - Message bundling restrictions
#### RTL6d1: Maximum message size limits for bundling
- **RTL6d2**: All messages in bundle must have same clientId
#### RTL6d3: Can only bundle messages for same channel
- **RTL6d4**: Can only bundle messages with same action (MESSAGE or PRESENCE)
#### RTL6d7: Cannot bundle idempotent messages with non-idempotent messages

---

## Message Acknowledgment (RTN7)

### RTN7a
All **PRESENCE**, **MESSAGE**, **ANNOTATION**, and **OBJECT** ProtocolMessages sent to Ably expect either an **ACK** or **NACK** to confirm successful receipt or failure

### RTN7b
Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing `msgSerial` integer starting at zero

### RTN7c
If connection enters **SUSPENDED**, **CLOSED**, or **FAILED** state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues

### RTN7d
If `queueMessages` is false, messages entering **DISCONNECTED** state without acknowledgment should be treated as failed immediately

### RTN7e
When connection state changes to **SUSPENDED**/**CLOSED**/**FAILED**, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed

---

## Message Resending and Serial Handling (RTN19)

### RTN19a
Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK

### RTN19a2
In the event of a new `connectionId` (connection not resumed), previous `msgSerials` are meaningless and must be reset. The `msgSerial` counter resets to 0 for the new connection

---

## Channel State and Reattachment (RTL3, RTL4, RTL5)

### RTL3c
Channel state implications when connection goes into **SUSPENDED**

### RTL3d
When connection enters **CONNECTED** state, channels in **ATTACHING**, **ATTACHED**, or **SUSPENDED** states should transition to **ATTACHING** and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish

### RTL4c - Attach sequence
- **RTL4c1**: ATTACH message includes channel serial to resume from previous message or attachment

### RTL5i
If channel is **DETACHING**, re-send **DETACH** and remain in 'detaching' state
1 parent 7df692f commit 8f52b9b

5 files changed

Lines changed: 1094 additions & 11 deletions

File tree

ably/realtime/connectionmanager.py

Lines changed: 232 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,85 @@
2424
log = logging.getLogger(__name__)
2525

2626

27+
class PendingMessage:
    """A protocol message bundled with the future that tracks its ACK/NACK."""

    def __init__(self, message: dict):
        self.message = message
        # Starts unset; ConnectionManager.send_protocol_message() attaches a
        # future when the message requires acknowledgment.
        self.future: asyncio.Future | None = None

        # Only these protocol actions are confirmed with an ACK or NACK.
        acknowledged_actions = (
            ProtocolMessageAction.MESSAGE,
            ProtocolMessageAction.PRESENCE,
            ProtocolMessageAction.ANNOTATION,
            ProtocolMessageAction.OBJECT,
        )
        self.ack_required = message.get('action') in acknowledged_actions
42+
43+
44+
class PendingMessageQueue:
    """Ordered list of messages that are awaiting an ACK or NACK."""

    def __init__(self):
        # Oldest message first; complete_messages() always settles from the head.
        self.messages: list[PendingMessage] = []

    def push(self, pending_message: PendingMessage) -> None:
        """Append a message to the tail of the queue."""
        self.messages.append(pending_message)

    def count(self) -> int:
        """Return the number of messages still awaiting acknowledgment."""
        return len(self.messages)

    def complete_messages(self, serial: int, count: int, err: AblyException | None = None) -> None:
        """Settle the messages covered by an ACK/NACK and drop them from the queue.

        The acknowledged range is ``[serial, serial + count)``. Everything from
        the head of the queue up to the end of that range is removed; each
        removed message's future is resolved with ``None`` (ACK) or failed with
        ``err`` (NACK). An acknowledgment that ends at or before the head
        message's ``msgSerial`` is ignored.

        Args:
            serial: The msgSerial of the first message being acknowledged
            count: The number of messages being acknowledged
            err: Error from NACK, or None for successful ACK
        """
        log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, err={err}')

        if not self.messages:
            log.warning('MessageQueue.complete_messages(): called on empty queue')
            return

        start_serial = self.messages[0].message.get('msgSerial')
        if start_serial is None:
            log.warning('MessageQueue.complete_messages(): first message has no msgSerial')
            return

        end_serial = serial + count
        if end_serial <= start_serial:
            # Stale acknowledgment: nothing in the queue is covered by it.
            return

        # Never take more than the queue actually holds.
        num_to_complete = min(end_serial - start_serial, len(self.messages))
        completed_messages = self.messages[:num_to_complete]
        self.messages = self.messages[num_to_complete:]

        for msg in completed_messages:
            # A future may already be settled (e.g. failed elsewhere); skip it
            # rather than raise InvalidStateError.
            if msg.future and not msg.future.done():
                if err:
                    msg.future.set_exception(err)
                else:
                    msg.future.set_result(None)

    def complete_all_messages(self, err: AblyException) -> None:
        """Fail every queued message with ``err`` and leave the queue empty."""
        # Snapshot then clear in place: avoids the O(n^2) cost of repeatedly
        # popping the head while keeping the same list object in self.messages.
        failed_messages = list(self.messages)
        self.messages.clear()
        for msg in failed_messages:
            if msg.future and not msg.future.done():
                msg.future.set_exception(err)

    def clear(self) -> None:
        """Drop all messages from the queue without settling their futures."""
        self.messages.clear()
104+
105+
27106
class ConnectionManager(EventEmitter):
28107
def __init__(self, realtime: AblyRealtime, initial_state):
29108
self.options = realtime.options
@@ -43,6 +122,8 @@ def __init__(self, realtime: AblyRealtime, initial_state):
43122
self.__fallback_hosts: list[str] = self.options.get_fallback_realtime_hosts()
44123
self.queued_messages: Queue = Queue()
45124
self.__error_reason: AblyException | None = None
125+
self.msg_serial: int = 0
126+
self.pending_message_queue: PendingMessageQueue = PendingMessageQueue()
46127
super().__init__()
47128

48129
def enact_state_change(self, state: ConnectionState, reason: AblyException | None = None) -> None:
@@ -87,38 +168,131 @@ async def close_impl(self) -> None:
87168

88169
self.notify_state(ConnectionState.CLOSED)
89170

90-
async def send_protocol_message(
    self, message_or_pending: dict | PendingMessage
) -> None:
    """Send a protocol message and, when it needs one, wait for its ACK/NACK.

    A plain dict is treated as a new message: it is wrapped in a
    PendingMessage and, if its action requires acknowledgment, given a future
    and the next ``msgSerial``. A PendingMessage is treated as a resend and
    keeps the future and ``msgSerial`` it already has.

    While the connection is CONNECTING or DISCONNECTED the message is only
    placed on ``queued_messages``. It is added to ``pending_message_queue``
    at the moment it is actually handed to the transport, so that queue holds
    each in-flight message exactly once and in send order.

    Args:
        message_or_pending: Either a protocol message dict (new message) or a
            PendingMessage (requeued message)

    Returns:
        None

    Raises:
        AblyException: If called in a state other than CONNECTED, CONNECTING
            or DISCONNECTED. For messages requiring acknowledgment, whatever
            exception the message's future is failed with (NACK, connection
            failure, no active transport) is raised from the await.
    """
    if isinstance(message_or_pending, PendingMessage):
        # Requeued message - reuse existing PendingMessage with its Future
        pending_message = message_or_pending
        protocol_message = pending_message.message
    else:
        # New message - create PendingMessage wrapper
        protocol_message = message_or_pending
        pending_message = PendingMessage(protocol_message)

        if pending_message.ack_required:
            # Future the caller awaits until the ACK/NACK arrives.
            pending_message.future = asyncio.Future()
            # New message - assign fresh serial
            protocol_message['msgSerial'] = self.msg_serial
            self.msg_serial += 1

    if self.state in (
        ConnectionState.DISCONNECTED,
        ConnectionState.CONNECTING,
    ):
        # Not sent yet, so it is NOT tracked in pending_message_queue here:
        # pushing now and again on the eventual send would leave duplicate,
        # mis-ordered entries. fail_queued_messages() fails futures found on
        # queued_messages directly, so nothing is lost by waiting.
        self.queued_messages.put(pending_message)

        if pending_message.ack_required:
            await pending_message.future
        return None

    if self.state == ConnectionState.CONNECTED:
        if self.transport:
            # Add to pending queue before sending
            if pending_message.ack_required:
                self.pending_message_queue.push(pending_message)

            await self.transport.send(protocol_message)
        else:
            # Not inside an exception handler, so log.error rather than
            # log.exception (which would append a bogus "NoneType: None").
            log.error(
                "ConnectionManager.send_protocol_message(): can not send message with no active transport"
            )
            if pending_message.future and not pending_message.future.done():
                pending_message.future.set_exception(
                    AblyException("No active transport", 500, 50000)
                )
        if pending_message.ack_required:
            await pending_message.future
        return None

    error = AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000)
    # A requeued message may carry a future that was already settled.
    if pending_message.future and not pending_message.future.done():
        pending_message.future.set_exception(error)
    raise error
108237

109238
def send_queued_messages(self) -> None:
    """Drain queued_messages, scheduling one send_protocol_message() task per entry."""
    backlog = self.queued_messages
    log.info(f'ConnectionManager.send_queued_messages(): sending {backlog.qsize()} message(s)')
    while True:
        if backlog.empty():
            break
        # NOTE(review): the task handle is not retained, so a failure inside
        # the send only surfaces via asyncio's unhandled-exception reporting.
        asyncio.create_task(self.send_protocol_message(backlog.get()))
243+
244+
def requeue_pending_messages(self) -> None:
    """RTN19a: Requeue messages awaiting ACK/NACK when transport disconnects

    These messages will be resent when connection becomes CONNECTED again.
    RTN19a2: msgSerial is preserved for resume, reset for new connection.

    The requeued messages are placed ahead of anything already waiting in
    ``queued_messages`` and keep their original send order; the pending
    queue is emptied afterwards.
    """
    pending_count = self.pending_message_queue.count()
    if pending_count == 0:
        return

    log.info(
        f'ConnectionManager.requeue_pending_messages(): '
        f'requeuing {pending_count} pending message(s) for resend'
    )

    # Snapshot in original (oldest-first) order. Each PendingMessage keeps
    # its Future and msgSerial, so the original caller is still resolved
    # when the resend is acknowledged.
    pending_messages = list(self.pending_message_queue.messages)
    requeued_ids = {id(pending_msg) for pending_msg in pending_messages}

    # NOTE(review): assumes queued_messages is a FIFO queue whose put()
    # appends to the tail — confirm against the Queue import. Under that
    # assumption the only way to get "front of the queue" is to drain it
    # and refill: in-flight messages first, then whatever was waiting.
    waiting = []
    while not self.queued_messages.empty():
        waiting.append(self.queued_messages.get())

    for pending_msg in pending_messages:
        self.queued_messages.put(pending_msg)

    for queued_msg in waiting:
        # Skip entries that were also tracked as pending so that no
        # message is sent twice on reconnect.
        if id(queued_msg) not in requeued_ids:
            self.queued_messages.put(queued_msg)

    # Clear the message queue since we're requeueing them all
    # When they're resent, the existing Future will be resolved
    self.pending_message_queue.clear()
113272

114273
def fail_queued_messages(self, err) -> None:
    """Fail every queued and every pending message.

    Drains ``queued_messages`` and fails each entry's future, then fails
    everything left in ``pending_message_queue``.

    Args:
        err: The exception to fail the messages with. When falsy, a generic
            "Connection failed" AblyException is used instead.
    """
    log.info(
        f"ConnectionManager.fail_queued_messages(): discarding {self.queued_messages.qsize()} messages;" +
        f" reason = {err}"
    )
    # Argument order is (message, status_code, code), matching the other
    # AblyException constructions in this file (e.g. ``..., 500, 50000``).
    error = err or AblyException("Connection failed", 500, 80000)
    while not self.queued_messages.empty():
        pending_msg = self.queued_messages.get()
        # Not inside an exception handler, so log.error rather than
        # log.exception (which would append a bogus "NoneType: None").
        log.error(
            f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: "
            f"{pending_msg.message}"
        )
        # Fail the Future if it exists
        if pending_msg.future and not pending_msg.future.done():
            pending_msg.future.set_exception(error)

    # Also fail all pending messages awaiting acknowledgment
    pending_count = self.pending_message_queue.count()
    if pending_count > 0:
        log.info(
            f"ConnectionManager.fail_queued_messages(): failing {pending_count} pending messages"
        )
        self.pending_message_queue.complete_all_messages(error)
122296

123297
async def ping(self) -> float:
124298
if self.__ping_future:
@@ -149,6 +323,16 @@ def on_connected(self, connection_details: ConnectionDetails, connection_id: str
149323
reason: AblyException | None = None) -> None:
150324
self.__fail_state = ConnectionState.DISCONNECTED
151325

326+
# RTN19a2: Reset msgSerial if connectionId changed (new connection)
327+
prev_connection_id = self.connection_id
328+
connection_id_changed = prev_connection_id is not None and prev_connection_id != connection_id
329+
330+
if connection_id_changed:
331+
log.info('ConnectionManager.on_connected(): New connectionId; resetting msgSerial')
332+
self.msg_serial = 0
333+
# Note: In JS they call resetSendAttempted() here, but we don't need it
334+
# because we fail all pending messages on disconnect per RTN7e
335+
152336
self.__connection_details = connection_details
153337
self.connection_id = connection_id
154338

@@ -244,7 +428,36 @@ def on_heartbeat(self, id: str | None) -> None:
244428
self.__ping_future.set_result(None)
245429
self.__ping_future = None
246430

431+
def on_ack(self, serial: int, count: int) -> None:
    """Resolve the pending messages covered by an ACK from the server.

    Args:
        serial: msgSerial of the first acknowledged message
        count: how many consecutive messages the ACK covers
    """
    log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}')
    pending = self.pending_message_queue
    # No error argument: the covered futures resolve successfully.
    pending.complete_messages(serial, count)
440+
441+
def on_nack(self, serial: int, count: int, err: AblyException | None) -> None:
    """Handle NACK protocol message from server

    Fails the futures of the pending messages covered by the NACK.

    Args:
        serial: The msgSerial of the first message being rejected
        count: The number of messages being rejected
        err: Error information from the server; when missing, a generic
            "channel not responding" error is substituted
    """
    if not err:
        # Argument order is (message, status_code, code), matching the other
        # AblyException constructions in this file (e.g. ``..., 500, 50000``).
        err = AblyException('Unable to send message; channel not responding', 500, 50001)

    log.error(f'ConnectionManager.on_nack(): serial={serial}, count={count}, err={err}')
    self.pending_message_queue.complete_messages(serial, count, err)
454+
247455
def deactivate_transport(self, reason: AblyException | None = None):
    """Drop the active transport and move the connection to DISCONNECTED.

    Args:
        reason: Optional error passed along with the DISCONNECTED state change
    """
    had_transport = bool(self.transport)
    if had_transport:
        # RTN19a: anything still awaiting ACK/NACK goes back on the send
        # queue so it is resent once the connection is re-established.
        log.info('ConnectionManager.deactivate_transport(): requeuing pending messages')
        self.requeue_pending_messages()

    self.transport = None
    self.notify_state(ConnectionState.DISCONNECTED, reason)
250463

@@ -383,8 +596,16 @@ def notify_state(self, state: ConnectionState, reason: AblyException | None = No
383596
ConnectionState.SUSPENDED,
384597
ConnectionState.FAILED,
385598
):
599+
# RTN7e: Fail pending messages on SUSPENDED, CLOSED, FAILED
386600
self.fail_queued_messages(reason)
387601
self.ably.channels._propagate_connection_interruption(state, reason)
602+
elif state == ConnectionState.DISCONNECTED and not self.options.queue_messages:
603+
# RTN7d: If queueMessages is false, fail pending messages on DISCONNECTED
604+
log.info(
605+
'ConnectionManager.notify_state(): queueMessages is false; '
606+
'failing pending messages on DISCONNECTED'
607+
)
608+
self.fail_queued_messages(reason)
388609

389610
def start_transition_timer(self, state: ConnectionState, fail_state: ConnectionState | None = None) -> None:
390611
log.debug(f'ConnectionManager.start_transition_timer(): transition state = {state}')
@@ -466,6 +687,8 @@ def cancel_retry_timer(self) -> None:
466687
def disconnect_transport(self) -> None:
    """Requeue in-flight messages and dispose of the active transport, if any."""
    log.info('ConnectionManager.disconnect_transport()')
    if not self.transport:
        return

    # RTN19a: put un-acknowledged messages back on the send queue first.
    self.requeue_pending_messages()
    # Disposal runs as a background task; its handle is kept on the instance.
    self.disconnect_transport_task = asyncio.create_task(self.transport.dispose())
470693

471694
async def on_auth_updated(self, token_details: TokenDetails):

0 commit comments

Comments
 (0)