RxJava は、オブザーバーパターンを利用して一連の操作を実現するため、オブザーバーパターンにおけるオブザーバー、被オブザーバー、サブスクリプション、イベントについて理解する必要があります。
Observable:オブザーバーパターンにおいて「被オブザーバー」と呼ばれる;
Observer:オブザーバーパターンにおける「オブザーバー」で、Observable から送信されるデータを受信できる;
subscribe:サブスクリプション、オブザーバーと被オブザーバーが、Observable の subscribe () メソッドを通じてサブスクライブする;
Subscriber:これもオブザーバーの一種で、2.0 では Observer との実質的な違いはありませんが、Subscriber は Flowable(これも被オブザーバーの一種)と組み合わせて使用され、Observer は Observable をサブスクライブするために使用され、Subscriber は Flowable をサブスクライブするために使用されます。
オブザーバーパターン#
rxjava の実装は主にオブザーバーパターンを通じて実現されています。
A オブジェクト(オブザーバー)は B オブジェクト(被オブザーバー)のある変化に非常に敏感であり、B が変化した瞬間に反応する必要があります。
プログラムのオブザーバーパターンでは、オブザーバーは常に被オブザーバーを監視する必要はなく、登録またはサブスクリプションの方法を採用して、被オブザーバーに「私はあなたのある状態が必要であり、それが変化したときに通知してください」と伝えます。
同時に、複数のオブザーバーが 1 つの被オブザーバーに対応することもできます。
実際、Android にも多くの組み込みのオブザーバーパターンがあります。最も明白なのはクリックイベントです。最も簡単な例を挙げると、ボタンをクリックした後にトーストを表示することです。つまり、ボタンをクリックする際に、システムに「今、トーストを表示したい」と伝えます。そうすると、トーストが表示されます。このとき、問題が生じます。私はこのボタンをリアルタイムで監視する必要がありますか?答えは「必要ありません」です。これは前述の例とは異なります。言い換えれば、私はこのボタンがクリックされたときだけ監視すればよいのです。この操作はサブスクリプションと呼ばれます。つまり、Button は setOnClickListener を通じて OnclickListener にサブスクリプションを行い、onclick メソッドを監視します。
基本的な使用法#
rxjava の基本的な実装は主に 3 つのポイントです:
- Observable(被オブザーバー)の初期化
- Observe(オブザーバー)の初期化
- 両者の間のサブスクリプション関係の確立
Observable の作成#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete(); // completeを呼び出した後、以下のイベントは受け付けられなくなります
}
});
Observe の作成#
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("rxjava", "onSubscribe: " + d);
}
@Override
public void onNext(String string) {
Log.i("rxjava", "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.i("rxjava", "onError: " + e);
}
@Override
public void onComplete() {
Log.i("rxjava", "onComplete: ");
}
};
- onSubscribe:イベントが送信される前に呼び出され、準備操作を行うために使用できます。その中の Disposable は上下流の関係を切断するために使用されます。
- onNext:通常のイベント。処理するイベントをキューに追加します。
- onError:イベントキューの例外。イベント処理中に異常が発生した場合、このメソッドが呼び出されます。同時に、キューは終了し、イベントが発生することは許可されません。
- onComplete:イベントキューの完了。rxjava は各イベントを個別に処理するだけでなく、それらをキューとして扱います。onNext イベントが発生しなくなったとき、onComplete メソッドをトリガーして完了を示します。
サブスクリプションの作成#
observable.subscribe(observer);
結果#
最初に onSubscribe が呼び出され、次に onNext が実行され、最後に onComplete で終了します:
スレッドのスケジューリング#
- subscribeOn () はイベントを発生させるスレッドを指定し、observerOn はサブスクライバーがイベントを受信するスレッドを指定します。
- 複数回発生させるスレッドを指定しても、最初に指定したものだけが有効です。つまり、subscribeOn () を複数回呼び出しても、最初のものだけが有効で、残りは無視されます。
- ただし、サブスクライバーが受信するスレッドを複数回指定することは可能です。つまり、observerOn () を 1 回呼び出すごとに、下流のスレッドが 1 回切り替わります。
rxjava には、私たちが選択できるいくつかのスレッドがすでに組み込まれています:
- Schedulers.io () は IO 操作のスレッドを表し、通常はネットワーク、ファイルの読み書きなどの IO 集約型操作に使用されます。
- Schedulers.computation () は CPU 計算集約型操作を表し、大量の計算を必要とする操作に使用されます。
- Schedulers.newThread () は通常の新しいスレッドを表します。
- AndroidSchedulers.mainThread () は Android のメインスレッドを表します。
例:#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world "+ Thread.currentThread().getName());
e.onComplete(); // completeを呼び出した後、以下のイベントは受け付けられなくなります
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // メインスレッドに切り替え
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava","メインスレッド "+Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava","スレッド "+Thread.currentThread().getName());
}
});
結果#
2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: メインスレッド main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: スレッド RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:
rxjava と retrofit の組み合わせ#
サーバーからのデータを受信するクラスの作成#
public class TranslationBean {
private int errorCode; // エラー返却コード
private String query; // 元言語
private List<String> translation; // 翻訳結果
private basicEntity basic;
private List<WebEntity> web;
private String tSpeakUrl;
public class basicEntity {
@SerializedName("us-phonetic")
private String usPhonetic; // 英式音標
@SerializedName("uk-speech")
private String ukSpeech; // 美式発音
@SerializedName("us-speech")
private String usSpeech; // 英式発音
private String phonetic; // デフォルト音標
@SerializedName("uk-phonetic")
private String ukPhonetic; // 美式英標
private List<String> explains; // 基本的な意味
public List<String> getExplains() {
return explains;
}
public void setExplains(List<String> explains) {
this.explains = explains;
}
public String getPhonetic() {
return phonetic;
}
public void setPhonetic(String phonetic) {
this.phonetic = phonetic;
}
public String getUkPhonetic() {
return ukPhonetic;
}
public void setUkPhonetic(String ukPhonetic) {
this.ukPhonetic = ukPhonetic;
}
public String getUsPhonetic() {
return usPhonetic;
}
public void setUsPhonetic(String usPhonetic) {
this.usPhonetic = usPhonetic;
}
public String getUkSpeech() {
return ukSpeech;
}
public void setUkSpeech(String ukSpeech) {
this.ukSpeech = ukSpeech;
}
public String getUsSpeech() {
return usSpeech;
}
public void setUsSpeech(String usSpeech) {
this.usSpeech = usSpeech;
}
}
public class WebEntity {
private String key;
private List<String> value;
public void setKey(String key) {
this.key = key;
}
public void setValue(List<String> value) {
this.value = value;
}
public String getKey() {
return key;
}
public List<String> getValue() {
return value;
}
}
public void setQuery(String query) {
this.query = query;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public void setTranslation(List<String> translation) {
this.translation = translation;
}
public void setWeb(List<WebEntity> web) {
this.web = web;
}
public String getQuery() {
return query;
}
public int getErrorCode() {
return errorCode;
}
public List<String> getTranslation() {
return translation;
}
public List<WebEntity> getWeb() {
return web;
}
public basicEntity getBasic() {
return basic;
}
public void setBasic(basicEntity basic) {
this.basic = basic;
}
public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}
public String gettSpeakUrl(){ return tSpeakUrl; }
}
ネットワークリクエストを説明するインターフェースの作成#
public interface networkApi {
@GET("api?")
Observable<TranslationBean> translateYouDao(
@Query("q") String q,
@Query("from") String from,
@Query("to") String to,
@Query("appKey") String appKey, // アプリID
@Query("salt") String salt, // UUID
@Query("sign") String sign, // アプリID+input+salt+curtime+アプリ秘密鍵。 input= qの前10文字+qの長さ+qの後10文字(qの長さ>=20)またはinput = 文字列
@Query("signType") String signType, // 署名タイプ
@Query("curtime") String curtime // タイムスタンプ
);
}
Retrofit の作成#
public class netWork {
private static networkApi sContactsApi;
private static OkHttpClient okHttpClient = new OkHttpClient();
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
private static class ApiClientHolder {
public static final netWork INSTANCE = new netWork();
}
public static netWork getInstance() {
return ApiClientHolder.INSTANCE;
}
public networkApi getDataService() {
if (sContactsApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl(Constants.BASE_URL)
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
sContactsApi = retrofit.create(networkApi.class);
}
return sContactsApi;
}
}
使用方法#
@SuppressLint("CheckResult")
public void netConnection(String q,String from,String to,String salt,String sign,String curtime){
netWork.getInstance().getDataService()
.translateYouDao(q,from,to,appID,salt,sign,signType,curtime)
.subscribeOn(Schedulers.newThread()) // 新しいスレッドで実行を開始
.observeOn(AndroidSchedulers.mainThread()) // 結果の実行をメインスレッドで行う
.subscribe(new Consumer<TranslationBean>() {
@Override
public void accept(TranslationBean translationBean) throws Exception {
// 受信したデータのクラスを処理する
}
});
}