29 Java - Multithreading 6 (Future | Callable | CompletableFuture)
Status of submitted task
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
//New thread will be created, and it will perform the task
poolExecutor.submit(()->{
System.out.println("this is the task, which thread will execute");
});
//main thread will continue processing
poolExecutor.shutdown(); //without this, the idle core thread keeps the JVM alive
}
}
Now, what if the caller wants to know the status of the task given to thread1: whether it completed, failed, etc.?
To know the status of a submitted task, we use Future.
Future
An interface which represents the result of an async task.
Means, it allows you to:
Check whether the computation is complete
Get the result
Handle the exception, if any
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
//New thread will be created, and it will perform the task
Future<?> futureObj = poolExecutor.submit(()->{
System.out.println("this is the task, which thread will execute");
});
//caller is checking the status of the thread it created
System.out.println(futureObj.isDone());
poolExecutor.shutdown();
}
}
/*
false
this is the task, which thread will execute
*/
Future Methods
boolean cancel(boolean mayInterruptIfRunning)
Attempts to cancel the execution of the task.
Returns false if the task cannot be cancelled, typically because it has already completed; returns true otherwise.
boolean isCancelled()
Returns true if the task was cancelled before it completed.
boolean isDone()
Returns true if this task completed.
Completion may be due to normal termination, an exception, or cancellation.
In all these cases, this method will return true.
V get()
Waits, if required, for the completion of the task.
After the task completes, retrieves the result if available.
V get(long timeout, TimeUnit unit)
Waits, if required, for at most the given timeout period.
Throws TimeoutException if the timeout period elapses and the task is not yet completed.
Future Examples
Example 1
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
//New thread will be created, and it will perform the task
Future<?> futureObj = poolExecutor.submit(()->{
try{
Thread.sleep(7000);
System.out.println("this is the task, which thread will execute");
}
catch (Exception e){
//handle the exception
}
});
//caller is checking the status of the thread it created
System.out.println(futureObj.isDone());
try{
futureObj.get();
System.out.println("Finished waiting for the future Object");
}
catch (Exception e){
}
poolExecutor.shutdown();
}
}
false
this is the task, which thread will execute
Finished waiting for the future Object
Example 2
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
//New thread will be created, and it will perform the task
Future<?> futureObj = poolExecutor.submit(()->{
try{
Thread.sleep(7000);
System.out.println("this is the task, which thread will execute");
}
catch (Exception e){
//handle the exception
}
});
//caller is checking the status of the thread it created
System.out.println(futureObj.isDone());
try{
futureObj.get(2, TimeUnit.SECONDS);
System.out.println("Finished waiting for the future Object");
}
catch (TimeoutException e){
System.out.println("TimeoutException happened");
}
catch (Exception e){
}
poolExecutor.shutdown();
}
}
false
TimeoutException happened
this is the task, which thread will execute
Example 3
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
//New thread will be created, and it will perform the task
Future<?> futureObj = poolExecutor.submit(()->{
try{
Thread.sleep(7000);
System.out.println("this is the task, which thread will execute");
}
catch (Exception e){
//handle the exception
}
});
//caller is checking the status of the thread it created
System.out.println(futureObj.isDone());
try{
futureObj.get(2, TimeUnit.SECONDS);
}
catch (TimeoutException e){
System.out.println("TimeoutException happened");
}
catch (Exception e){
}
try{
futureObj.get();
System.out.println("Finished waiting for the future Object");
}
catch (Exception e){
}
System.out.println("is Done: "+ futureObj.isDone());
System.out.println("is Cancelled: "+futureObj.isCancelled());
poolExecutor.shutdown();
}
}
false
TimeoutException happened
this is the task, which thread will execute
Finished waiting for the future Object
is Done: true
is Cancelled: false
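The examples above never call cancel(). As a minimal sketch of that path, the class below submits a slow task, cancels it, and then reports what the Future says. The class name CancelDemo, the helper method, and the 60 second sleep are illustrative assumptions, not part of the original examples.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {

    // Submits a long-sleeping task, cancels it, and summarises the Future's state afterwards.
    static String cancelLongTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(() -> {
                try {
                    Thread.sleep(60_000); // stands in for slow work (assumed duration)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancel(true) interrupts the sleep
                }
            });

            // true: interrupt the worker thread if the task has already started
            boolean cancelled = future.cancel(true);

            String getOutcome;
            try {
                future.get();
                getOutcome = "returned";
            } catch (CancellationException e) {
                // get() on a cancelled Future throws this unchecked exception
                getOutcome = "CancellationException";
            } catch (Exception e) {
                getOutcome = e.getClass().getSimpleName();
            }

            return "cancel=" + cancelled
                    + " isCancelled=" + future.isCancelled()
                    + " isDone=" + future.isDone()
                    + " get=" + getOutcome;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelLongTask());
        // cancel=true isCancelled=true isDone=true get=CancellationException
    }
}
```

Note that isDone() is true after cancellation, which matches the description of isDone() in the method list.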
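FutureTask is an implementation of the Future interface (through RunnableFuture, which extends both Runnable and Future).
FutureTask wraps a Runnable or Callable together with the state of the task. By default, ThreadPoolExecutor.submit() wraps the submitted task in a FutureTask.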
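Because FutureTask is both a Runnable and a Future, it can also be used without a thread pool. The sketch below is only an illustration of that idea; the class name FutureTaskDemo and the arithmetic task are assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {

    // Wraps a Callable in a FutureTask, runs it on a plain Thread, and reads the result back.
    static int computeOnPlainThread() {
        Callable<Integer> work = () -> 6 * 7;
        FutureTask<Integer> task = new FutureTask<>(work);

        // Thread accepts the FutureTask because FutureTask implements Runnable
        Thread worker = new Thread(task);
        worker.start();

        try {
            return task.get(); // the Future side: blocks until run() has stored the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(computeOnPlainThread()); // 42
    }
}
```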
Callable
Callable represents a task which needs to be executed, just like Runnable.
But the difference is:
Runnable does not have a return type, and run() cannot throw a checked exception.
Callable can return a value, and call() can throw a checked exception.
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
Future<Integer> futureObj = poolExecutor.submit(() -> {
System.out.println("do something");
return 45;
});
poolExecutor.shutdown();
}
}
Runnable interface
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Callable interface
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
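Since call() is declared with "throws Exception", a Callable may fail with a checked exception. The following sketch shows how that failure reaches the caller: get() wraps it in an ExecutionException, and the original exception is available through getCause(). The class name and the IOException message are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableExceptionDemo {

    // Submits a Callable that always fails and describes what the caller observes.
    static String describeFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IOException("disk not reachable"); // hypothetical failure
            };
            Future<Integer> future = executor.submit(failing);
            try {
                return "value=" + future.get();
            } catch (ExecutionException e) {
                // The exception thrown inside call() is the cause
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // IOException: disk not reachable
    }
}
```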
ThreadPoolExecutor submit types
Example:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
//UseCase1 --> Runnable
//Runnable has no return type
//So the Future is declared with the wildcard, Future<?>
//On completion, get() returns null
Future<?> futureObj1 = poolExecutor.submit(() -> {
System.out.println("Task1 with Runnable");
});
try{
Object obj = futureObj1.get();
System.out.println(obj==null);
}
catch (Exception e){
}
//UseCase2
List<Integer> output = new ArrayList<>();
Future<List<Integer>> futureObj2 = poolExecutor.submit(() -> {
output.add(100);
System.out.println("do something");
return output;
});
try{
List<Integer> outputFromFutureObj2 = futureObj2.get();
System.out.println(outputFromFutureObj2);
}
catch (Exception e){
}
//UseCase3
Future<List<Integer>> futureObj3 = poolExecutor.submit(() -> {
System.out.println("Task3 with Callable");
List<Integer> listObj = new ArrayList<>();
listObj.add(200);
return listObj;
});
try{
List<Integer> outputFromFutureObject3 = futureObj3.get();
System.out.println(outputFromFutureObject3);
}
catch (Exception e){
}
poolExecutor.shutdown();
}
}
Task1 with Runnable
true
do something
[100]
Task3 with Callable
[200]
submit(Runnable)
Future<?> futureObj1 = poolExecutor.submit(() -> {
System.out.println("Task1 with Runnable");
});
try{
Object obj = futureObj1.get();
System.out.println(obj==null);
}
catch (Exception e){
}
//true
submit(Runnable, T)
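//UseCase2
//Note: a lambda that returns a value is resolved as a Callable, so this form actually calls submit(Callable<T>)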
List<Integer> output = new ArrayList<>();
Future<List<Integer>> futureObj2 = poolExecutor.submit(() -> {
output.add(100);
System.out.println("do something");
return output;
});
try{
List<Integer> outputFromFutureObj2 = futureObj2.get();
System.out.println(outputFromFutureObj2);
}
catch (Exception e){
}
Or, with an explicit Runnable and a result object, which is the submit(Runnable, T) form:
//UseCase2
List<Integer> output = new ArrayList<>();
Future<List<Integer>> futureObj2 = poolExecutor.submit(new MyRunnable(output), output);
public class MyRunnable implements Runnable{
List<Integer> output;
public MyRunnable(List<Integer> output) {
this.output = output;
}
@Override
public void run() {
output.add(100);
System.out.println("do something");
}
}
submit(Callable<T>)
//UseCase3
Future<List<Integer>> futureObj3 = poolExecutor.submit(() -> {
System.out.println("Task3 with Callable");
List<Integer> listObj = new ArrayList<>();
listObj.add(200);
return listObj;
});
try{
List<Integer> outputFromFutureObject3 = futureObj3.get();
System.out.println(outputFromFutureObject3);
}
catch (Exception e){
}
CompletableFuture
Introduced in Java 8
To help with async programming
We can consider it an advanced version of Future
Provides additional capabilities like chaining.
Future is an interface, and CompletableFuture implements it (along with CompletionStage).
CompletableFuture Methods
supplyAsync
public static<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier)
public static<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier, Executor executor)
supplyAsync initiates an async operation: the 'supplier' runs on a thread other than the caller's.
That thread comes from an executor; supplyAsync itself does not necessarily create a new thread.
If we want more control over the threads, we can pass an Executor to the method.
By default it uses the shared ForkJoinPool.commonPool(), whose parallelism is derived from the number of available processors.
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<String> asyncTask1 = CompletableFuture.supplyAsync(() -> {
//this is the task which need to be completed by thread
return "task completed";
}, poolExecutor);
System.out.println(asyncTask1.get());
poolExecutor.shutdown();
}
}
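The example above passes its own executor. For comparison, here is a small sketch of the single-argument form, which falls back to the default executor. The class name and the String array return shape are assumptions made for illustration; the worker's thread name is printed but not predicted, because it depends on the JVM and on the number of processors.

```java
import java.util.concurrent.CompletableFuture;

class SupplyAsyncDefaultDemo {

    // Runs the supplier on the default executor and returns
    // the supplied value together with the name of the thread that produced it.
    static String[] runOnDefaultExecutor() {
        CompletableFuture<String[]> future = CompletableFuture.supplyAsync(() ->
                new String[] {"task completed", Thread.currentThread().getName()});
        // join() waits like get(), but throws only unchecked exceptions
        return future.join();
    }

    public static void main(String[] args) {
        String[] result = runOnDefaultExecutor();
        System.out.println("Result: " + result[0]);
        System.out.println("Ran on: " + result[1]);
        System.out.println("Caller: " + Thread.currentThread().getName());
    }
}
```

On a typical multi-core machine the "Ran on" line names a worker of the common fork-join pool, but that is an observation rather than a guarantee.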
thenApply & thenApplyAsync
Apply a function to the result of previous Async computation
Return a new CompletableFuture object.
thenApply
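It's a synchronous execution: the function is not handed to another executor.
Means, it normally runs on the same thread which completed the previous async task. If that task has already completed when thenApply is called, the function runs on the calling thread instead.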
Example 1
CompletableFuture<String> asyncTask1 = CompletableFuture.supplyAsync(() -> {
//this is the task which need to be completed by thread
return "task completed";
}, poolExecutor).thenApply((String val)->{
//Functionality which can work on the result of the previous async task
return val + " in thenApply";
});
System.out.println(asyncTask1.get());
Example 2:
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("ThreadName of SupplyAsync: "+ Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) {
//handle exception
}
return "Supply Task1 ";
}, poolExecutor);
CompletableFuture<String> compFutureObj2 = compFutureObj1.thenApply((String val) -> {
System.out.println("ThreadName of thenApply: " + Thread.currentThread().getName());
return "And ";
});
System.out.println("Thread Name for 'after Completable Future: " + Thread.currentThread().getName());
poolExecutor.shutdown();
}
}
Thread Name for 'after Completable Future: main
ThreadName of SupplyAsync: pool-1-thread-1
ThreadName of thenApply: pool-1-thread-1
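In the output above, thenApply ran on the pool thread because the supplier was still sleeping when thenApply was registered. The sketch below shows the other case, where the previous stage is already complete and the function therefore runs on the calling thread. The class and method names are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyThreadDemo {

    // Returns the name of the thread that ran the thenApply function
    // when the previous stage was already complete at registration time.
    static String threadUsedWhenAlreadyComplete() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("Supply Task1 ");
        return done.thenApply(val -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println("Caller thread:    " + Thread.currentThread().getName());
        System.out.println("thenApply thread: " + threadUsedWhenAlreadyComplete());
        // Both lines print the same thread name
    }
}
```

This is why thenApply should not be relied on to keep work off the caller's thread; thenApplyAsync is the variant that always goes through an executor.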
thenApplyAsync
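It's an asynchronous execution.
Means, the function is submitted to an executor (the fork-join common pool, if we do not provide an executor in the method), so it may run on a different thread from the one that completed the previous task.
If multiple thenApplyAsync calls are attached to the same future, their ordering cannot be guaranteed; they may run concurrently. Calls chained one after another still run in order, because each stage waits for the previous one.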
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("ThreadName of SupplyAsync: "+ Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) {
//handle exception
}
return "Supply Task1 ";
}, poolExecutor);
CompletableFuture<String> compFutureObj2 = compFutureObj1.thenApplyAsync((String val) -> {
System.out.println("ThreadName of thenApplyAsync: " + Thread.currentThread().getName());
return val + "And ";
});
System.out.println("Thread Name for 'after Completable Future: " + Thread.currentThread().getName());
System.out.println("Async output : "+ compFutureObj2.get());
poolExecutor.shutdown();
}
}
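The example above lets thenApplyAsync use the default pool. As a sketch of the two-argument overload, the class below passes an executor explicitly so the continuation runs on a known thread. The thread name "continuation-thread" and the class name are hypothetical, chosen only to make the output easy to recognise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenApplyAsyncExecutorDemo {

    // Runs the continuation on a dedicated, named thread and returns its output.
    static String runContinuationOnNamedThread() {
        ExecutorService continuationPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "continuation-thread"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "Supply Task1 ")
                    // second argument: the executor that must run this function
                    .thenApplyAsync(
                            val -> val + "And " + Thread.currentThread().getName(),
                            continuationPool)
                    .join();
        } finally {
            continuationPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runContinuationOnNamedThread());
        // Supply Task1 And continuation-thread
    }
}
```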
thenCompose & thenComposeAsync
Chains together dependent async operations
Means, when the next async operation depends on the result of the previous one,
we can tie them together.
This brings ordering to async tasks: the next operation starts only after the previous one completes.
thenCompose
CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, poolExecutor);
CompletableFuture<String> compFutureObj2 = compFutureObj1.thenCompose((String val) -> {
return CompletableFuture.supplyAsync(() -> val + " world");
});
System.out.println(compFutureObj2.get());
/*
Hello world
*/
thenComposeAsync
CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, poolExecutor)
.thenComposeAsync((String val) -> {
return CompletableFuture.supplyAsync(() -> val + " world");
})
.thenComposeAsync((String val)->{
return CompletableFuture.supplyAsync(() -> val+ " all");
})
;
System.out.println(compFutureObj1.get());
/*
Hello world all
*/
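It may help to see why thenCompose is needed when the function itself returns a CompletableFuture. In the sketch below, the same function is passed to thenApply and to thenCompose: thenApply produces a nested future, while thenCompose flattens it. The helper appendWorld and the class name are assumptions introduced for the example.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplyDemo {

    // Hypothetical async step used only for illustration
    static CompletableFuture<String> appendWorld(String val) {
        return CompletableFuture.supplyAsync(() -> val + " world");
    }

    // thenApply keeps the inner future as the result, so the type is nested
    static String withThenApply() {
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.supplyAsync(() -> "Hello")
                        .thenApply(ComposeVsApplyDemo::appendWorld);
        return nested.join().join(); // two joins: outer future, then inner future
    }

    // thenCompose unwraps the inner future, so the type stays flat
    static String withThenCompose() {
        CompletableFuture<String> flat =
                CompletableFuture.supplyAsync(() -> "Hello")
                        .thenCompose(ComposeVsApplyDemo::appendWorld);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(withThenApply());   // Hello world
        System.out.println(withThenCompose()); // Hello world
    }
}
```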
thenAccept & thenAcceptAsync
Generally the end stage in a chain of async operations
It takes a Consumer, so it does not return a value; the resulting future is a CompletableFuture<Void>.
CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, poolExecutor);
CompletableFuture<Void> acceptObj = compFutureObj1.thenAccept((String val) -> System.out.println("All stages completed"));
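The snippet above ignores the value it receives. A slightly fuller sketch, under the assumption that the consumer should do something with the result, could collect it into a thread-safe list and wait on the returned CompletableFuture<Void>. The class name and the list are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenAcceptDemo {

    // Consumes the async result in thenAccept and returns what the consumer recorded.
    static List<String> consumeResult() {
        // Thread-safe list, because the consumer may run on another thread
        List<String> received = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "Hello")
                .thenAccept(val -> received.add(val + " consumed"));

        done.join(); // carries no value; it only signals that the consumer has finished
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeResult()); // [Hello consumed]
    }
}
```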
thenCombine & thenCombineAsync
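Used to combine the results of 2 CompletableFutures
The Async variant submits the combining function to an executor (the fork-join common pool by default) instead of running it on the thread that completed the last of the two futures.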
CompletableFuture<Integer> asyncTask1 = CompletableFuture.supplyAsync(() -> {
return 10;
}, poolExecutor);
CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(()->{
return "k";
}, poolExecutor);
CompletableFuture<String> combinedFutureObj = asyncTask1.thenCombine(asyncTask2, (Integer val1, String val2) -> {
return val1 + val2;
});
System.out.println(combinedFutureObj.get());
//10k
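For the Async variant, a sketch along the same lines is shown below. It assumes we want the combining function to run on a dedicated executor, which is passed as the third argument of thenCombineAsync. The thread name "combiner-thread" and the class name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenCombineAsyncDemo {

    // Combines two independent results on a dedicated, named thread.
    static String combineOnNamedThread() {
        ExecutorService combinerPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "combiner-thread"));
        try {
            CompletableFuture<Integer> asyncTask1 = CompletableFuture.supplyAsync(() -> 10);
            CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(() -> "k");

            return asyncTask1
                    .thenCombineAsync(
                            asyncTask2,
                            // runs only after both asyncTask1 and asyncTask2 have completed
                            (val1, val2) -> val1 + val2 + " on " + Thread.currentThread().getName(),
                            combinerPool)
                    .join();
        } finally {
            combinerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combineOnNamedThread()); // 10k on combiner-thread
    }
}
```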