Skip to content

Commit e76d157

Browse files
authored
fix: aligning dining philosopher implementation with csm version (#29)
* fix: aligning dining philosopher implementation with CSM version * style: fixing formatting for dining philosophers * style: switching dining philosophers to correct style * style: fixing imports for dining philosophers
1 parent 3066ffd commit e76d157

7 files changed

Lines changed: 68 additions & 144 deletions

File tree

diningPhilosophers/src/main/kotlin/ac/at/uibk/dps/dapr/philosophers/DiningPhilosophers.kt

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import io.dapr.actors.ActorId
99
import io.dapr.actors.client.ActorClient
1010
import io.dapr.actors.client.ActorProxyBuilder
1111
import io.dapr.actors.runtime.ActorRuntime
12-
import io.dapr.client.DaprClient
13-
import io.dapr.client.DaprClientBuilder
1412
import io.micrometer.core.instrument.Clock
1513
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
1614
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
@@ -21,9 +19,6 @@ import io.micrometer.core.instrument.util.HierarchicalNameMapper
2119
import java.nio.file.Files
2220
import java.nio.file.Paths
2321
import java.util.concurrent.TimeUnit
24-
import kotlin.apply
25-
import org.slf4j.Logger
26-
import org.slf4j.LoggerFactory
2722
import org.springframework.boot.ApplicationArguments
2823
import org.springframework.boot.ApplicationRunner
2924
import org.springframework.boot.autoconfigure.SpringBootApplication
@@ -33,8 +28,6 @@ import org.springframework.stereotype.Component
3328
@SpringBootApplication
3429
class DiningPhilosophers {
3530
companion object {
36-
val logger: Logger = LoggerFactory.getLogger(DiningPhilosophers::class.java)
37-
val daprClient: DaprClient = DaprClientBuilder().build()
3831
val metricsDirectory = System.getenv("METRICS_DIRECTORY") ?: "metrics"
3932
val metricsPeriod = System.getenv("METRICS_PERIOD")?.toLong() ?: 1L
4033

@@ -70,10 +63,7 @@ class DiningPhilosophers {
7063
fun main(args: Array<String>) {
7164
val role = System.getenv("ROLE") ?: "philosopher"
7265
if (role == "arbitrator") {
73-
val numberOfPhilosophers = System.getenv("NUMBER_OF_PHILOSOPHERS").toInt()
74-
ActorRuntime.getInstance().registerActor(ArbitratorActorImpl::class.java) { runtime, id ->
75-
ArbitratorActorImpl(runtime, id, numberOfPhilosophers)
76-
}
66+
ActorRuntime.getInstance().registerActor(ArbitratorActorImpl::class.java)
7767
}
7868
if (role == "philosopher")
7969
ActorRuntime.getInstance().registerActor(PhilosopherActorImpl::class.java)
@@ -88,7 +78,6 @@ class AutoStarter : ApplicationRunner {
8878
if (role == "arbitrator") return
8979
val id = System.getenv("PHILOSOPHER_ID")
9080
val proxy = ActorProxyBuilder(PhilosopherActor::class.java, ActorClient()).build(ActorId(id))
91-
DiningPhilosophers.logger.info("philosopher requesting initial forks")
92-
proxy.start().subscribe()
81+
proxy.starting()
9382
}
9483
}

diningPhilosophers/src/main/kotlin/ac/at/uibk/dps/dapr/philosophers/arbitrator/ArbitratorActor.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package ac.at.uibk.dps.dapr.philosophers.arbitrator
22

33
import io.dapr.actors.ActorMethod
44
import io.dapr.actors.ActorType
5-
import reactor.core.publisher.Mono
65

76
@ActorType(name = "ArbitratorActor")
87
interface ArbitratorActor {
8+
@ActorMethod(name = "hungry") fun hungry(data: Map<String, Any>)
99

10-
@ActorMethod(name = "requestForks") fun requestForks(data: Map<String, Any>): Mono<Void>
11-
12-
@ActorMethod(name = "doneEating") fun doneEating(data: Map<String, Any>): Mono<Void>
10+
@ActorMethod(name = "release") fun release(data: Map<String, Any>)
1311
}
Lines changed: 33 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,57 @@
11
package ac.at.uibk.dps.dapr.philosophers.arbitrator
22

33
import ac.at.uibk.dps.dapr.philosophers.DiningPhilosophers
4-
import ac.at.uibk.dps.dapr.philosophers.philosopher.PhilosopherPubSub
54
import io.dapr.actors.ActorId
65
import io.dapr.actors.runtime.AbstractActor
76
import io.dapr.actors.runtime.ActorRuntimeContext
7+
import io.dapr.client.DaprClient
8+
import io.dapr.client.DaprClientBuilder
89
import java.util.concurrent.TimeUnit
910
import kotlin.time.Clock
10-
import kotlin.time.measureTime
11-
import kotlin.time.toJavaDuration
12-
import reactor.core.publisher.Mono
13-
14-
class ArbitratorActorImpl(
15-
runtimeContext: ActorRuntimeContext<ArbitratorActorImpl>,
16-
id: ActorId,
17-
val numberOfPhilosophers: Int,
18-
) : AbstractActor(runtimeContext, id), ArbitratorActor {
19-
20-
companion object {
21-
const val EVENT_TIMER_NAME = "event.latency"
22-
const val REQUEST_DURATION = "request.duration"
23-
const val DONE_DURATION = "done.duration"
24-
}
2511

26-
private val forks = BooleanArray(numberOfPhilosophers) { true }
12+
class ArbitratorActorImpl(runtimeContext: ActorRuntimeContext<ArbitratorActorImpl>, id: ActorId) :
13+
AbstractActor(runtimeContext, id), ArbitratorActor {
14+
private val client: DaprClient = DaprClientBuilder().build()
15+
16+
private val forks = BooleanArray(6) { false }
2717

28-
private val waiting = BooleanArray(numberOfPhilosophers) { false }
18+
private val waiting = BooleanArray(6) { false }
2919

3020
val metricsRegistry = DiningPhilosophers.provideMetricRegistry()
3121

32-
private fun tryAssign(pos: Int) {
33-
if (forks[pos] && forks[next(pos)]) {
34-
forks[pos] = false
35-
forks[next(pos)] = false
36-
waiting[pos] = false
37-
PhilosopherPubSub.eat(DiningPhilosophers.daprClient, getMap(pos)).subscribe()
22+
override fun hungry(data: Map<String, Any>) {
23+
measureEventTime(data)
24+
val id = data["id"].toString().toInt()
25+
26+
if (!(forks[id] || forks[(id + 1) % 6])) {
27+
forks[id] = true
28+
forks[(id + 1) % 6] = true
29+
client.publishEvent("pubsub", "acquire", getMap(id)).subscribe()
3830
} else {
39-
waiting[pos] = true
31+
waiting[id] = true
4032
}
4133
}
4234

43-
override fun requestForks(data: Map<String, Any>): Mono<Void> {
35+
override fun release(data: Map<String, Any>) {
4436
measureEventTime(data)
45-
val duration = measureTime {
46-
val pos = data["id"].toString().toInt()
47-
if (pos in 0 until numberOfPhilosophers) tryAssign(pos)
48-
else DiningPhilosophers.logger.info("Invalid philosopher position: $pos")
49-
}
50-
metricsRegistry.timer(REQUEST_DURATION).update(duration.toJavaDuration())
51-
return Mono.empty()
52-
}
37+
val id = data["id"].toString().toInt()
5338

54-
override fun doneEating(data: Map<String, Any>): Mono<Void> {
55-
measureEventTime(data)
56-
val duration = measureTime {
57-
val pos = data["id"].toString().toInt()
58-
forks[pos] = true
59-
forks[next(pos)] = true
60-
listOf(next(pos), prev(pos), pos).filter { waiting[it] }.forEach { tryAssign(it) }
61-
}
62-
metricsRegistry.timer(DONE_DURATION).update(duration.toJavaDuration())
63-
return Mono.empty()
39+
forks[id] = false
40+
forks[(id + 1) % 6] = false
41+
42+
test((id + 6 - 1) % 6)
43+
test((id + 1) % 6)
6444
}
6545

66-
private fun next(i: Int) = (i + 1) % numberOfPhilosophers
46+
private fun test(nid: Int) {
47+
if (waiting[nid] && !(forks[nid] || forks[(nid + 1) % 6])) {
48+
waiting[nid] = false
49+
forks[nid] = true
50+
forks[(nid + 1) % 6] = true
6751

68-
private fun prev(i: Int) = (i - 1 + numberOfPhilosophers) % numberOfPhilosophers
52+
client.publishEvent("pubsub", "acquire", getMap(nid)).subscribe()
53+
}
54+
}
6955

7056
private fun getMap(i: Int): Map<String, Any> {
7157
val now = Clock.System.now()
@@ -80,6 +66,6 @@ class ArbitratorActorImpl(
8066

8167
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
8268

83-
metricsRegistry.timer(EVENT_TIMER_NAME)!!.update((deltaNanos), TimeUnit.NANOSECONDS)
69+
metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS)
8470
}
8571
}

diningPhilosophers/src/main/kotlin/ac/at/uibk/dps/dapr/philosophers/arbitrator/ArbitratorPubSub.kt

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,45 +4,27 @@ import io.dapr.Topic
44
import io.dapr.actors.ActorId
55
import io.dapr.actors.client.ActorClient
66
import io.dapr.actors.client.ActorProxyBuilder
7-
import io.dapr.client.DaprClient
87
import io.dapr.client.domain.CloudEvent
98
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
109
import org.springframework.web.bind.annotation.PostMapping
1110
import org.springframework.web.bind.annotation.RequestBody
1211
import org.springframework.web.bind.annotation.RestController
13-
import reactor.core.publisher.Mono
1412

1513
@RestController
1614
@ConditionalOnProperty("app.role", havingValue = "arbitrator")
1715
class ArbitratorPubSub {
18-
19-
companion object {
20-
const val REQUEST_FORKS_TOPIC_NAME = "requestForks"
21-
const val DONE_EATING_TOPIC_NAME = "doneEating"
22-
const val PUB_SUB_NAME = "pubsub"
23-
const val ARBITRATOR_NAME = "arbitrator"
24-
25-
fun requestForks(client: DaprClient, data: Map<String, Any>): Mono<Void> {
26-
return client.publishEvent(PUB_SUB_NAME, REQUEST_FORKS_TOPIC_NAME, data)
27-
}
28-
29-
fun doneEating(client: DaprClient, data: Map<String, Any>): Mono<Void> {
30-
return client.publishEvent(PUB_SUB_NAME, DONE_EATING_TOPIC_NAME, data)
31-
}
32-
}
33-
3416
val arbitratorProxy: ArbitratorActor =
35-
ActorProxyBuilder(ArbitratorActor::class.java, ActorClient()).build(ActorId(ARBITRATOR_NAME))
17+
ActorProxyBuilder(ArbitratorActor::class.java, ActorClient()).build(ActorId("arbitrator"))
3618

37-
@Topic(name = REQUEST_FORKS_TOPIC_NAME, pubsubName = PUB_SUB_NAME)
38-
@PostMapping("/requestForks")
39-
fun requestForksSubscriber(@RequestBody(required = true) event: CloudEvent<Map<String, Any>>) {
40-
arbitratorProxy.requestForks(event.data).subscribe()
19+
@Topic(name = "hungry", pubsubName = "pubsub")
20+
@PostMapping("/hungry")
21+
fun hungrySubscriber(@RequestBody(required = true) event: CloudEvent<Map<String, Any>>) {
22+
arbitratorProxy.hungry(event.data)
4123
}
4224

43-
@Topic(name = DONE_EATING_TOPIC_NAME, pubsubName = PUB_SUB_NAME)
44-
@PostMapping("/doneEating")
45-
fun doneEatingSubscriber(@RequestBody(required = true) event: CloudEvent<Map<String, Any>>) {
46-
arbitratorProxy.doneEating(event.data).subscribe()
25+
@Topic(name = "release", pubsubName = "pubsub")
26+
@PostMapping("/release")
27+
fun releaseSubscriber(@RequestBody(required = true) event: CloudEvent<Map<String, Any>>) {
28+
arbitratorProxy.release(event.data)
4729
}
4830
}

diningPhilosophers/src/main/kotlin/ac/at/uibk/dps/dapr/philosophers/philosopher/PhilosopherActor.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package ac.at.uibk.dps.dapr.philosophers.philosopher
22

33
import io.dapr.actors.ActorMethod
44
import io.dapr.actors.ActorType
5-
import reactor.core.publisher.Mono
65

76
@ActorType(name = "PhilosopherActor")
87
interface PhilosopherActor {
8+
@ActorMethod(name = "starting") fun starting()
99

10-
@ActorMethod(name = "eat") fun eat(data: Map<String, Any>): Mono<Void>
11-
12-
@ActorMethod(name = "start") fun start(): Mono<Void>
10+
@ActorMethod(name = "eating") fun eating(data: Map<String, Any>)
1311
}
Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,66 +1,50 @@
11
package ac.at.uibk.dps.dapr.philosophers.philosopher
22

33
import ac.at.uibk.dps.dapr.philosophers.DiningPhilosophers
4-
import ac.at.uibk.dps.dapr.philosophers.arbitrator.ArbitratorPubSub
54
import io.dapr.actors.ActorId
65
import io.dapr.actors.runtime.AbstractActor
76
import io.dapr.actors.runtime.ActorRuntimeContext
7+
import io.dapr.client.DaprClient
8+
import io.dapr.client.DaprClientBuilder
89
import java.security.SecureRandom
9-
import java.time.Duration
1010
import java.util.concurrent.TimeUnit
1111
import kotlin.random.Random
1212
import kotlin.time.Clock
13-
import kotlin.time.measureTime
14-
import kotlin.time.toJavaDuration
15-
import reactor.core.publisher.Mono
1613

1714
class PhilosopherActorImpl(runtimeContext: ActorRuntimeContext<PhilosopherActorImpl>, id: ActorId) :
1815
AbstractActor(runtimeContext, id), PhilosopherActor {
16+
private val client: DaprClient = DaprClientBuilder().build()
1917

20-
companion object {
21-
const val COUNTER_NAME = "philosopher.meals"
22-
const val EVENT_TIMER_NAME = "event.latency"
23-
const val EAT_DURATION_NAME = "eat.duration"
24-
}
25-
26-
val seedGenerator = SecureRandom()
27-
18+
private val seedGenerator = SecureRandom()
2819
private val threadRng =
2920
object : ThreadLocal<Random>() {
3021
override fun initialValue(): Random {
3122
return Random(seedGenerator.nextLong())
3223
}
3324
}
3425

35-
var completedRounds: Int = 0
26+
private val metricsRegistry = DiningPhilosophers.provideMetricRegistry()
3627

37-
val metricsRegistry = DiningPhilosophers.provideMetricRegistry()
28+
private var meals: Int = 0
3829

39-
override fun start(): Mono<Void> {
40-
return ArbitratorPubSub.requestForks(DiningPhilosophers.daprClient, getMap())
30+
override fun starting() {
31+
client.publishEvent("pubsub", "hungry", getMap()).subscribe()
4132
}
4233

43-
override fun eat(data: Map<String, Any>): Mono<Void> {
34+
override fun eating(data: Map<String, Any>) {
4435
val now = Clock.System.now()
4536
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
46-
4737
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
4838

49-
metricsRegistry.timer(EVENT_TIMER_NAME)!!.update((deltaNanos), TimeUnit.NANOSECONDS)
39+
metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS)
5040

51-
val delta = measureTime {
52-
completedRounds++
53-
metricsRegistry.counter(COUNTER_NAME).inc(1L)
41+
Thread.sleep(randomAround(10, 2).toLong())
5442

55-
val delay =
56-
Mono.delay(Duration.ofMillis(randomAround(10, 2).toLong())).flatMap {
57-
ArbitratorPubSub.doneEating(DiningPhilosophers.daprClient, getMap())
58-
}
59-
delay.then(ArbitratorPubSub.requestForks(DiningPhilosophers.daprClient, getMap())).subscribe()
60-
}
43+
++meals
44+
metricsRegistry.counter("philosopher.meals").inc(1L)
6145

62-
metricsRegistry.timer(EAT_DURATION_NAME).update(delta.toJavaDuration())
63-
return Mono.empty()
46+
client.publishEvent("pubsub", "release", getMap()).subscribe()
47+
client.publishEvent("pubsub", "hungry", getMap()).subscribe()
6448
}
6549

6650
private fun getMap(): Map<String, Any> {
@@ -71,7 +55,6 @@ class PhilosopherActorImpl(runtimeContext: ActorRuntimeContext<PhilosopherActorI
7155
}
7256

7357
fun randomAround(base: Int, delta: Int): Int {
74-
val rng = threadRng.get()
75-
return (base - delta..base + delta).random(rng)
58+
return (base - delta..base + delta).random(threadRng.get())
7659
}
7760
}

diningPhilosophers/src/main/kotlin/ac/at/uibk/dps/dapr/philosophers/philosopher/PhilosopherPubSub.kt

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,25 @@ import io.dapr.Topic
44
import io.dapr.actors.ActorId
55
import io.dapr.actors.client.ActorClient
66
import io.dapr.actors.client.ActorProxyBuilder
7-
import io.dapr.client.DaprClient
87
import io.dapr.client.domain.CloudEvent
98
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
109
import org.springframework.web.bind.annotation.PostMapping
1110
import org.springframework.web.bind.annotation.RequestBody
1211
import org.springframework.web.bind.annotation.RestController
13-
import reactor.core.publisher.Mono
1412

1513
@RestController
1614
@ConditionalOnProperty("app.role", havingValue = "philosopher")
1715
class PhilosopherPubSub {
18-
19-
companion object {
20-
const val EAT_TOPIC_NAME = "eat"
21-
const val PUB_SUB_NAME = "pubsub"
22-
23-
fun eat(client: DaprClient, data: Map<String, Any>): Mono<Void> {
24-
return client.publishEvent(PUB_SUB_NAME, EAT_TOPIC_NAME, data)
25-
}
26-
}
27-
2816
private val id = System.getenv("PHILOSOPHER_ID")
2917

3018
private val philosopherProxy =
3119
ActorProxyBuilder(PhilosopherActor::class.java, ActorClient()).build(ActorId("$id"))
3220

33-
@Topic(name = EAT_TOPIC_NAME, pubsubName = PUB_SUB_NAME)
34-
@PostMapping("/eat")
35-
fun eatSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
21+
@Topic(name = "acquire", pubsubName = "pubsub")
22+
@PostMapping("/acquire")
23+
fun acquireSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
3624
if (event.data["id"] as Int == id.toInt()) {
37-
philosopherProxy.eat(event.data).subscribe()
25+
philosopherProxy.eating(event.data)
3826
}
3927
}
4028
}

0 commit comments

Comments
 (0)