Skip to content

Commit 2b89543

Browse files
authored
feat: aligning chameneos with csm version (#33)
1 parent 2d2b500 commit 2b89543

File tree

5 files changed

+86
-131
lines changed

5 files changed

+86
-131
lines changed

chameneos/src/main/kotlin/ac/at/uibk/dps/dapr/chameneos/chameneos/ChameneosActor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import io.dapr.actors.ActorType
66
interface ChameneosActor {
77
fun request()
88

9-
fun meet(data: Map<String, Any>)
9+
fun matchMade(data: Map<String, Any>)
1010

1111
fun change(data: Map<String, Any>)
1212
}

chameneos/src/main/kotlin/ac/at/uibk/dps/dapr/chameneos/chameneos/ChameneosActorImpl.kt

Lines changed: 35 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -8,88 +8,65 @@ import io.dapr.client.DaprClientBuilder
88
import java.util.concurrent.TimeUnit
99
import kotlin.random.Random
1010
import kotlin.time.Clock
11-
import kotlin.time.measureTime
12-
import kotlin.time.toJavaDuration
1311

1412
class ChameneosActorImpl(
1513
runtimeContext: ActorRuntimeContext<ChameneosActorImpl>,
1614
val actorId: ActorId,
1715
) : AbstractActor(runtimeContext, actorId), ChameneosActor {
1816
val client = DaprClientBuilder().build()
1917

20-
var color = Random.nextInt(1, 4)
21-
2218
var metricRegistry = Chameneos.provideMetricRegistry()
23-
2419
var eventTimer = metricRegistry.timer("event.latency")
25-
var requestTimer = metricRegistry.timer("request.duration")
26-
var meetTimer = metricRegistry.timer("meet.duration")
27-
var changeTimer = metricRegistry.timer("change.duration")
2820

29-
override fun request() {
30-
val delta = measureTime {
31-
val now = Clock.System.now()
32-
val epochNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
21+
var color = Random.nextInt(1, 4)
3322

34-
client
35-
.publishEvent(
36-
"pubsub",
37-
"request",
38-
mapOf<String, Any>(
39-
"requestor" to actorId.toString(),
40-
"color" to color,
41-
"time" to epochNanos,
42-
),
43-
)
44-
.subscribe()
45-
}
46-
requestTimer.update(delta.toJavaDuration())
23+
override fun request() {
24+
val now = Clock.System.now()
25+
val epochNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
26+
27+
client
28+
.publishEvent(
29+
"pubsub",
30+
"requesting",
31+
mapOf<String, Any>("id" to actorId.toString(), "color" to color, "time" to epochNanos),
32+
)
33+
.subscribe()
4734
}
4835

49-
override fun meet(data: Map<String, Any>) {
50-
val delta = measureTime {
51-
val time = data["time"] as Long
52-
val partnerColor = data["color"] as Int
53-
val partner = data["partner"] as String
36+
override fun matchMade(data: Map<String, Any>) {
37+
val now = Clock.System.now()
38+
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
39+
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
5440

55-
var now = Clock.System.now()
56-
var nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
41+
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
5742

58-
val deltaNanos = (nowNanos - time).coerceAtLeast(0L)
43+
val partnerColor = data["color"] as Int
5944

60-
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
45+
color = if (color == partnerColor) color else (color xor partnerColor)
6146

62-
color = if (color == partnerColor) color else (color xor partnerColor)
47+
client
48+
.publishEvent(
49+
"pubsub",
50+
"change",
51+
mapOf<String, Any>(
52+
"partner" to data["partner"].toString(),
53+
"color" to color,
54+
"time" to nowNanos,
55+
),
56+
)
57+
.subscribe()
6358

64-
now = Clock.System.now()
65-
nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
66-
client
67-
.publishEvent(
68-
"pubsub",
69-
"change",
70-
mapOf<String, Any>("partner" to partner, "color" to color, "time" to nowNanos),
71-
)
72-
.subscribe()
73-
}
74-
meetTimer.update(delta.toJavaDuration())
7559
request()
7660
}
7761

7862
override fun change(data: Map<String, Any>) {
79-
val delta = measureTime {
80-
val time = data["time"] as Long
81-
82-
val now = Clock.System.now()
83-
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
84-
85-
val deltaNanos = (nowNanos - time).coerceAtLeast(0L)
86-
87-
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
63+
val now = Clock.System.now()
64+
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
65+
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
8866

89-
color = data["color"] as Int
90-
}
91-
changeTimer.update(delta.toJavaDuration())
67+
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
9268

69+
color = data["color"] as Int
9370
request()
9471
}
9572
}

chameneos/src/main/kotlin/ac/at/uibk/dps/dapr/chameneos/chameneos/ChameneosSubscriber.kt

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.dapr.Topic
44
import io.dapr.actors.ActorId
55
import io.dapr.actors.client.ActorClient
66
import io.dapr.actors.client.ActorProxyBuilder
7+
import io.dapr.client.domain.CloudEvent
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
89
import org.springframework.web.bind.annotation.PostMapping
910
import org.springframework.web.bind.annotation.RequestBody
@@ -14,29 +15,22 @@ import org.springframework.web.bind.annotation.RestController
1415
class ChameneosSubscriber {
1516

1617
private val id = System.getenv("CHAMENEOS_ID")
17-
private val actorClient = ActorClient()
1818
private val chameneosProxy =
19-
ActorProxyBuilder(ChameneosActor::class.java, actorClient).build(ActorId("$id"))
19+
ActorProxyBuilder(ChameneosActor::class.java, ActorClient()).build(ActorId("$id"))
2020

21-
@Topic(name = "meet", pubsubName = "pubsub")
22-
@PostMapping("/meet")
23-
fun handleMeet(@RequestBody body: Map<String, Any>) {
24-
val data = body["data"] as? Map<String, Any> ?: body
25-
val initiator = data["initiator"] as? String ?: return
26-
27-
if (initiator == id) {
28-
chameneosProxy.meet(data)
21+
@Topic(name = "matchMade", pubsubName = "pubsub")
22+
@PostMapping("/matchMade")
23+
fun matchMadeSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
24+
if (event.data["target"] == id) {
25+
chameneosProxy.matchMade(event.data)
2926
}
3027
}
3128

3229
@Topic(name = "change", pubsubName = "pubsub")
3330
@PostMapping("/change")
34-
fun handleChange(@RequestBody body: Map<String, Any>) {
35-
val data = body["data"] as? Map<String, Any> ?: body
36-
val partner = data["partner"] as? String ?: return
37-
38-
if (partner == id) {
39-
chameneosProxy.change(data)
31+
fun changeSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
32+
if (event.data["partner"] == id) {
33+
chameneosProxy.change(event.data)
4034
}
4135
}
4236
}

chameneos/src/main/kotlin/ac/at/uibk/dps/dapr/chameneos/mall/MallActorImpl.kt

Lines changed: 34 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,63 +7,49 @@ import io.dapr.actors.runtime.ActorRuntimeContext
77
import io.dapr.client.DaprClientBuilder
88
import java.util.concurrent.TimeUnit
99
import kotlin.time.Clock
10-
import kotlin.time.measureTime
11-
import kotlin.time.toJavaDuration
1210

1311
class MallActorImpl(runtimeContext: ActorRuntimeContext<MallActorImpl>, actorId: ActorId) :
1412
AbstractActor(runtimeContext, actorId), MallActor {
1513

1614
val client = DaprClientBuilder().build()
1715

18-
var count = 0
19-
var startTime = System.nanoTime()
20-
var waiting: MutableList<String> = mutableListOf()
21-
2216
var metricRegistry = Chameneos.provideMetricRegistry()
23-
var counter = metricRegistry.counter("mall.meetings")
24-
var eventTimer = metricRegistry.timer("event.latency")
25-
var requestingTimer = metricRegistry.timer("request.duration")
26-
27-
override fun requesting(data: Map<String, Any>) {
28-
val delta = measureTime {
29-
val time = data["time"] as Long
30-
val requestor = data["requestor"] as String
31-
val requestorColor = data["color"] as Int
32-
33-
var now = Clock.System.now()
34-
var nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
35-
36-
val deltaNanos = (nowNanos - time).coerceAtLeast(0L)
3717

38-
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
39-
40-
if (count == 0) startTime = System.nanoTime()
41-
42-
if (waiting.isEmpty()) {
43-
waiting.add(requestor)
44-
} else {
45-
count++
46-
counter.inc()
47-
48-
val waitingId = waiting.removeAt(0)
49-
50-
now = Clock.System.now()
51-
nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
18+
var count = 0
19+
var waiting: ArrayList<String> = arrayListOf()
5220

53-
client
54-
.publishEvent(
55-
"pubsub",
56-
"meet",
57-
mapOf<String, Any>(
58-
"initiator" to waitingId,
59-
"partner" to requestor,
60-
"color" to requestorColor,
61-
"time" to nowNanos,
62-
),
63-
)
64-
.subscribe()
65-
}
21+
override fun requesting(data: Map<String, Any>) {
22+
var now = Clock.System.now()
23+
var nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
24+
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
25+
26+
metricRegistry.timer("event.latency").update(deltaNanos, TimeUnit.NANOSECONDS)
27+
28+
val id = data["id"] as String
29+
30+
if (waiting.isEmpty()) {
31+
waiting.add(id)
32+
} else {
33+
++count
34+
metricRegistry.counter("mall.meetings").inc()
35+
36+
now = Clock.System.now()
37+
nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
38+
39+
client
40+
.publishEvent(
41+
"pubsub",
42+
"matchMade",
43+
mapOf<String, Any>(
44+
"target" to waiting[0],
45+
"partner" to id,
46+
"color" to data["color"] as Int,
47+
"time" to nowNanos,
48+
),
49+
)
50+
.subscribe()
51+
52+
waiting.removeAt(0)
6653
}
67-
requestingTimer.update(delta.toJavaDuration())
6854
}
6955
}

chameneos/src/main/kotlin/ac/at/uibk/dps/dapr/chameneos/mall/MallSubscriber.kt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.dapr.Topic
44
import io.dapr.actors.ActorId
55
import io.dapr.actors.client.ActorClient
66
import io.dapr.actors.client.ActorProxyBuilder
7+
import io.dapr.client.domain.CloudEvent
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
89
import org.springframework.web.bind.annotation.PostMapping
910
import org.springframework.web.bind.annotation.RequestBody
@@ -12,15 +13,12 @@ import org.springframework.web.bind.annotation.RestController
1213
@RestController
1314
@ConditionalOnProperty("app.role", havingValue = "mall")
1415
class MallSubscriber {
15-
private val actorClient = ActorClient()
1616
private val mallProxy =
17-
ActorProxyBuilder(MallActor::class.java, actorClient).build(ActorId("mall-1"))
17+
ActorProxyBuilder(MallActor::class.java, ActorClient()).build(ActorId("mall-1"))
1818

19-
@Topic(name = "request", pubsubName = "pubsub")
20-
@PostMapping("/request")
21-
fun handleRequest(@RequestBody body: Map<String, Any>) {
22-
val data = body["data"] as? Map<String, Any> ?: body
23-
24-
mallProxy.requesting(data)
19+
@Topic(name = "requesting", pubsubName = "pubsub")
20+
@PostMapping("/requesting")
21+
fun requestSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
22+
mallProxy.requesting(event.data)
2523
}
2624
}

0 commit comments

Comments
 (0)