29 Java - Multithreading 6 (Future | Callable | CompletableFuture)

Status of submitted task

import java.util.concurrent.*; // also needed by the later examples in these notes

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        poolExecutor.submit(()->{
            System.out.println("this is the task, which thread will execute");
        });

        //main thread will continue processing
    }
}
  • Now, what if the caller wants to know the status of the task it submitted, for example whether it completed or failed?

  • To track the status of a submitted task, we use the Future returned by submit().

Future

  • An interface that represents the result of an async task.

  • It lets the caller:

    • Check whether the computation is complete

    • Get the result

    • Handle the exception, if any

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            System.out.println("this is the task, which thread will execute");
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        poolExecutor.shutdown();
    }
}
/*
false
this is the task, which thread will execute
*/

Future Methods

boolean cancel(boolean mayInterruptIfRunning)

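  • Attempts to cancel the execution of the task. If the task is already running, mayInterruptIfRunning decides whether its thread is interrupted.

  • Returns false if the task could not be cancelled, typically because it has already completed; returns true otherwise.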

boolean isCancelled()

  • Returns true if the task was cancelled before it completed normally.
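
None of the examples in these notes call cancel(), so here is a minimal sketch of how cancel() and isCancelled() behave together. The class name CancelDemo and the 60-second sleep are placeholders chosen for illustration; the behaviour shown is that of the FutureTask returned by a standard executor.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {

    // Submits a slow task, cancels it, and reports what the Future says afterwards.
    static String cancelAndDescribe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(() -> {
                try {
                    Thread.sleep(60_000); // stands in for slow work
                } catch (InterruptedException e) {
                    // cancel(true) interrupts the worker, which ends the sleep early
                }
            });

            boolean first = future.cancel(true);  // true: the task had not completed yet
            boolean second = future.cancel(true); // false: it is already cancelled

            String getOutcome;
            try {
                future.get();
                getOutcome = "returned";
            } catch (CancellationException e) {
                getOutcome = "CancellationException"; // get() on a cancelled task throws
            } catch (InterruptedException | ExecutionException e) {
                getOutcome = e.getClass().getSimpleName();
            }
            return first + " " + second + " " + future.isCancelled()
                    + " " + future.isDone() + " " + getOutcome;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints: true false true true CancellationException
        System.out.println(cancelAndDescribe());
    }
}
```

Note that isDone() is also true after cancellation, and that get() on a cancelled task throws CancellationException instead of returning.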

boolean isDone()

  • Returns true if this task completed.

  • Completion may be due to normal termination, an exception, or cancellation.

  • In all these cases, this method will return true.

V get()

  • Waits, if necessary, for the task to complete.

  • Once the task has completed, returns its result. Throws ExecutionException if the task threw an exception, and CancellationException if it was cancelled.
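
To illustrate the "take care of exception" part of Future, the sketch below submits a task that fails and shows how the failure reaches the caller through get(). The class name GetFailureDemo and the failing parseInt call are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class GetFailureDemo {

    // Returns a description of the task's failure as seen by the caller.
    static String failureSeenByCaller() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The task throws NumberFormatException on the worker thread.
            Callable<Integer> failing = () -> Integer.parseInt("not a number");
            Future<Integer> future = executor.submit(failing);
            try {
                future.get();
                return "no failure";
            } catch (ExecutionException e) {
                // get() wraps the task's exception; the original is the cause.
                return e.getCause().getClass().getSimpleName() + " isDone=" + future.isDone();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints: NumberFormatException isDone=true
        System.out.println(failureSeenByCaller());
    }
}
```

The exception does not escape on the worker thread; it is stored in the Future and only surfaces when the caller asks for the result.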

V get(long timeout, TimeUnit unit)

  • Waits, if necessary, for at most the given timeout.

  • Throws TimeoutException if the timeout elapses before the task completes.

Future Examples

Example 1

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            try{
                Thread.sleep(7000);
                System.out.println("this is the task, which thread will execute");
            }
            catch (Exception e){
                //handle the exception
            }
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        try{
            futureObj.get();
            System.out.println("Finished waiting for the future Object");
        }
        catch (Exception e){

        }
        poolExecutor.shutdown();
    }
}
false
this is the task, which thread will execute
Finished waiting for the future Object

Example 2

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            try{
                Thread.sleep(7000);
                System.out.println("this is the task, which thread will execute");
            }
            catch (Exception e){
                //handle the exception
            }
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        try{
            futureObj.get(2, TimeUnit.SECONDS);
            System.out.println("Finished waiting for the future Object");
        }
        catch (TimeoutException e){
            System.out.println("TimeoutException happened");
        }
        catch (Exception e){

        }

        poolExecutor.shutdown();
    }
}
false
TimeoutException happened
this is the task, which thread will execute

Example 3

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            try{
                Thread.sleep(7000);
                System.out.println("this is the task, which thread will execute");
            }
            catch (Exception e){
                //handle the exception
            }
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        try{
            futureObj.get(2, TimeUnit.SECONDS);
        }
        catch (TimeoutException e){
            System.out.println("TimeoutException happened");
        }
        catch (Exception e){

        }


        try{
            futureObj.get();
            System.out.println("Finished waiting for the future Object");
        }
        catch (Exception e){

        }

        System.out.println("is Done: "+ futureObj.isDone());
        System.out.println("is Cancelled: "+futureObj.isCancelled());

        poolExecutor.shutdown();
    }
}
false
TimeoutException happened
this is the task, which thread will execute
Finished waiting for the future Object
is Done: true
is Cancelled: false
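  • FutureTask is an implementation of the Future interface (via RunnableFuture, which extends both Runnable and Future).

  • FutureTask wraps the submitted Runnable or Callable together with the state of the task; this is the object that submit() creates internally and returns.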
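
Because FutureTask is both a Runnable and a Future, it can be used without a thread pool at all. The following is a small sketch under that assumption; the class name FutureTaskDemo and the arithmetic are illustrative only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {

    static int runOnPlainThread() {
        // FutureTask wraps a Callable (or a Runnable plus a fixed result) ...
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);

        // ... and is itself a Runnable, so a plain Thread can execute it.
        Thread worker = new Thread(task);
        worker.start();

        try {
            // It is also a Future: get() blocks until run() has finished.
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPlainThread()); // prints: 42
    }
}
```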

Callable

  • Callable represents a task to be executed, just like Runnable.

  • The differences are:

    • Runnable has no return value (run() returns void).

    • Callable can return a value, and call() may throw a checked exception.

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        Future<Integer> futureObj = poolExecutor.submit(() -> {
            System.out.println("do something");
            return 45;
        });

        poolExecutor.shutdown();
    }
}

Runnable interface

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Callable interface

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
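
The two interfaces can be compared side by side by assigning each lambda to an explicitly typed variable, which also makes it obvious which submit overload is chosen. This is a sketch; the class name RunnableVsCallableDemo and the strings are placeholders.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunnableVsCallableDemo {

    static String compare() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable runnable = () -> System.out.println("Runnable: nothing to return");
            Callable<String> callable = () -> "Callable: returned a value";

            Future<?> fromRunnable = executor.submit(runnable);      // submit(Runnable)
            Future<String> fromCallable = executor.submit(callable); // submit(Callable<T>)

            // get() yields null for the Runnable and the returned value for the Callable.
            return fromRunnable.get() + " | " + fromCallable.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "Runnable: nothing to return", then:
        // null | Callable: returned a value
        System.out.println(compare());
    }
}
```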

ThreadPoolExecutor submit types

Example:

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //UseCase1 --> submit(Runnable)
        //Runnable has no return value,
        //so the Future is declared with a wildcard (Future<?>)
        //and get() returns null once the task completes
        Future<?> futureObj1 = poolExecutor.submit(() -> {
            System.out.println("Task1 with Runnable");
        });

        try{
            Object obj = futureObj1.get();
            System.out.println(obj==null);
        }
        catch (Exception e){

        }

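        //UseCase2 --> submit(Runnable, T)
        //The Runnable fills 'output'; the same object is passed as the result that get() returns
        List<Integer> output = new ArrayList<>();
        Future<List<Integer>> futureObj2 = poolExecutor.submit(() -> {
            output.add(100);
            System.out.println("do something");
        }, output);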

        try{
            List<Integer> outputFromFutureObj2 = futureObj2.get();
            System.out.println(outputFromFutureObj2);
        }
        catch (Exception e){

        }

        //UseCase3
        Future<List<Integer>> futureObj3 = poolExecutor.submit(() -> {
            System.out.println("Task3 with Callable");
            List<Integer> listObj = new ArrayList<>();
            listObj.add(200);
            return listObj;
        });

        try{
            List<Integer> outputFromFutureObject3 = futureObj3.get();
            System.out.println(outputFromFutureObject3);
        }
        catch (Exception e){

        }

        poolExecutor.shutdown();
    }
}
Task1 with Runnable
true
do something
[100]
Task3 with Callable
[200]

submit(Runnable)

        Future<?> futureObj1 = poolExecutor.submit(() -> {
            System.out.println("Task1 with Runnable");
        });

        try{
            Object obj = futureObj1.get();
            System.out.println(obj==null);
        }
        catch (Exception e){

        }
//true

submit(Runnable, T)

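        //UseCase2
        //The lambda is a Runnable (it returns nothing);
        //'output' is passed as the second argument and is what get() returns
        List<Integer> output = new ArrayList<>();
        Future<List<Integer>> futureObj2 = poolExecutor.submit(() -> {
            output.add(100);
            System.out.println("do something");
        }, output);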

        try{
            List<Integer> outputFromFutureObj2 = futureObj2.get();
            System.out.println(outputFromFutureObj2);
        }
        catch (Exception e){

        }

Or

//UseCase2
List<Integer> output = new ArrayList<>();
Future<List<Integer>> futureObj2 = poolExecutor.submit(new MyRunnable(output), output);

public class MyRunnable implements Runnable{

    List<Integer> output;

    public MyRunnable(List<Integer> output) {
        this.output = output;
    }

    @Override
    public void run() {
        output.add(100);
        System.out.println("do something");
    }
}

submit(Callable<T>)

        //UseCase3
        Future<List<Integer>> futureObj3 = poolExecutor.submit(() -> {
            System.out.println("Task3 with Callable");
            List<Integer> listObj = new ArrayList<>();
            listObj.add(200);
            return listObj;
        });

        try{
            List<Integer> outputFromFutureObject3 = futureObj3.get();
            System.out.println(outputFromFutureObject3);
        }
        catch (Exception e){

        }

CompletableFuture

  • Introduced in Java 8

  • Helps with async programming

  • It can be considered an advanced version of Future

  • Provides additional capabilities such as chaining.

  • CompletableFuture is a class that implements the Future interface (and CompletionStage, which supplies the chaining methods)

CompletableFuture Methods

supplyAsync

  1. public static<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier)

  2. public static<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier, Executor executor)

  • supplyAsync starts an async operation: the 'supplier' runs on a thread other than the caller's, and its return value completes the returned CompletableFuture.

  • It does not necessarily create a new thread; the task is handed to an executor, which may reuse a pooled thread.

  • For more control over threads, pass an Executor to the method.

  • By default it uses the shared ForkJoinPool.commonPool(), whose parallelism is derived from the number of available processors.
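
The difference between the two overloads can be observed by returning the worker thread's name from the supplier. This is a sketch only: the class name SupplyAsyncDemo and the thread name "my-worker" are invented for the example, and join() is used in place of get() because it throws unchecked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncDemo {

    // Runs the supplier on a caller-provided executor and returns the worker's name.
    static String threadNameWithCustomExecutor() {
        // "my-worker" is a made-up name that makes the thread easy to recognise.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    // Same supplier without an executor: runs on the default async executor.
    static String threadNameWithDefaultExecutor() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("custom : " + threadNameWithCustomExecutor()); // custom : my-worker
        // Typically a ForkJoinPool.commonPool worker; the exact name depends on the JVM and machine.
        System.out.println("default: " + threadNameWithDefaultExecutor());
    }
}
```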

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());


        CompletableFuture<String> asyncTask1 = CompletableFuture.supplyAsync(() -> {
            //this is the task which need to be completed by thread
            return "task completed";
        }, poolExecutor);
        System.out.println(asyncTask1.get());

        poolExecutor.shutdown();
    }
}

thenApply & thenApplyAsync

  • Apply a function to the result of previous Async computation

  • Return a new CompletableFuture object.

thenApply

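  • The function is not handed to an executor; it runs synchronously as part of completing the previous stage.

  • This means it usually runs on the thread that completed the previous async task. If that task had already completed when thenApply was called, it runs on the calling thread instead.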
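
One consequence of thenApply being synchronous is easy to miss: when the previous stage is already complete at the moment thenApply is called, the function runs right away on the calling thread. A minimal sketch, with ThenApplyThreadDemo as a placeholder class name:

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyThreadDemo {

    // Returns the name of the thread that ran the thenApply function
    // when the previous stage was already complete.
    static String threadUsedForCompletedStage() {
        CompletableFuture<String> alreadyDone = CompletableFuture.completedFuture("done");
        return alreadyDone
                .thenApply(val -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        // The function ran on the caller's own thread, so this prints: true
        System.out.println(caller.equals(threadUsedForCompletedStage()));
    }
}
```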

Example 1

CompletableFuture<String> asyncTask1 = CompletableFuture.supplyAsync(() -> {
    //this is the task which need to be completed by thread
    return "task completed";
}, poolExecutor).thenApply((String val)->{
    //Functionality which can work on the result of the previous async task
    return val + " in thenApply";
});
System.out.println(asyncTask1.get());

Example 2:

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());


        CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("ThreadName of SupplyAsync: "+ Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //handle exception
            }
            return "Supply Task1 ";
        }, poolExecutor);

        CompletableFuture<String> compFutureObj2 = compFutureObj1.thenApply((String val) -> {
            System.out.println("ThreadName of thenApply: " + Thread.currentThread().getName());
            return "And ";
        });

        System.out.println("Thread Name for 'after Completable Future: " + Thread.currentThread().getName());


        poolExecutor.shutdown();
    }
}
Thread Name for 'after Completable Future: main
ThreadName of SupplyAsync: pool-1-thread-1
ThreadName of thenApply: pool-1-thread-1

thenApplyAsync

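  • It's an asynchronous execution

  • The function is submitted to an executor, so it may run on a different thread (from the fork-join pool, if no executor is passed to the method).

  • If several thenApplyAsync calls are attached to the same future, their relative order is not guaranteed; they may run concurrently. Calls chained one after another still run in order, because each waits for the previous result.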
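
thenApplyAsync also has an overload that takes an Executor, which makes the thread hand-off visible. The sketch below assumes two single-thread executors with made-up thread names ("supply-worker" and "apply-worker"); the class name is likewise a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenApplyAsyncDemo {

    // Records which thread ran the supplier and which ran the follow-up function.
    static String threadsUsed() {
        ExecutorService supplyPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "supply-worker"));
        ExecutorService applyPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "apply-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), supplyPool)
                    // The function is submitted to applyPool rather than run in place.
                    .thenApplyAsync(val -> val + " -> " + Thread.currentThread().getName(),
                            applyPool)
                    .join();
        } finally {
            supplyPool.shutdown();
            applyPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed()); // prints: supply-worker -> apply-worker
    }
}
```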

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());


        CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("ThreadName of SupplyAsync: "+ Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //handle exception
            }
            return "Supply Task1 ";
        }, poolExecutor);

        CompletableFuture<String> compFutureObj2 = compFutureObj1.thenApplyAsync((String val) -> {
            System.out.println("ThreadName of thenApplyAsync: " + Thread.currentThread().getName());
            return val + "And ";
        });

        System.out.println("Thread Name for 'after Completable Future: " + Thread.currentThread().getName());

        System.out.println("Async output : "+ compFutureObj2.get());

        poolExecutor.shutdown();
    }
}

thenCompose & thenComposeAsync

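  • Chains together dependent async operations

  • Used when the next async operation depends on the result of the previous one.

  • The function passed to thenCompose returns another CompletableFuture, and the two are tied together into one.

  • This brings ordering to async tasks: the second starts only after the first completes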
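
A way to see why thenCompose exists is to compare it with thenApply when the function itself returns a CompletableFuture. In this sketch, greet is a hypothetical helper standing in for any second async call that needs the first result.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsApplyDemo {

    // Hypothetical async step that depends on the result of an earlier one.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name);
    }

    static String compare() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "world");

        // thenApply with a future-returning function produces a nested future.
        CompletableFuture<CompletableFuture<String>> nested =
                name.thenApply(ComposeVsApplyDemo::greet);

        // thenCompose flattens the two stages into a single future.
        CompletableFuture<String> flat = name.thenCompose(ComposeVsApplyDemo::greet);

        return nested.join().join() + " / " + flat.join();
    }

    public static void main(String[] args) {
        System.out.println(compare()); // prints: Hello world / Hello world
    }
}
```

Both forms produce the same value, but only the thenCompose version can be chained further without unwrapping the inner future by hand.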

thenCompose

CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, poolExecutor);

CompletableFuture<String> compFutureObj2 = compFutureObj1.thenCompose((String val) -> {
    return CompletableFuture.supplyAsync(() -> val + " world");
});

System.out.println(compFutureObj2.get());

/*
Hello world
*/

thenComposeAsync

CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, poolExecutor)
        .thenComposeAsync((String val) -> {
    return CompletableFuture.supplyAsync(() -> val + " world");
})
        .thenComposeAsync((String val)->{
            return CompletableFuture.supplyAsync(() -> val+ " all");
        })
        ;

System.out.println(compFutureObj1.get());
/*
Hello world all
*/

thenAccept & thenAcceptAsync

  • Generally the end stage in a chain of async operations

  • It consumes the result and returns no value, so the resulting future is a CompletableFuture<Void>.

CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, poolExecutor);

CompletableFuture<Void> acceptObj = compFutureObj1.thenAccept((String val) -> System.out.println("All stages completed"));

thenCombine & thenCombineAsync

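  • Used to combine the results of 2 CompletableFutures

  • The Async variant submits the combining function to an executor (the fork-join pool by default) rather than running it directly as part of completing the stage.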

CompletableFuture<Integer> asyncTask1 = CompletableFuture.supplyAsync(() -> {
    return 10;
}, poolExecutor);

CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(()->{
    return "k";
}, poolExecutor);

CompletableFuture<String> combinedFutureObj = asyncTask1.thenCombine(asyncTask2, (Integer val1, String val2) -> {
    return val1 + val2;
});

System.out.println(combinedFutureObj.get());
//10k
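
For the Async variant, a sketch along the same lines is shown below. It assumes a single-thread executor whose thread is named "combine-worker" (a made-up name) so that the thread running the combining function can be identified; the class name is also a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenCombineAsyncDemo {

    // Combines two independent results on a caller-provided executor.
    static String combineAsync() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "combine-worker"));
        try {
            CompletableFuture<Integer> amount = CompletableFuture.supplyAsync(() -> 10);
            CompletableFuture<String> unit = CompletableFuture.supplyAsync(() -> "k");

            // The BiFunction runs on 'executor' once both inputs are available.
            return amount.thenCombineAsync(unit,
                    (a, u) -> a + u + " on " + Thread.currentThread().getName(),
                    executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combineAsync()); // prints: 10k on combine-worker
    }
}
```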