|
1 | 1 | from __future__ import annotations |
| 2 | + |
2 | 3 | import asyncio |
3 | 4 | import logging |
4 | 5 | from typing import Optional, TYPE_CHECKING, Dict, Any, Union |
5 | 6 | from ably.realtime.connection import ConnectionState |
6 | | -from ably.transport.websockettransport import ProtocolMessageAction |
7 | 7 | from ably.rest.channel import Channel, Channels as RestChannels |
| 8 | +from ably.transport.websockettransport import ProtocolMessageAction |
8 | 9 | from ably.types.channelstate import ChannelState, ChannelStateChange |
9 | 10 | from ably.types.flags import Flag, has_flag |
10 | 11 | from ably.types.message import Message |
| 12 | +from ably.types.mixins import DecodingContext |
11 | 13 | from ably.util.eventemitter import EventEmitter |
12 | 14 | from ably.util.exceptions import AblyException |
13 | 15 | from ably.util.helper import Timer, is_callable_or_coroutine |
@@ -134,6 +136,11 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: Optional[ |
134 | 136 | self.__error_reason: Optional[AblyException] = None |
135 | 137 | self.__channel_options = channel_options or ChannelOptions() |
136 | 138 |
|
| 139 | + # Delta-specific fields for RTL19/RTL20 compliance |
| 140 | + vcdiff_decoder = self.__realtime.options.vcdiff_decoder if self.__realtime.options.vcdiff_decoder else None |
| 141 | + self.__decoding_context = DecodingContext(vcdiff_decoder=vcdiff_decoder) |
| 142 | + self.__decode_failure_recovery_in_progress = False |
| 143 | + |
137 | 144 | # Used to listen to state changes internally, if we use the public event emitter interface then internals |
138 | 145 | # will be disrupted if the user called .off() to remove all listeners |
139 | 146 | self.__internal_state_emitter = EventEmitter() |
@@ -420,8 +427,16 @@ def _on_message(self, proto_msg: dict) -> None: |
420 | 427 | else: |
421 | 428 | self._request_state(ChannelState.ATTACHING) |
422 | 429 | elif action == ProtocolMessageAction.MESSAGE: |
423 | | - messages = Message.from_encoded_array(proto_msg.get('messages')) |
424 | | - self.__channel_serial = channel_serial |
| 430 | + messages = [] |
| 431 | + try: |
| 432 | + messages = Message.from_encoded_array(proto_msg.get('messages'), context=self.__decoding_context) |
| 433 | + self.__decoding_context.last_message_id = messages[-1].id |
| 434 | + self.__channel_serial = channel_serial |
| 435 | + except AblyException as e: |
| 436 | + if e.code == 40018: # Delta decode failure - start recovery |
| 437 | + self._start_decode_failure_recovery(e) |
| 438 | + else: |
| 439 | + log.error(f"Message processing error {e}. Skip messages {proto_msg.get('messages')}") |
425 | 440 | for message in messages: |
426 | 441 | self.__message_emitter._emit(message.name, message) |
427 | 442 | elif action == ProtocolMessageAction.ERROR: |
@@ -463,6 +478,9 @@ def _notify_state(self, state: ChannelState, reason: Optional[AblyException] = N |
463 | 478 | if state in (ChannelState.DETACHED, ChannelState.SUSPENDED, ChannelState.FAILED): |
464 | 479 | self.__channel_serial = None |
465 | 480 |
|
| 481 | + if state != ChannelState.ATTACHING: |
| 482 | + self.__decode_failure_recovery_in_progress = False |
| 483 | + |
466 | 484 | state_change = ChannelStateChange(self.__state, state, resumed, reason=reason) |
467 | 485 |
|
468 | 486 | self.__state = state |
@@ -553,6 +571,24 @@ def error_reason(self) -> Optional[AblyException]: |
553 | 571 | """An AblyException instance describing the last error which occurred on the channel, if any.""" |
554 | 572 | return self.__error_reason |
555 | 573 |
|
| 574 | + def _start_decode_failure_recovery(self, error: AblyException) -> None: |
| 575 | + """Start RTL18 decode failure recovery procedure""" |
| 576 | + |
| 577 | + if self.__decode_failure_recovery_in_progress: |
| 578 | + log.info('VCDiff recovery process already started, skipping') |
| 579 | + return |
| 580 | + |
| 581 | + self.__decode_failure_recovery_in_progress = True |
| 582 | + |
| 583 | + # RTL18a: Log error with code 40018 |
| 584 | + log.error(f'VCDiff decode failure: {error}') |
| 585 | + |
| 586 | + # RTL18b: Message is already discarded by not processing it |
| 587 | + |
| 588 | + # RTL18c: Send ATTACH with previous channel serial and transition to ATTACHING |
| 589 | + self._notify_state(ChannelState.ATTACHING, reason=error) |
| 590 | + self._check_pending_state() |
| 591 | + |
556 | 592 |
|
557 | 593 | class Channels(RestChannels): |
558 | 594 | """Creates and destroys RealtimeChannel objects. |
|
0 commit comments