14 | Lock和Condition(上):隐藏在并发包中的管程

Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

再造管程的理由

synchronized 无法解决死锁问题

synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

新设计一把互斥锁去解决这个问题

  1. 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock 接口:

1
2
3
4
5
6
7
8
// 支持中断的 API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的 API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的 API
boolean tryLock();

Java 里多线程的可见性是通过 Happens-Before 规则保证的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class X {
private final Lock rtl = new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。

什么是可重入锁

指的是线程可以重复获取同一把锁

例如下面代码中,当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get() 方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。

所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class X {
private final Lock rtl = new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

公平锁与非公平锁

ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

1
2
3
4
5
6
7
8
9
// 无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}

我们介绍过入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

并发大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远不在调用其他对象的方法时加锁

15 | Lock和Condition(下):Dubbo如何用管程实现异步转同步?

Java SDK 并发包里的 Condition,Condition 实现了管程模型里面的条件变量

Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的

在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。

那如何利用两个条件变量快速实现阻塞队列呢?

一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
// 入队后, 通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
// 出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

Lock 和 Condition 实现的管程,**线程等待和通知需要调用 await()、signal()、signalAll()**,它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。如果一不小心在 Lock&Condition 实现的管程里调用了 wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。

同步与异步

通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步

Dubbo 源码分析

其实在编程领域,异步的场景还是挺多的,比如 TCP 协议本身就是异步的,我们工作中经常用到的 RPC 调用,在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的。平时工作中的 RPC 调用大多数都是同步的啊?这是怎么回事呢?

一定是有人帮你做了异步转同步的事情。例如目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情

对于下面一个简单的 RPC 调用,默认情况下 sayHello() 方法,是个同步方法,也就是说,执行 service.sayHello(“dubbo”) 的时候,线程会停下来等结果。

1
2
3
DemoService service = 初始化部分省略
String message = service.sayHello("dubbo");
System.out.println(message);

如果此时你将调用线程 dump 出来的话,会是下图这个样子,你会发现调用线程阻塞了,线程状态是 TIMED_WAITING。本来发送请求是异步的,但是调用线程却阻塞了,说明 Dubbo 帮我们做了异步转同步的事情。通过调用栈,能看到线程是阻塞在 DefaultFuture.get() 方法上,所以可以推断:Dubbo 异步转同步的功能应该是通过 DefaultFuture 这个类实现的。

image-20220521225902440

DubboInvoker 的 108 行调用了 DefaultFuture.get(),这一行很关键

这一行先调用了 request(inv, timeout) 方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。

1
2
3
4
5
6
7
8
9
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中 108 行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}

DefaultFuture 这个类是很关键,我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC 结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC 结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}

调用线程通过调用 get() 方法等待 RPC 返回结果

调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。

当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。

websocket 也是一个异步的通信协议,如果基于这个协议实现一个简单的 RPC,你也会遇到异步转同步的问题。现在很多公有云的 API 本身也是异步的,例如创建云主机,就是一个异步的 API,调用虽然成功了,但是云主机并没有创建成功,你需要调用另外一个 API 去轮询云主机的状态。如果你需要在项目内部封装创建云主机的 API,你也会面临异步转同步的问题,因为同步的 API 更易用。

16 | Semaphore:如何快速实现一个限流器?

Semaphore,现在普遍翻译为“信号量”

信号量模型

一个计数器,一个等待队列,三个方法

在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()

image-20220521230300801

  • init():设置计数器的初始值。
  • down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

init()、down() 和 up() 三个方法都是原子性的

并且这个原子性是由信号量模型的实现方保证的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
// 将当前线程插入等待队列
// 阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
// 移除等待队列中的某个线程 T
// 唤醒线程 T
}
}
}

信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语

在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。

如何使用信号量

十字路口的红绿灯可以控制交通,得益于它的一个关键规则:车辆在通过路口前必须先检查是否是绿灯,只有绿灯才能通行。

在累加器的例子里面,count+=1 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。那这种情况用信号量怎么控制呢?

就像我们用互斥锁一样,只需要在进入临界区之前执行一下 down() 操作,退出临界区之前执行一下 up() 操作就可以了。

acquire() 就是信号量里的 down() 操作,release() 就是信号量里的 up() 操作。

1
2
3
4
5
6
7
8
9
10
11
12
static int count;
// 初始化信号量
static final Semaphore s = new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}

假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0,另外一个线程(T2)则是将计数器减为 -1。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以线程 T1 会继续执行;对于线程 T2,信号量里面的计数器的值是 -1,小于 0,按照信号量模型里对 down() 操作的描述,线程 T2 将被阻塞。所以此时只有线程 T1 会进入临界区执行count+=1;

当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

快速实现一个限流器

Semaphore 可以允许多个线程访问一个临界区

比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。其中,数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

所谓对象池呢,指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用 List 保存实例对象。

信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。下面就是对象池的示例代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();

我们用一个 List来保存对象实例,用 Semaphore 实现限流器。

关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

17 | ReadWriteLock:如何快速实现一个完备的缓存?

分场景优化性能,提升易用性

读多写少场景。实际工作中,为了优化性能,我们经常会使用缓存,例如缓存元数据、缓存基础数据等,这就是一种典型的读多写少应用场景。

缓存之所以能提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。

针对读多写少这种并发场景,Java SDK 并发包提供了读写锁——ReadWriteLock,非常容易使用,并且性能很好。

那什么是读写锁呢?

  1. 允许多个线程同时读共享变量;
  2. 只允许一个线程写共享变量;
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

快速实现一个缓存

声明了一个 Cache<K, V> 类,其中类型参数 K 代表缓存里 key 的类型,V 代表缓存里 value 的类型。缓存的数据保存在 Cache 类内部的 HashMap 里面,HashMap 不是线程安全的,这里我们使用读写锁 ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是 ReentrantReadWriteLock,它是支持可重入的。下面我们通过 rwl 创建了一把读锁和一把写锁。

Cache 这个工具类,我们提供了两个方法,一个是读缓存方法 get(),另一个是写缓存方法 put()。读缓存需要用到读锁,读锁的使用和前面我们介绍的 Lock 的使用是相同的,都是 try{}finally{}这个编程范式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Cache<K,V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(String key, Data v) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}

使用缓存首先要解决缓存数据的初始化问题

缓存数据的初始化,可以采用一次性加载的方式,也可以使用按需加载的方式。

如果源头数据的数据量不大,就可以采用一次性加载的方式,这种方式最简单(可参考下图),只需在应用启动的时候把源头数据查询出来,依次调用类似上面示例代码中的 put() 方法就可以了。

image-20220521233020201

如果源头数据量非常大,那么就需要按需加载了,按需加载也叫懒加载,指的是只有当应用查询缓存,并且数据不在缓存里的时候,才触发加载源头相关数据进缓存的操作。

image-20220521233039766

实现缓存的按需加载

如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,所以在代码中的⑤处,我们调用了 w.lock() 来获取写锁。

还需要注意的是,在获取写锁之后,我们并没有直接去查询数据库,而是在代码⑥⑦处,重新验证了一次缓存中是否存在,再次验证如果还是不存在,我们才去查询数据库并更新本地缓存。为什么我们要再次验证呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Cache<K,V> {
final Map<K, V> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();

V get(K key) {
V v = null;
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
// 缓存中存在,返回
if(v != null) { ④
return v;
}
// 缓存中不存在,查询数据库
w.lock(); ⑤
try {
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
// 查询数据库
v= 省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}

原因是在高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程 T1、T2 和 T3 同时调用 get() 方法,并且参数 key 也是相同的。那么它们会同时执行到代码⑤处,但此时只有一个线程能够获得写锁,假设是线程 T1,线程 T1 获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程 T2 和 T3 会再有一个线程能够获取写锁,假设是 T2,如果不采用再次验证的方式,此时 T2 会再次查询数据库。T2 释放写锁之后,T3 也会再次查询一次数据库。而实际上线程 T1 已经把缓存的值设置好了,T2、T3 完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题。

读写锁的升级与降级

上面按需加载的示例代码中,在①处获取读锁,在③处释放读锁,那是否可以在②处的下面增加验证缓存并更新缓存的逻辑呢?详细的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
if (v == null) {
w.lock();
try {
// 再次验证并更新缓存
// 省略详细代码
} finally{
w.unlock();
}
}
} finally{
r.unlock(); ③
}

这样看上去好像是没有问题的,先是获取读锁,然后再升级为写锁,对此还有个专业的名字,叫锁的升级。可惜 ReadWriteLock 并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的。

不过,虽然锁的升级是不允许的,但是锁的降级却是允许的。以下代码来源自 ReentrantReadWriteLock 的官方示例,略做了改动。你会发现在代码①处,获取读锁的时候线程还是持有写锁的,这种锁的降级是支持的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();

void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}
finally {r.unlock();}
}
}

总结

读写锁类似于 ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了 java.util.concurrent.locks.Lock 接口,所以除了支持 lock() 方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。

用 ReadWriteLock 实现了一个简单的缓存,这个缓存虽然解决了缓存的初始化问题,但是没有解决缓存数据与源头数据的同步问题,这里的数据同步指的是保证缓存数据和源头数据的一致性。解决数据同步问题的一个最简单的方案就是超时机制。所谓超时机制指的是加载进缓存的数据不是长久有效的,而是有时效的,当缓存的数据超过时效,也就是超时之后,这条数据在缓存中就失效了。而访问缓存中失效的数据,会触发缓存重新从源头把数据加载进缓存。

当然也可以在源头数据发生变化时,快速反馈给缓存,但这个就要依赖具体的场景了。例如 MySQL 作为数据源头,可以通过近实时地解析 binlog 来识别数据是否发生了变化,如果发生了变化就将最新的数据推送给缓存。另外,还有一些方案采取的是数据库和缓存的双写方案。

总之,具体采用哪种方案,还是要看应用的场景。

18 | StampedLock:有没有比读写锁更快的锁?

在读多写少的场景中,还有没有更快的技术方案呢?还真有,Java 在 1.8 这个版本里,提供了一种叫 StampedLock 的锁,它的性能就比读写锁还要好。

StampedLock 支持的三种锁模式

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读

其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final StampedLock sl =  new StampedLock();

