51dev.com IT技术开发者社区

51dev.com 技术开发者社区

RX操作符之辅助操作

Android互联网报道阅读(18)2019-09-18 收藏0次评论

一、materialize

Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。一个合法的有限的Obversable将调用它的观察者的onNext方法零次或多次,然后调用观察者的onCompleted或onError正好一次。Materialize操作符将这一系列调用,包括原来的onNext通知和终止通知onCompleted或onError都转换为一个Observable发射的数据序列。materialize将来自原始Observable的通知转换为Notification对象,然后它返回的Observable会发射这些数据。

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                        subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });


        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Object v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.materialize().subscribe(subscriber);

运行结果:

 

 

二、dematerialize

Dematerialize操作符是Materialize的逆向过程,它将Materialize转换的结果还原成它原本的形式。

ematerialize反转这个过程,将原始Observable发射的Notification对象还原成Observable的通知。

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        });


        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Object v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.materialize().dematerialize().subscribe(subscriber);

 

 

运行结果:


 

 

三、timeStamp

timestamp发射T类型数据的Observable转换为一个发射类型为Timestamped的数据的Observable,每一项都包含数据的原始发射时间

 

String[] items = {"timeStamp1","timeStamp2","timeStamp3","timeStamp4","timeStamp5"};
        Observable observable =  Observable.from(items);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Object v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.timestamp().subscribe(subscriber);
运行结果:

 


 

四、serialize

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                    subscriber.onCompleted();
                    subscriber.onNext(4);
                    subscriber.onNext(5);
                    subscriber.onCompleted();
            }
        });

        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG,"call.................unsubscribed");
                    }
                })
                .serialize().subscribe(subscriber);

运行结果:

 

 

五、replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。如果在将一个Observable转换为可连接的Observable之前对它使用Replay操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。
 

 

 final ConnectableObservable observable =  Observable.interval(1,TimeUnit.SECONDS).observeOn(Schedulers.newThread())
                .take(5).replay(3);

        final Subscriber subscriber1 = new Subscriber() {

            @Override
            public void onNext(Long v) {
                Log.e(TAG,"onNext1................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted1.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError1.....................");
            }
        };


        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Long l) {
                Log.e(TAG,"onNext................."+l);
                if(l == 3){
                    observable.subscribe(subscriber1);
                }
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };




        observable.subscribe(subscriber);
        observable.connect();

运行结果:

 

 

六、observeOn

指定一个观察者在哪个调度器上观察这个Observable

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                }
                subscriber.onCompleted();
            }
        }).observeOn(Schedulers.io());


        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Object v) {
                Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable.subscribe(subscriber);

运行结果:

 

 

七、subscribeon

指定Observable自身在哪个调度器上执行

 

 Observable observable =  Observable.create(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i + "");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    }
                    Log.e(TAG, "call................."+ Thread.currentThread().getName());
                }
                subscriber.onCompleted();
            }
        })
                .subscribeOn(Schedulers.newThread());


        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Object v) {
                Log.e(TAG,"onNext................."+v+"............"+Thread.currentThread().getName());
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted................."+Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError....................."+Thread.currentThread().getName());
            }
        };

        observable.subscribe(subscriber);
运行结果:

 


 

八、doOnEach

发射完一个Observable时的回调

 

 Observable observable = Observable.just(1,2,3,4,5,6);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnEach(new Action1>() {
                    @Override
                    public void call(rx.Notification notification) {
                        int i = (int) notification.getValue();
                        Log.e(TAG, "发射了一个数据....................."+i);
                    }
                })
                .subscribe(subscriber);

运行结果:

 

 

九、doOnNext

doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。

 

Observable observable = Observable.just(1,2,3,4,5,6);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnNext(new Action1() {
                    @Override
                    public void call(Integer integer) {
                        if(integer == 3){
                            Log.e(TAG, "doOnNext.....................");
                        }
                    }
                })
                .subscribe(subscriber);
运行结果:

 

十、doOnSubscribe

 

 

订阅生成时的回调 
Observable observable = Observable.just(1,2,3,4,5,6);
        Subscriber subscriber = new Subscriber() {

            @Override
            public void onNext(Integer v) {
                Log.e(TAG,"onNext................."+v);
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted.................");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError.....................");
            }
        };

        observable
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "观察者订阅了它生成的Observable.....................");
                    }
                })
                .subscribe(subscriber);

运行结果:

 

以上就是RX操作符之辅助操作的全部内容,请多关注【51DEV】IT技术开发者社区。