博客 从源码全面解析 ArrayBlockingQueue 的来龙去脉

从源码全面解析 ArrayBlockingQueue 的来龙去脉

   数栈君   发表于 2023-05-22 11:16  185  0

一、引言
并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。

作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。

于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马

二、使用
对于阻塞队列,想必大家应该都不陌生,我们这里简单的介绍一下,对于 Java 里面的阻塞队列,其使用了 **生产者和消费者 **的模型

对于生产者来说,主要有以下几部分:

add(E) // 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E) // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit) // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E) // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
1
2
3
4
对于消费者来说,主要有以下几部分:

remove() // 从队列中移除数据,如果队列为空,抛出异常
poll() // 从队列中移除数据,如果队列为空,返回null,么的数据
poll(timeout,unit) // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取
take() // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
1
2
3
4
我们本篇来讲讲堵塞队列中的第一员猛将,ArrayBlockingQueue 的故事

我们简单来写一个小demo

public class ArrayBlockingQueueTest {
public static void main(String[] args) throws Exception {
// 必须设置队列的长度
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);

// 生产者扔数据
queue.add("1");
queue.offer("2");
queue.offer("3", 2, TimeUnit.SECONDS);
queue.put("2");

// 消费者取数据
System.out.println(queue.remove());
System.out.println(queue.poll());
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.take());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
三、源码
1、初始化
由于我们的 ArrayBlockingQueue 底层使用的是数据结构,所以我们需要在初始化的时候指定其大小,如下:

// 设置其大小长度为 4
ArrayBlockingQueue queue = new ArrayBlockingQueue(4);

// 初始化
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

