Java 101: Java concurrency without the pain, Part 2

Aug 13, 2013

Locking, atomic variables, Fork/Join, and what to expect in Java 8

The Java Concurrency Utilities are high-level concurrency types that facilitate threading tasks, especially on multicore systems. Part 1 of this introduction featured java.util.concurrent’s Executor framework, synchronizer types, and Java Concurrent Collections package. In Part 2, learn how the Java Concurrency Utilities handle locking, atomic variables, and fork/join operations. Then prepare for the future with an overview of seven anticipated changes to the Java Concurrency Utilities coming in Java 8.

Java’s low-level threading capabilities are famously hard to use and error-prone, which is why they’re frequently associated with deadlock, thread starvation, race conditions, and other concurrency bugs. One alternative to losing your sleep and peace of mind is the Java Concurrency Utilities, introduced in JDK 5. The two articles in this short series are dedicated to exploring how Java developers use the packages and libraries in java.util.concurrent to work around common threading bugs and write cleaner, simpler programs.

In Part 1, I explored the Executor framework, synchronizer utilities, and the Java Concurrent Collections package. Part 2 is an in-depth look at the mechanics of java.util.concurrent’s advanced locking mechanisms and atomic variables, as well as a short tutorial on the Fork/Join framework. I also discuss the new features and performance improvements coming to the Java Concurrency Utilities with Java 8.

See “Modern threading for not-quite-beginners” (January 2013) for an intermediate primer on multithreaded programming and “Java Tip 144: When to use ForkJoinPool vs ExecutorService” (October 2011) for a quick tip on using the Fork/Join framework.

The Locking framework

The Java language lets threads use synchronization to update shared variables safely and to ensure that one thread’s updates are visible to other threads. You request synchronization with the synchronized keyword. The Java virtual machine (JVM) supports this mechanism via monitors and the associated monitorenter and monitorexit instructions.

Each Java object is associated with a monitor, which is a mutual exclusion mechanism that prevents multiple threads from concurrently executing in a critical section. Before a thread can enter this section, it must lock the monitor. If the monitor is already locked, the thread blocks until the monitor is unlocked.

Monitors also address the vagaries of memory caching and compiler optimizations that might otherwise prevent one thread from observing another thread’s update of a shared variable. Before a thread leaves the critical section, the monitor ensures that the thread’s updates are immediately visible, so that another thread about to enter the critical section will see those updates.

Synchronization vs volatile

Synchronization supports mutual exclusion and visibility. In contrast, the volatile keyword only supports visibility.
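The difference can be sketched in a few lines. The following is a minimal illustration, not one of this article's listings; the class and member names (VisibilityDemo, stopRequested, runDemo) are hypothetical. A volatile flag is enough to publish a stop request, because only visibility is needed, whereas a counter that several threads increment needs the mutual exclusion that synchronized provides.

```java
class VisibilityDemo
{
   // volatile: the spinner thread is guaranteed to see the write made below.
   private static volatile boolean stopRequested;

   // Guarded by the VisibilityDemo.class monitor via the synchronized methods.
   private static int counter;

   private static synchronized void increment()
   {
      counter++; // read-modify-write: needs mutual exclusion, not just visibility
   }

   private static synchronized int getCounter()
   {
      return counter;
   }

   // Runs four adder threads plus one spinner and returns the final count.
   static int runDemo()
   {
      stopRequested = false;
      counter = 0;

      Thread spinner = new Thread(new Runnable()
      {
         @Override
         public void run()
         {
            while (!stopRequested)
               Thread.yield();
         }
      });
      spinner.start();

      Thread[] adders = new Thread[4];
      for (int i = 0; i < adders.length; i++)
      {
         adders[i] = new Thread(new Runnable()
         {
            @Override
            public void run()
            {
               for (int j = 0; j < 10000; j++)
                  increment();
            }
         });
         adders[i].start();
      }

      try
      {
         for (Thread adder : adders)
            adder.join();
         stopRequested = true; // visible to the spinner without any locking
         spinner.join();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return getCounter();
   }

   public static void main(String[] args)
   {
      System.out.println("counter = " + runDemo()); // prints counter = 40000
   }
}
```

If counter were merely declared volatile and incremented without synchronized, some of the 40,000 increments could be lost, because ++ is a read followed by a separate write.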

Although adequate for simple applications, Java’s low-level synchronization mechanism can be inconvenient for advanced applications that require additional capabilities such as timed waits and lock polling.

The Locking framework, which is found in java.util.concurrent.locks, addresses these limitations.

Locks

The Lock interface provides more extensive locking operations than can be obtained via synchronized methods and statements. For instance, you could use Lock to immediately back out of a lock-acquisition attempt if a lock wasn’t available, or you could wait indefinitely to acquire a lock and back out only if interrupted.

Lock declares the following methods:

  • void lock() acquires a lock, disabling the current thread when the lock is not available. The thread remains dormant until the lock becomes available.
  • void lockInterruptibly() is similar to void lock() but allows the disabled thread to be interrupted and resume execution through a thrown java.lang.InterruptedException. (Note that interrupting lock acquisition is not supported in all cases.)
  • Condition newCondition() returns a new Condition instance that’s bound to a given Lock instance. If the Lock implementation doesn’t support conditions, java.lang.UnsupportedOperationException is thrown. (I discuss conditions later in this article.)
  • boolean tryLock() acquires a lock only when it’s free at the time of invocation. This method returns true when the lock is acquired; otherwise, it returns false.
  • boolean tryLock(long time, TimeUnit unit) is similar to boolean tryLock(); however, it lets you specify an amount of time to wait for the lock to become available. Pass the magnitude of the delay to time and the units represented by this delay to unit. For example, you might pass 2 to time and TimeUnit.SECONDS to unit. (The java.util.concurrent.TimeUnit enum also offers DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, and NANOSECONDS.) This method throws InterruptedException when the current thread is interrupted while acquiring the lock (in cases where interrupting lock acquisition is supported).
  • void unlock() releases the lock.
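To make the two tryLock() variants concrete, here is a small sketch (the TryLockDemo class and its helper names are illustrative assumptions, not part of the article's listings). One thread holds a ReentrantLock while a second thread first polls the lock and then waits for it with a 100-millisecond timeout; a third attempt is made after the lock has been released.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo
{
   // Runs r on a new thread and waits for that thread to finish.
   private static void runAndJoin(Runnable r)
   {
      Thread t = new Thread(r);
      t.start();
      try
      {
         t.join();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
   }

   // Returns the results of three acquisition attempts made by other threads.
   static boolean[] attempt()
   {
      final Lock lock = new ReentrantLock();
      final boolean[] results = new boolean[3];

      lock.lock(); // the calling thread now holds the lock
      try
      {
         runAndJoin(new Runnable()
         {
            @Override
            public void run()
            {
               // Lock polling: returns immediately instead of blocking.
               results[0] = lock.tryLock();
               if (results[0])
                  lock.unlock();
               try
               {
                  // Timed wait: gives up after 100 milliseconds.
                  results[1] = lock.tryLock(100, TimeUnit.MILLISECONDS);
                  if (results[1])
                     lock.unlock();
               }
               catch (InterruptedException ie)
               {
                  Thread.currentThread().interrupt();
               }
            }
         });
      }
      finally
      {
         lock.unlock();
      }

      // The lock is free now, so polling from another thread succeeds.
      runAndJoin(new Runnable()
      {
         @Override
         public void run()
         {
            results[2] = lock.tryLock();
            if (results[2])
               lock.unlock();
         }
      });
      return results;
   }

   public static void main(String[] args)
   {
      boolean[] r = attempt();
      // prints: held: false, false; free: true
      System.out.printf("held: %b, %b; free: %b%n", r[0], r[1], r[2]);
   }
}
```

Note that every successful tryLock() is paired with an unlock(), just as a successful lock() would be.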

It’s important to always release a held lock. The Javadoc for the Lock interface presents the following idiom for locking a lock and ensuring that the lock is always unlocked:

Lock l = ...; // ... is a placeholder for code that obtains the lock
l.lock();
try 
{
  // access the resource protected by this lock
}
catch(Exception ex) 
{
  // restore invariants
}
finally 
{
   l.unlock();
}

Lock’s Javadoc also discusses the memory synchronization semantics that are expected of Lock implementations. Essentially, Lock implementations must behave like the built-in monitor lock, enforcing mutual exclusion and visibility.

Working with locks

The ReentrantLock class implements Lock and describes a reentrant mutual exclusion lock. The lock is associated with an acquisition count. When a thread holds the lock and re-acquires the lock, the acquisition count is incremented and the lock must be released twice.

ReentrantLock offers the same concurrency and memory semantics as the implicit monitor lock normally accessed using synchronized methods and statements. However, it has extended capabilities and offers better performance under high thread contention (that is, when threads are frequently asking to acquire a lock that is already held by another thread). When many threads attempt to access a shared resource, the JVM spends less time scheduling these threads and more time executing them.

ReentrantLock or synchronized?

ReentrantLock behaves like synchronized and you might wonder when it’s appropriate to use one or the other. Use ReentrantLock when you need timed or interruptible lock waits, non-block-structured locks (obtain a lock in one method; return the lock in another), multiple condition variables, or lock polling. Furthermore, ReentrantLock supports scalability and is useful where there is high contention among threads. If none of these factors come into play, use synchronized.
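As an illustration of an interruptible lock wait, which synchronized cannot offer, consider the following sketch. The class and method names are hypothetical. The calling thread keeps the lock, a second thread blocks in lockInterruptibly(), and an interrupt makes that second thread abandon the wait. The sketch uses ReentrantLock’s hasQueuedThread() method only to make sure the waiter is actually queued before it is interrupted.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo
{
   // Returns true when a thread blocked in lockInterruptibly() backs out
   // of the wait after being interrupted.
   static boolean waiterBacksOut()
   {
      final ReentrantLock lock = new ReentrantLock();
      final boolean[] backedOut = new boolean[1];

      lock.lock(); // the calling thread keeps the lock for the whole demo
      try
      {
         Thread waiter = new Thread(new Runnable()
         {
            @Override
            public void run()
            {
               try
               {
                  lock.lockInterruptibly(); // blocks: the lock is already held
                  lock.unlock();            // not reached in this demo
               }
               catch (InterruptedException ie)
               {
                  backedOut[0] = true;      // the wait was abandoned
               }
            }
         });
         waiter.start();
         while (!lock.hasQueuedThread(waiter))
            Thread.yield();                 // wait until the waiter is queued
         waiter.interrupt();
         waiter.join();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      finally
      {
         lock.unlock();
      }
      return backedOut[0];
   }

   public static void main(String[] args)
   {
      // prints: waiter backed out: true
      System.out.println("waiter backed out: " + waiterBacksOut());
   }
}
```

A thread blocked on entry to a synchronized block, by contrast, cannot be released by interrupting it; it stays blocked until the monitor becomes available.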

ReentrantLock declares the following constructors:

  • ReentrantLock() creates a reentrant lock.
  • ReentrantLock(boolean fair) creates a reentrant lock with the given fairness policy. Passing true to fair results in a lock that uses a fair ordering policy, which means that under contention, the lock favors granting access to the longest-waiting thread. The former constructor invokes this constructor, passing false to fair.

ReentrantLock implements Lock’s methods; its implementation of unlock() throws java.lang.IllegalMonitorStateException when the calling thread doesn’t hold the lock. Additionally, ReentrantLock provides its own methods, including the following trio:

  • int getHoldCount() returns the number of holds on this lock by the current thread: a thread has a hold on a lock for each lock action that isn’t matched by an unlock action. When the lock() method is called and the current thread already holds the lock, the hold count is incremented by one and the method returns immediately.
  • boolean isFair() returns the fairness setting.
  • boolean isHeldByCurrentThread() queries if this lock is held by the current thread, returning true when this is the case. This method is often used for debugging and testing.
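Before the threaded demonstration, a single-threaded sketch can show how the hold count tracks reentrant acquisition and how unlock() behaves when the caller doesn’t hold the lock. The HoldCountDemo class and its method names are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo
{
   // Records the hold count before locking, after one and two lock() calls,
   // and after both matching unlock() calls.
   static int[] holdCounts()
   {
      ReentrantLock lock = new ReentrantLock();
      int[] counts = new int[4];

      counts[0] = lock.getHoldCount();
      lock.lock();
      try
      {
         counts[1] = lock.getHoldCount();
         lock.lock(); // reentrant acquisition by the same thread
         try
         {
            counts[2] = lock.getHoldCount();
         }
         finally
         {
            lock.unlock(); // releases the inner hold only
         }
      }
      finally
      {
         lock.unlock(); // releases the outer hold; the lock is now free
      }
      counts[3] = lock.getHoldCount();
      return counts;
   }

   // Returns true when unlock() on a lock that isn't held throws
   // IllegalMonitorStateException.
   static boolean unlockWithoutHoldThrows()
   {
      ReentrantLock lock = new ReentrantLock();
      try
      {
         lock.unlock();
         return false;
      }
      catch (IllegalMonitorStateException imse)
      {
         return true;
      }
   }

   public static void main(String[] args)
   {
      System.out.println(Arrays.toString(holdCounts())); // prints [0, 1, 2, 0]
      System.out.println(unlockWithoutHoldThrows());     // prints true
   }
}
```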

Listing 1 is a simple demonstration of ReentrantLock.

Listing 1. LockDemo.java

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.ReentrantLock;

public class LockDemo
{
   public static void main(String[] args)
   {
      ExecutorService executor = Executors.newFixedThreadPool(2);
      final ReentrantLock rl = new ReentrantLock();
 
      class Worker implements Runnable
      {
         private String name;

         Worker(String name)
         {
            this.name = name;
         }

         @Override
         public void run()
         {
           rl.lock();
           try
           {
              if (rl.isHeldByCurrentThread())
                System.out.printf("Thread %s has entered its critical section.%n", 
                                  name);
              System.out.printf("Thread %s is performing work for 2 seconds.%n", name);
              try
              {
                 Thread.sleep(2000);
              }
              catch (InterruptedException ie)
              {
                 ie.printStackTrace();
              }
              System.out.printf("Thread %s has finished working.%n", name);
           }
           finally
           {
              rl.unlock(); 
           }
         }
      }

      executor.execute(new Worker("A"));
      executor.execute(new Worker("B"));

      // Stop accepting new tasks, then wait for the two workers to finish.
      executor.shutdown();
      try
      {
         executor.awaitTermination(5, TimeUnit.SECONDS);
      }
      catch (InterruptedException ie)
      {
         ie.printStackTrace();
      }
      executor.shutdownNow();
   }
}

Listing 1 creates two worker threads. Each thread first acquires a lock to ensure that it has complete access to the critical section. It then outputs some messages and sleeps for two seconds to simulate work. After outputting another message, it releases the lock.

Compile LockDemo.java and run the application. You should observe output similar to the following:

Thread A has entered its critical section.
Thread A is performing work for 2 seconds.
Thread A has finished working.
Thread B has entered its critical section.
Thread B is performing work for 2 seconds.
Thread B has finished working.

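Comment out rl.lock(); and rl.unlock(); and the two threads are no longer serialized. Because isHeldByCurrentThread() now returns false, the “has entered its critical section” messages disappear, and you should observe interleaved output like the following: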

Thread A is performing work for 2 seconds.
Thread B is performing work for 2 seconds.
Thread A has finished working.
Thread B has finished working.

Conditions

The Condition interface factors out the java.lang.Object monitor methods (wait(), notify(), and notifyAll()) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where Lock replaces synchronized methods and statements, Condition replaces Object monitor methods.

Condition declares the following methods:

  • void await() forces the current thread to wait until it’s signalled or interrupted.
  • boolean await(long time, TimeUnit unit) forces the current thread to wait until it’s signalled or interrupted, or the specified waiting time elapses.
  • long awaitNanos(long nanosTimeout) forces the current thread to wait until it’s signalled or interrupted, or the specified waiting time elapses.
  • void awaitUninterruptibly() forces the current thread to wait until it’s signalled.
  • boolean awaitUntil(Date deadline) forces the current thread to wait until it’s signalled or interrupted, or the specified deadline elapses.
  • void signal() wakes up one waiting thread.
  • void signalAll() wakes up all waiting threads.
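The idea of multiple wait-sets per lock is easiest to see in a bounded buffer, where producers wait for space and consumers wait for data. The sketch below is an illustrative example in the spirit of the bounded-buffer idiom; the class, its two-slot capacity, and the transfer() helper are assumptions made for the demonstration. It binds two conditions, notFull and notEmpty, to one ReentrantLock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer
{
   private final Lock lock = new ReentrantLock();
   // Two wait-sets on one lock: producers and consumers wait separately.
   private final Condition notFull = lock.newCondition();
   private final Condition notEmpty = lock.newCondition();

   private final char[] items;    // guarded by lock
   private int head, tail, count; // guarded by lock

   BoundedBuffer(int capacity)
   {
      items = new char[capacity];
   }

   void put(char c) throws InterruptedException
   {
      lock.lock();
      try
      {
         while (count == items.length)
            notFull.await(); // releases the lock while waiting
         items[tail] = c;
         tail = (tail + 1) % items.length;
         count++;
         notEmpty.signal(); // wakes a waiting consumer, never a producer
      }
      finally
      {
         lock.unlock();
      }
   }

   char take() throws InterruptedException
   {
      lock.lock();
      try
      {
         while (count == 0)
            notEmpty.await();
         char c = items[head];
         head = (head + 1) % items.length;
         count--;
         notFull.signal(); // wakes a waiting producer, never a consumer
         return c;
      }
      finally
      {
         lock.unlock();
      }
   }

   // Pushes text through a two-slot buffer and returns what the consumer saw.
   static String transfer(final String text)
   {
      final BoundedBuffer buffer = new BoundedBuffer(2);
      Thread producer = new Thread(new Runnable()
      {
         @Override
         public void run()
         {
            try
            {
               for (char c : text.toCharArray())
                  buffer.put(c);
            }
            catch (InterruptedException ie)
            {
               Thread.currentThread().interrupt();
            }
         }
      });
      producer.start();

      StringBuilder consumed = new StringBuilder();
      try
      {
         for (int i = 0; i < text.length(); i++)
            consumed.append(buffer.take());
         producer.join();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return consumed.toString();
   }

   public static void main(String[] args)
   {
      System.out.println(transfer("ABCDEFGH")); // prints ABCDEFGH
   }
}
```

With Object’s wait() and notify(), both kinds of waiter would share one wait-set, so the code would typically need notifyAll() to be sure of waking the right kind of thread.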

Working with conditions

The classic producer-consumer example nicely demonstrates conditions. In this example, a producer thread repeatedly produces items for consumption by a consumer thread.

The producer thread must not produce a new item until the previously produced item has been consumed. Similarly, the consumer thread must not consume an item that hasn’t been produced. This is known as lockstep synchronization.

Listing 2 demonstrates conditions (and locks) in a producer-consumer context.

Listing 2. CondDemo.java

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CondDemo
{
   public static void main(String[] args)
   {
      Shared s = new Shared();
      new Producer(s).start();
      new Consumer(s).start();
   }
}

class Shared
{
   // Fields c and available are volatile so that writes to them are visible to 
   // the various threads. Fields lock and condition are final so that their
   // initial values are visible to the various threads. (The Java memory model 
   // promises that, after a final field has been initialized, any thread will 
   // see the same [correct] value.)

   private volatile char c;
   private volatile boolean available;
   private final Lock lock;
   private final Condition condition;

   Shared()
   {
      c = '\u0000';
      available = false;
      lock = new ReentrantLock();
      condition = lock.newCondition();
   }

   Lock getLock()
   {
      return lock;
   }

   char getSharedChar()
   {
      lock.lock();
      try
      {
         while (!available)
            try
            {
               condition.await();
            }
            catch (InterruptedException ie)
            {
               ie.printStackTrace();
            }
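         available = false;
         condition.signal();
         // Return from the try block (while still holding the lock) rather
         // than from finally, which would discard any pending exception.
         return c;
      }
      finally
      {
         lock.unlock();
      }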
   }

   void setSharedChar(char c)
   {
      lock.lock();
      try
      {
         while (available)
            try
            {
               condition.await();
            }
            catch (InterruptedException ie)
            {
               ie.printStackTrace();
            }
         this.c = c;
         available = true;
         condition.signal();
      }
      finally
      {
         lock.unlock();
      }
   }
}

class Producer extends Thread
{
   // l is final because it's initialized on the main thread and accessed on the
   // producer thread.

   private final Lock l;

   // s is final because it's initialized on the main thread and accessed on the
   // producer thread.

   private final Shared s;
   
   Producer(Shared s)
   {
      this.s = s;
      l = s.getLock();
   }

   @Override
   public void run()
   {
      for (char ch = 'A'; ch <= 'Z'; ch++)
      {
         l.lock();
         s.setSharedChar(ch);
         System.out.println(ch + " produced by producer.");
         l.unlock();
      }
   }
}

class Consumer extends Thread
{
   // l is final because it's initialized on the main thread and accessed on the
   // consumer thread.

   private final Lock l;

   // s is final because it's initialized on the main thread and accessed on the
   // consumer thread.

   private final Shared s;

   Consumer(Shared s)
   {
      this.s = s;
      l = s.getLock();
   }

   @Override
   public void run()
   {
      char ch;
      do
      {
         l.lock();
         ch = s.getSharedChar();
         System.out.println(ch + " consumed by consumer.");
         l.unlock();
      }
      while (ch != 'Z');
   }
}

Listing 2 presents four classes: CondDemo, Shared, Producer, and Consumer. CondDemo drives the application, Shared encapsulates the logic for setting and getting a shared variable’s value, Producer describes the producer thread, and Consumer describes the consumer thread.

The mechanics of CondDemo

CondDemo’s main() method instantiates Shared, Producer, and Consumer. It passes the Shared instance to the Producer and Consumer thread instance constructors and starts these threads.

The Producer and Consumer constructors are invoked on the main thread. Because the Shared instance is also accessed on the producer and consumer threads, it’s necessary for this instance to be visible to them, especially when these threads run on different cores. Within each of Producer and Consumer, I accomplish this task by declaring s final. I could have declared this field volatile, but volatile suggests that there will be further writes to the field and s isn’t supposed to change after being initialized.

Shared’s constructor creates a lock (lock = new ReentrantLock();) and an associated condition (condition = lock.newCondition();). This lock is made available to the producer and consumer threads via the Lock getLock() method.

The producer thread always invokes the void setSharedChar(char c) method to generate a new character. This method locks the previously created lock object and then enters a while loop that repeatedly tests variable available — this variable is true when a produced character hasn’t yet been consumed.

As long as available is true, the producer invokes the condition’s await() method to wait for available to become false. The consumer will signal the condition to wake up the producer when it has consumed the character. (A loop is used instead of an if statement because spurious wakeups are possible and available might still be true.)

After exiting the loop, the producer records the new character, assigns true to available to indicate that a new character is available for consumption, and signals the condition to wake up a waiting consumer. Lastly, it unlocks the lock and returns from setSharedChar().

Locking controls output order

Why am I locking the get/print and set/print code blocks? Without this locking, you might observe consuming messages before producing messages, even though characters are produced before they’re consumed. Locking these blocks prevents this strange output order.

The behavior of the consumer thread in the char getSharedChar() method is similar.

The mechanics of the producer and consumer threads are simpler. Each run() method first locks the lock, then sets or gets a character and outputs a message, and unlocks the lock. (I didn’t use the try/finally idiom because an exception isn’t thrown from this context.)

Compile CondDemo.java and run the application. You should observe the following output:

A produced by producer.
A consumed by consumer.
B produced by producer.
B consumed by consumer.
C produced by producer.
C consumed by consumer.
D produced by producer.
D consumed by consumer.
E produced by producer.
E consumed by consumer.
F produced by producer.
F consumed by consumer.
G produced by producer.
G consumed by consumer.
H produced by producer.
H consumed by consumer.
I produced by producer.
I consumed by consumer.
J produced by producer.
J consumed by consumer.
K produced by producer.
K consumed by consumer.
L produced by producer.
L consumed by consumer.
M produced by producer.
M consumed by consumer.
N produced by producer.
N consumed by consumer.
O produced by producer.
O consumed by consumer.
P produced by producer.
P consumed by consumer.
Q produced by producer.
Q consumed by consumer.
R produced by producer.
R consumed by consumer.
S produced by producer.
S consumed by consumer.
T produced by producer.
T consumed by consumer.
U produced by producer.
U consumed by consumer.
V produced by producer.
V consumed by consumer.
W produced by producer.
W consumed by consumer.
X produced by producer.
X consumed by consumer.
Y produced by producer.
Y consumed by consumer.
Z produced by producer.
Z consumed by consumer.

Read-write locks

You’ll occasionally encounter a situation where data structures are read more often than they’re modified. The Locking framework has a read-write locking mechanism for these situations that yields both greater concurrency when reading and the safety of exclusive access when writing.

The ReadWriteLock interface maintains a pair of associated locks, one for read-only operations and one for write operations. The read lock may be held simultaneously by multiple reader threads as long as there are no writers. The write lock is exclusive: only a single thread can modify shared data. (The lock that’s associated with the synchronized keyword is also exclusive.)

ReadWriteLock declares the following methods:

  • Lock readLock() returns the lock that’s used for reading.
  • Lock writeLock() returns the lock that’s used for writing.
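The sharing rules can be checked with a short probe. This is a sketch with hypothetical names; it uses ReentrantReadWriteLock, the java.util.concurrent.locks implementation of this interface. While the calling thread holds the read lock, a second thread can also acquire the read lock but cannot acquire the write lock.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteDemo
{
   // Returns { second reader admitted?, writer admitted? } as observed by
   // another thread while the calling thread holds the read lock.
   static boolean[] probe()
   {
      final ReadWriteLock rwl = new ReentrantReadWriteLock();
      final boolean[] results = new boolean[2];

      rwl.readLock().lock(); // the calling thread is a reader
      try
      {
         Thread other = new Thread(new Runnable()
         {
            @Override
            public void run()
            {
               // A second reader may share the read lock.
               results[0] = rwl.readLock().tryLock();
               if (results[0])
                  rwl.readLock().unlock();

               // A writer needs exclusive access, so this attempt fails
               // while any other thread holds the read lock.
               results[1] = rwl.writeLock().tryLock();
               if (results[1])
                  rwl.writeLock().unlock();
            }
         });
         other.start();
         other.join();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      finally
      {
         rwl.readLock().unlock();
      }
      return results;
   }

   public static void main(String[] args)
   {
      boolean[] r = probe();
      // prints: second reader: true, writer: false
      System.out.printf("second reader: %b, writer: %b%n", r[0], r[1]);
   }
}
```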

Working with read-write locks

The ReentrantReadWriteLock class implements ReadWriteLock and describes a read-write lock with similar semantics to a reentrant lock. Like ReentrantLock, ReentrantReadWriteLock declares a pair of constructors:

  • ReentrantReadWriteLock() creates a reentrant read-write lock with default (nonfair) ordering properties.
  • ReentrantReadWriteLock(boolean fair) creates a reentrant read-write lock with the given fairness policy.

ReentrantReadWriteLock implements ReadWriteLock’s methods and provides additional methods, including the following trio:

  • int getQueueLength() returns an estimate of the number of threads waiting to acquire either the read or write lock.
  • int getReadHoldCount() returns the number of read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock action that is not matched by an unlock action.
  • boolean hasWaiters(Condition condition) returns true when there are threads waiting on the given condition associated with the write lock.

Listing 3 demonstrates ReentrantReadWriteLock.

Listing 3. RWLockDemo.java

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RWLockDemo
{
   final static int DELAY = 80;
   final static int NUMITER = 5;

   public static void main(String[] args) 
   {
      final Names names = new Names();

      class NamedThread implements ThreadFactory
      {
         private String name;

         NamedThread(String name)
         {
            this.name = name;
         }

         @Override
         public Thread newThread(Runnable r)
         {
            return new Thread(r, name);
         }
      }

      ExecutorService writer;
      writer = Executors.newSingleThreadExecutor(new NamedThread("writer"));
      Runnable wrunnable = new Runnable()
                           {
                              @Override
                              public void run()
                              {
                                 for (int i = 0; i < NUMITER; i++)
                                 {
                                    names.add(Thread.currentThread().getName(), 
                                              "A" + i);
                                    try
                                    {
                                       Thread.sleep(DELAY);
                                    }
                                    catch (InterruptedException ie)
                                    {
                                    }
                                 }
                              }
                           };
      writer.submit(wrunnable);

      ExecutorService reader1;
      reader1 = Executors.newSingleThreadExecutor(new NamedThread("reader1"));
      ExecutorService reader2;
      reader2 = Executors.newSingleThreadExecutor(new NamedThread("reader2"));
      Runnable rrunnable = new Runnable()
                           {
                              @Override
                              public void run()
                              {
                                 for (int i = 0; i < NUMITER; i++)
                                    names.dump(Thread.currentThread().getName());
                              }
                           };
      reader1.submit(rrunnable);
      reader2.submit(rrunnable);

      reader1.shutdown();
      reader2.shutdown();
      writer.shutdown();
   }
}

class Names 
{
   private final List<String> names;
 
   private final ReentrantReadWriteLock lock;
   private final Lock readLock, writeLock;
 
   Names()
   {
      names = new ArrayList<>();
      lock = new ReentrantReadWriteLock();
      readLock = lock.readLock();
      writeLock = lock.writeLock();
   }
 
   void add(String threadName, String name)
   {
      writeLock.lock();
      try 
      {
         System.out.printf("%s: num waiting threads = %d%n", 
                           threadName, lock.getQueueLength());
         names.add(name);
      } 
      finally 
      {
         writeLock.unlock();
      }
   }

   void dump(String threadName)
   {
      readLock.lock();
      try
      {
         System.out.printf("%s: num waiting threads = %d%n",
                           threadName, lock.getQueueLength());
         Iterator<String> iter = names.iterator();
         while (iter.hasNext())
         {
            System.out.printf("%s: %s%n", threadName, iter.next());
            try
            {
               Thread.sleep((int)(Math.random()*100));
            }
            catch (InterruptedException ie)
            {
            }
         }
      }
      finally
      {
         readLock.unlock();
      }
   }
}

Listing 3 describes an application where a writer thread appends names to a list of names and a pair of reader threads repeatedly dump this list to the standard output.

The mechanics of RWLockDemo

The main() method first instantiates the Names class, which stores the list of names and provides methods for adding names to and dumping the list. It then declares NamedThread.

NamedThread is a local class that is subsequently used in an executor context to provide a name for the executor’s thread. It implements the java.util.concurrent.ThreadFactory interface and its Thread newThread(Runnable r) method, which returns a new thread whose name was previously passed to the NamedThread(String name) constructor.

Next, main() invokes java.util.concurrent.Executors’ ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) method to create an executor for the writer thread. The name is obtained from the NamedThread instance.

A runnable for the writer thread is then created and submitted to the executor. The runnable repeatedly creates and adds a name to the list of names, and then delays for a short amount of time to give the reader threads a chance to run.

A pair of executors for the reader threads are now created along with a shared runnable for repeatedly dumping the names list. This runnable is submitted to each of the reader executors.

Lastly, main() invokes shutdown() on each executor to initiate an orderly shutdown: previously submitted tasks run to completion, but no new tasks are accepted.

About Names

Names is a simple class that demonstrates read-write locks. It first declares list-of-names, reentrant read-write lock, read-lock, and write-lock fields followed by a constructor that initializes these fields.

The void add(String threadName, String name) method is invoked by the writer thread to add a new name. The threadName argument is used to identify the writer thread (perhaps we might want to add more writer threads) and the name argument identifies the name to be added to the list.

This method first executes writeLock.lock(); to acquire the write lock and then outputs the number of threads waiting to acquire the read (0 to 2) or write (1) lock. After adding the name to the list, it executes writeLock.unlock(); to release the write lock.

The void dump(String threadName) method is similar to add() except for iterating over the list of names, outputting each name, and sleeping for a random amount of time.

Execute javac RWLockDemo.java to compile Listing 3. Then execute java RWLockDemo to run the application. On a Windows 7 platform, I observe something like the following output:

writer: num waiting threads = 0
reader1: num waiting threads = 1
reader2: num waiting threads = 0
reader2: A0
reader1: A0
reader2: num waiting threads = 0
reader2: A0
writer: num waiting threads = 1
reader2: num waiting threads = 1
reader2: A0
reader1: num waiting threads = 0
reader1: A0
reader2: A1
reader1: A1
writer: num waiting threads = 2
reader2: num waiting threads = 1
reader2: A0
reader1: num waiting threads = 0
reader1: A0
reader2: A1
reader1: A1
reader1: A2
reader2: A2
writer: num waiting threads = 2
reader2: num waiting threads = 1
reader2: A0
reader1: num waiting threads = 0
reader1: A0
reader2: A1
reader1: A1
reader2: A2
reader1: A2
reader2: A3
reader1: A3
writer: num waiting threads = 0
reader1: num waiting threads = 0
reader1: A0
reader1: A1
reader1: A2
reader1: A3
reader1: A4

The interleaved output for the reader threads demonstrates that the read lock may be held simultaneously by multiple reader threads.

Atomic variables

Multithreaded applications that run on multicore processors or multiprocessor systems can achieve good hardware utilization and be highly scalable. They can achieve these ends by having their threads spend most of their time performing work rather than waiting for work to accomplish, or waiting to acquire locks in order to access shared data structures.

However, Java’s traditional synchronization mechanism, which enforces mutual exclusion (the thread holding the lock that guards a set of variables has exclusive access to them) and visibility (changes to the guarded variables become visible to other threads that subsequently acquire the lock), impacts hardware utilization and scalability, as follows:

  • Contended synchronization (multiple threads constantly competing for a lock) is expensive and throughput suffers as a result. A major reason for the expense is the frequent context switching that takes place; a context switch operation can take many processor cycles to complete. In contrast, uncontended synchronization is inexpensive on modern JVMs.
  • When a thread holding a lock is delayed (e.g., because of a scheduling delay), no thread that requires that lock makes any progress, and the hardware isn’t utilized as well as it otherwise might be.

You might think that you can use volatile as a synchronization alternative. However, volatile variables only solve the visibility problem. They cannot be used to safely implement the atomic read-modify-write sequences that are necessary for safely implementing counters and other entities that require mutual exclusion.

Java 5 introduced a synchronization alternative that offers mutual exclusion combined with the performance of volatile. This atomic variable alternative is based on a microprocessor’s compare-and-swap instruction and largely consists of the types in the java.util.concurrent.atomic package.
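As a first taste of the package, the following sketch (the class and method names are illustrative) has several threads increment a shared java.util.concurrent.atomic.AtomicInteger without any locking. Because incrementAndGet() is an atomic read-modify-write operation, no increments are lost.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo
{
   // Starts the given number of threads, each incrementing a shared
   // AtomicInteger, and returns the final count.
   static int count(int threads, final int incrementsPerThread)
   {
      final AtomicInteger counter = new AtomicInteger();
      Thread[] workers = new Thread[threads];
      for (int i = 0; i < threads; i++)
      {
         workers[i] = new Thread(new Runnable()
         {
            @Override
            public void run()
            {
               for (int j = 0; j < incrementsPerThread; j++)
                  counter.incrementAndGet(); // atomic, no lock required
            }
         });
         workers[i].start();
      }
      try
      {
         for (Thread worker : workers)
            worker.join();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return counter.get();
   }

   public static void main(String[] args)
   {
      System.out.println(count(4, 25000)); // prints 100000
   }
}
```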

Understanding compare-and-swap

The compare-and-swap (CAS) instruction is an uninterruptible instruction that reads a memory location, compares the read value with an expected value, and stores a new value in the memory location when the read value matches the expected value. Otherwise, nothing is done. The actual microprocessor instruction may differ somewhat (e.g., return true if CAS succeeded or false otherwise instead of the read value).

Microprocessor CAS instructions

Modern microprocessors offer some kind of CAS instruction. For example, Intel microprocessors offer the cmpxchg family of instructions, whereas PowerPC microprocessors offer load-link (e.g., lwarx) and store-conditional (e.g., stwcx) instructions for the same purpose.

CAS makes it possible to support atomic read-modify-write sequences. You would typically use CAS as follows:

  1. Read value v from address X.
  2. Perform a multistep computation to derive a new value v2.
  3. Use CAS to change the value of X from v to v2. CAS succeeds when X’s value hasn’t changed while performing these steps.
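The three steps above can be sketched as a retry loop. This is an illustrative example only: the CasMaxDemo class and its updateMax() method are hypothetical names, and the sketch assumes AtomicInteger from java.util.concurrent.atomic (covered later in this article) as the CAS provider.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasMaxDemo
{
   // Atomically raises max to candidate when candidate is larger.
   // Returns the maximum that is stored after the update.
   static int updateMax(AtomicInteger max, int candidate)
   {
      while (true)
      {
         int v = max.get();                // step 1: read the current value
         int v2 = Math.max(v, candidate);  // step 2: compute the new value
         if (max.compareAndSet(v, v2))     // step 3: succeeds only if the value is still v
            return v2;
         // Another thread changed the value in the meantime: retry with a fresh read.
      }
   }

   public static void main(String[] args)
   {
      AtomicInteger max = new AtomicInteger(10);
      System.out.println(updateMax(max, 7));  // prints 10
      System.out.println(updateMax(max, 42)); // prints 42
   }
}
```

If another thread modifies the value between steps 1 and 3, the CAS fails and the loop simply repeats the computation with the fresh value, so no update is ever silently overwritten.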

To see how CAS offers better performance (and scalability) than synchronization, consider a simple counter class that lets you read its current value and increment it. The following class implements the counter with synchronized:

Listing 4. Counter.java (version 1)

public class Counter 
{
   private int value;

   public synchronized int getValue() 
   { 
      return value; 
   }

   public synchronized int increment() 
   { 
      return ++value; 
   }
}

High contention for the monitor lock will result in excessive context switching that can delay all of the threads and result in an application that doesn’t scale well.

The CAS alternative requires an implementation of the compare-and-swap instruction. The following class emulates CAS. It uses synchronized instead of the actual hardware instruction to simplify the code:

Listing 5. EmulatedCAS.java

public class EmulatedCAS
{
   private int value;

   public synchronized int getValue()
   { 
      return value; 
   }

   public synchronized int compareAndSwap(int expectedValue, int newValue) 
   {
      int readValue = value;
      if (readValue == expectedValue)
         value = newValue;
      return readValue;
   }
}

Here, value identifies a memory location, which can be retrieved by getValue(). Also, compareAndSwap() implements the CAS algorithm.

The following class uses EmulatedCAS to implement a non-synchronized counter (pretend that EmulatedCAS doesn’t require synchronized):

Listing 6. Counter.java (version 2)

public class Counter 
{
   private EmulatedCAS value = new EmulatedCAS();

   public int getValue() 
   {
      return value.getValue();
   }

   public int increment() 
   {
      int readValue = value.getValue();
      while (value.compareAndSwap(readValue, readValue+1) != readValue)
         readValue = value.getValue();
      return readValue+1;
   }
}

Counter encapsulates an EmulatedCAS instance and declares methods for retrieving and incrementing a counter value with help from this instance. getValue() retrieves the instance’s “current counter value” and increment() safely increments the counter value.

increment() repeatedly invokes compareAndSwap() until the value that this method returns matches readValue. A match means that no other thread changed the counter between the read and the swap, and that the incremented value has been stored. When no lock is involved, contention is avoided along with excessive context switching. Performance improves and the code is more scalable.

ReentrantLock and CAS

You previously learned that ReentrantLock offers better performance than synchronized under high thread contention. To boost performance, ReentrantLock’s synchronization is managed by a subclass of the abstract java.util.concurrent.locks.AbstractQueuedSynchronizer class. In turn, this class leverages the undocumented sun.misc.Unsafe class and its compareAndSwapInt() CAS method.
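To get a feel for how a lock can be layered on top of CAS, consider the following deliberately simplified sketch. It is not how AbstractQueuedSynchronizer or ReentrantLock is implemented: the hypothetical SpinLock class has no wait queue, no fairness, and no reentrancy, and it uses the public AtomicBoolean class rather than sun.misc.Unsafe.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SpinLockDemo
{
   // Hypothetical, minimal lock: acquiring it means winning a CAS from false to true.
   static class SpinLock
   {
      private final AtomicBoolean locked = new AtomicBoolean(false);

      void lock()
      {
         while (!locked.compareAndSet(false, true))
            Thread.yield(); // let another thread run before retrying
      }

      void unlock()
      {
         locked.set(false);
      }
   }

   // Increments a plain int from several threads, guarded only by the spin lock.
   static int countWithSpinLock(int nThreads, final int perThread)
   {
      final SpinLock lock = new SpinLock();
      final int[] counter = new int[1];
      Thread[] threads = new Thread[nThreads];
      for (int i = 0; i < nThreads; i++)
      {
         threads[i] = new Thread(new Runnable()
         {
            @Override
            public void run()
            {
               for (int j = 0; j < perThread; j++)
               {
                  lock.lock();
                  try
                  {
                     counter[0]++;
                  }
                  finally
                  {
                     lock.unlock();
                  }
               }
            }
         });
         threads[i].start();
      }
      for (Thread t : threads)
      {
         try
         {
            t.join();
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
         }
      }
      return counter[0];
   }

   public static void main(String[] args)
   {
      System.out.println(countWithSpinLock(4, 50000)); // prints 200000
   }
}
```

A production lock parks waiting threads instead of spinning, which is part of what AbstractQueuedSynchronizer adds on top of the basic CAS step shown here.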

Exploring the atomic variables package

You don’t have to implement compareAndSwap() via the nonportable Java Native Interface. Instead, Java 5 offers this support via java.util.concurrent.atomic: a toolkit of classes used for lock-free, thread-safe programming on single variables.

According to java.util.concurrent.atomic’s Javadoc, these classes

extend the notion of volatile values, fields, and array elements to those that also provide an atomic conditional update operation of the form boolean compareAndSet(expectedValue, updateValue). This method (which varies in argument types across different classes) atomically sets a variable to the updateValue if it currently holds the expectedValue, reporting true on success.

This package offers classes for Boolean (AtomicBoolean), integer (AtomicInteger), long integer (AtomicLong) and reference (AtomicReference) types. It also offers array versions of integer, long integer, and reference (AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray), markable and stamped reference classes for atomically updating a pair of values (AtomicMarkableReference and AtomicStampedReference), and more.
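As a brief illustration of two of these types, the following sketch updates one element of an AtomicIntegerArray and performs two CAS attempts on an AtomicStampedReference. The AtomicTypesDemo class and its method names are hypothetical, and the values are arbitrary.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicStampedReference;

class AtomicTypesDemo
{
   // Updates one slot of an atomic array and returns that slot's final value.
   static int arrayDemo()
   {
      AtomicIntegerArray hits = new AtomicIntegerArray(3); // all elements start at 0
      hits.incrementAndGet(1); // element 1 becomes 1
      hits.addAndGet(1, 4);    // element 1 becomes 5
      return hits.get(1);
   }

   // Returns the outcomes of two CAS attempts on a stamped reference.
   static boolean[] stampedDemo()
   {
      AtomicStampedReference<String> ref =
         new AtomicStampedReference<String>("A", 0);
      int stamp = ref.getStamp();
      // Succeeds: both the reference and the stamp match.
      boolean first = ref.compareAndSet("A", "B", stamp, stamp+1);
      // Fails: the reference matches, but the stamp is stale (0 instead of 1).
      boolean second = ref.compareAndSet("B", "C", stamp, stamp+1);
      return new boolean[] { first, second };
   }

   public static void main(String[] args)
   {
      System.out.println(arrayDemo()); // prints 5
      boolean[] results = stampedDemo();
      System.out.println(results[0]);  // prints true
      System.out.println(results[1]);  // prints false
   }
}
```

The stamp acts as a version number: a CAS attempt based on an outdated view of the pair fails even when the reference itself happens to match.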

Implementing compareAndSet()

Java implements compareAndSet() via the fastest available native construct (e.g., cmpxchg or load-link/store-conditional) or (in the worst case) spin locks.

Consider AtomicInteger, which lets you update an int value atomically. We can use this class to implement the counter shown in Listing 6. Listing 7 presents the equivalent source code.

Listing 7. Counter.java (version 3)

import java.util.concurrent.atomic.AtomicInteger;

public class Counter 
{
   private AtomicInteger value = new AtomicInteger();

   public int getValue() 
   {
      return value.get();
   }

   public int increment() 
   {
      int readValue = value.get();
      while (!value.compareAndSet(readValue, readValue+1))
         readValue = value.get();
      return readValue+1;
   }
}

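Listing 7 is very similar to Listing 6 except that it replaces EmulatedCAS with AtomicInteger. Incidentally, you can simplify increment() because AtomicInteger supplies its own int incrementAndGet() method, which returns the updated value just as increment() does (along with similar methods, such as int getAndIncrement(), which returns the previous value).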
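A simplified counter might look like the following sketch. The class name SimpleCounter is hypothetical (chosen to avoid clashing with the Counter listings), and the sketch uses incrementAndGet() because it returns the updated value, which matches what increment() returns in Listing 7.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SimpleCounter
{
   private final AtomicInteger value = new AtomicInteger();

   public int getValue()
   {
      return value.get();
   }

   public int increment()
   {
      // Atomically adds one and returns the updated value.
      return value.incrementAndGet();
   }

   public static void main(String[] args)
   {
      SimpleCounter counter = new SimpleCounter();
      counter.increment();
      System.out.println(counter.increment()); // prints 2
      System.out.println(counter.getValue());  // prints 2
   }
}
```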

Fork/Join framework

Computer hardware has evolved significantly since Java’s debut in 1995. Back in the day, single-processor systems dominated the computing landscape and Java’s synchronization primitives, such as synchronized and volatile, as well as its threading library (the Thread class, for example) were generally adequate.

Multiprocessor systems became cheaper and developers found themselves needing to create Java applications that effectively exploited the hardware parallelism that these systems offered. However, they soon discovered that Java’s low-level threading primitives and library were very difficult to use in this context, and the resulting solutions were often riddled with errors.

What is parallelism?

Parallelism is the simultaneous execution of multiple threads/tasks via some combination of multiple processors and processor cores.

The Java Concurrency Utilities framework simplifies the development of these applications; however, the utilities offered by this framework do not scale to thousands of processors or processor cores. In our many-core era, we need a solution for achieving a finer-grained parallelism, or we risk keeping processors idle even when there is lots of work for them to handle.

Professor Doug Lea presented a solution to this problem in his paper introducing the idea for a Java-based fork/join framework. Lea describes a framework that supports “a style of parallel programming in which problems are solved by (recursively) splitting them into subtasks that are solved in parallel.” The Fork/Join framework was eventually included in Java 7.

Overview of the Fork/Join framework

The Fork/Join framework is based on a special executor service for running a special kind of task. It consists of the following types that are located in the java.util.concurrent package:

  • ForkJoinPool: an ExecutorService implementation that runs ForkJoinTasks. ForkJoinPool provides task-submission methods, such as void execute(ForkJoinTask<?> task), along with management and monitoring methods, such as int getParallelism() and long getStealCount().
  • ForkJoinTask: an abstract base class for tasks that run within a ForkJoinPool context. ForkJoinTask describes thread-like entities that have a much lighter weight than normal threads. Many tasks and subtasks can be hosted by very few actual threads in a ForkJoinPool instance.
  • ForkJoinWorkerThread: a class that describes a thread managed by a ForkJoinPool instance. ForkJoinWorkerThread is responsible for executing ForkJoinTasks.
  • RecursiveAction: an abstract class that describes a recursive resultless ForkJoinTask.
  • RecursiveTask: an abstract class that describes a recursive result-bearing ForkJoinTask.

The ForkJoinPool executor service is the entry point for submitting tasks that are typically described by subclasses of RecursiveAction or RecursiveTask. Behind the scenes, the task is divided into smaller tasks that are forked (distributed among different threads for execution) from the pool. A task then joins its subtasks: it waits until they finish so that their results can be combined.

ForkJoinPool manages a pool of worker threads, where each worker thread has its own double-ended work queue (deque). When a task forks a new subtask, the thread pushes the subtask onto the head of its deque. When a task tries to join with another task that hasn’t finished, the thread pops another task off the head of its deque and executes the task. If the thread’s deque is empty, it tries to steal another task from the tail of another thread’s deque. This work stealing behavior maximizes throughput while minimizing contention.
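The monitoring methods mentioned earlier let you observe this behavior. The following sketch is an illustration under assumed names and values: the IncrementTask class, the incrementAll() helper, the cutoff of 1000 elements, and the parallelism level of 4 are all arbitrary choices, not part of the original code file.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class StealCountDemo
{
   // Hypothetical task: adds one to every element in the range [lo, hi).
   static class IncrementTask extends RecursiveAction
   {
      private final long[] array;
      private final int lo, hi;

      IncrementTask(long[] array, int lo, int hi)
      {
         this.array = array;
         this.lo = lo;
         this.hi = hi;
      }

      @Override
      protected void compute()
      {
         if (hi-lo < 1000) // arbitrary cutoff for sequential processing
         {
            for (int i = lo; i < hi; i++)
               array[i]++;
         }
         else
         {
            int mid = (lo+hi) >>> 1;
            invokeAll(new IncrementTask(array, lo, mid),
                      new IncrementTask(array, mid, hi));
         }
      }
   }

   // Runs the task in a new pool and returns the pool so that it can be inspected.
   static ForkJoinPool incrementAll(long[] array, int parallelism)
   {
      ForkJoinPool pool = new ForkJoinPool(parallelism);
      pool.invoke(new IncrementTask(array, 0, array.length));
      return pool;
   }

   public static void main(String[] args)
   {
      long[] array = new long[1000000];
      ForkJoinPool pool = incrementAll(array, 4);
      System.out.println("parallelism: " + pool.getParallelism()); // prints parallelism: 4
      System.out.println("steals: " + pool.getStealCount());       // varies from run to run
      pool.shutdown();
   }
}
```

The steal count is only an estimate and depends on scheduling, so expect different numbers on different runs and machines.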

Using the Fork/Join framework

Fork/Join was designed to efficiently execute divide-and-conquer algorithms, which recursively divide problems into sub-problems until they are simple enough to solve directly; for example, a merge sort. The solutions to these sub-problems are combined to provide a solution to the original problem. Each sub-problem can be executed independently on a different processor or core.

Lea’s paper presents the following pseudocode to describe the divide-and-conquer behavior:

Result solve(Problem problem) 
{
   if (problem is small)
      directly solve problem
   else 
   {
      split problem into independent parts
      fork new subtasks to solve each part
      join all subtasks
      compose result from subresults
   }
}

The pseudocode presents a solve method that’s called with some problem to solve and which returns a Result that contains the problem’s solution. If the problem is too small to benefit from parallelism, it’s solved directly. (The overhead of using parallelism on a small problem exceeds any gained benefit.) Otherwise, the problem is divided into subtasks: each subtask independently focuses on part of the problem.

Operation fork launches a new fork/join subtask that will execute in parallel with other subtasks. Operation join delays the current task until the forked subtask finishes. At some point, the problem will be small enough to be executed sequentially, and its result will be combined along with other subresults to achieve an overall solution that’s returned to the caller.
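The pseudocode maps fairly directly onto a RecursiveTask. The following sketch sums an int array; it is an illustrative example rather than one of the Javadoc examples, and the SumTask class, the parallelSum() helper, and the threshold of 10000 elements are assumptions made for the demonstration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumDemo
{
   // Hypothetical result-bearing task: sums the elements in the range [lo, hi).
   static class SumTask extends RecursiveTask<Long>
   {
      private static final int THRESHOLD = 10000; // arbitrary cutoff
      private final int[] data;
      private final int lo, hi;

      SumTask(int[] data, int lo, int hi)
      {
         this.data = data;
         this.lo = lo;
         this.hi = hi;
      }

      @Override
      protected Long compute()
      {
         if (hi-lo <= THRESHOLD) // problem is small: solve it directly
         {
            long sum = 0;
            for (int i = lo; i < hi; i++)
               sum += data[i];
            return sum;
         }
         int mid = (lo+hi) >>> 1;         // split into independent parts
         SumTask left = new SumTask(data, lo, mid);
         SumTask right = new SumTask(data, mid, hi);
         left.fork();                     // fork: left half may run on another thread
         long rightSum = right.compute(); // solve the right half in the current thread
         long leftSum = left.join();      // join: wait for the forked subtask
         return leftSum+rightSum;         // compose result from subresults
      }
   }

   static long parallelSum(int[] data)
   {
      ForkJoinPool pool = new ForkJoinPool();
      try
      {
         return pool.invoke(new SumTask(data, 0, data.length));
      }
      finally
      {
         pool.shutdown();
      }
   }

   public static void main(String[] args)
   {
      int[] data = new int[1000000];
      for (int i = 0; i < data.length; i++)
         data[i] = i+1;
      System.out.println(parallelSum(data)); // prints 500000500000
   }
}
```

Forking one half and computing the other half directly keeps the current worker thread busy instead of leaving it idle while both subtasks run elsewhere.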

The Javadoc for the RecursiveAction and RecursiveTask classes presents several divide-and-conquer algorithm examples implemented as fork/join tasks. For RecursiveAction, the examples sort an array of long integers, increment each element in an array, and sum the squares of each element in an array of doubles. RecursiveTask’s solitary example computes a Fibonacci number.

Listing 8 presents an application that demonstrates the sorting example in non-fork/join as well as fork/join contexts. It also presents some timing information to contrast the sorting speeds.

Listing 8. ForkJoinDemo.java

import java.util.Arrays;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinDemo
{
   static class SortTask extends RecursiveAction 
   {
      private final long[] array; 
      private final int lo, hi;

      SortTask(long[] array, int lo, int hi) 
      {
         this.array = array; 
         this.lo = lo; 
         this.hi = hi;
      }

      SortTask(long[] array) 
      { 
         this(array, 0, array.length); 
      }

      private final static int THRESHOLD = 1000;

      @Override
      protected void compute() 
      {
         if (hi-lo < THRESHOLD)
            sortSequentially(lo, hi);
         else 
         {
            int mid = (lo+hi) >>> 1;
            invokeAll(new SortTask(array, lo, mid),
                      new SortTask(array, mid, hi));
            merge(lo, mid, hi);
         }
      }

      private void sortSequentially(int lo, int hi) 
      {
         Arrays.sort(array, lo, hi);
      }

      private void merge(int lo, int mid, int hi) 
      {
         long[] buf = Arrays.copyOfRange(array, lo, mid);
         for (int i = 0, j = lo, k = mid; i < buf.length; j++)
            array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
      }
   }

   public static void main(String[] args)
   {
      long[] array = new long[300000];
      for (int i = 0; i < array.length; i++)
         array[i] = (long) (Math.random()*10000000);
      long[] array2 = new long[array.length];
      System.arraycopy(array, 0, array2, 0, array.length);

      long startTime = System.currentTimeMillis();
      Arrays.sort(array, 0, array.length); // toIndex is exclusive
      System.out.printf("sequential sort completed in %d millis%n", 
                        System.currentTimeMillis()-startTime);
      for (int i = 0; i < array.length; i++)
         System.out.println(array[i]);

      System.out.println();

      ForkJoinPool pool = new ForkJoinPool();
      startTime = System.currentTimeMillis();
      pool.invoke(new SortTask(array2));
      System.out.printf("parallel sort completed in %d millis%n", 
                        System.currentTimeMillis()-startTime);
      for (int i = 0; i < array2.length; i++)
         System.out.println(array2[i]);
   }
}

Listing 8’s ForkJoinDemo class declares a SortTask nested class that describes a resultless fork/join task for sorting a long integer array. The key to this class is the overriding protected void compute() method, which is called by a worker thread to sort part of the array.

The mechanics of ForkJoinDemo

compute() first determines whether it should sort the array sequentially or divide the array among a pair of subtasks and invoke them recursively. As long as the difference between the lo and hi indexes is greater than or equal to THRESHOLD’s value, the array is subdivided.

When a problem is very small, a sequential solution is often faster than a parallel solution because there’s less overhead. The ideal threshold depends on the cost of coordinating tasks that run in parallel. The lower this cost, the lower the threshold value can be.

RecursiveAction inherits ForkJoinTask’s static invokeAll() methods, which fork tasks and wait for them to complete. Listing 8 calls the two-task overload, void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2), passing two SortTask instances that represent the low and high halves of the array to sort.

The invokeAll() call is followed by a call to merge(), which combines the results from the previously executed SortTask instances. This completes the behavior of this fork/join task.

Listing 8 also presents a main() method for running this application. This method first creates two large long integer arrays with identical content. It then sorts the first array sequentially, and sorts the second array via ForkJoinPool.

Compile Listing 8 (javac ForkJoinDemo.java) and run the application (java ForkJoinDemo). The following is a subset of the output that I observed during one run on my platform, a dual-core 64-bit AMD processor:

sequential sort completed in 94 millis
172
214
287
296
342
343
...

parallel sort completed in 78 millis
172
214
287
296
342
343
...

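In this run, the parallel sort finished faster than the sequential sort. Treat the numbers as a rough indication only: each sort is timed once via System.currentTimeMillis() with no JVM warm-up, so results will vary between runs and platforms.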

Concurrency in Java 8

Java 8 is expected to reach general availability status in March 2014. Although this release will likely be celebrated (and remembered) for introducing Lambda expressions to the Java language, it includes other new features that will help to improve developer productivity. Consider the following enhancements to the Java Concurrency Utilities:

  • Improved ConcurrentHashMap class: The java.util.concurrent.ConcurrentHashMap class has been improved to make it and classes that use it more useful as caches. New methods include sequential and parallel bulk operations (forEach, search, and reduce).
  • Fence intrinsics: This update exposes memory fence controls into Java code so that the Java Concurrency Utilities APIs can more accurately and efficiently control memory ordering and bounding. This task is accomplished by adding “three memory-ordering intrinsics to the sun.misc.Unsafe class.”
  • Changes to the Fork/Join framework: The ForkJoinPool and ForkJoinTask classes have been updated to improve performance and supply additional functionality. “New features include support for completion-based designs that are often most appropriate for IO-bound usages, among others.” Additionally, a new CountedCompleter class that subclasses ForkJoinTask and provides a completion action that’s “performed when triggered and there are no remaining pending actions” has been introduced.
  • New CompletableFuture class: The new java.util.concurrent.CompletableFuture class is a “Future that may be explicitly completed (setting its value and status), and may include dependent functions and actions that trigger upon its completion.” This class is associated with the new java.util.concurrent.CompletableFuture.AsynchronousCompletionTask interface and the new java.util.concurrent.CompletionException exception. Check out Tomasz Nurkiewicz’s Java 8: Definitive guide to CompletableFuture blog post for an extensive tutorial on how to use CompletableFuture.
  • New StampedLock class: The new java.util.concurrent.locks.StampedLock class is “a capability-based lock with three modes for controlling read/write access.” Check out Dr. Heinz Kabutz’s Phaser and StampedLock Concurrency Synchronizers video presentation to learn about StampedLock. A PDF file of this presentation is also available.
  • Parallel array sorting: The java.util.Arrays class has been augmented with several parallel-prefixed class methods (such as void parallelSort(int[] a)) that leverage the Fork/Join framework to sort arrays in parallel.
  • Scalable updatable variables: The java.util.concurrent.atomic package includes new DoubleAccumulator, DoubleAdder, LongAccumulator, and LongAdder classes that address a scalability problem in the context of maintaining a single count, sum, or some other value with the possibility of updates from many threads. These new classes “internally employ contention-reduction techniques that provide huge throughput improvements as compared to atomic variables. This is made possible by relaxing atomicity guarantees in a way that is acceptable in most applications.”
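Three of these enhancements can be tried out in a few lines. The following sketch requires Java 8 or later (unlike the earlier listings, it uses lambda expressions), and the Java8ConcurrencyDemo class, its method names, and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

class Java8ConcurrencyDemo
{
   // Parallel array sorting: returns a sorted copy and leaves the input untouched.
   static long[] sortedCopy(long[] input)
   {
      long[] copy = Arrays.copyOf(input, input.length);
      Arrays.parallelSort(copy);
      return copy;
   }

   // Scalable updatable variables: several threads add to a single LongAdder.
   static long countWithAdder(int nThreads, int perThread)
   {
      LongAdder adder = new LongAdder();
      Thread[] threads = new Thread[nThreads];
      for (int i = 0; i < nThreads; i++)
      {
         threads[i] = new Thread(() ->
         {
            for (int j = 0; j < perThread; j++)
               adder.increment();
         });
         threads[i].start();
      }
      for (Thread t : threads)
      {
         try
         {
            t.join();
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
         }
      }
      return adder.sum(); // all threads have finished, so the sum is exact
   }

   // CompletableFuture: a dependent function runs once the future is explicitly completed.
   static int completeExplicitly()
   {
      CompletableFuture<Integer> future = new CompletableFuture<>();
      CompletableFuture<Integer> doubled = future.thenApply(n -> n*2);
      future.complete(21);
      return doubled.join();
   }

   public static void main(String[] args)
   {
      System.out.println(Arrays.toString(sortedCopy(new long[] { 3, 1, 2 }))); // prints [1, 2, 3]
      System.out.println(countWithAdder(4, 1000)); // prints 4000
      System.out.println(completeExplicitly());    // prints 42
   }
}
```

Note that LongAdder's sum() is only guaranteed to be exact when no updates are in flight, which is why the sketch reads it after joining every thread.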

In conclusion

The Java Concurrency Utilities framework offers an alternative to Java’s low-level threading capabilities. This article completes my two-part series on java.util.concurrent by focusing on the Locking framework, atomic variables, and the Fork/Join framework. I also introduced seven significant enhancements to the Java Concurrency Utilities, which are coming in Java 8.

The two articles in this series couldn’t cover every API in the Java Concurrency Utilities, so there is more to explore. The code file for this article includes additional demos, including exercises that use the java.util.concurrent.ThreadLocalRandom and java.util.concurrent.locks.LockSupport classes.

Jeff Friesen is a freelance tutor and software developer with an emphasis on Java and Android. In addition to writing Java and Android books for Apress, Jeff has written numerous articles on Java and other technologies for JavaWorld, informIT, Java.net, DevSource, and SitePoint. Jeff can be contacted via his website at TutorTutor.ca.