Build Your Own BlockingQueue from Scratch
The Problem
In the previous lesson, we used ArrayBlockingQueue and it just worked. But how does it work internally? In interviews, you will be asked to build one from scratch.
The requirements are simple:
- Fixed-capacity buffer
put(item)blocks when the buffer is fulltake()blocks when the buffer is empty- Thread-safe for any number of producers and consumers
Let us build it step by step.
Let us See It Break
First, a naive attempt without proper synchronization:
1import java.util.LinkedList;
2import java.util.Queue;
3
4public class BrokenBlockingQueue<T> {
5 private final Queue<T> queue = new LinkedList<>();
6 private final int capacity;
7
8 public BrokenBlockingQueue(int capacity) {
9 this.capacity = capacity;
10 }
11
12 public void put(T item) {
13 while (queue.size() == capacity) {
14 // Busy-wait... burns CPU
15 }
16 queue.add(item);
17 }
18
19 public T take() {
20 while (queue.isEmpty()) {
21 // Busy-wait... burns CPU
22 }
23 return queue.poll();
24 }
25
26 public static void main(String[] args) {
27 BrokenBlockingQueue<Integer> bq = new BrokenBlockingQueue<>(3);
28
29 // Producer
30 Thread producer = new Thread(() -> {
31 for (int i = 1; i <= 20; i++) {
32 bq.put(i);
33 System.out.println("Produced: " + i);
34 }
35 });
36
37 // Consumer
38 Thread consumer = new Thread(() -> {
39 for (int i = 1; i <= 20; i++) {
40 int val = bq.take();
41 System.out.println("Consumed: " + val);
42 try { Thread.sleep(100); } catch (InterruptedException e) { break; }
43 }
44 });
45
46 producer.start();
47 consumer.start();
48 }
49}What goes wrong:
- Busy-waiting pegs the CPU at 100% while waiting
- No visibility guarantee: the while loop might never see the updated queue size (JIT compiler can hoist the
queue.size()call out of the loop) - LinkedList is not thread-safe: concurrent
add()andpoll()corrupt internal state - Lost items or duplicates: two consumers could both see
isEmpty() == falseand both callpoll(), one gets null
Why It Breaks
The core issues:
- No mutual exclusion. Multiple threads access the queue simultaneously.
LinkedListoperations are not atomic. - No blocking mechanism. Busy-waiting wastes CPU and is unreliable due to visibility issues.
- No signaling. When a producer adds an item, it does not wake up a waiting consumer. When a consumer removes an item, it does not wake up a waiting producer.
Fix #1: The Simple Way (synchronized + wait/notify)
1import java.util.LinkedList;
2import java.util.Queue;
3
4public class SimpleBlockingQueue<T> {
5 private final Queue<T> queue = new LinkedList<>();
6 private final int capacity;
7
8 public SimpleBlockingQueue(int capacity) {
9 this.capacity = capacity;
10 }
11
12 public synchronized void put(T item) throws InterruptedException {
13 while (queue.size() == capacity) {
14 wait(); // Release lock, sleep until notified
15 }
16 queue.add(item);
17 notifyAll(); // Wake consumers who might be waiting
18 }
19
20 public synchronized T take() throws InterruptedException {
21 while (queue.isEmpty()) {
22 wait(); // Release lock, sleep until notified
23 }
24 T item = queue.poll();
25 notifyAll(); // Wake producers who might be waiting
26 return item;
27 }
28
29 public synchronized int size() {
30 return queue.size();
31 }
32
33 public static void main(String[] args) {
34 SimpleBlockingQueue<Integer> bq = new SimpleBlockingQueue<>(3);
35
36 // Producer
37 Thread producer = new Thread(() -> {
38 try {
39 for (int i = 1; i <= 10; i++) {
40 bq.put(i);
41 System.out.println("Produced: " + i + " [size: " + bq.size() + "]");
42 }
43 } catch (InterruptedException e) {
44 Thread.currentThread().interrupt();
45 }
46 });
47
48 // Consumer
49 Thread consumer = new Thread(() -> {
50 try {
51 for (int i = 1; i <= 10; i++) {
52 int val = bq.take();
53 System.out.println("Consumed: " + val + " [size: " + bq.size() + "]");
54 Thread.sleep(200); // Slow consumer
55 }
56 } catch (InterruptedException e) {
57 Thread.currentThread().interrupt();
58 }
59 });
60
61 producer.start();
62 consumer.start();
63 }
64}This works. But notifyAll() wakes everyone -- both waiting producers AND waiting consumers. If there are 5 producers and 5 consumers waiting, and we add one item, all 10 threads wake up. 9 of them check the condition, find it is not for them, and go back to sleep. Wasteful.
Fix #2: The Better Way (ReentrantLock + Two Conditions)
This is the canonical interview answer. Two separate conditions eliminate wasted wakeups:
1import java.util.LinkedList;
2import java.util.Queue;
3import java.util.concurrent.locks.Condition;
4import java.util.concurrent.locks.ReentrantLock;
5
6public class MyBlockingQueue<T> {
7 private final Queue<T> queue = new LinkedList<>();
8 private final int capacity;
9 private final ReentrantLock lock = new ReentrantLock();
10 private final Condition notFull = lock.newCondition(); // Producers wait here
11 private final Condition notEmpty = lock.newCondition(); // Consumers wait here
12
13 public MyBlockingQueue(int capacity) {
14 if (capacity <= 0) throw new IllegalArgumentException("Capacity must be > 0");
15 this.capacity = capacity;
16 }
17
18 public void put(T item) throws InterruptedException {
19 lock.lock();
20 try {
21 // Wait while the queue is full
22 while (queue.size() == capacity) {
23 notFull.await(); // Block producer until space available
24 }
25 queue.add(item);
26 notEmpty.signal(); // Wake ONE waiting consumer
27 } finally {
28 lock.unlock();
29 }
30 }
31
32 public T take() throws InterruptedException {
33 lock.lock();
34 try {
35 // Wait while the queue is empty
36 while (queue.isEmpty()) {
37 notEmpty.await(); // Block consumer until item available
38 }
39 T item = queue.poll();
40 notFull.signal(); // Wake ONE waiting producer
41 return item;
42 } finally {
43 lock.unlock();
44 }
45 }
46
47 public int size() {
48 lock.lock();
49 try {
50 return queue.size();
51 } finally {
52 lock.unlock();
53 }
54 }
55
56 public static void main(String[] args) {
57 MyBlockingQueue<String> kitchen = new MyBlockingQueue<>(3);
58
59 // Two producers (chefs)
60 for (int c = 1; c <= 2; c++) {
61 final int chefId = c;
62 new Thread(() -> {
63 String[] dishes = {"Pasta", "Steak", "Salad", "Soup", "Pizza"};
64 try {
65 for (String dish : dishes) {
66 String item = "Chef-" + chefId + ":" + dish;
67 kitchen.put(item);
68 System.out.println(" [PUT] " + item
69 + " (size=" + kitchen.size() + ")");
70 Thread.sleep(50);
71 }
72 } catch (InterruptedException e) {
73 Thread.currentThread().interrupt();
74 }
75 }, "Chef-" + c).start();
76 }
77
78 // Three consumers (waiters)
79 for (int w = 1; w <= 3; w++) {
80 final int waiterId = w;
81 new Thread(() -> {
82 try {
83 for (int i = 0; i < 3; i++) { // Each waiter takes 3 items
84 String dish = kitchen.take();
85 System.out.println("[TAKE] Waiter-" + waiterId + " got " + dish
86 + " (size=" + kitchen.size() + ")");
87 Thread.sleep(200);
88 }
89 } catch (InterruptedException e) {
90 Thread.currentThread().interrupt();
91 }
92 }, "Waiter-" + w).start();
93 }
94 }
95}Key design choices:
- Two conditions:
notFullis signaled when an item is removed (space available).notEmptyis signaled when an item is added (data available). - **
signal()notsignalAll():** We only need to wake ONE waiting thread, since only one can proceed (one slot opened or one item added). - **
whilenotif:** After waking, the condition might have changed (another thread got there first). Always re-check. - **
finallyblock:** Ensures the lock is released even if the code throws an exception.
Fix #3: The Java Way (How ArrayBlockingQueue Works)
The actual ArrayBlockingQueue in the JDK uses the exact same approach -- but with a circular array instead of a linked list:
1import java.util.concurrent.locks.Condition;
2import java.util.concurrent.locks.ReentrantLock;
3
4public class MyArrayBlockingQueue<T> {
5 private final Object[] items;
6 private int putIndex; // Next slot for put
7 private int takeIndex; // Next slot for take
8 private int count; // Number of items in queue
9
10 private final ReentrantLock lock = new ReentrantLock();
11 private final Condition notFull = lock.newCondition();
12 private final Condition notEmpty = lock.newCondition();
13
14 public MyArrayBlockingQueue(int capacity) {
15 items = new Object[capacity];
16 }
17
18 public void put(T item) throws InterruptedException {
19 lock.lock();
20 try {
21 while (count == items.length) {
22 notFull.await();
23 }
24 items[putIndex] = item;
25 putIndex = (putIndex + 1) % items.length; // Circular wrap
26 count++;
27 notEmpty.signal();
28 } finally {
29 lock.unlock();
30 }
31 }
32
33 @SuppressWarnings("unchecked")
34 public T take() throws InterruptedException {
35 lock.lock();
36 try {
37 while (count == 0) {
38 notEmpty.await();
39 }
40 T item = (T) items[takeIndex];
41 items[takeIndex] = null; // Help GC
42 takeIndex = (takeIndex + 1) % items.length; // Circular wrap
43 count--;
44 notFull.signal();
45 return item;
46 } finally {
47 lock.unlock();
48 }
49 }
50
51 public static void main(String[] args) throws InterruptedException {
52 MyArrayBlockingQueue<Integer> q = new MyArrayBlockingQueue<>(3);
53
54 Thread producer = new Thread(() -> {
55 try {
56 for (int i = 1; i <= 8; i++) {
57 q.put(i);
58 System.out.println("Put: " + i);
59 }
60 } catch (InterruptedException e) {
61 Thread.currentThread().interrupt();
62 }
63 });
64
65 Thread consumer = new Thread(() -> {
66 try {
67 for (int i = 1; i <= 8; i++) {
68 int val = q.take();
69 System.out.println("Took: " + val);
70 Thread.sleep(100);
71 }
72 } catch (InterruptedException e) {
73 Thread.currentThread().interrupt();
74 }
75 });
76
77 producer.start();
78 consumer.start();
79 producer.join();
80 consumer.join();
81 System.out.println("Done!");
82 }
83}Why a circular array?
- No object allocation on put/take (unlike
LinkedListwhich creates aNodeobject each time) - Better cache locality (array elements are contiguous in memory)
- No GC pressure from node objects
The putIndex and takeIndex wrap around using modulo: (index + 1) % capacity. When they reach the end of the array, they loop back to 0.
Step-by-Step Trace
Circular array with capacity 3, producing A, B, C, D:
1Initial: [_, _, _] putIndex=0, takeIndex=0, count=0
2
3put(A): [A, _, _] putIndex=1, takeIndex=0, count=1
4put(B): [A, B, _] putIndex=2, takeIndex=0, count=2
5put(C): [A, B, C] putIndex=0, takeIndex=0, count=3 (wraps!)
6put(D): BLOCKS (count == capacity)
7
8take(): [_, B, C] putIndex=0, takeIndex=1, count=2 -> returns A
9 put(D) UNBLOCKS
10
11put(D): [D, B, C] putIndex=1, takeIndex=1, count=3 (D goes in slot 0!)
12
13take(): [D, _, C] putIndex=1, takeIndex=2, count=2 -> returns B
14take(): [D, _, _] putIndex=1, takeIndex=0, count=1 -> returns C (wraps!)
15take(): [_, _, _] putIndex=1, takeIndex=1, count=0 -> returns DNotice how putIndex and takeIndex chase each other around the array. The items come out in FIFO order: A, B, C, D.
Common Mistakes
- **Using
ifinstead ofwhilefor the await condition.** Between being signaled and re-acquiring the lock, another thread might have changed the state. Always re-check withwhile. - **Using
signalAll()whensignal()suffices.** For a standard blocking queue with two conditions,signal()is correct and more efficient. You only need to wake one waiter. - Forgetting to null out taken slots in the array. Without
items[takeIndex] = null, the array holds references to consumed objects, preventing garbage collection. This is a memory leak. - **Not making the lock
final.** If someone reassigns the lock field, different threads would lock on different objects. Alwaysfinal. - Forgetting InterruptedException.
await()throwsInterruptedException. If you catch and swallow it, the thread cannot be properly shut down.
Interview Tip: This is one of the most frequently asked concurrency interview questions. Know the two-condition pattern cold: one condition for "not full" (producers await, consumers signal) and one for "not empty" (consumers await, producers signal). If you can whiteboard the circular array version, you will impress. Mention that ArrayBlockingQueue uses this exact pattern internally -- it shows you have read the JDK source.
Try It Yourself
- **Add a
peek()method** that returns the head element without removing it, blocking if empty. Is this useful in practice? - **Add a
tryPut(item, timeout, unit)method** that waits at most the specified time and returnsfalseif it could not insert. Hint: useCondition.await(long, TimeUnit). - **Add a
drainTo(collection, maxElements)method** that removes up to N items and adds them to the given collection in one lock acquisition. Why is this faster than callingtake()N times?
Summary
- A blocking queue needs mutual exclusion (lock), blocking (await), and signaling (signal).
- Two conditions (notFull, notEmpty) avoid wasted wakeups compared to a single lock with
notifyAll(). - Circular array avoids allocation overhead and improves cache performance versus a linked list.
- Always use
whileloops aroundawait(), always unlock infinally, alwayssignal()after state changes.