Skip to content

Commit 6a27519

Browse files
committed
4.x: Introduce Streamable / Streamer after IAsyncEnumerable /
IAsyncEnumreator
1 parent ba12301 commit 6a27519

97 files changed

Lines changed: 497 additions & 5 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core;
15+
16+
import io.reactivex.rxjava4.annotations.NonNull;
17+
import io.reactivex.rxjava4.disposables.*;
18+
19+
/**
20+
* The {@code IAsyncEnumerable} of the Java world.
21+
* Runs best with Virtual Threads.
22+
* TODO proper docs
23+
* @param <T> the element type of the stream.
24+
* @since 4.0.0
25+
*/
26+
public abstract class Streamable<@NonNull T> {
27+
28+
/**
29+
* Realizes the stream and returns an interface that let's one consume it.
30+
* @param cancellation where to register and listen for cancellation calls.
31+
* @return the Streamer instance to consume.
32+
*/
33+
@NonNull
34+
public abstract Streamer<T> stream(@NonNull DisposableContainer cancellation);
35+
36+
/**
37+
* Realizes the stream and returns an interface that let's one consume it.
38+
* @return the Streamer instance to consume.
39+
*/
40+
@NonNull
41+
public final Streamer<T> stream() {
42+
return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead
43+
}
44+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core;
15+
16+
import java.util.NoSuchElementException;
17+
import java.util.concurrent.CompletionStage;
18+
19+
import io.reactivex.rxjava4.annotations.NonNull;
20+
21+
/**
22+
* A realized stream which can then be consumed asynchronously in steps.
23+
* Think of it as the {@IAsyncEnumerator} of the Java world. Runs best on Virtual Threads.
24+
* @param <T> the element type.
25+
* TODO proper docs
26+
* @since 4.0.0
27+
*/
28+
public interface Streamer<@NonNull T> {
29+
30+
/**
31+
* Determine if there are more elements available from the source.
32+
* @return eventually true or false, indicating availability or termination
33+
*/
34+
@NonNull
35+
CompletionStage<Boolean> next();
36+
37+
/**
38+
* Returns the current element if {@link #next()} yielded {@code true}.
39+
* Can be called multiple times between {@link #next()} calls.
40+
* @return the current element
41+
* @throws NoSuchElementException before the very first {@link #next()} or after {@link #next()} returned {@code false}
42+
*/
43+
@NonNull
44+
T current();
45+
46+
/**
47+
* Called when the stream ends or gets cancelled. Should be always invoked.
48+
* @return the stage you can await to cleanups to happen
49+
*/
50+
@NonNull
51+
CompletionStage<Void> close();
52+
}

src/main/java/io/reactivex/rxjava4/disposables/Disposable.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@
1313

1414
package io.reactivex.rxjava4.disposables;
1515

16+
import java.util.Objects;
17+
import java.util.concurrent.Flow.Subscription;
18+
import java.util.concurrent.Future;
19+
1620
import io.reactivex.rxjava4.annotations.NonNull;
1721
import io.reactivex.rxjava4.functions.Action;
1822
import io.reactivex.rxjava4.internal.disposables.EmptyDisposable;
1923
import io.reactivex.rxjava4.internal.functions.Functions;
20-
import static java.util.concurrent.Flow.*;
21-
22-
import java.util.Objects;
23-
import java.util.concurrent.Future;
2424

2525
/**
2626
* Represents a disposable resource.
2727
*/
28-
public interface Disposable {
28+
public interface Disposable extends AutoCloseable {
2929
/**
3030
* Dispose the resource, the operation should be idempotent.
3131
*/
@@ -37,6 +37,13 @@ public interface Disposable {
3737
*/
3838
boolean isDisposed();
3939

40+
/**
41+
* Dispose the resource, the operation should be idempotent.
42+
*/
43+
default void close() {
44+
dispose();
45+
}
46+
4047
/**
4148
* Construct a {@code Disposable} by wrapping a {@link Runnable} that is
4249
* executed exactly once when the {@code Disposable} is disposed.

src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableRefCount.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
9191
}
9292
}
9393

94+
@SuppressWarnings("resource")
9495
void cancel(RefConnection rc) {
9596
SequentialDisposable sd;
9697
synchronized (this) {

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableAmb.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public void subscribeActual(Observer<? super T> observer) {
6868
return;
6969
}
7070

71+
@SuppressWarnings("resource")
7172
AmbCoordinator<T> ac = new AmbCoordinator<>(observer, count);
7273
ac.subscribe(sources);
7374
}

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableCombineLatest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public void subscribeActual(Observer<? super R> observer) {
7272
return;
7373
}
7474

75+
@SuppressWarnings("resource")
7576
LatestCoordinator<T, R> lc = new LatestCoordinator<>(observer, combiner, count, bufferSize, delayError);
7677
lc.subscribe(sources);
7778
}

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableRefCount.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ protected void subscribeActual(Observer<? super T> observer) {
8888
}
8989
}
9090

91+
@SuppressWarnings("resource")
9192
void cancel(RefConnection rc) {
9293
SequentialDisposable sd;
9394
synchronized (this) {

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableZip.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public void subscribeActual(Observer<? super R> observer) {
6868
return;
6969
}
7070

71+
@SuppressWarnings("resource")
7172
ZipCoordinator<T, R> zc = new ZipCoordinator<>(observer, zipper, count, delayError);
7273
zc.subscribe(sources, bufferSize);
7374
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.operators.streamable;
15+
16+
public class StreamableEmpty<T> {
17+
18+
}

src/main/java/io/reactivex/rxjava4/internal/schedulers/ExecutorScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public Disposable schedule(@NonNull Runnable run) {
166166
task = interruptibleTask;
167167
disposable = interruptibleTask;
168168
} else {
169+
@SuppressWarnings("resource")
169170
BooleanRunnable runnableTask = new BooleanRunnable(decoratedRun);
170171

171172
task = runnableTask;

0 commit comments

Comments
 (0)