Skip to content

Commit 121f18a

Browse files
Merge pull request #57 from ably/ECO-5465-port-integration-tests
[ECO-5465] Port remaining high-value JS integration tests
2 parents 010b3e0 + 792683d commit 121f18a

13 files changed

Lines changed: 3456 additions & 101 deletions

File tree

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
internal import AblyPlugin
2+
3+
internal extension ARTClientOptions {
4+
private class Box<T> {
5+
internal let boxed: T
6+
7+
internal init(boxed: T) {
8+
self.boxed = boxed
9+
}
10+
}
11+
12+
private static let garbageCollectionOptionsKey = "Objects.garbageCollectionOptions"
13+
14+
/// Can be overriden for testing purposes.
15+
var garbageCollectionOptions: InternalDefaultRealtimeObjects.GarbageCollectionOptions? {
16+
get {
17+
let optionsValue = PluginAPI.sharedInstance().pluginOptionsValue(
18+
forKey: Self.garbageCollectionOptionsKey,
19+
clientOptions: self,
20+
)
21+
22+
guard let optionsValue else {
23+
return nil
24+
}
25+
26+
guard let box = optionsValue as? Box<InternalDefaultRealtimeObjects.GarbageCollectionOptions> else {
27+
preconditionFailure("Expected GarbageCollectionOptionsBox, got \(optionsValue)")
28+
}
29+
30+
return box.boxed
31+
}
32+
33+
set {
34+
guard let newValue else {
35+
preconditionFailure("Not implemented the ability to un-set GC options")
36+
}
37+
38+
PluginAPI.sharedInstance().setPluginOptionsValue(
39+
Box<InternalDefaultRealtimeObjects.GarbageCollectionOptions>(boxed: newValue),
40+
forKey: Self.garbageCollectionOptionsKey,
41+
clientOptions: self,
42+
)
43+
}
44+
}
45+
}

Sources/AblyLiveObjects/Internal/CoreSDK.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,31 @@ internal protocol CoreSDK: AnyObject, Sendable {
88
/// Implements the internal `#publish` method of RTO15.
99
func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError)
1010

11+
/// Replaces the implementation of ``publish(objectMessages:)``.
12+
///
13+
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
14+
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(InternalError) -> Void)
15+
1116
/// Returns the current state of the Realtime channel that this wraps.
1217
var channelState: ARTRealtimeChannelState { get }
1318
}
1419

