Spring 5.0 Projects
上QQ阅读APP看书,第一时间看更新

Anatomy of RxJava 

RxJava basically extends the observer pattern to support iteration on the sequence of event/data and allows the forming of sequences at the same time as abstracting away the low-level details, like threading, synchronization, concurrency, and thread safety. 

At the time of writing, the current version of RxJava-2.6 has a single dependency on Reactive Streams API and provides support for Java 6 and the later versions, along with Android 2.3+. Before going deep into RxJava, let's look at the basic building blocks of ReactiveX as follows:

  • Observable: It is  basically a data stream or in other words a source of data. It can emit the data just one time or periodically in a continuous manner, based on the configuration. Observable can send out specific data on particular events based on the operators used with Observable.  In short, Observable is the supplier of data to other components. 
  • Observer: The data stream emitted by Observable is consumed by Observers. For that, they need to subscribe to  Observable using the subscribeOn() method. One ore more observers can be subscribed to Observable. When  Observable sends the data, all registered observers receive the data with the onNext() callback method. Once the data is received, you can perform any operation on that. In case any error occurred during the transmission, observers will get the error data with the onError() callback.
  • Scheduler: They are used for thread management to achieve asynchronous programming in ReactiveX. They will instruct Observable and Observer to choose particular thread on which they can execute the operations. For that, Scheduler provide the observerOn() and scheduleOn() methods for the Observer and Observable respectively.

Let's understand these concepts with a practical example. We will create a Maven project in Eclipse with settings as follows:

We need to give RxJava specific dependency. The current version at this moment is 2.2.6. After adding the dependency,  pom.xml should look as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rx-java</groupId>
<artifactId>simple-rx-java-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Simple RxJava demo</name>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.6</version>
</dependency>
</dependencies>
</project>

Create a new Java class with an appropriate package and add the following code to it:

public class RxJavaBasics {
public static void main(String[] args) {
/* Observable */
Observable<String> adminUsers =
Observable.just("Dave",
"John",
"Nilang",
"Komal",
"David");

/* Observer in form of lambda expression */
adminUsers.subscribe(s -> System.out.println(s));
}
}

The adminUsers instance is of type Observable<String> that pushes five strings literals (name of admin users), which is essentially a data stream or the source of data. For simplicity, we have taken String Literals, but Observable can push the data or events from any source, such as a database query result, social media feed, REST API response, or anything similar.
The Observable.just() method is used to emit a fixed set of string literals. The last line of the code describes how the Observer can subscribe to Observable with the subscribe() method. It is defined as a lambda expression that specifies what to do with the string it receives from Observable. This relation can be described in the following diagram:

In this code, Observer is simply printing the string literal. RxJava provides several operators that can be used in between Observable and Observer. These operators are used to transform or manipulate each pushed data passed in between. Each operator processes the data coming from previous Observable and returns new Observable. Let's use one of the operators called map and update the code as follows:

adminUsers.map(s->s.startsWith("D") ? s:"*******")
.subscribe(s -> System.out.println(s));

In this code, the data emitted by the adminUsers observable is passed through a map operator before being sent to Observer. The map operator here provides a lambda expression, which is used to process the submitted data from adminUsers. It basically prints the return string if it starts with D or else simply returns a string with an asterisk mark (*). The  map operator returns new Observable that returns the data processed by the map operator and finally sends it to Observer. You will see the output, as follows: