LLD
Machine Coding
Interview Course
Java โ€ข Interview Prep
๐Ÿงช Concurrency Practice/

Build Your Own BlockingQueue from Scratch

Lesson 4 of 5

Build Your Own BlockingQueue from Scratch


The Problem

In the previous lesson, we used ArrayBlockingQueue and it just worked. But how does it work internally? In interviews, you will be asked to build one from scratch.

The requirements are simple:

  • Fixed-capacity buffer
  • put(item) blocks when the buffer is full
  • take() blocks when the buffer is empty
  • Thread-safe for any number of producers and consumers

Let us build it step by step.


Let us See It Break

First, a naive attempt without proper synchronization:

java
1import java.util.LinkedList;
2import java.util.Queue;
3
4public class BrokenBlockingQueue<T> {
5    private final Queue<T> queue = new LinkedList<>();
6    private final int capacity;
7
8    public BrokenBlockingQueue(int capacity) {
9        this.capacity = capacity;
10    }
11
12    public void put(T item) {
13        while (queue.size() == capacity) {
14            // Busy-wait... burns CPU
15        }
16        queue.add(item);
17    }
18
19    public T take() {
20        while (queue.isEmpty()) {
21            // Busy-wait... burns CPU
22        }
23        return queue.poll();
24    }
25
26    public static void main(String[] args) {
27        BrokenBlockingQueue<Integer> bq = new BrokenBlockingQueue<>(3);
28
29        // Producer
30        Thread producer = new Thread(() -> {
31            for (int i = 1; i <= 20; i++) {
32                bq.put(i);
33                System.out.println("Produced: " + i);
34            }
35        });
36
37        // Consumer
38        Thread consumer = new Thread(() -> {
39            for (int i = 1; i <= 20; i++) {
40                int val = bq.take();
41                System.out.println("Consumed: " + val);
42                try { Thread.sleep(100); } catch (InterruptedException e) { break; }
43            }
44        });
45
46        producer.start();
47        consumer.start();
48    }
49}

What goes wrong:

  • Busy-waiting pegs the CPU at 100% while waiting
  • No visibility guarantee: the while loop might never see the updated queue size (JIT compiler can hoist the queue.size() call out of the loop)
  • LinkedList is not thread-safe: concurrent add() and poll() corrupt internal state
  • Lost items or duplicates: two consumers could both see isEmpty() == false and both call poll(), one gets null

Why It Breaks

The core issues:

  1. No mutual exclusion. Multiple threads access the queue simultaneously. LinkedList operations are not atomic.
  2. No blocking mechanism. Busy-waiting wastes CPU and is unreliable due to visibility issues.
  3. No signaling. When a producer adds an item, it does not wake up a waiting consumer. When a consumer removes an item, it does not wake up a waiting producer.

Fix #1: The Simple Way (synchronized + wait/notify)

java
1import java.util.LinkedList;
2import java.util.Queue;
3
4public class SimpleBlockingQueue<T> {
5    private final Queue<T> queue = new LinkedList<>();
6    private final int capacity;
7
8    public SimpleBlockingQueue(int capacity) {
9        this.capacity = capacity;
10    }
11
12    public synchronized void put(T item) throws InterruptedException {
13        while (queue.size() == capacity) {
14            wait();  // Release lock, sleep until notified
15        }
16        queue.add(item);
17        notifyAll();  // Wake consumers who might be waiting
18    }
19
20    public synchronized T take() throws InterruptedException {
21        while (queue.isEmpty()) {
22            wait();  // Release lock, sleep until notified
23        }
24        T item = queue.poll();
25        notifyAll();  // Wake producers who might be waiting
26        return item;
27    }
28
29    public synchronized int size() {
30        return queue.size();
31    }
32
33    public static void main(String[] args) {
34        SimpleBlockingQueue<Integer> bq = new SimpleBlockingQueue<>(3);
35
36        // Producer
37        Thread producer = new Thread(() -> {
38            try {
39                for (int i = 1; i <= 10; i++) {
40                    bq.put(i);
41                    System.out.println("Produced: " + i + " [size: " + bq.size() + "]");
42                }
43            } catch (InterruptedException e) {
44                Thread.currentThread().interrupt();
45            }
46        });
47
48        // Consumer
49        Thread consumer = new Thread(() -> {
50            try {
51                for (int i = 1; i <= 10; i++) {
52                    int val = bq.take();
53                    System.out.println("Consumed: " + val + " [size: " + bq.size() + "]");
54                    Thread.sleep(200);  // Slow consumer
55                }
56            } catch (InterruptedException e) {
57                Thread.currentThread().interrupt();
58            }
59        });
60
61        producer.start();
62        consumer.start();
63    }
64}

