Skip to content

Commit 2d2b500

Browse files
authored
fix: aligning cigaretteSmokers example with csm version (#31)
* fix: aligning cigaretteSmokers example with csm version * fix: fixing issue in cigaretteSmokers
1 parent 640f32f commit 2d2b500

File tree

5 files changed

+53
-55
lines changed

5 files changed

+53
-55
lines changed

cigaretteSmokers/src/main/kotlin/ac/at/uibk/dps/dapr/cigarette/arbiter/ArbiterActorImpl.kt

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,48 +8,39 @@ import io.dapr.client.DaprClient
88
import io.dapr.client.DaprClientBuilder
99
import java.util.concurrent.TimeUnit
1010
import kotlin.time.Clock
11-
import kotlin.time.measureTime
12-
import kotlin.time.toJavaDuration
1311

1412
class ArbiterActorImpl(runtimeContext: ActorRuntimeContext<ArbiterActorImpl>, actorId: ActorId) :
1513
AbstractActor(runtimeContext, actorId), ArbiterActor {
1614

1715
val client: DaprClient = DaprClientBuilder().build()
1816

19-
val ingredients = listOf("0", "1", "2")
17+
val ingredients = listOf(0, 1, 2)
2018
var count = 0
2119

2220
var metricRegistry = CigaretteSmokers.provideMetricRegistry()
23-
var counter = metricRegistry.counter("arbiter.rounds")
24-
var eventTimer = metricRegistry.timer("event.latency")
25-
var provideTimer = metricRegistry.timer("provide.duration")
2621

2722
override fun provide(time: Long) {
28-
val delta = measureTime {
29-
var now = Clock.System.now()
30-
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
23+
var now = Clock.System.now()
24+
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
25+
val deltaNanos = (nowNanos - time).coerceAtLeast(0L)
3126

32-
val deltaNanos = (nowNanos - time).coerceAtLeast(0L)
27+
metricRegistry.timer("event.latency").update(deltaNanos, TimeUnit.NANOSECONDS)
3328

34-
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
29+
++count
30+
metricRegistry.counter("arbiter.rounds").inc()
3531

36-
count++
37-
counter.inc()
32+
val provide = ingredients.toMutableList()
33+
provide.remove(provide.random())
3834

39-
val provide = ingredients.toMutableList()
40-
provide.remove(provide.random())
35+
now = Clock.System.now()
36+
val epochNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
4137

42-
now = Clock.System.now()
43-
val epochNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
44-
45-
client
46-
.publishEvent(
47-
"pubsub",
48-
"provide",
49-
mutableMapOf("ingredients" to provide, "time" to epochNanos),
50-
)
51-
.subscribe()
52-
}
53-
provideTimer.update(delta.toJavaDuration())
38+
client
39+
.publishEvent(
40+
"pubsub",
41+
"provided",
42+
mutableMapOf("ingredients" to provide, "time" to epochNanos),
43+
)
44+
.subscribe()
5445
}
5546
}

cigaretteSmokers/src/main/kotlin/ac/at/uibk/dps/dapr/cigarette/arbiter/ArbiterSubscriber.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.dapr.Topic
44
import io.dapr.actors.ActorId
55
import io.dapr.actors.client.ActorClient
66
import io.dapr.actors.client.ActorProxyBuilder
7+
import io.dapr.client.domain.CloudEvent
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
89
import org.springframework.web.bind.annotation.PostMapping
910
import org.springframework.web.bind.annotation.RequestBody
@@ -16,9 +17,9 @@ class ArbiterSubscriber {
1617
private val arbiterProxy =
1718
ActorProxyBuilder(ArbiterActor::class.java, ActorClient()).build(ActorId("arbiter-1"))
1819

19-
@Topic(name = "done", pubsubName = "pubsub")
20-
@PostMapping("/done")
21-
fun handleDone(@RequestBody body: Map<String, Any>) {
22-
arbiterProxy.provide(body["data"] as Long)
20+
@Topic(name = "finish", pubsubName = "pubsub")
21+
@PostMapping("/finish")
22+
fun finishSubscriber(@RequestBody event: CloudEvent<Long>) {
23+
arbiterProxy.provide(event.data)
2324
}
2425
}

cigaretteSmokers/src/main/kotlin/ac/at/uibk/dps/dapr/cigarette/smoker/SmokerActor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ import io.dapr.actors.ActorType
44

55
@ActorType(name = "SmokerActor")
66
interface SmokerActor {
7-
fun smoke(data: Map<String, Any>)
7+
fun smoking(data: Map<String, Any>)
88
}

cigaretteSmokers/src/main/kotlin/ac/at/uibk/dps/dapr/cigarette/smoker/SmokerActorImpl.kt

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,41 +6,46 @@ import io.dapr.actors.runtime.AbstractActor
66
import io.dapr.actors.runtime.ActorRuntimeContext
77
import io.dapr.client.DaprClient
88
import io.dapr.client.DaprClientBuilder
9+
import java.security.SecureRandom
910
import java.util.concurrent.TimeUnit
11+
import kotlin.random.Random
1012
import kotlin.time.Clock
11-
import kotlin.time.measureTime
12-
import kotlin.time.toJavaDuration
1313

1414
class SmokerActorImpl(runtimeContext: ActorRuntimeContext<SmokerActorImpl>, val actorId: ActorId) :
1515
AbstractActor(runtimeContext, actorId), SmokerActor {
1616

1717
val client: DaprClient = DaprClientBuilder().build()
1818

1919
var metricRegistry = CigaretteSmokers.provideMetricRegistry()
20-
var eventTimer = metricRegistry.timer("event.latency")
21-
var smokeTimer = metricRegistry.timer("smoke.duration")
2220

23-
override fun smoke(data: Map<String, Any>) {
24-
val delta = measureTime {
25-
val ingredients = data["ingredients"] as List<String>
26-
val time = data["time"] as Long
21+
private val seedGenerator = SecureRandom()
22+
private val threadRng =
23+
object : ThreadLocal<Random>() {
24+
override fun initialValue(): Random {
25+
return Random(seedGenerator.nextLong())
26+
}
27+
}
2728

28-
if (!ingredients.contains(actorId.toString())) {
29-
var now = Clock.System.now()
30-
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
29+
override fun smoking(data: Map<String, Any>) {
30+
var now = Clock.System.now()
31+
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
32+
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
3133

32-
val deltaNanos = (nowNanos - time).coerceAtLeast(0L)
34+
metricRegistry.timer("event.latency").update(deltaNanos, TimeUnit.NANOSECONDS)
3335

34-
eventTimer.update(deltaNanos, TimeUnit.NANOSECONDS)
36+
val ingredients = data["ingredients"] as List<*>
3537

36-
Thread.sleep(10)
38+
if (!ingredients.contains(actorId.toString().toInt())) {
39+
Thread.sleep(randomAround(10, 2).toLong())
3740

38-
now = Clock.System.now()
39-
val epochNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
41+
now = Clock.System.now()
42+
val epochNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
4043

41-
client.publishEvent("pubsub", "done", epochNanos).subscribe()
42-
}
44+
client.publishEvent("pubsub", "finish", epochNanos).subscribe()
4345
}
44-
smokeTimer.update(delta.toJavaDuration())
46+
}
47+
48+
fun randomAround(base: Int, delta: Int): Int {
49+
return (base - delta..base + delta).random(threadRng.get())
4550
}
4651
}

cigaretteSmokers/src/main/kotlin/ac/at/uibk/dps/dapr/cigarette/smoker/SmokerSubscriber.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.dapr.Topic
44
import io.dapr.actors.ActorId
55
import io.dapr.actors.client.ActorClient
66
import io.dapr.actors.client.ActorProxyBuilder
7+
import io.dapr.client.domain.CloudEvent
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
89
import org.springframework.web.bind.annotation.PostMapping
910
import org.springframework.web.bind.annotation.RequestBody
@@ -16,9 +17,9 @@ class SmokerSubscriber {
1617
private val smokerProxy =
1718
ActorProxyBuilder(SmokerActor::class.java, ActorClient()).build(ActorId("$id"))
1819

19-
@Topic(name = "provide", pubsubName = "pubsub")
20-
@PostMapping("/provide")
21-
fun handleProvide(@RequestBody body: Map<String, Any>) {
22-
smokerProxy.smoke(body["data"] as Map<String, Any>)
20+
@Topic(name = "provided", pubsubName = "pubsub")
21+
@PostMapping("/provided")
22+
fun provideSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
23+
smokerProxy.smoking(event.data)
2324
}
2425
}

0 commit comments

Comments
 (0)