he is coding

使用RxJava

| Comments

RxJava 扩展了观察者模式,以支持对数据和事件的一系列操作

基本概念

被观察者

也就是事件源,Rxjava中用接口Publisher来表示 「被观察者」 这个概念,具体的实现类是Flowable,Flowable里面提供了很多的工厂方法来帮助我们创建各种各样的 「被观察者」

观察者

事件的消费者,Rxjava中用接口Subscriber来表示 「观察者」 这个概念

订阅

  • 先看一下代表 「被观察者/被订阅者」 的Publisher接口
1
2
3
4
5
6
7
8
9
10
public interface Publisher<T> {

/** 
* 请求 「被观察者」 开始传输数据/发出事件
* 
* 参数接收一个「观察者」对象,该 「观察者」 将收到「被观察者」发射的一系列数据/事件
*/
public void subscribe(Subscriber<? super T> s);

}
  • 再看一下 代表 「观察者」 的Subscriber接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface Subscriber<T> {
    
   /**
    * 订阅发生时,此方法被回调,
    * 也就是在Publisher#subscribe(Subscriber<? super T> s)被调用后
    * 这个方法被回调
    */
    public void onSubscribe(Subscription s);

   /**
    * 接收到 「被观察者」 发出的数据/事件
    */
    public void onNext(T t);

    /**
    * 整个订阅的流程中抛出的任何异常都会回调到这个方法
    * 
    */
    public void onError(Throwable t);

    /**
    * 当
    * 「被观察者」 发射数据/事件完成
    * 此方法会被回调
    * 此后 「被观察者」 不会再发出任何数据/事件
    */
    public void onComplete();
}

所以让整个流程开始运作只需要

1
2
3
publisher
    .subscribe
    (subscriber);

创建 被观察者/被订阅者

  • 用Flowable的工厂方法create

e.g. 创建一个 「被观察者」 ,他在被subscribe后会发射一个数字666

1
2
3
4
5
6
7
8
Flowable<Integer> integerFlowable 
= Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        e.onNext(666);//往下游传递数据
                        e.onComplete();//结束
                }
            }, BackpressureStrategy.LATEST);//背压策略

这里的 FlowableEmitter 暂时 可以理解成是我们在subscribe(Subscriber<? super T> s)中传入的 「观察者」 对象,看上去好像是 「被观察者」 直接持有了 「观察者」 ,然后调用他的各种方法。。。 (这里的细节后面再写。。。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
integerFlowable.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                //这里需要手动调用一下该方法,使 被观察者开始工作
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                //这里收到 被观察者/被订阅者 发出的666
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });

简化

  • 对于发射简单的数据,Flowable提供了很多便捷的静态方法用来创建Flowable对象, 比如像例子中的integerFlowable,可以这样写
1
Flowable<Integer> f = Flowable.just(666);
  • 很多时候我们创建 「观察者/订阅者」 时并不需要重写这么多的方法
    • 比如说每次都要调用s.request(Long.MAX_VALUE)
    • 有时候我们也并不关注onComplete
    • 我们大多数情况下只需要onNext(T t)onError(Throwable t)

因此Flowable中有很多subscribe()的方法重载

1
2
3
4
5
6
public final Disposable subscribe(
    Consumer<? super T> onNext,
    Consumer<? super Throwable> onError,
    Action onComplete,
    Consumer<? super Subscription> onSubscribe)
);

这里的四个参数就等价于Subscriber的那四个回调方法,可以只对需要处理的回调传入参数

比如我只处理结果和异常

1
2
3
4
5
6
7
8
9
10
11
12
    integerFlowable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            //这里收到 被观察者/被订阅者 发出的666
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            //异常处理
        }
    });

RxJava中的线程切换

RxJava在Android中常见的使用场景就是拿来异步更新UI,比如说 「被观察者/被订阅者」 从本地或者网络取数据,然后传给 「观察者/订阅者」 更新UI,但是这个取数据的过程可能是很耗时的,所以通常会让 「被观察者/被订阅者」 在子线程去处理他的逻辑,等数据ok后,「观察者/订阅者」在ui线程去更新ui

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Flowable remoteFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        //发起网络请求。。。。
                        e.onNext(数据);//传递数据
                        e.onComplete();//告诉订阅者,这事儿已经完成
                }
            }, BackpressureStrategy.LATEST);
            
            
remoteFlowable.subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            //更新UI
                        }
                    });

subscribeOn(Schedulers.io()) 订阅在io线程,也就是说 「被订阅者/被观察者」 将在io线程执行他的那一坨逻辑(网络请求)

observeOn(AndroidSchedulers.mainThread())观察在主线程,也就是说 「订阅者/观察者」将在主线程执行他的那一坨逻辑(更新ui)

线程如何切换

Comments