This works. But notifyAll() wakes everyone -- both waiting producers AND waiting consumers. If there are 5 producers and 5 consumers waiting, and we add one item, all 10 threads wake up. 9 of them check the condition, find it is not for them, and go back to sleep. Wasteful.


Fix #2: The Better Way (ReentrantLock + Two Conditions)

This is the canonical interview answer. Two separate conditions eliminate wasted wakeups:

java
1import java.util.LinkedList;
2import java.util.Queue;
3import java.util.concurrent.locks.Condition;
4import java.util.concurrent.locks.ReentrantLock;
5
6public class MyBlockingQueue<T> {
7    private final Queue<T> queue = new LinkedList<>();
8    private final int capacity;
9    private final ReentrantLock lock = new ReentrantLock();
10    private final Condition notFull = lock.newCondition();   // Producers wait here
11    private final Condition notEmpty = lock.newCondition();  // Consumers wait here
12
13    public MyBlockingQueue(int capacity) {
14        if (capacity <= 0) throw new IllegalArgumentException("Capacity must be > 0");
15        this.capacity = capacity;
16    }
17
18    public void put(T item) throws InterruptedException {
19        lock.lock();
20        try {
21            // Wait while the queue is full
22            while (queue.size() == capacity) {
23                notFull.await();  // Block producer until space available
24            }
25            queue.add(item);
26            notEmpty.signal();  // Wake ONE waiting consumer
27        } finally {
28            lock.unlock();
29        }
30    }
31
32    public T take() throws InterruptedException {
33        lock.lock();
34        try {
35            // Wait while the queue is empty
36            while (queue.isEmpty()) {
37                notEmpty.await();  // Block consumer until item available
38            }
39            T item = queue.poll();
40            notFull.signal();  // Wake ONE waiting producer
41            return item;
42        } finally {
43            lock.unlock();
44        }
45    }
46
47    public int size() {
48        lock.lock();
49        try {
50            return queue.size();
51        } finally {
52            lock.unlock();
53        }
54    }
55
56    public static void main(String[] args) {
57        MyBlockingQueue<String> kitchen = new MyBlockingQueue<>(3);
58
59        // Two producers (chefs)
60        for (int c = 1; c <= 2; c++) {
61            final int chefId = c;
62            new Thread(() -> {
63                String[] dishes = {"Pasta", "Steak", "Salad", "Soup", "Pizza"};
64                try {
65                    for (String dish : dishes) {
66                        String item = "Chef-" + chefId + ":" + dish;
67                        kitchen.put(item);
68                        System.out.println("  [PUT] " + item
69                            + " (size=" + kitchen.size() + ")");
70                        Thread.sleep(50);
71                    }
72                } catch (InterruptedException e) {
73                    Thread.currentThread().interrupt();
74                }
75            }, "Chef-" + c).start();
76        }
77
78        // Three consumers (waiters)
79        for (int w = 1; w <= 3; w++) {
80            final int waiterId = w;
81            new Thread(() -> {
82                try {
83                    for (int i = 0; i < 3; i++) {  // Each waiter takes 3 items
84                        String dish = kitchen.take();
85                        System.out.println("[TAKE] Waiter-" + waiterId + " got " + dish
86                            + " (size=" + kitchen.size() + ")");
87                        Thread.sleep(200);
88                    }
89                } catch (InterruptedException e) {
90                    Thread.currentThread().interrupt();
91                }
92            }, "Waiter-" + w).start();
93        }
94    }
95}

Key design choices:

  • Two conditions: notFull is signaled when an item is removed (space available). notEmpty is signaled when an item is added (data available).
  • **signal() not signalAll():** We only need to wake ONE waiting thread, since only one can proceed (one slot opened or one item added).
  • **while not if:** After waking, the condition might have changed (another thread got there first). Always re-check.
  • **finally block:** Ensures the lock is released even if the code throws an exception.

Fix #3: The Java Way (How ArrayBlockingQueue Works)

The actual ArrayBlockingQueue in the JDK uses the exact same approach -- but with a circular array instead of a linked list:

