Skip to content

Commit 5cdf7bc

Browse files
authored
fix: send-message pending task race condition (#3538)
## 🎯 Goal This PR resolves a very long standing race condition that has evaded me for quite a while. Whenever we queue up send message requests while we're offline, they are to be executed when we regain connection. However, the UI SDK takes care of failed messages in the sense of re-adding them in case they weren't resolved or saved in the LLC. This is done so that when a resync happens of the channel, the state is preserved and the failed messages do not disappear. However, since our state layer is not LLC only fully, we have 2 vectors of resyncing the SDK: - One that comes from the LLC (i.e `channel.watch()` being called) - One that comes from the SDK itself (re-adding those failed messages) When `send-message` pending tasks are executed, the LLC takes care of upserting the state immediately so that the UI SDK can simply consume it. This is fine, except for the fact that for the UI SDK to also resolve its own state, we rely on WS events (which might be a bit late, especially for the last 1-2 messages, which is when this bug would mostly happen). So the flow would look something like this: - We regain connection - Pending tasks are executed, `channel.state.messages` are updated (from the pending task execution) - After this, the `onSyncStatusChange` callback is invoked - The `channel` is actually resynced - `channel.watch()` is called - We also upsert all failed messages once again - For any `message` for which we did not receive a WS event of the successful pending task execution on time, both the failed `message` and the actual (serverside) one appear in the list To fix this, we deem messages eligible for recovery if they: - are `FAILED` - are not already present in `channel.state` This makes sure that the race condition never happens when the state is already up to date. If it's not, we anyway have to do it. ## 🛠 Implementation details <!-- Provide a description of the implementation --> ## 🎨 UI Changes <!-- Add relevant screenshots --> <details> <summary>iOS</summary> <table> <thead> <tr> <td>Before</td> <td>After</td> </tr> </thead> <tbody> <tr> <td> <!--<img src="" /> --> </td> <td> <!--<img src="" /> --> </td> </tr> </tbody> </table> </details> <details> <summary>Android</summary> <table> <thead> <tr> <td>Before</td> <td>After</td> </tr> </thead> <tbody> <tr> <td> <!--<img src="" /> --> </td> <td> <!--<img src="" /> --> </td> </tr> </tbody> </table> </details> ## 🧪 Testing <!-- Explain how this change can be tested (or why it can't be tested) --> ## ☑️ Checklist - [ ] I have signed the [Stream CLA](https://docs.google.com/forms/d/e/1FAIpQLScFKsKkAJI7mhCr7K9rEIOpqIDThrWxuvxnwUq2XkHyG154vQ/viewform) (required) - [ ] PR targets the `develop` branch - [ ] Documentation is updated - [ ] New code is tested in main example apps, including all possible scenarios - [ ] SampleApp iOS and Android - [ ] Expo iOS and Android
1 parent 369c785 commit 5cdf7bc

File tree

2 files changed

+136
-8
lines changed

2 files changed

+136
-8
lines changed

package/src/__tests__/offline-support/optimistic-update.js

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,131 @@ export const OptimisticUpdates = () => {
720720
expect(sendMessageSpy).toHaveBeenCalled();
721721
});
722722
});
723+
724+
it('should not re-add a failed local message after reconnect when its pending send task was resolved', async () => {
725+
const localMessage = generateMessage({
726+
status: MessageStatusTypes.SENDING,
727+
text: 'offline resend',
728+
user: chatClient.user,
729+
userId: chatClient.userID,
730+
});
731+
const serverMessage = generateMessage({
732+
id: localMessage.id,
733+
text: localMessage.text,
734+
user: chatClient.user,
735+
userId: chatClient.userID,
736+
});
737+
738+
jest.spyOn(channel.messageComposer, 'compose').mockResolvedValue({
739+
localMessage,
740+
message: localMessage,
741+
options: {},
742+
});
743+
744+
render(
745+
<Chat client={chatClient} enableOfflineSupport>
746+
<Channel channel={channel} initialValue={localMessage.text}>
747+
<CallbackEffectWithContext
748+
callback={async ({ sendMessage }) => {
749+
useMockedApis(chatClient, [erroredPostApi()]);
750+
await sendMessage();
751+
}}
752+
context={MessageInputContext}
753+
>
754+
<View testID='children' />
755+
</CallbackEffectWithContext>
756+
</Channel>
757+
</Chat>,
758+
);
759+
await waitFor(() => expect(screen.getByTestId('children')).toBeTruthy());
760+
761+
let pendingTask;
762+
await waitFor(async () => {
763+
const pendingTasks = await chatClient.offlineDb.getPendingTasks();
764+
expect(pendingTasks).toHaveLength(1);
765+
pendingTask = pendingTasks[0];
766+
});
767+
768+
expect(channel.state.messages.some((message) => message.id === localMessage.id)).toBe(true);
769+
770+
jest.spyOn(channel, 'watch').mockResolvedValue({});
771+
772+
channel.state.removeMessage(localMessage);
773+
channel.state.addMessageSorted(serverMessage, true);
774+
await chatClient.offlineDb.deletePendingTask({ id: pendingTask.id });
775+
776+
await act(async () => {
777+
await chatClient.offlineDb.syncManager.invokeSyncStatusListeners(true);
778+
});
779+
780+
await waitFor(() => {
781+
const matchingMessages = channel.state.messages.filter(
782+
(message) => message.text === localMessage.text,
783+
);
784+
785+
expect(matchingMessages).toHaveLength(1);
786+
expect(matchingMessages[0].id).toBe(serverMessage.id);
787+
expect(matchingMessages[0].status).not.toBe(MessageStatusTypes.FAILED);
788+
});
789+
});
790+
791+
it('should re-add a failed local message after reconnect when fresh state still does not contain it', async () => {
792+
const localMessage = generateMessage({
793+
status: MessageStatusTypes.SENDING,
794+
text: 'offline resend unresolved',
795+
user: chatClient.user,
796+
userId: chatClient.userID,
797+
});
798+
799+
jest.spyOn(channel.messageComposer, 'compose').mockResolvedValue({
800+
localMessage,
801+
message: localMessage,
802+
options: {},
803+
});
804+
805+
render(
806+
<Chat client={chatClient} enableOfflineSupport>
807+
<Channel channel={channel} initialValue={localMessage.text}>
808+
<CallbackEffectWithContext
809+
callback={async ({ sendMessage }) => {
810+
useMockedApis(chatClient, [erroredPostApi()]);
811+
await sendMessage();
812+
}}
813+
context={MessageInputContext}
814+
>
815+
<View testID='children' />
816+
</CallbackEffectWithContext>
817+
</Channel>
818+
</Chat>,
819+
);
820+
await waitFor(() => expect(screen.getByTestId('children')).toBeTruthy());
821+
822+
let pendingTask;
823+
await waitFor(async () => {
824+
const pendingTasks = await chatClient.offlineDb.getPendingTasks();
825+
expect(pendingTasks).toHaveLength(1);
826+
pendingTask = pendingTasks[0];
827+
});
828+
829+
jest.spyOn(channel, 'watch').mockResolvedValue({});
830+
831+
channel.state.removeMessage(localMessage);
832+
await chatClient.offlineDb.deletePendingTask({ id: pendingTask.id });
833+
834+
await act(async () => {
835+
await chatClient.offlineDb.syncManager.invokeSyncStatusListeners(true);
836+
});
837+
838+
await waitFor(() => {
839+
const matchingMessages = channel.state.messages.filter(
840+
(message) => message.id === localMessage.id,
841+
);
842+
843+
expect(matchingMessages).toHaveLength(1);
844+
expect(matchingMessages[0].status).toBe(MessageStatusTypes.FAILED);
845+
expect(matchingMessages[0].text).toBe(localMessage.text);
846+
});
847+
});
723848
});
724849
});
725850
};

