26 Java - Multithreading 3 (Custom Locks)

Monitor Lock

  • Custom locks do not depend on an object, unlike a synchronized method.

  • synchronized method puts the monitor lock on the object

synchronized {
...
..Critical Section..
...
}

Example (Synchronized)

/**
 * Demonstrates the monitor lock taken by a {@code synchronized} instance method.
 *
 * <p>The monitor belongs to the instance, so threads that call {@link #producer()} on
 * different {@code SharedResource} objects do not block each other.
 */
public class SharedResource {

    boolean isAvailable = false;

    /**
     * Marks the resource as available and then keeps this object's monitor for four seconds.
     *
     * <p>If the thread is interrupted while sleeping, its interrupt status is restored so the
     * caller can still observe it. The release message is printed in either case.
     */
    public synchronized void producer(){
        try{
            System.out.println("Lock acquired by: "+Thread.currentThread().getName());
            isAvailable = true;
            Thread.sleep(4000);
        } catch (InterruptedException e){
            // Do not swallow the interruption: put the flag back for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        System.out.println("Lock release by: "+ Thread.currentThread().getName());
    }
}
/**
 * Runs two threads, each calling {@code producer()} on its own {@code SharedResource}.
 */
public class Main {

    public static void main(String[] args) {
        final SharedResource first = new SharedResource();
        final SharedResource second = new SharedResource();

        // Threads are created in the same order as before, so their default names are unchanged.
        Thread workerOne = new Thread(first::producer);
        Thread workerTwo = new Thread(second::producer);

        workerOne.start();
        workerTwo.start();
    }
}
Lock acquired by: Thread-1
Lock acquired by: Thread-0
Lock release by: Thread-1
Lock release by: Thread-0

Here both threads can enter the critical section at the same time, because each object has its own monitor lock.

What we want is for only one thread at a time to enter the critical section, irrespective of which object is used.

Custom Locks

  • Reentrant

  • ReadWrite

  • Semaphore

  • Stamped

Reentrant Lock

/**
 * Demonstrates guarding a critical section with an explicit {@code ReentrantLock} that is
 * passed in by the caller, so the lock is independent of the {@code SharedResource} instance.
 */
public class SharedResource {

    boolean isAvailable = false;


    /**
     * Acquires {@code lock}, marks the resource as available, holds the lock for four
     * seconds and then releases it.
     *
     * <p>If the thread is interrupted while sleeping, the lock is still released and the
     * thread's interrupt status is restored.
     *
     * @param lock the lock that guards the critical section; threads exclude each other only
     *             when they pass the same lock instance
     */
    public void producer(ReentrantLock lock){
        // Acquire before entering try so that finally never unlocks a lock that was not taken.
        lock.lock();
        try{
            System.out.println("Lock acquired by: "+Thread.currentThread().getName());
            isAvailable = true;
            Thread.sleep(4000);
        } catch (InterruptedException e){
            // Do not swallow the interruption: put the flag back for the caller.
            Thread.currentThread().interrupt();
        }
        finally {
            lock.unlock();
            System.out.println("Lock release by: "+ Thread.currentThread().getName());
        }
    }
}
import java.util.concurrent.locks.ReentrantLock;

/**
 * Runs two threads against two different {@code SharedResource} objects while handing both
 * the same {@code ReentrantLock}.
 */
public class Main {

    public static void main(String[] args) {
        // One lock instance is shared by both workers.
        final ReentrantLock sharedLock = new ReentrantLock();

        final SharedResource first = new SharedResource();
        final SharedResource second = new SharedResource();

        Thread workerOne = new Thread(() -> first.producer(sharedLock));
        Thread workerTwo = new Thread(() -> second.producer(sharedLock));

        workerOne.start();
        workerTwo.start();
    }
}
Lock acquired by: Thread-0
Lock release by: Thread-0
Lock acquired by: Thread-1
Lock release by: Thread-1

ReadWriteLock

Pessimistic Locks

ReadWriteLock implements the following 2 pessimistic locks:

  • Shared (S) Lock: Any number of threads can put a shared lock on a resource for read-only purposes. If one thread puts a shared lock on a resource, no other thread can put an exclusive lock on it until the shared lock is released.

  • Exclusive (X) Lock: This lock is used for writing purposes. If any thread puts an exclusive lock on a resource, no other thread can put either a shared or an exclusive lock on it until the exclusive lock is released.

| T2 \ T1 | S            | X            |
|---------|--------------|--------------|
| S       | Possible     | Not Possible |
| X       | Not Possible | Not Possible |
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demonstrates the shared (read) and exclusive (write) locks of a {@code ReadWriteLock}
 * supplied by the caller.
 */
public class SharedResource {

    boolean isAvailable = false;

