Spring 5.0 Projects

Types of subscribers

The Flux and Mono classes both accept Java 8 lambda expressions as subscribers. They provide several overloaded versions of the subscribe() method, as the following code shows.

import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

public class ReactorWithSubscriberWays {

    public static void main(String[] args) {
        List<String> monthList = Arrays.asList(
                "January", "February", "March", "April", "May");

        Flux<String> months = Flux.fromIterable(monthList);

        /* 1) No events are consumed. */
        months.subscribe();

        /* 2) Only the value event is consumed. */
        months.subscribe(month -> System.out.println("->" + month));

        /* 3) Value and error events (2 in total) are handled. */
        months.subscribe(month -> System.out.println("-->" + month),
                e -> e.printStackTrace());

        /* 4) Value, error, and completion events (3 in total) are handled. */
        months.subscribe(month -> System.out.println("--->" + month),
                e -> e.printStackTrace(),
                () -> System.out.println("Finished at THIRD PLACE.. !!"));

        /* 5) Value, error, completion, and subscription events (4 in total) are handled. */
        months.subscribe(month -> System.out.println("---->" + month),
                e -> e.printStackTrace(),
                () -> System.out.println("Finished at FOURTH PLACE ..!!"),
                s -> {
                    System.out.println("Subscribed :");
                    s.request(5L); // demand five elements from the publisher
                });
    }
}

The Flux is created from a list of strings. The code uses five different variations of the subscribe() method, each of which captures a different set of events. The details are as follows:

  • The first version does not consume any event.
  • The second version consumes only the value event, which is handled with a lambda expression.
  • The third version takes an error handler as its second argument, in addition to the value handler. Here, the lambda expression simply prints the stack trace.
  • The fourth version consumes value, error, and completion events. When the data stream completes, the completion event fires, and we handle it with a lambda expression.
  • The fifth version consumes value, error, completion, and subscription events. The last parameter, a consumer of the Subscription type, makes this version of subscribe() a special case. The Subscription type has a method called request(). The publisher does not send any event until the subscriber signals demand by calling Subscription.request(). This applies only when a Subscription consumer is supplied to the subscriber; the other versions request all elements automatically. Here we call s.request(5L), meaning the publisher can send at most five elements. If the requested count is greater than or equal to the number of elements in the publisher, the completion event fires once the last element has been delivered. In our case, the data stream has exactly five elements, so the completion event is called. If you request fewer than five, you will not get a completion event.
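The effect of the requested count on the completion event can be seen in isolation with a small sketch. To keep it runnable without extra dependencies, it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) instead of Reactor. The class DemandSketch and its methods fromList() and run() are hypothetical names made up for this illustration, and the publisher is a simplified stand-in, not Reactor's implementation. It is only meant to mirror the behavior described above: elements are sent only on demand, and completion is signaled only after the last element has been delivered.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical illustration class; not part of Reactor or of the example above.
class DemandSketch {

    // A tiny synchronous publisher over a list. It emits at most as many items
    // as were requested and signals completion once the list is exhausted.
    // Simplification: non-positive request values are ignored rather than
    // reported as an error.
    static Flow.Publisher<String> fromList(List<String> items) {
        return subscriber -> {
            Iterator<String> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                boolean done;

                @Override
                public void request(long n) {
                    for (long i = 0; i < n && !done && it.hasNext(); i++) {
                        subscriber.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Subscribes with the given demand and records every signal received.
    static List<String> run(long demand) {
        List<String> log = new ArrayList<>();
        List<String> months = List.of("January", "February", "March", "April", "May");
        fromList(months).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                log.add("subscribed");
                s.request(demand); // nothing is emitted before this call
            }

            @Override
            public void onNext(String month) {
                log.add(month);
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("completed");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [subscribed, January, February, March, April, May, completed]
        System.out.println(run(5));
        // prints [subscribed, January, February, March]
        System.out.println(run(3));
    }
}
```

With a demand of five, all five months arrive and the completion signal follows. With a demand of three, only three months arrive and the subscriber never sees a completion signal, which matches the note above about requesting fewer than five elements.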