Java 101: Java concurrency without the pain, Part 1

feature
Jun 20, 2013 | 34 mins
Concurrency, Core Java, Jakarta EE

Get started with the Java Concurrency Utilities

With the increasing complexity of concurrent applications, many developers find that Java’s low-level threading capabilities are insufficient for their programming needs. In that case, it might be time to discover the Java Concurrency Utilities. Get started with java.util.concurrent, with Jeff Friesen’s detailed introduction to the Executor framework, synchronizer types, and the Java Concurrent Collections package.


The Java platform provides low-level threading capabilities that enable developers to write concurrent applications where different threads execute simultaneously. Standard Java threading has some downsides, however:

  • Java’s low-level concurrency primitives (synchronized, volatile, wait(), notify(), and notifyAll()) aren’t easy to use correctly. Threading hazards like deadlock, thread starvation, and race conditions, which result from incorrect use of primitives, are also hard to detect and debug.
  • Relying on synchronized to coordinate access between threads leads to performance issues that affect application scalability, a requirement for many modern applications.
  • Java’s basic threading capabilities are too low level. Developers often need higher level constructs like semaphores and thread pools, which Java’s low-level threading capabilities don’t offer. As a result, developers will build their own constructs, which is both time consuming and error prone.

The JSR 166: Concurrency Utilities framework was designed to meet the need for a high-level threading facility. Initiated in early 2002, the framework was formalized and implemented two years later in Java 5. Enhancements have followed in Java 6, Java 7, and the forthcoming Java 8.

This two-part Java 101: The next generation series introduces software developers familiar with basic Java threading to the Java Concurrency Utilities packages and framework. In Part 1, I present an overview of the Java Concurrency Utilities framework and introduce its Executor framework, synchronizer utilities, and the Java Concurrent Collections package.

Understanding Java threads

Before you dive into this series, make sure you are familiar with the basics of threading. Start with the Java 101 introduction to Java’s low-level threading capabilities.

Inside the Java Concurrency Utilities

The Java Concurrency Utilities framework is a library of types that are designed to be used as building blocks for creating concurrent classes or applications. These types are thread-safe, have been thoroughly tested, and offer high performance.

Types in the Java Concurrency Utilities are organized into small frameworks; namely, the Executor framework, synchronizers, concurrent collections, locks, atomic variables, and Fork/Join. They are further organized into a main package and a pair of subpackages:

  • java.util.concurrent contains high-level utility types that are commonly used in concurrent programming. Examples include semaphores, barriers, thread pools, and concurrent hashmaps.
    • The java.util.concurrent.atomic subpackage contains low-level utility classes that support lock-free thread-safe programming on single variables.
    • The java.util.concurrent.locks subpackage contains low-level utility types for locking and waiting for conditions, which are different from using Java’s low-level synchronization and monitors.

The Java Concurrency Utilities framework also exposes the low-level compare-and-swap (CAS) hardware instruction, variants of which are commonly supported by modern processors. CAS is much more lightweight than Java’s monitor-based synchronization mechanism and is used to implement some highly scalable concurrent classes. The CAS-based java.util.concurrent.locks.ReentrantLock class, for instance, can be more performant than the equivalent monitor-based synchronized primitive under contention, and it also offers more control over locking. (In Part 2 I’ll explain more about how CAS works in java.util.concurrent.)
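
To make the compare-and-swap idea concrete, here is a minimal sketch that increments a shared counter without any locking, using AtomicInteger.compareAndSet() from java.util.concurrent.atomic. The CasCounter class, its runDemo() helper, and the thread and iteration counts are illustrative assumptions rather than code from this series.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasCounter
{
   private final AtomicInteger value = new AtomicInteger();

   // Lock-free increment: read the current value, then try to swap in
   // current+1. If another thread changed the value in between, retry.
   int increment()
   {
      while (true)
      {
         int current = value.get();
         int next = current+1;
         if (value.compareAndSet(current, next))
            return next;
      }
   }

   int get()
   {
      return value.get();
   }

   // Hypothetical helper: runs nThreads threads that each increment the
   // counter perThread times, and returns the final count.
   static int runDemo(int nThreads, final int perThread)
   {
      final CasCounter counter = new CasCounter();
      Thread[] threads = new Thread[nThreads];
      for (int i = 0; i < nThreads; i++)
      {
         threads[i] = new Thread(new Runnable()
                                 {
                                    @Override
                                    public void run()
                                    {
                                       for (int j = 0; j < perThread; j++)
                                          counter.increment();
                                    }
                                 });
         threads[i].start();
      }
      for (Thread t: threads)
      {
         try
         {
            t.join(); // wait for every incrementing thread to finish
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
         }
      }
      return counter.get();
   }

   public static void main(String[] args)
   {
      System.out.println(runDemo(4, 10000)); // prints 40000
   }
}
```

The retry loop is written out only to show the mechanism. In everyday code, AtomicInteger's incrementAndGet() method performs the same update for you.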

System.nanoTime()

The Java Concurrency Utilities framework includes long nanoTime(), which is a member of the java.lang.System class. This method enables access to a nanosecond-granularity time source for making relative time measurements.
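
As a small illustration of relative time measurement, the following sketch times a task by subtracting two nanoTime() readings. The ElapsedTime class and its timeNanos() helper are hypothetical names chosen for this example.

```java
import java.util.concurrent.TimeUnit;

class ElapsedTime
{
   // Returns the number of nanoseconds that the given task took to run.
   // Only the difference between two nanoTime() readings is meaningful;
   // the absolute values are unrelated to wall-clock time.
   static long timeNanos(Runnable task)
   {
      long start = System.nanoTime();
      task.run();
      return System.nanoTime()-start;
   }

   public static void main(String[] args)
   {
      long elapsed = timeNanos(new Runnable()
                               {
                                  @Override
                                  public void run()
                                  {
                                     try
                                     {
                                        Thread.sleep(50); // simulated work
                                     }
                                     catch (InterruptedException ie)
                                     {
                                        Thread.currentThread().interrupt();
                                     }
                                  }
                               });
      System.out.printf("elapsed: %d ms%n",
                        TimeUnit.NANOSECONDS.toMillis(elapsed));
   }
}
```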

In the next sections I’ll introduce three useful features of the Java Concurrency Utilities, first explaining why they’re so important to modern concurrency and then demonstrating how they work to increase the speed, reliability, efficiency, and scalability of concurrent Java applications.

The Executor framework

In threading, a task is a unit of work. One problem with low-level threading in Java is that task submission is tightly coupled with a task-execution policy, as demonstrated by Listing 1.

Listing 1. Server.java (Version 1)

import java.io.IOException;

import java.net.ServerSocket;
import java.net.Socket;

class Server
{
   public static void main(String[] args) throws IOException
   {
      ServerSocket socket = new ServerSocket(9000);
      while (true)
      {
         final Socket s = socket.accept();
         Runnable r = new Runnable()
                      {
                         @Override
                         public void run()
                         {
                            doWork(s);
                         }
                      };
         new Thread(r).start();
      }
   }

   static void doWork(Socket s)
   {
   }
}

The above code describes a simple server application (with doWork(Socket) left empty for brevity). The server thread repeatedly calls socket.accept() to wait for an incoming request, and then starts a thread to service this request when it arrives.

Because this application creates a new thread for each request, it doesn’t scale well when faced with a huge number of requests. For example, each created thread requires memory, and too many threads may exhaust the available memory, forcing the application to terminate.

You could solve this problem by changing the task-execution policy. Rather than always creating a new thread, you could use a thread pool, in which a fixed number of threads would service incoming tasks. You would have to rewrite the application to make this change, however.

java.util.concurrent includes the Executor framework, a small framework of types that decouple task submission from task-execution policies. Using the Executor framework, it is possible to easily tune a program’s task-execution policy without having to significantly rewrite your code.

Inside the Executor framework

The Executor framework is based on the Executor interface, which describes an executor as any object capable of executing java.lang.Runnable tasks. This interface declares the following solitary method for executing a Runnable task:

void execute(Runnable command)

You submit a Runnable task by passing it to execute(Runnable). If the executor cannot execute the task for any reason (for instance, if the executor has been shut down), this method will throw a RejectedExecutionException.

The key concept is that task submission is decoupled from the task-execution policy, which is described by an Executor implementation. The runnable task is thus able to execute via a new thread, a pooled thread, the calling thread, and so on.
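
The following sketch shows two deliberately tiny execution policies behind the same Executor interface. The DirectExecutor and ThreadPerTaskExecutor names follow the examples in Executor's Javadoc; the ExecutorPolicies class and its submitGreeting() method are assumptions made for this illustration.

```java
import java.util.concurrent.Executor;

// Policy 1: run the task synchronously in the caller's thread.
class DirectExecutor implements Executor
{
   @Override
   public void execute(Runnable command)
   {
      command.run();
   }
}

// Policy 2: start a new thread for every task (the policy that Listing 1
// hard-codes into the server loop).
class ThreadPerTaskExecutor implements Executor
{
   @Override
   public void execute(Runnable command)
   {
      new Thread(command).start();
   }
}

class ExecutorPolicies
{
   // The submitting code is identical regardless of the policy behind it.
   static void submitGreeting(Executor executor, final String label)
   {
      executor.execute(new Runnable()
                       {
                          @Override
                          public void run()
                          {
                             String name = Thread.currentThread().getName();
                             System.out.printf("%s ran on %s%n", label, name);
                          }
                       });
   }

   public static void main(String[] args)
   {
      submitGreeting(new DirectExecutor(), "direct");
      submitGreeting(new ThreadPerTaskExecutor(), "thread-per-task");
   }
}
```

Swapping one executor for the other changes where the task runs without touching submitGreeting(), which is the decoupling described above.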

Note that Executor is very limited. For example, you can’t shut down an executor or determine whether an asynchronous task has finished. You also can’t cancel a running task. For these and other reasons, the Executor framework provides an ExecutorService interface, which extends Executor.

Five of ExecutorService’s methods are especially noteworthy:

  • boolean awaitTermination(long timeout, TimeUnit unit) blocks the calling thread until all tasks have completed execution after a shutdown request, the timeout occurs, or the current thread is interrupted, whichever happens first. The maximum time to wait is specified by timeout, and this value is expressed in the units identified by unit, a constant of the TimeUnit enum; for example, TimeUnit.SECONDS. This method throws java.lang.InterruptedException when the current thread is interrupted. It returns true when the executor is terminated and false when the timeout elapses before termination.
  • boolean isShutdown() returns true when the executor has been shut down.
  • void shutdown() initiates an orderly shutdown in which previously submitted tasks are executed but no new tasks are accepted.
  • <T> Future<T> submit(Callable<T> task) submits a value-returning task for execution and returns a Future representing the pending results of the task.
  • Future<?> submit(Runnable task) submits a Runnable task for execution and returns a Future representing that task.

The Future<V> interface represents the result of an asynchronous computation. The result is known as a future because it typically will not be available until some moment in the future. You can invoke methods to cancel a task, return a task’s result (waiting indefinitely or for a timeout to elapse when the task hasn’t finished), and determine if a task has been cancelled or has finished.

The Callable<V> interface is similar to the Runnable interface in that it provides a single method describing a task to execute. Unlike Runnable’s void run() method, Callable<V>’s V call() throws Exception method can return a value and throw an exception.
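
Here is a minimal sketch of how these pieces fit together: a Callable is submitted to an ExecutorService, its result is retrieved through the returned Future, and the executor is then shut down. The FutureDemo class and its compute() method are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureDemo
{
   // Computes 1+2+...+n on an executor thread and returns the sum.
   static long compute(final int n)
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      try
      {
         Future<Long> future = executor.submit(new Callable<Long>()
                                               {
                                                  @Override
                                                  public Long call()
                                                  {
                                                     long sum = 0;
                                                     for (int i = 1; i <= n; i++)
                                                        sum += i;
                                                     return sum;
                                                  }
                                               });
         long result = future.get(); // blocks until call() returns
         executor.shutdown();        // accept no further tasks
         boolean done = executor.awaitTermination(1, TimeUnit.SECONDS);
         System.out.printf("shut down: %b, terminated: %b%n",
                           executor.isShutdown(), done);
         return result;
      }
      catch (ExecutionException ee)
      {
         // call() threw an exception, which is available as the cause
         throw new IllegalStateException(ee.getCause());
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(ie);
      }
      finally
      {
         executor.shutdown(); // no effect if already shut down
      }
   }

   public static void main(String[] args)
   {
      System.out.println(compute(100)); // the sum 1..100 is 5050
   }
}
```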

Executor factory methods

At some point, you’ll want to obtain an executor. The Executor framework supplies the Executors utility class for this purpose. Executors offers several factory methods for obtaining different kinds of executors that offer specific thread-execution policies. Here are three examples:

  • ExecutorService newCachedThreadPool() creates a thread pool that creates new threads as needed, but which reuses previously constructed threads when they’re available. Threads that haven’t been used for 60 seconds are terminated and removed from the cache. This thread pool typically improves the performance of programs that execute many short-lived asynchronous tasks.
  • ExecutorService newSingleThreadExecutor() creates an executor that uses a single worker thread operating off an unbounded queue — tasks are added to the queue and execute sequentially (no more than one task is active at any one time). If this thread terminates through failure during execution before shutdown of the executor, a new thread will be created to take its place when subsequent tasks need to be executed.
  • ExecutorService newFixedThreadPool(int nThreads) creates a thread pool that re-uses a fixed number of threads operating off a shared unbounded queue. At most nThreads threads are actively processing tasks. If additional tasks are submitted when all threads are active, they wait in the queue until a thread is available. If any thread terminates through failure during execution before shutdown, a new thread will be created to take its place when subsequent tasks need to be executed. The pool’s threads exist until the executor is shut down.
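
One way to observe the difference between these policies is to record which threads actually run a batch of tasks. The sketch below does that; the PoolReuse class, its threadsUsed() helper, and the task count of 20 are assumptions made for this example.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolReuse
{
   // Runs nTasks short tasks on the given executor, shuts it down, and
   // returns the names of the threads that executed the tasks.
   static Set<String> threadsUsed(ExecutorService executor, int nTasks)
   {
      final Set<String> names =
         Collections.synchronizedSet(new HashSet<String>());
      for (int i = 0; i < nTasks; i++)
         executor.execute(new Runnable()
                          {
                             @Override
                             public void run()
                             {
                                names.add(Thread.currentThread().getName());
                             }
                          });
      executor.shutdown(); // queued tasks still run
      try
      {
         executor.awaitTermination(10, TimeUnit.SECONDS);
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return names;
   }

   public static void main(String[] args)
   {
      // A fixed pool of 3 never uses more than 3 threads; a single-thread
      // executor uses exactly 1. The cached pool's count varies per run.
      System.out.println("fixed(3): "+
                         threadsUsed(Executors.newFixedThreadPool(3), 20));
      System.out.println("single:   "+
                         threadsUsed(Executors.newSingleThreadExecutor(), 20));
      System.out.println("cached:   "+
                         threadsUsed(Executors.newCachedThreadPool(), 20));
   }
}
```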

The Executor framework offers additional types (such as the ScheduledExecutorService interface), but the types you are likely to work with most often are ExecutorService, Future, Callable, and Executors.

See the java.util.concurrent Javadoc to explore additional types.

Working with the Executor framework

You’ll find that the Executor framework is fairly easy to work with. In Listing 2, I’ve used Executor and Executors to replace the server example from Listing 1 with a more scalable thread pool-based alternative.

Listing 2. Server.java (Version 2)

import java.io.IOException;

import java.net.ServerSocket;
import java.net.Socket;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class Server
{
   static Executor pool = Executors.newFixedThreadPool(5);

   public static void main(String[] args) throws IOException
   {
      ServerSocket socket = new ServerSocket(9000);
      while (true)
      {
         final Socket s = socket.accept();
         Runnable r = new Runnable()
                      {
                         @Override
                         public void run()
                         {
                            doWork(s);
                         }
                      };
         pool.execute(r);
      }
   }

   static void doWork(Socket s)
   {
   }
}

Listing 2 uses newFixedThreadPool(int) to obtain a thread pool-based executor that reuses five threads. It also replaces new Thread(r).start(); with pool.execute(r); for executing runnable tasks via any of these threads.

Listing 3 presents another example in which an application reads the contents of an arbitrary web page. It outputs the resulting lines or an error message if the contents aren’t available within a maximum of five seconds.

Listing 3. ReadWebPage.java

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;

import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;

public class ReadWebPage
{
   public static void main(final String[] args)
   {
      if (args.length != 1)
      {
         System.err.println("usage: java ReadWebPage url");
         return;
      }
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Callable<List<String>> callable;
      callable = new Callable<List<String>>()
                 {
                    @Override
                    public List<String> call()
                       throws IOException, MalformedURLException
                    {
                       List<String> lines = new ArrayList<>();
                       URL url = new URL(args[0]);
                       HttpURLConnection con;
                       con = (HttpURLConnection) url.openConnection();
                       InputStreamReader isr;
                       isr = new InputStreamReader(con.getInputStream());
                       BufferedReader br;
                       br = new BufferedReader(isr);
                       String line;
                       while ((line = br.readLine()) != null)
                          lines.add(line);
                       return lines;
                    }
                 };
      Future<List<String>> future = executor.submit(callable);
      try
      {
         List<String> lines = future.get(5, TimeUnit.SECONDS);
         for (String line: lines)
            System.out.println(line);
      }
      catch (ExecutionException ee)
      {
         System.err.println("Callable threw exception: "+ee.getMessage());
      }
      catch (InterruptedException | TimeoutException eite)
      {
         System.err.println("URL not responding");
      }
      executor.shutdown();
    }
}

Listing 3’s main() method first verifies that a single (URL-based) command-line argument has been specified. It then creates a single-thread executor and a callable that tries to open a connection to this URL, read its contents line by line, and save these lines in a list, which it returns.

The callable is subsequently submitted to the executor and a future representing the list of strings is returned. main() invokes the future’s V get(long timeout, TimeUnit unit) method to obtain this list.

get() throws TimeoutException when the callable doesn’t finish within five seconds. It throws ExecutionException when the callable throws an exception (for instance, the callable will throw java.net.MalformedURLException when the URL is invalid).

Regardless of whether an exception is thrown or not, the executor must be shut down before the application exits. If the executor isn’t shut down, the application won’t exit because the non-daemon thread-pool threads are still executing.
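
One way to guarantee the shutdown, sketched below under a few assumptions, is to place the shutdown() call in a finally block so that it runs no matter how the try block exits. The TimedCall class, its callWithTimeout() and slowTask() methods, and the fallback-value convention are hypothetical and are not part of Listing 3. The sketch also cancels a task that has timed out so that its thread isn't left running.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedCall
{
   // Runs the task on its own executor and returns its result, or returns
   // fallback when the task fails or doesn't finish within timeoutMillis.
   static <T> T callWithTimeout(Callable<T> task, long timeoutMillis,
                                T fallback)
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Future<T> future = executor.submit(task);
      try
      {
         return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
      }
      catch (TimeoutException te)
      {
         future.cancel(true); // interrupt the task if it's still running
         return fallback;
      }
      catch (ExecutionException ee)
      {
         return fallback; // the task itself threw an exception
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         return fallback;
      }
      finally
      {
         executor.shutdown(); // always runs, so the worker thread can exit
      }
   }

   // Hypothetical task that sleeps for the given time and returns "done".
   static Callable<String> slowTask(final long millis)
   {
      return new Callable<String>()
             {
                @Override
                public String call() throws InterruptedException
                {
                   Thread.sleep(millis);
                   return "done";
                }
             };
   }

   public static void main(String[] args)
   {
      // The task needs 2 seconds but is given only 100 milliseconds.
      System.out.println(callWithTimeout(slowTask(2000), 100, "timed out"));
   }
}
```

Note that cancel(true) works here because Thread.sleep() responds to interruption. A task that is blocked in stream I/O, like the one in Listing 3, may not react to an interrupt.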

Synchronizers

Synchronizers are high-level constructs that coordinate and control thread execution. The Java Concurrency Utilities framework provides classes that implement semaphore, cyclic barrier, countdown latch, exchanger, and phaser synchronizers. I’ll introduce each of these synchronizer types and then show you how they’d work in a concurrent Java application.

Semaphores

A semaphore is a thread-synchronization construct for controlling thread access to a common resource. It’s often implemented as a protected variable whose value is decremented by an acquire operation and incremented by a release operation.

The acquire operation either returns control to the invoking thread immediately or causes that thread to block when the semaphore’s current value is zero. The release operation increases the current value, which lets a blocked thread resume.

Semaphores whose current values can be incremented past 1 are known as counting semaphores, whereas semaphores whose current values can be only 0 or 1 are known as binary semaphores or mutexes. In either case, the current value cannot be negative.

The java.util.concurrent.Semaphore class conceptualizes a semaphore as an object maintaining a set of permits. This class provides Semaphore(int permits) and Semaphore(int permits, boolean fair) constructors for specifying the number of permits.

Each call to Semaphore’s void acquire() method takes one of the available permits or blocks the calling thread when one isn’t available. Each call to Semaphore’s void release() method returns a permit to the semaphore, potentially releasing a blocked acquirer thread.
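
The permit model can be seen in a few lines. This minimal single-threaded sketch uses the non-blocking tryAcquire() method and availablePermits() alongside acquire() and release(); the PermitDemo class and its run() method are hypothetical names.

```java
import java.util.concurrent.Semaphore;

class PermitDemo
{
   // Takes both permits, then shows that a further non-blocking attempt
   // fails until one permit has been released.
   static String run()
   {
      Semaphore semaphore = new Semaphore(2); // two permits, non-fair
      try
      {
         semaphore.acquire(); // takes the first permit
         semaphore.acquire(); // takes the second permit
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         return "interrupted";
      }
      int free = semaphore.availablePermits(); // 0
      boolean before = semaphore.tryAcquire(); // false: no permit left
      semaphore.release();                     // returns one permit
      boolean after = semaphore.tryAcquire();  // true: takes that permit
      return free+" "+before+" "+after;
   }

   public static void main(String[] args)
   {
      System.out.println(run()); // prints "0 false true"
   }
}
```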

Working with semaphores

Semaphores are often used to restrict the number of threads that can access a resource. Listing 4 demonstrates this capability by using a semaphore to control access to a pool of string items; the source code is based on Semaphore’s Javadoc example code.

Listing 4. SemaphoreDemo.java

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo
{
   public static void main(String[] args)
   {
      final Pool pool = new Pool();
      Runnable r = new Runnable()
                   {
                      @Override
                      public void run()
                      {
                         String name = Thread.currentThread().getName();
                         try
                         {
                            while (true)
                            {
                               String item;
                               System.out.printf("%s acquiring %s%n", name,
                                                 item = pool.getItem());
                               Thread.sleep(200+(int)(Math.random()*100));
                               System.out.printf("%s putting back %s%n",
                                                 name, item);
                               pool.putItem(item);
                            }
                         }
                         catch (InterruptedException ie)
                         {
                            System.out.printf("%s interrupted%n", name);
                         }
                      }
                   };
      ExecutorService[] executors =
         new ExecutorService[Pool.MAX_AVAILABLE+1];
      for (int i = 0; i < executors.length; i++)
      {
         executors[i] = Executors.newSingleThreadExecutor();
         executors[i].execute(r);
      }
   }
}

final class Pool
{
   public static final int MAX_AVAILABLE = 10;

   private Semaphore available = new Semaphore(MAX_AVAILABLE, true);
   private String[] items;
   private boolean[] used = new boolean[MAX_AVAILABLE];

   Pool()
   {
      items = new String[MAX_AVAILABLE];
      for (int i = 0; i < items.length; i++)
         items[i] = "ITEM"+i;
   }

   String getItem() throws InterruptedException
   {
      available.acquire();
      return getNextAvailableItem();
   }

   void putItem(String item)
   {
      if (markAsUnused(item))
         available.release();
   }

   private synchronized String getNextAvailableItem()
   {
      for (int i = 0; i < MAX_AVAILABLE; ++i)
      {
         if (!used[i])
         {
            used[i] = true;
            return items[i];
         }
      }
      return null; // not reached
   }

   private synchronized boolean markAsUnused(String item)
   {
      for (int i = 0; i < MAX_AVAILABLE; ++i)
      {
         if (item == items[i])
         {
            if (used[i])
            {
               used[i] = false;
               return true;
            }
            else
               return false;
         }
      }
      return false;
   }
}

Listing 4 presents SemaphoreDemo and Pool classes. SemaphoreDemo drives the application by creating executors and having them execute a runnable that repeatedly acquires string item resources from a pool (implemented by Pool) and then returns them.

Pool provides String getItem() and void putItem(String item) methods for obtaining and returning resources. Before obtaining an item in getItem(), a thread must acquire a permit from the semaphore, guaranteeing that an item is available for use. When the thread has finished with the item, it calls putItem(String), which returns the item to the pool and then returns a permit to the semaphore, which lets another thread acquire that item.

No synchronization lock is held when acquire() is called because that would prevent an item from being returned to the pool. However, String getNextAvailableItem() and boolean markAsUnused(String item) are synchronized to maintain pool consistency. (The semaphore encapsulates the synchronization needed to restrict access to the pool separately from the synchronization needed to maintain pool consistency.)

Compile Listing 4 (javac SemaphoreDemo.java) and run this application (java SemaphoreDemo). A prefix of the output generated from one run is shown below:

pool-1-thread-1 acquiring ITEM0
pool-9-thread-1 acquiring ITEM9
pool-7-thread-1 acquiring ITEM8
pool-5-thread-1 acquiring ITEM7
pool-3-thread-1 acquiring ITEM6
pool-10-thread-1 acquiring ITEM5
pool-8-thread-1 acquiring ITEM4
pool-6-thread-1 acquiring ITEM3
pool-4-thread-1 acquiring ITEM2
pool-2-thread-1 acquiring ITEM1
pool-5-thread-1 putting back ITEM7
pool-11-thread-1 acquiring ITEM7
pool-9-thread-1 putting back ITEM9
pool-5-thread-1 acquiring ITEM9
pool-7-thread-1 putting back ITEM8
pool-9-thread-1 acquiring ITEM8
pool-3-thread-1 putting back ITEM6
pool-7-thread-1 acquiring ITEM6

In the above output, eleven threads compete for ten resources. Thread pool-11-thread-1 is forced to wait when it attempts to acquire a resource. It resumes with the ITEM7 resource when thread pool-5-thread-1 returns this resource to the pool.

Cyclic barriers

A cyclic barrier is a thread-synchronization construct that lets a set of threads wait for each other to reach a common barrier point. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A cyclic barrier is implemented by the java.util.concurrent.CyclicBarrier class. This class provides the following constructors:

  • CyclicBarrier(int nthreads, Runnable barrierAction) causes a maximum of nthreads-1 threads to wait at the barrier. When one more thread arrives, it executes the nonnull barrierAction and then all threads proceed. This action is useful for updating shared state before any of the threads continue.
  • CyclicBarrier(int nthreads) is similar to the previous constructor except that no runnable is executed when the barrier is tripped.

Either constructor throws java.lang.IllegalArgumentException when the value passed to nthreads is less than 1.

CyclicBarrier declares an int await() method that causes the calling thread to wait until all nthreads threads have called await(). When the final thread arrives, and if a nonnull Runnable was passed to barrierAction, the final thread executes the runnable before the other threads continue.

await() throws InterruptedException when the thread that invoked this method is interrupted while waiting. This method throws BrokenBarrierException when another thread was interrupted while the invoking thread was waiting, the barrier was broken when await() was called, or the barrier action (when present) failed because an exception was thrown from the runnable’s run() method.

Working with cyclic barriers

Cyclic barriers can be used to perform lengthy calculations by breaking them into smaller individual tasks (as demonstrated by CyclicBarrier’s Javadoc example code). They’re also used in multiplayer games that cannot start until the last player has joined, as shown in Listing 5.

Listing 5. CyclicBarrierDemo.java

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo
{
   public static void main(String[] args)
   {
      Runnable action = new Runnable()
                        {
                           @Override
                           public void run()
                           {
                              String name =
                                 Thread.currentThread().getName();
                              System.out.printf("Thread %s "+
                                                "executing barrier action.%n",
                                                name);
                           }
                        };
      final CyclicBarrier barrier = new CyclicBarrier(3, action);
      Runnable task = new Runnable()
                      {
                         @Override
                         public void run()
                         {
                            String name = Thread.currentThread().getName();
                            System.out.printf("%s about to join game...%n",
                                              name);
                            try
                            {
                               barrier.await();
                            }
                            catch (BrokenBarrierException bbe)
                            {
                               System.out.println("barrier is broken");
                               return;
                            }
                            catch (InterruptedException ie)
                            {
                               System.out.println("thread interrupted");
                               return;
                            }
                            System.out.printf("%s has joined game%n", name);
                         }
                      };
      ExecutorService[] executors = new ExecutorService[]
                                    {
                                       Executors.newSingleThreadExecutor(),
                                       Executors.newSingleThreadExecutor(),
                                       Executors.newSingleThreadExecutor()
                                    };
      for (ExecutorService executor: executors)
      {
         executor.execute(task);
         executor.shutdown();
      }
   }
}

The above main() method first creates a barrier action that’s run by the last thread to reach the barrier. Next, a cyclic barrier is created. When three players arrive it trips and executes the barrier action.

Reusing a CyclicBarrier

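A CyclicBarrier resets itself automatically each time it trips, so the same instance can be reused for another round without any extra work. Invoke its void reset() method only to force the barrier back to its initial state; threads waiting at the barrier at that moment receive a BrokenBarrierException.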

Next, main() creates a runnable task that outputs status messages and invokes await(), followed by an array of three executors. Each executor runs this task and shuts down after the task finishes.

Compile and run this application. You should see output similar to the following:

pool-1-thread-1 about to join game...
pool-3-thread-1 about to join game...
pool-2-thread-1 about to join game...
Thread pool-2-thread-1 executing barrier action.
pool-2-thread-1 has joined game
pool-3-thread-1 has joined game
pool-1-thread-1 has joined game
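
Listing 5 trips its barrier only once. As a rough sketch of the cyclic behavior, the following code sends the same threads through one barrier for several rounds and counts how often the barrier action runs. The BarrierRounds class and its run() method are illustrative assumptions.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierRounds
{
   // Sends nThreads threads through the same barrier for the given number
   // of rounds and returns how many times the barrier action executed.
   static int run(int nThreads, final int rounds)
   {
      final AtomicInteger trips = new AtomicInteger();
      Runnable action = new Runnable()
                        {
                           @Override
                           public void run()
                           {
                              trips.incrementAndGet(); // once per trip
                           }
                        };
      final CyclicBarrier barrier = new CyclicBarrier(nThreads, action);
      Runnable task = new Runnable()
                      {
                         @Override
                         public void run()
                         {
                            try
                            {
                               // The same barrier instance serves every
                               // round; no reset() call is required.
                               for (int r = 0; r < rounds; r++)
                                  barrier.await();
                            }
                            catch (BrokenBarrierException bbe)
                            {
                               System.out.println("barrier is broken");
                            }
                            catch (InterruptedException ie)
                            {
                               Thread.currentThread().interrupt();
                            }
                         }
                      };
      Thread[] threads = new Thread[nThreads];
      for (int i = 0; i < nThreads; i++)
      {
         threads[i] = new Thread(task);
         threads[i].start();
      }
      for (Thread t: threads)
      {
         try
         {
            t.join();
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
         }
      }
      return trips.get();
   }

   public static void main(String[] args)
   {
      System.out.println(run(3, 2)); // prints 2: one trip per round
   }
}
```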

Countdown latches

A countdown latch is a thread-synchronization construct that causes one or more threads to wait until a set of operations being performed by other threads finishes. It consists of a count and two operations: “cause a thread to wait until the count reaches zero” and “decrement the count.”

The java.util.concurrent.CountDownLatch class implements a countdown latch. Its CountDownLatch(int count) constructor initializes the countdown latch to the specified count. A thread invokes the void await() method to wait until the count has reached zero (or the thread has been interrupted). Subsequent calls to await() for a zero count return immediately. A thread calls void countDown() to decrement the count.

Working with countdown latches

Countdown latches are useful for decomposing a problem into smaller pieces and giving a piece to a separate thread, as follows:

  1. A main thread creates a countdown latch with a count of 1 that’s used as a “starting gate” to start a group of worker threads simultaneously.
  2. Each worker thread waits on the latch and the main thread decrements this latch to let all worker threads proceed.
  3. The main thread waits on another countdown latch initialized to the number of worker threads.
  4. When a worker thread completes, it decrements this count. After the count reaches zero (meaning that all worker threads have finished), the main thread proceeds and gathers the results.

Listing 6 demonstrates this scenario.

Listing 6. CountDownLatchDemo.java

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo
{
   final static int N = 3;

   public static void main(String[] args) throws InterruptedException
   {
      CountDownLatch startSignal = new CountDownLatch(1);
      CountDownLatch doneSignal = new CountDownLatch(N);
      for (int i = 0; i < N; ++i) // create and start threads
         new Thread(new Worker(startSignal, doneSignal)).start();
      System.out.println("about to let threads proceed");
      startSignal.countDown(); // let all threads proceed
      System.out.println("doing work");
      System.out.println("waiting for threads to finish");
      doneSignal.await(); // wait for all threads to finish
      System.out.println("main thread terminating");
   }
}

class Worker implements Runnable
{
   private final static int N = 5;

   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;

   Worker(CountDownLatch startSignal, CountDownLatch doneSignal)
   {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
   }

   @Override
   public void run()
   {
      try
      {
         String name = Thread.currentThread().getName();
         startSignal.await();
         for (int i = 0; i < N; i++)
         {
            System.out.printf("thread %s is working%n", name);
            try
            {
               Thread.sleep((int)(Math.random()*300));
            }
            catch (InterruptedException ie)
            {
            }
         }
         System.out.printf("thread %s finishing%n", name);
         doneSignal.countDown();
      }
      catch (InterruptedException ie)
      {
         System.out.println("interrupted");
      }
   }
}

Listing 6 presents CountDownLatchDemo and Worker classes. CountDownLatchDemo’s main() method creates a startSignal countdown latch initialized to 1 and a doneSignal countdown latch initialized to 3, the number of worker threads.

main() proceeds to create three worker threads described by Worker and then start these threads. After outputting a message, main() executes startSignal.countDown() to tell the worker threads that they can proceed.

After outputting a few more messages, main() executes doneSignal.await() to wait until all worker threads have finished.

Worker’s constructor saves these latches, and its run() method performs some work. Before performing this work, the thread executes startSignal.await() to block until the main thread allows it to proceed (by executing startSignal.countDown()).

The worker then enters a loop to simulate doing some work by alternately outputting messages and sleeping for random amounts of time. It then executes doneSignal.countDown() to decrement the doneSignal countdown latch so that the main thread will eventually wake up.

Compile and run this application. You should see output similar to the following:

about to let threads proceed
doing work
waiting for threads to finish
thread Thread-1 is working
thread Thread-0 is working
thread Thread-2 is working
thread Thread-1 is working
thread Thread-0 is working
thread Thread-0 is working
thread Thread-1 is working
thread Thread-2 is working
thread Thread-1 is working
thread Thread-2 is working
thread Thread-0 is working
thread Thread-0 is working
thread Thread-1 is working
thread Thread-2 is working
thread Thread-1 finishing
thread Thread-0 finishing
thread Thread-2 is working
thread Thread-2 finishing
main thread terminating

Exchangers

An exchanger (also known as a rendezvous) is a thread-synchronization construct that lets a pair of threads exchange data items. An exchanger is similar to a cyclic barrier whose count is set to 2 but also supports exchange of data when both threads reach the barrier.

The java.util.concurrent.Exchanger<V> class implements an exchanger. This class provides an Exchanger() constructor for initializing an exchanger that describes an exchange point and a pair of exchange() methods for performing an exchange.

For example, V exchange(V x) throws InterruptedException waits for another thread to arrive at the exchange point (unless the current thread is interrupted) and then transfers the given object to it, receiving its object in return.
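
As a minimal sketch of this method, the following code has two threads swap one string each. The ExchangeSketch class and its swap() helper are hypothetical names introduced only for illustration; they aren't part of the article's listings.

```java
import java.util.concurrent.Exchanger;

class ExchangeSketch
{
   // Hypothetical helper: the calling thread and a partner thread swap one
   // string each. Returns the string that the calling thread received.
   static String swap(final String mine, final String theirs)
      throws InterruptedException
   {
      final Exchanger<String> exchanger = new Exchanger<String>();
      final String[] partnerReceived = new String[1];
      Thread partner = new Thread(new Runnable()
      {
         @Override
         public void run()
         {
            try
            {
               // Blocks until the calling thread also reaches exchange().
               partnerReceived[0] = exchanger.exchange(theirs);
            }
            catch (InterruptedException ie)
            {
               Thread.currentThread().interrupt();
            }
         }
      });
      partner.start();
      String received = exchanger.exchange(mine); // meet at the exchange point
      partner.join(); // makes the partner's result visible to this thread
      System.out.printf("partner received %s%n", partnerReceived[0]);
      return received;
   }

   public static void main(String[] args) throws InterruptedException
   {
      System.out.printf("main received %s%n",
                        swap("from main", "from partner"));
   }
}
```

Whichever thread arrives first simply waits; neither call to exchange() returns until both threads have handed over their objects.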

Working with exchangers

Exchanger’s Javadoc states that this synchronizer may be useful in genetic algorithms and pipeline designs, where one thread fills a buffer and the other thread empties the buffer. When both threads meet at the exchange point, they swap their buffers. Listing 7 demonstrates.

Listing 7. ExchangerDemo.java

import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.Exchanger;

public class ExchangerDemo
{
   static Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   static DataBuffer initialEmptyBuffer = new DataBuffer();
   static DataBuffer initialFullBuffer = new DataBuffer("ITEM");

   public static void main(String[] args)
   {
      class FillingLoop implements Runnable
      {
         int count = 0;

         @Override
         public void run()
         {
            DataBuffer currentBuffer = initialEmptyBuffer;
            try
            {
               while (true)
               {
                  addToBuffer(currentBuffer);
                  if (currentBuffer.isFull())
                  {
                     System.out.println("filling loop thread wants to exchange");
                     currentBuffer = exchanger.exchange(currentBuffer);
                     System.out.println("filling loop thread observes an exchange");
                  }
               }
            }
            catch (InterruptedException ie)
            {
               System.out.println("filling loop thread interrupted");
            }
         }

         void addToBuffer(DataBuffer buffer)
         {
            String item = "NEWITEM"+count++;
            System.out.printf("Adding %s%n", item);
            buffer.add(item);
         }
      }

      class EmptyingLoop implements Runnable
      {
         @Override
         public void run()
         {
            DataBuffer currentBuffer = initialFullBuffer;
            try
            {
               while (true)
               {
                  takeFromBuffer(currentBuffer);
                  if (currentBuffer.isEmpty())
                  {
                     System.out.println("emptying loop thread wants to exchange");
                     currentBuffer = exchanger.exchange(currentBuffer);
                     System.out.println("emptying loop thread observes an exchange");
                  }
               }
            }
            catch (InterruptedException ie)
            {
               System.out.println("emptying loop thread interrupted");
            }
         }

         void takeFromBuffer(DataBuffer buffer)
         {
            System.out.printf("taking %s%n", buffer.remove());
         }
      }

      new Thread(new EmptyingLoop()).start();
      new Thread(new FillingLoop()).start();
   }
}

class DataBuffer
{
   private final static int MAX = 10;
   private List<String> items = new ArrayList<>();

   DataBuffer()
   {
   }

   DataBuffer(String prefix)
   {
      for (int i = 0; i < MAX; i++)
      {
         String item = prefix+i;
         System.out.printf("Adding %s%n", item);
         items.add(item);
      }
   }

   void add(String s)
   {
      if (!isFull())
         items.add(s);
   }

   boolean isEmpty()
   {
      return items.size() == 0;
   }

   boolean isFull()
   {
      return items.size() == MAX;
   }

   String remove()
   {
      if (!isEmpty())
         return items.remove(0);
      return null;
   }
}

Listing 7 is based on the example code in Exchanger’s Javadoc. One thread fills one buffer with strings while another thread empties another buffer. When the respective buffer is full or empty, these threads meet at an exchange point and swap buffers.

For example, when the filling thread’s currentBuffer.isFull() expression is true, it executes currentBuffer = exchanger.exchange(currentBuffer) and waits. The emptying thread continues until currentBuffer.isEmpty() evaluates to true, and also invokes exchange(currentBuffer). At this point, the buffers are swapped and the threads continue.

Compile and run this application. The first part of your output should be similar to the following:

Adding ITEM0
Adding ITEM1
Adding ITEM2
Adding ITEM3
Adding ITEM4
Adding ITEM5
Adding ITEM6
Adding ITEM7
Adding ITEM8
Adding ITEM9
taking ITEM0
taking ITEM1
taking ITEM2
taking ITEM3
taking ITEM4
taking ITEM5
taking ITEM6
taking ITEM7
taking ITEM8
Adding NEWITEM0
taking ITEM9
Adding NEWITEM1
emptying loop thread wants to exchange
Adding NEWITEM2
Adding NEWITEM3
Adding NEWITEM4
Adding NEWITEM5
Adding NEWITEM6
Adding NEWITEM7
Adding NEWITEM8
Adding NEWITEM9
filling loop thread wants to exchange
filling loop thread observes an exchange
emptying loop thread observes an exchange
Adding NEWITEM10
Adding NEWITEM11
taking NEWITEM0
taking NEWITEM1
Adding NEWITEM12
taking NEWITEM2
taking NEWITEM3
Adding NEWITEM13
taking NEWITEM4
taking NEWITEM5
Adding NEWITEM14
taking NEWITEM6
taking NEWITEM7
Adding NEWITEM15
taking NEWITEM8
taking NEWITEM9
emptying loop thread wants to exchange
Adding NEWITEM16
Adding NEWITEM17
Adding NEWITEM18
Adding NEWITEM19
filling loop thread wants to exchange
filling loop thread observes an exchange
emptying loop thread observes an exchange
Adding NEWITEM20

Phasers

A phaser is a thread-synchronization construct that’s similar to a cyclic barrier in that it lets a group of threads wait on a barrier and then proceed after the last thread arrives. It also offers the equivalent of a barrier action. However, a phaser is more flexible.

Unlike a cyclic barrier, which coordinates a fixed number of threads, a phaser can coordinate a variable number of threads, which can register at any time. To implement this capability, a phaser takes advantage of phases and phase numbers.

A phase is the phaser’s current state, and this state is identified by an integer-based phase number. When the last of the registered threads arrives at the phaser barrier, a phaser advances to the next phase and increments its phase number by 1.

The java.util.concurrent.Phaser class implements a phaser. Because this class is thoroughly described in its Javadoc, I’ll point out only a few constructors and methods:

  • The Phaser(int nthreads) constructor creates a phaser that initially coordinates nthreads threads (which have yet to arrive at the phaser barrier) and whose phase number is initially set to 0.
  • The int register() method adds a new unarrived thread to this phaser and returns the phase number to which the arrival applies. This number is known as the arrival phase number.
  • The int arriveAndAwaitAdvance() method records arrival and waits for the phaser to advance (which happens after the other threads have arrived). It returns the phase number to which the arrival applies.
  • The int arriveAndDeregister() method arrives at this phaser and deregisters from it without waiting for others to arrive, reducing the number of threads required to advance in future phases.
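
To see phase numbers advance, consider the following minimal sketch. The PhaseNumberSketch class and its runPhases() helper are hypothetical names used only for illustration. The sketch also calls Phaser's getPhase() method, which isn't listed above and returns the current phase number.

```java
import java.util.concurrent.Phaser;

class PhaseNumberSketch
{
   // Hypothetical helper: the calling thread and two worker threads pass
   // through the given number of phases. Returns the final phase number.
   static int runPhases(final int phases) throws InterruptedException
   {
      final Phaser phaser = new Phaser(1); // "1" to register calling thread
      Thread[] workers = new Thread[2];
      for (int i = 0; i < workers.length; i++)
      {
         phaser.register(); // one more unarrived thread per worker
         workers[i] = new Thread()
         {
            @Override
            public void run()
            {
               for (int p = 0; p < phases; p++)
                  phaser.arriveAndAwaitAdvance(); // wait for the others
               phaser.arriveAndDeregister(); // done: stop taking part
            }
         };
         workers[i].start();
      }
      for (int p = 0; p < phases; p++)
      {
         phaser.arriveAndAwaitAdvance();
         // The phase cannot advance again until this thread arrives again.
         System.out.printf("advanced to phase %d%n", phaser.getPhase());
      }
      for (Thread worker: workers)
         worker.join();
      int finalPhase = phaser.getPhase();
      phaser.arriveAndDeregister(); // last thread leaves the phaser
      return finalPhase;
   }

   public static void main(String[] args) throws InterruptedException
   {
      System.out.printf("final phase number: %d%n", runPhases(3));
   }
}
```

Because the calling thread registers itself through the constructor before starting any worker, the phaser cannot advance past phase 0 until every worker has been registered.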

Working with phasers

The small application in Listing 8 demonstrates the constructor and methods described above.

Listing 8. PhaserDemo.java

import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.Phaser;

public class PhaserDemo
{
   public static void main(String[] args)
   {
      List<Runnable> tasks = new ArrayList<>();
      tasks.add(new Runnable()
                {
                   @Override
                   public void run()
                   {
                      System.out.printf("%s running at %d%n",
                                        Thread.currentThread().getName(),
                                        System.currentTimeMillis());
                   }
                });
      tasks.add(new Runnable()
                {
                   @Override
                   public void run()
                   {
                      System.out.printf("%s running at %d%n",
                                        Thread.currentThread().getName(),
                                        System.currentTimeMillis());
                   }
                });
      runTasks(tasks);
   }

   static void runTasks(List<Runnable> tasks)
   {
      final Phaser phaser = new Phaser(1); // "1" to register self
      // create and start threads
      for (final Runnable task: tasks)
      {
         phaser.register();
         new Thread()
         {
            @Override
            public void run()
            {
               try
               {
                  Thread.sleep(50+(int)(Math.random()*300));
               }
               catch (InterruptedException ie)
               {
                  System.out.println("interrupted thread");
               }
               phaser.arriveAndAwaitAdvance(); // await all creation
               task.run();
            }
         }.start();
      }

      // allow threads to start and deregister self
      phaser.arriveAndDeregister();
   }
}

Listing 8 is based on the first code example in Phaser’s Javadoc. This example shows how to use Phaser instead of CountDownLatch to control a one-shot action serving a variable number of threads.

The application creates a pair of runnable tasks that each report the time (in milliseconds relative to the Unix epoch) at which it starts to run. Compile and run this application, and you should observe output that’s similar to the following:

Thread-0 running at 1366315297635
Thread-1 running at 1366315297635

As you would expect from countdown latch behavior, both threads start running at (in this case) the same time even though a thread may have been delayed by as much as 349 milliseconds thanks to the presence of Thread.sleep().

Comment out phaser.arriveAndAwaitAdvance(); // await all creation and you should now observe the threads starting at radically different times, as illustrated below:

Thread-1 running at 1366315428871
Thread-0 running at 1366315429100

Concurrent collections

The Java Collections framework provides interfaces and classes in the java.util package that facilitate working with collections of objects. Interfaces include List, Map, and Set. Classes include ArrayList, Vector, Hashtable, HashMap, and TreeSet.

Collections classes such as Vector and Hashtable are thread-safe. You can make other classes (like ArrayList) thread-safe by using synchronized wrapper factory methods such as Collections.synchronizedMap(), Collections.synchronizedList(), and Collections.synchronizedSet().

There are a couple of problems with the thread-safe collections:

  1. Code that iterates over a collection that might be modified by another thread during the iteration requires a lock to avoid a thrown java.util.ConcurrentModificationException. The lock is needed because Collections framework classes return fail-fast iterators, which throw ConcurrentModificationException when a collection is modified during iteration. Fail-fast iterators are often an inconvenience to concurrent applications.
  2. Performance often suffers when these synchronized collections are accessed frequently from multiple threads; this is a performance problem that impacts an application’s scalability.
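
The first problem can be sketched as follows. The SynchronizedIterationSketch class and its totalLength() helper are hypothetical names used only for illustration. The sketch relies on the documented requirement that code iterating over a list returned by Collections.synchronizedList() must synchronize on that list manually.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SynchronizedIterationSketch
{
   // Hypothetical helper: sums the lengths of all strings in a list that
   // was obtained from Collections.synchronizedList().
   static int totalLength(List<String> syncList)
   {
      int total = 0;
      // Individual calls such as add() lock internally, but iteration spans
      // many calls, so the caller must hold the wrapper's lock throughout.
      synchronized (syncList)
      {
         for (String s: syncList)
            total += s.length();
      }
      return total;
   }

   public static void main(String[] args)
   {
      List<String> names =
         Collections.synchronizedList(new ArrayList<String>());
      names.add("John Doe");
      names.add("Jane Doe");
      System.out.printf("total length: %d%n", totalLength(names));
   }
}
```

While one thread holds this lock, every other thread that wants to read or modify the list must wait, which is also where the second problem comes from.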

The Java Concurrency Utilities framework overcomes these problems by introducing performant and highly-scalable collections-oriented types, which are part of java.util.concurrent. These collections-oriented classes return weakly consistent iterators, which have the following properties:

  • When an element is removed after iteration starts, but hasn’t yet been returned via the iterator’s next() method, it won’t be returned.
  • When an element is added after iteration starts, it may or may not be returned.
  • Regardless of changes to the collection, no element is returned more than once in an iteration.
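
The following minimal sketch shows a weakly consistent iterator at work. It uses ConcurrentLinkedQueue; the WeaklyConsistentSketch class and its iterateWhileAdding() helper are hypothetical names used only for illustration.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class WeaklyConsistentSketch
{
   // Hypothetical helper: adds an element while iterating and returns the
   // number of elements that the iterator visited.
   static int iterateWhileAdding()
   {
      Queue<String> names = new ConcurrentLinkedQueue<String>();
      names.add("John Doe");
      names.add("Jane Doe");
      names.add("Rita Smith");
      int visited = 0;
      Iterator<String> iter = names.iterator();
      while (iter.hasNext())
      {
         System.out.println(iter.next());
         visited++;
         // Modifying the queue here doesn't throw
         // ConcurrentModificationException; the new element may or may
         // not be returned by this iterator.
         if (!names.contains("Tom Smith"))
            names.add("Tom Smith");
      }
      return visited;
   }

   public static void main(String[] args)
   {
      System.out.printf("visited %d elements%n", iterateWhileAdding());
   }
}
```

The three original names are always visited. Whether the iterator also returns the added name isn't guaranteed, so code shouldn't depend on it either way.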

The following list summarizes the Java Concurrency Utilities framework’s collection-oriented types:

  • BlockingDeque<E>: This interface extends BlockingQueue and java.util.Deque to describe a double-ended queue with additional support for blocking operations that wait for the deque to become non-empty when retrieving an element, and wait for space to become available in the deque when storing an element.
  • BlockingQueue<E>: This interface extends java.util.Queue to describe a queue with additional support for operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
  • ConcurrentMap<K, V>: This interface extends java.util.Map to describe a map with additional atomic putIfAbsent, remove, and replace methods.
  • ConcurrentNavigableMap<K, V>: This interface extends ConcurrentMap and java.util.NavigableMap to describe a concurrent map with navigable operations.
  • TransferQueue<E>: This interface extends BlockingQueue to describe a blocking queue in which producers may wait for consumers to receive elements.
  • ArrayBlockingQueue<E>: This class describes a bounded blocking queue backed by an array.
  • ConcurrentHashMap<K, V>: This class describes a hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates.
  • ConcurrentLinkedDeque<E>: This class describes an unbounded thread-safe deque based on linked nodes.
  • ConcurrentLinkedQueue<E>: This class describes an unbounded thread-safe queue based on linked nodes.
  • ConcurrentSkipListMap<K, V>: This class describes a scalable concurrent ConcurrentNavigableMap implementation.
  • ConcurrentSkipListSet<E>: This class describes a scalable concurrent java.util.NavigableSet implementation based on a ConcurrentSkipListMap.
  • CopyOnWriteArrayList<E>: This class describes a thread-safe variant of ArrayList in which all mutative operations (e.g., add and set) are implemented by making a fresh copy of the underlying array. In-progress iterations continue to work on the copy that existed when the iterator was created. Although there’s some cost to copying the array, this cost is acceptable in situations where there are many more iterations than modifications.
  • CopyOnWriteArraySet<E>: This class describes a Set that uses an internal CopyOnWriteArrayList for all of its operations.
  • DelayQueue<E extends Delayed>: This class describes an unbounded blocking queue of java.util.concurrent.Delayed elements, in which an element can only be taken when its delay has expired. (Delayed is an interface for marking objects that should be acted upon after a given delay.)
  • LinkedBlockingDeque<E>: This class describes an optionally-bounded blocking deque based on linked nodes.
  • LinkedBlockingQueue<E>: This class describes an optionally-bounded blocking queue based on linked nodes.
  • LinkedTransferQueue<E>: This class describes an unbounded transfer queue based on linked nodes.
  • PriorityBlockingQueue<E>: This class describes an unbounded blocking queue that uses the same ordering rules as java.util.PriorityQueue and supplies blocking retrieval operations.
  • SynchronousQueue<E>: This class describes a blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
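
As a minimal sketch of the blocking behavior described for BlockingQueue, the following code passes integers from a producer thread to the calling thread through a small ArrayBlockingQueue. The BlockingQueueSketch class and its transfer() helper are hypothetical names used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch
{
   // Hypothetical helper: a producer thread puts the integers 1 through
   // count on a bounded queue; the calling thread takes and sums them.
   static int transfer(final int count) throws InterruptedException
   {
      final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);
      Thread producer = new Thread(new Runnable()
      {
         @Override
         public void run()
         {
            try
            {
               for (int i = 1; i <= count; i++)
                  queue.put(i); // waits while the queue is full
            }
            catch (InterruptedException ie)
            {
               Thread.currentThread().interrupt();
            }
         }
      });
      producer.start();
      int sum = 0;
      for (int i = 0; i < count; i++)
         sum += queue.take(); // waits while the queue is empty
      producer.join();
      return sum;
   }

   public static void main(String[] args) throws InterruptedException
   {
      System.out.printf("sum = %d%n", transfer(10));
   }
}
```

The capacity of 2 is an arbitrary choice that forces the producer to wait for the consumer; no explicit locking or wait()/notify() calls are needed in either thread.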

Working with Concurrent Collections

For an example of what you can do with Concurrent Collections, consider CopyOnWriteArrayList, as demonstrated in Listing 9.

Listing 9. CopyOnWriteArrayListDemo.java

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListDemo
{
   public static void main(String[] args)
   {
      List<String> empList = new ArrayList<>();
      empList.add("John Doe");
      empList.add("Jane Doe");
      empList.add("Rita Smith");
      Iterator<String> empIter = empList.iterator();
      while (empIter.hasNext())
         try
         {
            System.out.println(empIter.next());
            if (!empList.contains("Tom Smith"))
               empList.add("Tom Smith");
         }
         catch (ConcurrentModificationException cme)
         {
            System.err.println("attempt to modify list during iteration");
            break;
         }

      List<String> empList2 = new CopyOnWriteArrayList<>();
      empList2.add("John Doe");
      empList2.add("Jane Doe");
      empList2.add("Rita Smith");
      empIter = empList2.iterator();
      while (empIter.hasNext())
      {
         System.out.println(empIter.next());
         if (!empList2.contains("Tom Smith"))
            empList2.add("Tom Smith");
      }

   }
}

Listing 9 contrasts CopyOnWriteArrayList with ArrayList from a ConcurrentModificationException perspective. During each iteration, an attempt is made to add a new employee name to the list. The ArrayList iteration fails with this exception, whereas the CopyOnWriteArrayList iteration completes without seeing the addition, because its iterator works on the array as it was when the iterator was created.

If you compile and run this application you should see the following output:

John Doe
attempt to modify list during iteration
John Doe
Jane Doe
Rita Smith

In conclusion

The Java Concurrency Utilities framework offers a high-level alternative to Java’s low-level threading capabilities. This library’s thread-safe, high-performance types were designed to be used as the building blocks in concurrent classes and applications.

The Java Concurrency Utilities framework is organized into several smaller frameworks, with types stored in java.util.concurrent and two subpackages, java.util.concurrent.atomic and java.util.concurrent.locks.

In this article, I introduced three of these frameworks: the Executor framework, synchronizers, and the Java Concurrent Collections. You can learn more about them by exploring the exercises in this article’s source code file. In Part 2 we’ll explore locks, atomic variables, Fork/Join, and more.

Jeff Friesen is a freelance tutor and software developer with an emphasis on Java and Android. In addition to writing Java and Android books for Apress, Jeff has written numerous articles on Java and other technologies for JavaWorld, informIT, Java.net, DevSource, and SitePoint. Jeff can be contacted via his website at TutorTutor.ca.