0%

Java-ThreadLocal和InheritableThreadLocal的局限性以及如何在线程池模型中传递上下文

跨线程使用ThreadLocal 如何做???

ThreadLocal

1、使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testThreadLocal() {
final ThreadLocal<Object> threadLocal = new ThreadLocal<>();
new Thread(
() -> {
threadLocal.set(Thread.currentThread().getName());
System.out.println("current thread: " + threadLocal.get()); // current thread: thread-1
threadLocal.remove();
}
, "thread-1")
.start();
new Thread(
() -> {
threadLocal.set(Thread.currentThread().getName());
System.out.println("current thread: " + threadLocal.get()); // current thread: thread-2
threadLocal.remove();
}
, "thread-2")
.start();
}

执行

1
2
current thread: thread-1
current thread: thread-2

可以看到线程是可以拿到 threadlocal中的变量

2、threadlocal是否跨线程?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void testThreadLocalWithChild() {
final ThreadLocal<Object> threadLocal = new ThreadLocal<>();
new Thread(
() -> {
threadLocal.set(Thread.currentThread().getName());
System.out.println("current thread: " + threadLocal.get()); // current thread: thread-1
new Thread(() -> {
System.out.println("child thread current thread: " + threadLocal.get()); // child thread current thread: null
}, "thread-child-1").start();
threadLocal.remove();
}
, "thread-1")
.start();
}

输出

1
2
current thread: thread-1
current thread: thread-2

可以看到跨线程后,我们无法拿到父亲线程的变量,所以thread无法解决跨线程

3、问题

  • 使用简单,模型简单,如果普通业务完全满足,对于没有异步操作的业务都满足
  • 局限性就是多线程异步操作!

InheritableThreadLocal

1、简单实用

​ 它可以传递父线程的变量

1
2
3
4
5
6
7
8
9
@Test
public void testInheritableThreadLocal() {
final InheritableThreadLocal<Object> threadLocal = new InheritableThreadLocal<>();
threadLocal.set(Thread.currentThread().getName());
Thread thread = new Thread(() -> {
System.out.println("current thread: " + threadLocal.get());//current thread: main
});
thread.start();
}

输出

1
current thread: main

可以看到是可以拿到结果的,是如何实现的呢?

2、InheritableThreadLocal 原理

1、OBJECT_INHERITABLE_THREAD_LOCAL.set(Thread.currentThread().getName()); 到底执行了什么

java.lang.ThreadLocal#set 方法:

1
2
3
4
5
6
7
8
9
10
11
12
public void set(T value) {
// 获取当前线程为main
Thread t = Thread.currentThread();
// 去获取当前线程的map -> t.threadLocals 显然是没有,因为没有threadlocal去绑定到main线程里
ThreadLocalMap map = getMap(t);
if (map != null)
// 有的话,就设置 thradlocal=>value
map.set(this, value);
else
// 去创建一个map
createMap(t, value);
}

java.lang.InheritableThreadLocal#createMap 重写了 java.lang.ThreadLocal#createMap

1
2
3
4
5
6
7
8
9
# java.lang.ThreadLocal#createMap
void createMap(Thread t, T firstValue) {
// t.threadLocals 进行赋值,显然是线程捆绑对象罢了
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}

结论就是,inheritableThreadLocal进行set的时候会像thread线程绑定一个 k-v 对象,k就是 inheritableThreadLocal , v就是我们要的对象,然后如果线程的 inheritableThreadLocals 为空就初始化一下。注意ThreadLocalMap里有趣的WeakReference,值得测试一哈,如果真的内存满了,会回收线程的变量么?

2、new Thread()

1
2
3
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}

继续

1
2
3
4
private void init(ThreadGroup g, Runnable target, String name,
long stackSize) {
init(g, target, name, stackSize, null, true);
}

继续

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}

this.name = name;

Thread parent = currentThread();
/// ................省略

// 如果支持inheritThreadLocals && 父线程的inheritableThreadLocals不为空,显然调用过java.lang.InheritableThreadLocal#set是会进行初始化的
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 设置当前线程的inheritableThreadLocals为父类的,下面其实就是一个map拷贝
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;

/* Set thread ID */
tid = nextThreadID();
}

3、局限性测试

1、测试是否支持线程传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testInheritableThreadLocalQuestion() throws InterruptedException {
final InheritableThreadLocal<Object> threadLocal = new InheritableThreadLocal<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 1000000, TimeUnit.SECONDS, new SynchronousQueue<>());
threadPoolExecutor.execute(() -> {
// 父线程set一个变量
threadLocal.set("parent");
threadPoolExecutor.execute(() -> {
// 子线程去拿,最后移除
System.out.println("child get thread name: " + threadLocal.get());
threadLocal.remove();
});
// 最后父线程执行完毕去删除当前线程变量
threadLocal.remove();
});

