Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ option (scalapb.options) = {

message ControlRequest {
oneof sealed_value {
// request for controller
// request for coordinator
PropagateEmbeddedControlMessageRequest propagateEmbeddedControlMessageRequest = 1;
TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2;
DebugCommandRequest debugCommandRequest = 3;
Expand Down Expand Up @@ -146,7 +146,7 @@ enum ConsoleMessageType{
}

message ConsoleMessage {
option (scalapb.message).extends = "org.apache.texera.amber.engine.architecture.controller.ClientEvent";
option (scalapb.message).extends = "org.apache.texera.amber.engine.architecture.coordinator.ClientEvent";
string worker_id = 1;
google.protobuf.Timestamp timestamp = 2 [(scalapb.field).no_box = true];
ConsoleMessageType msg_type = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ option (scalapb.options) = {
message ControlReturn {
// Oneof block for various return types
oneof sealed_value {
// controller responses
// coordinator responses
RetrieveWorkflowStateResponse retrieveWorkflowStateResponse = 1;
PropagateEmbeddedControlMessageResponse propagateEmbeddedControlMessageResponse = 2;
TakeGlobalCheckpointResponse takeGlobalCheckpointResponse = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ option (scalapb.options) = {
};


service ControllerService {
service CoordinatorService {
rpc RetrieveWorkflowState(EmptyRequest) returns (RetrieveWorkflowStateResponse);
rpc PropagateEmbeddedControlMessage(PropagateEmbeddedControlMessageRequest) returns (PropagateEmbeddedControlMessageResponse);
rpc TakeGlobalCheckpoint(TakeGlobalCheckpointRequest) returns (TakeGlobalCheckpointResponse);
Expand All @@ -44,7 +44,7 @@ service ControllerService {
rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
rpc JumpToOperatorRegion(JumpToOperatorRegionRequest) returns (EmptyReturn);
rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
rpc CoordinatorInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class EndWorkerHandler(ControlHandler):

async def end_worker(self, req: EmptyRequest) -> EmptyReturn:
"""
The response of EndWorker to the controller indicates that this worker
The response of EndWorker to the coordinator indicates that this worker
has finished not only the data processing logic, but also the processing
of all the control messages.
"""
Expand Down
18 changes: 9 additions & 9 deletions amber/src/main/python/core/architecture/rpc/async_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ReturnInvocation,
ControlReturn,
ControlInvocation,
ControllerServiceStub,
CoordinatorServiceStub,
WorkerServiceStub,
ControlRequest,
)
Expand Down Expand Up @@ -62,16 +62,16 @@ def __init__(self, output_queue: InternalQueue, context: Context):
self._send_sequences: Dict[ActorVirtualIdentity, int] = defaultdict(int)
self._unfulfilled_promises: Dict[(ActorVirtualIdentity, int), Future] = dict()
# TODO: is this correct?
self._controller_service_stub = ControllerServiceStub("")
self._coordinator_service_stub = CoordinatorServiceStub("")
rpc_context = AsyncRpcContext(
ActorVirtualIdentity(self._context.worker_id),
ActorVirtualIdentity(name="CONTROLLER"),
ActorVirtualIdentity(name="COORDINATOR"),
)
self._controller_service_stub._unary_unary = AsyncRPCClient._assign_context(
self._coordinator_service_stub._unary_unary = AsyncRPCClient._assign_context(
self, rpc_context
)
# Apply async_run to all async methods of the controller service stub
self._wrap_all_async_methods_with_async_run(self._controller_service_stub)
# Apply async_run to all async methods of the coordinator service stub
self._wrap_all_async_methods_with_async_run(self._coordinator_service_stub)

def _assign_context(
self, rpc_context: AsyncRpcContext
Expand Down Expand Up @@ -108,11 +108,11 @@ def _wrap_all_async_methods_with_async_run(self, instance: Any) -> None:
if inspect.iscoroutinefunction(attr):
setattr(instance, attr_name, async_run(attr))

def controller_stub(self) -> ControllerServiceStub:
def coordinator_stub(self) -> CoordinatorServiceStub:
"""
Returns a proxy for interacting with the controller interface.
Returns a proxy for interacting with the coordinator interface.
"""
return self._controller_service_stub
return self._coordinator_service_stub

def get_worker_interface(self, target_worker) -> WorkerServiceStub:
"""
Expand Down
2 changes: 1 addition & 1 deletion amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def _executor_session(self):
back to MainLoop on exit. Reporting must happen *before* the
switch: MainLoop's post-switch hook flushes console messages and
then enters EXCEPTION_PAUSE, so anything queued after the switch
would arrive at the controller only after the worker resumes.
would arrive at the coordinator only after the worker resumes.
"""
try:
executor = self._context.executor_manager.executor
Expand Down
12 changes: 6 additions & 6 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(
def complete(self) -> None:
"""
Complete the DataProcessor, marking state to COMPLETED, and notify the
controller.
coordinator.
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
Expand All @@ -99,8 +99,8 @@ def complete(self) -> None:
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
coordinator_interface = self._async_rpc_client.coordinator_stub()
coordinator_interface.worker_execution_completed(EmptyRequest())
self.context.close()

def _check_and_process_control(self) -> None:
Expand Down Expand Up @@ -265,7 +265,7 @@ def _process_end_channel(self) -> None:
)

if input_port_id is not None:
self._async_rpc_client.controller_stub().port_completed(
self._async_rpc_client.coordinator_stub().port_completed(
PortCompletedRequest(
port_id=input_port_id,
input=True,
Expand All @@ -285,7 +285,7 @@ def _process_end_channel(self) -> None:

# Need to send port completed even if there is no downstream link
for port_id in self.context.output_manager.get_port_ids():
self._async_rpc_client.controller_stub().port_completed(
self._async_rpc_client.coordinator_stub().port_completed(
PortCompletedRequest(port_id=port_id, input=False)
)
self.complete()
Expand Down Expand Up @@ -423,7 +423,7 @@ def _process_data_element(self, data_element: DataElement) -> None:
logger.exception(err)

def _send_console_message(self, console_message: ConsoleMessage):
self._async_rpc_client.controller_stub().console_message_triggered(
self._async_rpc_client.coordinator_stub().console_message_triggered(
ConsoleMessageTriggeredRequest(console_message=console_message)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class LargeBinaryManager:
"""Manages large binaries in S3 for a worker process.

A singleton, so the cached S3 client is shared process-wide. create() appends a
unique suffix to an execution-scoped base URI handed down by the controller as
unique suffix to an execution-scoped base URI handed down by the coordinator as
process config (``StorageConfig.S3_LARGE_BINARIES_BASE_URI``); the worker never
holds an execution id. This is the Python counterpart of the JVM
``LargeBinaryManager``, which uses a thread-local instead because one JVM process
Expand Down Expand Up @@ -90,7 +90,7 @@ def _ensure_bucket_exists(self, bucket: str):
logger.info(f"Created bucket: {bucket}")

def create(self) -> str:
"""Append a unique suffix to the controller-provided base URI.
"""Append a unique suffix to the coordinator-provided base URI.

Pure string construction (no S3 round-trip); the bucket is created on demand at
upload time. Returns e.g. ``s3://bucket/objects/{execution_id}/{uuid}``.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ package org.apache.texera.amber.engine.architecture.common

import org.apache.pekko.actor.{Address, Deploy}
import org.apache.pekko.remote.RemoteScope
import org.apache.texera.amber.core.workflow.{PhysicalOp, PreferController, RoundRobinPreference}
import org.apache.texera.amber.engine.architecture.controller.execution.OperatorExecution
import org.apache.texera.amber.core.workflow.{PhysicalOp, PreferCoordinator, RoundRobinPreference}
import org.apache.texera.amber.engine.architecture.coordinator.execution.OperatorExecution
import org.apache.texera.amber.engine.architecture.deploysemantics.AddressInfo
import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker
import org.apache.texera.amber.engine.architecture.scheduling.config.OperatorConfig
Expand All @@ -38,16 +38,16 @@ object ExecutorDeployment {

def createWorkers(
op: PhysicalOp,
controllerActorService: PekkoActorService,
coordinatorActorService: PekkoActorService,
operatorExecution: OperatorExecution,
operatorConfig: OperatorConfig,
stateRestoreConfig: Option[StateRestoreConfig],
replayLoggingConfig: Option[FaultToleranceConfig]
): Unit = {

val addressInfo = AddressInfo(
controllerActorService.getClusterNodeAddresses,
controllerActorService.self.path.address
coordinatorActorService.getClusterNodeAddresses,
coordinatorActorService.self.path.address
)

operatorConfig.workerConfigs.foreach(workerConfig => {
Expand All @@ -61,8 +61,8 @@ object ExecutorDeployment {
)
val locationPreference = op.locationPreference.getOrElse(RoundRobinPreference)
val preferredAddress: Address = locationPreference match {
case PreferController =>
addressInfo.controllerAddress
case PreferCoordinator =>
addressInfo.coordinatorAddress
case RoundRobinPreference =>
assert(
addressInfo.allAddresses.nonEmpty,
Expand All @@ -83,8 +83,8 @@ object ExecutorDeployment {
)
}
// Note: At this point, we don't know if the actor is fully initialized.
// Thus, the ActorRef returned from `controllerActorService.actorOf` is ignored.
controllerActorService.actorOf(
// Thus, the ActorRef returned from `coordinatorActorService.actorOf` is ignored.
coordinatorActorService.actorOf(
workflowWorker.withDeploy(Deploy(scope = RemoteScope(preferredAddress)))
)
operatorExecution.initWorkerExecution(workerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{
RegisterActorRef
}
import org.apache.texera.amber.engine.common.AmberLogging
import org.apache.texera.amber.engine.common.virtualidentity.util.{CONTROLLER, SELF}
import org.apache.texera.amber.engine.common.virtualidentity.util.{COORDINATOR, SELF}
import org.apache.texera.amber.util.VirtualIdentityUtils

import scala.collection.mutable
Expand Down Expand Up @@ -104,8 +104,8 @@ class PekkoActorRefMappingService(actorService: PekkoActorService) extends Amber
replyTo.foreach { actor =>
actor ! RegisterActorRef(id, actorRefMapping(id))
}
} else if (actorId != CONTROLLER) {
// propagation stops at controller
} else if (actorId != COORDINATOR) {
// propagation stops at coordinator
if (!queriedActorVirtualIdentities.contains(id)) {
try {
actorService.parent ! GetActorRef(id, replyTo + actorService.self)
Expand All @@ -118,7 +118,7 @@ class PekkoActorRefMappingService(actorService: PekkoActorService) extends Amber
}
}
} else {
// on controller, wait for actor ref registration.
// on coordinator, wait for actor ref registration.
logger.warn(s"unknown identifier: ${VirtualIdentityUtils.toShorterString(id)}")
val toNotifySet = toNotifyOnRegistration.getOrElseUpdate(id, mutable.HashSet[ActorRef]())
replyTo.foreach(toNotifySet.add)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.texera.amber.engine.architecture.controller
package org.apache.texera.amber.engine.architecture.coordinator

import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
Expand Down
Loading
Loading