Spring 5.0 Projects

Hot Observable

A hot Observable, on the other hand, has its producer created or activated outside of it. It emits a single stream that is shared by all Observers. Consider the following example:

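import java.util.concurrent.TimeUnit;

// Assumes RxJava 2.x package names (RxJava 3 moved these under io.reactivex.rxjava3)
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class RxJavaHotObservable1 {
    public static void main(String[] args) {
        // Emits 0, 1, 2, ... every two seconds
        Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS);
        PublishSubject<Long> publishSubject = PublishSubject.create();
        // The subject subscribes to the interval as an Observer...
        observableInterval.subscribe(publishSubject);
        // ...and is subscribed to as an Observable
        publishSubject.subscribe(i -> System.out.println("Observable #1 : " + i));
        addDelay(4000);
        publishSubject.subscribe(i -> System.out.println("Observable #2 : " + i));
        // Keep the main thread alive while the interval keeps emitting
        addDelay(10000);
    }

    private static void addDelay(int milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}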

In this example, observableInterval emits timer-driven events rather than a fixed set of data: the interval method emits sequential numbers at the given interval. PublishSubject is what makes this stream hot. A subject can behave as both an Observable and an Observer; here it sits in the middle of the chain, subscribing to observableInterval and passing its values on. We then add two subscribers to the PublishSubject with a delay between them. Running the program prints the values that each subscriber receives.

The second Observer subscribes four seconds after the first. The Observable emits a sequential number every two seconds, and a hot Observable emits just a single stream, which is shared across all Observers. Because the second Observer joins that stream late, the first value it receives is 2 instead of 0.
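The sharing behavior described above can be sketched without RxJava or timers. The following is a minimal, JDK-only illustration and not how PublishSubject is actually implemented. The names HotSourceSketch and HotSourceDemo are hypothetical, and values are pushed by hand rather than every two seconds, so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HotSourceDemo {
    // Returns the lines each observer would print, in delivery order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        HotSourceSketch<Long> source = new HotSourceSketch<>();

        source.subscribe(i -> log.add("Observer #1 : " + i));
        source.onNext(0L); // only observer #1 is listening
        source.onNext(1L);

        source.subscribe(i -> log.add("Observer #2 : " + i)); // joins late
        source.onNext(2L); // one emission, delivered to both observers
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical stand-in for a subject: it accepts values (observer side)
// and forwards each one to whoever is subscribed at that moment (observable side).
class HotSourceSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // Register a subscriber; it only sees values emitted from now on.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Push one value to every current subscriber; nothing is buffered or replayed.
    void onNext(T value) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }
}
```

In this sketch, observer #1 receives 0, 1, and 2, while observer #2 receives only 2, mirroring the late-subscriber behavior of the RxJava example.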

In this sense, a hot Observable can be compared to a radio station. Someone who tunes in late cannot hear what was played before they started listening, because the same broadcast is shared by all listeners (or Observers, in Reactive terms). There are other ways to create a hot Observable. One of them is as follows:

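import java.util.concurrent.TimeUnit;

// Assumes RxJava 2.x package names (RxJava 3 moved these under io.reactivex.rxjava3)
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class RxJavaHotObservable2 {
    public static void main(String[] args) {
        Observable<Long> observableInt = Observable.interval(2, TimeUnit.SECONDS);
        // publish() wraps the cold interval in a ConnectableObservable
        ConnectableObservable<Long> connectableIntObservable = observableInt.publish();
        connectableIntObservable.subscribe(i -> System.out.println("Observable #1 : " + i));
        // Nothing is emitted until connect() is called
        connectableIntObservable.connect();
        addDelay(7000);
        connectableIntObservable.subscribe(i -> System.out.println("Observable #2 : " + i));
        // Keep the main thread alive while the interval keeps emitting
        addDelay(10000);
    }

    private static void addDelay(int milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}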

In this code, the hot Observable is created with ConnectableObservable. It does not start emitting data until the connect method is called on it, which makes it more controllable. As soon as connect is called, it starts a single stream, which is shared across the Observers. Running the program prints the values that each Observer receives.

The second Observer misses the first few items because it subscribes after a delay. You can convert any cold Observable to a ConnectableObservable by calling the publish method on it.
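To make the role of connect more concrete, here is a minimal, JDK-only sketch of the idea. It is not RxJava's implementation. ColdSourceSketch, ConnectableSketch, and ConnectableDemo are hypothetical names, and the cold source is a simple synchronous counter instead of a timer. The sketch shows that nothing runs before connect, and that a single run of the producer then feeds every registered observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConnectableDemo {
    // Returns a log of producer run counts and delivered values.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ColdSourceSketch cold = new ColdSourceSketch(2);
        ConnectableSketch published = new ConnectableSketch(cold);

        published.subscribe(i -> log.add("Observer #1 : " + i));
        published.subscribe(i -> log.add("Observer #2 : " + i));
        log.add("producer runs before connect = " + cold.runs());

        published.connect(); // a single producer run feeds both observers
        log.add("producer runs after connect = " + cold.runs());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical cold source: every subscription triggers its own run of 0..count-1.
class ColdSourceSketch {
    private final int count;
    private int runs = 0;

    ColdSourceSketch(int count) {
        this.count = count;
    }

    void subscribe(Consumer<Long> observer) {
        runs++;
        for (long i = 0; i < count; i++) {
            observer.accept(i);
        }
    }

    int runs() {
        return runs;
    }
}

// Hypothetical counterpart of publish(): it collects observers and subscribes
// to the cold source exactly once, when connect() is called.
class ConnectableSketch {
    private final ColdSourceSketch upstream;
    private final List<Consumer<Long>> observers = new ArrayList<>();

    ConnectableSketch(ColdSourceSketch upstream) {
        this.upstream = upstream;
    }

    void subscribe(Consumer<Long> observer) {
        observers.add(observer);
    }

    void connect() {
        upstream.subscribe(value -> {
            for (Consumer<Long> observer : observers) {
                observer.accept(value);
            }
        });
    }
}
```

Subscribing both observers directly to the cold source would run the producer twice. Going through the connectable wrapper runs it once, which is the design choice that makes the stream shared.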