if (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
threadPoolExecutor.shutdown();
}
}

输出: child get thread name: parent

这个现象显然是支持线程传递的,因为在传递过程中出现了创建线程,执行第二个execute时出现了线程池没有线程,然后去创建一个,就可以传递了,真实中线程池是基本保活的

2、再次模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Test
public void testInheritableThreadLocalQuestion2() throws InterruptedException {
final InheritableThreadLocal<Object> threadLocal = new InheritableThreadLocal<>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 1000000, TimeUnit.SECONDS, new SynchronousQueue<>());

// 初始化线程池中的线程
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

TimeUnit.SECONDS.sleep(2);


// 测试
threadPoolExecutor.execute(() -> {
threadLocal.set("parent");
threadPoolExecutor.execute(() -> {
System.out.println("child get thread name: " + threadLocal.get());
threadLocal.remove();
});
threadLocal.remove();
});

if (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
threadPoolExecutor.shutdown();
}
}

输出:

1
child get thread name: null

可以看到输出结果为null,是因为在执行测试任务的第二个execute的时候没有创建线程,也就是没有发生传递

4、问题

  • 虽然InheritableThreadLocal可以解决跨线程传递的问题,但是它的局限性就是在于跨线程是必须创建新的线程
  • 业务中通常使用线程池模型(这里就不做解释了),大多数实现的线程池无法满足调用时线程重复创建,所以无法

线程池如何跨线程传递?

参考Golang中的context.Context的实现,我们可以想到如果依赖ThreadLocalInheritableThreadLocal 是不解决问题的,除非我们修改了 ThreadPoolExecutor 的源码,在每次调用的时候,将线程变量传递进去,其实也只能通过 Runnable函数进行传递了,因为对于线程池还是线程来说他们都需要传递 Runnable函数,所以考虑这个通用性最好!

1、代码实现

​ 局限性就是依赖参数传递ThreadLocal,所以后期可以考虑加入多个ThreadLocal 或者 业务中通常依赖于Spring进行管理,所以可以对ThreadLocal进行bean的注入,全局使用spring的bean进行初始化ThreadLocalRunnable,那么参数传递其实只需要一个ApplicationContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static class ThreadLocalRunnable<T> implements Runnable {

private ThreadLocal<T> local;
private T args;
private Runnable runnable;

public ThreadLocalRunnable(ThreadLocal<T> local, Runnable runnable) {
if (local == null || runnable == null) {
throw new RuntimeException("new ThreadLocalRunnable find args has null arg");
}
this.local = local;
// 初始化的时候获取绑定的线程变量
this.args = local.get();
this.runnable = runnable;
}

/**
* 被new的线程/线程池中调度的线程调用
*/
@Override
public void run() {
try {
// 设置绑定的线程变量
local.set(args);
runnable.run();
} finally {
// 移除绑定的线程变量
local.remove();
}
}
}

2、测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Test
public void testThreadLocalRunnable() throws InterruptedException {

final ThreadLocal<Object> threadLocal = new ThreadLocal<>();

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 1000000, TimeUnit.SECONDS, new SynchronousQueue<>());

// 初始化线程池中的线程
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

TimeUnit.SECONDS.sleep(2);


threadLocal.set("my name is main");

// 测试
threadPoolExecutor.execute(new ThreadLocalRunnable<>(threadLocal, () -> {
System.out.println("parent get thread name: " + threadLocal.get());
threadLocal.set("my name is parent");
threadPoolExecutor.execute(new ThreadLocalRunnable<>(threadLocal, () -> {
System.out.println("child get thread name: " + threadLocal.get());
threadLocal.remove();
}));
threadLocal.remove();
}));

if (!threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
threadPoolExecutor.shutdown();
}
}

输出

1
2
parent get thread name: my name is main
child get thread name: my name is parent

3、参考 spring的 ThreadPoolTaskExecutor

其实就是一个包装模式的使用,给你一个Runnable,然后你去包装一个Runnable,依靠闭包去实现!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
public class Config {

@Bean
public Executor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix(ASYNC_EXECUTOR_NAME);
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(-1);
// 这里在每次异步调用的时候, 会包装一下.
executor.setTaskDecorator(runnable -> {
// 这个时候还是同步状态
RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
// 返回的这个 runnable对象 才是去调用线程池.
return () -> {
try {
// 我们set 进去 ,其实是一个ThreadLocal维护的.
RequestContextHolder.setRequestAttributes(requestAttributes);
runnable.run();
} finally {
// 最后记得释放内存
RequestContextHolder.resetRequestAttributes();
}
};
});
return executor;
}
}
本人坚持原创技术分享,如果你觉得文章对您有用,请随意打赏! 如果有需要咨询的请发送到我的邮箱!