Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ac.at.uibk.dps.dapr.barber

import ac.at.uibk.dps.dapr.barber.barber.BarberActor
import ac.at.uibk.dps.dapr.barber.barber.BarberActorImpl
import ac.at.uibk.dps.dapr.barber.customer.CustomerActor
import ac.at.uibk.dps.dapr.barber.customer.CustomerActorImpl
Expand All @@ -20,7 +19,6 @@ import io.micrometer.core.instrument.util.HierarchicalNameMapper
import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import kotlin.apply
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
Expand Down Expand Up @@ -61,7 +59,7 @@ class SleepingBarber {

fun main(args: Array<String>) {
val role = System.getenv("ROLE") ?: "customer"
if (role == "waiting_room")
if (role == "waitingRoom")
ActorRuntime.getInstance().registerActor(WaitingRoomActorImpl::class.java)
if (role == "barber") ActorRuntime.getInstance().registerActor(BarberActorImpl::class.java)
if (role == "customer") ActorRuntime.getInstance().registerActor(CustomerActorImpl::class.java)
Expand All @@ -77,8 +75,6 @@ class AutoStarter : ApplicationRunner {
if (role == "customer") {
val id = System.getenv("CUSTOMER_ID")
ActorProxyBuilder(CustomerActor::class.java, client).build(ActorId(id)).request()
} else if (role == "barber") {
ActorProxyBuilder(BarberActor::class.java, client).build(ActorId("barber")).sleeping()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ import io.dapr.actors.ActorType

@ActorType(name = "BarberActor")
interface BarberActor {
@ActorMethod(name = "sleeping") fun sleeping()

@ActorMethod(name = "cutting") fun cutting(data: Map<String, Any>)
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
package ac.at.uibk.dps.dapr.barber.barber

import ac.at.uibk.dps.dapr.barber.SleepingBarber.Companion.metricsRegistry
import ac.at.uibk.dps.dapr.barber.SleepingBarber
import io.dapr.actors.ActorId
import io.dapr.actors.runtime.AbstractActor
import io.dapr.actors.runtime.ActorRuntimeContext
import io.dapr.client.DaprClient
import io.dapr.client.DaprClientBuilder
import java.lang.Thread.sleep
import java.security.SecureRandom
import java.util.concurrent.TimeUnit
import kotlin.random.Random
import kotlin.time.Clock

class BarberActorImpl(runtimeContext: ActorRuntimeContext<BarberActorImpl>, id: ActorId) :
AbstractActor(runtimeContext, id), BarberActor {
val client: DaprClient? = DaprClientBuilder().build()
val client = DaprClientBuilder().build()

private val seedGenerator = SecureRandom()

Expand All @@ -25,17 +23,22 @@ class BarberActorImpl(runtimeContext: ActorRuntimeContext<BarberActorImpl>, id:
}
}

override fun sleeping() {
client!!.publishEvent("pubsub", "ready", getMap()).subscribe()
}
private val metricsRegistry = SleepingBarber.metricsRegistry

override fun cutting(data: Map<String, Any>) {
measureEventTime(data)
val customer = data["id"].toString().toInt()
client!!.publishEvent("pubsub", "comeIn", getMap(customer)).subscribe()
sleep(randomAround(10, 2).toLong())

client.publishEvent("pubsub", "comeIn", getMap(customer)).subscribe()

Thread.sleep(randomAround(10, 2).toLong())

`continue`(customer)
}

private fun `continue`(customer: Int) {
client.publishEvent("pubsub", "done", getMap(customer)).subscribe()
sleeping()
client.publishEvent("pubsub", "ready", getMap()).subscribe()
}

fun randomAround(base: Int, delta: Int): Int {
Expand All @@ -62,6 +65,6 @@ class BarberActorImpl(runtimeContext: ActorRuntimeContext<BarberActorImpl>, id:

val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)

metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS)
metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import org.springframework.web.bind.annotation.RestController
@RestController
@ConditionalOnProperty("app.role", havingValue = "barber")
class BarberPubSub {
val barberActor: BarberActor? =
val barberActor =
ActorProxyBuilder(BarberActor::class.java, ActorClient()).build(ActorId("barber"))

@Topic(name = "cutting", pubsubName = "pubsub")
@PostMapping("/cutting")
fun cuttingSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
barberActor!!.cutting(event.data)
@Topic(name = "sit", pubsubName = "pubsub")
@PostMapping("/sit")
fun sittingSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
barberActor.cutting(event.data)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import ac.at.uibk.dps.dapr.barber.SleepingBarber
import io.dapr.actors.ActorId
import io.dapr.actors.runtime.AbstractActor
import io.dapr.actors.runtime.ActorRuntimeContext
import io.dapr.client.DaprClient
import io.dapr.client.DaprClientBuilder
import java.security.SecureRandom
import java.util.concurrent.TimeUnit
Expand All @@ -13,9 +12,8 @@ import kotlin.time.Clock

class CustomerActorImpl(runtimeContext: ActorRuntimeContext<CustomerActorImpl>, id: ActorId) :
AbstractActor(runtimeContext, id), CustomerActor {
val metricsRegistry = SleepingBarber.metricsRegistry

val client: DaprClient? = DaprClientBuilder().build()
val client = DaprClientBuilder().build()

private val seedGenerator = SecureRandom()
private val threadRng =
Expand All @@ -24,11 +22,12 @@ class CustomerActorImpl(runtimeContext: ActorRuntimeContext<CustomerActorImpl>,
return Random(seedGenerator.nextLong())
}
}
private val metricsRegistry = SleepingBarber.metricsRegistry

var count = 0

override fun request() {
client!!.publishEvent("pubsub", "enter", getMap(id.toString().toInt())).subscribe()
client.publishEvent("pubsub", "enter", getMap(id.toString().toInt())).subscribe()
}

override fun full(data: Map<String, Any>) {
Expand Down Expand Up @@ -60,9 +59,7 @@ class CustomerActorImpl(runtimeContext: ActorRuntimeContext<CustomerActorImpl>,

val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)

SleepingBarber.Companion.metricsRegistry
.timer("event.latency")!!
.update((deltaNanos), TimeUnit.NANOSECONDS)
metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS)
}

private fun randomAround(base: Int, delta: Int): Int {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@ import org.springframework.web.bind.annotation.RestController
class CustomerPubSub {
val id = System.getenv("CUSTOMER_ID")?.toInt() ?: 0

val customerProxy: CustomerActor? =
val customerProxy =
ActorProxyBuilder(CustomerActor::class.java, ActorClient()).build(ActorId(id.toString()))

@Topic(name = "full", pubsubName = "pubsub")
@PostMapping("/full")
fun fullSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
if (event.data["id"] == id) {
customerProxy!!.full(event.data)
customerProxy.full(event.data)
}
}

@Topic(name = "comeIn", pubsubName = "pubsub")
@PostMapping("/comeIn")
fun comeInSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
if (event.data["id"] == id) {
customerProxy!!.comeIn(event.data)
customerProxy.comeIn(event.data)
}
}

@Topic(name = "done", pubsubName = "pubsub")
@PostMapping("/done")
fun doneSubscriber(@RequestBody event: CloudEvent<Map<String, Any>>) {
if (event.data["id"] == id) {
customerProxy!!.done(event.data)
customerProxy.done(event.data)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,50 @@ import ac.at.uibk.dps.dapr.barber.SleepingBarber.Companion.metricsRegistry
import io.dapr.actors.ActorId
import io.dapr.actors.runtime.AbstractActor
import io.dapr.actors.runtime.ActorRuntimeContext
import io.dapr.client.DaprClient
import io.dapr.client.DaprClientBuilder
import java.util.concurrent.TimeUnit
import kotlin.time.Clock

class WaitingRoomActorImpl(runtimeContext: ActorRuntimeContext<WaitingRoomActorImpl>, id: ActorId) :
AbstractActor(runtimeContext, id), WaitingRoomActor {
val client: DaprClient? = DaprClientBuilder().build()
val client = DaprClientBuilder().build()

val waiting: ArrayList<Int> = arrayListOf()

var barber = "sleeping"
var isBarberSleeping = true

override fun enter(data: Map<String, Any>) {
val now = Clock.System.now()
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS)

metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS)

val customer = data["id"].toString().toInt()
if (waiting.size == 3) {
client!!.publishEvent("pubsub", "full", getMap(customer)).subscribe()
} else if (barber == "sleeping" && waiting.isEmpty()) {
client!!.publishEvent("pubsub", "cutting", getMap(customer)).subscribe()
barber = "busy"
} else if (barber == "sleeping" && !waiting.isEmpty()) {
waiting.add(customer)
client!!.publishEvent("pubsub", "cutting", getMap(waiting[0])).subscribe()
barber = "busy"
waiting.removeFirst()
} else if (barber == "busy" && waiting.size < 3) {

if (waiting.size < 3) {
waiting.add(customer)
if (isBarberSleeping) {
isBarberSleeping = false
client.publishEvent("pubsub", "sit", getMap(waiting[0])).subscribe()
waiting.remove(0)
}
} else {
client.publishEvent("pubsub", "full", getMap(customer)).subscribe()
}
}

override fun ready(data: Map<String, Any>) {
val now = Clock.System.now()
val nowNanos = (now.epochSeconds * 1_000_000_000L) + now.nanosecondsOfSecond
val deltaNanos = (nowNanos - data["time"] as Long).coerceAtLeast(0L)
metricsRegistry.timer("event.latency")!!.update((deltaNanos), TimeUnit.NANOSECONDS)
barber = "sleeping"

metricsRegistry.timer("event.latency").update((deltaNanos), TimeUnit.NANOSECONDS)

isBarberSleeping = true
if (!waiting.isEmpty()) {
barber = "busy"
client!!.publishEvent("pubsub", "cutting", getMap(waiting[0])).subscribe()
waiting.removeFirst()
isBarberSleeping = false
client.publishEvent("pubsub", "sit", getMap(waiting[0])).subscribe()
waiting.remove(0)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import io.dapr.actors.ActorId
import io.dapr.actors.client.ActorClient
import io.dapr.actors.client.ActorProxyBuilder
import io.dapr.client.domain.CloudEvent
import kotlin.jvm.java
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
Expand All @@ -14,18 +13,18 @@ import org.springframework.web.bind.annotation.RestController
@RestController
@ConditionalOnProperty("app.role", havingValue = "waiting_room")
class WaitingRoomPubSub {
val waitingRoomActor: WaitingRoomActor? =
val waitingRoomActor =
ActorProxyBuilder(WaitingRoomActor::class.java, ActorClient()).build(ActorId("waitingRoom"))

@Topic(name = "enter", pubsubName = "pubsub")
@PostMapping("/enter")
fun enterSubscriber(@RequestBody(required = true) event: CloudEvent<Map<String, Any>>) {
waitingRoomActor!!.enter(event.data)
waitingRoomActor.enter(event.data)
}

@Topic(name = "ready", pubsubName = "pubsub")
@PostMapping("/ready")
fun readySubscriber(@RequestBody(required = true) event: CloudEvent<Map<String, Any>>) {
waitingRoomActor!!.ready(event.data)
waitingRoomActor.ready(event.data)
}
}
Loading