博客 从源码全面解析 ThreadLocal 关键字的来龙去脉

从源码全面解析 ThreadLocal 关键字的来龙去脉

   数栈君   发表于 2023-05-24 15:45  186  0

一、引言
对于 Java 开发者而言,关于 并发编程,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 并发编程 解析系列文章,将带你领略 并发编程 的奥秘
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4c68141cc8f83f82f8a5543991e4a1dd..png



二、概念
ThreadLocal 的英文字面意思为 “本地线程”,实际上 ThreadLocal 代表的是线程的本地变量,可能将其命名为 ThreadLocalVariable 更加容易让人理解。

ThreadLocal 如何做到为每个线程存有一份独立的本地值呢?

一个 ThreadLocal 实例可以形象地理解为一个 Map(早期版本的 ThreadLocal 是这样设计的)。

当工作线程 Thread 实例向本地变量保持某个值时,会以 “Key-Value对”(即键-值对)的形式保存在 ThreadLocal 内部的Map中,其中 Key 为线程 Thread 实例,Value 为待保存的值。

当工作线程 Thread 实例从 ThreadLocal 本地变量取值时,会以 Thread 实例为 Key,获取其绑定的 Value。

一个ThreadLocal实例内部结构的形象如下所示:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f1fbf32df97caaf06f4e60ba548e266d..png


后续在 JDK8 中进行了优化
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/bb453d5eadd0d62f62190daedc7ab1a2..png



三、使用
public class ThreadLocalTest {
ThreadLocal<String> threadLocal = new ThreadLocal<String>();

public static void main(String[] args) {
ThreadLocalTest threadLocalTest = new ThreadLocalTest();
threadLocalTest.showTwoThread();
}

public void showTwoThread() {
new Thread(new Runnable() {
public void run() {
threadLocal.set("Thread1");
System.out.println("I am " + threadLocal.get());
}
}).start();

new Thread(new Runnable() {
public void run() {
threadLocal.set("Thread2");
System.out.println("I am " + threadLocal.get());
}
}).start();
}
}


上述是我们 ThreadLocal 的一个使用 Demo

最终结果输出:

I am Thread1
I am Thread2
1
2
由此可以看到,我们在线程中使用了 ThreadLocal 做到了线程之间彼此隔离的作用。

四、源码解析
1. set 方法
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 通过当前线程获取 ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 验证当前的ThreadLocalMap是否为空
if (map != null){
// 若不为空,则直接插入数据即可
map.set(this, value);
} else{
// 若为空,则需要创建 ThreadLocalMap
createMap(t, value);
}
}

// 根据当前的Thread创建ThreadLocalMap并赋予初始值firstValue
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}


从这一步我们可以看出,如果当前并没有创建过 ThreadLocalMap,则会去第一次创建:

1.1 初始化设值
// 初始化ThreadLocalMap
// 1) 创建底层数据存储的Entry数组table
// 2) 根据hashCode计算出来所在数组的下标i执行赋值操作
// 3) 初始化代表table中元素个数size的值
// 4) 初始化阈值threshold
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化table,默认数值为16
table = new Entry[INITIAL_CAPACITY];
// hash获取下标
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 创建待插入的对象并放入数据
table[i] = new Entry(firstKey, firstValue);
// 调整容量
size = 1;
// 调整负载系数
setThreshold(INITIAL_CAPACITY);
}


但如果当前的 ThreadLocalMap 不是第一次创建,则会执行 map.set(this, value)

1.2 非初始化设值
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

// 首先,我们要先知晓,Entry对象是【弱引用】对象
// 注意这里循环的条件是e != null,这个很重要,它采用的就是上面讲的开放地址法。
// 这里遍历的逻辑是,先通过hash找到数组下标,然后寻找相等的ThreadLocal对象,找不到就往下一个index找。
// --------------------------------------------------------------------------------------------
// 有三种情况会跳出循环:
// 1) 找到了相同key的ThreadLocal对象,然后更新value值;
// 2) 找到了数组中的一个元素Entry,但是key=null,说明虚引用是可被GC回收的状态。
// 3) 一直往数组下一个index查找,直到下一个index对应的元素为null;
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 拿到当前的ThreadLocal对象
ThreadLocal<?> k = e.get();
// 如果相同的ThreadLocal,直接更新即可
if (k == key) {
e.value = value;
return;
}
// 当前的key=null,则表示虚引用的ThreadLocal是被GC回收的状态
if (k == null) {
// 1) 向前找到第一个空闲的key下标为 first
// 2) 向后找到第一个空闲的key下标为 last
// 3) 设置当前的key-value
// 4) 【expungeStaleEntry】清除first~last之间的陈旧的Entry,直接将value置为null
replaceStaleEntry(key, value, i);
return;
}
}

// 走到这里就说明下标为i的位置上,是没有元素的,所以可以直接将新建的Entry元素插入到i这个位置
tab[i] = new Entry(key, value);
int sz = ++size;
// cleanSomeSlots:存在陈旧的Entry且已经被清除
if (!cleanSomeSlots(i, sz) && sz >= threshold){
rehash();
}
}

// 循环获取下标
// 0-1-2-3-0-1-2-3
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}


private void rehash() {
// 清除陈旧的key
expungeStaleEntries();

// 扩容
// 1) 创建一个长度是当前table的2倍
// 2) 遍历旧的table数组,依次获得里面的Entry元素
// 2-1) 计算Entry在新的table数组中应该插入的位置
// 2-2) 如果下标h已经被占用了,那么就向后查找空位,直到找到空闲的位置为止,插入进去。
// 2-3) 如果下标h没有被占用,那么就插入到h的位置上
// 2-4) count++
// 3) 根据新数组的长度,更新阈值threshold
// 4) 针对新的数组,更新全局变量size和table
if (size >= threshold - threshold / 4){
resize();
}
}

