Skip to content

Commit 2b0f6b8

Browse files
authored
add jspecify nullable annotations on streams Java DSL (#2617)
* add jspecify nullable annotations on streams Java DSL * Update package-info.java * stream-typed * testkit
1 parent f207a17 commit 2b0f6b8

12 files changed

Lines changed: 134 additions & 40 deletions

File tree

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ lazy val streamTyped = pekkoModule("stream-typed")
573573
streamTestkit % "test->test",
574574
actorTestkitTyped % "test->test",
575575
actorTypedTests % "test->test")
576+
.settings(Dependencies.streamTyped)
576577
.settings(AutomaticModuleName.settings("pekko.stream.typed"))
577578
.enablePlugins(ScaladocNoVerificationOfDiagrams)
578579

project/Dependencies.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ object Dependencies {
109109

110110
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
111111

112+
val jspecify = "org.jspecify" % "jspecify" % "1.0.0" % Optional
113+
112114
object Docs {
113115
val sprayJson = "io.spray" %% "spray-json" % "1.3.6" % Test
114116
val gson = "com.google.code.gson" % "gson" % "2.13.2" % Test
@@ -362,9 +364,12 @@ object Dependencies {
362364

363365
// pekko stream
364366

365-
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
367+
lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, jspecify, TestDependencies.scalatest)
368+
369+
lazy val streamTyped = l ++= Seq[sbt.ModuleID](jspecify)
366370

367371
lazy val streamTestkit = l ++= Seq(
372+
jspecify,
368373
TestDependencies.scalatest,
369374
TestDependencies.scalatestScalaCheck,
370375
TestDependencies.junit)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* Java API for Pekko Streams TestKit.
20+
*
21+
* <p>This package contains the Java DSL for Pekko Streams TestKit. For the Scala DSL see
22+
* [[org.apache.pekko.stream.testkit.scaladsl]].
23+
*/
24+
@org.jspecify.annotations.NullMarked
25+
package org.apache.pekko.stream.testkit.javadsl;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* Java API for Pekko Streams Typed.
20+
*
21+
* <p>This package contains the Java DSL for Pekko Streams Typed. For the Scala DSL see
22+
* [[org.apache.pekko.stream.typed.scaladsl]].
23+
*/
24+
@org.jspecify.annotations.NullMarked
25+
package org.apache.pekko.stream.typed.javadsl;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/**
19+
* Java API for Pekko Streams.
20+
*
21+
* <p>This package contains the Java DSL for Pekko Streams. For the Scala DSL see
22+
* [[org.apache.pekko.stream.scaladsl]].
23+
*/
24+
@org.jspecify.annotations.NullMarked
25+
package org.apache.pekko.stream.javadsl;

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
4343
import pekko.util.ConstantFun
4444
import pekko.util.Timeout
4545

46+
import org.jspecify.annotations.Nullable
4647
import org.reactivestreams.Processor
4748

4849
object Flow {
@@ -3496,7 +3497,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
34963497
* '''Cancels when''' downstream cancels
34973498
*/
34983499
def interleaveAll(
3499-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
3500+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
35003501
segmentSize: Int,
35013502
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
35023503
import pekko.util.Collections._
@@ -3580,7 +3581,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
35803581
* '''Cancels when''' downstream cancels
35813582
*/
35823583
def mergeAll(
3583-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
3584+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
35843585
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
35853586
import pekko.util.Collections._
35863587
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4322,7 +4323,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
43224323
*
43234324
* '''Cancels when''' downstream cancels
43244325
*/
4325-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
4326+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter)
4327+
: javadsl.Flow[In, Out, Mat] =
43264328
new Flow(delegate.log(name, e => extract.apply(e))(log))
43274329

43284330
/**
@@ -4363,7 +4365,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
43634365
*
43644366
* '''Cancels when''' downstream cancels
43654367
*/
4366-
def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
4368+
def log(name: String, @Nullable log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
43674369
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
43684370

43694371
/**
@@ -4410,7 +4412,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
44104412
name: String,
44114413
marker: function.Function[Out, LogMarker],
44124414
extract: function.Function[Out, Any],
4413-
log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
4415+
@Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
44144416
new Flow(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))
44154417

44164418
/**
@@ -4457,7 +4459,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
44574459
def logWithMarker(
44584460
name: String,
44594461
marker: function.Function[Out, LogMarker],
4460-
log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
4462+
@Nullable log: MarkerLoggingAdapter): javadsl.Flow[In, Out, Mat] =
44614463
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
44624464

44634465
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import pekko.japi.{ function, Pair }
2929
import pekko.stream._
3030
import pekko.util.ConstantFun
3131

32+
import org.jspecify.annotations.Nullable
33+
3234
object FlowWithContext {
3335

3436
def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, pekko.NotUsed] =
@@ -304,7 +306,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
304306
def log(
305307
name: String,
306308
extract: function.Function[Out, Any],
307-
log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
309+
@Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
308310
viaScala(_.log(name, e => extract.apply(e))(log))
309311

310312
/**
@@ -320,7 +322,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
320322
*
321323
* @see [[pekko.stream.javadsl.Flow.log]]
322324
*/
323-
def log(name: String, log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
325+
def log(name: String, @Nullable log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
324326
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
325327

326328
/**
@@ -340,7 +342,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
340342
name: String,
341343
marker: function.Function2[Out, CtxOut, LogMarker],
342344
extract: function.Function[Out, Any],
343-
log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
345+
@Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
344346
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log))
345347

346348
/**
@@ -362,7 +364,7 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
362364
def logWithMarker(
363365
name: String,
364366
marker: function.Function2[Out, CtxOut, LogMarker],
365-
log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
367+
@Nullable log: MarkerLoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
366368
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
367369

368370
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import pekko.stream.impl.LinearTraversalBuilder
3434
import pekko.stream.scaladsl.SinkToCompletionStage
3535
import pekko.util.ConstantFun.scalaAnyToUnit
3636

37+
import org.jspecify.annotations.Nullable
3738
import org.reactivestreams.{ Publisher, Subscriber }
3839

3940
/** Java API */
@@ -435,7 +436,7 @@ object Sink {
435436
def combine[T, U](
436437
output1: Sink[U, _],
437438
output2: Sink[U, _],
438-
rest: java.util.List[Sink[U, _]],
439+
@Nullable rest: java.util.List[Sink[U, _]],
439440
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
440441
: Sink[T, NotUsed] = {
441442
import scala.jdk.CollectionConverters._
@@ -462,7 +463,7 @@ object Sink {
462463
* @since 1.1.0
463464
*/
464465
def combine[T, U, M](
465-
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
466+
@Nullable sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
466467
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
467468
: Sink[T, java.util.List[M]] = {
468469
import pekko.util.Collections._

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
4141
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
4242
import pekko.util._
4343

44+
import org.jspecify.annotations.Nullable
4445
import org.reactivestreams.{ Publisher, Subscriber }
4546

4647
/** Java API */
@@ -537,7 +538,7 @@ object Source {
537538
def combine[T, U](
538539
first: Source[T, _ <: Any],
539540
second: Source[T, _ <: Any],
540-
rest: java.util.List[Source[T, _ <: Any]],
541+
@Nullable rest: java.util.List[Source[T, _ <: Any]],
541542
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
542543
: Source[U, NotUsed] = {
543544
import scala.jdk.CollectionConverters._
@@ -563,7 +564,7 @@ object Source {
563564
* @since 1.1.0
564565
*/
565566
def combine[T, U, M](
566-
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
567+
@Nullable sources: java.util.List[_ <: Graph[SourceShape[T], M]],
567568
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
568569
: Source[U, java.util.List[M]] = {
569570
import pekko.util.Collections._
@@ -578,7 +579,7 @@ object Source {
578579
/**
579580
* Combine the elements of multiple streams into a stream of lists.
580581
*/
581-
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
582+
def zipN[T](@Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
582583
import scala.jdk.CollectionConverters._
583584
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
584585
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
@@ -589,7 +590,7 @@ object Source {
589590
*/
590591
def zipWithN[T, O](
591592
zipper: function.Function[java.util.List[T], O],
592-
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
593+
@Nullable sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
593594
import scala.jdk.CollectionConverters._
594595
val seq = if (sources ne null) sources.asScala.map(_.asScala).toSeq else immutable.Seq()
595596
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
@@ -837,7 +838,7 @@ object Source {
837838
* '''Cancels when''' downstream cancels
838839
*/
839840
def mergePrioritizedN[T](
840-
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
841+
@Nullable sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
841842
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
842843
import scala.jdk.CollectionConverters._
843844
val seq =
@@ -1618,7 +1619,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
16181619
* '''Cancels when''' downstream cancels
16191620
*/
16201621
def interleaveAll(
1621-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
1622+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
16221623
segmentSize: Int,
16231624
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
16241625
import pekko.util.Collections._
@@ -1700,7 +1701,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
17001701
* '''Cancels when''' downstream cancels
17011702
*/
17021703
def mergeAll(
1703-
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
1704+
@Nullable those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
17041705
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
17051706
import pekko.util.Collections._
17061707
val seq = if (those ne null) those.collectToImmutableSeq {
@@ -4805,7 +4806,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
48054806
*
48064807
* '''Cancels when''' downstream cancels
48074808
*/
4808-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Source[Out, Mat] =
4809+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter): javadsl.Source[Out, Mat] =
48094810
new Source(delegate.log(name, e => extract.apply(e))(log))
48104811

48114812
/**
@@ -4846,7 +4847,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
48464847
*
48474848
* '''Cancels when''' downstream cancels
48484849
*/
4849-
def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
4850+
def log(name: String, @Nullable log: LoggingAdapter): javadsl.Source[Out, Mat] =
48504851
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
48514852

48524853
/**
@@ -4893,7 +4894,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
48934894
name: String,
48944895
marker: function.Function[Out, LogMarker],
48954896
extract: function.Function[Out, Any],
4896-
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
4897+
@Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
48974898
new Source(delegate.logWithMarker(name, e => marker.apply(e), e => extract.apply(e))(log))
48984899

48994900
/**
@@ -4940,7 +4941,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
49404941
def logWithMarker(
49414942
name: String,
49424943
marker: function.Function[Out, LogMarker],
4943-
log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
4944+
@Nullable log: MarkerLoggingAdapter): javadsl.Source[Out, Mat] =
49444945
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
49454946

49464947
/**

stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import pekko.japi.function
3131
import pekko.stream._
3232
import pekko.util.ConstantFun
3333

34+
import org.jspecify.annotations.Nullable
35+
3436
object SourceWithContext {
3537

3638
/**
@@ -290,7 +292,8 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
290292
*
291293
* @see [[pekko.stream.javadsl.Source.log]]
292294
*/
293-
def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
295+
def log(name: String, extract: function.Function[Out, Any], @Nullable log: LoggingAdapter)
296+
: SourceWithContext[Out, Ctx, Mat] =
294297
viaScala(_.log(name, e => extract.apply(e))(log))
295298

296299
/**
@@ -306,7 +309,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
306309
*
307310
* @see [[pekko.stream.javadsl.Flow.log]]
308311
*/
309-
def log(name: String, log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
312+
def log(name: String, @Nullable log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
310313
this.log(name, ConstantFun.javaIdentityFunction[Out], log)
311314

312315
/**
@@ -326,7 +329,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
326329
name: String,
327330
marker: function.Function2[Out, Ctx, LogMarker],
328331
extract: function.Function[Out, Any],
329-
log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
332+
@Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
330333
viaScala(_.logWithMarker(name, (e, c) => marker.apply(e, c), e => extract.apply(e))(log))
331334

332335
/**
@@ -348,7 +351,7 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
348351
def logWithMarker(
349352
name: String,
350353
marker: function.Function2[Out, Ctx, LogMarker],
351-
log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
354+
@Nullable log: MarkerLoggingAdapter): SourceWithContext[Out, Ctx, Mat] =
352355
this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], log)
353356

354357
/**

0 commit comments

Comments
 (0)