    /**
     * Takes the read lock of {@code lock}, holds it for four seconds and releases it.
     *
     * <p>If the thread is interrupted while sleeping, the read lock is still released and the
     * interrupt status is restored.
     *
     * @param lock the read/write lock whose read lock is taken
     */
    public void producer(ReadWriteLock lock){
        // Acquire before entering try so that finally never unlocks a lock that was not taken.
        lock.readLock().lock();
        try{
            System.out.println("Lock acquired by: "+Thread.currentThread().getName());
            Thread.sleep(4000);
        } catch (InterruptedException e){
            // Do not swallow the interruption: put the flag back for the caller.
            Thread.currentThread().interrupt();
        }
        finally {
            lock.readLock().unlock();
            System.out.println("Lock release by: "+ Thread.currentThread().getName());
        }
    }

    /**
     * Takes the write lock of {@code lock}, marks the resource as not available, holds the
     * lock for four seconds and releases it.
     *
     * <p>If the thread is interrupted while sleeping, the write lock is still released and the
     * interrupt status is restored.
     *
     * @param lock the read/write lock whose write lock is taken
     */
    public void consumer(ReadWriteLock lock){
        lock.writeLock().lock();
        try{
            System.out.println("Lock acquired by: "+Thread.currentThread().getName());
            isAvailable = false;
            Thread.sleep(4000);
        } catch (InterruptedException e){
            Thread.currentThread().interrupt();
        }
        finally {
            lock.writeLock().unlock();
            System.out.println("Lock release by: "+ Thread.currentThread().getName());
        }
    }
}
/**
 * Runs two threads that take the read lock and one thread that takes the write lock of a
 * single {@code ReentrantReadWriteLock}, each through its own {@code SharedResource}.
 */
public class Main {

    public static void main(String[] args) {
        final ReadWriteLock sharedLock = new ReentrantReadWriteLock();

        final SharedResource readerTargetA = new SharedResource();
        final SharedResource readerTargetB = new SharedResource();
        final SharedResource writerTarget = new SharedResource();

        // Construction order (reader, reader, writer) is kept, so default thread names match.
        Thread readerA = new Thread(() -> readerTargetA.producer(sharedLock));
        Thread readerB = new Thread(() -> readerTargetB.producer(sharedLock));
        Thread writer = new Thread(() -> writerTarget.consumer(sharedLock));

        readerA.start();
        readerB.start();
        writer.start();
    }
}
Lock acquired by: Thread-1
Lock release by: Thread-1
Lock acquired by: Thread-2
Lock acquired by: Thread-0
Lock release by: Thread-2
Lock release by: Thread-0

StampedLock

It provides 2 locks

  • Read/Write Lock

  • Optimistic Read

Locks are of 2 types

  • Pessimistic Locks: All locks like Shared (S) and Exclusive locks(X)

  • Optimistic Locks: There is no Lock acquired.

Stamped Read/Write Lock

  • Supports Read/Write Lock functionality, like ReadWriteLock
import java.util.concurrent.locks.StampedLock;

/**
 * Demonstrates the pessimistic read and write modes of {@code StampedLock}.
 */
public class SharedResource {

    int a = 10;

    StampedLock lock = new StampedLock();

    /**
     * Takes the read lock, holds it for six seconds and releases it with the stamp that was
     * returned on acquisition.
     *
     * <p>If the thread is interrupted while sleeping, the read lock is still released and the
     * interrupt status is restored.
     */
    public void producer(){
        long stamp = lock.readLock();

        try{
            System.out.println("Read Lock acquired by: "+Thread.currentThread().getName());
            // NOTE(review): this assigns to a shared field while holding only the shared read
            // lock; several readers can be here at once. Kept as in the original demo.
            a = 11;
            Thread.sleep(6000);
        }catch (InterruptedException e){
            // Do not swallow the interruption: put the flag back for the caller.
            Thread.currentThread().interrupt();
        }
        finally {
            lock.unlockRead(stamp);
            System.out.println("Read Lock release by: "+ Thread.currentThread().getName());
        }
    }

    /**
     * Takes the write lock, sets {@code a} to 9 and releases the lock with the stamp that was
     * returned on acquisition.
     */
    public void consumer(){
        long stamp = lock.writeLock();
        // Everything after the acquisition runs inside try so the lock is always released.
        try{
            System.out.println("write lock acquired by : "+ Thread.currentThread().getName());
            System.out.println("performing work");
            a = 9;
        }
        finally {
            lock.unlockWrite(stamp);
            System.out.println("write lock released by : "+Thread.currentThread().getName());
        }
    }
}
/**
 * Runs two reader threads and one writer thread against a single {@code SharedResource}.
 */
public class Main {

    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        Thread th1 = new Thread(()->{
            resource.producer();
        });

        Thread th2 = new Thread(()->{
            resource.producer();
        });


