LeetCode 1188 - Design Bounded Blocking Queue

This problem asks us to implement a thread-safe bounded blocking queue: a fixed-capacity first-in-first-out (FIFO) queue that multiple threads may access concurrently.


Difficulty: 🟡 Medium
Topics: Concurrency

Solution

Problem Understanding

The task is to implement a thread-safe bounded blocking queue: a first-in-first-out (FIFO) queue with a fixed capacity that multiple threads may access concurrently. It supports two mutating operations: enqueue adds an element to the back, and dequeue removes one from the front. If the queue is full, enqueue must block until space is available; if the queue is empty, dequeue must block until an element is available. The size operation returns the current number of elements and never waits on the queue's state.

The input consists of the capacity of the queue and sequences of enqueue and dequeue operations performed by multiple producer and consumer threads. The output is the result of the dequeue operations and the final size of the queue.

The constraints tell us that the number of producers and consumers is small (at most 8 each) and the capacity is modest (at most 30). We therefore do not need to worry about extreme scalability, but we must handle thread synchronization correctly. Edge cases include dequeuing from an empty queue, enqueuing to a full queue, and the interaction between multiple threads that may unblock each other.

Two guarantees simplify the implementation: the number of enqueue calls is always greater than or equal to the number of dequeue calls, so no dequeue is left waiting forever, and the total number of calls is capped at 40.

Approaches

A naive or brute-force approach would be to implement a queue using a list or array and simply append and pop elements without synchronization. This approach would fail in a multithreaded context because concurrent accesses could corrupt the queue state, and it does not handle blocking when the queue is full or empty.
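
To make the failure concrete, here is a minimal sketch of such an unsynchronized queue. The class name NaiveQueue is hypothetical and exists only for this illustration. Even on a single thread it shows two of the problems: nothing enforces the capacity, and dequeuing from an empty queue raises an error instead of waiting. (Data corruption under real concurrency is harder to reproduce deterministically, so it is not shown here.)

```python
class NaiveQueue:
    """Hypothetical unsynchronized queue, for illustration only."""

    def __init__(self, capacity: int):
        self.capacity = capacity  # stored but never enforced
        self.items = []

    def enqueue(self, element: int) -> None:
        # No lock and no capacity check: the bound is silently ignored.
        self.items.append(element)

    def dequeue(self) -> int:
        # No waiting: an empty queue raises IndexError instead of blocking.
        return self.items.pop(0)


q = NaiveQueue(capacity=1)
q.enqueue(1)
q.enqueue(2)  # a blocking queue would wait here; this one just grows
overfull = len(q.items) > q.capacity

q.dequeue()
q.dequeue()
try:
    q.dequeue()  # a blocking queue would wait here; this one raises
    raised = False
except IndexError:
    raised = True
```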

The optimal approach leverages thread synchronization primitives such as threading.Lock for mutual exclusion and threading.Condition for signaling. A Condition allows a thread to wait until a certain condition is met (e.g., queue not empty or not full) and then continue safely. Using these primitives ensures that only one thread modifies the queue at a time while other threads block efficiently instead of busy-waiting.
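
Before applying this to the queue, it may help to see the wait/notify pattern in isolation. The following is a minimal sketch, with the variable names (ready, data, received) chosen here for illustration: one thread waits until a shared list is non-empty, and the main thread fills it and signals.

```python
import threading

lock = threading.Lock()
ready = threading.Condition(lock)
data = []      # shared state guarded by `lock`
received = []  # what the waiting thread saw after waking


def waiter() -> None:
    with ready:  # acquires the underlying lock
        # Re-check the predicate in a loop: a wakeup is only a hint.
        while not data:
            ready.wait()  # releases the lock while blocked, re-acquires on wake
        received.append(data.pop())


t = threading.Thread(target=waiter)
t.start()

with ready:
    data.append(42)
    ready.notify()  # wake one thread waiting on `ready`, if any

t.join(timeout=5)
```

The sketch behaves the same whichever thread gets the lock first: if the main thread appends before the waiter starts waiting, the waiter finds the list non-empty and skips wait() entirely.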

| Approach | Time Complexity | Space Complexity | Notes |
| --- | --- | --- | --- |
| Brute Force | O(1) per operation | O(capacity) | Not thread-safe, cannot handle blocking |
| Optimal | O(1) per operation | O(capacity) | Thread-safe, blocking handled efficiently using Condition variables |

Algorithm Walkthrough

  1. Initialize an empty queue container and store the maximum capacity. Create a Lock object for mutual exclusion and two Condition objects built on that same lock: not_full, on which producers wait when the queue is full, and not_empty, on which consumers wait when the queue is empty.
  2. For the enqueue operation, acquire the lock to enter the critical section. While the queue is full, call not_full.wait() to block until space is available. Once there is space, append the element to the queue and notify a waiting consumer that an element is available using not_empty.notify().
  3. For the dequeue operation, acquire the lock. While the queue is empty, call not_empty.wait() to block until an element is enqueued. Once an element is available, remove it from the front of the queue and notify a waiting producer that space is now available using not_full.notify().
  4. The size operation acquires the lock and returns the length of the queue. The result is a consistent snapshot taken while no other thread is modifying the queue, although it may be stale by the time the caller uses it.

Why it works: The Lock ensures that only one thread reads or modifies the queue at a time, preventing race conditions. The Condition objects let threads sleep instead of busy-waiting when they cannot proceed. Because wait() is called inside a while loop, a woken thread re-checks the queue state and proceeds only if the condition it needs still holds. The queue maintains FIFO order because elements are appended at the back and removed from the front.

Python Solution

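import threading
from collections import deque


class BoundedBlockingQueue:

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.queue = deque()  # O(1) removal from the front, unlike list.pop(0)
        self.lock = threading.Lock()
        # Both conditions share one lock, so holding either guards the queue.
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def enqueue(self, element: int) -> None:
        with self.not_full:
            # Re-check after every wakeup: another producer may have taken the slot.
            while len(self.queue) >= self.capacity:
                self.not_full.wait()
            self.queue.append(element)
            self.not_empty.notify()  # wake one waiting consumer

    def dequeue(self) -> int:
        with self.not_empty:
            # Re-check after every wakeup: another consumer may have taken the element.
            while not self.queue:
                self.not_empty.wait()
            value = self.queue.popleft()
            self.not_full.notify()  # wake one waiting producer
            return value

    def size(self) -> int:
        with self.lock:
            return len(self.queue)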

The Python solution uses a single Lock for mutual exclusion, shared by two Condition objects that manage blocking and waking for producers and consumers. enqueue waits while the queue is full, and dequeue waits while the queue is empty; each re-checks its condition in a while loop after waking. Together these give FIFO behavior and correct thread synchronization.

Go Solution

package main

import "sync"

type BoundedBlockingQueue struct {
    capacity int
    queue    []int
    lock     sync.Mutex
    notFull  *sync.Cond
    notEmpty *sync.Cond
}

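// Constructor returns a pointer so that the mutex referenced by both
// condition variables is the same one the methods lock. Returning the
// struct by value would copy the mutex and break that link.
func Constructor(capacity int) *BoundedBlockingQueue {
    b := &BoundedBlockingQueue{
        capacity: capacity,
        queue:    make([]int, 0, capacity),
    }
    b.notFull = sync.NewCond(&b.lock)
    b.notEmpty = sync.NewCond(&b.lock)
    return b
}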

func (b *BoundedBlockingQueue) Enqueue(element int) {
    b.lock.Lock()
    defer b.lock.Unlock()
    for len(b.queue) >= b.capacity {
        b.notFull.Wait()
    }
    b.queue = append(b.queue, element)
    b.notEmpty.Signal()
}

func (b *BoundedBlockingQueue) Dequeue() int {
    b.lock.Lock()
    defer b.lock.Unlock()
    for len(b.queue) == 0 {
        b.notEmpty.Wait()
    }
    val := b.queue[0]
    b.queue = b.queue[1:]
    b.notFull.Signal()
    return val
}

func (b *BoundedBlockingQueue) Size() int {
    b.lock.Lock()
    defer b.lock.Unlock()
    return len(b.queue)
}

The Go implementation mirrors the Python version. sync.Mutex provides mutual exclusion, and two sync.Cond values built on that mutex handle blocking when the queue is full or empty. The queue is a slice: append adds to the back, and reslicing with queue[1:] removes from the front, which preserves FIFO order.

Worked Examples

Example 1

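| Step | Operation | Queue State | Output |
| --- | --- | --- | --- |
| 1 | enqueue(1) | [1] | - |
| 2 | dequeue() | [] | 1 |
| 3 | dequeue() | [] | blocked (queue empty) |
| 4 | enqueue(0) | [0] | consumer unblocked |
| 5 | dequeue() from step 3 resumes | [] | 0 |
| 6 | enqueue(2) | [2] | - |
| 7 | enqueue(3) | [2,3] | - |
| 8 | enqueue(4) | [2,3] | blocked (queue full) |
| 9 | dequeue() | [3,4] | 2 (the blocked enqueue(4) then completes) |
| 10 | size() | [3,4] | 2 |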
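
The trace above can be replayed with real threads. The sketch below is one way to do so, under some assumptions: the class is a compact restatement of the two-condition design (using collections.deque for the storage), and the split into one producer thread and one consumer thread is chosen here for illustration. The exact moments at which each thread blocks depend on scheduling, but with a single producer and a single consumer the dequeued values and the final size do not.

```python
import threading
from collections import deque


class BoundedBlockingQueue:
    """Compact restatement of the two-condition design, for this demo only."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items = deque()
        lock = threading.Lock()
        self.not_full = threading.Condition(lock)
        self.not_empty = threading.Condition(lock)

    def enqueue(self, element: int) -> None:
        with self.not_full:
            while len(self.items) >= self.capacity:
                self.not_full.wait()
            self.items.append(element)
            self.not_empty.notify()

    def dequeue(self) -> int:
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()
            value = self.items.popleft()
            self.not_full.notify()
            return value

    def size(self) -> int:
        with self.not_full:  # same underlying lock as not_empty
            return len(self.items)


queue = BoundedBlockingQueue(2)
outputs = []  # appended to by the single consumer thread only


def producer() -> None:
    for value in (1, 0, 2, 3, 4):
        queue.enqueue(value)  # blocks whenever two elements are pending


def consumer() -> None:
    for _ in range(3):
        outputs.append(queue.dequeue())  # blocks whenever the queue is empty


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=5)

final_size = queue.size()
```

After both threads finish, outputs holds the three dequeued values in FIFO order and final_size matches the last row of the table.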

Example 2

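With multiple producer and consumer threads, the order in which elements are dequeued may vary from run to run. What must hold in every run is that enqueue blocks when the queue is full, dequeue blocks when it is empty, and the final size equals the number of enqueues minus the number of dequeues.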
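
One way to check such order-independent properties is to compare the multiset of dequeued values against what was enqueued. The sketch below is a hypothetical harness, not the LeetCode judge. As a stand-in for the class in this article it uses the standard library's queue.Queue, whose put and get block when the queue is full or empty respectively; the thread counts and values are invented for illustration.

```python
import queue
import threading
from collections import Counter

q = queue.Queue(maxsize=3)        # stand-in bounded blocking queue
dequeued = []
dequeued_lock = threading.Lock()  # guards `dequeued` across consumers


def producer(values) -> None:
    for v in values:
        q.put(v)  # blocks while the queue is full


def consumer(count: int) -> None:
    for _ in range(count):
        v = q.get()  # blocks while the queue is empty
        with dequeued_lock:
            dequeued.append(v)


# 3 producers enqueue 9 values in total; 2 consumers dequeue 8 of them.
producers = [threading.Thread(target=producer, args=([i, i + 3, i + 6],))
             for i in range(3)]
consumers = [threading.Thread(target=consumer, args=(4,)) for _ in range(2)]
for t in producers + consumers:
    t.start()
for t in producers + consumers:
    t.join(timeout=5)

enqueued = Counter(range(9))
leftover = enqueued - Counter(dequeued)  # values still in the queue
final_size = q.qsize()
```

Which value is left over differs between runs, but exactly one is, and it is the one still sitting in the queue.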

Complexity Analysis

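| Measure | Complexity | Explanation |
| --- | --- | --- |
| Time | O(1) per operation | Each enqueue and dequeue performs a single append/pop plus constant-time synchronization checks, provided removal from the front is O(1) (in Python, collections.deque.popleft; list.pop(0) is O(n) in the queue length) |
| Space | O(capacity) | The queue stores at most capacity elements |

The time complexity is constant per operation, not counting time spent blocked, because the critical sections are short and we never scan the queue. The space is bounded by the queue capacity.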

Test Cases

# Example 1 (run on one thread, so calls are ordered to never block)
queue = BoundedBlockingQueue(2)
queue.enqueue(1)
assert queue.dequeue() == 1
queue.enqueue(0)
assert queue.dequeue() == 0
queue.enqueue(2)
queue.enqueue(3)
# enqueue(4) here would block forever on a single thread, so dequeue first
assert queue.dequeue() == 2
queue.enqueue(4)
assert queue.size() == 2

# Example 2
queue = BoundedBlockingQueue(3)
queue.enqueue(1)
queue.enqueue(0)
queue.enqueue(2)
vals = [queue.dequeue(), queue.dequeue(), queue.dequeue()]
assert vals == [1, 0, 2]          # FIFO order is deterministic on one thread
assert sorted(vals) == [0, 1, 2]  # order-independent form, as needed with several consumers
queue.enqueue(3)
assert queue.size() == 1

# Edge test: single element
queue = BoundedBlockingQueue(1)
queue.enqueue(10)
assert queue.size() == 1
assert queue.dequeue() == 10
assert queue.size() == 0

| Test | Why |
| --- | --- |
| Examples 1 & 2 | Verify FIFO ordering and size bookkeeping; they run on a single thread, so calls that would block are reordered or omitted |
| Single element queue | Edge case for minimal capacity |
| Sorted output | Order-independent check that every enqueued element is dequeued, the form of assertion that still holds with multiple threads |

Edge Cases

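A critical edge case is when the queue has capacity 1. Every enqueue then fills the queue and every dequeue empties it, so both operations block often and the code must not deadlock. The implementation avoids this because each successful enqueue signals the consumers' condition, each successful dequeue signals the producers' condition, and the lock ensures only one thread modifies the queue at a time.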

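Another edge case occurs when multiple threads simultaneously try to enqueue into a full queue. Each producer blocks in not_full.wait() (notFull.Wait() in Go) until space is available. Each dequeue wakes one of them, and the while loop sends any producer that finds the queue full again back to waiting.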
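
Both edge cases can be exercised together. The sketch below is an illustrative harness under stated assumptions: TrackedQueue is a hypothetical variant of the two-condition design that also records the largest length the queue ever reached, and the choice of four producers and one consumer is arbitrary. With capacity 1, at most one producer can succeed before the consumer frees the slot again.

```python
import threading
from collections import deque


class TrackedQueue:
    """Hypothetical bounded queue that records its peak length."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items = deque()
        self.peak = 0
        lock = threading.Lock()
        self.not_full = threading.Condition(lock)
        self.not_empty = threading.Condition(lock)

    def enqueue(self, element: int) -> None:
        with self.not_full:
            while len(self.items) >= self.capacity:
                self.not_full.wait()  # blocked producers park here
            self.items.append(element)
            self.peak = max(self.peak, len(self.items))
            self.not_empty.notify()

    def dequeue(self) -> int:
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()
            value = self.items.popleft()
            self.not_full.notify()  # wakes one producer per freed slot
            return value


q = TrackedQueue(capacity=1)
producers = [threading.Thread(target=q.enqueue, args=(i,)) for i in range(4)]
for t in producers:
    t.start()

# The main thread acts as the only consumer; each dequeue frees one slot.
drained = sorted(q.dequeue() for _ in range(4))
for t in producers:
    t.join(timeout=5)
```

The order in which the producers get through is not fixed, which is why the drained values are sorted before being inspected; the peak length, however, should never exceed the capacity.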