1520
internal final class DefaultCoreSDK: CoreSDK {
21+
/// Used to synchronize access to internal mutable state.
22+
private let mutex = NSLock()
23+
1624
private let channel: AblyPlugin.RealtimeChannel
1725
private let client: AblyPlugin.RealtimeClient
1826
private let pluginAPI: PluginAPIProtocol
1927
private let logger: AblyPlugin.Logger
2028

29+
/// If set to true, ``publish(objectMessages:)`` will behave like a no-op.
30+
///
31+
/// This enables the `testsOnly_overridePublish(with:)` test hook.
32+
///
33+
/// - Note: This should be `throws(InternalError)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
34+
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?
35+
2136
internal init(
2237
channel: AblyPlugin.RealtimeChannel,
2338
client: AblyPlugin.RealtimeClient,
@@ -35,6 +50,22 @@ internal final class DefaultCoreSDK: CoreSDK {
3550
internal func publish(objectMessages: [OutboundObjectMessage]) async throws(InternalError) {
3651
logger.log("publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)
3752

53+
// Use the overridden implementation if supplied
54+
let overriddenImplementation = mutex.withLock {
55+
overriddenPublishImplementation
56+
}
57+
if let overriddenImplementation {
58+
do {
59+
try await overriddenImplementation(objectMessages)
60+
} catch {
61+
guard let internalError = error as? InternalError else {
62+
preconditionFailure("Expected InternalError, got \(error)")
63+
}
64+
throw internalError
65+
}
66+
return
67+
}
68+
3869
// TODO: Implement the full spec of RTO15 (https://github.com/ably/ably-cocoa-liveobjects-plugin/issues/47)
3970
try await DefaultInternalPlugin.sendObject(
4071
objectMessages: objectMessages,
@@ -44,6 +75,12 @@ internal final class DefaultCoreSDK: CoreSDK {
4475
)
4576
}
4677

78+
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(InternalError) -> Void) {
79+
mutex.withLock {
80+
overriddenPublishImplementation = newImplementation
81+
}
82+
}
83+
4784
internal var channelState: ARTRealtimeChannelState {
4885
channel.state
4986
}

Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,15 @@ internal final class DefaultInternalPlugin: NSObject, AblyPlugin.LiveObjectsInte
3636
internal func prepare(_ channel: AblyPlugin.RealtimeChannel, client: AblyPlugin.RealtimeClient) {
3737
let logger = pluginAPI.logger(for: channel)
3838
let callbackQueue = pluginAPI.callbackQueue(for: client)
39+
let options = pluginAPI.options(for: client)
3940

4041
logger.log("LiveObjects.DefaultInternalPlugin received prepare(_:)", level: .debug)
41-
let liveObjects = InternalDefaultRealtimeObjects(logger: logger, userCallbackQueue: callbackQueue, clock: DefaultSimpleClock())
42+
let liveObjects = InternalDefaultRealtimeObjects(
43+
logger: logger,
44+
userCallbackQueue: callbackQueue,
45+
clock: DefaultSimpleClock(),
46+
garbageCollectionOptions: options.garbageCollectionOptions ?? .init(),
47+
)
4248
pluginAPI.setPluginDataValue(liveObjects, forKey: Self.pluginDataKey, channel: channel)
4349
}
4450

Sources/AblyLiveObjects/Internal/InternalDefaultRealtimeObjects.swift

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
2626
private nonisolated(unsafe) var garbageCollectionTask: Task<Void, Never>!
2727

2828
/// Parameters used to control the garbage collection of tombstoned objects and map entries, as described in RTO10.
29-
internal struct GarbageCollectionOptions {
29+
internal struct GarbageCollectionOptions: Encodable, Hashable {
3030
/// The RTO10a interval at which we will perform garbage collection.
3131
///
3232
/// The default value comes from the suggestion in RTO10a.
@@ -98,6 +98,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
9898
(receivedObjectProtocolMessages, receivedObjectProtocolMessagesContinuation) = AsyncStream.makeStream()
9999
(receivedObjectSyncProtocolMessages, receivedObjectSyncProtocolMessagesContinuation) = AsyncStream.makeStream()
100100
(waitingForSyncEvents, waitingForSyncEventsContinuation) = AsyncStream.makeStream()
101+
(completedGarbageCollectionEvents, completedGarbageCollectionsEventsContinuation) = AsyncStream.makeStream()
101102
mutableState = .init(objectsPool: .init(logger: logger, userCallbackQueue: userCallbackQueue, clock: clock))
102103
garbageCollectionInterval = garbageCollectionOptions.interval
103104
garbageCollectionGracePeriod = garbageCollectionOptions.gracePeriod
@@ -331,21 +332,32 @@ internal final class InternalDefaultRealtimeObjects: Sendable, LiveMapObjectPool
331332
gracePeriod: garbageCollectionGracePeriod,
332333
clock: clock,
333334
logger: logger,
335+
eventsContinuation: completedGarbageCollectionsEventsContinuation,
334336
)
335337
}
336338
}
337339

340+
// These drive the testsOnly_completedGarbageCollectionEvents property that informs the test suite when a garbage collection cycle has completed.
341+
private let completedGarbageCollectionEvents: AsyncStream<Void>
342+
private let completedGarbageCollectionsEventsContinuation: AsyncStream<Void>.Continuation
343+
/// Emits an element whenever a garbage collection cycle has completed.
344+
internal var testsOnly_completedGarbageCollectionEvents: AsyncStream<Void> {
345+
completedGarbageCollectionEvents
346+
}
347+
338348
// MARK: - Testing
339349

340350
/// Finishes the following streams, to allow a test to perform assertions about which elements the streams have emitted to this moment:
341351
///
342352
/// - testsOnly_receivedObjectProtocolMessages
343353
/// - testsOnly_receivedObjectStateProtocolMessages
344354
/// - testsOnly_waitingForSyncEvents
355+
/// - testsOnly_completedGarbageCollectionEvents
345356
internal func testsOnly_finishAllTestHelperStreams() {
346357
receivedObjectProtocolMessagesContinuation.finish()
347358
receivedObjectSyncProtocolMessagesContinuation.finish()
348359
waitingForSyncEventsContinuation.finish()
360+
completedGarbageCollectionsEventsContinuation.finish()
349361
}
350362

351363
// MARK: - Mutable state and the operations that affect it

Sources/AblyLiveObjects/Internal/ObjectsPool.swift

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,12 @@ internal struct ObjectsPool {
408408
}
409409

410410
/// Performs garbage collection of tombstoned objects and map entries, per RTO10c.
411-
internal mutating func performGarbageCollection(gracePeriod: TimeInterval, clock: SimpleClock, logger: Logger) {
411+
internal mutating func performGarbageCollection(
412+
gracePeriod: TimeInterval,
413+
clock: SimpleClock,
414+
logger: Logger,
415+
eventsContinuation: AsyncStream<Void>.Continuation,
416+
) {
412417
logger.log("Performing garbage collection, grace period \(gracePeriod)s", level: .debug)
413418

414419
let now = clock.now
@@ -433,5 +438,7 @@ internal struct ObjectsPool {
433438
}
434439
return !shouldRelease
435440
}
441+
442+
eventsContinuation.yield()
436443
}
437444
}

Sources/AblyLiveObjects/Public/Public Proxy Objects/PublicDefaultRealtimeObjects.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,13 @@ internal final class PublicDefaultRealtimeObjects: RealtimeObjects {
117117
internal var testsOnly_receivedObjectSyncProtocolMessages: AsyncStream<[InboundObjectMessage]> {
118118
proxied.testsOnly_receivedObjectSyncProtocolMessages
119119
}
120+
121+
// These are used by the integration tests.
122+
123+
/// Replaces the method that this `RealtimeObjects` uses to send any outbound `ObjectMessage`s.
124+
///
125+
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
126+
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(InternalError) -> Void) {
127+
coreSDK.testsOnly_overridePublish(with: newImplementation)
128+
}
120129
}

Sources/AblyLiveObjects/Utility/ExtendedJSONValue.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal extension ExtendedJSONValue {
4343

4444
/// Converts an `ExtendedJSONValue` to an object.
4545
///
46-
/// The contract for what this will return are the same as those of `JSONValue.toJSONSerializationInputElemtn`, with one addition: any values in the input of case `.extra` will be passed to the `serializeExtraValue` function, and the result of this function call will be inserted into the output object.
46+
/// The contract for what this will return are the same as those of `JSONValue.toJSONSerializationInputElement`, with one addition: any values in the input of case `.extra` will be passed to the `serializeExtraValue` function, and the result of this function call will be inserted into the output object.
4747
func serialized(serializeExtraValue: (Extra) -> Any) -> Any {
4848
switch self {
4949
case let .object(underlying):

Tests/AblyLiveObjectsTests/Helpers/ClientHelper.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import Ably
2-
import AblyLiveObjects
2+
@testable import AblyLiveObjects
33

44
/// Helper for creating ably-cocoa objects, for use in integration tests.
55
enum ClientHelper {
@@ -26,6 +26,9 @@ enum ClientHelper {
2626
let logger = PrefixedLogger(prefix: "(\(logIdentifier)) ")
2727
clientOptions.logHandler = logger
2828
}
29+
if let garbageCollectionOptions = options.garbageCollectionOptions {
30+
clientOptions.garbageCollectionOptions = garbageCollectionOptions
31+
}
2932

3033
return ARTRealtime(options: clientOptions)
3134
}
@@ -69,6 +72,7 @@ enum ClientHelper {
6972
struct PartialClientOptions: Encodable, Hashable {
7073
var useBinaryProtocol: Bool?
7174
var autoConnect: Bool?
75+
var garbageCollectionOptions: InternalDefaultRealtimeObjects.GarbageCollectionOptions?
7276

7377
/// A prefix for all log messages emitted by the client. Allows clients to be distinguished in log messages for tests which use multiple clients.
7478
var logIdentifier: String?

Tests/AblyLiveObjectsTests/Helpers/Subscriber.swift

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ final class Subscriber<each CallbackArg: Sendable>: Sendable {
99
// Used to synchronize access to the nonisolated(unsafe) mutable state.
1010
private let mutex = NSLock()
1111
private nonisolated(unsafe) var invocations: [(repeat each CallbackArg)] = []
12+
private nonisolated(unsafe) var listeners: [CallbackWrapper] = []
1213

1314
/// Creates a `Subscriber`.
1415
///
@@ -39,13 +40,50 @@ final class Subscriber<each CallbackArg: Sendable>: Sendable {
3940
guard let self else {
4041
return
4142
}
42-
mutex.withLock {
43+
let callListeners = mutex.withLock {
4344
let invocation = (repeat each arg)
4445
invocations.append(invocation)
46+
47+
return { [listeners] in
48+
for listener in listeners {
49+
listener.callAsFunction(repeat each invocation)
50+
}
51+
}
4552
}
4653
if let action {
4754
action(repeat each arg)
4855
}
56+
callListeners()
4957
}
5058
}
59+
60+
/// A wrapper that allows us to store a callback that takes variadic args.
61+
///
62+
/// This allows us to avoid the error "Cannot fully abstract a value of variadic function type '@Sendable (repeat each CallbackArg) -> ()' because different contexts will not be able to reliably agree on a calling convention; try wrapping it in a struct" that we get if we try to directly store the callback in an array. Claude suggested this solution.
63+
private struct CallbackWrapper {
64+
let callback: @Sendable (repeat each CallbackArg) -> Void
65+
66+
func callAsFunction(_ args: repeat each CallbackArg) {
67+
callback(repeat each args)
68+
}
69+
}
70+
71+
/// Adds a listener which replays all previously buffered and future invocations of any function previously created by ``createListener(_:)``.
72+
///
73+
/// This is useful for the scenario where you want to set up a subscription synchronously (so as not to miss any events) but then in an `async` context perform actions as a result of the invocation of the listener. (You could equally use the SDK's `AsyncSequence` interface but the approach here is a closer mapping of the ported JS integration tests that call `subscribe`.)
74+
func addListener(_ listener: @escaping (@Sendable (repeat each CallbackArg) -> Void)) {
75+
let performInvocations = mutex.withLock {
76+
listeners.append(.init(callback: listener))
77+
78+
return { [invocations, callbackQueue] in
79+
for invocation in invocations {
80+
callbackQueue.async {
81+
listener(repeat each invocation)
82+
}
83+
}
84+
}
85+
}
86+
87+
performInvocations()
88+
}
5189
}

0 commit comments

Comments
 (0)