Skip to content

Commit b6abd55

Browse files
Yicong-Huangclaude
andcommitted
refactor(amber): rename execution coordinators to managers
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 908a61f commit b6abd55

19 files changed

Lines changed: 171 additions & 172 deletions

File tree

amber/src/main/python/core/architecture/packaging/output_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def is_missing_output_ports(self):
9797
This method is only used for ensuring correct region execution.
9898
Some operators may have input port dependency relationships, for
9999
which we currently use a two-phase region execution scheme.
100-
(See `RegionExecutionCoordinator.scala` for details.)
100+
(See `RegionExecutionManager.scala` for details.)
101101
This logic will only be executed when the worker is part of an
102102
`executingDependeePortPhase` region-execution phase.
103103
We currently assume that in this phase the operator (worker) will

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class Controller(
116116
override def initState(): Unit = {
117117
attachRuntimeServicesToCPState()
118118
cp.workflowScheduler.updateSchedule(physicalPlan)
119-
cp.workflowExecutionCoordinator.schedule = cp.workflowScheduler.getSchedule
119+
cp.workflowExecutionManager.schedule = cp.workflowScheduler.getSchedule
120120

121121
val regions: List[(Long, List[String])] =
122122
cp.workflowScheduler.getSchedule.getRegions.map { region =>

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.texera.amber.engine.architecture.common.{
2929
}
3030
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
3131
import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager
32-
import org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinator
32+
import org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionManager
3333
import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage
3434
import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
3535

@@ -43,7 +43,7 @@ class ControllerProcessor(
4343
val workflowExecution: WorkflowExecution = WorkflowExecution()
4444
val workflowScheduler: WorkflowScheduler =
4545
new WorkflowScheduler(workflowContext, actorId)
46-
val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator(
46+
val workflowExecutionManager: WorkflowExecutionManager = new WorkflowExecutionManager(
4747
workflowExecution,
4848
controllerConfig,
4949
asyncRPCClient
@@ -73,7 +73,7 @@ class ControllerProcessor(
7373

7474
def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = {
7575
this.actorRefService = actorRefService
76-
this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService)
76+
this.workflowExecutionManager.setupActorRefService(this.actorRefService)
7777
}
7878

7979
@transient var logManager: ReplayLogManager = _

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ trait JumpToOperatorRegionHandler {
3434
msg: JumpToOperatorRegionRequest,
3535
ctx: AsyncRPCContext
3636
): Future[EmptyReturn] = {
37-
val coordinator = cp.workflowExecutionCoordinator
38-
coordinator.schedule.levelSets
37+
val manager = cp.workflowExecutionManager
38+
manager.schedule.levelSets
3939
.collectFirst {
4040
case (level, regions)
4141
if regions.exists(_.getOperators.exists(_.id.logicalOpId == msg.targetOperatorId)) =>
4242
level
4343
}
4444
.foreach { targetLevel =>
45-
coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = targetLevel)
45+
manager.schedule = manager.schedule.copy(initialLevelIndex = targetLevel)
4646
}
4747
EmptyReturn()
4848
}

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/LinkWorkersHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ trait LinkWorkersHandler {
3838
this: ControllerAsyncRPCHandlerInitializer =>
3939

4040
override def linkWorkers(msg: LinkWorkersRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = {
41-
val region = cp.workflowExecutionCoordinator.getRegionOfLink(msg.link)
41+
val region = cp.workflowExecutionManager.getRegionOfLink(msg.link)
4242
val resourceConfig = region.resourceConfig.get
4343
val linkConfig = resourceConfig.linkConfigs(msg.link)
4444
val linkExecution =

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ trait PortCompletedHandler {
6464
msg.portId,
6565
input = msg.input
6666
)
67-
cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match {
67+
cp.workflowExecutionManager.getRegionOfPortId(globalPortId) match {
6868
case Some(region) =>
6969
val regionExecution = cp.workflowExecution.getRegionExecution(region.id)
7070
val operatorExecution =
@@ -81,8 +81,8 @@ trait PortCompletedHandler {
8181
else operatorExecution.isOutputPortCompleted(msg.portId)
8282

8383
if (isPortCompleted) {
84-
cp.workflowExecutionCoordinator
85-
.coordinateRegionExecutors(cp.actorService)
84+
cp.workflowExecutionManager
85+
.advanceRegionExecutions(cp.actorService)
8686
// Since this message is sent from a worker, any exception from the above code will be returned to that worker.
8787
// Additionally, a fatal error is sent to the client, indicating that the region cannot be scheduled.
8888
.onFailure {

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ trait ReconfigurationHandler {
6161
}
6262
val futures = mutable.ArrayBuffer[Future[_]]()
6363
val friesComponents =
64-
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator, msg)
64+
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionManager, msg)
6565
friesComponents.foreach { friesComponent =>
6666
if (friesComponent.scope.size == 1) {
6767
val updateExecutorRequest = friesComponent.reconfigurations.head

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ trait StartWorkflowHandler {
4141
ctx: AsyncRPCContext
4242
): Future[StartWorkflowResponse] = {
4343
if (cp.workflowExecution.getState.isUninitialized) {
44-
cp.workflowExecutionCoordinator
45-
.coordinateRegionExecutors(cp.actorService)
44+
cp.workflowExecutionManager
45+
.advanceRegionExecutions(cp.actorService)
4646
.map(_ => {
4747
cp.controllerTimerService.enableStatusUpdate()
4848
cp.controllerTimerService.enableRuntimeStatisticsCollection()

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ trait WorkerExecutionCompletedHandler {
6464
val isWorkflowTerminal =
6565
cp.workflowExecution.isCompleted &&
6666
!cp.workflowScheduler.hasPendingRegions &&
67-
!cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators
67+
!cp.workflowExecutionManager.hasUnfinishedRegionManagers
6868
if (isWorkflowTerminal) {
6969
// after query result come back: send completed event, cleanup ,and kill workflow
7070
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))

amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ class OutputManager(
285285

286286
/**
287287
* This method is only used for ensuring correct region execution. Some operators may have input port dependency
288-
* relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator`
288+
* relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionManager`
289289
* for details.)
290290
* This logic will only be executed when the worker is part of an `executingDependeePort` region-execution phase.
291291
* We currently assume that in this phase the operator (worker) will not output any data, hence no output ports.
@@ -322,7 +322,7 @@ class OutputManager(
322322
writerThread.start()
323323

324324
// The state document is provisioned alongside the result document
325-
// by RegionExecutionCoordinator, so it is always present.
325+
// by RegionExecutionManager, so it is always present.
326326
val stateWriter = DocumentFactory
327327
.openDocument(VFSURIFactory.stateURI(portBaseURI))
328328
._1

0 commit comments

Comments
 (0)