Site Search:

Use synchronized keyword and java.util.concurrent.atomic package to control the order of thread execution

Back>

At the end of topic Identify potential threading problems among deadlock, starvation, livelock, and race conditions, we discussed how bad the race conditions are.

Race conditions happen when a thread check then act on shared objects, which is changed by other threads between the check and act. We need to check and act on shared objects atomically, knowing no other threads can change the shared object between the current thread's check and act.

OCPJP tests two strategies: Atomic classes and synchronized blocks.

synchronized and atomic
synchronized and atomic


Atomic classes

ClassDescription
AtomicBoolean
boolean value that may be updated atomically.
AtomicInteger
An int value that may be updated atomically.
AtomicIntegerArray
An int array in which elements may be updated atomically.


AtomicLong
long value that may be updated atomically.
AtomicLongArray
long array in which elements may be updated atomically.

AtomicReference<V>
An object reference that may be updated atomically.
AtomicReferenceArray<E>
An array of object references in which elements may be updated atomically.

The java.util.concurrent.atomic package provides a small toolkit of classes that support lock-free thread-safe programming on single variables. These common atomic methods from AtomicInteger has corresponding pairs in other atomic classes:

Modifier and TypeMethod and Description
int get()
Gets the current value.
voidset(int newValue)
Sets to the given value.


intincrementAndGet()
Atomically increments by one the current value. ++i

intdecrementAndGet()
Atomically decrements by one the current value. --i


intgetAndDecrement()
Atomically decrements by one the current value. i--
intgetAndIncrement()
Atomically increments by one the current value. i++
intgetAndSet(int newValue)
Atomically sets to the given value and returns the old value.

We can use atomic classes to fix the CrazyCounter.java we show before.

OCPJP>cat AtomicCounter.java 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class AtomicCounter {
    private final AtomicInteger number = new AtomicInteger(0);
    
    private void checkAndAct() {
        System.out.print(" " + number.getAndIncrement());
    }
    
    public static void main(String...args) {
        AtomicCounter counter = new AtomicCounter();
        ExecutorService exec = Executors.newCachedThreadPool();
        //ExecutorService exec = Executors.newSingleThreadExecutor();
        Stream.iterate(0, x -> x+1).limit(10).forEach(x -> exec.submit(() -> counter.checkAndAct()));
        System.out.println("\n" + exec);
        exec.shutdown();
    }
}
OCPJP>
OCPJP>javac AtomicCounter.java 
OCPJP>java AtomicCounter
 0 4 5 3 2 6 1 7 8 9
