In a previous post about Reactive programming with Java 8, we looked at the reactive programming support provided by Reactor. Java 9 brought reactive programming into the JDK itself with the Flow API. In this post, we will look at how the various components of the Flow API work, with a few examples.
Reactive programming primarily deals with processing asynchronous streams, where the application reacts to an incoming stream of data. Since the reactive programming model processes data as it arrives, there is no need to hold the whole data set in memory (for example in a list) and iterate over it. This can reduce memory usage.
Java 9 Flow API
The Java 9 Flow API is built around the java.util.concurrent.Flow class, which defines four nested interfaces.
- static interface Flow.Publisher<T>: The publisher produces data and the related control messages for its subscribers.
- static interface Flow.Subscriber<T>: Subscribers subscribe to a Publisher to consume the data it produces.
- static interface Flow.Subscription: A Subscription represents the link between a Publisher and a Subscriber.
- static interface Flow.Processor<T,R>: A Processor is a component that acts as both a Subscriber and a Publisher.
In the following examples we will see how to use each of these interfaces.
Publisher
A publisher publishes a stream of data to its subscribers. Java 9 provides an implementation of the Publisher interface: SubmissionPublisher. You can initialize a publisher as shown below:
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
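To see a publisher deliver something before writing a full Subscriber, here is a minimal sketch that uses SubmissionPublisher's consume() convenience method, which attaches a simple callback and returns a CompletableFuture that completes once the publisher is closed. The class name PublisherSketch and the method publishAll are made up for this illustration and are not part of the Flow API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical helper class, used only for this illustration.
class PublisherSketch {

    // Publishes every item and returns what the consume() callback received.
    static List<String> publishAll(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes a callback and hands back a future for completion
        CompletableFuture<Void> done = publisher.consume(received::add);

        items.forEach(publisher::submit); // items are delivered asynchronously
        publisher.close();                // signals onComplete after the last item
        done.join();                      // block until the callback has seen everything
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("one", "two", "three")));
    }
}
```

The join() call matters because delivery happens on another thread. Without it the method could return before any item has reached the callback.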
Subscriber
We use a Subscriber to subscribe to the data that is being published by a publisher. We are required to implement the java.util.concurrent.Flow.Subscriber interface and provide implementations for its abstract methods. In the following example of a custom Subscriber that I created, a few things are important to note.
- You can see that in the onSubscribe() and onNext() methods we make a call to subscription.request(1). This is because without a call to the request() method, the publisher will not deliver any data to the subscriber.
- A subscriber can cancel at any time using the cancel() method. In this example, we cancel after processing 4 events.
package com.aoj.java9.reactive;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class MySubscriber<T> implements Subscriber<String> {
    private Subscription subscription;
    private int i = 0; // number of items processed so far

    @Override
    public void onComplete() {
        System.out.println("Complete");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error ");
        e.printStackTrace(); // getStackTrace() only returns the trace, it does not print it
    }

    @Override
    public void onNext(String str) {
        System.out.println("String : " + str);
        i++;
        if (i > 3) {
            System.out.println("Cancelling subscription on i = " + i);
            subscription.cancel();
            return; // no point in requesting more after cancelling
        }
        subscription.request(1); // ask the publisher for the next item
    }

    @Override
    public void onSubscribe(Subscription sub) {
        this.subscription = sub;
        System.out.println(sub);
        subscription.request(1); // without this first request nothing is delivered
    }
}
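The request(1) calls above ask for one item at a time, but request() accepts any positive count. As a rough sketch of that idea, the hypothetical BatchSubscriber below asks for a fixed-size batch and only signals new demand once the batch is used up. The class name, its fields, and the run() helper are assumptions made for this illustration. It also completes a CompletableFuture in onComplete() so the caller can wait for the end of the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that requests items in batches rather than one by one.
class BatchSubscriber implements Subscriber<String> {
    final List<String> received = new ArrayList<>();
    final CompletableFuture<Void> done = new CompletableFuture<>();
    int requestCalls = 0; // how often request() was invoked

    private final int batchSize;
    private Subscription subscription;
    private int remainingInBatch;

    BatchSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    private void requestBatch() {
        remainingInBatch = batchSize;
        requestCalls++;
        subscription.request(batchSize);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        requestBatch();
    }

    @Override
    public void onNext(String item) {
        received.add(item);
        if (--remainingInBatch == 0) {
            requestBatch(); // current batch used up, signal demand for the next one
        }
    }

    @Override
    public void onError(Throwable t) {
        done.completeExceptionally(t);
    }

    @Override
    public void onComplete() {
        done.complete(null);
    }

    // Publishes the items to a fresh BatchSubscriber and waits for completion.
    static BatchSubscriber run(List<String> items, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once all items are delivered
        subscriber.done.join();
        return subscriber;
    }
}
```

With five items and a batch size of 2, the subscriber calls request() three times: once in onSubscribe() and once after the second and fourth items.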
Subscription
Subscription represents the link between the publisher and subscriber. A subscription is created when we call the subscribe() method on a Publisher, as shown below:
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Subscriber<String> subscriber = new MySubscriber<String>();
publisher.subscribe(subscriber);
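Each call to subscribe() produces its own Subscription, which is handed to the subscriber through onSubscribe(). The sketch below makes that visible by capturing the Subscription objects given to two subscribers of the same publisher. CapturingSubscriber and SubscriptionSketch are hypothetical names introduced only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical subscriber that only records the Subscription it is handed.
class CapturingSubscriber implements Subscriber<String> {
    final CompletableFuture<Subscription> handle = new CompletableFuture<>();

    @Override
    public void onSubscribe(Subscription subscription) {
        handle.complete(subscription);
    }

    @Override
    public void onNext(String item) { }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onComplete() { }
}

class SubscriptionSketch {

    // Returns true when two subscribers of one publisher get distinct Subscriptions.
    static boolean distinctSubscriptions() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CapturingSubscriber first = new CapturingSubscriber();
            CapturingSubscriber second = new CapturingSubscriber();
            publisher.subscribe(first);
            publisher.subscribe(second);

            // onSubscribe() runs asynchronously, so wait for each handle to arrive
            Subscription a = first.handle.join();
            Subscription b = second.handle.join();
            return a != b && publisher.getNumberOfSubscribers() == 2;
        }
    }
}
```

Because each subscriber holds its own Subscription, one subscriber can request or cancel without affecting the other's link to the publisher.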
Using Publisher and Subscriber together
The following class makes use of the MySubscriber class and uses the SubmissionPublisher to publish a stream of strings from a list.
- Thread.sleep is used because this is an asynchronous model, and this simple program would otherwise end before the subscriber gets to process the data. We wait for 1 second to give the subscriber time to finish.
- publisher.close() is used to close the stream. In this example it might not make much difference, but in an interactive scenario it is advisable to close the publisher at the appropriate time.
package com.aoj.java9.reactive;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;
public class ReactorTest {
    public static void main(String[] args) {
        List<String> strings = List.of("one", "two", "three", "four", "five");
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        Subscriber<String> subscriber = new MySubscriber<String>();
        publisher.subscribe(subscriber);
        strings.forEach(str -> publisher.submit(str));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        publisher.close();
    }
}
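The fourth interface listed earlier, Flow.Processor, is not exercised by the classes above. A common way to sketch one is to extend SubmissionPublisher (for the publishing side) and implement Flow.Processor (for the subscribing side). MappingProcessor and ProcessorSketch below are hypothetical names for this illustration, and the upper-casing step is just a placeholder transformation. The example also waits on a CompletableFuture rather than sleeping for a fixed time.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical processor: subscribes upstream and republishes mapped items downstream.
class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription subscription;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item)); // forward the transformed item downstream
        subscription.request(1);    // then ask upstream for the next one
    }

    @Override
    public void onError(Throwable t) {
        closeExceptionally(t);      // propagate the failure to downstream subscribers
    }

    @Override
    public void onComplete() {
        close();                    // propagate completion downstream
    }
}

class ProcessorSketch {

    // Sends the items through an upper-casing processor and collects the results.
    static List<String> upperCase(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        MappingProcessor<String, String> processor =
                new MappingProcessor<String, String>(s -> s.toUpperCase());

        // consume() returns a future that completes when the processor closes
        CompletableFuture<Void> done = processor.consume(received::add);
        source.subscribe(processor);

        items.forEach(source::submit);
        source.close();
        done.join(); // wait for completion instead of sleeping for a fixed time
        return received;
    }
}
```

Calling ProcessorSketch.upperCase(List.of("one", "two")) pushes each string from the source publisher through the processor to the final consumer, and returns only after the completion signal has travelled the whole chain.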