Skip to content

Commit 5072e1e

Browse files
committed
4.x: Enhance Streamable/Streamer and DisposableContainer APIs
1 parent 2e84b74 commit 5072e1e

File tree

6 files changed

+80
-5
lines changed

6 files changed

+80
-5
lines changed

src/main/java/io/reactivex/rxjava4/core/Streamer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.CompletionStage;
1818

1919
import io.reactivex.rxjava4.annotations.NonNull;
20+
import io.reactivex.rxjava4.disposables.*;
2021

2122
/**
2223
* A realized stream which can then be consumed asynchronously in steps.
@@ -29,10 +30,21 @@ public interface Streamer<@NonNull T> extends AutoCloseable {
2930

3031
/**
3132
* Determine if there are more elements available from the source.
33+
* @param cancellation ability to perform cancellation on a per-virtual-pull request.
3234
* @return eventually true or false, indicating availability or termination
3335
*/
3436
@NonNull
35-
CompletionStage<Boolean> next();
37+
CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation);
38+
39+
/**
40+
* Determine if there are more elements available from the source.
41+
* Uses a default, individual {@link CompositeDisposable} to manage cancellation.
42+
* @return eventually true or false, indicating availability or termination
43+
*/
44+
@NonNull
45+
default CompletionStage<Boolean> next() {
46+
return next(new CompositeDisposable());
47+
}
3648

3749
/**
3850
* Returns the current element if {@link #next()} yielded {@code true}.
@@ -52,7 +64,7 @@ public interface Streamer<@NonNull T> extends AutoCloseable {
5264
CompletionStage<Void> cancel();
5365

5466
/**
55-
* Make this Streamer a resource and a Closeable.
67+
* Make this Streamer a resource and a Closeable, allowing virtually blocking closing.
5668
*/
5769
default void close() {
5870
cancel().toCompletableFuture().join();

src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,17 @@ void dispose(@Nullable OpenHashSet<Disposable> set) {
255255
throw new CompositeException(errors);
256256
}
257257
}
258+
259+
@Override
260+
public void reset() {
261+
if (disposed) {
262+
return;
263+
}
264+
synchronized (this) {
265+
if (disposed) {
266+
return;
267+
}
268+
resources = null;
269+
}
270+
}
258271
}

src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,40 @@ public interface DisposableContainer extends Disposable {
4242
* @return true if the operation was successful
4343
*/
4444
boolean delete(Disposable d);
45+
46+
/**
47+
* Removes all contained {@link Disposable}s without disposing them, making this
48+
* container fresh.
49+
*/
50+
void reset();
51+
52+
/**
53+
* Removes and disposes all contained {@link Disposable}s, making this container fresh
54+
* without disposing the entire container.
55+
*/
56+
void clear();
57+
58+
/**
59+
* Registers a {@link Disposable} with this container so that it can be removed and disposed
60+
* via a simple {@link #dispose()} call to the returned Disposable.
61+
* @param d the disposable to register
62+
* @return the Disposable to trigger a {@link #remove(Disposable)}
63+
* @see #subscribe(Disposable) for non-disposing removal.
64+
*/
65+
default Disposable register(Disposable d) {
66+
add(d);
67+
return Disposable.fromRunnable(() -> remove(d));
68+
}
69+
70+
/**
71+
* Registers a {@link Disposable} with this container so that it can be deleted, not disposed
72+
* via a simple {@link #dispose()} call to the returned Disposable.
73+
* @param d the disposable to register
74+
* @return the Disposable to trigger a {@link #remove(Disposable)}
75+
* @see #subscribe(Disposable) for non-disposing removal.
76+
*/
77+
default Disposable subscribe(Disposable d) {
78+
add(d);
79+
return Disposable.fromRunnable(() -> delete(d));
80+
}
4581
}

src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,18 @@ void dispose(List<Disposable> set) {
184184
throw new CompositeException(errors);
185185
}
186186
}
187+
188+
@Override
189+
public void reset() {
190+
if (disposed) {
191+
return;
192+
}
193+
synchronized (this) {
194+
if (disposed) {
195+
return;
196+
}
197+
resources = null;
198+
}
199+
}
200+
187201
}

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import io.reactivex.rxjava4.annotations.NonNull;
2020
import io.reactivex.rxjava4.core.*;
21-
import io.reactivex.rxjava4.disposables.DisposableContainer;
21+
import io.reactivex.rxjava4.disposables.*;
2222

2323
public final class StreamableEmpty<T> extends Streamable<T> {
2424

@@ -30,7 +30,7 @@ public final class StreamableEmpty<T> extends Streamable<T> {
3030
static final class EmptyStreamer<T> implements Streamer<T> {
3131

3232
@Override
33-
public @NonNull CompletionStage<Boolean> next() {
33+
public @NonNull CompletionStage<Boolean> next(DisposableContainer cancellation) {
3434
return CompletableFuture.completedStage(false); // TODO would constant stages work here or is that contention?
3535
}
3636

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ static final class JustStreamer<T> implements Streamer<T>, Disposable {
4848
}
4949

5050
@Override
51-
public @NonNull CompletionStage<Boolean> next() {
51+
public @NonNull CompletionStage<Boolean> next(DisposableContainer cancellation) {
5252
if (stage == 0) {
5353
stage = 1;
5454
return CompletableFuture.completedStage(true);

0 commit comments

Comments
 (0)