Spring 5.0 Projects

Custom Observers

We have seen how data is emitted from an Observable, passes downstream through a chain of operators, and eventually reaches an Observer. In effect, the data travels through a series of Observables, because each operator returns a new Observable, and together they form an Observable chain. The first Observable, where the emission originates, is called the Observable source. Factory methods such as Observable.create() and Observable.just() therefore return an Observable source.
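To make the idea of an Observable chain concrete, here is a minimal plain-Java sketch. It is not RxJava's implementation: MiniObservable, MiniObserver, of() and run() are hypothetical names invented for illustration, and the Disposable that RxJava passes to onSubscribe() is left out. It only shows that an operator such as filter can return a new object that subscribes to its upstream and forwards matching values downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class MiniChainDemo {

    // Hypothetical stand-in for an observer (RxJava's Observer also gets a Disposable).
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Hypothetical stand-in for an observable: anything an observer can subscribe to.
    interface MiniObservable<T> {
        void subscribe(MiniObserver<T> observer);

        // Source: emits every value of the list in order, then completes.
        static <T> MiniObservable<T> of(List<T> values) {
            return observer -> {
                observer.onSubscribe();
                for (T value : values) {
                    observer.onNext(value);
                }
                observer.onComplete();
            };
        }

        // Operator: returns a NEW observable that subscribes to this one
        // and forwards only the values accepted by the predicate.
        default MiniObservable<T> filter(Predicate<T> predicate) {
            MiniObservable<T> upstream = this;
            return downstream -> upstream.subscribe(new MiniObserver<T>() {
                @Override public void onSubscribe() { downstream.onSubscribe(); }
                @Override public void onNext(T value) {
                    if (predicate.test(value)) {
                        downstream.onNext(value);
                    }
                }
                @Override public void onError(Throwable e) { downstream.onError(e); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        }
    }

    // Runs source -> filter -> observer and records each event in arrival order.
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniObservable<String> months = MiniObservable.of(List.of(
                "January", "February", "March", "April", "May", "June", "July", "August"));
        months.filter(month -> month.endsWith("y"))
              .subscribe(new MiniObserver<String>() {
                  @Override public void onSubscribe() { events.add("onSubscribe"); }
                  @Override public void onNext(String value) { events.add("onNext:" + value); }
                  @Override public void onError(Throwable e) { events.add("onError"); }
                  @Override public void onComplete() { events.add("onComplete"); }
              });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the source never sees the final observer directly. It only sees the wrapper created by filter, which is one simple way to picture how each operator adds a link to the chain.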

We can provide our custom implementation to handle the Observer events as follows:

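// RxJava 2.x package names are assumed here (RxJava 3 uses io.reactivex.rxjava3.*).
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class RxJavaCustomObserverDemo {

    public static void main(String[] args) {

        // Observable source that emits eight month names and then completes.
        Observable<String> months =
                Observable.just("January", "February", "March", "April",
                        "May", "June", "July", "August");

        // Custom Observer with one callback per event type.
        Observer<String> customObserver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(" Subscription initiated ...");
            }

            @Override
            public void onNext(String value) {
                System.out.println("The value " + value + " is received from Observable");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        // Only months ending with "y" reach the observer.
        months.filter(month -> month.endsWith("y"))
                .subscribe(customObserver);
    }
}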

As in the previous examples, we have defined an Observable with a list of months. We have also defined a custom Observer with implementations of the methods that are called for specific events. When we register the observer (customObserver in our case) through subscribe(), the Observable calls the onSubscribe() method on the Observer.

Every time the Observable emits data, it calls onNext() on the registered observer, which then processes the value. After sending the last item, the Observable calls the onComplete() method on the Observer. If an error occurs along the way, the Observable calls the onError() method on the Observer instead.

The data is passed through the Observable chain. In this example, the data emitted from the Observable source (months) is forwarded downstream to the filter operator, and the values that pass the filter then reach the Observer, the endpoint where the data is consumed and processed. By processed, we mean the data can be saved to the database, sent as a server response, written to an external document management system, composed as a structure for UI rendering, or simply printed to the console.

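Running the RxJavaCustomObserverDemo class produces the following output:

     Subscription initiated ...
    The value January is received from Observable
    The value February is received from Observable
    The value May is received from Observable
    The value July is received from Observable
    Done!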

In this example, we have used an anonymous class to provide a custom implementation of the Observer's methods. However, you can also pass lambda expressions to subscribe() for this purpose.
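RxJava provides subscribe() overloads that accept callbacks for onNext, onError, and onComplete. The sketch below does not call RxJava. Under that caveat, it illustrates in plain Java how three lambdas could be adapted into a single observer object. MiniObserver, fromLambdas(), emit() and run() are hypothetical names, and the observer type is simplified (it has no onSubscribe() or Disposable).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LambdaObserverSketch {

    // Hypothetical, simplified observer type (no onSubscribe, unlike RxJava's Observer).
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Adapts three lambdas into one observer, so callers need no anonymous class.
    static <T> MiniObserver<T> fromLambdas(Consumer<T> nextHandler,
                                           Consumer<Throwable> errorHandler,
                                           Runnable completeHandler) {
        return new MiniObserver<T>() {
            @Override public void onNext(T value) { nextHandler.accept(value); }
            @Override public void onError(Throwable e) { errorHandler.accept(e); }
            @Override public void onComplete() { completeHandler.run(); }
        };
    }

    // Hypothetical emitter: pushes each value, then completes.
    // A runtime exception thrown while emitting is routed to onError instead.
    static <T> void emit(List<T> values, MiniObserver<T> observer) {
        try {
            for (T value : values) {
                observer.onNext(value);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onComplete();
    }

    // Builds an observer from lambdas and records what it receives.
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObserver<String> observer = fromLambdas(
                value -> log.add("The value " + value + " is received"),
                error -> log.add("Error: " + error.getMessage()),
                () -> log.add("Done!"));
        emit(List.of("January", "February", "May", "July"), observer);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The lambda form keeps each callback next to the subscription and avoids the boilerplate of an anonymous class, at the cost of losing a named, reusable observer type.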