Rxjava 2.x 源码系列 – 变换操作符 Map(上)

作者 : 开心源码 本文共4196个字,预计阅读时间需要11分钟 发布时间: 2022-05-11 共184人阅读

Rxjava 2.x 源码系列 – 基础框架分析

Rxjava 2.x 源码系列 – 线程切换 (上)

Rxjava 2.x 源码系列 – 线程切换 (下)

Rxjava 2.x 源码系列 – 变换操作符 Map(上)

前言

在前几篇博客中,我们详情了 Rxjava Observable 与 Observer 之间是如何订阅与取消订阅的,以及 Rxjava 是如何控制 subsribe 线程和 observer 的回调线程的。

今天,让我们一起来看一下 Rxjava 中另外一个比较重要的功可以,操作符变化功可以


基础知识

常使用的变换操作符

操作符作使用
map映射,将一种类型的数据流/Observable映射为另外一种类型的数据流/Observable
cast强转 传入一个class,对Observable的类型进行强转.
flatMap平铺映射,从数据流的每个数据元素中映射出多个数据,并将这些数据依次发射。(注意是无序的)
concatMapconcatMap 与 flatMap 的功可以非常相似,只不过发送的数据是有序的
buffer缓存/打包 按照肯定规则从Observable收集少量数据到一个集合,而后把这些数据作为集合打包发射。
groupby分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
to…将数据流中的对象转换为List/SortedList/Map/MultiMap集合对象,并打包发射
timeInterval将每个数据都换为包含本次数据和离上次发射数据时间间隔的对象并发射
timestamp将每个数据都转换为包含本次数据和发射数据时的时间戳的对象并发射

从 Demo 说起

接下来,我们一起来看一下一个 demo,我们通过 map 操作符将 Integer 转化为 String。

// 采使用RxJava基于事件流的链式操作Observable.create(new ObservableOnSubscribe<Integer>() {    // 1. 被观察者发送事件 = 参数为整型 = 1、2、3    @Override    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {        emitter.onNext(1);        emitter.onNext(2);        emitter.onNext(3);    }    // 2. 用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型}).map(new Function<Integer, String>() {    @Override    public String apply(Integer integer) throws Exception {        return "用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;    }    // 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型}).subscribe(new Observer<String>() {    @Override    public void onSubscribe(Disposable d) {            }    @Override    public void onNext(String s) {        Log.d(TAG, s);    }    @Override    public void onError(Throwable e) {    }    @Override    public void onComplete() {    }});

输出结果

用 Map变换操作符 将事件1的参数从 整型1 变换成 字符串类型1用 Map变换操作符 将事件2的参数从 整型2 变换成 字符串类型2用 Map变换操作符 将事件3的参数从 整型3 变换成 字符串类型3

map 源码分析

  • 借鉴前面几篇博客的分析,我们先来看一下 Observable 的 map 方法,它的套路跟 create 方法的套路也是类似的,判空能否为 null,为 null 抛出异常。
  • 接着,使用一个包装类包装当前的 Observable 实例,只不过这个包装类是 ObservableMap。在 ObsevableMap 里面持有上游 observable 实例的引使用,这个是典型的装饰者模式. 关于装饰者模式,能参考我的这一篇博客。装饰者模式及其应使用
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {    ObjectHelper.requireNonNull(mapper, "mapper is null");    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}

接下来,我们一起来看一下 ObservableMap。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {    final Function<? super T, ? extends U> function;    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {        super(source);        this.function = function;    }    @Override    public void subscribeActual(Observer<? super U> t) {        source.subscribe(new MapObserver<T, U>(t, function));    }}

在前面博客中,我们已经说到,当我们调使用 observable.subscribe(observer) 的时候,代码调使用逻辑是这样的。

在 observable 的 subscribeActual 方法中

  • 假如有上游的话,会调使用上游的 subscribe 方法(即 source.subscribe() 方法),而在 subscribe 方法中,又会调使用当前 observable 的 subcribeActual 方法
  • 假如没有上游的话,会直接调使用当前 Observable 的 subscirbe 方法,并调使用 observable 的 onSuscribe 方法

observable 的 subscribe 流程图

在 ObservableMap 的 subscribeActual 方法里面,MapObserver 类对 Observer 进行包装,又是这样的套路,装饰者模式。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {    final Function<? super T, ? extends U> mapper;    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {        super(actual);        this.mapper = mapper;    }    @Override    public void onNext(T t) {       // 1 判断能否 done,假如已经 done ,直接返回        if (done) {            return;        }        if (sourceMode != NONE) {            actual.onNext(null);            return;        }        U v;              try {            // 2 调使用 mapper.apply(t) ,进行相应的转化            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");        } catch (Throwable ex) {            fail(ex);            return;        }        // 3 调使用下游的 onNext 方法,并将 V 暴露出去        actual.onNext(v);    }            ----}

首先看他的构造方法,有两个参数, actual,mapper。 actual 代表下游的 Observer,mapper 为传入的 Function。

接着我们来看下 onNext 方法

  1. 判断能否 done,假如已经 done ,直接返回
  2. 调使用 mapper.apply(t) ,进行相应的转化
  3. 调使用下游的 onNext 方法,并将 V 暴露出去

这样就完成了操作符的操作功可以

总结

OK,我们在回到上面的 demo,来整理一下他的流程

image

当我们调使用 observable.subscribe(observer) 的时候

  • 会促发第二个 Observable 的 subscribeAtActual 方法,在该方法中,又会调使用上游 Observable 的 subscribe 方法,即第一个 Observable 的 subscribe 方法
  • 在第一个 Observable 的 subscribe 方法里面,又会调使用当前 Observable 的 subscribeAtActual 方法,会调使用 observer.onSubscribe(parent) 方法,并调使用 source.subscribe(parent) 将我们的 observer 的包装类 parent 暴露出去
  • 当我们在我们创立的 ObservableOnSubscribe 的 subscribe 方法中,调使用 emitter 的 onNext 方法的时候,这个时候会调使用到我们的 MapObserver 的 onNext 方法
  • 在 MapObserver 的 onNext 方法,有会调使用到下游 Observer 的 onNext 方法,进而调使用我们外部的 observer 的 onNext 方法

小结

  • map 的操作过程跟之前的线程切换的实现原理基本一样,通过在中间用装饰者模式插入一个中间的 Observable 和 Observer,你能想象为代理商。
  • 代理商 Observable 做的事就是接收下游 Obsever 的订阅事件,而后通过代理商 Obsever 订阅上游 Observer,而后在上游 Observer 下发数据給代理商 Observer 时,通过先调使用 mapper.apply 转换回调函数取得转换后的数据,而后下发给下游 Obsever。

最后的最后

卖一下广告,欢迎大家关注我的微信公众号,扫一扫下方二维码或者搜索微信号 stormjun,就可关注。 目前专注于 Android 开发,主要分享 Android开发相关知识和少量相关的优秀文章,包括个人总结,职场经验等。

image

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » Rxjava 2.x 源码系列 – 变换操作符 Map(上)

发表回复