Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def is_missing_output_ports(self):
This method is only used for ensuring correct region execution.
Some operators may have input port dependency relationships, for
which we currently use a two-phase region execution scheme.
(See `RegionExecutionCoordinator.scala` for details.)
(See `RegionExecutionManager.scala` for details.)
This logic will only be executed when the worker is part of an
`executingDependeePortPhase` region-execution phase.
We currently assume that in this phase the operator (worker) will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class Controller(
override def initState(): Unit = {
attachRuntimeServicesToCPState()
cp.workflowScheduler.updateSchedule(physicalPlan)
cp.workflowExecutionCoordinator.schedule = cp.workflowScheduler.getSchedule
cp.workflowExecutionManager.schedule = cp.workflowScheduler.getSchedule

val regions: List[(Long, List[String])] =
cp.workflowScheduler.getSchedule.getRegions.map { region =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.texera.amber.engine.architecture.common.{
}
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager
import org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinator
import org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionManager
import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage
import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage

Expand All @@ -43,7 +43,7 @@ class ControllerProcessor(
val workflowExecution: WorkflowExecution = WorkflowExecution()
val workflowScheduler: WorkflowScheduler =
new WorkflowScheduler(workflowContext, actorId)
val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator(
val workflowExecutionManager: WorkflowExecutionManager = new WorkflowExecutionManager(
workflowExecution,
controllerConfig,
asyncRPCClient
Expand Down Expand Up @@ -73,7 +73,7 @@ class ControllerProcessor(

def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = {
this.actorRefService = actorRefService
this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService)
this.workflowExecutionManager.setupActorRefService(this.actorRefService)
}

@transient var logManager: ReplayLogManager = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ trait JumpToOperatorRegionHandler {
msg: JumpToOperatorRegionRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
val coordinator = cp.workflowExecutionCoordinator
coordinator.schedule.levelSets
val manager = cp.workflowExecutionManager
manager.schedule.levelSets
.collectFirst {
case (level, regions)
if regions.exists(_.getOperators.exists(_.id.logicalOpId == msg.targetOperatorId)) =>
level
}
.foreach { targetLevel =>
coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = targetLevel)
manager.schedule = manager.schedule.copy(initialLevelIndex = targetLevel)
}
EmptyReturn()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait LinkWorkersHandler {
this: ControllerAsyncRPCHandlerInitializer =>

override def linkWorkers(msg: LinkWorkersRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = {
val region = cp.workflowExecutionCoordinator.getRegionOfLink(msg.link)
val region = cp.workflowExecutionManager.getRegionOfLink(msg.link)
val resourceConfig = region.resourceConfig.get
val linkConfig = resourceConfig.linkConfigs(msg.link)
val linkExecution =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ trait PortCompletedHandler {
msg.portId,
input = msg.input
)
cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match {
cp.workflowExecutionManager.getRegionOfPortId(globalPortId) match {
case Some(region) =>
val regionExecution = cp.workflowExecution.getRegionExecution(region.id)
val operatorExecution =
Expand All @@ -81,8 +81,8 @@ trait PortCompletedHandler {
else operatorExecution.isOutputPortCompleted(msg.portId)

if (isPortCompleted) {
cp.workflowExecutionCoordinator
.coordinateRegionExecutors(cp.actorService)
cp.workflowExecutionManager
.advanceRegionExecutions(cp.actorService)
// Since this message is sent from a worker, any exception from the above code will be returned to that worker.
// Additionally, a fatal error is sent to the client, indicating that the region cannot be scheduled.
.onFailure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ trait ReconfigurationHandler {
}
val futures = mutable.ArrayBuffer[Future[_]]()
val friesComponents =
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator, msg)
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionManager, msg)
friesComponents.foreach { friesComponent =>
if (friesComponent.scope.size == 1) {
val updateExecutorRequest = friesComponent.reconfigurations.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ trait StartWorkflowHandler {
ctx: AsyncRPCContext
): Future[StartWorkflowResponse] = {
if (cp.workflowExecution.getState.isUninitialized) {
cp.workflowExecutionCoordinator
.coordinateRegionExecutors(cp.actorService)
cp.workflowExecutionManager
.advanceRegionExecutions(cp.actorService)
.map(_ => {
cp.controllerTimerService.enableStatusUpdate()
cp.controllerTimerService.enableRuntimeStatisticsCollection()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ trait WorkerExecutionCompletedHandler {
val isWorkflowTerminal =
cp.workflowExecution.isCompleted &&
!cp.workflowScheduler.hasPendingRegions &&
!cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
!cp.workflowExecutionManager.hasUnfinishedRegionManagers
if (isWorkflowTerminal) {
// after the query result comes back: send the completed event, clean up, and kill the workflow
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class OutputManager(

/**
* This method is only used for ensuring correct region execution. Some operators may have input port dependency
* relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator`
* relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionManager`
* for details.)
* This logic will only be executed when the worker is part of an `executingDependeePort` region-execution phase.
* We currently assume that in this phase the operator (worker) will not output any data, hence no output ports.
Expand Down Expand Up @@ -322,7 +322,7 @@ class OutputManager(
writerThread.start()

// The state document is provisioned alongside the result document
// by RegionExecutionCoordinator, so it is always present.
// by RegionExecutionManager, so it is always present.
val stateWriter = DocumentFactory
.openDocument(VFSURIFactory.stateURI(portBaseURI))
._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{Duration => ScalaDuration}

object RegionExecutionCoordinator {
object RegionExecutionManager {

// Max EndWorker retries before termination fails. ~30s at DefaultKillRetryDelay (200ms).
private[scheduling] val DefaultMaxTerminationAttempts: Int = 150
Expand Down Expand Up @@ -99,16 +99,16 @@ object RegionExecutionCoordinator {
*
* 3. `Completed`
*/
class RegionExecutionCoordinator(
class RegionExecutionManager(
region: Region,
isRestart: Boolean,
workflowExecution: WorkflowExecution,
asyncRPCClient: AsyncRPCClient,
controllerConfig: ControllerConfig,
actorService: PekkoActorService,
actorRefService: PekkoActorRefMappingService,
maxTerminationAttempts: Int = RegionExecutionCoordinator.DefaultMaxTerminationAttempts,
killRetryDelay: TwitterDuration = RegionExecutionCoordinator.DefaultKillRetryDelay
maxTerminationAttempts: Int = RegionExecutionManager.DefaultMaxTerminationAttempts,
killRetryDelay: TwitterDuration = RegionExecutionManager.DefaultKillRetryDelay
) extends AmberLogging {

initRegionExecution()
Expand All @@ -126,8 +126,8 @@ class RegionExecutionCoordinator(
private val killRetryTimer: Timer = new JavaTimer(true)

/**
* Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the
* coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and
* Sync the status of `RegionExecution` and transition this manager's phase to `Completed` only when the
* manager is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and
* all workers in this region are terminated.
*
* Additionally, this method will also terminate all the workers of this region:
Expand Down Expand Up @@ -156,8 +156,8 @@ class RegionExecutionCoordinator(
existingTerminationFuture
} else {
val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ =>
// Set this coordinator's status to be completed so that subsequent regions can be started by
// WorkflowExecutionCoordinator.
// Set this manager's status to be completed so that subsequent regions can be started by
// WorkflowExecutionManager.
setPhase(Completed)
Future.Unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

class WorkflowExecutionCoordinator(
class WorkflowExecutionManager(
workflowExecution: WorkflowExecution,
controllerConfig: ControllerConfig,
asyncRPCClient: AsyncRPCClient
Expand All @@ -44,8 +44,7 @@ class WorkflowExecutionCoordinator(

private val executedRegions: mutable.ListBuffer[Set[Region]] = mutable.ListBuffer()

private val regionExecutionCoordinators
: mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] =
private val regionExecutionManagers: mutable.HashMap[RegionIdentity, RegionExecutionManager] =
mutable.HashMap()
private val completionNotified: AtomicBoolean = new AtomicBoolean(false)

Expand All @@ -56,29 +55,29 @@ class WorkflowExecutionCoordinator(
}

/**
* Each invocation first syncs the internal statuses of each exisiting `RegionExecutionCoordintor`, after which each
* of the `RegionExecutionCoordintor`s will launch the corresponding next phase of whenever needed until it is
* Each invocation first syncs the internal statuses of each existing `RegionExecutionManager`, after which each
* of the `RegionExecutionManager`s will launch its corresponding next phase whenever needed until it is
* in `Completed` status (phase).
*
* After the syncs, if there are no running region(s), it will start new regions (if available).
*/
def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit] = {
val unfinishedRegionCoordinators =
regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
def advanceRegionExecutions(actorService: PekkoActorService): Future[Unit] = {
val unfinishedRegionManagers =
regionExecutionManagers.values.filter(!_.isCompleted).toSeq

// Trigger sync for each unfinished region.
unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase())
unfinishedRegionManagers.foreach(_.syncStatusAndTransitionRegionExecutionPhase())

// Wait only for region termination futures (kill path), then re-run coordination.
val terminationFutures = unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt)
// Wait only for region termination futures (kill path), then re-run the advance loop.
val terminationFutures = unfinishedRegionManagers.flatMap(_.getTerminationFutureOpt)
if (terminationFutures.nonEmpty) {
return Future
.collect(terminationFutures)
.unit
.flatMap(_ => coordinateRegionExecutors(actorService))
.flatMap(_ => advanceRegionExecutions(actorService))
}

if (regionExecutionCoordinators.values.exists(!_.isCompleted)) {
if (regionExecutionManagers.values.exists(!_.isCompleted)) {
// Some regions are still not completed yet. Cannot start the new regions.
return Future.Unit
}
Expand All @@ -103,7 +102,7 @@ class WorkflowExecutionCoordinator(
} else {
workflowExecution.initRegionExecution(region)
}
regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator(
regionExecutionManagers(region.id) = new RegionExecutionManager(
region,
isRestart,
workflowExecution,
Expand All @@ -112,7 +111,7 @@ class WorkflowExecutionCoordinator(
actorService,
actorRefService
)
regionExecutionCoordinators(region.id)
regionExecutionManagers(region.id)
})
.map(_.syncStatusAndTransitionRegionExecutionPhase())
.toSeq
Expand All @@ -134,8 +133,8 @@ class WorkflowExecutionCoordinator(
.toSet
}

def hasUnfinishedRegionCoordinators: Boolean = {
regionExecutionCoordinators.values.exists(!_.isCompleted)
def hasUnfinishedRegionManagers: Boolean = {
regionExecutionManagers.values.exists(!_.isCompleted)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
UpdateExecutorRequest,
WorkflowReconfigureRequest
}
import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator}
import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionManager}
import org.jgrapht.alg.connectivity.ConnectivityInspector

import scala.collection.mutable
Expand All @@ -45,11 +45,11 @@ object FriesReconfigurationAlgorithm {
}

def getReconfigurations(
workflowExecutionCoordinator: WorkflowExecutionCoordinator,
workflowExecutionManager: WorkflowExecutionManager,
reconfiguration: WorkflowReconfigureRequest
): Set[FriesComponent] = {
// independently schedule reconfigurations for each region:
workflowExecutionCoordinator.getExecutingRegions
workflowExecutionManager.getExecutingRegions
.flatMap(region => computeMCS(region, reconfiguration, reconfiguration.reconfigurationId))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class SyncExecutionResource extends LazyLogging {
killExecution(executionService)
(executionService.executionStateStore.metadataStore.getState, true, false)
case TargetResultsReady(_) =>
// RegionExecutionCoordinator caches upstream results asynchronously after operators
// RegionExecutionManager caches upstream results asynchronously after operators
// complete; sleep gives that caching a chance to finish before we shut down the client.
// TODO: replace with a synchronous signal from the engine.
Thread.sleep(500)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_state_written_by_output_manager_is_replayed_by_reader():
port_id = PortIdentity(id=0, internal=False)
worker_schema_for_result = State.SCHEMA # producer-side: only state matters

# 1. RegionExecutionCoordinator's responsibility: provision result +
# 1. RegionExecutionManager's responsibility: provision result +
# state documents at the port base URI before any worker starts.
# We emulate that here.
DocumentFactory.create_document(
Expand Down
Loading
Loading