31 Java - Multithreading 8 (Shutdown | Scheduler | ThreadLocal | Virtual Thread)

Executors Important Methods

Shutdown

  • Initiates orderly shutdown of the ExecutorService

  • AFter calling shutdown, Executor will not accept new task submission.

  • Already submitted tasks, will continue to execute

AwaitTermination

  • It's an optional functionality. Return true/false.

  • It is used after calling 'shutdown method

  • Blocks calling thread for specific timeout period, and wait for ExecutorService shutdown.

  • Return true, if ExecutorService gets shutdown with in specific timeout else false.

ShutdownNow

  • Best effort attempt to stop/interrupt the actively executing tasks.

  • Halt the processing of tasks which are waiting.

  • Return the list of tasks which are awaiting execution.

Important Scenarios

Task submissision after shutdown

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService poolObj = Executors.newFixedThreadPool(5);

        poolObj.submit(()->{
            System.out.println("Thread going to start its work");
        });

        poolObj.shutdown();

        poolObj.submit(()->{
            System.out.println("Thread going to start its work");
        });
    }
}
Thread going to start its work
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@378bf509[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@6d03e736[Wrapped task = learn.Multithreading.ThreadPool.Main$$Lambda$16/0x000001d8b3001418@568db2f2]] rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:123)
    at learn.Multithreading.ThreadPool.Main.main(Main.java:15)

Shutdown do not impact the already submitted task

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService poolObj = Executors.newFixedThreadPool(5);

        poolObj.submit(()->{
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //handle exception
            }
            System.out.println("new task");
        });

        poolObj.shutdown();
        System.out.println("Main thread unblocked and finished processing");
    }
}
Main thread unblocked and finished processing
new task

Usage of awaitTermination

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService poolObj = Executors.newFixedThreadPool(5);

        poolObj.submit(()->{
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //handle exception
            }
            System.out.println("new task");
        });

        poolObj.shutdown();

        boolean isExecutorTerminated = poolObj.awaitTermination(3, TimeUnit.SECONDS);

        System.out.println("Main thread, isExecutorTerminated: "+ isExecutorTerminated);

    }
}
Main thread, isExecutorTerminated: false
new task

ScheduledThreadPoolExecutor

  • Helps to scheule the tasks

  • It extends from ThreadPoolExecutor class.

Methods

schedule(Runnable command, long delay, TimeUnit unit)

  • Schedules a Runnable task after specific delay.

  • Only one time task runs.

ScheduledExecutorService poolObj = Executors.newScheduledThreadPool(5);

poolObj.schedule(() -> {
    System.out.println("Task 1 executed after 5 seconds");
}, 5, TimeUnit.SECONDS);


poolObj.shutdown();

schedule(Callable<V> callable, long delay, TimeUnit unit)

  • Schedules a Callable task after specific delay.

  • Only one time tasks runs.

ScheduledExecutorService poolObj = Executors.newScheduledThreadPool(5);

ScheduledFuture<String> futureObj = poolObj.schedule(() -> {
    return "Hello";
}, 5, TimeUnit.SECONDS);

System.out.println(futureObj.get());
//Hello

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

  • Schedules a Runnable task for repeated execution with fixed rate.

  • We an use cancel method to stop this repeated task.

  • Also lets say, if thread1 is taking too much time to complete the task and next is ready to run, till previous task will not get completed, new task can not be start (it will wait in queue).

Example 1: Repeating the task with an initial delay of 3 seconds with a time period of 5 seconds.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService poolObj = Executors.newScheduledThreadPool(5);

        ScheduledFuture<?> futureObj = poolObj.scheduleAtFixedRate(() -> {
            System.out.println("Hello World");
        }, 3, 5, TimeUnit.SECONDS);


        Thread.sleep(10000);
        futureObj.cancel(true);

        poolObj.shutdown();

    }
}

Example 2:

In this scenario, a task takes a time of 6 seconds, and the task is repeated every 3 seconds. The time period of the counter is independent of task duration. This means that the timer starts immediately when the task is triggered and attempts to initiates the task again after 3 seconds. If the pervious task has not yet finished, it will wait until it is completed and then immediately triggers the same task.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService poolObj = Executors.newScheduledThreadPool(5);

        ScheduledFuture<?> futureObj = poolObj.scheduleAtFixedRate(() -> {
            System.out.println("Thread picked the task");
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Task completed");
        }, 1, 3, TimeUnit.SECONDS);


        Thread.sleep(50000);
        futureObj.cancel(true);

        poolObj.shutdown();

    }
}

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

  • Schedules a Runnable task for repeated execution with a fixed delay.

  • Means next task delay counter start only after previous one task completed.

