Skip to content

Commit 2162c78

Browse files
committed
4.x: Proposal, Streamable<T> / Streamer<T> extension for virtual
1 parent 6a27519 commit 2162c78

9 files changed

Lines changed: 369 additions & 4 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core;
15+
16+
import java.util.concurrent.CompletionStage;
17+
18+
import io.reactivex.rxjava4.disposables.Disposable;
19+
20+
public record CompletionStageDisposable<T>(CompletionStage<T> stage, Disposable disposable) {
21+
22+
}

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,15 @@
1313

1414
package io.reactivex.rxjava4.core;
1515

16+
import java.lang.reflect.InvocationTargetException;
17+
import java.util.Objects;
18+
import java.util.concurrent.*;
19+
1620
import io.reactivex.rxjava4.annotations.NonNull;
1721
import io.reactivex.rxjava4.disposables.*;
22+
import io.reactivex.rxjava4.exceptions.Exceptions;
23+
import io.reactivex.rxjava4.functions.Consumer;
24+
import io.reactivex.rxjava4.internal.operators.streamable.*;
1825

1926
/**
2027
* The {@code IAsyncEnumerable} of the Java world.
@@ -41,4 +48,105 @@ public abstract class Streamable<@NonNull T> {
4148
public final Streamer<T> stream() {
4249
return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead
4350
}
51+
52+
/**
53+
* Returns an empty {@code Streamable} that never produces an item and just completes.
54+
* @param <T> the element type
55+
* @return the {@code Streamable} instance
56+
*/
57+
@NonNull
58+
public static <@NonNull T> Streamable<T> empty() {
59+
return new StreamableEmpty<>();
60+
}
61+
62+
/**
63+
* Returns a single-element {@code Streamable} that produces the constant item and completes.
64+
* @param <T> the element type
65+
* @param item the constant item to produce
66+
* @return the {@code Streamable} instance
67+
*/
68+
public static <@NonNull T> Streamable<T> just(@NonNull T item) {
69+
Objects.requireNonNull(item, "item is null");
70+
return new StreamableJust<>(item);
71+
}
72+
73+
/**
74+
* Consumes elements from this {@code Streamable} via the provided executor service.
75+
* @param consumer the callback that gets the elements until completion
76+
* @return a Disposable that let's one cancel the sequence asynchronously.
77+
*/
78+
@NonNull
79+
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer) {
80+
CompositeDisposable canceller = new CompositeDisposable();
81+
return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor());
82+
}
83+
84+
/**
85+
* Consumes elements from this {@code Streamable} via the provided executor service.
86+
* @param consumer the callback that gets the elements until completion
87+
* @param canceller the container to trigger cancellation of the sequence
88+
* @return the {@code CompletionStage} that gets notified when the sequence ends
89+
*/
90+
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull DisposableContainer canceller) {
91+
return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor());
92+
}
93+
94+
/**
95+
* Consumes elements from this {@code Streamable} via the provided executor service.
96+
* @param consumer the callback that gets the elements until completion
97+
* @param executor the service that hosts the blocking waits.
98+
* @return a Disposable that let's one cancel the sequence asynchronously.
99+
*/
100+
@NonNull
101+
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull ExecutorService executor) {
102+
CompositeDisposable canceller = new CompositeDisposable();
103+
return forEach(consumer, canceller, executor);
104+
}
105+
106+
/**
107+
* Consumes elements from this {@code Streamable} via the provided executor service.
108+
* @param consumer the callback that gets the elements until completion
109+
* @param canceller the container to trigger cancellation of the sequence
110+
* @param executor the service that hosts the blocking waits.
111+
* @return the {@code CompletionStage} that gets notified when the sequence ends
112+
*/
113+
@SuppressWarnings("unchecked")
114+
public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull DisposableContainer canceller, @NonNull ExecutorService executor) {
115+
Objects.requireNonNull(consumer, "consumer is null");
116+
Objects.requireNonNull(canceller, "canceller is null");
117+
Objects.requireNonNull(executor, "executor is null");
118+
final Streamable<T> me = this;
119+
var future = executor.submit(() -> {
120+
try (var str = me.stream(canceller)) {
121+
while (!canceller.isDisposed()) {
122+
if (str.next().toCompletableFuture().join()) {
123+
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
124+
} else {
125+
break;
126+
}
127+
}
128+
} catch (final Throwable crash) {
129+
Exceptions.throwIfFatal(crash);
130+
if (crash instanceof RuntimeException ex) {
131+
throw ex;
132+
}
133+
if (crash instanceof Exception ex) {
134+
throw ex;
135+
}
136+
throw new InvocationTargetException(crash);
137+
}
138+
return null;
139+
});
140+
canceller.add(Disposable.fromFuture(future));
141+
return new CompletionStageDisposable<Void>(StreamableHelper.toCompletionStage((Future<Void>)(Future<?>)future), canceller);
142+
}
143+
144+
/**
145+
* Consume this {@code Streamable} via the given flow-reactive-streams subscriber.
146+
* @param subscriber the subscriber to consume with.
147+
* @param executor the service that hosts the blocking waits.
148+
*/
149+
public final void subscribe(@NonNull Flow.Subscriber<? super T> subscriber, @NonNull ExecutorService executor) {
150+
151+
}
44152
}

