Magren

Magren

Idealist & Garbage maker 🛸
twitter
jike

Android的rxjava2

RxJava 是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。

Observable:在观察者模式中称为 “被观察者”;
Observer:观察者模式中的 “观察者”,可接收 Observable 发送的数据;
subscribe:订阅,观察者与被观察者,通过 Observable 的 subscribe () 方法进行订阅;
Subscriber:也是一种观察者,在 2.0 中 它与 Observer 没什么实质的区别,不同的是 Subscriber 要与 Flowable (也是一种被观察者) 联合使用,Obsesrver 用于订阅 Observable,而 Subscriber 用于订阅 Flowable.

观察者模式#

rxjava 的实现主要是通过观察者模式实现的。

A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应.
在程序的观察者模式,观察者不需要时刻盯着被观察者,而是采用注册或者称为订阅的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我
同时我们也可以多个观察者对应一个被观察者

其实在 android 中也有很多自带的观察者模式。最明显的莫过于点击事件。说个最简单的例子,点击按钮后弹一个 Toast。那么,我们在点击按钮的时候,告知系统,此时,我需要弹一个吐司。那么就这么弹出来了。那么,这个时候问题来了。我是否需要实时去监听这个按钮呢?答案是不需要的。这就和前面的举例有的差距了。换句话说。我只要在此按钮进行点击时进行监听就可以了。这种操作被称为订阅。也就是说 Button 通过 setOnClickListener 对 OnclickListener 进行了订阅了操作,来监听 onclick 方法。

基本使用#

rxjava 的基本实现主要是三点:

  • 初始化 Observable (被观察者)
  • 初始化 Observe(观察者)
  • 建立两者之间的订阅关系

创建 Observable#

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello world");
                e.onComplete(); //调用complete后下面将不再接受事件
            }
        });

创建 Observe#

 Observer<String>observer=new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("rxjava", "onSubscribe: " + d);
            }

            @Override
            public void onNext(String string) {
                Log.i("rxjava", "onNext: " + string);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("rxjava", "onError: " + e);
            }

            @Override
            public void onComplete() {
                Log.i("rxjava", "onComplete: ");
            }
        };
  • onSubscribe:它会在事件还未发送之前被调用,可以用来做一些准备操作。而里面的 Disposable 则是用来切断上下游的关系的。
  • onNext:普通的事件。将要处理的事件添加到队列中。
  • onError:事件队列异常,在事件处理过程中出现异常情况时,此方法会被调用。同时队列将会终止,也就是不允许在有事件发出。
  • onComplete:事件队列完成。rxjava 不仅把每个事件单独处理。而且会把他们当成一个队列。当不再有 onNext 事件发出时,需要触发 onComplete 方法作为完成标识。

创建订阅#

observable.subscribe(observer);

结果#

先调用 onSubscribe,然后走了 onNext,最后以 onComplete 收尾:
rxjava_result.png

线程的调度#

  • subscribeOn () 指定的是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
  • 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn () 只有第一次的有效,其余的会被忽略。
  • 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn (),下游的线程就会切换一次。

rxjava 中已经内置了一些线程供我们选择:

  • Schedulers.io () 代表 io 操作的线程,通常用于网络,读写文件等 io 密集型的操作
  • Schedulers.computation () 代表 CPU 计算密集型的操作,例如需要大量计算的操作
  • Schedulers.newThread () 代表一个常规的新线程
  • AndroidSchedulers.mainThread () 代表 Android 的主线程

例子:#

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello world   "+ Thread.currentThread().getName());
                e.onComplete(); //调用complete后下面将不再接受事件
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread()) //切换到主线程
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i("rxjava","主线程   "+Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i("rxjava","thread   "+Thread.currentThread().getName());
                    }
                });

结果#

2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: 主线程   main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: thread   RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world   RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:

结合 rxjava 和 retorfit#

创建接收服务器返回数据的类#

public class TranslationBean {

    private int errorCode; //错误返回码
    private String query;     //源语言
    private List<String> translation; //翻译结果
    private basicEntity basic;
    private List<WebEntity> web;
    private String tSpeakUrl;