ScheduledFuture<?> futureObj = poolObj.scheduleWithFixedDelay(() -> {
    System.out.println("Thread picked the task");
    try {
        Thread.sleep(6000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Task completed");
}, 1, 3, TimeUnit.SECONDS);

ThreadLocal

  • ThreadLocal class provide access to Thread-Local variables.

  • This 'Thread-Local' variable hold the value for particular thread.

  • Means each Thread has its own copy of Thread-Local variable.

  • We need only 1 object of ThreadLocal class and each thread can use it to set and get its own Thread-Local variable.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadLocal<String> threadLocalObj = new ThreadLocal<>();

        //main thread
        threadLocalObj.set(Thread.currentThread().getName());

        Thread thread1 = new Thread(()->{
            threadLocalObj.set(Thread.currentThread().getName());
            System.out.println("task1");
        });

        thread1.start();
        Thread.sleep(2000);

        System.out.println("Main thread: "+threadLocalObj.get());
    }
}

Clean up ThreadLocals

  • Remember to clean up, if reusing the thread.

  • In ThreadPool we reuse the threads hence we have to clean the ThreadLocal variable.

  • threadLocalObj.remove();

Problem

Here we set the ThreadLocal variable for one of the thread and later when we iterate through each thread by submitting one task and also fetch its ThreadLocal variable value at the end. Here we see for thread 1 it is already set's name whenever it executes a new task.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadLocal<String> threadLocalObj = new ThreadLocal<>();

        ExecutorService poolObj = Executors.newFixedThreadPool(5);

        poolObj.submit(()->{
            threadLocalObj.set(Thread.currentThread().getName());
        });

        for (int i=1; i<15; i++){
            poolObj.submit(()->{
                System.out.println(threadLocalObj.get());
            });
        }
        poolObj.shutdown();
    }
}
null
null
null
pool-1-thread-1
pool-1-thread-1
null
null
null
null
null
null
null
pool-1-thread-1
null

Solution

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ThreadLocal<String> threadLocalObj = new ThreadLocal<>();

        ExecutorService poolObj = Executors.newFixedThreadPool(5);

        poolObj.submit(()->{
            threadLocalObj.set(Thread.currentThread().getName());
            threadLocalObj.remove();
        });

        for (int i=1; i<15; i++){
            poolObj.submit(()->{
                System.out.println(threadLocalObj.get());
            });
        }
        poolObj.shutdown();
    }
}
null
null
null
null
null
null
null
null
null
null
null
null
null
null

Virtual Thread vs Platform Thread (Normal)

Platform Thread

  • It is also called as Normal Thread

  • The following commands instruct the JVM to create native or OS threads. Thus, if we create 10 platform threads, it ultimately results in the creation of 10 OS threads. The platform is managed by the JVM, so a platform thread acts like a wrapper class for an OS thread.

  •     Thread t1 = new Thread(...)
        t1.start()
    
  • A thread will have a register, counter, and stack. These are all properties of the OS thread.

Disadvantages

  • he creation of an OS thread is slow because the JVM delegates the system to the OS for thread creation.

  • Let’s assume that a thread requires some information from a database and the database call takes around 4 seconds. Since the query takes a long time, the thread will be in a waiting state. Due to the 1-to-1 mapping, the OS thread is also in a waiting state. Consequently, the OS thread is idle and not performing any tasks. This is due to the direct linkage between the platform thread and the OS thread.

Virtual Thread

  • The motto of a Virtual Thread is to achieve higher throughput, not latency.

  • It is available from JDK 19.

  • We can create numerous objects of Virtual Threads.

  • If there is any virtual thread which is not in a waiting state, then it will be attached to an OS thread.

  • If any virtual thread goes into a waiting state, then the JVM removes the connection between this thread and the OS thread. Since one OS thread is free, it will attach one Virtual Thread to the freed OS thread. This is maintained by the JVM.

  • Virtual Thread is backward compatible. This means that whatever functionality the platform thread supported can also be supported by the virtual thread.

Thread th1 = Thread.ofVirtual().start(RunnableTask);

Or

ExecutorService myExecutorObj = Executors.newVirtualThreadPerTaskExecutor();
myExecutorObj.submit(RunnableTask);

Visualization of Virtual Thread