From b6abd55c9911b24069593770d1348dc5d4fb4abb Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sat, 4 Jul 2026 17:16:25 -0700 Subject: [PATCH] refactor(amber): rename execution coordinators to managers Co-Authored-By: Claude Fable 5 --- .../architecture/packaging/output_manager.py | 2 +- .../architecture/controller/Controller.scala | 2 +- .../controller/ControllerProcessor.scala | 6 +- .../JumpToOperatorRegionHandler.scala | 6 +- .../promisehandlers/LinkWorkersHandler.scala | 2 +- .../PortCompletedHandler.scala | 6 +- .../ReconfigurationHandler.scala | 2 +- .../StartWorkflowHandler.scala | 4 +- .../WorkerExecutionCompletedHandler.scala | 2 +- .../messaginglayer/OutputManager.scala | 4 +- ...tor.scala => RegionExecutionManager.scala} | 16 +-- ...r.scala => WorkflowExecutionManager.scala} | 33 +++-- .../FriesReconfigurationAlgorithm.scala | 6 +- .../web/resource/SyncExecutionResource.scala | 2 +- .../test_state_materialization_e2e.py | 2 +- ...scala => RegionExecutionManagerSpec.scala} | 102 +++++++------- ...> RegionExecutionManagerTestSupport.scala} | 12 +- ...ala => WorkflowExecutionManagerSpec.scala} | 132 +++++++++--------- .../promisehandlers/EndHandlerSpec.scala | 2 +- 19 files changed, 171 insertions(+), 172 deletions(-) rename amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/{RegionExecutionCoordinator.scala => RegionExecutionManager.scala} (97%) rename amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/{WorkflowExecutionCoordinator.scala => WorkflowExecutionManager.scala} (79%) rename amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/{RegionExecutionCoordinatorSpec.scala => RegionExecutionManagerSpec.scala} (83%) rename amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/{RegionCoordinatorTestSupport.scala => RegionExecutionManagerTestSupport.scala} (96%) rename amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/{WorkflowExecutionCoordinatorSpec.scala => WorkflowExecutionManagerSpec.scala} (66%) diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index 90ceda2eb96..526bedc1a12 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -97,7 +97,7 @@ def is_missing_output_ports(self): This method is only used for ensuring correct region execution. Some operators may have input port dependency relationships, for which we currently use a two-phase region execution scheme. - (See `RegionExecutionCoordinator.scala` for details.) + (See `RegionExecutionManager.scala` for details.) This logic will only be executed when the worker is part of an `executingDependeePortPhase` region-execution phase. We currently assume that in this phase the operator (worker) will diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala index 4e365dba2de..2bbc8ea1227 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala @@ -116,7 +116,7 @@ class Controller( override def initState(): Unit = { attachRuntimeServicesToCPState() cp.workflowScheduler.updateSchedule(physicalPlan) - cp.workflowExecutionCoordinator.schedule = cp.workflowScheduler.getSchedule + cp.workflowExecutionManager.schedule = cp.workflowScheduler.getSchedule val regions: List[(Long, List[String])] = cp.workflowScheduler.getSchedule.getRegions.map { region => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala index ef33174b6b0..883e250238a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala @@ -29,7 +29,7 @@ import org.apache.texera.amber.engine.architecture.common.{ } import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager -import org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinator +import org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionManager import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage @@ -43,7 +43,7 @@ class ControllerProcessor( val workflowExecution: WorkflowExecution = WorkflowExecution() val workflowScheduler: WorkflowScheduler = new WorkflowScheduler(workflowContext, actorId) - val workflowExecutionCoordinator: WorkflowExecutionCoordinator = new WorkflowExecutionCoordinator( + val workflowExecutionManager: WorkflowExecutionManager = new WorkflowExecutionManager( workflowExecution, controllerConfig, asyncRPCClient @@ -73,7 +73,7 @@ class ControllerProcessor( def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = { this.actorRefService = actorRefService - this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService) + this.workflowExecutionManager.setupActorRefService(this.actorRefService) } @transient var logManager: ReplayLogManager = _ diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala index 0047efe45f4..9fbc1e48afe 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/JumpToOperatorRegionHandler.scala @@ -34,15 +34,15 @@ trait JumpToOperatorRegionHandler { msg: JumpToOperatorRegionRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { - val coordinator = cp.workflowExecutionCoordinator - coordinator.schedule.levelSets + val manager = cp.workflowExecutionManager + manager.schedule.levelSets .collectFirst { case (level, regions) if regions.exists(_.getOperators.exists(_.id.logicalOpId == msg.targetOperatorId)) => level } .foreach { targetLevel => - coordinator.schedule = coordinator.schedule.copy(initialLevelIndex = targetLevel) + manager.schedule = manager.schedule.copy(initialLevelIndex = targetLevel) } EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/LinkWorkersHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/LinkWorkersHandler.scala index f8a967aad73..1049fc0ed19 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/LinkWorkersHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/LinkWorkersHandler.scala @@ -38,7 +38,7 @@ trait LinkWorkersHandler { this: ControllerAsyncRPCHandlerInitializer => override def linkWorkers(msg: LinkWorkersRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = { - val region = cp.workflowExecutionCoordinator.getRegionOfLink(msg.link) + val region = cp.workflowExecutionManager.getRegionOfLink(msg.link) val resourceConfig = region.resourceConfig.get val linkConfig = resourceConfig.linkConfigs(msg.link) val linkExecution = diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala index 810c098c417..b961ee28042 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PortCompletedHandler.scala @@ -64,7 +64,7 @@ trait PortCompletedHandler { msg.portId, input = msg.input ) - cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match { + cp.workflowExecutionManager.getRegionOfPortId(globalPortId) match { case Some(region) => val regionExecution = cp.workflowExecution.getRegionExecution(region.id) val operatorExecution = @@ -81,8 +81,8 @@ trait PortCompletedHandler { else operatorExecution.isOutputPortCompleted(msg.portId) if (isPortCompleted) { - cp.workflowExecutionCoordinator - .coordinateRegionExecutors(cp.actorService) + cp.workflowExecutionManager + .advanceRegionExecutions(cp.actorService) // Since this message is sent from a worker, any exception from the above code will be returned to that worker. // Additionally, a fatal error is sent to the client, indicating that the region cannot be scheduled. .onFailure { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala index 7653f873c13..6bedad07640 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala @@ -61,7 +61,7 @@ trait ReconfigurationHandler { } val futures = mutable.ArrayBuffer[Future[_]]() val friesComponents = - FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator, msg) + FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionManager, msg) friesComponents.foreach { friesComponent => if (friesComponent.scope.size == 1) { val updateExecutorRequest = friesComponent.reconfigurations.head diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala index 7d938dbedde..2acb8ea6316 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/StartWorkflowHandler.scala @@ -41,8 +41,8 @@ trait StartWorkflowHandler { ctx: AsyncRPCContext ): Future[StartWorkflowResponse] = { if (cp.workflowExecution.getState.isUninitialized) { - cp.workflowExecutionCoordinator - .coordinateRegionExecutors(cp.actorService) + cp.workflowExecutionManager + .advanceRegionExecutions(cp.actorService) .map(_ => { cp.controllerTimerService.enableStatusUpdate() cp.controllerTimerService.enableRuntimeStatisticsCollection() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala index c3b3ddb234b..65926722f24 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerExecutionCompletedHandler.scala @@ -64,7 +64,7 @@ trait WorkerExecutionCompletedHandler { val isWorkflowTerminal = cp.workflowExecution.isCompleted && !cp.workflowScheduler.hasPendingRegions && - !cp.workflowExecutionCoordinator.hasUnfinishedRegionCoordinators + !cp.workflowExecutionManager.hasUnfinishedRegionManagers if (isWorkflowTerminal) { // after query result come back: send completed event, cleanup ,and kill workflow sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState)) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 80cc24780b0..e65a7b198f5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -285,7 +285,7 @@ class OutputManager( /** * This method is only used for ensuring correct region execution. Some operators may have input port dependency - * relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator` + * relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionManager` * for details.) * This logic will only be executed when the worker is part of an `executingDependeePort` region-execution phase. * We currently assume that in this phase the operator (worker) will not output any data, hence no output ports. @@ -322,7 +322,7 @@ class OutputManager( writerThread.start() // The state document is provisioned alongside the result document - // by RegionExecutionCoordinator, so it is always present. + // by RegionExecutionManager, so it is always present. val stateWriter = DocumentFactory .openDocument(VFSURIFactory.stateURI(portBaseURI)) ._1 diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManager.scala similarity index 97% rename from amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala rename to amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManager.scala index 9e84e5e80be..670014484b3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManager.scala @@ -64,7 +64,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.{Duration => ScalaDuration} -object RegionExecutionCoordinator { +object RegionExecutionManager { // Max EndWorker retries before termination fails. ~30s at DefaultKillRetryDelay (200ms). private[scheduling] val DefaultMaxTerminationAttempts: Int = 150 @@ -99,7 +99,7 @@ object RegionExecutionCoordinator { * * 3. `Completed` */ -class RegionExecutionCoordinator( +class RegionExecutionManager( region: Region, isRestart: Boolean, workflowExecution: WorkflowExecution, @@ -107,8 +107,8 @@ class RegionExecutionCoordinator( controllerConfig: ControllerConfig, actorService: PekkoActorService, actorRefService: PekkoActorRefMappingService, - maxTerminationAttempts: Int = RegionExecutionCoordinator.DefaultMaxTerminationAttempts, - killRetryDelay: TwitterDuration = RegionExecutionCoordinator.DefaultKillRetryDelay + maxTerminationAttempts: Int = RegionExecutionManager.DefaultMaxTerminationAttempts, + killRetryDelay: TwitterDuration = RegionExecutionManager.DefaultKillRetryDelay ) extends AmberLogging { initRegionExecution() @@ -126,8 +126,8 @@ class RegionExecutionCoordinator( private val killRetryTimer: Timer = new JavaTimer(true) /** - * Sync the status of `RegionExecution` and transition this coordinator's phase to `Completed` only when the - * coordinator is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and + * Sync the status of `RegionExecution` and transition this manager's phase to `Completed` only when the + * manager is currently in `ExecutingNonDependeePortsPhase`, all the ports of this region are completed, and * all workers in this region are terminated. * * Additionally, this method will also terminate all the workers of this region: @@ -156,8 +156,8 @@ class RegionExecutionCoordinator( existingTerminationFuture } else { val terminationFuture = terminateWorkersWithRetry(regionExecution).flatMap { _ => - // Set this coordinator's status to be completed so that subsequent regions can be started by - // WorkflowExecutionCoordinator. + // Set this manager's status to be completed so that subsequent regions can be started by + // WorkflowExecutionManager. setPhase(Completed) Future.Unit } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionManager.scala similarity index 79% rename from amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala rename to amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionManager.scala index deb753beb37..1c4982f90d7 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionManager.scala @@ -34,7 +34,7 @@ import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -class WorkflowExecutionCoordinator( +class WorkflowExecutionManager( workflowExecution: WorkflowExecution, controllerConfig: ControllerConfig, asyncRPCClient: AsyncRPCClient @@ -44,8 +44,7 @@ class WorkflowExecutionCoordinator( private val executedRegions: mutable.ListBuffer[Set[Region]] = mutable.ListBuffer() - private val regionExecutionCoordinators - : mutable.HashMap[RegionIdentity, RegionExecutionCoordinator] = + private val regionExecutionManagers: mutable.HashMap[RegionIdentity, RegionExecutionManager] = mutable.HashMap() private val completionNotified: AtomicBoolean = new AtomicBoolean(false) @@ -56,29 +55,29 @@ class WorkflowExecutionCoordinator( } /** - * Each invocation first syncs the internal statuses of each exisiting `RegionExecutionCoordintor`, after which each - * of the `RegionExecutionCoordintor`s will launch the corresponding next phase of whenever needed until it is + * Each invocation first syncs the internal statuses of each exisiting `RegionExecutionManager`, after which each + * of the `RegionExecutionManager`s will launch the corresponding next phase of whenever needed until it is * in `Completed` status (phase). * * After the syncs, if there are no running region(s), it will start new regions (if available). */ - def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit] = { - val unfinishedRegionCoordinators = - regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq + def advanceRegionExecutions(actorService: PekkoActorService): Future[Unit] = { + val unfinishedRegionManagers = + regionExecutionManagers.values.filter(!_.isCompleted).toSeq // Trigger sync for each unfinished region. - unfinishedRegionCoordinators.foreach(_.syncStatusAndTransitionRegionExecutionPhase()) + unfinishedRegionManagers.foreach(_.syncStatusAndTransitionRegionExecutionPhase()) - // Wait only for region termination futures (kill path), then re-run coordination. - val terminationFutures = unfinishedRegionCoordinators.flatMap(_.getTerminationFutureOpt) + // Wait only for region termination futures (kill path), then re-run the advance loop. + val terminationFutures = unfinishedRegionManagers.flatMap(_.getTerminationFutureOpt) if (terminationFutures.nonEmpty) { return Future .collect(terminationFutures) .unit - .flatMap(_ => coordinateRegionExecutors(actorService)) + .flatMap(_ => advanceRegionExecutions(actorService)) } - if (regionExecutionCoordinators.values.exists(!_.isCompleted)) { + if (regionExecutionManagers.values.exists(!_.isCompleted)) { // Some regions are still not completed yet. Cannot start the new regions. return Future.Unit } @@ -103,7 +102,7 @@ class WorkflowExecutionCoordinator( } else { workflowExecution.initRegionExecution(region) } - regionExecutionCoordinators(region.id) = new RegionExecutionCoordinator( + regionExecutionManagers(region.id) = new RegionExecutionManager( region, isRestart, workflowExecution, @@ -112,7 +111,7 @@ class WorkflowExecutionCoordinator( actorService, actorRefService ) - regionExecutionCoordinators(region.id) + regionExecutionManagers(region.id) }) .map(_.syncStatusAndTransitionRegionExecutionPhase()) .toSeq @@ -134,8 +133,8 @@ class WorkflowExecutionCoordinator( .toSet } - def hasUnfinishedRegionCoordinators: Boolean = { - regionExecutionCoordinators.values.exists(!_.isCompleted) + def hasUnfinishedRegionManagers: Boolean = { + regionExecutionManagers.values.exists(!_.isCompleted) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala index c13e7801190..544ce819a17 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala @@ -25,7 +25,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ UpdateExecutorRequest, WorkflowReconfigureRequest } -import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator} +import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionManager} import org.jgrapht.alg.connectivity.ConnectivityInspector import scala.collection.mutable @@ -45,11 +45,11 @@ object FriesReconfigurationAlgorithm { } def getReconfigurations( - workflowExecutionCoordinator: WorkflowExecutionCoordinator, + workflowExecutionManager: WorkflowExecutionManager, reconfiguration: WorkflowReconfigureRequest ): Set[FriesComponent] = { // independently schedule reconfigurations for each region: - workflowExecutionCoordinator.getExecutingRegions + workflowExecutionManager.getExecutingRegions .flatMap(region => computeMCS(region, reconfiguration, reconfiguration.reconfigurationId)) } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index b70bafb4b0b..3c9da603c86 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -281,7 +281,7 @@ class SyncExecutionResource extends LazyLogging { killExecution(executionService) (executionService.executionStateStore.metadataStore.getState, true, false) case TargetResultsReady(_) => - // RegionExecutionCoordinator caches upstream results asynchronously after operators + // RegionExecutionManager caches upstream results asynchronously after operators // complete; sleep gives that caching a chance to finish before we shut down the client. // TODO: replace with a synchronous signal from the engine. Thread.sleep(500) diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index b77bdca3f54..5db9bdf9036 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -162,7 +162,7 @@ def test_state_written_by_output_manager_is_replayed_by_reader(): port_id = PortIdentity(id=0, internal=False) worker_schema_for_result = State.SCHEMA # producer-side: only state matters - # 1. RegionExecutionCoordinator's responsibility: provision result + + # 1. RegionExecutionManager's responsibility: provision result + # state documents at the port base URI before any worker starts. # We emulate that here. DocumentFactory.create_document( diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManagerSpec.scala similarity index 83% rename from amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala rename to amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManagerSpec.scala index 2feec2362c9..482af953d4a 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManagerSpec.scala @@ -38,7 +38,7 @@ import org.apache.texera.amber.engine.architecture.common.PekkoActorRefMappingSe import org.apache.texera.amber.engine.architecture.controller.ControllerConfig import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.architecture.rpc.controlreturns._ -import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._ +import org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionManagerTestSupport._ import org.apache.texera.amber.engine.architecture.scheduling.config.{ OperatorConfig, OutputPortConfig, @@ -55,45 +55,45 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic /** - * Tests the real region-coordination lifecycle around synchronous region kill. + * Tests the real region-execution lifecycle around synchronous region kill. * - * The tests let the coordinator call the real `AsyncRPCClient.workerInterface`, capture the generated + * The tests let the manager call the real `AsyncRPCClient.workerInterface`, capture the generated * `ControlInvocation`s at the controller output gateway, and fulfill those RPC promises * explicitly. This keeps the important production behavior under test: * * - regular launch RPCs (`initializeExecutor`, `openExecutor`, `startWorker`) are allowed to * complete immediately; * - `endWorker` can be held pending or failed to model worker-side drain/termination behavior; - * - the real coordinator then decides when to remove actor refs, clean control channels, mark + * - the real manager then decides when to remove actor refs, clean control channels, mark * workers terminated, and allow the next region to start. */ -class RegionExecutionCoordinatorSpec - extends TestKit(ActorSystem("RegionExecutionCoordinatorSpec", AmberRuntime.pekkoConfig)) +class RegionExecutionManagerSpec + extends TestKit(ActorSystem("RegionExecutionManagerSpec", AmberRuntime.pekkoConfig)) with AnyFlatSpecLike with BeforeAndAfterAll - with RegionCoordinatorTestSupport { + with RegionExecutionManagerTestSupport { override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } - "RegionExecutionCoordinator" should "send gracefulStop only after EndWorker succeeds" in { + "RegionExecutionManager" should "send gracefulStop only after EndWorker succeeds" in { val fixture = createSingleRegionFixture(endWorkerResponse = _ => None) - launchRegion(fixture.coordinator) - val completion = requestRegionCompletion(fixture.coordinator) + launchRegion(fixture.manager) + val completion = requestRegionCompletion(fixture.manager) assert( fixture.rpcProbe.methodTrace == Seq(InitializeExecutor, OpenExecutor, StartWorker, EndWorker) ) assert(completion.poll.isEmpty) - assert(!fixture.coordinator.isCompleted) + assert(!fixture.manager.isCompleted) assert(fixture.actorRefService.hasActorRef(fixture.workerId)) fixture.rpcProbe.fulfill(fixture.rpcProbe.onlyEndWorkerCall, EmptyReturn()) await(completion) - assert(fixture.coordinator.isCompleted) + assert(fixture.manager.isCompleted) assert(!fixture.actorRefService.hasActorRef(fixture.workerId)) assert(workerState(fixture) == WorkerState.TERMINATED) assertControlChannelsAreRemoved(fixture) @@ -101,7 +101,7 @@ class RegionExecutionCoordinatorSpec it should "retry EndWorker failures and delay gracefulStop until a retry succeeds" in { val attempts = new atomic.AtomicInteger(0) - // The first EndWorker is sent on the test thread; the retry is sent later from the coordinator's + // The first EndWorker is sent on the test thread; the retry is sent later from the manager's // kill-retry timer thread. Block on this latch — counted down from the probe callback once the // retry's call has been recorded — instead of polling `endWorkerCalls` from the test thread, so // the test never iterates the call buffer while the timer thread is appending to it. @@ -116,21 +116,21 @@ class RegionExecutionCoordinatorSpec } ) - launchRegion(fixture.coordinator) - val completion = requestRegionCompletion(fixture.coordinator) + launchRegion(fixture.manager) + val completion = requestRegionCompletion(fixture.manager) assert( retryAttempted.await(testTimeout.inMilliseconds, TimeUnit.MILLISECONDS), "EndWorker was not retried within the deadline" ) assert(completion.poll.isEmpty) - assert(!fixture.coordinator.isCompleted) + assert(!fixture.manager.isCompleted) assert(fixture.actorRefService.hasActorRef(fixture.workerId)) fixture.rpcProbe.fulfill(fixture.rpcProbe.endWorkerCalls.last, EmptyReturn()) await(completion) - assert(fixture.coordinator.isCompleted) + assert(fixture.manager.isCompleted) assert(fixture.rpcProbe.endWorkerCalls.size == 2) assert(!fixture.actorRefService.hasActorRef(fixture.workerId)) assert(workerState(fixture) == WorkerState.TERMINATED) @@ -144,14 +144,14 @@ class RegionExecutionCoordinatorSpec killRetryDelay = TwitterDuration.fromMilliseconds(5) ) - launchRegion(fixture.coordinator) - val completion = requestRegionCompletion(fixture.coordinator) + launchRegion(fixture.manager) + val completion = requestRegionCompletion(fixture.manager) val failure = intercept[IllegalStateException] { await(completion) } assert(failure.getMessage.contains("could not be terminated after 3 attempts")) - assert(!fixture.coordinator.isCompleted) + assert(!fixture.manager.isCompleted) assert(fixture.rpcProbe.endWorkerCalls.size == 3) assert(fixture.actorRefService.hasActorRef(fixture.workerId)) } @@ -163,8 +163,8 @@ class RegionExecutionCoordinatorSpec killRetryDelay = TwitterDuration.fromMilliseconds(5) ) - launchRegion(fixture.coordinator) - val completion = requestRegionCompletion(fixture.coordinator) + launchRegion(fixture.manager) + val completion = requestRegionCompletion(fixture.manager) val failure = intercept[IllegalStateException] { await(completion) @@ -190,11 +190,11 @@ class RegionExecutionCoordinatorSpec killRetryDelay = TwitterDuration.fromMilliseconds(5) ) - launchRegion(fixture.coordinator) - val completion = requestRegionCompletion(fixture.coordinator) + launchRegion(fixture.manager) + val completion = requestRegionCompletion(fixture.manager) await(completion) - assert(fixture.coordinator.isCompleted) + assert(fixture.manager.isCompleted) assert(fixture.rpcProbe.endWorkerCalls.size == 2) assert(!fixture.actorRefService.hasActorRef(fixture.workerId)) assert(workerState(fixture) == WorkerState.TERMINATED) @@ -209,8 +209,8 @@ class RegionExecutionCoordinatorSpec killRetryDelay = TwitterDuration.fromMilliseconds(5) ) - launchRegion(fixture.coordinator) - val completion = requestRegionCompletion(fixture.coordinator) + launchRegion(fixture.manager) + val completion = requestRegionCompletion(fixture.manager) val failure = intercept[IllegalStateException] { await(completion) @@ -220,7 +220,7 @@ class RegionExecutionCoordinatorSpec assert(failure.getMessage.contains(workerId.toString)) } assert(failure.getCause != null) - assert(!fixture.coordinator.isCompleted) + assert(!fixture.manager.isCompleted) // EndWorker is sent to every worker on every attempt. assert(fixture.rpcProbe.endWorkerCalls.size == fixture.workerIds.size * 2) } @@ -228,22 +228,22 @@ class RegionExecutionCoordinatorSpec it should "default to a bounded ~30s termination budget" in { // 150 attempts * 200 ms ≈ 30 s. These defaults are the documented contract for how long a // stuck region blocks before failing loudly; pin them so changes are deliberate. - assert(RegionExecutionCoordinator.DefaultMaxTerminationAttempts == 150) + assert(RegionExecutionManager.DefaultMaxTerminationAttempts == 150) assert( - RegionExecutionCoordinator.DefaultKillRetryDelay == TwitterDuration.fromMilliseconds(200) + RegionExecutionManager.DefaultKillRetryDelay == TwitterDuration.fromMilliseconds(200) ) } it should "surface the underlying cause when an output port schema is unavailable" in { // Reproduces issue #3546: when schema inference for an output port fails (e.g. because a // dataset used by the workflow has not been shared with the running user), the port's - // schema is stored as a `Left(cause)`. The coordinator must surface that real cause rather + // schema is stored as a `Left(cause)`. The manager must surface that real cause rather // than discarding it behind a generic "Schema is missing" message. val cause = new RuntimeException("User texera1 has no access to dataset 'iris'") - val coordinator = coordinatorWithUnresolvedOutputSchema(cause) + val manager = managerWithUnresolvedOutputSchema(cause) val thrown = intercept[IllegalStateException] { - await(coordinator.syncStatusAndTransitionRegionExecutionPhase()) + await(manager.syncStatusAndTransitionRegionExecutionPhase()) } assert(thrown.getCause eq cause) assert(thrown.getMessage.contains(cause.getMessage)) @@ -254,10 +254,10 @@ class RegionExecutionCoordinatorSpec // not read "...: null". val cause = new NullPointerException() assert(cause.getMessage == null) - val coordinator = coordinatorWithUnresolvedOutputSchema(cause) + val manager = managerWithUnresolvedOutputSchema(cause) val thrown = intercept[IllegalStateException] { - await(coordinator.syncStatusAndTransitionRegionExecutionPhase()) + await(manager.syncStatusAndTransitionRegionExecutionPhase()) } assert(thrown.getCause eq cause) assert(thrown.getMessage.contains(cause.toString)) @@ -265,13 +265,13 @@ class RegionExecutionCoordinatorSpec } /** - * Builds a coordinator for a single-source region whose only output port has an unresolved + * Builds a manager for a single-source region whose only output port has an unresolved * schema (`Left(cause)`) and a configured output storage, so that the non-dependee phase * reaches `createOutputPortStorageObjects` and attempts to read that schema. */ - private def coordinatorWithUnresolvedOutputSchema( + private def managerWithUnresolvedOutputSchema( cause: Throwable - ): RegionExecutionCoordinator = { + ): RegionExecutionManager = { val portId = PortIdentity(0) val baseOp = createSourceOp("schema-missing-op").withOutputPorts(List(OutputPort(portId))) val (outPort, links, _) = baseOp.outputPorts(portId) @@ -302,7 +302,7 @@ class RegionExecutionCoordinatorSpec val controller = createControllerHarness() registerLiveWorker(controller.actorRefService, workerId) - new RegionExecutionCoordinator( + new RegionExecutionManager( region, isRestart = false, workflowExecution, @@ -314,7 +314,7 @@ class RegionExecutionCoordinatorSpec } private case class SingleRegionFixture( - coordinator: RegionExecutionCoordinator, + manager: RegionExecutionManager, rpcProbe: ControllerRpcProbe, workflowExecution: WorkflowExecution, region: Region, @@ -325,8 +325,8 @@ class RegionExecutionCoordinatorSpec private def createSingleRegionFixture( endWorkerResponse: WorkerRpcCall => Option[ControlReturn], - maxTerminationAttempts: Int = RegionExecutionCoordinator.DefaultMaxTerminationAttempts, - killRetryDelay: TwitterDuration = RegionExecutionCoordinator.DefaultKillRetryDelay + maxTerminationAttempts: Int = RegionExecutionManager.DefaultMaxTerminationAttempts, + killRetryDelay: TwitterDuration = RegionExecutionManager.DefaultKillRetryDelay ): SingleRegionFixture = { val physicalOp = createSourceOp("test-op") val workerId = createWorkerId(physicalOp) @@ -346,7 +346,7 @@ class RegionExecutionCoordinatorSpec ChannelIdentity(CONTROLLER, workerId, isControl = true) ) - val coordinator = new RegionExecutionCoordinator( + val manager = new RegionExecutionManager( region, isRestart = false, workflowExecution, @@ -359,7 +359,7 @@ class RegionExecutionCoordinatorSpec ) SingleRegionFixture( - coordinator = coordinator, + manager = manager, rpcProbe = rpcProbe, workflowExecution = workflowExecution, region = region, @@ -370,7 +370,7 @@ class RegionExecutionCoordinatorSpec } private case class MultiWorkerFixture( - coordinator: RegionExecutionCoordinator, + manager: RegionExecutionManager, rpcProbe: ControllerRpcProbe, workerIds: Seq[ActorVirtualIdentity] ) @@ -394,7 +394,7 @@ class RegionExecutionCoordinatorSpec val controller = createControllerHarness() workerIds.foreach(registerLiveWorker(controller.actorRefService, _)) - val coordinator = new RegionExecutionCoordinator( + val manager = new RegionExecutionManager( region, isRestart = false, workflowExecution, @@ -406,17 +406,17 @@ class RegionExecutionCoordinatorSpec killRetryDelay ) - MultiWorkerFixture(coordinator, rpcProbe, workerIds) + MultiWorkerFixture(manager, rpcProbe, workerIds) } - private def launchRegion(coordinator: RegionExecutionCoordinator): Unit = { - await(coordinator.syncStatusAndTransitionRegionExecutionPhase()) + private def launchRegion(manager: RegionExecutionManager): Unit = { + await(manager.syncStatusAndTransitionRegionExecutionPhase()) } private def requestRegionCompletion( - coordinator: RegionExecutionCoordinator + manager: RegionExecutionManager ): Future[Unit] = { - coordinator.syncStatusAndTransitionRegionExecutionPhase() + manager.syncStatusAndTransitionRegionExecutionPhase() } private def workerState(fixture: SingleRegionFixture): WorkerState = diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManagerTestSupport.scala similarity index 96% rename from amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala rename to amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManagerTestSupport.scala index 64d85972ffb..1ad413244c0 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionManagerTestSupport.scala @@ -60,14 +60,14 @@ import org.apache.texera.amber.util.VirtualIdentityUtils import scala.collection.mutable -object RegionCoordinatorTestSupport { +object RegionExecutionManagerTestSupport { val InitializeExecutor = "initializeExecutor" val OpenExecutor = "openExecutor" val StartWorker = "startWorker" val EndWorker = "endWorker" // Generous deadline for the polling helpers below. Production timing under test (notably the - // 200 ms `killRetryDelay` in `RegionExecutionCoordinator`) fits comfortably; the rest is + // 200 ms `killRetryDelay` in `RegionExecutionManager`) fits comfortably; the rest is // headroom for slow CI. val testTimeout: Duration = Duration.fromSeconds(5) @@ -121,7 +121,7 @@ object RegionCoordinatorTestSupport { case invocation: ControlInvocation => recordAndMaybeFulfill(invocation) case _ => - // Client events and stats updates are irrelevant to the coordinator lifecycle assertions. + // Client events and stats updates are irrelevant to the manager lifecycle assertions. } } @@ -219,7 +219,7 @@ object RegionCoordinatorTestSupport { physicalOp: PhysicalOp, workerIds: Seq[ActorVirtualIdentity] ): Unit = { - // RegionExecutionCoordinator skips real worker creation when an execution for this operator + // RegionExecutionManager skips real worker creation when an execution for this operator // already exists. val operatorExecution = workflowExecution .initRegionExecution(createWorkerRegion(seedRegionId, physicalOp, workerIds)) @@ -238,8 +238,8 @@ object RegionCoordinatorTestSupport { } } -trait RegionCoordinatorTestSupport { self: TestKit => - import RegionCoordinatorTestSupport._ +trait RegionExecutionManagerTestSupport { self: TestKit => + import RegionExecutionManagerTestSupport._ protected def createControllerHarness(): ControllerHarnessFixture = { val controllerRef = TestActorRef(new ControllerHarness) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionManagerSpec.scala similarity index 66% rename from amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala rename to amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionManagerSpec.scala index 1a1e4afa319..572b8322770 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionManagerSpec.scala @@ -32,16 +32,16 @@ import org.apache.texera.amber.core.workflow.PhysicalOp import org.apache.texera.amber.engine.architecture.controller.ControllerConfig import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.scheduling.RegionCoordinatorTestSupport._ +import org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionManagerTestSupport._ import org.apache.texera.amber.engine.common.AmberRuntime import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpecLike -class WorkflowExecutionCoordinatorSpec - extends TestKit(ActorSystem("WorkflowExecutionCoordinatorSpec", AmberRuntime.pekkoConfig)) +class WorkflowExecutionManagerSpec + extends TestKit(ActorSystem("WorkflowExecutionManagerSpec", AmberRuntime.pekkoConfig)) with AnyFlatSpecLike with BeforeAndAfterAll - with RegionCoordinatorTestSupport { + with RegionExecutionManagerTestSupport { override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) @@ -73,34 +73,34 @@ class WorkflowExecutionCoordinatorSpec (first, second, third, schedule) } - private def newJumpCoordinator(schedule: Schedule): WorkflowExecutionCoordinator = { - val coordinator = new WorkflowExecutionCoordinator(WorkflowExecution(), null, null) - coordinator.schedule = schedule - coordinator + private def newJumpManager(schedule: Schedule): WorkflowExecutionManager = { + val manager = new WorkflowExecutionManager(WorkflowExecution(), null, null) + manager.schedule = schedule + manager } - private def nextRegions(coordinator: WorkflowExecutionCoordinator): Set[Region] = { - val schedule = coordinator.schedule + private def nextRegions(manager: WorkflowExecutionManager): Set[Region] = { + val schedule = manager.schedule if (schedule.hasNext) schedule.next() else Set.empty } // Mirrors what JumpToOperatorRegionHandler does: read the current schedule, scan for the // level containing the target operator, and replace the schedule with a copy whose cursor is // at that level. - private def jumpTo(coordinator: WorkflowExecutionCoordinator, opName: String): Unit = { + private def jumpTo(manager: WorkflowExecutionManager, opName: String): Unit = { val opId = OperatorIdentity(opName) - val schedule = coordinator.schedule + val schedule = manager.schedule schedule.levelSets .collectFirst { case (level, regions) if regions.exists(_.getOperators.exists(_.id.logicalOpId == opId)) => level } .foreach { targetLevel => - coordinator.schedule = schedule.copy(initialLevelIndex = targetLevel) + manager.schedule = schedule.copy(initialLevelIndex = targetLevel) } } - "WorkflowExecutionCoordinator" should + "WorkflowExecutionManager" should "start the next region only after previous region termination succeeds" in { val firstOp = createSourceOp("first-op") val firstWorkerId = createWorkerId(firstOp) @@ -124,26 +124,26 @@ class WorkflowExecutionCoordinatorSpec registerLiveWorker(controller.actorRefService, firstWorkerId) registerLiveWorker(controller.actorRefService, secondWorkerId) - val workflowCoordinator = new WorkflowExecutionCoordinator( + val workflowManager = new WorkflowExecutionManager( workflowExecution, ControllerConfig(None, None, None, None), rpcProbe.asyncRPCClient ) - workflowCoordinator.schedule = Schedule(Map(0 -> Set(firstRegion), 1 -> Set(secondRegion))) - workflowCoordinator.setupActorRefService(controller.actorRefService) + workflowManager.schedule = Schedule(Map(0 -> Set(firstRegion), 1 -> Set(secondRegion))) + workflowManager.setupActorRefService(controller.actorRefService) - await(workflowCoordinator.coordinateRegionExecutors(controller.actorService)) + await(workflowManager.advanceRegionExecutions(controller.actorService)) assert(rpcProbe.startedWorkers == Seq(firstWorkerId)) - val coordination = workflowCoordinator.coordinateRegionExecutors(controller.actorService) + val advanceFuture = workflowManager.advanceRegionExecutions(controller.actorService) waitUntil(rpcProbe.endWorkerCalls.size == 1) - assert(coordination.poll.isEmpty) + assert(advanceFuture.poll.isEmpty) assert(!rpcProbe.initializedWorkers.contains(secondWorkerId)) assert(controller.actorRefService.hasActorRef(firstWorkerId)) rpcProbe.fulfill(rpcProbe.onlyEndWorkerCall, EmptyReturn()) - await(coordination) + await(advanceFuture) assert(!controller.actorRefService.hasActorRef(firstWorkerId)) assert(rpcProbe.initializedWorkers.contains(secondWorkerId)) @@ -153,81 +153,81 @@ class WorkflowExecutionCoordinatorSpec "Jumping to an operator's region" should "make the next scheduled region contain the target operator's region" in { val (first, second, _, schedule) = threeLevelSchedule() - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) - assert(nextRegions(coordinator) == Set(first)) - assert(nextRegions(coordinator) == Set(second)) + assert(nextRegions(manager) == Set(first)) + assert(nextRegions(manager) == Set(second)) - jumpTo(coordinator, "first") + jumpTo(manager, "first") - assert(nextRegions(coordinator) == Set(first)) + assert(nextRegions(manager) == Set(first)) } it should "support multiple sequential jumps interleaved with region pulls" in { val (first, second, third, schedule) = threeLevelSchedule() - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) - assert(nextRegions(coordinator) == Set(first)) - assert(nextRegions(coordinator) == Set(second)) + assert(nextRegions(manager) == Set(first)) + assert(nextRegions(manager) == Set(second)) - jumpTo(coordinator, "first") - assert(nextRegions(coordinator) == Set(first)) + jumpTo(manager, "first") + assert(nextRegions(manager) == Set(first)) - jumpTo(coordinator, "second") - assert(nextRegions(coordinator) == Set(second)) - assert(nextRegions(coordinator) == Set(third)) + jumpTo(manager, "second") + assert(nextRegions(manager) == Set(second)) + assert(nextRegions(manager) == Set(third)) - jumpTo(coordinator, "first") - assert(nextRegions(coordinator) == Set(first)) + jumpTo(manager, "first") + assert(nextRegions(manager) == Set(first)) } it should "be a no-op when the target operator is not in any scheduled region" in { val (first, second, _, schedule) = threeLevelSchedule() - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) - assert(nextRegions(coordinator) == Set(first)) + assert(nextRegions(manager) == Set(first)) - jumpTo(coordinator, "does-not-exist") + jumpTo(manager, "does-not-exist") // Iteration position must be unaffected by an unknown target. - assert(nextRegions(coordinator) == Set(second)) + assert(nextRegions(manager) == Set(second)) } it should "leave the schedule untouched when called repeatedly with unknown operators" in { val (first, second, third, schedule) = threeLevelSchedule() - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) - jumpTo(coordinator, "ghost-1") - jumpTo(coordinator, "ghost-2") - jumpTo(coordinator, "ghost-3") + jumpTo(manager, "ghost-1") + jumpTo(manager, "ghost-2") + jumpTo(manager, "ghost-3") - assert(nextRegions(coordinator) == Set(first)) - assert(nextRegions(coordinator) == Set(second)) - assert(nextRegions(coordinator) == Set(third)) + assert(nextRegions(manager) == Set(first)) + assert(nextRegions(manager) == Set(second)) + assert(nextRegions(manager) == Set(third)) } it should "allow jumping back to the first region after the schedule is exhausted" in { val (first, second, third, schedule) = threeLevelSchedule() - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) - assert(nextRegions(coordinator) == Set(first)) - assert(nextRegions(coordinator) == Set(second)) - assert(nextRegions(coordinator) == Set(third)) - assert(nextRegions(coordinator) == Set.empty) + assert(nextRegions(manager) == Set(first)) + assert(nextRegions(manager) == Set(second)) + assert(nextRegions(manager) == Set(third)) + assert(nextRegions(manager) == Set.empty) - jumpTo(coordinator, "first") - assert(nextRegions(coordinator) == Set(first)) + jumpTo(manager, "first") + assert(nextRegions(manager) == Set(first)) } it should "support jumping forward past regions that have not yet been pulled" in { val (first, _, third, schedule) = threeLevelSchedule() - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) - assert(nextRegions(coordinator) == Set(first)) + assert(nextRegions(manager) == Set(first)) - jumpTo(coordinator, "third") - assert(nextRegions(coordinator) == Set(third)) - assert(nextRegions(coordinator) == Set.empty) + jumpTo(manager, "third") + assert(nextRegions(manager) == Set(third)) + assert(nextRegions(manager) == Set.empty) } it should "replay the target-onward range each time it jumps back" in { @@ -242,22 +242,22 @@ class WorkflowExecutionCoordinatorSpec val schedule = Schedule( Map(0 -> Set(a), 1 -> Set(b), 2 -> Set(c), 3 -> Set(d), 4 -> Set(e), 5 -> Set(f)) ) - val coordinator = newJumpCoordinator(schedule) + val manager = newJumpManager(schedule) Seq(a, b, c, d, e).foreach { region => - assert(nextRegions(coordinator) == Set(region)) + assert(nextRegions(manager) == Set(region)) } - jumpTo(coordinator, "c") + jumpTo(manager, "c") Seq(c, d, e).foreach { region => - assert(nextRegions(coordinator) == Set(region)) + assert(nextRegions(manager) == Set(region)) } - jumpTo(coordinator, "c") + jumpTo(manager, "c") Seq(c, d, e, f).foreach { region => - assert(nextRegions(coordinator) == Set(region)) + assert(nextRegions(manager) == Set(region)) } - assert(nextRegions(coordinator) == Set.empty) + assert(nextRegions(manager) == Set.empty) } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala index 90e8b817bea..bdfd5598310 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandlerSpec.scala @@ -49,7 +49,7 @@ import java.util.concurrent.LinkedBlockingQueue * `endWorker` is the controller's acknowledgement point before it sends actor-level `gracefulStop`. * * A successful reply means the worker has drained every queued workflow message. If the queue still contains work, - * the handler must fail so the region coordinator can retry the kill instead of stopping the actor too early. + * the handler must fail so the region execution manager can retry the kill instead of stopping the actor too early. */ class EndHandlerSpec extends AnyFlatSpec { private val workerId = ActorVirtualIdentity("Worker:WF1-test-op-main-0")