Java CompletableFuture supplyAsync()

By Arvind Rai, August 24, 2018
On this page we will provide Java CompletableFuture.supplyAsync() example. supplyAsync() is a static method of CompletableFuture introduced in Java 8. The method supplyAsync() completes a task asynchronously running in either ForkJoinPool.commonPool() or given Executor. Find the method signatures.
1. supplyAsync(Supplier<U> supplier)
We need to pass a Supplier as a task to supplyAsync() method. The task will be asynchronously completed running in ForkJoinPool.commonPool() by default and finally supplyAsync() will return new CompletableFuture with the value obtained by calling given Supplier. Find the sample code for supplyAsync() method.
CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> "Hello World!");
System.out.println(cf.get()); 
2. supplyAsync(Supplier<U> supplier, Executor executor)
We need to pass a Supplier as a task to supplyAsync() method. The task will be asynchronously completed running in given Executor and finally supplyAsync() will return new CompletableFuture with the value obtained by calling given Supplier. Find the sample code for supplyAsync() with Executor.
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> cf = CompletableFuture.supplyAsync(
		()-> "Hello World!", 
		executorService
	 );
System.out.println(cf.get()); 
On this page we will provide supplyAsync() examples with thenApply(), whenComplete() with ForkJoinPool.commonPool() and Executor. We will also provide examples with Stream.

supplyAsync() Example with thenApply()

thenApply() executes a function by passing the stage result. When we use supplyAsync() with thenApply(), the thenApply() will execute the given function by passing the result as argument obtained from supplyAsync().
SupplyAsyncExample1.java
package com.concretepage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample1 {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> getDataById(10))
				.thenApply(data -> sendData(data));
		
		cf.get();
	}
	private static String getDataById(int id) {
		System.out.println("getDataById: "+ Thread.currentThread().getName());
		return "Data:"+ id;
	}
	private static String sendData(String data) {
		System.out.println("sendData: "+ Thread.currentThread().getName());
		System.out.println(data);
		return data;
	}	
} 
Output
getDataById: ForkJoinPool.commonPool-worker-1
sendData: main
Data:10 
Main thread starts executing the code and when it reaches to supplyAsync() then supplyAsync() takes new thread from ForkJoinPool.commonPool() to executes its function asynchronously.
thenApply() will execute either by main thread or the thread used by supplyAsync(). If the supplier of supplyAsync() is taking longer time then thenApply() will be executed by thread used by supplyAsync() and hence main thread will not be blocked. To understand this, change the getDataById() method as following.
private static String getDataById(int id) {
  System.out.println("getDataById: "+ Thread.currentThread().getName());
  try {
	Thread.sleep(1000);
  } catch (InterruptedException e) {
	e.printStackTrace();
  }		
  return "Data:"+ id;
} 
Output
getDataById: ForkJoinPool.commonPool-worker-1
sendData: ForkJoinPool.commonPool-worker-1
Data:10 

supplyAsync() Example with Executor

Here we will pass our Executor as an argument to supplyAsync(). Now the supplier passed to supplyAsync() will be executed by given Executor and not by ForkJoinPool.commonPool().
SupplyAsyncExample2.java
package com.concretepage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncExample2 {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();
		CompletableFuture<String> cf = CompletableFuture.supplyAsync(
				()-> getDataById(10), 
				executorService
			)
			.thenApply(data -> sendData(data));
		
		cf.get();
		executorService.shutdown();
	}
	private static String getDataById(int id) {
		System.out.println("getDataById: "+ Thread.currentThread().getName());
		return "Data:"+ id;
	}
	private static String sendData(String data) {
		System.out.println("sendData: "+ Thread.currentThread().getName());
		System.out.println(data);
		return data;
	}	
} 
Output
getDataById: pool-1-thread-1
sendData: main
Data:10 

supplyAsync() Example with whenComplete()

Here we will create supplyAsync() example with whenComplete() method. whenComplete() returns a new CompletionStage with same result or exception after completing a given action. Action is the BiConsumer in which first value is result of CompletionStage and second is the error if any otherwise null.
SupplyAsyncExample3.java
package com.concretepage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SupplyAsyncExample3 {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> getDataById(10))
			.whenComplete((data, error) -> {
				consumeData(data);
				if(error!= null) {
					System.out.println(error);
				}
			 });
		cf.get();
	}
	private static String getDataById(int id) {
		System.out.println("getDataById: "+ Thread.currentThread().getName());
		return "Data:"+ id;
	}
	private static void consumeData(String data) {
		System.out.println("consumeData: "+ Thread.currentThread().getName());
		System.out.println(data);
	}	
} 
Output
getDataById: ForkJoinPool.commonPool-worker-1
consumeData: main
Data:10 

supplyAsync() Example with Stream

Find the example of supplyAsync() with Stream.
SupplyAsyncExample4.java
package com.concretepage;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SupplyAsyncExample4 {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(10, 20, 30);
        long count = list.stream().map(n-> CompletableFuture.supplyAsync(()-> getDataById(n)))
             .map(cf -> cf.thenApply(data -> sendData(data)))
             .map(t->t.join()).count();
        
        System.out.println("Number of elements:"+ count);
    }
    private static String getDataById(int id) {
	System.out.println("getDataById: "+ Thread.currentThread().getName());
	return "Data:"+ id;
    }
    private static String sendData(String data) {
	System.out.println("sendData: "+ Thread.currentThread().getName());
	System.out.println(data);
	return data;
    }    
} 
Output
getDataById: ForkJoinPool.commonPool-worker-1
sendData: ForkJoinPool.commonPool-worker-1
Data:10
getDataById: ForkJoinPool.commonPool-worker-1
sendData: ForkJoinPool.commonPool-worker-1
Data:20
getDataById: ForkJoinPool.commonPool-worker-1
sendData: ForkJoinPool.commonPool-worker-1
Data:30
Number of elements:3 

References

CompletableFuture
CompletionStage
POSTED BY
ARVIND RAI
ARVIND RAI
LEARN MORE








©2024 concretepage.com | Privacy Policy | Contact Us