        Thread th3 = new Thread(()->{
            resource.consumer();
        });

        th1.start();
        th2.start();
        // The writer thread was created but never started, so consumer() never ran.
        th3.start();
    }
}
Read Lock acquired by: Thread-1
Read Lock release by: Thread-1
Read Lock acquired by: Thread-0
Read Lock release by: Thread-0

Optimistic Read (Example)

Db snapshot

| Id  | Name | Type    | Row Version |
|-----|------|---------|-------------|
| 123 | SJ   | Student | 1           |
| 456 | Ram  | Student | 1           |
| Time \ Thread | Thread1 | Thread2 | Rowversion | Remarks |
|---------------|---------|---------|------------|---------|
| Time 1 | Read(123) (Rowversion: 1) | Read(123) (Rowversion: 1) | 1 | |
| Time 2 | Perform some operation(:::); Update Type to Teacher | Perform some operation(:::); Update Type to Ex-Student | 1 | |
| Time 3 | | update (123) type | 2 | update Table set Type='Ex-Student' where rowversion = 1 (Update successful, rowversion incremented) |
| Time 4 | update(123) type | | 2 | Rollback because the rowversion was 1 at the time of read, so the update will not happen. Now start again from the beginning (read again) |
| Time 5 | Read(123) (Rowversion: 2) | | 2 | |
| Time 6 | Perform some operation(:::); Update Type to Teacher | | 2 | update Table set Type='Teacher' where rowversion = 2 (Update successful, rowversion incremented) |

Optimistic Lock

  • Supports Optimistic Lock functionality too
import java.util.concurrent.locks.StampedLock;

/**
 * Demonstrates the optimistic-read mode of {@code StampedLock} next to its write lock.
 */
public class SharedResource {

    int a = 10;

    StampedLock lock = new StampedLock();

    /**
     * Takes an optimistic read stamp, simulates six seconds of work and then tries to commit
     * {@code a = 11}.
     *
     * <p>The commit converts the optimistic stamp into a write lock. The conversion yields a
     * usable stamp only when the lock state still matches the stamp and the write lock is
     * immediately available. Otherwise the work is abandoned, and because nothing shared was
     * written beforehand, a value stored by a concurrent writer is left untouched.
     *
     * <p>If the thread is interrupted while working, nothing is written and the interrupt
     * status is restored.
     */
    public void producer(){
        long stamp = lock.tryOptimisticRead();
        System.out.println("Taken optimistic read");

        try{
            // Simulated work; the shared field is deliberately not modified before the commit.
            Thread.sleep(6000);
        }catch (InterruptedException e){
            Thread.currentThread().interrupt();
            return;
        }

        // Returns 0 when the stamp is no longer valid or the write lock cannot be taken now.
        long writeStamp = lock.tryConvertToWriteLock(stamp);
        if (writeStamp == 0L){
            System.out.println("rollback of work");
            return;
        }
        try{
            a = 11;
            System.out.println("updated a value successfully");
        }
        finally {
            lock.unlockWrite(writeStamp);
        }
    }

    /**
     * Takes the write lock, sets {@code a} to 9 and releases the lock with the stamp that was
     * returned on acquisition.
     */
    public void consumer(){
        long stamp = lock.writeLock();
        // Everything after the acquisition runs inside try so the lock is always released.
        try{
            System.out.println("write lock acquired by : "+ Thread.currentThread().getName());
            System.out.println("performing work");
            a = 9;
        }
        finally {
            lock.unlockWrite(stamp);
            System.out.println("write lock released by : "+Thread.currentThread().getName());
        }
    }
}
/**
 * Runs one thread that calls {@code producer()} and one that calls {@code consumer()} on the
 * same {@code SharedResource}.
 */
public class Main {

    public static void main(String[] args) {
        final SharedResource shared = new SharedResource();

        Thread optimisticReader = new Thread(shared::producer);
        Thread writer = new Thread(shared::consumer);

        optimisticReader.start();
        writer.start();
    }
}
Taken optimistic read
write lock acquired by : Thread-1
performing work
write lock released by : Thread-1
rollback of work

If thread 2 (the writer) does not run, the optimistic read is validated successfully:

Taken optimistic read
updated a value successfully

Semaphore Lock

A semaphore can permit a fixed number of threads to execute the critical section simultaneously.

import java.util.concurrent.Semaphore;

/**
 * Demonstrates a {@code Semaphore} with two permits, which lets at most two threads run the
 * guarded section at the same time.
 */
public class SharedResource {

    boolean isAvailable = false;

    Semaphore lock = new Semaphore(2);

