he is coding

RxJava之compose操作符

| Comments

这个操作符的作用是对Publisher也就是对 「被观察者/被订阅者」 对象进行操作(区别于之前讲的map是对数据进行操作),然后返回一个新的Flowable对象

这里举一个使用场景就是代码的复用

e.g.Android中经常需要的线程切换

1
2
3
4
5
6
7
8
9
Flowable.just(666) //假装他是网络请求
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>(){
            @overide
            public void accept(@NonNull Integer i) throw Exception{
                //update ui
            }
        });

这里的

1
2
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

把部分的代码可以复用,因此我们放到工具类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RxUtil {

 public static <T> FlowableTransformer<T, T> scheduler() {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(Flowable<T> upstream) {
                return upstream
                        //代码移到这里了
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }
}

然后用compose操作符带上这坨代码

1
2
3
4
5
6
7
8
Flowable.just(666)
        .compose(RxUtil.<Integer>scheduler())
        .subscribe(new Consumer<Integer>(){
            @overide
            public void accept(@NonNull Integer i) throw Exception{
                //
            }
        });

compose操作符接收一个FlowableTransformer对象

1
2
3
4
5
public interface FlowableTransformer<Upstream, Downstream> {
    
    @NonNull
    Publisher<Downstream> apply(@NonNull Flowable<Upstream> upstream);
}

这里的upstream 代表的是上游的Flowable对象,本例中暂时可以理解成是integerFlowable

这里的需要一个返回值Publisher<Downstream>,给下游继续操作,我们返回的是.observeOn执行后的Flowable对象,也就是FlowableObserveOn对象

Comments