1.3 流程图
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/72f1b2ff4a9c1e23b9ef1397006fa41f..png


2. get方法
public T get() {
// 拿到当前线程
Thread t = Thread.currentThread();
// 通过当前线程获取 ThreadLocalMap
ThreadLocalMap map = getMap(t);
// 如果 ThreadLocalMap 不为空的情况下
if (map != null) {
// 得到当前的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 初始化(和Set一样的流程)
return setInitialValue();
}

private Entry getEntry(ThreadLocal<?> key) {
// 获取下标
int i = key.threadLocalHashCode & (table.length - 1);
// 获取键值
Entry e = table[i];
// 如果不为空且当前key相等
if (e != null && e.get() == key)
return e;
else
// 
return getEntryAfterMiss(key, i, e);
}

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
// 取当前的table
Entry[] tab = table;
int len = tab.length;

// 循环遍历当前entry
while (e != null) {
ThreadLocal<?> k = e.get();
// 找到即返回
if (k == key)
return e;
if (k == null)
// 清理陈旧的key
expungeStaleEntry(i);
else
// 遍历下一个
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

3. remove方法
public void remove() {
// 拿到当前线程的ThreadLocalMap
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null){
m.remove(this);
}
}

private void remove(ThreadLocal<?> key) {
// 获取当前key的所属下标
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

// 循环找出当前的key
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 若找到引用置为空并清理陈旧的entry
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}

4. 开放地址法
当我们看到这里,就发现这个 ThreadLocalMap 有些“奇怪”,它并没有按照我们之前在学习 HashMap 的 链式 方式去解决哈希冲突,即:数组+链表。它其实使用的是一种叫做 “开放地址法” 作为解决哈希冲突的一种方式。

什么是开放地址法呢?

开放地址法的基本思想就是:一旦发生了冲突,那么就去寻找下一个空的地址;那么只要表足够大,空的地址总能找到,并将记录插入进去。

ThreadLocalMap 和 HashMap 的区别是什么呢?

HashMap:

数据结构是数组+链表
通过链地址法解决 hash 冲突的问题
里面的 Entry 内部类的引用都是强引用
ThreadLocalMap:

数据结构仅仅是数组
通过开放地址法来解决 hash 冲突的问题
Entry 内部类中的 key 是弱引用,value 是强引用
链地址法和开放地址法的优缺点是什么呢?

开放地址法

容易产生堆积问题,不适于大规模的数据存储。
散列函数的设计对冲突会有很大的影响,插入时可能会出现多次冲突的现象。
删除的元素是多个冲突元素中的一个,需要对后面的元素作处理,实现较复杂。
链地址法

处理冲突简单,且无堆积现象,平均查找长度短。
链表中的结点是动态申请的,适合构造表不能确定长度的情况。
删除结点的操作易于实现。只要简单地删去链表上相应的结点即可。
指针需要额外的空间,故当结点规模较小时,开放定址法较为节省空间。
ThreadLocalMap采用开放地址法原因是什么?

ThreadLocal 往往存放的数据量不会特别大(而且key 是弱引用又会被垃圾回收,及时让数据量更小)。

采用开放地址法简单的结构会更节省空间,同时数组的查询效率也是非常高,加上第一点的保障,冲突概率也比较低。

5. 清理方式
5.1 探测式清除
这个清除主要是通过这个方法:expungeStaleEntry

private int expungeStaleEntry(int staleSlot) {
// 获取数据
Entry[] tab = table;
int len = tab.length;
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
Entry e;
int i;


// 遍历Table,这里是一直向后遍历,直到遇到为null(也就是没用过的)为止
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 赋予空值
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// 重新hash
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}

5.2 启发式清除
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0eca8e639149430eb6daff2d6c4c5358..png


从上面探测式清除我们可以得到一个结论:探测式清除并不能完全清除我们table的所有陈旧数据

这个时候需要 启发式清除 的出场:

// i:探测式清除的返回地址
// n:table的总容量
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// 下一个
i = nextIndex(i, len);
Entry e = tab[i];
// 如果这个已经被GC掉了
if (e != null && e.get() == null) {
n = len;
removed = true;
// 由当前地址去进行探测式清除
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}

看完说实话,实现有点操蛋,接着人家探测式清除外包了一层,就换了一个名字了

假设:m = n>>>=1 ,也就是 2 的 m 次幂等于 n

当连续 m 次没有去进行清除操作,则默认当前 table 中没有垃圾

例如:数组长度是16,那么2^4=16,也就是连续4次没有过期Entry,即 m = logn/log2(n为数组长度)

五、内存泄露
public class ThreadLocalForOOM {
/**
* -Xms50m -Xmx50m
*/
static class OOMObject {
private Long[] a = new Long[2 * 1024 * 1024];
}

final static ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

final static ThreadLocal<OOMObject> threadLocal = new ThreadLocal<>();

public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
int finalI = i;
pool.execute(() -> {
threadLocal.set(new OOMObject());
System.out.println("oom object--->" + finalI);
OOMObject oomObject = threadLocal.get();
System.out.println("oomObject---->" + oomObject);
// threadLocal.remove(); // 记得remove 防止内存泄露,此时一定要在使用完remove
});

try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

我们运行上述代码,会出现 OOM 异常:Exception in thread "pool-1-thread-4" java.lang.OutOfMemoryError: Java heap space

我们在使用完 threadLocal 之后,需要及时的进行 remove 删除,加上这句就OK了。

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群