Skip to content

Commit 47154ff

Browse files
Yicong-Huang and claude committed
refactor(amber): rename Controller to Coordinator
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 3084881 commit 47154ff

121 files changed

Lines changed: 890 additions & 890 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ option (scalapb.options) = {
3434

3535
message ControlRequest {
3636
oneof sealed_value {
37-
// request for controller
37+
// request for coordinator
3838
PropagateEmbeddedControlMessageRequest propagateEmbeddedControlMessageRequest = 1;
3939
TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2;
4040
DebugCommandRequest debugCommandRequest = 3;
@@ -146,7 +146,7 @@ enum ConsoleMessageType{
146146
}
147147

148148
message ConsoleMessage {
149-
option (scalapb.message).extends = "org.apache.texera.amber.engine.architecture.controller.ClientEvent";
149+
option (scalapb.message).extends = "org.apache.texera.amber.engine.architecture.coordinator.ClientEvent";
150150
string worker_id = 1;
151151
google.protobuf.Timestamp timestamp = 2 [(scalapb.field).no_box = true];
152152
ConsoleMessageType msg_type = 3;

amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ option (scalapb.options) = {
3232
message ControlReturn {
3333
// Oneof block for various return types
3434
oneof sealed_value {
35-
// controller responses
35+
// coordinator responses
3636
RetrieveWorkflowStateResponse retrieveWorkflowStateResponse = 1;
3737
PropagateEmbeddedControlMessageResponse propagateEmbeddedControlMessageResponse = 2;
3838
TakeGlobalCheckpointResponse takeGlobalCheckpointResponse = 3;

amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto renamed to amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/coordinatorservice.proto

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ option (scalapb.options) = {
2929
};
3030

3131

32-
service ControllerService {
32+
service CoordinatorService {
3333
rpc RetrieveWorkflowState(EmptyRequest) returns (RetrieveWorkflowStateResponse);
3434
rpc PropagateEmbeddedControlMessage(PropagateEmbeddedControlMessageRequest) returns (PropagateEmbeddedControlMessageResponse);
3535
rpc TakeGlobalCheckpoint(TakeGlobalCheckpointRequest) returns (TakeGlobalCheckpointResponse);
@@ -44,7 +44,7 @@ service ControllerService {
4444
rpc WorkerExecutionCompleted(EmptyRequest) returns (EmptyReturn);
4545
rpc JumpToOperatorRegion(JumpToOperatorRegionRequest) returns (EmptyReturn);
4646
rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
47-
rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
47+
rpc CoordinatorInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
4848
rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
4949
rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
5050
}

amber/src/main/python/core/architecture/handlers/control/end_worker_handler.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ class EndWorkerHandler(ControlHandler):
3333

3434
async def end_worker(self, req: EmptyRequest) -> EmptyReturn:
3535
"""
36-
The response of EndWorker to the controller indicates that this worker
36+
The response of EndWorker to the coordinator indicates that this worker
3737
has finished not only the data processing logic, but also the processing
3838
of all the control messages.
3939
"""

amber/src/main/python/core/architecture/rpc/async_rpc_client.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@
3232
ReturnInvocation,
3333
ControlReturn,
3434
ControlInvocation,
35-
ControllerServiceStub,
35+
CoordinatorServiceStub,
3636
WorkerServiceStub,
3737
ControlRequest,
3838
)
@@ -62,16 +62,16 @@ def __init__(self, output_queue: InternalQueue, context: Context):
6262
self._send_sequences: Dict[ActorVirtualIdentity, int] = defaultdict(int)
6363
self._unfulfilled_promises: Dict[(ActorVirtualIdentity, int), Future] = dict()
6464
# TODO: is this correct?
65-
self._controller_service_stub = ControllerServiceStub("")
65+
self._coordinator_service_stub = CoordinatorServiceStub("")
6666
rpc_context = AsyncRpcContext(
6767
ActorVirtualIdentity(self._context.worker_id),
68-
ActorVirtualIdentity(name="CONTROLLER"),
68+
ActorVirtualIdentity(name="COORDINATOR"),
6969
)
70-
self._controller_service_stub._unary_unary = AsyncRPCClient._assign_context(
70+
self._coordinator_service_stub._unary_unary = AsyncRPCClient._assign_context(
7171
self, rpc_context
7272
)
73-
# Apply async_run to all async methods of the controller service stub
74-
self._wrap_all_async_methods_with_async_run(self._controller_service_stub)
73+
# Apply async_run to all async methods of the coordinator service stub
74+
self._wrap_all_async_methods_with_async_run(self._coordinator_service_stub)
7575

7676
def _assign_context(
7777
self, rpc_context: AsyncRpcContext
@@ -108,11 +108,11 @@ def _wrap_all_async_methods_with_async_run(self, instance: Any) -> None:
108108
if inspect.iscoroutinefunction(attr):
109109
setattr(instance, attr_name, async_run(attr))
110110

111-
def controller_stub(self) -> ControllerServiceStub:
111+
def coordinator_stub(self) -> CoordinatorServiceStub:
112112
"""
113-
Returns a proxy for interacting with the controller interface.
113+
Returns a proxy for interacting with the coordinator interface.
114114
"""
115-
return self._controller_service_stub
115+
return self._coordinator_service_stub
116116

117117
def get_worker_interface(self, target_worker) -> WorkerServiceStub:
118118
"""

amber/src/main/python/core/runnables/data_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def _executor_session(self):
106106
back to MainLoop on exit. Reporting must happen *before* the
107107
switch: MainLoop's post-switch hook flushes console messages and
108108
then enters EXCEPTION_PAUSE, so anything queued after the switch
109-
would arrive at the controller only after the worker resumes.
109+
would arrive at the coordinator only after the worker resumes.
110110
"""
111111
try:
112112
executor = self._context.executor_manager.executor

amber/src/main/python/core/runnables/main_loop.py

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,7 @@ def __init__(
9090
def complete(self) -> None:
9191
"""
9292
Complete the DataProcessor, marking state to COMPLETED, and notify the
93-
controller.
93+
coordinator.
9494
"""
9595
# flush the buffered console prints
9696
self._check_and_report_console_messages(force_flush=True)
@@ -99,8 +99,8 @@ def complete(self) -> None:
9999
self.data_processor.stop()
100100
self.context.state_manager.transit_to(WorkerState.COMPLETED)
101101
self.context.statistics_manager.update_total_execution_time(time.time_ns())
102-
controller_interface = self._async_rpc_client.controller_stub()
103-
controller_interface.worker_execution_completed(EmptyRequest())
102+
coordinator_interface = self._async_rpc_client.coordinator_stub()
103+
coordinator_interface.worker_execution_completed(EmptyRequest())
104104
self.context.close()
105105

106106
def _check_and_process_control(self) -> None:
@@ -265,7 +265,7 @@ def _process_end_channel(self) -> None:
265265
)
266266

267267
if input_port_id is not None:
268-
self._async_rpc_client.controller_stub().port_completed(
268+
self._async_rpc_client.coordinator_stub().port_completed(
269269
PortCompletedRequest(
270270
port_id=input_port_id,
271271
input=True,
@@ -285,7 +285,7 @@ def _process_end_channel(self) -> None:
285285

286286
# Need to send port completed even if there is no downstream link
287287
for port_id in self.context.output_manager.get_port_ids():
288-
self._async_rpc_client.controller_stub().port_completed(
288+
self._async_rpc_client.coordinator_stub().port_completed(
289289
PortCompletedRequest(port_id=port_id, input=False)
290290
)
291291
self.complete()
@@ -423,7 +423,7 @@ def _process_data_element(self, data_element: DataElement) -> None:
423423
logger.exception(err)
424424

425425
def _send_console_message(self, console_message: ConsoleMessage):
426-
self._async_rpc_client.controller_stub().console_message_triggered(
426+
self._async_rpc_client.coordinator_stub().console_message_triggered(
427427
ConsoleMessageTriggeredRequest(console_message=console_message)
428428
)
429429

amber/src/main/python/pytexera/storage/large_binary_manager.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ class LargeBinaryManager:
3232
"""Manages large binaries in S3 for a worker process.
3333
3434
A singleton, so the cached S3 client is shared process-wide. create() appends a
35-
unique suffix to an execution-scoped base URI handed down by the controller as
35+
unique suffix to an execution-scoped base URI handed down by the coordinator as
3636
process config (``StorageConfig.S3_LARGE_BINARIES_BASE_URI``); the worker never
3737
holds an execution id. This is the Python counterpart of the JVM
3838
``LargeBinaryManager``, which uses a thread-local instead because one JVM process
@@ -90,7 +90,7 @@ def _ensure_bucket_exists(self, bucket: str):
9090
logger.info(f"Created bucket: {bucket}")
9191

9292
def create(self) -> str:
93-
"""Append a unique suffix to the controller-provided base URI.
93+
"""Append a unique suffix to the coordinator-provided base URI.
9494
9595
Pure string construction (no S3 round-trip); the bucket is created on demand at
9696
upload time. Returns e.g. ``s3://bucket/objects/{execution_id}/{uuid}``.

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -21,8 +21,8 @@ package org.apache.texera.amber.engine.architecture.common
2121

2222
import org.apache.pekko.actor.{Address, Deploy}
2323
import org.apache.pekko.remote.RemoteScope
24-
import org.apache.texera.amber.core.workflow.{PhysicalOp, PreferController, RoundRobinPreference}
25-
import org.apache.texera.amber.engine.architecture.controller.execution.OperatorExecution
24+
import org.apache.texera.amber.core.workflow.{PhysicalOp, PreferCoordinator, RoundRobinPreference}
25+
import org.apache.texera.amber.engine.architecture.coordinator.execution.OperatorExecution
2626
import org.apache.texera.amber.engine.architecture.deploysemantics.AddressInfo
2727
import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker
2828
import org.apache.texera.amber.engine.architecture.scheduling.config.OperatorConfig
@@ -38,16 +38,16 @@ object ExecutorDeployment {
3838

3939
def createWorkers(
4040
op: PhysicalOp,
41-
controllerActorService: PekkoActorService,
41+
coordinatorActorService: PekkoActorService,
4242
operatorExecution: OperatorExecution,
4343
operatorConfig: OperatorConfig,
4444
stateRestoreConfig: Option[StateRestoreConfig],
4545
replayLoggingConfig: Option[FaultToleranceConfig]
4646
): Unit = {
4747

4848
val addressInfo = AddressInfo(
49-
controllerActorService.getClusterNodeAddresses,
50-
controllerActorService.self.path.address
49+
coordinatorActorService.getClusterNodeAddresses,
50+
coordinatorActorService.self.path.address
5151
)
5252

5353
operatorConfig.workerConfigs.foreach(workerConfig => {
@@ -61,8 +61,8 @@ object ExecutorDeployment {
6161
)
6262
val locationPreference = op.locationPreference.getOrElse(RoundRobinPreference)
6363
val preferredAddress: Address = locationPreference match {
64-
case PreferController =>
65-
addressInfo.controllerAddress
64+
case PreferCoordinator =>
65+
addressInfo.coordinatorAddress
6666
case RoundRobinPreference =>
6767
assert(
6868
addressInfo.allAddresses.nonEmpty,
@@ -83,8 +83,8 @@ object ExecutorDeployment {
8383
)
8484
}
8585
// Note: At this point, we don't know if the actor is fully initialized.
86-
// Thus, the ActorRef returned from `controllerActorService.actorOf` is ignored.
87-
controllerActorService.actorOf(
86+
// Thus, the ActorRef returned from `coordinatorActorService.actorOf` is ignored.
87+
coordinatorActorService.actorOf(
8888
workflowWorker.withDeploy(Deploy(scope = RemoteScope(preferredAddress)))
8989
)
9090
operatorExecution.initWorkerExecution(workerId)

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{
2828
RegisterActorRef
2929
}
3030
import org.apache.texera.amber.engine.common.AmberLogging
31-
import org.apache.texera.amber.engine.common.virtualidentity.util.{CONTROLLER, SELF}
31+
import org.apache.texera.amber.engine.common.virtualidentity.util.{COORDINATOR, SELF}
3232
import org.apache.texera.amber.util.VirtualIdentityUtils
3333

3434
import scala.collection.mutable
@@ -104,8 +104,8 @@ class PekkoActorRefMappingService(actorService: PekkoActorService) extends Amber
104104
replyTo.foreach { actor =>
105105
actor ! RegisterActorRef(id, actorRefMapping(id))
106106
}
107-
} else if (actorId != CONTROLLER) {
108-
// propagation stops at controller
107+
} else if (actorId != COORDINATOR) {
108+
// propagation stops at coordinator
109109
if (!queriedActorVirtualIdentities.contains(id)) {
110110
try {
111111
actorService.parent ! GetActorRef(id, replyTo + actorService.self)
@@ -118,7 +118,7 @@ class PekkoActorRefMappingService(actorService: PekkoActorService) extends Amber
118118
}
119119
}
120120
} else {
121-
// on controller, wait for actor ref registration.
121+
// on coordinator, wait for actor ref registration.
122122
logger.warn(s"unknown identifier: ${VirtualIdentityUtils.toShorterString(id)}")
123123
val toNotifySet = toNotifyOnRegistration.getOrElseUpdate(id, mutable.HashSet[ActorRef]())
124124
replyTo.foreach(toNotifySet.add)

0 commit comments

Comments
 (0)