One of the more interesting features in the upcoming release of Java 9 is its support for reactive programming. At a high level, reactive programming deals with asynchronous data streams, and Reactive Streams provides a standard for asynchronous stream processing. Java 9 supports reactive programming by implementing the Reactive Streams specification through the Flow API. Even before the release of Java 9, however, there are ways to do reactive programming in Java 8: Reactor and RxJava are two libraries that implement Reactive Streams on Java 8. Reactor is used internally by the Spring Framework to provide its own reactive support in version 5. In this post, we will look into the Reactor library.
Reactive Streams
The Reactive Streams API centers on two high-level interfaces, Publisher and Subscriber. A Publisher is the source of events, while a Subscriber is the destination for those events. A Subscriber subscribes to a Publisher. When a Subscriber subscribes to a Publisher, the Publisher invokes the onSubscribe() method on the Subscriber, passing it a Subscription. When the Subscriber is ready to start handling events, it signals this through that Subscription using the request() method. Upon receiving this signal, the Publisher begins to invoke onNext() for each event, up to the number of events requested. This continues until either the stream completes (onComplete()) or an error occurs (onError(Throwable)).
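The order of these signals can be observed without any third-party library. The following is a minimal sketch, assuming Java 9 or later and using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, with SubmissionPublisher as the Publisher. HandshakeDemo is a hypothetical class name used only for illustration; it is not part of Reactor or of this post's project.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records every signal the Publisher sends, in order.
class HandshakeDemo {

    static List<String> run() {
        List<String> signals = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                this.subscription = subscription;
                subscription.request(1); // ready for the first event
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
                subscription.request(1); // ask for one more event
            }

            @Override
            public void onError(Throwable throwable) {
                signals.add("onError");
                done.countDown();
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
                done.countDown();
            }
        };

        // SubmissionPublisher delivers signals asynchronously on another thread.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("one");
            publisher.submit("two");
        } // close() signals onComplete after the buffered items are delivered

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the terminal signal
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(signals);
    }

    public static void main(String[] args) {
        // Prints: onSubscribe, onNext(one), onNext(two), onComplete (one per line)
        run().forEach(System.out::println);
    }
}
```

The subscriber only ever sees onNext() after it has called request(), and the terminal onComplete() arrives last.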
Reactor
While Reactive Streams defines the high-level interfaces, it does not provide an implementation. The Reactor API provides implementations of these interfaces. The main Publishers provided by the Reactor API are Flux and Mono. Flux is a publisher that can publish an unlimited number of events, while Mono can publish at most one event.
Flux
There are many ways to create a Flux. Flux provides a few static methods to create instances.
// Create empty Flux
Flux<String> emptyFlux = Flux.empty();
// Create Flux with elements
Flux<String> fluxOfElements = Flux.just("one", "two", "three", "four");
// Create Flux from Iterable
Flux<String> fluxFromIterable = Flux.fromIterable(Arrays.asList("one", "two", "three", "four"));
Mono
Following are a few ways to create a Mono:
// Create empty Mono
Mono<String> emptyMono = Mono.empty();
// Create Mono from a single element
Mono<String> monoOfElement = Mono.just("one");
Subscriber
For this example, we will write a simple subscriber that prints the elements. To handle backpressure, this subscriber will read only one element at a time. After processing each element, we will request one more element.
package subscribers;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

public class StringSubscriber extends BaseSubscriber<String> {

    private Subscription subscription;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // Keep the subscription so that more events can be requested later
        this.subscription = subscription;
        // Request only the first event
        subscription.request(1);
    }

    @Override
    protected void hookOnNext(String value) {
        System.out.println(Thread.currentThread().getName() + " : " + value);
        // Request one more event
        subscription.request(1);
    }

    @Override
    protected void hookOnComplete() {
        System.out.println("Complete");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    protected void hookOnCancel() {
        System.out.println("Cancelled");
    }

    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("Signal Type " + type.name());
    }
}
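The effect of request(n) on what a subscriber receives can be tried without Reactor. As a rough sketch, assuming Java 9 or later and using the JDK's Flow interfaces in place of BaseSubscriber (DemandDemo and BoundedSubscriber are hypothetical names, not part of this post's project), the subscriber below requests two elements and never asks again, so the publisher holds the remaining elements back:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing that delivery is bounded by the requested demand.
class DemandDemo {

    // Subscriber that requests a fixed number of items once and never requests again.
    static final class BoundedSubscriber implements Flow.Subscriber<String> {
        final List<String> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch satisfied;
        private final long demand;

        BoundedSubscriber(int demand) {
            this.demand = demand;
            this.satisfied = new CountDownLatch(demand);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(demand); // the only request this subscriber makes
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            satisfied.countDown();
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Complete");
        }
    }

    static List<String> run() {
        BoundedSubscriber subscriber = new BoundedSubscriber(2);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);

        // Four items are published, but only two were requested.
        for (String item : List.of("one", "two", "three", "four")) {
            publisher.submit(item);
        }

        try {
            subscriber.satisfied.await(5, TimeUnit.SECONDS); // wait for the requested items
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> snapshot = new ArrayList<>(subscriber.received);
        publisher.close();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [one, two]
    }
}
```

StringSubscriber avoids this stall by calling request(1) again after every element, which keeps exactly one element in flight at a time.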
And here is the full code for creating and subscribing to events:
package main;

import java.util.Arrays;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import subscribers.StringSubscriber;

public class ReactorTest {

    public static void main(String[] args) {
        // Flux from elements
        Flux<String> seq1 = Flux.just("one", "two", "three", "four");
        seq1.subscribe(new StringSubscriber());

        // Flux using generate: the state starts at 1 and the stream completes once it reaches 3
        Flux<String> seq2 = Flux.generate(() -> 1, (state, sink) -> {
            sink.next("x " + " = " + state);
            if (state == 3)
                sink.complete();
            return state + 1;
        });
        seq2.subscribe(new StringSubscriber());

        // Flux from List
        Flux<String> seq3 = Flux.fromIterable(Arrays.asList("one", "two", "three", "four"));
        seq3.subscribe(new StringSubscriber());

        // Creating events using an emitter: events are pushed manually after subscribing
        EmitterProcessor<String> seq4 = EmitterProcessor.create();
        seq4.subscribe(new StringSubscriber());
        seq4.onNext("1");
        seq4.onNext("2");
    }
}
Finally, here is the Gradle build file used for this project:
apply plugin: 'application'

mainClassName = "main.ReactorTest"
applicationName = 'reactive-tests'

repositories {
    jcenter()
}

dependencies {
    compile group: 'io.projectreactor', name: 'reactor-core', version: '3.0.7.RELEASE'
}