@@ -39,6 +39,63 @@ const getAttachmentFingerprint = (attachments: Message["attachments"] = []) =>
3939 } )
4040 . join ( "::" ) ;
4141
42+ const EPHEMERAL_RECONCILE_WINDOW_MS = 15_000 ;
43+ const BROADCAST_DUPLICATE_WINDOW_MS = 1_500 ;
44+
45+ const isEphemeralMessageId = ( messageId : string ) =>
46+ messageId . startsWith ( "temp-" ) || messageId . startsWith ( "live-" ) ;
47+
48+ const isCreatedWithinWindow = (
49+ candidateCreatedAt : string ,
50+ incomingCreatedAt : string ,
51+ windowMs = EPHEMERAL_RECONCILE_WINDOW_MS ,
52+ ) => {
53+ const candidateTimestamp = Date . parse ( candidateCreatedAt ) ;
54+ const incomingTimestamp = Date . parse ( incomingCreatedAt ) ;
55+
56+ if ( ! Number . isFinite ( candidateTimestamp ) || ! Number . isFinite ( incomingTimestamp ) ) {
57+ return true ;
58+ }
59+
60+ return Math . abs ( incomingTimestamp - candidateTimestamp ) <= windowMs ;
61+ } ;
62+
63+ const normalizeAttachments = ( attachments : unknown ) : Message [ "attachments" ] => {
64+ if ( ! Array . isArray ( attachments ) ) return [ ] ;
65+
66+ return attachments
67+ . map ( ( rawAttachment ) => {
68+ if ( ! rawAttachment || typeof rawAttachment !== "object" ) return null ;
69+
70+ const attachment = rawAttachment as Record < string , unknown > ;
71+ const filename =
72+ typeof attachment . filename === "string" ? attachment . filename : "" ;
73+ const mimetype =
74+ typeof attachment . mimetype === "string" ? attachment . mimetype : "" ;
75+ const publicUrl =
76+ typeof attachment . public_url === "string" ? attachment . public_url : "" ;
77+ const rawFilesize = attachment . filesize ;
78+ const filesize =
79+ typeof rawFilesize === "number"
80+ ? rawFilesize
81+ : typeof rawFilesize === "string"
82+ ? Number ( rawFilesize )
83+ : 0 ;
84+
85+ return {
86+ filename,
87+ mimetype,
88+ filesize : Number . isFinite ( filesize ) ? filesize : 0 ,
89+ public_url : publicUrl ,
90+ } ;
91+ } )
92+ . filter (
93+ (
94+ attachment ,
95+ ) : attachment is Message [ "attachments" ] [ number ] => attachment !== null ,
96+ ) ;
97+ } ;
98+
4299export function useActiveConversationStream ( {
43100 supabase,
44101 conversationId,
@@ -91,6 +148,114 @@ export function useActiveConversationStream({
91148 setRemoteTypingState ( conversationId , null ) ;
92149 } ,
93150 )
151+ . on (
152+ "broadcast" ,
153+ {
154+ event : "message" ,
155+ } ,
156+ ( { payload } ) => {
157+ const messagePayload = payload as {
158+ conversation_id ?: string ;
159+ sender_id ?: string ;
160+ text ?: string ;
161+ attachments ?: unknown ;
162+ created_at ?: string ;
163+ client_message_id ?: string ;
164+ } ;
165+
166+ if ( messagePayload . conversation_id !== conversationId ) return ;
167+ if ( ! messagePayload . sender_id || messagePayload . sender_id === userId ) {
168+ return ;
169+ }
170+ const senderId = messagePayload . sender_id ;
171+
172+ const incomingCreatedAt =
173+ typeof messagePayload . created_at === "string"
174+ ? messagePayload . created_at
175+ : new Date ( ) . toISOString ( ) ;
176+ const incomingAttachments = normalizeAttachments (
177+ messagePayload . attachments ,
178+ ) ;
179+ const clientMessageId =
180+ typeof messagePayload . client_message_id === "string" &&
181+ messagePayload . client_message_id . length > 0
182+ ? messagePayload . client_message_id
183+ : `${ Date . now ( ) } -${ Math . random ( ) . toString ( 36 ) . slice ( 2 , 8 ) } ` ;
184+ const liveMessageId = `live-${ clientMessageId } ` ;
185+
186+ setMessages ( ( prev ) => {
187+ if ( prev . some ( ( message ) => message . id === liveMessageId ) ) {
188+ return prev ;
189+ }
190+
191+ const incomingText = messagePayload . text ?? "" ;
192+ const incomingFingerprint = getAttachmentFingerprint ( incomingAttachments ) ;
193+
194+ const hasMatchingMessage = prev . some ( ( message ) => {
195+ if ( isEphemeralMessageId ( message . id ) ) return false ;
196+ if ( message . sender_id !== senderId ) return false ;
197+ if ( message . conversation_id !== conversationId ) return false ;
198+ if ( message . text !== incomingText ) return false ;
199+ if (
200+ getAttachmentFingerprint ( message . attachments ) !== incomingFingerprint
201+ ) {
202+ return false ;
203+ }
204+
205+ return isCreatedWithinWindow (
206+ message . created_at ,
207+ incomingCreatedAt ,
208+ BROADCAST_DUPLICATE_WINDOW_MS ,
209+ ) ;
210+ } ) ;
211+
212+ if ( hasMatchingMessage ) {
213+ return prev ;
214+ }
215+
216+ return [
217+ ...prev ,
218+ {
219+ id : liveMessageId ,
220+ conversation_id : conversationId ,
221+ sender_id : senderId ,
222+ text : incomingText ,
223+ attachments : incomingAttachments ,
224+ created_at : incomingCreatedAt ,
225+ } ,
226+ ] ;
227+ } ) ;
228+
229+ void markConversationAsRead ( conversationId ) ;
230+
231+ window . setTimeout ( ( ) => {
232+ bottomRef . current ?. scrollIntoView ( { behavior : "smooth" } ) ;
233+ } , 100 ) ;
234+ } ,
235+ )
236+ . on (
237+ "broadcast" ,
238+ {
239+ event : "message_retract" ,
240+ } ,
241+ ( { payload } ) => {
242+ const retractPayload = payload as {
243+ conversation_id ?: string ;
244+ sender_id ?: string ;
245+ client_message_id ?: string ;
246+ } ;
247+
248+ if ( retractPayload . conversation_id !== conversationId ) return ;
249+ if ( retractPayload . sender_id === userId ) return ;
250+ if ( ! retractPayload . client_message_id ) return ;
251+
252+ const liveMessageId = `live-${ retractPayload . client_message_id } ` ;
253+
254+ setMessages ( ( prev ) =>
255+ prev . filter ( ( message ) => message . id !== liveMessageId ) ,
256+ ) ;
257+ } ,
258+ )
94259 . on (
95260 "postgres_changes" ,
96261 {
@@ -105,7 +270,7 @@ export function useActiveConversationStream({
105270 conversation_id : payload . new . conversation_id ,
106271 sender_id : payload . new . sender_id ,
107272 text : payload . new . text ,
108- attachments : payload . new . attachments ?? [ ] ,
273+ attachments : normalizeAttachments ( payload . new . attachments ) ,
109274 created_at : payload . new . created_at ,
110275 } ;
111276
@@ -119,12 +284,20 @@ export function useActiveConversationStream({
119284 ) ;
120285
121286 const optimisticMessageIndex = prev . findIndex ( ( message ) => {
122- if ( ! message . id . startsWith ( "temp-" ) ) return false ;
287+ if ( ! isEphemeralMessageId ( message . id ) ) return false ;
123288 if ( message . sender_id !== incomingMessage . sender_id ) return false ;
124289 if ( message . conversation_id !== incomingMessage . conversation_id ) {
125290 return false ;
126291 }
127292 if ( message . text !== incomingMessage . text ) return false ;
293+ if (
294+ ! isCreatedWithinWindow (
295+ message . created_at ,
296+ incomingMessage . created_at ,
297+ )
298+ ) {
299+ return false ;
300+ }
128301
129302 return (
130303 getAttachmentFingerprint ( message . attachments ) ===
0 commit comments