@@ -3,6 +3,10 @@ import { DurableObject } from 'cloudflare:workers';
33interface AircraftEntry {
44 data : Record < string , unknown > ;
55 receivedAt : number ; // epoch seconds
6+ // WebSockets that have reported this aircraft, keyed by their last
7+ // contribution timestamp. Lets us tell each recipient how many OTHER
8+ // peers also see the aircraft, so the UI can render "also seen by N".
9+ contributors : Map < WebSocket , number > ;
610}
711
812interface TokenRecord {
@@ -143,7 +147,7 @@ export class RelayDurableObject extends DurableObject {
143147 } ) ) ;
144148 }
145149
146- override webSocketMessage ( _ws : WebSocket , message : string | ArrayBuffer ) : void {
150+ override webSocketMessage ( ws : WebSocket , message : string | ArrayBuffer ) : void {
147151 if ( typeof message !== 'string' ) return ;
148152
149153 let parsed : Record < string , unknown > ;
@@ -163,17 +167,31 @@ export class RelayDurableObject extends DurableObject {
163167 if ( typeof ac !== 'object' || ac === null ) continue ;
164168 const icao = ( ac as Record < string , unknown > ) [ 'icao' ] ;
165169 if ( typeof icao !== 'string' || ! icao ) continue ;
166- this . aggregate . set ( icao . toLowerCase ( ) , { data : ac as Record < string , unknown > , receivedAt : now } ) ;
170+ const key = icao . toLowerCase ( ) ;
171+ const existing = this . aggregate . get ( key ) ;
172+ if ( existing ) {
173+ existing . data = ac as Record < string , unknown > ;
174+ existing . receivedAt = now ;
175+ existing . contributors . set ( ws , now ) ;
176+ } else {
177+ this . aggregate . set ( key , {
178+ data : ac as Record < string , unknown > ,
179+ receivedAt : now ,
180+ contributors : new Map ( [ [ ws , now ] ] ) ,
181+ } ) ;
182+ }
167183 }
168184 }
169185
170186 override webSocketClose ( ws : WebSocket ) : void {
171187 this . connections . delete ( ws ) ;
188+ this . dropContributor ( ws ) ;
172189 console . log ( JSON . stringify ( { event : 'disconnect' , connections : this . connections . size } ) ) ;
173190 }
174191
175192 override webSocketError ( ws : WebSocket , error : unknown ) : void {
176193 this . connections . delete ( ws ) ;
194+ this . dropContributor ( ws ) ;
177195 try { ws . close ( ) ; } catch { /* already closed */ }
178196 console . log ( JSON . stringify ( {
179197 event : 'ws_error' ,
@@ -182,6 +200,15 @@ export class RelayDurableObject extends DurableObject {
182200 } ) ) ;
183201 }
184202
203+ // Remove `ws` from every aircraft's contributor map. Called on clean
204+ // disconnect so the seen_by_others count drops immediately rather than
205+ // waiting up to STALE_S for the lazy eviction in broadcast() to catch up.
206+ private dropContributor ( ws : WebSocket ) : void {
207+ for ( const entry of this . aggregate . values ( ) ) {
208+ entry . contributors . delete ( ws ) ;
209+ }
210+ }
211+
185212 override async alarm ( ) : Promise < void > {
186213 this . broadcast ( ) ;
187214 // Re-schedule unless there are no connections (DO will hibernate)
@@ -196,27 +223,34 @@ export class RelayDurableObject extends DurableObject {
196223 const now = Date . now ( ) / 1000 ;
197224 const cutoff = now - STALE_S ;
198225
199- // Evict stale and collect fresh
200- const fresh : Record < string , unknown > [ ] = [ ] ;
226+ // Evict stale aggregate entries; for each surviving entry also sweep
227+ // out per-WS contributor timestamps that have aged past the same
228+ // cutoff, so seen_by_others counts a contributor only as long as
229+ // they're actively reporting the aircraft.
230+ const fresh : AircraftEntry [ ] = [ ] ;
201231 for ( const [ icao , entry ] of this . aggregate ) {
202232 if ( entry . receivedAt < cutoff ) {
203233 this . aggregate . delete ( icao ) ;
204- } else {
205- fresh . push ( entry . data ) ;
234+ continue ;
235+ }
236+ for ( const [ ws , ts ] of entry . contributors ) {
237+ if ( ts < cutoff ) entry . contributors . delete ( ws ) ;
206238 }
239+ fresh . push ( entry ) ;
207240 }
208241
209- if ( fresh . length === 0 && this . connections . size === 0 ) return ;
210-
211- // Count of "other" peers from any single recipient's perspective —
212- // every connected client is one of `connections`, so each sees
213- // `connections.size - 1` others. Same number for every recipient,
214- // so one shared payload is correct.
242+ // Total connected count goes in the envelope; per-aircraft counts go
243+ // in `seen_by_others` and are computed per-recipient (each WS subtracts
244+ // itself from every aircraft it's contributing to).
215245 const peers = Math . max ( 0 , this . connections . size - 1 ) ;
216- const payload = JSON . stringify ( { type : 'aggregate' , peers, aircraft : fresh } ) ;
217246 const dead : WebSocket [ ] = [ ] ;
218247
219248 for ( const ws of this . connections ) {
249+ const aircraft = fresh . map ( entry => ( {
250+ ...entry . data ,
251+ seen_by_others : entry . contributors . size - ( entry . contributors . has ( ws ) ? 1 : 0 ) ,
252+ } ) ) ;
253+ const payload = JSON . stringify ( { type : 'aggregate' , peers, aircraft } ) ;
220254 try {
221255 ws . send ( payload ) ;
222256 } catch {
@@ -226,6 +260,7 @@ export class RelayDurableObject extends DurableObject {
226260
227261 for ( const ws of dead ) {
228262 this . connections . delete ( ws ) ;
263+ this . dropContributor ( ws ) ;
229264 try { ws . close ( ) ; } catch { /* already closed */ }
230265 }
231266 }
0 commit comments