Brief
Executors provides Factory methods which we can use to create Thread Pool Executor.
Threase are present in
java.util.concurrent
package.
Fixed ThreadPoolExecutor
newFixedThreadPool
method creates a thread pool executor with a fixed no. of threads.
Property | |
Min and Max Pool | Same |
Queue Size | Unbounded Queue |
Thread Alive when idle | Yes |
When to use | Exact Info, how many Async task is needed |
Disadvantage | Not good when workload is heavy, as it will lead to limited concurrency |
//fixed thread pool executor
ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
Future<String> submitObj = poolExecutor.submit(() -> {
return "this is the async task";
});
System.out.println(submitObj.get());
poolExecutor.shutdown();
Cached ThreadPoolExecutor
newCachedThreadPool
method creates a thread pool that creates a new thread as Needed (dynamically)
Property
Property | |
Min and Max Pool | Min: 0; Max: MAX_VALUE (Integer Maximum) |
Queue Size | Blocking Queue with Size 0 |
Thread Alive when idle | 60 SECONDS |
When to use | Good for handling burst of short lived tasks. |
Disadvantage | Many long lived tasks and submitted rapidly, ThreadPool can create so many threads which might lead to increase memory usage. |
ExecutorService poolExecutor = Executors.newCachedThreadPool();
Future<String> submitObj = poolExecutor.submit(() -> {
return "this is the async task";
});
System.out.println(submitObj.get());
poolExecutor.shutdown();
Single Thread Executor
newSingleThreadExecutor
creates Executor with just single Worker thread
Property | |
Min and Max Pool | Min:1; Max: 1 |
Queue Size | Unbounded Queue |
Thread Alive when idle | Yes |
When to use | When need to process tasks sequentially |
Disadvantage | No Concurrency at all |
ExecutorService poolExecutor = Executors.newSingleThreadExecutor();
Future<String> submitObj = poolExecutor.submit(() -> {
return "this is the async task";
});
WorkStealing Pool Executor
Fork-Join Pool
It creates a Fork-Join Executor
Number of threads depends upon the Available Processors or we can specify in the parameter
There are 2 queues
Submission Queue
Work-Stealing Queue for each thread (It's a Dequeue)
Steps
If all threads are busy, task would be placed in "Submission Queue". Alternatively, whenever we call the submit() method, tasks goes into submission queue only.
Let's say task1 picked by ThreadA. If task1 has 2 subtasks created using fork() method. Subtask1 will be executed by ThreadA only and Subtask2 is put into the ThreadA work-stealing queue.
If any other thread becomes free, and there is no tasks in submission queue, it can "STEAL" the task from the another thread work-stealing queue.
A task can be split into multiple small sub-tasks. For this, the Task should extend:
RecursiveTask: Use this one if task returns a value.
RecursiveAction: Use this one if the task doesn't return any value (void).
We can create Fork-Join Pool using
newWorkStealingPool
method in ExecutorService. Alternatively we can create by callingForkJoinPool.commonPool()
method.Another thread steals the subtask from backside of the deque.
Visualization of Work-Stealing
Code
While initializing the fork-join pool, if we don't specify the number of threads in the constructor, then the default number of threads created is exactly same as the number of cpu cores. For example, if we have 4 cores, then 4 threads are created. We can also specify minimum and maximum number of threads. There are a wide range of types of constructors present in fork-join pool.
We extend RecursiveTask as we return the sum.
RecursiveTask is an abstract class which has an abstract method
compute
.We create two separate objects which can perfrom independent operations, and the combination of both results gives the final result. Here, we split the main task into 2 subtasks.
fork()
function creates a subtaskjoin()
function waits for the subtask to finsh.
public class ComputeSumTask extends RecursiveTask<Integer> {
int start;
int end;
public ComputeSumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <=4){
int total = 0;
for (int i=start; i<=end; i++)
total+=i;
return total;
}
//Split the task
int mid = (start+end)/2;
ComputeSumTask leftTask = new ComputeSumTask(start, mid);
ComputeSumTask rightTask = new ComputeSumTask(mid+1, end);
//Fork the subtasks for parallel execution
leftTask.fork();
rightTask.fork();
//combine the results of subtasks
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// combine the results
return leftResult + rightResult;
}
}
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//fixed thread pool executor
ForkJoinPool poolExecutor = ForkJoinPool.commonPool();
Future<Integer> submitObj = poolExecutor.submit(new ComputeSumTask(1,100));
System.out.println(submitObj.get());
poolExecutor.shutdown();
}
}
ForkJoinPool poolExecutor = (ForkJoinPool) Executors.newWorkStealingPool();