11package io.ably.lib.objects
22
3+ import io.ably.lib.objects.serialization.gson
34import io.ably.lib.objects.state.ObjectsStateChange
45import io.ably.lib.objects.state.ObjectsStateEvent
6+ import io.ably.lib.objects.type.ObjectType
57import io.ably.lib.objects.type.counter.LiveCounter
8+ import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
9+ import io.ably.lib.objects.type.livemap.DefaultLiveMap
610import io.ably.lib.objects.type.map.LiveMap
711import io.ably.lib.objects.type.map.LiveMapValue
812import io.ably.lib.realtime.ChannelState
@@ -57,40 +61,28 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
5761
5862 override fun getRoot (): LiveMap = runBlocking { getRootAsync() }
5963
60- override fun createMap (): LiveMap {
61- return createMap(mutableMapOf ())
62- }
64+ override fun createMap (): LiveMap = createMap(mutableMapOf ())
6365
64- override fun createMap (entries : MutableMap <String , LiveMapValue >): LiveMap {
65- TODO (" Not yet implemented" )
66- }
66+ override fun createMap (entries : MutableMap <String , LiveMapValue >): LiveMap = runBlocking { createMapAsync(entries) }
6767
68- override fun createCounter (): LiveCounter {
69- return createCounter(0 )
70- }
68+ override fun createCounter (): LiveCounter = createCounter(0 )
7169
72- override fun createCounter (initialValue : Number ): LiveCounter {
73- TODO (" Not yet implemented" )
74- }
70+ override fun createCounter (initialValue : Number ): LiveCounter = runBlocking { createCounterAsync(initialValue) }
7571
7672 override fun getRootAsync (callback : Callback <LiveMap >) {
7773 asyncScope.launchWithCallback(callback) { getRootAsync() }
7874 }
7975
80- override fun createMapAsync (callback : Callback <LiveMap >) {
81- TODO (" Not yet implemented" )
82- }
76+ override fun createMapAsync (callback : Callback <LiveMap >) = createMapAsync(mutableMapOf (), callback)
8377
8478 override fun createMapAsync (entries : MutableMap <String , LiveMapValue >, callback : Callback <LiveMap >) {
85- TODO ( " Not yet implemented " )
79+ asyncScope.launchWithCallback(callback) { createMapAsync(entries) }
8680 }
8781
88- override fun createCounterAsync (callback : Callback <LiveCounter >) {
89- TODO (" Not yet implemented" )
90- }
82+ override fun createCounterAsync (callback : Callback <LiveCounter >) = createCounterAsync(0 , callback)
9183
9284 override fun createCounterAsync (initialValue : Number , callback : Callback <LiveCounter >) {
93- TODO ( " Not yet implemented " )
85+ asyncScope.launchWithCallback(callback) { createCounterAsync(initialValue) }
9486 }
9587
9688 override fun on (event : ObjectsStateEvent , listener : ObjectsStateChange .Listener ): ObjectsSubscription =
@@ -106,6 +98,81 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
10698 objectsPool.get(ROOT_OBJECT_ID ) as LiveMap
10799 }
108100
101+ private suspend fun createMapAsync (entries : MutableMap <String , LiveMapValue >): LiveMap {
102+ adapter.throwIfInvalidWriteApiConfiguration(channelName)
103+
104+ // Create initial value operation
105+ val initialMapValue = DefaultLiveMap .createInitialValue(entries)
106+
107+ // Create initial value JSON string
108+ val initialValueJSONString = gson.toJson(initialMapValue)
109+
110+ // Create object ID from initial value
111+ val (objectId, nonce) = fromInitialValue(ObjectType .Map , initialValueJSONString)
112+
113+ // Create ObjectMessage with the operation
114+ val msg = ObjectMessage (
115+ operation = ObjectOperation (
116+ action = ObjectOperationAction .MapCreate ,
117+ objectId = objectId,
118+ map = initialMapValue.map,
119+ nonce = nonce,
120+ initialValue = initialValueJSONString,
121+ )
122+ )
123+
124+ // Publish the message
125+ publish(arrayOf(msg))
126+
127+ // Check if object already exists in pool (from echoed message)
128+ return objectsPool.get(objectId) as ? LiveMap ? : withContext(sequentialScope.coroutineContext) {
129+ objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
130+ }
131+ }
132+
133+ private suspend fun createCounterAsync (initialValue : Number ): LiveCounter {
134+ adapter.throwIfInvalidWriteApiConfiguration(channelName)
135+
136+ // Validate input parameter
137+ if (initialValue.toDouble().isNaN() || initialValue.toDouble().isInfinite()) {
138+ throw objectError(" Counter value should be a valid number" )
139+ }
140+
141+ val initialCounterValue = DefaultLiveCounter .createInitialValue(initialValue)
142+ // Create initial value operation
143+ val initialValueJSONString = gson.toJson(initialCounterValue)
144+
145+ // Create object ID from initial value
146+ val (objectId, nonce) = fromInitialValue(ObjectType .Counter , initialValueJSONString)
147+
148+ // Create ObjectMessage with the operation
149+ val msg = ObjectMessage (
150+ operation = ObjectOperation (
151+ action = ObjectOperationAction .CounterCreate ,
152+ objectId = objectId,
153+ counter = initialCounterValue.counter,
154+ nonce = nonce,
155+ initialValue = initialValueJSONString
156+ )
157+ )
158+
159+ // Publish the message
160+ publish(arrayOf(msg))
161+
162+ // Check if object already exists in pool (from echoed message)
163+ return objectsPool.get(objectId) as ? LiveCounter ? : withContext(sequentialScope.coroutineContext) {
164+ objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter
165+ }
166+ }
167+
168+ private suspend fun fromInitialValue (objectType : ObjectType , initialValue : String ): Pair <String , String > {
169+ val nonce = generateNonce()
170+ val msTimestamp = withContext(Dispatchers .IO ) {
171+ adapter.getServerTime()
172+ }
173+ return Pair (ObjectId .fromInitialValue(objectType, initialValue, nonce, msTimestamp).toString(), nonce)
174+ }
175+
109176 /* *
110177 * Spec: RTO15
111178 */
0 commit comments