Java面试题之多线程 BlockingQueue的使用?源码解析
BlockingQueue
是 Java 中一个非常重要的接口,属于 java.util.concurrent
包。它是一个线程安全的队列,支持阻塞操作,适合用于生产者-消费者模式。BlockingQueue
提供了多种实现,如 ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
等。
BlockingQueue
的实现是线程安全的,多个线程可以安全地访问同一个队列。ArrayBlockingQueue
),而有些是无界的(如 LinkedBlockingQueue
)。put(E e)
:将指定元素插入队列,如果队列已满,则等待。take()
:从队列中获取并移除元素,如果队列为空,则等待。offer(E e)
:尝试将指定元素插入队列,如果队列已满,则返回 false。poll()
:尝试从队列中获取并移除元素,如果队列为空,则返回 null。remainingCapacity()
:返回队列中可以容纳的剩余元素数量。以 ArrayBlockingQueue
为例,下面是一些关键部分的源码解析。
ArrayBlockingQueue
使用一个数组来存储元素,并使用两个指针来标记头部和尾部。
private final Object[] items; // 存储元素的数组
private int takeIndex; // 从中取出元素的索引
private int putIndex; // 插入元素的索引
private int count; // 当前元素数量
构造函数用于初始化队列的大小和其他参数。
public ArrayBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.items = new Object[capacity];
this.capacity = capacity;
}
put
方法用于插入元素,如果队列已满,则会阻塞。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
notFull.await(); // 队列已满,等待
}
insert(e);
} finally {
lock.unlock();
}
}
take
方法用于获取元素,如果队列为空,则会阻塞。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
notEmpty.await(); // 队列为空,等待
}
return extract();
} finally {
lock.unlock();
}
}
下面是一个简单的生产者-消费者示例,使用 BlockingQueue
来实现。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
Integer value = queue.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
BlockingQueue
是一个非常强大的工具,适合用于多线程环境中的任务调度和数据传递。通过阻塞操作,它简化了线程间的协作,避免了复杂的同步问题。在实际开发中,合理使用 BlockingQueue
可以提高程序的性能和可维护性。