    /**
     * Acquires one permit, marks the resource as available, holds the permit for four
     * seconds and then returns it.
     *
     * <p>If the thread is interrupted while waiting for a permit, the method returns without
     * releasing anything, because no permit was obtained. If it is interrupted while
     * sleeping, the permit is still returned. In both cases the interrupt status is restored.
     */
    public void producer(){
        try{
            lock.acquire();
        }
        catch (InterruptedException e){
            // No permit was obtained; releasing here would add a permit that never existed.
            Thread.currentThread().interrupt();
            return;
        }

        try{
            System.out.println("Lock acquired by: "+Thread.currentThread().getName());
            isAvailable = true;
            Thread.sleep(4000);
        }
        catch (InterruptedException e){
            Thread.currentThread().interrupt();
        }
        finally {
            lock.release();
            System.out.println("Lock release by: "+ Thread.currentThread().getName());
        }
    }
}
/**
 * Runs four threads that all call {@code producer()} on the same {@code SharedResource}.
 */
public class Main {

    public static void main(String[] args) {
        final SharedResource shared = new SharedResource();

        // Create every worker first, then start them, in the same order as before.
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(shared::producer);
        }

        for (Thread worker : workers) {
            worker.start();
        }
    }
}
Lock acquired by: Thread-1
Lock acquired by: Thread-0
Lock release by: Thread-1
Lock release by: Thread-0
Lock acquired by: Thread-2
Lock acquired by: Thread-3
Lock release by: Thread-2
Lock release by: Thread-3

Applications of Semaphore:

  1. Printers: If you have 2 printers, then only 2 processes can use them at a time.

  2. Connection Pool: If the connection pool has a size of 5, then a maximum of 5 threads can connect to it at the same time.

Condition (await & Signal)

Inter-thread communication

  • The synchronized keyword maintains monitor locks, and it also uses wait() and notify() methods to facilitate communication between threads.

The following communications in custom locks are equivalent to those in synchronized communications.

await() = wait()

signal() = notify()

/**
 * Demonstrates thread communication with a {@code Condition} created from a
 * {@code ReentrantLock}: {@link #producer()} and {@link #consume()} take turns through the
 * {@code isAvailable} flag.
 */
public class SharedResource {

    boolean isAvailable = false;

    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    /**
     * Waits until the resource is not available, then marks it as available and signals one
     * waiting thread.
     *
     * <p>If the thread is interrupted while waiting, the flag is left unchanged, the lock is
     * released and the interrupt status is restored.
     */
    public void producer(){
        // Acquire before entering try so that finally never unlocks a lock that was not taken.
        lock.lock();
        try{
            System.out.println("Produce lock acquired by: "+Thread.currentThread().getName());
            // Re-check in a loop: a thread may return from await() without the flag having changed.
            while (isAvailable){
                //already available, thread has to wait for it to consume
                System.out.println("Produce thread is waiting: "+Thread.currentThread().getName());
                condition.await();
            }
            isAvailable = true;
            condition.signal();
        }
        catch (InterruptedException e){
            // Do not swallow the interruption: put the flag back for the caller.
            Thread.currentThread().interrupt();
        }
        finally {
            lock.unlock();
            System.out.println("Produce Lock release by: "+ Thread.currentThread().getName());
        }
    }

    /**
     * Sleeps for one second, then waits until the resource is available, marks it as not
     * available and signals one waiting thread.
     *
     * <p>If the thread is interrupted during the initial sleep, the method returns without
     * touching the lock. If it is interrupted while waiting, the flag is left unchanged and
     * the lock is released. In both cases the interrupt status is restored.
     */
    public void consume(){
        try{
            Thread.sleep(1000);
        }
        catch (InterruptedException e){
            // The lock has not been taken yet, so there is nothing to unlock.
            Thread.currentThread().interrupt();
            return;
        }

        lock.lock();
        try{
            System.out.println("Consume lock acquired by: "+Thread.currentThread().getName());

            // Re-check in a loop: a thread may return from await() without the flag having changed.
            while (!isAvailable){
                //already not available, thread has to wait for it to produced
                System.out.println("Consumer thread is waiting: "+Thread.currentThread().getName());
                condition.await();
            }
            isAvailable = false;
            condition.signal();
        }
        catch (InterruptedException e){
            Thread.currentThread().interrupt();
        }
        finally {
            lock.unlock();
            System.out.println("Consume lock release by: "+Thread.currentThread().getName());
        }
    }
}
/**
 * Runs one thread that calls {@code producer()} twice and one thread that calls
 * {@code consume()} twice on the same {@code SharedResource}.
 */
public class Main {

    public static void main(String[] args) {
        final SharedResource shared = new SharedResource();

        Thread producing = new Thread(() -> {
            int remaining = 2;
            while (remaining > 0) {
                shared.producer();
                remaining--;
            }
        });

        Thread consuming = new Thread(() -> {
            int remaining = 2;
            while (remaining > 0) {
                shared.consume();
                remaining--;
            }
        });

        producing.start();
        consuming.start();
    }
}