// 获取 / 释放悲观读锁示意代码
long stamp = sl.readLock();
try {
// 省略业务相关代码
} finally {
sl.unlockRead(stamp);
}

// 获取 / 释放写锁示意代码
long stamp = sl.writeLock();
try {
// 省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些。

在 distanceFromOrigin() 这个方法中,首先通过调用 tryOptimisticRead() 获取了一个 stamp,这里的 tryOptimisticRead() 就是我们前面提到的乐观读。之后将共享变量 x 和 y 读入方法的局部变量中,不过需要注意的是,由于 tryOptimisticRead() 是无锁的,所以共享变量 x 和 y 读入方法局部变量时,x 和 y 有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用 validate(stamp) 来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Point {
private int x, y;
final StampedLock sl = new StampedLock();
// 计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
// 判断执行读操作期间,
// 是否存在写操作,如果存在,
// 则 sl.validate 返回 false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + curY * curY);
}
}

在上面这个代码示例中,如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。这个做法挺合理的,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期间没有写操作(只有这样才能保证 x 和 y 的正确性和一致性),而循环读会浪费大量的 CPU。升级为悲观读锁,代码简练且不易出错。

进一步理解乐观读

StampedLock 的乐观读和数据库的乐观锁有异曲同工之妙。

表增加了一个数值型版本号字段 version

数据库里的乐观锁,查询的时候需要把 version 字段查出来,更新的时候要利用 version 字段做验证。这个 version 字段就类似于 StampedLock 里面的 stamp。

StampedLock 使用注意事项

对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock,但是StampedLock 的功能仅仅是 ReadWriteLock 的子集,在使用的时候,还是有几个地方需要注意一下。

StampedLock 不支持重入

另外,StampedLock 的悲观读锁、写锁都不支持条件变量

如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升

例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处,不释放写锁
LockSupport.park();
});
T1.start();
// 保证 T1 获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证 T2 阻塞在读锁
Thread.sleep(100);
// 中断线程 T2
// 会导致线程 T2 所在 CPU 飙升
T2.interrupt();
T2.join();

所以,**使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()**。

总结

把 Java 官方示例精简后,形成下面的代码模板

StampedLock 读模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final StampedLock sl = new StampedLock();

// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
......

StampedLock 写模板:

1
2
3
4
5
6
7
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?

对账系统:

用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。

目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。

image-20220521235656123

核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。

1
2
3
4
5
6
7
8
9
10
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

利用并行优化对账系统

目前的对账系统,由于订单量和派送单量巨大,所以查询未对账订单 getPOrders() 和查询派送单 getDOrders() 相对较慢,那有没有办法快速优化一下呢?目前对账系统是单线程执行的,图形化后是下图这个样子。对于串行化的系统,优化性能首先想到的是能否利用多线程并行处理

image-20220521235744604

对账系统里的瓶颈:查询未对账订单 getPOrders() 和查询派送单 getDOrders() 是否可以并行处理呢?

显然是可以的,因为这两个操作并没有先后顺序的依赖。这两个最耗时的操作并行之后,执行过程如下图所示。对比一下单线程的执行示意图,你会发现同等时间里,并行执行的吞吐量近乎单线程的 2 倍,优化效果还是相对明显的。

image-20220521235836189

在下面的代码中,我们创建了两个线程 T1 和 T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入 save() 两个操作。不过需要注意的是:主线程需要等待线程 T1 和 T2 执行完才能执行 check() 和 save() 这两个操作,为此我们通过调用 T1.join() 和 T2.join() 来实现等待,当 T1 和 T2 线程退出时,调用 T1.join() 和 T2.join() 的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

用 CountDownLatch 实现线程等待

下面的代码就是用线程池优化后的:我们首先创建了一个固定大小为 2 的线程池,之后在 while 循环里重复利用。一切看上去都很顺利,但是有个问题好像无解了,那就是主线程如何知道 getPOrders() 和 getDOrders() 这两个操作什么时候执行完。前面主线程通过调用线程 T1 和 T2 的 join() 方法来等待线程 T1 和 T2 退出,但是在线程池的方案里,线程根本就不会退出,所以 join() 方法已经失效了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建 2 个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});

/* ??如何实现等待??*/

// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

下面的代码示例中,在 while 循环里面,我们首先创建了一个 CountDownLatch,计数器的初始值等于 2,之后在pos = getPOrders();dos = getDOrders();两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用 latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() 来实现对计数器等于 0 的等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建 2 个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});

// 等待两个查询操作结束
latch.await();

// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}

进一步优化性能

前面我们将 getPOrders() 和 getDOrders() 这两个查询操作并行了,但这两个查询操作和对账操作 check()、save() 之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。

image-20220522000213427

如何实现这步优化,两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果,这明显有点生产者 - 消费者的意思,两次查询操作是生产者,对账操作是消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。

设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。

image-20220522000336587

下面再来看如何用双队列来实现完全的并行。一个最直接的想法是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。

下面这幅图形象地描述了上面的意图:线程 T1 和线程 T2 只有都生产完 1 条数据的时候,才能一起向下执行,也就是说,线程 T1 和线程 T2 要互相等待,步调要一致;同时当线程 T1 和 T2 都生产完一条数据的时候,还要能够通知线程 T3 执行对账操作。

image-20220522000431876

用 CyclicBarrier 实现线程同步

下面我们就来实现上面提到的方案。这个方案的难点有两个:一个是线程 T1 和 T2 要做到步调一致,另一个是要能够通知到线程 T3。

依然可以利用一个计数器来解决这两个难点,计数器初始化为 2,线程 T1 和 T2 生产完一条数据都将计数器减 1,如果计数器大于 0 则线程 T1 或者 T2 等待。如果计数器等于 0,则通知线程 T3,并唤醒等待的线程 T1 或者 T2,与此同时,将计数器重置为 2,这样线程 T1 和线程 T2 生产下一条数据的时候就可以继续使用这个计数器了。

Java 并发包里也已经提供了相关的工具类:CyclicBarrier

在下面的代码中,我们首先创建了一个计数器初始值为 2 的 CyclicBarrier,需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。

线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。

非常值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});

void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}

void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}

总结

CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;

CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

另外,线程池提供了 Future 特性,我们也可以利用 Future 特性来实现线程之间的等待

20 | 并发容器:都有哪些“坑”需要我们填?

同步容器及其注意事项

如何将非线程安全的容器变成线程安全的容器?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SafeArrayList<T>{
// 封装 ArrayList
List<T> c = new ArrayList<>();
// 控制访问路径
synchronized
T get(int idx){
return c.get(idx);
}

synchronized
void add(int idx, T t) {
c.add(idx, t);
}

synchronized
boolean addIfNotExist(T t){
if(!c.contains(t)) {
c.add(t);
return true;
}
return false;
}
}
1
2
3
List list = Collections.synchronizedList(new ArrayList());
Set set = Collections.synchronizedSet(new HashSet());
Map map = Collections.synchronizedMap(new HashMap());

组合操作需要注意竞态条件问题,例如上面提到的 addIfNotExist() 方法就包含组合操作。组合操作往往隐藏着竞态条件问题,即便每个操作都能保证原子性,也并不能保证组合操作的原子性,这个一定要注意。

在容器领域一个容易被忽视的“坑”是用迭代器遍历容器

例如在下面的代码中,通过迭代器遍历容器 list,对每个元素调用 foo() 方法,这就存在并发问题,这些组合的操作不具备原子性。

1
2
3
4
List list = Collections.synchronizedList(new ArrayList());
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());

而正确做法是下面这样,锁住 list 之后再执行遍历操作。如果你查看 Collections 内部的包装类源码,你会发现包装类的公共方法锁的是对象的 this,其实就是我们这里的 list,所以锁住 list 绝对是线程安全的。

1
2
3
4
5
6
List list = Collections.synchronizedList(new ArrayList());
synchronized (list) {
Iterator i = list.iterator();
while (i.hasNext())
foo(i.next());
}

Java 提供的同步容器还有 Vector、Stack 和 Hashtable,这三个容器不是基于包装类实现的,但同样是基于 synchronized 实现的,对这三个容器的遍历,同样要加锁保证互斥。

并发容器及其注意事项

Java 在 1.5 版本之前所谓的线程安全的容器,主要指的就是同步容器。不过同步容器有个最大的问题,那就是性能差,所有方法都用 synchronized 来保证互斥,串行度太高了。因此 Java 在 1.5 及之后版本提供了性能更高的容器,我们一般称为并发容器

List、Map、Set 和 Queue

image-20220522001321022

(一)List

List 里面只有一个实现类就是CopyOnWriteArrayList。CopyOnWrite,顾名思义就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。

那 CopyOnWriteArrayList 的实现原理是怎样的呢?

CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。

image-20220522001432415

如果在遍历 array 的同时,还有一个写操作,例如增加元素,CopyOnWriteArrayList 是如何处理的呢?CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。

读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。

image-20220522001504546

使用 CopyOnWriteArrayList 需要注意的“坑”主要有两个方面。一个是应用场景,CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。例如上面的例子中,写入的新元素并不能立刻被遍历到。另一个需要注意的是,CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。

(二)Map

Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。

使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key 和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。

image-20220522001602684

ConcurrentSkipListMap 里面的 SkipList 本身就是一种数据结构,中文一般都翻译为“跳表”。跳表插入、删除、查询操作平均的时间复杂度是 O(log n),理论上和并发线程数没有关系,所以在并发度非常高的情况下,若你对 ConcurrentHashMap 的性能还不满意,可以尝试一下 ConcurrentSkipListMap。

(三)Set

Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的

(四)Queue

一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识

这两个维度组合后,可以将 Queue 细分为四大类,分别是:

1.单端阻塞队列:其实现有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和 DelayQueue。内部一般会持有一个队列,这个队列可以是数组(其实现是 ArrayBlockingQueue)也可以是链表(其实现是 LinkedBlockingQueue);甚至还可以不持有队列(其实现是 SynchronousQueue),此时生产者线程的入队操作必须等待消费者线程的出队操作。而 LinkedTransferQueue 融合 LinkedBlockingQueue 和 SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好;PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队。

image-20220522001724676

2.双端阻塞队列:其实现是 LinkedBlockingDeque。

