Skip to content

Commit cd325c1

Browse files
Inject callback queue into map and counter
This is preparation for implementing subscriptions. Cursor updated the tests.
1 parent a30f635 commit cd325c1

9 files changed

Lines changed: 178 additions & 124 deletions

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ internal final class DefaultInternalPlugin: NSObject, AblyPlugin.LiveObjectsInte
3333
// MARK: - LiveObjectsInternalPluginProtocol
3434

3535
// Populates the channel's `objects` property.
36-
internal func prepare(_ channel: AblyPlugin.RealtimeChannel, client _: AblyPlugin.RealtimeClient) {
36+
internal func prepare(_ channel: AblyPlugin.RealtimeChannel, client: AblyPlugin.RealtimeClient) {
3737
let logger = pluginAPI.logger(for: channel)
38+
let callbackQueue = pluginAPI.callbackQueue(for: client)
3839

3940
logger.log("LiveObjects.DefaultInternalPlugin received prepare(_:)", level: .debug)
40-
let liveObjects = InternalDefaultRealtimeObjects(logger: logger)
41+
let liveObjects = InternalDefaultRealtimeObjects(logger: logger, userCallbackQueue: callbackQueue)
4142
pluginAPI.setPluginDataValue(liveObjects, forKey: Self.pluginDataKey, channel: channel)
4243
}
4344

Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,28 @@ internal final class InternalDefaultLiveCounter: Sendable {
2828
}
2929

3030
private let logger: AblyPlugin.Logger
31+
private let userCallbackQueue: DispatchQueue
3132

3233
// MARK: - Initialization
3334

3435
internal convenience init(
3536
testsOnly_data data: Double,
3637
objectID: String,
37-
logger: AblyPlugin.Logger
38+
logger: AblyPlugin.Logger,
39+
userCallbackQueue: DispatchQueue
3840
) {
39-
self.init(data: data, objectID: objectID, logger: logger)
41+
self.init(data: data, objectID: objectID, logger: logger, userCallbackQueue: userCallbackQueue)
4042
}
4143

4244
private init(
4345
data: Double,
4446
objectID: String,
45-
logger: AblyPlugin.Logger
47+
logger: AblyPlugin.Logger,
48+
userCallbackQueue: DispatchQueue
4649
) {
4750
mutableState = .init(liveObject: .init(objectID: objectID), data: data)
4851
self.logger = logger
52+
self.userCallbackQueue = userCallbackQueue
4953
}
5054

5155
/// Creates a "zero-value LiveCounter", per RTLC4.
@@ -55,11 +59,13 @@ internal final class InternalDefaultLiveCounter: Sendable {
5559
internal static func createZeroValued(
5660
objectID: String,
5761
logger: AblyPlugin.Logger,
62+
userCallbackQueue: DispatchQueue,
5863
) -> Self {
5964
.init(
6065
data: 0,
6166
objectID: objectID,
6267
logger: logger,
68+
userCallbackQueue: userCallbackQueue,
6369
)
6470
}
6571

Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,31 +45,36 @@ internal final class InternalDefaultLiveMap: Sendable {
4545
}
4646

4747
private let logger: AblyPlugin.Logger
48+
private let userCallbackQueue: DispatchQueue
4849

4950
// MARK: - Initialization
5051

5152
internal convenience init(
5253
testsOnly_data data: [String: ObjectsMapEntry],
5354
objectID: String,
5455
testsOnly_semantics semantics: WireEnum<ObjectsMapSemantics>? = nil,
55-
logger: AblyPlugin.Logger
56+
logger: AblyPlugin.Logger,
57+
userCallbackQueue: DispatchQueue,
5658
) {
5759
self.init(
5860
data: data,
5961
objectID: objectID,
6062
semantics: semantics,
6163
logger: logger,
64+
userCallbackQueue: userCallbackQueue,
6265
)
6366
}
6467

6568
private init(
6669
data: [String: ObjectsMapEntry],
6770
objectID: String,
6871
semantics: WireEnum<ObjectsMapSemantics>?,
69-
logger: AblyPlugin.Logger
72+
logger: AblyPlugin.Logger,
73+
userCallbackQueue: DispatchQueue,
7074
) {
7175
mutableState = .init(liveObject: .init(objectID: objectID), data: data, semantics: semantics)
7276
self.logger = logger
77+
self.userCallbackQueue = userCallbackQueue
7378
}
7479

7580
/// Creates a "zero-value LiveMap", per RTLM4.
@@ -81,12 +86,14 @@ internal final class InternalDefaultLiveMap: Sendable {
8186
objectID: String,
8287
semantics: WireEnum<ObjectsMapSemantics>? = nil,
8388
logger: AblyPlugin.Logger,
89+
userCallbackQueue: DispatchQueue,
8490
) -> Self {
8591
.init(
8692
data: [:],
8793
objectID: objectID,
8894
semantics: semantics,
8995
logger: logger,
96+
userCallbackQueue: userCallbackQueue,
9097
)
9198
}
9299

@@ -211,6 +218,7 @@ internal final class InternalDefaultLiveMap: Sendable {
211218
using: state,
212219
objectsPool: &objectsPool,
213220
logger: logger,
221+
userCallbackQueue: userCallbackQueue,
214222
)
215223
}
216224
}
@@ -222,6 +230,7 @@ internal final class InternalDefaultLiveMap: Sendable {
222230
from: operation,
223231
objectsPool: &objectsPool,
224232
logger: logger,
233+
userCallbackQueue: userCallbackQueue,
225234
)
226235
}
227236
}
@@ -233,6 +242,7 @@ internal final class InternalDefaultLiveMap: Sendable {
233242
operation,
234243
objectsPool: &objectsPool,
235244
logger: logger,
245+
userCallbackQueue: userCallbackQueue,
236246
)
237247
}
238248
}
@@ -251,6 +261,7 @@ internal final class InternalDefaultLiveMap: Sendable {
251261
objectMessageSiteCode: objectMessageSiteCode,
252262
objectsPool: &objectsPool,
253263
logger: logger,
264+
userCallbackQueue: userCallbackQueue,
254265
)
255266
}
256267
}
@@ -271,6 +282,7 @@ internal final class InternalDefaultLiveMap: Sendable {
271282
operationData: operationData,
272283
objectsPool: &objectsPool,
273284
logger: logger,
285+
userCallbackQueue: userCallbackQueue,
274286
)
275287
}
276288
}
@@ -314,6 +326,7 @@ internal final class InternalDefaultLiveMap: Sendable {
314326
using state: ObjectState,
315327
objectsPool: inout ObjectsPool,
316328
logger: AblyPlugin.Logger,
329+
userCallbackQueue: DispatchQueue,
317330
) {
318331
// RTLM6a: Replace the private siteTimeserials with the value from ObjectState.siteTimeserials
319332
liveObject.siteTimeserials = state.siteTimeserials
@@ -330,6 +343,7 @@ internal final class InternalDefaultLiveMap: Sendable {
330343
from: createOp,
331344
objectsPool: &objectsPool,
332345
logger: logger,
346+
userCallbackQueue: userCallbackQueue,
333347
)
334348
}
335349
}
@@ -339,6 +353,7 @@ internal final class InternalDefaultLiveMap: Sendable {
339353
from operation: ObjectOperation,
340354
objectsPool: inout ObjectsPool,
341355
logger: AblyPlugin.Logger,
356+
userCallbackQueue: DispatchQueue,
342357
) {
343358
// RTLM17a: For each key–ObjectsMapEntry pair in ObjectOperation.map.entries
344359
if let entries = operation.map?.entries {
@@ -359,6 +374,7 @@ internal final class InternalDefaultLiveMap: Sendable {
359374
operationData: entry.data,
360375
objectsPool: &objectsPool,
361376
logger: logger,
377+
userCallbackQueue: userCallbackQueue,
362378
)
363379
}
364380
}
@@ -374,6 +390,7 @@ internal final class InternalDefaultLiveMap: Sendable {
374390
objectMessageSiteCode: String?,
375391
objectsPool: inout ObjectsPool,
376392
logger: Logger,
393+
userCallbackQueue: DispatchQueue,
377394
) {
378395
guard let applicableOperation = liveObject.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
379396
// RTLM15b
@@ -391,6 +408,7 @@ internal final class InternalDefaultLiveMap: Sendable {
391408
operation,
392409
objectsPool: &objectsPool,
393410
logger: logger,
411+
userCallbackQueue: userCallbackQueue,
394412
)
395413
case .known(.mapSet):
396414
guard let mapOp = operation.mapOp else {
@@ -409,6 +427,7 @@ internal final class InternalDefaultLiveMap: Sendable {
409427
operationData: data,
410428
objectsPool: &objectsPool,
411429
logger: logger,
430+
userCallbackQueue: userCallbackQueue,
412431
)
413432
case .known(.mapRemove):
414433
guard let mapOp = operation.mapOp else {
@@ -433,6 +452,7 @@ internal final class InternalDefaultLiveMap: Sendable {
433452
operationData: ObjectData,
434453
objectsPool: inout ObjectsPool,
435454
logger: AblyPlugin.Logger,
455+
userCallbackQueue: DispatchQueue,
436456
) {
437457
// RTLM7a: If an entry exists in the private data for the specified key
438458
if let existingEntry = data[key] {
@@ -459,7 +479,7 @@ internal final class InternalDefaultLiveMap: Sendable {
459479
// RTLM7c: If the operation has a non-empty ObjectData.objectId attribute
460480
if let objectId = operationData.objectId, !objectId.isEmpty {
461481
// RTLM7c1: Create a zero-value LiveObject in the internal ObjectsPool per RTO6
462-
_ = objectsPool.createZeroValueObject(forObjectID: objectId, logger: logger)
482+
_ = objectsPool.createZeroValueObject(forObjectID: objectId, logger: logger, userCallbackQueue: userCallbackQueue)
463483
}
464484
}
465485

@@ -535,6 +555,7 @@ internal final class InternalDefaultLiveMap: Sendable {
535555
_ operation: ObjectOperation,
536556
objectsPool: inout ObjectsPool,
537557
logger: AblyPlugin.Logger,
558+
userCallbackQueue: DispatchQueue,
538559
) {
539560
if liveObject.createOperationIsMerged {
540561
// RTLM16b
@@ -549,6 +570,7 @@ internal final class InternalDefaultLiveMap: Sendable {
549570
from: operation,
550571
objectsPool: &objectsPool,
551572
logger: logger,
573+
userCallbackQueue: userCallbackQueue,
552574
)
553575
}
554576

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
99
private nonisolated(unsafe) var mutableState: MutableState!
1010

1111
private let logger: AblyPlugin.Logger
12+
private let userCallbackQueue: DispatchQueue
1213

1314
// These drive the testsOnly_* properties that expose the received ProtocolMessages to the test suite.
1415
private let receivedObjectProtocolMessages: AsyncStream<[InboundObjectMessage]>
@@ -69,12 +70,13 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
6970
}
7071
}
7172

72-
internal init(logger: AblyPlugin.Logger) {
73+
internal init(logger: AblyPlugin.Logger, userCallbackQueue: DispatchQueue) {
7374
self.logger = logger
75+
self.userCallbackQueue = userCallbackQueue
7476
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
7577
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
7678
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
77-
mutableState = .init(objectsPool: .init(logger: logger))
79+
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue))
7880
}
7981

8082
// MARK: - LiveMapObjectPoolDelegate
@@ -171,6 +173,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
171173
mutableState.handleObjectProtocolMessage(
172174
objectMessages: objectMessages,
173175
logger: logger,
176+
userCallbackQueue: userCallbackQueue,
174177
receivedObjectProtocolMessagesContinuation: receivedObjectProtocolMessagesContinuation,
175178
)
176179
}
@@ -187,6 +190,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
187190
objectMessages: objectMessages,
188191
protocolMessageChannelSerial: protocolMessageChannelSerial,
189192
logger: logger,
193+
userCallbackQueue: userCallbackQueue,
190194
receivedObjectSyncProtocolMessagesContinuation: receivedObjectSyncProtocolMessagesContinuation,
191195
)
192196
}
@@ -197,7 +201,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
197201
/// Intended as a way for tests to populate the object pool.
198202
internal func testsOnly_createZeroValueLiveObject(forObjectID objectID: String) -> ObjectsPool.Entry? {
199203
mutex.withLock {
200-
mutableState.objectsPool.createZeroValueObject(forObjectID: objectID, logger: logger)
204+
mutableState.objectsPool.createZeroValueObject(forObjectID: objectID, logger: logger, userCallbackQueue: userCallbackQueue)
201205
}
202206
}
203207