src/main/java/io/reactivex/rxjava4/core/Streamer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
* TODO proper docs
2626
* @since 4.0.0
2727
*/
28-
public interface Streamer<@NonNull T> {
28+
public interface Streamer<@NonNull T> extends AutoCloseable {
2929

3030
/**
3131
* Determine if there are more elements available from the source.
@@ -45,8 +45,16 @@ public interface Streamer<@NonNull T> {
4545

4646
/**
4747
* Called when the stream ends or gets cancelled. Should be always invoked.
48+
* TODO, this is inherited from {@code IAsyncDisposable} in C#...
4849
* @return the stage you can await to cleanups to happen
4950
*/
5051
@NonNull
51-
CompletionStage<Void> close();
52+
CompletionStage<Void> cancel();
53+
54+
/**
55+
* Make this Streamer a resource and a Closeable.
56+
*/
57+
default void close() {
58+
cancel().toCompletableFuture().join();
59+
}
5260
}

src/main/java/io/reactivex/rxjava4/disposables/Disposable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public interface Disposable extends AutoCloseable {
3939

4040
/**
4141
* Dispose the resource, the operation should be idempotent.
42+
* @since 4.0.0
4243
*/
4344
default void close() {
4445
dispose();

src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* Common interface to add and remove disposables from a container.
1818
* @since 2.0
1919
*/
20-
public interface DisposableContainer {
20+
public interface DisposableContainer extends Disposable {
2121

2222
/**
2323
* Adds a disposable to this container or disposes it if the

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,35 @@
1313

1414
package io.reactivex.rxjava4.internal.operators.streamable;
1515

16-
public class StreamableEmpty<T> {
16+
import java.util.NoSuchElementException;
17+
import java.util.concurrent.*;
1718

19+
import io.reactivex.rxjava4.annotations.NonNull;
20+
import io.reactivex.rxjava4.core.*;
21+
import io.reactivex.rxjava4.disposables.DisposableContainer;
22+
23+
public final class StreamableEmpty<T> extends Streamable<T> {
24+
25+
@Override
26+
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
27+
return new EmptyStreamer<T>();
28+
}
29+
30+
static final class EmptyStreamer<T> implements Streamer<T> {
31+
32+
@Override
33+
public @NonNull CompletionStage<Boolean> next() {
34+
return CompletableFuture.completedStage(false); // TODO would constant stages work here or is that contention?
35+
}
36+
37+
@Override
38+
public @NonNull T current() {
39+
throw new NoSuchElementException("This Streamable/Streamer never has elements");
40+
}
41+
42+
@Override
43+
public @NonNull CompletionStage<Void> cancel() {
44+
return CompletableFuture.completedStage(null); // TODO would constant stages work here or is that contention?
45+
}
46+
}
1847
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.operators.streamable;
15+
16+
import java.util.concurrent.*;
17+
18+
public enum StreamableHelper {
19+
INSTANCE;
20+
21+
@SuppressWarnings("unchecked")
22+
public static <T> CompletionStage<T> toCompletionStage(Future<T> future) {
23+
if (future instanceof CompletionStage) {
24+
return (CompletionStage<T>) future;
25+
}
26+
return CompletableFuture.supplyAsync(() -> {
27+
try {
28+
return future.get();
29+
} catch (Exception e) {
30+
throw new CompletionException(e);
31+
}
32+
});
33+
}
34+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.operators.streamable;
15+
16+
import java.util.NoSuchElementException;
17+
import java.util.concurrent.*;
18+
19+
import io.reactivex.rxjava4.annotations.NonNull;
20+
import io.reactivex.rxjava4.core.*;
21+
import io.reactivex.rxjava4.disposables.*;
22+
23+
public final class StreamableJust<T> extends Streamable<T> {
24+
25+
final T item;
26+
27+
public StreamableJust(T item) {
28+
this.item = item;
29+
}
30+
31+
@Override
32+
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
33+
return new JustStreamer<T>(item, cancellation);
34+
}
35+
36+
static final class JustStreamer<T> implements Streamer<T>, Disposable {
37+
38+
volatile T item;
39+
40+
volatile DisposableContainer cancellation;
41+
42+
volatile int stage;
43+
44+
JustStreamer(T item, DisposableContainer cancellation) {
45+
this.item = item;
46+
this.cancellation = cancellation;
47+
cancellation.add(this);
48+
}
49+
50+
@Override
51+
public @NonNull CompletionStage<Boolean> next() {
52+
if (stage == 0) {
53+
stage = 1;
54+
return CompletableFuture.completedStage(true);
55+
}
56+
item = null;
57+
cancellation = null;
58+
stage = 2;
59+
return CompletableFuture.completedStage(false); // TODO would constant stages work here or is that contention?
60+
}
61+
62+
@Override
63+
public @NonNull T current() {
64+
var item = this.item;
65+
if (stage == 0) {
66+
throw new NoSuchElementException("Streamable.just not yet started!");
67+
}
68+
if (stage == 2) {
69+
throw new NoSuchElementException("Streamable.just already completed!");
70+
}
71+
return item;
72+
}
73+
74+
@Override
75+
public @NonNull CompletionStage<Void> cancel() {
76+
item = null;
77+
cancellation = null;
78+
stage = 2;
79+
return CompletableFuture.completedStage(null); // TODO would constant stages work here or is that contention?
80+
}
81+
82+
@Override
83+
public void close() {
84+
Streamer.super.close();
85+
}
86+
87+
@Override
88+
public boolean isDisposed() {
89+
return stage == 2;
90+
}
91+
92+
@Override
93+
public void dispose() {
94+
var dc = cancellation;
95+
if (dc != null) {
96+
if (dc.delete(this)) {
97+
close(); // FIXME not sure about this!
98+
}
99+
}
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)