image-20220522001739160

3.单端非阻塞队列:其实现是 ConcurrentLinkedQueue。
4.双端非阻塞队列:其实现是 ConcurrentLinkedDeque。

使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患

Java7中的HashMap在执行put操作时会涉及到扩容,由于扩容时链表并发操作会造成链表成环,所以可能导致cpu飙升100%。

21 | 原子类:无锁工具类的典范

前面我们多次提到一个累加器的例子,示例代码如下。在这个例子中,add10K() 这个方法不是线程安全的,问题就出在变量 count 的可见性和 count+=1 的原子性上。可见性问题可以用 volatile 来解决,而原子性问题我们前面一直都是采用的互斥锁方案。

1
2
3
4
5
6
7
8
9
public class Test {
long count = 0;
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count += 1;
}
}
}

对于简单的原子性问题,还有一种无锁方案。Java SDK 并发包将这种无锁方案封装提炼之后,实现了一系列的原子类

在下面的代码中,我们将原来的 long 型变量 count 替换为了原子类 AtomicLong,原来的 count +=1 替换成了 count.getAndIncrement()

1
2
3
4
5
6
7
8
9
public class Test {
AtomicLong count = new AtomicLong(0);
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count.getAndIncrement();
}
}
}

无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、解锁操作,而加锁、解锁操作本身就消耗性能;同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,同时还能保证互斥性,既解决了问题,又没有带来新的问题,可谓绝佳方案。那它是如何做到的呢?

无锁方案的实现原理

CPU 为了解决并发问题,提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)。CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的

可以通过下面 CAS 指令的模拟代码来理解 CAS 的工作原理。在下面的模拟程序中有两个参数,一个是期望值 expect,另一个是需要写入的新值 newValue,只有当目前 count 的值和期望值 expect 相等时,才会将 count 更新为 newValue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class SimulatedCAS{
int count;
synchronized int cas(
int expect, int newValue){
// 读目前 count 的值
int curValue = count;
// 比较目前 count 值是否 == 期望值
if(curValue == expect){
// 如果是,则更新 count 的值
count = newValue;
}
// 返回写入前的值
return curValue;
}
}

