RxJava is a series of operations implemented using the observer pattern, so it's important to understand the observer, the observed, and the concepts of subscription and events in the observer pattern.
Observable: In the observer pattern, it is referred to as the "observed";
Observer: The "observer" in the observer pattern, which can receive data sent by the Observable;
subscribe: Subscription, where the observer subscribes to the observed through the Observable's subscribe() method;
Subscriber: Also a type of observer, in version 2.0 it is not substantially different from Observer, except that Subscriber is used in conjunction with Flowable (also an observed), where Observer is used to subscribe to Observable, and Subscriber is used to subscribe to Flowable.
Observer Pattern#
The implementation of rxjava is mainly through the observer pattern.
Object A (the observer) is highly sensitive to certain changes in Object B (the observed) and needs to react the moment B changes.
In the observer pattern of the program, the observer does not need to constantly monitor the observed; instead, it uses a registration or subscription method to inform the observed: I need your certain state, and you should notify me when it changes.
At the same time, we can also have multiple observers corresponding to one observed.
In fact, there are many built-in observer patterns in Android. The most obvious is the click event. For example, when you click a button, a Toast pops up. So, when we click the button, we inform the system that we need to show a toast at this moment. Then it pops up. Now, the question arises: Do I need to listen to this button in real-time? The answer is no. This is different from the previous example. In other words, I only need to listen when this button is clicked. This operation is called subscription. That is, the Button subscribes to the OnClickListener through setOnClickListener to listen for the onClick method.
Basic Usage#
The basic implementation of rxjava mainly consists of three points:
- Initialize Observable (the observed)
- Initialize Observer (the observer)
- Establish a subscription relationship between the two
Create Observable#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete(); // After calling complete, no further events will be accepted
}
});
Create Observer#
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("rxjava", "onSubscribe: " + d);
}
@Override
public void onNext(String string) {
Log.i("rxjava", "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.i("rxjava", "onError: " + e);
}
@Override
public void onComplete() {
Log.i("rxjava", "onComplete: ");
}
};
- onSubscribe: It is called before any events are sent and can be used for some preparation operations. The Disposable inside is used to sever the upstream and downstream relationship.
- onNext: Ordinary events. Adds the events to be processed to the queue.
- onError: Exception in the event queue, this method will be called when an exception occurs during event processing. The queue will also terminate, meaning no events will be emitted.
- onComplete: Event queue completion. Rxjava not only processes each event individually but also treats them as a queue. When there are no more onNext events emitted, the onComplete method needs to be triggered as a completion indicator.
Create Subscription#
observable.subscribe(observer);
Result#
First, onSubscribe is called, then onNext is executed, and finally it ends with onComplete:
Thread Scheduling#
- subscribeOn() specifies the thread that emits events, while observerOn specifies the thread that the subscriber receives events on.
- If the event-emitting thread is specified multiple times, only the first specification is effective, meaning that multiple calls to subscribeOn() will only consider the first one, and the rest will be ignored.
- However, it is possible to specify the subscriber's receiving thread multiple times, meaning that each time observerOn() is called, the downstream thread will switch once.
Rxjava has built-in several threads for us to choose from:
- Schedulers.io() represents the thread for IO operations, typically used for network, file read/write, and other IO-intensive operations.
- Schedulers.computation() represents CPU computation-intensive operations, such as operations that require a lot of calculations.
- Schedulers.newThread() represents a conventional new thread.
- AndroidSchedulers.mainThread() represents the main thread of Android.
Example:#
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world " + Thread.currentThread().getName());
e.onComplete(); // After calling complete, no further events will be accepted
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()) // Switch to the main thread
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava", "Main thread " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("rxjava", "Thread " + Thread.currentThread().getName());
}
});
Result#
2020-03-13 17:06:23.873 8760-8760/com.example.rxjava_test I/rxjava: onSubscribe: io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver@92153c7
2020-03-13 17:06:23.980 8760-8760/com.example.rxjava_test I/rxjava: Main thread main
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: Thread RxCachedThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onNext: hello world RxNewThreadScheduler-1
2020-03-13 17:06:23.982 8760-8826/com.example.rxjava_test I/rxjava: onComplete:
Combining rxjava and Retrofit#
Create a class to receive server return data#
public class TranslationBean {
private int errorCode; // Error return code
private String query; // Source language
private List<String> translation; // Translation result
private basicEntity basic;
private List<WebEntity> web;
private String tSpeakUrl;
public class basicEntity {
@SerializedName("us-phonetic")
private String usPhonetic; // British phonetic
@SerializedName("uk-speech")
private String ukSpeech; // American pronunciation
@SerializedName("us-speech")
private String usSpeech; // British pronunciation
private String phonetic; // Default phonetic
@SerializedName("uk-phonetic")
private String ukPhonetic; // American phonetic
private List<String> explains; // Basic meanings
public List<String> getExplains() {
return explains;
}
public void setExplains(List<String> explains) {
this.explains = explains;
}
public String getPhonetic() {
return phonetic;
}
public void setPhonetic(String phonetic) {
this.phonetic = phonetic;
}
public String getUkPhonetic() {
return ukPhonetic;
}
public void setUkPhonetic(String ukPhonetic) {
this.ukPhonetic = ukPhonetic;
}
public String getUsPhonetic() {
return usPhonetic;
}
public void setUsPhonetic(String usPhonetic) {
this.usPhonetic = usPhonetic;
}
public String getUkSpeech() {
return ukSpeech;
}
public void setUkSpeech(String ukSpeech) {
this.ukSpeech = ukSpeech;
}
public String getUsSpeech() {
return usSpeech;
}
public void setUsSpeech(String usSpeech) {
this.usSpeech = usSpeech;
}
}
public class WebEntity {
private String key;
private List<String> value;
public void setKey(String key) {
this.key = key;
}
public void setValue(List<String> value) {
this.value = value;
}
public String getKey() {
return key;
}
public List<String> getValue() {
return value;
}
}
public void setQuery(String query) {
this.query = query;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public void setTranslation(List<String> translation) {
this.translation = translation;
}
public void setWeb(List<WebEntity> web) {
this.web = web;
}
public String getQuery() {
return query;
}
public int getErrorCode() {
return errorCode;
}
public List<String> getTranslation() {
return translation;
}
public List<WebEntity> getWeb() {
return web;
}
public basicEntity getBasic() {
return basic;
}
public void setBasic(basicEntity basic) {
this.basic = basic;
}
public void settSpeakUrl(String tSpeakUrl){ this.tSpeakUrl = tSpeakUrl;}
public String gettSpeakUrl(){ return tSpeakUrl; }
}
Create an interface to describe network requests#
public interface networkApi {
@GET("api?")
Observable<TranslationBean> translateYouDao(
@Query("q") String q,
@Query("from") String from,
@Query("to") String to,
@Query("appKey") String appKey, // Application ID
@Query("salt") String salt, // UUID
@Query("sign") String sign, // Application ID + input + salt + curtime + application secret. input = first 10 characters of q + length of q + last 10 characters of q (length of q >= 20) or input = string
@Query("signType") String signType, // Signature type
@Query("curtime") String curtime // Timestamp
);
}
Create Retrofit#
public class netWork {
private static networkApi sContactsApi;
private static OkHttpClient okHttpClient = new OkHttpClient();
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
private static class ApiClientHolder {
public static final netWork INSTANCE = new netWork();
}
public static netWork getInstance() {
return ApiClientHolder.INSTANCE;
}
public networkApi getDataService() {
if (sContactsApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(okHttpClient)
.baseUrl(Constants.BASE_URL)
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
sContactsApi = retrofit.create(networkApi.class);
}
return sContactsApi;
}
}
Usage#
@SuppressLint("CheckResult")
public void netConnection(String q, String from, String to, String salt, String sign, String curtime){
netWork.getInstance().getDataService()
.translateYouDao(q, from, to, appID, salt, sign, signType, curtime)
.subscribeOn(Schedulers.newThread()) // Execution initiated in a new thread
.observeOn(AndroidSchedulers.mainThread()) // Result execution in the main thread
.subscribe(new Consumer<TranslationBean>() {
@Override
public void accept(TranslationBean translationBean) throws Exception {
// Process the received data class
}
});
}