给初学者的RxJava2.0教程(十)
Outline
[TOC]
前言
在很久以前的一篇文章中,提到过如何利用Retrofit中的GsonConverter来解决API请求错误的方法,地址在这儿,今天给大家详情另外一种优雅的方法,利用RxJava内部的RxJavaPlugins来做这么一个骚操作。
正题
说到RxJavaPlugins可能有很多朋友还很陌生,毕竟我们日常开放也不会怎样接触这个东西,但是从它的名字上来看就应该觉得它不一般,毕竟人家名字里带了一个Plugin
,废话少说,我们先来看一下这个类究竟是什么东西。
先找到这个类的位置,在io.reactivex.plugins这个包中,这个包就这一个类,再来看看类的定义:
package io.reactivex.plugins;.../** * Utility class to inject handlers to certain standard RxJava operations. */public final class RxJavaPlugins { ....}
首先映入眼帘的就是这句类注释了,来翻译一下:用于将少量骚操作注入到某些标准RxJava操作的工具类。
听上去如同很牛逼啊!我们来看一下它里面究竟写了些什么骚操作:
//代码太长了,随意粘贴几句public final class RxJavaPlugins { static volatile Consumer<? super Throwable> errorHandler; static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler; static volatile Function<? super Callable<Scheduler>, ? extends Scheduler> onInitComputationHandler; ... static volatile Function<? super Scheduler, ? extends Scheduler> onComputationHandler; static volatile Function<? super Scheduler, ? extends Scheduler> onSingleHandler; static volatile Function<? super Scheduler, ? extends Scheduler> onIoHandler; ... static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe; static volatile BiFunction<? super Maybe, ? super MaybeObserver, ? extends MaybeObserver> onMaybeSubscribe; static volatile BiFunction<? super Observable, ? super Observer, ? extends Observer> onObservableSubscribe; ... public static Consumer<? super Throwable> getErrorHandler() {} public static void setErrorHandler(@Nullable Consumer<? super Throwable> handler) {} ...}
看到这里,我相信大家应该都是和我一样的想法:这他吗是啥啊。。。为什么每个字母我都认识,写到一起我就不知道什么意思了。。。
懵逼
先不慌。。我们先粗略看一下这个类的结构,emmmm…先是定义了一大堆static的变量,但是没有public出来,所以应该会有对应的getter和setter方法,如同就是这样,没毛病,好了,到此为止,这个类可以关了,也看不出啥东西来。。
既然这条路碰壁了,那我们换一条路来试试。
先来看一段正常得不能再正常的RxJava代码:
Maybe.just(1) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Real onSuccess"); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Real onError"); } });
运行的结果就是:
zlc.season.javademo D/MainActivity: Real onSuccess
看过之前的教程的都知道这个subscribe()
方法是个很重要的方法啦,那我们就来看看这个方法究竟干了啥!
之前说过,subscribe
方法有多个重载的方法,通过源码得知,这些重载的方法最后都会调用到其中的一个subscribe
方法中:
public final void subscribe(MaybeObserver<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "observer returned by the RxJavaPlugins hook is null"); try { subscribeActual(observer); } catch (NullPointerException ex) {... } catch (Throwable ex) {...} }
通过这个源码我们一下子就找到了一行关键的代码:
observer = RxJavaPlugins.onSubscribe(this, observer);
先简单解释一下,这里的this就是当前的Maybe对象,也就是我们的上游,这里的observer就是我们的下游。
这意味着什么呢,意味着RxJavaPlugins
对我们的subscribe
方法做了一个骚操作呀!
这样我们一下子就找到了RxJavaPlugins
和调用链之间的联络,接下来就需要顺藤摸瓜,更加深入的理解一下,来看一下RxJavaPlugins.onSubscribe()
的源码吧:
//为了便于了解,把源码中的范型去掉了public final class RxJavaPlugins { ... static volatile BiFunction onMaybeSubscribe; ... public static void setOnMaybeSubscribe(BiFunction onMaybeSubscribe) { RxJavaPlugins.onMaybeSubscribe = onMaybeSubscribe; } ... //source就是我们的上游,observer就是我们的下游 public static MaybeObserver onSubscribe(Maybe source, MaybeObserver observer) { BiFunction f = onMaybeSubscribe; if (f != null) { //假如onMaybeSubscribe不为空 return apply(f, source, observer); //调用apply方法创立一个新的下游 } return observer; } ... static MaybeObserver apply(BiFunction f, Maybe source, MaybeObserver observer) { return f.apply(source, observer); }}
这个代码简直不能再清晰了,大概就是假如我调用了setOnMaybeSubscribe()
设置了一个BiFunction类型
的变量onMaybeSubscribe
,那么当我调用subscribe()
方法的时候就会调用这个变量的apply()
方法来做一个骚操作
返回一个新的下游
,否则就原封不动的把原来的下游
返回。
这就给了我们无限的想象力啊,我们可以通过这个apply()
方法直接把本来的下游
返回,这样就什么也不做,也可以包装一下原来的下游
,在真正的下游的方法执行前后插入少量自己的操作
,哇哦,如同很厉害的样子。。。
那既然要包装,首先一定得有一个包装类:
class WrapDownStreamObserver<T> implements MaybeObserver<T> { private MaybeObserver<T> actual; public WrapDownStreamObserver(MaybeObserver<T> actual) { this.actual = actual; } @Override public void onSubscribe(Disposable d) { actual.onSubscribe(d); } @Override public void onSuccess(T t) { Log.d(TAG, "Hooked onSuccess"); actual.onSuccess(t); } @Override public void onError(Throwable e) { Log.d(TAG, "Hooked onError"); actual.onError(e); } @Override public void onComplete() { Log.d(TAG, "Hooked onComplete"); actual.onComplete(); } }
这就是一个简单的包装类了,它和下游都是同样的类型,并且内部持有真正的下游,我们在真正的下游方法调用前都插入了一条日志。
有了包装类,那么我们即可以调用RxJavaPlugins的setOnMaybeSubscribe()方法来做骚操作了:
RxJavaPlugins.setOnMaybeSubscribe(new BiFunction<Maybe, MaybeObserver, MaybeObserver>() { @Override public MaybeObserver apply(Maybe maybe, MaybeObserver maybeObserver) throws Exception { return new WrapDownStreamObserver(maybeObserver); //这个maybeObserver就是我们真正的下游 } });
接下来就是拭目以待的运行结果啦:
zlc.season.javademo D/MainActivity: Hooked onSuccesszlc.season.javademo D/MainActivity: Real onSuccess
哈哈,果不其然,不愧是骚操作!!果然在真正的下游执行前先去执行了包装类里的代码,似乎已经看见了胜利的曙光!!
不过刚才的是在同一个线程的代码,我们再来一个带有线程切换的代码验证一下:
Maybe.just(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Real onSuccess"); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Real onError"); } });
当我们满怀信心的时候,生活总是会给你泼一盆冷水:
zlc.season.javademo D/MainActivity: Hooked onSuccesszlc.season.javademo D/MainActivity: Hooked onSuccesszlc.season.javademo D/MainActivity: Hooked onSuccesszlc.season.javademo D/MainActivity: Real onSuccess
发生了什么?是不是代码贴错了啊?为什么会打印三次Hooked onSuccess。。。我明明只包装了一个下游呀。。。
这个问题要详细的解释清楚预计得花一段时间了,这里就直接给出答案了,由于我们使用
RxJavaPlugins
的setOnMaybeSubscribe()
方法实际上是给所有的Maybe类型的subscribe()
方法都做了一个骚操作,而在我们的RxJava调用链
中,除了我们的上游
和下游
,其实还有中游
,这些中游
位于RxJava的内部,我们每做一次链式调用,都会生成一个新的中游,因而我们的骚操作
不仅仅只对下游
生效,对这些中游
也会生效,所以出现上面的打印结果。从代码也可以看出来,我们分别调用了一次subscribeOn
和一次observeOn
,因而对应的产生了两个中游
,再加上我们自己的下游
,所以一共打印三次Hooked onSuccess也说得通。
但是虽然打印了这么多,我们还是可以从中看到,我们的骚操作仍然是有效的,在真正的下游方法执行前,仍然执行了包装类中的代码,所以我们的这个方案是完全可行的,只要要避免一下重复解决即可以了。
看到这里,广大吃瓜群众预计还是处于一脸懵逼的状态。。。这TM跟我解决API错误有啥关系?
铲你一耳屎.jpg
emmmm…目前来说如同的确没什么太大的关系。。。但是,下面这段代码看完你也许就明白了。
我们继续来看一段Retrofit请求的代码:
public interface Api { @GET Maybe<BaseResponse> getSomeThing(@Url String url); //注意这里使用的是Maybe}private void requestSomeThing(String url) { api.getSomeThing(url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<BaseResponse>() { @Override public void accept(BaseResponse baseResponse) throws Exception { if(baseResponse.getCode()==100){ //Token 过期,跳转登录页面。。。 .... }else if(...){ ... } } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.e(TAG, "Something wrong", throwable); if (throwable instanceof ConnectionException) { Log.d(TAG, "没有网络连接"); } else if (throwable instanceof SocketTimeoutException) { Log.d(TAG, "连接超时"); } else { //... } } }); }
这是一段普通的请求代码,包含了请求成功了要判断code能否正确,判断token能否过期,请求失败了要针对不同的异常情况来做不同的解决,等一系列操作。
通过前面的铺垫,我相信大家心里都有点B number了,我们只要要把判断code能否正确,token能否过期,以及异常的情况放到包装类里,这样不就做到统一解决了吗?
先别急,可能细心一点的朋友就发现了,我们这里Api 接口定义的时候使用的是Maybe
,而我们知道,在RxJava2
中除了Maybe
,还有Single
、Completable
、Observable
、Flowable
,我们定义接口也可以写成:
public interface Api { @GET //Maybe Maybe<BaseResponse> getSomeThing(@Url String url); @GET //Observable Observable<BaseResponse> getSomeThing(@Url String url); @GET //Flowable Flowable<BaseResponse> getSomeThing(@Url String url); ...}
那是不是意味着我们要对每一个都要用RxJavaPlugin来做骚操作啊?
答案是不需要,我们只要要对Observable
做骚操作就行了!是的,就是Observable
,为什么只要要对Observable
做骚操作呢?这个答案可以从Retrofit
的RxJava2CallAdapter
中找到答案:
final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> { ...... @Override public Object adapt(Call<R> call) { //这就是我们真正的上游 Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); Observable<?> observable; if (isResult) { observable = new ResultObservable<>(responseObservable); } else if (isBody) { observable = new BodyObservable<>(responseObservable); } else { observable = responseObservable; } if (scheduler != null) { observable = observable.subscribeOn(scheduler); } if (isFlowable) { return observable.toFlowable(BackpressureStrategy.LATEST); } if (isSingle) { return observable.singleOrError(); } if (isMaybe) { return observable.singleElement(); } if (isCompletable) { return observable.ignoreElements(); } return observable; }}
从这个代码中可以看到,我们请求真正的上游其实是一个Observable
,我们在Api接口中定义的不论是Maybe,还是Flowable,其实都是在Observable
做了一次链式调用
而已,所以我们只要要对Observable做一个骚操作,即可以了。
所以我们先来创立一个Observer的包装类:
class ObservableSubscribeHooker<T> implements Observer<T> { private Observer<T> actual; public ObservableSubscribeHooker(Observer<T> actual) { this.actual = actual; } @Override public void onSubscribe(Disposable d) { actual.onSubscribe(d); } @Override public void onNext(T t) { hookOnNext(t); actual.onNext(t); } private void hookOnNext(T t) { if (t instanceof BaseResponse) { BaseResponse baseResponse = (BaseResponse) t; if (baseResponse.getCode() == 100) { //登录过期,跳转到登录页 ... throw new Exceptions.TokenExpired(); //注意这里的trick } } } @Override public void onError(Throwable e) { if (e instanceof ConnectException) { Log.e(TAG, "Connect failed: ", e); //解决ConnectException ... actual.onError(new Exceptions.Offline()); //注意这里的trick return; } if (e instanceof SocketTimeoutException) { Log.e(TAG, "Time out ", e); //解决SocketTimeoutException ... actual.onError(new Exceptions.TimeOut()); //注意这里的trick return; } //其他的异常解决... actual.onError(e); } @Override public void onComplete() { actual.onComplete(); } }
注意这里面的几个小Trick,通过自己设置的异常,避免了重复解决的问题,并且下游依然可以针对自己的特殊情况进行自己的特殊解决。
接下来就是设置到RxJavaPlugins中了:
public class CustomApplication extends Application { @Override public void onCreate() { super.onCreate(); RxJavaPlugins.setOnObservableSubscribe(new BiFunction<Observable, Observer, Observer>() { @Override public Observer apply(Observable observable, Observer observer) throws Exception { return new ObservableSubscribeHooker(observer); } }); }}
好啦,今天的教程就写到这里吧~
最终的demo已经上传到GitHub,地址在 链接在这里
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 给初学者的RxJava2.0教程(十)