27 Java - Multithreading 4 (Atomic Variables)

Concurrency Types

Concurrency can be achieved using 2 ways

Lock based Mechanism

  • Synchronized

  • Reentrant

  • Stamped

  • ReadWrite

  • Semaphores

Lock Free Mechanism

CAS Operation (Compare-and Swap)

  • AtomicInteger

  • AtomicBoolean

  • AtomicLong

  • AtomicReference

Note:

  • Lock Free Mechanism is not an alternative to Lock Based Mechanism.

  • A lock-based mechanism is used when a lot of business logic is present, and a lock-free mechanism is used for specific business use cases.

Lock Free Mechanism

It uses CAS (Compare and Swap) technique

  • It's a Low level operation - It is supported by CPU.

  • It's Atomic - Irrespective of number of cpu's or threads its supports atomic.

  • All modern CPU Processor supports it.

Compare and Swap (CAS)

It involves 3 main parameters

  • Memory Location: Location where variable is stored.

  • Expected Value: value which should be present at the memory.

    • ABA problem is solved using version or timestamp
  • New Value: value to be written to memory, if the current value matches the expected value.

/*
X -> Memory:M1 -> 10
*/

CAS(Memory, ExpectedValue, NewValue){
    //1 Load/Read variable from Memory M1
    //2 Compare Memory Data vs Expected Value (COMPARE)
        //IF TRUE Update NewValue to this Memory:M1 (SWAP)
}

Note:

  • CAS is similar to Optimistic Concurrency Control because in optimistic low we use Row version as Expected Value.

  • CAS is cpu level operation where as Optimistic concurrency control operates on Db.

ABA problem

If the initial value of M1 is 10 and this is read by one thread, many threads may modify the M1 value after the read. However, during the CAS (Compare-and-Swap) operation, the comparison with the expected value shows them to be the same, hence it updates. In this case, it assumes that the M1 value has not been modified. This is referred to as the ABA issue. ABA means the initially read value is A, it was later modified to B, and then back to A.

The ABA issue is resolved by storing the version of the memory or differentiating using a timestamp.

/*
X -> Memory:M1 -> 10
*/

CAS(Memory:M1, ExpectedValue:10, NewValue:12){
    Read M1
    Compare M1 value with ExpectedValue
    Update If Matched.
}

Atomic Variables

Atomic means Single or "all or nothing"

Example: int counter = 10

This is single operation no matter how many threads executes this one the result is same or consistent.

Atomic Usage: This is only involved when Read, Modify, and Update operations are performed.

  • Read: This is the first step in an atomic operation. It involves reading the current value of a variable from memory.

  • Modified: After reading the current value, the next step is to modify this value. The modification could be any operation like addition, subtraction, etc.

  • Update: The final step is to update the original variable in memory with the new, modified value

Problem

When single thread run the loop for 400 times. No issues.

public class SharedResource {

    int counter;

    public void increment(){
        counter++;
    }

    public int get(){
        return counter;
    }
}
public class Main {

    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        for (int i=0; i<400; i++){
            resource.increment();
        }

        System.out.println(resource.get());
    }
}
//400

When two threads each run the loop 200 times, the result is different. This is the main problem because the increment operation is not atomic.

public class Main {

    public static void main(String[] args) {
        SharedResource resource = new SharedResource();

        Thread t1 = new Thread(()->{
            for (int i=0; i<200; i++){
                resource.increment();
            }
        });

        Thread t2 = new Thread(()->{
            for (int i=0; i<200; i++)
                resource.increment();
        });

        t1.start();
        t2.start();

        try{
            t1.join();
            t2.join();
        }
        catch (Exception e){

        }

        System.out.println(resource.get());
    }
}
// output - 397

2 Solutions

Lets first understand how the counter++ works.

  1. Load Counter value.

  2. Increment by 1

  3. Assign back to counter.

These 3 operations are not atomic.

When 2 threads simultaneously do the increment operations the counter value is 1 but expectation is 2.

OperationThread 1Thread 2
Load Counter valueCV = 0CV = 0
Increment by 111
Assign back to counterCV = 1CV =1

Using lock like synchronized

public class SharedResource {

    int counter;

    public synchronized void increment(){
        counter++;
    }

    public int get(){
        return counter;
    }
}

Using lock free operation like AtomicInteger

public class SharedResource {

    AtomicInteger counter = new AtomicInteger(0);

    public synchronized void increment(){
        counter.incrementAndGet();
    }

    public int get(){
        return counter.get();
    }
}

Explanation of Atomic Variables

In the AtomicInteger example if we go deep into the incrementAndGet we can see the following CAS type of method.

Explanation of the do-while Loop

do {
    Expected = Read Value from memory
} while (!CAS(memory, expected, newValue))
  • Read Value: The loop reads the current value from memory and stores it in Expected.

  • CAS Operation: The loop continues until the CAS operation is successful.

    • Load Data: Load the current value from memory.

    • Compare: Compare the loaded value with the expected value.

    • Swap: If they match, update the memory with the new value.

Increment Example (CAS)

  • CAS(memory, 0, 1):

    • Load data from memory: value = 0

    • Compare with expected: 0 == 0 (Matched)

      • Update memory: new value = 1
  • Threads T1 and T2 increment same time:

    • Both threads call the increment method simultaneously.

    • Both read the memory at the same time, so expected = 0.

    • One thread updates the memory directly.

    • The other thread continues to execute the while loop until the CAS operation is successful. This is because, for the second thread, the value at the time of reading was 0 (expectedValue). However, when it checks the memory again, the value has been updated to 1 or 2. Therefore, it needs to perform the operation again.

Volatile

  • Atomic operations are thread-safe. This means that they are designed to prevent race conditions between threads.

  • Volatile is not thread safe.

  • Volatile directly updates the value to the memory, bypassing any intermediary caches.

Cache Synchronization

  • Generally, before the CPU writes data into memory, it flows through the L1 and L2 caches. This is part of the memory hierarchy designed to provide high-speed data access to the CPU.

  • If one thread updates the value of x from 10 to 11, this change is not immediately written to memory. Meanwhile, another thread may read the value of x from memory(if x not present in L1/L2), which is still 10, and perform an operation based on this outdated value.

  • To counter this issue, we use the volatile keyword. This ensures that the value is directly read from/written to memory, without storing it in any cache.

  • Generally, if one thread updates x, it stores the change in the L1 cache first, then moves it to the L2 cache, and finally to memory. So, if any thread wants to read x, it first checks the L1 cache, then the L2 cache, and finally the memory.

  • Sometimes, the L1 cache might not get synchronized, or there might be some delay in synchronization. This can lead to threads working with outdated values. The volatile keyword helps prevent such issues by ensuring direct memory access.

Concurrent Collections Lock Mechnisms

CollectionConcurrent CollectionLock
PriorityQueuePriorityBlockingQueueReentrantLock
LinkedListConcurrentLinkedDequeCompare-and-swap operation
ArrayDequeConcurrentLinkedDequeCompare-and-swap operation
ArrayListCopyOnWriteArrayListReentrantLock
HashSetnewKeySet method inside concurrentHashMapSynchronized
TreeSetCollections.synchronizedSortedSetSynchronized
LinkedHashSetCollections.synchronizedSetSynchronized
Queue InterfaceConcurrentLinkedQueueCompare-and-swap operation