From c18b1d35f2c172972ff7b9b72bcf135e25332cec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sun, 5 Jul 2026 13:42:41 +0800 Subject: [PATCH 1/2] fix: fix scaladoc warnings across all modules Motivation: The project had scaladoc warnings about unresolvable links, ambiguous overloads, and incorrect package prefixes across actor, actor-typed, stream, cluster, serialization-jackson, persistence, and other modules. Modification: - Replace unresolvable [[...]] links with backtick-quoted code for external types - Fix incorrect package prefixes (pekko. -> org.apache.pekko.) - Resolve ambiguous overload links using backtick-quoted code - Fix @throws tags with fully qualified type names - Fix variable references in code blocks (Helpers.scala) Result: All scaladoc warnings are resolved across the affected modules. Tests: Verified with `sbt doc` on affected subprojects - 0 warnings. References: None - scaladoc cleanup --- .../pekko/actor/testkit/typed/Effect.scala | 4 +- .../typed/javadsl/BehaviorTestKit.scala | 6 +- .../javadsl/TestKitJUnit5Extension.scala | 2 +- .../TestKitJUnitJupiterExtension.scala | 2 +- .../testkit/typed/javadsl/TestProbe.scala | 4 +- .../pekko/actor/typed/ActorSystem.scala | 2 +- .../actor/typed/SupervisorStrategy.scala | 4 +- .../actor/typed/javadsl/AskPattern.scala | 2 +- .../apache/pekko/actor/ActorSelection.scala | 4 +- .../scala/org/apache/pekko/actor/FSM.scala | 2 +- .../apache/pekko/dispatch/Dispatchers.scala | 2 +- .../org/apache/pekko/io/dns/DnsProtocol.scala | 2 +- .../org/apache/pekko/japi/Throwables.scala | 2 +- .../apache/pekko/pattern/BackoffOptions.scala | 6 +- .../pekko/pattern/BackoffSupervisor.scala | 4 +- .../apache/pekko/pattern/CircuitBreaker.scala | 20 +-- .../pekko/pattern/FutureTimeoutSupport.scala | 4 +- .../org/apache/pekko/pattern/Patterns.scala | 16 +- .../apache/pekko/pattern/StatusReply.scala | 2 +- .../internal/CircuitBreakerTelemetry.scala | 4 +- .../org/apache/pekko/util/ByteString.scala | 14 +- .../scala/org/apache/pekko/util/Helpers.scala | 2 +- .../org/apache/pekko/util/TokenBucket.scala | 2 +- .../metrics/ClusterMetricsCollector.scala | 6 +- .../metrics/ClusterMetricsStrategy.scala | 2 +- .../metrics/protobuf/MessageSerializer.scala | 2 +- .../typed/javadsl/ClusterSharding.scala | 4 +- .../typed/scaladsl/ClusterSharding.scala | 2 +- .../cluster/sharding/ClusterSharding.scala | 6 +- .../org/apache/pekko/cluster/Cluster.scala | 2 +- .../remote/testconductor/Conductor.scala | 8 +- .../pekko/remote/testconductor/Player.scala | 4 +- .../transport/netty/SSLEngineProvider.scala | 2 +- .../jackson/JacksonObjectMapperProvider.scala | 14 +- .../stream/typed/javadsl/ActorFlow.scala | 12 +- .../stream/typed/javadsl/ActorSource.scala | 2 +- .../stream/typed/scaladsl/ActorFlow.scala | 16 +- .../stream/typed/scaladsl/ActorSource.scala | 2 +- .../pekko/stream/ActorMaterializer.scala | 2 +- .../scala/org/apache/pekko/stream/Graph.scala | 2 +- .../pekko/stream/TypePreservingFanIn.scala | 4 +- .../pekko/stream/TypePreservingFanOut.scala | 2 +- .../apache/pekko/stream/impl/package.scala | 16 +- .../pekko/stream/scaladsl/BidiFlow.scala | 2 +- .../pekko/stream/scaladsl/Compression.scala | 8 +- .../apache/pekko/stream/scaladsl/FileIO.scala | 8 +- .../apache/pekko/stream/scaladsl/Flow.scala | 64 +++---- .../stream/scaladsl/FlowWithContextOps.scala | 156 +++++++++--------- .../pekko/stream/scaladsl/Framing.scala | 2 +- .../apache/pekko/stream/scaladsl/Graph.scala | 4 +- .../apache/pekko/stream/scaladsl/Queue.scala | 14 +- .../pekko/stream/scaladsl/RetryFlow.scala | 2 +- .../apache/pekko/stream/scaladsl/Sink.scala | 12 +- .../apache/pekko/stream/scaladsl/Source.scala | 4 +- .../apache/pekko/stream/scaladsl/TLS.scala | 2 +- .../pekko/stream/scaladsl/package.scala | 2 +- .../pekko/stream/stage/GraphStage.scala | 12 +- 57 files changed, 257 insertions(+), 257 deletions(-) diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/Effect.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/Effect.scala index ca1d04505bd..350639e4c88 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/Effect.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/Effect.scala @@ -49,8 +49,8 @@ object Effect { * The 'replyToRef' is exposed so that the target inbox can expect the actual message sent to * initiate the ask. * - * Note that this requires the ask to be initiated via the [[ActorContext]]. The [[Future]] returning - * ask is not testable in the [[BehaviorTestKit]]. + * Note that this requires the ask to be initiated via the [[ActorContext]]. The `Future` returning + * ask is not testable in the `BehaviorTestKit`. */ final case class AskInitiated[Req, Res, T](target: RecipientRef[Req], responseTimeout: FiniteDuration, diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala index 8057b0e8e66..52b3a87b668 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala @@ -86,7 +86,7 @@ abstract class BehaviorTestKit[T] { def runAsk[Res](messageFactory: JFunction[ActorRef[Res], T]): ReplyInbox[Res] /** - * The same as [[runAsk]], but with the response class specified. This improves type inference in Java + * The same as `runAsk`, but with the response class specified. This improves type inference in Java * when asserting on the reply in the same statement as the `runAsk` as in: * * ``` @@ -102,14 +102,14 @@ abstract class BehaviorTestKit[T] { runAsk(messageFactory) /** - * The same as [[runAsk]] but only for requests that result in a response of type [[pekko.pattern.StatusReply]]. + * The same as `runAsk` but only for requests that result in a response of type [[pekko.pattern.StatusReply]]. * * @since 1.3.0 */ def runAskWithStatus[Res](messageFactory: JFunction[ActorRef[StatusReply[Res]], T]): StatusReplyInbox[Res] /** - * The same as [[runAskWithStatus]], but with the response class specified. This improves type inference in + * The same as `runAskWithStatus`, but with the response class specified. This improves type inference in * Java when asserting on the reply in the same statement as the `runAskWithStatus` as in: * * ``` diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnit5Extension.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnit5Extension.scala index 679eae0df7c..276a0a117d8 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnit5Extension.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnit5Extension.scala @@ -28,7 +28,7 @@ final class TestKitJUnit5Extension() extends AfterAllCallback with BeforeTestExe var testKit: Option[ActorTestKit] = None /** - * Get a reference to the field annotated with `@JUnit5Testkit` [[JUnit5TestKit]] + * Get a reference to the field annotated with `@JUnit5TestKit` */ override def beforeTestExecution(context: ExtensionContext): Unit = { val testInstance: Option[AnyRef] = diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnitJupiterExtension.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnitJupiterExtension.scala index 6522897fd03..458239ae85b 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnitJupiterExtension.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestKitJUnitJupiterExtension.scala @@ -27,7 +27,7 @@ final class TestKitJUnitJupiterExtension() extends AfterAllCallback with BeforeT var testKit: Option[ActorTestKit] = None /** - * Get a reference to the field annotated with `@JUnitJupiterTestKit` [[JUnitJupiterTestKit]] + * Get a reference to the field annotated with `@JUnitJupiterTestKit` `JUnitJupiterTestKit` */ override def beforeTestExecution(context: ExtensionContext): Unit = { val testInstance: Option[AnyRef] = diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestProbe.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestProbe.scala index bba58f2c455..b972630cfd0 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestProbe.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestProbe.scala @@ -69,8 +69,8 @@ object TestProbe { } /** - * Java API: * Create instances through the `create` factories in the [[TestProbe]] companion - * or via [[ActorTestKit#createTestProbe]]. + * Java API: Create instances through the `create` factories in the `TestProbe` companion + * or via `ActorTestKit.createTestProbe`. * * A test probe is essentially a queryable mailbox which can be used in place of an actor and the received * messages can then be asserted etc. diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala index 98022a026a6..24b8882396e 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala @@ -34,7 +34,7 @@ import com.typesafe.config.{ Config, ConfigFactory } /** * An ActorSystem is home to a hierarchy of Actors. It is created using - * [[ActorSystem#apply]] from a [[Behavior]] object that describes the root + * `ActorSystem.apply` from a [[Behavior]] object that describes the root * Actor of this hierarchy and which will create all other Actors beneath it. * A system also implements the [[ActorRef]] type, and sending a message to * the system directs that message to the root Actor. diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/SupervisorStrategy.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/SupervisorStrategy.scala index 9f6f8d163b5..0ec45b7fe39 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/SupervisorStrategy.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/SupervisorStrategy.scala @@ -36,7 +36,7 @@ object SupervisorStrategy { /** * Restart immediately without any limit on number of restart retries. A limit can be - * added with [[RestartSupervisorStrategy.withLimit]]. + * added with `RestartSupervisorStrategy.withLimit`. * * If the actor behavior is deferred and throws an exception on startup the actor is stopped * (restarting would be dangerous as it could lead to an infinite restart-loop) @@ -358,7 +358,7 @@ sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy { * The initial errors are logged at the level defined with [[BackoffSupervisorStrategy.withLogLevel]]. * For example, the first 3 errors can be logged at INFO level and thereafter at ERROR level. * - * The counter (and log level) is reset after the [[BackoffSupervisorStrategy.withResetBackoffAfter]] + * The counter (and log level) is reset after the `BackoffSupervisorStrategy.withResetBackoffAfter` * duration. */ def withCriticalLogLevel(criticalLevel: Level, afterErrors: Int): BackoffSupervisorStrategy diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/AskPattern.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/AskPattern.scala index 62440e97b21..58b47af6b31 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/AskPattern.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/AskPattern.scala @@ -55,7 +55,7 @@ object AskPattern { /** * The same as [[ask]] but only for requests that result in a response of type [[pekko.pattern.StatusReply]]. * If the response is a [[pekko.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response. - * If the status response is a [[pekko.pattern.StatusReply#error]] the returned future will be failed with the + * If the status response is a `StatusReply.error` the returned future will be failed with the * exception in the error (normally a [[pekko.pattern.StatusReply.ErrorMessage]]). */ def askWithStatus[Req, Res]( diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala index 0cc578f23a6..dbb5b439ad8 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala @@ -95,7 +95,7 @@ abstract class ActorSelection extends Serializable { def resolveOne(timeout: FiniteDuration): Future[ActorRef] = resolveOne()(timeout) /** - * Java API for [[#resolveOne]] + * Java API for `resolveOne` * * Resolve the [[ActorRef]] matching this selection. * The result is returned as a CompletionStage that is completed with the [[ActorRef]] @@ -337,7 +337,7 @@ private[pekko] case object SelectParent extends SelectionPathElement { } /** - * When [[ActorSelection#resolveOne]] can't identify the actor the + * When `ActorSelection.resolveOne` can't identify the actor the * `Future` is completed with this failure. */ @SerialVersionUID(1L) diff --git a/actor/src/main/scala/org/apache/pekko/actor/FSM.scala b/actor/src/main/scala/org/apache/pekko/actor/FSM.scala index e7a2a10805d..8f3fe318b6e 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/FSM.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/FSM.scala @@ -693,7 +693,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging { * * An initial `currentState -> currentState` notification will be triggered by calling this method. * - * @see [[#startWith]] + * @see `startWith` */ final def initialize(): Unit = if (currentState ne null) makeTransition(currentState) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala index 52ad8f08e4b..55aea4ae974 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala @@ -325,7 +325,7 @@ private[pekko] object BalancingDispatcherConfigurator { } /** - * Configurator for creating [[pekko.dispatch.BalancingDispatcher]]. + * Configurator for creating `BalancingDispatcher`. * Returns the same dispatcher instance for each invocation * of the `dispatcher()` method. */ diff --git a/actor/src/main/scala/org/apache/pekko/io/dns/DnsProtocol.scala b/actor/src/main/scala/org/apache/pekko/io/dns/DnsProtocol.scala index b73240afcf7..665665ebbb7 100644 --- a/actor/src/main/scala/org/apache/pekko/io/dns/DnsProtocol.scala +++ b/actor/src/main/scala/org/apache/pekko/io/dns/DnsProtocol.scala @@ -57,7 +57,7 @@ object DnsProtocol { def srvRequestType(): RequestType = Srv /** - * Sending this to the [[internal.AsyncDnsManager]] will either lead to a [[Resolved]] or a [[pekko.actor.Status.Failure]] response. + * Sending this to the `AsyncDnsManager` will either lead to a [[Resolved]] or a [[pekko.actor.Status.Failure]] response. * If request type are both, both resolutions must succeed or the response is a failure. */ final case class Resolve(name: String, requestType: RequestType) extends ConsistentHashable { diff --git a/actor/src/main/scala/org/apache/pekko/japi/Throwables.scala b/actor/src/main/scala/org/apache/pekko/japi/Throwables.scala index 0cab67fe1c6..f136c64b158 100644 --- a/actor/src/main/scala/org/apache/pekko/japi/Throwables.scala +++ b/actor/src/main/scala/org/apache/pekko/japi/Throwables.scala @@ -53,7 +53,7 @@ object Throwables { /** * Throws the given `Throwable`, without requiring the caller to declare it in a `throws` clause. * @param t the `Throwable` to throw - * @throws T the type of the `Throwable` to throw + * @throws java.lang.Throwable the type of the `Throwable` to throw * @return never returns normally, but has return type `R` to allow usage in expressions * @since 2.0.0 */ diff --git a/actor/src/main/scala/org/apache/pekko/pattern/BackoffOptions.scala b/actor/src/main/scala/org/apache/pekko/pattern/BackoffOptions.scala index 5c00b5df8df..9638b82553f 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/BackoffOptions.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/BackoffOptions.scala @@ -56,7 +56,7 @@ object BackoffOpts { * This supervisor should not be used with `Pekko Persistence` child actors. * `Pekko Persistence` actors shutdown unconditionally on `persistFailure()`s rather * than throw an exception on a failure like normal actors. - * [[#onStop]] should be used instead for cases where the child actor + * `onStop` should be used instead for cases where the child actor * terminates itself as a failure signal instead of the normal behavior of throwing an exception. * ***''' * You can define another @@ -110,7 +110,7 @@ object BackoffOpts { * This supervisor should not be used with `Pekko Persistence` child actors. * `Pekko Persistence` actors shutdown unconditionally on `persistFailure()`s rather * than throw an exception on a failure like normal actors. - * [[#onStop]] should be used instead for cases where the child actor + * `onStop` should be used instead for cases where the child actor * terminates itself as a failure signal instead of the normal behavior of throwing an exception. * ***''' * You can define another @@ -284,7 +284,7 @@ private[pekko] sealed trait ExtendedBackoffOptions[T <: ExtendedBackoffOptions[T * @param supervisorStrategy the supervisorStrategy that the back-off supervisor will use. * The default supervisor strategy is used as fallback if the specified supervisorStrategy (its decider) * does not explicitly handle an exception. As the BackoffSupervisor creates a separate actor to handle the - * backoff process, only a [[OneForOneStrategy]] makes sense here. + * backoff process, only a `OneForOneStrategy` makes sense here. * Note that changing the strategy will replace the previously defined maxNrOfRetries. */ def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): T diff --git a/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala b/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala index ca1c9810ab6..4214ba65caa 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala @@ -61,13 +61,13 @@ object BackoffSupervisor { /** * Send this message to the `BackoffSupervisor` and it will reset the back-off. - * This should be used in conjunction with `withManualReset` in [[BackoffOptions]]. + * This should be used in conjunction with `withManualReset` in `BackoffOptions`. */ case object Reset /** * Java API: Send this message to the `BackoffSupervisor` and it will reset the back-off. - * This should be used in conjunction with `withManualReset` in [[BackoffOptions]]. + * This should be used in conjunction with `withManualReset` in `BackoffOptions`. */ def reset = Reset diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index c64f76909d2..2a7b8b7b520 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -64,7 +64,7 @@ object CircuitBreaker { * Create or find a CircuitBreaker in registry. * * @param id Circuit Breaker identifier - * @param system [[ExtendedActorSystem]] that is storing this [[CircuitBreaker]] + * @param system `ExtendedActorSystem` that is storing this [[CircuitBreaker]] */ def apply(id: String)(implicit system: ExtendedActorSystem): CircuitBreaker = CircuitBreakersRegistry(system).get(id) @@ -92,7 +92,7 @@ object CircuitBreaker { * Java API: Lookup a CircuitBreaker in registry. * * @param id Circuit Breaker identifier - * @param system [[ExtendedActorSystem]] that is storing this [[CircuitBreaker]] + * @param system `ExtendedActorSystem` that is storing this [[CircuitBreaker]] */ def lookup(id: String, system: ExtendedActorSystem): CircuitBreaker = apply(id)(system) @@ -330,7 +330,7 @@ class CircuitBreaker( currentState.invoke(body, failureFn) /** - * Java API for [[#withCircuitBreaker]]. + * Java API for `withCircuitBreaker`. * * @param body Call needing protected * @return [[scala.concurrent.Future]] containing the call result or a @@ -340,7 +340,7 @@ class CircuitBreaker( withCircuitBreaker(body.call) /** - * Java API for [[#withCircuitBreaker]]. + * Java API for `withCircuitBreaker`. * * @param body Call needing protected * @param defineFailureFn function that define what should be consider failure and thus increase failure count @@ -356,7 +356,7 @@ class CircuitBreaker( } /** - * Java API (8) for [[#withCircuitBreaker]]. + * Java API (8) for `withCircuitBreaker`. * * @param body Call needing protected * @return [[java.util.concurrent.CompletionStage]] containing the call result or a @@ -368,7 +368,7 @@ class CircuitBreaker( }).asJava /** - * Java API (8) for [[#withCircuitBreaker]]. + * Java API (8) for `withCircuitBreaker`. * * @param body Call needing protected * @param defineFailureFn function that define what should be consider failure and thus increase failure count @@ -419,7 +419,7 @@ class CircuitBreaker( callTimeout) /** - * Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. + * Java API for `withSyncCircuitBreaker`. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. * * @param body Call needing protected * @return The result of the call @@ -428,7 +428,7 @@ class CircuitBreaker( withSyncCircuitBreaker(body.call) /** - * Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. + * Java API for `withSyncCircuitBreaker`. Throws [[java.util.concurrent.TimeoutException]] if the call timed out. * * @param body Call needing protected * @param defineFailureFn function that define what should be consider failure and thus increase failure count @@ -444,7 +444,7 @@ class CircuitBreaker( /** * Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the * caller Actor. In such a case, it is convenient to mark a successful call instead of using Future - * via [[withCircuitBreaker]] + * via `withCircuitBreaker` */ def succeed(): Unit = { currentState.callSucceeds() @@ -453,7 +453,7 @@ class CircuitBreaker( /** * Mark a failed call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the * caller Actor. In such a case, it is convenient to mark a failed call instead of using Future - * via [[withCircuitBreaker]] + * via `withCircuitBreaker` */ def fail(): Unit = { currentState.callFails() diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala index 787adf00bb7..3e696758f92 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala @@ -90,7 +90,7 @@ trait FutureTimeoutSupport { } /** - * Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]] + * Returns a [[scala.concurrent.Future]] that will be completed with a `TimeoutException` * if the provided value is not completed within the specified duration. * @since 1.2.0 */ @@ -122,7 +122,7 @@ trait FutureTimeoutSupport { } /** - * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]] + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a `TimeoutException` * if the provided value is not completed within the specified duration. * @since 1.2.0 */ diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala index 80b4fd61bf6..54cbd22916b 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala @@ -110,7 +110,7 @@ object Patterns { /** * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatus(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = @@ -504,7 +504,7 @@ object Patterns { * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * * If all additional attempts are exhausted the returned completion operator is simply the result of the last invoke attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries @@ -565,7 +565,7 @@ object Patterns { * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * * If all additional attempts are exhausted the returned future is simply the result of the last invoke attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and @@ -641,7 +641,7 @@ object Patterns { * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * * If all additional attempts are exhausted the returned future is simply the result of the last invoke attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and @@ -723,7 +723,7 @@ object Patterns { * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * * If all additional attempts are exhausted the returned completion operator is simply the result of the last invoke attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries @@ -771,7 +771,7 @@ object Patterns { * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * * If all additional attempts are exhausted the returned completion operator is simply the result of the last invoke attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries @@ -803,7 +803,7 @@ object Patterns { * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. * The first attempt will be made immediately, each subsequent attempt will be made after * the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * A scheduler (eg context.system.scheduler) must be provided to delay each retry. * You could provide a function to generate the next delay duration after first attempt, * this function should never return `null`, otherwise a [[java.lang.IllegalArgumentException]] will be through. @@ -832,7 +832,7 @@ object Patterns { * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). - * Return an empty [[Optional]] instance for no delay. + * Return an empty `Optional` instance for no delay. * * A scheduler (eg context.system.scheduler) must be provided to delay each retry. * You could provide a function to generate the next delay duration after first attempt, diff --git a/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala b/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala index 47a3a7b015a..fbba788aaad 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala @@ -30,7 +30,7 @@ import pekko.pattern.StatusReply.ErrorMessage * Generic top-level message type for replies that signal failure or success. Convenient to use together with the * `askWithStatus` ask variants. * - * Create using the factory methods [[StatusReply#success]] and [[StatusReply#error]]. + * Create using the factory methods [[StatusReply#success]] and `StatusReply.error`. * * Pekko contains predefined serializers for the wrapper type and the textual error messages. * diff --git a/actor/src/main/scala/org/apache/pekko/pattern/internal/CircuitBreakerTelemetry.scala b/actor/src/main/scala/org/apache/pekko/pattern/internal/CircuitBreakerTelemetry.scala index 691134b10a2..e801fb20271 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/internal/CircuitBreakerTelemetry.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/internal/CircuitBreakerTelemetry.scala @@ -25,7 +25,7 @@ import pekko.annotation.{ InternalApi, InternalStableApi } * Service Provider Interface (SPI) for collecting metrics from Circuit Breaker. * * Implementations must include a single constructor with two arguments: Circuit Breaker id - * and [[ExtendedActorSystem]]. To setup your implementation, add a setting in your `application.conf`: + * and `ExtendedActorSystem`. To setup your implementation, add a setting in your `application.conf`: * * {{{ * pekko.circuit-breaker.telemetry.implementations += com.example.MyMetrics @@ -80,7 +80,7 @@ trait CircuitBreakerTelemetry { /** * Called when the circuit breaker is removed, e.g. expired due to inactivity. It is also called - * if the circuit breaker is re-configured, before calling [[CircuitBreakerTelemetryProvider#start]]. + * if the circuit breaker is re-configured, before calling `CircuitBreakerTelemetryProvider.start`. */ def stopped(): Unit } diff --git a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala index 06d0162d501..cba7f0d9e52 100644 --- a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala +++ b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala @@ -2458,7 +2458,7 @@ sealed abstract class ByteString /** * Decodes this ByteString using a charset to produce a String. - * If you have a [[Charset]] instance available, use `decodeString(charset: java.nio.charset.Charset` instead. + * If you have a `Charset` instance available, use `decodeString(charset: java.nio.charset.Charset` instead. */ def decodeString(charset: String): String @@ -2494,7 +2494,7 @@ sealed abstract class ByteString * * @param offset the offset to read from * @return the short value - * @throws IndexOutOfBoundsException if the offset is negative or there are fewer than 2 bytes available from offset + * @throws java.lang.IndexOutOfBoundsException if the offset is negative or there are fewer than 2 bytes available from offset * @since 2.0.0 */ def readShortBE(offset: Int): Short = { @@ -2507,7 +2507,7 @@ sealed abstract class ByteString * * @param offset the offset to read from * @return the short value - * @throws IndexOutOfBoundsException if the offset is negative or there are fewer than 2 bytes available from offset + * @throws java.lang.IndexOutOfBoundsException if the offset is negative or there are fewer than 2 bytes available from offset * @since 2.0.0 */ def readShortLE(offset: Int): Short = { @@ -2520,7 +2520,7 @@ sealed abstract class ByteString * * @param offset the offset to read from * @return the int value - * @throws IndexOutOfBoundsException if the offset is negative or there are fewer than 4 bytes available from offset + * @throws java.lang.IndexOutOfBoundsException if the offset is negative or there are fewer than 4 bytes available from offset * @since 2.0.0 */ def readIntBE(offset: Int): Int = { @@ -2533,7 +2533,7 @@ sealed abstract class ByteString * * @param offset the offset to read from * @return the int value - * @throws IndexOutOfBoundsException if the offset is negative or there are fewer than 4 bytes available from offset + * @throws java.lang.IndexOutOfBoundsException if the offset is negative or there are fewer than 4 bytes available from offset * @since 2.0.0 */ def readIntLE(offset: Int): Int = { @@ -2546,7 +2546,7 @@ sealed abstract class ByteString * * @param offset the offset to read from * @return the long value - * @throws IndexOutOfBoundsException if the offset is negative or there are fewer than 8 bytes available from offset + * @throws java.lang.IndexOutOfBoundsException if the offset is negative or there are fewer than 8 bytes available from offset * @since 2.0.0 */ def readLongBE(offset: Int): Long = { @@ -2559,7 +2559,7 @@ sealed abstract class ByteString * * @param offset the offset to read from * @return the long value - * @throws IndexOutOfBoundsException if the offset is negative or there are fewer than 8 bytes available from offset + * @throws java.lang.IndexOutOfBoundsException if the offset is negative or there are fewer than 8 bytes available from offset * @since 2.0.0 */ def readLongLE(offset: Int): Long = { diff --git a/actor/src/main/scala/org/apache/pekko/util/Helpers.scala b/actor/src/main/scala/org/apache/pekko/util/Helpers.scala index 1c7bc597584..0f2e82b681f 100644 --- a/actor/src/main/scala/org/apache/pekko/util/Helpers.scala +++ b/actor/src/main/scala/org/apache/pekko/util/Helpers.scala @@ -73,7 +73,7 @@ object Helpers { /** * Converts a "currentTimeMillis"-obtained timestamp accordingly: * {{{ - * "$hours%02d:$minutes%02d:$seconds%02d.$ms%03dUTC" + * "%%02d:%%02d:%%02d.%%03dUTC" (hours, minutes, seconds, ms) * }}} * * @param timestamp a "currentTimeMillis"-obtained timestamp diff --git a/actor/src/main/scala/org/apache/pekko/util/TokenBucket.scala b/actor/src/main/scala/org/apache/pekko/util/TokenBucket.scala index e7fb53d98d3..adcc58c65e2 100644 --- a/actor/src/main/scala/org/apache/pekko/util/TokenBucket.scala +++ b/actor/src/main/scala/org/apache/pekko/util/TokenBucket.scala @@ -91,7 +91,7 @@ private[pekko] abstract class TokenBucket(capacity: Long, nanosBetweenTokens: Lo } /** - * Default implementation of [[TokenBucket]] that uses `System.nanoTime` as the time source. + * Default implementation of `TokenBucket` that uses `System.nanoTime` as the time source. */ final class NanoTimeTokenBucket(_cap: Long, _period: Long) extends TokenBucket(_cap, _period) { override def currentTime: Long = System.nanoTime() diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala index b3744986853..bcb473101be 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala @@ -35,7 +35,7 @@ import pekko.cluster.MemberStatus sealed abstract class CollectionControlMessage extends Serializable /** - * Command for [[ClusterMetricsSupervisor]] to start metrics collection. + * Command for `ClusterMetricsSupervisor` to start metrics collection. */ @SerialVersionUID(1L) case object CollectionStartMessage extends CollectionControlMessage { @@ -45,7 +45,7 @@ case object CollectionStartMessage extends CollectionControlMessage { } /** - * Command for [[ClusterMetricsSupervisor]] to stop metrics collection. + * Command for `ClusterMetricsSupervisor` to stop metrics collection. */ @SerialVersionUID(1L) case object CollectionStopMessage extends CollectionControlMessage { @@ -95,7 +95,7 @@ private[metrics] class ClusterMetricsSupervisor extends Actor with ActorLogging /** * Local cluster metrics extension events. * - * Published to local event bus subscribers by [[ClusterMetricsCollector]]. + * Published to local event bus subscribers by `ClusterMetricsCollector`. */ trait ClusterMetricsEvent diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsStrategy.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsStrategy.scala index fc926af6409..bbc3a1c064d 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsStrategy.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsStrategy.scala @@ -20,7 +20,7 @@ import pekko.util.Helpers.ConfigOps import com.typesafe.config.Config /** - * Default [[ClusterMetricsSupervisor]] strategy: + * Default `ClusterMetricsSupervisor` strategy: * A configurable [[pekko.actor.OneForOneStrategy]] with restart-on-throwable decider. */ class ClusterMetricsStrategy(config: Config) diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala index c0baa709d01..706c221e1ad 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala @@ -31,7 +31,7 @@ import pekko.remote.ByteStringUtils import pekko.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest, Serializers } /** - * Protobuf serializer for [[pekko.cluster.metrics.ClusterMetricsMessage]] types. + * Protobuf serializer for `ClusterMetricsMessage` types. */ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { diff --git a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/javadsl/ClusterSharding.scala b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/javadsl/ClusterSharding.scala index 7a41f057b9d..dcdd8b36bed 100644 --- a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -86,7 +86,7 @@ object ClusterSharding { * in the cluster, registering the supported entity types with the [[ClusterSharding#init]] * method, which returns the `ShardRegion` actor reference for a named entity type. * Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`. - * Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]], + * Messages can also be sent via the [[EntityRef]] retrieved with `entityRefFor`, * which will also send via the local `ShardRegion`. * * Some settings can be configured as described in the `pekko.cluster.sharding` @@ -490,7 +490,7 @@ object EntityTypeKey { /** * The same as [[ask]] but only for requests that result in a response of type [[pekko.pattern.StatusReply]]. * If the response is a [[pekko.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response. - * If the status response is a [[pekko.pattern.StatusReply#error]] the returned future will be failed with the + * If the status response is a `StatusReply.error` the returned future will be failed with the * exception in the error (normally a [[pekko.pattern.StatusReply.ErrorMessage]]). */ def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res] diff --git a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 5fbd1d5856d..820be201ebd 100644 --- a/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -85,7 +85,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] { * in the cluster, registering the supported entity types with the [[ClusterSharding#init]] * method, which returns the `ShardRegion` actor reference for a named entity type. * Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`. - * Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]], + * Messages can also be sent via the [[EntityRef]] retrieved with `entityRefFor`, * which will also send via the local `ShardRegion`. * * Some settings can be configured as described in the `pekko.cluster.sharding` diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala index d22984c178f..10b1d53d609 100755 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ClusterSharding.scala @@ -81,7 +81,7 @@ import pekko.pattern.ask * * Typical usage of this extension: * 1. At system startup on each cluster node by registering the supported entity types with - * the [[ClusterSharding#start]] method + * the `ClusterSharding.start` method * 1. Retrieve the `ShardRegion` actor for a named entity type with [[ClusterSharding#shardRegion]] * Settings can be configured as described in the `pekko.cluster.sharding` section of the `reference.conf`. * @@ -638,7 +638,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { /** * Retrieve the actor reference of the `ShardRegion` actor responsible for the named entity type. - * The entity type must be registered with the [[#start]] or [[#startProxy]] method before it + * The entity type must be registered with the `start` or `startProxy` method before it * can be used here. Messages to the entity is always sent via the `ShardRegion`. */ def shardRegion(typeName: String): ActorRef = { @@ -658,7 +658,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * Retrieve the actor reference of the `ShardRegion` actor that will act as a proxy to the * named entity type running in another data center. A proxy within the same data center can be accessed * with [[#shardRegion]] instead of this method. The entity type must be registered with the - * [[#startProxy]] method before it can be used here. Messages to the entity is always sent + * `startProxy` method before it can be used here. Messages to the entity is always sent * via the `ShardRegion`. */ def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef = { diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala b/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala index 1bc7bc7cf1c..4555dca23ed 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/Cluster.scala @@ -69,7 +69,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { /** * This module is responsible cluster membership information. Changes to the cluster - * information is retrieved through [[#subscribe]]. Commands to operate the cluster is + * information is retrieved through `subscribe`. Commands to operate the cluster is * available through methods in this class, such as [[#join]], [[#down]] and [[#leave]]. * * Each cluster [[Member]] is identified by its [[pekko.actor.Address]], and diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala index 303e09b8c9f..a29af4ae9e8 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Conductor.scala @@ -52,10 +52,10 @@ import io.netty.channel.ChannelHandler.Sharable /** * The conductor is the one orchestrating the test: it governs the - * [[pekko.remote.testconductor.Controller]]’s port to which all + * `Controller`’s port to which all * [[pekko.remote.testconductor.Player]]s connect, it issues commands to their * `org.apache.pekko.remote.testconductor.NetworkFailureInjector` and provides support - * for barriers using the [[pekko.remote.testconductor.BarrierCoordinator]]. + * for barriers using the `BarrierCoordinator`. * All of this is bundled inside the [[pekko.remote.testconductor.TestConductorExt]] * extension. */ @@ -70,7 +70,7 @@ trait Conductor { this: TestConductorExt => } /** - * Start the [[pekko.remote.testconductor.Controller]], which in turn will + * Start the `Controller`, which in turn will * bind to a TCP port as specified in the `pekko.testconductor.port` config * property, where 0 denotes automatic allocation. Since the latter is * actually preferred, a `Future[Int]` is returned which will be completed @@ -428,7 +428,7 @@ private[pekko] object Controller { /** * This controls test execution by managing barriers (delegated to - * [[pekko.remote.testconductor.BarrierCoordinator]], its child) and allowing + * `BarrierCoordinator`, its child) and allowing * network and other failures to be injected at the test nodes. * * INTERNAL API. diff --git a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala index 0e525dbb5dd..7645c56e9d4 100644 --- a/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala +++ b/multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/Player.scala @@ -69,7 +69,7 @@ object Player { /** * The Player is the client component of the * [[pekko.remote.testconductor.TestConductorExt]] extension. It registers with - * the [[pekko.remote.testconductor.Conductor]]’s [[pekko.remote.testconductor.Controller]] + * the [[pekko.remote.testconductor.Conductor]]’s `Controller` * in order to participate in barriers and enable network failure injection. */ trait Player { this: TestConductorExt => @@ -162,7 +162,7 @@ private[pekko] object ClientFSM { /** * This is the controlling entity on the [[pekko.remote.testconductor.Player]] * side: in a first step it registers itself with a symbolic name and its remote - * address at the [[pekko.remote.testconductor.Controller]], then waits for the + * address at the `Controller`, then waits for the * `Done` message which signals that all other expected test participants have * done the same. After that, it will pass barrier requests to and from the * coordinator and react to the [[pekko.remote.testconductor.Conductor]]’s diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/SSLEngineProvider.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/SSLEngineProvider.scala index 7a0f4008566..c3586698cb7 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/SSLEngineProvider.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/SSLEngineProvider.scala @@ -48,7 +48,7 @@ trait SSLEngineProvider { /** * Create a client SSLEngine with the target hostname and port for hostname verification. - * Default implementation delegates to [[createClientSSLEngine()]]. + * Default implementation delegates to `createClientSSLEngine()`. */ @nowarn("msg=never used") def createClientSSLEngine(hostname: String, port: Int): SSLEngine = createClientSSLEngine() diff --git a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala index 567307f7fc0..e2d665ef31e 100644 --- a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala +++ b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala @@ -300,7 +300,7 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid } /** - * INTERNAL API: Use [[JacksonObjectMapperProvider#create]] + * INTERNAL API: Use `JacksonObjectMapperProvider.create` * * This is needed by one test in Lagom where the ObjectMapper is created without starting and ActorSystem. */ @@ -398,7 +398,7 @@ final class JacksonObjectMapperProvider(system: ExtendedActorSystem) extends Ext * @param bindingName name of this `ObjectMapper` * @param jsonFactory optional `JsonFactory` such as `CBORFactory`, for plain JSON `None` (defaults) * can be used - * @see [[JacksonObjectMapperProvider#getOrCreate]] + * @see `JacksonObjectMapperProvider.getOrCreate` */ def create(bindingName: String, jsonFactory: Option[JsonFactory]): ObjectMapper = { val log = Logging.getLogger(system, JacksonObjectMapperProvider.getClass) @@ -422,7 +422,7 @@ final class JacksonObjectMapperProvider(system: ExtendedActorSystem) extends Ext * @param bindingName name of this `ObjectMapper` * @param jsonFactory optional `JsonFactory` such as `CBORFactory`, for plain JSON `None` (defaults) * can be used - * @see [[JacksonObjectMapperProvider#getOrCreate]] + * @see `JacksonObjectMapperProvider.getOrCreate` */ def create(bindingName: String, jsonFactory: Optional[JsonFactory]): ObjectMapper = create(bindingName, jsonFactory.toScala) @@ -477,7 +477,7 @@ class JacksonObjectMapperFactory { * return the modules that are to be applied to the `ObjectMapper`. * * When implementing a `JacksonObjectMapperFactory` with Java the `immutable.Seq` can be - * created with [[pekko.japi.Util.immutableSeq]]. + * created with `Util.immutableSeq`. * * @param bindingName bindingName name of this `ObjectMapper` * @param configuredModules the list of `Modules` that were configured in @@ -494,7 +494,7 @@ class JacksonObjectMapperFactory { * return the features that are to be applied to the `ObjectMapper`. * * When implementing a `JacksonObjectMapperFactory` with Java the `immutable.Seq` can be - * created with [[pekko.japi.Util.immutableSeq]]. + * created with `Util.immutableSeq`. * * @param bindingName bindingName name of this `ObjectMapper` * @param configuredFeatures the list of `SerializationFeature` that were configured in @@ -512,7 +512,7 @@ class JacksonObjectMapperFactory { * return the features that are to be applied to the `ObjectMapper`. * * When implementing a `JacksonObjectMapperFactory` with Java the `immutable.Seq` can be - * created with [[pekko.japi.Util.immutableSeq]]. + * created with `Util.immutableSeq`. * * @param bindingName bindingName name of this `ObjectMapper` * @param configuredFeatures the list of `DeserializationFeature` that were configured in @@ -530,7 +530,7 @@ class JacksonObjectMapperFactory { * return the features that are to be applied to the `ObjectMapper`. * * When implementing a `JacksonObjectMapperFactory` with Java the `immutable.Seq` can be - * created with [[pekko.japi.Util.immutableSeq]]. + * created with `Util.immutableSeq`. * * @param bindingName bindingName name of this `ObjectMapper` * @param configuredFeatures the list of `DateTimeFeature` that were configured in diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala index 7548319108e..1e086f31dd8 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala @@ -76,8 +76,8 @@ object ActorFlow { .asJava /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatus[I, Q, A]( @@ -132,8 +132,8 @@ object ActorFlow { .asJava /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatus[I, Q, A]( @@ -163,8 +163,8 @@ object ActorFlow { .asJava /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatusAndContext[I, Q, A, Ctx]( diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala index 6b0df3ae265..58ab1afffb0 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala @@ -51,7 +51,7 @@ object ActorSource { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. * - * See also [[pekko.stream.javadsl.Source.queue]]. + * See also `Source.queue`. * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala index 4463761dc42..d7dcd21dab7 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala @@ -144,8 +144,8 @@ object ActorFlow { implicit timeout: Timeout): Flow[I, A, NotUsed] = askImpl(parallelism)(ref)(makeMessage, (_, o: Future[A]) => o) /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatus[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)( @@ -153,8 +153,8 @@ object ActorFlow { askWithStatus(2)(ref)(makeMessage) /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatus[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)( @@ -186,8 +186,8 @@ object ActorFlow { (in, o: Future[A]) => o.map(a => a -> in._2)(ExecutionContext.parasitic)) /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatusAndContext[I, Q, A, Ctx](ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)( @@ -195,8 +195,8 @@ object ActorFlow { askWithStatusAndContext(2)(ref)(makeMessage) /** - * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response - * arrives the future is completed with the wrapped value, if a [[pekko.pattern.StatusReply#error]] arrives the future is instead + * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a `StatusReply.success` response + * arrives the future is completed with the wrapped value, if a `StatusReply.error` arrives the future is instead * failed. */ def askWithStatusAndContext[I, Q, A, Ctx](parallelism: Int)(ref: ActorRef[Q])( diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala index 7e680371174..1a910b8d0b7 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala @@ -51,7 +51,7 @@ object ActorSource { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. * - * See also [[pekko.stream.scaladsl.Source.queue]]. + * See also `Source.queue`. * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer diff --git a/stream/src/main/scala/org/apache/pekko/stream/ActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/ActorMaterializer.scala index eb14650226f..2a3d85060f6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/ActorMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/ActorMaterializer.scala @@ -227,7 +227,7 @@ private[pekko] object ActorMaterializerSettings { } /** - * This class describes the configurable properties of the [[ActorMaterializer]]. + * This class describes the configurable properties of the `ActorMaterializer`. * Please refer to the `withX` methods for descriptions of the individual settings. * * The constructor is not public API, use create or apply on the [[ActorMaterializerSettings]] companion instead. diff --git a/stream/src/main/scala/org/apache/pekko/stream/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/Graph.scala index f6cb2aca97e..cc0548aa90a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/Graph.scala @@ -45,7 +45,7 @@ trait Graph[+S <: Shape, +M] { private[stream] def traversalBuilder: TraversalBuilder /** - * Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite + * Replace the attributes of this `Flow` with the given ones. If this Flow is a composite * of multiple graphs, new attributes on the composite will be less specific than attributes * set directly on the individual graphs of the composite. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala index 857f5b8d24e..f139f86b3f2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanIn.scala @@ -22,7 +22,7 @@ package org.apache.pekko.stream * their input element type (i.e., `T => T`). * * Built-in stages with this trait: [[scaladsl.Merge]], [[scaladsl.Concat]], - * [[scaladsl.Interleave]], [[scaladsl.MergePrioritized]], [[scaladsl.OrElse]], + * [[scaladsl.Interleave]], [[scaladsl.MergePrioritized]], `OrElse`, * and [[scaladsl.MergeSequence]]. * * Note: some of these stages (Concat, Interleave, MergeSequence) have factory methods @@ -31,7 +31,7 @@ package org.apache.pekko.stream * correct, just slightly less optimal. The bypass optimization fires for stages whose * factory methods return the raw class (e.g., `Merge`, `MergePrioritized`). * - * This trait is used by [[scaladsl.Source.combine]] (and its Java API counterpart) + * This trait is used by `Source.combine` (and its Java API counterpart) * to safely optimize the single-source case. When only one source is provided, * the fan-in strategy can be bypassed with a direct pass-through if and only if the * strategy is type-preserving (output type equals input type). Without this marker, diff --git a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala index 663819093aa..9a60528cceb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/TypePreservingFanOut.scala @@ -27,7 +27,7 @@ package org.apache.pekko.stream * `T => T` types, because its `partitioner` function provides user-specified routing * semantics that would be lost if the stage were bypassed. * - * This trait is used by [[scaladsl.Sink.combine]] (and its Java API counterpart) + * This trait is used by `Sink.combine` (and its Java API counterpart) * to safely optimize the single-sink case. When only one sink is provided, * the fan-out strategy can be bypassed with a direct pass-through if and only if the * strategy is type-preserving (output type equals input type). Without this marker, diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/package.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/package.scala index 7f551a106c4..706737a281d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/package.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/package.scala @@ -17,13 +17,13 @@ package org.apache.pekko.stream * The architecture of Apache Pekko Streams internally consists of several distinct layers: * * * The DSLs like [[org.apache.pekko.stream.scaladsl.Flow]], [[org.apache.pekko.stream.scaladsl.Source]] etc. are the user facing API - * for composing streams. These DSLs are a thin wrappers around the internal [[org.apache.pekko.stream.impl.TraversalBuilder]] + * for composing streams. These DSLs are a thin wrappers around the internal `TraversalBuilder` * builder classes. There are Java alternatives of these DSLs in [[javadsl]] which basically wrap their scala * counterpart, delegating method calls. * * The [[org.apache.pekko.stream.stage.GraphStage]] API is the user facing API for creating new stream operators. These - * classes are used by the [[org.apache.pekko.stream.impl.fusing.GraphInterpreter]] which executes islands (subgraphs) of these + * classes are used by the `GraphInterpreter` which executes islands (subgraphs) of these * operators - * * The high level DSLs use the [[org.apache.pekko.stream.impl.TraversalBuilder]] classes to build instances of + * * The high level DSLs use the `TraversalBuilder` classes to build instances of * [[org.apache.pekko.stream.impl.Traversal]] which are the representation of a materializable stream description. These builders * are immutable and safely shareable. Unlike the top-level DSLs, these are classic, i.e. elements are treated as * Any. @@ -33,10 +33,10 @@ package org.apache.pekko.stream * interpreting a [[org.apache.pekko.stream.impl.Traversal]]. It delegates the actual task of creating executable entities * and Publishers/Producers to [[org.apache.pekko.stream.impl.PhaseIsland]]s which are plugins that understand atomic operators * in the graph and able to turn them into executable entities. - * * The [[org.apache.pekko.stream.impl.fusing.GraphInterpreter]] and its actor backed wrapper [[org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter]] + * * The `GraphInterpreter` and its actor backed wrapper [[org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter]] * are used to execute synchronous islands (subgraphs) of [[org.apache.pekko.stream.stage.GraphStage]]s. * - * For the execution layer, refer to [[org.apache.pekko.stream.impl.fusing.GraphInterpreter]]. + * For the execution layer, refer to `GraphInterpreter`. * * == Design goals == * @@ -193,7 +193,7 @@ package org.apache.pekko.stream * * the port merge1.out must be different from merge2.out. * - * For efficiency reasons, the linear and graph DSLs use different [[org.apache.pekko.stream.impl.TraversalBuilder]] types to + * For efficiency reasons, the linear and graph DSLs use different `TraversalBuilder` types to * build the [[org.apache.pekko.stream.impl.Traversal]] (we will discuss these next). One of the differences between the two * builders are their approach to port mapping. * @@ -205,7 +205,7 @@ package org.apache.pekko.stream * use any port mapping. * * The generic graph builder class [[org.apache.pekko.stream.impl.CompositeTraversalBuilder]] needs port mapping as it allows - * adding any kind of builders in any order. When adding a module (encoded as another [[org.apache.pekko.stream.impl.TraversalBuilder]]) + * adding any kind of builders in any order. When adding a module (encoded as another `TraversalBuilder`) * there are two entities in play: * * * The module (builder) to be added. This builder has a few ports unwired which are usually packaged in a [[Shape]] @@ -347,7 +347,7 @@ package org.apache.pekko.stream * All what we have discussed so far referred to the "mental array", the global address space in which slots * are assigned to ports. This model describes the wiring of the graph perfectly, but it does not map to the local * data structures needed by materialization when there are islands present. One of the important goals of this - * layout data structure is to be able to produce the data structures used by the [[org.apache.pekko.stream.impl.fusing.GraphInterpreter]] + * layout data structure is to be able to produce the data structures used by the `GraphInterpreter` * directly, without much translation. Unfortunately if there is an island inside a traversal, it might leave gaps * in the address space: * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala index 47598bf82c3..508b8a25cae 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/BidiFlow.scala @@ -313,7 +313,7 @@ object BidiFlow { /** * If the time between two processed elements *in any direction* exceed the provided timeout, the stream is failed - * with a [[org.apache.pekko.StreamIdleTimeoutException]]. + * with a `StreamIdleTimeoutException`. * * There is a difference between this operator and having two idleTimeout Flows assembled into a BidiStage. * If the timeout is configured to be 1 seconds, then this operator will not fail even though there are elements flowing diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala index ceddb7f70ff..967ff09a877 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala @@ -32,7 +32,7 @@ object Compression { def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION) /** - * Same as [[gzip]] with a custom level. + * Same as `gzip` with a custom level. * * @param level Compression level (0-9) */ @@ -40,7 +40,7 @@ object Compression { CompressionUtils.compressorFlow(() => new GzipCompressor(level)) /** - * Same as [[gzip]] with a custom level and configurable flush mode. + * Same as `gzip` with a custom level and configurable flush mode. * * @param level Compression level (0-9) * @param autoFlush If true will automatically flush after every single element in the stream. @@ -68,7 +68,7 @@ object Compression { def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, nowrap = false) /** - * Same as [[deflate]] with configurable level and nowrap + * Same as `deflate` with configurable level and nowrap * * @param level Compression level (0-9) * @param nowrap if true then use GZIP compatible compression @@ -77,7 +77,7 @@ object Compression { CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap)) /** - * Same as [[deflate]] with configurable level, nowrap and autoFlush. + * Same as `deflate` with configurable level, nowrap and autoFlush. * * @param level Compression level (0-9) * @param nowrap if true then use GZIP compatible compression diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala index f090acf4870..ac314fbf7ed 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FileIO.scala @@ -37,7 +37,7 @@ object FileIO { * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[pekko.stream.ActorAttributes]]. * - * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * It materializes a `Future` of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does * not give any guarantee that the bytes were seen by downstream stages. * @@ -55,7 +55,7 @@ object FileIO { * You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[pekko.stream.ActorAttributes]]. * - * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * It materializes a `Future` of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does * not give any guarantee that the bytes were seen by downstream stages. * @@ -70,7 +70,7 @@ object FileIO { * Creates a Sink which writes incoming [[pekko.util.ByteString]] elements to the given file path. Overwrites existing files * by truncating their contents as default. * - * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * Materializes a `Future` of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * * This source is backed by an Actor which will use the dedicated `pekko.stream.blocking-io-dispatcher`, @@ -94,7 +94,7 @@ object FileIO { * Creates a Sink which writes incoming [[pekko.util.ByteString]] elements to the given file path. Overwrites existing files * by truncating their contents as default. * - * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * Materializes a `Future` of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. * * This source is backed by an Actor which will use the dedicated `pekko.stream.blocking-io-dispatcher`, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 70a8f263ec9..94d543dd3d1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -189,7 +189,7 @@ final class Flow[-In, +Out, +Mat]( * via the provided [[Sink]] as a new [[Source]]. * * @param source A source that connects to this flow - * @param sink A sink which needs to materialize into a [[Future]], typically one + * @param sink A sink which needs to materialize into a `Future`, typically one * that collects values such as [[Sink.head]] or [[Sink.seq]] * @return A new [[Source]] that contains the results of the [[Flow]] with the provided * [[Source]]'s elements run with the [[Sink]] @@ -1120,7 +1120,7 @@ trait FlowOps[+Out, +Mat] { * Elements will be passed into this "side channel" function, and any of its results will be ignored. * * If the wire-tap operation is slow (it backpressures), elements that would've been sent to it will be dropped instead. - * It is similar to [[#alsoTo]] which does backpressure instead of dropping elements. + * It is similar to `alsoTo` which does backpressure instead of dropping elements. * * This operation is useful for inspecting the passed through element, usually by means of side-effecting * operations (such as `println`, or emitting metrics), for each element without having to modify it. @@ -1203,7 +1203,7 @@ trait FlowOps[+Out, +Mat] { * The `close` function is called only once when the upstream or downstream finishes or fails. You can do some clean-up here, * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped. * - * Early completion can be done with combination of the [[takeWhile]] operator. + * Early completion can be done with combination of the `takeWhile` operator. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * @@ -1234,15 +1234,15 @@ trait FlowOps[+Out, +Mat] { ).withAttributes(DefaultAttributes.mapWithResource) /** - * Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails. + * Transform each stream element with the help of an `AutoCloseable` resource and close it when the stream finishes or fails. * * The resource creation function is invoked once when the stream is materialized and the returned resource is passed to * the mapping function for mapping the first element. The mapping function returns a mapped element to emit * downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification. * - * The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails. + * The `AutoCloseable` resource is closed only once when the upstream or downstream finishes or fails. * - * Early completion can be done with combination of the [[takeWhile]] operator. + * Early completion can be done with combination of the `takeWhile` operator. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * @@ -1378,7 +1378,7 @@ trait FlowOps[+Out, +Mat] { * The function `partitioner` is always invoked on the elements in the order they arrive. * The function `f` is always invoked on the elements which in the same partition in the order they arrive. * - * If the function `partitioner` or `f` throws an exception or if the [[Future]] is completed + * If the function `partitioner` or `f` throws an exception or if the `Future` is completed * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. * @@ -1416,7 +1416,7 @@ trait FlowOps[+Out, +Mat] { * The function `partitioner` is always invoked on the elements in the order they arrive. * The function `f` is always invoked on the elements which in the same partition in the order they arrive. * - * If the function `partitioner` or `f` throws an exception or if the [[Future]] is completed + * If the function `partitioner` or `f` throws an exception or if the `Future` is completed * with failure and the supervision decision is [[pekko.stream.Supervision.Stop]] * the stream will be completed with failure, otherwise the stream continues and the current element is dropped. * @@ -1703,7 +1703,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' after predicate returned true or downstream cancels * - * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]], [[FlowOps.takeWhile]] + * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]], `FlowOps.takeWhile` * @since 1.2.0 */ def takeUntil(p: Out => Boolean): Repr[Out] = takeWhile(!p(_), inclusive = true) @@ -1923,7 +1923,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]] + * See also [[FlowOps.take]], [[FlowOps.takeWithin]], `FlowOps.takeWhile` */ def limit(max: Long): Repr[Out] = limitWeighted(max)(_ => 1) @@ -1949,7 +1949,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]] + * See also [[FlowOps.take]], [[FlowOps.takeWithin]], `FlowOps.takeWhile` */ def limitWeighted[T](max: Long)(costFn: Out => Long): Repr[Out] = via(LimitWeighted(max, costFn)) @@ -2110,7 +2110,7 @@ trait FlowOps[+Out, +Mat] { * yielding the next current value. * * If the stream is empty (i.e. completes before signalling any elements), - * the reduce operator will fail its downstream with a [[NoSuchElementException]], + * the reduce operator will fail its downstream with a `NoSuchElementException`, * which is semantically in-line with that Scala's standard library collections * do in such situations. * @@ -2129,7 +2129,7 @@ trait FlowOps[+Out, +Mat] { def reduce[T >: Out](f: (T, T) => T): Repr[T] = via(new Reduce[T](f)) /** - * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * Intersperses stream with provided element, similar to how `List.mkString` * injects a separator between a List's elements. * * Additionally can inject start and end marker elements to stream. @@ -2163,7 +2163,7 @@ trait FlowOps[+Out, +Mat] { via(Intersperse(Some(start), inject, Some(end))) /** - * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * Intersperses stream with provided element, similar to how `List.mkString` * injects a separator between a List's elements. * * Additionally can inject start and end marker elements to stream. @@ -2747,7 +2747,7 @@ trait FlowOps[+Out, +Mat] { * If you expect an infinite number of keys this can cause memory issues. Elements belonging * to those keys are drained directly and not send to the substream. * - * @see [[#groupBy]] + * @see `groupBy` */ def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed] = groupBy(maxSubstreams, f, false) @@ -2803,7 +2803,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain`, downstream * cancels or any substream cancels on `SubstreamCancelStrategy.propagate` * - * See also [[FlowOps.splitAfter]]. + * See also `FlowOps.splitAfter`. */ @deprecated( "Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy", @@ -2834,7 +2834,7 @@ trait FlowOps[+Out, +Mat] { * emits them to a stream of output streams, always beginning a new one with * the current element if the given predicate returns true for it. * - * @see [[#splitWhen]] + * @see `splitWhen` */ def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] = { val merge = new SubFlowImpl.MergeBack[Out, Repr] { @@ -2891,7 +2891,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels and substreams cancel on `SubstreamCancelStrategy.drain`, downstream * cancels or any substream cancels on `SubstreamCancelStrategy.propagate` * - * See also [[FlowOps.splitWhen]]. + * See also `FlowOps.splitWhen`. */ @deprecated( "Use .withAttributes(ActorAttributes.supervisionStrategy(equivalentDecider)) rather than a SubstreamCancelStrategy", @@ -2920,7 +2920,7 @@ trait FlowOps[+Out, +Mat] { * emits them to a stream of output streams. It *ends* the current substream when the * predicate is true. * - * @see [[#splitAfter]] + * @see `splitAfter` */ def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed] = { val merge = new SubFlowImpl.MergeBack[Out, Repr] { @@ -2966,7 +2966,7 @@ trait FlowOps[+Out, +Mat] { map(f).via(new FlattenConcat[T, M](parallelism)) /** - * Alias for [[flatMapConcat]], added to enable for comprehensions. + * Alias for `flatMapConcat`, added to enable for comprehensions. * * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change * the internal implementation. @@ -3314,7 +3314,7 @@ trait FlowOps[+Out, +Mat] { * By default element and completion signals are logged on debug level, and errors are logged on Error level. * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: * - * Uses implicit [[LoggingAdapter]] if available, otherwise uses an internally created one, + * Uses implicit `LoggingAdapter` if available, otherwise uses an internally created one, * which uses `org.apache.pekko.event.Logging(materializer.system, materializer)` as its source (use this class to configure slf4j loggers). * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. @@ -3337,7 +3337,7 @@ trait FlowOps[+Out, +Mat] { * By default element and completion signals are logged on debug level, and errors are logged on Error level. * This can be adjusted according to your needs by providing a custom [[Attributes.LogLevels]] attribute on the given Flow: * - * Uses implicit [[MarkerLoggingAdapter]] if available, otherwise uses an internally created one, + * Uses implicit `MarkerLoggingAdapter` if available, otherwise uses an internally created one, * which uses `org.apache.pekko.event.Logging.withMarker(materializer.system, materializer)` as its source (use this class to configure slf4j loggers). * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. @@ -4051,7 +4051,7 @@ trait FlowOps[+Out, +Mat] { * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass * through will also be sent to the [[Sink]]. * - * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. + * It is similar to `wireTap` but will backpressure instead of dropping elements when the given [[Sink]] is not ready. * * '''Emits when''' element is available and demand exists both from the Sink and the downstream. * @@ -4080,9 +4080,9 @@ trait FlowOps[+Out, +Mat] { * only. This is useful for fire-and-forget side sinks (e.g. logging) where the side sink's * availability should not affect the main business stream. * - * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * When `propagateCancellation` is `true` (the default), this behaves identically to `alsoTo`. * - * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. + * It is similar to `wireTap` but will backpressure instead of dropping elements when the given [[Sink]] is not ready. * * '''Emits when''' element is available and demand exists both from the Sink and the downstream. * @@ -4113,7 +4113,7 @@ trait FlowOps[+Out, +Mat] { * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that pass * through will also be sent to the [[Sink]]. * - * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. + * It is similar to `wireTap` but will backpressure instead of dropping elements when the given [[Sink]]s is not ready. * * '''Emits when''' element is available and demand exists both from the Sinks and the downstream. * @@ -4164,7 +4164,7 @@ trait FlowOps[+Out, +Mat] { * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. * - * It is similar to [[#alsoTo]] which does backpressure instead of dropping elements. + * It is similar to `alsoTo` which does backpressure instead of dropping elements. * * '''Emits when''' element is available and demand exists from the downstream; the element will * also be sent to the wire-tap Sink if there is demand. @@ -4204,7 +4204,7 @@ trait FlowOps[+Out, +Mat] { /** * Aggregate input elements into an arbitrary data structure that can be completed and emitted downstream * when custom condition is met which can be triggered by aggregate or timer. - * It can be thought of a more general [[groupedWeightedWithin]]. + * It can be thought of a more general `groupedWeightedWithin`. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * @@ -4625,7 +4625,7 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass * through will also be sent to the [[Sink]]. * - * @see [[#alsoTo]] + * @see `alsoTo` * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. @@ -4637,7 +4637,7 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass * through will also be sent to the [[Sink]]. * - * @see [[#alsoTo]] + * @see `alsoTo` * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. @@ -4667,9 +4667,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * through will also be sent to the wire-tap Sink, without the latter affecting the mainline flow. * If the wire-tap Sink backpressures, elements that would've been sent to it will be dropped instead. * - * It is similar to [[#alsoToMat]] which does backpressure instead of dropping elements. + * It is similar to `alsoToMat` which does backpressure instead of dropping elements. * - * @see [[#wireTap]] + * @see `wireTap` * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index ac034481fc5..9c7ad02806a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -49,7 +49,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * This can be used as an escape hatch for operations that are not (yet) provided with automatic * context propagation here. * - * @see [[pekko.stream.scaladsl.FlowOps.via]] + * @see `FlowOps.via` */ def via[Out2, Ctx2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] @@ -88,76 +88,76 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3] /** - * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * Data variant of `FlowOps.alsoTo` * - * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @see `FlowOps.alsoTo` * @since 1.1.0 */ def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, Ctx] /** - * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] with configurable cancellation propagation. + * Data variant of `FlowOps.alsoTo` with configurable cancellation propagation. * - * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @see `FlowOps.alsoTo` * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, Ctx] /** - * Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * Context variant of `FlowOps.alsoTo` * - * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @see `FlowOps.alsoTo` * @since 1.1.0 */ def alsoToContext(that: Graph[SinkShape[Ctx], ?]): Repr[Out, Ctx] /** - * Data variant of [[pekko.stream.scaladsl.FlowOps.wireTap]] + * Data variant of `FlowOps.wireTap` * - * @see [[pekko.stream.scaladsl.FlowOps.wireTap]] + * @see `FlowOps.wireTap` * @since 1.1.0 */ def wireTap(that: Graph[SinkShape[Out], ?]): Repr[Out, Ctx] /** - * Context variant of [[pekko.stream.scaladsl.FlowOps.wireTap]] + * Context variant of `FlowOps.wireTap` * - * @see [[pekko.stream.scaladsl.FlowOps.wireTap]] + * @see `FlowOps.wireTap` * @since 1.1.0 */ def wireTapContext(that: Graph[SinkShape[Ctx], ?]): Repr[Out, Ctx] /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]]. + * Context-preserving variant of `FlowOps.map`. * - * @see [[pekko.stream.scaladsl.FlowOps.map]] + * @see `FlowOps.map` */ def map[Out2](f: Out => Out2): Repr[Out2, Ctx] = via(flow.map { case (e, ctx) => (f(e), ctx) }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapOption]]. + * Context-preserving variant of `FlowOps.mapOption`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.mapOption]] + * @see `FlowOps.mapOption` * @since 2.0.0 */ def mapOption[Out2](f: Out => Option[Out2]): Repr[Out2, Ctx] = via(flow.mapOption { case (e, ctx) => f(e).map(_ -> ctx) }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapError]]. + * Context-preserving variant of `FlowOps.mapError`. * - * @see [[pekko.stream.scaladsl.FlowOps.mapError]] + * @see `FlowOps.mapError` */ def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out, Ctx] = via(flow.mapError(pf)) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsync]]. + * Context-preserving variant of `FlowOps.mapAsync`. * - * @see [[pekko.stream.scaladsl.FlowOps.mapAsync]] + * @see `FlowOps.mapAsync` */ def mapAsync[Out2](parallelism: Int)(f: Out => Future[Out2]): Repr[Out2, Ctx] = via(flow.mapAsync(parallelism) { @@ -165,10 +165,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]]. + * Context-preserving variant of `FlowOps.mapAsyncPartitioned`. * * @since 1.1.0 - * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]] + * @see `FlowOps.mapAsyncPartitioned` */ def mapAsyncPartitioned[Out2, P](parallelism: Int)( partitioner: Out => P)( @@ -180,10 +180,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { } /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]]. + * Context-preserving variant of `FlowOps.mapAsyncPartitionedUnordered`. * * @since 1.1.0 - * @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]] + * @see `FlowOps.mapAsyncPartitionedUnordered` */ def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int)( partitioner: Out => P)( @@ -195,11 +195,11 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { } /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collect]]. + * Context-preserving variant of `FlowOps.collect`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.collect]] + * @see `FlowOps.collect` */ def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Out2, Ctx] = via(flow.collect { @@ -207,21 +207,21 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.filter]]. + * Context-preserving variant of `FlowOps.filter`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.filter]] + * @see `FlowOps.filter` */ def filter(pred: Out => Boolean): Repr[Out, Ctx] = collect { case e if pred(e) => e } /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.filterNot]]. + * Context-preserving variant of `FlowOps.filterNot`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.filterNot]] + * @see `FlowOps.filterNot` */ def filterNot(pred: Out => Boolean): Repr[Out, Ctx] = collect { case e if !pred(e) => e } @@ -229,7 +229,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { /** * Alias for [[filter]], added to enable filtering in for comprehensions. * - * @see [[pekko.stream.scaladsl.FlowOps.withFilter]] + * @see `FlowOps.withFilter` * @since 2.0.0 */ @ApiMayChange @@ -237,65 +237,65 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { filter(pred) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropRepeated]]. + * Context-preserving variant of `FlowOps.dropRepeated`. * - * @see [[pekko.stream.scaladsl.FlowOps.dropRepeated]] + * @see `FlowOps.dropRepeated` * @since 2.0.0 */ def dropRepeated(): Repr[Out, Ctx] = dropRepeated(ConstantFun.scalaAnyTwoEquals) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropRepeated]]. + * Context-preserving variant of `FlowOps.dropRepeated`. * - * @see [[pekko.stream.scaladsl.FlowOps.dropRepeated]] + * @see `FlowOps.dropRepeated` * @since 2.0.0 */ def dropRepeated(pred: (Out, Out) => Boolean): Repr[Out, Ctx] = via(flow.dropRepeated((left, right) => pred(left._1, right._1))) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeWhile]]. + * Context-preserving variant of `FlowOps.takeWhile`. * - * @see [[pekko.stream.scaladsl.FlowOps.takeWhile]] + * @see `FlowOps.takeWhile` * @since 2.0.0 */ def takeWhile(pred: Out => Boolean): Repr[Out, Ctx] = takeWhile(pred, inclusive = false) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeUntil]]. + * Context-preserving variant of `FlowOps.takeUntil`. * - * @see [[pekko.stream.scaladsl.FlowOps.takeUntil]] + * @see `FlowOps.takeUntil` * @since 2.0.0 */ def takeUntil(pred: Out => Boolean): Repr[Out, Ctx] = takeWhile(!pred(_), inclusive = true) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeWhile]]. + * Context-preserving variant of `FlowOps.takeWhile`. * - * @see [[pekko.stream.scaladsl.FlowOps.takeWhile]] + * @see `FlowOps.takeWhile` * @since 2.0.0 */ def takeWhile(pred: Out => Boolean, inclusive: Boolean): Repr[Out, Ctx] = via(flow.takeWhile({ case (e, _) => pred(e) }, inclusive)) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropWhile]]. + * Context-preserving variant of `FlowOps.dropWhile`. * - * @see [[pekko.stream.scaladsl.FlowOps.dropWhile]] + * @see `FlowOps.dropWhile` * @since 2.0.0 */ def dropWhile(pred: Out => Boolean): Repr[Out, Ctx] = via(flow.dropWhile { case (e, _) => pred(e) }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collectFirst]]. + * Context-preserving variant of `FlowOps.collectFirst`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.collectFirst]] + * @see `FlowOps.collectFirst` * @since 2.0.0 */ def collectFirst[Out2](f: PartialFunction[Out, Out2]): Repr[Out2, Ctx] = @@ -304,11 +304,11 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collectWhile]]. + * Context-preserving variant of `FlowOps.collectWhile`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.collectWhile]] + * @see `FlowOps.collectWhile` * @since 2.0.0 */ def collectWhile[Out2](f: PartialFunction[Out, Out2]): Repr[Out2, Ctx] = @@ -317,76 +317,76 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collectType]]. + * Context-preserving variant of `FlowOps.collectType`. * * Note, that the context of elements that are filtered out is skipped as well. * - * @see [[pekko.stream.scaladsl.FlowOps.collectType]] + * @see `FlowOps.collectType` * @since 2.0.0 */ def collectType[Out2](implicit tag: ClassTag[Out2]): Repr[Out2, Ctx] = collect { case tag(e) => e } /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.drop]]. + * Context-preserving variant of `FlowOps.drop`. * - * @see [[pekko.stream.scaladsl.FlowOps.drop]] + * @see `FlowOps.drop` * @since 2.0.0 */ def drop(n: Long): Repr[Out, Ctx] = via(flow.drop(n)) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropWithin]]. + * Context-preserving variant of `FlowOps.dropWithin`. * - * @see [[pekko.stream.scaladsl.FlowOps.dropWithin]] + * @see `FlowOps.dropWithin` * @since 2.0.0 */ def dropWithin(d: FiniteDuration): Repr[Out, Ctx] = via(flow.dropWithin(d)) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.take]]. + * Context-preserving variant of `FlowOps.take`. * - * @see [[pekko.stream.scaladsl.FlowOps.take]] + * @see `FlowOps.take` * @since 2.0.0 */ def take(n: Long): Repr[Out, Ctx] = via(flow.take(n)) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeWithin]]. + * Context-preserving variant of `FlowOps.takeWithin`. * - * @see [[pekko.stream.scaladsl.FlowOps.takeWithin]] + * @see `FlowOps.takeWithin` * @since 2.0.0 */ def takeWithin(d: FiniteDuration): Repr[Out, Ctx] = via(flow.takeWithin(d)) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.limit]]. + * Context-preserving variant of `FlowOps.limit`. * - * @see [[pekko.stream.scaladsl.FlowOps.limit]] + * @see `FlowOps.limit` * @since 2.0.0 */ def limit(max: Long): Repr[Out, Ctx] = limitWeighted(max)(_ => 1) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.limitWeighted]]. + * Context-preserving variant of `FlowOps.limitWeighted`. * - * @see [[pekko.stream.scaladsl.FlowOps.limitWeighted]] + * @see `FlowOps.limitWeighted` * @since 2.0.0 */ def limitWeighted(max: Long)(costFn: Out => Long): Repr[Out, Ctx] = via(flow.limitWeighted(max) { case (e, _) => costFn(e) }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.grouped]]. + * Context-preserving variant of `FlowOps.grouped`. * * Each output group will be associated with a `Seq` of corresponding context elements. * - * @see [[pekko.stream.scaladsl.FlowOps.grouped]] + * @see `FlowOps.grouped` */ def grouped(n: Int): Repr[immutable.Seq[Out], immutable.Seq[Ctx]] = via(flow.grouped(n).map { elsWithContext => @@ -395,11 +395,11 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.sliding]]. + * Context-preserving variant of `FlowOps.sliding`. * * Each output group will be associated with a `Seq` of corresponding context elements. * - * @see [[pekko.stream.scaladsl.FlowOps.sliding]] + * @see `FlowOps.sliding` */ def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Out], immutable.Seq[Ctx]] = via(flow.sliding(n, step).map { elsWithContext => @@ -408,7 +408,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapConcat]]. + * Context-preserving variant of `FlowOps.mapConcat`. * * The context of the input element will be associated with each of the output elements calculated from * this input element. @@ -433,7 +433,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * ("b", 2) * ``` * - * @see [[pekko.stream.scaladsl.FlowOps.mapConcat]] + * @see `FlowOps.mapConcat` */ def mapConcat[Out2](f: Out => IterableOnce[Out2]): Repr[Out2, Ctx] = via(flow.mapConcat { @@ -447,9 +447,9 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { via(flow.map { case (e, ctx) => (e, f(ctx)) }) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.log]]. + * Context-preserving variant of `FlowOps.log`. * - * @see [[pekko.stream.scaladsl.FlowOps.log]] + * @see `FlowOps.log` */ def log(name: String, extract: Out => Any = ConstantFun.scalaIdentityFunction)( implicit log: LoggingAdapter = null): Repr[Out, Ctx] = { @@ -458,9 +458,9 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { } /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.logWithMarker]]. + * Context-preserving variant of `FlowOps.logWithMarker`. * - * @see [[pekko.stream.scaladsl.FlowOps.logWithMarker]] + * @see `FlowOps.logWithMarker` */ def logWithMarker( name: String, @@ -472,33 +472,33 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { } /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.throttle]]. + * Context-preserving variant of `FlowOps.throttle`. * - * @see [[pekko.stream.scaladsl.FlowOps.throttle]] + * @see `FlowOps.throttle` */ def throttle(elements: Int, per: FiniteDuration): Repr[Out, Ctx] = throttle(elements, per, Throttle.AutomaticMaximumBurst, ConstantFun.oneInt, ThrottleMode.Shaping) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.throttle]]. + * Context-preserving variant of `FlowOps.throttle`. * - * @see [[pekko.stream.scaladsl.FlowOps.throttle]] + * @see `FlowOps.throttle` */ def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out, Ctx] = throttle(elements, per, maximumBurst, ConstantFun.oneInt, mode) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.throttle]]. + * Context-preserving variant of `FlowOps.throttle`. * - * @see [[pekko.stream.scaladsl.FlowOps.throttle]] + * @see `FlowOps.throttle` */ def throttle(cost: Int, per: FiniteDuration, costCalculation: (Out) => Int): Repr[Out, Ctx] = throttle(cost, per, Throttle.AutomaticMaximumBurst, costCalculation, ThrottleMode.Shaping) /** - * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.throttle]]. + * Context-preserving variant of `FlowOps.throttle`. * - * @see [[pekko.stream.scaladsl.FlowOps.throttle]] + * @see `FlowOps.throttle` */ def throttle( cost: Int, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Framing.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Framing.scala index feaeacb016b..61d966e3ee6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Framing.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Framing.scala @@ -108,7 +108,7 @@ object Framing { } /** - * Returns a BidiFlow that implements a simple framing protocol. This is a convenience wrapper over [[Framing#lengthField]] + * Returns a BidiFlow that implements a simple framing protocol. This is a convenience wrapper over `Framing.lengthField` * and simply attaches a length field header of four bytes (using big endian encoding) to outgoing messages, and decodes * such messages in the inbound direction. The decoded messages do not contain the header. * {{{ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 3136297a0d0..5135f156513 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -760,7 +760,7 @@ object WireTap { private val singleton = new WireTap[Nothing] /** - * @see [[WireTap]] + * @see `WireTap` */ def apply[T](): WireTap[T] = singleton.asInstanceOf[WireTap[T]] } @@ -1461,7 +1461,7 @@ object OrElse { private val singleton = new OrElse[Nothing] /** - * @see [[OrElse]] + * @see `OrElse` */ def apply[T](): OrElse[T] = singleton.asInstanceOf[OrElse[T]] } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala index 7071c5f5a7c..9382d11ab55 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala @@ -32,23 +32,23 @@ import pekko.stream.QueueOfferResult trait SourceQueue[T] { /** - * Offers an element to a stream and returns a [[Future]] that: + * Offers an element to a stream and returns a `Future` that: * - completes with `Enqueued` if the element is consumed by a stream * - completes with `Dropped` when the stream dropped the offered element - * - completes with `QueueClosed` when the stream is completed whilst the [[Future]] is active + * - completes with `QueueClosed` when the stream is completed whilst the `Future` is active * - completes with `Failure(f)` in case of failure to enqueue element from upstream * - fails when stream is already completed * * Additionally when using the backpressure overflowStrategy: - * - If the buffer is full the [[Future]] won't be completed until there is space in the buffer - * - Calling offer before the [[Future]] is completed, in this case it will return a failed [[Future]] + * - If the buffer is full the `Future` won't be completed until there is space in the buffer + * - Calling offer before the `Future` is completed, in this case it will return a failed `Future` * * @param elem element to send to a stream */ def offer(elem: T): Future[QueueOfferResult] /** - * Returns a [[Future]] that will be completed if this operator + * Returns a `Future` that will be completed if this operator * completes, or will be failed when the operator faces an internal failure. * * Note that this only means the elements have been passed downstream, not @@ -78,7 +78,7 @@ trait SourceQueueWithComplete[T] extends SourceQueue[T] { def fail(ex: Throwable): Unit /** - * Method returns a [[Future]] that will be completed if this operator + * Method returns a `Future` that will be completed if this operator * completes, or will be failed when the stream fails, * for example when [[SourceQueueWithComplete.fail]] is invoked. * @@ -122,7 +122,7 @@ object SourceQueueWithComplete { trait SinkQueue[T] { /** - * Pulls elements from the stream and returns a [[Future]] that: + * Pulls elements from the stream and returns a `Future` that: * - fails if the stream is failed * - completes with None in case the stream is completed * - completes with `Some(element)` in case the next element is available from stream. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RetryFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RetryFlow.scala index ec1aeb7c0a2..b0817f92fe4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RetryFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RetryFlow.scala @@ -39,7 +39,7 @@ object RetryFlow { * Let's say the flow is handling an element, either first-time executing some calculation, or retrying. * The next element won't be emitted into the flow until the current element has been finished processing. * By finished, it means either succeed the very first attempt, succeed after a few attempts, or get dropped after - * using up [[maxRetries]] retries. + * using up `maxRetries` retries. * * @param minBackoff minimum duration to backoff between issuing retries * @param maxBackoff maximum duration to backoff between issuing retries diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index a020284b639..7828e4b503f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -187,7 +187,7 @@ object Sink { /** * A `Sink` that materializes into a `Future` of the first value received. - * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. + * If the stream completes before signaling at least a single element, the Future will be failed with a `NoSuchElementException`. * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. * * See also [[headOption]]. @@ -211,7 +211,7 @@ object Sink { /** * A `Sink` that materializes into a `Future` of the last value received. - * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. + * If the stream completes before signaling at least a single element, the Future will be failed with a `NoSuchElementException`. * If the stream signals an error, the Future will be failed with the stream's exception. * * See also [[lastOption]], [[takeLast]]. @@ -254,7 +254,7 @@ object Sink { * `Seq` is limited to `Int.MaxValue` elements, this Sink will cancel the stream * after having received that many elements. * - * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], `Flow.takeWhile` */ def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T, Vector[T]]) @@ -274,7 +274,7 @@ object Sink { * @return a `Sink` that materializes to a `Future[Long]` with the element count * @since 1.3.0 * - * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], `Flow.takeWhile` */ def count[T]: Sink[T, Future[Long]] = Sink.fromGraph(CountSink) @@ -287,7 +287,7 @@ object Sink { * `Int.MaxValue` elements. See [The Architecture of Scala 2.13's Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-213-collections.html) for more info. * This Sink will cancel the stream after having received that many elements. * - * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], `Flow.takeWhile` */ def collection[T, That]( implicit cbf: scala.collection.Factory[T, That with immutable.Iterable[?]]): Sink[T, Future[That]] = @@ -543,7 +543,7 @@ object Sink { * if there is a failure signaled in the stream. * * If the stream is empty (i.e. completes before signalling any elements), - * the reduce operator will fail its downstream with a [[NoSuchElementException]], + * the reduce operator will fail its downstream with a `NoSuchElementException`, * which is semantically in-line with that Scala's standard library collections * do in such situations. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 6dab7058aba..e4b5fc9bbe2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -124,7 +124,7 @@ final class Source[+Out, +Mat]( * Materializes this [[Source]] using the [[Sink]], immediately returning the values via the * provided [[Sink]] as a new [[Source]]. * - * @param sink A sink which needs to materialize into a [[Future]], typically one + * @param sink A sink which needs to materialize into a `Future`, typically one * that collects values such as [[Sink.head]] or [[Sink.seq]] * @return A new [[Source]] that contains the results of the provided [[Source]]'s * elements run with the [[Sink]] @@ -191,7 +191,7 @@ final class Source[+Out, +Mat]( * if there is a failure signaled in the stream. * * If the stream is empty (i.e. completes before signalling any elements), - * the reduce operator will fail its downstream with a [[NoSuchElementException]], + * the reduce operator will fail its downstream with a `NoSuchElementException`, * which is semantically in-line with that Scala's standard library collections * do in such situations. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala index f69d8ef577d..ce15dc94c08 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/TLS.scala @@ -140,7 +140,7 @@ object TLS { /** * This object holds simple wrapping [[pekko.stream.scaladsl.BidiFlow]] implementations that can * be used instead of [[TLS]] when no encryption is desired. The flows will - * just adapt the message protocol by wrapping into [[SessionBytes]] and + * just adapt the message protocol by wrapping into `SessionBytes` and * unwrapping [[SendBytes]]. */ object TLSPlacebo { diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/package.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/package.scala index a1b02caa0b7..331496d68b7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/package.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/package.scala @@ -54,7 +54,7 @@ import scala.jdk.FutureConverters._ * often than for corresponding transformations on strict collections like * [[scala.collection.immutable.List]]. *An important consequence* is that elements that were produced * into a stream may be discarded by later processors, e.g. when using the - * [[#take]] operator. + * `take` operator. * * By default every operation is executed within its own [[org.apache.pekko.actor.Actor]] * to enable full pipelining of the chained set of computations. This behavior diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index 6c04d0cfdd4..bae071f83a9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -736,7 +736,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * can be retrieved via this method. After [[grab]] has been called the port is considered to be empty, and further * calls to [[grab]] will fail until the port is pulled again and a new element is pushed as a response. * - * The method [[isAvailable]] can be used to query if the port has an element that can be grabbed or not. + * The method `isAvailable` can be used to query if the port has an element that can be grabbed or not. */ final protected def grab[T](in: Inlet[T]): T = { val connection = conn(in) @@ -768,7 +768,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Indicates whether there is already a pending pull for the given input port. If this method returns true - * then [[isAvailable]] must return false for that same port. + * then `isAvailable` must return false for that same port. */ final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (conn(in).portState & (InReady | InClosed)) == 0 @@ -808,7 +808,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Emits an element through the given output port. Calling this method twice before a [[pull]] has been arrived - * will fail. There can be only one outstanding push request at any given time. The method [[isAvailable]] can be + * will fail. There can be only one outstanding push request at any given time. The method `isAvailable` can be * used to check if the port is ready to be pushed or not. */ final protected def push[T](out: Outlet[T], elem: T): Unit = { @@ -861,14 +861,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex) /** - * Automatically invokes [[cancel]] or [[complete]] on all the input or output ports that have been called, + * Automatically invokes `cancel` or `complete` on all the input or output ports that have been called, * then marks the operator as stopped. */ final def completeStage(): Unit = internalCompleteStage(SubscriptionWithCancelException.StageWasCompleted, OptionVal.None) /** - * Automatically invokes [[cancel]] or [[complete]] on all the input or output ports that have been called, + * Automatically invokes `cancel` or `complete` on all the input or output ports that have been called, * then marks the stage as stopped. */ final def cancelStage(cause: Throwable): Unit = @@ -893,7 +893,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: } /** - * Automatically invokes [[cancel]] or [[fail]] on all the input or output ports that have been called, + * Automatically invokes `cancel` or `fail` on all the input or output ports that have been called, * then marks the operator as stopped. */ final def failStage(ex: Throwable): Unit = internalCompleteStage(ex, OptionVal.Some(ex)) From ae96916cd1f89caec3e892214d79d81a2f86aebf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sun, 5 Jul 2026 21:35:46 +0800 Subject: [PATCH 2/2] fix: restore fully qualified package name in ActorSource scaladoc references Motivation: PR #3285 review comment from pjfanning: "should we keep the package name?" The original [[pekko.stream.javadsl.Source.queue]] and [[pekko.stream.scaladsl.Source.queue]] links were replaced with just `Source.queue`, dropping the package name and making the reference ambiguous. Modification: Restore the fully qualified package names in the scaladoc "See also" references for both javadsl and scaladsl ActorSource.scala files. Result: Scaladoc references now include the full package path, making it clear which Source.queue class is being referenced. Tests: Not run - docs only References: Refs apache/pekko#3285 --- .../org/apache/pekko/stream/typed/javadsl/ActorSource.scala | 2 +- .../org/apache/pekko/stream/typed/scaladsl/ActorSource.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala index 58ab1afffb0..cea0acd0c4c 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala @@ -51,7 +51,7 @@ object ActorSource { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. * - * See also `Source.queue`. + * See also `org.apache.pekko.stream.javadsl.Source.queue`. * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala index 1a910b8d0b7..5e79f4fd650 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorSource.scala @@ -51,7 +51,7 @@ object ActorSource { * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. * - * See also `Source.queue`. + * See also `org.apache.pekko.stream.scaladsl.Source.queue`. * * @param bufferSize The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer