diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/SleepingBarber.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/SleepingBarber.kt index 9e8e5d5..9c8197c 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/SleepingBarber.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/SleepingBarber.kt @@ -1,6 +1,5 @@ package ac.at.uibk.dps.dapr.barber -import ac.at.uibk.dps.dapr.barber.barber.BarberActor import ac.at.uibk.dps.dapr.barber.barber.BarberActorImpl import ac.at.uibk.dps.dapr.barber.customer.CustomerActor import ac.at.uibk.dps.dapr.barber.customer.CustomerActorImpl @@ -20,7 +19,6 @@ import io.micrometer.core.instrument.util.HierarchicalNameMapper import java.nio.file.Files import java.nio.file.Paths import java.util.concurrent.TimeUnit -import kotlin.apply import org.springframework.boot.ApplicationArguments import org.springframework.boot.ApplicationRunner import org.springframework.boot.autoconfigure.SpringBootApplication @@ -61,7 +59,7 @@ class SleepingBarber { fun main(args: Array) { val role = System.getenv("ROLE") ?: "customer" - if (role == "waiting_room") + if (role == "waitingRoom") ActorRuntime.getInstance().registerActor(WaitingRoomActorImpl::class.java) if (role == "barber") ActorRuntime.getInstance().registerActor(BarberActorImpl::class.java) if (role == "customer") ActorRuntime.getInstance().registerActor(CustomerActorImpl::class.java) @@ -77,8 +75,6 @@ class AutoStarter : ApplicationRunner { if (role == "customer") { val id = System.getenv("CUSTOMER_ID") ActorProxyBuilder(CustomerActor::class.java, client).build(ActorId(id)).request() - } else if (role == "barber") { - ActorProxyBuilder(BarberActor::class.java, client).build(ActorId("barber")).sleeping() } } } diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActor.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActor.kt index eb6e2f0..cc9ea9e 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActor.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActor.kt @@ -5,7 +5,5 @@ import io.dapr.actors.ActorType @ActorType(name = "BarberActor") interface BarberActor { - @ActorMethod(name = "sleeping") fun sleeping() - @ActorMethod(name = "cutting") fun cutting(data: Map) } diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActorImpl.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActorImpl.kt index 311fddd..c562dae 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActorImpl.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberActorImpl.kt @@ -1,12 +1,10 @@ package ac.at.uibk.dps.dapr.barber.barber -import ac.at.uibk.dps.dapr.barber.SleepingBarber.Companion.metricsRegistry +import ac.at.uibk.dps.dapr.barber.SleepingBarber import io.dapr.actors.ActorId import io.dapr.actors.runtime.AbstractActor import io.dapr.actors.runtime.ActorRuntimeContext -import io.dapr.client.DaprClient import io.dapr.client.DaprClientBuilder -import java.lang.Thread.sleep import java.security.SecureRandom import java.util.concurrent.TimeUnit import kotlin.random.Random @@ -14,7 +12,7 @@ import kotlin.time.Clock class BarberActorImpl(runtimeContext: ActorRuntimeContext, id: ActorId) : AbstractActor(runtimeContext, id), BarberActor { - val client: DaprClient? = DaprClientBuilder().build() + val client = DaprClientBuilder().build() private val seedGenerator = SecureRandom() @@ -25,17 +23,22 @@ class BarberActorImpl(runtimeContext: ActorRuntimeContext, id: } } - override fun sleeping() { - client!!.publishEvent("pubsub", "ready", getMap()).subscribe() - } + private val metricsRegistry = SleepingBarber.metricsRegistry override fun cutting(data: Map) { measureEventTime(data) val customer = data["id"].toString().toInt() - client!!.publishEvent("pubsub", "comeIn", getMap(customer)).subscribe() - sleep(randomAround(10, 2).toLong()) + + client.publishEvent("pubsub", "comeIn", getMap(customer)).subscribe() + + Thread.sleep(randomAround(10, 2).toLong()) + + `continue`(customer) + } + + private fun `continue`(customer: Int) { client.publishEvent("pubsub", "done", getMap(customer)).subscribe() - sleeping() + client.publishEvent("pubsub", "ready", getMap()).subscribe() } fun randomAround(base: Int, delta: Int): Int { @@ -62,6 +65,6 @@ class BarberActorImpl(runtimeContext: ActorRuntimeContext, id: val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L) - metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS) + metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS) } } diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberPubSub.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberPubSub.kt index c444c2a..a60a8e9 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberPubSub.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/barber/BarberPubSub.kt @@ -13,12 +13,12 @@ import org.springframework.web.bind.annotation.RestController @RestController @ConditionalOnProperty("app.role", havingValue = "barber") class BarberPubSub { - val barberActor: BarberActor? = + val barberActor = ActorProxyBuilder(BarberActor::class.java, ActorClient()).build(ActorId("barber")) - @Topic(name = "cutting", pubsubName = "pubsub") - @PostMapping("/cutting") - fun cuttingSubscriber(@RequestBody event: CloudEvent>) { - barberActor!!.cutting(event.data) + @Topic(name = "sit", pubsubName = "pubsub") + @PostMapping("/sit") + fun sittingSubscriber(@RequestBody event: CloudEvent>) { + barberActor.cutting(event.data) } } diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerActorImpl.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerActorImpl.kt index 566323f..8c88d78 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerActorImpl.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerActorImpl.kt @@ -4,7 +4,6 @@ import ac.at.uibk.dps.dapr.barber.SleepingBarber import io.dapr.actors.ActorId import io.dapr.actors.runtime.AbstractActor import io.dapr.actors.runtime.ActorRuntimeContext -import io.dapr.client.DaprClient import io.dapr.client.DaprClientBuilder import java.security.SecureRandom import java.util.concurrent.TimeUnit @@ -13,9 +12,8 @@ import kotlin.time.Clock class CustomerActorImpl(runtimeContext: ActorRuntimeContext, id: ActorId) : AbstractActor(runtimeContext, id), CustomerActor { - val metricsRegistry = SleepingBarber.metricsRegistry - val client: DaprClient? = DaprClientBuilder().build() + val client = DaprClientBuilder().build() private val seedGenerator = SecureRandom() private val threadRng = @@ -24,11 +22,12 @@ class CustomerActorImpl(runtimeContext: ActorRuntimeContext, return Random(seedGenerator.nextLong()) } } + private val metricsRegistry = SleepingBarber.metricsRegistry var count = 0 override fun request() { - client!!.publishEvent("pubsub", "enter", getMap(id.toString().toInt())).subscribe() + client.publishEvent("pubsub", "enter", getMap(id.toString().toInt())).subscribe() } override fun full(data: Map) { @@ -60,9 +59,7 @@ class CustomerActorImpl(runtimeContext: ActorRuntimeContext, val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L) - SleepingBarber.Companion.metricsRegistry - .timer("event.latency")!! - .update((deltaNanos), TimeUnit.NANOSECONDS) + metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS) } private fun randomAround(base: Int, delta: Int): Int { diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerPubSub.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerPubSub.kt index f8e8496..899b95d 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerPubSub.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/customer/CustomerPubSub.kt @@ -15,14 +15,14 @@ import org.springframework.web.bind.annotation.RestController class CustomerPubSub { val id = System.getenv("CUSTOMER_ID")?.toInt() ?: 0 - val customerProxy: CustomerActor? = + val customerProxy = ActorProxyBuilder(CustomerActor::class.java, ActorClient()).build(ActorId(id.toString())) @Topic(name = "full", pubsubName = "pubsub") @PostMapping("/full") fun fullSubscriber(@RequestBody event: CloudEvent>) { if (event.data["id"] == id) { - customerProxy!!.full(event.data) + customerProxy.full(event.data) } } @@ -30,7 +30,7 @@ class CustomerPubSub { @PostMapping("/comeIn") fun comeInSubscriber(@RequestBody event: CloudEvent>) { if (event.data["id"] == id) { - customerProxy!!.comeIn(event.data) + customerProxy.comeIn(event.data) } } @@ -38,7 +38,7 @@ class CustomerPubSub { @PostMapping("/done") fun doneSubscriber(@RequestBody event: CloudEvent>) { if (event.data["id"] == id) { - customerProxy!!.done(event.data) + customerProxy.done(event.data) } } } diff --git a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/waitingroom/WaitingRoomActorImpl.kt b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/waitingroom/WaitingRoomActorImpl.kt index 09483f7..04da4fa 100644 --- a/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/waitingroom/WaitingRoomActorImpl.kt +++ b/sleepingBarber/src/main/kotlin/ac/at/uibk/dps/dapr/barber/waitingroom/WaitingRoomActorImpl.kt @@ -4,37 +4,35 @@ import ac.at.uibk.dps.dapr.barber.SleepingBarber.Companion.metricsRegistry import io.dapr.actors.ActorId import io.dapr.actors.runtime.AbstractActor import io.dapr.actors.runtime.ActorRuntimeContext -import io.dapr.client.DaprClient import io.dapr.client.DaprClientBuilder import java.util.concurrent.TimeUnit import kotlin.time.Clock class WaitingRoomActorImpl(runtimeContext: ActorRuntimeContext, id: ActorId) : AbstractActor(runtimeContext, id), WaitingRoomActor { - val client: DaprClient? = DaprClientBuilder().build() + val client = DaprClientBuilder().build() val waiting: ArrayList = arrayListOf() - - var barber = "sleeping" + var isBarberSleeping = true override fun enter(data: Map) { val now = Clock.System.now() val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L) - metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS) + + metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS) + val customer = data["id"].toString().toInt() - if (waiting.size == 3) { - client!!.publishEvent("pubsub", "full", getMap(customer)).subscribe() - } else if (barber == "sleeping" && waiting.isEmpty()) { - client!!.publishEvent("pubsub", "cutting", getMap(customer)).subscribe() - barber = "busy" - } else if (barber == "sleeping" && !waiting.isEmpty()) { - waiting.add(customer) - client!!.publishEvent("pubsub", "cutting", getMap(waiting[0])).subscribe() - barber = "busy" - waiting.removeFirst() - } else if (barber == "busy" && waiting.size < 3) { + + if (waiting.size < 3) { waiting.add(customer) + if (isBarberSleeping) { + isBarberSleeping = false + client.publishEvent("pubsub", "sit", getMap(waiting[0])).subscribe() + waiting.remove(0) + } + } else { + client.publishEvent("pubsub", "full", getMap(customer)).subscribe() } } @@ -42,12 +40,14 @@ class WaitingRoomActorImpl(runtimeContext: ActorRuntimeContext>) { - waitingRoomActor!!.enter(event.data) + waitingRoomActor.enter(event.data) } @Topic(name = "ready", pubsubName = "pubsub") @PostMapping("/ready") fun readySubscriber(@RequestBody(required = true) event: CloudEvent>) { - waitingRoomActor!!.ready(event.data) + waitingRoomActor.ready(event.data) } }