Skip to content

Commit 5ab46d2

Browse files
committed
Fix bad cancellation storm
1 parent 0d269a1 commit 5ab46d2

4 files changed

Lines changed: 35 additions & 5 deletions

File tree

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.lang.reflect.InvocationTargetException;
1717
import java.util.*;
1818
import java.util.concurrent.*;
19-
import java.util.concurrent.atomic.AtomicInteger;
2019

2120
import io.reactivex.rxjava4.annotations.*;
2221
import io.reactivex.rxjava4.disposables.*;
@@ -203,13 +202,10 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
203202
*/
204203
static <@NonNull T> Streamable<T> concat(Streamable<? extends Streamable<? extends T>> sources, ExecutorService exec) {
205204
return create(emitter -> {
206-
var counter = new AtomicInteger();
207205
try (var mainSource = sources.forEach(item -> {
208-
System.out.println(counter.incrementAndGet());
209206
try (var innerSource = item.forEach(inner -> {
210-
System.out.println("> " + inner);
211207
emitter.emit(inner);
212-
}, emitter.canceller(), exec)) {
208+
}, emitter.canceller().derive(), exec)) {
213209
innerSource.await(emitter.canceller());
214210
}
215211
}, emitter.canceller(), exec)) {

src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,4 +268,14 @@ public void reset() {
268268
resources = null;
269269
}
270270
}
271+
272+
@Override
273+
public DisposableContainer derive() {
274+
var result = new CompositeDisposable();
275+
276+
add(result);
277+
result.add(Disposable.fromRunnable(() -> delete(result)));
278+
279+
return result;
280+
}
271281
}

src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public interface DisposableContainer extends Disposable {
5656
*/
5757
void clear();
5858

59+
/**
60+
* Create a derived sub container that can get cancelled by this container,
61+
* but cancelling the subcontainer does not cancel this container.
62+
* @return the derived subcontainer
63+
* @since 4.0
64+
*/
65+
DisposableContainer derive();
66+
5967
/**
6068
* Registers a {@link Disposable} with this container so that it can be removed and disposed
6169
* via a simple {@link #dispose()} call to the returned Disposable.
@@ -133,5 +141,10 @@ public void reset() {
133141
public void clear() {
134142
// Who cares?
135143
}
144+
145+
@Override
146+
public DisposableContainer derive() {
147+
return NEVER;
148+
}
136149
}
137150
}

src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,15 @@ public void reset() {
198198
}
199199
}
200200

201+
202+
@Override
203+
public DisposableContainer derive() {
204+
var result = new ListCompositeDisposable();
205+
206+
add(result);
207+
result.add(Disposable.fromRunnable(() -> delete(result)));
208+
209+
return result;
210+
}
211+
201212
}

0 commit comments

Comments
 (0)