package/src/components/Channel/Channel.tsx

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,15 @@ const ChannelWithContext = (props: PropsWithChildren<ChannelPropsWithContext>) =
11671167
updated_at: message.updated_at?.toString(),
11681168
}) as unknown as MessageResponse;
11691169

1170+
const getRecoverableFailedMessages = (messages: LocalMessage[] = []) =>
1171+
messages
1172+
.filter(
1173+
(message) =>
1174+
message.status === MessageStatusTypes.FAILED &&
1175+
!channel.state.findMessage(message.id, message.parent_id),
1176+
)
1177+
.map(parseMessage);
1178+
11701179
try {
11711180
if (channelMessagesState?.messages) {
11721181
await channel?.watch({
@@ -1181,9 +1190,7 @@ const ChannelWithContext = (props: PropsWithChildren<ChannelPropsWithContext>) =
11811190
if (!thread) {
11821191
copyChannelState();
11831192

1184-
const failedMessages = channelMessagesState.messages
1185-
?.filter((message) => message.status === MessageStatusTypes.FAILED)
1186-
.map(parseMessage);
1193+
const failedMessages = getRecoverableFailedMessages(channelMessagesState.messages);
11871194
if (failedMessages?.length) {
11881195
channel.state.addMessagesSorted(failedMessages);
11891196
}
@@ -1192,11 +1199,7 @@ const ChannelWithContext = (props: PropsWithChildren<ChannelPropsWithContext>) =
11921199
} else {
11931200
await reloadThread();
11941201

1195-
const failedThreadMessages = thread
1196-
? threadMessages
1197-
.filter((message) => message.status === MessageStatusTypes.FAILED)
1198-
.map(parseMessage)
1199-
: [];
1202+
const failedThreadMessages = thread ? getRecoverableFailedMessages(threadMessages) : [];
12001203
if (failedThreadMessages.length) {
12011204
channel.state.addMessagesSorted(failedThreadMessages);
12021205
setThreadMessages([...channel.state.threads[thread.id]]);

0 commit comments

Comments
 (0)