java.util.concurrent.ThreadPoolExecutor@5674cd4d[Running, pool size = 8, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java AtomicCounter
 0 4 3 2 1 6 5 7 8 9
java.util.concurrent.ThreadPoolExecutor@5674cd4d[Running, pool size = 8, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java AtomicCounter
 0 4 3 2 1 5 6 7 8 9
java.util.concurrent.ThreadPoolExecutor@85ede7b[Running, pool size = 7, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java AtomicCounter
 0 3 2 1 5 4 6 7 8 9
java.util.concurrent.ThreadPoolExecutor@85ede7b[Running, pool size = 7, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java AtomicCounter
 0 4 3 2 1 6 5 7 8 9
java.util.concurrent.ThreadPoolExecutor@5674cd4d[Running, pool size = 8, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>for i in {1..100}; do java AtomicCounter | head -n 1 | tr " " "\n" | sort | uniq | wc -l | grep -v 11; done;
OCPJP>


In this code, shared variable number is now AtomicInteger instead of int; number ++ is replaced with atomic method getAndIncrement(). The read variable and increase variable value are now atomic, once threadA enters method getAndIncrement(), other threads can not change the value of the number object before threadA finish incrementing the value and returns. We run the program 100 times, didn't see any missing or duplicate digits.

As a demonstration of the usage of AtomicReferenceArray<E>, we rewrote AtomicCounter.java as AtomicCounters.java, where multiple threads update the elements of AtomicReferenceArray<File> concurrently with the indexes. Since the getAndSet is atomic, we didn't see missing or duplicate file names read/write.

OCPJP>cat AtomicCounters.java 
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Stream;

public class AtomicCounters {
    private final AtomicReferenceArray<File> stores = new AtomicReferenceArray<>(2);
    private void checkAndAct(int index) {
        System.out.println(Thread.currentThread().getName() + "/" + index + 
                " " + stores.getAndSet(index, new File(Thread.currentThread().getName() + "/" + index)));
    }
    
    public static void main(String[] args) {
        AtomicCounters counters = new AtomicCounters();
        ExecutorService exec = Executors.newCachedThreadPool();
        Stream.iterate(0, x -> x+1).limit(10).forEach(i -> exec.submit(() -> counters.checkAndAct(i % 2)));
        System.out.println("\n" + exec);
        exec.shutdown();
    }
}

OCPJP>javac AtomicCounters.java 
OCPJP>java AtomicCounters
pool-1-thread-1/0 pool-1-thread-3/0
pool-1-thread-6/1 pool-1-thread-4/1
pool-1-thread-5/0 pool-1-thread-1/0
pool-1-thread-3/0 null
pool-1-thread-4/1 pool-1-thread-2/1
pool-1-thread-8/1 pool-1-thread-6/1
pool-1-thread-2/1 null
pool-1-thread-7/0 pool-1-thread-5/0
pool-1-thread-9/0 pool-1-thread-7/0
pool-1-thread-8/1 pool-1-thread-8/1

java.util.concurrent.ThreadPoolExecutor@63961c42[Running, pool size = 9, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java AtomicCounters
pool-1-thread-1/0 null
pool-1-thread-6/1 pool-1-thread-4/1
pool-1-thread-5/0 pool-1-thread-3/0
pool-1-thread-3/0 pool-1-thread-1/0
pool-1-thread-7/0 pool-1-thread-5/0
pool-1-thread-2/1 null
pool-1-thread-8/1 pool-1-thread-6/1
pool-1-thread-4/1 pool-1-thread-2/1
pool-1-thread-4/1 pool-1-thread-8/1
pool-1-thread-7/0 pool-1-thread-7/0

java.util.concurrent.ThreadPoolExecutor@5674cd4d[Running, pool size = 8, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java AtomicCounters
pool-1-thread-2/1 null
pool-1-thread-6/1 pool-1-thread-4/1
pool-1-thread-5/0 pool-1-thread-1/0
pool-1-thread-4/1 pool-1-thread-2/1
pool-1-thread-1/0 pool-1-thread-3/0
pool-1-thread-3/0 null
pool-1-thread-9/0 pool-1-thread-7/0
pool-1-thread-8/1 pool-1-thread-6/1
pool-1-thread-7/0 pool-1-thread-5/0
pool-1-thread-4/1 pool-1-thread-8/1

java.util.concurrent.ThreadPoolExecutor@63961c42[Running, pool size = 9, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>


The other way to avoid check then act is synchronized keyword.
In java, any object can be used as a lock, along with the synchronized keyword to support mutual exclusion so that at most one thread can execute a particular code block at a given time.

You can synchronize a code block or a method. As the synchronized version fix of CrazyCounter.java

OCPJP>cat SynchronizedCounter.java 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class SynchronizedCounter {
    private int number = 0;
    
    private synchronized void checkAndAct() {
        System.out.print(" " + number++);
    }
    
    private void checkAndAct2() {
        synchronized(this) {
            System.out.print("-" + number++);
        }
    }
    
    public static void main(String... args) {
        SynchronizedCounter counter = new SynchronizedCounter();
        ExecutorService exec = Executors.newCachedThreadPool();
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        Stream.iterate(0, x -> x + 1).limit(5).forEach(x -> {
            exec.submit(() -> counter.checkAndAct());
            exec.submit(() -> counter.checkAndAct2());
        });
        System.out.println("\n" + exec);
        exec.shutdown();
    }
}
OCPJP>
OCPJP>javac SynchronizedCounter.java 
OCPJP>java SynchronizedCounter
 0-1 2-3 4-5 6-7 8-9
java.util.concurrent.ThreadPoolExecutor@7adf9f5f[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java SynchronizedCounter
 0-1 2-3 4-5 6 7-8-9
java.util.concurrent.ThreadPoolExecutor@7adf9f5f[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java SynchronizedCounter
 0 1-2-3 4-5 6 7-8-9
java.util.concurrent.ThreadPoolExecutor@5674cd4d[Running, pool size = 7, active threads = 0, queued tasks = 0, completed tasks = 10]
OCPJP>java SynchronizedCounter
 0 1-2-3 4-5 6 7-8-9


As you can see, the check then act on shared variable number is avoided because at a given time, only one thread can hold the lock, (which is "this" or the counter object itself,) therefore able to run the guarded code System.out.print(" " + number++); or System.out.print("-" + number++);

Despite the fact that your computer has more than one cpu cores, the SynchronizedCounter.java code is in fact degraded into single threaded code with synchronized keyword -- most of the life time of a thread is spent on blocking on the lock held by some thread instead of doing work on idle cpu cores.


So synchronized keyword can cause deadlock, synchronized code block put a bottleneck to your program's performance. Luckily, java introduced concurrent collections to hold the shared variables. Threads can access these thread-safe and optimized concurrent class objects without putting a synchronized keyword on a code block. We will discuss these concurrent classes in the next topic.