In this post, I explain how to implement a thread-safe blocking queue in Java using the low-level concurrency primitives wait(), notify(), and notifyAll().
Java has built-in support for multithreading, and in concurrent programming a BlockingQueue plays a central role in solving producer-consumer problems. While Java provides robust implementations such as ArrayBlockingQueue and LinkedBlockingQueue, building a custom blocking queue from scratch helps you grasp core concurrency principles.
What is a Blocking Queue?
A BlockingQueue in Java is a type of queue that supports thread-safe operations where elements can be added or removed with optional waiting behavior.
If the queue is full, a thread trying to add an element will block until space becomes available. Similarly, if the queue is empty, a thread trying to remove an element will wait until an item is present.
It’s commonly used in producer-consumer scenarios and is part of the java.util.concurrent package.
Some features of a bounded blocking queue:
- Fixed capacity
- Thread-safe operations
- Blocking insert/add operations when the queue is full
- Blocking remove operation when the queue is empty
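To make these features concrete, here is a minimal sketch that uses the built-in ArrayBlockingQueue from java.util.concurrent. The class name BuiltInBlockingQueueDemo, the capacity of 2, and the 50 ms timeout are illustrative choices, not part of any library. It uses offer() next to put() so that the "queue is full" case can be observed without blocking the demo forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util.concurrent types are real API.
class BuiltInBlockingQueueDemo {

    // Fills a queue of capacity 2, then tries to add a third element.
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put("a"); // space available, returns immediately
            queue.put("b"); // the queue is now full

            // offer() never blocks: it returns false when the queue is full.
            boolean accepted = queue.offer("c");

            // The timed offer() waits up to the timeout for space, then gives up.
            boolean acceptedLater = queue.offer("c", 50, TimeUnit.MILLISECONDS);

            // take() removes the head of the queue (FIFO order) and frees one slot.
            String first = queue.take();
            boolean acceptedNow = queue.offer("c");

            return accepted + " " + acceptedLater + " " + first + " "
                    + acceptedNow + " " + queue.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "false false a true 2"
    }
}
```

A plain put("c") in place of the first offer("c") would block the calling thread until another thread removed an element, which is the behavior the custom queue in this post reproduces.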
Blocking Queue Implementation in Java
Before writing the blocking queue implementation, it is important to understand wait(), notify(), and notifyAll().
1. wait()
- When a thread calls wait(), it pauses its execution and gives up the lock it was holding on that object.
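- The thread stays in the waiting state until another thread calls notify() or notifyAll() on the same object, until it is interrupted, or until a spurious wakeup occurs. This is why wait() should always be called inside a loop that rechecks the condition.
- wait() must be called inside a synchronized block or method on that object; otherwise it throws an IllegalMonitorStateException.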
2. notify()
- When notify() is called, it wakes up one thread that is currently waiting on the same object’s monitor. If there are multiple waiting threads, the choice is arbitrary and left to the JVM implementation.
- Only threads that have previously called wait() on that object can be notified.
- The woken-up thread can’t continue right away — it must first re-acquire the lock on the object before it can proceed.
3. notifyAll()
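- When notifyAll() is called, it wakes up all threads waiting on the same object’s monitor.
- It is useful when threads wait on the same monitor for different conditions (for example, producers waiting for space and consumers waiting for items), because notify() might wake a thread that still cannot proceed.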
- Like notify(), each thread must re-acquire the lock before resuming.
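The rules above can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical class WaitNotifyDemo with a boolean flag named ready as the condition; none of these names come from the JDK. It shows two things: calling wait() without holding the monitor fails, and a waiter that checks its condition in a loop is released once another thread sets the flag and calls notifyAll().

```java
// Hypothetical demo class; wait() and notifyAll() are the only JDK calls of interest.
class WaitNotifyDemo {
    private final Object lock = new Object();
    private boolean ready = false; // the condition that waiters check

    // Blocks until another thread calls signalReady().
    void awaitReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {   // loop: recheck the condition after every wakeup
                lock.wait();   // releases the lock while waiting
            }
        }
    }

    // Sets the condition and wakes every thread waiting on the lock.
    void signalReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    // Returns true if wait() outside a synchronized block is rejected.
    static boolean waitWithoutLockFails() {
        try {
            new Object().wait(10); // the current thread does not own this monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts a waiter, signals it, and reports whether it finished.
    static boolean waiterIsReleased() {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.signalReady();
        try {
            waiter.join(2000); // give the waiter up to two seconds to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(waitWithoutLockFails()); // prints "true"
        System.out.println(waiterIsReleased());     // prints "true"
    }
}
```

Because the waiter checks the ready flag before waiting, it does not matter whether signalReady() runs before or after the waiter acquires the lock: a signal sent early is not lost.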
Now that we understand what happens when we call wait(), notify(), and notifyAll(), let’s write Java code to implement a blocking queue using wait() and notifyAll().
```java
// Custom Blocking Queue
public class CustomBlockingQueue<T> {

    private final Object[] queue;
    private int head = 0;
    private int tail = 0;
    private int count = 0;
    private final int capacity;

    public CustomBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new Object[capacity];
    }

    // Add an element to the queue
    public synchronized void put(T element) throws InterruptedException {
        // Wait if queue is full
        while (count == capacity) {
            wait();
        }
        queue[tail] = element;
        tail = (tail + 1) % capacity;
        count++;
        notifyAll(); // Notify consumers
    }

    // Remove and return the head of the queue
    @SuppressWarnings("unchecked")
    public synchronized T take() throws InterruptedException {
        while (count == 0) {
            wait(); // Wait if queue is empty
        }
        T item = (T) queue[head];
        queue[head] = null; // Help GC
        head = (head + 1) % capacity;
        count--;
        notifyAll(); // Notify producers
        return item;
    }

    public synchronized int size() {
        return count;
    }
}
```
Explanation
* We use a circular array to manage queue indices efficiently.
* The put method waits if the queue is full, and take waits if the queue is empty.
* notifyAll() ensures that waiting threads are awakened when a change occurs (like adding or removing an element).
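The circular-array arithmetic used for head and tail can be traced in isolation. The sketch below is single-threaded and only simulates the tail index; the class CircularIndexDemo and the method slotsFor are hypothetical names introduced for illustration.

```java
import java.util.Arrays;

// Hypothetical demo class that traces the wrap-around of a circular index.
class CircularIndexDemo {

    // Returns the array slot used by each of `inserts` consecutive insertions
    // into a circular buffer of the given capacity.
    static int[] slotsFor(int capacity, int inserts) {
        int[] slots = new int[inserts];
        int tail = 0;
        for (int i = 0; i < inserts; i++) {
            slots[i] = tail;
            tail = (tail + 1) % capacity; // same update as in put()
        }
        return slots;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(slotsFor(3, 7)));
        // prints "[0, 1, 2, 0, 1, 2, 0]"
    }
}
```

In the real queue a slot is reused only after take() has emptied it; the count field is what prevents put() from overwriting an element that has not been consumed yet.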
```java
public class BlockingQueueExample {

    public static void main(String[] args) {
        CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(5);

        // Producer
        Runnable producer = () -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Consumer
        Runnable consumer = () -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    int item = queue.take();
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
```
Conclusion
BlockingQueue is a powerful tool for building robust, thread-safe systems in Java, especially when dealing with concurrent producers and consumers. By handling synchronization and blocking internally, it simplifies multithreaded programming and helps avoid common pitfalls like race conditions and busy waiting.