阻塞队列ArrayBlockingQueue
# 前言
ArrayBlockingQueue内部用数组存储元素(初始化时需要指定容量大小),是最典型的有界阻塞队列,利用 ReentrantLock
实现线程安全。
在生产者-消费者模型中使用时,如果生产速度远远大于消费速度,容易导致队列填满,大量生产线程被阻塞。
使用独占锁 ReentrantLock
实现线程安全,入队和出队操作使用同一个锁对象,这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
重要方法
public void put(E e) throws InterruptedException
put(E e) 表示添加元素
public E take() throws InterruptedException
take() 表示取出元素
构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
重要属性
final Object[] items; // 元素数组
int takeIndex; // 下一个待取出的数组下标
int putIndex; // 下一个待添加的数组下标
int count; // 元素个数
final ReentrantLock lock; // 队列用到的锁
private final Condition notEmpty; // 消费者(内置条件队列)
private final Condition notFull; // 生产者(内置条件队列)
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 1 添加元素
public void put(E e) throws InterruptedException {
//不允许添加空元素
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//阻塞队列已经满了,则生产者挂起
while (count == items.length)
//await()方法的实现使用了条件队列
notFull.await();
//阻塞队列未满,则入队
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
//元素放进数组中,下标为putIndex
items[putIndex] = x;
//元素下标加1,若达到数组长度(下标最大为:数组长度-1),则重置为0
//此次用到了环形数组的设计
if (++putIndex == items.length)
putIndex = 0;
//总数加1
count++;
//唤醒消费者(入队了元素,数组不为空)
notEmpty.signal();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
环形数组
# 2 取出元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//阻塞队列空了,消费者挂起
while (count == 0)
notEmpty.await();
//阻塞队列不为空,则出队
return dequeue();
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取takeIndex下标的数组元素,后续返回
E x = (E) items[takeIndex];
//将takeIndex下标的元素置空
items[takeIndex] = null;
//元素下标加1,若达到数组长度(下标最大为:数组长度-1),则重置为0
//此次用到了环形数组的设计
if (++takeIndex == items.length)
takeIndex = 0;
//总数减1
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒生产者(此时阻塞队列中有空位)
notFull.signal();
return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
上次更新: 2022/11/24, 17:59:25