Java 9 Reactive Streams
October 02, 2018
This page walks through Java 9 Reactive Streams. Reactive Streams is a specification that provides a standard for asynchronous stream processing with non-blocking backpressure. Reactive Streams implementations are validated against the Technology Compatibility Kit (TCK). Java 9 provides the Flow class, which contains interrelated interfaces for reactive streams: Flow.Publisher, Flow.Processor, Flow.Subscriber and Flow.Subscription. Java 9 also provides the SubmissionPublisher class, which is a compliant reactive-streams publisher.
Reactive Manifesto
A reactive system is
a. Responsive: The system responds in a timely manner.
b. Resilient: The system stays responsive in the face of failure.
c. Elastic: The system stays responsive under varying workload.
d. Message Driven: Reactive systems rely on asynchronous message passing.
Backpressure
Backpressure arises when a producer emits items faster than the subscriber can consume them, so a backlog of unconsumed items grows. The Reactive Streams specification provides non-blocking backpressure: the subscriber signals how many items it is ready to handle, and the publisher delivers no more than that.
Technology Compatibility Kit (TCK)
Reactive Streams library implementations are validated by the Technology Compatibility Kit (TCK). The TCK guides implementers and verifies an implementation against the Reactive Streams specification. The Java 9 SubmissionPublisher is a compliant reactive-streams publisher.
Dynamic push/pull Model
The dynamic push/pull backpressure model behaves sometimes like a pull-based model and sometimes like a push-based model. If a system has a fast publisher and a slow subscriber, the model works in a pull-based way: the subscriber tells the publisher how many items it can receive, and the publisher delivers only that many items to that subscriber. If the publisher is slow and the subscriber is fast, the model works in a push-based way: the publisher produces items as fast as it can and the subscriber receives them just in time. In Java, a Flow.Subscriber requests a given number of items from a Flow.Publisher using the request method of Flow.Subscription.
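The pull side of this model can be sketched with SubmissionPublisher. This is only an illustration under stated assumptions: the class name BoundedDemandDemo and the run method are made up for this sketch, and a single-thread executor is used so that the program can wait until all delivery work has finished. The subscriber asks for two items in total, so only two of the five submitted items are delivered to it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BoundedDemandDemo {

    /** Publishes 1..5 to a subscriber that only ever requests two items. */
    public static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(executor, Flow.defaultBufferSize());

        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(2); // total demand: two items, never renewed
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable throwable) { }

            @Override
            public void onComplete() { }
        });

        for (int i = 1; i <= 5; i++) {
            publisher.submit(i); // buffered per subscriber until requested
        }
        publisher.close();

        // Once the executor has drained, no further delivery can happen.
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2]
    }
}
```

The remaining three items stay in the subscription's buffer because the subscriber never asks for them. A subscriber that wants a steady stream would call request again from onNext, as the examples later on this page do.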
1. Flow
Flow is a class that contains interrelated interfaces and static methods for establishing flow-controlled components, in which publishers produce items consumed by one or more subscribers, each managed by a subscription. These interfaces are Flow.Publisher, Flow.Processor, Flow.Subscriber and Flow.Subscription, and they correspond to the Reactive Streams specification. Publisher has the subscribe method, Subscription has the cancel and request methods, and Subscriber has the onSubscribe, onNext, onError and onComplete methods. Processor inherits all the methods of Flow.Publisher and Flow.Subscriber. All seven methods return void, which keeps communication in a one-way message style. This flow control avoids the resource management problems that can otherwise occur in push-based systems. The interfaces inside the Flow class are as follows.
Flow.Subscriber<T>: It receives the messages produced by a publisher.
Flow.Publisher<T>: It produces items to be received by subscribers.
Flow.Processor<T,R>: It acts as both a subscriber and a publisher.
Flow.Subscription: It links a publisher and a subscriber and controls the flow of messages between them.
Let us understand these interfaces in detail.
1.1. Flow.Subscriber
Flow.Subscriber is the receiver of messages. It has the following methods.
1. void onSubscribe(Flow.Subscription subscription)
The onSubscribe method is invoked before any other Flow.Subscriber method for the given subscription, so that the subscriber can start receiving messages. If this method throws an exception, the resulting behavior is not guaranteed; the subscription may not be established or may be cancelled. Inside this method we usually call the request method of Flow.Subscription.
2. void onNext(T item)
The onNext method is invoked with the next item of a Flow.Subscription. If this method throws an exception, the subscription may be cancelled.
3. void onError(Throwable throwable)
The onError method is invoked when a Flow.Publisher or Flow.Subscription encounters an unrecoverable error. After this call, the Flow.Subscription invokes no other Flow.Subscriber methods. If the onError method itself throws an exception, the resulting behavior is undefined.
4. void onComplete()
The onComplete method is invoked when it is known that no further Flow.Subscriber method invocations will occur for a Flow.Subscription that has not already been terminated by an error. A subscription that ends with an error receives onError and never onComplete. If the onComplete method throws an exception, the resulting behavior is undefined.
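The order of these callbacks can be observed with a small sketch. The names CallbackOrderDemo and RecordingSubscriber are hypothetical and exist only for this illustration; the subscriber simply records each callback it receives from a SubmissionPublisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CallbackOrderDemo {

    /** Subscriber that records every callback it receives, in order. */
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("onSubscribe");
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    public static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        // try-with-resources calls close(), which signals onComplete
        // after the buffered items have been delivered.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        }
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded sequence is onSubscribe, onNext:a, onNext:b, onComplete. Because the stream ended normally, onError is never recorded.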
1.2. Flow.Publisher
Flow.Publisher produces items to be received by subscribers. Each subscriber receives the same items in the same order through the onNext method of Flow.Subscriber, unless an error occurs or the subscription is cancelled. When a Flow.Publisher encounters an error, the subscriber receives it through the onError method and receives no further messages. When a Flow.Publisher finishes producing messages normally, the onComplete method of Flow.Subscriber is invoked and the subscriber receives no further messages. Flow.Publisher has the following method.
void subscribe(Flow.Subscriber<? super T> subscriber)
The subscribe method adds the given subscriber to the publisher. On successful subscription, the onSubscribe method of Flow.Subscriber is invoked. If the subscriber is already subscribed, or the attempt to subscribe fails, the onError method of Flow.Subscriber is invoked.
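Both error paths described above can be tried out with SubmissionPublisher. The following is a sketch, not a definitive implementation: PublisherErrorDemo and ErrorCapturingSubscriber are hypothetical names, and the error message "feed unavailable" is invented for the example. The first method closes the publisher with an error; the second subscribes the same subscriber twice, which SubmissionPublisher reports through onError with an IllegalStateException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PublisherErrorDemo {

    /** Subscriber that remembers the Throwable passed to onError, if any. */
    static class ErrorCapturingSubscriber implements Flow.Subscriber<String> {
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final CountDownLatch terminated = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) { }

        @Override
        public void onError(Throwable throwable) {
            error.set(throwable);
            terminated.countDown();
        }

        @Override
        public void onComplete() {
            terminated.countDown();
        }

        /** Waits for a terminal signal and returns the error, or null if none arrived. */
        Throwable awaitError() {
            try {
                terminated.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return error.get();
        }
    }

    /** The publisher fails: closeExceptionally forwards the error to onError. */
    public static Throwable publisherFailure() {
        ErrorCapturingSubscriber subscriber = new ErrorCapturingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.closeExceptionally(new RuntimeException("feed unavailable"));
        return subscriber.awaitError();
    }

    /** Subscribing the same subscriber twice is reported through onError. */
    public static Throwable duplicateSubscribe() {
        ErrorCapturingSubscriber subscriber = new ErrorCapturingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.subscribe(subscriber); // second attempt is rejected
        Throwable error = subscriber.awaitError();
        publisher.close();
        return error;
    }

    public static void main(String[] args) {
        System.out.println(publisherFailure());
        System.out.println(duplicateSubscribe());
    }
}
```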
1.3. Flow.Processor
Flow.Processor acts as both a subscriber and a publisher. It inherits the methods of the Flow.Subscriber and Flow.Publisher interfaces, so Flow.Processor contains the subscribe, onComplete, onError, onNext and onSubscribe methods.
Flow.Processor is used for object transformation. Suppose the publisher produces object A and the subscriber expects object B; the processor then transforms object A into object B. The subscriber subscribes to the processor and the processor subscribes to the publisher. The items produced by the publisher are received by the processor, and the processor then publishes the transformed items to be received by the subscriber.
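A generic version of this idea might look like the following sketch. MapProcessor and MapProcessorDemo are hypothetical names, not part of the JDK; the processor extends SubmissionPublisher for its publishing side, which is one convenient option among several. It forwards completion and errors downstream so that the final consumer learns when the stream has ended.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class MapProcessorDemo {

    /** Hypothetical processor: receives T and republishes mapper.apply(T) as R. */
    static class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.upstream = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // forward the transformed item downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // propagate the error downstream
        }

        @Override
        public void onComplete() {
            close(); // propagate completion downstream
        }
    }

    public static List<Integer> run() {
        List<Integer> lengths = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MapProcessor<String, Integer> processor = new MapProcessor<>(String::length);

        // Attach the downstream consumer first so no transformed item is dropped.
        CompletableFuture<Void> done = processor.consume(lengths::add);
        publisher.subscribe(processor);

        publisher.submit("a");
        publisher.submit("bbb");
        publisher.close();

        done.orTimeout(5, TimeUnit.SECONDS).join(); // wait for downstream onComplete
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 3]
    }
}
```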
1.4. Flow.Subscription
Flow.Subscription is the message control link between a Flow.Publisher and a Flow.Subscriber. A subscriber receives items only when it has requested them and may cancel the subscription at any time. The methods of Flow.Subscription are intended to be invoked only by their subscribers. Find the methods of Flow.Subscription.
1. void request(long n)
The request method adds n items to the subscriber's outstanding demand. n must be a positive number. If n is zero or negative, the onError method of Flow.Subscriber is invoked with an IllegalArgumentException.
2. void cancel()
The cancel method cancels the subscription, so the subscriber (eventually) stops receiving messages. A cancelled subscription need not receive an onComplete or onError signal.
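The rule for non-positive requests can be checked with a short sketch. NonPositiveRequestDemo is a hypothetical name used only here; the subscriber deliberately requests zero items from a SubmissionPublisher and captures whatever arrives in onError.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class NonPositiveRequestDemo {

    /** Returns the Throwable delivered to onError after an invalid request. */
    public static Throwable run() {
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch signalled = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(0); // invalid: demand must be positive
            }

            @Override
            public void onNext(String item) { }

            @Override
            public void onError(Throwable throwable) {
                error.set(throwable);
                signalled.countDown();
            }

            @Override
            public void onComplete() {
                signalled.countDown();
            }
        });

        try {
            signalled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        publisher.close();
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The captured error is an IllegalArgumentException, which matches the contract stated for the request method.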
2. SubmissionPublisher
SubmissionPublisher is an implementation of the Flow.Publisher interface. It asynchronously delivers submitted items to its current subscribers until it is closed; once it is closed, no further items can be submitted. All subscribers receive items in the order in which they were submitted. SubmissionPublisher is a compliant reactive-streams publisher. By default, SubmissionPublisher uses ForkJoinPool.commonPool() for asynchronous delivery to subscribers. We can supply a different Executor as a constructor argument when creating a SubmissionPublisher. We will use SubmissionPublisher as the publisher in our examples.
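A minimal sketch of these points follows, assuming a fixed thread pool of two threads and a buffer capacity of 16 (both values are arbitrary choices for the illustration, as is the class name CustomExecutorDemo). It uses the consume method of SubmissionPublisher, which subscribes a simple consumer and returns a CompletableFuture that completes when onComplete is signalled.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CustomExecutorDemo {

    /** Delivers two items through a publisher that runs on a custom executor. */
    public static List<String> run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<String> delivered = new CopyOnWriteArrayList<>();
        try {
            // Second argument: maximum buffer capacity for each subscriber.
            SubmissionPublisher<String> publisher =
                    new SubmissionPublisher<>(executor, 16);
            CompletableFuture<Void> done = publisher.consume(delivered::add);
            publisher.submit("News-1");
            publisher.submit("News-2");
            publisher.close();
            done.orTimeout(5, TimeUnit.SECONDS).join(); // wait for onComplete
        } finally {
            executor.shutdown();
        }
        return delivered;
    }

    /** Submitting to a closed publisher throws IllegalStateException. */
    public static boolean submitAfterCloseIsRejected() {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.close();
        try {
            publisher.submit("late");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());                        // prints [News-1, News-2]
        System.out.println(submitAfterCloseIsRejected()); // prints true
    }
}
```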
3. Publisher and Subscriber Example
Find the diagram of communication between publisher and subscriber.
News.java
package com.concretepage.reactive.ex1;

import java.util.Date;

public class News {
    private String content;
    private Date currentDate;

    public News(String content, Date currentDate) {
        this.content = content;
        this.currentDate = currentDate;
    }

    public String getContent() {
        return content;
    }

    public Date getCurrentDate() {
        return currentDate;
    }
}
NewsSubscriber.java
package com.concretepage.reactive.ex1;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class NewsSubscriber implements Subscriber<News> {
    private Subscription subscription;
    // volatile: updated on a delivery thread, polled by the main thread
    volatile int count = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ask for the first item
        System.out.println("Subscriber subscribed.");
    }

    @Override
    public void onNext(News item) {
        System.out.println("Received: " + item.getContent());
        readNews(item.getContent());
        subscription.request(1); // ask for the next item once this one is read
        count++;
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error encountered by Publisher or Subscription: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Subscription completed. Total Number of Subscription is " + count);
    }

    // Simulates a slow subscriber
    private void readNews(String newsContent) {
        System.out.println("Reading:" + newsContent);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("---Done---");
    }

    public int getNewsCount() {
        return count;
    }
}
ReactiveNewsApp.java
package com.concretepage.reactive.ex1;

import java.util.Date;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveNewsApp {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<News> publisher = new SubmissionPublisher<>();
        NewsSubscriber subscriber = new NewsSubscriber();
        publisher.subscribe(subscriber);

        final int NEWS_COUNT = 3;
        for (int i = 1; i <= NEWS_COUNT; i++) {
            publisher.submit(new News("News-" + i, new Date()));
        }

        // Wait to complete subscription
        while (subscriber.getNewsCount() != NEWS_COUNT) {
            Thread.sleep(50);
        }
        publisher.close();
    }
}
Output
Subscriber subscribed.
Received: News-1
Reading:News-1
---Done---
Received: News-2
Reading:News-2
---Done---
Received: News-3
Reading:News-3
---Done---
Subscription completed. Total Number of Subscription is 3
4. Multiple Subscriber Example
In this example more than one news subscriber subscribes to the same publisher.
NewsSubscriber.java
package com.concretepage.reactive.ex2;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import com.concretepage.reactive.ex1.News;

public class NewsSubscriber implements Subscriber<News> {
    private Subscription subscription;
    // volatile: updated on a delivery thread, polled by the main thread
    volatile int count = 0;
    private String personName;

    public NewsSubscriber(String personName) {
        this.personName = personName;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
        System.out.println(personName + " subscribed for news.");
    }

    @Override
    public void onNext(News item) {
        System.out.println(personName + " received: " + item.getContent());
        readNews(item.getContent());
        subscription.request(1);
        count++;
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error encountered by Publisher or Subscription: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Subscription completed for " + personName + ". Total Number of Subscription is " + count);
    }

    private void readNews(String newsContent) {
        System.out.println(personName + " reading: " + newsContent);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getNewsCount() {
        return count;
    }
}
ReactiveNewsApp.java
package com.concretepage.reactive.ex2;

import java.util.Date;
import java.util.concurrent.SubmissionPublisher;
import com.concretepage.reactive.ex1.News;

public class ReactiveNewsApp {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<News> publisher = new SubmissionPublisher<>();
        NewsSubscriber mahesh = new NewsSubscriber("Mahesh");
        NewsSubscriber krishna = new NewsSubscriber("Krishna");
        publisher.subscribe(mahesh);
        publisher.subscribe(krishna);

        final int NEWS_COUNT = 2;
        for (int i = 1; i <= NEWS_COUNT; i++) {
            publisher.submit(new News("News-" + i, new Date()));
        }

        // Wait to complete subscription
        while (mahesh.getNewsCount() != NEWS_COUNT || krishna.getNewsCount() != NEWS_COUNT) {
            Thread.sleep(50);
        }
        publisher.close();
    }
}
Output
Mahesh subscribed for news.
Krishna subscribed for news.
Krishna received: News-1
Krishna reading: News-1
Mahesh received: News-1
Mahesh reading: News-1
Krishna received: News-2
Mahesh received: News-2
Mahesh reading: News-2
Krishna reading: News-2
Subscription completed for Mahesh. Total Number of Subscription is 2
Subscription completed for Krishna. Total Number of Subscription is 2
5. Publisher, Processor and Subscriber Example
A processor is used to transform objects. We need a processor when the subscriber expects a different type of object than the publisher produces. Find the diagram of communication between publisher and processor and between processor and subscriber.
In this example the publisher produces Article objects and the subscriber receives JavaArticle objects. The processor transforms each Article object into a JavaArticle object.
Article.java
package com.concretepage.reactive.ex3;

public class Article {
    private int id;
    private String title;
    private String category;

    public Article(int id, String title, String category) {
        this.id = id;
        this.title = title;
        this.category = category;
    }

    public int getId() {
        return id;
    }

    public String getTitle() {
        return title;
    }

    public String getCategory() {
        return category;
    }
}
JavaArticle.java
package com.concretepage.reactive.ex3;

public class JavaArticle {
    private int id;
    private String title;

    public JavaArticle(int id, String title) {
        this.id = id;
        this.title = title;
    }

    public int getId() {
        return id;
    }

    public String getTitle() {
        return title;
    }
}
JavaSubscriber.java
package com.concretepage.reactive.ex3;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class JavaSubscriber implements Subscriber<JavaArticle> {
    private Subscription subscription;
    // volatile: updated on a delivery thread, polled by the main thread
    volatile int count = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("---Subscriber subscribed---");
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(JavaArticle item) {
        System.out.println("Received Java Artcile with id: " + item.getId());
        readArticle(item.getTitle());
        subscription.request(1);
        count++;
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error encountered by Publisher or Subscription: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Subscription completed. Total Number of Subscription is " + count);
    }

    private void readArticle(String title) {
        System.out.println("Reading: " + title);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("---Done---");
    }

    public int getArticleCount() {
        return count;
    }
}
ArticleTransformProcessor.java
package com.concretepage.reactive.ex3;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class ArticleTransformProcessor extends SubmissionPublisher<JavaArticle>
        implements Processor<Article, JavaArticle> {
    private Subscription subscription;
    final Function<Article, JavaArticle> function;

    public ArticleTransformProcessor(Function<Article, JavaArticle> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("---Processor subscribed---");
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Article item) {
        subscription.request(1);
        submit(function.apply(item)); // publish the transformed item downstream
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error encountered by Publisher or Subscription: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processor Completed.");
    }
}
ReactiveArticleApp.java
package com.concretepage.reactive.ex3;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class ReactiveArticleApp {
    public static void main(String[] args) {
        List<Article> list = Arrays.asList(
                new Article(1, "Java Functional Interface", "Java"),
                new Article(2, "Java Distinct Example", "Java"),
                new Article(3, "Java flatMap Example", "Java"));

        Function<Article, JavaArticle> function =
                article -> new JavaArticle(article.getId(), article.getTitle());
        ArticleTransformProcessor processor = new ArticleTransformProcessor(function);
        SubmissionPublisher<Article> publisher = new SubmissionPublisher<>();
        JavaSubscriber javaSubscriber = new JavaSubscriber();

        // publisher -> processor -> subscriber
        publisher.subscribe(processor);
        processor.subscribe(javaSubscriber);

        list.forEach(publisher::submit);

        // Wait to complete subscription
        while (javaSubscriber.getArticleCount() != 3) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        processor.close();
        publisher.close();
    }
}
Output
---Processor subscribed---
---Subscriber subscribed---
Received Java Artcile with id: 1
Reading: Java Functional Interface
---Done---
Received Java Artcile with id: 2
Reading: Java Distinct Example
---Done---
Received Java Artcile with id: 3
Reading: Java flatMap Example
---Done---
Subscription completed. Total Number of Subscription is 3
Processor Completed.
6. Subscription Cancel Example
Find the example to cancel the subscription.
ArticleTransformProcessor.java
package com.concretepage.reactive.ex4;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import com.concretepage.reactive.ex3.Article;
import com.concretepage.reactive.ex3.JavaArticle;

public class ArticleTransformProcessor extends SubmissionPublisher<JavaArticle>
        implements Processor<Article, JavaArticle> {
    private Subscription subscription;
    final Function<Article, JavaArticle> function;
    int count = 0;

    public ArticleTransformProcessor(Function<Article, JavaArticle> function) {
        super();
        this.function = function;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("---Processor subscribed---");
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Article item) {
        count++;
        if (count == 2) {
            subscription.cancel(); // stop receiving from the publisher at the second item
        } else {
            subscription.request(1);
            submit(function.apply(item));
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error encountered by Publisher or Subscription: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processor Completed.");
    }
}
JavaSubscriber.java
package com.concretepage.reactive.ex4;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import com.concretepage.reactive.ex3.JavaArticle;

public class JavaSubscriber implements Subscriber<JavaArticle> {
    private Subscription subscription;
    int count = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("---Subscriber subscribed---");
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(JavaArticle item) {
        System.out.println("Received Java Artcile with id: " + item.getId());
        count++;
        readArticle(item.getTitle());
        if (count == 1) {
            subscription.cancel(); // cancel after the first article
        } else {
            subscription.request(1);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error encountered by Publisher or Subscription: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Subscription completed. Total Number of Subscription is " + count);
    }

    private void readArticle(String title) {
        System.out.println("Reading: " + title);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("---Done---");
    }

    public int getArticleCount() {
        return count;
    }
}
ReactiveArticleApp.java
package com.concretepage.reactive.ex4;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import com.concretepage.reactive.ex3.Article;
import com.concretepage.reactive.ex3.JavaArticle;

public class ReactiveArticleApp {
    public static void main(String[] args) {
        List<Article> list = Arrays.asList(
                new Article(1, "Java Functional Interface", "Java"),
                new Article(2, "Java Distinct Example", "Java"),
                new Article(3, "Java flatMap Example", "Java"));

        Function<Article, JavaArticle> function =
                article -> new JavaArticle(article.getId(), article.getTitle());
        ArticleTransformProcessor processor = new ArticleTransformProcessor(function);
        SubmissionPublisher<Article> publisher = new SubmissionPublisher<>();
        JavaSubscriber javaSubscriber = new JavaSubscriber();

        publisher.subscribe(processor);
        processor.subscribe(javaSubscriber);

        list.forEach(publisher::submit);

        // Wait to complete subscription
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        processor.close();
        publisher.close();
    }
}
Output
---Subscriber subscribed---
---Processor subscribed---
Received Java Artcile with id: 1
Reading: Java Functional Interface
---Done---
7. References
Java 9 Class Flow
The Reactive Manifesto