Skip to content

Commit 5db8cc3

Browse files
committed
Support Backpressure,add ErrorHandleSubscriberOfFlowable and RetryWithDelayOfFlowable
1 parent 784d68a commit 5db8cc3

File tree

7 files changed

+145
-8
lines changed

7 files changed

+145
-8
lines changed

app/src/main/AndroidManifest.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package="me.jessyan.rxerrorhandler.demo">
44

55
<application
6+
android:name=".App"
67
android:allowBackup="true"
78
android:icon="@mipmap/ic_launcher"
89
android:label="@string/app_name"

app/src/main/java/me/jessyan/rxerrorhandler/demo/App.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import me.jessyan.rxerrorhandler.core.RxErrorHandler;
2929
import me.jessyan.rxerrorhandler.handler.listener.ResponseErrorListener;
3030

31-
import static android.content.ContentValues.TAG;
32-
3331
/**
3432
* ================================================
3533
* Created by JessYan on 22/09/2017 15:01
@@ -38,7 +36,7 @@
3836
* ================================================
3937
*/
4038
public class App extends Application {
41-
39+
private final String TAG = getClass().getSimpleName();
4240
private RxErrorHandler mRxErrorHandler;
4341

4442
@Override

app/src/main/java/me/jessyan/rxerrorhandler/demo/MainActivity.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import android.os.Bundle;
1919
import android.support.v7.app.AppCompatActivity;
2020

21+
import io.reactivex.Flowable;
2122
import io.reactivex.Observable;
2223
import me.jessyan.rxerrorhandler.core.RxErrorHandler;
2324
import me.jessyan.rxerrorhandler.handler.ErrorHandleSubscriber;
25+
import me.jessyan.rxerrorhandler.handler.ErrorHandleSubscriberOfFlowable;
2426
import me.jessyan.rxerrorhandler.handler.RetryWithDelay;
27+
import me.jessyan.rxerrorhandler.handler.RetryWithDelayOfFlowable;
2528

2629
/**
2730
* ================================================
@@ -48,5 +51,16 @@ public void onNext(Object o) {
4851

4952
}
5053
});
54+
55+
Flowable //Backpressure
56+
.error(new Exception("Error"))
57+
.retryWhen(new RetryWithDelayOfFlowable(3, 2))//retry(http connect timeout)
58+
.subscribe(new ErrorHandleSubscriberOfFlowable<Object>(rxErrorHandler) {
59+
@Override
60+
public void onNext(Object o) {
61+
62+
}
63+
});
64+
5165
}
5266
}

rxerrorhandler/src/main/java/me/jessyan/rxerrorhandler/handler/ErrorHandleSubscriber.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ public void onComplete() {
4848

4949

5050
@Override
51-
public void onError(@NonNull Throwable e) {
52-
e.printStackTrace();
53-
mHandlerFactory.handleError(e);
51+
public void onError(@NonNull Throwable t) {
52+
t.printStackTrace();
53+
//如果你某个地方不想使用全局错误处理,则重写 onError(Throwable) 并将 super.onError(e); 删掉
54+
//如果你不仅想使用全局错误处理,还想加入自己的逻辑,则重写 onError(Throwable) 并在 super.onError(e); 后面加入自己的逻辑
55+
mHandlerFactory.handleError(t);
5456
}
5557
}
5658

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright 2017 JessYan
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package me.jessyan.rxerrorhandler.handler;
17+
18+
import org.reactivestreams.Subscriber;
19+
import org.reactivestreams.Subscription;
20+
21+
import io.reactivex.annotations.NonNull;
22+
import me.jessyan.rxerrorhandler.core.RxErrorHandler;
23+
24+
/**
25+
* ================================================
26+
* Created by JessYan on 9/22/2017 15:10
27+
* Contact with <mailto:jess.yan.effort@gmail.com>
28+
* Follow me on <https://github.com/JessYanCoding>
29+
* ================================================
30+
*/
31+
public abstract class ErrorHandleSubscriberOfFlowable<T> implements Subscriber<T> {
32+
private ErrorHandlerFactory mHandlerFactory;
33+
34+
public ErrorHandleSubscriberOfFlowable(RxErrorHandler rxErrorHandler){
35+
this.mHandlerFactory = rxErrorHandler.getHandlerFactory();
36+
}
37+
38+
@Override
39+
public void onSubscribe(Subscription s) {
40+
41+
}
42+
43+
@Override
44+
public void onComplete() {
45+
46+
}
47+
48+
49+
@Override
50+
public void onError(@NonNull Throwable t) {
51+
t.printStackTrace();
52+
//如果你某个地方不想使用全局错误处理,则重写 onError(Throwable) 并将 super.onError(e); 删掉
53+
//如果你不仅想使用全局错误处理,还想加入自己的逻辑,则重写 onError(Throwable) 并在 super.onError(e); 后面加入自己的逻辑
54+
mHandlerFactory.handleError(t);
55+
}
56+
}
57+

rxerrorhandler/src/main/java/me/jessyan/rxerrorhandler/handler/RetryWithDelay.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,13 @@ public RetryWithDelay(int maxRetries, int retryDelaySecond) {
4646

4747
@Override
4848
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
49-
5049
return throwableObservable
5150
.flatMap(new Function<Throwable, ObservableSource<?>>() {
5251
@Override
5352
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
5453
if (++retryCount <= maxRetries) {
5554
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
56-
Log.d(TAG, "get error, it will try after " + retryDelaySecond
55+
Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond
5756
+ " second, retry count " + retryCount);
5857
return Observable.timer(retryDelaySecond,
5958
TimeUnit.SECONDS);
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2017 JessYan
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package me.jessyan.rxerrorhandler.handler;
17+
18+
import android.util.Log;
19+
20+
import org.reactivestreams.Publisher;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
import io.reactivex.Flowable;
25+
import io.reactivex.annotations.NonNull;
26+
import io.reactivex.functions.Function;
27+
28+
/**
29+
* ================================================
30+
* Created by JessYan on 9/22/2017 15:25
31+
* Contact with <mailto:jess.yan.effort@gmail.com>
32+
* Follow me on <https://github.com/JessYanCoding>
33+
* ================================================
34+
*/
35+
public class RetryWithDelayOfFlowable implements
36+
Function<Flowable<Throwable>, Publisher<?>> {
37+
38+
public final String TAG = this.getClass().getSimpleName();
39+
private final int maxRetries;
40+
private final int retryDelaySecond;
41+
private int retryCount;
42+
43+
public RetryWithDelayOfFlowable(int maxRetries, int retryDelaySecond) {
44+
this.maxRetries = maxRetries;
45+
this.retryDelaySecond = retryDelaySecond;
46+
}
47+
48+
@Override
49+
public Publisher<?> apply(@NonNull Flowable<Throwable> throwableFlowable) throws Exception {
50+
return throwableFlowable
51+
.flatMap(new Function<Throwable, Publisher<?>>() {
52+
@Override
53+
public Publisher<?> apply(@NonNull Throwable throwable) throws Exception {
54+
if (++retryCount <= maxRetries) {
55+
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
56+
Log.d(TAG, "Flowable get error, it will try after " + retryDelaySecond
57+
+ " second, retry count " + retryCount);
58+
return Flowable.timer(retryDelaySecond,
59+
TimeUnit.SECONDS);
60+
}
61+
// Max retries hit. Just pass the error along.
62+
return Flowable.error(throwable);
63+
}
64+
});
65+
}
66+
}

0 commit comments

Comments
 (0)