Java PriorityBlockingQueue - Thread-Safe and Memory Efficient Concurrent Heap
In this article, we are going to learn about the PriorityBlockingQueue, a thread-safe, array-backed priority queue implementation. We will start by comparing it to the traditional heap-based PriorityQueue implementation, and later we will see what concurrency and performance-related features and guarantees the PriorityBlockingQueue provides.
Finally, we will write some Java code to see how a PriorityBlockingQueue is used in a practical real-life example.
What is PriorityQueue
To illustrate the advantages we get from PriorityBlockingQueue, let’s compare it to the traditional PriorityQueue first.
A PriorityQueue in Java is a special type of queue that is used when elements need to be processed based on their priority. We use it when we don’t want our queue to follow the traditional FIFO (first-in-first-out) ordering but instead want it to give preference to the higher-priority elements. The PriorityQueue is based on the Binary Heap data structure. We will see the properties of the Binary Heap later in this article.
PriorityQueue Use-Cases
A real-life example of a priority queue is a hospital queue, where patients are treated or admitted based on how critical their condition is: patients with emergencies are given priority over patients who came for regular health check-ups.
Another notable example is CPU scheduling in operating systems, where a priority queue can be used to assign CPU cycles based on the priority of the threads.
PriorityQueue Ordering and Implementation
By default, the elements of the PriorityQueue are ordered according to their natural ordering. If a custom order is required, we can pass a Comparator at construction time.
The head of the PriorityQueue is always the smallest element according to the natural ordering or the comparator. The remaining elements are only partially ordered inside the heap, so there is no guarantee about which element sits at the end of the underlying array; the greatest element is simply the last one to be removed.
PriorityQueue<Integer> pQueue = new PriorityQueue<Integer>();
However, we can reverse this ordering using the Collections.reverseOrder() method.
PriorityQueue<Integer> pQueue = new PriorityQueue<Integer>(Collections.reverseOrder());
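To make the difference between the two orderings concrete, here is a minimal sketch that fills both kinds of queue with the same values and removes them with poll(). The class name PriorityQueueOrderingDemo and the helper drain are made up for this illustration and are not part of the JDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class; only PriorityQueue and Collections come from the JDK.
class PriorityQueueOrderingDemo {

    // Removes every element with poll() and returns them in removal order.
    static List<Integer> drain(PriorityQueue<Integer> queue) {
        List<Integer> removed = new ArrayList<>();
        while (!queue.isEmpty()) {
            removed.add(queue.poll());
        }
        return removed;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> minFirst = new PriorityQueue<>();
        PriorityQueue<Integer> maxFirst = new PriorityQueue<>(Collections.reverseOrder());

        for (int value : new int[] {5, 1, 4, 2, 3}) {
            minFirst.add(value);
            maxFirst.add(value);
        }

        System.out.println(drain(minFirst)); // [1, 2, 3, 4, 5]
        System.out.println(drain(maxFirst)); // [5, 4, 3, 2, 1]
    }
}
```

Note that the order is only guaranteed when elements are removed through poll(); iterating over the queue directly does not promise any particular order.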
As mentioned earlier, the underlying data structure that implements the PriorityQueue is a binary heap.
Binary Heap
A binary heap is a complete binary tree that satisfies the heap ordering property.
There are two types of heaps:
Min-Heap: Each node of the binary tree is greater than or equal to its parent, so the lowest element is at the root. This is the ordering Java's PriorityQueue uses by default.
Max-Heap: Each node of the binary tree is less than or equal to its parent, so the highest element is at the root.
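Because a binary heap is a complete binary tree, it is commonly stored in a plain array: the children of the node at index i live at indices 2i + 1 and 2i + 2, and its parent at (i - 1) / 2. The sketch below uses that layout to check the two heap properties described above. The class BinaryHeapLayoutDemo and its methods are hypothetical names for this illustration, not the JDK's internal code.

```java
// Hypothetical demo class showing the usual array layout of a binary heap.
class BinaryHeapLayoutDemo {

    static int parent(int i) { return (i - 1) / 2; }
    static int left(int i)   { return 2 * i + 1; }
    static int right(int i)  { return 2 * i + 2; }

    // True if no node is smaller than its parent (min-heap property).
    static boolean isMinHeap(int[] heap) {
        for (int i = 1; i < heap.length; i++) {
            if (heap[parent(i)] > heap[i]) {
                return false;
            }
        }
        return true;
    }

    // True if no node is greater than its parent (max-heap property).
    static boolean isMaxHeap(int[] heap) {
        for (int i = 1; i < heap.length; i++) {
            if (heap[parent(i)] < heap[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] minHeap = {1, 3, 2, 7, 4};
        int[] maxHeap = {9, 5, 8, 1};
        System.out.println(isMinHeap(minHeap)); // true
        System.out.println(isMaxHeap(maxHeap)); // true
        System.out.println(isMinHeap(maxHeap)); // false
    }
}
```

Notice that {1, 3, 2, 7, 4} is a valid min-heap even though the array is not fully sorted: the heap property only constrains each node relative to its parent.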
Concurrency and Thread Safety Issues
When multiple producer threads concurrently add items to a PriorityQueue and/or multiple consumer threads concurrently consume items from it, we can run into race conditions that leave the underlying heap data structure corrupt. Because of this, a PriorityQueue cannot safely be shared between multiple producers or consumers without external synchronization.
If you’re not yet familiar with Java Multithreading, its performance benefits, and the challenges associated with parallel programming, make sure to check out the Java Concurrency, Multithreading & Performance Optimization course.
Memory Issues with a PriorityQueue
The PriorityQueue is a dynamic data structure that places no limit on the number of elements it can accept from producers. If the elements need more memory than is available to the JVM process, our application will fail with an OutOfMemoryError.
Introducing PriorityBlockingQueue
1. PriorityBlockingQueue - Concurrency
Unlike the PriorityQueue, the PriorityBlockingQueue is thread-safe.
Each individual operation on a PriorityBlockingQueue, such as put, offer, take, and poll, is thread-safe: the queue guards its internal heap with a lock, so concurrent producers and consumers cannot corrupt it. This makes the PriorityBlockingQueue a good fit for production multithreaded Java applications.
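As a small illustration of this guarantee, the sketch below starts several producer threads that add to one shared PriorityBlockingQueue with no extra synchronization, then checks that no element was lost. The class ConcurrentAddDemo and the method fillConcurrently are hypothetical names chosen for this example.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only PriorityBlockingQueue comes from the JDK.
class ConcurrentAddDemo {

    // Starts the given number of producer threads, each adding itemsPerProducer
    // integers to one shared queue, and returns the final queue size.
    static int fillConcurrently(int producers, int itemsPerProducer) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread[] threads = new Thread[producers];

        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < itemsPerProducer; i++) {
                    queue.add(i); // no external locking needed
                }
            });
            threads[p].start();
        }

        try {
            for (Thread thread : threads) {
                thread.join(); // wait for every producer to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for producers", e);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // 4 producers x 10,000 items each
        System.out.println(fillConcurrently(4, 10_000)); // 40000
    }
}
```

Running the same experiment against a plain PriorityQueue may lose elements or throw exceptions, although the exact failure varies from run to run.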
2. PriorityBlockingQueue - Unbounded Capacity and Blocking
The PriorityBlockingQueue class is an implementation of the BlockingQueue interface in Java.
In a typical producer-consumer setup, a bounded BlockingQueue blocks producers from adding more elements while the queue is full. The PriorityBlockingQueue, however, is an unbounded queue, meaning that it has no maximum capacity, so producers are never blocked when adding elements. Additions can still fail with an OutOfMemoryError if the JVM runs out of memory.
On the other hand, when the PriorityBlockingQueue becomes empty, consumer threads will be blocked until additional items are added to the queue.
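The following sketch shows both sides of this behavior using only standard BlockingQueue methods: offer succeeds immediately, a timed poll on an empty queue gives up and returns null, and take waits until a producer supplies an element. The class BlockingBehaviorDemo and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the queue methods used are standard JDK API.
class BlockingBehaviorDemo {

    // On an unbounded queue, offer() does not wait for free space.
    static boolean offerSucceeds() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        return queue.offer(1);
    }

    // A timed poll on an empty queue waits up to the timeout, then returns null.
    static Integer pollOnEmpty(long timeoutMillis) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // take() blocks the calling thread until the producer thread adds a value.
    static int takeWaitsForProducer(int value, long delayMillis) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // simulate a slow producer
                queue.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerSucceeds());               // true
        System.out.println(pollOnEmpty(100));              // null
        System.out.println(takeWaitsForProducer(42, 200)); // 42
    }
}
```

A timed poll is often a practical alternative to take when the consumer needs a chance to check a shutdown flag instead of waiting forever.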
3. PriorityBlockingQueue - Memory Efficiency and Performance
The PriorityBlockingQueue is backed by an array-based binary heap, which is very memory efficient. Storing the elements in a single array avoids the per-node memory overhead associated with reference-based (linked) data structures and helps performance.
Despite its array-based implementation, as mentioned earlier, the PriorityBlockingQueue is an unbounded collection: the backing array grows dynamically as elements are added.
The default initial capacity for PriorityBlockingQueue is 11, but it can easily be changed by using the appropriate constructor.
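The initial capacity is only a starting size for the backing array, not a limit. The minimal sketch below creates a queue with a small initial capacity and adds far more elements than that; the class GrowthDemo and its methods are hypothetical names for this example.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; only PriorityBlockingQueue comes from the JDK.
class GrowthDemo {

    // Creates a queue with the given initial capacity, adds `elements`
    // integers to it, and returns the resulting size.
    static int sizeAfterAdding(int initialCapacity, int elements) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(initialCapacity);
        for (int i = 0; i < elements; i++) {
            queue.add(i);
        }
        return queue.size();
    }

    // What the queue reports as remaining capacity, regardless of initial size.
    static int reportedRemainingCapacity(int initialCapacity) {
        return new PriorityBlockingQueue<Integer>(initialCapacity).remainingCapacity();
    }

    public static void main(String[] args) {
        // Initial capacity 2, yet 100 elements fit without any blocking.
        System.out.println(sizeAfterAdding(2, 100)); // 100
        // An unbounded queue always reports Integer.MAX_VALUE.
        System.out.println(reportedRemainingCapacity(2) == Integer.MAX_VALUE); // true
    }
}
```

Choosing an initial capacity close to the expected number of elements can save a few array resizes, but it never changes how many elements the queue will accept.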
4. PriorityBlockingQueue - Ordering Of Elements
By default, the PriorityBlockingQueue follows the standard min-heap properties, which means the head of the PriorityBlockingQueue is the smallest element based on natural ordering.
Objects in the PriorityBlockingQueue must be comparable for natural ordering; otherwise a ClassCastException is thrown when they are inserted.
For additional flexibility when it comes to the ordering of elements, a Comparator can be used.
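Here is a short sketch of both points: a queue that orders strings by length through a Comparator, and a queue without a Comparator that rejects objects that do not implement Comparable. The class OrderingDemo and its method names are hypothetical, and the strings are arbitrary sample data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; the queue and Comparator calls are standard JDK API.
class OrderingDemo {

    // Orders strings by length (shortest first) and returns them in poll() order.
    static List<String> pollByLength(String... words) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, byLength);
        for (String word : words) {
            queue.add(word);
        }
        List<String> removed = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            removed.add(next);
        }
        return removed;
    }

    // Without a Comparator, elements must implement Comparable.
    // Returns true if inserting plain Objects is rejected.
    static boolean rejectsNonComparable() {
        PriorityBlockingQueue<Object> queue = new PriorityBlockingQueue<>();
        try {
            queue.add(new Object());
            queue.add(new Object());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollByLength("kiwi", "fig", "banana")); // [fig, kiwi, banana]
        System.out.println(rejectsNonComparable());                // true
    }
}
```

The sample words have distinct lengths on purpose: when two elements compare as equal, the queue makes no promise about which one is removed first.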
PriorityBlockingQueue in Action
Now that we are familiar with the PriorityBlockingQueue and the advantages it provides, let’s see this data structure in action in a practical example.
In this example, we’ll have a multithreaded application where a fast producer adds items to the PriorityBlockingQueue from one thread, and a slower consumer takes items from the PriorityBlockingQueue on a different thread.
This will demonstrate that:
Multiple threads can safely modify and access the PriorityBlockingQueue
The PriorityBlockingQueue grows dynamically as more elements are added to it
Elements are ordered based on their custom priority
When the PriorityBlockingQueue becomes empty, the consumer thread blocks until additional elements are added to the PriorityBlockingQueue
We will start with creating an Employee class:
public class Employee implements Comparable<Employee> {
    private int employeeId;
    private int age; // Older employees will have a higher priority.

    public Employee(int employeeId, int age) {
        this.employeeId = employeeId;
        this.age = age;
    }

    @Override
    public int compareTo(Employee o) {
        if (this.age > o.age) {
            return 1;
        } else if (this.age < o.age) {
            return -1;
        }
        return 0;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "employeeId=" + employeeId +
                ", age=" + age +
                '}';
    }
}
Notice that we will use the age of the employee as the priority inside the PriorityBlockingQueue.
Since we want to give higher priority to older employees, we will need to create a max-heap.
So in the following piece of code, we’re going to create a PriorityBlockingQueue with an initial capacity of 5 elements and use Collections.reverseOrder() as the Comparator, which reverses the natural ordering of Employee and effectively creates a max-heap.
Additionally, we’re creating a producer thread that adds elements to the PriorityBlockingQueue, and a consumer thread that runs concurrently with the producer and consumes elements from the PriorityBlockingQueue based on their priority.
import java.util.Collections;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        // Initial capacity of 5; reverseOrder() puts the oldest employee at the head.
        PriorityBlockingQueue<Employee> priorityBlockingQueue =
                new PriorityBlockingQueue<>(5, Collections.reverseOrder());

        ConcurrentProducer producer = new ConcurrentProducer(priorityBlockingQueue);
        producer.start();

        ConcurrentConsumer consumer = new ConcurrentConsumer(priorityBlockingQueue);
        consumer.start();
    }
}
Here is the code for the producer thread, which adds employees with random ids and ages to the PriorityBlockingQueue.
import java.util.Random;
import java.util.concurrent.PriorityBlockingQueue;

public class ConcurrentProducer extends Thread {
    protected PriorityBlockingQueue<Employee> priorityBlockingQueue;
    Random random = new Random();

    public ConcurrentProducer(PriorityBlockingQueue<Employee> queue) {
        this.priorityBlockingQueue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                int randomId = random.nextInt(10000) + 1;
                int randomAge = random.nextInt(60) + 25;
                Employee employee = new Employee(randomId, randomAge);
                System.out.println("Producer adding employee: " + employee);
                // put() never blocks here because the queue is unbounded.
                priorityBlockingQueue.put(employee);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
And this is the code for the consumer thread that removes elements from the PriorityBlockingQueue based on the employee’s age as the priority.
import java.util.concurrent.PriorityBlockingQueue;

public class ConcurrentConsumer extends Thread {
    protected PriorityBlockingQueue<Employee> priorityBlockingQueue;

    public ConcurrentConsumer(PriorityBlockingQueue<Employee> queue) {
        this.priorityBlockingQueue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // take() blocks while the queue is empty.
                Employee employee = priorityBlockingQueue.take();
                System.out.println("Consumer taking the element as per priority: " + employee);
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Output
Producer adding employee: Employee{employeeId=8078, age=71}
Consumer taking the element as per priority: Employee{employeeId=8078, age=71}
Producer adding employee: Employee{employeeId=8506, age=66}
Producer adding employee: Employee{employeeId=32, age=34}
Producer adding employee: Employee{employeeId=6121, age=71}
Producer adding employee: Employee{employeeId=3063, age=59}
Consumer taking the element as per priority: Employee{employeeId=6121, age=71}
Producer adding employee: Employee{employeeId=7950, age=60}
Producer adding employee: Employee{employeeId=3518, age=51}
Producer adding employee: Employee{employeeId=422, age=80}
Producer adding employee: Employee{employeeId=3820, age=51}
Producer adding employee: Employee{employeeId=5601, age=78}
Consumer taking the element as per priority: Employee{employeeId=422, age=80}
Consumer taking the element as per priority: Employee{employeeId=5601, age=78}
Consumer taking the element as per priority: Employee{employeeId=8506, age=66}
Consumer taking the element as per priority: Employee{employeeId=7950, age=60}
Consumer taking the element as per priority: Employee{employeeId=3063, age=59}
Consumer taking the element as per priority: Employee{employeeId=3518, age=51}
Consumer taking the element as per priority: Employee{employeeId=3820, age=51}
Consumer taking the element as per priority: Employee{employeeId=32, age=34}
Notice that at any given moment the consumer takes the oldest employee currently in the queue, and only gets to the younger employees towards the end of the program.
Also notice that while the producer and the consumer are concurrently adding items to and removing items from the same data structure, there are no race conditions or other concurrency issues.
If you want to become an expert in Multithreading and learn how to build high-performance Multithreaded applications as well as implement similar data structures for your own needs, join the best-selling course on Java Multithreading, Concurrency & Performance Optimization. In this course you will learn all the basics of Multithreading in Java, and how to utilize multiple threads to build highly responsive and blazingly fast algorithms and programs.