Java ArrayBlockingQueue - A High Performance Data Structure for a Multithreaded Application

The queue data structure is the natural choice for storing and retrieving elements in First In, First Out (FIFO) order.

The Java programming language represents this data structure with the Queue interface. The Java Development Kit (JDK) contains many implementations of this interface, each with different performance properties and safety guarantees. In this article we’re going to explore one particular Queue implementation, the ArrayBlockingQueue, which provides the thread safety and bounded-memory guarantees needed by high-performance, multithreaded Java production applications.

Before we talk about Java Multithreading and the specific ArrayBlockingQueue implementation, let’s define a few terms that are commonly used to describe operations and actors in the context of a Queue.

Queue Terminology

When it comes to operations we can perform on the data structure, the two most important ones are:

  1. Enqueue - add an element to the end of the queue. In the Java Queue interface this operation is represented by the add() and offer() methods.

  2. Dequeue - remove an element from the front of the queue. In the Java Queue interface this operation is represented by the remove() and poll() methods.
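To make the distinction between the method pairs concrete, here is a minimal sketch (using ArrayBlockingQueue as the Queue implementation, and hypothetical element values): add() and remove() throw exceptions on failure, while offer() and poll() signal failure with special return values (false and null, respectively).

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueOperationsDemo {
    public static void main(String[] args) {
        // A small bounded queue to illustrate the two method families
        Queue<String> queue = new ArrayBlockingQueue<>(2);

        // Enqueue: offer() returns false when the queue is full,
        // while add() would throw IllegalStateException instead
        System.out.println(queue.offer("first"));  // true
        System.out.println(queue.offer("second")); // true
        System.out.println(queue.offer("third"));  // false - queue is full

        // Dequeue: poll() returns null when the queue is empty,
        // while remove() would throw NoSuchElementException instead
        System.out.println(queue.poll()); // first  (FIFO order)
        System.out.println(queue.poll()); // second
        System.out.println(queue.poll()); // null - queue is empty
    }
}
```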

In the context of multithreaded Java applications, there are two types of actors performing operations on a queue:

  1. Producer - The thread that adds elements to the queue by calling the enqueue operation.

  2. Consumer - The thread that removes elements from the queue by calling the dequeue operation.

Depending on the use case, there may be multiple producer threads and/or multiple consumer threads.

Queue in a Multithreaded Java Application

In a Java multithreaded production application environment, two concerns need to be taken into account when choosing a concrete implementation for the queue data structure.

  1. Thread Safety

When working with a data structure that isn’t inherently thread-safe, accessing and modifying it from multiple threads can produce undesired side effects such as race conditions and data races. In trying to solve these problems, developers who aren’t proficient in Java multithreading and concurrency may introduce even more problems, in the form of deadlocks or performance bottlenecks caused by inefficient and excessive locking.

If you want to learn how to efficiently solve such challenges while providing your Java application with high performance make sure to check out Java Multithreading, Concurrency, and Performance Optimization online course.

  2. Bounded Size

When the number of elements in our queue grows uncontrollably to the point that we exceed the JVM’s heap size, our application may crash with an OutOfMemoryError, which is generally unrecoverable. To prevent this from happening, we need to set an upper bound on the number of elements the queue can hold at any given moment. Additionally, we need to prevent any thread from adding elements to a full queue, which can be achieved by blocking that thread until space becomes available.

ArrayBlockingQueue

One implementation of the queue data structure that solves both of those concerns out-of-the-box is ArrayBlockingQueue, which is part of the java.util.concurrent package. The ArrayBlockingQueue uses a bounded cyclic buffer backed by a fixed-size array to store its elements.
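As a quick sketch of what this looks like in practice, the capacity of an ArrayBlockingQueue is fixed at construction time and can never grow (the capacity values below are arbitrary, chosen only for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class CreateQueueDemo {
    public static void main(String[] args) {
        // The capacity is fixed at construction time and can never change
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);

        // An optional second argument requests a fair lock, which serves
        // waiting threads in FIFO order at some cost to throughput
        BlockingQueue<Integer> fairQueue = new ArrayBlockingQueue<>(100, true);

        System.out.println(queue.remainingCapacity()); // 100
        queue.offer(42);
        System.out.println(queue.remainingCapacity()); // 99
    }
}
```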

ArrayBlockingQueue - Thread Safety

To prevent concurrency issues and side effects due to multithreading, all of the ArrayBlockingQueue methods protect their operations with a single ReentrantLock.

Here are a few examples:

public boolean offer(E e) {
   Objects.requireNonNull(e);

   final ReentrantLock lock = this.lock;
   lock.lock(); // Start protected block

   try {
       if (count == items.length)
           return false;
       else {
           enqueue(e);
           return true;
       }
   } finally {
       lock.unlock(); // End protected block
   }
}

public E poll() {
   final ReentrantLock lock = this.lock;

   lock.lock(); // Start protected block

   try {
       return (count == 0) ? null : dequeue();
   } finally {
       lock.unlock(); // End protected block
   }
}

As we can see in the above two examples, each operation starts by locking a reentrant lock and finishes by unlocking it in a finally block, which guarantees the lock is released even if the operation throws an exception.
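We can observe this thread safety from the outside with a small sketch (thread counts and capacity are arbitrary): two producer threads offer elements concurrently, and the internal lock guarantees that no updates are lost.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConcurrentOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10_000);

        // Each producer offers 5,000 elements; with a capacity of 10,000,
        // every offer() succeeds and no element is lost or duplicated
        Runnable producer = () -> {
            for (int i = 0; i < 5_000; i++) {
                queue.offer(i);
            }
        };

        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println(queue.size()); // 10000 - no lost updates
    }
}
```

With a non-thread-safe collection such as ArrayDeque, the same experiment could lose elements or corrupt internal state.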

ArrayBlockingQueue - Bounded Size

ArrayBlockingQueue is backed by an array, and since an array has a fixed length, it naturally limits the number of elements that can be stored inside the queue.

Additionally, to provide its users with a clear contract that explicitly blocks a producer thread when the queue is already full, the ArrayBlockingQueue class implements the BlockingQueue interface rather than the Queue interface directly.

The BlockingQueue interface extends the Queue interface, which means BlockingQueue contains all of the Queue method declarations plus additional blocking variants, the two most important of which are:

  1. put(element) - Inserts an element into this queue, blocking the calling thread if the queue is full.

  2. take() - Retrieves and removes the head of this queue, blocking the calling thread if the queue is empty.

Let’s examine the implementations of these two methods, starting with put():

public void put(E e) throws InterruptedException {
   Objects.requireNonNull(e);
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try {
       while (count == items.length)
           notFull.await(); // Block the thread until the queue has space for additional elements
       enqueue(e);
   } finally {
       lock.unlock();
   }
}

We can see that if the queue is already full, the thread calling the put() method goes into a blocked state. Through the notFull condition variable’s await() method, the calling thread remains dormant until it is explicitly signaled after a consumer thread removes an item from the queue.

Similarly, if we look at the take() method:

public E take() throws InterruptedException {
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try {
       while (count == 0)
           notEmpty.await(); // Block the current thread until there are elements in the queue
       return dequeue();
   } finally {
       lock.unlock();
   }
}

We can see that if the count of elements in the queue is zero, the calling thread is blocked through the notEmpty condition variable’s await() method, and it remains inactive until a producer thread adds an item to the queue for the consumer to consume.
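Putting put() and take() together, here is a minimal producer-consumer sketch (the capacity and element count are arbitrary, chosen small so the blocking behavior is actually exercised): the tiny capacity forces the producer to block until the consumer catches up, which is exactly the back-pressure the BlockingQueue contract promises.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity of 2 forces the producer to block on put() until
        // the consumer frees up space by calling take()
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(i); // blocks while the queue is full
                    System.out.println("Produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    int value = queue.take(); // blocks while the queue is empty
                    System.out.println("Consumed " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```

Note that elements are always consumed in the same FIFO order in which they were produced, regardless of how the two threads interleave.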

If you want to gain deep knowledge of Java Multithreading and learn how to incorporate all the above-mentioned techniques into your own Java application, check out the Java Multithreading, Concurrency & Performance Optimization course.

In this top-rated, online course you will master Java Concurrency from the very beginning to expert level. This course is perfect for busy software engineers who want to up-level their skills and gain practical knowledge that they can apply to their projects right away.
