Android进阶:RxJava2 源码解析 1

Android进阶:RxJava2 源码解析 1,第1张

概述本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么 Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读

一.Rxjava是什么

Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

通俗来说,Rxjava是一个采用了观察者模式设计处理异步的框架。链式调用设计让代码优雅易读。

举个例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {            @OverrIDe            public voID subscribe(ObservableEmitter<String> e) throws Exception {                e.onNext("a");            }        });        observable.subscribe(new Observer<String>() {            @OverrIDe            public voID onSubscribe(disposable d) {            }            @OverrIDe            public voID onNext(String s) {            }            @OverrIDe            public voID onError(Throwable e) {            }            @OverrIDe            public voID onComplete() {            }        });

这是Rxjava2最简单的用法:
1.创建一个Observable,重写subscribe方法,这里主要处理被观察的事件。
2.订阅这个Observable,事件会回调observer的方法,我们可以对事件做响应的处理

二.Rxjava源码解析 2.1. 创建Observable:

创建Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。这个方法的参数是ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {    /**     * Called for each Observer that subscribes.     * @param e the safe emitter instance,never null     * @throws Exception on error     */    voID subscribe(@NonNull ObservableEmitter<T> e) throws Exception;}

ObservableOnSubscribe是一个接口,唯一的方法是subscribe,参数是ObservableEmitter<T> e。ObservableEmitter是一个继承了Emitter的接口,接口Emitter里定义了onNext、onError、onComplete等方法,和Observer(观察者)的方法相对应。

public interface Emitter<T> {    /**     * Signal a normal value.     * @param value the value to signal,not null     */    voID onNext(@NonNull T value);    /**     * Signal a Throwable exception.     * @param error the Throwable to signal,not null     */    voID onError(@NonNull Throwable error);    /**     * Signal a completion.     */    voID onComplete();}

ObservableEmitter对接口Emitter进行扩展,增加了setdisposable、setCancellable等方法
基本参数了解了,现在看看create方法里面做了什么,代码如下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));    }

调用了RxJavaPlugins的onAssembly方法。又有一个新参数ObservableCreate<T>(source),我们看看它是什么:

final class ObservableCreate<T> extends Observable<T> {    public ObservableCreate(ObservableOnSubscribe<T> source) {        this.source = source;    }}

继承了Observable,所以也是个被观察对象,在构造函数中我们看到我们new的ObservableOnSubscribe对象,被存在了ObservableCreate的source里面
那我们继续看看onAssembly方法做什么:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {        Function<? super Observable,? extends Observable> f = onObservableAssembly;        if (f != null) {            return apply(f,source);        }        return source;    }

一个Hook方法。onObservableAssembly是一个静态变量,我们没有设置,默认为空,所以直接返回source对象。也就是说,Observable的create方法其实就是把我们ObservableOnSubscribe对象,存储在ObservableCreate对象的source里面,然后返回ObservableCreate对象。
我们知道ObservableCreate是继承Observable的,所以创建了ObservableCreate对象,我们的Observable也就创建完了。

2.2 订阅事件(被观察者)

订阅被观察者的 *** 作是observable.subscribe(new Observer<String>())。这个 *** 作符其实是个“被动”,就是事件被观察者观察。因为subscribe方法里的参数Observer才是观察者。我们也会在Observer里的各个会调方法里接收到事件相关的返回值。
我们看看subscribe方法的源码:

public final voID subscribe(Observer<? super T> observer) {        try {            subscribeActual(observer);        } catch (NullPointerException e) { // nopMD            throw e;        } catch (Throwable e) {            RxJavaPlugins.onError(e);        }    }

看代码我们知道最主要调用的方法是:subscribeActual(observer);,这个方法是Observable里的抽象方法,而此时我们的Observable是一个ObservableCreate对象(前面create方法返回的对象)。所以我们去看一下ObservableCreate里面是如何重写这个方法的。代码如下:

protected voID subscribeActual(Observer<? super T> observer) {        CreateEmitter<T> parent = new CreateEmitter<T>(observer);        observer.onSubscribe(parent);        try {            source.subscribe(parent);        } catch (Throwable ex) {            Exceptions.throwIfFatal(ex);            parent.onError(ex);        }    }

我们一看到这个方法主要做了三件事:
①创建一个CreateEmitter对象parent。
②把parent传给source的subscribe方法。上面我们知道source就是刚才存的ObservableOnSubscribe对象,subscribe也就是我们重写的方法:

@OverrIDe            public voID subscribe(ObservableEmitter<String> e) throws Exception {                e.onNext("a");            }

所以我们在这个方法里就能收到一个Createemmiter,通过CreateEmitter可以回调相应的方法。CreateEmitter是实现ObservableEmitter接口,我们看看它内部实现,如:onNext源码如下:

@OverrIDepublic voID onNext(T t) {    observer.onNext(t);}

也就是说,当我们在ObservableOnSubscribe的subscribe方法里调用ObservableEmitter的onNext方法的时候,它里面会调用observer的onNext。于是通过这样的传递,我们就能在observer里响应的回调方法里收到事件的相关状态。

至此一个简单Rxjava流式传递原理已经讲完了,总结流程如下: 使用Observbable.create方法,产生一个ObservableCreate对象,对象里存着ObservableOnSubscribe对象source。 调用ObservableCreate.subscribe方法,实际调用的是subscribeActual方法,传入一个Observer对象。 subscribeActual方法中创建一个Createemmiter对象,调用source.subscribe方法,传入Createemmiter对象。 于是我们在ObservableOnSubscribe中就接收到了一个Createemmiter,Createemmiter是Observableemmiter的子类。我们可以在这里调用Createemmiter的方法进行事件回调。 调用Createemmiter方法,实际上会调用Observer的响应的方法。也就是Createemmiter把事件状态传递给观察者。 总结

以上是内存溢出为你收集整理的Android进阶:RxJava2 源码解析 1全部内容,希望文章能够帮你解决Android进阶:RxJava2 源码解析 1所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/web/1129507.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-30
下一篇 2022-05-30

发表评论

登录后才能评论

评论列表(0条)

保存