Skip to content

Commit 8af1ab4

Browse files
authored
4.x: Remove Scheduler.when due to maintenance burden (#8153)
1 parent 4a1be1f commit 8af1ab4

3 files changed

Lines changed: 0 additions & 817 deletions

File tree

src/main/java/io/reactivex/rxjava4/core/Scheduler.java

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313

1414
package io.reactivex.rxjava4.core;
1515

16-
import java.util.Objects;
1716
import java.util.concurrent.*;
1817
import java.util.concurrent.atomic.AtomicReference;
1918

2019
import io.reactivex.rxjava4.annotations.*;
2120
import io.reactivex.rxjava4.disposables.Disposable;
22-
import io.reactivex.rxjava4.functions.Function;
2321
import io.reactivex.rxjava4.internal.disposables.*;
2422
import io.reactivex.rxjava4.internal.schedulers.*;
2523
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
@@ -299,89 +297,6 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
299297
return periodicTask;
300298
}
301299

302-
/**
303-
* Allows the use of operators for controlling the timing around when
304-
* actions scheduled on workers are actually done. This makes it possible to
305-
* layer additional behavior on this {@link Scheduler}. The only parameter
306-
* is a function that flattens an {@link Flowable} of {@link Flowable}
307-
* of {@link Completable}s into just one {@link Completable}. There must be
308-
* a chain of operators connecting the returned value to the source
309-
* {@link Flowable} otherwise any work scheduled on the returned
310-
* {@link Scheduler} will not be executed.
311-
* <p>
312-
* When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of
313-
* {@link Completable}s is onNext'd to the combinator to be flattened. If
314-
* the inner {@link Flowable} is not immediately subscribed to an calls to
315-
* {@link Worker#schedule} are buffered. Once the {@link Flowable} is
316-
* subscribed to actions are then onNext'd as {@link Completable}s.
317-
* <p>
318-
* Finally the actions scheduled on the parent {@link Scheduler} when the
319-
* inner most {@link Completable}s are subscribed to.
320-
* <p>
321-
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
322-
* onComplete and triggers any behavior in the flattening operator. The
323-
* {@link Flowable} and all {@link Completable}s give to the flattening
324-
* function never onError.
325-
* <p>
326-
* Limit the amount concurrency two at a time without creating a new fix
327-
* size thread pool:
328-
*
329-
* <pre>
330-
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
331-
* // use merge max concurrent to limit the number of concurrent
332-
* // callbacks two at a time
333-
* return Completable.merge(Flowable.merge(workers), 2);
334-
* });
335-
* </pre>
336-
* <p>
337-
* This is a slightly different way to limit the concurrency but it has some
338-
* interesting benefits and drawbacks to the method above. It works by
339-
* limited the number of concurrent {@link Worker}s rather than individual
340-
* actions. Generally each {@link Flowable} uses its own {@link Worker}.
341-
* This means that this will essentially limit the number of concurrent
342-
* subscribes. The danger comes from using operators like
343-
* {@link Flowable#zip(java.util.concurrent.Flow.Publisher, java.util.concurrent.Flow.Publisher, io.reactivex.rxjava4.functions.BiFunction)} where
344-
* subscribing to the first {@link Flowable} could deadlock the
345-
* subscription to the second.
346-
*
347-
* <pre>
348-
* Scheduler limitScheduler = Schedulers.computation().when(workers -&gt; {
349-
* // use merge max concurrent to limit the number of concurrent
350-
* // Flowables two at a time
351-
* return Completable.merge(Flowable.merge(workers, 2));
352-
* });
353-
* </pre>
354-
*
355-
* Slowing down the rate to no more than 1 a second. This suffers from
356-
* the same problem as the one above I could find an {@link Flowable}
357-
* operator that limits the rate without dropping the values (aka leaky
358-
* bucket algorithm).
359-
*
360-
* <pre>
361-
* Scheduler slowScheduler = Schedulers.computation().when(workers -&gt; {
362-
* // use concatenate to make each worker happen one at a time.
363-
* return Completable.concat(workers.map(actions -&gt; {
364-
* // delay the starting of the next worker by 1 second.
365-
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
366-
* }));
367-
* });
368-
* </pre>
369-
*
370-
* <p>History: 2.0.1 - experimental
371-
* @param <S> a Scheduler and a Subscription
372-
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
373-
* the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
374-
* @return the Scheduler with the customized execution behavior
375-
* @throws NullPointerException if {@code combine} is {@code null}
376-
* @since 2.1
377-
*/
378-
@SuppressWarnings("unchecked")
379-
@NonNull
380-
public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
381-
Objects.requireNonNull(combine, "combine is null");
382-
return (S) new SchedulerWhen(combine, this);
383-
}
384-
385300
/**
386301
* Turn this {@code Scheduler} into an {@link ExecutorService} implementation
387302
* using its various *Direct() methods instead of workers.

0 commit comments

Comments
 (0)