使用 CAS 来解决并发问题,一般都会伴随着自旋,而所谓自旋,其实就是循环尝试。例如,实现一个线程安全的count += 1操作,“CAS+ 自旋”的实现方案如下所示,首先计算 newValue = count+1,如果 cas(count,newValue) 返回的值不等于 count,则意味着线程在执行完代码①处之后,执行代码②处之前,count 的值被其他线程更新过。那此时该怎么处理呢?可以采用自旋方案,就像下面代码中展示的,可以重新读 count 最新的值来计算 newValue 并尝试再次更新,直到成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SimulatedCAS{
volatile int count;
// 实现 count+=1
addOne(){
do {
newValue = count+1; //①
}while(count !=
cas(count,newValue) //②
}
// 模拟实现 CAS,仅用来帮助理解
synchronized int cas(
int expect, int newValue){
// 读目前 count 的值
int curValue = count;
// 比较目前 count 值是否 == 期望值
if(curValue == expect){
// 如果是,则更新 count 的值
count= newValue;
}
// 返回写入前的值
return curValue;
}
}

什么是 ABA 问题呢?

如果 cas(count,newValue) 返回的值等于count,是否就能够认为 count 的值没有被其他线程更新过呢?显然不是的,假设 count 原本是 A,线程 T1 在执行完代码①处之后,执行代码②处之前,有可能 count 被线程 T2 更新成了 B,之后又被 T3 更新回了 A,这样线程 T1 虽然看到的一直是 A,但是其实已经被其他线程更新过了,这就是 ABA 问题。

所以在使用 CAS 方案的时候,一定要先 check 一下。

看 Java 如何实现原子化的 count += 1

在 Java 1.8 版本中,getAndIncrement() 方法会转调 unsafe.getAndAddLong() 方法。这里 this 和 valueOffset 两个参数可以唯一确定共享变量的内存地址。

1
2
3
4
final long getAndIncrement() {
return unsafe.getAndAddLong(
this, valueOffset, 1L);
}

unsafe.getAndAddLong() 方法的源码如下,该方法首先会在内存中读取共享变量的值,之后循环调用 compareAndSwapLong() 方法来尝试设置共享变量的值,直到成功为止。compareAndSwapLong() 是一个 native 方法,只有当内存中共享变量的值等于 expected 时,才会将共享变量的值更新为 x,并且返回 true;否则返回 fasle。compareAndSwapLong 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final long getAndAddLong(
Object o, long offset, long delta){
long v;
do {
// 读取内存中的值
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(
o, offset, v, v + delta));
return v;
}
// 原子性地将变量更新为 x
// 条件是内存中的值等于 expected
// 更新成功则返回 true
native boolean compareAndSwapLong(
Object o, long offset,
long expected,
long x);

Java 提供的原子类里面 CAS 一般被实现为 compareAndSet(),compareAndSet() 的语义和 CAS 指令的语义的差别仅仅是返回值不同而已,compareAndSet() 里面如果更新成功,则会返回 true,否则返回 false。

1
2
3
4
5
6
7
do {
// 获取当前值
oldV = xxxx;
// 根据当前值计算新值
newV = ...oldV...
}while(!compareAndSet(oldV,newV);

原子类概览

原子化的基本数据类型、原子化的对象引用类型、原子化数组、原子化对象属性更新器原子化的累加器

image-20220522002429603

1. 原子化的基本数据类型

AtomicBoolean、AtomicInteger 和 AtomicLong

2. 原子化的对象引用类型

AtomicReference、AtomicStampedReference 和 AtomicMarkableReference

对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。

解决 ABA 问题的思路其实很简单,增加一个版本号维度就可以了。和乐观锁机制很类似,每次执行 CAS 操作,附加再更新一个版本号,只要保证版本号是递增的,那么即便 A 变成 B 之后再变回 A,版本号也不会变回来(版本号递增的)。AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,方法签名如下:

1
2
3
4
5
boolean compareAndSet(
V expectedReference,
V newReference,
int expectedStamp,
int newStamp)

AtomicMarkableReference 的实现机制则更简单,将版本号简化成了一个 Boolean 值,方法签名如下:

1
2
3
4
5
boolean compareAndSet(
V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark)

3. 原子化数组

AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray

4. 原子化对象属性更新器

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater

利用它们可以原子化地更新对象的属性,这三个方法都是利用反射机制实现的,创建更新器的方法如下:

1
2
3
4
public static <U>
AtomicXXXFieldUpdater<U>
newUpdater(Class<U> tclass,
String fieldName)

对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。

5. 原子化的累加器

DoubleAccumulator、DoubleAdder、LongAccumulator 和 LongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。

上面我们所有原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方案。原子类虽好,但使用要慎之又慎。

22 | Executor与线程池:如何创建正确的线程池?

创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁

一般意义上的池化资源,都是下面这样,当你需要资源的时候就调用 acquire() 方法来申请资源,用完之后就调用 release() 释放资源。若你带着这个固有模型来看并发包里线程池相关的工具类时,会很遗憾地发现它们完全匹配不上,Java 提供的线程池里面压根就没有申请线程和释放线程的方法。

1
2
3
4
5
6
7
8
class XXXPool{
// 获取池化资源
XXX acquire() {
}
// 释放池化资源
void release(XXX x){
}
}

线程池是一种生产者 - 消费者模式

为什么线程池没有采用一般意义上池化资源的设计方法呢?如果线程池采用一般意义上池化资源的设计方法,应该是下面示例代码这样。你可以来思考一下,假设我们获取到一个空闲线程 T1,然后该如何使用 T1 呢?你期望的可能是这样:通过调用 T1 的 execute() 方法,传入一个 Runnable 对象来执行具体业务逻辑,就像通过构造函数 Thread(Runnable target) 创建线程一样。可惜的是,你翻遍 Thread 对象的所有方法,都不存在类似 execute(Runnable target) 这样的公共方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 采用一般意义上池化资源的设计方法
class ThreadPool{
// 获取空闲线程
Thread acquire() {
}
// 释放线程
void release(Thread t){
}
}
// 期望的使用
ThreadPool pool;
Thread T1=pool.acquire();
// 传入 Runnable 对象
T1.execute(()->{
// 具体业务逻辑
......
});

所以,线程池的设计,没有办法直接采用一般意义上池化资源的设计方法

线程池的使用方是生产者,线程池本身是消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 简化的线程池,仅用来说明工作原理
class MyThreadPool{
// 利用阻塞队列实现生产者 - 消费者模式
BlockingQueue<Runnable> workQueue;
// 保存内部工作线程
List<WorkerThread> threads = new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize,
BlockingQueue<Runnable> workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx=0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run() {
// 循环取任务并执行
while(true){ ①
Runnable task = workQueue.take();
task.run();
}
}
}
}

/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("hello");
});

在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。

如何使用 Java 中的线程池

最核心的是ThreadPoolExecutor

它强调的是 Executor,而不是一般意义上的池化资源。

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

可以把线程池类比为一个项目组,而线程就是项目组的成员

  • corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
  • maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
  • keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  • workQueue:工作队列,和上面示例代码的工作队列同义。
  • threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
  • handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
    • CallerRunsPolicy:提交任务的线程自己去执行该任务。
    • AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    • DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    • DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

使用线程池要注意些什么

利用 Executors 你可以快速创建线程池

Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,参考下面的示例代码。

1
2
3
4
5
6
7
try {
// 业务逻辑
} catch (RuntimeException x) {
// 按需处理
} catch (Throwable x) {
// 按需处理
}

23 | Future:如何用多线程实现最优的“烧水泡茶”程序?

ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。而很多场景下,我们又都是需要获取任务的执行结果的。那 ThreadPoolExecutor 是否提供了相关功能呢?

如何获取任务执行结果

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。

1
2
3
4
5
6
// 提交 Runnable 任务
Future<?> submit(Runnable task);
// 提交 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T> submit(Runnable task, T result);

它们的返回值都是 Future 接口,Future 接口有 5 个方法

它们分别是**取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()*以及*2 个获得任务执行结果的 get() 和 get(timeout, unit)

其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

1
2
3
4
5
6
7
8
9
10
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

