Skip to content

Commit 2b763e4

Browse files
authored
refactor(amber): rename remaining Akka* identifiers to Pekko* (#4949)
### What changes were proposed in this PR? The project moved off Akka onto Apache Pekko, but several internal Scala identifiers still carried the `Akka` prefix even though they wrap Pekko APIs. Pure rename across `amber` and `common/config`: - `AkkaConfig` → `PekkoConfig` (object + file) - `AkkaActorService` → `PekkoActorService` (class + file) - `AkkaActorRefMappingService` → `PekkoActorRefMappingService` - `AkkaMessageTransferService` → `PekkoMessageTransferService` - `akkaConfig`, `akkaActorService` method/parameter names → pekko-prefixed No behavior change. No string literals, config keys, or serialization registrations are touched — `cluster.conf` already uses `pekko.*` keys, the kryo registry doesn't reference these classes by name. The intentional `"akka"` literal in `DeployStrategiesSpec.scala` that contrasts pekko vs akka address strings stays. ### Any related issues, documentation, discussions? Closes #4948. ### How was this PR tested? `sbt WorkflowExecutionService/Test/compile` clean, `sbt WorkflowExecutionService/scalafmtCheckAll` clean. ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.7 (Claude Code)
1 parent dcb457e commit 2b763e4

25 files changed

Lines changed: 62 additions & 62 deletions

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ object ExecutorDeployment {
3838

3939
def createWorkers(
4040
op: PhysicalOp,
41-
controllerActorService: AkkaActorService,
41+
controllerActorService: PekkoActorService,
4242
operatorExecution: OperatorExecution,
4343
operatorConfig: OperatorConfig,
4444
stateRestoreConfig: Option[StateRestoreConfig],

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorRefMappingService.scala renamed to amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorRefMappingService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.texera.amber.util.VirtualIdentityUtils
3333

3434
import scala.collection.mutable
3535

36-
class AkkaActorRefMappingService(actorService: AkkaActorService) extends AmberLogging {
36+
class PekkoActorRefMappingService(actorService: PekkoActorService) extends AmberLogging {
3737

3838
override def actorId: ActorVirtualIdentity = actorService.id
3939

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaActorService.scala renamed to amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoActorService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.texera.amber.engine.common.FutureBijection._
2828
import scala.concurrent.ExecutionContext
2929
import scala.concurrent.duration.{DurationInt, FiniteDuration}
3030

31-
class AkkaActorService(val id: ActorVirtualIdentity, actorContext: ActorContext) {
31+
class PekkoActorService(val id: ActorVirtualIdentity, actorContext: ActorContext) {
3232

3333
implicit def ec: ExecutionContext = actorContext.dispatcher
3434

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala renamed to amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/PekkoMessageTransferService.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessage
3030
import scala.collection.mutable
3131
import scala.concurrent.duration.DurationInt
3232

33-
class AkkaMessageTransferService(
34-
actorService: AkkaActorService,
35-
refService: AkkaActorRefMappingService,
33+
class PekkoMessageTransferService(
34+
actorService: PekkoActorService,
35+
refService: PekkoActorRefMappingService,
3636
handleBackpressure: Boolean => Unit
3737
) extends AmberLogging {
3838

amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/WorkflowActor.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ abstract class WorkflowActor(
8383
with AmberLogging {
8484

8585
//
86-
// Akka related components:
86+
// Pekko related components:
8787
//
88-
val actorService: AkkaActorService = new AkkaActorService(actorId, this.context)
88+
val actorService: PekkoActorService = new PekkoActorService(actorId, this.context)
8989
actorService.getAvailableNodeAddressesFunc = () => {
9090
implicit val timeout: Timeout = 5.seconds
9191
Await
@@ -95,12 +95,12 @@ abstract class WorkflowActor(
9595
)
9696
.asInstanceOf[Array[Address]]
9797
}
98-
val actorRefMappingService: AkkaActorRefMappingService = new AkkaActorRefMappingService(
98+
val actorRefMappingService: PekkoActorRefMappingService = new PekkoActorRefMappingService(
9999
actorService
100100
)
101101
actorRefMappingService.registerActorRef(actorId, self)
102-
val transferService: AkkaMessageTransferService =
103-
new AkkaMessageTransferService(actorService, actorRefMappingService, handleBackpressure)
102+
val transferService: PekkoMessageTransferService =
103+
new PekkoMessageTransferService(actorService, actorRefMappingService, handleBackpressure)
104104

105105
logger.info(s"worker replay log writing conf: $replayLogConfOpt")
106106

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerProcessor.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ package org.apache.texera.amber.engine.architecture.controller
2222
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
2323
import org.apache.texera.amber.core.workflow.WorkflowContext
2424
import org.apache.texera.amber.engine.architecture.common.{
25-
AkkaActorRefMappingService,
26-
AkkaActorService,
27-
AkkaMessageTransferService,
25+
PekkoActorRefMappingService,
26+
PekkoActorService,
27+
PekkoMessageTransferService,
2828
AmberProcessor
2929
}
3030
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
@@ -57,21 +57,21 @@ class ControllerProcessor(
5757
this.controllerTimerService = controllerTimerService
5858
}
5959

60-
@transient var transferService: AkkaMessageTransferService = _
60+
@transient var transferService: PekkoMessageTransferService = _
6161

62-
def setupTransferService(transferService: AkkaMessageTransferService): Unit = {
62+
def setupTransferService(transferService: PekkoMessageTransferService): Unit = {
6363
this.transferService = transferService
6464
}
6565

66-
@transient var actorService: AkkaActorService = _
66+
@transient var actorService: PekkoActorService = _
6767

68-
def setupActorService(akkaActorService: AkkaActorService): Unit = {
69-
this.actorService = akkaActorService
68+
def setupActorService(pekkoActorService: PekkoActorService): Unit = {
69+
this.actorService = pekkoActorService
7070
}
7171

72-
@transient var actorRefService: AkkaActorRefMappingService = _
72+
@transient var actorRefService: PekkoActorRefMappingService = _
7373

74-
def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit = {
74+
def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = {
7575
this.actorRefService = actorRefService
7676
this.workflowExecutionCoordinator.setupActorRefService(this.actorRefService)
7777
}

amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerTimerService.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.texera.amber.engine.architecture.controller
2121

2222
import org.apache.pekko.actor.Cancellable
23-
import org.apache.texera.amber.engine.architecture.common.AkkaActorService
23+
import org.apache.texera.amber.engine.architecture.common.PekkoActorService
2424
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
2525
AsyncRPCContext,
2626
QueryStatisticsRequest,
@@ -34,7 +34,7 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
3434

3535
class ControllerTimerService(
3636
controllerConfig: ControllerConfig,
37-
akkaActorService: AkkaActorService
37+
pekkoActorService: PekkoActorService
3838
) {
3939
var statusUpdateAskHandle: Option[Cancellable] = None
4040
var runtimeStatisticsAskHandle: Option[Cancellable] = None
@@ -46,7 +46,7 @@ class ControllerTimerService(
4646
): Option[Cancellable] = {
4747
if (intervalMs.nonEmpty && handleOpt.isEmpty) {
4848
Option(
49-
akkaActorService.sendToSelfWithFixedDelay(
49+
pekkoActorService.sendToSelfWithFixedDelay(
5050
0.milliseconds,
5151
FiniteDuration.apply(intervalMs.get, MILLISECONDS),
5252
ControlInvocation(

amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerTimerService.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.messaginglayer
2121

2222
import org.apache.pekko.actor.Cancellable
2323
import org.apache.texera.amber.config.ApplicationConfig
24-
import org.apache.texera.amber.engine.architecture.common.AkkaActorService
24+
import org.apache.texera.amber.engine.architecture.common.PekkoActorService
2525
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
2626
AsyncRPCContext,
2727
EmptyRequest
@@ -33,7 +33,7 @@ import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
3333

3434
import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
3535

36-
class WorkerTimerService(actorService: AkkaActorService) {
36+
class WorkerTimerService(actorService: PekkoActorService) {
3737

3838
private val enabledAdaptiveBatching = ApplicationConfig.enableAdaptiveNetworkBuffering
3939
private val adaptiveBatchInterval = ApplicationConfig.adaptiveBufferingTimeoutMs

amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
2626
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
2727
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink, PhysicalOp}
2828
import org.apache.texera.amber.engine.architecture.common.{
29-
AkkaActorRefMappingService,
30-
AkkaActorService,
29+
PekkoActorRefMappingService,
30+
PekkoActorService,
3131
ExecutorDeployment
3232
}
3333
import org.apache.texera.amber.engine.architecture.controller.execution.{
@@ -95,8 +95,8 @@ class RegionExecutionCoordinator(
9595
workflowExecution: WorkflowExecution,
9696
asyncRPCClient: AsyncRPCClient,
9797
controllerConfig: ControllerConfig,
98-
actorService: AkkaActorService,
99-
actorRefService: AkkaActorRefMappingService
98+
actorService: PekkoActorService,
99+
actorRefService: PekkoActorRefMappingService
100100
) extends AmberLogging {
101101

102102
initRegionExecution()
@@ -374,7 +374,7 @@ class RegionExecutionCoordinator(
374374
}
375375

376376
private def buildOperator(
377-
actorService: AkkaActorService,
377+
actorService: PekkoActorService,
378378
physicalOp: PhysicalOp,
379379
operatorConfig: OperatorConfig,
380380
operatorExecution: OperatorExecution

amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ import com.twitter.util.Future
2323
import com.typesafe.scalalogging.LazyLogging
2424
import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PhysicalLink}
2525
import org.apache.texera.amber.engine.architecture.common.{
26-
AkkaActorRefMappingService,
27-
AkkaActorService
26+
PekkoActorRefMappingService,
27+
PekkoActorService
2828
}
2929
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
3030
import org.apache.texera.amber.engine.architecture.controller.ExecutionStateUpdate
@@ -49,9 +49,9 @@ class WorkflowExecutionCoordinator(
4949
mutable.HashMap()
5050
private val completionNotified: AtomicBoolean = new AtomicBoolean(false)
5151

52-
@transient var actorRefService: AkkaActorRefMappingService = _
52+
@transient var actorRefService: PekkoActorRefMappingService = _
5353

54-
def setupActorRefService(actorRefService: AkkaActorRefMappingService): Unit = {
54+
def setupActorRefService(actorRefService: PekkoActorRefMappingService): Unit = {
5555
this.actorRefService = actorRefService
5656
}
5757

@@ -62,7 +62,7 @@ class WorkflowExecutionCoordinator(
6262
*
6363
* After the syncs, if there are no running region(s), it will start new regions (if available).
6464
*/
65-
def coordinateRegionExecutors(actorService: AkkaActorService): Future[Unit] = {
65+
def coordinateRegionExecutors(actorService: PekkoActorService): Future[Unit] = {
6666
val unfinishedRegionCoordinators =
6767
regionExecutionCoordinators.values.filter(!_.isCompleted).toSeq
6868

0 commit comments

Comments
 (0)