@@ -12,8 +12,13 @@ open class Realtime : Service {
1212 private let HEARTBEAT_INTERVAL : UInt64 = 20_000_000_000 // 20 seconds in nanoseconds
1313
1414 private var socketClient : WebSocketClient ? = nil
15- private var activeChannels = Set < String > ( )
15+ // Slot-centric state: Map<slot, { channels: Set<String>, queries: [String], callback: Function }>
1616 private var activeSubscriptions = [ Int: RealtimeCallback] ( )
17+ private var activeSubscriptionQueries = [ Int: [ String] ] ( ) // Map slot -> queries array
18+ // Map slot index -> subscriptionId (from backend)
19+ private var slotToSubscriptionId = [ Int: String] ( )
20+ // Inverse map: subscriptionId -> slot index (for O(1) lookup)
21+ private var subscriptionIdToSlot = [ String: Int] ( )
1722 private var heartbeatTask : Task < Void , Swift . Error > ? = nil
1823
1924 let connectSync = DispatchQueue ( label: " ConnectSync " )
@@ -63,16 +68,45 @@ open class Realtime : Service {
6368 }
6469
6570 private func createSocket( ) async throws {
66- guard activeChannels. count > 0 else {
71+ // Rebuild activeChannels from all slots
72+ var allChannels = Set < String > ( )
73+ for subscription in activeSubscriptions. values {
74+ allChannels. formUnion ( subscription. channels)
75+ }
76+
77+ guard allChannels. count > 0 else {
6778 reconnect = false
6879 try await closeSocket ( )
6980 return
7081 }
7182
7283 var queryParams = " project= \( client. config [ " project " ] !) "
7384
74- for channel in activeChannels {
75- queryParams += " &channels[]= \( channel) "
85+ for channel in allChannels {
86+ let encodedChannel = channel. addingPercentEncoding ( withAllowedCharacters: . urlQueryAllowed) ?? channel
87+ queryParams += " &channels[]= \( encodedChannel) "
88+ }
89+
90+ // Build query string from slots → channels → queries
91+ // Format: channel[slot][]=query (each query sent as separate parameter)
92+ // For each slot, repeat its queries under each channel it subscribes to
93+ // Example: slot 1 → channels [tests, prod], queries [q1, q2]
94+ // Produces: tests[1][]=q1&tests[1][]=q2&prod[1][]=q1&prod[1][]=q2
95+ let selectAllQuery = Query . select ( [ " * " ] )
96+ for (slot, subscription) in activeSubscriptions {
97+ // Get queries array - each query is a separate string
98+ let queries = activeSubscriptionQueries [ slot] ?? [ ]
99+ let queryArray = queries. isEmpty ? [ selectAllQuery] : queries
100+
101+ // Repeat this slot's queries under each channel it subscribes to
102+ // Each query is sent as a separate parameter: channel[slot][]=q1&channel[slot][]=q2
103+ for channel in subscription. channels {
104+ let encodedChannel = channel. addingPercentEncoding ( withAllowedCharacters: . urlQueryAllowed) ?? channel
105+ for query in queryArray {
106+ let encodedQuery = query. addingPercentEncoding ( withAllowedCharacters: . urlQueryAllowed) ?? query
107+ queryParams += " & \( encodedChannel) [ \( slot) ][]= \( encodedQuery) "
108+ }
109+ }
76110 }
77111
78112 let url = " \( client. endPointRealtime!) /realtime? \( queryParams) "
@@ -127,70 +161,83 @@ open class Realtime : Service {
127161
128162 public func subscribe(
129163 channel: ChannelValue ,
130- callback: @escaping ( RealtimeResponseEvent ) -> Void
164+ callback: @escaping ( RealtimeResponseEvent ) -> Void ,
165+ queries: [ String ] = [ ]
131166 ) async throws -> RealtimeSubscription {
132167 return try await subscribe (
133- channels: [ channel] ,
168+ channels: Set ( [ channelToString ( channel) ] ) ,
134169 payloadType: String . self,
170+ queries: queries,
135171 callback: callback
136172 )
137173 }
138174
139175 public func subscribe(
140176 channels: [ ChannelValue ] ,
141- callback: @escaping ( RealtimeResponseEvent ) -> Void
177+ callback: @escaping ( RealtimeResponseEvent ) -> Void ,
178+ queries: [ String ] = [ ]
142179 ) async throws -> RealtimeSubscription {
143180 return try await subscribe (
144181 channels: Set ( channels. map { channelToString ( $0) } ) ,
145182 payloadType: String . self,
183+ queries: queries,
146184 callback: callback
147185 )
148186 }
149187
150188 public func subscribe< T : Codable > (
151189 channel: ChannelValue ,
152190 payloadType: T . Type ,
153- callback: @escaping ( RealtimeResponseEvent ) -> Void
191+ callback: @escaping ( RealtimeResponseEvent ) -> Void ,
192+ queries: [ String ] = [ ]
154193 ) async throws -> RealtimeSubscription {
155194 return try await subscribe (
156195 channels: Set ( [ channelToString ( channel) ] ) ,
157196 payloadType: T . self,
197+ queries: queries,
158198 callback: callback
159199 )
160200 }
161201
162202 public func subscribe< T : Codable > (
163203 channels: [ ChannelValue ] ,
164204 payloadType: T . Type ,
165- callback: @escaping ( RealtimeResponseEvent ) -> Void
205+ callback: @escaping ( RealtimeResponseEvent ) -> Void ,
206+ queries: [ String ] = [ ]
166207 ) async throws -> RealtimeSubscription {
167208 return try await subscribe (
168209 channels: Set ( channels. map { channelToString ( $0) } ) ,
169210 payloadType: T . self,
211+ queries: queries,
170212 callback: callback
171213 )
172214 }
173215
174216 public func subscribe< T : Codable > (
175217 channels: Set < String > ,
176218 payloadType: T . Type ,
219+ queries: [ String ] = [ ] ,
177220 callback: @escaping ( RealtimeResponseEvent ) -> Void
178221 ) async throws -> RealtimeSubscription {
222+ // Allocate a new slot index
179223 subscriptionsCounter += 1
224+ let slot = subscriptionsCounter
180225
181- let count = subscriptionsCounter
182-
183- channels. forEach {
184- activeChannels. insert ( $0)
185- }
226+ // Convert queries to array of strings
227+ // queries is already [String], store as-is
228+ let queryStrings = queries
186229
187- activeSubscriptions [ count] = RealtimeCallback (
188- for: Set ( channels) ,
230+ // Store slot-centric data: channels, queries, and callback belong to the slot
231+ // queries is stored as [String] (array of query strings)
232+ // No channel mutation occurs here - channels are derived from slots in createSocket()
233+ activeSubscriptions [ slot] = RealtimeCallback (
234+ for: channels,
189235 with: callback
190236 )
237+ activeSubscriptionQueries [ slot] = queryStrings
191238
192239 connectSync. sync {
193- subCallDepth+= 1
240+ subCallDepth += 1
194241 }
195242
196243 try await Task . sleep ( nanoseconds: UInt64 ( DEBOUNCE_NANOS) )
@@ -204,23 +251,19 @@ open class Realtime : Service {
204251 }
205252
206253 return RealtimeSubscription {
207- self . activeSubscriptions [ count] = nil
208- self . cleanUp ( channels: channels)
254+ let subscriptionId = self . slotToSubscriptionId [ slot]
255+ self . activeSubscriptions [ slot] = nil
256+ self . activeSubscriptionQueries [ slot] = nil
257+ self . slotToSubscriptionId [ slot] = nil
258+ if let sid = subscriptionId {
259+ self . subscriptionIdToSlot [ sid] = nil
260+ }
209261 try await self . createSocket ( )
210262 }
211263 }
212264
213- func cleanUp( channels: Set < String > ) {
214- activeChannels = activeChannels. filter { channel in
215- guard channels. contains ( channel) else {
216- return true
217- }
218- let subsWithChannel = activeSubscriptions. filter { callback in
219- return callback. value. channels. contains ( channel)
220- }
221- return !subsWithChannel. isEmpty
222- }
223- }
265+ // cleanUp is no longer needed - slots are removed directly in subscribe().close()
266+ // Channels are automatically rebuilt from remaining slots in createSocket()
224267}
225268
226269extension Realtime : WebSocketClientDelegate {
@@ -230,18 +273,47 @@ extension Realtime: WebSocketClientDelegate {
230273 onOpenCallbacks. forEach { $0 ( ) }
231274 startHeartbeat ( )
232275 }
276+
277+ private func handleResponseConnected( from json: [ String : Any ] ) {
278+ guard let data = json [ " data " ] as? [ String : Any ] ,
279+ let subscriptions = data [ " subscriptions " ] as? [ String : String ] else {
280+ return
281+ }
282+
283+ // Store subscription ID mappings from backend
284+ // Format: { "0": "sub_a1f9", "1": "sub_b83c", ... }
285+ slotToSubscriptionId. removeAll ( )
286+ subscriptionIdToSlot. removeAll ( )
287+ for (slotStr, subscriptionId) in subscriptions {
288+ if let slot = Int ( slotStr) {
289+ slotToSubscriptionId [ slot] = subscriptionId
290+ subscriptionIdToSlot [ subscriptionId] = slot
291+ }
292+ }
293+ }
233294
234295 public func onMessage( text: String ) {
235296 let data = Data ( text. utf8)
236- if let json = try ! JSONSerialization . jsonObject ( with: data, options: [ ] ) as? [ String : Any ] {
237- if let type = json [ " type " ] as? String {
238- switch type {
239- case TYPE_ERROR: try ! handleResponseError ( from: json)
240- case TYPE_EVENT: handleResponseEvent ( from: json)
241- case TYPE_PONG: break // Handle pong response if needed
242- default : break
243- }
297+ guard let json = try ? JSONSerialization . jsonObject ( with: data, options: [ ] ) as? [ String : Any ] ,
298+ let type = json [ " type " ] as? String else {
299+ return
300+ }
301+
302+ switch type {
303+ case TYPE_ERROR:
304+ do {
305+ try handleResponseError ( from: json)
306+ } catch {
307+ onErrorCallbacks. forEach { $0 ( error, nil ) }
244308 }
309+ case " connected " :
310+ handleResponseConnected ( from: json)
311+ case TYPE_EVENT:
312+ handleResponseEvent ( from: json)
313+ case TYPE_PONG:
314+ break // Handle pong response if needed
315+ default :
316+ break
245317 }
246318 }
247319
@@ -281,24 +353,28 @@ extension Realtime: WebSocketClientDelegate {
281353 guard let data = json [ " data " ] as? [ String : Any ] ,
282354 let channels = data [ " channels " ] as? [ String ] ,
283355 let events = data [ " events " ] as? [ String ] ,
284- let payload = data [ " payload " ] as? [ String : Any ] else {
356+ let payload = data [ " payload " ] as? [ String : Any ] ,
357+ let subscriptions = data [ " subscriptions " ] as? [ String ] else {
285358 return
286359 }
287- guard channels. contains ( where: { channel in
288- activeChannels. contains ( channel)
289- } ) else {
360+
361+ guard subscriptions. count > 0 else {
290362 return
291363 }
292364
293- for subscription in activeSubscriptions {
294- if channels. contains ( where: { subscription. value. channels. contains ( $0) } ) {
295- let response = RealtimeResponseEvent (
296- events: events,
297- channels: channels,
298- timestamp: data [ " timestamp " ] as! String ,
299- payload: payload
300- )
301- subscription. value. callback ( response)
365+ // Iterate over all matching subscriptionIds and call callback for each
366+ for subscriptionId in subscriptions {
367+ // O(1) lookup using subscriptionId
368+ if let slot = subscriptionIdToSlot [ subscriptionId] {
369+ if let subscription = activeSubscriptions [ slot] {
370+ let response = RealtimeResponseEvent (
371+ events: events,
372+ channels: channels,
373+ timestamp: data [ " timestamp " ] as! String ,
374+ payload: payload
375+ )
376+ subscription. callback ( response)
377+ }
302378 }
303379 }
304380 }
0 commit comments