30 Java - Multithreading 7 (Executors | Fork-Join Pool)

Brief

  • Executors provides factory methods that we can use to create thread pool executors.

  • The Executors class is in the java.util.concurrent package.

Fixed ThreadPoolExecutor

  • The newFixedThreadPool method creates a thread pool executor with a fixed number of threads.
Property | Value
Min and Max Pool | Same
Queue Size | Unbounded queue
Thread Alive when idle | Yes
When to use | When we know exactly how many async tasks are needed
Disadvantage | Not good when the workload is heavy, as the fixed thread count limits concurrency
//fixed thread pool executor
ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
Future<String> submitObj = poolExecutor.submit(() -> {
    return "this is the async task";
});

System.out.println(submitObj.get());
poolExecutor.shutdown();
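
The properties listed for the fixed pool map directly onto ThreadPoolExecutor constructor arguments. The sketch below builds an equivalent pool by hand; the class name FixedPoolSketch and the helper manualFixedPool are made up for illustration, and the factory may wrap or configure its pool differently on other JDKs.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the JDK.
class FixedPoolSketch {

    // Same settings as the fixed pool described above:
    // core size == max size, and an unbounded LinkedBlockingQueue.
    static ThreadPoolExecutor manualFixedPool(int threads) {
        return new ThreadPoolExecutor(
                threads, threads,            // min and max pool size are the same
                0L, TimeUnit.MILLISECONDS,   // keep-alive is irrelevant: core threads stay alive
                new LinkedBlockingQueue<>()); // unbounded queue
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = manualFixedPool(5);
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Because the queue is unbounded, extra tasks wait in the queue instead of triggering new threads, which is where the limited concurrency under heavy load comes from.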

Cached ThreadPoolExecutor

  • The newCachedThreadPool method creates a thread pool that creates new threads as needed (dynamically) and reuses idle threads when they are available.

Property | Value
Min and Max Pool | Min: 0; Max: Integer.MAX_VALUE
Queue Size | Blocking queue with size 0 (SynchronousQueue)
Thread Alive when idle | 60 seconds
When to use | Good for handling bursts of short-lived tasks
Disadvantage | If many long-lived tasks are submitted rapidly, the pool can create a very large number of threads, which may increase memory usage
ExecutorService poolExecutor = Executors.newCachedThreadPool();
Future<String> submitObj = poolExecutor.submit(() -> {
    return "this is the async task";
});
System.out.println(submitObj.get());
poolExecutor.shutdown();
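
To make the "new thread as needed" behaviour concrete, here is a small sketch that builds a pool with the cached-pool settings from the table and submits a burst of tasks that all block. The names CachedPoolSketch, manualCachedPool, and largestPoolSizeForBurst are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the JDK.
class CachedPoolSketch {

    // Settings from the table: min 0, max Integer.MAX_VALUE,
    // 60 s idle timeout, and a queue that holds no tasks.
    static ThreadPoolExecutor manualCachedPool() {
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    // Submits `tasks` tasks that block until released and returns the
    // largest number of threads the pool held at the same time.
    static int largestPoolSizeForBurst(int tasks) {
        ThreadPoolExecutor pool = manualCachedPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a long-lived task
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(largestPoolSizeForBurst(8)); // prints 8
    }
}
```

Every task gets its own thread because no worker is idle and the queue cannot hold anything. With thousands of long-lived tasks, the same mechanism produces thousands of threads.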

Single Thread Executor

  • newSingleThreadExecutor creates an executor with just a single worker thread.
Property | Value
Min and Max Pool | Min: 1; Max: 1
Queue Size | Unbounded queue
Thread Alive when idle | Yes
When to use | When we need to process tasks sequentially
Disadvantage | No concurrency at all
ExecutorService poolExecutor = Executors.newSingleThreadExecutor();
Future<String> submitObj = poolExecutor.submit(() -> {
    return "this is the async task";
});
System.out.println(submitObj.get());
poolExecutor.shutdown();
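
The sequential guarantee can be checked with a short sketch: tasks submitted to a single-thread executor run one at a time, in submission order. SingleThreadOrderSketch and runInOrder are hypothetical names, and the 5-second timeout is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the JDK.
class SingleThreadOrderSketch {

    // Submits tasks 0..count-1 and returns the order in which they ran.
    static List<Integer> runInOrder(int count) {
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int id = i;
            executor.execute(() -> executed.add(id));
        }
        executor.shutdown(); // tasks already queued still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```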

WorkStealing Pool Executor

Fork-Join Pool

  • newWorkStealingPool creates a Fork-Join executor (a ForkJoinPool).

  • The number of threads (the parallelism level) defaults to the number of available processors, or we can pass it as a parameter.

  • There are 2 queues

    • Submission Queue

    • Work-Stealing Queue for each thread (it's a Deque, i.e. a double-ended queue)

  • Steps

    • If all threads are busy, the task is placed in the "Submission Queue". More generally, whenever we call the submit() method from outside the pool, the task goes into the submission queue first.

    • Say task1 is picked up by ThreadA. If task1 splits into 2 subtasks using the fork() method, Subtask1 is executed by ThreadA itself and Subtask2 is put into ThreadA's work-stealing queue.

    • If any other thread becomes free and there are no tasks in the submission queue, it can "STEAL" a task from another thread's work-stealing queue.

  • A task can be split into multiple small sub-tasks. For this, the Task should extend:

    • RecursiveTask: Use this one if task returns a value.

    • RecursiveAction: Use this one if the task doesn't return any value (void).

  • We can create a Fork-Join Pool using the newWorkStealingPool method in Executors. Alternatively, we can get the shared common pool by calling the ForkJoinPool.commonPool() method.

  • Another thread steals the subtask from the back of the deque, the end opposite to the one the owning thread works from.
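
The RecursiveAction variant mentioned above can be sketched as follows: the task splits an array range in half until the pieces are small, then doubles each element in place, with no return value. DoubleInPlaceAction, its THRESHOLD of 4, and the doubled helper are all invented for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example task: doubles every element of an int[] in place.
class DoubleInPlaceAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // illustrative cutoff, not a tuned value

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleInPlaceAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll forks the subtasks and waits for both to complete
        invokeAll(new DoubleInPlaceAction(data, from, mid),
                  new DoubleInPlaceAction(data, mid, to));
    }

    // Convenience wrapper: works on a copy and runs on the common pool.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new DoubleInPlaceAction(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})));
        // prints [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    }
}
```

The two halves touch disjoint index ranges, so the subtasks can safely write to the same array without locking.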

Visualization of Work-Stealing

Code

  • When creating a fork-join pool, if we don't specify the number of threads in the constructor, the default parallelism equals the number of available processors (Runtime.getRuntime().availableProcessors()). For example, if we have 4 cores, the pool targets 4 worker threads. ForkJoinPool has several constructors; some of them (since Java 9) also let us specify the core and maximum number of threads.

  • We extend RecursiveTask as we return the sum.

  • RecursiveTask is an abstract class which has an abstract method compute.

  • We create two separate objects which can perform independent operations, and combining both results gives the final result. Here, we split the main task into 2 subtasks.

  • fork() schedules the subtask to run asynchronously by pushing it onto the current worker thread's queue.

  • join() waits for the subtask to finish and returns its result.

import java.util.concurrent.RecursiveTask;

public class ComputeSumTask extends RecursiveTask<Integer> {

    // Inclusive bounds of the range to sum
    private final int start;
    private final int end;

    public ComputeSumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        // Base case: the range is small enough to sum directly
        if (end - start <= 4) {
            int total = 0;
            for (int i = start; i <= end; i++) {
                total += i;
            }
            return total;
        }

        // Split the task into two halves
        int mid = (start + end) / 2;
        ComputeSumTask leftTask = new ComputeSumTask(start, mid);
        ComputeSumTask rightTask = new ComputeSumTask(mid + 1, end);

        // Fork the subtasks for parallel execution
        leftTask.fork();
        rightTask.fork();

        // Wait for both subtasks to finish and collect their results
        int leftResult = leftTask.join();
        int rightResult = rightTask.join();

        // Combine the results
        return leftResult + rightResult;
    }
}
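import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // Use the shared common fork-join pool
        ForkJoinPool poolExecutor = ForkJoinPool.commonPool();
        Future<Integer> submitObj = poolExecutor.submit(new ComputeSumTask(1, 100));

        System.out.println(submitObj.get()); // prints 5050
        // No shutdown() call: the common pool ignores shutdown requests
    }
}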
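
// Alternative: create a work-stealing pool through the Executors factory.
// The declared return type is ExecutorService; the cast assumes the JDK
// returns a ForkJoinPool here (as OpenJDK does).
ForkJoinPool poolExecutor = (ForkJoinPool) Executors.newWorkStealingPool();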
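
As a variation on the code above, the following sketch passes the number of threads to the ForkJoinPool constructor and follows the step described earlier, where one subtask is forked into the work-stealing queue and the other runs on the current thread. RangeSumTask and its sum helper are hypothetical names; the parallelism of 4 is an arbitrary choice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of the sum task: fork one half, compute the other directly.
class RangeSumTask extends RecursiveTask<Long> {
    private final int start; // inclusive
    private final int end;   // inclusive

    RangeSumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= 4) {
            long total = 0;
            for (int i = start; i <= end; i++) {
                total += i;
            }
            return total;
        }
        int mid = start + (end - start) / 2;
        RangeSumTask left = new RangeSumTask(start, mid);
        RangeSumTask right = new RangeSumTask(mid + 1, end);

        left.fork();                        // goes to this worker's deque; may be stolen
        long rightResult = right.compute(); // runs on the current thread
        return left.join() + rightResult;
    }

    // Runs the task on a dedicated pool with the given parallelism.
    static long sum(int start, int end, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new RangeSumTask(start, end));
        } finally {
            pool.shutdown(); // a pool we created ourselves should be shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100, 4)); // prints 5050
    }
}
```

Computing one half directly keeps the current thread busy instead of parking it in join() while both halves wait to be picked up.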