这 3 个 submit() 方法之间的区别在于方法参数不同

  1. 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
  2. 提交 Callable 任务 submit(Callable<T> task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
  3. 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需要注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建 Result 对象 r
Result r = new Result();
r.setAAA(a);
// 提交任务
Future<Result> future = executor.submit(new Task(r), r);
Result fr = future.get();
// 下面等式成立
fr === r;
fr.getAAA() === a;
fr.getXXX() === x

class Task implements Runnable{
Result r;
// 通过构造函数传入 result
Task(Result r){
this.r = r;
}
void run() {
// 可以操作 result
a = r.getAAA();
r.setXXX(x);
}
}

FutureTask 工具类

Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似

1
2
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。

1
2
3
4
5
6
7
8
// 创建 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交 FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();

利用 FutureTask 对象可以很容易获取子线程的执行结果。

1
2
3
4
5
6
7
// 创建 FutureTask
FutureTask<Integer> futureTask= new FutureTask<>(()-> 1+2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取计算结果
Integer result = futureTask.get();

实现最优的“烧水泡茶”程序

记得以前初中语文课文里有一篇著名数学家华罗庚先生的文章《统筹方法》,这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

image-20220522004003766

下面我们用程序来模拟一下这个最优工序。

并发编程可以总结为三个核心问题:分工、同步和互斥。

编写并发程序,首先要做的就是分工,所谓分工指的是如何高效地拆解任务并分配给线程。

对于烧水泡茶这个程序,一种最优的分工方案可以是下图所示的这样:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。

对于 T1 的这个等待动作,你应该可以想出很多种办法,例如 Thread.join()、CountDownLatch,甚至阻塞队列都可以解决,不过今天我们用 Future 特性来实现。

image-20220522004130939

首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;

这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 创建任务 T2 的 FutureTask
FutureTask<String> ft2= new FutureTask<>(new T2Task());
// 创建任务 T1 的 FutureTask
FutureTask<String> ft1= new FutureTask<>(new T1Task(ft2));
// 线程 T1 执行任务 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 执行任务 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程 T1 执行结果
System.out.println(ft1.get());

// T1Task 需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1 任务需要 T2 任务的 FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1: 洗水壶...");
TimeUnit.SECONDS.sleep(1);

System.out.println("T1: 烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取 T2 线程的茶叶
String tf = ft2.get();
System.out.println("T1: 拿到茶叶:"+tf);

System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
}
}
// T2Task 需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2: 洗茶壶...");
TimeUnit.SECONDS.sleep(1);

System.out.println("T2: 洗茶杯...");
TimeUnit.SECONDS.sleep(2);

System.out.println("T2: 拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return " 龙井 ";
}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

总结

利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果

Future 可以类比为现实世界里的提货单,比如去蛋糕店订生日蛋糕,蛋糕店都是先给你一张提货单,你拿到提货单之后,没有必要一直在店里等着,可以先去干点其他事,比如看场电影;等看完电影后,基本上蛋糕也做好了,然后你就可以凭提货单领蛋糕了。

利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决

在分析这种问题的过程中,建议用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好,类似于烧水泡茶最优分工方案那幅图。对照图来写代码,好处是更形象,且不易出错。

24 | CompletableFuture:异步编程没那么难

CompletableFuture 的核心优势

为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。

首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。

image-20220522004605089

  1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
  2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
System.out.println("T1: 洗水壶...");
sleep(1, TimeUnit.SECONDS);

System.out.println("T1: 烧开水...");
sleep(15, TimeUnit.SECONDS);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("T2: 洗茶壶...");
sleep(1, TimeUnit.SECONDS);

System.out.println("T2: 洗茶杯...");
sleep(2, TimeUnit.SECONDS);

System.out.println("T2: 拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{
System.out.println("T1: 拿到茶叶:" + tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
});
// 等待任务 3 执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
try {
u.sleep(t);
}catch(InterruptedException e){}
}
// 一次执行结果:
T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井

领略 CompletableFuture 异步编程的优势之后,下面我们详细介绍 CompletableFuture 的使用,首先是如何创建 CompletableFuture 对象。

创建 CompletableFuture 对象

创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。

runAsync(Runnable runnable)supplyAsync(Supplier<U> supplier),它们之间的区别是:Runnable 接口的 run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

1
2
3
4
5
6
// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口

如何理解 CompletionStage 接口

任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。

里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。

image-20220522005022203

CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓 OR 指的是依赖的任务只要有一个完成就可以执行当前任务。

在编程领域,还有一个绕不过去的山头,那就是异常处理,CompletionStage 接口也可以方便地描述异常处理。

1. 描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与 CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以 thenApply 系列方法返回的是CompletionStage<R>

而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer<T>,这个接口里与 CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>

thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>

这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。

1
2
3
4
5
6
7
8
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作

不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

1
2
3
4
5
6
7
8
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ

2. 描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

1
2
3
4
5
6
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述 OR 汇聚关系

CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

1
2
3
4
5
6
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CompletableFuture<String> f1 = 
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});

CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});

CompletableFuture<String> f3 =
f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

1
2
3
4
5
CompletableFuture<Integer> 
f0 = CompletableFuture.
.supplyAsync(()->(7/0))
.thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

