Java 9 Reactive Streams

By Arvind Rai, October 02, 2018
This page walks through Java 9 Reactive Streams. Reactive Streams is a specification that provides a standard for asynchronous stream processing with non-blocking backpressure. Reactive Streams implementations are validated against the Technology Compatibility Kit (TCK). Java 9 provides the Flow class, which contains the interrelated interfaces Flow.Publisher, Flow.Processor, Flow.Subscriber and Flow.Subscription that correspond to the Reactive Streams specification. Java 9 also provides the SubmissionPublisher class, which is a compliant Reactive Streams publisher.
Reactive Manifesto
A reactive system is
a. Responsive: The system responds in a timely manner.
b. Resilient: The system stays responsive in the face of failure.
c. Elastic: The system stays responsive under varying workload.
d. Message Driven: Reactive systems rely on asynchronous message passing.
Backpressure
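Backpressure arises when a producer produces items at a faster rate than the subscriber can consume them, so a backlog of unconsumed items keeps growing. The Reactive Streams specification provides non-blocking backpressure: the subscriber signals how many items it is ready to receive, so a fast publisher cannot overwhelm a slow subscriber.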
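As a minimal sketch of a growing backlog, assume a subscriber that never requests any item and a publisher with a very small buffer. The class name BackpressureDropDemo and the StalledSubscriber helper are hypothetical and only serve this illustration. The code uses the offer method of SubmissionPublisher, which does not block when the buffer of a subscriber is full but reports a drop with a negative return value.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class, not part of the article's source code.
class BackpressureDropDemo {

    // A subscriber that never calls request(), so it signals zero demand.
    static final class StalledSubscriber implements Flow.Subscriber<Integer> {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(Integer item) { }
        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    // Offers `items` integers without blocking and returns how many were dropped.
    static int countDrops(int maxBufferCapacity, int items) {
        int dropped = 0;
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxBufferCapacity)) {
            publisher.subscribe(new StalledSubscriber());
            for (int i = 0; i < items; i++) {
                // A negative result means the item was dropped for at least one subscriber.
                // The handler returns false, which means "do not retry".
                int result = publisher.offer(i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped++;
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("Dropped items: " + countDrops(2, 10));
    }
}
```

Because the subscriber never requests anything, only the first few items fit into the buffer and the remaining offers are reported as drops. A subscriber that calls request regularly would drain the buffer instead.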
Technology Compatibility Kit (TCK)
Reactive Streams library implementations are validated by the Technology Compatibility Kit (TCK). The TCK guides implementers and helps them check an implementation against the Reactive Streams specification. Java 9 SubmissionPublisher is a compliant Reactive Streams publisher.
Dynamic push/pull Model
The dynamic push/pull backpressure model switches between a pull-based and a push-based mode depending on the relative speed of publisher and subscriber. If a system has a fast publisher and a slow subscriber, the model behaves in a pull-based way: the subscriber requests how many items it can receive, and the publisher delivers only that number of items to that subscriber. If the publisher is slow and the subscriber is fast, the model behaves in a push-based way: the subscriber keeps enough demand outstanding, so the publisher can produce items as fast as it can and the subscriber receives them just in time. In Java, a Flow.Subscriber requests a given number of items from a Flow.Publisher using the request method of Flow.Subscription.
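The pull side of this model can be sketched as follows. The BatchRequestDemo class and its BatchSubscriber are hypothetical names chosen for this illustration; the subscriber requests items in batches of a given size and asks for the next batch only after it has received the previous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's source code.
class BatchRequestDemo {

    // Requests `batchSize` items at a time instead of one by one.
    static final class BatchSubscriber implements Flow.Subscriber<String> {
        private final int batchSize;
        private final List<String> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        @Override public void onNext(String item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                requestBatch(); // current batch consumed, signal new demand
            }
        }

        @Override public void onError(Throwable throwable) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        private void requestBatch() {
            remainingInBatch = batchSize;
            subscription.request(batchSize);
        }
    }

    // Publishes all items, waits for completion and returns what was received.
    static List<String> publishAndCollect(List<String> items, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // leaving the try block closes the publisher
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

A larger batch size moves the behavior towards the push-based mode, because the publisher rarely has to wait for new demand.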

1. Flow

Flow is a final class that contains interrelated interfaces and static methods for establishing flow-controlled components in which publishers produce items consumed by one or more subscribers, each managed by a subscription. These interfaces are Flow.Publisher, Flow.Processor, Flow.Subscriber and Flow.Subscription, and they correspond to the Reactive Streams specification. Publisher has the subscribe method, Subscription has the cancel and request methods, and Subscriber has the onSubscribe, onNext, onError and onComplete methods. Processor inherits all methods of Flow.Publisher and Flow.Subscriber. All these seven methods return void, which keeps communication in a one-way message style. Communication follows a form of flow control that avoids resource management problems. The interfaces inside the Flow class are as follows.
Flow.Subscriber<T>: It receives messages produced by a publisher.
Flow.Publisher<T>: It produces items to be received by subscribers.
Flow.Processor<T,R>: It acts as both a subscriber and a publisher.
Flow.Subscription: It is the message control that links a publisher and a subscriber.

Let us understand these interfaces in detail.

1.1. Flow.Subscriber

Flow.Subscriber is the receiver of messages. It has the following methods.
1. void onSubscribe(Flow.Subscription subscription)
onSubscribe is invoked before any other Flow.Subscriber method for the given subscription and enables the subscriber to receive messages. If this method throws an exception, the resulting behavior is not guaranteed, but it may cause the subscription not to be established or to be cancelled. Inside this method we usually call the request method of Flow.Subscription.
2. void onNext(T item)
onNext is invoked with the next item of a Flow.Subscription. If this method throws an exception, the resulting behavior is not guaranteed, but it may cause the subscription to be cancelled.
3. void onError(Throwable throwable)
onError is invoked upon an unrecoverable error encountered by a Flow.Publisher or Flow.Subscription. After this, no other Flow.Subscriber method is invoked by the Flow.Subscription. If onError itself throws an exception, the resulting behavior is undefined.
4. void onComplete()
onComplete is invoked when it is known that no additional Flow.Subscriber method invocation will occur for a Flow.Subscription that is not already terminated by an error. After this, no other Flow.Subscriber method is invoked by the Flow.Subscription. If onComplete throws an exception, the resulting behavior is undefined.
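The order of these callbacks can be observed with a small sketch. The CallbackOrderDemo class below is a hypothetical name for this illustration; it records every signal that a subscriber receives from a SubmissionPublisher (described later on this page) in the order of arrival.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's source code.
class CallbackOrderDemo {

    // Publishes the items and returns the signals seen by the subscriber, in order.
    static List<String> recordSignals(List<Integer> items) {
        List<String> events = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                subscription.request(Long.MAX_VALUE); // effectively unbounded demand
            }
            @Override public void onNext(Integer item) {
                events.add("onNext:" + item);
            }
            @Override public void onError(Throwable throwable) {
                events.add("onError");
                done.countDown();
            }
            @Override public void onComplete() {
                events.add("onComplete");
                done.countDown();
            }
        };

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // closing the publisher leads to onComplete after the submitted items

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for a terminal signal");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(recordSignals(List.of(1, 2)));
    }
}
```

For the two items in main, the recorded list is onSubscribe, onNext:1, onNext:2, onComplete.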

1.2. Flow.Publisher

Flow.Publisher produces items to be received by subscribers. Each current subscriber receives the same items in the same order through the onNext method of Flow.Subscriber, unless an error occurs, items are dropped, or the subscription is cancelled. When a Flow.Publisher encounters an error that prevents it from issuing items, the subscriber receives onError and then no further messages. When the Flow.Publisher has finished producing messages normally, the onComplete method of Flow.Subscriber is invoked and then no further messages are received by the subscriber. Flow.Publisher has the following method.
void subscribe(Flow.Subscriber<? super T> subscriber)
The subscribe method adds the given subscriber to the publisher if possible. On successful subscription, the onSubscribe method of Flow.Subscriber is invoked. If the subscription attempt fails, for example because the subscriber is already subscribed, the onError method of Flow.Subscriber is invoked.
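The error path can be sketched with SubmissionPublisher (described later on this page), whose closeExceptionally method issues onError to current subscribers. The PublisherErrorDemo class and the error message are hypothetical and only used for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's source code.
class PublisherErrorDemo {

    // Closes the publisher with an error and returns the signals the subscriber saw.
    static List<String> signalsAfterFailure() {
        List<String> events = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                subscription.request(1);
            }
            @Override public void onNext(String item) {
                events.add("onNext:" + item);
            }
            @Override public void onError(Throwable throwable) {
                events.add("onError:" + throwable.getMessage());
                done.countDown();
            }
            @Override public void onComplete() {
                events.add("onComplete");
                done.countDown();
            }
        };

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        // Simulate an unrecoverable failure on the publishing side.
        publisher.closeExceptionally(new IllegalStateException("feed unavailable"));

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for a terminal signal");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(signalsAfterFailure());
    }
}
```

The subscriber receives onError with the given exception and never receives onComplete, because a subscription ends with at most one terminal signal.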

1.3. Flow.Processor

Flow.Processor acts as both a subscriber and a publisher. It extends the Flow.Subscriber and Flow.Publisher interfaces, so a Flow.Processor has the subscribe, onSubscribe, onNext, onError and onComplete methods.
Flow.Processor is typically used for object transformation. Suppose the publisher produces object A and the subscriber expects object B; the processor then transforms object A into object B. The subscriber subscribes to the processor and the processor subscribes to the publisher. Items produced by the publisher are received by the processor, which transforms them and publishes the results to the subscriber.
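A generic transforming processor can be sketched as follows, along the lines of the processor example in the SubmissionPublisher Javadoc. The class name TransformProcessor, the mapper field and the titleLengths helper are assumptions made for this illustration. Unlike a processor that only logs terminal signals, this sketch forwards onError and onComplete downstream by closing itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical generic processor: subscribes upstream as Subscriber<T>,
// publishes downstream as SubmissionPublisher<R>.
class TransformProcessor<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    TransformProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1);
    }

    @Override public void onNext(T item) {
        submit(mapper.apply(item)); // transform and publish downstream
        upstream.request(1);        // then ask upstream for the next item
    }

    @Override public void onError(Throwable throwable) {
        closeExceptionally(throwable); // forward the error downstream
    }

    @Override public void onComplete() {
        close(); // forward completion downstream
    }

    // Usage sketch: maps each title to its length and collects the results.
    static List<Integer> titleLengths(List<String> titles) {
        List<Integer> lengths = new ArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        TransformProcessor<String, Integer> processor =
                new TransformProcessor<>(String::length);
        publisher.subscribe(processor);
        // consume() subscribes a consumer and completes when the processor closes.
        CompletableFuture<Void> finished = processor.consume(lengths::add);
        titles.forEach(publisher::submit);
        publisher.close(); // completion flows through the processor
        finished.orTimeout(5, TimeUnit.SECONDS).join();
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(titleLengths(List.of("Java", "Flow", "Reactive")));
    }
}
```

Because the processor closes itself in onComplete, the calling code only needs to close the upstream publisher.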

1.4. Flow.Subscription

Flow.Subscription is the message control linking a Flow.Publisher and a Flow.Subscriber. A subscriber receives items only when requested and may cancel the subscription at any time. The methods of Flow.Subscription are intended to be invoked only by its subscriber. Flow.Subscription has the following methods.
1. void request(long n)
The request method adds n items to the current unfulfilled demand of the subscription. n must be positive; a value of Long.MAX_VALUE may be treated as effectively unbounded. If n is less than or equal to zero, the onError method of Flow.Subscriber is invoked with an IllegalArgumentException.
2. void cancel()
The cancel method causes the subscriber to (eventually) stop receiving messages. A cancelled subscription need not ever receive an onComplete or onError signal.
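The rule for non-positive requests can be checked with a small sketch. The InvalidRequestDemo class is a hypothetical name for this illustration, and the sketch assumes SubmissionPublisher (described in the next section) as the publisher.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the article's source code.
class InvalidRequestDemo {

    // Requests n items in onSubscribe and returns the error passed to onError,
    // or null if no error signal arrived within the timeout.
    static Throwable requestAndCaptureError(long n) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(n); // invalid when n <= 0
            }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable throwable) {
                error.set(throwable);
                done.countDown();
            }
            @Override public void onComplete() {
                done.countDown();
            }
        };

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            // Wait for the signal before the publisher is closed.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return error.get();
    }

    public static void main(String[] args) {
        System.out.println(requestAndCaptureError(0));
    }
}
```

The same signal is expected for negative values such as -1, so a subscriber should compute its demand carefully before calling request.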

2. SubmissionPublisher

SubmissionPublisher is an implementation of the Flow.Publisher interface. It asynchronously issues submitted items to current subscribers until it is closed; once it is closed, no further items can be submitted. Each current subscriber receives newly submitted items in the same order, unless drops or exceptions are encountered. SubmissionPublisher is a compliant Reactive Streams publisher. By default SubmissionPublisher uses ForkJoinPool.commonPool() for async delivery to subscribers. We can use a different Executor by passing it, together with the maximum buffer capacity, as a constructor argument when creating the SubmissionPublisher. We will use SubmissionPublisher as the publisher in our examples.
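Find a minimal sketch of a SubmissionPublisher that uses its own Executor instead of the common pool. The class name CustomExecutorDemo and the thread name news-delivery are assumptions made for this illustration. The sketch records on which threads the items are delivered.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's source code.
class CustomExecutorDemo {

    // Publishes `items` integers and returns the names of the delivery threads.
    static Set<String> deliveryThreadNames(int items) {
        // Single daemon thread with a recognizable (hypothetical) name.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "news-delivery");
            thread.setDaemon(true);
            return thread;
        });
        Set<String> threadNames = new HashSet<>();
        try {
            SubmissionPublisher<Integer> publisher =
                    new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
            // consume() subscribes a consumer; the future completes on onComplete.
            CompletableFuture<Void> finished = publisher.consume(
                    item -> threadNames.add(Thread.currentThread().getName()));
            for (int i = 1; i <= items; i++) {
                publisher.submit(i);
            }
            publisher.close();
            finished.orTimeout(5, TimeUnit.SECONDS).join();
        } finally {
            executor.shutdown();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadNames(3));
    }
}
```

A dedicated executor keeps slow subscribers from occupying the threads of the common pool, which other parts of an application may also rely on.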

3. Publisher and Subscriber Example

The diagram below shows the communication between publisher and subscriber.
[Figure: Java Reactive Streams]
Now we will create a news publisher and subscriber example.
News.java
package com.concretepage.reactive.ex1;
import java.util.Date;

public class News {
   private String content;
   private Date currentDate;
   public News(String content, Date currentDate) {
	  this.content = content;
	  this.currentDate = currentDate;
   }
   public String getContent() {
	  return content;
   }
   public Date getCurrentDate() {
	  return currentDate;
   }  
} 
NewsSubscriber.java
package com.concretepage.reactive.ex1;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class NewsSubscriber implements Subscriber<News> {
	private Subscription subscription;
	// volatile: updated on the delivery thread and polled by the main thread
	volatile int count = 0;
	
	@Override
	public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
                System.out.println("Subscriber subscribed.");
	}

	@Override
	public void onNext(News item) {
		System.out.println("Received: "+item.getContent());
		readNews(item.getContent());
		subscription.request(1);
		count++;
	}

	@Override
	public void onError(Throwable throwable) {
		System.out.println("Error encountered by Publisher or Subscription: "
	              + throwable.getMessage());
	}

	@Override
	public void onComplete() {
                System.out.println("Subscription completed. Total Number of Subscription is "+ count);		
	}

	private void readNews(String newsContent) {
		System.out.println("Reading:"+ newsContent);
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("---Done---");
	}
	public int getNewsCount() {
		return count;
	}	
} 
ReactiveNewsApp.java
package com.concretepage.reactive.ex1;
import java.util.Date;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveNewsApp {
	public static void main(String[] args) throws InterruptedException {
		SubmissionPublisher<News> publisher = new SubmissionPublisher<>();
		NewsSubscriber subscriber = new NewsSubscriber();
		publisher.subscribe(subscriber);
		final int NEWS_COUNT = 3;	
		for(int i=1; i <= NEWS_COUNT; i++) {
			publisher.submit(new News("News-"+ i, new Date()));
		}
		//Wait to complete subscription
		while (subscriber.getNewsCount() != NEWS_COUNT) {
			Thread.sleep(50);
		}
		publisher.close();
	}
} 
Output
Subscriber subscribed.
Received: News-1
Reading:News-1
---Done---
Received: News-2
Reading:News-2
---Done---
Received: News-3
Reading:News-3
---Done---
Subscription completed. Total Number of Subscription is 3 

4. Multiple Subscriber Example

In this example, more than one news subscriber subscribes to the same publisher.
NewsSubscriber.java
package com.concretepage.reactive.ex2;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import com.concretepage.reactive.ex1.News;

public class NewsSubscriber implements Subscriber<News> {
	private Subscription subscription;
	// volatile: updated on the delivery thread and polled by the main thread
	volatile int count = 0;
	private String personName;
	public NewsSubscriber(String personName) {
		this.personName = personName;
	}
	@Override
	public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
                System.out.println(personName + " subscribed for news.");
	}

	@Override
	public void onNext(News item) {
		System.out.println(personName + " received: "+item.getContent());
		readNews(item.getContent());
		subscription.request(1);
		count++;
	}

	@Override
	public void onError(Throwable throwable) {
		System.out.println("Error encountered by Publisher or Subscription: "
	              + throwable.getMessage());
	}

	@Override
	public void onComplete() {
                System.out.println("Subscription completed for "+ personName +". Total Number of Subscription is "+ count);		
	}
	
	private void readNews(String newsContent) {
		System.out.println(personName + " reading: "+ newsContent);
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	public int getNewsCount() {
		return count;
	}	
} 
ReactiveNewsApp.java
package com.concretepage.reactive.ex2;
import java.util.Date;
import java.util.concurrent.SubmissionPublisher;
import com.concretepage.reactive.ex1.News;

public class ReactiveNewsApp {
	public static void main(String[] args) throws InterruptedException {
		SubmissionPublisher<News> publisher = new SubmissionPublisher<>();
		NewsSubscriber mahesh = new NewsSubscriber("Mahesh");
		NewsSubscriber krishna = new NewsSubscriber("Krishna");
		publisher.subscribe(mahesh);
		publisher.subscribe(krishna);
	        final int NEWS_COUNT = 2;	
		for(int i=1; i <= NEWS_COUNT; i++) {
			publisher.submit(new News("News-"+ i, new Date()));
		}
		//Wait to complete subscription
		while (mahesh.getNewsCount() != NEWS_COUNT || krishna.getNewsCount() != NEWS_COUNT) {
			Thread.sleep(50);
		}
		publisher.close();
	}
} 
Output
Mahesh subscribed for news.
Krishna subscribed for news.
Krishna received: News-1
Krishna reading: News-1
Mahesh received: News-1
Mahesh reading: News-1
Krishna received: News-2
Mahesh received: News-2
Mahesh reading: News-2
Krishna reading: News-2
Subscription completed for Mahesh. Total Number of Subscription is 2
Subscription completed for Krishna. Total Number of Subscription is 2 

5. Publisher, Processor and Subscriber Example

A processor is used to transform objects. We need a processor when the subscriber expects a different type of object than the publisher produces. The diagram below shows the communication between publisher and processor and between processor and subscriber.
[Figure: Java Reactive Streams]
Now find the example of an article publisher, processor and subscriber. In our example, the publisher produces Article and the subscriber receives JavaArticle. The processor transforms each Article object into a JavaArticle object.
Article.java
package com.concretepage.reactive.ex3;

public class Article {
	private int id;
	private String title;
	private String category;
	public Article(int id, String title, String category) {
		this.id = id;
		this.title = title;
		this.category = category;
	}
	public int getId() {
		return id;
	}
	public String getTitle() {
		return title;
	}
	public String getCategory() {
		return category;
	}
} 
JavaArticle.java
package com.concretepage.reactive.ex3;

public class JavaArticle {
   private int id;
   private String title;
   public JavaArticle(int id, String title) {
	  this.id = id;
	  this.title = title;
   }
   public int getId() {
	  return id;
   }
   public String getTitle() {
	  return title;
   }
} 
JavaSubscriber.java
package com.concretepage.reactive.ex3;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class JavaSubscriber implements Subscriber<JavaArticle> {
	private Subscription subscription;
	// volatile: updated on the delivery thread and polled by the main thread
	volatile int count = 0;
	
	@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("---Subscriber subscribed---");
		this.subscription = subscription;
		subscription.request(1);
	}
	@Override
	public void onNext(JavaArticle item) {
		System.out.println("Received Java Artcile with id: "+item.getId());
		readArticle(item.getTitle());
		subscription.request(1);
		count++;
	}
	@Override
	public void onError(Throwable throwable) {
		System.out.println("Error encountered by Publisher or Subscription: "
	              + throwable.getMessage());
	}
	@Override
	public void onComplete() {
                System.out.println("Subscription completed. Total Number of Subscription is "+ count);		
	}

	private void readArticle(String title) {
		System.out.println("Reading: "+ title);
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("---Done---");
	}
	public int getArticleCount() {
		return count;
	}	
} 
ArticleTransformProcessor.java
package com.concretepage.reactive.ex3;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class ArticleTransformProcessor extends SubmissionPublisher<JavaArticle> implements Processor<Article, JavaArticle> {
        private Subscription subscription;
        final Function<Article, JavaArticle> function;
    
        public ArticleTransformProcessor(Function<Article, JavaArticle> function) {
    	        super();
    	        this.function = function;
        }
	@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("---Processor subscribed---");
		this.subscription = subscription;
		subscription.request(1);
	}
	@Override
	public void onNext(Article item) {
		subscription.request(1);
                submit(function.apply(item));
	}
	@Override
	public void onError(Throwable throwable) {
		System.out.println("Error encountered by Publisher or Subscription: "
	              + throwable.getMessage());
	}
	@Override
	public void onComplete() {
	        System.out.println("Processor Completed.");
	}
} 
ReactiveArticleApp.java
package com.concretepage.reactive.ex3;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class ReactiveArticleApp {
  public static void main(String[] args) {
         List<Article> list = Arrays.asList(
            new Article(1, "Java Functional Interface", "Java"),
            new Article(2, "Java Distinct Example", "Java"),
            new Article(3, "Java flatMap Example", "Java")
         );
		
	 Function<Article, JavaArticle> function = article ->
	    new JavaArticle(article.getId(), article.getTitle());

	 ArticleTransformProcessor processor = new ArticleTransformProcessor(function);
	 SubmissionPublisher<Article> publisher = new SubmissionPublisher<>();
	 JavaSubscriber javaSubscriber = new JavaSubscriber();
	 publisher.subscribe(processor);
	 processor.subscribe(javaSubscriber);
	 list.forEach(publisher::submit);
	 
	 //Wait to complete subscription
	 while (javaSubscriber.getArticleCount() != 3) {
		try {
			Thread.sleep(50);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	 }
	 processor.close();
	 publisher.close();	 
  }
} 
Output
---Processor subscribed---
---Subscriber subscribed---
Received Java Artcile with id: 1
Reading: Java Functional Interface
---Done---
Received Java Artcile with id: 2
Reading: Java Distinct Example
---Done---
Received Java Artcile with id: 3
Reading: Java flatMap Example
---Done---
Subscription completed. Total Number of Subscription is 3
Processor Completed. 

6. Subscription Cancel Example

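Find the example to cancel the subscription. The processor cancels its subscription to the publisher when it receives the second article, and the subscriber cancels its subscription to the processor after reading the first article. Because a cancelled subscription need not receive onComplete, the output below contains no completion messages.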
ArticleTransformProcessor.java
package com.concretepage.reactive.ex4;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import com.concretepage.reactive.ex3.Article;
import com.concretepage.reactive.ex3.JavaArticle;

public class ArticleTransformProcessor extends SubmissionPublisher<JavaArticle> implements Processor<Article, JavaArticle> {
        private Subscription subscription;
        final Function<Article, JavaArticle> function;
        int count = 0;
        public ArticleTransformProcessor(Function<Article, JavaArticle> function) {
    	        super();
    	        this.function = function;
        }
	@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("---Processor subscribed---");
		this.subscription = subscription;
		subscription.request(1);
	}
	@Override
	public void onNext(Article item) {
		count++;
		if (count == 2) {
			subscription.cancel();
		} else {
			subscription.request(1);
	        submit(function.apply(item));			
		}
	}
	@Override
	public void onError(Throwable throwable) {
		System.out.println("Error encountered by Publisher or Subscription: "
	              + throwable.getMessage());
	}
	@Override
	public void onComplete() {
	       System.out.println("Processor Completed.");
	}
} 
JavaSubscriber.java
package com.concretepage.reactive.ex4;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import com.concretepage.reactive.ex3.JavaArticle;

public class JavaSubscriber implements Subscriber<JavaArticle> {
	private Subscription subscription;
	int count = 0;
	
	@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("---Subscriber subscribed---");
		this.subscription = subscription;
		subscription.request(1);
	}
	@Override
	public void onNext(JavaArticle item) {
		System.out.println("Received Java Artcile with id: "+item.getId());
		count++;
		readArticle(item.getTitle());
		if (count == 1) {
		   subscription.cancel();	
		} else {
		   subscription.request(1);
		}
	}
	@Override
	public void onError(Throwable throwable) {
		System.out.println("Error encountered by Publisher or Subscription: "
	              + throwable.getMessage());
	}
	@Override
	public void onComplete() {
                System.out.println("Subscription completed. Total Number of Subscription is "+ count);		
	}

	private void readArticle(String title) {
		System.out.println("Reading: "+ title);
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("---Done---");
	}
	public int getArticleCount() {
		return count;
	}	
} 
ReactiveArticleApp.java
package com.concretepage.reactive.ex4;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import com.concretepage.reactive.ex3.Article;
import com.concretepage.reactive.ex3.JavaArticle;

public class ReactiveArticleApp {
  public static void main(String[] args) {
         List<Article> list = Arrays.asList(
            new Article(1, "Java Functional Interface", "Java"),
            new Article(2, "Java Distinct Example", "Java"),
            new Article(3, "Java flatMap Example", "Java")
         );
		
	 Function<Article, JavaArticle> function = article ->
	     new JavaArticle(article.getId(), article.getTitle());

	 ArticleTransformProcessor processor = new ArticleTransformProcessor(function);
	 SubmissionPublisher<Article> publisher = new SubmissionPublisher<>();
	 JavaSubscriber javaSubscriber = new JavaSubscriber();
	 publisher.subscribe(processor);
	 processor.subscribe(javaSubscriber);
	 list.forEach(publisher::submit);
	 
	 //Wait to complete subscription
         try {
		Thread.sleep(5000);
	 } catch (InterruptedException e) {
		e.printStackTrace();
	 }
	 processor.close();
	 publisher.close();	 
  }
} 
Output
---Subscriber subscribed---
---Processor subscribed---
Received Java Artcile with id: 1
Reading: Java Functional Interface
---Done--- 

7. References

Java 9 Class Flow
The Reactive Manifesto
