Spring 5.0 Projects

Reactor in action 

Let's learn more about the Reactor API with a practical example. Create a new Maven project similar to the one we created in the Anatomy of RxJava section. At the time of writing, the current version of Project Reactor is 3.2.6. We need to add the Reactor Maven dependency as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>reactor-demo</groupId>
  <artifactId>simple-reactor-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>Simple Reactor Demo</name>
  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.2.6.RELEASE</version>
    </dependency>
  </dependencies>
</project>

When we declare the Reactor dependency, the Reactive Streams JAR is added as a transitive dependency. Next, add a Java class as follows:

import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorBasic {

    private static List<String> carModels = Arrays.asList(
            "Era", "Magna", "Sportz", "Astha", "Astha(O)");

    public static void main(String[] args) {
        // Flux publishes zero or more items; Mono publishes at most one
        Flux<String> fewWords = Flux.just("Hello", "World");
        Flux<String> manyWords = Flux.fromIterable(carModels);
        Mono<String> singleWord = Mono.just("Single value");

        // Nothing is emitted until a subscriber is attached
        fewWords.subscribe(t -> System.out.println(t));
        System.out.println("-----------------------------");
        manyWords.subscribe(System.out::println);
        System.out.println("-----------------------------");
        singleWord.subscribe(System.out::println);
    }
}

We have used Flux and Mono to create various publishers. The just() method populates the stream with the given values. We can also wrap iterable types (such as List and Set) in a data stream with the fromIterable() method. A few other methods, such as from(), fromArray(), and fromStream(), construct data streams from other publishers, arrays, and existing Java streams, respectively, and can be used as follows:

import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class ReactorFromOtherPublisher {

    public static void main(String[] args) {
        Flux<String> fewWords = Flux.just("One", "Two");

        /* from array */
        Flux<Integer> intFlux = Flux.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7});

        /* from Java 8 stream */
        Flux<String> strFlux = Flux.fromStream(Stream.of(
                "Ten", "Hundred", "Thousand", "Ten Thousands", "Lac", "Ten Lac", "Crore"));

        /* from other Publisher */
        Flux<String> fromOtherPublisherFlux = Flux.from(fewWords);

        intFlux.subscribe(System.out::println);
        strFlux.subscribe(System.out::println);
        fromOtherPublisherFlux.subscribe(System.out::println);
    }
}

A subscriber is attached with the subscribe() method, similar to what we did with Observable in RxJava. With Flux, we can create a publisher that emits either a finite or an infinite stream.
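
To make the finite versus infinite distinction concrete, here is a minimal sketch. The class name FiniteAndInfiniteFlux and its helper methods are hypothetical, and the operators range(), interval(), take(), collectList(), and block() are not covered in the text above; they are used here only to build a bounded and an unbounded source. The three-argument subscribe() receives values, an error, and the completion signal separately.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class FiniteAndInfiniteFlux {

    // Finite publisher: emits 1..5 and then signals completion.
    static List<String> finite() {
        List<String> events = new ArrayList<>();
        Flux.range(1, 5).subscribe(
                value -> events.add("next:" + value),   // called for each item
                error -> events.add("error:" + error),  // called if the stream fails
                () -> events.add("complete"));          // called once at the end
        return events;
    }

    // Infinite publisher: interval() keeps ticking until cancelled,
    // so take(3) is used to cut it off after three values.
    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .collectList()
                .block(); // blocking is for demonstration only
    }

    public static void main(String[] args) {
        finite().forEach(System.out::println);
        System.out.println(firstTicks()); // prints [0, 1, 2]
    }
}
```

The finite stream ends with a completion signal on its own, while the interval-based stream would never complete without take() or an explicit cancellation.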

We can also control whether a stream carries values or is simply empty. This is done with a few utility methods provided by the Flux class, as follows:

  • Flux.empty(): Generates an empty stream that emits no values and only signals completion.
  • Flux.error(): Signals an error condition by generating a stream that emits no values and terminates with the given error.
  • Flux.never(): As its name suggests, generates a stream that emits no events of any type: no values, no error, and no completion.
  • Flux.defer(): Constructs the publisher only when a subscriber subscribes to the Flux. In short, it is lazy in nature.
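
The four utility methods above can be compared side by side with a small sketch. The class FluxUtilityMethods and its helpers signals() and deferred() are hypothetical names introduced for illustration; signals() simply records every event a publisher sends to its subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

class FluxUtilityMethods {

    // Subscribes and records every signal the publisher sends.
    static List<String> signals(Flux<?> flux) {
        List<String> seen = new ArrayList<>();
        Disposable subscription = flux.subscribe(
                v -> seen.add("next:" + v),
                e -> seen.add("error:" + e.getMessage()),
                () -> seen.add("complete"));
        // Cancels a still-open subscription such as never();
        // has no effect on streams that already terminated.
        subscription.dispose();
        return seen;
    }

    // defer() calls the supplier once per subscriber, at subscription time.
    static List<String> deferred() {
        AtomicInteger counter = new AtomicInteger();
        Flux<Integer> lazy = Flux.defer(() -> Flux.just(counter.incrementAndGet()));
        List<String> seen = new ArrayList<>();
        seen.add("before:" + counter.get()); // supplier has not run yet
        seen.addAll(signals(lazy));          // first subscriber sees 1
        seen.addAll(signals(lazy));          // second subscriber sees 2
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(signals(Flux.empty()));  // prints [complete]
        System.out.println(signals(Flux.error(new IllegalStateException("boom")))); // prints [error:boom]
        System.out.println(signals(Flux.never()));  // prints []
        System.out.println(deferred()); // prints [before:0, next:1, complete, next:2, complete]
    }
}
```

Note how the counter stays at 0 until the first subscription, and how each subscriber gets a freshly built publisher; that is the lazy behavior described for defer().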