1
2
3
4
5
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

1
2
3
4
5
6
CompletableFuture<Integer> 
f0 = CompletableFuture
.supplyAsync(()->7/0))
.thenApply(r->r*10)
.exceptionally(e->0);
System.out.println(f0.join());

总结

异步编程已经慢慢开始成熟,Java 语言也开始官方支持异步编程:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本则提供了更加完备的 Flow API,异步编程目前已经完全工业化。因此,学好异步编程还是很有必要的。

25 | CompletionService:如何批量执行异步任务?

如何优化一个询价应用的核心代码?如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商 S1 询价
Future<Integer> f1 = executor.submit(
()->getPriceByS1());
// 异步向电商 S2 询价
Future<Integer> f2 = executor.submit(
()->getPriceByS2());
// 异步向电商 S3 询价
Future<Integer> f3 = executor.submit(
()->getPriceByS3());

// 获取电商 S1 报价并保存
r=f1.get();
executor.execute(()->save(r));

// 获取电商 S2 报价并保存
r=f2.get();
executor.execute(()->save(r));

// 获取电商 S3 报价并保存
r=f3.get();
executor.execute(()->save(r));

上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商 S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上。这点小瑕疵你该如何解决呢?

估计你已经想到了,增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
// 电商 S1 报价异步进入阻塞队列
executor.execute(()-> bq.put(f1.get()));
// 电商 S2 报价异步进入阻塞队列
executor.execute(()-> bq.put(f2.get()));
// 电商 S3 报价异步进入阻塞队列
executor.execute(()-> bq.put(f3.get()));
// 异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}

利用 CompletionService 实现询价系统

不过在实际项目中,并不建议你这样做,因为 Java SDK 并发包里已经提供了设计精良的 CompletionService。利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。

CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。

那到底该如何创建 CompletionService 呢?

CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:

  1. ExecutorCompletionService(Executor executor)
  2. ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。

下面的示例代码完整地展示了如何利用 CompletionService 来实现高性能的询价系统。其中,我们没有指定 completionQueue,因此默认使用无界的 LinkedBlockingQueue。之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}

// todo 记录 2022/5/22/ 1:04

总结

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。

26 | Fork/Join:单机版的MapReduce

我们介绍了线程池、Future、CompletableFuture 和 CompletionService,仔细观察会发现这些工具类都是在帮助我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。对于简单的并行任务,可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。

我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面。

用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务

image-20220522010739794

上面提到的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。

分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治。既然分治这种任务模型如此普遍,那 Java 显然也需要支持,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来支持分治这种任务模型的。

分治任务模型

分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型

image-20220522010848805

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join 的使用

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask

它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要定义子类去扩展。

接下来我们就来实现一下,看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(下面的代码源自 Java 官方示例)。首先我们需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void main(String[] args){
// 创建分治任务线程池
ForkJoinPool fjp = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fib = new Fibonacci(30);
// 启动分治任务
Integer result = fjp.invoke(fib);
// 输出结果
System.out.println(result);
}
// 递归任务
static class Fibonacci extends RecursiveTask<Integer>{
final int n;
Fibonacci(int n){this.n = n;}
protected Integer compute(){
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
// 创建子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// 等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}

ForkJoinPool 工作原理

Fork/Join 并行计算的核心组件是 ForkJoinPool

ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。

ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,可以参考下面的 ForkJoinPool 工作原理图来理解其原理。ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。

ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。我们这里介绍的仅仅是简化后的原理,ForkJoinPool 的实现远比我们这里介绍的复杂

image-20220522011320668

模拟 MapReduce 统计单词数量

学习 MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用 Fork/Join 并行计算框架来实现。

我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果

思路有了,我们马上来实现。下面的示例程序用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static void main(String[] args){
String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
// 创建 ForkJoin 线程池
ForkJoinPool fjp = new ForkJoinPool(3);
// 创建任务
MR mr = new MR( fc, 0, fc.length);
// 启动任务
Map<String, Long> result = fjp.invoke(mr);
// 输出结果
result.forEach((k, v)-> System.out.println(k+":"+v));
}
//MR 模拟类
static class MR extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;
// 构造函数
MR(String[] fc, int fr, int to){
this.fc = fc;
this.start = fr;
this.end = to;
}
@Override protected
Map<String, Long> compute(){
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start+end)/2;
MR mr1 = new MR(fc, start, mid);
mr1.fork();
MR mr2 = new MR( fc, mid, end);
// 计算子任务,并返回合并的结果
return merge(mr2.compute(),mr1.join());
}
}
// 合并结果
private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>();
result.putAll(r1);
// 合并结果
r2.forEach((k, v) -> {
Long c = result.get(k);
if (c != null)
result.put(k, c+v);
else
result.put(k, v);
});
return result;
}
// 统计单词数量
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>();
// 分割单词
String [] words = line.split("\\s+");
// 统计单词数量
for (String w : words) {
Long v = result.get(w);
if (v != null)
result.put(w, v+1);
else
result.put(w, 1L);
}
return result;
}
}

总结

Fork/Join 并行计算框架主要解决的是分治任务。

Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。参考Doug Lea 的论文