@@ -258,6 +262,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
258262
objectMessages: [InboundObjectMessage],
259263
protocolMessageChannelSerial: String?,
260264
logger: Logger,
265+
userCallbackQueue: DispatchQueue,
261266
receivedObjectSyncProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation,
262267
) {
263268
logger.log("handleObjectSyncProtocolMessage(objectMessages: \(objectMessages), protocolMessageChannelSerial: \(String(describing: protocolMessageChannelSerial)))", level: .debug)
@@ -314,6 +319,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
314319
objectsPool.applySyncObjectsPool(
315320
completedSyncObjectsPool,
316321
logger: logger,
322+
userCallbackQueue: userCallbackQueue,
317323
)
318324

319325
// RTO5c6
@@ -323,6 +329,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
323329
applyObjectProtocolMessageObjectMessage(
324330
objectMessage,
325331
logger: logger,
332+
userCallbackQueue: userCallbackQueue,
326333
)
327334
}
328335
}
@@ -338,6 +345,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
338345
internal mutating func handleObjectProtocolMessage(
339346
objectMessages: [InboundObjectMessage],
340347
logger: Logger,
348+
userCallbackQueue: DispatchQueue,
341349
receivedObjectProtocolMessagesContinuation: AsyncStream<[InboundObjectMessage]>.Continuation,
342350
) {
343351
receivedObjectProtocolMessagesContinuation.yield(objectMessages)
@@ -356,6 +364,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
356364
applyObjectProtocolMessageObjectMessage(
357365
objectMessage,
358366
logger: logger,
367+
userCallbackQueue: userCallbackQueue,
359368
)
360369
}
361370
}
@@ -365,6 +374,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
365374
private mutating func applyObjectProtocolMessageObjectMessage(
366375
_ objectMessage: InboundObjectMessage,
367376
logger: Logger,
377+
userCallbackQueue: DispatchQueue,
368378
) {
369379
guard let operation = objectMessage.operation else {
370380
// RTO9a1
@@ -380,6 +390,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
380390
guard let newEntry = objectsPool.createZeroValueObject(
381391
forObjectID: operation.objectId,
382392
logger: logger,
393+
userCallbackQueue: userCallbackQueue,
383394
) else {
384395
logger.log("Unable to create zero-value object for \(operation.objectId) when processing OBJECT message; dropping", level: .warn)
385396
return

0 commit comments

Comments
 (0)