// 初始化ArrayBlockingQueue的一些初始变量
public ArrayBlockingQueue(int capacity, boolean fair) {
// 如果传一个负数,直接完蛋
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化数组items
this.items = new Object[capacity];
// 初始化lock非公平锁
lock = new ReentrantLock(fair);
// 消费者挂起线程和唤醒线程用到的Condition
notEmpty = lock.newCondition();
// 生产者挂起线程和唤醒线程用到的Condition
notFull = lock.newCondition();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
除了我们初始化的这些变量,也有其他的一些变量:

// 存储数据的下标
int putIndex
// 取数据的下标
int takeIndex
// 当前数组中存储的数据长度
int count
1
2
3
4
5
6
对于 ReentrantLock 和 newCondition 的知识点,可以看以下博文:

newCondition的源码分析
ReentrantLock的源码分析
2、生产者的源码
2.1 add()源码实现
public boolean add(E e) {
return super.add(e);
}

// 走到这里会发现,我们的add方法就是调用了offer方法
// offer: 添加数据到队列,如果队列满了,返回false
// 所以这里offer满了,就会抛出异常:"Queue full"
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2.2 offer()源码实现
public boolean offer(E e) {
// 检测当前的入参是否为null
// 如果是的话直接抛出异常
checkNotNull(e);
//【面试绝杀招】为什么会这样引用使用?
final ReentrantLock lock = this.lock;
// 直接加锁,保证线程安全
lock.lock();
try {
// 如果当前数组存储的长度等于总容量
// 直接返回false,插入失败
if (count == items.length)
return false;
else {
// 插入
enqueue(e);
return true;
}
} finally {
// 结束之后将锁释放掉
lock.unlock();
}
}

// 添加数据
private void enqueue(E x) {
// 我们发现,这引用又来了
final Object[] items = this.items;
// 当前数组的赋值
items[putIndex] = x;
// 如果下标等于我们的总容量,需要重新置下标值为0
if (++putIndex == items.length)
putIndex = 0;
// 数组容量加一
count++;
// 唤醒消费者等待的线程
notEmpty.signal();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2.3 offer(time,unit)源码实现
生产者在添加数据时,如果队列已经满了,阻塞一会

阻塞到消费者消费了消息,然后唤醒当前阻塞线程
阻塞到了 time 时间,再次判断是否可以添加,不能,直接告辞
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {
// 检测当前的入参是否为空
checkNotNull(e);
// 统一时间格式
long nanos = unit.toNanos(timeout);
// 持有引用
final ReentrantLock lock = this.lock;
// 允许的中断的加锁方式
lock.lockInterruptibly();
try {
// 如果当前的存储等于数组的长度
// 这里为什么不能用if判断,需要用while,牵扯到虚假唤醒,我们后面聊
while (count == items.length) {
// 时间小于0,直接返回false
if (nanos <= 0)
return false;
// 挂起当前线程
nanos = notFull.awaitNanos(nanos);
}
// 添加到数组中
enqueue(e);
return true;
} finally {
// 解锁
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2.4 put()源码实现
public void put(E e) throws InterruptedException {
// 检测当前的入参是否为空
checkNotNull(e);
// 持有引用
final ReentrantLock lock = this.lock;
// 允许的中断的加锁方式
lock.lockInterruptibly();
try {
// 如果当前的存储等于数组的长度
// 这里为什么不能用if判断,需要用while,牵扯到虚假唤醒,我们后面聊
while (count == items.length)
// 无时间挂起当前线程
notFull.await();
// 添加到队列
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
通过上面的源码分析,我们应该可以理解上面说的这几句话了

add(E) // 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E) // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit) // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E) // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
1
2
3
4
接着我们讲两个小细节,也是面试震惊面试官的地方

2.5 final ReentrantLock lock = this.lock
在我们 Doug Lea 里写的代码中,java.util.concurrent 包下 和 HashMap 中都有类似的写法

这种写法到底有什么好处呢,为什么我们不能直接使用成员变量 lock 来进行加锁解锁

感兴趣的可以看下这篇博文:原因

不感兴趣的可以看我的总结分析:

首先我们需要准备下面两个代码,将其反编译得到 Java 字节码

引用状态

public void get1(){
final ReentrantLock lock = this.lock;
lock.lock();
}
1
2
3
4
非引用状态

public void get2(){
lock.lock();
}
1
2
3
通过对比字节码我们发现,引用状态的字节码相较于非引用状态少了一个指令:getfield

而这个缺少的指令,也正是 Doug Lea 优化的来源:从栈读变量比从堆读变量会更cache-friendly,本地变量最终绑定到CPU寄存器的可能性更高。

但由于现在的 Java 编译器已经非常先进了,不论采用哪种方式,最终形成的机器指令都是一样的

所以,Doug Lea 的优化在之前是字节码层面的优化,但如今确实没有卵用

2.6 虚假唤醒
我们上面有一段 while 循环的代码:

// 如果当前的存储等于数组的长度
// 这里为什么不能用if判断,需要用while,牵扯到虚假唤醒,我们后面聊
while (count == items.length) {
// 无时间挂起当前线程
notFull.await();
}
// 添加到队列
enqueue(e);
1
2
3
4
5
6
7
8
我们 A 线程判断数组内还有空余,则放入数组



我们 B 线程判断其 count == items.length 进入挂起状态,当我们的 B 线程被唤醒时,如果不经历 count == items.length 的过程,就会将我们 A 线程的 3 数据给覆盖掉

3、消费者的源码
3.1 remove()源码实现
主要使用了我们的poll方法
public E remove() {
// 直接调用poll方法
E x = poll();
// 如果有数据则返回
// 无数据则抛出异常
if (x != null)
return x;
else
throw new NoSuchElementException();
}
1
2
3
4
5
6
7
8
9
10
3.2 poll() 源码实现
public E poll() {
// 还是引用
final ReentrantLock lock = this.lock;
// 锁一下
lock.lock();
try {
// 判断数组容量是否等于0(数组无容量),返回null
// 如果数组中有数据,则进行dequeue方法
return (count == 0) ? null : dequeue();
} finally {
// 解锁
lock.unlock();
}
}

private E dequeue() {
// 还是引用
final Object[] items = this.items;
// 当前弹出数组的下标
E x = (E) items[takeIndex];
// 弹出后将当前下标的数据置为空
items[takeIndex] = null;
// 如果我们的弹出下标和我们数组的大小一样时,需要更新弹出下标
if (++takeIndex == items.length)
takeIndex = 0;
// 数组数据数量减一
count--;
// 迭代器内容,先忽略
if (itrs != null)
itrs.elementDequeued();
// 唤醒生产者的线程
notFull.signal();
return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
3.3 poll(time,unit)源码实现
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 将时间转化成统一单位
long nanos = unit.toNanos(timeout);
// 引用
final ReentrantLock lock = this.lock;
// 可中断的加锁
lock.lockInterruptibly();
try {
// 看一下当前的数组还有容量没
while (count == 0) {
// 如果没有容量并且时间也到期了,返回null
if (nanos <= 0)
return null;
// 进入带有时间的等待状态(扔到Condition队列中)
nanos = notEmpty.awaitNanos(nanos);
}
// 被唤醒后并且当前的数组有容量
// 弹出队列中的数据即可
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
3.4 take()源码实现
public E take() throws InterruptedException {
// 引用
final ReentrantLock lock = this.lock;
// 可中断的加锁
lock.lockInterruptibly();
try {
// 看一下当前的数组还有容量没
while (count == 0){
// 没容量直接扔Condition队列等待
notEmpty.await();
}
// 被唤醒后并且当前的数组有容量
// 弹出队列中的数据即可
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
四、流程图
私聊我获取高清流程图



五、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
上一篇:大数据安全方案
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群