28 Java - Multithreading 5 (ThreadPoolExecutor)

Thread Pool

  • It's a collection of threads (aka workers), which are available to perform the submitted tasks.

  • Once a task is completed, the worker thread returns to the thread pool and waits for a new task to be assigned.

  • This means threads are reused instead of being created for every task.

Advantages of Thread Pool

  • Thread Creation time can be saved.

    • Each time a thread is created, memory is allocated for it (its own stack, program counter, etc.), and this takes time.

    • With a thread pool, this cost is avoided by reusing existing threads.

  • Overhead of managing the thread lifecycle is removed.

    • A thread passes through different states (Running, Waiting, Terminated, etc.), and managing these states by hand adds complexity.

    • The thread pool abstracts this management away.

  • Better performance:

    • More threads mean more context switching. By limiting how many threads are created, excess context switching can be avoided.
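The reuse described above can be observed with a small sketch. The class and method names (ThreadReuseDemo, runTasks) are made up for illustration, and Executors.newFixedThreadPool is used only as a convenient way to get a pool of a fixed size.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {

    // Runs taskCount small tasks on a pool of poolSize threads and
    // returns the names of the threads that actually executed them.
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return workerNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 10);
        System.out.println("10 tasks were executed by " + names.size() + " thread(s): " + names);
    }
}
```

However many tasks are submitted, the set never holds more names than the pool has threads, because the same workers pick up one task after another.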

Executor

The java.util.concurrent package provides the Executor framework. Its base interface is Executor:

public interface Executor {
    void execute(Runnable command);
}
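Because Executor only says "run this Runnable", the caller does not need to know which thread runs it. A minimal sketch with two hand-written executors follows; the class and method names are hypothetical, and neither of these is a thread pool.

```java
import java.util.concurrent.Executor;

class ExecutorInterfaceDemo {

    // An Executor that runs every task immediately in the calling thread.
    static Executor directExecutor() {
        return Runnable::run;
    }

    // An Executor that starts a brand-new thread for every task (no pooling).
    static Executor threadPerTaskExecutor() {
        return command -> new Thread(command).start();
    }

    public static void main(String[] args) {
        Runnable task = () ->
                System.out.println("Running in: " + Thread.currentThread().getName());

        directExecutor().execute(task);        // runs in the calling (main) thread
        threadPerTaskExecutor().execute(task); // runs in a newly created thread
    }
}
```

ThreadPoolExecutor implements the same interface, so code written against Executor can switch to a pool without changing how tasks are submitted.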

ThreadPoolExecutor

It helps to create a customizable thread pool.

ThreadPoolExecutor Constructor

public ThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler
)

corePoolSize

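The number of threads kept in the pool even when they are idle. By default these threads are created on demand as tasks arrive, not all up front; prestartAllCoreThreads() can be used to create them in advance.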

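allowCoreThreadTimeOut

If this property is set to true (the default is false), idle core threads are also terminated once they have been idle for keepAliveTime. When it is false, core threads stay alive even when idle.

keepAliveTime

Idle threads in excess of corePoolSize are terminated after being idle for this long.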

maximumPoolSize (maxPoolSize)

  • Maximum number of threads allowed in the pool.

  • If the number of threads equals corePoolSize and the queue is also full, new threads are created (as long as the total stays within maxPoolSize).

  • These excess threads (those above corePoolSize) stay in the pool while they have work; once one has been idle for keepAliveTime, it is terminated. Core threads are only timed out this way if allowCoreThreadTimeOut is set to true.

TimeUnit

  • Unit of keepAliveTime, e.g. milliseconds, seconds or hours.

BlockingQueue

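  • Queue used to hold tasks before they are picked up by a worker thread.

  • Bounded queue: queue with a FIXED capacity.

    • Like: ArrayBlockingQueue

  • Unbounded queue: queue with NO FIXED capacity.

    • Like: LinkedBlockingQueue (unbounded when created without a capacity)

  • A bounded queue is generally used and recommended; with an unbounded queue the pool never grows beyond corePoolSize and tasks can pile up until memory runs out.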
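The difference between the two kinds of queue can be sketched as follows. QueueCapacityDemo and countAccepted are hypothetical names; offer() is used because it returns false instead of blocking when the queue is full, which is also how ThreadPoolExecutor tries to enqueue a task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacityDemo {

    // Offers `attempts` no-op tasks to the queue and returns how many were accepted.
    static int countAccepted(BlockingQueue<Runnable> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(() -> { })) { // false means the queue is full
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Bounded: only 2 of the 5 tasks fit
        System.out.println("Bounded accepted:   " + countAccepted(new ArrayBlockingQueue<>(2), 5));
        // Unbounded: all 5 tasks are accepted
        System.out.println("Unbounded accepted: " + countAccepted(new LinkedBlockingQueue<>(), 5));
    }
}
```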

ThreadFactory

Factory for creating new threads. ThreadPoolExecutor uses it whenever it needs a new thread. Supplying our own factory lets us:

  • Give threads custom names

  • Give threads a custom priority

  • Set the thread daemon flag, etc.
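As an illustration of the three points above, here is a minimal factory that names its threads. The class name NamedThreadFactory and the "prefix-number" naming scheme are assumptions made for this sketch, not part of the JDK.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {

    private final String prefix;                               // hypothetical naming scheme
    private final AtomicInteger counter = new AtomicInteger(0); // numbers threads 0, 1, 2, ...

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Custom name, e.g. "worker-0", makes logs and thread dumps easier to read
        Thread thread = new Thread(r, prefix + "-" + counter.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY); // custom priority
        thread.setDaemon(false);                  // non-daemon: JVM waits for these threads
        return thread;
    }
}
```

Passing new NamedThreadFactory("worker") as the threadFactory argument would make the pool's threads show up as worker-0, worker-1, and so on.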

RejectedExecutionHandler

Handler for tasks that cannot be accepted by the thread pool. Logging logic is often put here for debugging purposes. The built-in policies are:

  • new ThreadPoolExecutor.AbortPolicy() : Throws RejectedExecutionException. This is the default.

  • new ThreadPoolExecutor.CallerRunsPolicy() : Executes the rejected task in the caller thread (the thread that attempted to submit the task).

  • new ThreadPoolExecutor.DiscardPolicy() : Silently discards the rejected task, without throwing any exception.

  • new ThreadPoolExecutor.DiscardOldestPolicy() : Discards the oldest task waiting in the queue to make room, then retries submitting the new task.
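To make one of these policies concrete, the sketch below saturates a deliberately tiny pool (1 thread, 1 queue slot) and shows that CallerRunsPolicy runs the overflow task in the submitting thread. The class and method names are hypothetical, and the pool sizes are chosen only to force a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    // Fills the pool and its queue, submits one more task, and returns
    // the name of the thread that ran that extra task.
    static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // stay busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            // Pool and queue are full, so this task is rejected and
            // CallerRunsPolicy runs it right here, in the submitting thread.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

A side effect worth knowing: while the caller is busy running the rejected task it cannot submit more work, so this policy naturally slows the producer down.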

ThreadPoolExecutor Example

Configuration

Following is the configuration of ThreadPoolExecutor.

  • corePoolSize = 3 (the pool keeps up to 3 threads alive even when idle)

  • maxPoolSize = 5

  • Queue capacity = 5

  • Rejection policy = DiscardPolicy (silently drop the rejected task)

Since the core pool size is 3, let the core threads be

  • Thread 0

  • Thread 1

  • Thread 2

Let Tasks be Task0, Task1, Task2, ...Task11

Step 1 - Occupy all threads

When Task0, Task1, and Task2 hit the thread pool, each thread takes one task and starts executing it:

  • Thread 0 takes Task0

  • Thread 1 takes Task1

  • Thread 2 takes Task2

Step 2 - Put in Queue

Now, if more tasks hit the thread pool, they are kept waiting in the queue.

  • When Task3 hits, the thread pool checks for an available thread; since there is none, it puts Task3 in the queue.

  • Similarly, Task4, Task5, Task6, and Task7 are kept in the queue as there are no available threads.

  • Now the queue is full and it cannot accommodate any other tasks.

Step 3 - New Threads (Queue Full)

  • Suppose Task8 hits the thread pool. The pool checks for an available thread. If there is none, it checks for space left in the queue. If there is no space left, it creates one more thread, Thread 3, and assigns Task8 to it.

    • Thread 3 takes Task8

  • Similarly, if Task9 arrives while all threads are busy and the queue is full, the pool creates one more thread, Thread 4, and assigns Task9 to it.

    • Thread 4 takes Task9

  • Now the thread pool has reached its maximum pool size of 5; it has used all the threads it is allowed to create.

Step 4 - Apply Rejection Policy (Max Threads reached)

Maximum Pool size reached

  • If a new Task10 hits the thread pool, it checks for an available thread. If there is none, it checks for available space in the queue. If there is no space, it checks whether it can create a new thread. If it cannot, the task is rejected and handed to the rejection handler. Since we are using DiscardPolicy, Task10 is silently dropped.
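The four steps can be reproduced with the same configuration (core 3, max 5, queue capacity 5, DiscardPolicy). In this sketch every task blocks on a latch so that nothing finishes while the pool is being inspected; the class name, the latch and the counting subclass of DiscardPolicy are additions made for the demonstration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolGrowthDemo {

    // Submits 11 tasks (Task0..Task10) that all block until released, then
    // returns {threads in pool, tasks waiting in queue, tasks discarded}.
    static int[] snapshotAfterElevenTasks() {
        AtomicInteger discarded = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(5),
                // DiscardPolicy drops silently; this subclass only counts the drops.
                new ThreadPoolExecutor.DiscardPolicy() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                        discarded.incrementAndGet();
                        super.rejectedExecution(r, e);
                    }
                });
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await(); // keep the worker busy until the pool has been inspected
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i <= 10; i++) {
                pool.execute(blockingTask);
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), discarded.get()};
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshotAfterElevenTasks();
        System.out.println("threads=" + s[0] + ", queued=" + s[1] + ", discarded=" + s[2]);
    }
}
```

With this setup the snapshot should show 5 threads, 5 queued tasks and 1 discarded task, matching Steps 1 to 4: three tasks go to core threads, five to the queue, two to extra threads, and the eleventh is dropped.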

Why are we using Queue in ThreadPool?

  • If we don't use a queue, every task that arrives while the threads are busy forces a new thread, so we quickly reach the maximum pool size and start rejecting tasks.

  • Moreover, most requests would be rejected whenever all the threads are busy.

  • If the pool has grown to its maximum size and no more requests come in, the extra threads just sit idle.

  • Even if we destroy those idle threads after a certain time, the next burst of requests occupies all the minimum (core) threads again and new threads have to be created to handle the tasks.

  • Most of the time, the corePoolSize (minimum number of threads) is sufficient, and the queue absorbs short bursts without creating extra threads.

Lifecycle of ThreadPool Executor

Running

  • The executor is in the running state and accepts new tasks through submit() or execute().

Shutdown

  • The executor does not accept new tasks but continues to process the tasks already submitted (running and queued). Once they have finished, the executor moves to the terminated state.

  • Method used: shutdown()

Stop (force shutdown)

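  • The executor does not accept new tasks.

  • The executor attempts to stop all currently executing tasks by interrupting their threads (a task that ignores interruption keeps running). Tasks still waiting in the queue are not started; they are returned to the caller.

  • Once fully shut down, it moves to the terminated state.

  • Method used: shutdownNow()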

Terminated

  • End of life for that particular ThreadPoolExecutor.

  • The isTerminated() method can be used to check whether a particular thread pool executor has terminated.
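The states above can be walked through in code. The following is a sketch with hypothetical names (LifecycleDemo, gracefulShutdown, forcedShutdown) and a deliberately small pool of 1 thread and 1 queue slot; it uses the 5-argument constructor, which falls back to the default thread factory and AbortPolicy.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {

    private static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
    }

    // Graceful path. Returns {a task was rejected after shutdown(), pool reached Terminated}.
    static boolean[] gracefulShutdown() {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> { });   // Running: tasks are accepted
        pool.shutdown();           // Shutdown: no new tasks from here on
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;       // AbortPolicy throws for tasks submitted after shutdown()
        }
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(10, TimeUnit.SECONDS) && pool.isTerminated();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {rejected, terminated};
    }

    // Forced path. Returns how many queued tasks were handed back without being started.
    static int forcedShutdown() {
        ThreadPoolExecutor pool = newPool();
        Runnable sleeper = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // woken up early by shutdownNow()
            }
        };
        pool.execute(sleeper);     // taken by the single worker
        pool.execute(sleeper);     // waits in the queue
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        boolean[] g = gracefulShutdown();
        System.out.println("rejected after shutdown: " + g[0] + ", terminated: " + g[1]);
        System.out.println("tasks returned by shutdownNow(): " + forcedShutdown());
    }
}
```

Note that shutdown() and shutdownNow() both return immediately; awaitTermination() is what actually waits for the pool to reach the terminated state.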

Code Example

Here, we use ThreadPoolExecutor to execute the tasks. For simplicity, each task just sleeps for 5 seconds.

ThreadPoolExecutor configuration

  • Min threads i.e core = 2

  • Max Threads = 4

  • Queue size = 2

We can also use the default thread factory and the built-in rejection policies instead of the custom ones below.

CustomRejectHandler

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class CustomRejectHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task rejected: "+r.toString());
    }
}

CustomThreadFactory

import java.util.concurrent.ThreadFactory;

public class CustomThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {

        Thread th =  new Thread(r);
        th.setPriority(Thread.NORM_PRIORITY);
        th.setDaemon(false);
        return th;
    }
}

Executing 4 tasks in parallel

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4,10, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(2),new CustomThreadFactory(),new CustomRejectHandler());
        for (int i = 0; i < 4; i++) {
            int taskNo = i;
            executor.submit(()->{
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the exception
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task "+ taskNo + " processed by: "+Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}

Output (order may vary between runs):

Task 1 processed by: Thread-1
Task 0 processed by: Thread-0
Task 3 processed by: Thread-1
Task 2 processed by: Thread-0

Executing 5 tasks in parallel

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4,10, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(2),new CustomThreadFactory(),new CustomRejectHandler());
        for (int i = 0; i < 5; i++) {
            int taskNo = i;
            executor.submit(()->{
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the exception
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task "+ taskNo + " processed by: "+Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}

Output (order may vary between runs):

Task 0 processed by: Thread-0
Task 1 processed by: Thread-1
Task 4 processed by: Thread-2
Task 2 processed by: Thread-2
Task 3 processed by: Thread-0

Executing 7 tasks in parallel (Overflow - Rejection)

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4,10, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(2),new CustomThreadFactory(),new CustomRejectHandler());
        for (int i = 0; i < 7; i++) {
            int taskNo = i;
            executor.submit(()->{
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the exception
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task "+ taskNo + " processed by: "+Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}

Output (order may vary between runs):

Task rejected: java.util.concurrent.FutureTask@2d98a335[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@378bf509[Wrapped task = learn.Multithreading.ThreadPool.Main$$Lambda$15/0x0000020d20002000@5fd0d5ae]]
Task 0 processed by: Thread-0
Task 1 processed by: Thread-1
Task 4 processed by: Thread-2
Task 5 processed by: Thread-3
Task 2 processed by: Thread-2
Task 3 processed by: Thread-0

TimeOut Example

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,4,10, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(2),new CustomThreadFactory(),new CustomRejectHandler());

        // Also let idle core threads be terminated after keepAliveTime (10 minutes here)
        executor.allowCoreThreadTimeOut(true);
        for (int i = 0; i < 4; i++) {
            int taskNo = i;
            executor.submit(()->{
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the exception
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task "+ taskNo + " processed by: "+Thread.currentThread().getName());
            });
        }
        executor.shutdown();
    }
}
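With a keepAliveTime of 10 minutes and an immediate shutdown(), the effect of allowCoreThreadTimeOut is hard to see in the example above. The sketch below is a variation, not the original configuration: it assumes a much shorter keepAliveTime of 200 ms and a fixed 1.5 second idle period, and compares the pool size with the flag off and on. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {

    // Runs two quick tasks, leaves the pool idle for far longer than
    // keepAliveTime, and returns how many threads are still in the pool.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(2));
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        CountDownLatch done = new CountDownLatch(2);
        try {
            pool.execute(done::countDown); // starts the first core thread
            pool.execute(done::countDown); // starts the second core thread
            done.await();                  // both tasks have finished
            Thread.sleep(1500);            // idle period, much longer than 200 ms
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("allowCoreThreadTimeOut(false): " + poolSizeAfterIdle(false) + " thread(s) left");
        System.out.println("allowCoreThreadTimeOut(true):  " + poolSizeAfterIdle(true) + " thread(s) left");
    }
}
```

With the flag off, the two core threads are expected to stay in the pool; with it on, they should be gone after the idle period, so the pool shrinks to zero threads.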

Interview question

Why have you chosen a corePoolSize of 2 instead of 10, 15 or any other number? What's the logic behind this choice?

Generally, the thread pool's min and max sizes depend on various factors such as:

CPU Cores

  • If we have 2 CPU cores and 100 threads, then most of the time the CPU is just context switching between the threads.

JVM Memory

  • The memory available to a process is limited, whether it creates 1 thread or 100. Each thread needs its own share of the JVM memory for its stack, registers and program counter. Hence we cannot create an unlimited number of threads.

  • Memory Architecture (Link)

Task Nature

There are 2 types of tasks

  • CPU-intensive: tasks that require more CPU time are called CPU-intensive tasks.

  • I/O-intensive: tasks in which the threads spend most of their time waiting (idle), such as database operations and file operations, are called I/O-intensive tasks.

  • If the tasks are mostly I/O-intensive, the CPU is idle most of the time. To use the CPU effectively, we can increase the number of threads so that, while one thread waits, the CPU context switches to another and executes other tasks in the meantime.

Task nature (majority)    No. of threads can be
CPU-intensive             Less
I/O-intensive             More

Concurrency Requirement

Whether we need a high, medium, or low level of concurrency.

Memory Required to process a request

Example: suppose a single POST request needs 10 MB for processing (loading data from the database and other miscellaneous data). All of this is stored in the heap allocated to the process, and heap space is limited. If we have 100 threads and each one handles a POST request at the same time, the JVM needs 100 * 10 MB = 1000 MB, or nearly 1 GB, of heap.

Hence, the memory required to process a request also matters.

Throughput

  • How fast we can process the requests.

Note:

It's an iterative process: update the min and max values based on monitoring.

Formula to find maximum no of threads

Max No of threads = No. of CPU Cores * (1 + Request waiting time/ processing time)

Example:

No. of CPU Cores = 64

Request waiting time = 50ms

Processing time = 100ms

Max no. of threads = 64 * (1 + 50/100) = 64 * 1.5 = 96
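The formula is easy to turn into a small helper. This is a sketch with hypothetical names (ThreadCountFormula, maxThreads); it simply evaluates cores * (1 + waiting time / processing time) and rounds down. For the example values this gives 64 * 1.5 = 96.

```java
class ThreadCountFormula {

    // Max threads = cores * (1 + waitTime / processingTime), rounded down.
    static int maxThreads(int cpuCores, double waitTimeMs, double processingTimeMs) {
        if (cpuCores <= 0 || waitTimeMs < 0 || processingTimeMs <= 0) {
            throw new IllegalArgumentException(
                    "cores and processing time must be positive, wait time must not be negative");
        }
        return (int) Math.floor(cpuCores * (1 + waitTimeMs / processingTimeMs));
    }

    public static void main(String[] args) {
        // Values from the example: 64 cores, 50 ms waiting, 100 ms processing
        System.out.println(maxThreads(64, 50, 100)); // prints 96

        // On a real machine the core count can be read at runtime
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(maxThreads(cores, 50, 100));
    }
}
```

The more time a request spends waiting relative to processing, the larger the ratio becomes and the more threads the formula allows, which matches the CPU-intensive versus I/O-intensive distinction made earlier.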

But this formula does not consider memory, which also needs to be taken into account and may lower the limit.

JVM : 2 GB
( Heap space : 1 GB
  Code cache space: 128MB
  Per-thread space: 5MB * No. of threads (includes thread stack space)
  JVM overhead: 356MB
)

/*
1000MB + 128MB + x + 356MB = 2000MB
x = 516MB, i.e. roughly 500MB
About 500MB is left for threads after removing all the other necessary memory
*/

  • Assume each request requires 10MB of heap space to be processed.

  • The memory left for thread creation after setting aside the heap, code cache and overhead is about 500MB.

  • We can create at most about 100 threads, because each thread requires only 5MB: 500MB / 5MB = 100.

  • If 100 threads are created and each thread processes 1 request, the memory required is 100 * 10MB, which is about 1GB, and it must live in heap memory. That needs the entire heap, and a completely full heap is risky.

  • So we put a restriction in place, for example that requests may use at most 60% of the heap, i.e. 600MB in total. Then 600MB / 10MB = 60 requests can be processed at a time, which means at most 60 threads are safe to use.

So a reasonable starting point is a minimum of 60 threads and a maximum of around 64 to 70, staying close to the memory-safe limit.
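The memory reasoning above can be written down as two small calculations. This is a sketch with hypothetical names; it uses the same round figures as the text (2000MB total, 1000MB heap, 128MB code cache, 356MB overhead, 5MB per thread, 10MB per request, 60% of the heap usable). The exact leftover is 516MB, which the text rounds to 500MB, so the first limit comes out as 103 rather than 100.

```java
class MemoryBoundThreads {

    // Threads that fit into the memory left after heap, code cache and JVM overhead.
    static int threadsByThreadSpace(int totalMb, int heapMb, int codeCacheMb,
                                    int overheadMb, int perThreadMb) {
        int leftoverMb = totalMb - heapMb - codeCacheMb - overheadMb;
        return leftoverMb / perThreadMb;
    }

    // Concurrent requests (one per thread) that fit into the usable share of the heap.
    static int threadsByHeapBudget(int heapMb, int usableHeapPercent, int perRequestMb) {
        int usableHeapMb = heapMb * usableHeapPercent / 100;
        return usableHeapMb / perRequestMb;
    }

    public static void main(String[] args) {
        int byThreadSpace = threadsByThreadSpace(2000, 1000, 128, 356, 5); // 516 / 5 = 103
        int byHeap = threadsByHeapBudget(1000, 60, 10);                    // 600 / 10 = 60
        // The tighter of the two limits is the safe one
        System.out.println("limit from thread space: " + byThreadSpace);
        System.out.println("limit from heap budget:  " + byHeap);
        System.out.println("safe thread count:       " + Math.min(byThreadSpace, byHeap));
    }
}
```

Here the heap budget is the tighter constraint, which is why the safe thread count lands at 60 even though there is stack space for more threads.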

Now, we perform iterative monitoring. We need to conduct load testing on these numbers and verify whether the minimum and maximum numbers are acceptable. If they are not, we optimize them.