    public class basicEntity {
        @SerializedName("us-phonetic")
        private String usPhonetic;  //英式音标
        @SerializedName("uk-speech")
        private String ukSpeech;    //美式发音
        @SerializedName("us-speech")
        private String usSpeech;  //英式发音
        private String phonetic;   //默认音标
        @SerializedName("uk-phonetic")
        private String ukPhonetic;   //美式英标

        private List<String> explains;  //基本释义


        public List<String> getExplains() {
            return explains;
        }

        public void setExplains(List<String> explains) {
            this.explains = explains;
        }

        public String getPhonetic() {
            return phonetic;
        }

        public void setPhonetic(String phonetic) {
            this.phonetic = phonetic;
        }

        public String getUkPhonetic() {
            return ukPhonetic;
        }

        public void setUkPhonetic(String ukPhonetic) {
            this.ukPhonetic = ukPhonetic;
        }

        public String getUsPhonetic() {
            return usPhonetic;
        }

        public void setUsPhonetic(String usPhonetic) {
            this.usPhonetic = usPhonetic;
        }

        public String getUkSpeech() {
            return ukSpeech;
        }

        public void setUkSpeech(String ukSpeech) {
            this.ukSpeech = ukSpeech;
        }

        public String getUsSpeech() {
            return usSpeech;
        }

        public void setUsSpeech(String usSpeech) {
            this.usSpeech = usSpeech;
        }

    }
    public class WebEntity {
        private String key;
        private List<String> value;

        public void setKey(String key) {
            this.key = key;
        }

        public void setValue(List<String> value) {
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public List<String> getValue() {
            return value;
        }
    }
    public void setQuery(String query) {
        this.query = query;
    }

    public void setErrorCode(int errorCode) {
        this.errorCode = errorCode;
    }

    public void setTranslation(List<String> translation) {
        this.translation = translation;
    }

    public void setWeb(List<WebEntity> web) {
        this.web = web;
    }

    public String getQuery() {
        return query;
    }

    public int getErrorCode() {
        return errorCode;
    }

    public List<String> getTranslation() {
        return translation;
    }

    public List<WebEntity> getWeb() {
        return web;
    }

    public basicEntity getBasic() {
        return basic;
    }

    public void setBasic(basicEntity basic) {
        this.basic = basic;
    }

    public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}

    public String gettSpeakUrl(){ return tSpeakUrl; }
}

创建用于描述网络请求的接口#

public interface networkApi {
    @GET("api?")
    Observable<TranslationBean> translateYouDao(
            @Query("q") String q,
            @Query("from") String from,
            @Query("to") String to,
            @Query("appKey") String appKey, //应用ID
            @Query("salt") String salt,  //UUID
            @Query("sign") String sign,  //应用ID+input+salt+curtime+应用密钥 。 input= q前10个字符+q长度+q后10个字符(q的长度>=20) 或input = 字符串
            @Query("signType") String signType, //签名类型
            @Query("curtime") String curtime //时间戳
    );
}

创建 Retorfit#

public class netWork {
    private static networkApi sContactsApi;
    private static OkHttpClient okHttpClient = new OkHttpClient();
    private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
    private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();

    private static class ApiClientHolder {
        public static final netWork INSTANCE = new netWork();
    }

    public static netWork getInstance() {
        return ApiClientHolder.INSTANCE;
    }

    public networkApi getDataService() {
        if (sContactsApi == null) {
            Retrofit retrofit = new Retrofit.Builder()
                    .client(okHttpClient)
                    .baseUrl(Constants.BASE_URL)
                    .addConverterFactory(gsonConverterFactory)
                    .addCallAdapterFactory(rxJavaCallAdapterFactory)
                    .build();
            sContactsApi = retrofit.create(networkApi.class);
        }
        return sContactsApi;
    }

}

使用#

@SuppressLint("CheckResult")
   public void netConnection(String q,String from,String to,String salt,String sign,String curtime){
       netWork.getInstance().getDataService()
               .translateYouDao(q,from,to,appID,salt,sign,signType,curtime)
               .subscribeOn(Schedulers.newThread()) //发起的执行在一个新的线程
               .observeOn(AndroidSchedulers.mainThread()) //结果的执行在主线程
               .subscribe(new Consumer<TranslationBean>() {
                   @Override
                   public void accept(TranslationBean translationBean) throws Exception {
                       //对接收到数据的类进行处理
                   }
               });
   }
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。