Wednesday, October 04, 2017

Java 9: Reactive programming with Flow API

In a previous post about Reactive programming with Java 8, we looked into reactive programming support by Reactor. Java 9 introduced reactive programming in Java with the Flow API. In this post, we will look at how the various components in the Flow API work with a few examples. Reactive programming primarily deals with processing asynchronous streams where the application reacts to incoming stream of data. Since reactive programming model deals with streams of incoming data, there is no need to store data in memory (like a list) and iterate over the data. This results in reduction in memory usage.
Java 9 Flow API
The Java 9 Flow API is built around the java.util.concurrent.Flow class, which defines 4 interfaces.
  • static interface Flow.Publisher<T> : The purpose of the publisher is to produce data and related control messages to the subscribers.
  • static interface Flow.Subscriber<T>: Subscribers subscribe to a Publisher to consume data produced by the Publisher
  • static interface Flow.Subscription: Subscription represents the link between a Publisher and Subscriber.
  • static interface Flow.Processor<T,R>: Processor is a component which can act as a producer and consumer simultaneously.
In the following examples we will see how to use each of these interfaces.
Publisher
A publisher publishes the stream of data to the corresponding subscribers. Java 9 provides an implementation of the Publisher interface: SubmissionPublisher. You can initialize a publisher as shown below
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Subscriber
We use a Subscriber to subscribe to the data that is being published by a publisher. We are required to implement java.util.concurrent.Flow.Subscriber interface and provide implementation for the abstract methods. In the following example of a custom Subscriber that I created, a few things are important to note.
  • You can see that in the onSubscribe() and onNext() methods we make a call to subscription.request(1). This is because without call to request() method, the subscriber will not accept any data from the publisher.
  • A subscriber can cancel at any time using the cancel() method. In this example, we cancel after processing 4 events.
package com.aoj.java9.reactive;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class MySubscriber<T> implements Subscriber<String> {

 private Subscription subscription;
 private int i = 0;

 @Override
 public void onComplete() {
  System.out.println("Complete");
 }

 @Override
 public void onError(Throwable e) {
  System.out.println("Error ");
  e.getStackTrace();

 }

 @Override
 public void onNext(String str) {
  System.out.println("String : " + str);
  i++;
  if (i > 3) {
   System.out.println("Cancelling subscription on i = " + i);
   subscription.cancel();
  }
  subscription.request(1);

 }

 @Override
 public void onSubscribe(Subscription sub) {
  this.subscription = sub;
  System.out.println(sub);
  subscription.request(1);
 }

}
Subscription
Subscription represents the link between the publisher and subscriber. A subscription is created when we call the subscribe() method on a Publisher as shown below
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Subscriber<String> subscriber = new MySubscriber<String>();
publisher.subscribe(subscriber);
Using Publisher and Subscriber together
The following class makes use of the MySubscriber class and uses the SubmissionPublisher publish a stream of strings from a list.
  • The Thread.sleep is used, since this is an asynchronous model and this simple program will end before the subscriber gets to process the data. We wait for 1 second, for the subscriber to complete here.
  • publisher.close() is used to close the stream. In this example, it might not make much sense, but in an interactive scenario, it will be advisable to close the publisher at the appropriate time.
package com.aoj.java9.reactive;

import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;

public class ReactorTest {

 public static void main(String[] args) {

  List<String> strings = List.of("one", "two", "three", "four", "five");
  SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
  Subscriber<String> subscriber = new MySubscriber<String>();
  publisher.subscribe(subscriber);
  strings.forEach(str -> publisher.submit(str));
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  publisher.close();

 }
}

8 comments:

Popular Posts