@@ -26,7 +26,14 @@ EventTransformer<E> throttleDroppable<E>(Duration duration) {
2626 };
2727}
2828
29- EventTransformer <Event > debounce <Event >({
29+ EventTransformer <E > typingThrottleDroppable <E >() {
30+ Duration duration = const Duration (milliseconds: 5000 );
31+ return (events, mapper) {
32+ return droppable <E >().call (events.throttle (duration), mapper);
33+ };
34+ }
35+
36+ EventTransformer <Event > readDebounce <Event >({
3037 Duration duration = const Duration (milliseconds: 500 ),
3138}) {
3239 return (events, mapper) => events.debounce (duration).switchMap (mapper);
@@ -40,6 +47,7 @@ class ConversationBloc extends Bloc<ConversationEvent, ConversationState> {
4047
4148 StreamSubscription <ChatMessage >? incomingMessagesSubscription;
4249 StreamSubscription <MessageSendStatus >? statusMessagesSubscription;
50+ StreamSubscription <TypingStatus >? typingMessageSubscription;
4351 StreamSubscription <Map <String , dynamic >>? lastActivitySubscription;
4452 StreamSubscription <ConversationModel ?>? conversationWatcher;
4553
@@ -76,7 +84,7 @@ class ConversationBloc extends Bloc<ConversationEvent, ConversationState> {
7684 );
7785 on < _ReadStatusReceived > (
7886 _onReadStatusReceived,
79- transformer: debounce (),
87+ transformer: readDebounce (),
8088 );
8189 on < _FailedStatusReceived > (
8290 _onFailedStatusReceived,
@@ -87,6 +95,13 @@ class ConversationBloc extends Bloc<ConversationEvent, ConversationState> {
8795 on < ConversationDeleted > (
8896 _onConversationDeleted,
8997 );
98+ on < TypingStatusStartReceived > (
99+ _onTypingStatusStartReceived,
100+ transformer: typingThrottleDroppable (),
101+ );
102+ on < TypingStatusStopReceived > (
103+ _onTypingStatusStopReceived,
104+ );
90105
91106 add (const ParticipantsReceived ());
92107
@@ -126,6 +141,17 @@ class ConversationBloc extends Bloc<ConversationEvent, ConversationState> {
126141 }
127142 });
128143
144+ typingMessageSubscription =
145+ messagesRepository.typingMessageStream.listen ((typing) async {
146+ if (typing.cid == currentConversation.id) {
147+ if (typing.state == TypingState .start) {
148+ add (TypingStatusStartReceived (typing.from! ));
149+ } else if (typing.state == TypingState .stop) {
150+ add (TypingStatusStopReceived (typing.from! ));
151+ }
152+ }
153+ });
154+
129155 lastActivitySubscription =
130156 userRepository.lastActivityStream.listen ((data) async {
131157 var recentActivity = data[currentConversation.opponent? .id];
@@ -270,6 +296,20 @@ class ConversationBloc extends Bloc<ConversationEvent, ConversationState> {
270296 : emit (state.copyWith (status: ConversationStatus .failure));
271297 }
272298
299+ Future <void > _onTypingStatusStartReceived (
300+ TypingStatusStartReceived event, Emitter <ConversationState > emit) async {
301+ var user = await userRepository.getUserById (event.from);
302+ emit (state.copyWith (
303+ typingStatus: TypingMessageStatus (TypingState .start, user)));
304+ }
305+
306+ Future <void > _onTypingStatusStopReceived (
307+ TypingStatusStopReceived event, Emitter <ConversationState > emit) async {
308+ var user = await userRepository.getUserById (event.from);
309+ emit (state.copyWith (
310+ typingStatus: TypingMessageStatus (TypingState .stop, user)));
311+ }
312+
273313 FutureOr <void > _onMessageReceived (
274314 _MessageReceived event, Emitter <ConversationState > emit) {
275315 var messages = [...state.messages];
@@ -365,6 +405,7 @@ class ConversationBloc extends Bloc<ConversationEvent, ConversationState> {
365405 unsubscribeOpponentLastActivity ();
366406 incomingMessagesSubscription? .cancel ();
367407 statusMessagesSubscription? .cancel ();
408+ typingMessageSubscription? .cancel ();
368409 lastActivitySubscription? .cancel ();
369410 conversationWatcher? .cancel ();
370411 return super .close ();
0 commit comments