java
1import java.util.concurrent.locks.Condition;
2import java.util.concurrent.locks.ReentrantLock;
3
4public class MyArrayBlockingQueue<T> {
5    private final Object[] items;
6    private int putIndex;   // Next slot for put
7    private int takeIndex;  // Next slot for take
8    private int count;      // Number of items in queue
9
10    private final ReentrantLock lock = new ReentrantLock();
11    private final Condition notFull = lock.newCondition();
12    private final Condition notEmpty = lock.newCondition();
13
14    public MyArrayBlockingQueue(int capacity) {
15        items = new Object[capacity];
16    }
17
18    public void put(T item) throws InterruptedException {
19        lock.lock();
20        try {
21            while (count == items.length) {
22                notFull.await();
23            }
24            items[putIndex] = item;
25            putIndex = (putIndex + 1) % items.length;  // Circular wrap
26            count++;
27            notEmpty.signal();
28        } finally {
29            lock.unlock();
30        }
31    }
32
33    @SuppressWarnings("unchecked")
34    public T take() throws InterruptedException {
35        lock.lock();
36        try {
37            while (count == 0) {
38                notEmpty.await();
39            }
40            T item = (T) items[takeIndex];
41            items[takeIndex] = null;  // Help GC
42            takeIndex = (takeIndex + 1) % items.length;  // Circular wrap
43            count--;
44            notFull.signal();
45            return item;
46        } finally {
47            lock.unlock();
48        }
49    }
50
51    public static void main(String[] args) throws InterruptedException {
52        MyArrayBlockingQueue<Integer> q = new MyArrayBlockingQueue<>(3);
53
54        Thread producer = new Thread(() -> {
55            try {
56                for (int i = 1; i <= 8; i++) {
57                    q.put(i);
58                    System.out.println("Put: " + i);
59                }
60            } catch (InterruptedException e) {
61                Thread.currentThread().interrupt();
62            }
63        });
64
65        Thread consumer = new Thread(() -> {
66            try {
67                for (int i = 1; i <= 8; i++) {
68                    int val = q.take();
69                    System.out.println("Took: " + val);
70                    Thread.sleep(100);
71                }
72            } catch (InterruptedException e) {
73                Thread.currentThread().interrupt();
74            }
75        });
76
77        producer.start();
78        consumer.start();
79        producer.join();
80        consumer.join();
81        System.out.println("Done!");
82    }
83}

Why a circular array?

  • No object allocation on put/take (unlike LinkedList which creates a Node object each time)
  • Better cache locality (array elements are contiguous in memory)
  • No GC pressure from node objects

The putIndex and takeIndex wrap around using modulo: (index + 1) % capacity. When they reach the end of the array, they loop back to 0.


Step-by-Step Trace

Circular array with capacity 3, producing A, B, C, D:

java
1Initial:  [_, _, _]  putIndex=0, takeIndex=0, count=0
2
3put(A):   [A, _, _]  putIndex=1, takeIndex=0, count=1
4put(B):   [A, B, _]  putIndex=2, takeIndex=0, count=2
5put(C):   [A, B, C]  putIndex=0, takeIndex=0, count=3  (wraps!)
6put(D):   BLOCKS (count == capacity)
7
8take():   [_, B, C]  putIndex=0, takeIndex=1, count=2  -> returns A
9          put(D) UNBLOCKS
10
11put(D):   [D, B, C]  putIndex=1, takeIndex=1, count=3  (D goes in slot 0!)
12
13take():   [D, _, C]  putIndex=1, takeIndex=2, count=2  -> returns B
14take():   [D, _, _]  putIndex=1, takeIndex=0, count=1  -> returns C (wraps!)
15take():   [_, _, _]  putIndex=1, takeIndex=1, count=0  -> returns D

Notice how putIndex and takeIndex chase each other around the array. The items come out in FIFO order: A, B, C, D.


Common Mistakes

  • **Using if instead of while for the await condition.** Between being signaled and re-acquiring the lock, another thread might have changed the state. Always re-check with while.
  • **Using signalAll() when signal() suffices.** For a standard blocking queue with two conditions, signal() is correct and more efficient. You only need to wake one waiter.
  • Forgetting to null out taken slots in the array. Without items[takeIndex] = null, the array holds references to consumed objects, preventing garbage collection. This is a memory leak.
  • **Not making the lock final.** If someone reassigns the lock field, different threads would lock on different objects. Always final.
  • Forgetting InterruptedException. await() throws InterruptedException. If you catch and swallow it, the thread cannot be properly shut down.

Interview Tip: This is one of the most frequently asked concurrency interview questions. Know the two-condition pattern cold: one condition for "not full" (producers await, consumers signal) and one for "not empty" (consumers await, producers signal). If you can whiteboard the circular array version, you will impress. Mention that ArrayBlockingQueue uses this exact pattern internally -- it shows you have read the JDK source.


Try It Yourself

  1. **Add a peek() method** that returns the head element without removing it, blocking if empty. Is this useful in practice?
  2. **Add a tryPut(item, timeout, unit) method** that waits at most the specified time and returns false if it could not insert. Hint: use Condition.await(long, TimeUnit).
  3. **Add a drainTo(collection, maxElements) method** that removes up to N items and adds them to the given collection in one lock acquisition. Why is this faster than calling take() N times?

Summary

  • A blocking queue needs mutual exclusion (lock), blocking (await), and signaling (signal).
  • Two conditions (notFull, notEmpty) avoid wasted wakeups compared to a single lock with notifyAll().
  • Circular array avoids allocation overhead and improves cache performance versus a linked list.
  • Always use while loops around await(), always unlock in finally, always signal() after state changes.