Skip to content

Commit 6ba806c

Browse files
committed
Added basic implementation for public interfaces in kotlin
1 parent 4f35df8 commit 6ba806c

29 files changed

Lines changed: 2182 additions & 0 deletions

liveobjects/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ tasks.withType<Test>().configureEach {
3333
tasks.register<Test>("runLiveObjectUnitTests") {
3434
filter {
3535
includeTestsMatching("io.ably.lib.objects.unit.*")
36+
// unit tests for the path-based public API implementation (io.ably.lib.object)
37+
includeTestsMatching("io.ably.lib.object.unit.*")
3638
}
3739
}
3840

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.ably.lib.`object`
2+
3+
import io.ably.lib.`object`.value.LiveCounter
4+
5+
/**
6+
* Implementation of the [LiveCounter] value type: an immutable holder for the
7+
* initial count of a new LiveCounter object to be created by a mutation.
8+
*
9+
* Instantiated reflectively by `io.ably.lib.object.value.LiveCounter#create` —
10+
* the class name and the single `(java.lang.Number)` constructor are a frozen
11+
* contract with the `lib` module and must not change.
12+
*
13+
* Spec: RTLCV1, RTLCV2a, RTLCV3b, RTLCV3d
14+
*/
15+
public class DefaultLiveCounter(count: Number) : LiveCounter() {
16+
/** Internal initial count (RTLCV2a). */
17+
internal val count: Number = count
18+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.ably.lib.`object`
2+
3+
import io.ably.lib.`object`.value.LiveMap
4+
import io.ably.lib.`object`.value.LiveMapValue
5+
6+
/**
7+
* Implementation of the [LiveMap] value type: an immutable holder for the
8+
* initial entries of a new LiveMap object to be created by a mutation.
9+
*
10+
* Instantiated reflectively by `io.ably.lib.object.value.LiveMap#create` —
11+
* the class name and the single `(java.util.Map)` constructor are a frozen
12+
* contract with the `lib` module and must not change.
13+
*
14+
* Spec: RTLMV1, RTLMV2a, RTLMV3b, RTLMV3d
15+
*/
16+
public class DefaultLiveMap(entries: Map<String, LiveMapValue>) : LiveMap() {
17+
/** Internal initial entries (RTLMV2a); defensively copied for immutability (RTLMV3d). */
18+
internal val entries: Map<String, LiveMapValue> = HashMap(entries)
19+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.ably.lib.`object`
2+
3+
import io.ably.lib.`object`.Subscription
4+
5+
/**
6+
* Implementation of the public [Subscription] handle returned by the
7+
* `subscribe` methods of the path/instance APIs.
8+
*
9+
* Spec: SUB1, SUB2a, SUB2b (idempotent unsubscribe)
10+
*/
11+
internal class DefaultSubscription(private val onUnsubscribe: () -> Unit) : Subscription {
12+
13+
@Volatile
14+
private var unsubscribed = false
15+
16+
override fun unsubscribe() {
17+
if (unsubscribed) return // SUB2b - subsequent calls are no-ops
18+
unsubscribed = true
19+
onUnsubscribe()
20+
}
21+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.ably.lib.`object`
2+
3+
import io.ably.lib.types.AblyException
4+
import io.ably.lib.types.ErrorInfo
5+
6+
/**
7+
* Error codes and helpers for the path-based public API implementation.
8+
* Copied (and extended with the path-API codes) from the legacy package so
9+
* this package has no dependency on `io.ably.lib.objects`.
10+
*/
11+
internal enum class ObjectErrorCode(val code: Int) {
12+
BadRequest(40_000),
13+
InternalError(50_000),
14+
InvalidObject(92_000),
15+
InvalidInputParams(40_003),
16+
MapValueDataTypeUnsupported(40_013),
17+
PathNotResolved(92_005), // RTPO3c2 - write operation on a path that does not resolve
18+
ObjectsTypeMismatch(92_007), // RTTS5d2/RTTS9d2 - operation on a cast wrapper with mismatched resolved type
19+
}
20+
21+
internal enum class ObjectHttpStatusCode(val code: Int) {
22+
BadRequest(400),
23+
InternalServerError(500),
24+
}
25+
26+
internal fun objectsException(
27+
errorMessage: String,
28+
errorCode: ObjectErrorCode,
29+
statusCode: ObjectHttpStatusCode = ObjectHttpStatusCode.BadRequest,
30+
cause: Throwable? = null,
31+
): AblyException {
32+
val errorInfo = ErrorInfo(errorMessage, statusCode.code, errorCode.code)
33+
return cause?.let { AblyException.fromErrorInfo(it, errorInfo) } ?: AblyException.fromErrorInfo(errorInfo)
34+
}
35+
36+
/** ErrorInfo 400 / 40003 - invalid input (RTLMV4a/b, RTLCV4a, key validation). */
37+
internal fun invalidInputError(message: String) =
38+
objectsException(message, ObjectErrorCode.InvalidInputParams)
39+
40+
/** ErrorInfo 400 / 92005 - write operation on an unresolvable path (RTPO3c2). */
41+
internal fun pathNotResolvedError(path: String) =
42+
objectsException("Path could not be resolved: \"$path\"", ObjectErrorCode.PathNotResolved)
43+
44+
/** ErrorInfo 400 / 92007 - resolved/wrapped type does not match the typed wrapper (RTTS5d2/RTTS9d2). */
45+
internal fun typeMismatchError(message: String) =
46+
objectsException(message, ObjectErrorCode.ObjectsTypeMismatch)
47+
48+
/** ErrorInfo 500 / 92000 - invalid internal object state. */
49+
internal fun objectStateError(message: String) =
50+
objectsException(message, ObjectErrorCode.InvalidObject, ObjectHttpStatusCode.InternalServerError)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package io.ably.lib.`object`
2+
3+
import io.ably.lib.util.Log
4+
import kotlinx.coroutines.CoroutineName
5+
import kotlinx.coroutines.CoroutineScope
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.SupervisorJob
8+
import io.ably.lib.types.AblyException
9+
import java.util.concurrent.CompletableFuture
10+
import java.util.concurrent.ConcurrentHashMap
11+
import java.util.concurrent.CopyOnWriteArrayList
12+
import kotlinx.coroutines.launch
13+
14+
/**
15+
* The single abstract seam between this package and the realtime objects
16+
* system. The path/instance implementation classes depend ONLY on this
17+
* contract; a bridge (implemented outside this package, alongside the
18+
* realtime internals) provides the graph views, preconditions, publishing and
19+
* update fan-in. This keeps `io.ably.lib.object` free of any dependency on
20+
* `io.ably.lib.objects`.
21+
*/
22+
internal abstract class ObjectsBridge {
23+
24+
/** The channel this objects instance belongs to (used for PAOM2e/PAOM3b). */
25+
internal abstract val channelName: String
26+
27+
/** The root InternalLiveMap view (objectId `root`), or null if unavailable. Spec: RTO3, RTPO2b */
28+
internal abstract fun getRootNode(): MapNode?
29+
30+
/** Looks up a non-tombstoned object view by id, or null. Spec: RTO3a */
31+
internal abstract fun getNode(objectId: String): ObjectsNode?
32+
33+
/** Access API preconditions; throws ErrorInfo-carrying AblyException on failure. Spec: RTO25 */
34+
internal abstract fun throwIfInvalidAccessApiConfiguration()
35+
36+
/** Write API preconditions; throws ErrorInfo-carrying AblyException on failure. Spec: RTO26 */
37+
internal abstract fun throwIfInvalidWriteApiConfiguration()
38+
39+
/** Publishes the messages and applies them locally on ACK. Spec: RTO15, RTO20 */
40+
internal abstract suspend fun publish(messages: List<WireObjectMessage>)
41+
42+
/** Current server time in epoch milliseconds. Spec: RTO16 */
43+
internal abstract suspend fun getServerTime(): Long
44+
45+
/** Ensures the channel is attached and objects are SYNCED. Spec: RTO23b, RTO23c, RTO23e */
46+
internal abstract suspend fun ensureAttachedAndSynced()
47+
48+
/**
49+
* Registry for path-based subscriptions (RTPO19). Bridge implementations
50+
* feed it via [notifyUpdated].
51+
*/
52+
internal val pathSubscriptionRegister = PathObjectSubscriptionRegister(this)
53+
54+
/** Scope used to expose suspend write operations as CompletableFutures. */
55+
private val asyncScope =
56+
CoroutineScope(Dispatchers.Default + CoroutineName("ObjectsBridge") + SupervisorJob())
57+
58+
/** Per-object message-carrying update listeners (instance subscriptions, RTINS16). */
59+
private val updateListeners =
60+
ConcurrentHashMap<String, CopyOnWriteArrayList<(Set<String>, WireObjectMessage?) -> Unit>>()
61+
62+
/**
63+
* Subscribes to updates applied to the object with [objectId]. The listener
64+
* receives the set of updated map keys (empty for counters) and the source
65+
* message when the update originated from an operation. Returns an
66+
* unsubscribe handle.
67+
*/
68+
internal fun subscribeToUpdates(objectId: String, listener: (Set<String>, WireObjectMessage?) -> Unit): () -> Unit {
69+
val listeners = updateListeners.computeIfAbsent(objectId) { CopyOnWriteArrayList() }
70+
listeners.add(listener)
71+
return { listeners.remove(listener) }
72+
}
73+
74+
/**
75+
* Entry point for bridge implementations: call after an update has been
76+
* applied to an object, with the keys that changed (empty for counters) and
77+
* the source ObjectMessage when the update came from an operation (null for
78+
* sync-induced changes). Fans out to instance subscriptions (RTINS16) and
79+
* path subscriptions (RTPO19).
80+
*/
81+
internal fun notifyUpdated(objectId: String, updatedKeys: Set<String>, message: WireObjectMessage?) {
82+
updateListeners[objectId]?.forEach { listener ->
83+
try {
84+
listener(updatedKeys, message)
85+
} catch (t: Throwable) {
86+
Log.e("ObjectsBridge", "Error in update listener for objectId=$objectId", t)
87+
}
88+
}
89+
pathSubscriptionRegister.notifyObjectUpdated(objectId, updatedKeys, message)
90+
}
91+
92+
/**
93+
* Runs a suspend write and exposes it as a CompletableFuture<Void>;
94+
* failures complete exceptionally with the underlying AblyException.
95+
*/
96+
internal fun launchWithVoidFuture(block: suspend () -> Unit): CompletableFuture<Void> {
97+
val future = CompletableFuture<Void>()
98+
asyncScope.launch {
99+
try {
100+
block()
101+
future.complete(null)
102+
} catch (throwable: Throwable) {
103+
when (throwable) {
104+
is AblyException -> future.completeExceptionally(throwable)
105+
else -> future.completeExceptionally(
106+
objectsException("Error executing operation", ObjectErrorCode.BadRequest, cause = throwable)
107+
)
108+
}
109+
}
110+
}
111+
return future
112+
}
113+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.ably.lib.`object`
2+
3+
import java.nio.charset.StandardCharsets
4+
import java.security.MessageDigest
5+
import java.util.Base64
6+
7+
/** Object type discriminator used in objectId generation. Spec: RTO14 */
8+
internal enum class WireObjectType(val value: String) {
9+
Map("map"),
10+
Counter("counter"),
11+
}
12+
13+
/**
14+
* ObjectId generation for client-created objects. Copied from the legacy
15+
* `io.ably.lib.objects.ObjectId` so this package has no dependency on it -
16+
* the format `type:base64url(sha256(initialValue:nonce))@msTimestamp` is a
17+
* wire contract.
18+
*
19+
* Spec: RTO14, RTO6b1
20+
*/
21+
internal object ObjectsIdentifier {
22+
internal fun fromInitialValue(
23+
objectType: WireObjectType,
24+
initialValue: String,
25+
nonce: String,
26+
msTimestamp: Long,
27+
): String {
28+
val valueForHash = "$initialValue:$nonce".toByteArray(StandardCharsets.UTF_8)
29+
// RTO14b - hash the initial value and nonce to create a unique identifier
30+
val hashBytes = MessageDigest.getInstance("SHA-256").digest(valueForHash)
31+
val urlSafeHash = Base64.getUrlEncoder().withoutPadding().encodeToString(hashBytes)
32+
return "${objectType.value}:$urlSafeHash@$msTimestamp"
33+
}
34+
}
35+
36+
/**
37+
* Generates a random nonce string for object creation (16 alphanumeric chars).
38+
* Copied from the legacy `generateNonce`. Spec: RTLMV4g, RTLCV4d
39+
*/
40+
internal fun generateObjectNonce(): String {
41+
val chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
42+
return (1..16).map { chars.random() }.joinToString("")
43+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.ably.lib.`object`
2+
3+
/**
4+
* Abstract view over the live objects graph, implemented by the bridge that
5+
* connects this package to the realtime objects system (kept abstract so this
6+
* package has no dependency on `io.ably.lib.objects`).
7+
*
8+
* Contract for implementations:
9+
* - tombstoned objects are never returned by [ObjectsBridge.getNode] /
10+
* [ObjectsBridge.getRootNode];
11+
* - [MapNode.entries] / [MapNode.get] expose only non-tombstoned entries that
12+
* carry data; values referencing other objects carry `objectId` in their
13+
* [WireObjectData].
14+
*/
15+
internal interface ObjectsNode {
16+
val objectId: String
17+
}
18+
19+
/** View over an InternalLiveMap (RTLM1). */
20+
internal interface MapNode : ObjectsNode {
21+
/** Snapshot of the current non-tombstoned entries. */
22+
fun entries(): Map<String, WireObjectData>
23+
24+
/** The current non-tombstoned entry for [key], or null. */
25+
fun get(key: String): WireObjectData?
26+
}
27+
28+
/** View over an InternalLiveCounter (RTLC1). */
29+
internal interface CounterNode : ObjectsNode {
30+
/** The current counter value (RTLC5). */
31+
fun count(): Double
32+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.ably.lib.`object`
2+
3+
/**
4+
* Computes every path from the root map to a target object by walking the
5+
* objects graph on demand (over objectId references), instead of maintaining
6+
* incremental parent references like ably-js does (RTLO3f/RTLO4g/RTLO4h).
7+
* RTLO4f-equivalent observable behavior; cycle-safe.
8+
*
9+
* Spec: RTLO4f (equivalent)
10+
*/
11+
internal object PathFinder {
12+
13+
/**
14+
* Returns all paths (as segment lists) from the root map to the object with
15+
* [targetObjectId]. The root itself yields a single empty path.
16+
*/
17+
internal fun findFullPaths(bridge: ObjectsBridge, targetObjectId: String): List<List<String>> {
18+
val root = bridge.getRootNode() ?: return emptyList()
19+
if (targetObjectId == root.objectId) return listOf(emptyList())
20+
val result = mutableListOf<List<String>>()
21+
walk(bridge, root, targetObjectId, currentPath = mutableListOf(), visited = mutableSetOf(), result)
22+
return result
23+
}
24+
25+
private fun walk(
26+
bridge: ObjectsBridge,
27+
map: MapNode,
28+
targetObjectId: String,
29+
currentPath: MutableList<String>,
30+
visited: MutableSet<String>,
31+
result: MutableList<List<String>>,
32+
) {
33+
if (!visited.add(map.objectId)) return // cycle guard
34+
for ((key, data) in map.entries()) {
35+
val refId = data.objectId ?: continue
36+
if (refId == targetObjectId) {
37+
result.add(currentPath + key)
38+
continue
39+
}
40+
val refNode = bridge.getNode(refId)
41+
if (refNode is MapNode) {
42+
currentPath.add(key)
43+
walk(bridge, refNode, targetObjectId, currentPath, visited, result)
44+
currentPath.removeAt(currentPath.size - 1)
45+
}
46+
}
47+
visited.remove(map.objectId)
48+
}
49+
}

0 commit comments

Comments
 (0)